# ETL

In this notebook our goal is to create an ETL pipeline for the Open-Meteo Weather API: we extract data from the API, transform it, and store it in a PostgreSQL database. The models we will train later will try to predict whether it will rain, based on a set of weather features.
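The three stages can be kept as small, swappable functions so that each one can be tested on its own. The sketch below is minimal and assumes a hypothetical `run_etl` helper that is not part of this notebook. Stand-in stages replace the real API and database:

```python
from typing import Callable

import pandas as pd


def run_etl(
    extract: Callable[[], pd.DataFrame],
    transform: Callable[[pd.DataFrame], pd.DataFrame],
    load: Callable[[pd.DataFrame], bool],
) -> bool:
    """Run one extract -> transform -> load pass (hypothetical helper)."""
    raw = extract()          # hourly rows from the source
    daily = transform(raw)   # reshaped rows, one per day
    return load(daily)       # True if the load step reports success


# Stand-in stages: no network or database needed
loaded = []


def fake_load(df: pd.DataFrame) -> bool:
    loaded.append(df)
    return True


ok = run_etl(
    extract=lambda: pd.DataFrame({"time": ["2025-07-29T00:00"], "precipitation": [0.0]}),
    transform=lambda df: df.assign(date=pd.to_datetime(df["time"]).dt.date).drop(columns=["time"]),
    load=fake_load,
)
print(ok)
```

In this notebook, `extract_weather_data`, `flatten_weather_data` and the `insert_data` method defined further down could fill those three roles.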

## Extraction

We use weather data from Gainesville, Florida. Our goal is to predict whether it will rain on a given day from a set of features. Let's first extract the data from the API and look at its structure.

In [1]:
import requests
import pandas as pd
from datetime import datetime, timedelta
from tqdm import tqdm

import os
os.chdir("../")


# Gainesville
LAT = 29.6516
LON = -82.3248

END_DATE = datetime.today().date() - timedelta(days=5)
START_DATE = END_DATE - timedelta(days=1)

# Format the date strings
start = START_DATE.strftime("%Y-%m-%d")
end = END_DATE.strftime("%Y-%m-%d")

# (hourly) features we will extract

HOURLY_FEATURES = [
    "temperature_2m",
    "relative_humidity_2m",
    "precipitation",
    "cloud_cover",
    "wind_speed_10m",
    "wind_direction_10m",
    "shortwave_radiation",
    "surface_pressure",
    "sunshine_duration",
    "et0_fao_evapotranspiration"
]

# Perform the data extraction
def extract_weather_data(lat=LAT, lon=LON, start_date=start, end_date=end, hourly_features=HOURLY_FEATURES):
    url = "https://archive-api.open-meteo.com/v1/archive"

    params = {
        "latitude": lat,
        "longitude": lon,
        "start_date": start_date,
        "end_date": end_date,
        "hourly": ",".join(hourly_features),  # comma-separated list of variables
        "timezone": "auto",
    }

    response = requests.get(url=url, params=params, timeout=30)
    response.raise_for_status()  # fail loudly on HTTP errors instead of a KeyError below
    # The "hourly" block maps each variable (plus "time") to a list of hourly values
    return pd.DataFrame(response.json()["hourly"])
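You can build the request without sending it to see exactly what `extract_weather_data` asks for. This minimal offline sketch makes two assumptions. First, it uses a hypothetical fixed run date of 2025-08-04, chosen so that the window matches the dates in the outputs of this notebook. Second, it requests only two of the hourly variables. `requests.Request(...).prepare()` encodes the parameters into the final URL, and the date arithmetic mirrors `END_DATE` and `START_DATE` above. The request covers two calendar days because both `start_date` and `end_date` are included, which matches the two daily rows in the flattened output further down.

```python
from datetime import date, timedelta

import requests

ARCHIVE_URL = "https://archive-api.open-meteo.com/v1/archive"

# Hypothetical fixed "today" so the example is reproducible
today = date(2025, 8, 4)
end_date = today - timedelta(days=5)       # same 5-day offset as END_DATE
start_date = end_date - timedelta(days=1)  # same 1-day span as START_DATE

params = {
    "latitude": 29.6516,
    "longitude": -82.3248,
    "start_date": start_date.strftime("%Y-%m-%d"),
    "end_date": end_date.strftime("%Y-%m-%d"),
    "hourly": ",".join(["temperature_2m", "precipitation"]),
    "timezone": "auto",
}

# Prepare (but do not send) the GET request to inspect the encoded URL;
# note that the comma in "hourly" is percent-encoded as %2C
url = requests.Request("GET", ARCHIVE_URL, params=params).prepare().url
print(url)
```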

Here are the first rows of the resulting DataFrame:

In [2]:
extract_weather_data().head()

|   | time | temperature_2m | relative_humidity_2m | precipitation | cloud_cover | wind_speed_10m | wind_direction_10m | shortwave_radiation | surface_pressure | sunshine_duration | et0_fao_evapotranspiration |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2025-07-29T00:00 | 27.7 | 74 | 0.0 | 74 | 4.2 | 168 | 0.0 | 1009.9 | 0.0 | 0.01 |
| 1 | 2025-07-29T01:00 | 27.1 | 79 | 0.0 | 96 | 6.1 | 203 | 0.0 | 1009.8 | 0.0 | 0.01 |
| 2 | 2025-07-29T02:00 | 26.7 | 83 | 0.0 | 99 | 6.5 | 251 | 0.0 | 1009.8 | 0.0 | 0.01 |
| 3 | 2025-07-29T03:00 | 26.2 | 87 | 0.0 | 10 | 5.3 | 204 | 0.0 | 1009.4 | 0.0 | 0.0 |
| 4 | 2025-07-29T04:00 | 25.8 | 91 | 0.0 | 15 | 5.8 | 202 | 0.0 | 1009.4 | 0.0 | 0.0 |
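`extract_weather_data` indexes the response with `["hourly"]`, which suggests that this block of the JSON is a dictionary. It maps each variable name (plus `time`) to a list with one value per hour, and `pd.DataFrame` turns it into the table above. The minimal offline sketch below uses a hand-made payload that copies values from the first two rows above. It is an assumption covering only the keys this notebook reads, not the full API response:

```python
import pandas as pd

# Hand-written stand-in for response.json(); only the "hourly" key is modelled
payload = {
    "hourly": {
        "time": ["2025-07-29T00:00", "2025-07-29T01:00"],
        "temperature_2m": [27.7, 27.1],
        "relative_humidity_2m": [74, 79],
        "precipitation": [0.0, 0.0],
    }
}

# A dict of equal-length lists becomes one column per key and one row per hour
hourly = pd.DataFrame(payload["hourly"])
print(hourly.shape)  # (2, 4)
```

Since JSON has no datetime type, `time` arrives as plain strings, which is why the transformation step parses it with `pd.to_datetime`.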


## Transformation

Our goal is a table in which each row represents a single day and the columns come from our `HOURLY_FEATURES`. Since we request hourly data, however, each feature comes with 24 values per day, one for each hour. There are several ways to handle this:

- Aggregate the 24 values of each feature into one, for example by taking their mean.
- Store one vector per feature. This needs a non-scalar column type, such as a PostgreSQL array or a vector extension, instead of plain numeric columns.
- Add more features: expand each feature into 24 features, one per hourly value.

We will use the last approach. Since `HOURLY_FEATURES` has 10 entries, we end up with 10 × 24 = 240 features (plus the `date` column).
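The trade-off between aggregating and expanding is easiest to see in the shapes of the results. This minimal sketch uses synthetic data: two made-up features over two days, not real Open-Meteo values. It builds the first option with a `groupby` mean and the last option with `pivot`:

```python
import numpy as np
import pandas as pd

# Synthetic hourly data: 2 days x 24 hours for 2 features (made-up values)
times = pd.date_range("2025-07-29", periods=48, freq=pd.Timedelta(hours=1))
hourly = pd.DataFrame({
    "time": times,
    "temperature_2m": np.linspace(25.0, 30.0, 48),
    "precipitation": np.zeros(48),
})
hourly["date"] = hourly["time"].dt.date

# Option 1: aggregate each feature to one value per day (here, the mean)
daily_mean = hourly.drop(columns="time").groupby("date").mean()

# Option 3: keep every hour as its own column -> n_features * 24 columns
wide = hourly.assign(hour=hourly["time"].dt.hour + 1).pivot(
    index="date", columns="hour", values=["temperature_2m", "precipitation"]
)

print(daily_mean.shape)  # (2, 2): 2 days x 2 features
print(wide.shape)        # (2, 48): 2 days x (2 * 24) features
```

With all 10 features, the daily mean keeps 10 columns, while the wide version grows to the 240 columns described above.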

In [3]:
def flatten_weather_data(data: pd.DataFrame) -> pd.DataFrame:

    # Work on a copy so the caller's DataFrame is not modified
    data = data.copy()

    # Parse the strings in data["time"] into datetime64 values;
    # .dt.date keeps only the date part. We add this as a new column
    data["date"] = pd.to_datetime(data["time"]).dt.date

    # Drop the time column
    data = data.drop(columns=["time"])

    # Group by date and flatten: each feature "x" becomes "x_1", ..., "x_24",
    # numbered by the hour's position within the day
    flattened_rows = []
    for date, groupdf in data.groupby("date"):
        flat_row = {"date": date}
        for col in groupdf.columns:
            if col == "date":
                continue
            for i, val in enumerate(groupdf[col].values):
                flat_row[f"{col}_{i+1}"] = val
        flattened_rows.append(flat_row)

    return pd.DataFrame(flattened_rows)

In [4]:
hourly_data = extract_weather_data()
flatten_weather_data(hourly_data).head()

|   | date | temperature_2m_1 | temperature_2m_2 | temperature_2m_3 | temperature_2m_4 | temperature_2m_5 | temperature_2m_6 | temperature_2m_7 | temperature_2m_8 | temperature_2m_9 | ... | et0_fao_evapotranspiration_15 | et0_fao_evapotranspiration_16 | et0_fao_evapotranspiration_17 | et0_fao_evapotranspiration_18 | et0_fao_evapotranspiration_19 | et0_fao_evapotranspiration_20 | et0_fao_evapotranspiration_21 | et0_fao_evapotranspiration_22 | et0_fao_evapotranspiration_23 | et0_fao_evapotranspiration_24 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2025-07-29 | 27.7 | 27.1 | 26.7 | 26.2 | 25.8 | 25.5 | 25.2 | 25.1 | 26.9 | ... | 0.68 | 0.69 | 0.47 | 0.36 | 0.31 | 0.21 | 0.07 | 0.04 | 0.0 | 0.0 |
| 1 | 2025-07-30 | 25.7 | 25.9 | 25.9 | 25.6 | 25.4 | 25.1 | 24.9 | 24.8 | 26.8 | ... | 0.72 | 0.7 | 0.51 | 0.4 | 0.41 | 0.24 | 0.11 | 0.05 | 0.02 | 0.0 |
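The nested loop in `flatten_weather_data` can also be written with a single `pivot`. The sketch below uses a hypothetical `flatten_with_pivot` function; it is an alternative for comparison, not the notebook's method. Like the loop, it numbers each hour by its position within the day, so it assumes the hours arrive in chronological order:

```python
import pandas as pd


def flatten_with_pivot(data: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical vectorized alternative to flatten_weather_data."""
    df = data.copy()  # leave the caller's DataFrame untouched
    df["date"] = pd.to_datetime(df["time"]).dt.date
    df["slot"] = df.groupby("date").cumcount() + 1  # 1-based position within the day
    features = [c for c in data.columns if c != "time"]
    wide = df.pivot(index="date", columns="slot", values=features)
    # Flatten the (feature, slot) column MultiIndex into names like "temperature_2m_1"
    wide.columns = [f"{feat}_{slot}" for feat, slot in wide.columns]
    return wide.reset_index()


# Usage on one synthetic day with two features
sample = pd.DataFrame({
    "time": [f"2025-07-29T{h:02d}:00" for h in range(24)],
    "temperature_2m": [float(h) for h in range(24)],
    "precipitation": [0.0] * 24,
})
flat = flatten_with_pivot(sample)
print(flat.shape)  # (1, 49)
```

The column order matches the loop version: all 24 columns of the first feature, then all 24 of the next.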


## Loading

We now want to load the transformed data into our PostgreSQL database.

In [5]:
from dotenv import load_dotenv
import os

# Read the connection settings from a local .env file into the environment
load_dotenv()

print("Host:", os.getenv("POSTGRES_HOST"))
print("User:", os.getenv("POSTGRES_USER"))
# Only report whether the password is set; printing it would leak it into the notebook
print("Password set:", bool(os.getenv("POSTGRES_PASSWORD")))
print("Database:", os.getenv("POSTGRES_DB"))

Host: weather-db.cgbqc24u8du6.us-east-1.rds.amazonaws.com
User: postgres
Password set: True
Database: postgres
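Printing the configuration is a quick sanity check, but it can echo secrets into the notebook output. The minimal alternative below only reports which of the variables used in this notebook are unset, never their values. It assumes a hypothetical `missing_postgres_vars` helper:

```python
import os

# Variable names as used by the notebook's connection code
REQUIRED_VARS = ["POSTGRES_HOST", "POSTGRES_USER", "POSTGRES_PASSWORD", "POSTGRES_DB"]


def missing_postgres_vars(environ=os.environ) -> list:
    """Return the required variables that are unset or empty (hypothetical helper)."""
    return [name for name in REQUIRED_VARS if not environ.get(name)]


# Usage with a stand-in environment instead of the real one
fake_env = {"POSTGRES_HOST": "localhost", "POSTGRES_USER": "postgres", "POSTGRES_DB": "postgres"}
print(missing_postgres_vars(fake_env))  # ['POSTGRES_PASSWORD']
```

Calling `missing_postgres_vars()` with no argument checks the real environment after `load_dotenv()`, and an empty list means every variable is present.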


In [6]:
import os

import psycopg2
from dotenv import load_dotenv
from src.datascience import logger

load_dotenv()

def connect():
    """Open a connection to the PostgreSQL database described by the environment variables."""
    host = os.getenv("POSTGRES_HOST")
    port = 5432  # default PostgreSQL port
    db_name = os.getenv("POSTGRES_DB")
    user = os.getenv("POSTGRES_USER")
    pw = os.getenv("POSTGRES_PASSWORD")

    try:
        conn = psycopg2.connect(
            host=host,
            port=port,
            database=db_name,
            user=user,
            password=pw,
        )

        logger.info("Connected to the PostgreSQL database successfully")
        return conn

    except psycopg2.Error as e:
        logger.error(f"Error connecting to PostgreSQL database: {e}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error during connection: {e}")
        raise

[2025-08-04 22:47:03,065: INFO: __init__: Logger initialized for the datascience package.]


Let's now create the table.

In [7]:
def create_weather_table(conn):
    features = [
        "temperature_2m", "relative_humidity_2m", "precipitation", "cloud_cover",
        "wind_speed_10m", "wind_direction_10m", "shortwave_radiation",
        "surface_pressure", "sunshine_duration", "et0_fao_evapotranspiration"
    ]

    # A DATE primary key plus one FLOAT column per (feature, hour) pair
    columns = ["date DATE PRIMARY KEY"]

    for feat in features:
        for hour in range(1, 25):
            col_name = f"{feat}_{hour}"
            columns.append(f"{col_name} FLOAT")

    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS weather_data (
        {', '.join(columns)}
    );
    """

    # Execute the SQL to create the table
    with conn.cursor() as cur:
        cur.execute(create_table_sql)
        conn.commit()
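Before running the DDL against the database, you can build the statement offline and check its size. The minimal sketch below repeats the column-building logic without a connection. The statement declares 241 columns: the `date` primary key plus 240 `FLOAT` columns. That is well under PostgreSQL's documented limit of 1600 columns per table. In PostgreSQL, `FLOAT` without a precision means `double precision`.

```python
FEATURES = [
    "temperature_2m", "relative_humidity_2m", "precipitation", "cloud_cover",
    "wind_speed_10m", "wind_direction_10m", "shortwave_radiation",
    "surface_pressure", "sunshine_duration", "et0_fao_evapotranspiration",
]

# Same column-building logic as the function above
columns = ["date DATE PRIMARY KEY"]
for feat in FEATURES:
    for hour in range(1, 25):
        columns.append(f"{feat}_{hour} FLOAT")

create_table_sql = f"CREATE TABLE IF NOT EXISTS weather_data ({', '.join(columns)});"

print(len(columns))  # 241: the date key plus 10 * 24 hourly columns
print(create_table_sql[:80])
```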

Let's wrap the database logic in a class and add a method that inserts data into the table.

In [None]:
import os
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv
from src.datascience import logger

load_dotenv()

class PostgreSQLConnection:
    def __init__(self):

        self.config = {
            'host': os.getenv("POSTGRES_HOST"),
            'port': int(os.getenv("POSTGRES_PORT", "5432")),
            'database': os.getenv("POSTGRES_DB"),
            'user': os.getenv("POSTGRES_USER"),
            'password': os.getenv("POSTGRES_PASSWORD")
        }

        self.connection = None
        logger.info("PostgreSQL connection manager initialized")

    def connect(self):
        try:
            if self.connection is None or self.connection.closed:
                self.connection = psycopg2.connect(**self.config)
                logger.info("Successfully connected to PostgreSQL database")
            
            return self.connection
            
        except psycopg2.Error as e:
            logger.error(f"Error connecting to PostgreSQL database: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error during connection: {e}")
            raise
    
    def disconnect(self):
        if self.connection and not self.connection.closed:
            self.connection.close()
            logger.info("PostgreSQL connection closed")

    def create_weather_table(self):
        features = [
            "temperature_2m", "relative_humidity_2m", "precipitation", "cloud_cover",
            "wind_speed_10m", "wind_direction_10m", "shortwave_radiation",
            "surface_pressure", "sunshine_duration", "et0_fao_evapotranspiration"
        ]

        try:
            self.connect()

            columns = ["date DATE PRIMARY KEY"]

            for feat in features:
                for hour in range(1, 25):
                    col_name = f"{feat}_{hour}"
                    columns.append(f"{col_name} FLOAT")

            create_table_sql = f"""
            CREATE TABLE IF NOT EXISTS weather_data (
                {', '.join(columns)}
            );
            """

            # Execute the SQL to create the table
            with self.connection.cursor() as cur:
                cur.execute(create_table_sql)
                self.connection.commit()

            logger.info("Weather table created")
            return True

        except psycopg2.Error as e:
            logger.error(f"PostgreSQL error creating weather table: {e}")
            if self.connection:
                self.connection.rollback()
            return False
        except Exception as e:
            logger.error(f"Unexpected error creating weather table: {e}")
            if self.connection:
                self.connection.rollback()
            return False
    
    def insert_data(self, data_df: pd.DataFrame) -> bool:
        """
        Inserts the rows of a DataFrame into the weather_data table.

        Args:
            data_df (pd.DataFrame): Flattened weather data, one row per day,
                with a "date" column and columns matching the table.

        Returns:
            bool: True if the insert succeeded, False otherwise
        """

        try:
            # Ensure we are connected to the database
            self.connect()

            # Build the insert query. execute_values replaces the single %s
            # after VALUES with one (...) tuple per row.
            columns = data_df.columns.tolist()
            columns_sql = ", ".join(columns)

            insert_sql = f"""
            INSERT INTO weather_data ({columns_sql})
            VALUES %s
            ON CONFLICT (date) DO NOTHING;
            """

            # Prepare the values to insert as a list of tuples
            values = [tuple(row) for row in data_df.to_numpy()]

            with self.connection.cursor() as cur:
                execute_values(cur, insert_sql, values)
            self.connection.commit()
            logger.info("Data inserted successfully")
            return True

        except Exception as e:
            logger.error(f"Error inserting data into PostgreSQL: {e}")
            if self.connection is not None and not self.connection.closed:
                self.connection.rollback()
            return False
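`date` is the primary key and the insert uses `ON CONFLICT (date) DO NOTHING`. So re-running the pipeline over days that are already stored should leave those rows unchanged rather than raise an error. The minimal sketch below shows that idempotent behaviour. It swaps in Python's built-in `sqlite3` (an in-memory SQLite database, not PostgreSQL) and a two-column stand-in table, so it runs without a server:

```python
import sqlite3

# SQLite stands in for PostgreSQL so the example runs without a server;
# both accept INSERT ... ON CONFLICT (col) DO NOTHING (SQLite 3.24+).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE weather_data (date TEXT PRIMARY KEY, precipitation_1 REAL)")

rows = [("2025-07-29", 0.0), ("2025-07-30", 0.0)]
insert_sql = (
    "INSERT INTO weather_data (date, precipitation_1) VALUES (?, ?) "
    "ON CONFLICT (date) DO NOTHING"
)

# Load the same batch twice, as a re-run of the pipeline would
for _ in range(2):
    conn.executemany(insert_sql, rows)
    conn.commit()

count = conn.execute("SELECT COUNT(*) FROM weather_data").fetchone()[0]
print(count)  # 2: the second batch hit the primary key and was skipped
conn.close()
```

`DO NOTHING` also means a corrected value for a day that is already stored is silently ignored. If rows should be refreshed instead, PostgreSQL's `ON CONFLICT ... DO UPDATE` is the usual alternative.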
            
        