
# **AirQ_Part2_xxx** — Assignment 2 (ETL + OLAP with Atoti)

- This notebook orchestrates Assignment 2.
- All SQL must live in external `.sql` files under `ddl/`, `etl/`, and `sql/`. 
- All MDX must live in external `.mdx` files under `mdx/`.

**Final folder layout (per‑group, self‑contained)**

```
BI_Projects/
  DWH2_xxx/
    csv/       # 15 OLTP CSV files
    ddl/       # DDL only (staging, warehouse)
    etl/       # ETL steps: a2_etl*.sql files
    mdx/       # MDX queries in .mdx files: a2_q{NN}_{A|B}.mdx
    mdx_out/   # CSV files with the results of the MDX queries
    pdf/       # PDF files with dashboard exports: a2_q{NN}.pdf
    sql/       # SQL queries in .sql files: a2_q{NN}_{A|B}.sql
    sqldump/   # Export produced by pg_dump
    AirQ_Part2_xxx.ipynb
    group_xxx.txt
    Report_Part2_Group_xxx.pdf
```
> Replace `xxx` in your file names with your **three‑digit** group number.


## Contents
1. Configuration & preflight (group, paths)  
2. Database connection
3. Reset and create staging schema (`stg2_xxx`) from DDL file 
4. Load CSVs into stg2_xxx (order-sensitive)  
5. Reset and create warehouse (`dwh2_xxx`) from DDL file 
6. ETL runner (executes `etl/a2_etl*.sql`)  
7. SQL queries
8. Atoti setup and build the OLAP cube (scaffold)
9. Define hierarchies and measures
10. MDX queries
11. Batch executor: run all .mdx → CSV (+ an index)
12. Create database dump
13. Submission checklist


## 1) Configuration & preflight

In [260]:
# === Parameters ===
XXX = "020"                      # your three-digit group number, e.g. "007"

VERBOSE_SQL = False             # print progress when running .sql files
LOAD_ORDER_CSV = []             # or fill later
LOAD_ORDER_DATA = []            # or fill later

In [262]:
import re, time
import shutil, subprocess, os
import json, hashlib

from pathlib import Path
from getpass import getpass
from urllib.parse import quote_plus
from datetime import datetime, timezone

import pandas as pd
import sqlalchemy as sa
import sqlparse
from sqlalchemy import create_engine, text   # do not import `engine` here: the name is reused for the Engine object

import atoti as tt

In [264]:
!pip show atoti

Name: atoti
Version: 0.9.10
Summary: Explore metrics across hundreds of dimensions, analyze live data at its most granular level and perform what-if simulations at unparalleled speed
Home-page: https://www.atoti.io
Author: ActiveViam
Author-email: ActiveViam <dev@atoti.io>
License: 
Location: C:\ProgramData\anaconda3\Lib\site-packages
Requires: atoti-client, atoti-server, jdk4py
Required-by: 


In [265]:
# === Toggles & paths ===
root_dir = Path.cwd()
csv_dir = root_dir / "csv"
ddl_dir = root_dir / "ddl"
etl_dir = root_dir / "etl"
mdx_dir = root_dir / "mdx"
mdx_out_dir = root_dir / "mdx_out"
sql_dir = root_dir / "sql"
sqldump_dir = root_dir / "sqldump"

SCHEMA_STG = f"stg2_{XXX}"
SCHEMA_DWH = f"dwh2_{XXX}"

# files we expect in the ddl subfolder
STG2_RESET  = ddl_dir / f"airq_reset_stg2_{XXX}.sql"
STG2_CREATE = ddl_dir / f"airq_create_stg2_{XXX}.sql"
DWH2_RESET  = ddl_dir / f"airq_reset_dwh2_{XXX}.sql"
DWH2_CREATE = ddl_dir / f"airq_create_dwh2_{XXX}.sql"

print("CSV dir:", csv_dir)
print("DDL dir:", ddl_dir)
print("ETL dir:", etl_dir)
print("MDX dir:", mdx_dir)
print("MDX_out dir:", mdx_out_dir)
print("SQL dir:", sql_dir)
print("SQLdump dir:", sqldump_dir)

CSV dir: C:\ProgramData\anaconda3\envs\dwh\etc\jupyter\BI_Projects\DWH2_020\csv
DDL dir: C:\ProgramData\anaconda3\envs\dwh\etc\jupyter\BI_Projects\DWH2_020\ddl
ETL dir: C:\ProgramData\anaconda3\envs\dwh\etc\jupyter\BI_Projects\DWH2_020\etl
MDX dir: C:\ProgramData\anaconda3\envs\dwh\etc\jupyter\BI_Projects\DWH2_020\mdx
MDX_out dir: C:\ProgramData\anaconda3\envs\dwh\etc\jupyter\BI_Projects\DWH2_020\mdx_out
SQL dir: C:\ProgramData\anaconda3\envs\dwh\etc\jupyter\BI_Projects\DWH2_020\sql
SQLdump dir: C:\ProgramData\anaconda3\envs\dwh\etc\jupyter\BI_Projects\DWH2_020\sqldump
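Section 1 is called *preflight*, but the cells above only print the paths. A minimal sketch of an actual preflight check, assuming the folder layout and the DDL file names defined above; the function name `preflight` is hypothetical:

```python
import re
from pathlib import Path

# Sub-folders from the layout at the top of the notebook
EXPECTED_DIRS = ("csv", "ddl", "etl", "mdx", "mdx_out", "pdf", "sql", "sqldump")


def preflight(root: Path, group: str) -> list[str]:
    """Return a list of problems; an empty list means the layout looks complete."""
    problems = []
    if not re.fullmatch(r"\d{3}", group):
        problems.append(f"group number must be three digits, got {group!r}")
    for sub in EXPECTED_DIRS:
        if not (root / sub).is_dir():
            problems.append(f"missing folder: {sub}/")
    # Same names as STG2_RESET, STG2_CREATE, DWH2_RESET, DWH2_CREATE above
    for schema in ("stg2", "dwh2"):
        for kind in ("reset", "create"):
            name = f"airq_{kind}_{schema}_{group}.sql"
            if not (root / "ddl" / name).is_file():
                problems.append(f"missing DDL file: ddl/{name}")
    # At least one ETL step file must exist for section 6
    if not any((root / "etl").glob("a2_etl*.sql")):
        problems.append("no ETL step files: etl/a2_etl*.sql")
    return problems
```

Usage: `for problem in preflight(root_dir, XXX): print("!!", problem)`.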



## 2) Make database connection


In [269]:
from getpass import getpass

# === Minimal config & connect ===
DB_USER = f"grp_{XXX}"
DB_NAME = "airq"
DB_HOST = "localhost"
DB_PORT = "5432"

# a password is asked once per run; enter empty password if your local pg_hba allows trust/peer
pw = getpass(f"Password for {DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME} (leave empty if not needed): ")
DSN = f"postgresql+psycopg2://{DB_USER}:{quote_plus(pw)}@{DB_HOST}:{DB_PORT}/{DB_NAME}" if pw \
      else f"postgresql+psycopg2://{DB_USER}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

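def _mask_dsn(dsn: str) -> str:
    try:
        # use sa.engine.make_url: the bare name `engine` is rebound to the Engine object below
        return sa.engine.make_url(dsn).render_as_string(hide_password=True)
    except Exception:
        # \1 is a back-reference to the user name; r"\\1" would print a literal "\1"
        return re.sub(r"://([^:@/]+):[^@]*@", r"://\1:***@", dsn)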

engine = create_engine(DSN, future=True, pool_pre_ping=True)
print("Connecting via:", _mask_dsn(DSN))

with engine.begin() as conn:
    # best-effort: set the role if it exists; don't crash if not
    try:
        conn.exec_driver_sql(f"SET ROLE grp_{XXX}")
        print(f"SET ROLE grp_{XXX} ✓")
    except Exception as e:
        print(f"(no SET ROLE: {e.__class__.__name__})")
    who = conn.exec_driver_sql("select current_user").scalar_one()
    print("current_user:", who)


Password for grp_020@localhost:5432/airq (leave empty if not needed):  ········


Connecting via: postgresql+psycopg2://\1:***@localhost:5432/airq
SET ROLE grp_020 ✓
current_user: grp_020
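The recorded output `postgresql+psycopg2://\1:***@...` shows the fallback branch of `_mask_dsn` firing: inside the function `engine` already refers to the Engine object (which has no `make_url`), and the replacement string `r"://\\1:***@"` emits a literal `\1` instead of the user name. A standalone sketch of just the regex fallback, written correctly; the name `mask_dsn` is hypothetical, and SQLAlchemy 1.4+ can do the same via `URL.render_as_string(hide_password=True)`:

```python
import re


def mask_dsn(dsn: str) -> str:
    """Hide the password in a URL-style DSN (scheme://user:password@host...)."""
    # Group 1 captures the user name; the password (up to '@') is replaced by ***.
    # A DSN without a password has no ':' before '@' and is returned unchanged.
    return re.sub(r"://([^:@/]+):[^@]*@", r"://\1:***@", dsn)
```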


In [271]:
def run_sqlscript(
    path: str | Path,
    *,
    engine,
    progress: bool = True,         # print per-statement progress; False keeps the output quiet
    add_search_path: bool = False,
    schema_dwh: str | None = None,
    schema_stg: str | None = None,
    title: str | None = None,      # optional header text
    strip_psql_meta: bool = True,  # drop psql meta-command lines (\i, \set, ...)
):
    """
    Execute all statements in a .sql file inside ONE transaction (engine.begin()):
    an error in any statement rolls back the whole file.
    - Returns the LAST result set as a pandas.DataFrame if any statement returns rows; else None.
    - Set progress=False to suppress progress/header prints (useful for check scripts).
    """

    raw = Path(path).read_text(encoding="utf-8")

    # Strip psql meta-commands (e.g., \i, \set) if requested
    if strip_psql_meta:
        raw = "\n".join(
            line for line in raw.splitlines()
            if not line.lstrip().startswith("\\")
        )

    # Optional search_path prologue
    prologue = ""
    if add_search_path:
        schs = [s for s in (schema_dwh, schema_stg) if s]
        if schs:
            prologue = f"SET search_path TO {', '.join(schs)};\n"

    script = prologue + raw
    stmts = [s.strip() for s in sqlparse.split(script) if s and s.strip(" ;\n\t")]

    if progress:
        hdr = f"▶ {title}" if title else "▶ Running SQL script"
        print(f"{hdr}: {path} ({len(stmts)} statements)")
    t0 = time.time()

    last_df = None
    with engine.begin() as conn:
        for i, stmt in enumerate(stmts, start=1):
            if not stmt:
                continue
            start = time.time()
            try:
                if progress:
                    preview = " ".join(stmt.split())[:120]
                    print(f"  {i:>3}: {preview} ...")

                cursor = conn.exec_driver_sql(stmt)

                if cursor.returns_rows:
                    rows = cursor.fetchall()
                    cols = cursor.keys()
                    last_df = pd.DataFrame(rows, columns=cols)

                if progress:
                    print(f"       OK ({time.time() - start:.3f}s)")

            except Exception as e:
                # Raise with a helpful preview even when progress=False
                preview = " ".join(stmt.split())[:160]
                raise RuntimeError(
                    f"SQL error in statement #{i}: {preview}"
                ) from e

    if progress:
        print(f"✅ Done in {time.time() - t0:.2f}s")

    return last_df

## 3) Reset and create **staging schema** (`stg2_xxx`) from DDL file

In [123]:
print(f"== STAGING-ONLY RESET: stg2_{XXX} ==")
try:
    for p in (STG2_RESET, STG2_CREATE):
        run_sqlscript(p, engine=engine, progress=VERBOSE_SQL)
except Exception as e:
    print(f"!! Reset & create failed: {e}")
    raise

== STAGING-ONLY RESET: stg2_020 ==


## 4) Load CSV → `stg2_xxx` with Pandas `.to_sql()`

In [273]:
def load_folder_to_stg(
    folder_name: str,
    engine,
    SCHEMA_STG: str,
    load_order=None,
    if_exists: str = "append",
    chunksize: int = 20000,
):
    # root_dir comes from section 1; reading a global needs no `global` statement
    src_dir = Path(root_dir) / folder_name
    if not src_dir.exists():
        raise FileNotFoundError(f"Folder not found: {src_dir}")

    def load_one(name: str):
        path = src_dir / f"{name}.csv"
        if not path.exists():
            print("Missing CSV:", path.name)
            return 0
        df = pd.read_csv(
            path,
            na_values=["\\N"],
            keep_default_na=False,
            low_memory=False,
        )
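        # Heuristic: columns whose names end in "from", "to" or "at" are parsed as DATE;
        # values not in YYYY-MM-DD format become NULL. A non-date column ending in "at" would also match.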
        for col in df.columns:
            col_l = col.lower()
            if col_l.endswith(("from", "to", "at")):
                df[col] = pd.to_datetime(df[col], format="%Y-%m-%d", errors="coerce").dt.date
        # Write
        df.to_sql(
            name,
            con=engine,
            schema=SCHEMA_STG,
            if_exists=if_exists,
            index=False,
            method="multi",
            chunksize=chunksize,
        )
        print(f"Loaded {len(df):,} rows → {SCHEMA_STG}.{name}")
        return len(df)

    if not load_order:
        discovered = sorted([p.stem for p in src_dir.glob("*.csv")])
        print("No order set yet. CSVs found:", discovered)
        return

    t0 = time.time()
    total = 0
    for name in load_order:
        total += load_one(name)
    print(f"⏱️ Total load time: {time.time() - t0:.2f} seconds · {total:,} rows")
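The recorded load took about 434 s with `method="multi"`. pandas accepts a callable for `to_sql(method=...)`; the sketch below is adapted from the `psql_insert_copy` example in the pandas `to_sql` documentation and streams each chunk through PostgreSQL `COPY`, which is usually considerably faster. It assumes the psycopg2 driver (as in the DSN above), since `copy_expert` is a psycopg2 cursor method. One behavioural difference to check: with `keep_default_na=False`, empty cells arrive as empty strings, and `COPY ... WITH CSV` loads unquoted empty fields as NULL rather than `''`.

```python
import csv
from io import StringIO


def psql_insert_copy(table, conn, keys, data_iter):
    """Callable for DataFrame.to_sql(method=...) that bulk-loads rows with COPY.

    table: pandas SQLTable (has .schema and .name); conn: SQLAlchemy Connection;
    keys: column names; data_iter: iterable of row tuples for one chunk.
    """
    dbapi_conn = conn.connection  # raw DBAPI (psycopg2) connection behind SQLAlchemy
    with dbapi_conn.cursor() as cur:
        buf = StringIO()
        csv.writer(buf).writerows(data_iter)  # None is written as an empty field
        buf.seek(0)
        columns = ", ".join(f'"{k}"' for k in keys)
        target = f'"{table.schema}"."{table.name}"' if table.schema else f'"{table.name}"'
        cur.copy_expert(sql=f"COPY {target} ({columns}) FROM STDIN WITH CSV", file=buf)
```

Inside `load_one`, this would replace `method="multi"` with `method=psql_insert_copy`; all other `to_sql` arguments stay the same.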

In [128]:
# Load the 15 original CSV files in dependency order: parent tables before the tables that reference them
LOAD_ORDER_CSV = [
    "tb_country",
    "tb_city",
    "tb_role",
    "tb_servicetype",
    "tb_employee",
    "tb_param",
    "tb_alert",
    "tb_paramalert",
    "tb_sensortype",
    "tb_paramsensortype",
    "tb_sensordevice",
    # dependent tables come **after** parent tables
    "tb_readingmode",
    "tb_weather",          # depends only on city
    "tb_readingevent",     # depends on sensordevice + param + readingmode
    "tb_serviceevent",     # depends on employee + sensordevice + servicetype
]

load_folder_to_stg("csv", engine, SCHEMA_STG, load_order=LOAD_ORDER_CSV, if_exists="append")

Loaded 20 rows → stg2_020.tb_country
Loaded 36 rows → stg2_020.tb_city
Loaded 16 rows → stg2_020.tb_role
Loaded 24 rows → stg2_020.tb_servicetype
Loaded 484 rows → stg2_020.tb_employee
Loaded 30 rows → stg2_020.tb_param
Loaded 4 rows → stg2_020.tb_alert
Loaded 120 rows → stg2_020.tb_paramalert
Loaded 12 rows → stg2_020.tb_sensortype
Loaded 115 rows → stg2_020.tb_paramsensortype
Loaded 627 rows → stg2_020.tb_sensordevice
Loaded 8 rows → stg2_020.tb_readingmode
Loaded 26,316 rows → stg2_020.tb_weather
Loaded 985,573 rows → stg2_020.tb_readingevent
Loaded 22,720 rows → stg2_020.tb_serviceevent
⏱️ Total load time: 434.10 seconds · 1,036,105 rows
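The hand-written `LOAD_ORDER_CSV` encodes foreign-key dependencies. A minimal sketch that derives or verifies such an order with the standard-library `graphlib.TopologicalSorter`. The parents of `tb_weather`, `tb_readingevent` and `tb_serviceevent` follow the comments in the load-order cell; every other entry in `DEPENDS_ON` is an ASSUMPTION guessed from table names and must be checked against the real foreign keys in the staging DDL. The names `DEPENDS_ON`, `derive_load_order` and `order_violations` are hypothetical.

```python
from graphlib import TopologicalSorter

# table -> parent tables it references (partly ASSUMED, see the lead-in)
DEPENDS_ON = {
    "tb_country": set(),
    "tb_city": {"tb_country"},
    "tb_role": set(),
    "tb_servicetype": set(),
    "tb_employee": {"tb_role"},
    "tb_param": set(),
    "tb_alert": set(),
    "tb_paramalert": {"tb_param", "tb_alert"},
    "tb_sensortype": set(),
    "tb_paramsensortype": {"tb_param", "tb_sensortype"},
    "tb_sensordevice": {"tb_sensortype", "tb_city"},
    "tb_readingmode": set(),
    "tb_weather": {"tb_city"},
    "tb_readingevent": {"tb_sensordevice", "tb_param", "tb_readingmode"},
    "tb_serviceevent": {"tb_employee", "tb_sensordevice", "tb_servicetype"},
}


def derive_load_order(deps: dict[str, set[str]]) -> list[str]:
    """Every table appears after all of its parents; raises graphlib.CycleError on cycles."""
    return list(TopologicalSorter(deps).static_order())


def order_violations(order: list[str], deps: dict[str, set[str]]) -> list[str]:
    """Describe each child that is loaded before one of its parents."""
    pos = {table: i for i, table in enumerate(order)}
    return [f"{child} before {parent}"
            for child, parents in deps.items()
            for parent in sorted(parents)
            if pos[parent] > pos[child]]
```

Usage: `order_violations(LOAD_ORDER_CSV, DEPENDS_ON)` should return an empty list for a valid order.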


## 5) Reset and create **warehouse** (`dwh2_xxx`) from DDL file

In [206]:
print(f"== DWH-ONLY RESET: dwh2_{XXX} ==")
try:
    for p in (DWH2_RESET, DWH2_CREATE):
        run_sqlscript(p, engine=engine, progress=VERBOSE_SQL)
except Exception as e:
    print(f"!! Reset & create failed: {e}")
    raise

== DWH-ONLY RESET: dwh2_020 ==



## 6) SQL-first ETL — run all files in etl/

We execute **all** files matching `etl/a2_etl*.sql` in lexicographic order, so zero-pad step numbers (otherwise e.g. `a2_etl10_x.sql` runs before `a2_etl2_x.sql`). Every ETL file must begin with `SET search_path TO dwh2_xxx, stg2_xxx;`



In [208]:
steps = sorted(etl_dir.glob("a2_etl*.sql"))
if not steps:
    print("No ETL step files found in etl/ (expected a2_etl*.sql).")
else:
    for s in steps:
        run_sqlscript(s, engine=engine, progress=VERBOSE_SQL)
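The rule that every ETL file starts with `SET search_path TO dwh2_xxx, stg2_xxx;` can be checked before the runner executes anything. A minimal sketch; it skips blank lines and whole-line `--` comments but does not understand `/* ... */` block comments, and the names `SEARCH_PATH_RE`, `drop_comment_lines` and `check_etl_files` are hypothetical:

```python
import re
from pathlib import Path

# Required first statement: SET search_path TO <dwh schema>, <stg schema>;
SEARCH_PATH_RE = re.compile(
    r"\s*SET\s+search_path\s+TO\s+(\w+)\s*,\s*(\w+)\s*;", re.IGNORECASE
)


def drop_comment_lines(sql_text: str) -> str:
    """Remove blank lines and whole-line '--' comments."""
    return "\n".join(ln for ln in sql_text.splitlines()
                     if ln.strip() and not ln.lstrip().startswith("--"))


def check_etl_files(etl_dir: Path, schema_dwh: str, schema_stg: str) -> list[str]:
    """List ETL files (in run order) whose first statement is not the required SET search_path."""
    expected = [schema_dwh.lower(), schema_stg.lower()]
    problems = []
    for f in sorted(etl_dir.glob("a2_etl*.sql")):  # same order as the runner cell
        m = SEARCH_PATH_RE.match(drop_comment_lines(f.read_text(encoding="utf-8")))
        if m is None or [s.lower() for s in m.groups()] != expected:
            problems.append(f"{f.name}: must start with SET search_path TO {schema_dwh}, {schema_stg};")
    return problems
```

Usage: run `check_etl_files(etl_dir, SCHEMA_DWH, SCHEMA_STG)` before the ETL cell; an empty list means all files comply.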

## 7) SQL queries

In [275]:
# Business question Q31 (example)
df = run_sqlscript("sql/a2_q31.sql", engine=engine, progress=VERBOSE_SQL)
display(df)

|   | city_name | P95 Recorded Value (2023) |
|---|---|---|
| 0 | Prague | 152.58 |
| 1 | Hamburg | 150.75 |
| 2 | Athens | 142.47 |
| 3 | London | 132.38 |
| 4 | Istanbul | 131.14 |
| 5 | Edinburgh | 126.17 |
| 6 | Copenhagen | 124.67 |
| 7 | Stuttgart | 119.63 |
| 8 | Salzburg | 119.14 |
| 9 | Kazan | 117.25 |


In [277]:
# Business question Q32 (example)
df = run_sqlscript("sql/a2_q32.sql", engine=engine, progress=VERBOSE_SQL)
display(df)

|   | city_name | Data Volume (KB) 2024 |
|---|---|---|
| 0 | Istanbul | 803441 |
| 1 | London | 330327 |
| 2 | Moscow | 257108 |
| 3 | Berlin | 224658 |
| 4 | St. Petersburg | 215185 |
| 5 | Paris | 196835 |
| 6 | Rome | 188638 |
| 7 | Ufa | 174256 |
| 8 | Copenhagen | 148863 |
| 9 | Vienna | 146300 |


In [279]:
# Business question Q33 (example)
df = run_sqlscript("sql/a2_q33.sql", engine=engine, progress=VERBOSE_SQL)
display(df)

|   | country_name | month_name | Avg Data Quality |
|---|---|---|---|
| 0 | Austria | Sep | 3.58 |
| 1 | Belarus | Sep | 3.24 |
| 2 | Belgium | Sep | 3.18 |
| 3 | Croatia | Jun | 3.6 |
| 4 | Czech Republic | Apr | 3.24 |
| 5 | Denmark | Jun | 3.48 |
| 6 | Finland | May | 3.7 |
| 7 | France | Apr | 3.09 |
| 8 | Germany | Sep | 3.12 |
| 9 | Greece | Aug | 3.12 |


In [295]:
# **Business Question Q1** — SQL for Student A

# For parameter PM2, show Exceed Days (any) by Country × Month for Q1 of 2024.
# Return Countries on rows and the first three months of 2024 (Jan–Mar) on columns.

# Running SQL query to get the result
df = run_sqlscript("sql/a2_q01_A.sql", engine=engine, progress=VERBOSE_SQL)
display(df)


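|   | country_name | Jan_2024 | Feb_2024 | Mar_2024 |
|---|---|---|---|---|
| 0 | Austria | 0 | 0 | 0 |
| 1 | Belgium | 0 | 0 | 0 |
| 2 | Croatia | 0 | 0 | 0 |
| 3 | Czech Republic | 0 | 0 | 0 |
| 4 | Denmark | 0 | 0 | 0 |
| 5 | Finland | 0 | 0 | 0 |
| 6 | France | 0 | 0 | 0 |
| 7 | Germany | 0 | 0 | 0 |
| 8 | Greece | 0 | 0 | 0 |
| 9 | Hungary | 0 | 0 | 0 |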


In [291]:
# **Business Question Q2** — SQL for Student A

# For parameter O3, show Missing Days in Austria by City × Month for Q1 of 2023.
# Return Austrian Cities on rows and the first three months of 2023 (Jan–Mar) on columns.

# Running SQL query to get the result
df = run_sqlscript("sql/a2_q02_A.sql", engine=engine, progress=VERBOSE_SQL)
display(df)


|   | city_name | Jan_2023 | Feb_2023 | Mar_2023 |
|---|---|---|---|---|
| 0 | Graz | 11 | 4 | 2 |
| 1 | Salzburg | 3 | 5 | 4 |
| 2 | Vienna | 6 | 2 | 2 |


In [299]:
# **Business Question Q4** — SQL for Student A

# For 2024, show total Data Volume (KB) by Region × Quarter.
# Return Regions on rows and the four quarters of 2024 (Q1–Q4) on columns.

# Running SQL query to get the result
df = run_sqlscript("sql/a2_q04_A.sql", engine=engine, progress=VERBOSE_SQL)
display(df)


|   | region_name | Q1_2024 | Q2_2024 | Q3_2024 | Q4_2024 |
|---|---|---|---|---|---|
| 0 | Central Europe | 1780051 | 1779221 | 1792775 | 1807947 |
| 1 | Eastern Europe | 2586668 | 2598339 | 2629947 | 2586709 |
| 2 | Western Europe | 2364973 | 2376173 | 2383913 | 2405937 |


In [301]:
# **Business Question Q5** — SQL for Student A

# For 2023 and 2024, show total Data Volume (KB) by Param Category × Year.
# Return Param Categories on rows and the two years (2023, 2024) on columns.

# Running SQL query to get the result
df = run_sqlscript("sql/a2_q05_A.sql", engine=engine, progress=VERBOSE_SQL)
display(df)


|   | category | 2023 | 2024 |
|---|---|---|---|
| 0 | Biological | 6328259 | 6298621 |
| 1 | Gas | 5599952 | 5597083 |
| 2 | Heavy Metal | 4961428 | 4955113 |
| 3 | Particulate matter | 5257962 | 5267659 |
| 4 | Volatile Organic Compound | 4962229 | 4974177 |


In [317]:
# **Business Question Q6** — SQL for Student A

# For 2024, list the Top 10 Cities by total Missing Days (all parameters).
# Return the Top 10 cities on rows (highest → lowest) and one column with the total Missing Days for 2024.

# Running SQL query to get the result
df = run_sqlscript("sql/a2_q06_A.sql", engine=engine, progress=VERBOSE_SQL)
display(df)


|   | city_name | Total_Missing_Days_2024 |
|---|---|---|
| 0 | Zagreb | 2524 |
| 1 | Salzburg | 2436 |
| 2 | Kazan | 2327 |
| 3 | Minsk | 2274 |
| 4 | Warsaw | 2273 |
| 5 | Edinburgh | 2196 |
| 6 | Milan | 2189 |
| 7 | Marseille | 2134 |
| 8 | Hamburg | 2133 |
| 9 | Gothenburg | 2109 |


In [333]:
# **Business Question Q7** — SQL for Student A

# For 2023, show Avg Recorded Value and P95 Recorded Value by Country for PM10.
# Return Countries on rows and two columns: Avg Recorded Value and P95 Recorded Value.

# Running SQL query to get the result
df_q7_a = run_sqlscript("sql/a2_q07_A.sql", engine=engine, progress=VERBOSE_SQL)
display(df_q7_a)


|   | country_name | Avg_Recorded_Value_2023 | P95_Recorded_Value_2023 |
|---|---|---|---|
| 0 | Greece | 95.07001516666665 | 131.444565 |
| 1 | Austria | 81.9452968611111 | 107.143666 |
| 2 | Turkey | 80.61919258333333 | 92.928107 |
| 3 | Germany | 80.09584458333333 | 101.082223 |
| 4 | Croatia | 79.74585641666665 | 96.761852 |
| 5 | Hungary | 79.43401425 | 95.53179 |
| 6 | United Kingdom | 79.11582875 | 91.239154 |
| 7 | Czech Republic | 79.09515479166666 | 99.943981 |
| 8 | Russia | 78.03715670833333 | 91.607558 |
| 9 | Belgium | 77.52159558333332 | 93.422599 |


In [327]:
# **Business Question Q8** — SQL for Student B

# For 2024, show Reading Events by Country × Quarter (Top 10 countries).
# Return the four quarters on columns (Q1–Q4) and the Top 10 countries on rows, ranked by total Reading Events in 2024.

# Running SQL query to get the result
df_q8_b = run_sqlscript("sql/a2_q08_B.sql", engine=engine, progress=VERBOSE_SQL)
display(df_q8_b)


|   | country_name | Q1_2024 | Q2_2024 | Q3_2024 | Q4_2024 | total_reading_events |
|---|---|---|---|---|---|---|
| 0 | Turkey | 22618 | 22383 | 22562 | 22277 | 89840 |
| 1 | Russia | 17562 | 17851 | 18006 | 17648 | 71067 |
| 2 | Germany | 13509 | 13244 | 13718 | 13759 | 54230 |
| 3 | United Kingdom | 10797 | 10802 | 11045 | 11075 | 43719 |
| 4 | France | 8845 | 9043 | 9050 | 8942 | 35880 |
| 5 | Austria | 7287 | 7168 | 7302 | 7284 | 29041 |
| 6 | Italy | 6593 | 6617 | 6695 | 6648 | 26553 |
| 7 | Sweden | 5080 | 5025 | 5256 | 5275 | 20636 |
| 8 | Czech Republic | 3965 | 4138 | 4110 | 4144 | 16357 |
| 9 | Greece | 3153 | 3191 | 3267 | 3201 | 12812 |


In [329]:
# **Business Question Q9** — SQL for Student B

# For 2024, list the Top 10 Countries by Avg Data Quality.
# Return the 10 countries with the highest values on rows (highest → lowest) and one column with Avg Data Quality for 2024.

# Running SQL query to get the result
df_q9_b = run_sqlscript("sql/a2_q09_B.sql", engine=engine, progress=VERBOSE_SQL)
display(df_q9_b)


|   | country_name | Avg_Data_Quality_2024 |
|---|---|---|
| 0 | Belarus | 3.0413258209876544 |
| 1 | Croatia | 3.0156915344827584 |
| 2 | Sweden | 3.011928864583333 |
| 3 | Finland | 3.0109282857142854 |
| 4 | Germany | 3.010857410344828 |
| 5 | Italy | 3.0071215112994354 |
| 6 | Poland | 3.0067860333333334 |
| 7 | France | 3.006653448643411 |
| 8 | Serbia | 3.005463354938272 |
| 9 | United Kingdom | 3.002515738700565 |


In [331]:
# **Business Question Q10** — SQL for Student B

# For 2024, show Exceed Days (any) by Region for Param Category = Gas.
# Return Regions on rows and one column with Exceed Days (any) for the year 2024, filtered to Category = Gas.

# Running SQL query to get the result
df_q10_b = run_sqlscript("sql/a2_q10_B.sql", engine=engine, progress=VERBOSE_SQL)
display(df_q10_b)


|   | region_name | Exceed_Days_2024 |
|---|---|---|
| 0 | Central Europe | 0 |
| 1 | Eastern Europe | 0 |
| 2 | Western Europe | 0 |


## 8) Atoti setup and build cube (scaffold)

In [None]:
os.environ.pop("JAVA_HOME", None)  # let Atoti use its own JDK via jdk4py

# Start a new Atoti session
session = tt.Session.start()

# URL to the Atoti web app
session.url

In [None]:
def upsert_table(session, name, df, *, keys=None, defaults=None, dtypes=None):
    if name in session.tables.keys():
        t = session.tables[name]
        t.drop()  # delete all rows, keep schema
        if defaults:  # non-nullability even for existing tables
            for col, val in defaults.items():
                t[col].default_value = val  # set after creation too
        t.load(df)
    else:
        t = session.read_pandas(
            df,
            table_name=name,
            keys=keys or (),
            default_values=defaults or {},   # set at creation time
            data_types=dtypes or {},
        )
    return t

In [None]:
# Load star-schema tables to DataFrames
df_time   = pd.read_sql(f"SELECT * FROM dwh2_{XXX}.dim_timemonth", engine)
df_city   = pd.read_sql(f"SELECT * FROM dwh2_{XXX}.dim_city", engine)
df_param  = pd.read_sql(f"SELECT * FROM dwh2_{XXX}.dim_param", engine)
df_alert  = pd.read_sql(f"SELECT * FROM dwh2_{XXX}.dim_alertpeak", engine)
df_fact   = pd.read_sql(f"SELECT * FROM dwh2_{XXX}.ft_param_city_month", engine)

time_store  = upsert_table(session, "dim_timemonth", df_time,
                           keys=["month_key"],
                           defaults={"year_num": 0, "quarter_num": 0, "month_name": "Unknown"},
                           dtypes={"year_num": "int", "quarter_num": "int"})

city_store  = upsert_table(session, "dim_city", df_city,
                           keys=["city_key"],
                           defaults={"region_name": "Unknown", "country_name": "Unknown", "city_name": "Unknown"})

param_store = upsert_table(session, "dim_param", df_param,
                           keys=["param_key"],
                           defaults={"purpose": "Unknown", "category": "Unknown", "param_name": "Unknown"})

ap_store    = upsert_table(session, "dim_alertpeak", df_alert,
                           keys=["alertpeak_key"],
                           defaults={"alert_level_name": "None"})

fact_store = upsert_table(session, "ft_param_city_month", df_fact, 
                          keys=["ft_pcm_key"], 
                          defaults={"month_key": 0, "city_key": 0, "param_key": 0, "alertpeak_key": 1000,  # FKs
                                    "reading_events_count": 0, "devices_reporting_count": 0, 
                                    "data_volume_kb_sum": 0, "recordedvalue_avg": 0.0, "recordedvalue_p95": 0.0, 
                                    "exceed_days_any": 0, "data_quality_avg": 0.0, "missing_days": 0, 
                                   }, 
                          dtypes={"month_key": "int", "city_key": "int", 
                                  "param_key": "int", "alertpeak_key": "int", 
                                  "reading_events_count": "int", "devices_reporting_count": "int", 
                                  "data_volume_kb_sum": "int",
                                  "recordedvalue_avg": "double",   # "double" is 64-bit; "float" would be a 32-bit float
                                  "recordedvalue_p95": "double", "exceed_days_any": "int", 
                                  "data_quality_avg": "double", "missing_days": "int", 
                                 },
                         )

# Define the joins only once per session, so this cell can be re-run safely
if not getattr(session, "_airq_joins_done", False):
    fact_store.join(time_store,   fact_store["month_key"]     == time_store["month_key"])
    fact_store.join(city_store,   fact_store["city_key"]      == city_store["city_key"])
    fact_store.join(param_store,  fact_store["param_key"]     == param_store["param_key"])
    fact_store.join(ap_store,     fact_store["alertpeak_key"] == ap_store["alertpeak_key"])
    session._airq_joins_done = True

# Create or reuse the cube
cube_name = "AirQ Cube"
cube = (
    session.cubes[cube_name]
    if cube_name in session.cubes.keys()
    else session.create_cube(fact_store, cube_name, mode="manual")
)

# Access cube components
m, h, l = cube.measures, cube.hierarchies, cube.levels

cube

## 9) Define hierarchies and measures
Define explicit hierarchies in Atoti:

1) Time: Year → Quarter → Month,
2) Geo: Region → Country → City,
3) Param: Purpose → Category → Param,
4) Alert: Level (sorted by rank).

In [None]:
# TODO: define hierarchies
# h["Time"]  = [...]
# h["Geo"]   = [...]
# h["Param"] = [...]
# h["Alert"] = [...]

# TODO: define measures
# m["Reading Events"]        =   ...      # fully additive
# m["Devices Reporting"]     =   ...      # fully additive
# m["Data Volume (KB)"]      =   ...      # fully additive
# m["Missing Days"]          =   ...      # fully additive
# m["Exceed Days (any)"]     =   ...      # fully additive
# m["Avg Recorded Value"]    =   ...      # non-additive (an average)
# m["P95 Recorded Value"]    =   ...      # non-additive (a percentile)
# m["Avg Data Quality"]      =   ...      # non-additive (an average)

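Averages and percentiles are non-additive: summing them is meaningless, and a plain average of monthly averages is wrong as soon as months contain different numbers of readings. A minimal pandas sketch of the difference, assuming (not confirmed by the DDL) that each fact row's `recordedvalue_avg` is an average over `reading_events_count` readings; the helper name `rollup_weighted_avg` and the data are hypothetical:

```python
import pandas as pd


def rollup_weighted_avg(df: pd.DataFrame, avg_col: str, weight_col: str) -> float:
    """Combine per-row averages into one average, weighting each row by its count."""
    return float((df[avg_col] * df[weight_col]).sum() / df[weight_col].sum())


# Hypothetical fact rows: two months of one city and parameter
fact = pd.DataFrame({
    "month_name": ["Jan", "Feb"],
    "reading_events_count": [10, 30],
    "recordedvalue_avg": [100.0, 20.0],
})

naive = fact["recordedvalue_avg"].mean()  # average of monthly averages: 60.0
weighted = rollup_weighted_avg(fact, "recordedvalue_avg", "reading_events_count")  # 40.0
```

The same trick does not rescue `recordedvalue_p95`: a yearly 95th percentile cannot be computed exactly from monthly 95th percentiles, so a rolled-up P95 is either an approximation or needs finer-grained data.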

In [None]:
# order months in calendar order, not alphabetically
month_lvl = cube.hierarchies["Time"]["month_name"]
month_lvl.order = tt.CustomOrder(first_elements=["Jan","Feb","Mar","Apr","May","Jun",
                                                 "Jul","Aug","Sep","Oct","Nov","Dec"])

In [None]:
# order alert levels from least to most harmful
alert_lvl = cube.hierarchies["Alert"]["alert_level_name"]
alert_lvl.order = tt.CustomOrder(first_elements=["None", "Yellow", "Orange", "Red", "Crimson"])

In [None]:
cube

In [None]:
print("\nHierarchies and their levels:")
for h_name, hierarchy in cube.hierarchies.items():
    level_names = [getattr(level, "name", str(level)) for level in hierarchy]
    print(f" - {h_name} → levels: {level_names}")

print("\nMeasures:")
for measure_name in cube.measures.keys():   # don't reuse `m`: it holds cube.measures
    print("  -", measure_name)

## 10) MDX queries

In [None]:
# MDX cell magic: let us write MDX code like this:
#   %%mdx
#   SELECT ... FROM [AirQ Cube]
#
# Requirements: a live `session` from atoti and the cube already created.

from IPython.core.magic import register_cell_magic
from IPython.display import display

@register_cell_magic
def mdx(line, cell):
    """Run MDX in this cell and display a DataFrame.
    Usage:
        %%mdx
        SELECT ...
        FROM [AirQ Cube]
    """
    q = cell.strip()
    df = session.query_mdx(q)   # Atoti returns levels on index, measures as columns
    return df                   # shown as the cell output; reuse it afterwards via `_`


### 10.1) Business question Q31 (example)

In [None]:
%%mdx

-- 31. For parameter O3, list the Top 10 Cities by P95 Recorded Value for 2023.
-- Return the 10 cities with the highest values on rows (highest → lowest) and one column with P95 Recorded Value for 2023.
SELECT
  { [Measures].[P95 Recorded Value] } ON COLUMNS,
  TOPCOUNT(
    NONEMPTY(
      [dim_city].[Geo].[city_name].Members,
      [Measures].[P95 Recorded Value]
    ),
    10, [Measures].[P95 Recorded Value]
  ) ON ROWS
FROM [AirQ Cube]
WHERE (
  [dim_timemonth].[Time].[year_num].&[2023],
  [dim_param].[Param].[param_name].&[O3]
)

### 10.2) Business question Q32 (example)

In [None]:
%%mdx 

-- 32. For 2024, show Data Volume (KB) by City for category ‘Volatile Organic Compound’, and list the Top 10 cities.
-- Return the Top 10 cities on rows (highest -> lowest) and one column with Data Volume (KB) for 2024, limited to the Volatile Organic Compound category.
SELECT
  { [Measures].[Data Volume (KB)] } ON COLUMNS,
  TOPCOUNT(
    NONEMPTY([dim_city].[Geo].[city_name].Members, [Measures].[Data Volume (KB)]),
    10, [Measures].[Data Volume (KB)]
  ) ON ROWS
FROM (
  SELECT ( [dim_timemonth].[Time].[year_num].&[2024] ) ON 0 FROM (
    SELECT (
      FILTER(
        [dim_param].[Param].[param_name].Members,
        ANCESTOR(
          [dim_param].[Param].CurrentMember,
          [dim_param].[Param].[category]
        ).Name = "Volatile Organic Compound"
      )
    ) ON 0 FROM [AirQ Cube]
  )
)

### 10.3) Business question Q33 (example)

In [None]:
%%mdx

-- 33. For parameter PM4 in 2024, return for each Country the Month with the highest Avg Data Quality.
-- Return one row per Country × Month (the month with the highest Avg Data Quality in 2024) and one column with Avg Data Quality.
SELECT
  { [Measures].[Avg Data Quality] } ON COLUMNS,
  NON EMPTY
    GENERATE(
      [dim_city].[Geo].[country_name].Members,
      TOPCOUNT(
        CROSSJOIN(
          { [dim_city].[Geo].CurrentMember },
          Descendants(
            [dim_timemonth].[Time].[year_num].&[2024],
            [dim_timemonth].[Time].[month_name]
          )
        ),
        1, [Measures].[Avg Data Quality]
      )
    ) ON ROWS
FROM [AirQ Cube]
WHERE ( [dim_param].[Param].[param_name].&[PM4] )

## 11) Batch executor: run all .mdx → CSV (+ an index)

In [None]:
def run_mdx_folder(
    mdx_folder="mdx",
    out_folder="mdx_out",
    pattern="*.mdx",
    overwrite=True,
    index_csv="mdx_index.csv",
):
    mdx_path = Path(mdx_folder)
    out_path = Path(out_folder)
    mdx_path.mkdir(exist_ok=True)
    out_path.mkdir(exist_ok=True)

    records = []
    files = sorted(mdx_path.glob(pattern))
    if not files:
        print(f"No MDX files found in {mdx_path.resolve()}.")
        return pd.DataFrame()

    for f in files:
        q = f.read_text(encoding="utf-8")
        t0 = time.time()
        error = None
        rows = cols = 0
        dest = out_path / f"{f.stem}.csv"

        try:
            df = session.query_mdx(q).reset_index()
            rows, cols = df.shape
            if overwrite or not dest.exists():
                df.to_csv(dest, index=False)
        except Exception as e:
            error = str(e)

        elapsed = time.time() - t0
        records.append({
            "file": f.name,
            "csv": dest.name,
            "rows": rows,
            "cols": cols,
            "seconds": round(elapsed, 3),
            "error": error,
        })

    index_df = pd.DataFrame(records)
    index_path = out_path / index_csv
    index_df.to_csv(index_path, index=False)
    print(f"Done. Index saved to {index_path}")
    return index_df
    

In [None]:
# Run all MDX files:
index_df = run_mdx_folder()
index_df
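When a business question is answered both in SQL and in MDX, the two results should agree. A minimal sketch that reports disagreeing rows, assuming both DataFrames have already been renamed to common key and value column names (the SQL and MDX column headers differ, e.g. `P95 Recorded Value (2023)` vs `P95 Recorded Value`); the name `compare_results` is hypothetical:

```python
import numpy as np
import pandas as pd


def compare_results(sql_df: pd.DataFrame, mdx_df: pd.DataFrame, key: str,
                    value_cols: list[str], rtol: float = 1e-6) -> pd.DataFrame:
    """Return the rows where SQL and MDX disagree; an empty frame means they match."""
    merged = sql_df.merge(mdx_df, on=key, how="outer",
                          suffixes=("_sql", "_mdx"), indicator=True)
    bad = merged["_merge"] != "both"  # key present on one side only
    for c in value_cols:
        a = merged[f"{c}_sql"].astype(float)
        b = merged[f"{c}_mdx"].astype(float)
        bad |= ~np.isclose(a, b, rtol=rtol, equal_nan=True)  # tolerant float comparison
    return merged[bad]
```

Usage: read an MDX result with `pd.read_csv(mdx_out_dir / "<file>.csv")`, rename its columns to match the SQL DataFrame, then call `compare_results(...)`.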


## 12) Create `sqldump/sqldump_airq_dwh2_xxx.sql`

We run `pg_dump -n dwh2_xxx --no-owner --no-privileges` to keep dumps portable.


In [None]:
# === Create sqldump/sqldump_airq_dwh2_xxx.sql (pg_dump) ===
sqldump_dir.mkdir(exist_ok=True)
outfile = sqldump_dir / f"sqldump_airq_dwh2_{XXX}.sql"

pg_dump = shutil.which("pg_dump") or "pg_dump"
cmd = [
    pg_dump,
    "-h", DB_HOST,
    "-p", str(DB_PORT),
    "-U", DB_USER,
    "-d", DB_NAME,
    "-n", f"dwh2_{XXX}",
    "--no-owner",
    "--no-privileges",
    "-f", str(outfile),
]

# Avoid echoing the password; supply it via env if provided
env = dict(os.environ)
if 'pw' in globals() and pw:
    env["PGPASSWORD"] = pw

print("Running:", " ".join(cmd).replace(DB_USER, "<user>"))
try:
    subprocess.run(cmd, check=True, env=env)
    print("✓ Dump created at", outfile)
except Exception as e:
    print("pg_dump failed; try this manually in a terminal:\n", " ".join(cmd), "\nError:", e)
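Before submitting, it is worth checking that the dump really contains the warehouse. A plain-format `pg_dump -n <schema>` usually includes a `CREATE SCHEMA`, one `CREATE TABLE <schema>.<table>` per table and one `COPY <schema>.<table> ... FROM stdin;` data block per table. A minimal sketch that counts these lines; the exact text pg_dump emits (for example, quoting of unusual identifiers) is an assumption to verify against your own dump, and `summarize_dump` is a hypothetical name:

```python
import re
from pathlib import Path


def summarize_dump(dump_file: Path, schema: str) -> dict:
    """Rough sanity check of a plain-format pg_dump file for one schema."""
    text = dump_file.read_text(encoding="utf-8")
    s = re.escape(schema)
    return {
        "create_schema": re.search(rf"^CREATE SCHEMA {s};", text, re.M) is not None,
        "tables": sorted(set(re.findall(rf"^CREATE TABLE {s}\.(\w+)", text, re.M))),
        "copy_blocks": len(re.findall(rf"^COPY {s}\.", text, re.M)),
    }
```

Usage: `summarize_dump(outfile, f"dwh2_{XXX}")` should list the dimension and fact tables loaded in section 8.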


## 13) Submission checklist (put these in your **ZIP**)

- `csv/` — CSV files 
- `ddl/` — DDL scripts 
- `etl/` — Your `a2_etl*.sql` files (ETL scripts)
- `mdx/` — Your `a2_q{NN}_{A|B}.mdx` files (MDX queries for business questions)
- `mdx_out/` — Your `a2_q{NN}_{A|B}.csv` files (results of MDX queries)
- `pdf/` — Your `a2_q{NN}.pdf` files (Dashboard exports as .pdf)
- `sql/` — Your `a2_q{NN}_{A|B}.sql` files (SQL queries for business questions)
- `sqldump/` — `sqldump_airq_dwh2_xxx.sql`  
- `AirQ_Part2_xxx.ipynb`
- `group_xxx.txt`
- `Report_Part2_Group_xxx.pdf`
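The naming rules in this checklist can be verified before zipping. A minimal sketch, assuming the patterns listed above (plus `mdx_index.csv`, which the batch executor writes into `mdx_out/`); the example files `a2_q31.sql` to `a2_q33.sql` have no `_A`/`_B` suffix and will therefore be reported. The names `NAME_RULES` and `check_submission` are hypothetical:

```python
import re
from pathlib import Path

# Folder -> regular expression every file name in it must fully match
NAME_RULES = {
    "etl": r"a2_etl.*\.sql",
    "mdx": r"a2_q\d{2}_[AB]\.mdx",
    "mdx_out": r"a2_q\d{2}_[AB]\.csv|mdx_index\.csv",
    "pdf": r"a2_q\d{2}\.pdf",
    "sql": r"a2_q\d{2}_[AB]\.sql",
}


def check_submission(root: Path, group: str) -> list[str]:
    """Report badly named files and missing top-level deliverables."""
    problems = []
    for folder, pattern in NAME_RULES.items():
        for f in sorted((root / folder).glob("*")):
            if f.is_file() and not re.fullmatch(pattern, f.name):
                problems.append(f"{folder}/{f.name}: name does not match {pattern}")
    for name in (f"AirQ_Part2_{group}.ipynb", f"group_{group}.txt",
                 f"Report_Part2_Group_{group}.pdf", f"sqldump/sqldump_airq_dwh2_{group}.sql"):
        if not (root / name).is_file():
            problems.append(f"missing: {name}")
    return problems
```

Usage: `for problem in check_submission(root_dir, XXX): print("!!", problem)`.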
