# 🔬 Power-CLIP vNext: Standard CNN Encoder

**對照實驗**: 用標準 CNN (CLIP-style) 取代 HTF-CNN

| 對比 | HTF-CNN | Standard CNN |
|------|---------|-------------|
| Branches | 3 (time+freq+stats) | 1 (signal only) |
| FFT | ✅ | ❌ |
| Stats | ✅ | ❌ |

**版本**: 2025-12-31 vNext-StandardCNN

In [None]:
# @title üìÇ Setup
from google.colab import drive, userdata
import os, json, torch
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
import random

# Mount Google Drive and make the dataset folder the working directory so
# the relative file names used by later cells resolve inside it.
BASE_PATH = '/content/drive/MyDrive/Reserach/REDD_Dataset'
drive.mount('/content/drive')
os.chdir(BASE_PATH)

# Seed the Python, NumPy and PyTorch random number generators with one value.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device('cuda')
else:
    DEVICE = torch.device('cpu')
print(f'üî• Device: {DEVICE}')

# Training hyperparameters read by the later cells.
EPOCHS = 20           # training epochs per fold
LR = 1e-3             # optimizer learning rate
BATCH_SIZE = 32       # windows per training batch
TEMPERATURE = 0.07    # divisor applied to the similarities in the loss

In [None]:
!pip install -q sentence-transformers

In [None]:
# @title üìã Attribute Ontology (20 prototypes)

# One row per attribute prototype: (id, tier, short name, text description).
# The text description is what gets embedded by the text encoder.
_ATTRIBUTE_ROWS = [
    # Power shape
    ('PS1', 'power_shape', 'constant_high', 'sustained high power consumption above 500W'),
    ('PS2', 'power_shape', 'constant_low', 'sustained low power consumption under 200W'),
    ('PS3', 'power_shape', 'cyclic', 'power cycles on and off periodically'),
    ('PS4', 'power_shape', 'burst', 'short bursts of high power'),
    ('PS5', 'power_shape', 'fluctuating', 'continuously fluctuating power levels'),
    # Temporal behaviour
    ('T1', 'temporal', 'long_operation', 'operates continuously for 30+ minutes'),
    ('T2', 'temporal', 'short_usage', 'used for less than 10 minutes at a time'),
    ('T3', 'temporal', 'periodic_cycle', 'regular on-off cycles every 15-30 minutes'),
    ('T4', 'temporal', 'instantaneous', 'very brief operation under 5 minutes'),
    # Load type
    ('L1', 'load_type', 'resistive', 'purely resistive heating load with stable power'),
    ('L2', 'load_type', 'inductive_motor', 'inductive motor load with startup surge'),
    ('L3', 'load_type', 'switching', 'switching power supply with high frequency noise'),
    ('L4', 'load_type', 'heating_element', 'resistive heating element with thermal cycling'),
    ('L5', 'load_type', 'compressor', 'compressor motor with periodic start-stop'),
    # Power level
    ('P1', 'power_level', 'very_high', 'power consumption above 1500W'),
    ('P2', 'power_level', 'high', 'power consumption between 500W and 1500W'),
    ('P3', 'power_level', 'medium', 'power consumption between 100W and 500W'),
    ('P4', 'power_level', 'low', 'power consumption below 100W'),
    # Stability
    ('S1', 'stability', 'stable', 'very stable and consistent power draw'),
    ('S2', 'stability', 'variable', 'highly variable and unpredictable power'),
]

# Attribute id -> {'tier', 'name', 'text'}, in the row order above.
ATTRIBUTES = {
    attr_id: {'tier': tier, 'name': name, 'text': text}
    for attr_id, tier, name, text in _ATTRIBUTE_ROWS
}

# Parallel lists: position j in ATTR_IDS and ATTR_TEXTS is the same attribute.
ATTR_IDS = list(ATTRIBUTES)
ATTR_TEXTS = [spec['text'] for spec in ATTRIBUTES.values()]

# Attribute ids assigned to each appliance.
DEVICE_ATTRS = {
    'fridge': ['L5', 'T3', 'P4', 'PS3', 'S1'],
    'microwave': ['L1', 'T4', 'P1', 'PS4', 'S1'],
    'dish washer': ['L2', 'L4', 'T1', 'P2', 'PS5'],
    'washer dryer': ['L2', 'L4', 'T1', 'P1', 'PS5', 'S2'],
    'electric stove': ['L1', 'T2', 'P1', 'PS1', 'S1'],
    'electric space heater': ['L4', 'T1', 'P2', 'PS1', 'S1'],
}

ALL_DEVICES = list(DEVICE_ATTRS)

# One-letter appliance labels used in printed tables and plots.
SHORT = {
    'fridge': 'F',
    'microwave': 'M',
    'dish washer': 'D',
    'washer dryer': 'W',
    'electric stove': 'S',
    'electric space heater': 'H',
}

print(f'üìã Attributes: {len(ATTR_IDS)}')

In [None]:
# @title ⚡ Standard CNN Encoder (CLIP-style)
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sentence_transformers import SentenceTransformer

class StandardCNNEncoder(nn.Module):
    """Single-branch 1D CNN that embeds a power window (CLIP-style encoder).

    The raw signal is the only input: there is no FFT branch, no statistics
    branch and no feature fusion.  Three convolution blocks reduce the window
    to one 256-channel vector, which a linear layer followed by LayerNorm
    projects to ``embed_dim`` features.

    Args:
        embed_dim: Size of the output embedding (default 384).
    """

    def __init__(self, embed_dim=384):
        super().__init__()

        # Shape notes below are for a 60-sample window.
        # [B, 1, 60] -> [B, 64, 15]: strided conv halves the length, pool halves it again.
        stem = [
            nn.Conv1d(1, 64, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.MaxPool1d(2),
        ]
        # [B, 64, 15] -> [B, 128, 7]
        middle = [
            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.MaxPool1d(2),
        ]
        # [B, 128, 7] -> [B, 256, 1]: global average over the time axis.
        head = [
            nn.Conv1d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.AdaptiveAvgPool1d(1),
        ]
        # Kept as one flat Sequential so sub-module indices stay 0..11.
        self.conv = nn.Sequential(*stem, *middle, *head)

        self.projection = nn.Sequential(
            nn.Linear(256, embed_dim),
            nn.LayerNorm(embed_dim),
        )

    def forward(self, signal):
        """Embed a batch of windows.

        Args:
            signal: 2-D tensor ``[B, T]``; each row is standardised on its own.

        Returns:
            Tensor ``[B, embed_dim]`` (not L2-normalised).
        """
        # Per-window z-score; the epsilon keeps flat windows from dividing by zero.
        centred = signal - signal.mean(dim=1, keepdim=True)
        scale = signal.std(dim=1, keepdim=True) + 1e-6
        standardised = centred / scale

        features = self.conv(standardised.unsqueeze(1))   # [B, 1, T] -> [B, 256, 1]
        return self.projection(features.squeeze(-1))      # [B, 256] -> [B, embed_dim]

# Build one encoder with the default embedding size and report how many
# parameters it holds.
model = StandardCNNEncoder()
print(f'‚úÖ Standard CNN: {sum(map(torch.Tensor.numel, model.parameters())):,} params')

In [None]:
# @title üìä Load All Data

# Cut the JSONL record stream into non-overlapping windows of `ws` records and
# turn each labelled window into a sample dict:
#   'signal'    float32 array of the window's 'aggregate_power' values
#   'attr_mask' float32 multi-hot vector over ATTR_IDS
#   'devices'   known appliances active at the window's centre record
#   'attrs'     attribute ids implied by those appliances
all_samples = []
ws = 60  # records per window

# Blank lines (e.g. a trailing newline) are dropped so json.loads never
# receives an empty string.
with open('combination_labels.jsonl', encoding='utf-8') as f:
    lines = [line for line in f if line.strip()]

# Built once instead of scanning ATTR_IDS / ALL_DEVICES for every window.
attr_index = {attr_id: pos for pos, attr_id in enumerate(ATTR_IDS)}
known_devices = set(ALL_DEVICES)

# The "+ 1" keeps the final window when the record count is an exact multiple
# of ws; without it that last complete window would be skipped.
for i in tqdm(range(0, len(lines) - ws + 1, ws), desc='Loading'):
    chunk = [json.loads(line) for line in lines[i:i + ws]]

    # A window is labelled by the appliances active at its centre record.
    act = set(chunk[ws // 2]['active_appliances']) & known_devices
    if not act:
        continue

    attr_set = set()
    for d in act:
        attr_set.update(DEVICE_ATTRS.get(d, []))
    if not attr_set:
        continue

    signal = np.array([c['aggregate_power'] for c in chunk], dtype=np.float32)
    attr_mask = np.zeros(len(ATTR_IDS), dtype=np.float32)
    attr_mask[[attr_index[attr] for attr in attr_set]] = 1.0

    all_samples.append({
        'signal': signal,
        'attr_mask': attr_mask,
        'devices': list(act),
        'attrs': list(attr_set)
    })

print(f'üìä Total samples: {len(all_samples)}')

In [None]:
# @title üéØ LOO Attribute Dataset

class LOOAttrDataset(Dataset):
    """Leave-one-out view over the windowed samples for one device.

    With ``is_train=True`` the dataset keeps every sample whose ``'devices'``
    list does not contain ``left_out_device``.  With ``is_train=False`` it
    keeps only samples where ``left_out_device`` is the single listed device.

    Each item is ``(signal_tensor, attr_mask_tensor, devices, attrs)``.
    """

    def __init__(self, samples, left_out_device, is_train=True):
        self.left_out = left_out_device

        def belongs(sample):
            present = left_out_device in sample['devices']
            if is_train:
                return not present
            return present and len(sample['devices']) == 1

        self.samples = [sample for sample in samples if belongs(sample)]

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, i):
        record = self.samples[i]
        signal = torch.tensor(record['signal'])
        attr_mask = torch.tensor(record['attr_mask'])
        return signal, attr_mask, record['devices'], record['attrs']

In [None]:
# @title üî• Train & Evaluate One LOO Fold

def run_loo_fold(left_out, all_samples, epochs=EPOCHS):
    """Train a fresh encoder without one device, then test zero-shot on it.

    Training uses every sample that does not involve ``left_out``; testing
    uses the samples where ``left_out`` is the only active device.  The
    encoder output is compared (cosine similarity) with text embeddings of
    the attribute descriptions, and devices are ranked by the mean activation
    of their expected attributes.

    Args:
        left_out: Device name (a key of ``DEVICE_ATTRS``) held out of training.
        all_samples: Sample dicts with 'signal', 'attr_mask', 'devices', 'attrs'.
        epochs: Number of training epochs.

    Returns:
        A dict with the fold metrics ('avg_rank', 'top1', 'top3',
        'attr_hit_rate', sample counts, device names), or ``None`` when the
        fold is skipped: no test windows, or fewer training windows than one
        batch.
    """
    print(f'\n{"="*60}')
    print(f'üéØ LOO StandardCNN: Leave out [{SHORT[left_out]}] = {left_out}')
    print(f'{"="*60}')

    train_ds = LOOAttrDataset(all_samples, left_out, is_train=True)
    test_ds = LOOAttrDataset(all_samples, left_out, is_train=False)

    print(f'Train: {len(train_ds)} | Test: {len(test_ds)}')

    if len(test_ds) == 0:
        return None
    if len(train_ds) < BATCH_SIZE:
        # The loader below uses drop_last=True, so it would yield zero
        # batches: nothing would be trained and the average-loss log would
        # divide by zero.  Skip the fold explicitly instead.
        print(f'Skipping {left_out}: {len(train_ds)} training windows '
              f'is fewer than one batch ({BATCH_SIZE})')
        return None

    model = StandardCNNEncoder().to(DEVICE)
    text_model = SentenceTransformer('all-MiniLM-L6-v2').to(DEVICE)
    text_model.eval()

    # Attribute text embeddings are computed once and stay fixed; only the
    # signal encoder is optimised.
    with torch.no_grad():
        attr_embs = text_model.encode(ATTR_TEXTS, convert_to_tensor=True, device=DEVICE)
        attr_embs = F.normalize(attr_embs, p=2, dim=1)

    optimizer = optim.AdamW(model.parameters(), lr=LR, weight_decay=0.01)

    def collate(batch):
        # Stack the tensors; keep the per-sample device/attribute lists as
        # plain Python lists because their lengths differ between samples.
        return (torch.stack([b[0] for b in batch]),
                torch.stack([b[1] for b in batch]),
                [b[2] for b in batch], [b[3] for b in batch])

    loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                        drop_last=True, collate_fn=collate)

    # Training
    for ep in range(epochs):
        model.train()
        total_loss = 0
        for signals, masks, _, _ in loader:
            signals, masks = signals.to(DEVICE), masks.to(DEVICE)
            pe = F.normalize(model(signals), p=2, dim=1)
            sim = (pe @ attr_embs.T) / TEMPERATURE
            # Loss = -log of the softmax mass that falls on the sample's own
            # attributes (masks == 1) among all attributes.
            exp_sim = torch.exp(sim)
            pos_sum = (exp_sim * masks).sum(dim=1)
            all_sum = exp_sim.sum(dim=1)
            loss = -torch.log(pos_sum / (all_sum + 1e-8)).mean()
            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()
            total_loss += loss.item()
        if (ep+1) % 10 == 0:
            print(f'  Ep {ep+1}/{epochs} Loss: {total_loss/len(loader):.4f}')

    # Evaluation
    model.eval()
    device_ranks, attr_hits = [], []
    target_attrs = set(DEVICE_ATTRS[left_out])

    # Loop-invariant lookups, built once instead of once per test window.
    attr_pos = {attr_id: j for j, attr_id in enumerate(ATTR_IDS)}
    device_attr_idx = {dev: [attr_pos[a] for a in expected]
                       for dev, expected in DEVICE_ATTRS.items()}

    with torch.no_grad():
        for i in range(len(test_ds)):
            signal = test_ds[i][0].unsqueeze(0).to(DEVICE)

            pe = F.normalize(model(signal), p=2, dim=1)
            sim = (pe @ attr_embs.T).squeeze().cpu().numpy()

            # Squash cosine similarities to (0, 1); 5 is the sigmoid slope.
            activations = 1 / (1 + np.exp(-sim * 5))

            # Device score = mean activation of the attributes it is expected to have.
            device_scores = {dev: np.mean(activations[idx])
                             for dev, idx in device_attr_idx.items()}

            # Rank 1 = highest score; ties keep DEVICE_ATTRS order (stable sort).
            ranked = sorted(device_scores, key=device_scores.get, reverse=True)
            device_ranks.append(ranked.index(left_out) + 1)

            # Fraction of the held-out device's attributes found among the
            # top-k activations, with k = number of target attributes.
            top_k = np.argsort(-activations)[:len(target_attrs)]
            top_k_attrs = {ATTR_IDS[j] for j in top_k}
            attr_hits.append(len(top_k_attrs & target_attrs) / len(target_attrs))

    avg_rank = np.mean(device_ranks)
    top1 = np.mean([r == 1 for r in device_ranks]) * 100
    top3 = np.mean([r <= 3 for r in device_ranks]) * 100
    attr_hit_rate = np.mean(attr_hits) * 100

    print(f'üìä {SHORT[left_out]}: Rank={avg_rank:.2f}, Top-1={top1:.1f}%, Top-3={top3:.1f}%, AttrHit={attr_hit_rate:.1f}%')

    return {
        'device': left_out,
        'short': SHORT[left_out],
        'n_train': len(train_ds),
        'n_test': len(test_ds),
        'avg_rank': float(avg_rank),
        'top1': float(top1),
        'top3': float(top3),
        'attr_hit_rate': float(attr_hit_rate)
    }

In [None]:
# @title üöÄ Run All 6 LOO Folds

# Run one leave-one-out fold per device; a fold that returns a falsy value
# (run_loo_fold returns None when it has nothing to report) is dropped.
results = []
for device in ALL_DEVICES:
    result = run_loo_fold(device, all_samples, epochs=EPOCHS)
    if not result:
        continue
    results.append(result)

print(f"\n{'=' * 60}")
print('üìä ALL LOO FOLDS COMPLETE (Standard CNN)')
print('=' * 60)

In [None]:
# @title üìä Summary & Comparison

# Markdown table: one row per fold, then the across-fold average and the
# random-guess reference row.
print(
    '\n### Standard CNN vs HTF-CNN Comparison\n',
    '| Device | Avg Rank | Top-1 | Top-3 | Attr Hit |',
    '|--------|----------|-------|-------|----------|',
    sep='\n',
)

for fold in results:
    # Mark folds whose average rank beats the 3.5 expected from random ranking.
    flag = '‚úÖ' if fold['avg_rank'] < 3.5 else '‚ùå'
    cells = [
        fold['short'],
        f"{fold['avg_rank']:.2f} {flag}",
        f"{fold['top1']:.1f}%",
        f"{fold['top3']:.1f}%",
        f"{fold['attr_hit_rate']:.1f}%",
    ]
    print('| ' + ' | '.join(cells) + ' |')

# Unweighted mean of each metric over the folds that produced a result.
avg_rank, avg_top1, avg_top3, avg_attr = (
    np.mean([fold[key] for fold in results])
    for key in ('avg_rank', 'top1', 'top3', 'attr_hit_rate')
)

print(f'| **Avg** | **{avg_rank:.2f}** | **{avg_top1:.1f}%** | **{avg_top3:.1f}%** | **{avg_attr:.1f}%** |')
print('| Random | 3.5 | 16.7% | 50% | - |')

# Hard-coded reference numbers for the HTF-CNN encoder.
print('\nüìã HTF-CNN Results (for comparison):')
print('   Avg Rank: 4.28, Top-1: 7.7%, Top-3: 17.2%, Attr Hit: 27.8%')

In [None]:
# @title üìà Visualization
import matplotlib.pyplot as plt

# Per-fold series, all in the order of `results`.
devices = [fold['short'] for fold in results]
ranks = [fold['avg_rank'] for fold in results]
top3s = [fold['top3'] for fold in results]
hits = [fold['attr_hit_rate'] for fold in results]

fig, axes = plt.subplots(1, 3, figsize=(15, 4))
rank_ax, top3_ax, hit_ax = axes

# Panel 1: average rank per device; green when below the 3.5 reference line.
colors = ['green' if r < 3.5 else 'red' for r in ranks]
rank_ax.bar(devices, ranks, color=colors, alpha=0.7)
rank_ax.axhline(y=3.5, color='black', linestyle='--', label='Random')
rank_ax.set(ylabel='Avg Rank (‚Üì)', title='Standard CNN: Device Rank')
rank_ax.legend()

# Panel 2: top-3 recall per device; green when above the 50% reference line.
colors = ['green' if t > 50 else 'red' for t in top3s]
top3_ax.bar(devices, top3s, color=colors, alpha=0.7)
top3_ax.axhline(y=50, color='black', linestyle='--', label='Random')
top3_ax.set(ylabel='Top-3 (%)', title='Standard CNN: Top-3 Recall')
top3_ax.legend()

# Panel 3: attribute hit rate per device (no reference line).
hit_ax.bar(devices, hits, color='blue', alpha=0.7)
hit_ax.set(ylabel='Attr Hit Rate (%)', title='Standard CNN: Attribute Retrieval')

plt.tight_layout()
plt.savefig('stdcnn_loo_results.png', dpi=150)
plt.show()

In [None]:
# @title üíæ Save Results

# Everything written to the results file: run settings, per-fold metrics,
# their averages, a comparison against the random reference values (3.5 for
# rank, 50 for top-3) and the hard-coded HTF-CNN reference numbers.
final_results = {
    'experiment': 'Standard CNN Attribute-Level LOO Zero-Shot',
    'encoder': 'StandardCNN',
    'epochs': EPOCHS,
    'n_attributes': len(ATTR_IDS),
    'folds': results,
    'average': {
        'rank': float(avg_rank),
        'top1': float(avg_top1),
        'top3': float(avg_top3),
        'attr_hit_rate': float(avg_attr)
    },
    'vs_random': {
        'rank_better': bool(avg_rank < 3.5),
        'top3_better': bool(avg_top3 > 50)
    },
    'comparison': {
        'htf_cnn_rank': 4.28,
        'htf_cnn_top1': 7.7,
        'htf_cnn_top3': 17.2
    }
}

with open('stdcnn_loo_zeroshot_results.json', 'w') as f:
    json.dump(final_results, f, indent=2)

from google.colab import files

# Offer each output file that exists as a browser download.
for fn in ['stdcnn_loo_zeroshot_results.json', 'stdcnn_loo_results.png']:
    if not os.path.exists(fn):
        continue
    print(f'‚¨áÔ∏è {fn}')
    try:
        files.download(fn)
    except Exception as exc:
        # The download is best-effort (the files are already saved in the
        # working directory), so report the failure and carry on.  Unlike a
        # bare `except:`, this lets KeyboardInterrupt/SystemExit propagate.
        print(f'Download failed for {fn}: {exc!r}')