# ETL Process Overview

## What is ETL
ETL stands for Extract, Transform, Load. It is a fundamental concept in data engineering and business intelligence.

**Extract**
- E
- Synonymous with Data Ingestion
- The process of reading the data from various sources, which may include databases, spreadsheets, applications, APIs, or flat files

**Transform**
- T
- The transformation of extracted data into a format suitable for analysis
- May include 
  - Cleaning data 
    - removing duplicates 
    - handling missing values
    - standardizing text formats
  - Enriching data 
    - merging data from multiple sources
    - calculating new metrics
  - Restructuring data 
    - Pivoting
    - Summarizing
    - Normalizing
  
**Load**
- L
- Loading the transformed data into a final destination, typically a data warehouse, database, or data lake, where it can be accessed, queried, and analyzed by end users or applications
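
The three stages above can be sketched end to end with pandas. This is only an illustration under stated assumptions: the sample data, the `city_temps` table name, and the `extract`/`transform`/`load` helper names are hypothetical, and an in-memory SQLite database stands in for a real destination.

```python
import io
import sqlite3

import pandas as pd

# Hypothetical raw data standing in for a real source (file, API, database).
RAW_CSV = """city,temp_c
 Manila ,31.0
manila,31.0
Cebu,
Davao,29.5
"""


def extract(source: io.StringIO) -> pd.DataFrame:
    """Extract: read the raw rows into a DataFrame."""
    return pd.read_csv(source)


def transform(df: pd.DataFrame) -> pd.DataFrame:
    """Transform: clean the rows and add a derived column."""
    out = df.copy()
    out["city"] = out["city"].str.strip().str.title()  # standardize text formats
    out = out.drop_duplicates()                         # remove duplicates
    out = out.dropna(subset=["temp_c"])                 # handle missing values
    out["temp_f"] = out["temp_c"] * 9 / 5 + 32          # calculate a new metric
    return out.reset_index(drop=True)


def load(df: pd.DataFrame, connection: sqlite3.Connection, table: str) -> None:
    """Load: write the cleaned rows to a database table."""
    df.to_sql(table, connection, if_exists="replace", index=False)


connection = sqlite3.connect(":memory:")
load(transform(extract(io.StringIO(RAW_CSV))), connection, "city_temps")

rows = connection.execute(
    "SELECT city, temp_c, temp_f FROM city_temps ORDER BY city"
).fetchall()
print(rows)
```

Only two rows reach the table: the duplicate Manila reading collapses into one, and the Cebu row is dropped because its temperature is missing.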

## Difference between ETL and ELT

In some workflows, particularly those that use modern data warehousing solutions like BigQuery, the ETL process may be modified into an ELT, where raw data is loaded into the warehouse before transformation. This takes advantage of the powerful processing capabilities of these platforms to perform transformations on the data after it has been loaded.
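
As a rough sketch of the ELT ordering, the example below loads raw rows first and cleans them afterwards with SQL. SQLite is used here only as a local stand-in for a warehouse such as BigQuery, and the table names `readings_raw` and `readings_clean` and the sample rows are hypothetical.

```python
import sqlite3

# Hypothetical raw readings; None marks a missing sensor value.
raw_rows = [
    ("2024-01-01 00:00", 5.2),
    ("2024-01-01 00:10", None),
    ("2024-01-01 00:20", 6.1),
]

connection = sqlite3.connect(":memory:")

# Load: the raw rows go into a staging table untouched.
connection.execute("CREATE TABLE readings_raw (date_time TEXT, wind_speed REAL)")
connection.executemany("INSERT INTO readings_raw VALUES (?, ?)", raw_rows)

# Transform: the database engine does the cleaning after the load.
connection.execute(
    """
    CREATE TABLE readings_clean AS
    SELECT date_time, wind_speed
    FROM readings_raw
    WHERE wind_speed IS NOT NULL
    """
)

raw_count = connection.execute("SELECT COUNT(*) FROM readings_raw").fetchone()[0]
clean_count = connection.execute("SELECT COUNT(*) FROM readings_clean").fetchone()[0]
print(raw_count, clean_count)
```

The raw table keeps every row, so the transformation can be rerun or changed later without extracting the data again.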

## ETL in the Wind Resource Assessment Project

While it is possible to do the entire wind resource assessment in a spreadsheet application like Excel, this is inefficient and does not scale. Spreadsheets have limitations with large datasets, make automation more difficult, and struggle with more complex calculations.



**Extract**
- The raw data will be extracted from the Excel spreadsheets where it was initially collected. This data comprises various metrics pertinent to wind resources, such as wind speed, direction, and gust measurements.

**Transform**
- Involves cleansing the data to ensure its quality for analysis. In the context of this project, rows with null values, which correspond to missing sensor data, will be dropped.
  - This is justified because such missing data could distort the analysis

**Load**
- The cleansed and transformed data will be loaded into a more robust and scalable data management system. Depending on the chosen workflow, this can be a traditional SQL database like PostgreSQL, a cloud-based data warehouse like BigQuery, or even a pandas DataFrame.


We have several solutions available to us, including spreadsheets, as discussed below:
- Pure Spreadsheets
- Pure Python and Pandas Workflow
- Mixed SQL and Python Workflow
- Cloud such as Google BigQuery

For the last three options, we need to load data into dataframes/databases, and we need to do some cleaning for all of the options. 

We know from looking at the data that there are rows with null values, which correspond to missing sensor data. For the purpose of this project, we simply drop these rows and exclude the missing data from our analysis.

### Pure Python and Pandas Workflow

We can work with the Excel file directly, using the `pd.read_excel()` function to create a pandas DataFrame, which we can then clean with related pandas functions. We can then use pandas methods to filter and aggregate the raw data into new DataFrames.
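
A minimal sketch of the cleaning, filtering, and aggregating steps follows. The DataFrame is built by hand as a stand-in for the result of `pd.read_excel()`, and the sample values and the threshold of 5.0 are hypothetical.

```python
import numpy as np
import pandas as pd

# Hypothetical sample standing in for the result of pd.read_excel(...).
df = pd.DataFrame(
    {
        "date_time": pd.to_datetime(
            [
                "2024-01-01 00:00",
                "2024-01-01 12:00",
                "2024-01-02 00:00",
                "2024-01-02 12:00",
            ]
        ),
        "wind_speed": [4.0, np.nan, 6.0, 8.0],
        "gust_speed": [6.0, 7.0, 9.0, 11.0],
        "wind_direction": [90.0, 100.0, 180.0, 200.0],
    }
)

# Cleaning: drop rows where any sensor value is missing.
clean = df.dropna().reset_index(drop=True)

# Filtering: keep readings at or above a hypothetical wind speed threshold.
strong = clean[clean["wind_speed"] >= 5.0]

# Aggregating: mean wind speed per calendar day.
daily_mean = clean.groupby(clean["date_time"].dt.date)["wind_speed"].mean()

print(len(clean), len(strong))
print(daily_mean)
```

Each step returns a new DataFrame or Series, so the cleaned data stays available for other filters and aggregations.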

### Mixed SQL and Python Workflow

We can load the CSV into a SQL table, then use SQL queries to filter and aggregate data as views. This may be desirable as a showcase of skill. Querying a dataset with SQL is also relatively easy.
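
The idea of aggregating through a view can be sketched as follows. SQLite is used here only so the example runs without a database server; the project itself targets PostgreSQL, and the table name `wind_site`, the view name `daily_wind`, and the sample rows are hypothetical.

```python
import csv
import io
import sqlite3

# Hypothetical CSV content standing in for the exported wind data.
CSV_TEXT = """date_time,wind_speed,gust_speed,wind_direction
2024-01-01 00:00,4.0,6.0,90
2024-01-01 12:00,6.0,9.0,100
2024-01-02 00:00,8.0,11.0,180
"""

connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE wind_site ("
    "date_time TEXT, wind_speed REAL, gust_speed REAL, wind_direction REAL)"
)

# Load the CSV rows into the table, skipping the header row.
reader = csv.reader(io.StringIO(CSV_TEXT))
next(reader)
connection.executemany("INSERT INTO wind_site VALUES (?, ?, ?, ?)", reader)

# A view that aggregates the mean wind speed per day.
connection.execute(
    """
    CREATE VIEW daily_wind AS
    SELECT date(date_time) AS day, AVG(wind_speed) AS mean_wind_speed
    FROM wind_site
    GROUP BY date(date_time)
    """
)

daily = connection.execute(
    "SELECT day, mean_wind_speed FROM daily_wind ORDER BY day"
).fetchall()
print(daily)
```

Because a view is a stored query, it reflects new rows in the underlying table without being rebuilt.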

We have several options here:
- We can clean the dataset in Excel in accordance with the schema of the table it will be loaded into, export it to a CSV, then upload that CSV directly to the table in the database (ETL)
- We can export the dataset to a CSV, load it into a staging table with a schema similar to our final table, then use a SQL query that loads the data into the final table and also does the cleaning, filtering, and renaming (ELT)
- We can load the Excel file into a pandas DataFrame, do all the cleaning and processing, output a CSV, then upload that directly to the final table in the database (ETL)

### Cloud such as BigQuery

**To follow**




# Pure Pandas ETL

Version 1
- As taken from previous work
- Checks for valid input path
- Uses try-except blocks to only allow excel and csv files
- Included cleaning step

Version 2
- Added docstring
- Added engine
- rewrote try-except blocks
- included checking if file exists, valid file extension

Version 2.5
- changed some names
- fixed docstring

In [2]:
# Version 2.5
import pandas as pd
import os 

def pandas_pure_etl(input_path: str) -> pd.DataFrame:
    """
    Perform an ETL process on an Excel or CSV data file using pandas. The file must contain, in order, the columns date-time, wind-speed, gust-speed, and wind-direction.
    
    Extracts the data from the input_path provided and transforms it by removing rows with null values and ensuring a datetime data type.
    Loads the data into a pandas DataFrame.

    Parameters:
        input_path (str): The file system path to the data file. The file must be either an Excel (.xlsx) or CSV (.csv) format.

    Raises:
        FileNotFoundError: If the file specified by 'input_path' does not exist.
        ValueError: If the file is neither an Excel file nor a CSV file, or if it cannot be read.

    Returns:
        pd.DataFrame: A DataFrame containing cleaned wind data with the columns: 'date_time', 'wind_speed', 'gust_speed', and 'wind_direction'.
    """
    
    # Check if file exists
    if not os.path.isfile(input_path):
        raise FileNotFoundError(f"The file does not exist: {input_path}")
    
    # Check if valid extension
    file_extension = os.path.splitext(input_path)[1].lower()
    if file_extension not in [".xlsx",".csv"]:
        raise ValueError("File is not an Excel or CSV file.")
    
    # Try to load dataframe according to extension
    try:
        if file_extension == ".xlsx":  
            df = pd.read_excel(
                io=input_path,
                engine="openpyxl",
                names=["date_time","wind_speed","gust_speed","wind_direction"]
            )
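        else:
            # read_csv takes the path as filepath_or_buffer (it has no `io` argument).
            # header=0 assumes the CSV has a header row, which is replaced by `names`
            # instead of being read as a data row.
            df = pd.read_csv(
                filepath_or_buffer=input_path,
                header=0,
                names=["date_time","wind_speed","gust_speed","wind_direction"]
            )
    except Exception as e:
        raise ValueError(f"Failed to read file: {e}") from e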
        
    # Remove rows with null values
    # Ensure date_time is actually datetime
    df = df.dropna().reset_index(drop=True)
    df["date_time"] = pd.to_datetime(df["date_time"])
        
    return df

# Mixed SQL ETL

Made up of 2 functions 

- pandas_extract_transform()
- sql_load()

Version 2 ET
- docstring
- comments
- check input file existence
- try-except using read_excel
- cleans null values
- provides default output name if none given

Version 2.5
- Fixed docstring
- added file extension checking

In [5]:
# Version 2.5
import pandas as pd
import os

def pandas_extract_transform(input_path : str, output_name : str = None) -> str:
    """
    Performs ET by extracting data from an excel file, cleaning data by removing rows with null values, and then writing to a csv.
    If output_name is not provided, CSV file is named after the excel file and is generated in the same directory

    Parameters:
    - input_path (str): The file path to the source Excel file.
    - output_name (str, optional): The desired name for the output CSV file.

    Raises:
    - FileNotFoundError: If the Excel file specified by 'input_path' does not exist.
    - ValueError: If the file is not an .xlsx file or cannot be read.
        
    Returns:
    - str: The file path to the created CSV file, or None if the user declines to overwrite an existing file.
    """
    
    # Check if file exists
    if not os.path.isfile(input_path):
        raise FileNotFoundError(f"The file does not exist: {input_path}")
    
    # Check if valid extension
    file_extension = os.path.splitext(input_path)[1].lower()
    if file_extension != ".xlsx":
        raise ValueError("File is not an Excel file.")
    
    # Try to load excel file
    try:
        df = pd.read_excel(
            io=input_path,
            engine="openpyxl",
            names=["date_time","wind_speed","gust_speed","wind_direction"]
        )
    except Exception as e:
        raise ValueError(f"Could not read excel file: {e}")
    
    # Remove rows with null values
    df = df.dropna().reset_index(drop=True)
    
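    # Provide output_name if not given.
    # Use only the file name, so relative input paths are not duplicated when joined below
    if output_name is None:
        base = os.path.splitext(os.path.basename(input_path))[0]
        output_name = f"{base}.csv"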
    
    # Assigns output path to be the same directory    
    output_path = os.path.join(os.path.dirname(input_path), output_name)
    
    if os.path.isfile(output_path):
        overwrite = input(f"File {output_name} already exists. Overwrite? y/n").lower()
        if overwrite != "y":
            print("Operation cancelled by user")
            return None
    
    # Generates csv
    df.to_csv(path_or_buf=output_path, index=False, encoding="utf-8")
    
    return output_path

In [None]:
import pandas as pd
import os

def pandas_extract_transform(input_path : str, output_name : str = None) -> str:
    # Check that file exists
    if not os.path.isfile(input_path):
        raise FileNotFoundError(f"The file does not exist: {input_path}")
    
    # Check that file is excel format
    file_extension = os.path.splitext(input_path)[1].lower()
    if file_extension != ".xlsx":
        raise ValueError("File is not an Excel file.")
    
    # Load excel into pandas dataframe. Re-names column headers
    try:
        df = pd.read_excel(
            io=input_path,
            engine="openpyxl",
            names=["date_time","wind_speed","gust_speed","wind_direction"]
        )
    except Exception as e:
        raise ValueError(f"Could not read excel file: {e}")
    
    # Drop rows containing null values. 
    df = df.dropna().reset_index(drop=True)
    
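    # Provide name of the result csv if none is given.
    # Use only the file name, so relative input paths are not duplicated when joined below
    if output_name is None:
        base = os.path.splitext(os.path.basename(input_path))[0]
        output_name = f"{base}.csv"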
    
    # Assigns output path to be the same directory    
    output_path = os.path.join(os.path.dirname(input_path), output_name)
    
    if os.path.isfile(output_path):
        overwrite = input(f"File {output_name} already exists. Overwrite? y/n").lower()
        if overwrite != "y":
            print("Operation cancelled by user")
            return None
    
    # Generates CSV file
    df.to_csv(path_or_buf=output_path, index=False, encoding="utf-8")
    
    return output_path

In [14]:
csv = pandas_extract_transform("/Users/gioabeleda/Desktop/wind-energy-dashboard-streamlit/data/wind_energy.xlsx")

'/Users/gioabeleda/Desktop/wind-energy-dashboard-streamlit/data/wind_energy.csv'

In [16]:
csv

'/Users/gioabeleda/Desktop/wind-energy-dashboard-streamlit/data/wind_energy.csv'

Version 1
- first version that actually works lol
- params moved to function instead of hard coded
- docstring
- file exist check
- file extension check

In [1]:
# Version 1
# Expects a csv file with columns date-time, wind-speed, gust-speed, wind-direction
# Expects already existing database, schema, and table

import os
import psycopg2

def sql_load(input_path : str, dbname : str = "wind_energy_psycopg",
             user: str = "postgres", password : str = "postgres", 
             host: str = "localhost", schema : str = "wind_sites", 
             table : str = "upd_wind_site") -> None:
    """
    Loads data from a CSV file into a PostgreSQL table using psycopg2.

    This function opens a CSV file from the specified path and loads its contents into the given PostgreSQL table. 
    The CSV file must have a header row with the fields corresponding to the database table columns. 
    The database connection is managed within the function, and it uses COPY command for efficient bulk data loading.

    Parameters:
    - input_path (str): Absolute path to the CSV file to be loaded.
    - dbname (str): Name of the database to connect to. Default is "wind_energy_psycopg".
    - user (str): Username for authentication. Default is "postgres".
    - password (str): Password for authentication. Default is "postgres".
    - host (str): Host address of the database. Default is "localhost".
    - schema (str): Schema name of the target table. Default is "wind_sites".
    - table (str): Table name where data will be loaded. Default is "upd_wind_site".

    Returns:
    - None

    Raises:
    - FileNotFoundError: If the CSV file does not exist at the specified path.
    - ValueError: If the specified file is not a CSV file.
    - psycopg2.DatabaseError: If an error occurs during the database operation.

    Note:
    - The function will commit the transaction if the COPY command is successful, or rollback the transaction if an exception occurs.
    - Ensure that the PostgreSQL user has the required permissions to perform a COPY operation on the specified table.
    - The function assumes that the CSV file is formatted correctly with the necessary headers and delimiters.
    """

    # Check if file exists
    if not os.path.isfile(input_path):
        raise FileNotFoundError(f"The file does not exist: {input_path}")
    
    # Check if valid file extension
    file_extension = os.path.splitext(input_path)[1].lower()
    if file_extension != ".csv":
        raise ValueError("File is not a csv file")
    
    parameters = {
        "dbname" : dbname,
        "user" : user,
        "password" : password,
        "host" : host
    }
    
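    # Build the COPY statement with psycopg2.sql so that the schema and table
    # names are quoted as identifiers rather than formatted into the string
    from psycopg2 import sql

    copy_statement = sql.SQL(
        """
        COPY {}.{} (date_time, wind_speed, gust_speed, wind_direction)
        FROM STDIN
        DELIMITER ','
        CSV HEADER
        """
    ).format(sql.Identifier(schema), sql.Identifier(table))

    # psycopg2 connection block that does the actual loading process
    with psycopg2.connect(**parameters) as connection:
        with connection.cursor() as cursor:
            with open(input_path, "r") as csv_file:
                try:
                    cursor.copy_expert(copy_statement, csv_file)

                except Exception:
                    # Roll back, then re-raise with the original traceback
                    connection.rollback()
                    raise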

In [4]:
sql_load("/Users/gioabeleda/Desktop/wind-energy-dashboard-streamlit/data/wind_energy.csv")

# Excel Conversion No Cleaning
Version 1
- Docstring
- try-except to check excel file
- Renaming columns 

Version 2
- Check if file exists (FileNotFoundError)

In [5]:
# Dataset is converted to CSV in Excel or pandas
# Creates temporary staging table 

import psycopg2

def sql_elt_staging(csv_path : str = None, 
                    dbname : str = "wind_energy_psycopg",
                    temp_table : str = "wind_site_raw",
                    final_table : str = "upd_wind_site", 
                    ) -> None: 

    if csv_path is None:
        raise ValueError("No valid file path")
    
    parameters = {
        "dbname" : dbname,
        "user" : "postgres",
        "password" : "postgres",
        "host" : "localhost"
    }
    
    with psycopg2.connect(**parameters) as connection:
        with connection.cursor() as cursor:
            
            # Create a staging table
            cursor.execute(
                f"""
                CREATE TEMP TABLE {temp_table} (
                    LIKE wind_sites.{final_table}
                    EXCLUDING CONSTRAINTS
                );
                """
            )
            
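            # Insert data from csv into staging table.
            # COPY ... FROM STDIN streams the file from the client, as in sql_load()
            with open(csv_path, "r") as csv_file:
                cursor.copy_expert(
                    f"""
                    COPY {temp_table} (date_time, wind_speed, gust_speed, wind_direction)
                    FROM STDIN
                    DELIMITER ','
                    CSV HEADER;
                    """,
                    csv_file
                )
            
            # Using a filter, copy only valid data to the final table.
            # Assumption: "valid" means no null values, matching the cleaning rule used elsewhere
            cursor.execute(
                f"""
                INSERT INTO wind_sites.{final_table} (date_time, wind_speed, gust_speed, wind_direction)
                SELECT date_time, wind_speed, gust_speed, wind_direction
                FROM {temp_table}
                WHERE date_time IS NOT NULL
                  AND wind_speed IS NOT NULL
                  AND gust_speed IS NOT NULL
                  AND wind_direction IS NOT NULL;
                """
            )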
            
            # Drop the staging table
            cursor.execute(
                f"""
                DROP TABLE {temp_table};
                """
            )
            
            connection.commit()
