# Gradient-Boosted Tree Classification

## Imports

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler,StandardScaler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Imputer

## Start Spark

In [2]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

## Load of train set and labels and merge

In [3]:
input_train = spark.read.csv("./InputTrain.csv",header=True,inferSchema=True)
label_train = spark.read.csv("./StepOne_LabelTrain.csv",header=True,inferSchema=True)

train_data = input_train.join(label_train, on="Index",how="inner")

## Classification

### Creation of the assembler and apply on the merged data

In [4]:
# Assemble the features into a vector
assembler = VectorAssembler(inputCols=input_train.columns[:1], outputCol="features")
train_data = assembler.transform(train_data)

### Extract randomly taken training set and a validation set from the data

In [13]:
# Split the data into training and validation sets
(train_set, validation_set) = train_data.randomSplit([0.8, 0.2], seed=42)

ConnectionRefusedError: [WinError 10061] Aucune connexion n’a pu être établie car l’ordinateur cible l’a expressément refusée

### Creation of the GBT and the model

In [11]:
# Train the GBT model
models = []
for i in range(0,5):
    label_col = label_train.columns[2+i]
    gbt = GBTClassifier(labelCol=label_col,featuresCol="features",maxIter=5,maxDepth=3,maxBins=16)
    model = gbt.fit(train_data)
    
    evaluator = MulticlassClassificationEvaluator(labelCol=label_col,metricName="weightedPrecision")
    predictions = model.transform(validation_set)
    weightedPrecision = evaluator.evaluate(predictions)
    print(f"Weighted Precision {label_col} = {weightedPrecision}")
    
    models.append(model)

ConnectionRefusedError: [WinError 10061] Aucune connexion n’a pu être établie car l’ordinateur cible l’a expressément refusée

### Creation of the evaluator and compute of the predictions

In [8]:
def impute_missing_values(df):
    """Imputes missing values in the DataFrame"""
    imputer = Imputer(inputCols=df.columns, outputCols=["{}_imputed".format(c) for c in df.columns])
    imputed_data = imputer.fit(df).transform(df)
    return imputed_data

def extract_features(df):
    """Extracts features from the DataFrame"""
    feature_cols = [c for c in df.columns if "imputed" in c and "index" not in c and "house_id" not in c]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    feature_data = assembler.transform(df)
    return feature_data

### Test on `InputTest.csv`

In [9]:
test_data = spark.read.csv("InputTest.csv", header=True, inferSchema=True)

# Apply the same transformations used for the training set
test_data_cleaned = test_data.drop(test_data.columns[1])
test_data_imputed = impute_missing_values(test_data_cleaned)
test_data_features = extract_features(test_data_imputed)

# Create a vector assembler to combine the features into a single column
vector_assembler = VectorAssembler(inputCols=test_data_features.columns[2:], outputCol='features')
test_data_with_features = vector_assembler.transform(test_data_features).select('Index', 'House_id', 'features')

# apply the trained model on the test data and generate predictions
test_predictions = {}
for i in range(0,5):
    label_col = label_train.columns[2+i]
    model = models[i]
    predictions = model.transform(test_data_with_features)
    predictions = predictions.select('index', 'house_id', 'prediction')
    test_predictions[label_col] = predictions.withColumnRenamed('prediction', label_col)

# merge the individual predictions into a single DataFrame
test_predictions_df = test_predictions[label_train.columns[2]]
for i in range(1,5):
    label_col = label_train.columns[2+i]
    test_predictions_df = test_predictions_df.join(test_predictions[label_col], ['index', 'house_id'], 'inner')

# write the predictions to a CSV file
test_predictions_df.toPandas().to_csv('StepOne_TestPredictions.csv', index=False)

Py4JJavaError: An error occurred while calling o957.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 407.0 failed 1 times, most recent failure: Lost task 7.0 in stage 407.0 (TID 4525) (192.168.0.19 executor driver): java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
	at java.base/java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:172)
	at java.base/java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:557)
	at java.base/java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:576)
	at java.base/java.lang.StringBuilder.append(StringBuilder.java:204)
	at java.base/java.util.regex.Matcher.appendReplacement(Matcher.java:1002)
	at java.base/java.util.regex.Matcher.replaceAll(Matcher.java:1182)
	at scala.util.matching.Regex.replaceAllIn(Regex.scala:485)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter$.stripExtraNewLinesAndComments(CodeFormatter.scala:99)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$buildCodeBlocks$1(CodeGenerator.scala:948)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$buildCodeBlocks$1$adapted(CodeGenerator.scala:937)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$Lambda$2183/0x000000010100b040.apply(Unknown Source)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.buildCodeBlocks(CodeGenerator.scala:937)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.splitExpressions(CodeGenerator.scala:882)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.splitExpressionsWithCurrentInputs(CodeGenerator.scala:858)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.create(GenerateMutableProjection.scala:97)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:49)
	at org.apache.spark.sql.catalyst.expressions.MutableProjection$.createCodeGeneratedObject(Projection.scala:84)
	at org.apache.spark.sql.catalyst.expressions.MutableProjection$.createCodeGeneratedObject(Projection.scala:80)
	at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:52)
	at org.apache.spark.sql.catalyst.expressions.MutableProjection$.create(Projection.scala:95)
	at org.apache.spark.sql.catalyst.expressions.MutableProjection$.create(Projection.scala:103)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$2(HashAggregateExec.scala:113)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$Lambda$4612/0x00000001016ab040.apply(Unknown Source)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.generateProcessRow(AggregationIterator.scala:206)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:225)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:106)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:121)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1$adapted(HashAggregateExec.scala:95)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
	at java.base/java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:172)
	at java.base/java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:557)
	at java.base/java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:576)
	at java.base/java.lang.StringBuilder.append(StringBuilder.java:204)
	at java.base/java.util.regex.Matcher.appendReplacement(Matcher.java:1002)
	at java.base/java.util.regex.Matcher.replaceAll(Matcher.java:1182)
	at scala.util.matching.Regex.replaceAllIn(Regex.scala:485)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter$.stripExtraNewLinesAndComments(CodeFormatter.scala:99)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$buildCodeBlocks$1(CodeGenerator.scala:948)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.$anonfun$buildCodeBlocks$1$adapted(CodeGenerator.scala:937)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext$$Lambda$2183/0x000000010100b040.apply(Unknown Source)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.buildCodeBlocks(CodeGenerator.scala:937)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.splitExpressions(CodeGenerator.scala:882)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.splitExpressionsWithCurrentInputs(CodeGenerator.scala:858)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.create(GenerateMutableProjection.scala:97)
	at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:49)
	at org.apache.spark.sql.catalyst.expressions.MutableProjection$.createCodeGeneratedObject(Projection.scala:84)
	at org.apache.spark.sql.catalyst.expressions.MutableProjection$.createCodeGeneratedObject(Projection.scala:80)
	at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:52)
	at org.apache.spark.sql.catalyst.expressions.MutableProjection$.create(Projection.scala:95)
	at org.apache.spark.sql.catalyst.expressions.MutableProjection$.create(Projection.scala:103)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$2(HashAggregateExec.scala:113)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$Lambda$4612/0x00000001016ab040.apply(Unknown Source)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.generateProcessRow(AggregationIterator.scala:206)
	at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:225)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:106)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:121)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1$adapted(HashAggregateExec.scala:95)


### Save of the predictions

In [None]:
# Save the predictions in the format specified in the instructions
test_predictions.write.csv("StepOne_Predictions.csv", header=True)