## Roll no : P23DS019, P23DS021

In [1]:
import os
import random
import re

import matplotlib.pyplot as plt
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import seaborn as sns
import sklearn

from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import QuantileDiscretizer
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [2]:
# Explicit schema for the CSV: six columns, each declared as a nullable string.
DATASET_COLUMNS = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in ("target", "ids", "date", "flag", "user", "text")
    ]
)

# Character encoding declared for the dataset file.
DATASET_ENCODING = "ISO-8859-1"

In [3]:
# Start (or reuse) the Spark session and quiet the logs before any job runs.
spark = SparkSession.builder.appName('Tweets Sentiment1').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')

# The file has no header row, so column names and types come from DATASET_COLUMNS.
# The declared DATASET_ENCODING is passed explicitly; without it the reader
# falls back to its default charset and non-ASCII bytes are mis-decoded.
df = spark.read.csv(
    'train_test.csv',
    header=False,
    schema=DATASET_COLUMNS,
    encoding=DATASET_ENCODING,
)

Veriler Twitter Search API'den yararlanılarak oluşturulmuştur. Veri seti 1,6 milyon satıra ve 6 sütuna sahiptir. Sütunlar sırasıyla:
- target (0 = negatif, 2 = nötr,4 = pozitif)
- ids (tweet kimliği)
- date
- flag (sorgu; sorgu yoksa NO_QUERY)
- user
- text (tweet içeriği)

Veri setinin resmî sitesi: http://help.sentiment140.com/for-students/

Veri setinin kaggle sitesi: https://www.kaggle.com/kazanova/sentiment140


In [4]:
# Print the column names and types of the loaded DataFrame.
df.printSchema()

root
 |-- target: string (nullable = true)
 |-- ids: string (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)



In [5]:
# Preview the first 25 raw rows.
df.show(25)

+------+----------+--------------------+--------+---------------+--------------------+
|target|       ids|                date|    flag|           user|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
|     0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|@Kwesidei not the...|
|     0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|         Need a hug |
|     0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|@LOLTrish hey  lo...|
|     0|1467811795|Mon Apr 06 22:20:...|NO_

In [6]:
# Discard rows that contain a null in any column, then report the remaining row count.
df = df.na.drop()
df.count()

1600000

In [7]:
def preprocessing(sparkDF, col):
    """Clean a text column of a Spark DataFrame with a fixed chain of regex rules.

    The rules are applied in order, each one overwriting the column:
    URLs (``http...``) are removed, ``@mentions`` are removed, ``#`` signs are
    removed, the literal ``RT`` is removed, colons are removed, every run of
    characters other than ASCII letters/digits becomes one space, runs of
    spaces are collapsed, and finally the value is trimmed.

    Parameters
    ----------
    sparkDF : pyspark.sql.DataFrame
        Frame containing the column to clean.
    col : str
        Name of the text column; it is replaced in the returned frame.

    Returns
    -------
    pyspark.sql.DataFrame
        A new DataFrame with the same columns, where ``col`` holds the cleaned text.
    """
    # Raw strings keep the regex backslashes literal; the previous '@\w+' and
    # '\-' relied on invalid Python escape sequences.
    substitutions = (
        (r'http\S+', ''),
        (r'@\w+', ''),
        (r'#', ''),
        # NOTE(review): this also strips "RT" inside upper-case words
        # (e.g. "SMART" -> "SMA"); kept as-is so training-time cleaning is unchanged.
        (r'RT', ''),
        (r':', ''),
        (r'[^A-Za-z0-9]+', ' '),
        # No hyphen can survive the previous rule, so this one is a no-op;
        # it is kept to mirror the original rule list exactly.
        (r'\-', ''),
        (r'[ ]+', ' '),
    )
    for pattern, replacement in substitutions:
        sparkDF = sparkDF.withColumn(col, F.regexp_replace(col, pattern, replacement))

    return sparkDF.withColumn(col, F.trim(sparkDF[col]))

In [8]:
# Clean the tweet text; `df` is rebound to the frame whose `text` column holds the cleaned values.
df = preprocessing(df,'text')

In [9]:
# Preview the first 25 rows again to inspect the cleaned `text` column.
df.show(25)

+------+----------+--------------------+--------+---------------+--------------------+
|target|       ids|                date|    flag|           user|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|Awww that s a bum...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|I dived many time...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|no it s not behav...|
|     0|1467811372|Mon Apr 06 22:20:...|NO_QUERY|       joy_wolf|  not the whole crew|
|     0|1467811592|Mon Apr 06 22:20:...|NO_QUERY|        mybirch|          Need a hug|
|     0|1467811594|Mon Apr 06 22:20:...|NO_QUERY|           coZZ|hey long time no ...|
|     0|1467811795|Mon Apr 06 22:20:...|NO_

In [10]:
# Ten users with the most tweets, highest count first.
(
    df.groupBy('user')
    .count()
    .orderBy(F.col('count').desc())
    .show(10)
)

+---------------+-----+
|           user|count|
+---------------+-----+
|       lost_dog|  549|
|        webwoke|  345|
|       tweetpet|  310|
|SallytheShizzle|  281|
|    VioletsCRUK|  279|
|    mcraddictal|  276|
|       tsarnick|  248|
|    what_bugs_u|  246|
|    Karen230683|  238|
|      DarkPiano|  236|
+---------------+-----+
only showing top 10 rows



In [11]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [12]:
# 98% / 1% / 1% random split into train, validation and test; the seed fixes the split.
train_set, val_set, test_set = df.randomSplit(weights=[0.98, 0.01, 0.01], seed=99)

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, StringIndexer, Tokenizer

# Feature stages: text -> words -> hashed term frequencies (65536 buckets) -> TF-IDF.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(inputCol="words", outputCol="tf", numFeatures=1 << 16)
# minDocFreq=5 filters out sparse (rare) terms.
idf = IDF(inputCol="tf", outputCol="features", minDocFreq=5)
# Encode the string `target` column as the numeric `label` column.
label_stringIdx = StringIndexer(inputCol="target", outputCol="label")

pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

# Fit on the training split only, then apply the fitted pipeline to train and validation.
pipelineFit = pipeline.fit(train_set)
train_df, val_df = (pipelineFit.transform(part) for part in (train_set, val_set))
train_df.show(5)

+------+----------+--------------------+--------+---------------+--------------------+--------------------+--------------------+--------------------+-----+
|target|       ids|                date|    flag|           user|                text|               words|                  tf|            features|label|
+------+----------+--------------------+--------+---------------+--------------------+--------------------+--------------------+--------------------+-----+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|Awww that s a bum...|[awww, that, s, a...|(65536,[18354,216...|(65536,[18354,216...|  0.0|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|[is, upset, that,...|(65536,[1981,3085...|(65536,[1981,3085...|  0.0|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|I dived many time...|[i, dived, many, ...|(65536,[2548,2888...|(65536,[2548,2888...|  0.0|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF

In [14]:
# Show the columns added by the fitted pipeline to the training split.
train_df.printSchema()

root
 |-- target: string (nullable = true)
 |-- ids: string (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [15]:
# Preview the first 5 transformed validation rows.
val_df.show(5)

+------+----------+--------------------+--------+-------------+--------------------+--------------------+--------------------+--------------------+-----+
|target|       ids|                date|    flag|         user|                text|               words|                  tf|            features|label|
+------+----------+--------------------+--------+-------------+--------------------+--------------------+--------------------+--------------------+-----+
|     0|1467813579|Mon Apr 06 22:20:...|NO_QUERY|   starkissed|ahh ive always wa...|[ahh, ive, always...|(65536,[8538,1424...|(65536,[8538,1424...|  0.0|
|     0|1467814438|Mon Apr 06 22:20:...|NO_QUERY|ChicagoCubbie|I hate when I hav...|[i, hate, when, i...|(65536,[7173,1223...|(65536,[7173,1223...|  0.0|
|     0|1467838362|Mon Apr 06 22:26:...|NO_QUERY|      Zella17|I m sooo sad they...|[i, m, sooo, sad,...|(65536,[1880,2062...|(65536,[1880,2062...|  0.0|
|     0|1467874103|Mon Apr 06 22:36:...|NO_QUERY|  Lisaherrity|poor john thi

In [16]:
# Show the schema of the transformed validation split.
val_df.printSchema()

root
 |-- target: string (nullable = true)
 |-- ids: string (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [17]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression capped at 100 iterations, trained on the transformed training split.
lr = LogisticRegression().setMaxIter(100)
lrModel = lr.fit(train_df)

# Score the validation split with the fitted model.
predictionsLojistic = lrModel.transform(val_df)

In [18]:
# Preview the first 25 validation rows together with the model's prediction columns.
predictionsLojistic.show(25)

+------+----------+--------------------+--------+--------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|target|       ids|                date|    flag|          user|                text|               words|                  tf|            features|label|       rawPrediction|         probability|prediction|
+------+----------+--------------------+--------+--------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|     0|1467813579|Mon Apr 06 22:20:...|NO_QUERY|    starkissed|ahh ive always wa...|[ahh, ive, always...|(65536,[8538,1424...|(65536,[8538,1424...|  0.0|[-1.0858743146480...|[0.25239596767567...|       1.0|
|     0|1467814438|Mon Apr 06 22:20:...|NO_QUERY| ChicagoCubbie|I hate when I hav...|[i, hate, when, i...|(65536,[7173,1223...|(65536,[7173,1223...|  0.0|[1.89950725223

In [19]:
# Show the schema of the scored validation frame.
predictionsLojistic.printSchema()

root
 |-- target: string (nullable = true)
 |-- ids: string (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- tf: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [20]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator that reads the model's `rawPrediction` column; all other settings stay at their defaults.
evaluator = BinaryClassificationEvaluator().setRawPredictionCol("rawPrediction")

# Metric value on the validation predictions (reported as ROC-AUC later in the notebook).
evaluator.evaluate(predictionsLojistic)

0.8542319985469665

In [21]:
# Validation accuracy: share of rows whose predicted index equals the label index.
is_correct = predictionsLojistic.label == predictionsLojistic.prediction
accuracy = predictionsLojistic.where(is_correct).count() / float(val_set.count())
accuracy

0.7843501984126984

In [22]:
# Run the test split through the already-fitted feature pipeline, then score it with the trained model.
test_df = pipelineFit.transform(test_set)
testPredictionsLogistic = lrModel.transform(test_df)

In [23]:
# Preview the first 25 scored test rows.
testPredictionsLogistic.show(25)

+------+----------+--------------------+--------+---------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|target|       ids|                date|    flag|           user|                text|               words|                  tf|            features|label|       rawPrediction|         probability|prediction|
+------+----------+--------------------+--------+---------------+--------------------+--------------------+--------------------+--------------------+-----+--------------------+--------------------+----------+
|     0|1467814119|Mon Apr 06 22:20:...|NO_QUERY|      cooliodoc|I baked you a cak...|[i, baked, you, a...|(65536,[13007,183...|(65536,[13007,183...|  0.0|[-0.6593204018116...|[0.34089229037371...|       1.0|
|     0|1467907751|Mon Apr 06 22:45:...|NO_QUERY|   mementototem|STOU site not upd...|[stou, site, not,...|(65536,[1328,1523...|(65536,[1328,1523...|  0.0|[1.454455

In [24]:
# Test-set accuracy (matching label/prediction rows over all test rows) and evaluator metric.
is_hit = testPredictionsLogistic.label == testPredictionsLogistic.prediction
test_accuracy = testPredictionsLogistic.where(is_hit).count() / float(test_set.count())
test_roc_auc = evaluator.evaluate(testPredictionsLogistic)

for template, score in (
    ("Logistic HashingTF Test Accuracy Score: {0:.4f}", test_accuracy),
    ("Logistic HashingTF Test ROC-AUC: {0:.4f}", test_roc_auc),
):
    print(template.format(score))

Logistic HashingTF Test Accuracy Score: 0.7844
Logistic HashingTF Test ROC-AUC: 0.8545


In [27]:
# Ordered (pattern, replacement) rules; they mirror the rule list used by the
# Spark-side `preprocessing` function so single inputs are cleaned the same way.
_INPUT_TEXT_SUBSTITUTIONS = (
    (r'http\S+', ''),            # URLs
    (r'@\w+', ''),               # @mentions
    (r'#', ''),                  # hashtag sign (the tag word is kept)
    (r'RT', ''),                 # literal "RT"
    (r':', ''),                  # colons
    (r'[^A-Za-z0-9]+', ' '),     # any other non-alphanumeric run -> one space
    (r'\-', ''),                 # no-op after the previous rule; kept for parity
    (r'[ ]+', ' '),              # collapse repeated spaces
)


def preprocess_input_text(text):
    """Clean one raw text string with the regex rules used for the training data.

    Requires the standard-library ``re`` module to be imported at module level.

    Parameters
    ----------
    text : str
        Raw input text (for example a tweet).

    Returns
    -------
    str
        The text after all substitutions, with leading/trailing whitespace removed.
    """
    for pattern, replacement in _INPUT_TEXT_SUBSTITUTIONS:
        text = re.sub(pattern, replacement, text)
    return text.strip()

# Display names for the original dataset target codes (0 = negative, 2 = neutral, 4 = positive).
_TARGET_NAMES = {"0": "Negative", "2": "Neutral", "4": "Positive"}


def _indexed_targets(pipeline_fit):
    """Return the label list of the fitted label-indexer stage, or None if no stage exposes one."""
    for stage in reversed(getattr(pipeline_fit, "stages", None) or []):
        labels_array = getattr(stage, "labelsArray", None)
        if labels_array:
            return list(labels_array[0])
        labels = getattr(stage, "labels", None)
        if labels is not None:
            return list(labels)
    return None


def predict_sentiment(input_text, pipeline_fit, lr_model):
    """Classify a single piece of text and return a sentiment name.

    The text is cleaned with ``preprocess_input_text``, wrapped in a one-row
    DataFrame with a ``text`` column, passed through the fitted feature
    pipeline and then scored by the model.

    Parameters
    ----------
    input_text : str
        Raw text to classify.
    pipeline_fit : fitted pipeline model
        Fitted feature pipeline used for training.
    lr_model : fitted classification model
        Model whose ``transform`` adds a ``prediction`` column.

    Returns
    -------
    str
        ``"Negative"``, ``"Positive"`` or ``"Neutral"``.
    """
    preprocessed_text = preprocess_input_text(input_text)
    input_df = spark.createDataFrame([(preprocessed_text,)], ["text"])
    scored = lr_model.transform(pipeline_fit.transform(input_df))
    label_index = scored.select("prediction").collect()[0][0]

    # The label indexer assigns indices by label frequency in the training
    # data, so index 0.0 is not guaranteed to be target "0". Translate the
    # predicted index back to the original target code when the fitted
    # indexer exposes its labels.
    targets = _indexed_targets(pipeline_fit)
    if targets is not None and 0 <= int(label_index) < len(targets):
        return _TARGET_NAMES.get(str(targets[int(label_index)]), "Neutral")

    # Fallback when no label list is available: the original fixed mapping.
    if label_index == 0.0:
        return "Negative"
    if label_index == 1.0:
        return "Positive"
    return "Neutral"

# Example usage: classify typed text until the user enters 'stop'.
while True:
    input_text = input("Enter text")
    # Compare the trimmed text so that 'stop' followed by stray whitespace
    # also ends the loop instead of being classified.
    if input_text.strip() == 'stop':
        break
    predicted_sentiment = predict_sentiment(input_text, pipelineFit, lrModel)
    print("Predicted sentiment:", predicted_sentiment)

Enter textthis week is not going as i had hoped 


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Predicted sentiment: Negative
Enter text@Viennah Yay! I'm happy for you with your job! But that also means less time for me and you... 


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Predicted sentiment: Positive
Enter text@JonathanRKnight I guess that's a no then. 


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Predicted sentiment: Negative
Enter textI finished a wholesale order for Blythe Mary Jane shoes for 40 pairs. Lots of pretty colors &amp; glitters 


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Predicted sentiment: Positive
Enter textstop


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

Predicted sentiment: Positive
