In [None]:
#import modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover

# Create (or reuse) the Spark context and session used by the streaming job.
appName = "Sentiment Analysis in Spark"

# Pass the variable, not the literal string "appName", so the context gets
# the intended application name.
conf = SparkConf().setAppName(appName)

# Hand the configuration to getOrCreate(); it is only applied when no context
# exists yet, otherwise the running context is returned as-is.
sc = SparkContext.getOrCreate(conf)

spark = (
    SparkSession.builder
    .appName(appName)
    # Lets the file-based streaming source infer its schema instead of
    # requiring an explicit one.
    .config("spark.sql.streaming.schemaInference", "true")
    # NOTE(review): "appName" is not a standard Spark property key; kept
    # unchanged in case something reads it back -- confirm whether it is needed.
    .config("appName", "TweetStream")
    .getOrCreate()
)

In [None]:
# Load the previously saved logistic-regression model from disk.
# NOTE(review): this is an absolute, machine-specific path; the notebook will
# only run where this directory exists.
basePath = "/home/mecha/Documents/ml_models/sentiment_analyzer"
model = LogisticRegressionModel.load(f"{basePath}/modeliter10")

In [None]:
# Column names of the incoming data; not referenced by the pipeline below,
# kept because other cells may rely on it.
columns = ["text", "label"]

# Stream CSV files matching the glob as they appear. The header row supplies
# the column names and the column types are inferred.
# NOTE(review): an explicit schema would be more robust than inference for a
# streaming source -- confirm the real column layout before switching.
predictionData = (
    spark.readStream
    .option("inferSchema", "true")
    .option("header", "true")
    .csv("tweetstream/*.csv")
)

# Tokenizer fails on a null input string, and one failing row terminates the
# whole streaming query, so rows without text are dropped before tokenising.
validTweets = predictionData.na.drop(subset=["text"])

# text -> lower-cased whitespace tokens
tokenizer = Tokenizer(inputCol="text", outputCol="SentimentWords")
tokenizedTest = tokenizer.transform(validTweets)

# tokens -> tokens with stop words removed
swr = StopWordsRemover(
    inputCol=tokenizer.getOutputCol(),
    outputCol="MeaningfulWords",
)
SwRemovedTest = swr.transform(tokenizedTest)

# filtered tokens -> hashed term-frequency vector in the "features" column
# NOTE(review): HashingTF runs with its default numFeatures here; this has to
# match whatever was used when the loaded model was trained -- confirm.
hashTF = HashingTF(inputCol=swr.getOutputCol(), outputCol="features")
numericTest = hashTF.transform(SwRemovedTest).select(
    "text", "MeaningfulWords", "features"
)



In [None]:
# Score the featurised stream and keep only the original text together with
# the model's "prediction" column.
prediction = model.transform(numericTest).select("text", "prediction")

# Start the streaming query: every 10 seconds newly arrived rows are appended
# as CSV files under predictions_out/, with progress tracked in checkpoint/.
# Keep the StreamingQuery handle returned by start() -- chaining
# awaitTermination() onto it would bind its return value (None) instead, and
# the assignment would not even happen until the query had already ended.
tweetStream = (
    prediction.writeStream
    .format("csv")
    .trigger(processingTime="10 seconds")
    .option("checkpointLocation", "checkpoint/")
    .option("path", "predictions_out/")
    .outputMode("append")
    .start()
)

# Block this cell until the query stops or fails. After interrupting the cell,
# the handle is still available, e.g. tweetStream.stop() or tweetStream.status.
tweetStream.awaitTermination()