### Setup

Let's set up Spark on your Colab environment.  Run the cell below!

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly installed, 0 to remove and 21 not upgraded.
Need to get 39.7 MB of archives.
After this operation, 144 MB of additional disk space will be used.
Selecting previously unselected package libxtst6:amd64.
(Reading database ... 124926 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1build4_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1build4) ...
Selecting previously unselected package openjdk-8-jre-headless:amd64.
Preparing to unpack .../openjdk-8-jre-headless_8u442-b06~us1-0ubuntu1~22.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u442-b06~us1-0

Now we import some of the libraries usually needed by our workload.





In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

Let's initialize the Spark context.

In [3]:
# [NOTE] When debugging, uncomment the next line and re-run this cell to stop the old context first. by Hui Lin 02/20/2025
#sc.stop()

In [4]:
# configure Spark (web UI on port 4050)
conf = SparkConf().set("spark.ui.port", "4050")

# create the context (reusing a running one, if any) and the session
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

### Data Preprocessing

In this Colab, rather than downloading a file from Google Drive, we will load a famous machine learning dataset, the [Breast Cancer Wisconsin dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_breast_cancer.html), using the ```scikit-learn``` datasets loader.

In [5]:
from sklearn.datasets import load_breast_cancer
breast_cancer = load_breast_cancer()

In [6]:
# take a look at the dataset. Add by Hui Lin 02/20/2025
print(type(breast_cancer))
print(breast_cancer.DESCR)


<class 'sklearn.utils._bunch.Bunch'>
.. _breast_cancer_dataset:

Breast cancer wisconsin (diagnostic) dataset
--------------------------------------------

**Data Set Characteristics:**

:Number of Instances: 569

:Number of Attributes: 30 numeric, predictive attributes and the class

:Attribute Information:
    - radius (mean of distances from center to points on the perimeter)
    - texture (standard deviation of gray-scale values)
    - perimeter
    - area
    - smoothness (local variation in radius lengths)
    - compactness (perimeter^2 / area - 1.0)
    - concavity (severity of concave portions of the contour)
    - concave points (number of concave portions of the contour)
    - symmetry
    - fractal dimension ("coastline approximation" - 1)

    The mean, standard error, and "worst" or largest (mean of the three
    worst/largest values) of these features were computed for each image,
    resulting in 30 features.  For instance, field 0 is Mean Radius, field
    10 is Radius 

Since the dataset is small, we first

*   construct a Pandas dataframe,
*   convert it into a Spark dataframe,
*   and tune the schema (mark every column as non-nullable).

In [7]:
from pyspark.ml.linalg import Vectors  # used by the RDD map further down in this cell

pd_df = pd.DataFrame(breast_cancer.data, columns=breast_cancer.feature_names)
df = spark.createDataFrame(pd_df)

def set_df_columns_nullable(spark, df, column_list, nullable=False):
    for struct_field in df.schema:
        if struct_field.name in column_list:
            struct_field.nullable = nullable
    df_mod = spark.createDataFrame(df.rdd, df.schema)
    return df_mod

df = set_df_columns_nullable(spark, df, df.columns)
#add by Hui Lin 02/20/2025
print("what df looks after setting nullable:")
print(type(df))
df.show()

df = df.withColumn('features', array(df.columns))
#add by Hui Lin 02/20/2025
print("what df looks with column:")
df.show()

vectors = df.rdd.map(lambda row: Vectors.dense(row.features))
#add by Hui Lin 02/20/2025
print(type(vectors))

df.printSchema()

what df looks after setting nullable:
<class 'pyspark.sql.dataframe.DataFrame'>
+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+
|mean radius|mean texture|mean perimeter|mean area|mean smoothness|mean compactness|mean concavity|mean concave points|mean symmetry|mean fractal dimension|radius error|texture error|perimeter error|area error|smoothness error|compactness error|concavity error|concave points error|symmetry error|fractal dimension error|worst radius|worst texture|worst perimeter|worst area|worst smoothness|worst compactness|worst concavity|worst c

With the next cell, we build the two data structures that we will be using throughout this Colab:


*   ```features```, a Spark dataframe with a single column of dense vectors, each holding all 30 original features of one sample;
*   ```labels```, a Pandas series of binary labels indicating whether the corresponding sample is malignant (0) or benign (1) in scikit-learn's encoding.
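Because `features` and `labels` live in separate containers, any later pairing relies on row order being preserved. The following is a minimal sketch, not the notebook's method, of attaching the label while the data is still in Pandas. The small arrays are synthetic stand-ins (an assumption) for `breast_cancer.data`, `breast_cancer.target`, and `breast_cancer.feature_names`.

```python
import numpy as np
import pandas as pd

# Synthetic stand-ins for breast_cancer.data / .target / .feature_names (assumption).
data = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
target = np.array([0, 1, 1])
feature_names = ["f0", "f1"]

# Attach the label column before any conversion, so every label
# stays on the same row as its features.
labeled = pd.DataFrame(data, columns=feature_names)
labeled["label"] = target.astype(int)
print(labeled)
```

Passing a frame like `labeled` to `spark.createDataFrame` and assembling the feature columns afterwards would keep features and labels in one DataFrame from the start.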



In [17]:
from pyspark.ml.linalg import Vectors
features = spark.createDataFrame(vectors.map(Row), ["features"])
labels = pd.Series(breast_cancer.target)

#Add by Hui Lin 02/20/2025
print("What we got in here:")
print(type(df))
print(type(features))
print(type(labels))


What we got in here:
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pandas.core.series.Series'>


### Your task

If you ran the Setup and Data Preprocessing stages successfully, you are now ready to use PySpark's decision tree library ([documentation](https://spark.apache.org/docs/latest/mllib-decision-tree.html)) to predict labels. Look for the Python code example in the documentation.

Start by trying to understand the nature of the dataset you are given. Then investigate the purposes of `StringIndexer`, `VectorIndexer`, and `Pipeline` in the example provided in the documentation. Do you have to use them for your dataset?
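To make the role of `StringIndexer` concrete, here is a plain-Python sketch of its default behaviour: each distinct label is mapped to a numeric index, with the most frequent label receiving index 0. The function name `string_index` and the sample labels are hypothetical, and this is an illustration rather than Spark's implementation.

```python
from collections import Counter

def string_index(labels):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}

mapping = string_index(["benign", "malignant", "benign", "benign"])
print(mapping)  # {'benign': 0.0, 'malignant': 1.0}
```

Since the breast cancer target is already encoded as the integers 0 and 1 and all 30 features are continuous, this kind of indexing does not appear to be strictly necessary here.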


In [10]:
# YOUR CODE HERE
# Decision Trees applied on breast_cancer data. by Hui Lin 02/20/2025

# [step00] import the lib first.
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import DecisionTreeClassificationModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import shutil
import os
# for model optimization
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

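# [step01] pre-process the data.
# convert "labels" from a pandas Series to a Spark DataFrame.
labels_df = spark.createDataFrame(labels.astype('int').to_frame('label'))
# combine "features" with "labels" into one DataFrame by joining on a generated row index.
# Caveat: monotonically_increasing_id() guarantees unique, increasing ids, not consecutive
# ones, and its values depend on partitioning, so this join only lines the rows up
# when both DataFrames happen to be partitioned identically.
data = features.withColumn('row_index', monotonically_increasing_id())
labels_df = labels_df.withColumn('row_index', monotonically_increasing_id())
data = data.join(labels_df, on='row_index').drop('row_index')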

# [step02] split train and test datasets (20% held out for testing)
(trainData, testData) = data.randomSplit([0.8, 0.2], seed=36)
#(trainData, testData) = data.randomSplit([0.8, 0.2])

# [step03] define the decision tree model instance
#--- default parameter settings
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

#--- manual settings
#dt = DecisionTreeClassifier(
#    labelCol="label",
#    featuresCol="features",
#    maxDepth=10,                #5
#    minInstancesPerNode=5,      #1
#    minInfoGain=0.0,            #0.1
#    impurity="gini",
#    maxBins=32                 #32
#)

# train the model (default settings)
model = dt.fit(trainData)

#--- Optimize the training
#--- adjust the parameters to optimize the model
#paramGrid = (ParamGridBuilder()
#             .addGrid(dt.maxDepth, [5, 10, 15])
#             .addGrid(dt.minInfoGain, [0.0, 0.1, 0.2])
#             .addGrid(dt.minInstancesPerNode, [1, 5, 10])
#             .addGrid(dt.maxBins, [32, 64, 128])
#             .build())
#--- set up cross-validation
#evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
#crossval = CrossValidator(estimator=dt,
#                          estimatorParamMaps=paramGrid,
#                          evaluator=evaluator,
#                          numFolds=5)
#--- run the cross-validation
#cvModel = crossval.fit(trainData)
#bestModel = cvModel.bestModel

# [step04] verify using test dataset
#predictions = model.transform(testData.map(lambda x: x.features))
predictions1 = model.transform(testData)
#predictions2 = bestModel.transform(testData)

# [step05] show the prediction result
predictions1.select("label", "prediction").show()
#predictions2.select("label", "prediction").show()

# [step06] evaluation for the dt model.
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions1)
print(f"Test Accuracy of default: {accuracy:.2f}")
#accuracy = evaluator.evaluate(predictions2)
#print(f"Test Accuracy of Best Model: {accuracy:.2f}")

# [step07] print the learned tree structure
print("Learned classification tree model:")
print(model.toDebugString)
#print(bestModel.toDebugString)

# [step08] Save and load model
defaultPath = "assignment03/DecisionTreeModel"
#bestPath = "assignment03/BestDTM"

if os.path.exists(defaultPath):
    shutil.rmtree(defaultPath)  # remove a previously saved model
model.save(defaultPath)
#bestModel.save(bestPath)
sameModel = DecisionTreeClassificationModel.load(defaultPath)

+-----+----------+
|label|prediction|
+-----+----------+
|    1|       1.0|
|    1|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    0|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       0.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
|    1|       1.0|
+-----+----------+
only showing top 20 rows

Test Accuracy of default: 0.92
Learned classification tree model:
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_c97c66ef231a, depth=5, numNodes=33, numClasses=2, numFeatures=30
  If (feature 22 <= 113.75)
   If (feature 27 <= 0.13579999999999998)
    If (feature 13 <= 35.49)
     If (feature 21 <= 29.29)
      Predict: 1.0
     Else (feature 21 > 29.29)
      If (feature 14 <= 0.0041235)
       Predict: 0.0
      Else (feature 14 > 0.0041235)
       Predict: 1.0
    Else (feature 13 > 35.49)
     If


For the next step use the tree ensembles for predicting labels ([documentation](https://spark.apache.org/docs/latest/mllib-ensembles.html)).

Compare the accuracy of the two approaches.

In [13]:
# YOUR CODE HERE
# An ensemble method applied on breast_cancer data. by Hui Lin 02/20/2025

# [step00] import the lib first.
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# [step01] split train and test datasets (20% held out for testing)
(trainData, testData) = data.randomSplit([0.8, 0.2], seed=36)

# [step02] using GradientBoostedTrees model
gbt = GBTClassifier(labelCol="label", featuresCol="features", maxIter=10)
gbt_model = gbt.fit(trainData)
gbt_predictions = gbt_model.transform(testData)

# [step03] Evaluate the GradientBoostedTrees model
gbt_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
gbt_accuracy = gbt_evaluator.evaluate(gbt_predictions)
print(f"GradientBoostedTrees Test Accuracy: {gbt_accuracy:.2f}")

# [step04] using RandomForest model
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10)
rf_model = rf.fit(trainData)
rf_predictions = rf_model.transform(testData)

# [step05] Evaluate the RandomForest model
rf_evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
rf_accuracy = rf_evaluator.evaluate(rf_predictions)
print(f"RandomForest Test Accuracy: {rf_accuracy:.2f}")

# [step06] Save and load model
gbt_Path = "assignment03/GBTModel"
rf_Path = "assignment03/RFModel"

if os.path.exists(gbt_Path):
    shutil.rmtree(gbt_Path)  # remove a previously saved model
gbt_model.save(gbt_Path)

if os.path.exists(rf_Path):
    shutil.rmtree(rf_Path)  # remove a previously saved model
rf_model.save(rf_Path)


GradientBoostedTrees Test Accuracy: 0.92
RandomForest Test Accuracy: 0.97


Accuracy comparison of the GBT and RF models:

*   GradientBoostedTrees test accuracy: 0.92
*   RandomForest test accuracy: 0.97

On this particular split (seed 36), the RF model performs better than the GBT model on the breast_cancer data.
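How much weight should a 0.05 gap carry on a small test set? A rough sketch, treating each accuracy as a binomial proportion, gives an approximate standard error. The test-set size `n_test = 114` is an assumption (about 20% of the 569 samples); the exact size produced by `randomSplit` is not shown in the output.

```python
import math

def accuracy_std_error(accuracy, n):
    """Binomial standard error of an accuracy measured on n test samples."""
    return math.sqrt(accuracy * (1.0 - accuracy) / n)

n_test = 114  # assumption: roughly 20% of the 569 rows

for name, acc in [("GradientBoostedTrees", 0.92), ("RandomForest", 0.97)]:
    se = accuracy_std_error(acc, n_test)
    print(f"{name}: {acc:.2f} +/- {se:.3f}")
```

Under these assumptions the two accuracies differ by less than about two standard errors, so a different seed could plausibly change the ranking. Averaging over several splits would give a firmer comparison.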

### Attempting Option 3: HR Analytics

In [46]:
# [Option 3]: HR Analytics: Job Change of Data Scientists

#--- load Dataset First
from pathlib import Path
import pandas as pd

aug = pd.read_csv(Path("/content/assignment03/aug_train.csv"))



In [27]:
#--- take a look at the dataset
print("the head sample data:")
print(aug.head())

print("the information of dataset:")
aug.info()

print("the description of dataset:")
aug.describe()



<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19158 entries, 0 to 19157
Data columns (total 14 columns):
 #   Column                  Non-Null Count  Dtype  
---  ------                  --------------  -----  
 0   enrollee_id             19158 non-null  int64  
 1   city                    19158 non-null  object 
 2   city_development_index  19158 non-null  float64
 3   gender                  14650 non-null  object 
 4   relevent_experience     19158 non-null  object 
 5   enrolled_university     18772 non-null  object 
 6   education_level         18698 non-null  object 
 7   major_discipline        16345 non-null  object 
 8   experience              19093 non-null  object 
 9   company_size            13220 non-null  object 
 10  company_type            13018 non-null  object 
 11  last_new_job            18735 non-null  object 
 12  training_hours          19158 non-null  int64  
 13  target                  19158 non-null  float64
dtypes: float64(2), int64(2), object(10)
me

| | enrollee_id | city_development_index | training_hours | target |
|---|---|---|---|---|
| count | 19158.0 | 19158.0 | 19158.0 | 19158.0 |
| mean | 16875.358179 | 0.828848 | 65.366896 | 0.249348 |
| std | 9616.292592 | 0.123362 | 60.058462 | 0.432647 |
| min | 1.0 | 0.448 | 1.0 | 0.0 |
| 25% | 8554.25 | 0.74 | 23.0 | 0.0 |
| 50% | 16982.5 | 0.903 | 47.0 | 0.0 |
| 75% | 25169.75 | 0.92 | 88.0 | 0.0 |
| max | 33380.0 | 0.949 | 336.0 | 1.0 |
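The `target` column is binary, so its mean in the summary above is the fraction of positive rows. The short calculation below, using only the figures printed by `describe()`, shows the class imbalance and the accuracy a trivial "always predict 0" model would reach. It is a sanity check, not part of the original analysis.

```python
positive_rate = 0.249348   # mean of `target` from describe()
n_rows = 19158             # count from describe()

# Approximate number of positive rows implied by the mean.
n_positive = round(positive_rate * n_rows)

# Accuracy of always predicting the majority class.
majority_baseline = max(positive_rate, 1.0 - positive_rate)

print(f"positives: about {n_positive} of {n_rows}")
print(f"majority-class baseline accuracy: {majority_baseline:.3f}")
```

With roughly three negatives for every positive, plain accuracy can look good for a model that never predicts the positive class, which is one reason to prefer a ranking metric such as the area under the ROC curve.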


### Observe the data

In [47]:


# Drop enrollee_id: it is a unique identifier and carries no predictive information.
# Drop city: it is treated here as redundant with "city_development_index".
# axis=1 drops columns (axis=0 would drop rows).
# inplace=True modifies the original DataFrame.
aug.drop(['enrollee_id','city'], axis=1, inplace=True)

# Find the object columns
obj = (aug.dtypes == 'object')
print(obj)

city_development_index    False
gender                     True
relevent_experience        True
enrolled_university        True
education_level            True
major_discipline           True
experience                 True
company_size               True
company_type               True
last_new_job               True
training_hours            False
target                    False
dtype: bool
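The boolean mask printed above can be turned into a list of column names, which is handy when looping over the columns that still need encoding. A minimal sketch on a toy frame follows; the frame `toy` and its values are made up for illustration, and the text column is given an explicit `object` dtype so the example behaves the same across pandas versions.

```python
import pandas as pd

# Toy frame with one text column and two numeric columns (illustrative only).
toy = pd.DataFrame({
    "gender": pd.Series(["Male", None], dtype=object),
    "training_hours": [36, 47],
    "target": [1.0, 0.0],
})

# Same mask as in the cell above, then keep the names where it is True.
is_object = toy.dtypes == "object"
object_cols = is_object[is_object].index.tolist()
print(object_cols)  # ['gender']
```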


In [43]:
#--- the dataset after dropping the two columns
aug.head()


| | city_development_index | gender | relevent_experience | enrolled_university | education_level | major_discipline | experience | company_size | company_type | last_new_job | training_hours | target |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.92 | Male | Has relevent experience | no_enrollment | Graduate | STEM | >20 | NaN | NaN | 1 | 36 | 1.0 |
| 1 | 0.776 | Male | No relevent experience | no_enrollment | Graduate | STEM | 15 | 50-99 | Pvt Ltd | >4 | 47 | 0.0 |
| 2 | 0.624 | NaN | No relevent experience | Full time course | Graduate | STEM | 5 | NaN | NaN | never | 83 | 0.0 |
| 3 | 0.789 | NaN | No relevent experience | NaN | Graduate | Business Degree | <1 | NaN | Pvt Ltd | never | 52 | 1.0 |
| 4 | 0.767 | Male | Has relevent experience | no_enrollment | Masters | STEM | >20 | 50-99 | Funded Startup | 4 | 8 | 0.0 |


### Preprocessing the columns

In [48]:
import pandas as pd

aug['gender'] = aug['gender'].str.strip().str.lower()
# define mapping rule for "gender"
gender_mapping = {
    'female': 0,
    'male': 1,
    'other': 3
}

# apply the mapping rule
aug['gender'] = aug['gender'].map(gender_mapping)
aug['gender'] = aug['gender'].fillna(3) # missing -> 3 (same code as 'other')

# check the result
print(aug['gender'].value_counts())

gender
1.0    13221
3.0     4699
0.0     1238
Name: count, dtype: int64
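The same normalize, map, and fill pattern recurs for most of the categorical columns below. As a sketch, it could be factored into one helper. The name `encode_with_mapping` is hypothetical and not part of the notebook, and the sample values are made up.

```python
import pandas as pd

def encode_with_mapping(series, mapping, missing_code):
    """Lower-case and strip a text column, map it to codes, fill gaps."""
    cleaned = series.str.strip().str.lower()
    # .map() yields NaN for missing or unmapped values, which turns the
    # column into floats; fill first, then cast back to integers.
    return cleaned.map(mapping).fillna(missing_code).astype(int)

gender = pd.Series(["Male", " female", None, "Other"], dtype=object)
codes = encode_with_mapping(gender, {"female": 0, "male": 1, "other": 3}, missing_code=3)
print(codes.tolist())  # [1, 0, 3, 3]
```

The intermediate NaN values are also why the counts above are indexed by `1.0`, `3.0`, and `0.0` rather than integers.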


In [49]:
aug['relevent_experience'] = aug['relevent_experience'].str.strip().str.lower()

# define mapping rule for "relevent_experience"
rlexp_mapping = {
    'has relevent experience': 1,
    'no relevent experience': 0
}
# apply the mapping rule
aug['relevent_experience'] = aug['relevent_experience'].map(rlexp_mapping)
aug['relevent_experience'] = aug['relevent_experience'].fillna(0)

# check the result
print(aug['relevent_experience'].value_counts())

relevent_experience
1    13792
0     5366
Name: count, dtype: int64


In [51]:
aug['enrolled_university'] = aug['enrolled_university'].str.strip().str.lower()

# define mapping rule for "enrolled_university"
enunv_mapping = {
    'full time course': 1,
    'no_enrollment': 0,
    'part time course': 0.5
}
# apply the mapping rule
aug['enrolled_university'] = aug['enrolled_university'].map(enunv_mapping)
aug['enrolled_university'] = aug['enrolled_university'].fillna(0)

# check the result
print(aug['enrolled_university'].value_counts())

enrolled_university
0.0    14203
1.0     3757
0.5     1198
Name: count, dtype: int64


In [53]:
aug['education_level'] = aug['education_level'].str.strip().str.lower()

# define mapping rule for "education_level"
edulevel_mapping = {
    'primary school': 1,
    'high school': 2,
    'graduate': 3,
    'masters': 4,
    'phd': 5
}
# apply the mapping rule
aug['education_level'] = aug['education_level'].map(edulevel_mapping)
aug['education_level'] = aug['education_level'].fillna(0)

# check the result
print(aug['education_level'].value_counts())

education_level
3.0    11598
4.0     4361
2.0     2017
0.0      460
5.0      414
1.0      308
Name: count, dtype: int64


In [55]:
aug['major_discipline'] = aug['major_discipline'].str.strip().str.lower()

# define mapping rule for "major_discipline"
mjdsp_mapping = {
    'arts': 1,
    'business degree': 2,
    'humanities': 3,
    'stem': 4,
    'other': 5,
    'no major': 0
}
# apply the mapping rule
aug['major_discipline'] = aug['major_discipline'].map(mjdsp_mapping)
aug['major_discipline'] = aug['major_discipline'].fillna(0)

# check the result
print(aug['major_discipline'].value_counts())

major_discipline
4.0    14492
0.0     3036
3.0      669
5.0      381
2.0      327
1.0      253
Name: count, dtype: int64


In [57]:
aug['experience'] = aug['experience'].str.strip().str.lower()

# define the mapping rule
def expr_mapping(value):
    if value == '<1':
        return 0
    elif value == '>20':
        return 21
    else:
        try:
            return int(value)
        except (ValueError, TypeError):
            return None

# apply the mapping function
aug['experience'] = aug['experience'].apply(expr_mapping)
aug['experience'] = aug['experience'].fillna(0)  # note: missing values get the same code as '<1'

# check the result
print(aug['experience'].value_counts())

experience
21.0    3286
5.0     1430
4.0     1403
3.0     1354
6.0     1216
2.0     1127
7.0     1028
10.0     985
9.0      980
8.0      802
15.0     686
11.0     664
0.0      587
14.0     586
1.0      549
16.0     508
12.0     494
13.0     399
17.0     342
19.0     304
18.0     280
20.0     148
Name: count, dtype: int64
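One caveat of the cell above: filling missing values with 0 gives them the same code as `'<1'`. According to `info()`, `experience` has 19093 non-null entries out of 19158, so 65 rows are affected. The sketch below shows one alternative that keeps the two cases apart. The sentinel value `-1` and the name `parse_experience` are assumptions made for illustration.

```python
import pandas as pd

def parse_experience(value):
    """Convert an experience string to years; return None when missing."""
    if value == "<1":
        return 0
    if value == ">20":
        return 21
    try:
        return int(value)
    except (ValueError, TypeError):
        return None

raw = pd.Series(["<1", "15", ">20", None], dtype=object)
years = pd.Series([parse_experience(v) for v in raw], dtype="float64")

is_missing = years.isna()                    # could be kept as an extra feature
years_filled = years.fillna(-1).astype(int)  # -1 is a hypothetical sentinel
print(years_filled.tolist())  # [0, 15, 21, -1]
```

Tree-based models can split on the sentinel, so "unknown" remains distinguishable from "less than one year".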


In [59]:
aug['last_new_job'] = aug['last_new_job'].str.strip().str.lower()

# define the mapping rule
def lnj_mapping(value):
    if value == 'never':
        return 0
    elif value == '>4':
        return 5
    else:
        try:
            return int(value)
        except (ValueError, TypeError):
            return None

# apply the mapping function
aug['last_new_job'] = aug['last_new_job'].apply(lnj_mapping)
aug['last_new_job'] = aug['last_new_job'].fillna(0)  # note: missing values get the same code as 'never'

# check the result
print(aug['last_new_job'].value_counts())

last_new_job
1.0    8040
5.0    3290
2.0    2900
0.0    2875
4.0    1029
3.0    1024
Name: count, dtype: int64


In [60]:
aug['company_type'] = aug['company_type'].str.strip().str.lower()

# define mapping rule
cmptype_mapping = {
    'early stage startup': 1,
    'funded startup': 2,
    'ngo': 3,
    'public sector': 4,
    'other': 5,
    'pvt ltd': 6
}
# apply the mapping rule
aug['company_type'] = aug['company_type'].map(cmptype_mapping)
aug['company_type'] = aug['company_type'].fillna(0)

# check the result
print(aug['company_type'].value_counts())

company_type
6.0    9817
0.0    6140
2.0    1001
4.0     955
1.0     603
3.0     521
5.0     121
Name: count, dtype: int64


In [63]:
import numpy as np

aug['company_size'] = aug['company_size'].str.strip().str.lower()

# define mapping rule
def cmpsize_mapping(value):
    if pd.isna(value):  # NULL
        return np.nan
    elif '-' in value:  #  "1000-4999"
        low, high = map(int, value.split('-'))
        return (low + high) / 2
    elif '/' in value:  #  "10/49"
        low, high = map(int, value.split('/'))
        return (low + high) / 2
    elif value.endswith('+'):  # "1000+"
        return int(value.rstrip('+'))
    elif value.startswith('<'):  #  "<10"
        return int(value.lstrip('<')) * 0.67
    elif value.startswith('>'):  #  ">10"
        return int(value.lstrip('>')) + 1
    else:
        return np.nan

# apply the mapping function
aug['company_size'] = aug['company_size'].apply(cmpsize_mapping)
aug['company_size'] = aug['company_size'].fillna(0)  # missing values become 0

# check the result
print(aug['company_size'].value_counts(dropna=False))

company_size
0.0        5938
74.5       3083
300.0      2571
10000.0    2019
29.5       1471
2999.5     1328
6.7        1308
749.5       877
7499.5      563
Name: count, dtype: int64
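The encoded values above follow from the midpoint rule: a range such as `50-99` becomes 74.5, `10/49` becomes 29.5, `<10` becomes 6.7, and `10000+` becomes 10000. A pure-Python sketch of the same rule, convenient as a quick unit check, is shown here. The function name `parse_company_size` is hypothetical, and the branch for a leading `>` is left out because no such category appears in the output.

```python
import math

def parse_company_size(value):
    """Turn a company-size category into a representative head count."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return math.nan
    value = value.strip().lower()
    for separator in ("-", "/"):          # "50-99" or "10/49": use the midpoint
        if separator in value:
            low, high = (int(part) for part in value.split(separator))
            return (low + high) / 2
    if value.endswith("+"):               # "10000+": use the lower bound
        return float(value.rstrip("+"))
    if value.startswith("<"):             # "<10": scale the upper bound
        return int(value.lstrip("<")) * 0.67
    return math.nan

for raw_size in ["50-99", "10/49", "100-500", "10000+"]:
    print(raw_size, "->", parse_company_size(raw_size))
```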


In [64]:
obj = (aug.dtypes == 'object')
print(obj)

aug.head()

city_development_index    False
gender                    False
relevent_experience       False
enrolled_university       False
education_level           False
major_discipline          False
experience                False
company_size              False
company_type              False
last_new_job              False
training_hours            False
target                    False
dtype: bool


| | city_development_index | gender | relevent_experience | enrolled_university | education_level | major_discipline | experience | company_size | company_type | last_new_job | training_hours | target |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.92 | 1.0 | 1 | 0.0 | 3.0 | 4.0 | 21.0 | 0.0 | 0.0 | 1.0 | 36 | 1.0 |
| 1 | 0.776 | 1.0 | 0 | 0.0 | 3.0 | 4.0 | 15.0 | 74.5 | 6.0 | 5.0 | 47 | 0.0 |
| 2 | 0.624 | 3.0 | 0 | 1.0 | 3.0 | 4.0 | 5.0 | 0.0 | 0.0 | 0.0 | 83 | 0.0 |
| 3 | 0.789 | 3.0 | 0 | 0.0 | 3.0 | 2.0 | 0.0 | 0.0 | 6.0 | 0.0 | 52 | 1.0 |
| 4 | 0.767 | 1.0 | 1 | 0.0 | 4.0 | 4.0 | 21.0 | 74.5 | 2.0 | 4.0 | 8 | 0.0 |


### Model Training and Evaluation

In [72]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import SparkSession

#--- create Spark Session
spark = SparkSession.builder.appName("RandomForestPrediction").getOrCreate()

df = spark.createDataFrame(aug)

#--- Define feature columns and label
feature_cols = [col for col in df.columns if col != 'target']
label_col = 'target'

#--- using VectorAssembler to generate Vectors
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

#--- Create RandomForestClassifier model
rf = RandomForestClassifier(labelCol=label_col, featuresCol="features")

#--- create the Pipeline
pipeline = Pipeline(stages=[assembler, rf])

#--- split data
train_data, test_data = df.randomSplit([0.8, 0.2], seed=36)

#--- train
model = pipeline.fit(train_data)

#--- predict
predictions = model.transform(test_data)

#--- evaluate the model (default metric: area under the ROC curve)
evaluator = BinaryClassificationEvaluator(labelCol=label_col)
auc = evaluator.evaluate(predictions)
print(f"Area Under ROC: {auc}")

# stop the SparkSession
spark.stop()

Area Under ROC: 0.789119775640068
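The area under the ROC curve can be read as the probability that a randomly chosen positive row receives a higher score than a randomly chosen negative row, with ties counted as half. The pure-Python sketch below computes it that way on a tiny made-up example. It illustrates the metric only and is not how Spark computes it internally.

```python
def roc_auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count 0.5."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Made-up labels and scores: 3 of the 4 positive/negative pairs are ordered correctly.
example_auc = roc_auc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8])
print(example_auc)  # 0.75
```

Read this way, the value of about 0.79 reported above means the random forest ranks a job changer above a non-changer in roughly 79% of such pairs on the held-out data.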
