In [1]:
!pip install dask-ml




In [None]:
#  Imports
from dask import dataframe as dd
from dask.diagnostics import ProgressBar
from dask_ml.preprocessing import StandardScaler
import numpy as np

# Open the CICIoT2023 parquet dataset as a Dask DataFrame
print(" Loading dataset...")
file_path = "/content/drive/MyDrive/Dataset/CICIoT2023.parquet"
df = dd.read_parquet(file_path)

📦 Loading dataset...


In [None]:
# Report the size of the raw dataset before any cleaning step runs
with ProgressBar():
    # the row count is a lazy Dask value, so it has to be computed explicitly
    total_rows = df.shape[0].compute()
print(
    f" Total rows before cleaning: {total_rows}",
    f" Total columns: {len(df.columns)}",
    sep="\n",
)

[########################################] | 100% Completed | 102.40 ms
🔢 Total rows before cleaning: 46686579
🔢 Total columns: 47


In [None]:
# Drop exact duplicate rows and report how many were removed
print("\n Removing duplicates...")
with ProgressBar():
    # persist() keeps the de-duplicated frame materialised, so the row count
    # below does not have to redo the de-duplication
    deduped = df.drop_duplicates().persist()
    deduped_rows = deduped.shape[0].compute()
df = deduped  # later cells continue from the de-duplicated frame
print(f" Duplicates removed: {total_rows - deduped_rows}")


🧹 Removing duplicates...
[########################################] | 100% Completed | 66.40 s
[########################################] | 100% Completed | 104.92 ms
✅ Duplicates removed: 34


In [None]:
# Count missing cells, then drop every row that contains at least one
print("\n Checking and dropping missing values...")
with ProgressBar():
    missing_total = df.isnull().sum().compute().sum()  # null cells over all columns
    clean_df = df.dropna().persist()
    clean_rows = clean_df.shape[0].compute()
df = clean_df
# "removed" is measured against total_rows (the raw row count), so it also
# includes the rows dropped by the duplicate-removal step
print(
    f" Missing values removed: {missing_total}",
    f" Rows after cleaning: {clean_rows} (removed {total_rows - clean_rows})",
    sep="\n",
)


🔍 Checking and dropping missing values...
[########################################] | 100% Completed | 730.98 ms
[########################################] | 100% Completed | 2.23 s
[########################################] | 100% Completed | 104.99 ms
✅ Missing values removed: 0
✅ Rows after cleaning: 46686545 (removed 34)


In [None]:
# Split the column names into the target label and the feature set
print("\n Separating features and label...")
label_col = "label"
# every column except the label counts as a feature
feature_cols = list(filter(lambda name: name != label_col, df.columns))


🧾 Separating features and label...


In [None]:
# Per-feature variance, the input to the predefined-threshold filter
print("\n Calculating feature variances...")
lazy_variances = df[feature_cols].var()  # graph only; nothing is computed yet
with ProgressBar():
    variances = lazy_variances.compute()


📈 Calculating feature variances...
[########################################] | 100% Completed | 10.60 s


In [None]:
# Keep only the features whose variance exceeds the threshold
variance_threshold = 1e-5  # customizable
selected_features = variances[variances > variance_threshold].index.tolist()
# Walk feature_cols in order instead of taking a set difference: set iteration
# order is arbitrary, so the removed list (and the message printed below) would
# otherwise come out in a different order from run to run.
removed_features = [col for col in feature_cols if col not in selected_features]

print(f" Features retained: {len(selected_features)}")
print(f" Features removed: {len(removed_features)}")
if removed_features:
    print(f" Removed due to low variance: {removed_features}")

✅ Features retained: 40
❌ Features removed: 6
🗃️ Removed due to low variance: ['SMTP', 'IRC', 'DHCP', 'Telnet', 'ece_flag_number', 'cwr_flag_number']


In [None]:
# Narrow the frame to the retained features plus the label column
df = df[[*selected_features, label_col]]

In [None]:
# Standardise the retained features with Dask-ML, then re-attach the label
print("\n Scaling features using Dask-ML StandardScaler...")
scaler = StandardScaler()
# NOTE(review): the scaler is fitted on the whole frame; if a train/test split
# happens downstream, its statistics include the test rows - confirm intended.
with ProgressBar():
    df_scaled = scaler.fit_transform(df[selected_features]).assign(
        **{label_col: df[label_col]}  # label is carried over unscaled
    )


📏 Scaling features using Dask-ML StandardScaler...
[########################################] | 100% Completed | 6.52 s


In [None]:
# Write the scaled features plus the label column out as a new parquet dataset.
# NOTE(review): output_path is reused on every run - confirm how to_parquet
# treats files that are already present at that location.
output_path = "/content/drive/MyDrive/Dataset/CICIoT2023_preprocessed.parquet"
print(f"\n Saving preprocessed dataset to: {output_path}")
with ProgressBar():
    # write_index=False: the frame's index is not stored in the output files
    df_scaled.to_parquet(output_path, write_index=False)

print("\n All preprocessing steps completed successfully!")


💾 Saving preprocessed dataset to: /content/drive/MyDrive/Dataset/CICIoT2023_preprocessed.parquet
[########################################] | 100% Completed | 25.55 s

✅ All preprocessing steps completed successfully!
