In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean, stddev, min, max, last, count, countDistinct
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, \
                                      GBTClassificationModel, LinearSVC, MultilayerPerceptronClassifier, LinearSVCModel, \
                                      LogisticRegressionModel, DecisionTreeClassificationModel, RandomForestClassificationModel, \
                                      MultilayerPerceptronClassificationModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml import Pipeline
import os

In [2]:
CAT_VARS = ["B_30","B_38","D_114","D_116","D_117","D_120","D_126","D_63","D_64","D_66","D_68"]
TARGET_COLUMN = 'target'
TRAIN_FILE = '../Data/train_small.csv'
TEST_FILE = '../Data/test_small.csv'
CAT_FEATURES = ['B_30_last', 'B_38_last', 'D_114_last', 'D_116_last', 'D_117_last',
                'D_120_last', 'D_126_last', 'D_63_last', 'D_64_last','D_66_last', 'D_68_last']

In [3]:
conf = SparkConf() \
    .setAppName("Train and Save Models") \
    .set("spark.executor.memory", "8g") \
    .set("spark.driver.memory", "8g")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
sc.setLogLevel("ERROR")
data_train = spark.read.csv(TRAIN_FILE, header=True, inferSchema=True)
data_test = spark.read.csv(TEST_FILE, header=True, inferSchema=True)
data_train = data_train.fillna(0)
data_test = data_test.fillna(0)

24/04/21 10:06:18 WARN Utils: Your hostname, DESKTOP-D1SJQQ6 resolves to a loopback address: 127.0.1.1; using 172.27.228.206 instead (on interface eth0)
24/04/21 10:06:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/04/21 10:06:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

In [4]:
def feature_engineer_spark(df, CAT_VARS, TARGET_COLUMN):
    all_cols = [c for c in df.columns if c not in ['customer_ID', 'S_2']]
    cont_vars = [c for c in all_cols if c not in CAT_VARS + [TARGET_COLUMN]]
    cont_vars_agg_exprs = [expr for c in cont_vars for expr in (
        mean(c).alias(c + '_mean'),
        stddev(c).alias(c + '_std'),
        min(c).alias(c + '_min'),
        max(c).alias(c + '_max'),
        last(c).alias(c + '_last')
    )]
    cont_vars_agg = df.groupBy("customer_ID").agg(*cont_vars_agg_exprs)
    cat_vars_agg_exprs = [expr for c in CAT_VARS for expr in (
        count(c).alias(c + '_count'),
        last(c).alias(c + '_last'),
        countDistinct(c).alias(c + '_nunique')
    )]
    cat_vars_agg = df.groupBy("customer_ID").agg(*cat_vars_agg_exprs)
    df_agg = cont_vars_agg.join(cat_vars_agg, "customer_ID")
    target_column_df = df.select("customer_ID", TARGET_COLUMN)
    df_agg = df_agg.join(target_column_df, "customer_ID")
    return df_agg

data_train = feature_engineer_spark(data_train, CAT_VARS, TARGET_COLUMN).drop('customer_ID')
data_test = feature_engineer_spark(data_test, CAT_VARS, TARGET_COLUMN).drop('customer_ID')

In [5]:
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_indexed", handleInvalid='keep')
    for column in CAT_FEATURES
]
continuous_features = [c for c in data_train.columns if c not in CAT_FEATURES and c != TARGET_COLUMN and not c.endswith('_indexed')]
data_train = data_train.fillna(0)
data_test = data_test.fillna(0)
assembler_cont = VectorAssembler(inputCols=continuous_features, outputCol="features_raw")
scaler = MinMaxScaler(inputCol="features_raw", outputCol="scaled_features", min=0.1, max=0.9)
final_feature_columns = [col + "_indexed" for col in CAT_FEATURES] + ["scaled_features"]
assembler_final = VectorAssembler(inputCols=final_feature_columns, outputCol="features")
pipeline = Pipeline(stages=indexers + [assembler_cont, scaler, assembler_final])
model = pipeline.fit(data_train)
train_indexed = model.transform(data_train)
test_indexed = model.transform(data_test)
train_selected = train_indexed.select("features", TARGET_COLUMN).cache()
test_selected = test_indexed.select("features", TARGET_COLUMN).cache()

                                                                                

In [11]:
feature_vector_size = len(train_indexed.select("features").first()[0])
layers = [feature_vector_size, feature_vector_size // 2 + 1, feature_vector_size // 4 + 1, 2]
models = {
    "GBTClassifier": GBTClassifier(featuresCol='features', labelCol='target'),
    "LinearSVC": LinearSVC(featuresCol='features', labelCol='target'),
    "LogisticRegression": LogisticRegression(featuresCol='features', labelCol='target'),
    "DecisionTreeClassifier": DecisionTreeClassifier(featuresCol='features', labelCol='target'),
    "RandomForestClassifier": RandomForestClassifier(featuresCol='features', labelCol='target'),
    "MultilayerPerceptronClassifier": MultilayerPerceptronClassifier(featuresCol='features', labelCol='target', layers=layers)
}

binary_evaluator = BinaryClassificationEvaluator(labelCol="target")
multi_evaluator = MulticlassClassificationEvaluator(labelCol="target")

results_path = "evaluation_results.txt"
with open(results_path, "w") as file:
    for name, model in models.items():
        model_path = os.path.join("models", f"{name}_model")
        if os.path.exists(model_path):
            if name == "GBTClassifier":
                model = GBTClassificationModel.load(model_path)
            elif name == "LinearSVC":
                model = LinearSVCModel.load(model_path)
            elif name == "LogisticRegression":
                model = LogisticRegressionModel.load(model_path)
            elif name == "DecisionTreeClassifier":
                model = DecisionTreeClassificationModel.load(model_path)
            elif name == "RandomForestClassifier":
                model = RandomForestClassificationModel.load(model_path)
            elif name == "MultilayerPerceptronClassifier":
                model = MultilayerPerceptronClassificationModel.load(model_path)
            print(f"Loaded model: {name}")
        else:
            model = model.fit(train_selected)
            model.save(model_path)
            print(f"Trained and saved model: {name}")
        
        predictions = model.transform(test_selected)
        accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
        precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "precisionByLabel"})
        recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "recallByLabel"})
        f1 = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "f1"})
        auc = binary_evaluator.evaluate(predictions, {binary_evaluator.metricName: "areaUnderROC"})

        result_string = f"Model: {name}, Accuracy: {accuracy}, Precision: {precision}, Recall: {recall}, F1 Score: {f1}, AUC: {auc}\n"
        print(result_string)
        file.write(result_string)

                                                                                

Loaded model: GBTClassifier
Model: GBTClassifier, Accuracy: 0.8656849620705043, Precision: 0.9082503556187767, Recall: 0.9129036101513166, F1 Score: 0.8653391277632483, AUC: 0.9266113729537194

Loaded model: LinearSVC
Model: LinearSVC, Accuracy: 0.8638107987505578, Precision: 0.9027565982404692, Recall: 0.9169546050279995, F1 Score: 0.8627054450263494, AUC: 0.9263057306273149

Loaded model: LogisticRegression
Model: LogisticRegression, Accuracy: 0.8730923694779117, Precision: 0.9055264688772542, Recall: 0.9273203860359823, F1 Score: 0.8714828711430652, AUC: 0.9332636219405903

Loaded model: DecisionTreeClassifier
Model: DecisionTreeClassifier, Accuracy: 0.8497991967871485, Precision: 0.9099462365591398, Recall: 0.8872870249017037, F1 Score: 0.8515432375499287, AUC: 0.7680224528365523

Loaded model: RandomForestClassifier
Model: RandomForestClassifier, Accuracy: 0.8602409638554217, Precision: 0.8919508554369043, Recall: 0.9255331824139164, F1 Score: 0.8573720316154725, AUC: 0.9213273643

                                                                                

Model: MultilayerPerceptronClassifier, Accuracy: 0.8686300758589915, Precision: 0.9074531967502649, Recall: 0.9182652210175146, F1 Score: 0.867829983203917, AUC: 0.9314231793106735



In [12]:
sc.stop()