In [1]:
import findspark
findspark.init()

import pyspark

from pyspark.sql import SparkSession
from pyspark.sql import *
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark import ml
from nltk.tokenize import TweetTokenizer
import config
import os
from itertools import product
import plotly.express as px
import numpy as np
import pandas as pd
from custom_transformer import EmojiExtractor
import importlib
import string

spark = SparkSession.builder.appName("Train SVM")\
    .config("spark.executor.memory","4g")\
    .config("spark.driver.memory","5g")\
    .master('local[24]')\
    .getOrCreate()

In [2]:
config = importlib.reload(config)

In [3]:
field_names = ['label','text','rand']
field_types = [T.IntegerType(), T.StringType(), T.FloatType()]

training_schema = T.StructType([
    T.StructField(field_name, field_type, True) for field_name, field_type in zip(field_names, field_types)
])

In [4]:
train_data = spark.read.load(os.path.join(config.DATA_DIR, 'emojified_train.csv'), format = 'csv', sep = ',', 
                            schema = training_schema, header = True).drop('rand')

train_data = train_data.filter(train_data['label'] <= 1)

In [5]:
with open('bigram_words.txt', 'r') as f:
    bigram_words = [line.strip() for line in f.readlines()]

tokenize_str = r'((?:{})\s)?([\w\']+|\$[\d\.]+|\S+)'.format('|'.join(bigram_words))
all_stopwords = ml.feature.StopWordsRemover.loadDefaultStopWords('english')
all_stopwords.extend(['http','https'])

In [7]:
emoji_extractor = EmojiExtractor(inputCol = 'text', outputCol = 'emojis')

sql_filter = ml.feature.SQLTransformer(statement = 
        "SELECT *, REGEXP_REPLACE(text, {}, \"\") AS cleaned_text FROM __THIS__"\
        .format("\"" + config.EMOJI_PATTERN[:-1] + config.REMOVE_PATTERN +"\""))

tokenizer = ml.feature.RegexTokenizer(inputCol = 'cleaned_text', 
                    outputCol = 'tokenized', pattern = tokenize_str, gaps = False)
#tokenizer = ml.feature.Tokenizer(inputCol = 'cleaned_text', outputCol = 'tokenized')

stopword_remover = ml.feature.StopWordsRemover(inputCol=tokenizer.getOutputCol(), 
                                               outputCol='filtered', stopWords=all_stopwords)

sql_assembler = ml.feature.SQLTransformer(statement = 
        "SELECT *, CONCAT(filtered, emojis) AS bag FROM __THIS__")

counter = ml.feature.CountVectorizer(minDF = 20, maxDF = 75000, vocabSize = 50000, 
                        inputCol = 'bag', outputCol = 'counts')

normalizer = ml.feature.Normalizer(p = 1.0, inputCol = counter.getOutputCol(), outputCol = 'tf_normalized')

df_normalizer = ml.feature.IDF(inputCol=normalizer.getOutputCol(), outputCol='features')

preprocessing_pipe = ml.Pipeline(stages=[
    emoji_extractor, sql_filter, tokenizer, stopword_remover, 
    sql_assembler, counter, normalizer, df_normalizer
]).fit(train_data)

In [8]:
featurized_data = preprocessing_pipe.transform(train_data)
featurized_data = featurized_data.select('features','label')
featurized_data.persist()

featurized_data.count(), featurized_data.groupby('label').count().show()

+-----+------+
|label| count|
+-----+------+
|    1|792059|
|    0|792012|
+-----+------+



(1584071, None)

In [9]:
regresser = ml.classification.LogisticRegression(maxIter = 100, featuresCol='features', labelCol= 'label')

paramGrid = ml.tuning.ParamGridBuilder().addGrid(regresser.regParam, [0.1,0.01,0.001])\
    .addGrid(regresser.elasticNetParam, [1.0, 0.5, 0.0]).build()
evaluator = ml.evaluation.BinaryClassificationEvaluator()
tvs = ml.tuning.TrainValidationSplit(estimator=regresser, estimatorParamMaps=paramGrid, evaluator=evaluator, trainRatio=0.95)
trained_regressor = tvs.fit(featurized_data).bestModel

In [10]:
test_data = spark.read.load(os.path.join(config.DATA_DIR, 'emojified_test.csv'), format = 'csv', sep = ',', 
                            schema = training_schema, header = True).drop('rand')

In [11]:
pipeline = ml.PipelineModel(stages = [preprocessing_pipe, trained_regressor])

predictions = pipeline.transform(test_data)

evaluator = ml.evaluation.BinaryClassificationEvaluator(labelCol='label', rawPredictionCol ='rawPrediction')
auc = evaluator.evaluate(predictions)

auc

0.8853303882281052

In [12]:
pipeline.save('LR_with_emoji_pipeline')