Importy bibliotek

In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
import tensorflow as tf
from tensorflow.keras import layers, models
import matplotlib.pyplot as plt

Wczytanie danych, przetwarzanie i inżynieria cech

In [None]:
FILE = 'data/stan_ustalony_gotowe.csv'

# Lista cech (bazowa)
SELECTED_FEATURES = [
    'Active Power 1 (Cycle) [W]',
    'Reactive Power 1 (Cycle) [VAr]',
    'Power Factor 1 (Cycle) (Load)',
    'RMS - non-Fundamental I1 (Cycle) [A]',
    'Voltage and Current - Harmonics Amplitude I1 Harmonic 3 (Cycle) [A]',
    'Voltage and Current - Harmonics Amplitude I1 Harmonic 5 (Cycle) [A]',
    'Active Power - per Harmonic 1 Harmonic 3 (Cycle) [W]',
]

# Lista wszystkich urządzeń/klas
DEVICE_COLUMNS = [
    'kettle', 'induction_cooker', 'phone_charger', 'microwave', 'mixer',
    'toaster', 'tv', 'spin_dryer', 'coffee_maker', 'immersion_heater',
    'sandwich_maker', 'decoder', 'lamp', 'aquarium', 'heater',
    'usb_c_charger', 'laptop', 'christmas_tree', 'timer', 'hair_straightener',
    'fridge', 'printer', 'bathroom_heater', 'Monitor'
]

# Wczytanie surowych danych
df = pd.read_csv(FILE, sep=';', decimal=',')
df = df.iloc[:, :120]
# Funkcja czyszcząca wartości Power Factor (usuwanie 'CAP'/'IND')
def clean_power_factor(value):
    if isinstance(value, str):
        value = value.replace(' CAP', '').replace(' IND', '').strip()
        value = value.replace(',', '.')
    return value

# Czyszczenie kolumn Power Factor
PF_COLS = [col for col in SELECTED_FEATURES if 'Power Factor' in col]
for col in PF_COLS:
    df[col] = df[col].apply(clean_power_factor)

# Konwersja cech na typ numeryczny
for col in SELECTED_FEATURES:
    df[col] = pd.to_numeric(df[col], errors='coerce')

# Weryfikacja liczby wierszy przed czyszczeniem
print(f"Liczba wierszy przed czyszczeniem: {len(df)}")

In [None]:
# Definicje nazw kolumn (niezbędne do obliczeń)
col_v = 'RMS V1N (Cycle) [V]'
col_i = 'RMS I1 (Cycle) [A]'
col_p = 'Active Power 1 (Cycle) [W]'
col_q = 'Reactive Power 1 (Cycle) [VAr]'

# Upewniamy się, że kolumny są numeryczne (dla bezpieczeństwa)
for col in [col_v, col_i, col_p, col_q]:
    df[col] = pd.to_numeric(df[col], errors='coerce')

# Obliczenie całkowitej mocy pozornej (S) - baza dla obu teorii
df['S_total_calc'] = df[col_v] * df[col_i]

# 1. Teoria Budeanu (Twoja dotychczasowa cecha)
# Wzór: D = sqrt(S^2 - P^2 - Q^2)
term_under_sqrt = (df['S_total_calc']**2) - (df[col_p]**2) - (df[col_q]**2)
df['Budeanu Distortion Power [VAr]'] = np.sqrt(np.maximum(term_under_sqrt, 0))

# 2. Teoria Fryzego


# A. Konduktancja Czynna (Gw)
# Wzór: Gw = P / U^2
df['Fryze_Conductance'] = df[col_p] / (df[col_v] ** 2)

# B. Prąd Czynny Fryzego (Iw)
# Wzór: Iw = Gw * U  (zastosowanie konduktancji do wzoru na prąd)
df['Fryze_Active_Current'] = df['Fryze_Conductance'] * df[col_v]

# C. Prąd Bierny Fryzego (Ib)
# Wzór: Ib = sqrt(I_calkowite^2 - Iw^2)

df['Fryze_Passive_Current'] = np.sqrt(np.maximum(df[col_i]**2 - df['Fryze_Active_Current']**2, 0))

# D. Moc Bierna Fryzego (Pb)
# Wzór: Pb = sqrt(S^2 - P^2)
df['Fryze_Passive_Power'] = np.sqrt(np.maximum(df['S_total_calc']**2 - df[col_p]**2, 0))

# 3. Aktualizacja listy cech wejściowych do modelu
NEW_FEATURES = [
    'Budeanu Distortion Power [VAr]',
    'Fryze_Conductance',
    'Fryze_Active_Current',
    'Fryze_Passive_Current',
    'Fryze_Passive_Power'
]

# Dodaanie nowych cech do listy wybranych cech
for feature in NEW_FEATURES:
    if feature not in SELECTED_FEATURES:
        SELECTED_FEATURES.append(feature)
        print(f"-> Dodano nową cechę: {feature}")

print(f"\nLiczba cech: {len(SELECTED_FEATURES)}")
print(SELECTED_FEATURES)

In [None]:
# 1. Zastępujemy wartości Inf/ -Inf NaN
df.replace([np.inf, -np.inf], np.nan, inplace=True)

# 2. Zastępujemy wartości NaN zerami w wybranych cechach
df[SELECTED_FEATURES] = df[SELECTED_FEATURES].fillna(0)

print(f"Wartości NaN/Inf zastąpiono zerami.")
print(f"Liczba wierszy: {len(df)}")

if len(df) == 0:
    raise ValueError("Zbiór danych jest pusty")

# Przygotowanie macierzy cech (X) i etykiet (y)
for col in DEVICE_COLUMNS:
    df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)

X = df[SELECTED_FEATURES].values
y = df[DEVICE_COLUMNS].values

# Podział na zbiory treningowe/testowe i standaryzacja
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

print("-" * 30)
print(f"Wymiary X_train: {X_train_scaled.shape}, wymiary y_train: {y_train.shape}")
print(f"Liczba cech: {len(SELECTED_FEATURES)}")
print(SELECTED_FEATURES)

Analiza rozkładu klas (urządzeń) w zbiorach treningowych i testowych

In [None]:
# Zliczanie aktywności urządzeń w zbiorach
train_counts = y_train.sum(axis=0)
test_counts = y_test.sum(axis=0)
total_counts = train_counts + test_counts

# Utworzenie ramki danych z podsumowaniem
distribution_df = pd.DataFrame({
    'Device': DEVICE_COLUMNS,
    'Train (1s)': train_counts,
    'Test (1s)': test_counts,
    'Total (1s)': total_counts,
    'Share [%]': (total_counts / len(df) * 100).round(2)
})

# Sortowanie malejąco wg liczby wystąpień
distribution_df.sort_values(by='Total (1s)', ascending=False, inplace=True)

print("Szczegółowy rozkład aktywności urządzeń:")
try:
    display(distribution_df)
except NameError:
    print(distribution_df)

# Weryfikacja występowania pustych klas
zero_classes = distribution_df[distribution_df['Total (1s)'] == 0]['Device'].tolist()
if zero_classes:
    print(f"\nBrak próbek pozytywnych dla: {zero_classes}")
else:
    print("\nWszystkie urządzenia posiadają reprezentację w danych.")

Random Forest

In [None]:
# Konfiguracja modelu
#rf_model = RandomForestClassifier(n_estimators=100, random_state=42, n_jobs=-1)

rf_model = RandomForestClassifier(
    n_estimators=320,        # większa liczba drzew
    max_depth=25,            # zmniejszona glebokosc
    min_samples_split=5,     # wezel musi miec min 5 probek do podzialu
    min_samples_leaf=2,      # lisc min 2 probki
    class_weight='balanced', # wieksze wagi dla mniejszościowych klas
    n_jobs=-1,               # Użycie wszystkich rdzeni CPU
    random_state=42
)

# Dopasowanie modelu do danych treningowych
print("Rozpoczynam trening Random Forest...")
rf_model.fit(X_train_scaled, y_train)
print("Trening zakończony.")

# Predykcja na zbiorze testowym
y_pred = rf_model.predict(X_test_scaled)

# Obliczenie dokładności (Exact Match Ratio)
acc = accuracy_score(y_test, y_pred)
print(f"\nDokładność całkowita (Exact Match): {acc:.2%}\n")

# Generowanie raportu klasyfikacji
print("Raport klasyfikacji:")
print(classification_report(y_test, y_pred, target_names=DEVICE_COLUMNS, zero_division=0))

Deep Neural Network (PyTorch)

In [None]:
# Sprawdzenie dostępności GPU
print(f"TensorFlow version: {tf.__version__}")
print(f"Dostępne urządzenia: {tf.config.list_physical_devices()}")

input_dim = X_train_scaled.shape[1]
output_dim = y_train.shape[1]  # Liczba urządzeń (klas)

model = models.Sequential([
    # Warstwa wejściowa -> Ukryta 1
    layers.Dense(64, activation='relu', input_shape=(input_dim,)),
    layers.Dropout(0.3),  # Zapobiega overfittingowi
    
    # Ukryta 1 -> Ukryta 2
    layers.Dense(32, activation='relu'),
    layers.Dropout(0.2),
    
    # Warstwa wyjściowa
    layers.Dense(output_dim)
])

# Wyświetlenie podsumowania architektury
model.summary()

model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
    # from_logits=True to odpowiednik BCEWithLogitsLoss
    loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),
    metrics=['accuracy']
)

EPOCHS = 60
BATCH_SIZE = 32

print("Rozpoczynam trening sieci neuronowej (TensorFlow)...")

history = model.fit(
    X_train_scaled, y_train,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    validation_split=0.2,
    verbose=1
)

# Wykres funkcji straty
plt.figure(figsize=(8, 5))
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Przebieg uczenia (Loss)')
plt.xlabel('Epoka')
plt.ylabel('Loss (BCE)')
plt.legend()
plt.grid(True)
plt.show()

# Predykcja zwraca "logits" (surowe wartości)
logits = model.predict(X_test_scaled)

# Konwersja logits -> prawdopodobieństwa (Sigmoid)
probs = tf.nn.sigmoid(logits).numpy()

# Binaryzacja wyników (Próg 0.5)
y_pred_numpy = (probs > 0.5).astype(int)

# Raport wyników
from sklearn.metrics import accuracy_score, classification_report
acc_dnn = accuracy_score(y_test, y_pred_numpy)

print(f"\nDokładność DNN (Exact Match): {acc_dnn:.2%}")
print("Szczegółowy raport klasyfikacji:")
print(classification_report(y_test, y_pred_numpy, target_names=DEVICE_COLUMNS, zero_division=0))