In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier,RandomForestClassifier, LogisticRegressionModel
from pyspark.ml.feature import VectorAssembler, HashingTF, Tokenizer, StopWordsRemover, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidatorModel
import numpy as np



In [3]:
# start spark session
spark = SparkSession \
    .builder \
    .appName("sentiment analysis") \
    .getOrCreate()

In [4]:
tweets_schema= StructType([
    StructField("label", IntegerType(), True),
    StructField("tweet_id", IntegerType(), True),
    StructField("date", DateType(), True),
    StructField("query", StringType(), True),
    StructField("user", StringType(), True),
    StructField("tweet", StringType(), True),
])

In [5]:
tweets_df= spark.read.csv(r"C:\Users\Abdelrahman\project2\tweetsGivenData\training.1600000.processed.noemoticon.csv",  header= True, schema= tweets_schema)

In [6]:
data= tweets_df.select(col("tweet").alias("text"), col("label").cast("Int"))
data.show(truncate= False, n= 10)

+---------------------------------------------------------------------------------------------------------------+-----+
|text                                                                                                           |label|
+---------------------------------------------------------------------------------------------------------------+-----+
|is upset that he can't update his Facebook by texting it... and might cry as a result  School today also. Blah!|0    |
|@Kenichan I dived many times for the ball. Managed to save 50%  The rest go out of bounds                      |0    |
|my whole body feels itchy and like its on fire                                                                 |0    |
|@nationwideclass no, it's not behaving at all. i'm mad. why am i here? because I can't see you all over there. |0    |
|@Kwesidei not the whole crew                                                                                   |0    |
|Need a hug                             

# Prepare and clean data

In [7]:
# check for null values
print(data.filter("tweet IS NULL").count())
print(data.filter("label IS NULL").count())

0
0


In [8]:
# clean trainig data

def clean_text(c):
    c = lower(c)
    c = regexp_replace(c, "^rt ", "")
    c = regexp_replace(c, "(https?\://)\S+", "")
    c = regexp_replace(c, "[^a-zA-Z0-9\\s]", "")
    c = regexp_replace(c, "\d", "")
    return c



def data_cleaning(data):
    data_clean = data.withColumn('text', clean_text(data.text).alias("text"))
    data_clean = data_clean.na.replace('', None)
    data_clean = data_clean.withColumn('text', trim(data_clean.text).alias("text"))
    return data_clean

data_clean= data_cleaning(data)

    

In [9]:
data_clean.show(truncate= False,n= 10)


+---------------------------------------------------------------------------------------------------------+-----+
|text                                                                                                     |label|
+---------------------------------------------------------------------------------------------------------+-----+
|is upset that he cant update his facebook by texting it and might cry as a result  school today also blah|0    |
|kenichan i dived many times for the ball managed to save   the rest go out of bounds                     |0    |
|my whole body feels itchy and like its on fire                                                           |0    |
|nationwideclass no its not behaving at all im mad why am i here because i cant see you all over there    |0    |
|kwesidei not the whole crew                                                                              |0    |
|need a hug                                                                             

In [10]:
print(data_clean.filter("label == 0").count())
print(data_clean.filter("label == 2").count())
print(data_clean.filter("label == 4").count())

799999
0
800000


In [11]:
data_clean= data_clean.na.replace(4, 1)
print(data_clean.filter("label == 0").count())
print(data_clean.filter("label == 2").count())
print(data_clean.filter("label == 1").count())

799999
0
800000


# Pipeline

In [12]:
# define stages
#indexer= StringIndexer(inputCol= "label_text", outputCol= "label")
tokenizer= Tokenizer(inputCol= "text", outputCol= "SentimentWords")
swr= StopWordsRemover(inputCol= tokenizer.getOutputCol(), outputCol= "meaningfullWords")
hashTF= HashingTF(inputCol= swr.getOutputCol(), outputCol= "features")


In [13]:
# instantiate pipleine
pipeline = Pipeline(stages=[tokenizer, swr, hashTF])

In [14]:
# training model
piped_data = pipeline.fit(data_clean).transform(data_clean)
training, test = piped_data.randomSplit([.9, .1])

classifier= LogisticRegression(labelCol= "label", featuresCol= "features", maxIter= 10, regParam=0.1)


# Cross validation

In [25]:
import pyspark.ml.evaluation as evals

# Create a BinaryClassificationEvaluator
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

In [26]:
# Import the tuning submodule
import pyspark.ml.tuning as tune

# Create the parameter grid
grid = tune.ParamGridBuilder()

# Add the hyperparameter
grid = grid.addGrid(classifier.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(classifier.elasticNetParam, [0, 1])

# Build the grid
grid = grid.build()


In [27]:
# Create the CrossValidator
cv = tune.CrossValidator(estimator=classifier,
                         estimatorParamMaps=grid,
                         evaluator=evaluator
                         )

In [18]:
# Call lr.fit()
models  = cv.fit(training)


# Extract the best model
best_lr = models.bestModel

# Print best_lr
print(best_lr)


'\n# Call lr.fit()\nmodels  = cv.fit(training)\n\n\n# Extract the best model\nbest_lr = models.bestModel\n\n# Print best_lr\nprint(best_lr)\n'

In [15]:
model= classifier.fit(training)

# Prediction and evaluation

In [21]:
prediction= model.transform(test)

In [22]:
prediction.show(n= 5)

+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|                text|label|      SentimentWords|    meaningfullWords|            features|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|a big angry spide...|    0|[a, big, angry, s...|[big, angry, spid...|(262144,[93587,11...|[2.06133141978092...|[0.88708759792833...|       0.0|
|a bit hacked off ...|    0|[a, bit, hacked, ...|[bit, hacked, hun...|(262144,[25363,31...|[0.98803426734413...|[0.72869947854260...|       0.0|
|a blessed sunday ...|    0|[a, blessed, sund...|[blessed, sunday,...|(262144,[39504,51...|[-1.1909756909738...|[0.23308447963403...|       1.0|
|a bus full of kid...|    0|[a, bus, full, of...|[bus, full, kids,...|(262144,[50172,54...|[-1.4131660551966...|[0.19573516656649.

In [23]:
# evaluate model
correctPrediction= prediction.filter(prediction["prediction"]==prediction["label"]).count()
totalData= prediction.count()
accuracy= correctPrediction/ totalData
print(accuracy)

0.7556721532919752


In [28]:
# Find AUC
Evaluator = evals.BinaryClassificationEvaluator()
auc = Evaluator.evaluate(prediction, {Evaluator.metricName: "areaUnderROC"})


In [29]:
print(auc)

0.821824151755046


#  Tweets sentiment analysis streaming


In [35]:
def preprocessing(lines):
    words = lines.select(explode(split(lines.value, "t_end")).alias("text"))
    words = words.na.replace('', None)
    words = words.na.drop()
    words = words.withColumn('text', regexp_replace('text', r'http\S+', ''))
    words = words.withColumn('text', regexp_replace('text', '@\w+', ''))
    words = words.withColumn('text', regexp_replace('text', '#', ''))
    words = words.withColumn('text', regexp_replace('text', 'RT', ''))
    words = words.withColumn('text', regexp_replace('text', ':', ''))
    words = words.withColumn('text', regexp_replace('text', ',', ''))
    words = words.withColumn('text', regexp_replace('text', ';', ''))
    return words


In [None]:
# read the tweet data from socket
lines = spark.readStream.format("socket").option("host", "0.0.0.0").option("port", 5555).load()

# Preprocess the data
words= preprocessing(lines)
words= pipeline.fit(words).transform(words)
words= model.transform(words)
words= words.select("text", "meaningfullWords", "features", "prediction")

query = words.writeStream.queryName("all_tweets")\
        .outputMode("append").format("parquet")\
        .option("path", "./tweetsTable")\
        .option("checkpointLocation", "./tweetsTableChec")\
        .trigger(processingTime='60 seconds').start()
query.awaitTermination()

