In [1]:
import csv
import os
import warnings
from itertools import islice

import numpy as np
import pandas as pd
from scipy.stats import mode

# NOTE(review): this silences *every* warning for the whole process, including
# pandas/SciPy deprecation notices; consider narrowing it to specific categories.
warnings.filterwarnings("ignore")

try:
    from tqdm import tqdm
except ImportError:
    print("tqdm not found. Please install it for progress bars: pip install tqdm")

    class tqdm:  # noqa: N801 - stands in for the missing library's callable
        """No-op stand-in for ``tqdm.tqdm`` when tqdm is not installed.

        Supports wrapping an iterable (``tqdm(items)`` yields the items
        unchanged) and the manual form ``with tqdm(total=n, desc=...) as bar:
        bar.update(1)``. The previous lambda fallback required a positional
        argument and returned something that is not a context manager, so the
        manual form crashed.
        """

        def __init__(self, iterable=None, *args, **kwargs):
            self._iterable = iterable

        def __iter__(self):
            return iter(() if self._iterable is None else self._iterable)

        def __enter__(self):
            return self

        def __exit__(self, exc_type, exc, tb):
            # Never suppress exceptions raised inside the ``with`` body.
            return False

        def update(self, n=1):
            """Ignore progress updates."""

        def close(self):
            """Nothing to release."""

# ======= CONFIGURATION ======= #
# Columns retained from each raw export, in output order. Any other column is
# dropped by select_and_type_columns().
COLUMNS = [
    "device_id",
    "time",
    "temperature", "humidity", "tvoc", "co2", "co", "no2", "hcho",
    "pm_1", "pm_25", "pm_10", "etoh",
    "person_detection", "loitering", "aqi", "noise_ratio", "crowd_count",
    "alert_type", "alert_threshold", "alert_value", "device_name",
]

# Columns that aggregate_10min() always reduces with the mode (most frequent
# value) instead of the mean, whatever dtype pandas parsed them as.
INT_MODE_COLS = {
    "aqi",
    "alert_type", "alert_threshold", "alert_value",
    "crowd_count", "loitering", "person_detection",
}

# Raw exports from the ULTRA and PRO sensors, pulled through InfluxDB.
# RAW_FILES[i] is processed under the dataset name DATASET_NAMES[i].
RAW_FILES = ["../data/raw_alerts.csv", "../data/raw_heartbeats.csv"]
DATASET_NAMES = ["alerts", "heartbeats"]

# Root directory of the partitioned CSV output.
# NOTE(review): RAW_FILES are under ../data but this path is relative to the
# current working directory (./data/...) - confirm that is where output belongs.
OUTPUT_BASE = "data/partitioned_data"
# ============================= #

def read_custom_csv(file_path):
    """Load a raw export whose first four lines are a metadata preamble.

    Expected layout (0-based line numbers):
        line 1  - per-column data type names
        line 3  - column names
        line 4+ - data rows
    Lines 0 and 2 are not used. (Presumably this is InfluxDB "annotated CSV",
    where line 1 is ``#datatype`` and the first header cell is empty - TODO
    confirm against the export settings.)

    Args:
        file_path: Path to the UTF-8 encoded CSV file.

    Returns:
        ``(df, header_row, dtype_map)``:
        - ``df`` holds the data rows, with columns named from ``header_row``.
        - ``header_row`` is the list of column names from line 3.
        - ``dtype_map`` maps each column name to the type string on line 1.

    Raises:
        ValueError: If the file has fewer than four preamble lines.
    """
    print(f"\n[INFO] Reading file: {file_path}")
    # Read only the preamble rather than the whole file (pandas reads the data
    # below anyway). csv.reader also honours quoted fields, unlike str.split.
    with open(file_path, 'r', encoding='utf-8', newline='') as f:
        preamble = list(islice(csv.reader(f), 4))
    if len(preamble) < 4:
        raise ValueError(
            f"{file_path}: expected at least 4 preamble lines, found {len(preamble)}"
        )
    dtype_row = preamble[1]
    header_row = preamble[3]
    dtype_map = dict(zip(header_row, dtype_row))
    df = pd.read_csv(
        file_path,
        skiprows=4,
        header=None,
        names=header_row,
        encoding='utf-8',
        low_memory=False,
    )
    print(f"[INFO] Loaded {len(df)} rows with {len(df.columns)} columns")
    return df, header_row, dtype_map

def select_and_type_columns(df, dtype_map, columns=None):
    """Keep the configured columns and cast each to its declared type.

    Args:
        df: Raw frame as returned by ``read_custom_csv``. Not modified.
        dtype_map: Column name -> declared type string. Matching is
            case-insensitive: ``int*`` -> nullable ``Int64``;
            ``float*``/``double`` -> float; ``str``/``object``/``string`` ->
            text; ``bool``/``boolean`` -> nullable ``boolean``. Columns missing
            from the map are treated as ``"str"``.
        columns: Column names to keep, in output order. Names absent from
            ``df`` are skipped. Defaults to the module-level ``COLUMNS``.

    Returns:
        A new DataFrame with only the kept columns, type-cast. Unparseable
        numbers become missing. Missing values in text columns stay missing
        (NaN) rather than becoming the literal string ``"nan"``, so later
        ``dropna``/``fillna``/mode steps still recognise them.
    """
    print("[INFO] Selecting and type-correcting columns...")
    if columns is None:
        columns = COLUMNS
    keep = [col for col in columns if col in df.columns]
    removed = set(df.columns) - set(keep)
    # Explicit copy: assigning into a column subset of ``df`` would otherwise
    # target a possible view of the caller's frame (SettingWithCopyWarning).
    df = df[keep].copy()
    for col in df.columns:
        typ = dtype_map.get(col, "str").lower()
        if typ.startswith("int"):
            df[col] = pd.to_numeric(df[col], errors='coerce').astype('Int64')
        elif typ.startswith("float") or typ == "double":
            df[col] = pd.to_numeric(df[col], errors='coerce')
        elif typ in {"str", "object", "string"}:
            # astype(str) alone turns NaN/None into "nan"/"None"; mask them back.
            df[col] = df[col].astype(str).where(df[col].notna())
        elif typ in {"bool", "boolean"}:
            df[col] = df[col].astype('boolean')
        # NOTE(review): any other declared type is left as pandas parsed it. If
        # the source is InfluxDB annotated CSV, integers are declared as
        # "long"/"unsignedLong" and do not match the "int" prefix - confirm.
    print(f"[INFO] Columns removed: {removed}")
    print(f"[INFO] Columns kept: {df.columns.tolist()}")
    return df

def aggregate_10min(df, mode_cols=None):
    """Aggregate per-device rows into 10-minute buckets.

    1. Parses ``time`` (unparseable values become NaT). Drops rows with no
       valid ``time`` or no ``device_id``.
    2. Truncates timestamps to the minute, then assigns each row to the
       10-minute bucket that contains it.
    3. For each (device_id, bucket), reduces every other column except
       ``date``:
       - Columns in ``mode_cols`` and integer-typed columns use the mode
         (smallest value on ties, NaN if the bucket has no values).
       - Other numeric columns use the mean.
       - All remaining columns use the mode, or ``''`` if the bucket has no
         values.

    Args:
        df: Input frame with at least ``time`` and ``device_id``. Not modified.
        mode_cols: Column names that are always mode-aggregated. Defaults to
            the module-level ``INT_MODE_COLS``.

    Returns:
        One row per (device_id, bucket). ``time`` holds the bucket's start
        and is placed as the last column.
    """
    print("[INFO] Rounding time to minute and aggregating to 10-minute intervals...")
    if mode_cols is None:
        mode_cols = INT_MODE_COLS

    def _mode_or(default):
        """Build a reducer returning the most frequent non-null value, else *default*."""
        def reducer(values):
            # Series.mode returns the modes sorted, so [0] is the smallest on
            # ties (matching scipy.stats.mode). Unlike scipy, it also works on
            # string/object columns.
            modes = values.mode(dropna=True)
            return modes.iloc[0] if len(modes) else default
        return reducer

    # assign() works on copies, so the caller's frame is never modified.
    parsed = pd.to_datetime(df['time'], errors='coerce')
    df = df.assign(time=parsed).dropna(subset=['time', 'device_id'])
    # Drop seconds/sub-seconds, then take the 10-minute bucket start.
    # '10min' replaces the deprecated '10T' alias (same frequency).
    minute = df['time'].dt.floor('min')
    df = df.assign(time=minute, bucket=minute.dt.floor('10min'))

    key_cols = ['device_id', 'bucket']
    agg_dict = {}
    for col in df.columns:
        if col in ('device_id', 'time', 'bucket', 'date'):
            continue
        if col in mode_cols or pd.api.types.is_integer_dtype(df[col]):
            agg_dict[col] = _mode_or(np.nan)
        elif pd.api.types.is_numeric_dtype(df[col]):
            agg_dict[col] = 'mean'
        else:
            agg_dict[col] = _mode_or('')

    grouped = df.groupby(key_cols)
    if agg_dict:
        agg_df = grouped.agg(agg_dict).reset_index()
    else:
        # Nothing to reduce: keep one row per (device, bucket).
        agg_df = grouped.size().reset_index()[key_cols]
    # Each record's time is the left edge of its 10-minute bucket.
    agg_df['time'] = agg_df['bucket']
    agg_df = agg_df.drop(columns=['bucket'])
    print(f"[INFO] Aggregated to {len(agg_df)} 10-min intervals (from {len(df)} minute-level rows)")
    return agg_df

def fix_temporal_order(df):
    """Return ``df`` ordered by device, then chronologically within each device.

    The input frame is left untouched; the result keeps the original index labels.
    """
    ordering = ['device_id', 'time']
    return df.sort_values(ordering)

def handle_missing(df):
    """Fill missing values column by column, separately for each device.

    Filling happens within each ``device_id`` group, so one device's readings
    (or its name) never leak into another device's rows. The previous version
    interpolated and filled across the whole column, which crossed device
    boundaries.

    - Numeric columns: linear interpolation in row order, then back-fill and
      forward-fill the edges. A column that is entirely missing for a device
      stays NaN for that device.
    - Other columns: filled with that device's most frequent value (smallest
      on ties), or ``''`` if the device has no value in that column.
    - ``device_id``, ``time`` and ``date`` are not touched.

    Interpolation follows row order, so ``df`` should already be sorted by
    (device_id, time), as ``fix_temporal_order`` produces.

    Args:
        df: Frame to fill. Not modified.

    Returns:
        A filled copy of ``df``.
    """
    print("[INFO] Handling missing/NaN values...")

    def _fill_with_device_mode(values):
        """Fill one device's gaps with its most frequent value, else ''."""
        modes = values.mode(dropna=True)
        return values.fillna(modes.iloc[0] if len(modes) else '')

    df = df.copy()
    device_keys = df['device_id']
    for col in df.columns:
        if col in ('device_id', 'time', 'date'):
            continue
        per_device = df[col].groupby(device_keys, sort=False)
        if pd.api.types.is_numeric_dtype(df[col]):
            # NOTE(review): interpolation can produce fractional values in
            # columns that were mode-aggregated (e.g. counts); consider
            # ffill/bfill only for those.
            df[col] = per_device.transform(lambda s: s.interpolate().bfill().ffill())
        else:
            df[col] = per_device.transform(_fill_with_device_mode)
    return df

def partition_and_write(df, dataset_name, output_base=None):
    """Write one CSV per (device_id, date) in ``key=value`` (Hive-style) directories.

    Each group is written to
    ``<output_base>/<dataset_name>/device_id=<id>/date=<date>/<dataset_name>.csv``.
    ``date`` is the date part of ``time``, rendered as a string. The partition
    keys (``device_id`` and ``date``) are encoded in the path and dropped from
    the file contents. Existing files at those paths are overwritten.

    Args:
        df: Frame with ``device_id`` and a datetime ``time`` column. Not modified.
        dataset_name: Sub-directory and file stem for this dataset.
        output_base: Root output directory. Defaults to the module-level
            ``OUTPUT_BASE``.
    """
    print("[INFO] Partitioning data and writing output CSV files...")
    if output_base is None:
        output_base = OUTPUT_BASE
    # assign() adds the partition column to a copy; the caller's frame is unchanged.
    df = df.assign(date=df['time'].dt.date.astype(str))
    grouped = df.groupby(['device_id', 'date'])
    num_partitions = grouped.ngroups
    with tqdm(total=num_partitions, desc=f"Writing {dataset_name}") as pbar:
        for (device, date), group in grouped:
            out_dir = os.path.join(output_base, dataset_name, f"device_id={device}", f"date={date}")
            os.makedirs(out_dir, exist_ok=True)
            out_df = group.drop(columns=['device_id', 'date'])
            out_file = os.path.join(out_dir, f"{dataset_name}.csv")
            out_df.to_csv(out_file, index=False)
            pbar.update(1)
    print(f"[INFO] Partitioned and wrote {num_partitions} files for {dataset_name}.")

def process_file(file_path, dataset_name):
    """Run the full cleansing pipeline on one raw export and write its partitions.

    Steps, in order: read the file and its type row; keep and cast the
    configured columns; aggregate to 10-minute buckets; sort by
    device/time; fill missing values; write the partitioned CSVs under
    ``dataset_name``.

    Args:
        file_path: Path to the raw CSV export.
        dataset_name: Name used for the output sub-directory and file stem.
    """
    # The raw header row is not needed here; only the type map is used.
    df, _header, dtype_map = read_custom_csv(file_path)
    df = select_and_type_columns(df, dtype_map)
    df = aggregate_10min(df)
    df = fix_temporal_order(df)
    df = handle_missing(df)
    partition_and_write(df, dataset_name)
    print(f"[DONE] Processing complete for {dataset_name} ({file_path})\n")

if __name__ == "__main__":
    print("=== Starting Data Cleansing and Partitioning Pipeline ===")
    # Raw files and dataset names are paired positionally.
    for raw_path, dataset in zip(RAW_FILES, DATASET_NAMES):
        process_file(raw_path, dataset)
    print("\n=== All tasks complete. Data is ready for S3 upload and AWS Glue cataloging. ===")


=== Starting Data Cleansing and Partitioning Pipeline ===

[INFO] Reading file: ../data/raw_alerts.csv
[INFO] Loaded 2718 rows with 99 columns
[INFO] Selecting and type-correcting columns...
[INFO] Columns removed: {'', 'pm_10_threshold', 'tamper', 'cell_phone', 'glass_threshold', 'wrapped_device', 'alpha_divisor', 'etoh_threshold', 'loitering_threshold', 'tamper_enabled', 'client_site_id', 'move_z', 'env', 'person_detection_threshold', 'risk', 'software_version', 'thc_level', 'inci_stddev_mult', 'device_type', 'client_org', 'co_threshold', 'client_name', 'audio_stream', 'tamper_polarity', 'gunshot_threshold', 'inci_back_time', 'co2_threshold', 'client_id', 'noise_ratio_threshold', 'keywords', 'client_reseller_id', 'crowd_count_thresh', 'move', 'vape_level', 'audio_server', 'aqi_threshold', 'tamper_debounce', 'temperature_f', 'crowd_count_dist', 'tvoc_threshold', 'audio_shift', 'move_x', 'cell_phone_threshold', 'hcho_threshold', 'scream_level', 'client_organization_id', 'crowd_count_du


riting alerts: 100%|█████████████████████████████████████████████████████████████████| 40/40 [00:00<00:00, 167.21it/s]

[INFO] Partitioned and wrote 40 files for alerts.
[DONE] Processing complete for alerts (../data/raw_alerts.csv)


[INFO] Reading file: ../data/raw_heartbeats.csv
[INFO] Loaded 434311 rows with 92 columns
[INFO] Selecting and type-correcting columns...
[INFO] Columns removed: {'', 'pm_10_threshold', 'tamper', 'cell_phone', 'glass_threshold', 'wrapped_device', 'alpha_divisor', 'etoh_threshold', 'loitering_threshold', 'tamper_enabled', 'client_site_id', 'move_z', 'env', 'person_detection_threshold', 'risk', 'software_version', 'inci_stddev_mult', 'device_type', 'co_threshold', 'client_name', 'audio_stream', 'tamper_polarity', 'gunshot_threshold', 'inci_back_time', 'co2_threshold', 'client_id', 'noise_ratio_threshold', 'client_reseller_id', 'crowd_count_thresh', 'move', 'audio_server', 'aqi_threshold', 'tamper_debounce', 'temperature_f', 'crowd_count_dist', 'tvoc_threshold', 'audio_shift', 'move_x', 'cell_phone_threshold', 'hcho_threshold', 'client_organization_id', 'crowd_count_dur', 'mi

Writing heartbeats: 100%|███████████████████████████████████████████████████████████| 373/373 [00:03<00:00, 113.09it/s]

[INFO] Partitioned and wrote 373 files for heartbeats.
[DONE] Processing complete for heartbeats (../data/raw_heartbeats.csv)


=== All tasks complete. Data is ready for S3 upload and AWS Glue cataloging. ===



