In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType
from time import sleep

sparkConf = SparkConf()
sparkConf.setMaster("spark://spark-master:7077")
sparkConf.setAppName("Assignment2")
sparkConf.set("spark.driver.memory", "2g")
sparkConf.set("spark.executor.cores", "1")
sparkConf.set("spark.driver.cores", "1")
# create the spark session, which is the entry point to Spark SQL engine.
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

dataSchema = StructType(
        [StructField("Customer ID", StringType(), True),
         StructField("Customer Name", StringType(), True),
         StructField("Segment", StringType(), True),
         StructField("Country", StringType(), True),
         StructField("State", StringType(), True),
         StructField("City", StringType(), True),
         StructField("Postal Code", IntegerType(), True),
         StructField("Region", StringType(), True)
         ])

# Read from a source 
sdf = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1) \
        .json("gs://data_de2023_287950/customer.csv")

# Do a calculation
activityCounts = sdf.groupBy("Country").count()

# Write to a sink - here, the output is memory (only for testing). The query name (i.e., activity_counts) will be the Spark SQL table name.
activityQuery = activityCounts.writeStream.queryName("country_counts") \
                    .format("memory").outputMode("complete") \
                    .start()
# Testing 
for x in range(10):
    spark.sql("SELECT * FROM country_counts").show()
    sleep(5)

+---+-----+
| gt|count|
+---+-----+
+---+-----+

+---+-----+
| gt|count|
+---+-----+
+---+-----+

+---+-----+
| gt|count|
+---+-----+
+---+-----+

+---+-----+
| gt|count|
+---+-----+
+---+-----+

+---+-----+
| gt|count|
+---+-----+
+---+-----+

+----------+-----+
|        gt|count|
+----------+-----+
|  stairsup|10452|
|       sit|12309|
|     stand|11384|
|      walk|13256|
|      bike|10796|
|stairsdown| 9365|
|      null|10449|
+----------+-----+

+----------+-----+
|        gt|count|
+----------+-----+
|  stairsup|20905|
|       sit|24619|
|     stand|22769|
|      walk|26512|
|      bike|21593|
|stairsdown|18729|
|      null|20896|
+----------+-----+

+----------+-----+
|        gt|count|
+----------+-----+
|  stairsup|20905|
|       sit|24619|
|     stand|22769|
|      walk|26512|
|      bike|21593|
|stairsdown|18729|
|      null|20896|
+----------+-----+

+----------+-----+
|        gt|count|
+----------+-----+
|  stairsup|20905|
|       sit|24619|
|     stand|22769|
|      walk

In [2]:
# Stop the spark context
spark.stop()