In [None]:
# Ensure WORKSPACE_ROOT resolves correctly in headless runs
import os
from pathlib import Path

def _compute_workspace_root():
    """Pick the workspace root directory for this notebook run.

    A non-empty ``WORKSPACE_ROOT`` environment variable always wins. Otherwise,
    when the current directory is ``Q-SMEC_Development_Environment/airth_mining``
    the root is two levels above it; from anywhere else the current working
    directory itself is returned.

    Returns:
        The workspace root as a string path.
    """
    override = os.environ.get('WORKSPACE_ROOT')
    if override:
        return override
    here = Path.cwd()
    launched_from_airth = (here.name, here.parent.name) == ('airth_mining', 'Q-SMEC_Development_Environment')
    return str(here.parents[1] if launched_from_airth else here)


# Publish the resolved root through the environment so later lookups see one consistent value.
os.environ['WORKSPACE_ROOT'] = _compute_workspace_root()

In [None]:
from __future__ import annotations
import os
from pathlib import Path
import pandas as pd
import numpy as np
import plotly.express as px

# Project paths: the workspace root comes from the environment when set (and non-empty),
# otherwise from the current working directory; everything else hangs off it.
WORKSPACE_ROOT = os.environ.get('WORKSPACE_ROOT') or str(Path.cwd())
AIRTH_DIR = Path(WORKSPACE_ROOT).joinpath('Q-SMEC_Development_Environment', 'airth_mining')
EXCEL_PATH = AIRTH_DIR.joinpath('AIRTH_SENSOR_TEMPLATE.xlsx')
EXCEL_PATH

In [None]:
# Load the three core sheets in a single pass over the workbook. Passing a list to
# `sheet_name` makes pandas return a {sheet name: DataFrame} dict, so the file is
# opened and parsed once instead of three times.
assert EXCEL_PATH.exists(), f'Excel file not found: {EXCEL_PATH}'
_core_sheets = pd.read_excel(EXCEL_PATH, sheet_name=['DataSchema', 'SampleData', 'KPIs'])
schema_df = _core_sheets['DataSchema']
data_df = _core_sheets['SampleData']
kpis_df = _core_sheets['KPIs']
schema_df.head(), data_df.head(), kpis_df.head()

In [None]:
# Cleaning pass over a copy of the raw sheet (data_df itself is left untouched):
#   1. parse 'timestamp' into UTC datetimes; unparseable entries become NaT
#   2. coerce the known measurement columns to numbers; unparseable entries become NaN
#   3. drop rows that have no reading value or no sensor id
data = data_df.copy()
if 'timestamp' in data.columns:
    data['timestamp'] = pd.to_datetime(data['timestamp'], utc=True, errors='coerce')

numeric_cols = ['value','temperature_c','humidity_pct','battery_pct','signal_strength_db','depth_m','lat','lon']
for col_name in (name for name in numeric_cols if name in data.columns):
    data[col_name] = pd.to_numeric(data[col_name], errors='coerce')

# Rows missing either key field are useless for the analyses below.
data = data.dropna(subset=['value','sensor_id'])
data.describe(include='all')

In [None]:
# KPI recompute demo
def recompute_kpis(df: pd.DataFrame) -> pd.DataFrame:
    """Derive a long-format KPI table (columns ``kpi`` and ``value``) from readings.

    ``records_total`` is always emitted. Every other KPI is emitted only when its
    source column exists in ``df``:

    * ``avg_signal_db``      - mean of ``signal_strength_db``
    * ``median_battery_pct`` - median of ``battery_pct``
    * ``uptime_ok_pct``      - percentage of rows whose ``status`` equals ``'OK'``
    * ``count_<type>``       - row count per distinct ``reading_type`` (missing
      values are counted too)

    Args:
        df: Table of sensor readings.

    Returns:
        One row per KPI.
    """
    columns = df.columns
    kpi_rows = [{'kpi': 'records_total', 'value': int(len(df))}]

    def emit(name, number):
        kpi_rows.append({'kpi': name, 'value': number})

    if 'signal_strength_db' in columns:
        emit('avg_signal_db', float(df['signal_strength_db'].mean()))
    if 'battery_pct' in columns:
        emit('median_battery_pct', float(df['battery_pct'].median()))
    if 'status' in columns:
        ok_share = (df['status'] == 'OK').mean()
        emit('uptime_ok_pct', float(ok_share * 100))
    if 'reading_type' in columns:
        # dropna=False keeps a bucket for rows with no reading type.
        type_counts = df['reading_type'].value_counts(dropna=False)
        for rtype, count in type_counts.items():
            emit(f'count_{rtype}', int(count))
    return pd.DataFrame(kpi_rows)

# Recompute the KPI table from the cleaned readings in `data`;
# the trailing bare name renders the table as the cell output.
kpis_re = recompute_kpis(data)
kpis_re

In [None]:
# Visualization stubs
# 1) Time series for a single sensor and reading type (the first row supplies the example pair)
sensor_example = data['sensor_id'].iloc[0] if len(data) else None
rtype_example = data['reading_type'].iloc[0] if 'reading_type' in data and len(data) else None
# Test for "missing" explicitly rather than by truthiness: a legitimate but falsy
# identifier (e.g. sensor id 0) must still be plotted, while None/NaN must not.
if pd.notna(sensor_example) and pd.notna(rtype_example):
    fig = px.line(data.query('sensor_id == @sensor_example and reading_type == @rtype_example').sort_values('timestamp'),
                  x='timestamp', y='value', color='reading_type',
                  title=f'Time Series — {sensor_example} / {rtype_example}')
    fig.show()

# 2) Distribution by reading_type
if 'reading_type' in data:
    fig2 = px.histogram(data, x='value', color='reading_type', nbins=40, title='Distribution by Reading Type')
    fig2.show()

# 3) Battery vs Signal scatter
if {'battery_pct','signal_strength_db'}.issubset(data.columns):
    # 'status' is optional elsewhere in this notebook, so only colour by it when present
    # (plotly raises if asked to colour by a column that does not exist).
    fig3 = px.scatter(data, x='signal_strength_db', y='battery_pct',
                      color='status' if 'status' in data.columns else None,
                      title='Battery vs Signal Strength (colored by status)')
    fig3.show()

In [None]:
# Simple anomaly detection (z-score) per sensor+type
Z_THRESHOLD = 3  # a reading is flagged when |z| exceeds this
_ANOMALY_VIEW = ['timestamp', 'sensor_id', 'reading_type', 'value', 'z']

data_z = data.copy()
if not data_z.empty and {'sensor_id','reading_type','value'}.issubset(data_z.columns):
    # Build the grouping once and reuse it for both statistics.
    grouped_value = data_z.groupby(['sensor_id','reading_type'])['value']
    # Groups with a single reading (std is NaN) or identical readings (0/0) yield a NaN z,
    # and NaN never compares greater than the threshold, so such rows are not flagged.
    data_z['z'] = (data_z['value'] - grouped_value.transform('mean')) / grouped_value.transform('std')
    data_z['anomaly'] = data_z['z'].abs() > Z_THRESHOLD
    anomalies = data_z[data_z['anomaly']].sort_values('z', ascending=False)
else:
    print('Not enough columns for anomaly detection')
    # Keep `anomalies` defined on this path too, so the display line below cannot fail.
    anomalies = pd.DataFrame(columns=_ANOMALY_VIEW)

# This must be the cell's last top-level statement: Jupyter only auto-displays a trailing
# top-level expression, so the same expression nested inside the `if` shows nothing.
anomalies[[col for col in _ANOMALY_VIEW if col in anomalies.columns]].head(20)

## Next Steps
- Map AIRTH signals to domain-specific thresholds and business KPIs.
- Integrate with hybrid AI+Quantum workflows (feature extraction, optimization, scheduling).
- Add export routines for reports and dashboards.

In [None]:
# Load Assumptions and Domain KPIs from the Excel template
# The workbook is opened once through a context manager so its file handle is closed
# afterwards. Both sheets are treated as optional: a missing sheet yields an empty DataFrame.
with pd.ExcelFile(EXCEL_PATH) as _workbook:
    _available_sheets = set(_workbook.sheet_names)
    assumptions_df = _workbook.parse('Assumptions') if 'Assumptions' in _available_sheets else pd.DataFrame()
    domain_kpis_seed = _workbook.parse('DomainKPIs') if 'DomainKPIs' in _available_sheets else pd.DataFrame()
assumptions_df.head(), domain_kpis_seed.head()

In [None]:
# Compute domain-specific KPIs using assumptions
# Build the {parameter: value} lookup directly from the two columns. Row-wise iterrows()
# materialises every row as a Series, which is slow and can upcast values to a common dtype.
A = dict(zip(assumptions_df['parameter'], assumptions_df['value'])) if not assumptions_df.empty else {}

def compute_domain_kpis_local(df: pd.DataFrame, A: dict) -> pd.DataFrame:
    """Compute domain KPIs: unique counts, per-sensor uptime and threshold exceedances.

    Args:
        df: Readings table. Columns used when present: ``sensor_id``, ``site_id``,
            ``status``, ``reading_type``, ``value``, ``battery_pct``,
            ``signal_strength_db``. A KPI whose source column is absent is skipped
            rather than raising ``KeyError``.
        A: Mapping of assumption name to threshold value. Entries that are missing,
            blank (NaN/None) or not convertible to a number fall back to the
            built-in defaults.

    Returns:
        DataFrame with one row per KPI (columns ``kpi`` and ``value``). An empty
        ``df`` yields an empty DataFrame with no columns.

    Notes:
        Warn and alarm counts are computed independently with ``>=``, so a reading
        that reaches the alarm level is also included in the warn count whenever
        the warn threshold is not above the alarm threshold.
    """
    rows = []
    if df.empty:
        return pd.DataFrame(rows)
    assumptions = A if A is not None else {}

    def threshold(key, default):
        """Return the assumption ``key`` as a float, or ``default`` if unusable."""
        try:
            number = float(assumptions.get(key, default))
        except (TypeError, ValueError):
            return default
        # NaN is the only float that differs from itself (blank Excel cells load as NaN).
        return default if number != number else number

    if 'sensor_id' in df:
        rows.append({'kpi':'sensors_unique','value': int(df['sensor_id'].nunique())})
    if 'site_id' in df:
        rows.append({'kpi':'sites_unique','value': int(df['site_id'].nunique())})

    # uptime by sensor: share of rows with status 'OK', in percent
    if {'sensor_id','status'}.issubset(df.columns):
        uptime = df['status'].eq('OK').groupby(df['sensor_id']).mean() * 100
        for sensor, pct in uptime.items():
            rows.append({'kpi': f'uptime_ok_pct__{sensor}', 'value': round(float(pct), 2)})

    # thresholds
    def count_exceed(sub, col, warn, alarm, higher_is_bad=True):
        """Count rows of ``sub[col]`` beyond the warn and the alarm level."""
        if sub.empty or col not in sub:
            return 0, 0
        if higher_is_bad:
            return int((sub[col] >= warn).sum()), int((sub[col] >= alarm).sum())
        return int((sub[col] <= warn).sum()), int((sub[col] <= alarm).sum())

    # (reading type, warn key, warn default, alarm key, alarm default)
    threshold_spec = (
        ('THz_RCS', 'thz_rcs_warn_dbsm', -15, 'thz_rcs_alarm_dbsm', -10),
        ('EM_Field', 'em_field_warn_mVpm', 80, 'em_field_alarm_mVpm', 100),
        ('Vibration', 'vibration_warn_mms', 4.0, 'vibration_alarm_mms', 6.0),
        ('Thermal', 'thermal_warn_c', 40.0, 'thermal_alarm_c', 45.0),
    )
    if 'reading_type' in df:
        for rtype, warn_key, warn_default, alarm_key, alarm_default in threshold_spec:
            sub = df[df['reading_type'] == rtype]
            w, a_ = count_exceed(sub, 'value',
                                 threshold(warn_key, warn_default),
                                 threshold(alarm_key, alarm_default),
                                 True)
            rows.append({'kpi': f'warn_count__{rtype}', 'value': w})
            rows.append({'kpi': f'alarm_count__{rtype}', 'value': a_})

    # battery / signal: each KPI only needs its own column
    if 'battery_pct' in df:
        rows.append({'kpi':'low_battery_count',
                     'value': int((df['battery_pct'] <= threshold('battery_warn_pct', 40)).sum())})
    if 'signal_strength_db' in df:
        rows.append({'kpi':'weak_signal_count',
                     'value': int((df['signal_strength_db'] <= threshold('signal_warn_db', -85)).sum())})
    return pd.DataFrame(rows)

# Evaluate the domain KPIs on the cleaned readings using the assumption lookup `A`;
# the trailing bare name renders the resulting table as the cell output.
computed_domain_kpis = compute_domain_kpis_local(data, A)
computed_domain_kpis

In [None]:
# Enhanced visualizations + export (HTML always; PNG if 'kaleido' installed)
from pathlib import Path
EXPORT_DIR = AIRTH_DIR / 'exports'
EXPORT_DIR.mkdir(parents=True, exist_ok=True)

# Safe marker size derived from absolute values (avoid negative sizes)
if 'value' in data:
    data = data.copy()
    data['_size'] = data['value'].abs().clip(lower=1.0)


def _existing(column):
    """Return `column` if `data` has it, else None so the optional plot encoding is skipped."""
    return column if column in data.columns else None


figs = []
# Geo scatter if lat/lon available
if {'lat','lon'}.issubset(data.columns):
    fig_geo = px.scatter(data, x='lon', y='lat',
                         color=_existing('reading_type'), size=_existing('_size'),
                         title='Spatial Distribution of Readings (lat/lon)')
    figs.append(('spatial_distribution', fig_geo))

# Per-type distributions
if 'reading_type' in data:
    fig_dist = px.histogram(data, x='value', color='reading_type', nbins=50,
                            title='Value Distributions by Reading Type', marginal='box')
    figs.append(('value_distributions', fig_dist))

# Battery vs Signal (trendline optional: requires statsmodels)
if {'battery_pct','signal_strength_db'}.issubset(data.columns):
    scatter_kwargs = dict(x='signal_strength_db', y='battery_pct', color=_existing('status'),
                          title='Battery vs Signal Strength')
    try:
        import statsmodels.api  # imported only to check that the optional dependency is available
        scatter_kwargs['trendline'] = 'ols'
    except ImportError:
        pass  # no statsmodels: draw the same scatter without a trendline
    fig_bs = px.scatter(data, **scatter_kwargs)
    figs.append(('battery_vs_signal', fig_bs))

# Save figures
saved = []
png_skipped = []
for name, fig in figs:
    html_path = EXPORT_DIR / f'{name}.html'
    fig.write_html(str(html_path))
    saved.append(str(html_path))
    try:
        fig.write_image(str(EXPORT_DIR / f'{name}.png'))
    except Exception as exc:
        # PNG export is best-effort, so keep going, but record the failure instead of
        # hiding it: an unexpected error (e.g. an unwritable directory) should be visible.
        png_skipped.append(f'{name}.png ({type(exc).__name__})')

if png_skipped:
    print(f"PNG export skipped for {len(png_skipped)} figure(s) (is 'kaleido' installed?): "
          + ', '.join(png_skipped))

saved

In [None]:
# Export analytics report (Excel) with KPIs
report_path = EXPORT_DIR / 'AIRTH_ANALYTICS_REPORT.xlsx'

# Prepare the data sample for Excel:
#   * '_size' is only a marker-size helper added for plotting, not a measurement, so it is
#     left out of the report (errors='ignore' covers the case where it was never added).
#   * Excel cannot store timezone-aware datetimes and pandas raises ValueError on them;
#     'timestamp' is parsed with utc=True, so tz-aware columns are converted to naive UTC.
report_sample = data.head(2000).drop(columns=['_size'], errors='ignore').copy()
for tz_col in report_sample.select_dtypes(include=['datetimetz']).columns:
    report_sample[tz_col] = report_sample[tz_col].dt.tz_convert(None)

with pd.ExcelWriter(report_path, engine='openpyxl') as writer:
    kpis_re.to_excel(writer, sheet_name='KPIs_Recomputed', index=False)
    computed_domain_kpis.to_excel(writer, sheet_name='DomainKPIs', index=False)
    report_sample.to_excel(writer, sheet_name='SampleData_Head', index=False)
report_path