# Spark Structured Streaming with Kafka

In [None]:
# Optional, currently disabled: uncommenting the line below would set the
# `spark.sql.session.timeZone` option of the Spark session to "UTC".
# spark.conf.set("spark.sql.session.timeZone", "UTC")

## Preparing DataFrame

In [None]:
# Streaming source: subscribe to the `gios` topic on the listed Kafka brokers,
# starting from the earliest offsets.
d = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'master:9092,slave01:9092,slave02:9092,slave03:9092,slave04:9092,slave05:9092')
    .option('subscribe', 'gios')
    .option('startingOffsets', 'earliest')
    .load()
)

In [None]:
# Project the Kafka record fields used downstream, casting `key` and `value`
# to strings and keeping `partition`, `offset` and `timestamp` as they are.
q = d.selectExpr(
    'CAST(key AS STRING)',
    'CAST(value AS STRING)',
    'partition',
    'offset',
    'timestamp',
)

### Additional attributes

In [None]:
from pyspark.sql.types import DoubleType

# Add `value_as_double`: the string `value` column cast to a double so it can
# be aggregated numerically later on.
qWithValueAsDouble = q.withColumn(
    'value_as_double',
    q['value'].cast(DoubleType()),
)

### Parsing Kafka's timestamp

In [None]:
# `to_timestamp` is not imported by any of the cells visible above, so bring it
# into scope here; without this import the cell raises NameError on a fresh kernel.
from pyspark.sql.functions import to_timestamp

# Add `parsed_timestamp`, derived from the Kafka `timestamp` column with the
# "yyyy-MM-dd HH:mm:ss" pattern.
qWithTs = qWithValueAsDouble.withColumn(
    "parsed_timestamp",
    to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss"),
)

### Filtering (keeping the last 5 days, excluding today)

## Analytics

### Grouping by station name and day

In [None]:
# Average `value_as_double` for every (key, day) pair and expose the result
# under the column name `avg_NO2`.
# NOTE(review): `qWithDay` is not defined in any cell visible in this notebook;
# the filtering step that should produce it appears to be missing — confirm
# before running on a fresh kernel.
grouped = (
    qWithDay
    .groupBy('key', 'day')
    .avg('value_as_double')
    .withColumnRenamed('avg(value_as_double)', 'avg_NO2')
)

### Writing stream

In [None]:
# Start the streaming query: write the aggregated frame to the `memory` sink in
# 'complete' output mode under the query name `in_memory`, which the results
# cell reads via `FROM in_memory`.
(
    grouped.writeStream
    .format('memory')
    .queryName('in_memory')
    .outputMode('complete')
    .start()
)

## Results

In [None]:
# Rank each key's days by descending average NO2 (DENSE_RANK per key) and show
# the rows with rank 1 or 2, ordered by key, then day, then average.
# The adjacent literals concatenate to the exact original SQL text.
spark.sql(
    'SELECT t.rank, t.key, t.day, t.avg_NO2 '
    'FROM (SELECT key, day, avg_NO2, '
    'DENSE_RANK() OVER (PARTITION BY key ORDER BY avg_NO2 DESC) as rank '
    'FROM in_memory) as t '
    'WHERE t.rank < 3 '
    'ORDER BY t.key ASC, t.day ASC, t.avg_NO2 DESC'
).show()

In [None]:
# Shut down the Spark session now that the results have been displayed.
# NOTE(review): presumably this also ends the `in_memory` streaming query — verify.
spark.stop()