In [1]:
import os
import sys
import psycopg2
from psycopg2 import sql
from dotenv import load_dotenv

In [2]:
load_dotenv('.env')

db_params = {
    "host": os.getenv("POSTGRES_HOST"),
    "port": os.getenv("POSTGRES_PORT"),
    "dbname": os.getenv("POSTGRES_DB"),
    "user": os.getenv("POSTGRES_USER"),
    "password": os.getenv("POSTGRES_PASSWORD"),
}
if not all(db_params.values()):
    print("Database connection parameters are not fully set in environment variables.")
    sys.exit(1)
DB_URL = f"postgresql://{db_params['user']}:{db_params['password']}@{db_params['host']}:{db_params['port']}/{db_params['dbname']}"


In [8]:
with psycopg2.connect(DB_URL) as conn:
    with conn.cursor() as cur:
        cur.execute("select nspname from pg_namespace where nspname in ('etl','staging','core','marts','quarantine') order by 1;")
        schemas = cur.fetchall()
        print("Existing schemas in the db:")
        for schema in schemas:
            print('\t',schema[0])
conn.close()

Existing schemas in the db:
	 core
	 etl
	 marts
	 quarantine
	 staging


In [35]:
with psycopg2.connect(DB_URL) as conn:
    with conn.cursor() as cur:
        cur.execute("select tablename from pg_tables where schemaname='staging' order by 1;")
        [print(x[0]) for x in cur.fetchall()]
conn.close()

attivita_formative_esterne
attivita_formative_fuorisede
attivita_formative_interne
collaborazioni_dettaglio
corsi
dettaglio_corso
filtered_iu_stats
journal_details
mobilita_internazionale_con_studenti
ore_formazione
pubblicazioni
stat_pubb
students


In [33]:
with psycopg2.connect(DB_URL) as conn:
    with conn.cursor() as cur:
        cur.execute("truncate table etl.schema_migrations;")
        # [print(x[0]) for x in cur.fetchall()]
conn.close()

In [7]:
with psycopg2.connect(DB_URL) as conn:
    with conn.cursor() as cur:
        cur.execute("select tablename from pg_tables where schemaname='staging' order by 1;")
        [print(x[0]) for x in cur.fetchall()]
conn.close()

attivita_formative_esterne
attivita_formative_fuorisede
attivita_formative_interne
collaborazioni_dettaglio
corsi
dettaglio_corso
filtered_iu_stats
journal_details
mobilita_internazionale_con_studenti
ore_formazione
pubblicazioni
stat_pubb
students


In [3]:
with psycopg2.connect(DB_URL) as conn:
    with conn.cursor() as cur:

        # cur.execute(
        #     "SELECT * FROM etl.file_manifest ;")
        # print(cur.fetchall())

        cur.execute("truncate table etl.runs cascade;")
        cur.execute("truncate table etl.file_manifest;")
        cur.execute("truncate table staging.students cascade;")
        cur.execute("truncate table staging.attivita_formative_esterne cascade;")
        cur.execute("truncate table staging.attivita_formative_fuorisede cascade;")
        cur.execute("truncate table staging.attivita_formative_interne cascade;")
        cur.execute("truncate table staging.corsi cascade;")

        cur.execute("truncate table staging.collaborazioni_dettaglio cascade;")
        cur.execute("truncate table staging.dettaglio_corso cascade;")
        cur.execute("truncate table staging.filtered_iu_stats cascade;")
        cur.execute("truncate table staging.journal_details cascade;")
        cur.execute("truncate table staging.mobilita_internazionale_con_studenti cascade;")
        cur.execute("truncate table staging.ore_formazione cascade;")
        cur.execute("truncate table staging.pubblicazioni cascade;")
        cur.execute("truncate table staging.stat_pubb cascade;")
        # cur.execute("truncate table etl.schema_migrations;")
conn.close()

In [29]:
from pipeline.staging_transforms import t_students
from pipeline.staging_tools import create_run, load_one_file, finish_run
import glob

job = {
        "pattern": "./data/input/cicli/*/0_students_info.csv",
        "table": "staging.students",
        "transform": t_students,
    }


with psycopg2.connect(DB_URL) as conn:
        with conn.cursor() as cur:
            run_id = create_run(cur)
            conn.commit()
            try:
                files = sorted(glob.glob(job["pattern"]))
                for f in files:
                    load_one_file(cur, run_id, f, job['table'], job['transform'])
                    conn.commit()
                finish_run(cur, run_id, "SUCCESS", None)
                conn.commit()
                print(f"✅ Run success: {run_id}")


            except Exception as e:
                finish_run(cur, run_id, "FAILED", str(e))
                conn.commit()
                print(f"❌ Run failed: {run_id} | {e}", file=sys.stderr)
conn.close()
                 



✅ Run success: a6a44ef3-b3fc-4175-bdf1-df34bf15b9e3


In [36]:
import pandas as pd
df = pd.read_csv("./data/input/stat_pubb/stat_pubb_37.csv", delimiter=',')  
# df = df[~(df['Teacher']=='Nessuna collaborazione prevista')]
df.head()

Unnamed: 0,matricola,cognome,nome,numero_journal,numero_conferenze,numero_capitoli,numero_poster,numero_abstract,numero_brevetti,quartile_1,quartile_2,quartile_3,quartile_4,quartile_5,quartile_6,quartile_7,quartile_8,quartile_9,quartile_10,quartile_11,quartile_12,quartile_13,quartile_14,quartile_15
0,59760,INSINGA,GIORGIO,1,7,0,0,0,1,Q2,,,,,,,,,,,,,,
1,61952,DRI,EMANUELE,3,7,0,0,0,0,Q2,Q2,Q1,,,,,,,,,,,,
2,16417,SISINNI,SILVIA,2,6,0,1,0,0,Q1,Q1,,,,,,,,,,,,,
3,33249,ROBBIANO,LUCA,0,4,0,0,0,0,,,,,,,,,,,,,,,
4,54130,ANGI,ANTONINO,4,6,0,0,0,0,Q3,Q1,Q1,Q1,,,,,,,,,,,


In [33]:
import re

files = glob.glob('data/input/journal_details/journal_details_*.csv')
for f in files:
    base = os.path.basename(f)
    m = re.match(r"journal_details_(\d+)\.csv$", base)
    ciclo = m.group(1) if m else None
    print(ciclo)

38
37
39
40


In [None]:
from datetime import datetime, timezone
import os
import re
import pandas as pd
from pipeline.staging_tools import (extract_ciclo_from_path, extract_matricola_from_filename, extract_cod_ins_and_anno,
                           copy_df_to_staging, is_empty_marker_file, mark_manifest, sha256_file,
                           upsert_manifest)
from typing import Mapping
import psycopg2.extensions as conn
import glob

def stage_attivita_interne( # TO BE COMPLETED
    cur: conn.cursor,
    run_id: int,
    file_path: str = "./data/input/cicli/*/*_attivita_formative_insterne.csv",
    table: str = "staging.attivita_formative_interne",
) -> None:

    colmap: Mapping[str, str] = {
    "Cod Ins.": "cod_ins",
    "Nome insegnamento": "nome_insegnamento",
    "Ore": "ore",
    "Ore riconosciute": "ore_riconosciute",
    "Voto": "voto",
    "Coeff. voto": "coeff_voto",
    "Data esame": "data_esame",
    "Tipo form.": "tipo_form",
    "Liv. Esame": "liv_esame",
    "Tipo attività": "tipo_attivita",
    "Punti": "punti",
}

    files = sorted(glob.glob(file_path))

    for f in files:
        file_hash = sha256_file(f)                      # compute file hash for etl.file_manifest
        file_size = os.path.getsize(f)                  # compute file size for etl.file_manifest
        matricola = extract_matricola_from_filename(f)
        ciclo = extract_ciclo_from_path(f)

        upsert_manifest(cur, run_id, f, file_hash, file_size, status="NEW")

        cur.execute(
            "SELECT status FROM etl.file_manifest WHERE file_hash_sha256=%s",
            (file_hash,),
        )
        existing_status = cur.fetchone()
        if existing_status and existing_status[0] in ("LOADED", "SKIPPED"):
            print(f"File {f} already loaded with status {existing_status[0]}, skipping.")
            continue

        print(f"Loading file {f} into {table}...")
        try:
            if is_empty_marker_file(f):
                mark_manifest(
                    cur,
                    file_hash,
                    status="SKIPPED",
                    rows_loaded=0,
                    error_message="Empty marker detected",
                )
                continue

            raw = pd.read_csv(f, delimiter=",", dtype=str)
            keep = [c for c in colmap if c in raw.columns]
            df = raw[keep].rename(columns=colmap)

            if df.empty:
                mark_manifest(
                    cur,
                    file_hash,
                    status="SKIPPED",
                    rows_loaded=0,
                    error_message="No rows after transform",
                )
                continue

            df.insert(0, "run_id", run_id)
            df.insert(1, "source_file", f)
            df.insert(2, "loaded_at", datetime.now(timezone.utc).isoformat())
            df.insert(3, "matricola", matricola)
            df.insert(4, "ciclo", ciclo)

            copy_df_to_staging(cur, df, table)
            mark_manifest(
                cur,
                file_hash,
                status="LOADED",
                rows_loaded=len(df),
                error_message=None,
            )
            print(f"Loaded {len(df)} rows from {f} into {table}")

        except Exception as e:
            # COPY or any SQL error aborts the transaction → must rollback first
            cur.connection.rollback()

            mark_manifest(
                cur,
                file_hash,
                status="FAILED",
                rows_loaded=None,
                error_message=str(e),
            )
            raise
# for f in glob.glob("./data/input/cicli/*/*_attivita_formative_interne.csv"):
#     print(extract_matricola_from_filename(f))
#     print(extract_ciclo_from_path(f))


TypeError: stage_attivita_interne() missing 2 required positional arguments: 'cur' and 'run_id'