# Silver -> Gold Transformations

## Retrieving the names of the files in the Silver layer

import boto3

# Initialize an S3 client
s3 = boto3.client('s3')

# Define the bucket name
bucket_name = 'de-upskill-weatherforecasting'
folder_name = 'Silver/'

# Use a paginator to handle large lists
paginator = s3.get_paginator('list_objects_v2')
parquet_files = set()  # Use a set to avoid duplicates

for page in paginator.paginate(Bucket=bucket_name):
    for obj in page.get('Contents', []):
        key = obj['Key']
        # Check if the key is in the Silver folder and ends with .parquet
        if key.startswith(folder_name) and key.endswith('.parquet'):
            # Remove the 'Silver/' prefix to get only the file name
            file_name = key.replace(folder_name, '').split('/')[0]
            parquet_files.add(file_name)

# Print the list of unique Parquet file names in the Silver folder
print("Parquet file names in the Silver folder:")
for file_name in parquet_files:
    print(file_name)


In [0]:
container_name = "de-upskilling-weather"
folder_name = "Silver/"

file_names = dbutils.fs.ls(f"/mnt/{container_name}/{folder_name}")

parquet_files = [file_info.name.replace(".parquet/", ".parquet") for file_info in file_names]


for file_name in parquet_files:
    print(file_name)

7day_daily_forecast.parquet
7day_hourly_forecast.parquet
Forecast_Data/
Forecasts/
Updates/
cities_dim.parquet
daily_historical.parquet
daily_historical_forecast.parquet
hourly_historical.parquet
hourly_historical_forecast.parquet
temp_forecasts.parquet


In [0]:
# Filter for file names that contain "daily"
daily_files = [file_name for file_name in parquet_files if "daily" in file_name]

# Print the filtered list
print("Parquet file names containing 'daily':")
for file_name in daily_files:
    print(file_name)

Parquet file names containing 'daily':
7day_daily_forecast.parquet
daily_historical.parquet
daily_historical_forecast.parquet


## Reading the cities dim table

In [0]:
df_cities = spark.read.parquet(f"/mnt/{container_name}/Silver/cities_dim.parquet")

df_cities_daily = df_cities.select("city", "country", "latitude", "longitude")

df_cities.show(10)

+------------+--------------+---------+-----------+---------+
|        city|       country| latitude|  longitude|time_zone|
+------------+--------------+---------+-----------+---------+
|      Vienna|       Austria|48.210033|  16.363449|    +1:00|
|   Ljubljana|      Slovenia|46.056946|  14.505751|    +1:00|
|      Denver| United States|39.676938|-104.977053|    -6:00|
|       Paris|        France|48.864716|   2.349014|    +1:00|
|      Zurich|   Switzerland|47.373878|   8.545094|    +1:00|
|      London|United Kingdom|51.509865|  -0.118092|    +0:00|
|      Berlin|       Germany|52.520008|  13.404954|    +1:00|
|Philadelphia| United States|39.952583| -75.165222|    -4:00|
|Indianapolis| United States|   39.791| -86.148003|    -4:00|
|   Nashville| United States|36.174465|  -86.76796|    -5:00|
+------------+--------------+---------+-----------+---------+
only showing top 10 rows



## Performing the transformations needed on the daily and hourly files

In [0]:
print("Parquet file names containing 'daily':")

for file_name in daily_files:
    
    df_daily = spark.read.parquet(f"/mnt/{container_name}/{folder_name}{file_name}")
    
    df_daily = df_daily.join(df_cities_daily, on=['latitude', 'longitude'])
    
    print(df_daily.printSchema())
    
    df_daily.write.mode("overwrite").parquet(f"/mnt/{container_name}/Gold/{file_name}")

Parquet file names containing 'daily':
root
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- weather_code: double (nullable = true)
 |-- temperature_2m_max: double (nullable = true)
 |-- temperature_2m_min: double (nullable = true)
 |-- apparent_temperature_max: double (nullable = true)
 |-- apparent_temperature_min: double (nullable = true)
 |-- precipitation_sum: double (nullable = true)
 |-- rain_sum: double (nullable = true)
 |-- showers_sum: double (nullable = true)
 |-- snowfall_sum: double (nullable = true)
 |-- wind_speed_10m_max: double (nullable = true)
 |-- wind_gusts_10m_max: double (nullable = true)
 |-- wind_direction_10m_dominant: double (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)

None
root
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- t

In [0]:
#deupskilling-weatherforecast

In [0]:
hourly_files = [file_name for file_name in parquet_files if "hourly" in file_name]
hourly_files

['7day_hourly_forecast.parquet',
 'hourly_historical.parquet',
 'hourly_historical_forecast.parquet']

### Create a new col with local time
### Write hourly dfs to gold

In [0]:
from pyspark.sql.functions import col, from_unixtime, expr, split
from pyspark.sql import functions as F

for file_name in hourly_files:
    
    df_hourly = spark.read.parquet(f"/mnt/{container_name}/{folder_name}{file_name}")
    
    df_hourly = df_hourly.join(df_cities, on=['latitude', 'longitude'])
    
    
    
    # Combine year, month, day, and time into a single datetime string
    df_with_datetime = df_hourly.withColumn(
        "datetime",
        F.concat_ws(
            " ",
            F.concat_ws("-", F.col("year"), F.lpad(F.col("month"), 2, "0"), F.lpad(F.col("day"), 2, "0")),
            F.col("time")
        )
    )

    # Convert the datetime string to UNIX timestamp
    df_with_unix_time = df_with_datetime.withColumn(
        "Unix_Time",
        F.unix_timestamp(F.col("datetime"), "yyyy-MM-dd HH:mm:ss")
    )

    
    
    # Split the offset into hours and minutes
    local_df = df_with_unix_time.withColumn("offset_seconds",
        (split(col("time_zone"), ":")[0].cast("int") * 3600) +  # Hours to seconds
        (split(col("time_zone"), ":")[1].cast("int") * 60 * expr("sign(split(time_zone, ':')[0])"))  # Minutes to seconds, adjust sign
    )

    # Convert UTC time to local time
    local_df = local_df.withColumn(
        "Local_Time",
        from_unixtime(col("Unix_Time") + col("offset_seconds"))
    )

    
    local_df.show(5)
    
    #local_df.write.mode("overwrite").partitionBy("city", "year").parquet(f"/mnt/{container_name}/Gold/{file_name}")
    local_df.write.mode("overwrite").parquet(f"/mnt/{container_name}/Gold/{file_name}")

+---------+-----------+------------------+--------------------+-------------------------+-------------+----+-------+--------+----------+------------+-----------------+------------------+------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+--------+-----+---+----+-------+-------------+---------+-------------------+----------+--------------+-------------------+
| latitude|  longitude|    temperature_2m|relative_humidity_2m|precipitation_probability|precipitation|rain|showers|snowfall|snow_depth|weather_code|   wind_speed_10m|    wind_speed_80m|   wind_speed_120m|   wind_speed_180m|wind_direction_10m|wind_direction_80m|wind_direction_120m|wind_direction_180m|    wind_gusts_10m|    time|month|day|year|   city|      country|time_zone|           datetime| Unix_Time|offset_seconds|         Local_Time|
+---------+-----------+------------------+--------------------+-------------------------+-------------+----+----

from pyspark.sql.functions import col, from_unixtime, expr, split
from pyspark.sql import functions as F

for file_name in hourly_files:
    
    df_hourly = spark.read.parquet(f"s3://{bucket_name}/{folder_name}{file_name}")
    
    df_hourly = df_hourly.join(df_cities, on=['latitude', 'longitude'])
    
    
    print(df_hourly.printSchema())
    
    df_hourly.show(5)
    
    #df_hourly.write.mode("overwrite").partitionBy("city", "year").parquet(f"s3://{bucket_name}/Gold/{file_name}")