In [None]:
!gdown 1zPuZu9i8Im4WMZ5cFL912zEeSxPj-zX3
!gdown 1xbVwPQqZdiVvYt33tKu4AM2QWi4W3VQ8

Downloading...
From (original): https://drive.google.com/uc?id=1zPuZu9i8Im4WMZ5cFL912zEeSxPj-zX3
From (redirected): https://drive.google.com/uc?id=1zPuZu9i8Im4WMZ5cFL912zEeSxPj-zX3&confirm=t&uuid=de6fe784-3b33-47e0-84cf-6b31950d7f32
To: /content/train.csv
100% 601M/601M [00:07<00:00, 82.9MB/s]
Downloading...
From (original): https://drive.google.com/uc?id=1xbVwPQqZdiVvYt33tKu4AM2QWi4W3VQ8
From (redirected): https://drive.google.com/uc?id=1xbVwPQqZdiVvYt33tKu4AM2QWi4W3VQ8&confirm=t&uuid=d51487fd-2cdd-4c72-ba33-180e647bf934
To: /content/test.csv
100% 246M/246M [00:02<00:00, 122MB/s]


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
from xgboost import XGBClassifier
from sklearn.metrics import classification_report
from pyspark.ml.classification import GBTClassifier
from xgboost.spark import SparkXGBClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Start (or reuse) the Spark session used to read the CSV files below
spark = (
    SparkSession.builder
    .appName("DataProcessingAndModeling")
    .getOrCreate()
)

In [None]:
# Read the training CSV: the first row supplies the column names and Spark
# infers each column's type from the data.
data_path = "train.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(data_path)
)

### Data processing

1. Fill missing values: numeric columns get the column median, all other columns get the placeholder "unknown"

2. Encode the categorical columns

3. Build the feature vector from every column except "ID", "Label" and "Protocol type"

4. Normalize the features with a MinMax scaler

In [None]:
# Data preprocessing: impute missing values in the training DataFrame.
#   * numeric columns   -> approximate column median
#   * all other columns -> the placeholder string "unknown"
#
# Every Spark numeric type is handled (not just "int"/"double"): a string
# replacement cannot be cast to a numeric column, so the nulls of a bigint or
# float column would otherwise survive and break VectorAssembler later on.
# Column types are read once from df.dtypes and all medians are computed in a
# single approxQuantile call instead of one Spark job per column.
NUMERIC_TYPES = ("tinyint", "smallint", "int", "bigint", "float", "double")
MEDIAN_REL_ERROR = 0.01  # rank tolerance accepted for the approximate median

numeric_cols = [name for name, dtype in df.dtypes if dtype in NUMERIC_TYPES]
fill_values = {
    name: "unknown" for name, dtype in df.dtypes if dtype not in NUMERIC_TYPES
}

if numeric_cols:
    quantiles = df.approxQuantile(numeric_cols, [0.5], MEDIAN_REL_ERROR)
    for name, result in zip(numeric_cols, quantiles):
        # approxQuantile yields an empty list for a column with no non-null
        # values; such a column has no median, so it is left untouched.
        if result:
            fill_values[name] = result[0]

if fill_values:
    df = df.fillna(fill_values)

In [None]:
# Index encoders: "Label" -> "LabelIndex" and "Protocol type" -> "ProtocolIndex".
# They are only configured here; fitting happens inside the pipeline.
label_indexer = StringIndexer().setInputCol("Label").setOutputCol("LabelIndex")
protocol_indexer = (
    StringIndexer()
    .setInputCol("Protocol type")
    .setOutputCol("ProtocolIndex")
)

In [None]:
# Collect every column of df except "ID", "Label" and "Protocol type" into a
# single vector column named "features".
# NOTE(review): "ProtocolIndex" is only created when the pipeline runs, so it is
# not in df.columns here and never reaches the feature vector — confirm intended.
excluded_columns = {"ID", "Label", "Protocol type"}
feature_columns = [name for name in df.columns if name not in excluded_columns]
assembler = VectorAssembler(outputCol="features", inputCols=feature_columns)

In [None]:
# Min-max rescaling of the assembled "features" vector into "scaledFeatures"
scaler = MinMaxScaler().setInputCol("features").setOutputCol("scaledFeatures")

In [None]:
# Chain the two indexers, the assembler and the scaler, fit them on the
# training DataFrame, then apply the fitted stages to that same DataFrame.
preprocessing_stages = [label_indexer, protocol_indexer, assembler, scaler]
pipeline = Pipeline(stages=preprocessing_stages)
model = pipeline.fit(df)
processed_df = model.transform(df)

### Split training set into training set and validation set

#### The ratio is 70% training and 30% validation

In [None]:
# 70/30 random split of the processed rows; the seed is fixed at 100
split_train_df, split_test_df = processed_df.randomSplit([0.7, 0.3], seed=100)

In [None]:
# Train an XGBoost classifier on the 70% split to measure hold-out accuracy.
#
# xgboost.spark uses the scikit-learn parameter names. The Spark-ML style names
# maxIter / maxDepth (and num_cpus) are not recognised: they were forwarded to
# the booster unused, so training ran with the library defaults regardless of
# the values given. n_estimators=100 matches the number of boosting rounds the
# recorded run actually trained; max_depth=5 now really takes effect.
gbt_split = SparkXGBClassifier(
    label_col="LabelIndex",
    features_col="scaledFeatures",
    n_estimators=100,
    max_depth=5,
)
gbt_model_split = gbt_split.fit(split_train_df)

INFO:XGBoost-PySpark:Running xgboost-2.1.3 on 1 workers with
	booster params: {'objective': 'multi:softprob', 'device': 'cpu', 'maxIter': 1, 'maxDepth': 5, 'num_cpus': 4, 'num_class': 34, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


In [None]:
# Score the held-out 30% split with the model trained on the 70% split
y_pred_split = gbt_model_split.transform(dataset=split_test_df)

In [None]:
# Accuracy of the split-trained model on the held-out split
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    predictionCol="prediction",
    labelCol="LabelIndex",
)
accuracy = evaluator.evaluate(y_pred_split)
print(f"Test Accuracy: {accuracy}")

Test Accuracy: 0.9358977209560867


### Train the final model on the full training set

In [None]:
# Train the final XGBoost classifier on the whole processed training set.
#
# xgboost.spark uses the scikit-learn parameter names: maxIter / maxDepth are
# not recognised and were passed to the booster unused, so the requested tree
# depth of 5 never applied. n_estimators / max_depth are the supported names.
gbt = SparkXGBClassifier(
    label_col="LabelIndex",
    features_col="scaledFeatures",
    n_estimators=100,
    max_depth=5,
)
gbt_model = gbt.fit(processed_df)

INFO:XGBoost-PySpark:Running xgboost-2.1.3 on 1 workers with
	booster params: {'objective': 'multi:softprob', 'device': 'cpu', 'maxIter': 100, 'maxDepth': 5, 'num_class': 34, 'nthread': 1}
	train_call_kwargs_params: {'verbose_eval': True, 'num_boost_round': 100}
	dmatrix_kwargs: {'nthread': 1, 'missing': nan}
INFO:XGBoost-PySpark:Finished xgboost training!


In [None]:
# Persist the final model under ./xgboost. Going through write().overwrite()
# keeps the cell re-runnable: a plain save() raises once the directory exists.
gbt_model.write().overwrite().save("xgboost")

In [None]:
# Bundle the saved model directory into a single archive, xgboost.zip
!zip -r xgboost.zip xgboost

### Evaluate on the test set

In [None]:
# Read the test CSV the same way as the training file: header row for the
# column names, column types inferred from the data.
test_path = 'test.csv'
test_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(test_path)
)

In [None]:
# Impute missing values in the test data.
#   * numeric columns   -> approximate median of the same column in the
#                          TRAINING DataFrame (df), so test rows are filled with
#                          training statistics
#   * all other columns -> the placeholder string "unknown"
#
# Every Spark numeric type is handled (not just "int"/"double"), and all medians
# come from one approxQuantile call instead of one Spark job per column.
NUMERIC_TYPES = ("tinyint", "smallint", "int", "bigint", "float", "double")
MEDIAN_REL_ERROR = 0.01  # rank tolerance accepted for the approximate median

train_types = dict(df.dtypes)
# A training median only exists for columns that are numeric in both frames;
# anything else is skipped rather than failing inside approxQuantile.
test_numeric_cols = [
    name
    for name, dtype in test_df.dtypes
    if dtype in NUMERIC_TYPES and train_types.get(name) in NUMERIC_TYPES
]
test_fill_values = {
    name: "unknown" for name, dtype in test_df.dtypes if dtype not in NUMERIC_TYPES
}

if test_numeric_cols:
    train_quantiles = df.approxQuantile(test_numeric_cols, [0.5], MEDIAN_REL_ERROR)
    for name, result in zip(test_numeric_cols, train_quantiles):
        # An empty result means the training column has no non-null values.
        if result:
            test_fill_values[name] = result[0]

if test_fill_values:
    test_df = test_df.fillna(test_fill_values)

In [None]:
# Run the test rows through the preprocessing stages fitted on the training data
processed_test_df = model.transform(dataset=test_df)

In [None]:
# Predict on the processed test rows with the final model
y_pred = gbt_model.transform(dataset=processed_test_df)

In [None]:
# Print the column names and types of the training DataFrame (df, not test_df)
df.printSchema()

In [None]:
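# Evaluation of the test predictions is intentionally left disabled.
# NOTE(review): presumably test.csv has no "Label" column, so accuracy cannot be
# computed here — confirm, then either enable this block or delete it.
# # Evaluate the model
# evaluator = MulticlassClassificationEvaluator(labelCol="LabelIndex", predictionCol="prediction", metricName="accuracy")
# accuracy = evaluator.evaluate(y_pred)
# print(f"Test Accuracy: {accuracy}")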