In [0]:
%pip install -q -U google-genai --quiet

In [0]:

# Imports
import requests
import json
import pyspark.sql.functions as F
from pyspark.sql.functions import col

# Transform sunrise and sunset to datetime in NYC timezone
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
import time
import uuid

In [0]:
# Read the OpenWeatherMap API key from the notebook widget named 'API_KEY'
API_KEY = dbutils.widgets.get("API_KEY")

In [0]:
class Weather:
    """Small client for the OpenWeatherMap current-weather endpoint.

    The instance keeps the API key given to the constructor and, after each
    call to ``get_weather``, the city / country / units of that call.
    """

    # Endpoint queried by get_weather
    BASE_URL = "https://api.openweathermap.org/data/2.5/weather"

    def __init__(self, API_KEY):
        """Store the API key that is sent with every request."""
        self.API_KEY = API_KEY

    def get_weather(self, city, country, units='imperial', timeout=30):
        """Fetch the current weather for ``city, country`` and return the parsed JSON.

        Args:
            city: City name placed in the ``q`` query parameter.
            country: Country code appended to the city in ``q``.
            units: Value of the ``units`` query parameter (default ``'imperial'``).
            timeout: Seconds passed to ``requests.get`` so a stalled connection
                cannot hang the notebook indefinitely.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: If the HTTP status code is not 200; the message carries
                the status code and the response text.
        """
        # Remember the arguments of the latest request on the instance
        self.city = city
        self.country = country
        self.units = units

        # Use this instance's own key (not a notebook-global object) and let
        # requests URL-encode the values, e.g. the space in "New York".
        params = {
            "q": f"{city},{country}",
            "APPID": self.API_KEY,
            "units": units,
        }
        response = requests.get(self.BASE_URL, params=params, timeout=timeout)

        # Anything other than 200 is reported to the caller as an error
        if response.status_code != 200:
            raise Exception(f"Error: {response.status_code} - {response.text}")
        return response.json()

In [0]:
# Build the client with the key read from the widget
w = Weather(API_KEY)

# Request the weather for New York, US (default units) and show the raw response
nyc = w.get_weather('New York', 'US')
nyc

In [0]:
# Pull the fields of interest out of the API response
run_id = str(uuid.uuid4())  # random identifier attached to this run's row
city_id = nyc['id']  # named city_id so the builtin id() is not shadowed in later cells
timestamp = nyc['dt']
weather = nyc['weather'][0]['main']
# Cast to float: a whole-number JSON value is parsed as int, which would make
# Spark infer a long column on that run instead of a double.
temp = float(nyc['main']['temp'])
tmin = float(nyc['main']['temp_min'])
tmax = float(nyc['main']['temp_max'])
country = nyc['sys']['country']
city = nyc['name']
sunrise = nyc['sys']['sunrise']
sunset = nyc['sys']['sunset']

# All local times below are expressed in the New York timezone
target_timezone = ZoneInfo("America/New_York")


def _epoch_to_local(epoch_seconds, tz=target_timezone):
    """Convert epoch seconds to a timezone-aware datetime in ``tz``."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).astimezone(tz)


# Ingestion time of this run as an ISO-8601 string
ingest_ts_nyc = datetime.now(timezone.utc).astimezone(target_timezone).isoformat()
# Keep only the time-of-day part of sunrise and sunset
sunrise_nyc = str(_epoch_to_local(sunrise).time())
sunset_nyc = str(_epoch_to_local(sunset).time())
# Time carried in the response's 'dt' field, in UTC (not written to the DataFrame below)
dt_utc = datetime.fromtimestamp(timestamp, tz=timezone.utc)

# Build a one-row DataFrame; the column order must match the order of the values
columns = ['run_id', 'id', 'ingest_ts_nyc', 'weather', 'temp', 'tmin', 'tmax',
           'country', 'city', 'sunrise', 'sunset']
row = (run_id, city_id, ingest_ts_nyc, weather, temp, tmin, tmax,
       country, city, sunrise_nyc, sunset_nyc)
df = spark.createDataFrame([row], schema=columns)

display(df)
     

In [0]:
# from google import genai
# from google.genai import types
# # Get the Gemini API key
# GEMINI_API_KEY= dbutils.widgets.get('GEMINI_API_KEY')

# client = genai.Client(api_key= GEMINI_API_KEY)

# response = client.models.generate_content(
#     model="gemini-2.5-flash",
#     config=types.GenerateContentConfig(
#         system_instruction="You are a weatherman that gives suggestions about how to dress based on the weather. Answer in one sentence."),
#     contents= f"The weather is {weather}, temperature now is {temp} fahrenheit, with minimum temperature {tmin} fahrenheit and maximum temperature {tmax} fahrenheit for the day. How should I dress?"
# )

# suggestion = response.text

# # Add the suggestion to the df
# df = df.withColumn('suggestion', F.lit(suggestion))
# display(df)

In [0]:
%sql
-- Create the project's catalog; IF NOT EXISTS lets this cell be re-run without error
CREATE CATALOG IF NOT EXISTS travel_planning
COMMENT 'This is the catalog for the planning travel ';

In [0]:
%sql
-- Create the lakehouse schema inside the travel_planning catalog
-- (the raw_data_weather volume is created in this schema)
CREATE SCHEMA IF NOT EXISTS travel_planning.lakehouse
COMMENT 'This is the schema for the travel planning pipeline';

In [0]:

%sql
-- Create the volume that receives the raw weather JSON output
-- (written under /Volumes/travel_planning/lakehouse/raw_data_weather/)
CREATE VOLUME IF NOT EXISTS travel_planning.lakehouse.raw_data_weather
COMMENT 'This is the raw data volume for the weather pipeline';

In [0]:

%sql
-- Create the silver schema, which holds the transformed weather table
-- (travel_planning.silver.weather)
CREATE SCHEMA IF NOT EXISTS travel_planning.silver
COMMENT 'This is the schema for the weather pipeline';

In [0]:
%sql
-- Create the gold schema; no cell visible in this notebook writes to it yet
CREATE SCHEMA IF NOT EXISTS travel_planning.gold
COMMENT 'Gold layer for Gemini-ready travel planning context';

In [0]:

# Naive local time at second resolution, used to give this run its own output path
stamp = f"{datetime.now():%Y-%m-%d_%H-%M-%S}"

# Destination inside the raw_data_weather volume
json_path = f'/Volumes/travel_planning/lakehouse/raw_data_weather/weather_{stamp}.json'

# Write the DataFrame as JSON at that path
# (Spark treats the path as an output directory of part files, not a single file)
df.write.json(json_path, mode='append')

In [0]:

# Append this run's row to the Delta table travel_planning.silver.weather
df.write.saveAsTable(
    'travel_planning.silver.weather',
    format='delta',
    mode='append',
)

In [0]:
%sql
-- Inspect the silver weather table; it is written to the travel_planning catalog,
-- so it must be read from that same catalog
SELECT *
FROM travel_planning.silver.weather