In [None]:
import os
from dotenv import load_dotenv
import pymongo

# Load variables from a .env file into the process environment
load_dotenv()

# The MongoDB connection string is a plain string (not JSON)
MONGODB_URI = os.environ.get("MONGODB_URI")
if not MONGODB_URI:
    # Fail early: nothing below can work without a connection string
    raise ValueError("‚ùå MONGODB_URI not found in .env")

# Client, database and collection handles used by the export cells below
client = pymongo.MongoClient(MONGODB_URI)
db = client["prod"]
collection = db["PRJ-16"]


In [None]:
import os
import duckdb
import pyarrow as pa
import pyarrow.parquet as pq
from datetime import datetime, UTC
from pathlib import Path

# --------------------------------------------
# CONFIG
# --------------------------------------------
CHUNK = 5000  # rows buffered per output before they are flushed to parquet
PARQUET_DIR = Path("parquet_output")

# HUB definitions (unchanged).
# Per hub: "A" ids feed the accel and FFT outputs, "S" and "T" ids feed the SST outputs.
HUBS = {
    "hub1": {
        "A": ["A33", "A34", "A35"],
        "S": ["S27", "S28", "S29"],
        "T": ["T4", "T5", "T6"],
    },
    "hub2": {
        "A": ["A30"],
        "S": ["S7", "S8", "S9", "S10", "S11", "S12", "S13", "S14"],
        "T": ["T1"],
    },
    "hub3": {
        "A": ["A31"],
        "S": ["S15", "S16", "S17", "S18"],
        "T": ["T2"],
    },
    "hub4": {
        "A": ["A32"],
        "S": ["S19", "S20", "S21", "S22", "S23", "S24", "S25", "S26"],
        "T": ["T3"],
    },
}

# Every accelerometer id, hub by hub, in HUBS declaration order
ACCEL_ALL = [sensor for layout in HUBS.values() for sensor in layout["A"]]


# ------------------------------------------------
# SCHEMAS (identical to the previous parquet exporter)
# ------------------------------------------------

# ACCEL ALL: a timestamp column followed by float32 t/x/y/z columns
# for each accelerometer id in ACCEL_ALL
accel_fields = [pa.field("datetime", pa.timestamp("us"))]
accel_fields.extend(
    pa.field(f"{sensor}_{axis}", pa.float32())
    for sensor in ACCEL_ALL
    for axis in ("t", "x", "y", "z")
)
ACCEL_ALL_SCHEMA = pa.schema(accel_fields)


# FFT
def make_fft_schema():
    """Build the Arrow schema shared by all FFT parquet files.

    Returns:
        A ``pyarrow.Schema`` with a microsecond timestamp, the sensor id,
        the two float32 ``fft_main_*`` values and two float32 list columns
        (``fft_freqs`` and ``fft_amps``).
    """
    f32 = pa.float32()
    columns = [
        ("datetime", pa.timestamp("us")),
        ("sensor_id", pa.string()),
        ("fft_main_f", f32),
        ("fft_main_a", f32),
        ("fft_freqs", pa.list_(f32)),
        ("fft_amps", pa.list_(f32)),
    ]
    return pa.schema([pa.field(name, dtype) for name, dtype in columns])


# SST
def make_sst_schema(hub):
    """Build the Arrow schema of the SST parquet file for one hub.

    Args:
        hub: Key into ``HUBS`` (an unknown key raises ``KeyError``).

    Returns:
        A ``pyarrow.Schema`` with a microsecond ``datetime`` column followed
        by one float32 column per "S" id and then per "T" id of that hub.
    """
    layout = HUBS[hub]
    value_columns = [*layout["S"], *layout["T"]]
    return pa.schema(
        [pa.field("datetime", pa.timestamp("us"))]
        + [pa.field(name, pa.float32()) for name in value_columns]
    )


# --------------------------------------------
# HELPERS
# --------------------------------------------

def safe_float(v):
    """Convert ``v`` to ``float``, returning ``None`` when it cannot be converted.

    Only conversion failures are absorbed (``None``, non-numeric strings,
    unsupported types, integers too large for a float). Anything else,
    such as ``KeyboardInterrupt``, propagates instead of being silently
    swallowed by a bare ``except``.

    Args:
        v: Any value.

    Returns:
        ``float(v)`` or ``None``.
    """
    try:
        return float(v)
    except (TypeError, ValueError, OverflowError):
        return None

def safe_list(items, key):
    """Extract ``key`` from every item as a float, keeping positions aligned.

    Each entry of the result corresponds to the item at the same index;
    an item that has no usable value (missing key, ``None``, non-numeric
    value, or an item without ``.get``) contributes ``None``.

    Args:
        items: Iterable of dict-like objects; ``None`` is treated as empty.
        key: Key to read from each item.

    Returns:
        A list of ``float`` / ``None`` with one entry per item.
    """
    out = []
    for it in items or []:
        try:
            out.append(float(it.get(key)))
        # Only conversion/lookup failures map to None; interrupts and other
        # unrelated errors are no longer swallowed by a bare except.
        except (AttributeError, TypeError, ValueError, OverflowError):
            out.append(None)
    return out


In [None]:
def create_empty_week_files(iso_year, iso_week):
    """Write one zero-row parquet file per weekly output and return their paths.

    Files are created under ``PARQUET_DIR`` (created if missing) and named
    ``<year>W<week:02d>_<key>.parquet``. Any existing file with the same
    name is overwritten by the empty table.

    Args:
        iso_year: Year used in the file-name prefix.
        iso_week: Week number used in the file-name prefix (zero-padded to 2 digits).

    Returns:
        Dict mapping ``"accel_all"``, ``"fft_<hub>"`` and ``"sst_<hub>"``
        to the path of the corresponding file.
    """
    prefix = f"{iso_year}W{iso_week:02d}"
    PARQUET_DIR.mkdir(exist_ok=True, parents=True)

    # Output key -> schema, in the order the files are written and reported
    layouts = {"accel_all": ACCEL_ALL_SCHEMA}
    layouts.update({f"fft_{hub}": make_fft_schema() for hub in HUBS})
    layouts.update({f"sst_{hub}": make_sst_schema(hub) for hub in HUBS})

    files = {}
    for key, schema in layouts.items():
        target = PARQUET_DIR / f"{prefix}_{key}.parquet"
        pq.write_table(pa.Table.from_pylist([], schema), target)
        files[key] = target

    print("Created empty weekly parquet files:")
    for target in files.values():
        print(" -", target)

    return files


In [None]:
def append_rows_to_parquet(rows, schema, parquet_path):
    """Append ``rows`` to an existing parquet file by rewriting it.

    The existing file content and the new rows are combined with
    ``UNION ALL`` (columns matched by position, existing rows first) and
    written with ZSTD compression. The file at ``parquet_path`` must
    already exist.

    The combined result is written to ``<parquet_path>.tmp`` and then
    moved over the target with ``os.replace``, so a failure while writing
    leaves the original file untouched. A private DuckDB connection is
    used and always closed, so no registered view or temp table is left
    behind on the shared default connection.

    Args:
        rows: List of dicts; an empty list is a no-op.
        schema: ``pyarrow.Schema`` used to build the table from ``rows``.
        parquet_path: Path (``str`` or ``Path``) of the file to extend.

    Returns:
        None.
    """
    if not rows:
        return

    tbl = pa.Table.from_pylist(rows, schema=schema)

    target = str(parquet_path)
    staging = target + ".tmp"
    # Paths are embedded as SQL string literals: double any single quote
    sql_target = target.replace("'", "''")
    sql_staging = staging.replace("'", "''")

    try:
        con = duckdb.connect()
        try:
            con.register("new_rows", tbl)
            # Stream existing + new rows straight into the staging file
            con.execute(f"""
                COPY (
                    SELECT * FROM parquet_scan('{sql_target}', union_by_name=true)
                    UNION ALL
                    SELECT * FROM new_rows
                ) TO '{sql_staging}' (FORMAT PARQUET, COMPRESSION ZSTD)
            """)
        finally:
            # Release file handles before the rename
            con.close()
        os.replace(staging, target)
    finally:
        # Only present if the write or the rename failed
        if os.path.exists(staging):
            os.remove(staging)


In [None]:
def process_week_cursor(cursor, iso_year, iso_week, weekly_files):
    """Stream documents from ``cursor`` into the weekly parquet files.

    For every document that has a ``time.datetime`` value, this builds:

    * one accel row (always, even when the document holds no accelerometer
      key; the missing columns are simply absent from the row dict),
    * one SST row per hub, only when at least one of that hub's "S"/"T"
      ids is present in the document,
    * one FFT row per accelerometer id whose entry has a truthy ``fft``.

    Rows are buffered per output and appended to the matching file of
    ``weekly_files`` each time a buffer reaches ``CHUNK`` rows; whatever is
    left is flushed once the cursor is exhausted.

    Args:
        cursor: Iterable of dict-like documents (accessed via ``.get`` and ``in``).
        iso_year: Not used in the body.
        iso_week: Not used in the body.
        weekly_files: Mapping with keys ``"accel_all"``, ``"fft_<hub>"`` and
            ``"sst_<hub>"`` giving the parquet path of each output.

    Raises:
        ValueError: From ``datetime.fromisoformat`` when ``time.datetime``
            is not a valid ISO string (not caught here).
    """
    # buffers
    accel_rows = []
    fft_rows   = {hub: [] for hub in HUBS}
    sst_rows   = {hub: [] for hub in HUBS}

    for obj in cursor:
        # Documents without a timestamp are skipped entirely
        dt_raw = obj.get("time", {}).get("datetime")
        if not dt_raw:
            continue

        dt = datetime.fromisoformat(dt_raw)

        # ----------------------
        # ACCEL
        # ----------------------
        # One wide row per document; only sensors present in the document
        # get their t/x/y/z keys set.
        ar = {"datetime": dt}
        for a in ACCEL_ALL:
            if a in obj:
                ar[f"{a}_t"] = safe_float(obj[a].get("t"))
                ar[f"{a}_x"] = safe_float(obj[a].get("x"))
                ar[f"{a}_y"] = safe_float(obj[a].get("y"))
                ar[f"{a}_z"] = safe_float(obj[a].get("z"))
        accel_rows.append(ar)

        # Flush and reuse the same list once the buffer is full
        if len(accel_rows) >= CHUNK:
            append_rows_to_parquet(
                accel_rows, ACCEL_ALL_SCHEMA,
                weekly_files["accel_all"]
            )
            accel_rows.clear()

        # ----------------------
        # HUBS -> SST + FFT
        # ----------------------
        for hub in HUBS:
            # SST
            sr = {"datetime": dt}
            has_sst = False  # set when any "S"/"T" id of this hub is in the document

            for s in HUBS[hub]["S"]:
                if s in obj:
                    sr[s] = safe_float(obj[s].get("s"))
                    has_sst = True

            for t in HUBS[hub]["T"]:
                if t in obj:
                    sr[t] = safe_float(obj[t].get("t"))
                    has_sst = True

            # Unlike accel rows, SST rows are only kept when the hub has data
            if has_sst:
                sst_rows[hub].append(sr)
                if len(sst_rows[hub]) >= CHUNK:
                    append_rows_to_parquet(
                        sst_rows[hub], make_sst_schema(hub),
                        weekly_files[f"sst_{hub}"]
                    )
                    sst_rows[hub].clear()

            # FFT
            # One long-format row per accelerometer that carries an "fft" entry
            for a in HUBS[hub]["A"]:
                fft = obj.get(a, {}).get("fft")
                if fft:
                    # "or" fallbacks cover keys that exist but hold None
                    mn = fft.get("main", {}) or {}
                    sp = fft.get("spectrum", []) or []
                    fft_rows[hub].append({
                        "datetime": dt,
                        "sensor_id": a,
                        "fft_main_f": safe_float(mn.get("f")),
                        "fft_main_a": safe_float(mn.get("a")),
                        "fft_freqs": safe_list(sp, "f"),
                        "fft_amps": safe_list(sp, "a"),
                    })
                    if len(fft_rows[hub]) >= CHUNK:
                        append_rows_to_parquet(
                            fft_rows[hub],
                            make_fft_schema(),
                            weekly_files[f"fft_{hub}"]
                        )
                        fft_rows[hub].clear()

    # ----------------------
    # FINAL FLUSH
    # ----------------------
    # Write whatever is still buffered below the CHUNK threshold
    if accel_rows:
        append_rows_to_parquet(
            accel_rows, ACCEL_ALL_SCHEMA,
            weekly_files["accel_all"]
        )

    for hub in HUBS:
        if fft_rows[hub]:
            append_rows_to_parquet(
                fft_rows[hub], make_fft_schema(),
                weekly_files[f"fft_{hub}"]
            )
        if sst_rows[hub]:
            append_rows_to_parquet(
                sst_rows[hub], make_sst_schema(hub),
                weekly_files[f"sst_{hub}"]
            )

    print("‚úî Week processing complete.")


In [None]:
def process_week(collection, week_start_dt, week_end_dt, iso_year, iso_week):
    """Export one week of documents from ``collection`` into parquet files.

    Creates the empty weekly files, queries the documents whose
    ``time.datetime`` lies in ``[week_start_dt, week_end_dt)`` and streams
    them through ``process_week_cursor``. The cursor is always closed,
    including when processing raises.

    Args:
        collection: Object exposing a PyMongo-style ``find``.
        week_start_dt: Inclusive lower bound (``datetime``).
        week_end_dt: Exclusive upper bound (``datetime``).
        iso_year: Year used to name the output files.
        iso_week: Week number used to name the output files.

    Returns:
        The dict of output paths returned by ``create_empty_week_files``.
    """
    # convert to iso8601 for Mongo
    # NOTE(review): the bounds are compared as strings and any tzinfo is
    # dropped by strftime; this assumes "time.datetime" is stored as a naive
    # ISO-8601 string in the same zone as the bounds. Confirm against the data.
    start_iso = week_start_dt.strftime("%Y-%m-%dT%H:%M:%S")
    end_iso   = week_end_dt.strftime("%Y-%m-%dT%H:%M:%S")

    # create empty weekly parquets
    weekly_files = create_empty_week_files(iso_year, iso_week)

    print("\nüîç Querying MongoDB...")
    cursor = collection.find(
        {"time.datetime": {"$gte": start_iso, "$lt": end_iso}},
        batch_size=5000
    )

    try:
        print("üöÄ Processing week...")
        process_week_cursor(cursor, iso_year, iso_week, weekly_files)
    finally:
        # Free the server-side cursor even if processing fails midway
        cursor.close()

    print("\nüéâ Weekly parquet files ready.")
    return weekly_files


In [None]:
# === Cell 4: Compute previous ISO week range (UTC safe) ===

from datetime import datetime, date, timedelta, UTC

# Current date in UTC
today = datetime.now(UTC).date()

# ISO year/week of today
iso_year, iso_week, _ = today.isocalendar()

iso_week -= 1


# Previous ISO week
prev_week = iso_week - 1
prev_year = iso_year

# Handle wrap to previous year
if prev_week == 0:
    prev_year -= 1
    prev_week = date(prev_year, 12, 28).isocalendar()[1]

# Start of previous week (Monday)
start_prev_week = date.fromisocalendar(prev_year, prev_week, 1)

# Start of current week (Monday)
start_current_week = date.fromisocalendar(iso_year, iso_week, 1)

# Convert to timezone-aware datetimes
start_dt = datetime.combine(start_prev_week, datetime.min.time(), tzinfo=UTC)
end_dt   = datetime.combine(start_current_week, datetime.min.time(), tzinfo=UTC)

print(f"Today:              {today}")
print(f"Current ISO week:   {iso_year}-W{iso_week}")
print(f"Previous ISO week:  {prev_year}-W{prev_week}")
print()
print(f"Week start:         {start_dt}")
print(f"Week end:           {end_dt}  <-- start of current week")


In [None]:
# Export the [start_dt, end_dt) range; output files are named after prev_year / prev_week
weekly_files = process_week(collection, start_dt, end_dt, prev_year, prev_week)


In [None]:
import duckdb
from pathlib import Path

# Folder with the freshly generated parquet files (same literal as PARQUET_DIR)
NEW_DIR = Path("parquet_output")
# Folder with the files they are compared against
# NOTE(review): presumably the output of the previous exporter for week 48; confirm
OLD_DIR = Path("w48_old")


In [None]:
def list_parquets(folder: Path):
    """Return the ``*.parquet`` files directly inside ``folder``, sorted by path."""
    matches = list(folder.glob("*.parquet"))
    matches.sort()
    return matches

old_files = list_parquets(OLD_DIR)
new_files = list_parquets(NEW_DIR)

# File counts for both folders first ...
for label, listing in (("OLD", old_files), ("NEW", new_files)):
    print(f"{label}:", len(listing), "files")

# ... then the file names, one folder after the other
for label, listing in (("OLD", old_files), ("NEW", new_files)):
    print(f"\n{label} FILES:")
    for f in listing:
        print(" -", f.name)


In [None]:
import pandas as pd

def parquet_info(path):
    """Describe a parquet file with DuckDB.

    Args:
        path: Path of the parquet file (interpolated into the SQL text).

    Returns:
        Tuple ``(schema_df, row_count)`` where ``schema_df`` is the
        DataFrame produced by ``DESCRIBE`` and ``row_count`` is the value
        of ``COUNT(*)`` for the file.
    """
    source = f"parquet_scan('{path}')"
    schema_df = duckdb.sql(f"DESCRIBE SELECT * FROM {source}").df()
    count_df = duckdb.sql(f"SELECT COUNT(*) AS n FROM {source}").df()
    return schema_df, count_df["n"][0]

comparison = []

for old in old_files:
    new = NEW_DIR / old.name

    # No counterpart in NEW: record the gap and move on to the next file
    if not new.exists():
        comparison.append(dict(
            file=old.name,
            status="‚ùå missing in NEW",
            rows_old=None,
            rows_new=None,
            schema_match=False,
        ))
        continue

    old_schema, old_rows = parquet_info(old)
    new_schema, new_rows = parquet_info(new)

    # Schemas match when column names and column types agree, in order
    schema_match = all(
        list(old_schema[col]) == list(new_schema[col])
        for col in ("column_name", "column_type")
    )

    # A file is "OK" only when both the schema and the row count agree
    status = "OK" if (schema_match and old_rows == new_rows) else "DIFF"

    comparison.append(dict(
        file=old.name,
        status=status,
        rows_old=old_rows,
        rows_new=new_rows,
        schema_match=schema_match,
    ))

print(pd.DataFrame(comparison))


In [None]:
df_compare = pd.DataFrame(comparison)
print(df_compare)

# Split the report once on the status flag
is_ok = df_compare["status"] == "OK"

print("\nFILES FULLY MATCHING:")
print(df_compare.loc[is_ok, "file"].tolist())

print("\nFILES DIFFERING:")
print(df_compare[~is_ok])


In [None]:
# File to inspect row by row; it must exist in both folders.
FILE = "2025W48_sst_hub2.parquet"   # or the matching fft_hub2 file
old_path = OLD_DIR / FILE
new_path = NEW_DIR / FILE

# Full outer join on datetime, keeping only the rows that have no partner on
# the other side: new_dt is set for timestamps found only in NEW, old_dt for
# timestamps found only in OLD.
diff = duckdb.sql(f"""
WITH old AS (
    SELECT * FROM parquet_scan('{old_path}')
),
new AS (
    SELECT * FROM parquet_scan('{new_path}')
)
SELECT 
    new.datetime AS new_dt,
    old.datetime AS old_dt,
    *
FROM new
FULL OUTER JOIN old USING (datetime)
WHERE old.datetime IS NULL OR new.datetime IS NULL
ORDER BY new_dt NULLS LAST, old_dt NULLS LAST
""").df()

# Display the unmatched rows
diff


In [None]:
# Show the column labels of the diff result
diff.columns

In [None]:
import pandas as pd

FILE = "2025W48_sst_hub2.parquet"
old_path = OLD_DIR / FILE
new_path = NEW_DIR / FILE

# Unique timestamps from the NEW side of the diff, in order of appearance.
# Iterating the Series yields pandas Timestamp objects regardless of the
# column's datetime64 resolution, whereas `.unique().tolist()` on a
# datetime64[ns] column yields plain integers that have no strftime.
missing_ts = list(pd.to_datetime(diff["new_dt"]).dropna().drop_duplicates())

print("Missing timestamps detected:", missing_ts)


def _count_rows_at(path, ts_str):
    """Count the rows of a parquet file whose datetime equals ``ts_str``."""
    return duckdb.sql(f"""
        SELECT COUNT(*) AS n
        FROM parquet_scan('{path}')
        WHERE datetime = '{ts_str}'
    """).df()["n"][0]


results = []

for ts in missing_ts:
    # Keep all six fractional digits: the files store microsecond timestamps,
    # so trimming to milliseconds can make the equality test miss the row.
    ts_str = ts.strftime("%Y-%m-%d %H:%M:%S.%f")

    results.append({
        "timestamp": ts_str,
        "exists_in_old": _count_rows_at(old_path, ts_str),
        "exists_in_new": _count_rows_at(new_path, ts_str),
    })

pd.DataFrame(results)


In [None]:
# Print the DuckDB column description of every parquet file in NEW_DIR
for p in sorted(NEW_DIR.glob("*.parquet")):
    print(
        "==========================================",
        f"FILE: {p.name}",
        "------------------------------------------",
        sep="\n",
    )

    describe_sql = f"""
        DESCRIBE SELECT * FROM parquet_scan('{p}', union_by_name=true)
    """
    cols = duckdb.sql(describe_sql).df()

    # Table text followed by one blank separator line
    print(cols.to_string(index=False), end="\n\n")
