#Weather Data Pipeline

In [0]:
import requests
import uuid
from datetime import datetime
from pyspark.sql.functions import col, lit, to_timestamp,udf, explode,from_unixtime
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType, ArrayType
from delta.tables import *

#Previous city data

In [0]:
#taking only 5 cities

df= spark.read.format("delta").load("dbfs:/user/hive/warehouse/cities")
df = df.select("*").limit(5)
df = df.withColumn('id', col('id').cast('int'))
display(df)

id,lat,lon,name,country
1282616,27.98333,83.76667,Wāliṅ,NP
1282621,27.766666,84.566666,Upardang Gadhi,NP
1282635,28.130989,82.297256,Tulsīpur,NP
1282665,27.633333,84.5,Tikoli,NP
1282666,28.5,81.133331,Ṭikāpur,NP


In [0]:
df.dtypes

Out[3]: [('id', 'int'),
 ('lat', 'double'),
 ('lon', 'double'),
 ('name', 'string'),
 ('country', 'string')]

#Creating a log table

In [0]:
%sql
CREATE OR REPLACE TABLE data_log_table (
  id STRING,
  load_type STRING,
  table_name STRING,
  process_start_time TIMESTAMP,
  process_end_time TIMESTAMP,
  status STRING,
  comments STRING,
  start_date_time TIMESTAMP,
  end_date_time TIMESTAMP,
  created_on TIMESTAMP,
  created_by STRING
)
USING DELTA;

#Schema for weather data

In [0]:
weather_schema = StructType([
    StructField('coord', StructType([
        StructField('lon', FloatType(), True),
        StructField('lat', FloatType(), True)
    ])),
    StructField('weather', ArrayType(StructType([
                StructField('id', IntegerType(), True),
                StructField('main', StringType(), True),
                StructField('description', StringType(), True),
                StructField('icon', StringType(), True)
    ]), True)),
    StructField('base', StringType(), True),
    StructField('main', StructType([
        StructField('temp', FloatType(), True),
        StructField('feels_like', FloatType(), True),
        StructField('temp_min', FloatType(), True),
        StructField('temp_max', FloatType(), True),
        StructField('pressure', IntegerType(), True),
        StructField('humidity', IntegerType(), True),
        StructField('sea_level', IntegerType(), True),
        StructField('grnd_level', IntegerType(), True)
    ])),
    StructField('visibility', IntegerType(), True),
    StructField('wind', StructType([
        StructField('speed', IntegerType(), True),
        StructField('deg', IntegerType(), True),
        StructField('gust', FloatType(), True)
    ])),
    StructField('clouds', StructType([
        StructField('all', FloatType(), True)
    ])),
    StructField('dt', IntegerType(), True),
    StructField('sys', StructType([
        StructField('country', StringType(), True),
        StructField('sunrise', IntegerType(), True),
        StructField('sunset', IntegerType(), True)
    ])),
    StructField('timezone', IntegerType(), True),
    StructField('id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('cod', IntegerType(), True)
])

In [0]:
def logs(func):
    def wrapper(*args, **kwargs):
        id = str(uuid.uuid4())
        load_type = args[0]
        table_name = args[1]
        process_start_dt = datetime.now()
        name = 'Eva Shrestha'

        spark.sql(f"INSERT INTO data_log_table (id, load_type, table_name, process_start_time, status, created_on, created_by) \
            VALUES ('{id}', '{load_type}', '{table_name}', '{process_start_dt}', 'EXTRACTING', '{process_start_dt}', '{name}')")


        df, start, end = func(*args[2:], **kwargs)

        df = df.withColumn('load_run_id', lit(id))
        df = df.withColumn('created_on', lit(process_start_dt).cast(TimestampType()))
        df = df.withColumn('created_by', lit(name))

        df.write.format('delta').mode('append').saveAsTable(table_name)

        process_end_dt = datetime.now()
        spark.sql(f"UPDATE data_log_table SET process_end_time = '{process_end_dt}', status='COMPLETED', start_date_time='{start}', end_date_time='{end}' WHERE id='{id}'")

        return df

    return wrapper

#Fetch weather data from API

In [0]:
@logs
def get_weather(df):
    
    def fetch_weather_data(id):
        url = f'https://api.openweathermap.org/data/2.5/weather?id={id}&appid=44719617124a11a924063c357bb44bc9'
        result = requests.get(url)
        return result.json()
    
    fetch_weather_udf = udf(lambda x: fetch_weather_data(x), weather_schema)
    
    df = df.withColumn('weather', fetch_weather_udf(col('id'))).select('id','name','weather')
        
    start = datetime.fromtimestamp(df.selectExpr("min(weather.dt)").first()[0])
    end = datetime.fromtimestamp(df.selectExpr("max(weather.dt)").first()[0])
    
    return df, start, end

#Loading the raw table weather_bronze

In [0]:
raw_bronze_df = get_weather('RAW', 'weather_bronze', df)

In [0]:
display(raw_bronze_df)

id,name,weather,load_run_id,created_on,created_by
7960439,Kusunde,"List(List(84.1681, 28.1228), List(List(803, Clouds, broken clouds, 04n)), stations, List(299.14, 299.14, 299.14, 299.14, 1006, 33, 1006, 928), 10000, List(null, 0, 2.87), List(null), 1685983400, List(NP, 1685921194, 1685971031), 20700, 7960439, Kusunde, 200)",0c2d507e-8bd4-4564-8369-0c2487458e5e,2023-06-05T16:42:57.580+0000,Eva Shrestha
7996868,Phorche,"List(List(86.7528, 27.8499), List(List(804, Clouds, overcast clouds, 04n)), stations, List(275.81, 275.81, 275.81, 275.81, 1023, 87, 1023, 632), 10000, List(null, 202, 1.43), List(null), 1685983423, List(NP, 1685920610, 1685970374), 20700, 7996868, Phorche, 200)",0c2d507e-8bd4-4564-8369-0c2487458e5e,2023-06-05T16:42:57.580+0000,Eva Shrestha
8004777,Patyarthok,"List(List(84.3341, 28.0313), List(List(802, Clouds, scattered clouds, 03n)), stations, List(296.33, 295.57, 296.33, 296.33, 1005, 33, 1005, 896), 10000, List(null, 21, 1.71), List(null), 1685983424, List(NP, 1685921166, 1685970979), 20700, 8004777, Patyarthok, 200)",0c2d507e-8bd4-4564-8369-0c2487458e5e,2023-06-05T16:42:57.580+0000,Eva Shrestha
7949204,(Pokharithok Bajar),"List(List(84.6307, 27.9962), List(List(801, Clouds, few clouds, 02n)), stations, List(296.03, 295.18, 296.03, 296.03, 1005, 31, 1005, 887), 10000, List(null, 35, 2.07), List(null), 1685983401, List(NP, 1685921099, 1685970903), 20700, 7949204, (Pokharithok Bajar), 200)",0c2d507e-8bd4-4564-8369-0c2487458e5e,2023-06-05T16:42:57.580+0000,Eva Shrestha
6941675,Khingar,"List(List(83.8295, 28.8217), List(List(804, Clouds, overcast clouds, 04n)), stations, List(281.2, null, 281.2, 281.2, 1020, 84, 1020, 689), 8125, List(null, 209, 3.58), List(null), 1685983401, List(NP, 1685921180, 1685971206), 20700, 6941675, Khingar, 200)",0c2d507e-8bd4-4564-8369-0c2487458e5e,2023-06-05T16:42:57.580+0000,Eva Shrestha


In [0]:
%sql
select * from data_log_table

id,load_type,table_name,process_start_time,process_end_time,status,comments,start_date_time,end_date_time,created_on,created_by
0c2d507e-8bd4-4564-8369-0c2487458e5e,RAW,weather_bronze,2023-06-05T16:42:57.580+0000,2023-06-05T16:43:27.185+0000,COMPLETED,,2023-06-05T16:43:11.000+0000,2023-06-05T16:43:16.000+0000,2023-06-05T16:42:57.580+0000,Eva Shrestha


Checking if raw data is loaded successfully

In [0]:
%sql
select * from weather_bronze

id,name,weather,load_run_id,created_on,created_by
7960439,Kusunde,"List(List(84.1681, 28.1228), List(List(803, Clouds, broken clouds, 04n)), stations, List(299.14, 299.14, 299.14, 299.14, 1006, 33, 1006, 928), 10000, List(null, 0, 2.87), List(null), 1685983400, List(NP, 1685921194, 1685971031), 20700, 7960439, Kusunde, 200)",0c2d507e-8bd4-4564-8369-0c2487458e5e,2023-06-05T16:42:57.580+0000,Eva Shrestha
7996868,Phorche,"List(List(86.7528, 27.8499), List(List(804, Clouds, overcast clouds, 04n)), stations, List(275.81, 275.81, 275.81, 275.81, 1023, 87, 1023, 632), 10000, List(null, 202, 1.43), List(null), 1685983400, List(NP, 1685920610, 1685970374), 20700, 7996868, Phorche, 200)",0c2d507e-8bd4-4564-8369-0c2487458e5e,2023-06-05T16:42:57.580+0000,Eva Shrestha
8004777,Patyarthok,"List(List(84.3341, 28.0313), List(List(802, Clouds, scattered clouds, 03n)), stations, List(296.33, 295.57, 296.33, 296.33, 1005, 33, 1005, 896), 10000, List(null, 21, 1.71), List(null), 1685983400, List(NP, 1685921166, 1685970979), 20700, 8004777, Patyarthok, 200)",0c2d507e-8bd4-4564-8369-0c2487458e5e,2023-06-05T16:42:57.580+0000,Eva Shrestha
7949204,(Pokharithok Bajar),"List(List(84.6307, 27.9962), List(List(801, Clouds, few clouds, 02n)), stations, List(296.03, 295.18, 296.03, 296.03, 1005, 31, 1005, 887), 10000, List(null, 35, 2.07), List(null), 1685983401, List(NP, 1685921099, 1685970903), 20700, 7949204, (Pokharithok Bajar), 200)",0c2d507e-8bd4-4564-8369-0c2487458e5e,2023-06-05T16:42:57.580+0000,Eva Shrestha
6941675,Khingar,"List(List(83.8295, 28.8217), List(List(804, Clouds, overcast clouds, 04n)), stations, List(281.2, null, 281.2, 281.2, 1020, 84, 1020, 689), 8125, List(null, 209, 3.58), List(null), 1685983401, List(NP, 1685921180, 1685971206), 20700, 6941675, Khingar, 200)",0c2d507e-8bd4-4564-8369-0c2487458e5e,2023-06-05T16:42:57.580+0000,Eva Shrestha


#Processing the raw data from the raw table

In [0]:
@logs
def process(df):
    df = df.select(
            col('weather.dt'),
            from_unixtime(col('weather.dt')).alias('date'),
            col('weather.id').alias('city_id'),
            col('weather.name').alias('city_name'),
            col('weather.timezone'),
            col('weather.sys.country'),
            col('weather.coord.lat'),
            col('weather.coord.lon'),
            col('weather.main.temp'),
            col('weather.main.temp_min'),
            col('weather.main.temp_max'),
            col('weather.main.pressure'),
            col('weather.main.humidity'),
            col('weather.visibility'),
            col('weather.wind.speed').alias("wind_speed"),
            col('weather.wind.deg').alias("wind_deg"),
            col('weather.wind.gust').alias("wind_gust"),
            col('weather.clouds.all').alias("clouds_all"))
    
    start = datetime.fromtimestamp(df.selectExpr("min(dt)").first()[0])
    end = datetime.fromtimestamp(df.selectExpr("max(dt)").first()[0])
    
    return df, start, end

#loading raw data to a processed / cleaned table

In [0]:
cleaned_silver_df = process('PROCESSED', 'weather_silver', raw_bronze_df)

In [0]:
display(cleaned_silver_df)

dt,date,city_id,city_name,timezone,country,lat,lon,temp,temp_min,temp_max,pressure,humidity,visibility,wind_speed,wind_deg,wind_gust,clouds_all,load_run_id,created_on,created_by
1685983394,2023-06-05 16:43:14,1282616,Wāliṅ,20700,NP,27.9833,83.7667,299.61,299.61,299.61,1006,29,10000,,326,1.73,,4a6e7e93-e69a-43b4-ad9e-adb7b41038f3,2023-06-05T16:48:26.284+0000,Eva Shrestha
1685983152,2023-06-05 16:39:12,1282621,Upardang Gadhi,20700,NP,27.7667,84.5667,,,,1004,28,10000,,43,1.78,,4a6e7e93-e69a-43b4-ad9e-adb7b41038f3,2023-06-05T16:48:26.284+0000,Eva Shrestha
1685983395,2023-06-05 16:43:15,1282635,Tulsīpur,20700,NP,28.131,82.2973,300.01,300.01,300.01,1006,36,10000,,15,2.82,,4a6e7e93-e69a-43b4-ad9e-adb7b41038f3,2023-06-05T16:48:26.284+0000,Eva Shrestha
1685983395,2023-06-05 16:43:15,1282665,Tikoli,20700,NP,27.6333,84.5,303.24,303.24,303.24,1003,25,10000,,37,1.73,,4a6e7e93-e69a-43b4-ad9e-adb7b41038f3,2023-06-05T16:48:26.284+0000,Eva Shrestha
1685983396,2023-06-05 16:43:16,1282666,Ṭikāpur,20700,NP,28.5,81.1333,302.73,302.73,302.73,1003,40,10000,,31,5.97,,4a6e7e93-e69a-43b4-ad9e-adb7b41038f3,2023-06-05T16:48:26.284+0000,Eva Shrestha


In [0]:
%sql
select * from data_log_table

id,load_type,table_name,process_start_time,process_end_time,status,comments,start_date_time,end_date_time,created_on,created_by
4a6e7e93-e69a-43b4-ad9e-adb7b41038f3,PROCESSED,weather_silver,2023-06-05T16:48:26.284+0000,2023-06-05T16:48:46.965+0000,COMPLETED,,2023-06-05T16:39:12.000+0000,2023-06-05T16:48:38.000+0000,2023-06-05T16:48:26.284+0000,Eva Shrestha
0c2d507e-8bd4-4564-8369-0c2487458e5e,RAW,weather_bronze,2023-06-05T16:42:57.580+0000,2023-06-05T16:43:27.185+0000,COMPLETED,,2023-06-05T16:43:11.000+0000,2023-06-05T16:43:16.000+0000,2023-06-05T16:42:57.580+0000,Eva Shrestha


Checking if data is loaded in silver table successfully

In [0]:
%sql
select * from weather_silver

dt,date,city_id,city_name,timezone,country,lat,lon,temp,temp_min,temp_max,pressure,humidity,visibility,wind_speed,wind_deg,wind_gust,clouds_all,load_run_id,created_on,created_by
1685983391,2023-06-05 16:43:11,1282616,Wāliṅ,20700,NP,27.9833,83.7667,299.61,299.61,299.61,1006,29,10000,,326,1.73,,4a6e7e93-e69a-43b4-ad9e-adb7b41038f3,2023-06-05T16:48:26.284+0000,Eva Shrestha
1685983391,2023-06-05 16:43:11,1282621,Upardang Gadhi,20700,NP,27.7667,84.5667,,,,1004,28,10000,,43,1.78,,4a6e7e93-e69a-43b4-ad9e-adb7b41038f3,2023-06-05T16:48:26.284+0000,Eva Shrestha
1685983391,2023-06-05 16:43:11,1282635,Tulsīpur,20700,NP,28.131,82.2973,300.01,300.01,300.01,1006,36,10000,,15,2.82,,4a6e7e93-e69a-43b4-ad9e-adb7b41038f3,2023-06-05T16:48:26.284+0000,Eva Shrestha
1685983392,2023-06-05 16:43:12,1282665,Tikoli,20700,NP,27.6333,84.5,303.24,303.24,303.24,1003,25,10000,,37,1.73,,4a6e7e93-e69a-43b4-ad9e-adb7b41038f3,2023-06-05T16:48:26.284+0000,Eva Shrestha
1685983392,2023-06-05 16:43:12,1282666,Ṭikāpur,20700,NP,28.5,81.1333,302.73,302.73,302.73,1003,40,10000,,31,5.97,,4a6e7e93-e69a-43b4-ad9e-adb7b41038f3,2023-06-05T16:48:26.284+0000,Eva Shrestha
