In [1]:
from pyspark.sql import SparkSession
# Creating spark session
spark = SparkSession.builder.appName("TrainRandomForest")\
.getOrCreate()
# Creating object of spark context
sc = spark.sparkContext

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1678463987972_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
import pyspark
from pyspark.sql.types import *
from pyspark.sql import functions
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

def preprocess(df):
# getting the numerical columns
    cat_columns = []
    num_columns = ['amnt','hour','days_before','weekofyear','hour_diff']
    for column in df.columns:
        if column not in num_columns:
            cat_columns.append(column)

    cat_columns.remove('app_id')
    cat_columns.remove('transaction_number')
    cat_columns.remove('__index_level_0__')
    # creating a new dataframe
    new_df = df.select('app_id').distinct()

    # handling the categorical columns
    for column in cat_columns:
        # grouping by app_id and categorical column & counting the occurences 
        cnts = df.groupby('app_id',column).count()
        
        # extracting the maximum count and its app_id 
        max_cnt = cnts.groupby('app_id').agg(max('count').alias('count'))
        
        # extracting the frequently occuring value
        mode = cnts.join(max_cnt,['app_id','count'],'inner')\
        .select('app_id',column)\
        .withColumnRenamed(column,'mode_'+column)
        
        # appending the newly created column to the original dataframe
        new_df = new_df.join(mode,'app_id','inner')
        
    # handling numerical columns
    for column in num_columns:
        # calculating the average of the numerical features
        avgs = df.groupby('app_id').agg(mean(column).alias('avg_'+column))
        
        # appending the column to the dataframe
        new_df = new_df.join(avgs,'app_id','inner')
    # returning the preprocessed dataframe
    return new_df

def getData(parquet_path,csv_path):
    # reading the parquet file
    df = spark.read.parquet(parquet_path)
    # applying preprocessig to reduce the data
    df = preprocess(df)
    # reading the csv file
    csv = spark.read.format("csv")\
    .option("header","True")\
    .option("inferSchema","True")\
    .load(csv_path)
    # merging the csv and the parquet file
    df = df.join(csv,'app_id',"inner")
    # droping the app_id column
    df = df.drop('app_id')
    # returning the cleaned data frame
    return df

def buildPipeline(df):
    """
    This model creates a pipeline and return the pipeline model which can be trained
    """
    # getting the name of the feature columns
    features = df.columns[:len(df.columns)-1]
    # creating object of VectorAssembler
    assemble = VectorAssembler(inputCols=features,outputCol='features')
    # creating object of classifier model
    classifier = RandomForestClassifier(seed=7,labelCol="flag",featuresCol="features")
    STAGES = [assemble,classifier]

    # creating an object of Pipeline
    pipeline = Pipeline(stages=STAGES)

    # Setting the parameter grid for parameter tuning
    paramGrid = ParamGridBuilder() \
    .addGrid(classifier.maxDepth, [3,5]) \
    .addGrid(classifier.numTrees, [100,200]) \
    .build()

    tvs = TrainValidationSplit(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=MulticlassClassificationEvaluator(labelCol="flag", predictionCol="prediction"),
    parallelism=2,
    trainRatio=0.7)  # data is separated by 70% and 30%, in which the former is used for training and the latter for evaluation
    return tvs

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
parquet_path = "s3://transaction-train/*.parquet"
csv_path = "s3://transaction-train-etl/train_csv/alfabattle2_train_target.csv"
df = getData(parquet_path,csv_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
split = df.randomSplit([0.7,0.3],seed=7)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
tvs = buildPipeline(split[0])
model = tvs.fit(split[0])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
model.bestModel.save('s3://transaction-train-etl/train_model/model1')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# f1-scores of the models
model.validationMetrics

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[0.9594324003218819, 0.9594324003218819, 0.9594324003218819, 0.9594324003218819]

In [9]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Create both evaluators
evaluator = MulticlassClassificationEvaluator(labelCol="flag", predictionCol="prediction")

# Make predicitons
pred = model.transform(split[1]).select("flag", "prediction")

# Get metrics
acc = evaluator.evaluate(pred, {evaluator.metricName: "accuracy"})
f1 = evaluator.evaluate(pred, {evaluator.metricName: "f1"})
weightedPrecision = evaluator.evaluate(pred, {evaluator.metricName: "weightedPrecision"})
weightedRecall = evaluator.evaluate(pred, {evaluator.metricName: "weightedRecall"})

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
print('Accuracy : ',acc)
print('F1  : ',f1)
print('Precision :',weightedPrecision)
print('Recall :',weightedRecall)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Accuracy :  0.9731265422745573
F1  :  0.9598728180784522
Precision : 0.9469752672792359
Recall : 0.9731265422745573