In [5]:
import re
from typing import List, Set, cast

import polars as pl

from utils import ROOTDIR

In [2]:
# Both CSV splits live side by side in the project's "data" directory.
DATA_DIR = ROOTDIR / "data"

TRAIN_FILE_PATH = DATA_DIR / "train_data.csv"
TEST_FILE_PATH = DATA_DIR / "test_data.csv"

In [3]:
# Read both splits with identical options; over-long rows are truncated
# (truncate_ragged_lines=True) instead of aborting the scan.
train_data, test_data = (
    pl.scan_csv(csv_path, truncate_ragged_lines=True).collect()
    for csv_path in (TRAIN_FILE_PATH, TEST_FILE_PATH)
)

In [4]:
# Name(s) of the label column(s); used further down to separate the features
# from the target in both the training and the test frame.
target_columns: List[str] = ["Attack_type"]

In [6]:
# Column names that look like identifiers ("id", "id.x", "id_x", "x.id", "x_id"),
# matched case-insensitively. Such columns are removed from both splits.
ID_COLUMN_PATTERN = re.compile(r"^id$|^id[._]|[._]id$", re.IGNORECASE)

# Absolute correlation above which the later column of a pair is discarded.
CORRELATION_THRESHOLD = 0.95


def drop_id_columns(frame: pl.DataFrame) -> pl.DataFrame:
    """Return ``frame`` without the columns whose name matches ``ID_COLUMN_PATTERN``."""
    return frame.drop(
        [column for column in frame.columns if ID_COLUMN_PATTERN.search(column)]
    )


df_train = drop_id_columns(train_data)
df_test = drop_id_columns(test_data)

# Correlations are computed on the ID-free training frame (not on `train_data`):
# otherwise an ID column could end up in `highly_correlated` and be requested
# again in the drop below, or a genuine feature could be discarded merely
# because it tracks an ID. The target is excluded so it can never be pruned.
corr_matrix = (
    df_train.select(
        [
            column
            for column, dtype in zip(df_train.columns, df_train.dtypes)
            if dtype in [pl.Int32, pl.Int64, pl.Float32, pl.Float64]
            and column not in target_columns
        ]
    )
    .to_pandas()
    .corr()
    .abs()
)

# Walk the upper triangle only: for each strongly correlated pair (i, j) with
# i < j, keep the earlier column i and mark the later column j for removal.
# NaN correlations (e.g. constant columns) compare False and are ignored.
highly_correlated: Set[str] = set()
for i in range(len(corr_matrix.columns)):
    for j in range(i + 1, len(corr_matrix.columns)):
        if cast(float, corr_matrix.iat[i, j]) > CORRELATION_THRESHOLD:
            highly_correlated.add(corr_matrix.columns[j])

# The selection is derived from the training split only and then applied to
# both splits; sorted() just makes the drop order deterministic across runs.
df_train_raw = df_train.drop(sorted(highly_correlated)).to_pandas()
df_test_raw = df_test.drop(sorted(highly_correlated)).to_pandas()

---

In [7]:
from collections import Counter

from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline
from imblearn.under_sampling import RandomUnderSampler
from sklearn.preprocessing import LabelEncoder

In [8]:
# Feature matrix: every training column except the target column(s).
X_raw = df_train_raw.drop(columns=target_columns)

# Target labels, flattened into a 1-D array.
y_raw = df_train_raw[target_columns].to_numpy().ravel()

In [9]:
# One encoder instance shared by the cells below. Each fit_transform call
# re-fits it, so it only ever holds the mapping of the most recent fit.
le = LabelEncoder()

In [10]:
# Build an encoded copy of the training features: every object-dtype column is
# replaced by integer label codes, all other columns are carried over as-is.
# The shared encoder is re-fitted for each column in turn.
X_encoded = X_raw.assign(
    **{
        name: le.fit_transform(X_raw[name])
        for name in X_raw.select_dtypes(include="object").columns
    }
)

In [11]:
# Integer-encode the training labels. This re-fits `le`, so from here on it
# holds the target-class mapping rather than any feature-column mapping.
y_encoded = le.fit_transform(y_raw)

In [12]:
# Rows per target class in the training frame.
class_sizes = df_train_raw[target_columns].value_counts()

# Mean class size, truncated to an integer; used as the under-sampling target.
TARGET_N = int(class_sizes.mean())

In [13]:
# type: ignore

# Tally the encoded labels and pick the most frequent class
# (the first one encountered wins a tie).
counter = Counter(y_encoded)
major_class_id = max(counter, key=lambda class_id: counter[class_id])

# SMOTE with 3 neighbours and its default "auto" strategy.
over = SMOTE(
    sampling_strategy="auto",
    k_neighbors=3,
    random_state=42,
)

# Only the most frequent class is cut down, to TARGET_N rows.
under_strategy = {major_class_id: TARGET_N}
under = RandomUnderSampler(
    sampling_strategy=under_strategy,
    random_state=42,
)

In [14]:
# Chain the two samplers: the under-sampling step runs first, then SMOTE.
pipeline = Pipeline(steps=[("under", under), ("over", over)])

In [15]:
# Apply both resampling steps to the encoded training features and labels.
X_resampled, y_resampled = pipeline.fit_resample(X_encoded, y_encoded)  # type: ignore

In [16]:
# Sanity check: features and labels should have the same number of rows.
len(X_resampled), len(y_resampled)  # type: ignore

(92328, 92328)

---

In [18]:
import joblib

In [17]:
# type: ignore

# Final training set: the resampled (balanced) features and labels.
X_train = X_resampled.copy()
y_train = y_resampled.copy()

X_test = df_test_raw.drop(target_columns, axis=1)

# Encode each categorical test feature with the category -> code mapping
# learned from the matching *training* column. Re-fitting an encoder on the
# test data would derive the codes from the test set's own sorted categories,
# so the same category could get a different integer than in X_train.
# A fresh encoder fitted on X_raw[column] reproduces the training codes,
# because LabelEncoder always numbers the sorted unique values.
for column in X_test.select_dtypes(include="object").columns:
    train_categories = LabelEncoder().fit(X_raw[column]).classes_
    code_by_category = {
        category: code for code, category in enumerate(train_categories)
    }
    # Categories never seen during training have no learned code: flag them
    # with -1 instead of inventing a code that collides with a known category.
    X_test[column] = (
        X_test[column].map(code_by_category).fillna(-1).astype("int64")
    )

# Encode the test labels with the mapping learned from the training labels so
# class ids agree between y_train and y_test. A label that never occurs in the
# training data raises a ValueError here rather than silently shifting the ids.
y_test = LabelEncoder().fit(y_raw).transform(
    df_test_raw[target_columns].values.ravel()
)

In [20]:
# type: ignore

# Destination folder for the prepared splits; created on demand.
BASE_EXPORT_PATH = ROOTDIR / "models" / "training"
BASE_EXPORT_PATH.mkdir(parents=True, exist_ok=True)

# File name -> object to persist, in the order the files are written.
export_artifacts = {
    "X_train.joblib": X_train,
    "X_test.joblib": X_test,
    "y_train.joblib": y_train,
    "y_test.joblib": y_test,
}

# The tuple of joblib.dump results is the cell's displayed value.
tuple(
    joblib.dump(artifact, BASE_EXPORT_PATH / file_name)
    for file_name, artifact in export_artifacts.items()
)

(['d:\\dev\\python\\rt-iot\\models\\training\\X_train.joblib'],
 ['d:\\dev\\python\\rt-iot\\models\\training\\X_test.joblib'],
 ['d:\\dev\\python\\rt-iot\\models\\training\\y_train.joblib'],
 ['d:\\dev\\python\\rt-iot\\models\\training\\y_test.joblib'])