#### About
Apache spark streaming project.
- Dataset - https://www.kaggle.com/datasets/hiteshsoneji/historical-weather-data-for-indian-cities?select=bengaluru.csv

In [31]:
#importing modules
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, from_unixtime,unix_timestamp
from pyspark.sql.types import TimestampType
#for reading config
import configparser
configread = configparser.ConfigParser()
configread.read('/home/suraj/ClickUp/Mar-Apr/data-engineering-mlops-projects/13-apache-spark-streaming/config.ini')


['/home/suraj/ClickUp/Mar-Apr/data-engineering-mlops-projects/13-apache-spark-streaming/config.ini']

In [13]:
spark_session = SparkSession.builder.master("local").appName("City weather analysis").getOrCreate()
sc = spark_session.sparkContext

In [21]:
#analysing data for kanpur.
data_path = "/home/suraj/ClickUp/Mar-Apr/data-engineering-mlops-projects/13-apache-spark-streaming/kanpur.csv"
valid_data = "output/valid"
invalid_data = "output/invalid"

In [22]:
df = spark_session.read.csv(data_path,header=True)
df.show()

23/03/14 07:24:40 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: date_time, maxtempC, mintempC, totalSnow_cm, sunHour, uvIndex, uvIndex, moon_illumination, moonrise, moonset, sunrise, sunset, DewPointC, FeelsLikeC, HeatIndexC, WindChillC, WindGustKmph, cloudcover, humidity, precipMM, pressure, tempC, visibility, winddirDegree, windspeedKmph
 Schema: date_time, maxtempC, mintempC, totalSnow_cm, sunHour, uvIndex5, uvIndex6, moon_illumination, moonrise, moonset, sunrise, sunset, DewPointC, FeelsLikeC, HeatIndexC, WindChillC, WindGustKmph, cloudcover, humidity, precipMM, pressure, tempC, visibility, winddirDegree, windspeedKmph
Expected: uvIndex5 but found: uvIndex
CSV file: file:///home/suraj/ClickUp/Mar-Apr/data-engineering-mlops-projects/13-apache-spark-streaming/kanpur.csv
+-------------------+--------+--------+------------+-------+--------+--------+-----------------+--------+--------+--------+--------+---------+----------+----------+----------+------------+

In [23]:
# selecting maxtempC, mintempC, pressure and humidity to move forward
df = df.select("date_time","maxtempC", "mintempC", "pressure","humidity")
df.show()
#caching the df
df.cache()

+-------------------+--------+--------+--------+--------+
|          date_time|maxtempC|mintempC|pressure|humidity|
+-------------------+--------+--------+--------+--------+
|2009-01-01 00:00:00|      24|      10|    1015|      50|
|2009-01-01 01:00:00|      24|      10|    1015|      52|
|2009-01-01 02:00:00|      24|      10|    1015|      55|
|2009-01-01 03:00:00|      24|      10|    1015|      57|
|2009-01-01 04:00:00|      24|      10|    1016|      54|
|2009-01-01 05:00:00|      24|      10|    1016|      52|
|2009-01-01 06:00:00|      24|      10|    1017|      49|
|2009-01-01 07:00:00|      24|      10|    1017|      44|
|2009-01-01 08:00:00|      24|      10|    1017|      38|
|2009-01-01 09:00:00|      24|      10|    1017|      33|
|2009-01-01 10:00:00|      24|      10|    1016|      31|
|2009-01-01 11:00:00|      24|      10|    1015|      30|
|2009-01-01 12:00:00|      24|      10|    1014|      28|
|2009-01-01 13:00:00|      24|      10|    1014|      43|
|2009-01-01 14

DataFrame[date_time: string, maxtempC: string, mintempC: string, pressure: string, humidity: string]

In [33]:
# Handling missing values
#filtering data so that it doesn't contain any null values
valid_df = df.filter("`maxtempC` is not null and `mintempC` is not null and `pressure` is not null and `humidity` is not null")
invalid_df = df.filter("`maxtempC` is null or `mintempC` is null or `pressure` is null or `humidity` is null")

valid_df.write.format('csv').mode('overwrite').option('header',True).save(valid_data)
invalid_df.write.format('csv').mode('overwrite').option('header',True).save(invalid_data)

# to execute via command line - execute spark-submit --master local file_name.py

In [25]:
# creating partition by date i.e date time to date since the data is huge
valid_df.show()

+-------------------+--------+--------+--------+--------+
|          date_time|maxtempC|mintempC|pressure|humidity|
+-------------------+--------+--------+--------+--------+
|2009-01-01 00:00:00|      24|      10|    1015|      50|
|2009-01-01 01:00:00|      24|      10|    1015|      52|
|2009-01-01 02:00:00|      24|      10|    1015|      55|
|2009-01-01 03:00:00|      24|      10|    1015|      57|
|2009-01-01 04:00:00|      24|      10|    1016|      54|
|2009-01-01 05:00:00|      24|      10|    1016|      52|
|2009-01-01 06:00:00|      24|      10|    1017|      49|
|2009-01-01 07:00:00|      24|      10|    1017|      44|
|2009-01-01 08:00:00|      24|      10|    1017|      38|
|2009-01-01 09:00:00|      24|      10|    1017|      33|
|2009-01-01 10:00:00|      24|      10|    1016|      31|
|2009-01-01 11:00:00|      24|      10|    1015|      30|
|2009-01-01 12:00:00|      24|      10|    1014|      28|
|2009-01-01 13:00:00|      24|      10|    1014|      43|
|2009-01-01 14

In [26]:
# date format - yyyy-mm-dd
spark_session.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

date_df = valid_df.withColumn("Date",to_date(from_unixtime(unix_timestamp("date_time","yyyy-MM-dd hh:mm:ss")).cast(TimestampType())))

date_df.show(100)

+-------------------+--------+--------+--------+--------+----------+
|          date_time|maxtempC|mintempC|pressure|humidity|      Date|
+-------------------+--------+--------+--------+--------+----------+
|2009-01-01 00:00:00|      24|      10|    1015|      50|      null|
|2009-01-01 01:00:00|      24|      10|    1015|      52|2009-01-01|
|2009-01-01 02:00:00|      24|      10|    1015|      55|2009-01-01|
|2009-01-01 03:00:00|      24|      10|    1015|      57|2009-01-01|
|2009-01-01 04:00:00|      24|      10|    1016|      54|2009-01-01|
|2009-01-01 05:00:00|      24|      10|    1016|      52|2009-01-01|
|2009-01-01 06:00:00|      24|      10|    1017|      49|2009-01-01|
|2009-01-01 07:00:00|      24|      10|    1017|      44|2009-01-01|
|2009-01-01 08:00:00|      24|      10|    1017|      38|2009-01-01|
|2009-01-01 09:00:00|      24|      10|    1017|      33|2009-01-01|
|2009-01-01 10:00:00|      24|      10|    1016|      31|2009-01-01|
|2009-01-01 11:00:00|      24|    

In [32]:
#reading params from config file
dataset_path = configread.get('dataset_path','data_path')
data = spark_session.read.csv(dataset_path,header=True)
data.show()

23/03/14 07:26:37 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: date_time, maxtempC, mintempC, totalSnow_cm, sunHour, uvIndex, uvIndex, moon_illumination, moonrise, moonset, sunrise, sunset, DewPointC, FeelsLikeC, HeatIndexC, WindChillC, WindGustKmph, cloudcover, humidity, precipMM, pressure, tempC, visibility, winddirDegree, windspeedKmph
 Schema: date_time, maxtempC, mintempC, totalSnow_cm, sunHour, uvIndex5, uvIndex6, moon_illumination, moonrise, moonset, sunrise, sunset, DewPointC, FeelsLikeC, HeatIndexC, WindChillC, WindGustKmph, cloudcover, humidity, precipMM, pressure, tempC, visibility, winddirDegree, windspeedKmph
Expected: uvIndex5 but found: uvIndex
CSV file: file:///home/suraj/ClickUp/Mar-Apr/data-engineering-mlops-projects/13-apache-spark-streaming/kanpur.csv
+-------------------+--------+--------+------------+-------+--------+--------+-----------------+--------+--------+--------+--------+---------+----------+----------+----------+------------+

#### Spark Streaming Application via Kafka Source
- The messages consumed by Kafka stream can be JSON, CSV, AVRO etc.
