# Perth Weather Consumer - CSV

The following consumer consumes messages from the stream and outputs two CSV files:
1. The average temperature in Perth over the past x hours
2. The top 3 most frequent weather texts in Perth over the past x hours

### Table of Contents:
* [Initialisation](#ONE)
* [Extract data from the stream](#TWO)
* [Transform the dataset](#THREE)
* [Load the dataset for data analysis](#FOUR)
* [Output result to CSV](#FIVE)


## Initialisation<a class="anchor" id="ONE"></a>
Initialise the PySpark session and import the libraries.

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from time import sleep

spark = SparkSession \
    .builder \
    .appName("Weather Analysis") \
    .getOrCreate()

In [2]:
# Specify configuration and location
time_window = "12 seconds"
watermark = "6 seconds"
output_location = os.path.join(os.getcwd(), "result")

if not os.path.exists(output_location):
    os.makedirs(output_location)

## Extract data from the stream<a class="anchor" id="TWO"></a>
Subscribe to the topic and load the data from the stream 

In [3]:
topic = "Perth"

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "127.0.0.1:9092") \
    .option("subscribe", topic) \
    .load()

In [4]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



## Transform the dataset<a class="anchor" id="THREE"></a>
Transform the dataset based on the requirements of the analysis.  

Steps: 
* [1. Convert the key/value from the Kafka data stream to string](#1)
* [2. Cast dataframe based on the schema](#2)
* [3. Remove nested structure of the temperature](#3)
* [4. Focus on the temperature in Celsius](#4)
* [5. Continue removing nested structure of the temperature](#5)
* [6. Remove unwanted rows](#6)
* [7. Rename and change the data type of the temperature column](#7)
* [8. Data Wrangling](#8)

### 1. Convert the key/value from the Kafka data stream to string<a class="anchor" id="1"></a>

In [5]:
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

In [6]:
df.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



### 2. Cast dataframe based on the schema<a class="anchor" id="2"></a>

In [7]:
schema = StructType([
    StructField("Time", TimestampType(), True), 
    StructField("city", StringType(), True), 
    StructField("WeatherText", StringType(), True),
    StructField("Temperature", MapType(StringType(), MapType(StringType(), StringType())), True)        
])

# Parse the JSON string in the value column into a struct that follows the schema
df1 = df.select(F.from_json(F.col("value").cast("string"), schema).alias("parsed_value"))

In [8]:
df1.printSchema()

root
 |-- parsed_value: struct (nullable = true)
 |    |-- Time: timestamp (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- WeatherText: string (nullable = true)
 |    |-- Temperature: map (nullable = true)
 |    |    |-- key: string
 |    |    |-- value: map (valueContainsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)
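To make the schema more concrete, the sketch below mimics steps 1 and 2 in plain Python: it decodes the Kafka value bytes and keeps the four schema fields. The sample payload is an assumption: only the field names come from the schema above, while the `Unit` and `Imperial` entries and all values are invented for illustration. `parse_value` is a hypothetical helper, not a PySpark function.

```python
import json

# Hypothetical payload: field names follow the schema above, values are invented.
SAMPLE_VALUE = json.dumps({
    "Time": "2022-09-11T19:15:59",
    "city": "Perth",
    "WeatherText": "Cloudy",
    "Temperature": {
        "Metric": {"Value": "15.0", "Unit": "C"},
        "Imperial": {"Value": "59.0", "Unit": "F"},
    },
}).encode("utf-8")

FIELDS = ("Time", "city", "WeatherText", "Temperature")


def parse_value(raw):
    """Decode Kafka value bytes and keep only the schema fields.

    Returns None when the payload is not a JSON object, loosely mirroring
    how from_json returns null for a string it cannot parse.
    """
    try:
        record = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(record, dict):
        return None
    # Missing fields become None, similar to nullable columns.
    return {name: record.get(name) for name in FIELDS}


parsed = parse_value(SAMPLE_VALUE)
print(parsed["city"], parsed["Temperature"]["Metric"]["Value"])
```

The nested `Temperature` map is what the next steps flatten.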



### 3. Remove nested structure of the temperature<a class="anchor" id="3"></a>

In [9]:
df2 = df1.select(
    F.col("parsed_value.Time").alias("Time"),
    F.col("parsed_value.city").alias("City"),
    F.col("parsed_value.WeatherText").alias("WeatherText"),
    explode("parsed_value.Temperature")
)

In [10]:
df2.printSchema()

root
 |-- Time: timestamp (nullable = true)
 |-- City: string (nullable = true)
 |-- WeatherText: string (nullable = true)
 |-- key: string (nullable = false)
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



### 4. Focus on the temperature in Celsius<a class="anchor" id="4"></a>
Keep only the Metric entry, which holds the temperature in Celsius.

In [11]:
df3 = df2.filter(df2.key == "Metric")

### 5. Continue removing nested structure of the temperature<a class="anchor" id="5"></a>

In [12]:
df4 = df3.select("Time", 
                 "City",
                 "WeatherText",
                 explode("value")
)

In [13]:
df4.printSchema()

root
 |-- Time: timestamp (nullable = true)
 |-- City: string (nullable = true)
 |-- WeatherText: string (nullable = true)
 |-- key: string (nullable = false)
 |-- value: string (nullable = true)



### 6. Remove unwanted rows<a class="anchor" id="6"></a>
We only want the numeric value of the temperature, not the unit symbol (C).

In [14]:
df5 = df4.filter(df4.key == "Value")
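Steps 3 to 6 can be followed on a single record in plain Python. This is only a sketch of the explode-then-filter pattern: `explode_map` is a hypothetical helper that imitates what `explode` does to a map column (one output row per entry, with `key` and `value` columns), and the sample row is invented.

```python
def explode_map(row, map_field):
    """Yield one row per entry of row[map_field], adding 'key' and 'value' columns.

    A missing or empty map yields no rows, as explode does.
    """
    base = {k: v for k, v in row.items() if k not in (map_field, "key", "value")}
    for key, value in (row.get(map_field) or {}).items():
        yield {**base, "key": key, "value": value}


# Invented sample row shaped like the parsed record.
row = {
    "Time": "2022-09-11 19:15:59",
    "City": "Perth",
    "WeatherText": "Cloudy",
    "Temperature": {
        "Metric": {"Value": "15.0", "Unit": "C"},
        "Imperial": {"Value": "59.0", "Unit": "F"},
    },
}

step3 = list(explode_map(row, "Temperature"))                   # one row per unit system
step4 = [r for r in step3 if r["key"] == "Metric"]              # keep Celsius only
step5 = [r2 for r in step4 for r2 in explode_map(r, "value")]   # one row per inner entry
step6 = [r for r in step5 if r["key"] == "Value"]               # keep the number only
print(step6)
```

Each explode multiplies the row count by the number of map entries, which is why a filter follows each one.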

### 7. Rename and change the data type of the temperature column<a class="anchor" id="7"></a>

In [15]:
df6 = df5.select("Time", 
                 "City", 
                 "WeatherText", 
                 F.col("value").alias("TemperatureC"))

df_formatted = df6.withColumn("TemperatureC", df6.TemperatureC.cast(DoubleType()))

In [16]:
df_formatted.printSchema()

root
 |-- Time: timestamp (nullable = true)
 |-- City: string (nullable = true)
 |-- WeatherText: string (nullable = true)
 |-- TemperatureC: double (nullable = true)
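The cast to double can be pictured with the small helper below. It assumes Spark's default (non-ANSI) behaviour, where a string that cannot be converted becomes null instead of raising an error. `to_double` is a hypothetical name, and Python's `float` does not accept exactly the same set of strings as Spark, so treat this as an approximation.

```python
def to_double(text):
    """Convert a string to float, returning None when the conversion fails."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return None


print(to_double("15.0"), to_double("n/a"), to_double(None))
```

Rows with a null temperature are ignored by the average computed later, so a malformed value would not break the aggregation.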



### 8. Data Wrangling<a class="anchor" id="8"></a>
Trim leading and trailing spaces from the text columns and normalise their case (initial capitals for City, lower case for WeatherText).

In [17]:
df_formatted2 = df_formatted.select("Time",
                                    F.trim(F.initcap(F.col("City"))).alias("City"), 
                                    F.trim(F.lower(F.col("WeatherText"))).alias("WeatherText"),
                                    F.col("TemperatureC"))

In [18]:
df_formatted2.printSchema()

root
 |-- Time: timestamp (nullable = true)
 |-- City: string (nullable = true)
 |-- WeatherText: string (nullable = true)
 |-- TemperatureC: double (nullable = true)
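The cleaning step can be approximated in plain Python as follows. `init_cap` and `clean_row` are hypothetical helpers: `init_cap` imitates `initcap` by capitalising the first letter of each space-separated word, and `strip(" ")` stands in for `trim`. Note that trimming only affects the ends of the string, so repeated spaces inside the text are left as they are.

```python
def init_cap(text):
    """Upper-case the first letter of each space-separated word, lower-case the rest."""
    return " ".join(word[:1].upper() + word[1:].lower() for word in text.split(" "))


def clean_row(row):
    """Return a copy of the row with City and WeatherText normalised."""
    return {
        **row,
        "City": init_cap(row["City"]).strip(" "),
        "WeatherText": row["WeatherText"].lower().strip(" "),
    }


cleaned = clean_row({"City": " pERTH ", "WeatherText": " Rain  Shower ", "TemperatureC": 15.0})
print(cleaned)
```

Normalising the case matters for the frequency count later: without it, "Cloudy" and "cloudy" would be counted as two different weather texts.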



## Load the dataset for data analysis<a class="anchor" id="FOUR"></a>
### Average Temperature

The average temperature is calculated over a 12-second window that slides every second.  
A 6-second watermark is set so that data arriving late can still be included.

In [19]:
Avg_temp = df_formatted2\
    .withWatermark("Time", watermark)\
    .groupBy("City", 
             F.window(df_formatted2.Time, time_window, "1 second"))\
    .agg(F.avg("TemperatureC").alias("AvgTempC"))\
    .sort(F.col("window").desc())
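Because the window is 12 seconds long and slides every second, each record contributes to several overlapping windows. The sketch below lists them for one timestamp. It assumes windows are aligned to the Unix epoch, with an inclusive start and an exclusive end; `sliding_windows` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def sliding_windows(event_time, window=timedelta(seconds=12), slide=timedelta(seconds=1)):
    """Return the (start, end) windows that contain event_time, oldest first."""
    # Latest window start at or before the event, aligned to the slide interval.
    start = event_time - (event_time - EPOCH) % slide
    windows = []
    # Walk backwards while the window still covers the event (end is exclusive).
    while start + window > event_time:
        windows.append((start, start + window))
        start -= slide
    return windows[::-1]


event = datetime(2022, 9, 11, 19, 16, 1, 500000)
windows = sliding_windows(event)
print(len(windows), windows[0], windows[-1])
```

With these settings every record falls into 12 windows, which is why consecutive rows of the result overlap by 11 seconds.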

In [20]:
Avg_temp.printSchema()

root
 |-- City: string (nullable = true)
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- AvgTempC: double (nullable = true)



In [21]:
Avg_temp = Avg_temp.withColumn("AvgTempC", F.round(Avg_temp["AvgTempC"], 2))

In [22]:
# # Debug purpose
# query = Avg_temp \
#     .writeStream \
#     .outputMode("complete") \
#     .format("console") \
#     .trigger(processingTime='12 seconds') \
#     .option("truncate", False) \
#     .start()

In [23]:
# query.stop()

### Frequent weather text

The frequency of each weather text is counted over a 12-second window that slides every second.  
A 6-second watermark is set so that data arriving late can still be included.

In [24]:
Weather_text = df_formatted2\
    .withWatermark("Time", watermark)\
    .groupBy("City", 
             "WeatherText", 
             F.window(df_formatted2.Time, time_window, "1 second")).agg(
    F.count("WeatherText").alias("Count"))
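The idea behind the watermark used in both aggregations can be sketched as below. This is a simplified model, not Spark's implementation: the watermark trails the latest event time seen so far by the configured delay, and records older than it are candidates for being dropped. Spark updates the watermark between micro-batches and applies it per window, and in complete output mode it keeps all aggregate state, so the watermark may have little practical effect in this notebook. Both function names are hypothetical.

```python
from datetime import datetime, timedelta


def current_watermark(max_event_time_seen, delay=timedelta(seconds=6)):
    """Watermark = latest event time seen so far minus the allowed delay."""
    return max_event_time_seen - delay


def is_within_threshold(event_time, watermark):
    """True if the record is not older than the watermark."""
    return event_time >= watermark


latest = datetime(2022, 9, 11, 19, 16, 10)
wm = current_watermark(latest)
print(wm)
print(is_within_threshold(datetime(2022, 9, 11, 19, 16, 5), wm))
print(is_within_threshold(datetime(2022, 9, 11, 19, 16, 0), wm))
```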

In [25]:
Weather_text = Weather_text.sort(
    F.col("window").desc(), 
    F.col("Count").desc())

In [26]:
# # Debug purpose
# query = Weather_text \
#     .writeStream \
#     .outputMode("complete") \
#     .format("console") \
#     .trigger(processingTime='12 seconds') \
#     .option("truncate", False) \
#     .start()

In [27]:
# query.stop()

In [28]:
Weather_text.printSchema()

root
 |-- City: string (nullable = true)
 |-- WeatherText: string (nullable = true)
 |-- window: struct (nullable = true)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- Count: long (nullable = false)



## Output result to CSV<a class="anchor" id="FIVE"></a>
Once we are happy with the result, we can write it to CSV.

### Average Temperature

In [35]:
query1 = Avg_temp \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("temp_query") \
    .trigger(processingTime='12 seconds') \
    .start()

# To get enough data for our csv
print("Sleeping...")
sleep(60)
print("Waking Up...")

query1.stop()

Sleeping...
Waking Up...


In [36]:
spark.sql("SELECT * FROM temp_query limit 10").toPandas().head(10)

Unnamed: 0,City,window,AvgTempC
0,Perth,"(2022-09-11 19:15:59, 2022-09-11 19:16:11)",15.0
1,Perth,"(2022-09-11 19:15:58, 2022-09-11 19:16:10)",15.55
2,Perth,"(2022-09-11 19:15:57, 2022-09-11 19:16:09)",15.37
3,Perth,"(2022-09-11 19:15:56, 2022-09-11 19:16:08)",15.0
4,Perth,"(2022-09-11 19:15:55, 2022-09-11 19:16:07)",15.22
5,Perth,"(2022-09-11 19:15:54, 2022-09-11 19:16:06)",14.82
6,Perth,"(2022-09-11 19:15:53, 2022-09-11 19:16:05)",14.84
7,Perth,"(2022-09-11 19:15:52, 2022-09-11 19:16:04)",14.51
8,Perth,"(2022-09-11 19:15:51, 2022-09-11 19:16:03)",14.13
9,Perth,"(2022-09-11 19:15:50, 2022-09-11 19:16:02)",13.72


In [37]:
spark.sql("SELECT * FROM temp_query").toPandas().to_csv(
    os.path.join(output_location, "avg_temp.csv"), 
    index = False)

### Top Frequent Weather Text

In [32]:
query2 = Weather_text \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("weather_query") \
    .trigger(processingTime='12 seconds') \
    .start()

# To get enough data for our csv
print("Sleeping...")
sleep(60)
print("Waking Up...")
    
query2.stop()

Sleeping...
Waking Up...


In [33]:
# Return the top 3 most frequent weather texts for each city and window.
spark.sql('''
    SELECT City, 
        WeatherText, 
        window, 
        Count
    FROM (
        SELECT *,
            row_number() OVER (PARTITION BY City, window ORDER BY Count DESC) AS rn
        FROM weather_query) temp
    WHERE rn <= 3
    ORDER BY window DESC, rn
    LIMIT 30
    ''').toPandas()

Unnamed: 0,City,WeatherText,window,Count
0,Perth,cloudy,"(2022-09-11 19:09:13, 2022-09-11 19:09:25)",1
1,Perth,cloudy,"(2022-09-11 19:09:12, 2022-09-11 19:09:24)",1
2,Perth,rain shower,"(2022-09-11 19:09:12, 2022-09-11 19:09:24)",1
3,Perth,cloudy,"(2022-09-11 19:09:11, 2022-09-11 19:09:23)",2
4,Perth,rain shower,"(2022-09-11 19:09:11, 2022-09-11 19:09:23)",1
5,Perth,cloudy,"(2022-09-11 19:09:10, 2022-09-11 19:09:22)",2
6,Perth,rain shower,"(2022-09-11 19:09:10, 2022-09-11 19:09:22)",1
7,Perth,heavy rain shower,"(2022-09-11 19:09:10, 2022-09-11 19:09:22)",1
8,Perth,cloudy,"(2022-09-11 19:09:09, 2022-09-11 19:09:21)",2
9,Perth,light rain shower,"(2022-09-11 19:09:09, 2022-09-11 19:09:21)",1
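The ranking query above can be mirrored in plain Python to see what it selects. This is a sketch with invented observations: `top_n_per_window` is a hypothetical helper, and a short string stands in for the window struct. Ties are kept in first-seen order here, whereas `row_number()` gives no guarantee about which of several equally frequent texts is ranked first.

```python
from collections import Counter, defaultdict


def top_n_per_window(observations, n=3):
    """Return (city, text, window, count) for the n most frequent texts per city and window.

    observations: iterable of (city, window, weather_text), one per record per window.
    """
    counts = Counter(observations)
    grouped = defaultdict(list)
    for (city, window, text), count in counts.items():
        grouped[(city, window)].append((text, count))
    result = []
    for (city, window), items in grouped.items():
        # sorted() is stable, so texts with equal counts keep their first-seen order.
        ranked = sorted(items, key=lambda item: item[1], reverse=True)[:n]
        result.extend((city, text, window, count) for text, count in ranked)
    return result


# Invented observations for two windows.
observations = [
    ("Perth", "19:09:10", "cloudy"),
    ("Perth", "19:09:10", "rain shower"),
    ("Perth", "19:09:10", "cloudy"),
    ("Perth", "19:09:10", "heavy rain shower"),
    ("Perth", "19:09:10", "light rain shower"),
    ("Perth", "19:09:13", "cloudy"),
]
top = top_n_per_window(observations)
for entry in top:
    print(entry)
```

A window with fewer than three distinct texts simply returns fewer rows, which matches the shorter groups in the output above.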


In [34]:
spark.sql('''
    SELECT City, 
        WeatherText, 
        window, 
        Count
    FROM (
        SELECT *,
            row_number() OVER (PARTITION BY City, window ORDER BY Count DESC) AS rn
        FROM weather_query) temp
    WHERE rn <= 3
    ORDER BY window DESC, rn
    ''').toPandas().to_csv(os.path.join(output_location, "weather_text.csv"), index = False)