In [37]:
import sqlite3
import pandas as pd
import os
import logging
from utilities.utils import get_latest_processed_file

In [38]:
# Directory holding both the SQLite database and the processed CSV files.
data_path = r"E:\My_Github\fr-fuel-price-tracking\data"

DB_PATH = os.path.join(data_path, "fuel_prices.db")
TABLE_NAME = "fuel_prices"

# For each station, keep only the row(s) whose `updated_at` equals that
# station's most recent `updated_at`. Ties are all returned, so a station may
# appear more than once in the result.
query = f"""
    WITH LatestUpdates AS (
        SELECT *,
               MAX(updated_at) OVER (PARTITION BY station_id) AS max_updated_at
        FROM {TABLE_NAME}
    )
    SELECT * FROM LatestUpdates
    WHERE updated_at = max_updated_at;
"""

conn = sqlite3.connect(DB_PATH)
try:
    df2 = pd.read_sql_query(query, conn)
finally:
    # Release the database file even when the query fails (e.g. missing table).
    conn.close()


In [39]:
# Number of rows in the latest-per-station snapshot read from the database.
df2.shape[0]

9997

In [40]:
# Get the latest processed data file and its associated date
filename_starts_with = "processed_data"
latest_processed_file, latest_processed_date = get_latest_processed_file(data_path, filename_starts_with)

# Fail fast with a clear message: without this, the cell would fall through
# and pd.read_csv would raise a confusing error on a missing/None path.
if not latest_processed_file or not os.path.exists(latest_processed_file):
    logging.warning("No processed data file found. Ensure transform_data.py has been run.")
    raise FileNotFoundError(
        f"No file starting with '{filename_starts_with}' found in {data_path}"
    )

# Load CSV data into a Pandas DataFrame, parsing the per-fuel
# last-modification columns as datetimes.
latest_fetched_data_df = pd.read_csv(latest_processed_file, parse_dates=[
    "gazole_maj", "e10_maj", "sp98_maj", "sp95_maj", "e85_maj", "gplc_maj"
])


In [41]:
# Number of rows loaded from the latest processed CSV.
latest_fetched_data_df.shape[0]

9999

In [42]:
# Preview the first five rows of the freshly loaded CSV data.
latest_fetched_data_df.iloc[:5]

Unnamed: 0,station_id,latitude,longitude,postal_code,address,city,department,code_department,region,services,...,e85_maj,gplc_price,gplc_maj,gazole_unavailable,e10_unavailable,sp98_unavailable,sp95_unavailable,e85_unavailable,gplc_unavailable,updated_at
0,15700005,45.133465,2.222644,15700,RUE DU BOURNAT,Pleaux,Cantal,15,Auvergne-Rhône-Alpes,"Vente de gaz domestique (Butane, Propane), DAB...",...,NaT,,NaT,,,definitive,,definitive,definitive,2025-03-18
1,44160006,47.449842,-2.127483,44160,RELAIS DE BEAULIEU RN 165,Pontchâteau,Loire-Atlantique,44,Pays de la Loire,,...,NaT,,NaT,,definitive,,,definitive,definitive,2025-03-18
2,56700007,47.758147,-3.23759,56700,Le Champ de la Patience,Kervignac,Morbihan,56,Bretagne,"Boutique alimentaire, Boutique non alimentaire",...,NaT,,NaT,,definitive,,,definitive,definitive,2025-03-18
3,94380003,48.766693,2.49011,94380,5 AVENUE DES ROSES,Bonneuil-sur-Marne,Val-de-Marne,94,Île-de-France,"Carburant additivé, DAB (Distributeur automati...",...,NaT,,NaT,,,,,definitive,definitive,2025-03-18
4,91000001,48.613,2.377,91070,RUE DE VILLEROY,Bondoufle,Essonne,91,Île-de-France,"Toilettes publiques, Boutique alimentaire, Bou...",...,NaT,,NaT,,,,definitive,definitive,definitive,2025-03-18


In [43]:
# Flag the rows of the fresh CSV whose station already exists in the database
# snapshot. `isin` performs a single hashed membership test instead of
# scanning the whole `df2['station_id']` array once per row.
is_known_station = latest_fetched_data_df['station_id'].isin(df2['station_id'])

# Stations present in the CSV but absent from the database (CSV order kept).
news_station_id = latest_fetched_data_df.loc[~is_known_station, 'station_id'].tolist()
print(f'the total number of news_station_id is : {len(news_station_id)}')

# remove news_station_id from latest_fetched_data_df
# (.copy() makes this an independent frame, so later column assignments do not
# write into a slice of latest_fetched_data_df)
latest_fetched_data_df_without_news_station_id = latest_fetched_data_df[is_known_station].copy()

len(latest_fetched_data_df_without_news_station_id)

9997

In [44]:
# Per-fuel "last modified" columns compared between the CSV and the database
date_columns = ["gazole_maj", "e10_maj", "sp98_maj", "sp95_maj", "e85_maj", "gplc_maj"]

# Convert date columns safely: work on an explicit copy so the assignment does
# not target a slice of `latest_fetched_data_df` (SettingWithCopyWarning).
latest_fetched_data_df_without_news_station_id = latest_fetched_data_df_without_news_station_id.copy()
latest_fetched_data_df_without_news_station_id[date_columns] = (
    latest_fetched_data_df_without_news_station_id[date_columns].apply(pd.to_datetime)
)
df2[date_columns] = df2[date_columns].apply(pd.to_datetime)

# One row of dates per station, indexed by station_id. When a station appears
# several times, its first row is used on both sides.
latest_dates = (
    latest_fetched_data_df_without_news_station_id
    .drop_duplicates(subset="station_id", keep="first")
    .set_index("station_id")[date_columns]
)
db_dates = (
    df2
    .drop_duplicates(subset="station_id", keep="first")
    .set_index("station_id")[date_columns]
    .reindex(latest_dates.index)  # align row-for-row with the CSV stations
)

# A station counts as updated when at least one fuel date in the CSV is
# strictly newer than the stored one. Comparisons involving NaT evaluate to
# False, so a missing date on either side never marks a station as updated.
has_newer_date = (latest_dates > db_dates).any(axis=1)
updated_station_id = has_newer_date[has_newer_date].index.tolist()
print(f'the total number of row updated is : {len(updated_station_id)}')
            

the total number of row updated is : 0


In [46]:
# Stations to write to the database: the new ones first, then the updated ones.
new_or_updated_station_id_to_add = [*news_station_id, *updated_station_id]
print(f'the total number of new_or_updated_station_id_to_add is : {len(new_or_updated_station_id_to_add)}')

the total number of new_or_updated_station_id_to_add is : 2


In [47]:
# Keep the rows of latest_fetched_data_df whose station_id is listed in
# new_or_updated_station_id_to_add.
df_new = latest_fetched_data_df.loc[
    latest_fetched_data_df['station_id'].isin(new_or_updated_station_id_to_add)
]


In [48]:
# Display the rows selected for insertion (stations that are new or updated).
df_new

Unnamed: 0,station_id,latitude,longitude,postal_code,address,city,department,code_department,region,services,...,e85_maj,gplc_price,gplc_maj,gazole_unavailable,e10_unavailable,sp98_unavailable,sp95_unavailable,e85_unavailable,gplc_unavailable,updated_at
9997,39110007,46.949,5.874,39110,13 AVENUE ARISTIDE BRIAND,Salins-les-Bains,Jura,39,Bourgogne-Franche-Comté,"Relais colis, Boutique alimentaire, Boutique n...",...,2025-03-18 00:01:00,,NaT,,,,definitive,,definitive,2025-03-18
9998,78200008,48.997,1.711,78200,16 Rue Jean Hoët,Mantes-la-Jolie,Yvelines,78,Île-de-France,"Station de gonflage, Piste poids lourds, Lavag...",...,2025-03-17 08:23:30,,NaT,,,,,,,2025-03-18


In [None]:
# Insert only new or updated records into the database
if df_new.empty:
    logging.info("No new updates found. Database is already up to date.")
else:
    # The connection used to read the snapshot was closed right after the
    # query, so open a fresh one for the write and always release it.
    conn = sqlite3.connect(DB_PATH)
    try:
        df_new.to_sql(TABLE_NAME, conn, if_exists="append", index=False)
    finally:
        conn.close()
    logging.info(f'New records found: {news_station_id}')
    logging.info(f"Inserted {len(df_new)} new or updated records into the database.")

In [None]:
import sqlite3
import pandas as pd
import os
import logging
from utilities.utils import get_latest_processed_file

# Root logger setup: show INFO and above, each record prefixed with its
# timestamp and severity.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)


def initialize_database(data_path):
    """
    Create the SQLite database table if it doesn't exist.
    Ensures the structure is set up before inserting data.

    Args:
        data_path (str): Directory in which `fuel_prices.db` lives (the file is
            created by SQLite if it does not exist yet).

    Returns:
        None. Failures are logged with their traceback and not re-raised.
    """
    DB_NAME = "fuel_prices.db"
    TABLE_NAME = "fuel_prices"  # Table name
    DB_PATH = os.path.join(data_path, DB_NAME)

    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            conn.execute(f"""
        CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            station_id INTEGER,
            latitude REAL,
            longitude REAL,
            postal_code TEXT,
            address TEXT,
            city TEXT,
            department TEXT,
            code_department TEXT,
            region TEXT,
            services TEXT,
            gazole_price REAL,
            gazole_maj DATE,
            e10_price REAL,
            e10_maj DATE,
            sp98_price REAL,
            sp98_maj DATE,
            sp95_price REAL,
            sp95_maj DATE,
            e85_price REAL,
            e85_maj DATE,
            gplc_price REAL,
            gplc_maj DATE,
            gazole_unavailable TEXT,
            e10_unavailable TEXT,
            sp98_unavailable TEXT,
            sp95_unavailable TEXT,
            e85_unavailable TEXT,
            gplc_unavailable TEXT,
            updated_at DATE
        );
        """)
            conn.commit()
        finally:
            # Close the connection even when the CREATE statement fails.
            conn.close()
        logging.info("Database initialized successfully.")
    except Exception as e:
        logging.error(f"Error initializing database: {e}", exc_info=True)


def fetch_latest_records(data_path):
    """
    Fetch the latest updated row for each station_id from the database.

    Args:
        data_path (str): Directory containing `fuel_prices.db`.

    Returns:
        dict: A dictionary where keys are station_ids and values are the latest
        row data (a dict mapping column name to value). The `*_maj` columns are
        parsed to pandas timestamps so they can be compared with dates parsed
        from the CSV. An empty dict is returned when the query fails.
    """
    DB_PATH = os.path.join(data_path, "fuel_prices.db")
    TABLE_NAME = "fuel_prices"
    date_columns = ["gazole_maj", "e10_maj", "sp98_maj", "sp95_maj", "e85_maj", "gplc_maj"]

    # ORDER BY rowid makes the row order deterministic, so that "keep the last
    # duplicate" below means "keep the most recently inserted row".
    query = f"""
            SELECT * FROM {TABLE_NAME} t1
            WHERE updated_at = (
                SELECT MAX(updated_at)
                FROM {TABLE_NAME} t2
                WHERE t1.station_id = t2.station_id
            )
            ORDER BY t1.rowid;
        """

    try:
        conn = sqlite3.connect(DB_PATH)
        try:
            # SQLite returns the date columns as text; parse them here.
            df = pd.read_sql_query(query, conn, parse_dates=date_columns)
        finally:
            conn.close()

        # Several rows can share a station's latest `updated_at`; keep one per
        # station so the index is unique, then key the result by station_id
        # (the default positional index would make every lookup miss).
        return (
            df.drop_duplicates(subset="station_id", keep="last")
            .set_index("station_id", drop=False)
            .to_dict("index")
        )

    except Exception as e:
        logging.error(f"Error fetching latest records: {e}", exc_info=True)
        return {}


def load_latest_processed_data(data_path):
    """
    Load the latest processed data from CSV and insert new records into the SQLite database.

    A CSV row is inserted when its station is absent from the database, or when
    at least one of its `*_maj` dates is strictly newer than the one stored for
    that station. Errors are logged with their traceback and not re-raised.

    Args:
        data_path (str): The directory where the processed CSV files are stored.
    """
    # Database setup
    DB_PATH = os.path.join(data_path, "fuel_prices.db")
    TABLE_NAME = "fuel_prices"
    date_columns = ["gazole_maj", "e10_maj", "sp98_maj", "sp95_maj", "e85_maj", "gplc_maj"]

    # Get the latest processed data file and its associated date
    filename_starts_with = "processed_data"
    latest_processed_file, latest_processed_date = get_latest_processed_file(data_path, filename_starts_with)

    if not latest_processed_file or not os.path.exists(latest_processed_file):
        logging.warning("No processed data file found. Ensure transform_data.py has been run.")
        return

    try:
        # Load CSV data into a Pandas DataFrame
        df = pd.read_csv(latest_processed_file, parse_dates=date_columns)

        logging.info(f'The last number of rows fetched on {latest_processed_date}: {len(df)}')

        # Retrieve latest update information from the database
        latest_updated_rows = fetch_latest_records(data_path)
        logging.info(f'The last number of rows updated per station id in the database: {len(latest_updated_rows)}')

        new_records = 0  # Stations that do not exist in the database yet

        def is_new_or_updated(row):
            """
            Determines if a row contains new or updated fuel price information compared to the database.

            Args:
                row (Series): A single row from the DataFrame.

            Returns:
                bool: True if the row should be inserted, False otherwise.
            """
            nonlocal new_records
            station_id = row["station_id"]

            if station_id not in latest_updated_rows:
                new_records += 1
                return True  # If the station_id does not exist in DB, it's a new entry

            latest = latest_updated_rows[station_id]

            # Updated when any modification date is strictly newer than the
            # stored one; a missing date on either side never counts.
            for col in date_columns:
                # Stored values may be text; coerce so the comparison with the
                # parsed CSV timestamp cannot raise a TypeError.
                stored = pd.to_datetime(latest.get(col), errors="coerce")
                if pd.notna(row[col]) and pd.notna(stored) and row[col] > stored:
                    return True

            return False

        # Build the row mask explicitly so an empty CSV yields an empty selection
        keep_row = pd.Series(
            [is_new_or_updated(row) for _, row in df.iterrows()],
            index=df.index,
            dtype=bool,
        )
        df_new = df[keep_row]

        # Insert only new or updated records into the database
        if df_new.empty:
            logging.info("No new updates found. Database is already up to date.")
        else:
            # Open the connection only for the write, and always release it
            conn = sqlite3.connect(DB_PATH)
            try:
                df_new.to_sql(TABLE_NAME, conn, if_exists="append", index=False)
            finally:
                conn.close()
            logging.info(f'New records found: {new_records}')
            logging.info(f"Inserted {len(df_new)} new or updated records into the database.")

    except Exception as e:
        logging.error(f"Error loading data: {e}", exc_info=True)


# Script entry point.
# NOTE(review): in the visible source this guard only sets `data_path`; neither
# initialize_database nor load_latest_processed_data is called here — confirm
# whether those calls are missing or live further down.
if __name__ == "__main__":
    # Define the directory where processed data is stored
    data_path = r"E:\My_Github\fr-fuel-price-tracking\data"

