In [1]:
%load_ext autoreload
%autoreload 2
import boto3
import requests
import urllib.request
import httpx
import os
import pandas as pd
import pyspark
import json
import pyarrow
import logging

from GetWeather import Import_Weather_Data
from SetupWeatherData import Setup_Weather_Data
from Upload_Weather_Data import UploadWeatherData
from ExportWeatherData import Export_Weather_Data
from SaveWeatherTables import Save_Weather_Tables

from dotenv import load_dotenv

from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException
from pyspark.sql import Row
from pyspark.sql.window import Window
from py4j.protocol import Py4JJavaError
from datetime import datetime, timedelta
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, BooleanType

In [2]:
logging.basicConfig(
    level = logging.INFO,
    format= '%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

In [3]:
# Hello Github

In [4]:
# Load my .env files for API keys

# Get Current Directory
current_directory = os.getcwd()

# Load my scripts for API keys
#base_dir = os.path.abspath(os.path.join(current_directory, "../../sensitive_data"))
#print(base_dir)
base_dir = "/home/cephuez/sensitive_data"

# Access my API info
weather_env_path = os.path.join(base_dir, "weather_api.env")
aws_env_path = os.path.join(base_dir, "aws_info.env")
google_env_path = os.path.join(base_dir, "google_info.env")
azure_env_path = os.path.join(base_dir, "azure_info.env")

load_dotenv(dotenv_path=weather_env_path)
load_dotenv(dotenv_path=aws_env_path)
load_dotenv(dotenv_path=google_env_path)
load_dotenv(dotenv_path=azure_env_path)

True

In [5]:
spark = SparkSession.builder.appName("Weather_Session").getOrCreate()

# Set up some log levels. I'll renable them if I want to check for some performance checks
spark.sparkContext.setLogLevel("WARN")
spark.sparkContext.setLogLevel("ERROR")

25/06/24 16:21:08 WARN Utils: Your hostname, DESKTOP-J91G8VC resolves to a loopback address: 127.0.1.1; using 172.19.120.149 instead (on interface eth0)
25/06/24 16:21:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/24 16:21:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/24 16:21:10 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [26]:
# I want to make a timestamp of when I gathered my data.
# timestamp = "Year-Month-Day_Hour-Minute
timestamp = datetime.now().strftime("%Y-%B-%d_%H-%M")

# Keep track of timestamp
weather_data = Import_Weather_Data(timestamp)

# Gets the filename
# filename are named based on timestamp
# filename= "weather_data/50_City + filename"
filename = weather_data.get_filename()

In [25]:
# Obtain the weather data info by making a request through OpenWeatherMap's API
setup_weather_data = Setup_Weather_Data(spark, filename, timestamp)

# Get Dataframe created json from API
df = setup_weather_data.get_data_frame()

In [28]:
# Get weather result directory name
# weather_result_filename = "weather_results/50_City_Results + timestamp"
weather_result_filename = setup_weather_data.get_weather_result_filename()

# Stop showing warning logs in console when pushing data to Azure
logging.getLogger('azure').setLevel(logging.WARNING)

In [31]:
# Make sure to pass in directory name where tables will be saved
export_data = Export_Weather_Data(weather_result_filename)

In [32]:
# Pass timestamp to use when saving results based on date retrieved
save_tables = Save_Weather_Tables(timestamp)

In [33]:
# Create location DFx
location_df = df.select(
    df["id"].alias("ID"),
    df["name"].alias("City"),
    df["sys.country"].alias("Country"),
    df["coord.lat"].alias("Latitude"),
    df["coord.lon"].alias("Longitude")
).orderBy("ID")

# Save a copy of this "Location_Table"
save_tables.store_tables(location_df, "Location_Table")

Make Directory


In [34]:
# Create Temperature & Pressure Table
temperature_df = df.select(
    df["id"].alias("City_ID"),
    df["main.temp"].alias("Temp"),
    df["main.temp_max"].alias("Temp_Max"),
    df["main.temp_min"].alias("Temp_Min"),
    df["main.feels_like"].alias("Feels_Like"),
    df["main.humidity"].alias("Humidity"),
    df["main.pressure"].alias("Pressure"),
    df["main.sea_level"].alias("Sea_Level")
).orderBy("City_ID")

# Save a copy of this "Temperature_Table"
save_tables.store_tables(temperature_df, "Temperature_Table")

Make Directory


In [35]:
# Create Wind & Clouds Table
wind_df = df.select(
    df["id"].alias("City_ID"),
    df["clouds.all"].alias("Cloudiness_Percentage"),
    df["wind.deg"].alias("Wind_Direction_Degree"),
    df["wind.gust"].alias("Gust_Speed"),
    df["wind.speed"].alias("Wind_Speed")
).orderBy("City_ID")

# Save a copy of this "Wind_Cloud_Table"
save_tables.store_tables(wind_df, "Wind_Cloud_Table")

Make Directory


In [36]:
# Create Weather Description
weather_desc_df = df.select(
    df["id"].alias("City_ID"),
    df["weather"][0]["main"].alias("Main_Weather"),
    df["weather"][0]["description"].alias("Description"),
    df["weather"][0]["icon"].alias("Icon")
).orderBy("City_ID")

# Save a copy of this "Weather_Description_Table"
save_tables.store_tables(weather_desc_df, "Weather_Description_Table")

Make Directory


In [37]:
# Sunrise_Sunset_Table
sunrise_sunset_df = df.select(
    df["id"].alias("City_ID"),
    df["sys.sunrise"].alias("Sunrise"),
    df["sys.sunset"].alias("Sunset"),
    df["timezone"].alias("Timezone")
).orderBy("City_ID")

# Save a copy of this "Sunrise_Sunset_Table"
save_tables.store_tables(sunrise_sunset_df, "Sunrise_Sunset_Table")

Make Directory


In [39]:
# Save current tables
# Save a copy of the tables into the cloud service
save_tables.store_into_cloud()

AWS Table: weather_tables/2025-June-24_19-11/Temperature_Table.parquet uploaded
AWS Table: weather_tables/2025-June-24_19-11/Temperature_Table.json uploaded
AWS Table: weather_tables/2025-June-24_19-11/Weather_Description_Table.csv uploaded
AWS Table: weather_tables/2025-June-24_19-11/Location_Table.json uploaded
AWS Table: weather_tables/2025-June-24_19-11/Location_Table.parquet uploaded
AWS Table: weather_tables/2025-June-24_19-11/Wind_Cloud_Table.parquet uploaded
AWS Table: weather_tables/2025-June-24_19-11/Weather_Description_Table.json uploaded
AWS Table: weather_tables/2025-June-24_19-11/Sunrise_Sunset_Table.json uploaded
AWS Table: weather_tables/2025-June-24_19-11/Wind_Cloud_Table.csv uploaded
AWS Table: weather_tables/2025-June-24_19-11/Sunrise_Sunset_Table.csv uploaded
AWS Table: weather_tables/2025-June-24_19-11/Sunrise_Sunset_Table.parquet uploaded
AWS Table: weather_tables/2025-June-24_19-11/Temperature_Table.csv uploaded
AWS Table: weather_tables/2025-June-24_19-11/Locati

In [18]:
# Which cities have the longest daylight duration?
# Convert the time into readable time. Order by daylight hour
top_10_cities = sunrise_sunset_df.select(
    col("City_ID"), 
    date_format(from_unixtime(col("Sunrise") + col("Timezone")),"HH:mm:ss").alias("Sunrise"),
    date_format(from_unixtime(col("Sunset") + col("Timezone")),"HH:mm:ss").alias("Sunset"), 
    round(((col("Sunset") - col("Sunrise")) / 3600),2).alias("Daylight_Hours"),
    col("Timezone")).orderBy(col("Daylight_Hours").desc()).limit(10)

final_table = top_10_cities.join(location_df, top_10_cities["City_ID"] == location_df["ID"]
                ).select(location_df["ID"], 
                         location_df["City"], 
                         location_df["Country"], 
                         top_10_cities["Sunrise"], 
                         top_10_cities["Sunset"], 
                         top_10_cities["Daylight_Hours"], 
                         top_10_cities["Timezone"]
                ).orderBy(col("Daylight_Hours").desc())
#final_table.show()

final_table.createOrReplaceTempView("Final_Table")

query = '''
        SELECT ID, CITY, COUNTRY, SUNRISE, SUNSET, DAYLIGHT_HOURS, TIMEZONE, RANK()OVER(ORDER BY DAYLIGHT_HOURS DESC) RANK
        FROM FINAL_TABLE
    '''
result = spark.sql(query)
result.show()


data = result.toPandas()

export_data.to_parquet(data, 'Longest_Daytime.parquet')
export_data.to_csv(data, 'Longest_Daytime.csv')
export_data.to_json(data, 'Longest_Daytime.json')

+-------+----------+-------+--------+--------+--------------+--------+----+
|     ID|      CITY|COUNTRY| SUNRISE|  SUNSET|DAYLIGHT_HOURS|TIMEZONE|RANK|
+-------+----------+-------+--------+--------+--------------+--------+----+
|3413829| Reykjavik|     IS|20:56:47|18:03:09|         21.11|       0|   1|
| 658225|  Helsinki|     FI|21:55:36|16:50:03|         18.91|   10800|   2|
|3143244|      Oslo|     NO|21:55:19|16:43:51|         18.81|    7200|   3|
|2673730| Stockholm|     SE|21:32:22|16:08:14|          18.6|    7200|   4|
| 524901|    Moscow|     RU|21:45:53|15:18:18|         17.54|   10800|   5|
|2618425|Copenhagen|     DK|22:26:42|15:57:55|         17.52|    7200|   6|
|2964574|    Dublin|     IE|22:57:37|15:57:17|         16.99|    3600|   7|
|2950159|    Berlin|     DE|22:44:18|15:33:33|         16.82|    7200|   8|
|2759794| Amsterdam|     NL|23:19:17|16:06:45|         16.79|    7200|   9|
| 756135|    Warsaw|     PL|22:15:37|15:01:25|         16.76|    7200|  10|
+-------+---

In [19]:
# Which city has the highest difference between actual temperature and feels-like temperature?
# temperature_df
top_10_cities = temperature_df.select(
    col("City_ID"), 
    col("Temp"), 
    col("Feels_Like"), 
    round(abs(col("Temp") - col("Feels_Like")),2).alias("Difference")
        ).orderBy(col("Difference").desc()).limit(10)
#top_10_cities.show()

final_top_10_cities_temperature = top_10_cities.join(location_df, top_10_cities["City_ID"] == location_df["ID"]).select(
    top_10_cities["City_ID"], 
    location_df["City"], 
    location_df["Country"], 
    top_10_cities["Temp"], 
    top_10_cities["Feels_Like"], 
    top_10_cities["Difference"]
        ).orderBy(col("Difference").desc())
#final_top_10_cities_temperature.show()

final_top_10_cities_temperature.createOrReplaceTempView("Final_Table")

query = '''
        SELECT CITY_ID, CITY, COUNTRY, TEMP, FEELS_LIKE, DIFFERENCE, RANK()OVER(ORDER BY DIFFERENCE DESC) RANK
        FROM FINAL_TABLE
    '''

final_result = spark.sql(query)
final_result.show()

data = final_result.toPandas()

export_data.to_parquet(data, 'Temperature_Feel_Like_Temperature_Diff.parquet')
export_data.to_csv(data, 'Temperature_Feel_Like_Temperature_Diff.csv')
export_data.to_json(data, 'Temperature_Feel_Like_Temperature_Diff.json')

+-------+---------+-------+-----+----------+----------+----+
|CITY_ID|     CITY|COUNTRY| TEMP|FEELS_LIKE|DIFFERENCE|RANK|
+-------+---------+-------+-----+----------+----------+----+
|6167865|  Toronto|     CA|33.86|     39.84|      5.98|   1|
|1880252|Singapore|     SG|28.82|     34.66|      5.84|   2|
|1275339|   Mumbai|     IN|27.99|     32.54|      4.55|   3|
|1581130|    Hanoi|     VN| 27.0|     30.95|      3.95|   4|
| 292223|    Dubai|     AE|31.96|     35.56|       3.6|   5|
|1701668|   Manila|     PH| 26.7|     29.98|      3.28|   6|
|1609350|  Bangkok|     TH|27.94|     30.81|      2.87|   7|
|4887398|  Chicago|     US|30.56|     33.06|       2.5|   8|
|5128581| New York|     US|37.92|     40.36|      2.44|   9|
|  98182|  Baghdad|     IQ|32.95|     30.56|      2.39|  10|
+-------+---------+-------+-----+----------+----------+----+



In [20]:
weather_uploader = UploadWeatherData(weather_result_filename)

In [21]:
# Upload to AWS
weather_uploader.upload_to_AWS()

AWS Result: weather_results/50_City_Results_2025-June-24_16-21/Temperature_Feel_Like_Temperature_Diff.parquet uploaded
AWS Result: weather_results/50_City_Results_2025-June-24_16-21/Longest_Daytime.csv uploaded
AWS Result: weather_results/50_City_Results_2025-June-24_16-21/Longest_Daytime.parquet uploaded
AWS Result: weather_results/50_City_Results_2025-June-24_16-21/Longest_Daytime.json uploaded
AWS Result: weather_results/50_City_Results_2025-June-24_16-21/Temperature_Feel_Like_Temperature_Diff.json uploaded
AWS Result: weather_results/50_City_Results_2025-June-24_16-21/Temperature_Feel_Like_Temperature_Diff.csv uploaded


In [22]:
weather_uploader.upload_to_Google()

Google Result: weather_results/50_City_Results_2025-June-24_16-21/Temperature_Feel_Like_Temperature_Diff.parquet
Google Result: weather_results/50_City_Results_2025-June-24_16-21/Longest_Daytime.csv
Google Result: weather_results/50_City_Results_2025-June-24_16-21/Longest_Daytime.parquet
Google Result: weather_results/50_City_Results_2025-June-24_16-21/Longest_Daytime.json
Google Result: weather_results/50_City_Results_2025-June-24_16-21/Temperature_Feel_Like_Temperature_Diff.json
Google Result: weather_results/50_City_Results_2025-June-24_16-21/Temperature_Feel_Like_Temperature_Diff.csv


In [23]:
weather_uploader.upload_to_Azure()

Azure Result: weather_results/50_City_Results_2025-June-24_16-21/Temperature_Feel_Like_Temperature_Diff.parquet uploaded
Azure Result: weather_results/50_City_Results_2025-June-24_16-21/Longest_Daytime.csv uploaded
Azure Result: weather_results/50_City_Results_2025-June-24_16-21/Longest_Daytime.parquet uploaded
Azure Result: weather_results/50_City_Results_2025-June-24_16-21/Longest_Daytime.json uploaded
Azure Result: weather_results/50_City_Results_2025-June-24_16-21/Temperature_Feel_Like_Temperature_Diff.json uploaded
Azure Result: weather_results/50_City_Results_2025-June-24_16-21/Temperature_Feel_Like_Temperature_Diff.csv uploaded
