In [1]:
# Check the environment: print the Java and Python versions that the
# notebook's shell resolves, via IPython `!` shell escapes.
!java --version
!python --version

openjdk 11.0.18 2023-01-17
OpenJDK Runtime Environment (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1)
OpenJDK 64-Bit Server VM (build 11.0.18+10-post-Ubuntu-0ubuntu120.04.1, mixed mode, sharing)
Python 3.9.16


In [2]:
# Download Apache Spark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 4.3 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=58be773d472fde2f9960c47469eb8b512df45e663ae954d27dddf363ec9b4b5f
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [3]:
# Start the Spark session used by the rest of the notebook
from pyspark.sql import SparkSession

# getOrCreate() returns the already-running session if there is one; otherwise
# a new session is built with a local master, the application name below, and
# the spark.streaming.stopGracefullyOnShutdown flag switched on.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Streaming Process Files")
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .getOrCreate()
)

In [4]:
# Check the Spark session: print the `spark` object to confirm that a
# SparkSession instance is bound to the name before it is used below.
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f0506bd4430>


In [None]:
# Turn on schema inference for streaming reads, so no explicit schema has to
# be passed to the reader below.
spark.conf.set("spark.sql.streaming.schemaInference", True)

# Streaming DataFrame over the JSON files in data/input/.
# Reader options, in the order they are set:
#   cleanSource        = "archive"  -> archive source files instead of leaving them
#   sourceArchiveDir   = directory the archived files are moved to
#   maxFilesPerTrigger = 1          -> pick up one file per trigger
streaming_df = (
    spark.readStream
    .format("json")
    .option("cleanSource", "archive")
    .option("sourceArchiveDir", "data/archive/device_data/")
    .option("maxFilesPerTrigger", 1)
    .load("data/input/")
)

In [None]:
# Inspect the schema of the incoming data.
# To also preview rows, place a sample JSON file in the input directory and
# change `readStream` to `read` in the cell that builds `streaming_df`.
streaming_df.printSchema()

# show() is only supported on batch DataFrames; on a streaming DataFrame it
# raises an AnalysisException. Preview rows only when the DataFrame was built
# with `read`, so this cell never fails during a normal streaming run.
if not streaming_df.isStreaming:
    streaming_df.show(truncate=False)

In [None]:
# Explode the data, as `data.devices` contains a list/array of device readings
from pyspark.sql.functions import explode, col

# An empty string is not a resolvable column name, so no column is selected by
# name here. Instead every top-level column is kept, one output row is produced
# per element of `data.devices` (exposed as the new `devices` column), and the
# original nested `data` column is dropped once it has been exploded.
exploded_df = (
    streaming_df
    .withColumn("devices", explode(col("data.devices")))
    .drop("data")
)

In [None]:
# Flatten the exploded df: promote the fields of the `devices` struct
# (deviceId, measure, status, temperature) to top-level columns.
#
# Empty strings are not valid SQL expressions for selectExpr, so "*" is used to
# carry over every existing column; the `devices` struct itself is dropped at
# the end because its fields are now available as separate columns.
flattened_df = (
    exploded_df
    .selectExpr(
        "*",
        "devices.deviceId as deviceId",
        "devices.measure as measure",
        "devices.status as status",
        "devices.temperature as temperature",
    )
    .drop("devices")
)

In [None]:
# Start the streaming query that writes the flattened records to a JSON file
# sink at data/output/device_data in append mode, using checkpoint_dir as the
# checkpoint location. start() returns the handle of the running query.
writing_df = (
    flattened_df.writeStream
    .format("json")
    .option("path", "data/output/device_data")
    .option("checkpointLocation", "checkpoint_dir")
    .outputMode("append")
    .start()
)

In [None]:
# Read the JSON files at the output location back as a batch DataFrame and
# display the rows without truncating column values.
out_df = spark.read.json(path="data/output/device_data/")
out_df.show(n=20, truncate=False)