SELECT-FROM-WHERE

In [3]:
from sqlalchemy import create_engine, text
import pandas as pd
from pathlib import Path
import re
from typing import Set, Dict, List, Tuple
import os, uuid

from sqlalchemy.orm.base import PASSIVE_OFF

Connessione al database e utilities

In [3]:
USER = "postgres"
HOST = "localhost"
PORT = "5432"
PASSWORD = "user"

DB_ADMIN_URL = f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/postgres"
engine_admin = create_engine(DB_ADMIN_URL, isolation_level="AUTOCOMMIT")

DB_URL = f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/synthea"
engine = create_engine(DB_URL)
print("Connesso al database synthea")

Connesso al database synthea


In [20]:
def run(sql_or_text, show=False):
    with engine.begin() as conn:
        stmt = text(sql_or_text) if isinstance(sql_or_text, str) else sql_or_text
        result = conn.execute(stmt)
        if result.returns_rows:
            df = pd.DataFrame(result.fetchall(), columns=result.keys())
            if show:
                display(df)
            return df
        return None

def _strip_semicolon(sql: str) -> str:
    return re.sub(r';\s*$', '', sql.strip())


def _count_table(tname: str) -> int:
    return int(run(f"SELECT COUNT(*) AS n FROM {tname};").iloc[0]["n"])


def _size_table(tname: str) -> int:
    return int(run(f"SELECT pg_total_relation_size('{tname}') AS bytes;").iloc[0]["bytes"])


def _network_bytes(strategy_key: str, sizes: dict) -> int:
    if strategy_key == "owner-server":
        return sizes.get("ro", 0) + sizes.get("rs", 0)
    if strategy_key == "server-owner":
        return sizes.get("rs", 0)
    if strategy_key == "owner-only":
        return 0
    if strategy_key == "server-only":
        return sizes.get("out", 0)
    if strategy_key == "parallel":
        return sizes.get("ro", 0) + sizes.get("rs", 0)
    return 0


def _replan_alternative(plan: dict, Fo: set, Fs: set) -> dict | None:
    Co = plan["Classificazione"]["Co"]
    Cs = plan["Classificazione"]["Cs"]
    if not (Co and Cs):
        return None
    alt = {"owner-server": "server-owner", "server-owner": "owner-server"}.get(plan["Strategia"].lower())
    if not alt:
        return None
    qs, qo, qso = generate_subqueries(Co, Cs, plan["Classificazione"]["Cso"], plan["SELECT"], Fo, Fs, alt)
    return {
        "Strategia": alt,
        "SELECT": plan["SELECT"],
        "Classificazione": plan["Classificazione"],
        "qs": qs, "qo": qo, "qso": qso
    }


def _subst_token(sql: str, token: str, replacement: str) -> str:
    return re.sub(rf'\b{re.escape(token)}\b', replacement, sql)

FRAMMENTAZIONE VERTICALE

# PATIENTS
Owner(PATIENTS) = { id, deathdate, first, last, ssn, drivers, passport, address, city, state, county, fips, zip, lat, lon, income, birthplace }
Server(PATIENTS) = { id, birthdate, gender, race, ethnicity, marital }

In [5]:
# se servisse ricaricare i dati

sql = open("sql/fragmentPatients.sql").read()

with engine.begin() as conn:
    if sql.strip():
        conn.execute(text(sql))
        print("Frammentazione creata")
    else:
        print("Errore")

# O semplicemente uso la funzione run(sql)

Frammentazione creata


In [6]:
run(''' ANALYZE owner.patients_owner; ANALYZE server.patients_server;''')

In [7]:
Fo = {
    "id", "deathdate", "ssn", "drivers", "passport", "prefix", "first", "middle", "last", "suffix", "maiden",
    "birthplace", "address", "city", "state", "county", "fips", "zip", "lat", "lon",
    "healthcare_expenses", "healthcare_coverage", "income"
}
Fs = {"id", "birthdate", "gender", "race", "ethnicity", "marital"}

In [8]:

def domini_from_pg_stats(schema: str, table: str) -> dict:
    sql = f"""
    SELECT s.attname::text AS col,
           CASE
             WHEN s.n_distinct > 0
               THEN s.n_distinct::numeric
             ELSE (-s.n_distinct) * c.reltuples
           END AS est_distinct
    FROM pg_stats s
    JOIN pg_class c ON c.relname = s.tablename
    JOIN pg_namespace n ON n.oid = c.relnamespace AND n.nspname = s.schemaname
    WHERE s.schemaname = :schema AND s.tablename = :table;
    """
    rows = run(text(sql).bindparams(schema=schema, table=table), show=False)
    return {r.col.lower(): max(1, int(r.est_distinct or 1)) for _, r in rows.iterrows()}



domini_owner = domini_from_pg_stats("owner", "patients_owner")
domini_server = domini_from_pg_stats("server", "patients_server")


domini = {**domini_owner, **domini_server}


In [50]:
def parse_query(query: str) -> Tuple[Set[str], str]:
    select_match = re.search(r"SELECT\s+(.*?)\s+FROM", query, flags=re.IGNORECASE | re.DOTALL)
    where_match = re.search(r"WHERE\s+(.*)", query, flags=re.IGNORECASE | re.DOTALL)
    if not select_match or not where_match:
        raise ValueError("La query deve contenere sia SELECT che WHERE.")


    select_fields = {tok.lower() for tok in re.findall(r'\b[a-zA-Z_][a-zA-Z0-9_]*\b', select_match.group(1))}
    where_clause = where_match.group(1).strip()
    return select_fields, where_clause


def extract_conditions(where_clause: str) -> Tuple[List[str], bool]:
    if " OR " in where_clause.upper():
        conditions = [c.strip() for c in re.split(r"\bOR\b", where_clause, flags=re.IGNORECASE)]
        return conditions, True
    else:
        conditions = [c.strip() for c in re.split(r"\bAND\b", where_clause, flags=re.IGNORECASE)]
        return conditions, False


def classify_conditions(conditions: List[str], Fo: Set[str], Fs: Set[str]) -> Dict[str, List[str]]:
    Co, Cs, Cso = [], [], []
    for cond in conditions:
        attrs = {tok.lower() for tok in re.findall(r'\b[a-zA-Z_][a-zA-Z0-9_]*\b', cond)}
        in_owner = attrs & Fo
        in_server = attrs & Fs
        if in_owner and in_server:
            Cso.append(cond)
        elif in_owner:
            Co.append(cond)
        elif in_server:
            Cs.append(cond)
    return {"Co": Co, "Cs": Cs, "Cso": Cso}


def stima_selettivita(condizione: str, domini: dict) -> float:
    c = condizione.lower()
    for attr in domini:
        if attr.lower() in c:
            return 1 / domini[attr]
    return 0.5


def choose_strategy(classified, domini, where_clause, has_or,
                    bytes_owner=1.0, bytes_server=1.0) -> str:
    Co  = classified.get("Co", [])
    Cs  = classified.get("Cs", [])
    Cso = classified.get("Cso", [])

    def _sel(preds: list[str]) -> float:
        s = 1.0
        for c in preds:
            s *= max(min(stima_selettivita(c, domini), 1.0), 1e-6)
        return s


    if Cso:
        if Co and not Cs:
            return "owner-server"
        if Cs and not Co:
            return "server-owner"
        if Co and Cs:
            cost_o = _sel(Co) * bytes_owner
            cost_s = _sel(Cs) * bytes_server
            return "owner-server" if cost_o < cost_s else "server-owner"
        return "owner-server" if bytes_owner <= bytes_server else "server-owner"


    if has_or:
        if Co and Cs:
            return "parallel"
        if Co:
            return "owner-only"
        if Cs:
            return "server-only"
        return "unknown"

    # AND / nessun OR
    if Co and not Cs:
        return "owner-only"
    if Cs and not Co:
        return "server-only"
    if Co and Cs:
        cost_o = _sel(Co) * bytes_owner
        cost_s = _sel(Cs) * bytes_server
        return "owner-server" if cost_o < cost_s else "server-owner"

    return "unknown"


In [19]:
def generate_subqueries(Co, Cs, Cso, select_attrs, Fo, Fs, strategy, has_or=False):

    join_o = " OR " if (has_or and not Cs and not Cso) else " AND "
    join_s = " OR " if (has_or and not Co and not Cso) else " AND "

    sel_attrs = {a.lower() for a in select_attrs}


    sel_o = sorted(sel_attrs & Fo)
    sel_s = sorted((sel_attrs & Fs) - {'id'})

    fs_in_cso = {
        a.lower()
        for cond in Cso
        for a in re.findall(r'\b[a-zA-Z_][a-zA-Z0-9_]*\b', cond)
        if a.lower() in Fs
    }


    Aqs = sorted(((sel_attrs & Fs) | fs_in_cso) - {'id'})
    proj_owner_final = [f"o.{c}" for c in sel_o] if sel_o else ["o.id"]
    proj_server_final = [f"s.{c}" for c in sel_s]
    owner_final = ", ".join(proj_owner_final)
    server_final = ", ".join(proj_server_final)
    sel_both_list = ", ".join(proj_owner_final + proj_server_final)

    qs = qo = qso = None

    if strategy == "server-owner":
        # SERVER → OWNER
        proj_qs = ", ".join(["s.id"] + [f"s.{c}" for c in Aqs])
        qs = f"SELECT {proj_qs} FROM server.patients_server s" + (f" WHERE {' AND '.join(Cs)}" if Cs else "")
        qso = f"SELECT {sel_both_list} FROM owner.patients_owner o JOIN Rs s USING (id)" \
              + (f" WHERE {' AND '.join(Co + Cso)}" if (Co or Cso) else "")

    elif strategy == "owner-server":
        # OWNER → SERVER → JOIN
        qo = "SELECT o.id FROM owner.patients_owner o" + (f" WHERE {' AND '.join(Co)}" if Co else "")
        proj_qs = ", ".join(["s.id"] + [f"s.{c}" for c in Aqs])
        qs = f"SELECT {proj_qs} FROM server.patients_server s JOIN Ro r USING (id)" \
             + (f" WHERE {' AND '.join(Cs)}" if Cs else "")
        qso = f"SELECT {sel_both_list} FROM owner.patients_owner o JOIN Rs s USING (id)" \
              + (f" WHERE {' AND '.join(Cso)}" if Cso else "")

    elif strategy == "owner-only":
        qso = f"SELECT {owner_final} FROM owner.patients_owner o" \
          + (f" WHERE {join_o.join(Co)}" if Co else "")

    elif strategy == "server-only":
        server_proj = server_final if server_final else "s.id"
        qso = f"SELECT {server_proj} FROM server.patients_server s" \
          + (f" WHERE {join_s.join(Cs)}" if Cs else "")

    elif strategy == "parallel":
        sel_attrs = {a.lower() for a in select_attrs}
        owner_cols  = [f"o.{c}" for c in sorted((sel_attrs & Fo) - {'id'})]
        server_cols = [f"s.{c}" for c in sorted((sel_attrs & Fs) - {'id'})]

        #  OWNER
        qo = "SELECT " + ", ".join(["o.id"] + owner_cols) + " FROM owner.patients_owner o" \
         + (f" WHERE {' OR '.join(Co)}" if Co else "")

        # SERVER
        qs = "SELECT " + ", ".join(["s.id"] + server_cols) + " FROM server.patients_server s" \
         + (f" WHERE {' OR '.join(Cs)}" if Cs else "")



        final_sel = []
        if 'id' in sel_attrs:
            final_sel.append("u.id AS id")
        final_sel += owner_cols + server_cols
        if not final_sel:
            final_sel = ["u.id AS id"]
        sel_final_sql = ", ".join(final_sel)

        qso = f"""
            WITH U AS (
            SELECT id FROM Ro
            UNION
            SELECT id FROM Rs
                            )
            SELECT {sel_final_sql}
            FROM U u
            JOIN owner.patients_owner o USING (id)
            JOIN server.patients_server s USING (id)
            """.strip()

    else:
        raise ValueError("Strategy must be one of: server-owner, owner-server, owner-only, server-only, parallel")

    return qs, qo, qso


In [11]:
def process_query(query: str, Fo: Set[str], Fs: Set[str], domini: Dict[str, int]) -> Dict[str, any]:

    select_attrs, where = parse_query(query)
    conditions, has_or = extract_conditions(where)
    classified = classify_conditions(conditions, Fo, Fs)
    strategy = choose_strategy(classified, domini, where, has_or)

    s = strategy.lower()
    if "parallel" in s:
        strategy_key = "parallel"
    elif "owner-server" in s:
        strategy_key = "owner-server"
    elif "server-owner" in s:
        strategy_key = "server-owner"
    elif "owner-only" in s:
        strategy_key = "owner-only"
    elif "server-only" in s:
        strategy_key = "server-only"
    else:
        strategy_key = "unknown"


    qs, qo, qso = generate_subqueries(
        classified["Co"], classified["Cs"], classified["Cso"],
        select_attrs, Fo, Fs,
        strategy_key,
        has_or=has_or
    )

    return {
        "Query": query,
        "SELECT": select_attrs,
        "WHERE": where,
        "Condizioni": conditions,
        "Classificazione": classified,
        "Strategia": strategy_key,
        "qs": qs,  # query lato server (può essere None)
        "qo": qo,  # query lato owner  (può essere None)
        "qso": qso,  # query che interpella entrambi (può essere None in parallel)
    }


In [12]:
run('''
DROP SCHEMA work CASCADE; CREATE SCHEMA work;
''')

In [34]:


def evaluate_query(query: str,
                   Fo: set, Fs: set, domini: dict,
                   tag: str | None = None,
                   schema: str = "work",
                   save_to: str | None = None,
                   also_compare_alt: bool = True) -> dict:
    plan = process_query(query, Fo, Fs, domini)
    sk = plan["Strategia"]
    tag = tag or uuid.uuid4().hex[:8]

    run(f"CREATE SCHEMA IF NOT EXISTS {schema};")

    ro_name = f"{schema}.ro_{tag}"
    rs_name = f"{schema}.rs_{tag}"
    out_name = f"{schema}.out_{tag}"

    counts, sizes = {}, {}

    if sk == "owner-server":
        qo = _strip_semicolon(plan["qo"])
        qs = _strip_semicolon(plan["qs"])
        qso = _strip_semicolon(plan["qso"])

        run(f"DROP TABLE IF EXISTS {ro_name}; CREATE TABLE {ro_name} AS {qo};")
        counts["ro"] = _count_table(ro_name)
        sizes["ro"] = _size_table(ro_name)

        qs_mat = qs.replace(" Ro ", f" {ro_name} ")
        run(f"DROP TABLE IF EXISTS {rs_name}; CREATE TABLE {rs_name} AS {qs_mat};")
        counts["rs"] = _count_table(rs_name)
        sizes["rs"] = _size_table(rs_name)

        qso_mat = qso.replace(" Rs ", f" {rs_name} ")
        run(f"DROP TABLE IF EXISTS {out_name}; CREATE TABLE {out_name} AS {qso_mat};")
        counts["out"] = _count_table(out_name)
        sizes["out"] = _size_table(out_name)

    elif sk == "server-owner":
        qs = _strip_semicolon(plan["qs"])
        qso = _strip_semicolon(plan["qso"])

        run(f"DROP TABLE IF EXISTS {rs_name}; CREATE TABLE {rs_name} AS {qs};")
        counts["rs"] = _count_table(rs_name)
        sizes["rs"] = _size_table(rs_name)

        qso_mat = qso.replace(" Rs ", f" {rs_name} ")
        run(f"DROP TABLE IF EXISTS {out_name}; CREATE TABLE {out_name} AS {qso_mat};")
        counts["out"] = _count_table(out_name)
        sizes["out"] = _size_table(out_name)

    elif sk in ("owner-only", "server-only"):
        qso = _strip_semicolon(plan["qso"])
        run(f"DROP TABLE IF EXISTS {out_name}; CREATE TABLE {out_name} AS {qso};")
        counts["out"] = _count_table(out_name)
        sizes["out"] = _size_table(out_name)

    elif sk == "parallel":
        if plan["qo"]:
            qo = _strip_semicolon(plan["qo"])
            run(f"DROP TABLE IF EXISTS {ro_name}; CREATE TABLE {ro_name} AS {qo};")
            counts["ro"] = _count_table(ro_name)
            sizes["ro"]  = _size_table(ro_name)
        if plan["qs"]:
            qs = _strip_semicolon(plan["qs"])
            run(f"DROP TABLE IF EXISTS {rs_name}; CREATE TABLE {rs_name} AS {qs};")
            counts["rs"] = _count_table(rs_name)
            sizes["rs"]  = _size_table(rs_name)

        if plan["qso"]:
            qso = _strip_semicolon(plan["qso"])
            qso_mat = _subst_token(qso, "Ro", ro_name)
            qso_mat = _subst_token(qso_mat, "Rs", rs_name)
            run(f"DROP TABLE IF EXISTS {out_name}; CREATE TABLE {out_name} AS {qso_mat};")
            counts["out"] = _count_table(out_name)
            sizes["out"]  = _size_table(out_name)

    net_bytes = _network_bytes(sk, sizes)

    alt_info = None
    if also_compare_alt and sk in ("owner-server", "server-owner"):
        alt = _replan_alternative(plan, Fo, Fs)
        if alt:
            tag_alt = tag + "_alt"
            ro_alt = f"{schema}.ro_{tag_alt}"
            rs_alt = f"{schema}.rs_{tag_alt}"
            out_alt = f"{schema}.out_{tag_alt}"
            sizes_alt = {}

            if alt["Strategia"] == "owner-server":
                qo_alt = _strip_semicolon(alt["qo"])
                qs_alt = _strip_semicolon(alt["qs"])
                qso_alt = _strip_semicolon(alt["qso"])

                run(f"DROP TABLE IF EXISTS {ro_alt}; CREATE TABLE {ro_alt} AS {qo_alt};")
                sizes_alt["ro"] = _size_table(ro_alt)

                qs_alt_mat = qs_alt.replace(" Ro ", f" {ro_alt} ")
                run(f"DROP TABLE IF EXISTS {rs_alt}; CREATE TABLE {rs_alt} AS {qs_alt_mat};")
                sizes_alt["rs"] = _size_table(rs_alt)

                qso_alt_mat = qso_alt.replace(" Rs ", f" {rs_alt} ")
                run(f"DROP TABLE IF EXISTS {out_alt}; CREATE TABLE {out_alt} AS {qso_alt_mat};")

            elif alt["Strategia"] == "server-owner":
                qs_alt = _strip_semicolon(alt["qs"])
                run(f"DROP TABLE IF EXISTS {rs_alt}; CREATE TABLE {rs_alt} AS {qs_alt};")
                sizes_alt["rs"] = _size_table(rs_alt)

                qso_alt = _strip_semicolon(alt["qso"])
                qso_alt_mat = qso_alt.replace(" Rs ", f" {rs_alt} ")
                run(f"DROP TABLE IF EXISTS {out_alt}; CREATE TABLE {out_alt} AS {qso_alt_mat};")

            net_alt = _network_bytes(alt["Strategia"], sizes_alt)
            saving_pct = 1 - (net_bytes / net_alt) if net_alt and net_alt > 0 else None
            alt_info = {
                "alt_strategy": alt["Strategia"],
                "alt_network_bytes": net_alt,
                "saving_pct": float(saving_pct) if saving_pct is not None else None,
                "tables_alt": {"ro": ro_alt if "ro" in sizes_alt else None,
                               "rs": rs_alt if "rs" in sizes_alt else None,
                               "out": out_alt}
            }


    row = {
        "tag": tag,
        "query": plan["Query"],
        "strategy": sk,
        "result_owner": counts.get("ro"), "result_server": counts.get("rs"), "result_out": counts.get("out"),
        "bytes_result_owner": sizes.get("ro"), "bytes_result_server": sizes.get("rs"), "bytes_result_out": sizes.get("out"),
        "network_bytes": net_bytes,
        "alt_strategy": alt_info["alt_strategy"] if alt_info else None,
        "alt_network_bytes": alt_info["alt_network_bytes"] if alt_info else None,
        "saving_pct": alt_info["saving_pct"] if alt_info else None
    }


    if save_to:
        save_to = os.path.abspath(save_to)
        df = pd.DataFrame([row])
        header = not os.path.exists(save_to)
        df.to_csv(save_to, mode="a", index=False, header=header)


    return {
        "plan": plan,
        "row": row,
        "tables": {"result_owner": ro_name if "ro" in counts else None,
                   "result_server": rs_name if "rs" in counts else None,
                   "result_out": out_name if "out" in counts else None},
        "alt": alt_info
    }


def evaluate_queries(queries: list[str],
                     Fo: set, Fs: set, domini: dict,
                     schema: str = "work",
                     save_to: str | None = None,
                     also_compare_alt: bool = True) -> pd.DataFrame:
    rows = []
    for i, q in enumerate(queries, 1):
        tag = f"q{i:02d}"
        res = evaluate_query(q, Fo, Fs, domini, tag=tag, schema=schema,
                             save_to=save_to, also_compare_alt=also_compare_alt)
        rows.append(res["row"])
    df = pd.DataFrame(rows)
    return df


TESTING

In [63]:
q= '''SELECT id, first, last FROM patients WHERE gender='F' AND birthdate <= (deathdate - INTERVAL '18 years') AND city='Worcester' '''
res = evaluate_query(q, Fo, Fs, domini, tag='q01')
print(res)

In [62]:
if res["tables"]["result_owner"]:
    print("ro")
    run(f"SELECT * FROM {res['tables']['result_owner']} ;", show=True)  # Ro (Qo)
if res["tables"]["result_server"]:
    print("rs")
    run(f"SELECT * FROM {res['tables']['result_server']} ;", show=True)  # Rs (Qs)
run(f"SELECT * FROM {res['tables']['result_out']} ;")  # Rs (Qs))


ro


Unnamed: 0,id
0,8cef0aac-f2c6-40ed-cc00-e65bbb555956
1,d329f8fc-dc7c-1b4c-a2c8-68b5c8d3c0bf
2,f0acfdee-ec65-24ff-bdeb-aae448cc6512
3,42c12870-bb0a-403d-fbf6-c28673450ed8


rs


Unnamed: 0,id,birthdate


Unnamed: 0,first,id,last


In [28]:
queries = [
    "SELECT id FROM patients WHERE gender='F' AND county='Worcester County'",
    "SELECT id, first, last FROM patients WHERE birthdate <= (deathdate - INTERVAL '18 years') AND city='Worcester' ",
    "SELECT id, first, last FROM patients WHERE zip='02170' OR marital='D'",
    "SELECT id, first, last FROM patients WHERE (birthdate <= (deathdate - INTERVAL '80 years')) OR income > 150000",

]
df = evaluate_queries(queries, Fo, Fs, domini, save_to='query1_evaluation.cvs')
df

Unnamed: 0,tag,query,strategy,num_result_owner,num_result_server,num_result_out,bytes_result_owner,bytes_result_server,bytes_result_out,network_bytes,alt_strategy,alt_network_bytes,saving_pct
0,q01,SELECT id FROM patients WHERE gender='F' AND c...,owner-server,14.0,7,7,16384.0,16384,16384,32768,server-owner,16384.0,-1.0
1,q02,"SELECT id, first, last FROM patients WHERE bir...",server-owner,,112,2,,24576,16384,24576,,,
2,q03,"SELECT id, first, last FROM patients WHERE zip...",parallel,1.0,12,13,16384.0,16384,16384,32768,,,
3,q04,"SELECT id, first, last FROM patients WHERE (bi...",server-owner,,112,0,,24576,8192,24576,,,
