In [1]:
import os
import uuid
from collections import Counter

import matplotlib.pyplot as plt
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    count,
    isnull,
    ntile,
    rand,
    when,
)
from pyspark.sql.window import Window
from pyspark.sql.types import (
    ArrayType,
    DoubleType,
    FloatType,
    StructField,
    StructType,
)

from pyspark.ml.feature import StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT

In [8]:
print("Initializing Spark session...")

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("TestSpark") \
    .config("spark.submit.deployMode", "client") \
    .config("spark.excludeOnFailure.enabled", "true") \
    .master("yarn") \
    .getOrCreate()
    
spark.sparkContext.setLogLevel("ERROR")

Initializing Spark session...


In [3]:
# Đọc dữ liệu gốc và lấy mẫu nhỏ
df = spark.read.option("header", "true").csv("hdfs://master:9000/data/NF-UQ-NIDS-v2.csv")
# df = df.sample(fraction=0.50, seed=42)  #Điều chỉnh nếu muốn đọc một phần nhỏ để TEST

total_rows = df.count()
print(f"Tổng số dòng (mẫu): {total_rows}")

# Điều chỉnh partition
# if total_rows < 1000000:  # Rất nhỏ (<1M)
#     df = df.repartition(20)  # Giảm partition để giảm I/O
#     spark.conf.set("spark.sql.adaptive.enabled", "false")
# elif total_rows < 10000000:  # Nhỏ (<10M)
#     df = df.repartition(50)
#     spark.conf.set("spark.sql.adaptive.enabled", "false")
# else:
#     df = df.repartition(100)



Tổng số dòng (mẫu): 75987976


                                                                                

![Screenshot 2025-06-01 143019.png](attachment:ca3037cf-e56c-4be9-85d6-24b134d5548f.png)

In [4]:
def preprocess_nids_data_with_weights(df, spark, save_path="saves"):
    print("Bắt đầu tiền xử lý dữ liệu...")
    total_rows_initial = df.count() # Lưu tổng số dòng ban đầu
    print(f"Đã phân vùng dữ liệu. Số dòng: {total_rows_initial}")

    # 1. Loại bỏ cột
    print("Bước 1: Loại bỏ các cột 'Label' và 'Dataset'...")
    cols_to_drop_initial = ['Label', 'Dataset']
    df = df.drop(*[col_name for col_name in cols_to_drop_initial if col_name in df.columns])
    print(f"Đã loại bỏ các cột: {[col_name for col_name in cols_to_drop_initial if col_name in df.columns]}")

    # Lấy danh sách các nhãn cần giữ lại (trước khi xử lý nhãn chính thức)
    # Lưu ý: 'Attack' phải tồn tại ở đây
    if 'Attack' in df.columns:
        label_counts_before_filter = df.groupBy("Attack").count()
        # Loại bỏ các Attack có số lượng quá thấp (nhỏ hơn 5000)
        valid_labels = label_counts_before_filter.filter(col("count") >= 5000).select("Attack").rdd.flatMap(lambda x: x).collect()
        df = df.filter(col("Attack").isin(valid_labels))
        print(f"Đã lọc các Attack có số lượng < 5000. Số Attack còn lại: {len(valid_labels)}")
    else:
        print("Cột 'Attack' không tồn tại, không thể lọc theo số lượng.")


    # 2. Chuyển đổi kiểu dữ liệu
    print("Bước 2: Chuyển đổi kiểu dữ liệu các cột số...")
    numeric_cols = [
        "L4_SRC_PORT", "L4_DST_PORT", "IN_BYTES", "IN_PKTS", "OUT_BYTES", "OUT_PKTS",
        "FLOW_DURATION_MILLISECONDS", "DURATION_IN", "DURATION_OUT", "MIN_TTL", "MAX_TTL",
        "LONGEST_FLOW_PKT", "SHORTEST_FLOW_PKT", "MIN_IP_PKT_LEN", "MAX_IP_PKT_LEN",
        "SRC_TO_DST_SECOND_BYTES", "DST_TO_SRC_SECOND_BYTES", "RETRANSMITTED_IN_BYTES",
        "RETRANSMITTED_IN_PKTS", "RETRANSMITTED_OUT_BYTES", "RETRANSMITTED_OUT_PKTS",
        "SRC_TO_DST_AVG_THROUGHPUT", "DST_TO_SRC_AVG_THROUGHPUT", "NUM_PKTS_UP_TO_128_BYTES",
        "NUM_PKTS_128_TO_256_BYTES", "NUM_PKTS_256_TO_512_BYTES", "NUM_PKTS_512_TO_1024_BYTES",
        "NUM_PKTS_1024_TO_1514_BYTES", "TCP_WIN_MAX_IN", "TCP_WIN_MAX_OUT", "DNS_QUERY_ID",
        "DNS_TTL_ANSWER", "FTP_COMMAND_RET_CODE",
        "ICMP_TYPE", "ICMP_IPV4_TYPE"
    ]
    for col_name in numeric_cols:
        if col_name in df.columns:
            df = df.withColumn(col_name, col(col_name).cast(DoubleType()))
    print(f"Đã chuyển đổi các cột số: {[col for col in numeric_cols if col in df.columns]}")

    # 3. Chuyển đổi cột chuỗi (không phải Attack)
    print("Bước 3: Chuyển đổi các cột chuỗi thành số...")
    string_cols = [c[0] for c in df.dtypes if c[1] == 'string']
    string_cols_to_index = [col_name for col_name in string_cols if col_name not in ['Attack']]
    indexer_models = []
    for column in string_cols_to_index:
        indexer = StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid="skip") # Thêm handleInvalid
        indexer_model = indexer.fit(df)
        df = indexer_model.transform(df).drop(indexer.getInputCol())
        indexer_model.write().overwrite().save(os.path.join(save_path, f"string_indexer_{column}"))
        indexer_models.append(indexer_model)
    print(f"Đã lưu mô hình StringIndexer cho các cột: {string_cols_to_index}")

    # 4. Chuẩn hóa đặc trưng
    print("Bước 4: Chọn và chuẩn hóa các đặc trưng số...")
    # Lấy lại danh sách các cột feature sau khi StringIndexer
    feature_cols = [c[0] for c in df.dtypes if c[1] in ('int', 'double', 'float') and c[0] not in ['Attack']]
    # Thêm các cột đã được StringIndexer nếu chúng là số nguyên
    feature_cols.extend([c[0] for c in df.dtypes if c[1] == 'double' and c[0].endswith('_index') and c[0] not in feature_cols])
    # Đảm bảo Attack không có trong feature_cols
    feature_cols = [f for f in feature_cols if f != 'Attack']

    print(f"Các đặc trưng số được chọn để chuẩn hóa: {feature_cols}")

    assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features")
    df = assembler.transform(df)

    scaler = StandardScaler(inputCol="raw_features", outputCol="scaled_features")
    scaler_model = scaler.fit(df)
    df = scaler_model.transform(df).drop("raw_features")
    scaler_model.write().overwrite().save(os.path.join(save_path, "standard_scaler"))
    print(f"Đã lưu mô hình StandardScaler")

    # 5. Xử lý nhãn và tính toán trọng số lớp
    print("Bước 5: Xử lý cột nhãn và tính trọng số lớp...")
    label_indexer_model = None
    num_classes = None
    if 'Attack' in df.columns:
        # A. Mã hóa nhãn
        label_indexer = StringIndexer(inputCol="Attack", outputCol="label", handleInvalid="skip") # Thêm handleInvalid
        label_indexer_model = label_indexer.fit(df)
        df = label_indexer_model.transform(df).drop("Attack")
        label_indexer_model.write().overwrite().save(os.path.join(save_path, "label_indexer"))
        num_classes = len(label_indexer_model.labels)
        print(f"Số lượng lớp (num_classes): {num_classes}")

        # B. Tính toán trọng số lớp
        label_counts_df_final = df.groupBy("label").count()
        total_samples_final = df.count()
        label_counts_map = {float(row["label"]): row["count"] for row in label_counts_df_final.collect()}

        class_weights_map = {}
        if num_classes > 0: # Tránh chia cho 0 nếu không có lớp nào
            for label, count in label_counts_map.items():
                # Công thức trọng số 'balanced': tổng số mẫu / (số lượng lớp * số mẫu của lớp)
                class_weights_map[label] = total_samples_final / (num_classes * count)
        else:
             print("Không có lớp nào được tìm thấy để tính trọng số.")

        print("Trọng số lớp đã tính toán:")
        # In các nhãn gốc cùng với trọng số của chúng
        original_labels_map = {idx: label for idx, label in enumerate(label_indexer_model.labels)}
        for encoded_label, weight in class_weights_map.items():
            print(f"  {original_labels_map.get(int(encoded_label), 'N/A')} (encoded: {int(encoded_label)}): {weight:.4f}")

        from pyspark.sql.functions import udf

        class_weights_map_broadcast = spark.sparkContext.broadcast(class_weights_map)

        @udf(DoubleType())
        def get_weight_udf(label_value):
            # Đảm bảo label_value là float/int để khớp với key trong map
            return class_weights_map_broadcast.value.get(float(label_value), 1.0) # Mặc định 1.0 nếu không tìm thấy (should not happen)

        df = df.withColumn("weight", get_weight_udf(col("label")))
        print("Đã thêm cột 'weight' vào DataFrame.")

    else:
        print("Không tìm thấy cột nhãn 'Attack'. Không thể tính trọng số lớp.")
        num_classes = None
        label_indexer_model = None

    # 6. Chỉ giữ scaled_features, label và weight
    print("Bước 6: Chỉ giữ cột scaled_features, label và weight...")
    final_cols = ['scaled_features', 'label', 'weight']
    df = df.select(*final_cols)
    print(f"Các cột được giữ: {df.columns}")

    print("Hoàn tất tiền xử lý dữ liệu.")
    return df, indexer_models, scaler_model, label_indexer_model, num_classes

In [5]:
# Tiền xử lý và chia split
processed_df, string_indexer_models, scaler_model, label_indexer_model, num_classes = \
    preprocess_nids_data_with_weights(df, spark, save_path="preprocessed_data")

Bắt đầu tiền xử lý dữ liệu...


                                                                                

Đã phân vùng dữ liệu. Số dòng: 75987976
Bước 1: Loại bỏ các cột 'Label' và 'Dataset'...
Đã loại bỏ các cột: []


[Stage 6:>                                                         (0 + 6) / 30]

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: 
Aborting TaskSet 6.0 because task 20 (partition 20)
cannot run anywhere due to node and executor excludeOnFailure.
Most recent failure:
Lost task 20.0 in stage 6.0 (TID 323) (worker3 executor 2): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/hadoop/tmp/nm-local-dir/usercache/ubuntu/appcache/application_1759911303345_0011/container_1759911303345_0011_01_000003/pyspark.zip/pyspark/worker.py", line 1100, in main
    raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [PYTHON_VERSION_MISMATCH] Python in worker has different version (3, 8) than that in driver 3.10, PySpark cannot run with different minor versions.
Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1049)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2433)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)


ExcludeOnFailure behavior can be configured via spark.excludeOnFailure.*.

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2898)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2834)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2833)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2833)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1253)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1253)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3102)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3036)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3025)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:995)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:195)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
# Import
from pyspark.sql.functions import col
import os # Import os để tạo thư mục nếu cần
# Chia train và val/test (80/20)
train_df, val_df, test_df = processed_df.randomSplit([0.7, 0.15, 0.15], seed=42)
print(f"TRAIN DataFrame: {train_df.count():,} dòng.")
print(f"VAL DataFrame: {val_df.count():,} dòng.")
print(f"TEST DataFrame: {test_df.count():,} dòng.")

# Lưu dữ liệu
print("Ghi train_df")
train_df.write.mode("overwrite").parquet("hdfs://master:9000/usr/ubuntu/data/classweights-43-16/train_df.parquet")
print("Ghi val_df")
val_df.write.mode("overwrite").parquet("hdfs://master:9000/usr/ubuntu/data/classweights-43-16/val_df.parquet")
print("Ghi test_df")
test_df.write.mode("overwrite").parquet("hdfs://master:9000/usr/ubuntu/data/classweights-43-16/test_df.parquet")

25/06/07 19:41:18 WARN DAGScheduler: Broadcasting large task binary with size 8.2 MiB
25/06/07 19:51:38 WARN DAGScheduler: Broadcasting large task binary with size 10.9 MiB
                                                                                                    

TRAIN DataFrame: 60,777,330 dòng.


25/06/07 20:13:45 WARN DAGScheduler: Broadcasting large task binary with size 8.2 MiB
25/06/07 20:24:30 WARN DAGScheduler: Broadcasting large task binary with size 10.9 MiB
                                                                                                    

TEST DataFrame: 15,200,900 dòng.
Ghi train_df


25/06/07 20:46:30 WARN DAGScheduler: Broadcasting large task binary with size 8.2 MiB
25/06/07 20:56:50 WARN DAGScheduler: Broadcasting large task binary with size 11.1 MiB
                                                                                                    

Ghi test_df


25/06/07 21:22:05 WARN DAGScheduler: Broadcasting large task binary with size 8.2 MiB
25/06/07 21:32:32 WARN DAGScheduler: Broadcasting large task binary with size 11.1 MiB
                                                                                                    

: 

: 

: 

: 

: 