# importing libraries

In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import SparseVector
from pyspark.sql import SparkSession
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.classification import FMClassifier
import nltk
from pathlib import Path
from pyspark.sql.functions import udf

from nltk.stem import WordNetLemmatizer

# data manipulation

# splitting dataset 

In [2]:
def data_splitting(data, seed=42):
    """Relabel the positive class and split the rows into train and test sets.

    Every value ``4`` in ``data`` is replaced by ``1`` and the result is split
    70/30 with ``randomSplit``. Both partitions are counted and the counts
    are printed.

    Args:
        data: DataFrame-like object providing ``replace`` and ``randomSplit``.
        seed: Seed forwarded to ``randomSplit`` so the same split can be
            requested again on a later run. Pass ``None`` to let Spark pick
            a seed itself.

    Returns:
        tuple: ``(trainingData, testingData)``.
    """
    relabelled = data.replace(4, 1)
    # A fixed seed keeps the train/test membership from changing between runs.
    trainingData, testingData = relabelled.randomSplit([0.7, 0.3], seed)
    train_rows = trainingData.count()
    test_rows = testingData.count()
    print ("Training data rows:", train_rows, "; Testing data rows:", test_rows)
    return trainingData, testingData


# loading dataset

In [3]:
def data_loading(datadir):
    """Read the header-less tweets CSV and return its text and label columns.

    Relies on the module-level ``spark`` session and (re)registers the temp
    view ``tweets`` as a side effect.

    Args:
        datadir: Path of the CSV file passed to ``spark.read.csv``.

    Returns:
        DataFrame with the columns ``SentimentText`` and ``Label``.
    """
    raw_tweets = spark.read.csv(datadir,inferSchema=True, header=False)
    # Expose the frame to Spark SQL so the positional _cN columns can be renamed.
    raw_tweets.createOrReplaceTempView("tweets")
    named_tweets = spark.sql("select _c0 as label, _c1 as ID, _c2 as value ,_c3 as flag ,_c4 as user,_c5 as SentimentText from tweets")
    # Only the text and its label are needed downstream.
    return named_tweets.select("SentimentText", 'Label')

# data preprocessing

In [4]:
def data_preprocessing(input_col_tockenizer,output_col_tockenizer,stop_word_column,tokenizedTrain):
    """Tokenise text, drop stop words and hash the remaining words into features.

    Args:
        input_col_tockenizer: Name of the text column to tokenise.
        output_col_tockenizer: Name given to the column of tokens.
        stop_word_column: Name given to the column of tokens that remain
            after stop-word removal.
        tokenizedTrain: Input DataFrame. Despite the name, it is the frame
            *before* tokenisation; it must also contain a ``label`` column.

    Returns:
        DataFrame with the columns ``label``, ``<stop_word_column>`` and
        ``features`` (the ``HashingTF`` output).
    """
    tokenizer = Tokenizer(inputCol=input_col_tockenizer, outputCol=output_col_tockenizer)
    tokens = tokenizer.transform(tokenizedTrain)

    stopwords_remover = StopWordsRemover(inputCol=output_col_tockenizer, outputCol=stop_word_column)
    filtered = stopwords_remover.transform(tokens)

    hashTF = HashingTF(inputCol=stop_word_column, outputCol="features")
    # Select the caller-supplied column name instead of a fixed
    # 'MeaningfulWords', so any stop_word_column value works.
    return hashTF.transform(filtered).select('label', stop_word_column, 'features')

# machine learning models

# naive bayes model

In [5]:
def best_naivebayes_model():
    """Build a 5-fold cross-validator that tunes multinomial Naive Bayes smoothing.

    Returns:
        CrossValidator: Unfitted estimator; call ``fit`` on the training
        DataFrame to obtain the tuned model.
    """
    nb = NaiveBayes(modelType="multinomial")
    nbparamGrid = (
        ParamGridBuilder()
        .addGrid(nb.smoothing, [0.0, 0.2, 0.4, 0.6, 0.8, 1.0])
        .build()
    )

    # Rank candidates on the model's raw class scores. Feeding the thresholded
    # "prediction" column to the evaluator reduces the ROC curve to a single
    # operating point, which makes candidates hard to tell apart.
    auc_evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")
    return CrossValidator(
        estimator=nb,
        estimatorParamMaps=nbparamGrid,
        evaluator=auc_evaluator,
        numFolds=5,
    )


# gradient-boosted trees model

In [6]:
def gradient_boosted_classifier(maxIter):
    """Create an unfitted gradient-boosted trees estimator.

    Despite the function name, the estimator built here is a
    ``GBTRegressor`` that reads ``features`` and is trained on ``label``.

    Args:
        maxIter: Value forwarded to the estimator's ``maxIter`` parameter.

    Returns:
        GBTRegressor: The configured, unfitted estimator.
    """
    return GBTRegressor(maxIter=maxIter, labelCol='label', featuresCol='features')

# logistic regression model 

In [7]:
def logisitic_regression():
    """Build a 10-fold cross-validator around a logistic-regression estimator.

    The search grid has 8 combinations: four ``maxIter`` values, two
    ``regParam`` values and a single ``elasticNetParam`` value. Candidates
    are scored with the module-level ``eval_auc`` evaluator, which therefore
    has to be defined before this function is called.

    Returns:
        CrossValidator: Unfitted estimator.
    """
    estimator = LogisticRegression(featuresCol="features", labelCol="label")

    # Grids are added in the same order as before so the candidate order is unchanged.
    builder = ParamGridBuilder()
    builder = builder.addGrid(estimator.maxIter, [1,20,30,1000])
    builder = builder.addGrid(estimator.regParam, [0.1,0.01])
    builder = builder.addGrid(estimator.elasticNetParam, [1])
    param_maps = builder.build()

    return CrossValidator(
        estimator=estimator,
        estimatorParamMaps=param_maps,
        evaluator=eval_auc,
        numFolds=10,
    )


# fm classifier model

In [8]:
def fmclassifier():
    """Build a 10-fold cross-validator around a factorization-machine classifier.

    The search grid has 6 combinations: three ``maxIter`` values and two
    ``regParam`` values. Candidates are scored with the module-level
    ``eval_auc`` evaluator, which therefore has to be defined before this
    function is called.

    Returns:
        CrossValidator: Unfitted estimator.
    """
    factorization_machine = FMClassifier(featuresCol="features", labelCol="label")
    search_space = (
        ParamGridBuilder()
        .addGrid(factorization_machine.maxIter, [1,20,30])
        .addGrid(factorization_machine.regParam, [0.1,0.01])
        .build()
    )
    return CrossValidator(
        estimator=factorization_machine,
        estimatorParamMaps=search_space,
        evaluator=eval_auc,
        numFolds=10,
    )

# test data preprocessing

In [9]:
def preprocessing_for_testing(data):
    """Give the two positional columns of a live-tweets frame readable names.

    Relies on the module-level ``spark`` session and (re)registers the temp
    view ``tweets`` as a side effect.

    Args:
        data: DataFrame whose columns ``_c0`` and ``_c1`` hold the label and
            the tweet text respectively.

    Returns:
        DataFrame with the columns ``SentimentText`` and ``Label``.
    """
    # Register the frame so the rename can be expressed in SQL.
    data.createOrReplaceTempView("tweets")
    renamed = spark.sql("select _c0 as Label, _c1 as SentimentText from tweets")
    return renamed.select("SentimentText", 'Label')

# start spark session

In [10]:
# Create (or reuse) the Spark session that the loader and preprocessing
# functions read through the module-level name `spark`.
spark = (
    SparkSession.builder
    .master('local[*]')
    .config("spark.driver.memory", "15g")
    .appName('my-cool-app')
    .getOrCreate()
)

# loading dataset

In [11]:
# NOTE(review): absolute, machine-specific path — the notebook only runs where
# this exact file exists; consider a configurable data directory.
datadir=r'C:\Users\ahmed hatem\Downloads\archive (2)\training.1600000.processed.noemoticon.csv'
# Load the CSV and keep only the tweet text and its label.
data=data_loading(datadir)

# preprocessing data

In [12]:
# Tokenise, remove stop words and hash the text into a "features" column.
# `data` is rebound here, so the raw text frame is no longer reachable by name.
data=data_preprocessing('SentimentText','SentimentWords','MeaningfulWords',data)

# data splitting

In [13]:
# 70/30 train/test split; data_splitting also prints the row count of each part.
train,test=data_splitting(data)

Training data rows: 1119652 ; Testing data rows: 480348


# machine learning models used for prediction

In [14]:
# Shared evaluator: used by the logistic-regression and FM cross-validators
# and for scoring every model's test predictions.
# NOTE(review): rawPredictionCol="prediction" makes the metric be computed from
# the "prediction" column rather than a "rawPrediction" score column — confirm
# this is intended.
eval_auc = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="prediction")

### naive bayes

In [15]:
# Unfitted cross-validator for Naive Bayes (fitted in the training section).
nbcv=best_naivebayes_model()


###  gradient boosted

In [16]:
# Unfitted gradient-boosted trees estimator with maxIter=20.
GBC=gradient_boosted_classifier(20)

### logistic regression

In [17]:
# Unfitted cross-validator for logistic regression; requires eval_auc to exist.
logistic_model=logisitic_regression()

### fmclassifier

In [18]:
# Unfitted cross-validator for the FM classifier; requires eval_auc to exist.
fm=fmclassifier()

# training machine learning models

In [19]:
# Fit the Naive Bayes cross-validator on the training split.
nbcvModel = nbcv.fit(train)
print(' naive bayes training is done')

 naive bayes training is done


In [20]:
# Fit the logistic-regression cross-validator on the training split.
lrModel = logistic_model.fit(train)
# The message now names the model that was actually trained in this cell.
print(' logistic regression training is done')

 naive bayes training is done


In [2]:
# Fit the gradient-boosted trees estimator on the training split.
gbtr = GBC.fit(train)
print(' GBC trianing is done')

 GBC trianing is done


In [3]:
# Fit the FM cross-validator on the training split.
FM = fm.fit(train)
print(' FM training is done')

 FM training is done


# testing phase

In [None]:
# Predict on the test split with the best Naive Bayes model from cross-validation.
naive_bayes = nbcvModel.bestModel.transform(test)

In [None]:
# Predict on the test split with the best logistic-regression model from cross-validation.
logisitc = lrModel.bestModel.transform(test)

In [None]:
# Predict on the test split with the fitted gradient-boosted trees model.
gbt = gbtr.transform(test)

In [None]:
# Predict on the test split with the fitted FM cross-validator result
# (transform is called on it directly here, not on .bestModel).
FM_predict = FM.transform(test)

# calculating AUC (area under the ROC curve)

In [None]:
# Despite its name, nb_prediction holds the evaluator's metric value for the
# Naive Bayes test predictions; the bare name displays it as the cell output.
nb_prediction = eval_auc.evaluate(naive_bayes)
nb_prediction

In [None]:
# Metric value for the logistic-regression test predictions (a score, not predictions).
logisitc_predict = eval_auc.evaluate(logisitc)
logisitc_predict

In [None]:
# Metric value for the gradient-boosted trees test predictions (a score, not predictions).
gbt_predict = eval_auc.evaluate(gbt)
gbt_predict

In [None]:
# Metric value for the FM test predictions. Note the near-identical names:
# FM_predict is the prediction frame, fm_predict is its score.
fm_predict = eval_auc.evaluate(FM_predict)
fm_predict

# loading live dataset

In [None]:
# Read the header-less live-tweets CSV.
# NOTE(review): absolute, machine-specific path — consider a configurable data directory.
live_tweets=spark.read.csv(r'C:\Users\ahmed hatem\Downloads\live twiiter data\live tweets for testing_cleaned.csv',inferSchema=True,header=False)


In [None]:
# Drop rows containing null values before preprocessing.
live_tweets=live_tweets.na.drop()


# preprocessing data for testing

In [None]:
# Rename the positional columns to Label / SentimentText.
live_data=preprocessing_for_testing(live_tweets)

In [None]:
# Apply the same tokenise / stop-word / hashing steps that were used for the training data.
data_testing=data_preprocessing('SentimentText','SentimentWords','MeaningfulWords',live_data)

# testing on live dataset

In [None]:
# Only the features column is passed in, so the result carries no label or text column.
naive_bayes_prediction=nbcvModel.bestModel.transform(data_testing.select('features'))

In [None]:
# Live-data predictions from the best logistic-regression model (features column only).
logisitic_regression_predictions = lrModel.bestModel.transform(data_testing.select('features'))

In [None]:
# Live-data predictions from the fitted gradient-boosted trees model (features column only).
GBTR_predictions = gbtr.transform(data_testing.select('features'))

In [None]:
# Live-data predictions from the fitted FM cross-validator result (features column only).
fm_predictions = FM.transform(data_testing.select('features'))