In [1]:
from __future__ import unicode_literals
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from bidi.algorithm import get_display
from arabic_reshaper import reshape
from hazm import Normalizer, Stemmer, word_tokenize
from sklearn.metrics import classification_report
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.layers import Dense, Input, Flatten, Dropout, Add
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Embedding
from keras.layers.merge import concatenate, add
from sklearn.preprocessing import OneHotEncoder

In [2]:
class DataPreprocessor:
    """Load, clean and index the Persian text corpus used by the classifier.

    On construction the cleaned train/test frames are loaded from the pickle
    cache written by :meth:`clean_text`. When the cache does not exist yet the
    raw files under ``address`` are read and cleaned first. Afterwards the
    training vocabulary is counted and every word is given an integer index
    (1 = most frequent word; 0 is left free for padding / unknown words).

    Attributes:
        train_data / test_data: DataFrames with ``category`` and ``text``
            columns; after cleaning ``text`` holds lists of stemmed tokens.
        stop_words: normalised stop words loaded by :meth:`read_data`.
        n_tokens: number of tokens counted in the training set.
        frequencies: word -> count, ordered from most to least frequent.
        wordToIndex / indexToWord: vocabulary lookup tables (indices start at 1).
    """

    # Cache files written by clean_text() and read back by __init__().
    TRAIN_PICKLE = 'train_dataframe'
    TEST_PICKLE = 'test_dataframe'

    # Characters kept by remove_none_alpha(): ZWNJ, space, the Persian
    # alphabet, Persian and ASCII digits, and '.', '؟', '?'.
    _ALLOWED_CHARS = frozenset(
        '\u200c ‌آابپتثجچحخدذرزژسشصضطظعغفقکگلمنوهی۰۱۲۳۴۵۶۷۸۹.؟?0123456789'
    )
    # Translation table mapping every ASCII / Persian digit to the letter 'N'.
    _DIGIT_TABLE = str.maketrans(dict.fromkeys('0123456789۱۲۳۴۵۶۷۸۹۰', 'N'))

    # Class-level defaults only declare the attribute names. All mutable state
    # is (re)created per instance so that two instances never share counts.
    train_data = None
    test_data = None
    stop_words = ()
    n_tokens = 0
    frequencies = None
    wordToIndex = None
    indexToWord = None
    index = 0
    _stemmer = None

    def __init__(self, address='./data/'):
        """Load (or build) the cleaned corpus, then build the vocabulary.

        Args:
            address: directory holding ``train.csv``, ``test.csv`` and
                ``Stop_words.txt``; only read when the pickle cache is missing.
        """
        self.stop_words = []
        self.n_tokens = 0
        self.frequencies = {}
        self.wordToIndex = {}
        self.indexToWord = {}

        try:
            # NOTE: unpickling can execute arbitrary code. These files must be
            # the local cache produced by clean_text(), never untrusted input.
            self.train_data = pd.read_pickle(self.TRAIN_PICKLE)
            self.test_data = pd.read_pickle(self.TEST_PICKLE)
        except FileNotFoundError:
            # First run: build the cache from the raw files.
            self.read_data(address)
            self.clean_text()

        self.count_words()
        self.map_word_index()

    @staticmethod
    def _drop_invalid_rows(frame):
        """Return ``frame`` without repeated header rows and rows lacking text."""
        not_header = frame['category'] != 'category'
        return frame[not_header].dropna(subset=['text'])

    def _read_tsv(self, path):
        """Read one tab-separated corpus file, skipping malformed lines."""
        try:
            frame = pd.read_csv(path, sep='\t', on_bad_lines='skip', encoding='utf-8')
        except TypeError:
            # pandas < 1.3 does not know on_bad_lines; use the legacy spelling.
            frame = pd.read_csv(path, sep='\t', error_bad_lines=False, encoding='utf-8')
        return self._drop_invalid_rows(frame)

    def read_data(self, address):
        """Read the raw train/test files and the stop-word list from ``address``.

        Args:
            address: path prefix (including the trailing separator) of the
                directory containing ``train.csv``, ``test.csv`` and
                ``Stop_words.txt``.
        """
        self.train_data = self._read_tsv(address + 'train.csv')
        self.test_data = self._read_tsv(address + 'test.csv')
        with open(address + 'Stop_words.txt', encoding="utf8") as f:
            raw_stop_words = f.read().splitlines()
        # Stop words go through the same normaliser as the text so they match.
        norm = Normalizer()
        self.stop_words = [norm.normalize(word) for word in raw_stop_words]

    def draw_plot(self, labels, counts, fname):
        """Save a bar chart of ``counts`` per label to ``fname``.

        Args:
            labels: category names (Persian text, reshaped for display).
            counts: number of documents per label, aligned with ``labels``.
            fname: output image path.
        """
        plt.rc("font", family="B Nazanin", size=15)

        # matplotlib draws left-to-right, so the labels are reshaped and
        # re-ordered for display first.
        persian_labels = [get_display(reshape(label)) for label in labels]
        fig, ax = plt.subplots(figsize=(15, 10))
        ax.bar(persian_labels, counts, width=0.7, color='darkblue')

        # Write each count just above its bar.
        for position, count in enumerate(counts):
            ax.text(x=position, y=count + 1, s=f"{count}", color='darkgreen',
                    fontdict=dict(fontsize=13))

        fig.savefig(fname, dpi=200)

    def plot_distribution(self):
        """Plot the category distribution of both splits to train.png / test.png."""
        labels, counts = np.unique(self.train_data['category'], return_counts=True)
        self.draw_plot(labels, counts, 'train.png')
        print('-------------------------------------------------------------')
        test_labels, test_counts = np.unique(self.test_data['category'], return_counts=True)
        self.draw_plot(test_labels, test_counts, 'test.png')

    def del_stop_words(self, words):
        """Return ``words`` without the entries found in ``self.stop_words``."""
        # A set makes each membership test O(1) instead of scanning the list.
        stop_words = set(self.stop_words)
        return [word for word in words if word not in stop_words]

    def remove_none_alpha(self, d):
        """Keep only Persian letters, digits, ZWNJ, spaces and '.', '؟', '?'.

        Newlines are turned into spaces before filtering, so words on
        different lines stay separated.
        """
        allowed = self._ALLOWED_CHARS
        return ''.join(c for c in d.replace('\n', ' ') if c in allowed)

    def replace_digits(self, d):
        """Replace every number (integer or dotted decimal) with a single 'N'."""
        d = d.translate(self._DIGIT_TABLE)
        # Collapse digit runs, then dotted numbers such as "N.N" or "N.N.N".
        while 'NN' in d:
            d = d.replace('NN', 'N')
        while 'N.N' in d:
            d = d.replace('N.N', 'N')

        return d

    def dfstemmer(self, tokens):
        """Return the stem of every token in ``tokens``."""
        # Build the stemmer once and reuse it for every document.
        if self._stemmer is None:
            self._stemmer = Stemmer()
        stemmer = self._stemmer

        return [stemmer.stem(token) for token in tokens]

    def _clean_frame(self, frame, normalizer):
        """Run the full cleaning pipeline on one split and return a new frame."""
        cleaned = self._drop_invalid_rows(frame)[['category', 'text']].copy()
        text = cleaned['text']
        # Order matters: normalise -> filter characters -> mask numbers ->
        # tokenise -> drop stop words -> stem.
        for step in (normalizer.normalize, self.remove_none_alpha, self.replace_digits,
                     word_tokenize, self.del_stop_words, self.dfstemmer):
            text = text.apply(step)
        cleaned['text'] = text
        return cleaned.reset_index(drop=True)

    def clean_text(self):
        """Clean both splits in place and cache them as pickles."""
        normalizer = Normalizer()

        self.train_data = self._clean_frame(self.train_data, normalizer)
        self.train_data.to_pickle(self.TRAIN_PICKLE)

        self.test_data = self._clean_frame(self.test_data, normalizer)
        self.test_data.to_pickle(self.TEST_PICKLE)

    def save_dict(self, d, n=200):
        """Write the first ``n`` ``item, count`` pairs of ``d`` to frequent.txt."""
        with open("frequent.txt", "w", encoding="utf-8") as f_out:
            for item, count in list(d.items())[:n]:
                f_out.write(f'{item}, {count}\n')

    def freq(self, tokens):
        """Add the tokens of one document to ``frequencies`` and ``n_tokens``."""
        for t in tokens:
            self.n_tokens += 1
            self.frequencies[t] = self.frequencies.get(t, 0) + 1

    def count_words(self):
        """Count training-set word frequencies, most frequent first, and save them.

        The counters are reset first, so calling this repeatedly gives the
        same result instead of accumulating.
        """
        self.n_tokens = 0
        self.frequencies = {}
        for tokens in self.train_data['text']:
            self.freq(tokens)
        # sorted() is stable, so ties keep their first-seen order.
        self.frequencies = dict(sorted(self.frequencies.items(), key=lambda item: item[1], reverse=True))
        self.save_dict(self.frequencies)

    def map_word_index(self):
        """Number the vocabulary from 1 in order of decreasing frequency."""
        self.wordToIndex = {word: rank for rank, word in enumerate(self.frequencies, start=1)}
        self.indexToWord = {rank: word for word, rank in self.wordToIndex.items()}

    def tokenize(self, tokens):
        """Map tokens to vocabulary indices.

        Words missing from the vocabulary map to 0, the same convention
        ``CNNClassifier.convert`` uses, instead of raising ``KeyError``.
        """
        return [self.wordToIndex.get(token, 0) for token in tokens]

    def open_dic(self):
        """Load ``word, count`` lines from frequent_list.txt as ``{word: int}``.

        NOTE(review): save_dict() writes "frequent.txt" while this reads
        "frequent_list.txt" - confirm which file name is intended.

        Raises:
            ValueError: if a non-blank line is not of the form ``word, count``.
        """
        d = {}
        with open("frequent_list.txt", encoding="utf8") as f:
            for line in f:
                if not line.strip():
                    continue
                # Split on the last comma so the count is always the tail.
                key, val = line.rsplit(',', 1)
                d[key] = int(val)

        return d

In [3]:
class CNNClassifier:
    """1-D convolutional text classifier trained on the cached, cleaned corpus.

    Constructing an instance runs the whole pipeline: load the cached frames,
    convert tokens to vocabulary indices, vectorise, build the model, train,
    predict on the test split and print a classification report.

    Attributes:
        train_data / test_data: per-instance copies of the cached frames whose
            ``text`` column is converted to integer index arrays.
        X_train_df / X_test_df: padded index sequences.
        y_train_df / y_test_df: one-hot encoded categories.
        label_encoder: the encoder fitted on the training categories.
        model: the compiled Keras model.
        history: the object returned by ``model.fit``.
        y_pred: class probabilities predicted for the test split.
        result: ``model.evaluate`` output on the test split.
        word_to_index: vocabulary mapping used by :meth:`convert`.
    """

    # Cache files produced by the preprocessing step.
    TRAIN_PICKLE = 'train_dataframe'
    TEST_PICKLE = 'test_dataframe'
    # Fallbacks used only when define_model() runs before vectorize().
    DEFAULT_MAX_SEQUENCE_LENGTH = 185
    DEFAULT_NUM_CLASSES = 10

    train_data = None
    test_data = None
    X_train_df = None
    y_train_df = None
    X_test_df = None
    y_test_df = None
    model = None
    y_pred = None
    result = None
    history = None
    label_encoder = None
    word_to_index = None

    def __init__(self, preprocessor=None):
        """Run the full train/evaluate pipeline.

        Args:
            preprocessor: optional object exposing a ``wordToIndex`` mapping.
                When omitted, the module-level preprocessor named ``d`` is
                used, as before.
        """
        if preprocessor is not None:
            self.word_to_index = preprocessor.wordToIndex

        # Load per instance: the frames are mutated below, and sharing them at
        # class level made a second construction convert already-converted
        # index arrays (turning every token into 0).
        # NOTE: unpickling can execute arbitrary code. These files must be the
        # trusted local cache, never untrusted input.
        self.train_data = pd.read_pickle(self.TRAIN_PICKLE)
        self.test_data = pd.read_pickle(self.TEST_PICKLE)

        self.train_data['text'] = self.train_data['text'].apply(self.convert)
        self.test_data['text'] = self.test_data['text'].apply(self.convert)
        self.vectorize()
        self.define_model()
        self.train_model()
        self.predict()
        self.evaluate()

    def _vocabulary(self):
        """Return the word -> index mapping, falling back to the global ``d``."""
        if self.word_to_index is not None:
            return self.word_to_index
        # Backward compatibility: the notebook binds the preprocessor to `d`.
        preprocessor = globals().get('d')
        if preprocessor is None:
            raise NameError(
                "no vocabulary available: pass a preprocessor to CNNClassifier "
                "or define the module-level preprocessor 'd' first"
            )
        return preprocessor.wordToIndex

    def vectorize(self):
        """Build one-hot labels and padded index sequences for both splits.

        Raises:
            ValueError: if the training set is empty, or (from the label
                encoder) if the test set contains a category that never
                appears in the training set.
        """
        lb = LabelEncoder()
        y_train = lb.fit_transform(self.train_data['category'])
        # Reuse the mapping learned on the training set. Re-fitting on the test
        # set could assign different indices to the same category.
        y_test = lb.transform(self.test_data['category'])
        n_classes = len(lb.classes_)
        self.label_encoder = lb
        # A fixed width keeps both label matrices aligned even when the test
        # split lacks the highest-numbered class.
        self.y_train_df = pd.DataFrame(to_categorical(y_train, num_classes=n_classes))
        self.y_test_df = pd.DataFrame(to_categorical(y_test, num_classes=n_classes))

        lengths = [len(sequence) for sequence in self.train_data['text']]
        if not lengths:
            raise ValueError('cannot vectorize an empty training set')
        # Sequences are padded / truncated to the mean training length (+1).
        maxlen = int(sum(lengths) / len(lengths)) + 1
        self.X_train_df = pd.DataFrame(pad_sequences(self.train_data['text'], maxlen=maxlen, dtype='int32'))
        self.X_test_df = pd.DataFrame(pad_sequences(self.test_data['text'], maxlen=maxlen, dtype='int32'))

    def define_model(self):
        """Build and compile the embedding -> Conv1D -> dense softmax model.

        Input length and class count are taken from the vectorised data, so
        the model always matches what :meth:`vectorize` produced.
        """
        num_words = len(self._vocabulary()) + 1  # +1: index 0 is padding / unknown
        embedding_dim = 128
        if self.X_train_df is not None:
            max_sequence_length = self.X_train_df.shape[1]
        else:
            max_sequence_length = self.DEFAULT_MAX_SEQUENCE_LENGTH
        if self.y_train_df is not None:
            labels_index = self.y_train_df.shape[1]
        else:
            labels_index = self.DEFAULT_NUM_CLASSES

        embedding_layer = Embedding(num_words, embedding_dim, input_length=max_sequence_length)
        sequence_input = Input(shape=(max_sequence_length,))
        embedded_sequences = embedding_layer(sequence_input)

        # The earlier multi-filter-size branches were never connected to the
        # output, so only the single convolution that feeds the head is built.
        conv = Conv1D(filters=128, kernel_size=3, activation='relu')(embedded_sequences)
        pool = MaxPooling1D(pool_size=3)(conv)

        x = Dropout(0.5)(pool)
        x = Flatten()(x)
        x = Dense(128, activation='relu')(x)
        x = Dropout(0.5)(x)
        preds = Dense(labels_index, activation='softmax')(x)

        self.model = Model(sequence_input, preds)

        self.model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])

    def train_model(self):
        """Fit the model with early stopping on validation loss; keep the history."""
        early_stopping = EarlyStopping(monitor='val_loss', min_delta=0.01, patience=4, verbose=1)
        self.history = self.model.fit(
            self.X_train_df, self.y_train_df,
            epochs=10, callbacks=[early_stopping],
            validation_split=0.1, shuffle=True, batch_size=256,
        )

    def predict(self):
        """Store the predicted class probabilities for the test split."""
        self.y_pred = self.model.predict(self.X_test_df)

    def evaluate(self):
        """Evaluate on the test split and print a per-class classification report."""
        self.result = self.model.evaluate(self.X_test_df, self.y_test_df)
        # Turn one-hot rows / probability rows back into class indices.
        rounded_labels = np.argmax(np.array(self.y_test_df), axis=1)
        rounded_pred = np.argmax(self.y_pred, axis=1)
        report = classification_report(rounded_labels, rounded_pred)
        print(report)

    def convert(self, text):
        """Map a token list to an array of vocabulary indices (0 = unknown word)."""
        vocabulary = self._vocabulary()
        return np.asarray([vocabulary.get(token, 0) for token in text])

In [4]:
if __name__ == '__main__':
    # The preprocessor must be bound to the module-level name `d`:
    # CNNClassifier looks up `d.wordToIndex` as a global when converting
    # tokens and sizing the embedding layer.
    d = DataPreprocessor()
    # Constructing the classifier runs vectorisation, model definition,
    # training, prediction and evaluation (prints the classification report).
    c = CNNClassifier()

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 00006: early stopping
              precision    recall  f1-score   support

           0       0.88      0.87      0.88      2002
           1       0.91      0.89      0.90      1745
           2       0.94      0.97      0.95      4105
           3       0.92      0.86      0.89      2473
           4       0.89      0.94      0.91      1821
           5       0.92      0.95      0.93      2191
           6       0.74      0.76      0.75       339
           7       0.49      0.38      0.43        73
           8       0.53      0.49      0.51       239
           9       0.98      0.94      0.96      2506

    accuracy                           0.91     17494
   macro avg       0.82      0.81      0.81     17494
weighted avg       0.91      0.91      0.91     17494

