In [1]:
import dask.dataframe as dd
import os
from dask.distributed import Client
import numpy as np

# Cấu hình Dask Client với giới hạn tài nguyên phù hợp với 12GB RAM
client = Client(n_workers=2, threads_per_worker=2, memory_limit='4GB')
print(client)

# Đường dẫn file và thư mục đầu ra
file_path = "Mapped_Dataset.csv"
output_csv = "processed_output.csv"

# Tạo file đầu ra nếu chưa tồn tại (để tránh lỗi ghi)
if os.path.exists(output_csv):
    os.remove(output_csv)

# Đọc dữ liệu với Dask, chỉ định dtype cho cột label để tránh lỗi suy ra
df = dd.read_csv(file_path, blocksize="50MB", dtype={'label': 'float64'})

# Bước 1: Trích xuất các mẫu có label mong muốn [0, 4, 6, 7, 8]
df = df[df['label'].isin([0, 4, 6, 7, 8])]

# Bước 2: Xử lý giá trị NaN và ép kiểu
df = df.dropna(subset=['label'])  # Loại bỏ các hàng có label là NaN
df['label'] = df['label'].astype('int64')  # Ép kiểu thành int64

# Bước 3: Ánh xạ lại label từ [0, 4, 6, 7, 8] thành [0, 1, 2, 3, 4]
label_map = {0: 0, 4: 1, 6: 2, 7: 3, 8: 4}
df['label'] = df['label'].map(label_map).astype('int32')

# Bước 4: Trích xuất và giảm mẫu cho label 1 (ánh xạ từ 4)
# Lấy mẫu ngẫu nhiên từ label 1 (giảm từ >33 triệu xuống 4 triệu)
df_label_1 = df[df['label'] == 1].sample(frac=4e6 / len(df[df['label'] == 1]))  # Tỷ lệ dựa trên số mẫu thực tế
# Kết hợp lại với các label khác
df_other = df[df['label'] != 1]
df = dd.concat([df_other, df_label_1]).repartition(npartitions=50)

# Bước 5: Shuffle dữ liệu
def shuffle_dask_dataframe(df, random_state=None):
    return (
        df.map_partitions(
            lambda part: part.assign(__shuffle_key=np.random.RandomState(random_state).random(len(part))),
            meta={**df.dtypes.to_dict(), '__shuffle_key': 'float64'}
        )
        .shuffle('__shuffle_key')
        .drop(columns='__shuffle_key')
    )

df = shuffle_dask_dataframe(df, random_state=42)

# Lưu thành file CSV duy nhất với xử lý luồng
# Sử dụng một vòng lặp để ghi từng phần, tránh tải toàn bộ vào RAM
header_written = False
for chunk in df.partitions:
    chunk_df = chunk.compute()  # Tải từng phân vùng vào RAM
    if not chunk_df.empty:
        mode = 'w' if not header_written else 'a'
        chunk_df.to_csv(output_csv, mode=mode, header=not header_written, index=False)
        header_written = True

print(f"Đã lưu thành công vào {output_csv}")

# Kiểm tra số lượng mẫu (tùy chọn, đọc một phần nhỏ)
import pandas as pd
df_check = pd.read_csv(output_csv, nrows=1000)
print("Đầu ra mẫu:", df_check.head())
print("Số lượng mẫu trong file CSV (ước lượng):", len(pd.read_csv(output_csv, usecols=['label'])))
client.close()

<Client: 'tcp://127.0.0.1:36175' processes=2 threads=4, memory=7.45 GiB>


You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('label', 'float64'))



Đã lưu thành công vào processed_output.csv
Đầu ra mẫu:    flow_duration  Header_Length  Protocol Type  Duration         Rate  \
0       5.448339     2071578.50            6.0     63.50  2608.309776   
1      13.725179       34362.30            7.1    114.70    29.822694   
2       0.113588          13.48           46.1     64.64     2.203735   
3       0.085688      258339.00            6.0     64.00  1744.706846   
4       0.000000           0.00           47.0     64.00     2.642315   

         Srate  Drate  fin_flag_number  syn_flag_number  rst_flag_number  ...  \
0  2608.309776    0.0              0.0              0.0              0.0  ...   
1    29.822694    0.0              0.0              0.0              0.0  ...   
2     2.203735    0.0              0.0              0.0              0.0  ...   
3  1744.706846    0.0              0.0              0.0              0.0  ...   
4     2.642315    0.0              0.0              0.0              0.0  ...   

          Std  Tot 