In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

In [2]:
from threading import Thread

class StreamingThread(Thread):
    def __init__(self, ssc):
        Thread.__init__(self)
        self.ssc = ssc
    def run(self):
        ssc.start()
        ssc.awaitTermination()
    def stop(self):
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [3]:
sc

In [4]:
spark

In [5]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType

In [6]:
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', 
                                                                  inferschema='true', 
                                                                  quote = '"', 
                                                                  escape = '"',
                                                                 multiline = 'true',
                                                                 ignoreTrailingWhiteSpace = 'true').load('data.csv')

In [7]:
data = data.na.drop(subset=["review_text"])

In [8]:
from pyspark.sql import functions as ff
data = data.withColumn('text', ff.concat(ff.col('review_title'),ff.lit(' '),ff.col('review_text')))

In [9]:
drop_list = ['X', 'book_title', 'review_title', 'review_user', 'book_id', 'review_id', 
             'timestamp', 'review_text']
data = data.select([column for column in data.columns if column not in drop_list])

In [10]:
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
import nltk
#nltk.download("stopwords")

In [11]:
(trainingData, testData) = data.randomSplit([0.8, 0.2], seed = 12345)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 3333
Test Dataset Count: 905


In [12]:
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

In [13]:
stopwordList = nltk.corpus.stopwords.words('english')
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered", stopWords=stopwordList)

In [14]:
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=20000, minDF=50)

In [15]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

label_stringIdx = StringIndexer(inputCol = "review_score", outputCol = "label") 
labels_stars = label_stringIdx.fit(trainingData).labels # Save this levels to be able later to transform back

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])

In [16]:
pipelineFit = pipeline.fit(trainingData)
dataset = pipelineFit.transform(trainingData)
#dataset.show(1, truncate = False)
dataset.show(5)

+------------+--------------------+--------------------+--------------------+--------------------+-----+
|review_score|                text|               words|            filtered|            features|label|
+------------+--------------------+--------------------+--------------------+--------------------+-----+
|           1|*Eye roll* I firs...|[eye, roll, i, fi...|[eye, roll, first...|(415,[0,2,3,4,6,9...|  4.0|
|           1|1 of the normal H...|[1, of, the, norm...|[1, normal, start...|(415,[0,16,32,42,...|  4.0|
|           1|580 pages to just...|[580, pages, to, ...|[580, pages, quit...|(415,[0,1,2,3,4,5...|  4.0|
|           1|A political hit j...|[a, political, hi...|[political, hit, ...|(415,[0,22,54,227...|  4.0|
|           1|Amazing Read but ...|[amazing, read, b...|[amazing, read, d...|(415,[0,1,12,25,5...|  4.0|
+------------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [17]:
test_dataset = pipelineFit.transform(testData)
test_dataset.show(1, truncate = False)
test_dataset.show(5)

+------------+--------------------+--------------------+--------------------+--------------------+-----+
|review_score|                text|               words|            filtered|            features|label|
+------------+--------------------+--------------------+--------------------+--------------------+-----+
|           1|A waste of money ...|[a, waste, of, mo...|[waste, money, ti...|(415,[5,10,56,375...|  4.0|
|           1|Attn AMAZON Kindl...|[attn, amazon, ki...|[attn, amazon, ki...|(415,[0,11,116,13...|  4.0|
|           1|Baid and Switch I...|[baid, and, switc...|[baid, switch, ha...|(415,[21,23,138,2...|  4.0|
|           1|Boring ! I'm a hu...|[boring, i, m, a,...|[boring, huge, fa...|(415,[2,22,144,30...|  4.0|
|           1|Boring Great plot...|[boring, great, p...|[boring, great, p...|(415,[4,25,63,230...|  4.0|
+------------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 5 rows



In [18]:
from pyspark.ml.feature import HashingTF, IDF

In [19]:
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=20000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms

pipeline_tfidf = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])

In [20]:
pipelineFit_tfidf = pipeline_tfidf.fit(trainingData)
dataset_tfidf = pipelineFit_tfidf.transform(trainingData)
dataset_tfidf.show(1)

+------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|review_score|                text|               words|            filtered|         rawFeatures|            features|label|
+------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|           1|*Eye roll* I firs...|[eye, roll, i, fi...|[eye, roll, first...|(20000,[456,1305,...|(20000,[456,1305,...|  4.0|
+------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 1 row



In [24]:
test_dataset_tfidf = pipelineFit_tfidf.transform(testData)
test_dataset_tfidf.show(1)

+------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|review_score|                text|               words|            filtered|         rawFeatures|            features|label|
+------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|           1|A waste of money ...|[a, waste, of, mo...|[waste, money, ti...|(20000,[437,450,1...|(20000,[437,450,1...|  4.0|
+------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
only showing top 1 row



In [25]:
from pyspark.ml.feature import IndexToString
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedScore", labels = labels_stars)

from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(smoothing=1)

model = nb.fit(dataset)
predictions = model.transform(test_dataset)
predictions = labelConverter.transform(predictions) # Transform labels

predictions.filter(predictions['prediction'] == 0) \
    .select("text","probability","review_score","predictedScore") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+------------------------------+------------+--------------+
|                          text|                   probability|review_score|predictedScore|
+------------------------------+------------------------------+------------+--------------+
|The best book you will ever...|[0.9999962814618197,3.50321...|           5|             5|
|What a POWERFUL story!! Unp...|[0.9999770160552786,2.29834...|           5|             5|
|Beautifully written with el...|[0.9999733020487866,2.50650...|           4|             5|
|Amazing, beautiful, accessi...|[0.9999701132582072,2.15653...|           5|             5|
|Bravo, Mr. Towles! After re...|[0.9999586010516772,1.89201...|           5|             5|
|exceptional novel - one of ...|[0.9999424971293328,5.64261...|           5|             5|
|Rachel Provides Easy Steps ...|[0.999927561543873,6.041262...|           5|             5|
|Anyone ready to change thei...|[0.9999228354249022,7.71509...|           5|    

In [26]:
from pyspark.ml.classification import LogisticRegression

lr_tfidf = LogisticRegression(maxIter=20, regParam=0, elasticNetParam=0)

lrModel_tfidf = lr_tfidf.fit(dataset_tfidf)
predictions_tfidf = lrModel_tfidf.transform(test_dataset_tfidf)
predictions_tfidf = labelConverter.transform(predictions_tfidf) # Transform labels

predictions_tfidf.filter(predictions_tfidf['prediction'] == 0) \
    .select("text","probability","review_score","predictedScore") \
    .orderBy("probability", ascending=False) \
    .show(n = 15, truncate = 40)

+----------------------------------------+----------------------------------------+------------+--------------+
|                                    text|                             probability|review_score|predictedScore|
+----------------------------------------+----------------------------------------+------------+--------------+
|Beautifully done and down to earth I ...|[1.0,1.0951071313748669E-16,5.0335173...|           5|             5|
|SO Twisty Twisted “I want my kids to ...|[1.0,6.689446406497144E-17,2.61317220...|           5|             5|
|Engrossing story! During the first pa...|[1.0,3.252631500218192E-17,7.17720871...|           5|             5|
|SLOW BUILD TO THE TOP OF THE ROLLER C...|[1.0,3.044315421944498E-17,3.72133604...|           5|             5|
|On the edge of my seat The author rea...|[1.0,2.4405740959251534E-17,2.8124606...|           5|             5|
|Great book! This has been the best bo...|[1.0,1.6857201275336582E-17,4.7220161...|           5|        

In [25]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator_tfidf = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator_tfidf.evaluate(predictions_tfidf)

0.7441898341686789

In [28]:
pipeline_final= Pipeline(stages=[regexTokenizer, stopwordsRemover, label_stringIdx, nb])
pipelineFit1=pipeline_final.fit(trainingData)

In [29]:
pipeline_final2= Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx, lr_tfidf])
pipelineFit2=pipeline_final2.fit(trainingData)

In [30]:
#pipelineFit1.save['pipelinemodel']
pipelineFit2.save['pipelinemodel2']

TypeError: 'method' object is not subscriptable

In [31]:
globals()['models_loaded'] = False

#def predict(df):
    # Replace this with something smarter
    
#predict_udf = udf(predictions_tfidf, IntegerType())

def process(time, rdd):
    if rdd.isEmpty():
        return
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    
    # Utilize our predict function
    df_withpreds = df.withColumn("pred", predict_udf(struct([df[x] for x in df.columns])))
    df_withpreds.show()
    
    #df_withpreds = df.withColumn("pred", model_tfidf.transform(struct([df[x] for x in df.columns])))
    #df_withpreds.show()
    
    # Normally, you wouldn't use a Python function to predict
    # But an MLlib model you've built and saved with Spark
    
    # In this case, you need to prevent loading your model in every call to "process" as follows:
    
    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] =   PipelineFit1.load('naivebayes') # Replace this with:    [...].load('my_logistic_regression')
        globals()['models_loaded'] = True
        
        #pipelineFit=PipelineFit1.load('naivebayes')
        #predictions= pipelineFit.transform(trainingdata)
        
    # Predict using the model: 
    df_result = globals()['model_tfidf'].transform(df)
    df_result.show()

In [32]:
globals()['models_loaded'] = False

#def predict(df):
    # Replace this with something smarter
    
#predict_udf = udf(predictions_tfidf, IntegerType())

def process(time, rdd):
    if rdd.isEmpty():
        return
    print("========= %s =========" % str(time))
    
    # Convert to data frame
    df = spark.read.json(rdd)
    df.show()
    
    # Utilize our predict function
    df_withpreds = df.withColumn("pred", predict_udf(struct([df[x] for x in df.columns])))
    df_withpreds.show()
    
    #df_withpreds = df.withColumn("pred", model_tfidf.transform(struct([df[x] for x in df.columns])))
    #df_withpreds.show()
    
    # Normally, you wouldn't use a Python function to predict
    # But an MLlib model you've built and saved with Spark
    
    # In this case, you need to prevent loading your model in every call to "process" as follows:
    
    # Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['my_model'] =   PipelineFit2.load('logisticregtfidf') # Replace this with:    [...].load('my_logistic_regression')
        globals()['models_loaded'] = True
        
        #pipelineFit=PipelineFit2.load('logisticregtfidf')
        #predictions= pipelineFit.transform(trainingdata)
        
    # Predict using the model: 
    df_result = globals()['model_tfidf'].transform(df)
    df_result.show()

In [33]:
ssc = StreamingContext(sc, 10)

In [34]:
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [35]:
ssc_t = StreamingThread(ssc)
ssc_t.start()

+----------+--------------------+--------------+------------+--------------------+--------------------+-----------+----------+
|   book_id|          book_title|     review_id|review_score|         review_text|        review_title|review_user| timestamp|
+----------+--------------------+--------------+------------+--------------------+--------------------+-----------+----------+
|1328557472|No Crumbs Left: W...|R35EMLV2B1CEPZ|           5|In this love lett...|              Superb|    Jenn N.|1559385539|
|1982102314|           Elevation|R3BCWAFPY05KH5|           5|Lovely novelette....|Good Old Stephen ...| R. Vincent|1559385675|
+----------+--------------------+--------------+------------+--------------------+--------------------+-----------+----------+



Exception in thread Thread-8:
Traceback (most recent call last):
  File "C:\Users\Gianina\Anaconda3\lib\threading.py", line 917, in _bootstrap_inner
    self.run()
  File "<ipython-input-2-19fbf29765bb>", line 9, in run
    ssc.awaitTermination()
  File "C:\Users\Gianina\Documents\MSc\spark\spark-2.4.0-bin-hadoop2.7\python\pyspark\streaming\context.py", line 192, in awaitTermination
    self._jssc.awaitTermination()
  File "C:\Users\Gianina\Documents\MSc\spark\spark-2.4.0-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\Gianina\Documents\MSc\spark\spark-2.4.0-bin-hadoop2.7\python\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\Gianina\Documents\MSc\spark\spark-2.4.0-bin-hadoop2.7\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error o

+----------+--------------------+--------------+------------+--------------------+--------------------+-----------+----------+
|   book_id|          book_title|     review_id|review_score|         review_text|        review_title|review_user| timestamp|
+----------+--------------------+--------------+------------+--------------------+--------------------+-----------+----------+
|1476773092|Unfreedom of the ...| RLH9V0K65R60T|           5|Mark is a great A...|        A must read!|Serenity...|1559385829|
|1982102314|           Elevation|R33M9K1SE0GEFG|           5|Lovely novelette....|Good Old Stephen ...| R. Vincent|1559386080|
+----------+--------------------+--------------+------------+--------------------+--------------------+-----------+----------+



In [None]:
ssc_t.stop()

----- Stopping... this may take a few seconds -----
