In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install PyWavelets

Collecting PyWavelets
  Downloading pywavelets-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.0 kB)
Downloading pywavelets-1.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.5/4.5 MB[0m [31m25.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: PyWavelets
Successfully installed PyWavelets-1.8.0


In [3]:
pip install pandas numpy pywavelets scikit-learn plotly



In [None]:
# 📦 Importations
import pandas as pd
import numpy as np
import pywt
from sklearn.metrics import mean_squared_error
from scipy.stats import entropy
import plotly.graph_objs as go
import plotly.express as px
import pickle

# 📂 Charger le dataset depuis Google Drive
file_path = "/content/drive/MyDrive/P2M/sfmconnect.disaggregationP2M.csv"
df = pd.read_csv(file_path)

# 🧼 Nettoyage des noms d'équipements
for i in range(3):
    col = f"equipments[{i}].equipment.name"
    if col in df.columns:
        df[col] = df[col].astype(str).str.strip()

# 📌 Filtrer rétro-projecteur
df_projector = df[df["equipments[0].equipment.name"] == "retro-projecteur"]
if df_projector.empty:
    raise ValueError("❌ Aucun signal trouvé pour 'retro-projecteur'.")

# ⚡ Extraire le signal et les timestamps
signal = df_projector["equipments[0].consumption.Active_energy"].dropna().values
timestamps = pd.to_datetime(df_projector["date"], unit='ms')

# 🧠 Compression ondelettes + approximation Huffman
def compress_wavelet_huffman(signal, wavelet_name, keep_ratio=0.05):
    coeffs = pywt.wavedec(signal, wavelet=wavelet_name)
    coeff_arr, coeff_slices = pywt.coeffs_to_array(coeffs)

    threshold = np.quantile(np.abs(coeff_arr), 1 - keep_ratio)
    mask = (np.abs(coeff_arr) >= threshold)
    compressed_arr = coeff_arr * mask
    compressed_coeffs = pywt.array_to_coeffs(compressed_arr, coeff_slices, output_format='wavedec')

    reconstructed = pywt.waverec(compressed_coeffs, wavelet=wavelet_name)[:len(signal)]
    rmse = np.sqrt(mean_squared_error(signal, reconstructed))

    nonzero = compressed_arr[compressed_arr != 0]
    if len(nonzero) == 0:
        return None

    values, counts = np.unique(nonzero, return_counts=True)
    probas = counts / counts.sum()
    estimated_bits = entropy(probas, base=2) * len(nonzero)
    estimated_bytes = estimated_bits / 8

    return {
        "wavelet": wavelet_name,
        "reconstructed": reconstructed,
        "rmse": rmse,
        "original_bytes": signal.nbytes,
        "compressed_bytes": estimated_bytes,
        "compression_ratio": estimated_bytes / signal.nbytes,
        "coeff_arr": compressed_arr,
        "coeff_slices": coeff_slices
    }

# 🌊 Tester plusieurs ondelettes
wavelets = ['db8', 'sym8', 'coif5', 'bior3.5', 'haar']
results = []
for w in wavelets:
    res = compress_wavelet_huffman(signal, w)
    if res:
        results.append(res)

# 📊 Résumé sous forme de DataFrame
results_df = pd.DataFrame([
    {
        "Ondelette": r["wavelet"],
        "RMSE": round(r["rmse"], 6),
        "Taille originale (Ko)": round(r["original_bytes"] / 1024, 2),
        "Taille compressée (Ko)": round(r["compressed_bytes"] / 1024, 2),
        "Taux de compression (%)": round(100 * r["compression_ratio"], 2)
    }
    for r in results
]).sort_values("Taux de compression (%)")

display(results_df)

# 📈 Affichage : signal original + reconstruits
fig = go.Figure()
fig.add_trace(go.Scatter(
    x=timestamps,
    y=signal,
    mode='lines',
    name='Signal original',
    line=dict(color='black', width=2)
))

colors = px.colors.qualitative.Set1
for i, r in enumerate(results):
    fig.add_trace(go.Scatter(
        x=timestamps,
        y=r["reconstructed"],
        mode='lines',
        name=f"Reconstruit ({r['wavelet']})",
        line=dict(color=colors[i % len(colors)], dash='dash')
    ))

fig.update_layout(
    title="📉 Compression ondelettes + Huffman – rétro-projecteur",
    xaxis_title="Temps",
    yaxis_title="Énergie active (kWh)",
    template="plotly_white",
    legend=dict(x=0.01, y=0.99),
    height=600
)
fig.show()

# 💾 Sauvegarde pour transmission
best_wavelet = results_df.iloc[0]["Ondelette"]
best_result = next(r for r in results if r["wavelet"] == best_wavelet)

# Récupérer les coefficients compressés avec la meilleure ondelette
coeff_arr = best_result["coeff_arr"]
coeff_slices = best_result["coeff_slices"]

# Sauvegarde des données compressées
np.savez("/content/signal_compresse.npz",
         compressed_arr=coeff_arr,
         wavelet=best_wavelet,
         original_len=len(signal))

with open("/content/coeff_slices.pkl", "wb") as f:
    pickle.dump(coeff_slices, f)

print("✅ Données compressées sauvegardées !")


📡 1. Fichier .pkt
Signification : .pkt est généralement l'extension utilisée pour les fichiers de paquets réseau, surtout dans les simulateurs comme Cisco Packet Tracer.

Utilisation :

Utilisé pour simuler des réseaux (topologies, routeurs, switches, PC, etc.)

Contient des informations comme les configurations d'appareils, les connexions, les protocoles utilisés, etc.

Logiciel associé : Cisco Packet Tracer, un outil très populaire pour apprendre le réseautage (Cisco CCNA, etc.)

🔧 Exemple concret :

Tu peux avoir un fichier reseau_maison.pkt qui simule un petit réseau avec une box Internet, des switches, et plusieurs PCs connectés.

📦 2. Fichier .npz
Signification : .npz est un format utilisé par NumPy pour stocker plusieurs tableaux (numpy arrays) compressés.

Utilisation :

Sauvegarder plusieurs variables dans un seul fichier compressé.

Très pratique pour le Machine Learning ou la manipulation scientifique des données.



In [None]:
import numpy as np

# Enregistrer plusieurs arrays
np.savez('signal_compresse.npz', x=array1, y=array2)

# Charger
data = np.load('data.npz')
print(data['x'])  # récupère array1
