In [15]:
# Install required libraries
!pip install pyspark requests google-cloud-storage google-cloud-bigquery




In [16]:
#Cell 2: Extract (API Fetch with GCS Backup)
import requests
import json
from google.cloud import storage
from datetime import datetime

# Open-Meteo daily forecast request: max/min temperature and precipitation sum,
# with timezone=Asia/Kolkata and the past 3 days included.
url = "https://api.open-meteo.com/v1/forecast?latitude=19.07&longitude=72.88&daily=temperature_2m_max,temperature_2m_min,precipitation_sum&timezone=Asia/Kolkata&past_days=3"  # Mumbai, India
# requests applies no timeout by default; without one a stalled connection
# would block this cell indefinitely.
response = requests.get(url, timeout=30)
if response.status_code == 200:
    data = response.json()
    # Serialize once so the local file and the GCS backup hold identical content.
    payload = json.dumps(data)
    # Save to local temp file
    with open('weather_data.json', 'w') as f:
        f.write(payload)
    # Save to GCS for backup
    client = storage.Client.from_service_account_json('Your-key')  # Replace with your local key path
    bucket = client.get_bucket('your-bucket')  # Replace with your bucket
    blob_name = f'weather_raw_{datetime.now().strftime("%Y-%m-%d")}.json'
    blob = bucket.blob(blob_name)
    # upload_from_string defaults to text/plain; label the object as JSON instead.
    blob.upload_from_string(payload, content_type='application/json')
    print(f"Extracted and saved to GCS: {blob_name}")
else:
    # NOTE(review): on failure nothing is written, so any 'weather_data.json'
    # left over from a previous run stays on disk unchanged.
    print("API error:", response.status_code)

Extracted and saved to GCS: weather_raw_2025-09-28.json


In [17]:
#Cell 3: Transform (with City and End Date Columns)
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, sum, col, lit, max, min, round, concat, row_number
from pyspark.sql.window import Window
from datetime import datetime, timedelta
import os  # required for os.environ below; the notebook otherwise imports os only in the later Load cell

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64'  # Keep as is for testing
spark = SparkSession.builder.appName('WeatherETL').config("spark.driver.memory", "1g").config("spark.executor.memory", "1g").getOrCreate()
df = spark.read.json('weather_data.json')
# The daily metrics are arrays under `daily`; zip them element-wise and
# explode so that each day becomes one row.
df_daily = df.selectExpr("explode(arrays_zip(daily.time, daily.temperature_2m_max, daily.temperature_2m_min, daily.precipitation_sum)) as daily_data")
df_exploded = df_daily.select(
    col("daily_data.time").alias("date"),
    col("daily_data.temperature_2m_max").alias("max_temp"),
    col("daily_data.temperature_2m_min").alias("min_temp"),
    col("daily_data.precipitation_sum").alias("precipitation")
)
# Filter to yesterday's date
# NOTE(review): "yesterday" uses the kernel's local clock, while the API was
# asked for Asia/Kolkata dates — confirm the two agree where this runs.
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
df_exploded = df_exploded.filter(col("date") == yesterday)
end_date = df_exploded.agg(max("date").alias("end_date")).collect()[0]["end_date"]
# max() over zero rows yields NULL. Stop here rather than write an all-null
# summary row into 'transformed_weather'.
if end_date is None:
    raise ValueError(f"No weather rows found for {yesterday} in 'weather_data.json'")
analysis = df_exploded.agg(
    max("max_temp").alias("max_temp"),
    min("min_temp").alias("min_temp"),
    sum("precipitation").alias("total_precipitation")
).withColumn("city", lit("Mumbai")).withColumn("end_date", lit(end_date))
window_spec = Window.orderBy(lit(1))  # Dummy order to ensure row_number works
analysis = analysis.withColumn("id", row_number().over(window_spec))  # row_number() already starts at 1
# Round to two decimals and append the unit, turning the metrics into display strings.
analysis = analysis.select("id", "city", "max_temp", "min_temp", "total_precipitation", "end_date") \
    .withColumn("max_temp", concat(round(col("max_temp"), 2), lit("°C"))) \
    .withColumn("min_temp", concat(round(col("min_temp"), 2), lit("°C"))) \
    .withColumn("total_precipitation", concat(round(col("total_precipitation"), 2), lit(" mm")))
# coalesce(1) so the output directory contains a single part-*.csv file.
analysis.coalesce(1).write.mode('overwrite').csv('transformed_weather', header=True)
print("Transformed!")

Transformed!


In [18]:
#Cell 4: Load (to BigQuery)
from google.cloud import bigquery
import os
from google.cloud.exceptions import NotFound  # raised by get_dataset when the dataset is missing

client = bigquery.Client.from_service_account_json('your-key')  # Replace with your local key path
dataset_id = 'weather_dataset'
# Create the dataset only when it really does not exist. Any other failure
# (credentials, permissions, network) propagates instead of being swallowed.
try:
    client.get_dataset(dataset_id)
except NotFound:
    client.create_dataset(dataset_id)
table_id = 'your-project-id.weather_dataset.weather_analysis'  # Replace with your Project ID
# Spark writes its output as part-* files inside the directory; take the first CSV part.
for file in os.listdir('transformed_weather'):
    if file.startswith('part-') and file.endswith('.csv'):
        csv_file = f'transformed_weather/{file}'
        break
else:
    # for/else: reached only when the loop finished without finding a part file.
    raise FileNotFoundError("No CSV found in 'transformed_weather'")
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,  # skip the header row written by Spark
    autodetect=True,
    write_disposition='WRITE_TRUNCATE'  # Overwrite to avoid duplicates
)
with open(csv_file, 'rb') as source_file:
    job = client.load_table_from_file(source_file, table_id, job_config=job_config)
# Block until the load job finishes; result() raises if the job failed.
job.result()
print("Loaded to BigQuery!")

Loaded to BigQuery!
