In [None]:
# Notebook bootstrap: configure how PySpark launches its JVM, make the local
# Spark installation importable, then import the Spark APIs used below.
# Statement order matters here: keep the env var and findspark.init() ahead of
# the pyspark imports.
import os

# Arguments PySpark hands to spark-submit when it starts the JVM gateway, so
# they must be in place before the first SparkSession is created:
#   --packages ...spark-sql-kafka-0-10_2.11:2.3.1  Kafka source/sink built for
#                                                 Scala 2.11 / Spark 2.3.1
#   --master local[2]                             local mode with two threads
#   pyspark-shell                                 trailing token PySpark needs
#                                                 when this variable is overridden
# NOTE(review): the MongoDB reader used later in this file
# ("com.mongodb.spark.sql.DefaultSource") is not listed in --packages;
# presumably its jar reaches the classpath some other way (e.g. spark-defaults)
# -- confirm.
os.environ[
    'PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --master local[2] pyspark-shell'

import findspark

# Locate the Spark installation and put pyspark on sys.path; this has to run
# before the pyspark imports below.
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, concat, lit, col
from time import sleep  # NOTE(review): appears unused in the code shown

if __name__ == '__main__':
    # Pipeline: Kafka topic "tweets" -> split tweet text into words -> keep
    # only words that are movie title hashtags (loaded once from MongoDB)
    # -> running count per hashtag -> Kafka topic "tweets_analysis" as
    # "<word> <count>" strings.

    # Create the Spark session. The master is intentionally not set here:
    # "--master local[2]" is already passed through PYSPARK_SUBMIT_ARGS, and a
    # builder-level .master("local") would override it with a single thread.
    mySpark = SparkSession.builder.appName("movieData").getOrCreate()

    # Subscribe to the "tweets" topic, replaying it from the earliest offset.
    df = mySpark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "tweets") \
        .option("startingOffsets", "earliest") \
        .load()

    # The Kafka source exposes the message payload as binary; decode to text.
    lines = df.selectExpr("CAST(value AS STRING)")

    # Split each message on single spaces, one row per word. Tokens keep any
    # attached punctuation and their case, so only exact hashtag matches count.
    words = lines.select(explode(split(lines.value, " ")).alias("word"))

    # Load the movie catalogue from MongoDB (a one-off batch read).
    movies = mySpark.read.format("com.mongodb.spark.sql.DefaultSource") \
        .option("uri", "mongodb://localhost:27017/movieDatabase.movies") \
        .load()

    # Collect the distinct, non-null title hashtags to the driver. collect()
    # avoids the pandas dependency that toPandas() would require. The list is
    # embedded in the streaming plan as a literal IN filter, so hashtags added
    # to MongoDB after startup are only picked up when the query restarts.
    hashtag_list = [
        row[0]
        for row in movies.select('title_hashtag').na.drop().distinct().collect()
    ]
    if not hashtag_list:
        # The stream would still run, but the filter below would drop every word.
        print("Warning: no title_hashtag values found in "
              "movieDatabase.movies; no words will be counted.")

    # Keep only hashtag words and maintain a running count per hashtag.
    words = words.filter(words.word.isin(hashtag_list))
    wordCounts = words.groupBy("word").count()

    # Publish the counts to "tweets_analysis". The Kafka sink reads the
    # "value" column; "complete" mode re-emits the whole count table on every
    # trigger. The (Windows-specific) checkpoint directory keeps offsets and
    # aggregation state across restarts.
    query = wordCounts \
        .select(concat(col("word"), lit(" "), col("count")).alias("value")) \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("checkpointLocation", "C:\\tmp\\cp6") \
        .option("topic", "tweets_analysis") \
        .outputMode("complete") \
        .start()

    # Block the driver until the streaming query stops or fails.
    query.awaitTermination()
