In [139]:
import os
%load_ext autoreload
%autoreload 2
import pandas as pd
import pyspark
import json
import pyarrow
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row
from py4j.protocol import Py4JJavaError
from datetime import datetime, timedelta

from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, BooleanType

os.environ['SPARK_HOME'] = 'C:/Users/saul2/Spark_DF/spark-3.5.5-bin-hadoop3'
os.environ['HADOOP_HOME'] = 'C:/Users/saul2/Spark_DF/spark-3.5.5-bin-hadoop3'
os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_202'

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [5]:
spark = SparkSession.builder.appName("Weather_Session").getOrCreate()

In [224]:
path = "C:/Users/saul2/OneDrive/Desktop/PastProjects/ETL_Project/Final_ETL_Test/50_City_20250517_180038.json"
try:
    df = spark.read.json(path)
except AnalysisException  as e:
    # Path does not exists
    print(e)

In [275]:
#df.printSchema()
output_dir = "clean_data"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

2025_05_18_15_07_01


In [189]:
row = df.first()

In [191]:
print("Temperature:", row["main"])

Temperature: Row(feels_like=24.86, grnd_level=998, humidity=57, pressure=999, sea_level=999, temp=24.83, temp_max=26.08, temp_min=24.41)


In [193]:
for column, value in row.asDict().items():
    if isinstance(value, Row):
        print(f"\n{column}:")
        for subfield, subvalue in value.asDict().items():
            print(f"  {subfield}: {subvalue}")
    elif isinstance(value, list):
        print(f"\n{column}:")
        for i, item in enumerate(value):
            if isinstance(item, Row):
                print(f"  Item {i}:")
                for subfield, subvalue in item.asDict().items():
                    print(f"    {subfield}: {subvalue}")
            else:
                print(f"  {item}")
    else:
        print(f"\n{column}: {value}")


base: stations

clouds:
  all: 47

cod: 200

coord:
  lat: 40.7143
  lon: -74.006

dt: 1747529148

id: 5128581

main:
  feels_like: 24.86
  grnd_level: 998
  humidity: 57
  pressure: 999
  sea_level: 999
  temp: 24.83
  temp_max: 26.08
  temp_min: 24.41

name: New York

rain: None

sys:
  country: US
  id: 2080163
  sunrise: 1747474621
  sunset: 1747526884
  type: 2

timezone: -14400

visibility: 10000

weather:
  Item 0:
    description: scattered clouds
    icon: 03n
    id: 802
    main: Clouds

wind:
  deg: 274
  gust: 11.22
  speed: 6.01


In [245]:
# Create Tables
# Create location DF
location_df = df.select(
    df["id"].alias("ID"),
    df["name"].alias("City"),
    df["sys.country"].alias("Country"),
    df["coord.lat"].alias("Latitude"),
    df["coord.lon"].alias("Longitude")
).orderBy("ID")
#location_df.show()

pandas_df = location_df.toPandas()
path = os.path.join(output_dir, 'Location_Table.parquet')
pandas_df.to_parquet(path, engine='pyarrow') 
path = os.path.join(output_dir, 'Location_Table.csv')
pandas_df.to_csv(path, index=False)

+-------+------------+-------+--------+---------+
|     ID|        City|Country|Latitude|Longitude|
+-------+------------+-------+--------+---------+
|  98182|     Baghdad|     IQ| 33.3406|  44.4009|
| 108410|      Riyadh|     SA| 24.6877|  46.7219|
| 112931|      Tehran|     IR| 35.6944|  51.4215|
| 184745|     Nairobi|     KE| -1.2833|  36.8167|
| 264371|      Athens|     GR| 37.9795|  23.7162|
| 292223|       Dubai|     AE| 25.2582|  55.3047|
| 293397|    Tel Aviv|     IL| 32.0809|  34.7806|
| 360630|       Cairo|     EG| 30.0626|  31.2497|
| 524901|      Moscow|     RU| 55.7522|  37.6156|
| 658225|    Helsinki|     FI| 60.1695|  24.9355|
| 745044|    Istanbul|     TR| 41.0138|  28.9497|
| 756135|      Warsaw|     PL| 52.2298|  21.0118|
|1275339|      Mumbai|     IN| 19.0144|  72.8479|
|1581130|       Hanoi|     VN| 21.0245| 105.8412|
|1609350|     Bangkok|     TH|   13.75| 100.5167|
|1642911|     Jakarta|     ID| -6.2146| 106.8451|
|1668341|      Taipei|     TW| 25.0478| 121.5319|


In [249]:
# Create Temperature & Pressure Table
temperature_df = df.select(
    df["id"].alias("City_ID"),
    df["main.temp"].alias("Temp"),
    df["main.temp_max"].alias("Temp_Max"),
    df["main.temp_min"].alias("Temp_Min"),
    df["main.feels_like"].alias("Feels_Like"),
    df["main.humidity"].alias("Humidity"),
    df["main.pressure"].alias("Pressure"),
    df["main.sea_level"].alias("Sea_Level")
).orderBy("City_ID")
#temperature_df.show()

pandas_df = temperature_df.toPandas()
path = os.path.join(output_dir, 'Temperature_Table.parquet')
pandas_df.to_parquet(path, engine='pyarrow') 
path = os.path.join(output_dir, 'Temperature_Table.csv')
pandas_df.to_csv(path, index=False)

In [251]:
# Create Wind & Clouds Table
wind_df = df.select(
    df["id"].alias("City_ID"),
    df["clouds.all"].alias("Cloudiness_Percentage"),
    df["wind.deg"].alias("Wind_Direction_Degree"),
    df["wind.gust"].alias("Gust_Speed"),
    df["wind.speed"].alias("Wind_Speed")
).orderBy("City_ID")
#wind_df.show()
pandas_df = wind_df.toPandas()
path = os.path.join(output_dir, 'Cloud_Wind_Table.parquet')
pandas_df.to_parquet(path, engine='pyarrow') 
path = os.path.join(output_dir, 'Cloud_Wind_Table.csv')
pandas_df.to_csv(path, index=False)

In [253]:
# Create Weather Description
weather_desc_df = df.select(
    df["id"].alias("City_ID"),
    df["weather"][0]["main"].alias("Main_Weather"),
    df["weather"][0]["description"].alias("Description"),
    df["weather"][0]["icon"].alias("Icon")
).orderBy("City_ID")
#weather_desc_df.show()

pandas_df = weather_desc_df.toPandas()
path = os.path.join(output_dir, 'Weather_Description_Table.parquet')
pandas_df.to_parquet(path, engine='pyarrow') 
path = os.path.join(output_dir, 'Weather_Description_Table.csv')
pandas_df.to_csv(path, index=False)

In [263]:
# Sunrise_Sunset_Table
sunrise_sunset_df = df.select(
    df["id"].alias("City_ID"),
    df["sys.sunrise"].alias("Sunrise"),
    df["sys.sunset"].alias("Sunset"),
    df["timezone"].alias("Timezone")
).orderBy("City_ID")
sunrise_sunset_df.show()
#pandas_df = sunrise_sunset_df.toPandas()
#path = os.path.join(output_dir, 'Sunrise_Sunset_Table.parquet')
#pandas_df.to_parquet(path, engine='pyarrow') 
#path = os.path.join(output_dir, 'Sunrise_Sunset_Table.csv')
#pandas_df.to_csv(path, index=False)

+-------+----------+----------+--------+
|City_ID|   Sunrise|    Sunset|Timezone|
+-------+----------+----------+--------+
|  98182|1747533631|1747583843|   10800|
| 108410|1747534087|1747582272|   10800|
| 112931|1747531633|1747582471|   12600|
| 184745|1747538855|1747582259|   10800|
| 264371|1747537953|1747589450|   10800|
| 292223|1747531967|1747580273|   14400|
| 293397|1747536099|1747585992|   10800|
| 360630|1747537193|1747586593|   10800|
| 524901|1747530767|1747589964|   10800|
| 658225|1747532059|1747594758|   10800|
| 745044|1747536226|1747588664|   10800|
| 756135|1747535801|1747592899|    7200|
|1275339|1747528397|1747575421|   19800|
|1581130|1747520281|1747567700|   25200|
|1609350|1747522255|1747568282|   25200|
|1642911|1747522468|1747565031|   25200|
|1668341|1747516099|1747564350|   28800|
|1701668|1747517265|1747563448|   28800|
|1735161|1747522908|1747567067|   28800|
|1816670|1747515428|1747567486|   28800|
+-------+----------+----------+--------+
only showing top

In [238]:
# Change Sunrise_Sunset_Table
