In [None]:
# =======================================
# 1. Imports and Dataset Load
# =======================================
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
import ipywidgets as widgets
from IPython.display import display, clear_output

# Read the smart-meter export into a DataFrame.
INPUT_CSV = "/content/smart_meter_data (1).csv"   # <-- Change filename if needed
df = pd.read_csv(INPUT_CSV)

# Quick sanity check of what was loaded: shape, column names, first rows.
print("Dataset Loaded:", df.shape)
print("Columns:", df.columns.tolist())
print(df.head())

# Column holding the consumption readings (treated as kWh below);
# change if your dataset uses "Avg_Past_Consumption".
consumption_col = "Electricity_Consumed"

# Coerce to numeric (unparseable entries become NaN) and scale by 1000 (kWh -> Wh).
df['consumption_wh'] = pd.to_numeric(df[consumption_col], errors='coerce').mul(1000)

# Plain NumPy array of the non-missing scaled readings, used by the digit tests.
values = df['consumption_wh'].dropna().to_numpy()


Dataset Loaded: (5000, 7)
Columns: ['Timestamp', 'Electricity_Consumed', 'Temperature', 'Humidity', 'Wind_Speed', 'Avg_Past_Consumption', 'Anomaly_Label']
             Timestamp  Electricity_Consumed  Temperature  Humidity  \
0  2024-01-01 00:00:00              0.457786     0.469524  0.396368   
1  2024-01-01 00:30:00              0.351956     0.465545  0.451184   
2  2024-01-01 01:00:00              0.482948     0.285415  0.408289   
3  2024-01-01 01:30:00              0.628838     0.482095  0.512308   
4  2024-01-01 02:00:00              0.335974     0.624741  0.672021   

   Wind_Speed  Avg_Past_Consumption Anomaly_Label  
0    0.445441              0.692057        Normal  
1    0.458729              0.539874        Normal  
2    0.470360              0.614724        Normal  
3    0.576241              0.757044        Normal  
4    0.373004              0.673981        Normal  


In [None]:
# =======================================
# 2. Benford’s Law Functions
# =======================================

def benford_first_digit(data):
    """Observed vs. Benford-expected frequencies of the leading digit.

    Only strictly positive entries whose integer part is at least 1 are
    counted; the leading digit is taken from the integer part of each value.

    Returns
    -------
    observed : numpy.ndarray
        Share of counted values whose leading digit is 1..9 (all zeros when
        nothing is counted).
    expected : numpy.ndarray
        Benford probabilities log10(1 + 1/d) for d = 1..9.
    """
    positive = data[data > 0]

    # Tally the first character of the truncated integer for each usable value.
    tally = Counter()
    for value in positive:
        whole = int(value)
        if whole > 0:
            tally[int(str(whole)[0])] += 1

    n_counted = sum(tally.values())
    shares = []
    for digit in range(1, 10):
        # Digits that never occurred contribute a literal 0 (no division).
        shares.append(tally[digit]/n_counted if digit in tally else 0)
    observed = np.array(shares)

    expected = np.array([np.log10(1 + 1/digit) for digit in range(1, 10)])
    return observed, expected

def benford_second_digit(data):
    """Observed vs. Benford-expected frequencies of the second digit.

    The second digit is read from the integer part of each value, so only
    values >= 10 (integer part with at least two digits) take part.

    Parameters
    ----------
    data : numpy.ndarray or pandas.Series
        Numeric values; must support boolean-mask indexing.

    Returns
    -------
    observed : pandas.Series
        Indexed 0..9; share of qualifying values with that second digit.
        All zeros when no value qualifies (avoids a 0/0 -> NaN result).
    expected : numpy.ndarray
        Length 10; Benford second-digit probabilities
        P(d) = sum_{k=1..9} log10(1 + 1/(10*k + d)).
    """
    # x >= 10 is exactly "x > 9 and int(x) has two or more digits".
    data = data[data >= 10]
    second_digits = [int(str(int(x))[1]) for x in data]
    counts = (
        pd.Series(second_digits, dtype="int64")
        .value_counts()
        .reindex(range(0, 10), fill_value=0)
    )
    total = counts.sum()
    observed = counts / total if total > 0 else counts.astype(float)

    # Expected second-digit distribution: marginalise the two-digit Benford law
    # over the nine possible FIRST digits only. Summing k beyond 9 would mix in
    # third- and fourth-digit probabilities and flatten the curve.
    first = np.arange(1, 10).reshape(-1, 1)
    second = np.arange(0, 10).reshape(1, -1)
    expected = np.log10(1 + 1 / (10 * first + second)).sum(axis=0)
    return observed, expected

def benford_two_digit(data):
    """Observed vs. Benford-expected frequencies of the leading two digits.

    The two-digit prefix is read from the integer part of each value, so
    only values whose integer part is at least 10 are counted.

    Returns
    -------
    observed : pandas.Series
        Share per two-digit prefix, sorted by prefix; contains only the
        prefixes that actually occur in ``data``.
    expected : pandas.Series
        Indexed 10..99 with Benford probabilities log10(1 + 1/d).
    """
    candidates = data[data > 9]

    prefixes = []
    for value in candidates:
        whole = int(value)
        # Values in (9, 10) pass the mask but truncate to a single digit.
        if whole >= 10:
            prefixes.append(int(str(whole)[:2]))

    freq = pd.Series(prefixes).value_counts().sort_index()
    observed = freq / freq.sum()

    benford = {}
    for prefix in range(10, 100):
        benford[prefix] = np.log10(1 + 1/prefix)
    expected = pd.Series(benford)
    return observed, expected


In [None]:
# =======================================
# 3. Forensic Checks & Scoring
# =======================================

def compute_population_stats(df, value_col='consumption_wh', rolling_window=24):
    """Summarise one numeric column into the baseline statistics used for scoring.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing ``value_col``.
    value_col : str
        Column to summarise; non-numeric entries are coerced to NaN and dropped.
    rolling_window : int, default 24
        Window length (in rows) for the trailing rolling mean. The default
        keeps the previously hard-coded value.

    Returns
    -------
    dict
        ``mean``, ``std`` (population std, ddof=0; 0.0 when fewer than two
        values), ``q1``, ``q3``, ``iqr``, ``rolling_window`` and
        ``pop_rolling_mean`` (last value of the rolling mean, or ``None``
        when the column has no usable values).
    """
    vals = pd.to_numeric(df[value_col], errors='coerce').dropna()

    # Compute each quartile once and reuse it for the IQR.
    q1 = float(vals.quantile(0.25))
    q3 = float(vals.quantile(0.75))

    if len(vals) > 0:
        # min_periods=1 so short series still yield a value.
        pop_rolling_mean = float(
            vals.rolling(rolling_window, min_periods=1).mean().iloc[-1]
        )
    else:
        pop_rolling_mean = None

    stats = {
        'mean': float(vals.mean()),
        'std': float(vals.std(ddof=0)) if len(vals) > 1 else 0.0,
        'q1': q1,
        'q3': q3,
        'iqr': q3 - q1,
        'rolling_window': rolling_window,
        'pop_rolling_mean': pop_rolling_mean,
    }
    return stats

# Weights and thresholds
ZSCORE_THRESH = 3.0   # |z| above this raises the z-score flag
RSF_THRESH = 3.0      # consumption / baseline ratio above this raises the RSF flag
IQR_MULT = 1.5        # fence width, in IQR multiples, beyond q1 / q3
# Points each raised flag adds to the raw score.
WEIGHTS = {'z_flag':2,'iqr_flag':1.5,'rsf_flag':2,'dup_flag':2,'benford_weak':0.5}
# Raw score when every flag is raised; used to normalise onto a 0-10 scale.
MAX_POSSIBLE = sum(WEIGHTS.values())


def _leading_digit(value):
    """Return the first digit of the integer part of ``value``, or None.

    None is returned when the value has no usable integer part: NaN,
    infinity, a non-numeric object, or a negative number (whose string form
    starts with '-').
    """
    try:
        return int(str(int(value))[0])
    except (TypeError, ValueError, OverflowError):
        # Only the failures int() can raise here; anything else
        # (e.g. KeyboardInterrupt) must propagate.
        return None


def score_single_record(consumption, avg_past=None, population_stats=None, is_duplicate=False):
    """Score one consumption reading against population baselines.

    Parameters
    ----------
    consumption : float
        The reading to evaluate (same unit as the population statistics).
    avg_past : float, optional
        Baseline for the RSF ratio. When missing or not positive, the
        population's ``pop_rolling_mean`` is used instead.
    population_stats : dict, optional
        Output of ``compute_population_stats``. When omitted it is computed
        from the module-level ``df``.
    is_duplicate : bool
        Whether the caller has identified the record as a duplicate.

    Returns
    -------
    (score_norm, score, flags)
        ``score_norm`` is the raw score rescaled to 0-10, ``score`` the raw
        weighted sum, and ``flags`` a dict with the intermediate values and
        boolean flags (including ``suspicious`` = ``score_norm >= 4``).
    """
    if population_stats is None:
        population_stats = compute_population_stats(df)

    score = 0.0
    flags = {}

    # Z-score: distance from the population mean in standard deviations.
    stdev = population_stats['std']
    mean = population_stats['mean']
    z = (consumption - mean)/stdev if stdev>0 else 0.0
    flags['zscore'] = z
    flags['z_flag'] = abs(z)>ZSCORE_THRESH
    if flags['z_flag']:
        score += WEIGHTS['z_flag']

    # IQR: outside [q1 - k*iqr, q3 + k*iqr].
    lower = population_stats['q1'] - IQR_MULT*population_stats['iqr']
    upper = population_stats['q3'] + IQR_MULT*population_stats['iqr']
    flags['iqr_flag'] = (consumption<lower) or (consumption>upper)
    if flags['iqr_flag']:
        score += WEIGHTS['iqr_flag']

    # RSF: reading relative to its baseline; 0 when no positive baseline exists.
    base = avg_past if avg_past and avg_past>0 else population_stats['pop_rolling_mean']
    rsf_ratio = consumption/base if base and base>0 else 0
    flags['rsf_ratio'] = rsf_ratio
    flags['rsf_flag'] = rsf_ratio>RSF_THRESH
    if flags['rsf_flag']:
        score += WEIGHTS['rsf_flag']

    # Duplicate
    flags['dup_flag'] = is_duplicate
    if is_duplicate:
        score += WEIGHTS['dup_flag']

    # Weak Benford: a leading digit other than 1 adds a small weight.
    # A missing digit (None) or a 0 integer part raises no flag.
    first_digit = _leading_digit(consumption)
    flags['first_digit'] = first_digit
    flags['benford_weak'] = (first_digit!=1) if first_digit else False
    if flags['benford_weak']:
        score += WEIGHTS['benford_weak']

    score_norm = (score/MAX_POSSIBLE)*10
    flags['raw_score'] = score
    flags['norm_score'] = score_norm
    flags['suspicious'] = score_norm>=4
    return score_norm, score, flags


In [None]:
# =======================================
# 4. Upgraded Forensic Dashboard
# =======================================

# Baseline statistics, computed once and reused by every button click.
pop_stats = compute_population_stats(df)

# Widgets: one slider per input of the forensic check.
consumption_w = widgets.FloatSlider(
    description="Consumption (Wh)", value=500, min=0, max=5000, step=50,
)
avg_past_w = widgets.FloatSlider(
    description="Avg Past (Wh)", value=400, min=0, max=5000, step=50,
)
temp_w = widgets.FloatSlider(
    description="Temp (°C)", value=25, min=-10, max=50, step=1,
)
hum_w = widgets.FloatSlider(
    description="Humidity (%)", value=50, min=0, max=100, step=5,
)
wind_w = widgets.FloatSlider(
    description="Wind (m/s)", value=5, min=0, max=50, step=1,
)
dup_w = widgets.Checkbox(description="Duplicate?", value=False)

# Trigger button plus the output area the results are rendered into.
run_btn = widgets.Button(button_style='info', description="Run Forensic Check")
output = widgets.Output()

# Most recent checks, appended to by the button callback.
history = []

# Mantissa Arc Test
def mantissa_arc(data):
    """Map the log10 mantissas of the positive values onto the unit circle.

    Returns
    -------
    R : float
        Length of the mean resultant vector of the points
        (sqrt(sum(cos)^2 + sum(sin)^2) / n).
    angles : numpy.ndarray
        Angle in radians for each positive value (mantissa * 2*pi).
    """
    positive = data[data > 0]

    # Fractional part of log10 -> position on the circle.
    fractional = np.mod(np.log10(positive), 1)
    angles = fractional * 2 * np.pi

    cos_total = np.sum(np.cos(angles))
    sin_total = np.sum(np.sin(angles))
    R = np.sqrt(cos_total**2 + sin_total**2) / len(angles)
    return R, angles

def run_check(b):
    """Button callback: score the slider inputs and render the dashboard.

    Reads the current widget values, scores them with
    ``score_single_record`` against ``pop_stats``, applies the temperature
    adjustment, and then prints the verdict and draws the diagnostic plots
    into ``output``. Appends the result to ``history`` (last 5 kept).

    Parameters
    ----------
    b : object
        The button instance passed by ``on_click``; not used.
    """
    with output:
        clear_output(wait=True)
        score_norm, _, flags = score_single_record(
            consumption_w.value, avg_past_w.value, pop_stats, dup_w.value
        )

        # Environmental adjustment: extreme temperatures add one point.
        flags['env_flag'] = temp_w.value > 40 or temp_w.value < 5
        if flags['env_flag']:
            # Cap at 10 so the value stays on the advertised 0-10 scale
            # (and inside the gauge's x-limits).
            score_norm = min(score_norm + 1, 10.0)
        # Keep the flags table consistent with the adjusted score; without
        # this it would still show the pre-adjustment score and 'suspicious'.
        flags['norm_score'] = score_norm
        flags['suspicious'] = score_norm >= 4

        # Verdict
        if score_norm >= 6:
            verdict = "SUSPICIOUS (High Risk)"
            color = "red"
        elif score_norm >= 4:
            verdict = "SUSPICIOUS (Moderate Risk)"
            color = "orange"
        else:
            verdict = "LEGAL (Low Risk)"
            color = "green"

        # Save history (bounded to the five most recent checks)
        history.append({"Consumption":consumption_w.value,"Avg_Past":avg_past_w.value,
                        "Temp":temp_w.value,"Humidity":hum_w.value,"Wind":wind_w.value,
                        "Verdict":verdict,"Score":round(score_norm,2)})
        if len(history) > 5:
            history.pop(0)

        print("=== Forensic Dashboard Result ===")
        print("Verdict:", verdict)
        print("Score:", round(score_norm,2), "/10")

        # Explanation: one line per raised flag
        explanations = []
        if flags['z_flag']:
            explanations.append("High Z-score deviation")
        if flags['iqr_flag']:
            explanations.append("Outside IQR bounds")
        if flags['rsf_flag']:
            explanations.append("Consumption spike (RSF)")
        if flags['benford_weak']:
            explanations.append("Benford’s law deviation")
        if flags['dup_flag']:
            explanations.append("Duplicate detected")
        if flags['env_flag']:
            explanations.append("Extreme environment")
        if not explanations:
            explanations.append("No strong anomalies")
        print("\nReasons:")
        for e in explanations:
            print("-", e)

        # Flags table
        display(pd.DataFrame([flags]))

        # Risk Gauge
        plt.figure(figsize=(6,1.2))
        plt.barh([0],[score_norm],color=color)
        plt.xlim(0,10)
        plt.yticks([])
        plt.title("Risk Score (0-10)")
        plt.show()

        # Consumption vs Avg
        plt.figure(figsize=(4,3))
        plt.bar(["Consumption","Avg Past"],[consumption_w.value,avg_past_w.value])
        plt.title("Consumption vs Avg Past")
        plt.show()

        # The remaining plots describe the whole population (``values`` / ``df``),
        # not the record being checked.

        # Benford First Digit
        obs1,exp1 = benford_first_digit(values)
        plt.figure(figsize=(6,3))
        plt.bar(range(1,10),obs1,alpha=0.7,label="Observed")
        plt.plot(range(1,10),exp1,'ro-',label="Expected")
        plt.title("Benford First Digit Test")
        plt.legend()
        plt.show()

        # Benford Second Digit
        obs2,exp2 = benford_second_digit(values)
        plt.figure(figsize=(6,3))
        plt.bar(obs2.index,obs2.values,alpha=0.7,label="Observed")
        plt.plot(range(0,10),exp2,'ro-',label="Expected")
        plt.title("Benford Second Digit Test")
        plt.legend()
        plt.show()

        # Benford Two Digit
        obs3,exp3 = benford_two_digit(values)
        plt.figure(figsize=(6,3))
        plt.bar(obs3.index,obs3.values,alpha=0.7,label="Observed")
        plt.plot(exp3.index,exp3.values,'r-',label="Expected")
        plt.title("Benford Two-Digit Test")
        plt.legend()
        plt.show()

        # Mantissa Arc Test
        R, angles = mantissa_arc(values)
        plt.figure(figsize=(4,4))
        plt.scatter(np.cos(angles), np.sin(angles), alpha=0.2, s=5)
        circle = plt.Circle((0,0),1,color="r",fill=False,linestyle="--")
        plt.gca().add_artist(circle)
        plt.title(f"Mantissa Arc Test (R={R:.3f})")
        plt.axis("equal")
        plt.show()

        # Correlation Analysis (only when all environment columns exist)
        if all(col in df.columns for col in ["Temperature","Humidity","Wind_Speed"]):
            subset = df[['consumption_wh','Temperature','Humidity','Wind_Speed']].dropna()
            corr = subset.corr()
            print("\nCorrelation with Environment:")
            print(corr['consumption_wh'])
            plt.figure(figsize=(5,3))
            plt.scatter(subset['Temperature'],subset['consumption_wh'],alpha=0.3)
            plt.xlabel("Temperature (°C)")
            plt.ylabel("Consumption (Wh)")
            plt.title("Consumption vs Temperature")
            plt.show()

        # RSF Trend Visualization
        vals = df['consumption_wh'].dropna().values
        rolling_mean = pd.Series(vals).rolling(24,min_periods=1).mean()
        plt.figure(figsize=(6,3))
        plt.plot(vals[:200],label="Consumption")
        plt.plot(rolling_mean[:200],label="Rolling Mean")
        plt.legend()
        plt.title("RSF Trend (first 200 samples)")
        plt.show()

        # History
        print("\nLast 5 Checks:")
        display(pd.DataFrame(history))

        # Population stats
        print("\nPopulation Stats:", pop_stats)

# Run the forensic check whenever the button is clicked.
run_btn.on_click(run_check)

# Render the controls stacked vertically, with the result area at the bottom.
display(
    widgets.VBox(
        children=[
            consumption_w,
            avg_past_w,
            temp_w,
            hum_w,
            wind_w,
            dup_w,
            run_btn,
            output,
        ]
    )
)


VBox(children=(FloatSlider(value=500.0, description='Consumption (Wh)', max=5000.0, step=50.0), FloatSlider(va…