In [None]:
from datetime import datetime, timezone, timedelta
from pathlib import Path
from pyspark.sql.types import StructType, DoubleType, StringType
from pyspark.sql.functions import col, struct, lit, to_json
from kafka import KafkaProducer

import requests
import shutil
import os

In [None]:
from pyspark.sql import SparkSession
# Create the notebook-wide Spark session, or reuse one that already exists.
# The only setting applied here is `spark.sql.streaming.schemaInference`.
spark = (
    SparkSession.builder
    .config("spark.sql.streaming.schemaInference", True)
    .getOrCreate()
)

Verify that spark is running by visiting:
    
http://localhost:4040/jobs/

Now we will download air quality data for the entire country of Poland for a single day. PM2.5 and PM10 values will be written separately.

In [None]:
# OpenAQ measurements endpoint (CSV output) restricted to country=PL.
# The three `{}` placeholders are filled, in order, with:
# parameter, date_from, date_to.
# NOTE(review): this targets the v1 API - confirm the endpoint is still served.
base_url = (
    'https://api.openaq.org/v1/measurements'
    '?country=PL'
    '&parameter={}'
    '&date_from={}'
    '&date_to={}'
    '&format=csv'
)

In [None]:
# Query window: 2020-12-01 and the day after it, both rendered as
# YYYY-MM-DD strings so they can be dropped straight into the request URL.
day_in_the_past0, day_in_the_past1 = (
    (datetime(2020, 12, 1) + timedelta(days=offset)).strftime("%Y-%m-%d")
    for offset in (0, 1)
)

In [None]:
pm25 = 'pm25'
pm10 = 'pm10'

# Pollutants to download; each one gets its own directory and CSV file.
parameters = [
    pm25,
    pm10
]

# Directory template (one directory per pollutant under the current working
# directory) and file template: <directory>/<date_from>_<parameter>.csv
base_dir_pl = os.getcwd()+'/monitoring_data_pl_{}'
base_path_pl = '{}/{}_{}.csv'

# Upper bound, in seconds, on how long a single download may wait for the
# server before the request is aborted.
download_timeout_s = 60

for parameter in parameters:
    # Start from an empty directory so that a re-run of this cell does not
    # leave files from a previous run behind.
    parameter_dir = base_dir_pl.format(parameter)
    dirpath = Path(parameter_dir)
    if dirpath.is_dir():
        shutil.rmtree(dirpath)
    os.makedirs(parameter_dir)

    # Download the data itself.
    url = base_url.format(parameter, day_in_the_past0, day_in_the_past1)
    parameter_csv = requests.get(url, timeout=download_timeout_s)
    # Fail loudly on an HTTP error instead of saving the error page as a
    # .csv file that would later be picked up by the streaming reader.
    parameter_csv.raise_for_status()

    # The response body is written as raw bytes, so no text decoding happens.
    csv_path = base_path_pl.format(parameter_dir, day_in_the_past0, parameter)
    with open(csv_path, 'wb') as f:
        f.write(parameter_csv.content)

Schema specific to the data.

At this point we are making some simplifications, for example omitting the `TimestampType` in favour of `StringType`.

In [None]:
# Explicit schema for the downloaded CSV files; fields are listed in the
# order in which the columns are read.
openAQSchema = (
    StructType()
    .add("location", StringType())
    .add("city", StringType())
    .add("country", StringType())
    # Both timestamps are kept as plain strings (no TimestampType parsing).
    .add("utc", StringType())
    .add("local", StringType())
    .add("parameter", StringType())
    .add("value", DoubleType())
    .add("unit", StringType())
    .add("latitude", DoubleType())
    .add("longitude", DoubleType())
    .add("attribution", StringType())
)

In this example we will read data from a `*.csv` source, but obviously you can use any other stream.

In [None]:
def read_smog_stream(parameter):
    """Open a streaming CSV reader over the files downloaded for one pollutant.

    Parameters
    ----------
    parameter : str
        Pollutant name used to fill in ``base_dir_pl`` (e.g. ``pm25``).

    Returns
    -------
    A streaming DataFrame with the columns declared in ``openAQSchema``,
    fed by every ``*.csv`` file in that pollutant's directory.
    """
    # No checkpointLocation here: checkpointing is configured on the writer
    # side (writeStream), so the option is not applied on readStream.
    # NOTE(review): no `header` option is set - if the downloaded files start
    # with a header row it will be read as a data row; confirm and add
    # .option("header", True) if so.
    return (
        spark.readStream
        .option("sep", ",")
        .schema(openAQSchema)
        .csv(base_dir_pl.format(parameter) + '/*.csv')
    )


smog_stream25 = read_smog_stream(pm25)
smog_stream10 = read_smog_stream(pm10)

In [None]:
# Print the schema of the raw PM2.5 stream as a quick sanity check.
smog_stream25.printSchema()

According to the documentation, the DataFrame written to Kafka needs specific columns to be present:
    
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

In [None]:
def to_kafka_record(stream, key):
    """Reshape a measurement stream into `key` / `value` columns for Kafka.

    Parameters
    ----------
    stream
        Streaming DataFrame exposing the columns referenced below
        (location, city, country, utc, parameter, value, unit, latitude,
        longitude, attribution).
    key : str
        Constant written to the ``key`` column of every row.

    Returns
    -------
    A DataFrame with exactly two columns: ``key`` (the given constant) and
    ``value`` (one JSON document per row built from the measurement fields).
    """
    payload = struct(
        col('location'),
        col('city'),
        col('country'),
        # `utc` is published under the name `timestamp`; `local` is left out.
        col('utc').alias('timestamp'),
        col('parameter'),
        col('value'),
        col('unit'),
        col('latitude'),
        col('longitude'),
        col('attribution'),
    )
    return stream.select(
        lit(key).alias('key'),
        to_json(payload).alias('value'),
    )


# Each stream is keyed by its own pollutant name (the PM2.5 stream was
# previously keyed 'pm10' as well).
# The names are rebound in place, so re-running this cell requires
# re-running the cell that creates the readers first.
smog_stream25 = to_kafka_record(smog_stream25, 'pm25')
smog_stream10 = to_kafka_record(smog_stream10, 'pm10')

In [None]:
# Print the schema again after the reshaping: only `key` and `value` remain.
smog_stream25.printSchema()

Now sending data from a Spark streaming DataFrame to a Kafka broker is as simple as:

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks

In [None]:
# Start the streaming query that publishes the PM2.5 records to Kafka.
# A micro-batch is triggered every 6 seconds; progress is tracked in the
# checkpoint directory given below.
query_pm25 = (
    smog_stream25.writeStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'kafka_broker:9093')
    .option("topic", "pm25_topic")
    .option("checkpointLocation", "/tmp/jovyan/checkpoint")
    .trigger(processingTime='6 seconds')
    .start()
)

In [None]:
# Wait up to 30 seconds for the query to terminate. With a timeout this call
# only waits; it does not stop the query.
query_pm25.awaitTermination(30)

In [None]:
# Start the streaming query that publishes the PM10 records to Kafka.
# Two things differ from the PM2.5 query:
#  * the topic is `pm10_topic`, so PM10 data no longer lands in the PM2.5
#    topic (NOTE(review): confirm that consumers read from this topic name);
#  * the checkpoint directory is dedicated to this query, because a
#    checkpoint location must not be shared between streaming queries.
query_pm10 = (
    smog_stream10.writeStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'kafka_broker:9093')
    .option("topic", "pm10_topic")
    .option("checkpointLocation", "/tmp/jovyan/checkpoint_pm10")
    .trigger(processingTime='6 seconds')
    .start()
)

In [None]:
# Wait up to 30 seconds for the query to terminate. With a timeout this call
# only waits; it does not stop the query.
query_pm10.awaitTermination(30)