<a href="https://colab.research.google.com/github/Meldoner/hostfiles/blob/main/4_lab_agent.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# HTTP Malware Detection — Colab-Friendly Plan

This notebook builds an end-to-end baseline for the TCP-flow classification challenge while respecting Google Colab's 12 GB RAM limit and the dataset sizes (≈8.5 M train / 4 M test rows). The strategy:

1. **Memory-aware loading** – enforce compact dtypes (int16) and optionally limit rows via `DEBUG_ROWS` for quick experiments.
2. **Packet merging** – merge consecutive packets with the same direction when `|length| > 1200`, using NumPy/Numba for vectorized speed.
3. **Feature stack** – keep the merged 30-length sequences and add lightweight statistics (abs sums, counts, ratios, etc.).
4. **Modeling** – LightGBM multiclass with class-balancing, stratified validation slice, and early stopping.
5. **Inference** – reuse the exact preprocessing to score the 4 M-row test set and write a submission file.

All heavy steps are wrapped into reusable functions so the workflow can be executed end-to-end or step-by-step in Colab.

In [1]:
!gdown 1OjobuA8I-F6fe-8sxNcwrwLovqRQoqsn

Downloading...
From (original): https://drive.google.com/uc?id=1OjobuA8I-F6fe-8sxNcwrwLovqRQoqsn
From (redirected): https://drive.google.com/uc?id=1OjobuA8I-F6fe-8sxNcwrwLovqRQoqsn&confirm=t&uuid=e16dfdf3-249f-4977-8e2b-320416308570
To: /content/whos-talking-classify-the-app-by-its-packets.zip
100% 305M/305M [00:05<00:00, 59.0MB/s]


In [2]:
!unzip whos-talking-classify-the-app-by-its-packets.zip

Archive:  whos-talking-classify-the-app-by-its-packets.zip
  inflating: sample_submission.csv   
  inflating: test.csv                
  inflating: train.csv               


In [3]:
from __future__ import annotations

import gc
import os
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import f1_score

import lightgbm as lgb

try:
    from numba import njit, prange
    NUMBA_AVAILABLE = True
except ImportError:  # pragma: no cover
    NUMBA_AVAILABLE = False

np.random.seed(42)

In [6]:
DATA_DIR = Path('.')  # update to Path('/content/drive/MyDrive/...') in Colab if files live on Drive
TRAIN_CSV = DATA_DIR / 'train.csv'
TEST_CSV = DATA_DIR / 'test.csv'
SAMPLE_SUB = DATA_DIR / 'sample_submission.csv'

N_PACKETS = 30
MERGE_THRESHOLD = 1200
#DEBUG_ROWS = int(os.environ.get('DEBUG_ROWS', '0'))  # set non-zero in Colab for quick dry-runs
DEBUG_ROWS = 500000  # set non-zero in Colab for quick dry-runs

In [21]:
def build_feature_columns(n_packets: int = N_PACKETS) -> List[str]:
    return [f'tcp_len_{idx + 1}' for idx in range(n_packets)]

def load_dataframe(
    path: Path,
    n_rows: Optional[int] = None,
    with_target: bool = False,
    extra_columns: Optional[List[str]] = None,
) -> pd.DataFrame:
    feature_cols = build_feature_columns()
    dtype_map = {col: np.int16 for col in feature_cols}
    usecols = feature_cols.copy()
    if with_target:
        usecols = ['app_service'] + usecols
    if extra_columns:
        usecols = extra_columns + usecols
    dtypes = dtype_map if not with_target else {**dtype_map, 'app_service': 'category'}
    df = pd.read_csv(path, usecols=usecols, dtype=dtypes, nrows=n_rows)
    return df


def mem_in_gb(df: pd.DataFrame) -> float:
    return df.memory_usage(deep=True).sum() / 1024 ** 3


def report_mem(df: pd.DataFrame, label: str) -> None:
    print(f"{label} memory footprint: {mem_in_gb(df):.2f} GB")

In [15]:
def _merge_packets_python(data: np.ndarray, threshold: int) -> np.ndarray:
    n_rows, n_cols = data.shape
    merged = np.zeros_like(data)
    for i in range(n_rows):
        write_idx = 0
        merging = False
        running_sum = 0
        running_sign = 0
        for j in range(n_cols):
            val = int(data[i, j])
            if val == 0:
                if merging and write_idx < n_cols:
                    merged[i, write_idx] = running_sum
                    write_idx += 1
                break
            abs_val = abs(val)
            sign = 1 if val > 0 else -1
            if abs_val > threshold:
                if merging and sign == running_sign:
                    running_sum += val
                else:
                    if merging and write_idx < n_cols:
                        merged[i, write_idx] = running_sum
                        write_idx += 1
                    merging = True
                    running_sum = val
                    running_sign = sign
            else:
                if merging and write_idx < n_cols:
                    merged[i, write_idx] = running_sum
                    write_idx += 1
                    merging = False
                    running_sum = 0
                    running_sign = 0
                if write_idx < n_cols:
                    merged[i, write_idx] = val
                    write_idx += 1
        if merging and write_idx < n_cols:
            merged[i, write_idx] = running_sum
    return merged


if NUMBA_AVAILABLE:
    @njit(parallel=True)
    def _merge_packets_numba(data: np.ndarray, threshold: int) -> np.ndarray:  # pragma: no cover
        n_rows, n_cols = data.shape
        merged = np.zeros_like(data)
        for i in prange(n_rows):
            write_idx = 0
            merging = False
            running_sum = 0
            running_sign = 0
            for j in range(n_cols):
                val = int(data[i, j])
                if val == 0:
                    if merging and write_idx < n_cols:
                        merged[i, write_idx] = running_sum
                        write_idx += 1
                    break
                abs_val = abs(val)
                sign = 1 if val > 0 else -1
                if abs_val > threshold:
                    if merging and sign == running_sign:
                        running_sum += val
                    else:
                        if merging and write_idx < n_cols:
                            merged[i, write_idx] = running_sum
                            write_idx += 1
                        merging = True
                        running_sum = val
                        running_sign = sign
                else:
                    if merging and write_idx < n_cols:
                        merged[i, write_idx] = running_sum
                        write_idx += 1
                        merging = False
                        running_sum = 0
                        running_sign = 0
                    if write_idx < n_cols:
                        merged[i, write_idx] = val
                        write_idx += 1
            if merging and write_idx < n_cols:
                merged[i, write_idx] = running_sum
        return merged
else:
    _merge_packets_numba = None


def merge_packets(data: np.ndarray, threshold: int = MERGE_THRESHOLD) -> np.ndarray:
    func = _merge_packets_numba if NUMBA_AVAILABLE and _merge_packets_numba is not None else _merge_packets_python
    return func(data, threshold)

In [16]:
def engineer_sequence_features(seq: np.ndarray) -> Tuple[np.ndarray, List[str]]:
    seq = seq.astype(np.float32)
    abs_seq = np.abs(seq)

    non_zero = (seq != 0).sum(axis=1)
    non_zero_safe = np.maximum(non_zero, 1)
    pos_mask = seq > 0
    neg_mask = seq < 0

    features = {
        'non_zero_packets': non_zero,
        'pos_packets': pos_mask.sum(axis=1),
        'neg_packets': neg_mask.sum(axis=1),
        'pos_ratio': pos_mask.sum(axis=1) / non_zero_safe,
        'neg_ratio': neg_mask.sum(axis=1) / non_zero_safe,
        'abs_sum': abs_seq.sum(axis=1),
        'abs_mean': abs_seq.sum(axis=1) / non_zero_safe,
        'abs_max': abs_seq.max(axis=1),
        'seq_sum': seq.sum(axis=1),
        'seq_mean': seq.sum(axis=1) / non_zero_safe,
        'seq_std': seq.std(axis=1),
        'pos_sum': np.clip(seq, 0, None).sum(axis=1),
        'neg_sum': np.clip(seq, None, 0).sum(axis=1),
        'energy': (seq ** 2).sum(axis=1),
    }

    # first and last non-zero packets (if any)
    mask_non_zero = seq != 0
    first_idx = mask_non_zero.argmax(axis=1)
    first_vals = seq[np.arange(seq.shape[0]), first_idx]
    first_vals[non_zero == 0] = 0

    reversed_mask = mask_non_zero[:, ::-1]
    last_idx = reversed_mask.argmax(axis=1)
    last_vals = seq[np.arange(seq.shape[0]), seq.shape[1] - 1 - last_idx]
    last_vals[non_zero == 0] = 0

    features.update({'first_packet': first_vals, 'last_packet': last_vals, 'zero_padding_ratio': 1.0 - non_zero / seq.shape[1]})

    feat_matrix = np.vstack([values for values in features.values()]).T.astype(np.float32)
    feat_names = list(features.keys())
    return feat_matrix, feat_names

In [17]:
def build_design_matrix(df: pd.DataFrame) -> Tuple[np.ndarray, List[str]]:
    seq_cols = build_feature_columns()
    seq_array = df[seq_cols].to_numpy(dtype=np.int16, copy=False)
    merged_seq = merge_packets(seq_array)
    merged_seq = merged_seq.astype(np.float32)

    seq_feature_names = [f'merged_{col}' for col in seq_cols]
    seq_features = merged_seq

    stat_features, stat_names = engineer_sequence_features(merged_seq)
    design_matrix = np.hstack([seq_features, stat_features]).astype(np.float32)
    feature_names = seq_feature_names + stat_names
    return design_matrix, feature_names

In [18]:
def make_class_weights(labels: np.ndarray) -> Dict[int, float]:
    classes = np.unique(labels)
    weights = compute_class_weight(class_weight='balanced', classes=classes, y=labels)
    return {cls: weight for cls, weight in zip(classes, weights)}


def build_sample_weight(labels: np.ndarray, weight_map: Dict[int, float]) -> np.ndarray:
    return np.array([weight_map[label] for label in labels], dtype=np.float32)


def train_lightgbm(
    X_train: np.ndarray,
    y_train: np.ndarray,
    X_valid: np.ndarray,
    y_valid: np.ndarray,
    feature_names: List[str],
    sample_weight_train: Optional[np.ndarray] = None,
    sample_weight_valid: Optional[np.ndarray] = None,
    num_classes: int = 1,
    num_boost_round: int = 5000,
    early_stopping_rounds: int = 100,
) -> lgb.Booster:
    params = {
        'objective': 'multiclass',
        'num_class': num_classes,
        'learning_rate': 0.05,
        'num_leaves': 96,
        'min_data_in_leaf': 256,
        'feature_fraction': 0.8,
        'bagging_fraction': 0.8,
        'bagging_freq': 2,
        'max_depth': -1,
        'metric': ['multi_logloss'],
        'verbosity': -1,
        'force_row_wise': True,
    }

    train_set = lgb.Dataset(X_train, label=y_train, weight=sample_weight_train, feature_name=feature_names)
    valid_set = lgb.Dataset(X_valid, label=y_valid, weight=sample_weight_valid, feature_name=feature_names)

    model = lgb.train(
        params,
        train_set,
        valid_sets=[train_set, valid_set],
        valid_names=['train', 'valid'],
        num_boost_round=num_boost_round,
        early_stopping_rounds=early_stopping_rounds,
        verbose_eval=100,
    )
    return model

In [19]:
def predict_labels(model: lgb.Booster, X: np.ndarray, label_encoder: LabelEncoder) -> np.ndarray:
    proba = model.predict(X, num_iteration=model.best_iteration or model.current_iteration())
    if isinstance(proba, list):
        proba = np.vstack(proba).T
    preds_idx = np.argmax(proba, axis=1)
    return label_encoder.inverse_transform(preds_idx)

In [22]:
%%time
train_df = load_dataframe(TRAIN_CSV, n_rows=DEBUG_ROWS or None, with_target=True)
report_mem(train_df, 'Train raw')
train_df.head()

Train raw memory footprint: 0.03 GB
CPU times: user 3.75 s, sys: 490 ms, total: 4.24 s
Wall time: 5.03 s


Unnamed: 0,app_service,tcp_len_1,tcp_len_2,tcp_len_3,tcp_len_4,tcp_len_5,tcp_len_6,tcp_len_7,tcp_len_8,tcp_len_9,...,tcp_len_21,tcp_len_22,tcp_len_23,tcp_len_24,tcp_len_25,tcp_len_26,tcp_len_27,tcp_len_28,tcp_len_29,tcp_len_30
0,1,1448,379,-1448,-1448,-1202,-1448,-1192,80,92,...,0,0,0,0,0,0,0,0,0,0
1,1,1448,313,-1448,-1410,-522,64,92,298,-250,...,0,0,0,0,0,0,0,0,0,0
2,1,1448,379,-1448,-1448,-2,-256,64,1172,-574,...,-1448,-1448,-1448,-1448,-1448,-1448,-1448,-1448,-1448,-638
3,1,1448,315,-1448,-1448,-2,-1448,-27,64,92,...,0,0,0,0,0,0,0,0,0,0
4,1,1448,367,-1448,-1448,-477,80,92,637,-303,...,0,0,0,0,0,0,0,0,0,0


In [23]:
%%time
X_all, feature_names = build_design_matrix(train_df.drop(columns=['app_service']))
label_encoder = LabelEncoder()
y_all = label_encoder.fit_transform(train_df['app_service'])
weight_map = make_class_weights(y_all)
sample_weight_all = build_sample_weight(y_all, weight_map)
print(f"Feature matrix shape: {X_all.shape}")
print(f"Unique classes: {len(label_encoder.classes_)}")
del train_df
gc.collect();

Feature matrix shape: (500000, 47)
Unique classes: 9
CPU times: user 2.89 s, sys: 296 ms, total: 3.19 s
Wall time: 3.36 s


13812

In [24]:
VAL_SIZE = 0.05

X_train, X_valid, y_train, y_valid, w_train, w_valid = train_test_split(
    X_all,
    y_all,
    sample_weight_all,
    test_size=VAL_SIZE,
    random_state=42,
    stratify=y_all,
)
print(f"Train split: {X_train.shape}, Valid split: {X_valid.shape}")

Train split: (475000, 47), Valid split: (25000, 47)


In [25]:
%%time
model = train_lightgbm(
    X_train=X_train,
    y_train=y_train,
    X_valid=X_valid,
    y_valid=y_valid,
    feature_names=feature_names,
    sample_weight_train=w_train,
    sample_weight_valid=w_valid,
    num_classes=len(label_encoder.classes_),
)

valid_pred = predict_labels(model, X_valid, label_encoder)
valid_f1 = f1_score(label_encoder.inverse_transform(y_valid), valid_pred, average='macro')
print(f"Validation macro F1: {valid_f1:.4f}")

TypeError: train() got an unexpected keyword argument 'early_stopping_rounds'

In [None]:
%%time
best_iteration = model.best_iteration or model.current_iteration()
print(f"Training final model on all data for {best_iteration} boosting rounds")
train_set_full = lgb.Dataset(X_all, label=y_all, weight=sample_weight_all, feature_name=feature_names)
final_model = lgb.train(
    {**model.params, 'objective': 'multiclass', 'num_class': len(label_encoder.classes_)},
    train_set_full,
    num_boost_round=best_iteration,
    verbose_eval=100,
)

del X_train, X_valid, y_train, y_valid, w_train, w_valid
gc.collect();

In [None]:
%%time
test_df = load_dataframe(TEST_CSV, n_rows=DEBUG_ROWS or None, extra_columns=['id'])
report_mem(test_df, 'Test raw')
X_test, _ = build_design_matrix(test_df.drop(columns=['id']))
test_predictions = predict_labels(final_model, X_test, label_encoder)
submission_df = pd.DataFrame({'id': test_df['id'], 'app_service': test_predictions})
submission_path = DATA_DIR / 'submission.csv'
submission_df.to_csv(submission_path, index=False)
print(f"Submission saved to {submission_path} with shape {submission_df.shape}")