# Logistic Regression Model With Personal Data
## Includes Data Pre-Processing

In [None]:
import time

import findspark
findspark.init()
import pyspark
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, StringIndexer, CountVectorizer, NGram, VectorAssembler, ChiSqSelector
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

from pyspark.ml import Transformer
from pyspark.sql import DataFrame
from pyspark.ml.feature import StopWordsRemover
import re

## Context Variables and Dataset Loading

In [None]:
# Start (or reuse) a local Spark session named "TOT" with master "local[16]".
spark1 = (
    SparkSession.builder
    .master("local[16]")
    .appName("TOT")
    .getOrCreate()
)

In [None]:
# Location of the header-less CSV file that holds the training tweets.
path = "../resources/training_noemoticon.csv"

# Explicit schema for the six CSV columns, in file order.
schema = StructType([
    StructField("target", IntegerType(), True),
    StructField("id", StringType(), True),
    StructField("date", StringType(), True),
    StructField("query", StringType(), True),
    StructField("author", StringType(), True),
    StructField("tweet", StringType(), True)])

# The schema is given explicitly, so schema inference is not requested.
df = spark1.read.csv(path,
                     header=False,
                     schema=schema)

# dropna() returns a new DataFrame instead of modifying `df` in place;
# rebind the name so rows containing nulls are really removed.
df = df.dropna()

In [None]:
class WordFormatter(Transformer):
    """Pipeline stage that removes stop words and noisy tokens from a token column.

    The class inherits from the PySpark ``Transformer`` so it can be used as a
    ``Pipeline`` stage or directly through ``WordFormatter(...).transform(dataset)``.

    Processing applied to the ``inputCol`` array of tokens:

    1. Spark's ``StopWordsRemover`` drops the stop words.
    2. Every remaining token is stripped, lower-cased and cleared of the
       punctuation characters listed in ``self.ponc``.
    3. Tokens that are URLs (``self.regpat``), empty, a single space, or that
       contain ``'@'`` are discarded.

    The cleaned list is written to ``outputCol``; every column of the input
    DataFrame is kept unchanged (names and types).

    Attention: step 2-3 run in a Python UDF, which is slower than the built-in
    stages, but the extra filtering is meant to improve accuracy.

    Parameters
    ----------
    inputCol : str
        Name of the column holding the array of tokens to clean.
    outputCol : str
        Name of the column that receives the cleaned array of tokens.
    """

    def __init__(self, *, inputCol, outputCol):

        super(WordFormatter, self).__init__()
        # URL-like token: optional http/https scheme, '://', then any run of
        # word characters and . / ? = & %
        self.regpat = re.compile(r'(https|http)?:\/\/(\w|\.|\/|\?|\=|\&|\%)*\b')
        # Punctuation characters removed from every token.
        self.ponc = [',', '.', '?', '-']
        self.inputCol = inputCol
        self.outputCol = outputCol

    def _clean_tokens(self, tokens):
        """Return the cleaned copy of ``tokens`` (a list of strings).

        A ``None`` input yields an empty list so downstream stages always
        receive an array.
        """
        if tokens is None:
            return []

        cleaned = []
        for token in tokens:
            word = token.strip().lower()

            # Remove the unwanted punctuation characters, if present.
            for char in self.ponc:
                word = word.replace(char, '')

            # Skip URLs, empty words, the word ' ' and words containing '@'.
            if word in ('', ' ') or '@' in word:
                continue
            if self.regpat.fullmatch(word):
                continue
            cleaned.append(word)

        return cleaned

    def stopw(self, wt):
        """Row-level variant kept for backward compatibility.

        ``wt`` is a Row whose last field is the ``"filtered"`` token list.
        Returns a tuple made of every field except the last one, followed by
        the cleaned token list.
        """
        return tuple(wt[:-1]) + (self._clean_tokens(wt["filtered"]),)

    def _transform(self, dtf: DataFrame) -> DataFrame:
        """Apply stop-word removal and token cleaning to ``dtf``.

        Returns ``dtf`` with one extra column, ``self.outputCol``, holding the
        cleaned tokens. The work stays at column level (no RDD round trip), so
        the schema of the existing columns is preserved and the transformer
        works on any DataFrame that contains ``self.inputCol``.
        """
        # Imported locally so the block does not rely on extra file-level imports.
        from pyspark.sql.functions import udf
        from pyspark.sql.types import ArrayType, StringType

        # Temporary column for the stop-word-filtered tokens; pick a name that
        # collides neither with an existing column nor with the output column.
        tmp_col = "filtered"
        while tmp_col in dtf.columns or tmp_col == self.outputCol:
            tmp_col = "_" + tmp_col

        # Apply PySpark's stop-word removal.
        rem = StopWordsRemover(inputCol=self.inputCol, outputCol=tmp_col)
        ndtf = rem.transform(dtf)

        # Apply our additional filtering on top of it.
        clean = udf(self._clean_tokens, ArrayType(StringType()))
        ndtf = ndtf.withColumn(self.outputCol, clean(ndtf[tmp_col]))

        return ndtf.drop(tmp_col)

In [None]:
# Random split with weights 0.80 / 0.20 (train / test); the seed is fixed at 2000.
train_set, test_set = df.randomSplit([0.80, 0.20], seed=2000)

## HashingTF - IDF (Default Parameters)

In [None]:
# Label: index the raw "target" values into the "label" column.
label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

# Text features: tokenize -> clean -> hashed term frequencies -> IDF weighting.
tokenizer = Tokenizer(inputCol="tweet", outputCol="tk")
wordform = WordFormatter(inputCol="tk", outputCol="words")
hashtf = HashingTF(inputCol="words", outputCol="tf")
idf = IDF(inputCol="tf", outputCol="features")

# Classifier with default settings, and the evaluator reading its "prediction" column.
lr = LogisticRegression()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

pipeline = Pipeline(
    stages=[tokenizer, wordform, hashtf, idf, label_stringIdx, lr],
)

In [None]:
%%time
# Fit all pipeline stages on the training split, then predict on the test split.
pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(test_set)

# The same evaluator is reused; the metric is chosen per call through a param-map override.
accuracy, precision, recall = [
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("accuracy", "weightedPrecision", "weightedRecall")
]

# Report the three scores.
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)

## HashingTF - IDF (Custom Parameters)

In [None]:
# Label: index the raw "target" values into the "label" column.
label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

# Text features: tokenize -> clean -> hashed term frequencies (2**16 buckets) -> IDF.
tokenizer = Tokenizer(inputCol="tweet", outputCol="tk")
wordform = WordFormatter(inputCol="tk", outputCol="words")
hashtf = HashingTF(inputCol="words", outputCol="tf", numFeatures=2**16)
# minDocFreq: remove sparse terms
idf = IDF(inputCol="tf", outputCol="features", minDocFreq=5)

# Classifier with default settings, and the evaluator reading its "prediction" column.
lr = LogisticRegression()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

pipeline = Pipeline(
    stages=[tokenizer, wordform, hashtf, idf, label_stringIdx, lr],
)

In [None]:
%%time
# Fit all pipeline stages on the training split, then predict on the test split.
pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(test_set)

# The same evaluator is reused; the metric is chosen per call through a param-map override.
accuracy, precision, recall = [
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("accuracy", "weightedPrecision", "weightedRecall")
]

# Report the three scores.
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)

## CountVectorizer - IDF (Default Parameters)

In [None]:
# Label: index the raw "target" values into the "label" column.
label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

# Text features: tokenize -> clean -> vocabulary term counts -> IDF weighting.
tokenizer = Tokenizer(inputCol="tweet", outputCol="tk")
wordform = WordFormatter(inputCol="tk", outputCol="words")
cv = CountVectorizer(inputCol="words", outputCol="cv")
idf = IDF(inputCol="cv", outputCol="features")

# Classifier with default settings, and the evaluator reading its "prediction" column.
lr = LogisticRegression()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

pipeline = Pipeline(
    stages=[tokenizer, wordform, cv, idf, label_stringIdx, lr],
)

In [None]:
%%time
# Fit all pipeline stages on the training split, then predict on the test split.
pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(test_set)

# The same evaluator is reused; the metric is chosen per call through a param-map override.
accuracy, precision, recall = [
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("accuracy", "weightedPrecision", "weightedRecall")
]

# Report the three scores.
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)

## CountVectorizer - IDF (Custom Parameters)

In [None]:
# Label: index the raw "target" values into the "label" column.
label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

# Text features: tokenize -> clean -> term counts (vocabulary capped at 2**16) -> IDF.
tokenizer = Tokenizer(inputCol="tweet", outputCol="tk")
wordform = WordFormatter(inputCol="tk", outputCol="words")
cv = CountVectorizer(inputCol="words", outputCol="cv", vocabSize=2**16)
# minDocFreq: remove sparse terms
idf = IDF(inputCol="cv", outputCol="features", minDocFreq=5)

# Classifier with default settings, and the evaluator reading its "prediction" column.
lr = LogisticRegression()
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")

pipeline = Pipeline(
    stages=[tokenizer, wordform, cv, idf, label_stringIdx, lr],
)

In [None]:
%%time
# Fit all pipeline stages on the training split, then predict on the test split.
pipelineFit = pipeline.fit(train_set)
predictions = pipelineFit.transform(test_set)

# The same evaluator is reused; the metric is chosen per call through a param-map override.
accuracy, precision, recall = [
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("accuracy", "weightedPrecision", "weightedRecall")
]

# Report the three scores.
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)

# CountVectorizer + NGram + ChiSqSelector

In [None]:
def build_trigrams(inputCol=("tweet", "target"), n=3):
    """Build an n-gram TF-IDF + chi-squared selection + logistic regression pipeline.

    Stages, in order: Tokenizer, WordFormatter, one NGram per order 1..n,
    one CountVectorizer per order (vocabSize 2**14), one IDF per order
    (minDocFreq 5), a VectorAssembler joining the per-order TF-IDF vectors,
    a StringIndexer for the label, a ChiSqSelector keeping 2**14 features,
    and a LogisticRegression with default settings.

    Parameters
    ----------
    inputCol : sequence of two str
        ``(text_column, label_column)``. Defaults to ``("tweet", "target")``.
    n : int
        Highest n-gram order; orders 1 through ``n`` are all used.

    Returns
    -------
    Pipeline
        The unfitted pipeline.

    Raises
    ------
    ValueError
        If ``inputCol`` is not a pair of column names or ``n`` is below 1.
    """
    # Validate before any stage is built so bad arguments fail fast.
    if isinstance(inputCol, str) or len(inputCol) != 2:
        raise ValueError("inputCol must be a (text_column, label_column) pair")
    if n < 1:
        raise ValueError("n must be at least 1")

    text_col, label_col = inputCol
    orders = range(1, n + 1)

    tokenizer = [Tokenizer(inputCol=text_col, outputCol="tk")]
    wordform = [WordFormatter(inputCol="tk", outputCol="words")]

    # One n-gram column per order: "1_grams", "2_grams", ...
    ngrams = [
        NGram(n=i, inputCol="words", outputCol="{0}_grams".format(i))
        for i in orders
    ]

    # Term counts per order, each with its own vocabulary.
    cv = [
        CountVectorizer(vocabSize=2**14, inputCol="{0}_grams".format(i),
                        outputCol="{0}_tf".format(i))
        for i in orders
    ]

    idf = [
        IDF(inputCol="{0}_tf".format(i), outputCol="{0}_tfidf".format(i), minDocFreq=5)
        for i in orders
    ]

    # Concatenate the per-order TF-IDF vectors into a single vector column.
    assembler = [VectorAssembler(
        inputCols=["{0}_tfidf".format(i) for i in orders],
        outputCol="rawFeatures"
    )]

    label_stringIdx = [StringIndexer(inputCol=label_col, outputCol="label")]

    # The selector is placed after the label indexer in the stage list.
    selector = [ChiSqSelector(numTopFeatures=2**14, featuresCol='rawFeatures', outputCol="features")]

    lr = [LogisticRegression()]

    return Pipeline(stages=tokenizer + wordform + ngrams + cv + idf + assembler + label_stringIdx + selector + lr)

In [None]:
%%time
# Build the n-gram pipeline, fit it on the training split, then predict on the test split.
pipelineFit = build_trigrams().fit(train_set)
predictions = pipelineFit.transform(test_set)

# The same evaluator is reused; the metric is chosen per call through a param-map override.
accuracy, precision, recall = [
    evaluator.evaluate(predictions, {evaluator.metricName: metric_name})
    for metric_name in ("accuracy", "weightedPrecision", "weightedRecall")
]

# Report the three scores.
print("Accuracy:", accuracy)
print("Precision:", precision)
print("Recall:", recall)