In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
from pyspark.ml import Pipeline,PipelineModel

In [None]:
#Spark Session creation configured to interact with Kfka and MongoDB
spark = SparkSession.builder.appName("pyspark-notebook").\
config("spark.jars.packages","org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-avro_2.12:3.0.0,org.mongodb.spark:mongo-spark-connector_2.12:3.0.0").\
config("spark.mongodb.input.uri","mongodb://docker_mongo_1:27017/twitter_db.tweets").\
config("spark.mongodb.output.uri","mongodb://docker_mongo_1:27017/twitter_db.tweets").\
getOrCreate()

In [None]:
#Read schema file and create schema of string type
json_schema = ''
with open("schema/out/tweet_schema.json") as f:
    new_schema = StructType.fromJson(json.load(f))
    json_schema = new_schema.simpleString()

In [None]:
#Read data from Kafka topic
json_tweets = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "ec2-34-217-75-40.us-west-2.compute.amazonaws.com:9092")\
  .option("subscribe", "twitter_demo")\
  .option("startingOffsets", "earliest")\
  .load()\
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [None]:
#Refine raw data red from Kafka topic
refined_tweets = json_tweets\
        .select(from_json("value", json_schema)\
        .alias("data"))\
        .where("data.lang='en'and data.created_at is not null and data.text is not null")\
        .select("data.text",
                from_unixtime(col("data.timestamp_ms")/1000,'yyyy-MM-dd HH:mm:ss').alias("timestamp_ms")) #Translate milliseconds to UTC timestamp
refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', r'http\S+', ''))
refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', '@\w+', ''))
refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', '#', ''))
refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', 'RT', ''))
refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', ':', ''))

In [None]:
dir = "sentiment/"
model = PipelineModel.load(dir)

In [None]:
def process_row(df, epoch_id):
    """Applies model to the df and writes data to MongoDB

    Parameters
    ----------
    df : DataFrame
        Streaming Dataframe
    epoch_id : int
        Unique id for each micro batch/epoch
    """
    predictions = model.transform(df)
    #predictions.show()
    predictions.select("timestamp_ms","text","prediction").write.format("mongo").mode("append").save()

In [None]:
#Writes streaming dataframe to ForeachBatch console which ingests data to MongoDB
refined_tweets \
    .writeStream \
    .option("checkpointLocation", "checkpoint/data") \
    .foreachBatch(process_row).start().awaitTermination()

KeyboardInterrupt: 