In [1]:
import sys
!{sys.executable} -m pip install nltk --user



In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .getOrCreate()

In [3]:
df_tweets = spark.read.json("hdfs:///user/nobody/tweet-lake/raw/2018/08/14/*")

In [4]:
df_tweets.count()

128000

In [5]:
df_tweets.createOrReplaceTempView("tweets")

In [6]:
df_positive_tweets = spark.sql("SELECT text FROM tweets WHERE (text LIKE '%\U0001F60D%' OR text LIKE '%\U0001F60A%' OR text LIKE '%\U0001F604%' OR text LIKE '%\U0001F603%' OR text LIKE '%\U0001F600%' OR text LIKE '%\U0001F606%') AND text NOT LIKE '%\U0001F62D%' AND text NOT LIKE '%\U0001F612%' AND text NOT LIKE '%\U0001F629%' AND text NOT LIKE '%\U0001F61E%' AND text NOT LIKE '%\U0001F62A%'")
df_positive_tweets.createOrReplaceTempView("positive_tweets")

In [7]:
df_negative_tweets = spark.sql("SELECT text FROM tweets WHERE (text LIKE '%\U0001F62D%' OR text LIKE '%\U0001F612%' OR text LIKE '%\U0001F629%' OR text LIKE '%\U0001F61E%' OR text LIKE '%\U0001F62A%') AND text NOT LIKE '%\U0001F60D%' AND text NOT LIKE '%\U0001F60A%' AND text NOT LIKE '%\U0001F604%' AND text NOT LIKE '%\U0001F603%' AND text NOT LIKE '%\U0001F600%' AND text NOT LIKE '%\U0001F606%'")
df_negative_tweets.createOrReplaceTempView("negative_tweets")

In [8]:
other_tweets = spark.sql("SELECT text FROM tweets WHERE text NOT LIKE '%\U0001F60D%' AND text NOT LIKE '%\U0001F60A%' AND text NOT LIKE '%\U0001F604%' AND text NOT LIKE '%\U0001F603%' AND text NOT LIKE '%\U0001F600%' AND text NOT LIKE '%\U0001F606%' AND text NOT LIKE '%\U0001F60D%' AND text NOT LIKE '%\U0001F60A%' AND text NOT LIKE '%\U0001F604%' AND text NOT LIKE '%\U0001F603%' AND text NOT LIKE '%\U0001F600%' AND text NOT LIKE '%\U0001F606%'")
other_tweets.count()

126086

In [9]:
df_sentiments = spark.sql("(SELECT text, CAST(1 AS DOUBLE) AS sentiment FROM positive_tweets) UNION ALL (SELECT text, CAST(0 AS DOUBLE) AS sentiment FROM negative_tweets) ORDER BY RAND()")
df_sentiments.count()


3129

In [10]:
import re
from bs4 import BeautifulSoup
from nltk.tokenize import WordPunctTokenizer
tok = WordPunctTokenizer()

pat1 = r'@[A-Za-z0-9_]+'
pat2 = r'https?://[^ ]+'
combined_pat = r'|'.join((pat1, pat2))
www_pat = r'www.[^ ]+'
negations_dic = {"isn't":"is not", "aren't":"are not", "wasn't":"was not", "weren't":"were not",
                "haven't":"have not","hasn't":"has not","hadn't":"had not","won't":"will not",
                "wouldn't":"would not", "don't":"do not", "doesn't":"does not","didn't":"did not",
                "can't":"can not","couldn't":"could not","shouldn't":"should not","mightn't":"might not",
                "mustn't":"must not"}
neg_pattern = re.compile(r'\b(' + '|'.join(negations_dic.keys()) + r')\b')

def tweet_cleaner_updated(row):
    text = row.text
    stripped = re.sub(combined_pat, '', text)
    stripped = re.sub(www_pat, '', stripped)
    lower_case = stripped.lower()
    neg_handled = neg_pattern.sub(lambda x: negations_dic[x.group()], lower_case)
    letters_only = re.sub("[^a-zA-Z]", " ", neg_handled)
    simple_spaced = re.sub(' +',' ',letters_only)
    return simple_spaced, row.sentiment

In [11]:
rdd_clean = df_sentiments.rdd.map(tweet_cleaner_updated)

In [12]:
df_clean = rdd_clean.toDF(["text","sentiment"])

In [13]:
df_clean.show(20, False)

+--------------------------------------------------------------------------------------------------------------------------+---------+
|text                                                                                                                      |sentiment|
+--------------------------------------------------------------------------------------------------------------------------+---------+
| u u                                                                                                                      |1.0      |
| abc                                                                                                                      |0.0      |
| mustwatch list week bepannaah udaan ishqbaaz kumkumbhagya                                                                |1.0      |
|rt mau lulusan sekolah negeri swasta agama gak masalah tapi gak usa ngaku jadi santri aja belum sudah ngaku post santri m |1.0      |
|                                                      

In [19]:
df_clean.createOrReplaceTempView("clean")
df_final = spark.sql("SELECT * FROM clean WHERE LENGTH(text) > 50")
df_final.show()

+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
| mustwatch list w...|      1.0|
|rt mau lulusan se...|      1.0|
|porque nunca me a...|      0.0|
|rt hello bro morn...|      1.0|
|rt c est toujours...|      0.0|
| mbok tuh kan bia...|      0.0|
|rt beauty at its ...|      1.0|
|rt so lyric just ...|      0.0|
|rt after three ce...|      0.0|
| ate riiii pag na...|      1.0|
|rt he s captured ...|      1.0|
|rt whitepaper rea...|      1.0|
|rt oh deer i will...|      1.0|
|rt bila aku ambik...|      0.0|
|do al ah ap detay...|      1.0|
|rt jimin asked ju...|      1.0|
|rt nice g zel uzu...|      1.0|
|excited na ko par...|      1.0|
|rt my article is ...|      1.0|
|ay no yo de verda...|      0.0|
+--------------------+---------+
only showing top 20 rows



In [15]:
(train_set, val_set, test_set) = df_final.randomSplit([0.98, 0.01, 0.01])

In [16]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "sentiment", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
val_df = pipelineFit.transform(val_set)
train_df.show(5)

+--------------------+---------+--------------------+--------------------+--------------------+-----+
|                text|sentiment|               words|                  tf|            features|label|
+--------------------+---------+--------------------+--------------------+--------------------+-----+
|                abc |      0.0|             [, abc]|(65536,[52148,525...|(65536,[52148,525...|  1.0|
| mbok tuh kan bia...|      0.0|[, mbok, tuh, kan...|(65536,[10158,107...|(65536,[10158,107...|  1.0|
|          mizukimail|      1.0|      [, mizukimail]|(65536,[45789,525...|(65536,[45789,525...|  0.0|
| mustwatch list w...|      1.0|[, mustwatch, lis...|(65536,[338,3850,...|(65536,[338,3850,...|  0.0|
|                u u |      1.0|            [, u, u]|(65536,[15318,525...|(65536,[15318,525...|  0.0|
+--------------------+---------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [17]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(val_df)
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

0.8798076923076924

In [18]:
lrModel.save("hdfs:///tweets_model")