In [3]:
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, avg, concat, lit
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, IntegerType, DateType
from time import sleep
import pandas as pd

# Goal of the query: find the most commonly used word to describe a crime in each community of Chicago.
# The data is divided into batches, one for each week of the year 2015.

In [32]:
# Explicit schema applied to the streamed CSV files; every field is nullable.
dataSchema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in (
        # ("ID", IntegerType()),  # disabled: not part of the schema
        ("Case Number", StringType()),
        ("Date", DateType()),
        ("Block", StringType()),
        ("Primary Type", StringType()),
        ("Description", StringType()),
        ("District", StringType()),
        ("Community Area", StringType()),
        ("weeknumber", IntegerType()),
        ("year", IntegerType()),
    )
])

# Spark configuration: master URL, application name and driver/executor resources.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("Lab7_1")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)
# The SparkSession is the entry point to the Spark SQL engine;
# getOrCreate() reuses an already running session if there is one.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# Cloud Storage bucket the BigQuery connector uses for its temporary export data.
bucket = "group6_chicagocrime"
spark.conf.set("temporaryGcsBucket", bucket)

# Register the GCS connector classes with Hadoop so that gs:// paths can be resolved.
# This is required whenever Google Cloud Storage is accessed from Spark.
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")



In [33]:
# Source location of the CSV files to stream
# (a local directory such as "/home/jovyan/data/crimes2" was used here as well).
path = 'gs://chicagocrime_tempstreamingresult'

# Streaming DataFrame over the CSV files; maxFilesPerTrigger=1 limits every
# micro-batch to a single new file.
sdf = (
    spark.readStream
    .schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .csv(path)
)

In [34]:
from pyspark.sql.functions import explode, split, concat, col, lit
# One output row per space-separated token of the crime description,
# carrying along the week, year and community the record belongs to.
words = sdf.select(
    'weeknumber',
    'year',
    'Community Area',
    explode(split(col("Description"), " ")).alias("word"),
)


In [35]:
import pyspark.sql.functions as f
from pyspark.sql import Window

# Count how often each word occurs per year, week and community.
# A time window such as window(col('event_time'), '7 days') is not needed anymore
# because the batches are already split by week.
word_count = (
    words
    .groupBy(['year', 'weeknumber', 'Community Area', 'word'])
    .count()
)

In [36]:
# Show the columns of the aggregated stream: the four grouping keys plus the count.
word_count.printSchema()

root
 |-- year: integer (nullable = true)
 |-- weeknumber: integer (nullable = true)
 |-- Community Area: string (nullable = true)
 |-- word: string (nullable = false)
 |-- count: long (nullable = false)



In [37]:
def my_foreach_batch_function(df, batch_id):
    """Append the most frequent word(s) of one micro-batch to BigQuery.

    For every (year, weeknumber, Community Area) group in ``df`` only the rows
    whose ``count`` equals the group's maximum are kept; when several words tie
    for the maximum, all of them are written.

    Args:
        df: Micro-batch DataFrame with the columns ``year``, ``weeknumber``,
            ``Community Area``, ``word`` and ``count``.
        batch_id: Identifier of the micro-batch, supplied by ``foreachBatch``
            (not used).
    """
    # Partition by week as well as community: the counts are keyed per week, so
    # a batch containing several weeks must not compare counts across weeks.
    w = Window.partitionBy('year', 'weeknumber', 'Community Area')
    word_count_max = (
        df.withColumn('maxCount', f.max('count').over(w))
        .where(f.col('count') == f.col('maxCount'))
        .drop('maxCount')
    )

    # Append the result to the BigQuery table.
    word_count_max.write.format('bigquery') \
      .option('table', 'datatengineering-group6.crimedescription.mostusedwordspercommunity') \
      .mode("append") \
      .save()
    
# Start the streaming query. In "update" output mode every micro-batch hands the
# rows whose counts changed to my_foreach_batch_function, which writes to BigQuery.
# No explicit trigger is configured; .trigger(processingTime = '2 seconds') could
# be added before .start() to use a two-second micro-batch interval.
activityQuery = (
    word_count.writeStream
    .outputMode("update")
    .foreachBatch(my_foreach_batch_function)
    .start()
)

In [None]:
import time

from IPython.display import clear_output

# Poll the streaming query about once per second (at most 3200 polls) and print
# its status whenever the status message changes.
last_mess = None
for i in range(3200):
    # clear_output(wait=True)  # optional: keep only the latest status visible

    # Read the status once per iteration so that the compared, printed and
    # remembered values all come from the same snapshot.
    status = activityQuery.status
    if status['message'] != last_mess:
        print(f'step {i}')
        print(status)
        last_mess = status['message']

    # Nothing more will be reported once the query has stopped or failed.
    if not activityQuery.isActive:
        break
    time.sleep(1)

step 0
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}


In [31]:
# Stop every streaming query that is still running before shutting Spark down,
# so the queries terminate cleanly instead of failing when the context disappears.
for running_query in spark.streams.active:
    running_query.stop()

# Stop the Spark session and its underlying Spark context.
spark.stop()