In [None]:
"""this code accepts the raw files of spammers and normal users messages and other parameters (such as number of recipients/first and last message sent, number of chats of the specified type, and returns trained model which can classify new users according to their messages and parameters it trains different models, indicates the level of accuracy and save the one which have the best output parameters. The code is built in the way to reduce processing time and "save to files/pickles", in order to restart the process from the middle (in case if part of job is already done)"""

import os
import numpy as np
import pandas as pd
import seaborn as sns
import dask.dataframe as dd
import matplotlib.pyplot as plt
import fastparquet
import pyarrow.parquet as pq
import warnings
import spacy
import langdetect
import pickle
import time
from sklearn.decomposition import SparsePCA, TruncatedSVD
from scipy.sparse import csr_matrix
from datetime import datetime
from deep_translator import GoogleTranslator
from deep_translator.exceptions import NotValidLength
from langdetect import LangDetectException
from sklearn.svm import SVC
from tqdm import tqdm
from settings import RAW_DATA_PATH
from settings import SEPARATED_DATA_PATH
from settings import ROOT_PATH
from settings import PROCESSED_DATA_PATH
from settings import NORMALIZED_DATA_PATH
from pandas.errors import SettingWithCopyWarning
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import MultinomialNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.metrics import confusion_matrix


# Column names and lookup keys shared by every stage of the pipeline.
LABEL_KEY = 'label'
CHAT_KEY = "ad_chat/ad_trigger/user_to_user"
MONGO_ID_COL = '_id'
CONVERSATION_TYPE_COL = 'conversartion_type'  # spelling kept: it is the runtime column name
OWNER_COL = 'owner'
RECIPIENT_COL = 'recipient'
CHAT_ID_COL = 'chat_id'
UPDATED_COL = 'updated'
MESSAGES_COL = 'user_messages'
DURATION = 'duration'

# Substrings searched for in the raw `_id` set-string to recognise the chat type.
AD_TRIGGER_COL = '{ad_trigger,'
AD_CHAT_COL = '{ad_chat,'
USER_TO_USER_COL = '{user_to_user'

# Boilerplate fragments seen in message text (apparently platform notifications and
# pandas Series-repr residue). The list contains some duplicates; kept verbatim.
# NOTE(review): not referenced by any code visible in this notebook.
PHRASES_TO_DELETE = [
    "посмотрел(а) ваш номер телефона",
    "актуально",
    "просмотрел(а) ваш номер телефона",
    "ваше объявление",
    "Name:",
    "bottom.payload",
    "Name: , dtype: object",
    "Name: , Length: , dtype: object",
    "Name: , dtype: object",
    "добавил(а) в Избранное",
    "dtype:",
    "object",
    "dtype: object",
    "просмотрел(а) ваш номер телефона",
    "Length: , dtype: object",
    "Ещё актуально?",
    "n/",
    "name",
    "Name",
    ":",
]

# Number of rows per .parquet chunk when the raw CSV exports are split.
ROWS_TO_SEPARATE = 50000

# Language labels / langdetect codes.
UNKNOWN_KEY = 'Unknown'
RUSSIAN_CODE = 'ru'
ENGLISH_CODE = 'en'
POLISH_CODE = 'pl'

# Load every spaCy pipeline once, up front, so the per-text lemmatizer never pays the load cost.
RUSSIAN_MODEL, POLISH_MODEL, ENGLISH_MODEL = [
    spacy.load(pipeline_name)
    for pipeline_name in ('ru_core_news_sm', 'pl_core_news_sm', 'en_core_web_sm')
]

# Accumulator that `concatenate` appends each processed chunk to.
dfs_list = []
# NOTE(review): not referenced by any code visible in this notebook.
newtext = ["Free entry"]

# Names assigned positionally by `lemmatize` (it uses the first 10 entries).
header_text = [
    'owner', 'total_recipients', 'total_chats', 'newest_message', 'oldest_message',
    'label', 'text', 'duration', 'chats_per_day', 'recipients_per_day', 'language',
]
# Same names with 'text' moved after the numeric features.
# NOTE(review): not referenced by any code visible in this notebook.
header = [
    'owner', 'total_recipients', 'total_chats', 'newest_message', 'oldest_message',
    'label', 'duration', 'chats_per_day', 'recipients_per_day', 'text', 'language',
]



In [None]:
# Ignore the warning about setting a value on a copy of a slice from a DataFrame
warnings.filterwarnings('ignore', category=SettingWithCopyWarning)

In [None]:
#The initial files were too big to process in one piece, so this function splits them into smaller, more efficient .parquet files.
def files_separation(file_paths, rows_per_file=None, output_dir=None):
    """Split one raw CSV export into fixed-size .parquet chunks tagged with a class label.

    The label is the part of the file name before the first underscore
    (e.g. ``blocked_user.csv`` -> ``"blocked"``). It is stored in the ``LABEL_KEY``
    column of every chunk. Chunks are written as ``<label>_<chunk index>.parquet``.

    Args:
        file_paths: path to a single CSV file (the plural name is kept for compatibility).
        rows_per_file: rows per output chunk; defaults to ``ROWS_TO_SEPARATE``.
        output_dir: destination directory; defaults to ``SEPARATED_DATA_PATH``.
    """
    if rows_per_file is None:
        rows_per_file = ROWS_TO_SEPARATE
    if output_dir is None:
        output_dir = SEPARATED_DATA_PATH

    marker = os.path.basename(file_paths).split('_')[0]
    raw_df = pd.read_csv(file_paths, encoding="utf-8-sig")
    n_chunks = -(-len(raw_df) // rows_per_file)  # ceiling division, for the progress bar

    for i, start in tqdm(enumerate(range(0, len(raw_df), rows_per_file)), total=n_chunks):
        # `.assign` returns a new frame, so the label is never written onto a slice of
        # `raw_df`. Writing onto the slice is what triggered SettingWithCopyWarning.
        chunk = raw_df.iloc[start:start + rows_per_file].assign(**{LABEL_KEY: marker})
        chunk.to_parquet(os.path.join(output_dir, f"{marker}_{i}.parquet"), index=False)

In [None]:
#This cleanup step is specific to the data set that was provided.
def replacing_commas(file_path):
    """Load one parquet chunk, strip newline runs from its values and drop incomplete rows.

    Args:
        file_path: path to a ``.parquet`` file (e.g. one written by ``files_separation``).

    Returns:
        A new DataFrame with every run of newline characters removed from string cells,
        every row that contains a missing value dropped, and a fresh 0..n-1 index.
    """
    df = pd.read_parquet(file_path)
    # replace/dropna/reset_index all return new objects. The cleaned frame has to be
    # returned; evaluating the chain without assigning it leaves `df` unchanged.
    return df.replace({r'[\n]+': ''}, regex=True).dropna().reset_index(drop=True)

In [1]:
# These two functions parse the set-string stored in the Mongo `_id` column into a flat table.
# Author's notes on the layout: by default the string looks like
# {<user_id>,<chat_type>,<recipient_id>,<owner_id>,<chat_id>};
# for chats of the type "ad_trigger" it changes to
# {<user_id>,<chat_type>,<owner_id>,<recipient_id>}.
# NOTE(review): parse_values reads positions 0/3 for user_to_user chats and 0/2/4 for ad chats,
# which does not obviously match these layouts; treat the code's indices as authoritative and verify against real data.
def parse_values(set_string):
    """Parse a Mongo ``_id`` set-string into conversation type, owner, recipient and chat id.

    The first and last characters (the braces) are stripped and the rest is split on
    single spaces. Token positions used:
      * string contains ``USER_TO_USER_COL``: owner = 0, recipient = 3, chat id = "unknown";
      * string contains ``AD_CHAT_COL`` or ``AD_TRIGGER_COL``: owner = 0, chat id = 2, recipient = 4.
    Token 1 is always reported as the conversation type.

    Raises:
        Exception: if none of the chat-type markers occurs in ``set_string``.
        IndexError: if the string has fewer tokens than the layout requires.
    """
    tokens = set_string[1:-1].split(' ')

    if USER_TO_USER_COL in set_string:
        owner, recipient, chat_id = tokens[0], tokens[3], 'unknown'
    elif AD_CHAT_COL in set_string or AD_TRIGGER_COL in set_string:
        owner, recipient, chat_id = tokens[0], tokens[4], tokens[2]
    else:
        raise Exception(f"{set_string} contains unknown value")

    parsed = {CONVERSATION_TYPE_COL: tokens[1]}
    parsed[OWNER_COL] = owner
    parsed[RECIPIENT_COL] = recipient
    parsed[CHAT_ID_COL] = chat_id
    return parsed

def separating_colummns(df):
    """Expand the ``_id`` set-strings of ``df`` into flat columns plus one-hot chat types.

    ``df`` is modified in place: owner/recipient/chat-id columns are added, and the ``_id``
    and conversation-type columns are dropped. The return value is that frame concatenated
    column-wise with ``pd.get_dummies`` of the conversation type.
    """
    parsed_ids = df[MONGO_ID_COL].map(parse_values)
    df[MONGO_ID_COL] = parsed_ids

    for column in (OWNER_COL, RECIPIENT_COL, CHAT_ID_COL, CONVERSATION_TYPE_COL):
        # `key=column` binds the current column name into each lambda.
        df[column] = parsed_ids.map(lambda record, key=column: record[key])

    chat_type_dummies = pd.get_dummies(df[CONVERSATION_TYPE_COL])
    df.drop(columns=[MONGO_ID_COL, CONVERSATION_TYPE_COL], inplace=True)
    return pd.concat([df, chat_type_dummies], axis=1)

In [None]:
# This function groups all rows by owner id, because each user has more than one row of data. It also dummy-codes the label: "blocked" becomes 1 and any other value (e.g. "regular") becomes 0.
def grouping_by(df):
    """Aggregate per-chat rows into one row per owner and encode the label as 0/1.

    Args:
        df: per-chat rows (a DataFrame or anything ``pd.DataFrame`` accepts). It needs owner,
            recipient, chat-id, ``updated`` and label columns, plus the message text under
            either ``'bottom.payload'`` or ``'user_messages'``.

    Returns:
        One row per owner with the columns: owner, label (1 for "blocked", else 0),
        total_recipients, total_chats, newest_message (max ``updated``),
        oldest_message (min ``updated``), and user_messages (all of that owner's messages
        concatenated as strings with no separator).
    """
    df = pd.DataFrame(df)
    df[LABEL_KEY] = df[LABEL_KEY].apply(lambda x: 1 if x == "blocked" else 0)
    if 'bottom.payload' not in df.columns:
        df = df.rename(columns={MESSAGES_COL: 'bottom.payload'})

    # NOTE(review): both totals use "count" (non-null rows), so total_recipients and
    # total_chats currently hold the same number; "nunique" may have been intended. Confirm.
    return (
        df.groupby(OWNER_COL)
        .agg(
            label=(LABEL_KEY, "first"),
            total_recipients=(RECIPIENT_COL, "count"),
            total_chats=(CHAT_ID_COL, "count"),
            newest_message=(UPDATED_COL, "max"),
            oldest_message=(UPDATED_COL, "min"),
            user_messages=("bottom.payload", lambda x: "".join(str(e) for e in x)),
        )
        .reset_index()
    )

In [None]:
#Create the "duration" column, one of the features for the later analysis. It is based on the assumption that normal users and spammers may spend different amounts of time on the platform.
def duration_processing(df):
    """Add a 'duration' column: newest_message minus oldest_message, as a timedelta.

    Both timestamps are converted with ``str()`` and parsed with the format
    ``"%Y-%m-%d %H:%M:%S"``. Any other representation raises ValueError.
    ``df`` is modified in place and returned.
    """
    timestamp_format = "%Y-%m-%d %H:%M:%S"

    def _parse(value):
        return datetime.strptime(str(value), timestamp_format)

    df['duration'] = [
        _parse(newest) - _parse(oldest)
        for newest, oldest in zip(df['newest_message'], df['oldest_message'])
    ]
    return df

In [None]:
def calculate_total(duration, total):
    """Return ``total`` divided by the length of ``duration`` in seconds.

    Args:
        duration: any object with a ``total_seconds()`` method (e.g. a timedelta).
        total: the count to turn into a rate.

    Returns:
        ``total / duration.total_seconds()``, or 0 when the duration is not positive.
        The result is a per-second rate.
    """
    seconds = duration.total_seconds()
    return total / seconds if seconds > 0 else 0

In [None]:
#Another feature, based on the total number of chats, on the assumption that it differs significantly between spammers and regular users.
def total_chats_count(df):
    """Add 'chats_per_day' = calculate_total(duration, total_chats) for every row.

    NOTE: calculate_total divides by ``duration.total_seconds()``, so despite the column
    name the value is a per-second rate. ``df`` is modified in place and returned.
    """
    df['chats_per_day'] = [
        calculate_total(row_duration, row_total)
        for row_duration, row_total in zip(df[DURATION], df['total_chats'])
    ]
    return df

In [None]:
#Another feature, based on the total number of recipients, on the assumption that spammers probably have a larger (or at least different) number of recipients.
def total_recipients_count(df):
    """Add 'recipients_per_day' = calculate_total(duration, total_recipients) for every row.

    NOTE: calculate_total divides by ``duration.total_seconds()``, so despite the column
    name the value is a per-second rate. ``df`` is modified in place and returned.
    """
    df['recipients_per_day'] = [
        calculate_total(row_duration, row_total)
        for row_duration, row_total in zip(df[DURATION], df['total_recipients'])
    ]
    return df



In [None]:
#Because the data was split into many parquet files, the processed pieces are concatenated back together so they can be saved as a single pickle of already-processed data.
def concatenate(df):
    """Append ``df`` to the module-level ``dfs_list`` and re-pickle everything gathered so far.

    The combined frame is written to ``pickle.pkl`` in the current working directory on
    every call. An interrupted run therefore keeps the chunks processed so far.

    Returns:
        A pandas DataFrame containing every frame appended so far, with a fresh 0..n-1 index.
    """
    dfs_list.append(df)
    # Use pandas, not dask: the inputs are in-memory pandas frames. dask's concat returns a
    # lazy dask collection (and, in some versions, the lone input itself for a one-item
    # list), so the pickle held a task graph whose type varied between calls, not the data.
    concatenated_df = pd.concat(dfs_list, ignore_index=True)
    with open("pickle.pkl", "wb") as f:
        pickle.dump(concatenated_df, f)
    return concatenated_df

In [3]:
#The next three cells lemmatize the text in a multilingual setting:
#1. For texts with at least 15 words and at most 5000 characters, the language is detected from the first 15 words. Shorter or longer texts are marked "unknown" and left unchanged.
#2. Russian, Polish and English texts are lemmatized with the matching spaCy model; any other detected language falls back to the Russian model. If the language cannot be detected at all, the text is translated to English and lemmatized with the English model. If no translation comes back, the text is left as-is and marked "unknown".
#3. The texts are processed in chunks of rows, and each chunk is saved to its own CSV (parquet would also work, but CSV caused no performance problems at this step).

In [None]:
def preprocess_single_text(text):
    """Detect the language of one user's message text and lemmatize it with spaCy.

    Behavior:
      * Fewer than 15 whitespace-separated words, or more than 5000 characters: returned
        unchanged with language "unknown".
      * Otherwise the language is detected from the first 15 space-separated tokens.
        "ru", "pl" and "en" use their own preloaded pipeline; any other detected code
        falls back to the Russian pipeline. The detected code is reported as the language.
      * If langdetect cannot identify the language, the text is translated to English and
        lemmatized with the English pipeline (language "translated"). If the translator
        returns None or rejects the text length, the text is kept as-is with language "unknown".

    Args:
        text: the concatenated messages of one user.

    Returns:
        dict with keys 'text' (lemmatized or original text) and 'language'.
    """
    # Too short for reliable detection, or longer than the translator accepts: keep verbatim.
    if len(text.split()) < 15 or len(text) > 5000:
        return {'text': text, 'language': "unknown"}

    try:
        lang = langdetect.detect(" ".join(text.split(" ")[:15]))
    except LangDetectException:
        try:
            translation = GoogleTranslator().translate(text, target='en')
        except NotValidLength:
            # deep_translator raises this for inputs outside its accepted length range.
            translation = None
        if translation is None:
            return {'text': text, 'language': 'unknown'}
        # Reuse the preloaded English pipeline; calling spacy.load here on every text was slow.
        lemmas = " ".join(token.lemma_ for token in ENGLISH_MODEL(translation))
        return {'text': lemmas, 'language': 'translated'}

    nlp = {
        RUSSIAN_CODE: RUSSIAN_MODEL,
        POLISH_CODE: POLISH_MODEL,
        ENGLISH_CODE: ENGLISH_MODEL,
    }.get(lang, RUSSIAN_MODEL)
    return {'text': " ".join(token.lemma_ for token in nlp(text)), 'language': lang}

In [None]:

def lemmatize(df, chunk_size=500):
    """Lemmatize the 'text' column in chunks and write each chunk via process_chunk_and_save.

    Column names are assigned positionally from ``header_text[:10]``, so ``df`` must have
    exactly 10 columns. The 7th of them becomes 'text', the column that gets lemmatized.
    (Presumably the columns come in the order produced by the aggregation steps; confirm.)

    Frames with more than 1000 rows are split into ``chunk_size``-row chunks numbered from 0.
    Smaller frames are written as a single chunk numbered 1.

    Returns:
        None; results are only written to disk.
    """
    df = pd.DataFrame(df)
    df.columns = header_text[:10]
    total_rows = len(df)

    if total_rows > 1000:
        # Stepping through row offsets never yields an empty trailing chunk. An empty chunk
        # would write an empty CSV that pd.read_csv later cannot parse.
        for i, start in enumerate(tqdm(range(0, total_rows, chunk_size))):
            # Reset the index so each chunk is 0-based, like the lemmatized rows it is joined with.
            chunk = df.iloc[start:start + chunk_size].reset_index(drop=True)
            process_chunk_and_save(chunk, i)
    else:
        process_chunk_and_save(df, 1)

In [None]:
def process_chunk_and_save(chunk_df, i):
    """Lemmatize every 'text' value of ``chunk_df`` and write the chunk to a header-less CSV.

    The file is ``PROCESSED_DATA_PATH/preprocessed_data_<i>.csv``. Its columns are
    ``chunk_df``'s columns without 'text', followed by the lemmatized 'text' and the
    detected 'language'.
    """
    # Explicit columns keep 'text'/'language' present even for an empty chunk.
    processed_df = pd.DataFrame(
        [preprocess_single_text(text) for text in chunk_df['text']],
        columns=['text', 'language'],
    )
    # axis=1 concat aligns rows by index label. A chunk cut from the middle of a frame keeps
    # its original labels, which would not match the 0-based processed_df and would produce
    # NaN-padded rows. Reset it so rows pair up by position.
    feature_columns = chunk_df.drop(columns='text').reset_index(drop=True)
    processed_df = pd.concat([feature_columns, processed_df], axis=1)
    print("processed_df_len", len(processed_df))
    processed_df.to_csv(os.path.join(PROCESSED_DATA_PATH, f'preprocessed_data_{i}.csv'),
                        encoding='utf-8-sig',
                        index=False, header=False, errors='ignore')


In [None]:
def preprocess_files(preprocessed_path=None):
    """Run the per-file cleaning/aggregation stage (unless already cached), then lemmatize.

    If ``preprocessed_path`` does not exist, every file in ``SEPARATED_DATA_PATH`` goes through
    replacing_commas -> separating_colummns -> grouping_by -> duration_processing ->
    total_chats_count -> total_recipients_count, and is then appended to the pickle by
    ``concatenate``. Afterwards ``pickle.pkl`` (current working directory) is loaded and
    passed to ``lemmatize``.

    Args:
        preprocessed_path: marker path whose existence means the aggregation stage already ran.
            Defaults to ``ROOT_PATH/pickle.pkl``, the same path the main cell checks. This
            removes the dependency on a global that only the ``__main__`` cell defines.

    Returns:
        Whatever ``lemmatize`` returns.
    """
    if preprocessed_path is None:
        preprocessed_path = os.path.join(ROOT_PATH, "pickle.pkl")

    if not os.path.exists(preprocessed_path):
        # os.listdir order is arbitrary. Sorting makes the concatenated row order, and
        # therefore the contents of each lemmatized chunk, reproducible across runs.
        for file_name in tqdm(sorted(os.listdir(SEPARATED_DATA_PATH))):
            df = replacing_commas(os.path.join(SEPARATED_DATA_PATH, file_name))
            df = separating_colummns(df)
            df = grouping_by(df)
            df = duration_processing(df)
            df = total_chats_count(df)
            df = total_recipients_count(df)
            concatenate(df)

    # NOTE(review): the existence check uses `preprocessed_path`, but the pickle is written
    # and read as "pickle.pkl" relative to the working directory. The two agree only when the
    # notebook runs from ROOT_PATH.
    # SECURITY: pickle.load can execute arbitrary code; only load pickles this pipeline wrote.
    with open("pickle.pkl", "rb") as f:
        dfc = pickle.load(f)

    return lemmatize(pd.DataFrame(dfc))


In [None]:
#Now that the "duration" column exists, these timestamp columns are no longer needed. Arguably this step should run before saving to the pickle.
def drop_columns(df):
    """Drop 'newest_message' and 'oldest_message' from ``df`` if present.

    Returns a new frame when at least one of them exists; otherwise returns ``df`` itself.
    """
    present = [column for column in ('newest_message', 'oldest_message') if column in df.columns]
    return df.drop(columns=present) if present else df



In [None]:
#dropping columns to create dataframe with only numbers
#TODO: exclude ownerid
def numerical(df):
    """Return a copy of ``df`` without its 'text' and 'label' columns.

    Raises:
        KeyError: if either column is missing.
    """
    return df.drop(['text', 'label'], axis=1)


In [4]:
#Use MinMaxScaler to bring the numerical data onto one scale. Other options include z-score standardization, log transform, Box-Cox transformation and MaxAbs scaling, each suited to different scenarios. For a simple classification task like this one, the choice is mainly between MinMaxScaler and z-score standardization. Z-score standardization works best when the data is roughly normally distributed and is less sensitive to outliers; since normality of this data is not assumed, z-score standardization is not used.
def normalize_numerical(df):
    """MinMax-scale the numeric feature columns of ``df`` to [0, 1].

    'duration', 'language' and 'owner' are dropped first, and +/-inf is replaced by NaN.
    MinMaxScaler ignores NaN when fitting and keeps it when transforming, so missing values
    stay NaN. Returns a new DataFrame with the remaining column names and a 0..n-1 index.
    """
    features = (
        df.drop(columns=['duration', 'language', 'owner'])
        .replace([np.inf, -np.inf], np.nan)
    )
    scaled = MinMaxScaler().fit_transform(features)
    return pd.DataFrame(scaled, columns=features.columns)

In [None]:
#I was interested in a most common words used by different groups of users. 
def text_df_count(df):
    """Count word frequencies for spam vs. normal users and save them to CSV.

    Words are whitespace-split from the 'text' column of rows labelled 'blocked' (spam)
    and 'regular' (normal). Words used by both groups are written, sorted, to
    ``common_words.csv`` (column 'word'). The remaining group-specific counts are written
    side by side to ``word_frequencies.csv`` (word/spam_frequency, then
    word/normal_frequency); the two halves are independent lists and are not aligned by word.
    Both files go to the current working directory.

    NOTE(review): this expects 'label' to still hold the strings 'blocked'/'regular';
    grouping_by re-encodes labels to 1/0.
    """
    def _count_words(texts):
        # word -> number of occurrences, in first-seen order
        counts = {}
        for text in texts:
            for word in text.split():
                counts[word] = counts.get(word, 0) + 1
        return counts

    blocked_counts = _count_words(df[df['label'] == 'blocked']['text'])
    normal_counts = _count_words(df[df['label'] == 'regular']['text'])

    # Membership tests must use the set itself: `in` on a DataFrame checks column labels,
    # not cell values. pd.DataFrame(set) also raises, because sets are unordered.
    common_words = blocked_counts.keys() & normal_counts.keys()
    blocked_only = {word: freq for word, freq in blocked_counts.items() if word not in common_words}
    normal_only = {word: freq for word, freq in normal_counts.items() if word not in common_words}

    blocked_df = pd.DataFrame(list(blocked_only.items()), columns=['word', 'spam_frequency'])
    normal_df = pd.DataFrame(list(normal_only.items()), columns=['word', 'normal_frequency'])
    count_df = pd.concat([blocked_df, normal_df], axis=1)
    count_df.to_csv('word_frequencies.csv', index=False, encoding='utf-8-sig')

    common_df = pd.DataFrame({'word': sorted(common_words)})
    common_df.to_csv('common_words.csv', index=False, encoding='utf-8-sig')

In [None]:
#vectorizing process of the "text" 
def vectorizing_vectorizer(df):
    """TF-IDF-encode ``df['text']`` into a dense DataFrame with one column per vocabulary term.

    Only English stop words are removed (``stop_words='english'``). The result has a
    0..n-1 index and is dense, so its width equals the vocabulary size.
    """
    tfidf = TfidfVectorizer(stop_words='english')
    term_matrix = tfidf.fit_transform(df['text'])
    vocabulary = tfidf.get_feature_names_out()
    return pd.DataFrame(term_matrix.todense(), columns=vocabulary)



In [None]:
#Separate and normalize the numerical features, apply TF-IDF to the text, and save the results to parquet.
def prepare_dfs_for_train(df, fraction_to_sample=0.5):
    """Build the model-ready feature matrix and label frame and save both as parquet.

    Steps: sample ``fraction_to_sample`` of the rows (``random_state=1``), drop the raw
    timestamp columns, apply ``label_dummy_coding``, MinMax-scale the numeric columns,
    TF-IDF-encode 'text', and join the two feature blocks column-wise.

    Files written to ``NORMALIZED_DATA_PATH``: ``normalized_features.parquet`` and
    ``lable_df.parquet`` (spelling kept; other code reads that name).

    Returns:
        (features_df, label_df). Feature columns are labelled "0".."n-1". label_df holds the
        sampled rows' 'label' column (and 'id', if such a column exists).
    """
    df = df.sample(frac=fraction_to_sample, random_state=1)
    # NOTE(review): `label_dummy_coding` is not defined in the visible part of this notebook; confirm it exists.
    number_df = label_dummy_coding(drop_columns(df))
    normalized_df = normalize_numerical(numerical(number_df))
    tfidf_df = vectorizing_vectorizer(number_df)

    features_df = pd.concat([normalized_df, tfidf_df], axis=1, ignore_index=True)
    # ignore_index leaves integer column labels, but parquet column names must be strings
    # and to_parquet can reject integer labels. Stringify them before writing.
    features_df.columns = features_df.columns.astype(str)

    features_path = os.path.join(NORMALIZED_DATA_PATH, "normalized_features.parquet")
    label_df_path = os.path.join(NORMALIZED_DATA_PATH, "lable_df.parquet")
    features_df.to_parquet(features_path, compression='gzip')
    label_df = number_df.filter(["id", "label"])
    label_df.to_parquet(label_df_path, compression='gzip')
    return features_df, label_df

In [None]:
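#This reduces the high-dimensional feature table (high-dimensional because of TF-IDF) to 100 truncated-SVD components, to cut run time. The components are linear combinations of the original features, not the 100 most influential features themselves.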
def perform_pca(X_sparse, n_components=100):
    """Project ``X_sparse`` onto its first ``n_components`` truncated-SVD components.

    Despite the name this is TruncatedSVD rather than PCA: it does not center the data, so
    it works directly on sparse input. The dense (n_samples, n_components) result is wrapped
    back into a CSR matrix. TruncatedSVD is built without a random_state, so results may
    vary slightly between runs.
    """
    reducer = TruncatedSVD(n_components=n_components)
    reduced = reducer.fit_transform(X_sparse)
    return csr_matrix(reduced)

In [None]:
from sklearn.metrics import classification_report


#This block trains the models in a loop. The candidates are models commonly reported to perform well on spam classification. Each model is evaluated with 100-fold stratified cross-validation. The fold results (accuracy, classification report, confusion matrices) are used to decide which model to save.
def train_files(data_for_models, lable_df, n_splits=100, random_state=None):
    """Cross-validate several classifiers, save per-model diagnostics and pickle the best one.

    The features are reduced with ``perform_pca`` (100 SVD components). Each model is then
    evaluated with stratified ``n_splits``-fold cross-validation. Per model, the following
    are written to the working directory:
      * ``Confusion Matrix - <name> - <mean accuracy>.png``: heatmap of the fold-averaged
        confusion matrix;
      * ``classification_report_<name>.txt``: report pooled over the predictions of all folds.
    The model with the highest mean fold accuracy is pickled as ``<estimator repr>.pkl``.

    Args:
        data_for_models: feature matrix (anything ``csr_matrix`` accepts).
        lable_df: frame with a 'label' column, row-aligned by position with ``data_for_models``.
        n_splits: number of folds; must not exceed the size of the smallest class.
        random_state: seed for fold shuffling. None keeps the unseeded behavior.

    Returns:
        dict: model name -> {'mean_accuracy', 'mean_confusion_matrix', 'classification_report'}.
    """
    from sklearn.model_selection import StratifiedKFold

    X_sparse = perform_pca(csr_matrix(data_for_models))
    # Read labels from the argument, not from a notebook-global `label_df`.
    y = lable_df['label']

    models_list = {'Logistic Regression': LogisticRegression(), 'Decision Tree': DecisionTreeClassifier(),
                   'Random Forest': RandomForestClassifier(), 'SVC': SVC()}
    target_names = ['class_0_normal', 'class_1_spammers']
    # A fixed label set gives every fold's confusion matrix the same shape, even when one
    # fold's test split contains only a single class.
    class_labels = np.unique(y)
    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    mean_accuracies = {}
    results = {}

    for name, classifier in tqdm(models_list.items()):
        accuracy_scores = []
        confusion_matrices = []
        fold_true = []
        fold_pred = []

        for train_idx, test_idx in tqdm(skf.split(X_sparse, y), total=n_splits):
            X_train, X_test = X_sparse[train_idx], X_sparse[test_idx]
            y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]

            classifier.fit(X_train, y_train)
            y_pred = classifier.predict(X_test)

            accuracy_scores.append(accuracy_score(y_test, y_pred))
            confusion_matrices.append(confusion_matrix(y_test, y_pred, labels=class_labels))
            fold_true.append(np.asarray(y_test))
            fold_pred.append(np.asarray(y_pred))

        # Rank models by their mean fold accuracy, not by the accuracy of the last fold.
        mean_accuracy = sum(accuracy_scores) / len(accuracy_scores)
        mean_accuracies[name] = mean_accuracy
        mean_confusion_matrix = np.mean(confusion_matrices, axis=0)

        fig, ax = plt.subplots(figsize=(5, 5))
        sns.heatmap(mean_confusion_matrix, annot=True, linewidths=0.5, linecolor="red", fmt=".0f", ax=ax)
        ax.set_xlabel("y_pred")
        ax.set_ylabel("y_true")
        ax.set_title(f"Confusion Matrix - {name} - {mean_accuracy}")
        fig.savefig(f"Confusion Matrix - {name} - {mean_accuracy}.png")
        plt.close(fig)

        # Pool every fold's predictions so the report covers all samples, not only the last fold.
        pooled_report = classification_report(np.concatenate(fold_true), np.concatenate(fold_pred),
                                               target_names=target_names)
        with open(f'classification_report_{name}.txt', 'w') as report_file:
            report_file.write(pooled_report)

        results[name] = {
            'mean_accuracy': mean_accuracy,
            'mean_confusion_matrix': mean_confusion_matrix,
            'classification_report': pooled_report,
        }

    best_clf_key = max(mean_accuracies, key=mean_accuracies.get)
    best_clf = models_list[best_clf_key]
    print(best_clf_key, best_clf)
    # NOTE(review): this is the estimator as fitted on the last fold. Refit it on all data
    # first if the saved model is meant for production use.
    with open(f'{best_clf}.pkl', 'wb') as file:
        pickle.dump(best_clf, file)

    return results

In [None]:
#This block builds the features and labels, loads the "normalized_features" parquet back into a DataFrame, and returns the "features" and "label" frames.
def prepare_data_for_train(dataframe):
    """Build features/labels with prepare_dfs_for_train, then reload the saved features.

    The features are read back from the parquet file that prepare_dfs_for_train writes,
    so training sees exactly what was saved to disk.

    Returns:
        (features_df, label_df)
    """
    _, label_df = prepare_dfs_for_train(dataframe)
    # Read from NORMALIZED_DATA_PATH, where prepare_dfs_for_train writes the file. A bare file
    # name resolves against the working directory and can pick up a stale or missing file.
    features_path = os.path.join(NORMALIZED_DATA_PATH, "normalized_features.parquet")
    features_df = pq.read_table(features_path).to_pandas()
    return features_df, label_df

In [None]:
#This cell runs the pipeline incrementally:
#1. Before each stage it checks whether that stage's output file already exists. Completed stages are skipped, so the run resumes from the first stage whose output is missing and reuses the data that already exists.
#2. First stage: split the initial raw files.
#3. Second stage: process the files (all initial cleaning and transformations, then lemmatization).
#4. Third stage: load and concatenate the processed data, build the normalized features, then train and test the models, saving confusion matrices, classification reports and the best model.
if __name__ == '__main__':
    import glob

    # Stage markers: if a marker exists, that stage has already run.
    check_raw_data_separated = os.path.join(SEPARATED_DATA_PATH, "blocked_0.parquet")
    # preprocess_files may read this module-level name, so keep it defined.
    check_data_preprocessed = os.path.join(ROOT_PATH, "pickle.pkl")
    check_data_lemmatized = os.path.join(PROCESSED_DATA_PATH, "preprocessed_data_1.csv")
    # prepare_dfs_for_train writes both parquet files to NORMALIZED_DATA_PATH.
    normalized_features_path = os.path.join(NORMALIZED_DATA_PATH, "normalized_features.parquet")
    check_data_normalized = os.path.join(NORMALIZED_DATA_PATH, "lable_df.parquet")
    # train_files saves its heatmaps to the working directory. os.path.exists does not
    # expand wildcards, so this pattern has to be globbed.
    check_data_trained = "Confusion Matrix - *.png"

    # Column order of the header-less chunk CSVs written by process_chunk_and_save.
    processed_columns = ["owner", "label", "total_recipients", "total_chats", "newest_message",
                         "oldest_message", "duration", 'chats_per_day', 'recipients_per_day', "text",
                         "language"]

    if not os.path.exists(check_raw_data_separated):
        for file_name in ["blocked_user.csv", "regular_user.csv"]:
            files_separation(os.path.join(RAW_DATA_PATH, file_name))

    if not os.path.exists(check_data_lemmatized):
        preprocess_files()

    if not glob.glob(check_data_trained):
        if not os.path.exists(check_data_normalized):
            csv_names = sorted(name for name in os.listdir(PROCESSED_DATA_PATH) if name.endswith('.csv'))
            print(len(csv_names))
            # The chunk CSVs have no header row. Reading them with header=None keeps the first
            # data row of each file; the default would consume it as the header.
            frames = [
                pd.read_csv(os.path.join(PROCESSED_DATA_PATH, name), encoding='utf-8-sig',
                            header=None, names=processed_columns)
                for name in tqdm(csv_names)
            ]
            # Concatenate once instead of growing a frame inside the loop (quadratic).
            dataframes = pd.concat(frames, axis=0, ignore_index=True)
            features_df, label_df = prepare_data_for_train(dataframes)
        else:
            features_df = pq.read_table(normalized_features_path).to_pandas()
            label_df = fastparquet.ParquetFile(check_data_normalized).to_pandas()
        trained_data = train_files(features_df, label_df)