In [1]:
import os

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql.types import NumericType, StringType

In [2]:
spark = (SparkSession
         .builder
         # Default URL of the in-cluster Kubernetes API server
         # (this Jupyter notebook service itself runs as a Kubernetes pod).
         .master("k8s://https://kubernetes.default.svc:443")
         # Docker image for the executors: for simplicity, this notebook's own image is reused.
         .config("spark.kubernetes.container.image", os.environ['IMAGE_NAME'])
         # Kubernetes namespace in which the executor pods are created.
         .config("spark.kubernetes.namespace", os.environ['KUBERNETES_NAMESPACE'])
         # JVM heap allocated to each executor.
         # The executor pod's memory limit is larger than this: Kubernetes sizes the pod as
         # heap + spark.executor.memoryOverhead.
         .config("spark.executor.memory", "4g")
         # Non-heap memory per executor (JVM native memory, network buffers, and the Python
         # workers, which share this space unless spark.executor.pyspark.memory is set).
         # The default overhead is only a fraction of the heap (minimum 384 MiB). With it,
         # this notebook's executors were OOMKilled by Kubernetes (exit code 137) during
         # cross-validation. Raise this value further if container OOMs persist.
         .config("spark.executor.memoryOverhead", "2g")
         .config("spark.kubernetes.driver.pod.name", os.environ['KUBERNETES_POD_NAME'])
         # Dynamic allocation: scale between 1 and 5 executor pods depending on the load.
         .config("spark.dynamicAllocation.enabled","true")
         .config("spark.dynamicAllocation.initialExecutors","1")
         .config("spark.dynamicAllocation.minExecutors","1")
         .config("spark.dynamicAllocation.maxExecutors","5")
         # Fraction of "full parallelism" to request. With 100 pending tasks and one task
         # per executor, a ratio of 1 aims at 100 executor pods and 0.5 at 50. The result is
         # still capped by maxExecutors.
         .config("spark.dynamicAllocation.executorAllocationRatio","1")
         # Without an external shuffle service, dynamic allocation needs shuffle tracking
         # so executors holding live shuffle data are not removed.
         .config("spark.dynamicAllocation.shuffleTracking.enabled","true")
         .getOrCreate()
        )

In [3]:
train_path = "s3a://juleschristian0/train.csv"
test_path = "s3a://juleschristian0/test.csv"

# Both CSV files have a header row; Spark infers each column's type from the data.
csv_options = {"header": True, "inferSchema": True}
train_df = spark.read.csv(train_path, **csv_options)
test_df = spark.read.csv(test_path, **csv_options)

                                                                                

In [8]:
# Row identifier column: removed from the training data so the column-type split below never sees it.
id_column = 'ID'
train_df = train_df.drop(id_column)

In [26]:
# Classification target: it must never be fed back in as a feature.
label_col = "label"

# Split the candidate feature columns by Spark SQL type. Testing against the type
# classes, instead of comparing str(dataType) with "DoubleType()", also keeps the
# integer/long/float/decimal columns that inferSchema can produce. The label is
# excluded explicitly so it cannot leak into the feature vector.
feature_fields = [field for field in train_df.schema.fields if field.name != label_col]
numeric_cols = [field.name for field in feature_fields if isinstance(field.dataType, NumericType)]
categorical_cols = [field.name for field in feature_fields if isinstance(field.dataType, StringType)]

# Index and one-hot encode the categorical columns.
# One multi-column stage handles every categorical column (instead of one stage per
# column), which keeps the pipeline short on wide data.
# handleInvalid="keep" sends categories unseen during fitting (and nulls) to an extra
# index instead of raising when the fitted pipeline is later applied to test_df.
index_cols = [c + "_index" for c in categorical_cols]
vec_cols = [c + "_vec" for c in categorical_cols]
if categorical_cols:
    indexers = [StringIndexer(inputCols=categorical_cols, outputCols=index_cols, handleInvalid="keep")]
    encoders = [OneHotEncoder(inputCols=index_cols, outputCols=vec_cols)]
else:
    # Nothing to encode; avoid building multi-column stages with empty column lists.
    indexers, encoders = [], []

# Assemble the numeric columns and the one-hot vectors into a single feature vector.
assembler = VectorAssembler(inputCols=numeric_cols + vec_cols, outputCol="features")

# Standardize the assembled features (StandardScaler defaults: withStd=True, withMean=False).
# Tree ensembles such as RandomForestClassifier are insensitive to this per-feature
# rescaling, so the stage only matters for scale-sensitive models.
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

In [29]:
# Total number of input columns retained (numeric + categorical).
sum(len(cols) for cols in (numeric_cols, categorical_cols))

4041

In [30]:
# Random forests split each feature on thresholds, so they are insensitive to the
# per-feature rescaling done by StandardScaler. Training on the raw assembled
# "features" column avoids refitting the scaler (a full pass over the wide data)
# for every fold x parameter combination.
rf = RandomForestClassifier(labelCol="label", featuresCol="features", seed=102)

pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

# Parameter grid for cross-validation: 3 x 3 = 9 combinations, each fitted once per fold.
# NOTE(review): 100 trees of depth 20 over 4000+ input columns is the most
# memory-hungry combination and a likely contributor to the executor OOMKills;
# consider lowering maxDepth if they recur.
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [10, 50, 100])
             .addGrid(rf.maxDepth, [5, 10, 20])
             .build())

# Score folds with the same metric and columns as the final test-set evaluation.
cv_evaluator = BinaryClassificationEvaluator(labelCol="label",
                                             rawPredictionCol="rawPrediction",
                                             metricName="areaUnderROC")

# 5-fold cross-validation. Fold assignment is random, so it is seeded (with the
# same seed as the forest) to make the selected model reproducible across runs.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=cv_evaluator,
                          numFolds=5,
                          seed=102)

In [None]:
# Run the full grid search; the returned CrossValidatorModel keeps the best pipeline.
cv_model = crossval.fit(dataset=train_df)



17:41:12.678 [dispatcher-CoarseGrainedScheduler] ERROR org.apache.spark.scheduler.TaskSchedulerImpl - Lost executor 2 on 10.233.113.72: 
The executor with id 2 exited with exit code 137(SIGKILL, possible container OOM).



The API gave the following container statuses:


	 container name: spark-kubernetes-executor
	 container image: docker.io/inseefrlab/onyxia-jupyter-pyspark:py3.12.3-spark3.5.1
	 container state: terminated
	 container started at: 2024-06-22T17:29:02Z
	 container finished at: 2024-06-22T17:41:10Z
	 exit code: 137
	 termination reason: OOMKilled
      




17:41:18.879 [dispatcher-CoarseGrainedScheduler] ERROR org.apache.spark.scheduler.TaskSchedulerImpl - Lost executor 5 on 10.233.113.91: 
The executor with id 5 exited with exit code 137(SIGKILL, possible container OOM).



The API gave the following container statuses:


	 container name: spark-kubernetes-executor
	 container image: docker.io/inseefrlab/onyxia-jupyter-pyspark:py3.12.3-spark3.5.1
	 container state: terminated
	 container started at: 2024-06-22T17:29:04Z
	 container finished at: 2024-06-22T17:41:17Z
	 exit code: 137
	 termination reason: OOMKilled
      




17:41:26.003 [dispatcher-CoarseGrainedScheduler] ERROR org.apache.spark.scheduler.TaskSchedulerImpl - Lost executor 3 on 10.233.115.21: 
The executor with id 3 exited with exit code 137(SIGKILL, possible container OOM).



The API gave the following container statuses:


	 container name: spark-kubernetes-executor
	 container image: docker.io/inseefrlab/onyxia-jupyter-pyspark:py3.12.3-spark3.5.1
	 container state: terminated
	 container started at: 2024-06-22T17:29:01Z
	 container finished at: 2024-06-22T17:41:24Z
	 exit code: 137
	 termination reason: OOMKilled
      
17:41:26.049 [dispatcher-CoarseGrainedScheduler] ERROR org.apache.spark.scheduler.TaskSchedulerImpl - Lost executor 4 on 10.233.115.13: 
The executor with id 4 exited with exit code 137(SIGKILL, possible container OOM).



The API gave the following container statuses:


	 container name: spark-kubernetes-executor
	 container image: docker.io/inseefrlab/onyxia-jupyter-pyspark:py3.12.3-spark3.5.1
	 container state: termin



In [None]:
# Score the test set with the best pipeline found by cross-validation.
predictions = cv_model.bestModel.transform(test_df)

In [None]:
# Evaluate with BinaryClassificationEvaluator to compute the AUC (area under the ROC curve).
evaluator = (BinaryClassificationEvaluator()
             .setLabelCol("label")
             .setRawPredictionCol("rawPrediction")
             .setMetricName("areaUnderROC"))
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")