In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('twitter-sentiment').getOrCreate()
spark.sparkContext.addPyFile("/home/jovyan/work/sentiment_model.py")
print("Spark context started")

Spark context started


In [2]:
spark.sparkContext.addPyFile?

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, StringType

schema = StructType([
    StructField("target", IntegerType(), True),
    StructField("id", LongType(), True),
    StructField("raw_timestamp", StringType(), True),
    StructField("query_status", StringType(), True),
    StructField("author", StringType(), True),
    StructField("tweet", StringType(), True)
])
    
data_path = "/home/jovyan/data/training.1600000.processed.noemoticon.csv"

raw_sentiment = spark.read.csv(data_path,header=False,schema=schema) \
    .selectExpr("(case when target=4 then 1 else 0 end) as target","tweet")



raw_sentiment.groupBy("target").count().show()

+------+------+
|target| count|
+------+------+
|     1|800000|
|     0|800000|
+------+------+



In [4]:
import pickle as pkl

def read_model(model_path):
    with open(model_path,'rb') as buffer:
        return pkl.load(buffer)

model_path = "/home/jovyan/models/tweet_sentiment.mdl"

model_object = read_model(model_path)
model_object

Pipeline(memory=None,
         steps=[('tfidf',
                 TfidfVectorizer(analyzer='word', binary=False,
                                 decode_error='strict',
                                 dtype=<class 'numpy.float64'>,
                                 encoding='utf-8', input='content',
                                 lowercase=True, max_df=0.75, max_features=None,
                                 min_df=1, ngram_range=(1, 1), norm='l2',
                                 preprocessor=<function preprocessor at 0x7fb955a06ef0>,
                                 smooth_idf=True, stop_words=None,
                                 strip_accents=None...
                 RandomForestClassifier(bootstrap=True, ccp_alpha=0.0,
                                        class_weight=None, criterion='gini',
                                        max_depth=8, max_features='auto',
                                        max_leaf_nodes=None, max_samples=None,
                                 

In [5]:
# raw_sentiment.summary()
# raw_sentiment.columns
# raw_sentiment.dtypes
raw_sentiment.printSchema()

root
 |-- target: integer (nullable = false)
 |-- tweet: string (nullable = true)



In [6]:
# raw_sentiment.rdd?
raw_sentiment.rdd.mapPartitions?

In [7]:
model_object_broadcast = spark.sparkContext.broadcast(model_object)
# model_object_broadcast?
# model_object_broadcast.value

In [8]:
model_object_broadcast.value

Pipeline(memory=None,
         steps=[('tfidf',
                 TfidfVectorizer(analyzer='word', binary=False,
                                 decode_error='strict',
                                 dtype=<class 'numpy.float64'>,
                                 encoding='utf-8', input='content',
                                 lowercase=True, max_df=0.75, max_features=None,
                                 min_df=1, ngram_range=(1, 1), norm='l2',
                                 preprocessor=<function preprocessor at 0x7fb955a06ef0>,
                                 smooth_idf=True, stop_words=None,
                                 strip_accents=None...
                 RandomForestClassifier(bootstrap=True, ccp_alpha=0.0,
                                        class_weight=None, criterion='gini',
                                        max_depth=8, max_features='auto',
                                        max_leaf_nodes=None, max_samples=None,
                                 

In [9]:
model_object_broadcast = spark.sparkContext.broadcast(model_object)

def block_iterator(iterator, size):
    bucket = list()
    for e in iterator:
        bucket.append(e)
        if len(bucket) >= size:
            yield bucket
            bucket = list()
    if bucket:
        yield bucket

def block_classify(iterator):
    model = model_object_broadcast.value
    for features in block_iterator(iterator, 10000):
        import pandas as pd
        import json
        features_df = pd.DataFrame(features, columns=["target","text"])
        pred = model.predict_proba(features_df["text"])
        res_df = features_df
        res_df["proba"] = pred[:,1]
        for e in json.loads(res_df.to_json(orient='records')):
            yield e

predicted_df = raw_sentiment.rdd.mapPartitions(block_classify).toDF()

predicted_df.show()



+------------+------+--------------------+
|       proba|target|                text|
+------------+------+--------------------+
|0.5172669619|     0|@switchfoot http:...|
|0.4887238607|     0|is upset that he ...|
|0.4964520864|     0|@Kenichan I dived...|
|0.4913687932|     0|my whole body fee...|
|0.4807232839|     0|@nationwideclass ...|
|0.5034112995|     0|@Kwesidei not the...|
|0.4965544755|     0|         Need a hug |
|0.4956864947|     0|@LOLTrish hey  lo...|
|0.4838413254|     0|@Tatiana_K nope t...|
|0.5034112995|     0|@twittera que me ...|
|0.5065598329|     0|spring break in p...|
|0.5034112995|     0|I just re-pierced...|
|0.4929139023|     0|@caregiving I cou...|
|0.4755583492|     0|@octolinz16 It it...|
|0.4844650004|     0|@smarrison i woul...|
|0.4688346285|     0|@iamjazzyfizzle I...|
|0.4845203707|     0|Hollis' death sce...|
|0.5034112995|     0|about to file taxes |
|0.5160874226|     0|@LettyA ahh ive a...|
|0.5034112995|     0|@FakerPattyPattz ...|
+----------

In [None]:
# predicted_df?
# predicted_df.columns
# predicted_df.dtypes
# predicted_df.printSchema()
print((predicted_df.count(), len(predicted_df.columns)))

In [None]:
spark.stop()