In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# Create (or reuse) a Spark session that runs locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("ModelTrain")
    .getOrCreate()
)
# Only surface errors so Spark's log output does not flood the notebook.
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
org.elasticsearch#elasticsearch-spark-30_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a59127d2-0602-4128-aba1-2e99944ab3d6;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.5.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.5.0 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.

In [3]:
# Read the "water_army_label" Parquet dataset from HDFS.
df_water = spark.read.parquet(
    "hdfs://namenode:9000/workspace/output/water_army/water_army_label"
)

In [4]:
# Read the "normal_comment_label" Parquet dataset from HDFS.
df_normal = spark.read.parquet(
    "hdfs://namenode:9000/workspace/output/normal/normal_comment_label"
)

In [5]:
# Stack both datasets into one; columns are matched by name, not by position.
df_all_label = df_water.unionByName(other=df_normal)

In [6]:
# NOTE(review): the randomly ordered DataFrame built here is only echoed as the
# cell output; it is never assigned, so df_all_label itself is left unchanged
# and later cells still see the original (unshuffled) union.
df_all_label.orderBy(F.rand())

DataFrame[text: string, v0: float, v1: float, v2: float, v3: float, v4: float, v5: float, v6: float, v7: float, v8: float, v9: float, v10: float, v11: float, v12: float, v13: float, v14: float, v15: float, v16: float, v17: float, v18: float, v19: float, v20: float, v21: float, v22: float, v23: float, v24: float, v25: float, v26: float, v27: float, v28: float, v29: float, v30: float, v31: float, v32: float, v33: float, v34: float, v35: float, v36: float, v37: float, v38: float, v39: float, v40: float, v41: float, v42: float, v43: float, v44: float, v45: float, v46: float, v47: float, v48: float, v49: float, v50: float, v51: float, v52: float, v53: float, v54: float, v55: float, v56: float, v57: float, v58: float, v59: float, v60: float, v61: float, v62: float, v63: float, v64: float, v65: float, v66: float, v67: float, v68: float, v69: float, v70: float, v71: float, v72: float, v73: float, v74: float, v75: float, v76: float, v77: float, v78: float, v79: float, v80: float, v81: float, v8

In [7]:
# Class-balance check: print the number of rows for each label value.
(
    df_all_label
    .groupBy("label")
    .count()
    .show()
)

+-----+-----+
|label|count|
+-----+-----+
|    1| 1800|
|    0| 2106|
+-----+-----+



In [8]:
from pyspark.ml.linalg import Vectors,VectorUDT
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf

In [9]:
# Feature columns are selected purely by name: every column starting with "v"
# (v0, v1, ...), kept in the DataFrame's own column order.
vector_column = list(
    filter(lambda name: name.startswith("v"), df_all_label.columns)
)

In [10]:
# Pack the individual "v*" columns into a single "features" vector column.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=vector_column,
)
df_all_label_ready = assembler.transform(dataset=df_all_label)

In [11]:
def _as_dense_vector(values):
    """Wrap the incoming "features" value in a dense ML vector."""
    return Vectors.dense(values)


# NOTE(review): "features" is the VectorAssembler output from the previous
# cell, so it should already be a vector column; this UDF re-wraps every row
# as a dense vector. Confirm whether this extra pass is still needed.
to_vector_udf = udf(_as_dense_vector, returnType=VectorUDT())
df_all_label_ready = df_all_label_ready.withColumn(
    "features",
    to_vector_udf(F.col("features")),
)

In [12]:
# 80/20 train/test split; the fixed seed is passed so the split can be repeated.
train_df, test_df = df_all_label_ready.randomSplit(weights=[0.8, 0.2], seed=42)

In [13]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def evaluate_model(model, train_data, test_data, metric_label=0.0):
    """Print train/test classification metrics for a fitted model and flag overfitting.

    The model is applied to both DataFrames, five metrics are printed for each,
    and a one-line verdict is printed based on the train-to-test drop.

    Args:
        model: Fitted Spark ML model (for example a CrossValidatorModel). Its
            ``transform`` output is read through the "rawPrediction" and
            "prediction" columns.
        train_data: DataFrame the model was fitted on; needs a "label" column.
        test_data: DataFrame to compare against the training metrics; needs a
            "label" column.
        metric_label: Class whose per-label precision and recall are reported.
            Defaults to 0.0, which is the evaluator's own default, so existing
            calls print the same precision/recall as before. Pass 1.0 to report
            them for label 1 instead.

    Returns:
        dict: ``{"train": {...}, "test": {...}}`` where each inner dict maps
        "auc", "accuracy", "precision", "recall" and "f1" to a float.
    """
    binary_evaluator = BinaryClassificationEvaluator(
        labelCol="label", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

    evaluator_acc = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="accuracy")
    # precisionByLabel / recallByLabel are computed for a single class only.
    evaluator_precision = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="precisionByLabel",
        metricLabel=metric_label)
    evaluator_recall = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="recallByLabel",
        metricLabel=metric_label)
    # "f1" is the evaluator's label-weighted F-measure, not the F1 of one class.
    evaluator_f1 = MulticlassClassificationEvaluator(
        labelCol="label", predictionCol="prediction", metricName="f1")

    def evaluate_all(pred_df, dataset_name):
        """Compute and print all five metrics for one prediction DataFrame."""
        # Each evaluator triggers its own Spark job; cache the predictions so
        # the model is not re-applied five times, and release them afterwards.
        pred_df.cache()
        try:
            auc = binary_evaluator.evaluate(pred_df)
            acc = evaluator_acc.evaluate(pred_df)
            precision = evaluator_precision.evaluate(pred_df)
            recall = evaluator_recall.evaluate(pred_df)
            f1 = evaluator_f1.evaluate(pred_df)
        finally:
            pred_df.unpersist()

        print(f"\n{dataset_name} Evaluation Results:")
        print(f"AUC       = {auc:.4f}")
        print(f"Accuracy  = {acc:.4f}")
        print(f"Precision = {precision:.4f}")
        print(f"Recall    = {recall:.4f}")
        print(f"F1-score  = {f1:.4f}")
        return auc, acc, precision, recall, f1

    train_scores = evaluate_all(model.transform(train_data), "Train Set")
    test_scores = evaluate_all(model.transform(test_data), "Test Set")

    auc_train, acc_train = train_scores[0], train_scores[1]
    auc_test, acc_test, f_test = test_scores[0], test_scores[1], test_scores[4]

    print("\nSummary:")
    # Signed gaps (train minus test): only a *drop* on the test set counts as
    # overfitting. A model that scores better on test than on train must not be
    # reported as having a "performance drop on test set".
    auc_gap = auc_train - auc_test
    acc_gap = acc_train - acc_test

    if auc_gap > 0.05 or acc_gap > 0.05:
        print("The model may be overfitting: significant performance drop on test set.")
    elif auc_test < 0.8 or f_test < 0.75:
        print("Test performance is suboptimal. The model may lack generalization.")
    else:
        print("The model shows consistent train-test performance and good generalization.")

    metric_names = ("auc", "accuracy", "precision", "recall", "f1")
    return {
        "train": dict(zip(metric_names, train_scores)),
        "test": dict(zip(metric_names, test_scores)),
    }

In [15]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
# Logistic regression tuned with 5-fold cross-validation, selected on ROC AUC.
lr = LogisticRegression(featuresCol="features", labelCol="label")
# 3 x 3 grid: regularisation strength x elastic-net mix (0.0 = L2, 1.0 = L1).
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.0, 0.1, 0.01])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC")
# The seed fixes how rows are assigned to folds; without it every run picks
# different folds, so the selected parameters and reported metrics can drift.
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=42)
cv_model = cv.fit(train_df)
evaluate_model(cv_model, train_df, test_df)
# Persist the fitted CrossValidatorModel, replacing any previous save.
cv_model.write().overwrite().save("hdfs://namenode:9000/workspace/output/model/lr_model")

                                                                                


Train Set Evaluation Results:
AUC       = 0.9969
Accuracy  = 0.9804
Precision = 0.9787
Recall    = 0.9845
F1-score  = 0.9804

Test Set Evaluation Results:
AUC       = 0.9831
Accuracy  = 0.9390
Precision = 0.9423
Recall    = 0.9533
F1-score  = 0.9390

Summary:
The model shows consistent train-test performance and good generalization.


In [16]:
from pyspark.ml.classification import RandomForestClassifier
# Random forest tuned with 3-fold cross-validation, selected on ROC AUC.
rf = RandomForestClassifier(featuresCol="features",
                            labelCol="label",
                            predictionCol="prediction",
                            probabilityCol="probability",
                            seed=42)
# 2 x 2 grid: number of trees x maximum tree depth.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [20, 50])
             .addGrid(rf.maxDepth, [5, 10])
             .build())
evaluator = BinaryClassificationEvaluator(labelCol="label",
                                          rawPredictionCol="rawPrediction",
                                          metricName="areaUnderROC")
# The classifier was already seeded, but the fold assignment was not; seed the
# CrossValidator too so the whole search can be repeated.
cv = CrossValidator(estimator=rf,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3,
                    parallelism=2,
                    seed=42)
cv_model = cv.fit(train_df)
evaluate_model(cv_model, train_df, test_df)
# Persist the fitted CrossValidatorModel, replacing any previous save.
cv_model.write().overwrite().save("hdfs://namenode:9000/workspace/output/model/rf_model")

                                                                                


Train Set Evaluation Results:
AUC       = 0.9995
Accuracy  = 0.9949
Precision = 0.9929
Recall    = 0.9976
F1-score  = 0.9949

Test Set Evaluation Results:
AUC       = 0.9465
Accuracy  = 0.8672
Precision = 0.8650
Recall    = 0.9136
F1-score  = 0.8663

Summary:
The model may be overfitting: significant performance drop on test set.


In [17]:
from pyspark.ml.classification import LinearSVC
# Linear SVM tuned with 5-fold cross-validation, selected on ROC AUC.
svm = LinearSVC(featuresCol='features', labelCol='label', maxIter=100)
# Single-parameter grid over the regularisation strength.
paramGrid = (ParamGridBuilder()
             .addGrid(svm.regParam, [0.0, 0.1, 1.0])
             .build())
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")
# The seed fixes how rows are assigned to folds; without it every run picks
# different folds, so the selected parameters and reported metrics can drift.
cv = CrossValidator(estimator=svm,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=42)
cv_model = cv.fit(train_df)
evaluate_model(cv_model, train_df, test_df)
# Persist the fitted CrossValidatorModel, replacing any previous save.
cv_model.write().overwrite().save("hdfs://namenode:9000/workspace/output/model/svm_model")

                                                                                


Train Set Evaluation Results:
AUC       = 0.9908
Accuracy  = 0.9634
Precision = 0.9567
Recall    = 0.9750
F1-score  = 0.9634

Test Set Evaluation Results:
AUC       = 0.9814
Accuracy  = 0.9350
Precision = 0.9299
Recall    = 0.9603
F1-score  = 0.9347

Summary:
The model shows consistent train-test performance and good generalization.


In [18]:
from pyspark.ml.classification import GBTClassifier
# Gradient-boosted trees tuned with 3-fold cross-validation, selected on ROC AUC.
# Seeded like the random-forest cell so repeated runs are comparable.
gbt = GBTClassifier(labelCol="label", featuresCol="features", seed=42)
# 2 x 2 x 2 grid: boosting iterations x tree depth x step size (learning rate).
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxIter, [50, 100])
             .addGrid(gbt.maxDepth, [3, 5])
             .addGrid(gbt.stepSize, [0.1, 0.2])
             .build())
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
# The seed fixes how rows are assigned to folds; without it every run picks
# different folds, so the selected parameters and reported metrics can drift.
cv = CrossValidator(estimator=gbt,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator,
                    numFolds=3,
                    parallelism=2,
                    seed=42)
cv_model = cv.fit(train_df)
evaluate_model(cv_model, train_df, test_df)
# Persist the fitted CrossValidatorModel, replacing any previous save.
cv_model.write().overwrite().save("hdfs://namenode:9000/workspace/output/model/gbt_model")

                                                                                


Train Set Evaluation Results:
AUC       = 0.9997
Accuracy  = 0.9975
Precision = 0.9970
Recall    = 0.9982
F1-score  = 0.9975

Test Set Evaluation Results:
AUC       = 0.9523
Accuracy  = 0.8821
Precision = 0.8920
Recall    = 0.9065
F1-score  = 0.8819

Summary:
The model may be overfitting: significant performance drop on test set.


In [19]:
# Stop the Spark session created at the top of the notebook.
spark.stop()