In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from time import sleep
import json

# Cluster and resource settings for this streaming job.
# Every SparkConf setter returns the same SparkConf, so the calls can be chained.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("SparkStreamLab1")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)
# SparkSession is the entry point to the Spark SQL / Structured Streaming engine;
# getOrCreate() returns an already-running session if there is one.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Example JSON payload of one Kafka message:
# {"year":2008,"month":4,"day":1,"order":3,"country":29,"session ID":140,"page 1 (main category)":4,
# "page 2 (clothing model)":"P8","colour":2,"location":3,"model photography":1,"price":28,"price 2":2,"page":1}
#
# Field names must match the JSON keys exactly, spaces and parentheses included.
# All fields use StructField's default nullable=True.
_order_fields = [
    ("year", IntegerType()),
    ("month", IntegerType()),
    ("day", IntegerType()),
    ("order", IntegerType()),
    ("country", IntegerType()),
    ("session ID", IntegerType()),
    ("page 1 (main category)", IntegerType()),
    ("page 2 (clothing model)", StringType()),
    ("colour", IntegerType()),
    ("location", IntegerType()),
    ("model photography", IntegerType()),
    ("price", IntegerType()),
    ("price 2", IntegerType()),
    ("page", IntegerType()),
]
schema = StructType(
    [StructField(field_name, field_type) for field_name, field_type in _order_fields]
)


# Kafka source settings, kept in one place so they are easy to change.
KAFKA_BOOTSTRAP_SERVERS = "34.140.5.74:9092"
KAFKA_TOPIC = "sales"

# Subscribe to the Kafka topic as an unbounded *streaming* DataFrame
# (readStream, not a batch read), starting from the earliest available offset.
# failOnDataLoss=false keeps the query running instead of failing when
# offsets it expected are no longer available in Kafka.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("failOnDataLoss", "false")
    .option("subscribe", KAFKA_TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)

df.printSchema()

# Kafka delivers the message payload as binary; cast it to a string (JSON text)
# and keep the Kafka record timestamp next to it.
order_df = df.selectExpr("CAST(value AS STRING)", "timestamp")

# Parse the JSON text into a struct column named "orders" using `schema`.
# NOTE(review): from_json yields a null struct for payloads that do not parse,
# which become all-null rows downstream; consider filtering them if that matters.
order_df2 = order_df.select(
    from_json(col("value"), schema).alias("orders"),
    "timestamp",
)

# Flatten the struct so each JSON field becomes its own top-level column.
all_orders = order_df2.select("orders.*", "timestamp")

# Rename columns whose source names contain spaces/parentheses, which are
# awkward to reference and are not valid BigQuery column names.
_column_renames = {
    "session ID": "session_ID",
    "page 1 (main category)": "page1_main_category",
    "page 2 (clothing model)": "page2_clothing_model",
    "model photography": "model_photography",
    "price 2": "price2",
}
for _old_name, _new_name in _column_renames.items():
    all_orders = all_orders.withColumnRenamed(_old_name, _new_name)
all_orders.printSchema()

# Disabled: total of the "order" column per (month, country), sorted by that
# total in descending order. Kept for reference only; it is not executed.
#
# order_wrangle = all_orders.groupBy("month", "country") \
#     .agg(sum("order").alias("ordersum")) \
#     .sort(desc("ordersum"))
#
# order_wrangle.printSchema()

# Register the Google Cloud Storage connector with Hadoop so that gs:// paths
# resolve. This must be done whenever GCS is used.
# NOTE(review): this goes through the non-public `_jsc` handle of the SparkContext.
conf = spark.sparkContext._jsc.hadoopConfiguration()
for _gcs_key, _gcs_impl in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(_gcs_key, _gcs_impl)

# Cloud Storage bucket that the BigQuery connector uses for its temporary
# export data; set session-wide so every BigQuery write picks it up.
bucket = "geit_stream"
spark.conf.set('temporaryGcsBucket', bucket)
# Disabled alternative sink: write the stream directly with the "bigquery"
# streaming format instead of the foreachBatch approach below. Kept for
# reference only; it is not executed.
#
# query = all_orders \
#         .writeStream \
#         .format("bigquery") \
#         .outputMode("update") \
#         .trigger(processingTime='5 seconds') \
#         .option("truncate", "false") \
#         .option("checkpointLocation", "checkpoint") \
#         .option('table', 'data-goats-de.stream_dataset.all_order') \
#         .option("temporaryGcsBucket", bucket) \
#         .start()

def my_foreach_batch_function(df, batch_id):
    """Write one streaming micro-batch to the BigQuery table ``all_order``.

    Intended as the ``foreachBatch`` callback of a Structured Streaming query,
    which calls it once per micro-batch.

    Args:
        df: A regular (batch) DataFrame holding this micro-batch's rows.
        batch_id: Micro-batch id supplied by Spark (unused here).
    """
    # Append, not overwrite: with mode("overwrite") every micro-batch replaced
    # the whole table, so `all_order` only ever contained the latest batch.
    # NOTE(review): foreachBatch is at-least-once, so a replayed batch (e.g.
    # after a restart without a persistent checkpoint) is appended again.
    # batch_id could be used to de-duplicate if that matters.
    df.write.format('bigquery') \
        .option('table', 'data-goats-de.stream_dataset.all_order') \
        .mode("append") \
        .save()

# Write to a sink: every micro-batch of `all_orders` is handed to
# my_foreach_batch_function, which writes it to a BigQuery table.
# The ProcessingTime trigger starts a micro-batch every two seconds.
query = (
    all_orders.writeStream.outputMode("append")
    .trigger(processingTime='2 seconds')
    .foreachBatch(my_foreach_batch_function)
    .start()
)

try:
    # Block until the query stops or fails.
    query.awaitTermination()
except KeyboardInterrupt:
    # Manual interrupt (e.g. the notebook's stop button): shut down cleanly.
    query.stop()
    # Stop the spark context
    spark.stop()
    print("Stopped the streaming query and the spark context")
except Exception as err:
    # Any other failure (e.g. a batch failing inside the query): shut down and
    # report *what* went wrong. The previous bare `except:` hid the cause and
    # also trapped SystemExit.
    query.stop()
    # Stop the spark context
    spark.stop()
    print(f"Unexpected error: {err!r}")
    print("Stopped the streaming query and the spark context")

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- order: integer (nullable = true)
 |-- country: integer (nullable = true)
 |-- session_ID: integer (nullable = true)
 |-- page1_main_category: integer (nullable = true)
 |-- page2_clothing_model: string (nullable = true)
 |-- colour: integer (nullable = true)
 |-- location: integer (nullable = true)
 |-- model_photography: integer (nullable = true)
 |-- price: integer (nullable = true)
 |-- price2: integer (nullable = true)
 |-- page: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [135]:
# Stop the spark context (and with it the Spark session) once streaming is done.
spark.stop()