In [0]:
import functools
import os
import uuid
from datetime import datetime

import requests
from pyspark.sql.functions import col, from_unixtime, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType, ArrayType

In [0]:
# Read the exported city table (stored as Delta) and keep the first five
# values of its `name` column as a plain Python list.
df = spark.read.load('dbfs:/FileStore/shared_uploads/export_deltatable', format='delta')
city_list = [row[0] for row in df.select('name').limit(5).collect()]
city_list

Out[103]: ['Wāliṅ', 'Upardang Gadhi', 'Tulsīpur', 'Tikoli', 'Ṭikāpur']

#### Schema for log table

In [0]:
# Layout of one audit row. The `update_log` decorator builds a single tuple in
# exactly this field order and appends it to `log_table`.
log_schema = (
    StructType()
    .add("id", StringType())                          # uuid4 generated per decorated call
    .add("load_type", StringType())
    .add("table_name", StringType())
    .add("process_start_time", TimestampType())
    .add("process_end_time", TimestampType())
    .add("status", StringType())
    .add("comments", StringType())
    .add("start_date_time", TimestampType())          # returned by the wrapped function
    .add("end_date_time", TimestampType())            # returned by the wrapped function
    .add("created_on", TimestampType())
    .add("created_by", StringType())
)

#### Schema for raw json

In [0]:
# Nested pieces of the raw API payload, declared on their own so the top-level
# schema below reads as a flat list. Every field is nullable (the default).
_coord_struct = StructType([
    StructField('lon', FloatType()),
    StructField('lat', FloatType()),
])

_weather_entry_struct = StructType([
    StructField('id', IntegerType()),
    StructField('main', StringType()),
    StructField('description', StringType()),
    StructField('icon', StringType()),
])

_main_struct = StructType([
    StructField('temp', FloatType()),
    StructField('feels_like', FloatType()),
    StructField('temp_min', FloatType()),
    StructField('temp_max', FloatType()),
    StructField('pressure', IntegerType()),
    StructField('humidity', IntegerType()),
    StructField('sea_level', IntegerType()),
    StructField('grnd_level', IntegerType()),
])

_wind_struct = StructType([
    StructField('speed', FloatType()),
    StructField('deg', IntegerType()),
    # NOTE(review): gust is declared as a string while speed is a float --
    # confirm this is intentional before relying on it numerically.
    StructField('gust', StringType()),
])

_clouds_struct = StructType([
    StructField('all', IntegerType()),
])

_sys_struct = StructType([
    StructField('country', StringType()),
    StructField('sunrise', IntegerType()),
    StructField('sunset', IntegerType()),
])

# Schema applied to the list of JSON payloads returned by `get_raw_data`.
raw_response_schema = StructType([
    StructField('coord', _coord_struct),
    StructField('weather', ArrayType(_weather_entry_struct)),
    StructField('base', StringType()),
    StructField('main', _main_struct),
    StructField('visibility', IntegerType()),
    StructField('wind', _wind_struct),
    StructField('clouds', _clouds_struct),
    StructField('dt', IntegerType()),
    StructField('sys', _sys_struct),
    StructField('timezone', IntegerType()),
    StructField('id', IntegerType()),
    StructField('name', StringType()),
    StructField('cod', IntegerType()),
])

In [0]:
# Function to retrieve data from API

def get_raw_data(city_list, api_key=None, timeout=10):
    """Fetch one weather payload per city from the OpenWeatherMap endpoint.

    Args:
        city_list: Iterable of city names; each is sent as the ``q`` query
            parameter. An empty iterable yields an empty list without any
            network traffic.
        api_key: API key sent as ``appid``. When omitted, the
            ``OPENWEATHER_API_KEY`` environment variable is used, falling back
            to the ``'{your api key}'`` placeholder so existing callers keep
            working.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        A list with the decoded JSON body of each response, in the same order
        as ``city_list``.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status, so an
            error body is never returned as if it were weather data.
    """
    api_key = api_key or os.environ.get('OPENWEATHER_API_KEY', '{your api key}')
    url = "https://api.openweathermap.org/data/2.5/weather"

    response = []
    for city in city_list:
        # Passing the values through `params` lets requests percent-encode
        # them, so names containing spaces, '&' or '#' cannot corrupt the query.
        reply = requests.get(url, params={'q': city, 'appid': api_key}, timeout=timeout)
        reply.raise_for_status()
        response.append(reply.json())

    return response
        

In [0]:
# Function to convert raw data to a dataframe

# NOTE: `update_log` is defined in a later cell of this notebook. That cell has
# to be executed before this one, otherwise the decorator line raises NameError
# on a fresh top-to-bottom run.
@update_log
def get_raw(city_list):
    """Download the weather payloads for ``city_list`` and wrap them in a DataFrame.

    Because of the ``update_log`` decorator, callers pass three extra leading
    arguments (load type, target table name, log comment) that the wrapper
    consumes; only ``city_list`` reaches this function.

    Args:
        city_list: City names forwarded to ``get_raw_data``.

    Returns:
        A tuple ``(df, start_datetime, end_datetime)`` where ``df`` uses
        ``raw_response_schema`` and the two datetimes are the smallest and
        largest ``dt`` value converted with ``datetime.fromtimestamp``. Both
        are ``None`` when no ``dt`` value is available (e.g. empty input).
    """
    response = get_raw_data(city_list)
    df = spark.createDataFrame(response, raw_response_schema)

    # A single aggregation returns both bounds, instead of running two
    # separate Spark actions over the same data.
    earliest, latest = df.selectExpr('min(dt)', 'max(dt)').first()

    # min/max are NULL for an empty frame; fromtimestamp(None) would raise.
    start_datetime = datetime.fromtimestamp(earliest) if earliest is not None else None
    end_datetime = datetime.fromtimestamp(latest) if latest is not None else None

    return df, start_datetime, end_datetime

In [0]:
# Function to convert raw dataframe to processed dataframe
# NOTE: `update_log` is defined in a later cell of this notebook. That cell has
# to be executed before this one, otherwise the decorator line raises NameError
# on a fresh top-to-bottom run.
@update_log
def get_processed_data(df):
    """Flatten the nested raw weather DataFrame into a table of scalar columns.

    Because of the ``update_log`` decorator, callers pass three extra leading
    arguments (load type, target table name, log comment) that the wrapper
    consumes; only ``df`` reaches this function.

    Args:
        df: DataFrame with the nested raw layout (``coord``, ``main``,
            ``wind``, ``clouds``, ``sys`` structs plus top-level scalars).

    Returns:
        A tuple ``(df, start_datetime, end_datetime)``. ``df`` contains only
        the columns selected below, with ``date`` derived from ``dt`` via
        ``from_unixtime``. The two datetimes are the smallest and largest
        ``dt`` converted with ``datetime.fromtimestamp``, or ``None`` when no
        ``dt`` value is available (e.g. empty input).
    """
    processed = df.select(
        col('dt'),
        from_unixtime(col('dt')).alias('date'),
        col('id').alias('city_id'),
        col('name').alias('city_name'),
        col('timezone'),
        # Un-aliased struct members keep their leaf name (country, lat, ...).
        col('sys.country'),
        col('coord.lat'),
        col('coord.lon'),
        col('main.temp'),
        col('main.temp_min'),
        col('main.temp_max'),
        col('main.pressure'),
        col('main.humidity'),
        col('visibility'),
        col('wind.speed').alias('wind_speed'),
        col('wind.deg').alias('wind_deg'),
        col('wind.gust').alias('wind_gust'),
        col('clouds.all').alias('clouds_all')
    )

    # A single aggregation returns both bounds, instead of running two
    # separate Spark actions over the same data.
    earliest, latest = processed.selectExpr('min(dt)', 'max(dt)').first()

    # min/max are NULL for an empty frame; fromtimestamp(None) would raise.
    start_datetime = datetime.fromtimestamp(earliest) if earliest is not None else None
    end_datetime = datetime.fromtimestamp(latest) if latest is not None else None

    return processed, start_datetime, end_datetime

In [0]:
# Using decorator for implementing the log table
def update_log(func):
    """Decorator that persists a function's DataFrame and records an audit row.

    The wrapped function must return ``(df, start_date_time, end_date_time)``.
    The decorated callable is invoked as
    ``decorated(load_type, table_name, comments, *args, **kwargs)``: the first
    three values are used for logging and as the target table, and the
    remaining arguments are forwarded to ``func``.

    For every call the wrapper:
      1. runs ``func`` and adds ``load_run_id``, ``created_on`` and
         ``created_by`` columns to the returned DataFrame,
      2. appends that DataFrame to the Delta table ``table_name``,
      3. appends one row (shaped like ``log_schema``) to ``log_table`` with
         status ``COMPLETED`` or ``ERROR``.

    Returns:
        The decorated callable. It returns the augmented DataFrame on success.

    Raises:
        Whatever exception ``func`` or the table write raised. It is re-raised
        only after the ``ERROR`` row has been written, so the failure is both
        logged and visible to the caller.
    """
    @functools.wraps(func)
    def wrapper_function(load_type, table_name, comments, *args, **kwargs):
        run_id = str(uuid.uuid4())
        process_start_time = datetime.now()
        status = 'EXTRACTING'

        created_by = 'Ananda Thakur'
        created_on = process_start_time

        # Pre-bind everything the log row needs so that a failure inside the
        # try block cannot leave these names undefined.
        df = None
        start_date_time = None
        end_date_time = None
        failure = None

        try:
            df, start_date_time, end_date_time = func(*args, **kwargs)

            df = (
                df.withColumn('load_run_id', lit(run_id))
                .withColumn('created_on', lit(created_on))
                .withColumn('created_by', lit(created_by))
            )

            df.write.format('delta').mode('append').saveAsTable(table_name)
            status = 'COMPLETED'
        except Exception as exc:
            # Remember the error; it is re-raised once the log row is stored.
            status = 'ERROR'
            failure = exc

        process_end_time = datetime.now()

        log_data = [(
            run_id, load_type, table_name, process_start_time, process_end_time,
            status, comments, start_date_time, end_date_time, created_on, created_by,
        )]

        log_df = spark.createDataFrame(log_data, schema=log_schema)
        log_df.write.format('delta').mode('append').saveAsTable('log_table')

        if failure is not None:
            raise failure
        return df
    return wrapper_function

#### Testing: loading raw data from the API

In [0]:
# The first three arguments (load type, target table, log comment) are consumed
# by the update_log wrapper; only city_list is forwarded to get_raw itself.
df_test = get_raw('RAW', 'weather_raw', 'loading raw data', city_list)

In [0]:
# Show the DataFrame returned by the wrapper, including the load_run_id,
# created_on and created_by columns it added.
display(df_test)

coord,weather,base,main,visibility,wind,clouds,dt,sys,timezone,id,name,cod,load_run_id,created_on,created_by
"List(83.7667, 27.9833)","List(List(800, Clear, clear sky, 01d))",stations,"List(303.3, 301.78, 303.3, 303.3, 1007, 26, 1007, 923)",10000,"List(0.7, 13, 0.84)",List(10),1685929630,"List(NP, 1685921308, 1685971109)",20700,1282616,Wāliṅ,200,4439cf40-cc9b-49fa-999d-92223ef0abbf,2023-06-05T01:56:50.047+0000,Ananda Thakur
"List(84.5667, 27.7667)","List(List(800, Clear, clear sky, 01d))",stations,"List(301.26, 300.05, 301.26, 301.26, 1006, 23, 1006, 893)",10000,"List(0.2, 40, 0.79)",List(2),1685929546,"List(NP, 1685921145, 1685970888)",20700,1282621,Upardang Gadhi,200,4439cf40-cc9b-49fa-999d-92223ef0abbf,2023-06-05T01:56:50.047+0000,Ananda Thakur
"List(82.2973, 28.131)","List(List(804, Clouds, overcast clouds, 04d))",stations,"List(304.91, 303.02, 304.91, 304.91, 1006, 22, 1006, 934)",10000,"List(1.09, 78, 1.28)",List(91),1685929631,"List(NP, 1685921641, 1685971481)",20700,1282635,Tulsīpur,200,4439cf40-cc9b-49fa-999d-92223ef0abbf,2023-06-05T01:56:50.047+0000,Ananda Thakur
"List(84.5, 27.6333)","List(List(800, Clear, clear sky, 01d))",stations,"List(306.29, 304.48, 306.29, 306.29, 1006, 23, 1006, 984)",10000,"List(0.18, 16, 0.8)",List(2),1685929546,"List(NP, 1685921179, 1685970886)",20700,1282665,Tikoli,200,4439cf40-cc9b-49fa-999d-92223ef0abbf,2023-06-05T01:56:50.047+0000,Ananda Thakur
"List(81.1333, 28.5)","List(List(804, Clouds, overcast clouds, 04d))",stations,"List(305.46, 303.55, 305.46, 305.46, 1004, 22, 1004, 987)",10000,"List(2.05, 286, 2.98)",List(93),1685929632,"List(NP, 1685921871, 1685971810)",20700,1282666,Ṭikāpur,200,4439cf40-cc9b-49fa-999d-92223ef0abbf,2023-06-05T01:56:50.047+0000,Ananda Thakur


In [0]:
%sql
-- Inspect the audit rows that the update_log wrapper appended to log_table.
select * from log_table

id,load_type,table_name,process_start_time,process_end_time,status,comments,start_date_time,end_date_time,created_on,created_by
4439cf40-cc9b-49fa-999d-92223ef0abbf,RAW,weather_raw,2023-06-05T01:56:50.047+0000,2023-06-05T01:56:58.691+0000,COMPLETED,loading raw data,2023-06-05T01:45:46.000+0000,2023-06-05T01:47:12.000+0000,2023-06-05T01:56:50.047+0000,Ananda Thakur


In [0]:
%sql
-- Inspect the raw table. The wrapper writes in append mode, so rows from
-- earlier runs (other load_run_id values) are listed alongside the latest load.
select * from weather_raw;

coord,weather,base,main,visibility,wind,clouds,dt,sys,timezone,id,name,cod,load_run_id,created_on,created_by
"List(81.1333, 28.5)","List(List(804, Clouds, overcast clouds, 04d))",stations,"List(305.46, 303.55, 305.46, 305.46, 1004, 22, 1004, 987)",10000,"List(2.05, 286, 2.98)",List(93),1685929632,"List(NP, 1685921871, 1685971810)",20700,1282666,Ṭikāpur,200,64c512f5-2e08-4752-8739-9ab83c6e9e91,2023-06-05T01:47:10.401+0000,Ananda Thakur
"List(81.1333, 28.5)","List(List(804, Clouds, overcast clouds, 04d))",stations,"List(305.46, 303.55, 305.46, 305.46, 1004, 22, 1004, 987)",10000,"List(2.05, 286, 2.98)",List(93),1685929543,"List(NP, 1685921871, 1685971810)",20700,1282666,Ṭikāpur,200,9b686045-7c4c-4490-8485-f9c0dd3ae953,2023-06-05T01:50:30.299+0000,Ananda Thakur
"List(81.1333, 28.5)","List(List(804, Clouds, overcast clouds, 04d))",stations,"List(305.46, 303.55, 305.46, 305.46, 1004, 22, 1004, 987)",10000,"List(2.05, 286, 2.98)",List(93),1685929970,"List(NP, 1685921871, 1685971810)",20700,1282666,Ṭikāpur,200,890c0cf8-872c-4323-a2c6-048b2acaa891,2023-06-05T01:55:50.771+0000,Ananda Thakur
"List(81.1333, 28.5)","List(List(804, Clouds, overcast clouds, 04d))",stations,"List(305.46, 303.55, 305.46, 305.46, 1004, 22, 1004, 987)",10000,"List(2.05, 286, 2.98)",List(93),1685929632,"List(NP, 1685921871, 1685971810)",20700,1282666,Ṭikāpur,200,4439cf40-cc9b-49fa-999d-92223ef0abbf,2023-06-05T01:56:50.047+0000,Ananda Thakur
"List(82.2973, 28.131)","List(List(804, Clouds, overcast clouds, 04d))",stations,"List(304.91, 303.02, 304.91, 304.91, 1006, 22, 1006, 934)",10000,"List(1.09, 78, 1.28)",List(91),1685929546,"List(NP, 1685921641, 1685971481)",20700,1282635,Tulsīpur,200,9b686045-7c4c-4490-8485-f9c0dd3ae953,2023-06-05T01:50:30.299+0000,Ananda Thakur
"List(82.2973, 28.131)","List(List(804, Clouds, overcast clouds, 04d))",stations,"List(304.91, 303.02, 304.91, 304.91, 1006, 22, 1006, 934)",10000,"List(1.09, 78, 1.28)",List(91),1685930151,"List(NP, 1685921641, 1685971481)",20700,1282635,Tulsīpur,200,890c0cf8-872c-4323-a2c6-048b2acaa891,2023-06-05T01:55:50.771+0000,Ananda Thakur
"List(82.2973, 28.131)","List(List(804, Clouds, overcast clouds, 04d))",stations,"List(304.91, 303.02, 304.91, 304.91, 1006, 22, 1006, 934)",10000,"List(1.09, 78, 1.28)",List(91),1685929631,"List(NP, 1685921641, 1685971481)",20700,1282635,Tulsīpur,200,64c512f5-2e08-4752-8739-9ab83c6e9e91,2023-06-05T01:47:10.401+0000,Ananda Thakur
"List(82.2973, 28.131)","List(List(804, Clouds, overcast clouds, 04d))",stations,"List(304.91, 303.02, 304.91, 304.91, 1006, 22, 1006, 934)",10000,"List(1.09, 78, 1.28)",List(91),1685929631,"List(NP, 1685921641, 1685971481)",20700,1282635,Tulsīpur,200,4439cf40-cc9b-49fa-999d-92223ef0abbf,2023-06-05T01:56:50.047+0000,Ananda Thakur
"List(84.5667, 27.7667)","List(List(800, Clear, clear sky, 01d))",stations,"List(301.26, 300.05, 301.26, 301.26, 1006, 23, 1006, 893)",10000,"List(0.2, 40, 0.79)",List(2),1685929546,"List(NP, 1685921145, 1685970888)",20700,1282621,Upardang Gadhi,200,64c512f5-2e08-4752-8739-9ab83c6e9e91,2023-06-05T01:47:10.401+0000,Ananda Thakur
"List(84.5667, 27.7667)","List(List(800, Clear, clear sky, 01d))",stations,"List(301.26, 300.05, 301.26, 301.26, 1006, 23, 1006, 893)",10000,"List(0.2, 40, 0.79)",List(2),1685929542,"List(NP, 1685921145, 1685970888)",20700,1282621,Upardang Gadhi,200,9b686045-7c4c-4490-8485-f9c0dd3ae953,2023-06-05T01:50:30.299+0000,Ananda Thakur


#### Getting processed data from raw data

In [0]:
# The first three arguments (load type, target table, log comment) are consumed
# by the update_log wrapper; only df_test is forwarded to get_processed_data.
df_processed = get_processed_data('PROCESSED', 'weather_processed', 'processing the data', df_test)

In [0]:
# Show the flattened DataFrame returned by the wrapper, including the
# load_run_id, created_on and created_by columns it added.
display(df_processed)

dt,date,city_id,city_name,timezone,country,lat,lon,temp,temp_min,temp_max,pressure,humidity,visibility,wind_speed,wind_deg,wind_gust,clouds_all,load_run_id,created_on,created_by
1685929630,2023-06-05 01:47:10,1282616,Wāliṅ,20700,NP,27.9833,83.7667,303.3,303.3,303.3,1007,26,10000,0.7,13,0.84,10,2db409d6-4abf-467d-bf49-903b7016f602,2023-06-05T02:03:38.814+0000,Ananda Thakur
1685929546,2023-06-05 01:45:46,1282621,Upardang Gadhi,20700,NP,27.7667,84.5667,301.26,301.26,301.26,1006,23,10000,0.2,40,0.79,2,2db409d6-4abf-467d-bf49-903b7016f602,2023-06-05T02:03:38.814+0000,Ananda Thakur
1685929631,2023-06-05 01:47:11,1282635,Tulsīpur,20700,NP,28.131,82.2973,304.91,304.91,304.91,1006,22,10000,1.09,78,1.28,91,2db409d6-4abf-467d-bf49-903b7016f602,2023-06-05T02:03:38.814+0000,Ananda Thakur
1685929546,2023-06-05 01:45:46,1282665,Tikoli,20700,NP,27.6333,84.5,306.29,306.29,306.29,1006,23,10000,0.18,16,0.8,2,2db409d6-4abf-467d-bf49-903b7016f602,2023-06-05T02:03:38.814+0000,Ananda Thakur
1685929632,2023-06-05 01:47:12,1282666,Ṭikāpur,20700,NP,28.5,81.1333,305.46,305.46,305.46,1004,22,10000,2.05,286,2.98,93,2db409d6-4abf-467d-bf49-903b7016f602,2023-06-05T02:03:38.814+0000,Ananda Thakur


In [0]:
%sql
-- Re-inspect log_table after the processed load; it now holds one audit row
-- per decorated call.
select * from log_table

id,load_type,table_name,process_start_time,process_end_time,status,comments,start_date_time,end_date_time,created_on,created_by
2db409d6-4abf-467d-bf49-903b7016f602,PROCESSED,weather_processed,2023-06-05T02:03:38.814+0000,2023-06-05T02:03:47.499+0000,COMPLETED,processing the data,2023-06-05T01:45:46.000+0000,2023-06-05T01:47:12.000+0000,2023-06-05T02:03:38.814+0000,Ananda Thakur
4439cf40-cc9b-49fa-999d-92223ef0abbf,RAW,weather_raw,2023-06-05T01:56:50.047+0000,2023-06-05T01:56:58.691+0000,COMPLETED,loading raw data,2023-06-05T01:45:46.000+0000,2023-06-05T01:47:12.000+0000,2023-06-05T01:56:50.047+0000,Ananda Thakur


In [0]:
%sql
-- Inspect the flattened rows that the wrapper appended to weather_processed.
select * from weather_processed

dt,date,city_id,city_name,timezone,country,lat,lon,temp,temp_min,temp_max,pressure,humidity,visibility,wind_speed,wind_deg,wind_gust,clouds_all,load_run_id,created_on,created_by
1685929546,2023-06-05 01:45:46,1282621,Upardang Gadhi,20700,NP,27.7667,84.5667,301.26,301.26,301.26,1006,23,10000,0.2,40,0.79,2,2db409d6-4abf-467d-bf49-903b7016f602,2023-06-05T02:03:38.814+0000,Ananda Thakur
1685929632,2023-06-05 01:47:12,1282666,Ṭikāpur,20700,NP,28.5,81.1333,305.46,305.46,305.46,1004,22,10000,2.05,286,2.98,93,2db409d6-4abf-467d-bf49-903b7016f602,2023-06-05T02:03:38.814+0000,Ananda Thakur
1685929631,2023-06-05 01:47:11,1282635,Tulsīpur,20700,NP,28.131,82.2973,304.91,304.91,304.91,1006,22,10000,1.09,78,1.28,91,2db409d6-4abf-467d-bf49-903b7016f602,2023-06-05T02:03:38.814+0000,Ananda Thakur
1685929630,2023-06-05 01:47:10,1282616,Wāliṅ,20700,NP,27.9833,83.7667,303.3,303.3,303.3,1007,26,10000,0.7,13,0.84,10,2db409d6-4abf-467d-bf49-903b7016f602,2023-06-05T02:03:38.814+0000,Ananda Thakur
1685929546,2023-06-05 01:45:46,1282665,Tikoli,20700,NP,27.6333,84.5,306.29,306.29,306.29,1006,23,10000,0.18,16,0.8,2,2db409d6-4abf-467d-bf49-903b7016f602,2023-06-05T02:03:38.814+0000,Ananda Thakur
