In [1]:
import xml.etree.ElementTree as ET
import requests
import psycopg2
import os
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv
from lxml import etree
import time

# Load environment variables (should be identical to other scripts)
load_dotenv()

# One connection config per target database. The entries differ only in
# 'dbname'; credentials and host come from the environment (falling back to
# the placeholder strings when a variable is unset) and every target uses
# the 'Source' schema.
db_configs = [
    {
        'dbname': database_name,
        'user': os.getenv('DB_USER', 'DB_USER'),
        'password': os.getenv('DB_PASSWORD', 'DB_PASSWORD'),
        'host': os.getenv('DB_HOST', 'DB_HOST'),
        'schema': 'Source',
    }
    for database_name in ('Antigone', 'Ismene', 'Eteocles', 'Polyneices')
]

# Function to fetch and parse XML data from Eurostat API
def fetch_eurostat_data(dataset_code, timeout=(10, 300)):
    """Download one Eurostat dataset as XML and flatten it into a DataFrame.

    Every ``Obs`` element found under a ``Series`` element becomes one row
    whose columns are the attributes of the series merged with the
    attributes of the observation (observation attributes win on a name
    clash).

    Args:
        dataset_code: Eurostat dataflow identifier, e.g. ``'gov_10a_main'``.
        timeout: Passed straight to ``requests.get``. Without a timeout a
            stalled connection would block the whole ingestion run forever.

    Returns:
        A ``pandas.DataFrame`` with one row per observation (possibly empty
        when no ``Series``/``Obs`` elements were matched), or ``None`` when
        the request failed or returned a non-200 status.
    """
    url = f"https://ec.europa.eu/eurostat/api/dissemination/sdmx/3.0/data/dataflow/ESTAT/{dataset_code}/1.0?compress=false"
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException as exc:
        # Report transport-level failures the same way as a bad status code
        # so the caller can carry on with the remaining datasets.
        print(f"Failed to fetch data for {dataset_code}. Error: {exc}")
        return None
    if response.status_code != 200:
        print(f"Failed to fetch data for {dataset_code}. Status code: {response.status_code}")
        return None

    root = etree.fromstring(response.content)
    structured_data = []
    # Elements are looked up by their un-namespaced tag names.
    for series in root.findall('.//Series', namespaces={}):
        series_data = series.attrib
        for obs in series.findall('.//Obs', namespaces={}):
            structured_data.append({**series_data, **obs.attrib})

    df = pd.DataFrame(structured_data)
    if df.empty:
        print("DataFrame is empty, please check the XML structure or namespace.")
    else:
        print(df.head())

    return df

# Function to upload data to PostgreSQL
def upload_to_postgres(df, table_name, config):
    """Write ``df`` to ``config['schema'].table_name``, replacing the table.

    Column names are normalised (spaces to underscores, lower-cased) and each
    column that can be parsed entirely as numbers is converted to a numeric
    dtype; other columns are left as they are. The caller's DataFrame is not
    modified.

    Args:
        df: Data to upload.
        table_name: Name of the destination table.
        config: Mapping with the keys ``'user'``, ``'password'``, ``'host'``,
            ``'dbname'`` and ``'schema'``.
    """
    from urllib.parse import quote_plus

    def _numeric_if_possible(column):
        # Same outcome as the deprecated to_numeric(errors='ignore'):
        # convert the column when possible, otherwise keep it untouched.
        try:
            return pd.to_numeric(column)
        except (ValueError, TypeError):
            return column

    # Credentials must be URL-encoded, otherwise characters such as '@', ':'
    # or '/' in a password corrupt the connection string.
    user = quote_plus(config["user"])
    password = quote_plus(config["password"])
    engine = create_engine(f'postgresql://{user}:{password}@{config["host"]}/{config["dbname"]}')
    try:
        # rename() returns a new frame, so the DataFrame shared between the
        # uploads to the different databases is left unchanged.
        prepared = df.rename(columns=lambda col: col.replace(' ', '_').lower())
        prepared = prepared.apply(_numeric_if_possible)
        prepared.to_sql(table_name, engine, schema=config['schema'], if_exists='replace', index=False, chunksize=500)
    finally:
        # Release the connection pool; a new engine is created on every call.
        engine.dispose()
    print(f"Data uploaded to {config['dbname']} successfully.")

# Eurostat dataset code -> destination table name, in ingestion order.
dataset_mapping = dict([
    ('gov_10a_main', 'gov_10a_main_table'),
    ('lfsa_esgaed', 'self_employment_by_education_raw'),
    ('lfsq_esgan2', 'self_employment_by_sex_industry_age_raw'),
    ('lfsa_esgais', 'self_employment_by_occupation_raw'),
])

# Main function to orchestrate data ingestion
def main():
    """Fetch every dataset in ``dataset_mapping`` and load it into each database.

    A dataset is skipped when the fetch fails (``None``) or yields no rows.
    Uploading an empty frame would still replace the destination table, so
    skipping keeps the previously loaded data in place. The total runtime is
    printed at the end.
    """
    # perf_counter is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments during a long run.
    start_time = time.perf_counter()

    for dataset_code, table_name in dataset_mapping.items():
        print(f"\nFetching dataset: {dataset_code}")
        data = fetch_eurostat_data(dataset_code)
        if data is None:
            continue
        if data.empty:
            print(f"No rows parsed for {dataset_code}; skipping upload so existing tables are not replaced.")
            continue
        for config in db_configs:
            upload_to_postgres(data, table_name, config)

    # Calculate runtime
    runtime = time.perf_counter() - start_time
    print(f"Total script runtime: {runtime} seconds")

# Run the ingestion only when executed as a script (or notebook cell),
# not when this module is imported.
if __name__ == "__main__":
    main()



Fetching dataset: gov_10a_main
  freq geo     na_item sector     unit TIME_PERIOD OBS_VALUE OBS_FLAG
0    A  BE  D3REC_S212     S1  MIO_EUR        1995    1068.5      NaN
1    A  BE  D3REC_S212     S1  MIO_EUR        1996     864.6      NaN
2    A  BE  D3REC_S212     S1  MIO_EUR        1997     824.2      NaN
3    A  BE  D3REC_S212     S1  MIO_EUR        1998     738.1      NaN
4    A  BE  D3REC_S212     S1  MIO_EUR        1999     857.6      NaN
Data uploaded to Antigone successfully.
Data uploaded to Ismene successfully.
Data uploaded to Eteocles successfully.
Data uploaded to Polyneices successfully.

Fetching dataset: lfsa_esgaed
      age freq geo isced11 sex     unit wstatus TIME_PERIOD OBS_FLAG OBS_VALUE
0  Y15-19    A  AT   ED0-2   F  THS_PER    SELF        1995        u       NaN
1  Y15-19    A  AT   ED0-2   F  THS_PER    SELF        1996        u       NaN
2  Y15-19    A  AT   ED0-2   F  THS_PER    SELF        1997        u       NaN
3  Y15-19    A  AT   ED0-2   F  THS_PER  