In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import max as sparkMax
from pyspark.sql.functions import *
from pyspark.sql.types import *
from time import sleep
import pandas as pd

# Spark configuration: standalone master, application name and resource settings
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Assignment2")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# Get (or create) the Spark session for this configuration
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Read BTC and Twitter data streams
def _kafka_topic_stream(topic):
    """Return a streaming DataFrame over Kafka ``topic``, read from the earliest offset."""
    return (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "kafka1:9093")
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .load()
    )


dfBTC = _kafka_topic_stream("price")

dfTwitter = _kafka_topic_stream("twitter")

# Decode the BTC stream: Kafka value -> JSON string -> parsed fields -> typed columns
price = dfBTC.selectExpr("CAST(value AS STRING)")

# Every JSON field is first parsed as a nullable string; real types are applied below
dataSchema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("Batch", "Timestamp", "Symbol", "Price", "Percent_change_1h")
    ]
)

priceDf = (
    price
    .select(from_json(col("value"), dataSchema).alias("data"))
    .select("data.*")
)

# Timestamp and Symbol are already strings after parsing; their casts are kept for
# explicitness. Note that Timestamp stays a string here (it is not a timestamp type).
priceDf2 = (
    priceDf
    .withColumn("Batch", col("Batch").cast("int"))
    .withColumn("Timestamp", col("Timestamp").cast("string"))
    .withColumn("Symbol", col("Symbol").cast("string"))
    .withColumn("Price", col("Price").cast("float"))
    .withColumn("Percent_change_1h", col("Percent_change_1h").cast("float"))
)

# Decode the Twitter stream: Kafka value -> JSON string -> parsed fields -> typed columns
twitter = dfTwitter.selectExpr("CAST(value AS STRING)")

# Every JSON field is first parsed as a nullable string; real types are applied below
dataSchemaTwitter = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("Time", "Text", "Positive", "Neutral", "Negative")
    ]
)

twitterDf = (
    twitter
    .select(from_json(col("value"), dataSchemaTwitter).alias("data"))
    .select("data.*")
)

# Time becomes a real timestamp; the three sentiment columns become integers.
# Text is already a string after parsing; its cast is kept for explicitness.
twitterDf2 = (
    twitterDf
    .withColumn("Time", col("Time").cast("timestamp"))
    .withColumn("Text", col("Text").cast("string"))
    .withColumn("Positive", col("Positive").cast("int"))
    .withColumn("Neutral", col("Neutral").cast("int"))
    .withColumn("Negative", col("Negative").cast("int"))
)

# Google Cloud Storage bucket used for temporary BigQuery export data by the connector
bucket = "model_repo_de2021marleen"

# Register the Google Cloud Storage file system implementations for gs:// paths
conf = spark.sparkContext._jsc.hadoopConfiguration()
for _fs_key, _fs_impl in {
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
}.items():
    conf.set(_fs_key, _fs_impl)

spark.conf.set("temporaryGcsBucket", bucket)

# Batch function for BTC data: select most expensive currency per batch number
# Also compute time windows of 60 seconds to be able to join observations later in Google Data Studio
# Then, send data to Google Big Query table
def foreach_batch_functionBTC(dfBTC, batch_id):
    """Append the highest-priced rows of each ``Batch`` in this micro-batch to BigQuery.

    Rows are ranked by ``Price`` (highest first) within their ``Batch`` value using
    ``dense_rank``; only rank 1 is kept, so rows tied for the highest price are all
    retained. A 60-second ``event_window`` column derived from ``Timestamp`` is added,
    and the result (including the ``priceRank`` column) is appended to the table
    ``de2021-325314.assignment2.demoBTC``.

    Args:
        dfBTC: The micro-batch DataFrame passed in by ``foreachBatch``.
        batch_id: The micro-batch identifier passed in by ``foreachBatch``; unused.
    """
    price_order_per_batch = Window.partitionBy("Batch").orderBy(col("Price").desc())

    ranked = dfBTC.withColumn("priceRank", dense_rank().over(price_order_per_batch))
    top_priced = ranked.filter(col("priceRank") == 1)

    (
        top_priced
        .withColumn("event_window", window("Timestamp", "60 seconds"))
        .write.format("bigquery")
        .option("table", "de2021-325314.assignment2.demoBTC")
        .mode("append")
        .save()
    )

# Batch function for Twitter data: sum positive, neutral and negative tweets per micro-batch of one minute
# Then, send batch to Google Big Query table
def foreach_batch_functionTwitter(dfTwitter, batch_id):
    """Append per-minute sentiment totals of this micro-batch to BigQuery.

    The rows are grouped into 60-second windows of ``Time`` and the ``Positive``,
    ``Neutral`` and ``Negative`` columns are summed per window. The result has the
    columns ``window``, ``Positive``, ``Neutral``, ``Negative`` and is appended to the
    table ``de2021-325314.assignment2.demoTwitter``.

    Args:
        dfTwitter: The micro-batch DataFrame passed in by ``foreachBatch``.
        batch_id: The micro-batch identifier passed in by ``foreachBatch``; unused.
    """
    # One aggregation computes all three totals. Aggregating each column separately
    # and joining the results on "window" yields the same rows, but evaluates the
    # micro-batch three times and adds two joins.
    # NOTE: `sum` here is the Spark aggregate brought in by the file's
    # `from pyspark.sql.functions import *`, not the Python builtin.
    finalTwitter = dfTwitter.groupBy(window("Time", "60 seconds")).agg(
        sum("Positive").alias("Positive"),
        sum("Neutral").alias("Neutral"),
        sum("Negative").alias("Negative"),
    )

    finalTwitter.write.format('bigquery') \
      .option('table', 'de2021-325314.assignment2.demoTwitter') \
      .mode('append') \
      .save()


# Write to a sink: the output is written to a Big Query table by the batch functions.
# One query per stream (BTC, Twitter); each triggers a micro-batch every 60 seconds.
def _start_minutely_query(stream_df, batch_function):
    """Start an append-mode query on ``stream_df`` that runs ``batch_function`` per micro-batch."""
    return (
        stream_df.writeStream
        .outputMode("append")
        .trigger(processingTime="60 seconds")
        .foreachBatch(batch_function)
        .start()
    )


BTCQuery = _start_minutely_query(priceDf2, foreach_batch_functionBTC)

twitterQuery = _start_minutely_query(twitterDf2, foreach_batch_functionTwitter)

# Both queries are already running at this point (they were started by .start()).
# Block until EITHER query terminates: waiting on BTCQuery alone would hide a failure
# of twitterQuery for as long as the BTC query keeps running.
# Whatever ends the wait (keyboard interrupt, a query failure, or normal termination),
# stop both queries and the Spark context so nothing keeps running unattended.
try:
    spark.streams.awaitAnyTermination()
except KeyboardInterrupt:
    # A manual interrupt is the expected way to end the job; continue to the cleanup.
    pass
finally:
    BTCQuery.stop()
    twitterQuery.stop()

    # Stop the spark context
    spark.stop()
    print("Stopped the streaming query and the spark context")


Stopped the streaming query and the spark context
