In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import psycopg2 as pg
import os
import sys
import requests
import logging

In [2]:
# Connect to the database


def db_connect():
    try:
        conn = pg.connect(
            dbname="current_flow_db",
            user="postgres",
            password="admin321",
            host="localhost",
            port="5432",
        )
    except Exception as e:
        print("I am unable to connect to the database")
        print(e)
    return conn

In [3]:
db_connection = db_connect()
cursor = db_connection.cursor()

In [4]:
# Create tables in the database: carga_diaria and Etags.
class CreateTables:
    def __init__(self, connect_pg):
        self.connect_pg = connect_pg
        self.cursor = connect_pg.cursor()

    def carga_diaria(self):
        self.cursor.execute(
            """CREATE TABLE IF NOT EXISTS carga_diaria (
                    id SERIAL PRIMARY KEY,
                    id_subsistema VARCHAR NOT NULL,
                    nom_subsistema VARCHAR NOT NULL,
                    din_instante VARCHAR NOT NULL,
                    val_cargaenergiamwmed VARCHAR,
                    Ano INTEGER NOT NULL,
                    input_file VARCHAR NOT NULL,
                    CONSTRAINT unique_constraint_carga_diaria UNIQUE (id_subsistema, din_instante)
            );"""
        )
        self.connect_pg.commit()
        print("Table 'carga_diaria' created successfully")

    def Etags(self):
        self.cursor.execute(
            """CREATE TABLE IF NOT EXISTS Etags (
                URL TEXT PRIMARY KEY,
                ETag TEXT
            );"""
        )
        self.connect_pg.commit()
        print("Table 'Etags' created successfully")


# Supondo que `connect_pg` seja uma conexão válida com o banco de dados PostgreSQL
create_tables = CreateTables(connect_pg=db_connection)
create_tables.carga_diaria()
create_tables.Etags()

Table 'carga_diaria' created successfully
Table 'Etags' created successfully


In [8]:
# Configure logging for better readability
logging.basicConfig(
    level=logging.INFO,  # Set to INFO or DEBUG for more verbose output
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(stream=sys.stdout),  # Log to console
        # logging.FileHandler("file_downloads.log"),  # Log to a file
    ],
)


# Function to load ETags from the database
def load_etags_db():
    cursor.execute("SELECT * FROM Etags")  # Assuming `cursor` is a connected DB cursor
    etags = cursor.fetchall()
    etags_df = pd.DataFrame(etags, columns=["URL", "ETag"])
    logging.info("Loaded ETags from the database.")
    return etags_df


# Function to get the ETag from a URL
def get_etag(url):
    try:
        logging.info(f"Fetching ETag for {url}")
        response = requests.head(f"{url}")
        response.raise_for_status()  # Ensure the request was successful
        etag = response.headers.get("ETag")
        logging.info(f"→ ETag fetched: {etag}")
        return etag
    except requests.RequestException as e:
        logging.error(f"Failed to fetch ETag for {url}: {e}")
        return None


# Function to compare the current ETag with the stored ETag
def compare_etag(url, new_etag):
    previous_etag = etags_df[etags_df["URL"] == url]["ETag"].values
    return len(previous_etag) == 0 or previous_etag[0] != new_etag


# Function to stage URLs for download if their ETag has changed
def stage_etag(urls):
    urls_to_update = []
    logging.info("Checking for updates based on ETag comparison...")
    for url in urls:
        etag = get_etag(url)
        if etag and compare_etag(url, etag):
            urls_to_update.append(url)
            logging.info(f"✔ URL staged for download: {url}")
        else:
            logging.info(f"✖ URL is up-to-date: {url}")
    return urls_to_update


# Function to download the file from the URL and save it
def download_file(url, save_dir="data/"):
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    try:
        logging.info(f"Starting download: {url}")
        response = requests.get(f"{url}")
        response.raise_for_status()  # Ensure the request was successful
        file_path = os.path.join(save_dir, os.path.basename(url))
        with open(file_path, "wb") as f:
            f.write(response.content)
        logging.info(f"→ Download complete: {url} saved to {file_path}")
    except requests.RequestException as e:
        logging.error(f"Error downloading {url}: {e}")


# Function to update the database with new ETags
def update_etag_db(url, new_etag):
    try:
        logging.info(f"Updating ETag for {url} in the database.")
        # Check if the URL already exists in the database
        cursor.execute("SELECT * FROM Etags WHERE URL = %s", (url,))
        result = cursor.fetchone()

        if result:
            # If URL exists, update the ETag
            cursor.execute("UPDATE Etags SET ETag = %s WHERE URL = %s", (new_etag, url))
        else:
            # If URL does not exist, insert a new row
            cursor.execute(
                "INSERT INTO Etags (URL, ETag) VALUES (%s, %s)", (url, new_etag)
            )

        # Commit the transaction to save changes
        db_connection.commit()
        logging.info(f"→ Database updated: ETag for {url}")

    except Exception as e:
        logging.error(f"Failed to update ETag for {url} in the database: {e}")
        db_connection.rollback()  # Rollback in case of any issues


# Function to process URLs: compare ETags, download if necessary, and update the DataFrame and database
def process_urls(urls, save_dir="data/"):
    global etags_df

    # Step 1: Stage URLs that need updating
    urls_to_update = stage_etag(urls)

    if urls_to_update:
        logging.info(f"URLs to be updated: {len(urls_to_update)} files")

        # Step 2: Download the files for the staged URLs
        for url in urls_to_update:
            logging.info(f"--- Processing {url} ---")
            download_file(url, save_dir)

            # Update the DataFrame with the new ETag after downloading
            new_etag = get_etag(url)
            if new_etag:
                if not etags_df[etags_df["URL"] == url].empty:
                    etags_df.loc[etags_df["URL"] == url, "ETag"] = new_etag
                else:
                    etags_df.loc[len(etags_df)] = [url, new_etag]

                # Step 3: Update the ETag in the database
                update_etag_db(url, new_etag)

        logging.info(
            "All URLs have been processed. ETag DataFrame and database updated."
        )
    else:
        logging.info("No files need to be updated. All files are up-to-date.")

In [6]:
# Define the base URL for the API
url_base = "https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/carga_energia_di/"

# Define the range of years for which we want to fetch data
years = range(2015, 2025)

# Generate the list of URLs for each year
urls = [f"{url_base}CARGA_ENERGIA_{year}.csv" for year in years]

In [7]:
# Initialize the DataFrame by loading ETags from the database
etags_df = load_etags_db()

# Example usage
process_urls(urls)

2024-09-27 00:36:40,605 [INFO] Loaded ETags from the database.
2024-09-27 00:36:40,607 [INFO] Checking for updates based on ETag comparison...
2024-09-27 00:36:40,608 [INFO] Fetching ETag for https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/carga_energia_di/CARGA_ENERGIA_2015.csv
2024-09-27 00:36:42,413 [INFO] → ETag fetched: "2ef2d1ac377178d14bfa59e64dc578e3"
2024-09-27 00:36:42,418 [INFO] ✖ URL is up-to-date: https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/carga_energia_di/CARGA_ENERGIA_2015.csv
2024-09-27 00:36:42,420 [INFO] Fetching ETag for https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/carga_energia_di/CARGA_ENERGIA_2016.csv
2024-09-27 00:36:43,608 [INFO] → ETag fetched: "f2faf3fdd60276f2084235ea796f9d68"
2024-09-27 00:36:43,611 [INFO] ✖ URL is up-to-date: https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/carga_energia_di/CARGA_ENERGIA_2016.csv
2024-09-27 00:36:43,612 [INFO] Fetching ETag for https://ons-aws-prod-opendata.s3.amazonaws.com/dataset/carga_e

In [10]:
import os
import pandas as pd
import logging


# Function to read downloaded CSV files into pandas DataFrames
def read_csv_file(file_path):
    try:
        logging.info(f"Reading CSV file: {file_path}")
        df = pd.read_csv(file_path, sep=";", decimal=",")
        logging.info(f"File {file_path} read successfully with {len(df)} rows.")
        return df
    except Exception as e:
        logging.error(f"Error reading {file_path}: {e}")
        return None


# Function to delete existing rows for the file from the `carga_diaria` table
def delete_existing_rows(input_file):
    try:
        logging.info(f"Deleting existing rows for {input_file} from the database.")
        delete_query = "DELETE FROM carga_diaria WHERE input_file = %s;"
        cursor.execute(delete_query, (input_file,))
        db_connection.commit()
        logging.info(f"→ Rows for {input_file} deleted successfully.")
    except Exception as e:
        logging.error(f"Failed to delete rows for {input_file}: {e}")
        db_connection.rollback()


# Function to insert all rows from the DataFrame into the `carga_diaria` table
def insert_file_to_db(df, input_file):
    try:
        logging.info(f"Inserting new rows for {input_file} into the database.")
        insert_query = """
        INSERT INTO carga_diaria (id_subsistema, nom_subsistema, din_instante, val_cargaenergiamwmed, Ano, input_file)
        VALUES (%s, %s, %s, %s, %s, %s);
        """
        data_to_insert = [
            (
                row["id_subsistema"],
                row["nom_subsistema"],
                row["din_instante"],
                row["val_cargaenergiamwmed"],
                row["Ano"],
                input_file,
            )
            for _, row in df.iterrows()
        ]

        cursor.executemany(insert_query, data_to_insert)
        db_connection.commit()
        logging.info(f"→ Rows for {input_file} inserted successfully.")
    except Exception as e:
        logging.error(f"Failed to insert rows for {input_file}: {e}")
        db_connection.rollback()


# Function to load data from a file into the database
def load_file_to_db(df, input_file):
    logging.info(f"Loading data from {input_file} into the database.")
    # Step 1: Delete existing rows for this file
    delete_existing_rows(input_file)
    # Step 2: Insert the new data from the file
    insert_file_to_db(df, input_file)


# Main function to process all downloaded files and load them into the database
def process_and_load_files(file_dir="data/"):
    logging.info(f"Processing files in directory: {file_dir}")
    for file_name in os.listdir(file_dir):
        file_path = os.path.join(file_dir, file_name)
        if file_name.endswith(".csv"):
            df = read_csv_file(file_path)
            if df is not None:
                # Add the year based on the file name
                year = int(file_name.split("_")[-1].replace(".csv", ""))
                df["Ano"] = year
                df["input_file"] = file_name
                # Load the data into the database (delete old rows, insert new ones)
                load_file_to_db(df, file_name)


# Example usage
process_and_load_files("../data/")

2024-09-27 00:37:37,656 [INFO] Processing files in directory: ../data/
2024-09-27 00:37:37,658 [INFO] Reading CSV file: ../data/CARGA_ENERGIA_2015.csv
2024-09-27 00:37:37,670 [INFO] File ../data/CARGA_ENERGIA_2015.csv read successfully with 1460 rows.
2024-09-27 00:37:37,673 [INFO] Loading data from CARGA_ENERGIA_2015.csv into the database.
2024-09-27 00:37:37,674 [INFO] Deleting existing rows for CARGA_ENERGIA_2015.csv from the database.
2024-09-27 00:37:37,689 [INFO] → Rows for CARGA_ENERGIA_2015.csv deleted successfully.
2024-09-27 00:37:37,690 [INFO] Inserting new rows for CARGA_ENERGIA_2015.csv into the database.
2024-09-27 00:37:38,953 [INFO] → Rows for CARGA_ENERGIA_2015.csv inserted successfully.
2024-09-27 00:37:38,955 [INFO] Reading CSV file: ../data/CARGA_ENERGIA_2016.csv
2024-09-27 00:37:38,960 [INFO] File ../data/CARGA_ENERGIA_2016.csv read successfully with 1464 rows.
2024-09-27 00:37:38,962 [INFO] Loading data from CARGA_ENERGIA_2016.csv into the database.
2024-09-27 00: