In [None]:
# Import libraries
from pyspark.ml.feature import HashingTF, Tokenizer, StopWordsRemover, CountVectorizer, VectorAssembler
from pyspark.ml.classification import LogisticRegression    
from pyspark.ml import Pipeline, PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Kafka source settings: the topic to subscribe to and the broker address.
kafka_topic_name = "topicA"
kafka_bootstrap_servers = 'kafka:9092'


# Build (or fetch the already-running) Spark session. The Kafka connector is
# requested through spark.jars.packages and the master is set to local[*].
spark = (
    SparkSession.builder
    .appName("Sentiment Analysis")
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0')
    .master("local[*]")
    .getOrCreate()
)
    
# Open a streaming read on the Kafka topic, starting from the latest offsets,
# and keep only the message payload, cast to a string column named `value`.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name)
    .option("startingOffsets", "latest")
    .load()
    .selectExpr("CAST(value AS STRING)")
)

# DDL schema used to parse each payload as a one-column CSV record.
schema = "SentimentText STRING"

# Parse the payload into a struct column called `Sentiment`.
# NOTE(review): with a single-column CSV schema, text containing an unquoted
# comma is presumably cut at the first comma -- confirm the producer quotes
# the text or never sends commas.
df = df.select(from_csv(col("value"), schema).alias("Sentiment"))
df.printSchema()

# Flatten the struct so its field becomes a top-level column.
df = df.select("Sentiment.*")
df.printSchema()

# Keep the text column and drop rows where it is null (e.g. a null or empty
# Kafka payload). Presumably the tokenizer stage of the pipeline applied
# downstream cannot handle null input and would fail the whole streaming
# query on such a row -- TODO confirm against the saved pipeline's stages.
df = df.select("SentimentText").filter(col("SentimentText").isNotNull())
df.printSchema()
# Load the pipeline model saved under the 'SentimentAnalysis' path.
model = PipelineModel.load('SentimentAnalysis')

# Apply the loaded pipeline to the streaming DataFrame.
predictions = model.transform(df)
predictions.printSchema()

# Keep only the input text and the prediction column for output.
predictionFinal = predictions.select("SentimentText", "prediction")
predictionFinal.printSchema()

# Print every micro-batch to the console in append mode and block this cell
# until the streaming query terminates.
(
    predictionFinal.writeStream
    .format('console')
    .outputMode('append')
    .start()
    .awaitTermination()
)

# predictionFinal.writeStream.format('csv')\
#            .option("checkpointLocation", "/tmp/pyspark6/")\
#            .option('path', '/output').start().awaitTermination()
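# Alternative sink: write the streaming predictions to a Kafka topic (no security options are set below).
# NOTE: predictionFinal only has the columns SentimentText and prediction, so the
# `CAST(value AS STRING)` expression below needs a `value` column to be built first.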

# predictionFinal.selectExpr("CAST(value AS STRING)") \
# .writeStream.format('kafka')\
# .option('kafka.bootstrap.servers', "kafka:9092")\
# .option("checkpointLocation", "/tmp/pyspark6/")\
# .option('topic', 'topicB').start().awaitTermination()



root
 |-- Sentiment: struct (nullable = true)
 |    |-- SentimentText: string (nullable = true)

root
 |-- SentimentText: string (nullable = true)

root
 |-- SentimentText: string (nullable = true)

root
 |-- SentimentText: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- selected_features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

root
 |-- SentimentText: string (nullable = true)
 |-- prediction: double (nullable = false)

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------+----------+
|SentimentText|prediction|
+-------------+----------+
+-------------+----------+



22/12/14 10:53:56 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-2d9e2132-ce9a-4cdf-8870-887e88d5b924. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+----------+
|       SentimentText|prediction|
+--------------------+----------+
|I adore winter #f...|       1.0|
+--------------------+----------+

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+----------+
|       SentimentText|prediction|
+--------------------+----------+
|I dislike cheese ...|       0.0|
+--------------------+----------+

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+----------+
|       SentimentText|prediction|
+--------------------+----------+
|I dislike that fi...|       0.0|
+--------------------+----------+

-------------------------------------------
Batch: 4
-------------------------------------------
+-------------------+----------+
|      SentimentText|prediction|
+-------------------+----------+
