In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType, ShortType, DateType
from time import sleep

# Set up Spark session
## Cluster master, application name and driver/executor sizing, expressed as
## one fluent chain (every SparkConf setter returns the conf itself).
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("steam_count")
    .set("spark.driver.memory", "2g")
    .set("spark.executor.cores", "1")
    .set("spark.driver.cores", "1")
)

# Create the spark session (reuses an already-running session if there is one)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

####################
# Set up GCS configs

## Register the GCS connector classes that serve the gs:// scheme in the
## Hadoop configuration of the running Spark context.
conf = spark.sparkContext._jsc.hadoopConfiguration()
for _gcs_key, _gcs_impl in (
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
):
    conf.set(_gcs_key, _gcs_impl)

## Cloud Storage bucket the BigQuery connector uses for its temporary data.
bucket = "temp_de2024_trs"
spark.conf.set('temporaryGcsBucket', bucket)

####################
# Get data
## Schema of the incoming JSON records; both fields are nullable.
## NOTE(review): the field is called "datetime" but is typed as DateType -
## confirm the source holds plain dates (otherwise TimestampType may be intended).
dataSchema = (
    StructType()
    .add("city", StringType(), True)
    .add("datetime", DateType(), True)
)

## Streaming source - sample file: https://raw.githubusercontent.com/DeskThom/DE2024_assignment2/refs/heads/main/stream/data/outage_data_20241127_121735.json
## JSON files in the directory are read as a stream, at most one file per trigger.
sdf = (
    spark.readStream
    .schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .json("/home/jovyan/data/stream-data")
)



# Compute and save
## Aggregation: number of outage records for each city
activityCounts = sdf.groupBy(col("city")).count()

## Save to BigQuery
def my_foreach_batch_function(df, batch_id):
    """Write one micro-batch to BigQuery, replacing the table's contents.

    Used as the ``foreachBatch`` callback of the streaming query, so the
    batch DataFrame writer API (``write`` / ``save``) is available here.

    Args:
        df: DataFrame holding the data of the current micro-batch.
        batch_id: Identifier of the micro-batch; not used by this sink.
    """
    bigquery_writer = df.write.format('bigquery')
    bigquery_writer = bigquery_writer.option(
        'table', 'data-engineering-435508.labdataset.outagecount'
    )
    # "overwrite" replaces whatever the table currently holds with this batch.
    bigquery_writer.mode("overwrite").save()

## Write to a sink
## "complete" output mode, a 2-second processing-time trigger, and each
## micro-batch handed to my_foreach_batch_function.
activityQuery = (
    activityCounts.writeStream
    .outputMode("complete")
    .trigger(processingTime='2 seconds')
    .foreachBatch(my_foreach_batch_function)
    .start()
)

##################
# Stop session

# Block until the streaming query ends. The cleanup lives in `finally` so the
# query and the Spark session are released on every exit path - manual
# interrupt, a failing micro-batch, or normal termination - instead of only
# when a KeyboardInterrupt is caught.
try:
    activityQuery.awaitTermination()
except KeyboardInterrupt:
    # Interrupting the cell / pressing Ctrl-C is the expected way to end the
    # stream, so the interrupt itself is not treated as an error.
    pass
finally:
    try:
        activityQuery.stop()
    finally:
        # Stop the spark context even if stopping the query raised
        spark.stop()
        print("Stoped the streaming query and the spark context")