# Data ETL
In this notebook, we will ingest weather data, apply some simple transformations, and save the result to a Databricks volume and a Delta table.
1. **Extract**: We will get the weather from the free [OpenWeatherMap](https://openweathermap.org/) API.
2. **Transform**: Next, we will select the fields we want and format the result as a table.
3. **Load**: Finally, we will save the result to a Delta table for later consumption.


In [0]:
%pip install openai --quiet

In [0]:
# Imports
import requests
import json
import pyspark.sql.functions as F
from pyspark.sql.functions import col

# Date/time helpers used to convert sunrise, sunset and the observation time to the NYC timezone
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
import time

In [0]:
# Read the OpenWeatherMap API key from the notebook widget named 'API_KEY'.
# NOTE(review): a widget value may be visible to anyone who can see the
# notebook/job parameters; a secret scope may be preferable — confirm.
API_KEY = dbutils.widgets.get('API_KEY')

## Extract

In [0]:
# Creating a class to modularize our code

class Weather:
    """Small client for the OpenWeatherMap ``/data/2.5/weather`` endpoint.

    Attributes:
        API_KEY: API key sent with every request.
        city, country, units: Arguments of the most recent
            ``get_weather`` call (set by that method).
    """

    def __init__(self, API_KEY):
        """Store the API key used to authenticate requests."""
        self.API_KEY = API_KEY

    def get_weather(self, city, country, units='imperial', timeout=10):
        """Fetch weather data for a city and return the parsed JSON.

        Args:
            city: City name, e.g. ``'New York'``.
            country: Country code, e.g. ``'US'``.
            units: Value of the API's ``units`` query parameter.
            timeout: Seconds to wait for the HTTP request before giving up.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: If the API answers with a status code other than 200.
        """
        self.city = city
        self.country = country
        self.units = units

        # Use the key stored on this instance rather than a notebook-level
        # variable, so the class works whatever the instance is called.
        # Passing `params` lets requests URL-encode values such as the space
        # in 'New York'.
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={
                'q': f"{city},{country}",
                'APPID': self.API_KEY,
                'units': units,
            },
            timeout=timeout,
        )

        # Anything other than HTTP 200 is reported with the API's own message
        if response.status_code != 200:
            raise Exception(f"Error: {response.status_code} - {response.text}")
        return response.json()

In [0]:
# Build the API client with the key read from the notebook widget
w = Weather(API_KEY=API_KEY)

# Fetch the weather data for New York (units default to 'imperial') and
# leave the parsed JSON as the last expression so the notebook displays it
nyc = w.get_weather(city='New York', country='US')
nyc

{'coord': {'lon': -74.006, 'lat': 40.7143},
 'weather': [{'id': 804,
   'main': 'Clouds',
   'description': 'overcast clouds',
   'icon': '04d'}],
 'base': 'stations',
 'main': {'temp': 54.14,
  'feels_like': 53.44,
  'temp_min': 51.76,
  'temp_max': 56.26,
  'pressure': 992,
  'humidity': 89,
  'sea_level': 992,
  'grnd_level': 993},
 'visibility': 10000,
 'wind': {'speed': 21.85, 'deg': 270, 'gust': 37.98},
 'clouds': {'all': 100},
 'dt': 1766161441,
 'sys': {'type': 1,
  'id': 4610,
  'country': 'US',
  'sunrise': 1766146541,
  'sunset': 1766179850},
 'timezone': -18000,
 'id': 5128581,
 'name': 'New York',
 'cod': 200}

## Transform

In [0]:
def _epoch_to_zone(epoch_seconds, tz):
    """Convert a Unix timestamp in seconds to an aware datetime in ``tz``."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).astimezone(tz)


# Pull the fields of interest out of the API response
city_id = nyc['id']  # not named `id`, which would shadow the Python builtin
timestamp = nyc['dt']
weather = nyc['weather'][0]['main']
temp = nyc['main']['temp']
tmin = nyc['main']['temp_min']
tmax = nyc['main']['temp_max']
country = nyc['sys']['country']
city = nyc['name']
sunrise = nyc['sys']['sunrise']
sunset = nyc['sys']['sunset']

# Observation time, sunrise and sunset arrive as epoch seconds; express them
# in the New York timezone and store them as strings
target_timezone = ZoneInfo("America/New_York")
sunrise_nyc = str(_epoch_to_zone(sunrise, target_timezone).time())  # time of day only
sunset_nyc = str(_epoch_to_zone(sunset, target_timezone).time())  # time of day only
time_nyc = str(_epoch_to_zone(timestamp, target_timezone))  # full date, time and UTC offset

# Create a single-row DataFrame from the variables
df = spark.createDataFrame(
    [[city_id, time_nyc, weather, temp, tmin, tmax, country, city, sunrise_nyc, sunset_nyc]],
    schema=['id', 'timestamp', 'weather', 'temp', 'tmin', 'tmax', 'country', 'city', 'sunrise', 'sunset'],
)

display(df)

id,timestamp,weather,temp,tmin,tmax,country,city,sunrise,sunset
5128581,2025-12-19 00:13:32-05:00,Clouds,48.02,43.57,49.95,US,New York,07:15:41,16:30:50


In [0]:
from openai import OpenAI

# Read the OpenAI key from the notebook widget and build the client with it
OPENAI_API_KEY = dbutils.widgets.get('OPENAI_API_KEY')
client = OpenAI(api_key=OPENAI_API_KEY)

# Prompt pieces: a fixed role description plus the weather values extracted above
system_instructions = "You are a weatherman that gives suggestions about how to dress based on the weather. Answer in one sentence."
user_prompt = f"The weather is {weather}, with max temperature {tmax} and min temperature {tmin}. How should I dress?"

# Ask the model for a one-sentence clothing suggestion
response = client.responses.create(
    model="gpt-4o-mini",
    instructions=system_instructions,
    input=user_prompt,
)
suggestion = response.output_text

# Attach the suggestion to the DataFrame as a constant column and show it
df = df.withColumn('suggestion', F.lit(suggestion))
display(df)

id,timestamp,weather,temp,tmin,tmax,country,city,sunrise,sunset,suggestion
5128581,2025-12-19 00:13:32-05:00,Clouds,48.02,43.57,49.95,US,New York,07:15:41,16:30:50,"Dress in layers with a warm sweater or light jacket, pants, and closed-toe shoes to stay comfortable in the cool, cloudy weather."


## Load

Let's create the catalog, schemas, and volume that will hold the weather data we get from the API.

In [0]:
%sql
-- Create the catalog that the pipeline's schemas, volume and table live under.
-- IF NOT EXISTS: nothing is created when the catalog is already there.
CREATE CATALOG IF NOT EXISTS pipeline_weather
COMMENT 'This is the catalog for the weather pipeline';

In [0]:
%sql
-- Create the `lakehouse` schema inside the pipeline_weather catalog;
-- the raw_data volume is created in this schema.
CREATE SCHEMA IF NOT EXISTS pipeline_weather.lakehouse
COMMENT 'This is the schema for the weather pipeline';

In [0]:
%sql
-- Create the `raw_data` volume; the JSON output of each run is written under
-- /Volumes/pipeline_weather/lakehouse/raw_data/
CREATE VOLUME IF NOT EXISTS pipeline_weather.lakehouse.raw_data
COMMENT 'This is the raw data volume for the weather pipeline';

In [0]:
%sql
-- Create the `silver` schema that holds the transformed data
-- (the pipeline_weather.silver.weather table is saved into it).
CREATE SCHEMA IF NOT EXISTS pipeline_weather.silver
COMMENT 'This is the schema for the weather pipeline';

Let's save the DataFrame as JSON in the raw data volume.

In [0]:
# Timestamp of this run, used to give each write its own path
stamp = f"{datetime.now():%Y-%m-%d_%H-%M-%S}"

# Target location inside the raw_data volume
json_path = f'/Volumes/pipeline_weather/lakehouse/raw_data/weather_{stamp}.json'

# Write the DataFrame as JSON at that location
json_writer = df.write.mode('append')
json_writer.json(json_path)


Next, we can save the transformed data into a Delta table.

In [0]:
# Append the transformed data to the Delta table in the silver schema
delta_writer = df.write.format('delta').mode("append")
delta_writer.saveAsTable('pipeline_weather.silver.weather')

In [0]:
%sql
-- Read back every row of the silver weather table to check what has been loaded
SELECT *
FROM pipeline_weather.silver.weather

id,timestamp,weather,temp,tmin,tmax,country,city,sunrise,sunset,suggestion
5128581,2025-12-19 01:59:23-05:00,Clouds,50.79,44.58,52.14,US,New York,07:15:41,16:30:50,"Dress in layers with a light sweater or long-sleeve shirt, paired with a comfortable jacket, and consider wearing pants and closed-toe shoes to stay warm."
5128581,2025-12-19 06:56:11-05:00,Rain,53.71,51.93,55.98,US,New York,07:15:41,16:30:50,"Dress in a waterproof rain jacket, along with a warm sweater or long sleeves, and wear waterproof shoes to keep comfortable in the wet and cool conditions."
5128581,2025-12-19 00:59:56-05:00,Mist,49.26,44.96,50.95,US,New York,07:15:41,16:30:50,"Wear a warm, waterproof jacket, long pants, and closed-toe shoes, and consider layering with a sweater to stay comfortable in the misty conditions."
5128581,2025-12-19 05:56:51-05:00,Rain,53.56,51.93,55.96,US,New York,07:15:41,16:30:50,"Wear a waterproof jacket, long pants, and a pair of waterproof shoes, along with layers to keep warm in the cool, rainy conditions."
5128581,2025-12-19 00:34:20-05:00,Mist,48.56,44.96,50.22,US,New York,07:15:41,16:30:50,"Dress in layers with a light to medium jacket, long pants, and consider a scarf or hat to stay warm in the misty conditions."
5128581,2025-12-19 00:13:32-05:00,Clouds,48.02,43.57,49.95,US,New York,07:15:41,16:30:50,"Dress in layers with a warm sweater or light jacket, pants, and closed-toe shoes to stay comfortable in the cool, cloudy weather."
5128581,2025-12-19 08:00:12-05:00,Rain,55.83,53.55,57.58,US,New York,07:15:41,16:30:50,"Dress in a waterproof jacket, long sleeves, and wear waterproof shoes, as temperatures are cool and rain is expected."
5128581,2025-12-19 10:59:44-05:00,Rain,55.54,53.02,57.54,US,New York,07:15:41,16:30:50,"Dress in a waterproof jacket, long pants, and wear waterproof shoes to stay dry and comfortable in the rainy weather."
5128581,2025-12-19 08:59:24-05:00,Rain,54.7,52.97,57.45,US,New York,07:15:41,16:30:50,"Dress in a waterproof jacket, long pants, and wear comfortable, closed-toe shoes to stay warm and dry in the rain."
5128581,2025-12-19 02:57:16-05:00,Rain,52.34,48.58,54.27,US,New York,07:15:41,16:30:50,"Wear a waterproof jacket, layered clothing, and sturdy waterproof shoes to stay warm and dry in the rain."
