# Data Profiling and Selection-Guiding Summaries

This notebook:
- Reads all CSVs in `raw/` (ignoring `Zone.Identifier`), extracts `tool_id` and `signal` from filename.
- Autodetects timestamp and value columns, standardizes to long format `{timestamp, tool_id, signal, value}`.
- Profiles streams, infers sampling intervals, resamples per tool to dominant frequency, builds wide and long tables.
- Saves artifacts under `eval/`:
  - `schema_inventory.csv`
  - `wide_by_tool.parquet`
  - `long_all.parquet` (raw long table) and `long_all_resampled.parquet` (resampled long table)
  - `predictability.csv`, `ts_characteristics.csv`, `corr_matrix.csv`, `vif.csv`, `outlier_rate.csv`
- Prints compact selection tables at the end.

In [None]:
# Imports and setup
import sys, os, re, math, warnings, json
from pathlib import Path
from collections import Counter, defaultdict
import numpy as np
import pandas as pd

# Optional installs for parquet and stats
def ensure_packages(pkgs, import_names=None):
    """Best-effort install of every distribution in *pkgs* that cannot be imported.

    Args:
        pkgs: Iterable of pip distribution names.
        import_names: Optional mapping ``{distribution name: module name}`` for
            distributions whose importable module is named differently from
            the distribution. Merged over a built-in table
            (``scikit-learn`` -> ``sklearn``).

    Returns:
        None. A failed installation is reported with a printed warning and is
        never raised.
    """
    import importlib
    import subprocess

    # The pip name is not always the module name: probing "scikit-learn" with
    # import_module can never succeed, which would trigger a pip run on every
    # execution. Translate such names before probing.
    module_for = {"scikit-learn": "sklearn"}
    if import_names:
        module_for.update(import_names)

    for name in pkgs:
        module_name = module_for.get(name, name)
        try:
            importlib.import_module(module_name)
        except Exception:
            # Deliberately broad: any import failure is treated as "not usable".
            try:
                subprocess.check_call([sys.executable, "-m", "pip", "install", name])
            except Exception as e:
                print(f"Warning: failed to install {name}: {e}")

# pip distribution names: the two parquet engines tried by to_parquet_safe, plus the
# scikit-learn, statsmodels and tqdm packages imported just below.
ensure_packages(["pyarrow", "fastparquet", "scikit-learn", "statsmodels", "tqdm"])

# Re-import after potential installation
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import KFold, cross_val_score
from statsmodels.tsa.stattools import adfuller
from statsmodels.stats.outliers_influence import variance_inflation_factor as sm_vif

# Suppress every warning for the rest of the session.
warnings.filterwarnings("ignore")

# Working directories: inputs are read from <cwd>/raw, artifacts go to <cwd>/eval.
ROOT = Path.cwd()
RAW_DIR, EVAL_DIR = ROOT / "raw", ROOT / "eval"
EVAL_DIR.mkdir(parents=True, exist_ok=True)

print(
    f"Root: {ROOT}",
    f"RAW dir: {RAW_DIR}  exists={RAW_DIR.exists()}",
    f"Eval dir: {EVAL_DIR}",
    sep="\n",
)

Collecting pyarrow
  Downloading pyarrow-17.0.0-cp38-cp38-manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Downloading pyarrow-17.0.0-cp38-cp38-manylinux_2_28_x86_64.whl (40.0 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/40.0 MB[0m [31m?[0m eta [36m-:--:--[0mDownloading pyarrow-17.0.0-cp38-cp38-manylinux_2_28_x86_64.whl (40.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.0/40.0 MB[0m [31m590.7 kB/s[0m eta [36m0:00:00[0m00:01[0m00:06[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.0/40.0 MB[0m [31m590.7 kB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyarrow
Installing collected packages: pyarrow
Successfully installed pyarrow-17.0.0
Successfully installed pyarrow-17.0.0
Collecting fastparquet
  Downloading fastparquet-2024.2.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.1 kB)
Collecting fastparquet
  Downloading fastparquet-2024.2.0-cp38-cp38-manylinux_2_17_x86_64.

In [1]:
# Environment check: print the installed pycaret version.
import pycaret
print(pycaret.__version__)

3.3.2


In [3]:
# Helpers: filename parsing, timestamp/value detection, parquet writer
# File names end in "Tool.<tool_id>.<ignored>.<signal>.csv" (matched case-insensitively).
FILENAME_RE = re.compile(r"Tool\.([^.]+)\.[^.]+\.([^.]+)\.csv$", re.IGNORECASE)
# Column names tried first when looking for the timestamp column.
TS_CANDIDATES = ["timestamp", "time", "datetime", "date", "Date", "Time", "Timestamp"]


def extract_meta_from_path(path: Path):
    """Return ``(tool_id, signal)`` parsed from the file name of *path*.

    Only ``path.name`` is inspected. ``(None, None)`` is returned when the
    name does not follow the ``Tool.<tool_id>.<x>.<signal>.csv`` pattern.
    """
    match = FILENAME_RE.search(path.name)
    if match is None:
        return None, None
    return match.group(1), match.group(2)

# Try to detect timestamp column by name or by parsability (>90%)
def detect_timestamp_column(df: pd.DataFrame, candidates=None, min_valid: float = 0.90):
    """Locate the timestamp column of *df* and return it parsed.

    Columns whose name matches a candidate (case-insensitively) are tried
    first. If none qualifies, every column is tried, text-like columns before
    numeric ones.

    Args:
        df: Raw table as read from a CSV.
        candidates: Column names to try first; defaults to ``TS_CANDIDATES``.
        min_valid: Minimum fraction of rows that must parse to a timestamp.

    Returns:
        ``(column_name, parsed_series)``, or ``(None, None)`` when no column
        reaches *min_valid*.
    """
    names = TS_CANDIDATES if candidates is None else candidates

    def _parse(col):
        # Some inputs cannot be converted at all even with errors='coerce';
        # treat those as "not a timestamp column" instead of aborting.
        try:
            parsed = pd.to_datetime(df[col], errors='coerce', utc=False)
        except (TypeError, ValueError, OverflowError):
            return None
        return parsed if parsed.notna().mean() >= min_valid else None

    # by candidate names (case-insensitive); each distinct name is tried once
    lower_map = {str(c).lower(): c for c in df.columns}
    tried = set()
    for key in names:
        key = key.lower()
        if key in tried or key not in lower_map:
            continue
        tried.add(key)
        ts = _parse(lower_map[key])
        if ts is not None:
            return lower_map[key], ts

    # heuristic: first column with a sufficient parse rate. Plain numbers
    # always "parse" (they are read as offsets from the epoch), so a numeric
    # value column placed before the real timestamp column would win. Try
    # text-like columns first and fall back to numeric ones only afterwards.
    numeric_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
    text_cols = [c for c in df.columns if not pd.api.types.is_numeric_dtype(df[c])]
    for col in text_cols + numeric_cols:
        ts = _parse(col)
        if ts is not None:
            return col, ts
    return None, None

# Find first numeric column after the timestamp column
def detect_value_column(df: pd.DataFrame, ts_col: str):
    """Pick the value column of *df* and return it converted to numbers.

    Columns to the right of *ts_col* are examined first, then every column
    other than *ts_col*. A column qualifies when at least 90% of its rows
    convert to a number.

    Returns:
        ``(column_name, numeric_series)`` or ``(None, None)``.
    """
    names = list(df.columns)
    after_ts = names[names.index(ts_col) + 1:] if ts_col in names else names
    everything_else = [name for name in names if name != ts_col]
    for name in after_ts + everything_else:
        numeric = pd.to_numeric(df[name], errors='coerce')
        if numeric.notna().mean() >= 0.90:
            return name, numeric
    return None, None

# Robust parquet writer
def to_parquet_safe(df: pd.DataFrame, path: Path, index=False):
    """Write *df* to *path* as parquet, falling back to CSV.

    The pyarrow engine is tried first, then fastparquet. If both fail the
    frame is written as CSV next to *path* (same stem, ``.csv`` suffix) and a
    message carrying the last error is printed.
    """
    last_error = None
    for engine in ("pyarrow", "fastparquet"):
        try:
            df.to_parquet(path, index=index, engine=engine)
            return
        except Exception as e:
            last_error = e
    # final fallback to CSV next to it
    csv_path = path.with_suffix(".csv")
    df.to_csv(csv_path, index=index)
    print(f"Parquet write failed, wrote CSV instead: {csv_path} ({last_error})")

# Sampling interval (median) in seconds
def median_sampling_seconds(ts: pd.Series):
    """Return the median gap, in seconds, between consecutive distinct timestamps.

    The input is sorted and de-duplicated before differencing. ``np.nan`` is
    returned when fewer than two distinct timestamps exist or no gap can be
    computed.
    """
    stamps = pd.to_datetime(ts).sort_values().drop_duplicates()
    if stamps.size < 2:
        return np.nan
    gaps = stamps.diff().dropna().dt.total_seconds()
    return np.nan if gaps.empty else float(np.median(gaps))

In [4]:
# Reader producing standardized long records and schema rows

def read_csv_standardize(path: Path):
    """Load one raw CSV into the standard long format plus a schema-inventory row.

    Args:
        path: CSV file whose name encodes ``tool_id`` and ``signal``.

    Returns:
        ``(frame, schema)``. *frame* has the columns ``timestamp``,
        ``tool_id``, ``signal`` and ``value``, is sorted by timestamp and
        holds one row per timestamp; rows whose value is missing are kept as
        NaN. *schema* is a dict describing the file. ``(None, None)`` is
        returned when the file is skipped: unrecognised file name, empty
        file, or no timestamp / numeric column detected.
    """
    tool_id, signal = extract_meta_from_path(path)
    if not tool_id or not signal:
        return None, None
    try:
        df = pd.read_csv(path, low_memory=False)
    except UnicodeDecodeError:
        df = pd.read_csv(path, low_memory=False, encoding='latin-1')
    except pd.errors.EmptyDataError:
        # A file with nothing to parse must not abort the whole ingest loop.
        return None, None
    if df.empty:
        return None, None

    ts_col, ts = detect_timestamp_column(df)
    if not ts_col:
        return None, None

    # The detectors already return the parsed series; no need to parse again.
    val_col, val = detect_value_column(df, ts_col)
    if not val_col:
        return None, None

    # Schema metrics are taken over all rows of the file, before any row is dropped
    rows_total = len(df)
    missing_pct = float(val.isna().mean() * 100.0)

    frame = pd.DataFrame({
        'timestamp': ts,
        'tool_id': tool_id,
        'signal': signal,
        'value': val,
    }).dropna(subset=['timestamp'])

    # Chronological order, one row per timestamp (the first occurrence is kept)
    frame = frame.sort_values('timestamp')
    frame = frame.drop_duplicates(subset=['timestamp'])

    # Sampling inference
    freq_s = median_sampling_seconds(frame['timestamp'])

    has_values = frame['value'].notna().any()

    # Report the path relative to ROOT when possible; keep it as given otherwise.
    try:
        shown_path = path.relative_to(ROOT)
    except ValueError:
        shown_path = path

    schema = {
        'file': str(shown_path),
        'tool_id': tool_id,
        'signal': signal,
        'rows': int(rows_total),
        'start': frame['timestamp'].min(),
        'end': frame['timestamp'].max(),
        'inferred_freq_s': freq_s,
        'value_mean': float(frame['value'].mean(skipna=True)) if has_values else np.nan,
        'value_std': float(frame['value'].std(skipna=True)) if has_values else np.nan,
        'missing_pct': missing_pct,
    }
    return frame, schema

# Every *.csv below RAW_DIR (recursively), sorted; names containing 'Zone.Identifier' are skipped
all_csvs = sorted(
    csv_file
    for csv_file in RAW_DIR.rglob('*.csv')
    if 'Zone.Identifier' not in csv_file.name
)
print(f"Found CSV files: {len(all_csvs)}")

Found CSV files: 54


In [2]:
# Standardize every raw CSV; collect the long frames and one schema row per usable file
records, schema_rows = [], []

for csv_path in tqdm(all_csvs, desc="Reading & standardizing"):
    frame, schema = read_csv_standardize(csv_path)
    if frame is not None:
        # unusable files are skipped; rows with a missing value stay in the frame as NaN
        records.append(frame)
        schema_rows.append(schema)

if records:
    long_df = pd.concat(records, ignore_index=True)
else:
    print("No valid CSVs found.")
    long_df = pd.DataFrame(columns=['timestamp','tool_id','signal','value'])

schema_df = pd.DataFrame(schema_rows)

# Persist the schema inventory
schema_path = EVAL_DIR / 'schema_inventory.csv'
schema_df.to_csv(schema_path, index=False)
print(f"Saved: {schema_path}  rows={len(schema_df)}")

# Persist the raw (not resampled) long table
long_path = EVAL_DIR / 'long_all.parquet'
to_parquet_safe(long_df, long_path, index=False)
print(f"Saved: {long_path}  rows={len(long_df)}")

long_df.head(3)

NameError: name 'tqdm' is not defined

In [1]:
# Resample per tool to dominant frequency and build wide tables

def dominant_frequency_seconds(freq_list):
    """Return the most common sampling interval, in whole seconds.

    Entries that are missing, non-finite or not positive are ignored; the
    rest are rounded to the nearest second. Ties are resolved in favour of
    the smallest interval. Returns 1 when no usable entry remains.
    """
    counts = Counter(
        int(round(f))
        for f in freq_list
        if pd.notna(f) and np.isfinite(f) and f > 0
    )
    if not counts:
        return 1
    # highest count wins; among equal counts the smallest interval wins
    winner, _ = max(counts.items(), key=lambda item: (item[1], -item[0]))
    return winner

wide_tables = []
resampled_long = []

for tool_id, g in long_df.groupby('tool_id'):
    # Dominant sampling interval among this tool's signals (at least 1 second)
    sig_freqs = {
        sig: median_sampling_seconds(gg['timestamp'])
        for sig, gg in g.groupby('signal')
    }
    dom_s = dominant_frequency_seconds(list(sig_freqs.values()))
    freq_str = f"{int(max(1, dom_s))}s"

    # common index across this tool, starting at its first observation
    start = g['timestamp'].min()
    end   = g['timestamp'].max()
    if pd.isna(start) or pd.isna(end) or start >= end:
        continue
    idx = pd.date_range(start=start, end=end, freq=freq_str)

    # build wide matrix
    cols = {}
    for sig, gg in g.groupby('signal'):
        s = gg.set_index('timestamp')['value'].dropna().sort_index(kind='mergesort')
        # Several files can map to the same tool/signal, so timestamps may
        # repeat; reindexing needs unique labels. Keep the last one.
        s = s[~s.index.duplicated(keep='last')]
        # Take, for every grid point, the latest observation at or before it.
        # Resampling first and reindexing afterwards only works when the
        # resample bin labels coincide with `idx`; `idx` starts at the first
        # observation, so the two grids can differ and the reindex would then
        # match nothing and yield an all-NaN column.
        if s.empty:
            s = pd.Series(np.nan, index=idx)
        else:
            s = s.reindex(idx, method='ffill')
        cols[sig] = s
        # also keep resampled long
        res = pd.DataFrame({'timestamp': s.index, 'tool_id': tool_id, 'signal': sig, 'value': s.values})
        resampled_long.append(res)

    wide = pd.DataFrame(cols, index=idx).reset_index().rename(columns={'index': 'timestamp'})
    wide.insert(0, 'tool_id', tool_id)
    wide_tables.append(wide)

# Concatenate
if wide_tables:
    wide_by_tool = pd.concat(wide_tables, ignore_index=True, sort=False)
else:
    wide_by_tool = pd.DataFrame()

res_long_df = pd.concat(resampled_long, ignore_index=True) if resampled_long else pd.DataFrame(columns=['timestamp','tool_id','signal','value'])

# Save artifacts
wide_path = EVAL_DIR / 'wide_by_tool.parquet'
to_parquet_safe(wide_by_tool, wide_path, index=False)
print(f"Saved: {wide_path}  rows={len(wide_by_tool)}")

res_long_path = EVAL_DIR / 'long_all_resampled.parquet'
to_parquet_safe(res_long_df, res_long_path, index=False)
print(f"Saved: {res_long_path}  rows={len(res_long_df)}")

wide_by_tool.head(3)

NameError: name 'long_df' is not defined

In [None]:
# Predictability score per signal (5-fold CV RFR predicting each signal from others)

pred_rows = []
MIN_SAMPLES = 200  # skip very small datasets

if not wide_by_tool.empty:
    for tool_id, w in wide_by_tool.groupby('tool_id'):
        # Signal columns (exclude timestamp/tool_id) that this tool actually
        # recorded. wide_by_tool is a concatenation over tools, so a signal
        # the tool never produced is an all-NaN column here; keeping it would
        # make the row-wise dropna below discard every sample.
        sig_cols = [
            c for c in w.columns
            if c not in ('tool_id','timestamp') and w[c].notna().any()
        ]
        if not sig_cols:
            continue
        # drop rows where all signals are NaN
        W = w.dropna(subset=sig_cols, how='all').copy()
        if W.empty:
            continue
        for target in sig_cols:
            X_cols = [c for c in sig_cols if c != target]
            sub = W.dropna(subset=[target] + X_cols, how='any')
            # 5-fold CV needs at least 5 rows whatever MIN_SAMPLES is set to
            if len(sub) < max(MIN_SAMPLES, 5):
                continue
            # A failed fit still yields a row, with NaN scores
            row = {
                'tool_id': tool_id,
                'signal': target,
                'r2_mean': np.nan,
                'r2_std': np.nan,
                'n_samples': int(len(sub)),
                'n_features': int(len(X_cols)),
            }
            try:
                model = RandomForestRegressor(n_estimators=200, random_state=42, n_jobs=-1)
                cv = KFold(n_splits=5, shuffle=True, random_state=42)
                scores = cross_val_score(model, sub[X_cols].values, sub[target].values,
                                         cv=cv, scoring='r2', n_jobs=-1)
                row['r2_mean'] = float(np.mean(scores))
                row['r2_std'] = float(np.std(scores))
            except Exception as e:
                # Best effort: report the reason instead of hiding it
                print(f"Warning: predictability fit failed for tool={tool_id} signal={target}: {e}")
            pred_rows.append(row)

pred_df = pd.DataFrame(pred_rows)

# Aggregate overall by signal across tools (mean of the per-tool mean R^2)
if not pred_df.empty:
    overall = (pred_df.groupby('signal')['r2_mean']
                     .mean()
                     .reset_index()
                     .assign(tool_id='ALL'))
    pred_all = pd.concat([pred_df, overall], ignore_index=True, sort=False)
else:
    pred_all = pred_df.copy()

pred_path = EVAL_DIR / 'predictability.csv'
pred_all.to_csv(pred_path, index=False)
print(f"Saved: {pred_path}  rows={len(pred_all)}")

pred_all.sort_values(['tool_id','r2_mean'], ascending=[True, False]).head(10)

: 

: 

In [None]:
# Time-series characteristics per signal: lag1 autocorr, ADF p, top FFT peak period (s), % zeros, % const-run

def const_run_fraction(x: np.ndarray, min_len: int = 3):
    """Return the fraction of points lying in runs of equal consecutive values.

    Args:
        x: One-dimensional array of values.
        min_len: Minimum run length for a run to count.

    Returns:
        Share (0.0 to 1.0) of the elements of *x* that belong to a run of at
        least *min_len* equal consecutive values; ``np.nan`` for empty input.
        NaN never equals NaN, so each NaN forms its own run of length 1.
    """
    x = np.asarray(x)
    n = x.size
    if n == 0:
        return np.nan
    # Positions where a new run starts, plus both ends, delimit the runs;
    # their successive differences are the run lengths.
    starts = np.flatnonzero(x[1:] != x[:-1]) + 1
    bounds = np.concatenate(([0], starts, [n]))
    run_lengths = np.diff(bounds)
    # fraction of points that belong to runs length >= min_len
    eligible = int(run_lengths[run_lengths >= min_len].sum())
    return eligible / float(n)

char_rows = []

if not wide_by_tool.empty:
    for tool_id, w in wide_by_tool.groupby('tool_id'):
        sig_cols = [c for c in w.columns if c not in ('tool_id','timestamp')]
        # Sample spacing in seconds: median gap between consecutive sorted
        # timestamps, or 1.0 when that is missing or not positive
        tdiff = pd.to_datetime(w['timestamp']).sort_values().diff().dt.total_seconds().median()
        dt = 1.0 if (pd.isna(tdiff) or tdiff <= 0) else float(tdiff)

        for sig in sig_cols:
            s = pd.to_numeric(w[sig], errors='coerce').dropna()
            if len(s) < 50:
                continue
            arr = s.values

            # lag-1 autocorrelation
            try:
                lag1 = pd.Series(arr).autocorr(lag=1)
            except Exception:
                lag1 = np.nan

            # ADF test p-value
            try:
                adf_p = float(adfuller(arr, autolag='AIC')[1])
            except Exception:
                adf_p = np.nan

            # Period (s) of the strongest non-zero-frequency FFT component
            peak_period = np.nan
            try:
                centered = arr.astype(float)
                centered = centered - np.nanmean(centered)
                if len(centered) > 1:
                    freqs = np.fft.rfftfreq(len(centered), d=dt)
                    spectrum = np.abs(np.fft.rfft(centered))
                    if freqs.size > 1:
                        spectrum[0] = 0.0  # ignore zero freq
                        peak_freq = freqs[int(np.nanargmax(spectrum))]
                        if peak_freq > 0:
                            peak_period = float(1.0/peak_freq)
            except Exception:
                peak_period = np.nan

            char_rows.append({
                'tool_id': tool_id,
                'signal': sig,
                'lag1_autocorr': lag1,
                'adf_pvalue': adf_p,
                'top_fft_peak_period_s': peak_period,
                'pct_zeros': float((arr == 0).mean() * 100.0),
                'pct_const_runs': float(const_run_fraction(arr, min_len=3) * 100.0),
            })

char_df = pd.DataFrame(char_rows)
if char_df.empty:
    char_all = char_df.copy()
else:
    # Across-tool summary per signal: means, except the FFT period (median)
    how = {
        'lag1_autocorr':'mean',
        'adf_pvalue':'mean',
        'top_fft_peak_period_s':'median',
        'pct_zeros':'mean',
        'pct_const_runs':'mean',
    }
    overall_char = char_df.groupby('signal').agg(how).reset_index().assign(tool_id='ALL')
    char_all = pd.concat([char_df, overall_char], ignore_index=True, sort=False)

char_path = EVAL_DIR / 'ts_characteristics.csv'
char_all.to_csv(char_path, index=False)
print(f"Saved: {char_path}  rows={len(char_all)}")

char_all.head(10)

: 

: 

In [None]:
# Cross-signal correlation (Pearson) and VIF

def _corr_long_rows(label, cmat, signals):
    """Flatten correlation matrix *cmat* into one row per ordered signal pair."""
    return [
        {
            'tool_id': label,
            'signal_i': i,
            'signal_j': j,
            'corr': cmat.loc[i, j] if i in cmat.index and j in cmat.columns else np.nan,
        }
        for i in signals
        for j in signals
    ]


def _vif_long_rows(label, frame, min_rows):
    """Return one VIF row per usable column of *frame* (possibly none).

    Columns that are entirely NaN or have zero variance are excluded first:
    either kind would make the row-wise dropna below remove every row, which
    silently suppressed all VIF output for the table.
    """
    X = frame.dropna(axis=1, how='all').dropna()
    if len(X) < min_rows or X.shape[1] < 2:
        return []
    X = X.loc[:, X.std(ddof=0) > 0]
    if X.shape[1] < 2:
        return []
    # standardize (optional but helps numerical stability)
    Xz = (X - X.mean())/X.std(ddof=0)
    Xz = Xz.replace([np.inf, -np.inf], np.nan).dropna()
    if len(Xz) < min_rows:
        return []
    rows = []
    try:
        for k, col in enumerate(Xz.columns):
            rows.append({'tool_id': label, 'feature': col, 'vif': float(sm_vif(Xz.values, k))})
    except Exception as e:
        # Best effort: keep what was computed, but say why the rest is missing
        print(f"Warning: VIF failed for tool={label}: {e}")
    return rows


corr_rows = []
vif_rows = []

if not wide_by_tool.empty:
    for tool_id, w in wide_by_tool.groupby('tool_id'):
        sig_cols = [c for c in w.columns if c not in ('tool_id','timestamp')]
        if len(sig_cols) < 2:
            continue
        dfnum = w[sig_cols].copy()
        cmat = dfnum.corr(method='pearson', min_periods=50)
        corr_rows.extend(_corr_long_rows(tool_id, cmat, sig_cols))
        vif_rows.extend(_vif_long_rows(tool_id, dfnum, 200))

# Overall correlation and VIF across tools (all rows of the concatenated table)
if not wide_by_tool.empty:
    sig_cols_all = [c for c in wide_by_tool.columns if c not in ('tool_id','timestamp')]
    df_all = wide_by_tool[sig_cols_all].copy()
    cmat_all = df_all.corr(method='pearson', min_periods=200)
    corr_rows.extend(_corr_long_rows('ALL', cmat_all, sig_cols_all))
    vif_rows.extend(_vif_long_rows('ALL', df_all, 500))

corr_df = pd.DataFrame(corr_rows)
vif_df = pd.DataFrame(vif_rows)

corr_path = EVAL_DIR / 'corr_matrix.csv'
vif_path  = EVAL_DIR / 'vif.csv'

corr_df.to_csv(corr_path, index=False)
vif_df.to_csv(vif_path, index=False)
print(f"Saved: {corr_path} rows={len(corr_df)}")
print(f"Saved: {vif_path} rows={len(vif_df)}")

corr_df.head(10)

: 

: 

In [None]:
# Quick outlier rate: z-score > 4 per signal (by tool and overall)

out_rows = []
if not wide_by_tool.empty:
    sig_cols = [c for c in wide_by_tool.columns if c not in ('tool_id','timestamp')]
    # Each tool on its own first, then the whole table under the label 'ALL'
    scopes = list(wide_by_tool.groupby('tool_id')) + [('ALL', wide_by_tool)]
    for label, scope_df in scopes:
        values = scope_df[sig_cols]
        zscores = (values - values.mean())/values.std(ddof=0)
        for c in sig_cols:
            z = zscores[c].replace([np.inf, -np.inf], np.nan).dropna()
            if z.empty:
                continue
            out_rows.append({
                'tool_id': label,
                'signal': c,
                # share of points more than 4 standard deviations from the mean
                'outlier_rate_pct': float((z.abs() > 4).mean() * 100.0),
                'n': int(z.size),
            })

out_df = pd.DataFrame(out_rows)
out_path = EVAL_DIR / 'outlier_rate.csv'
out_df.to_csv(out_path, index=False)
print(f"Saved: {out_path} rows={len(out_df)}")

out_df.head(10)

: 

: 

In [None]:
# Compact selection tables to print

def _load_summary(name):
    """Read ``EVAL_DIR / name``; return None when there is nothing usable.

    A summary saved from an empty result has no columns, which makes
    ``pd.read_csv`` raise ``EmptyDataError``. Such a file, a missing file, or
    a table without a ``tool_id`` column is reported as None so the
    corresponding section is skipped instead of failing.
    """
    path = EVAL_DIR / name
    if not path.exists():
        return None
    try:
        table = pd.read_csv(path)
    except pd.errors.EmptyDataError:
        return None
    return table if 'tool_id' in table.columns else None


# 1) Top 10 most predictable signals (highest R^2)
pred = _load_summary('predictability.csv')
if pred is not None:
    top_pred = (pred[pred['tool_id'] == 'ALL']
                .sort_values('r2_mean', ascending=False)
                .head(10))
    print("Top 10 most predictable signals (overall R^2):")
    display(top_pred[['signal','r2_mean']])

# 2) Signals with strong temporal structure (high lag-1, low ADF p)
tschar = _load_summary('ts_characteristics.csv')
if tschar is not None:
    overall_ts = tschar[tschar['tool_id'] == 'ALL'].copy()
    if not overall_ts.empty:
        # combined rank: lag1 descending plus adf_p ascending (lower is better)
        overall_ts['rank'] = overall_ts['lag1_autocorr'].rank(ascending=False, method='average') + \
                              overall_ts['adf_pvalue'].rank(ascending=True, method='average')
        strong_temporal = overall_ts.sort_values('rank').head(10)
        print("Signals with strongest temporal structure:")
        display(strong_temporal[['signal','lag1_autocorr','adf_pvalue','top_fft_peak_period_s']])

# 3) Signals with lowest pairwise correlations to others
corr_long = _load_summary('corr_matrix.csv')
if corr_long is not None:
    all_corr = corr_long[corr_long['tool_id'] == 'ALL']
    # mean absolute corr to others per signal (self-pairs excluded)
    m = all_corr[all_corr['signal_i'] != all_corr['signal_j']]
    mean_abs = (m.assign(abs_corr=m['corr'].abs())
                 .groupby('signal_i')['abs_corr']
                 .mean()
                 .sort_values()
                 .reset_index()
                 .rename(columns={'signal_i':'signal','abs_corr':'mean_abs_corr'}))
    low_corr = mean_abs.head(10)
    print("Signals with lowest mean absolute correlation to others:")
    display(low_corr)

: 

: 

In [3]:
# Smoke test: confirms pycaret imports and the kernel executes a cell.
import pycaret
print('hello')

hello
