# Introduction to Feature Engineering and Machine Learning with Apache Spark

Welcome to our tutorial on data engineering and machine learning with Apache Spark. In this interactive guide, we walk through the core steps of data engineering and apply machine learning techniques to real-world network connection data. Throughout, we use Apache Spark, an open-source distributed computing engine for large-scale data processing.

## Objectives

- **Understand Feature Engineering for Time-Series Data:** Learn the essentials of data engineering with Spark's DataFrame API, including data loading, preprocessing, and feature engineering, and see how to prepare data effectively for analytical or machine learning applications.

- **Explore Machine Learning Pipelines:** Gain hands-on experience with building and evaluating machine learning models. Understand how to construct pipelines for streamlined data transformation and model training.

- **Practical Applications:** Apply what you've learned to a dataset, performing feature engineering tasks and using machine learning models to solve real-world problems.





In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

In [2]:
from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("PySpark Data Engineering and ML Tutorial") \
    .getOrCreate()

# Load the dataset
df = spark.read.parquet("./processed.pq")
# Display the first few rows to verify the data is loaded correctly
df.show(5)


Exception: Java gateway process exited before sending its port number

In [3]:
df = spark.read.parquet("./processed.pq").withColumn(
    "target", F.when(F.col("label") != "Benign", 1).otherwise(0)
)
df.show(5)

+-------------------+------------------+---------------+-----------+---------------+---------+-----+-------+---------+----------+----------+----------+-------+---------+-------------+---------+-------------+---------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------+
|                 ts|               uid|      source_ip|source_port|        dest_ip|dest_port|proto|service| duration|orig_bytes|resp_bytes|conn_state|history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes|    label|      detailed-label|                 dt|                day|               hour|             minute|             second|target|
+-------------------+------------------+---------------+-----------+---------------+---------+-----+-------+---------+----------+----------+----------+-------+---------+-------------+---------+-------------+---------+--------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------+

## Feature Engineering and Data Preparation

After loading the dataset, the next step is feature engineering: creating new features from the existing data to improve model performance. For this network connection data, we compute rolling counts per source IP, source port, destination IP, and destination port, plus rolling per-source-IP averages of packets and bytes, over the last 1 and 30 minutes.


Once the data is prepared, we save the processed dataset so that the feature-engineered data can be reused for model training and evaluation without repeating the preprocessing steps.



In [4]:
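from pyspark.sql.functions import col, from_unixtime

# Human-readable version of the Unix epoch seconds in 'ts' (from_unixtime returns a string)
df = df.withColumn("timestamp", from_unixtime("ts"))
# Make sure 'dt' has a timestamp type so it can drive the time-based windows below
df = df.withColumn("dt", col("dt").cast("timestamp"))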

In [5]:
from typing import Optional

import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import col, unix_timestamp


def mins_to_secs(mins: int) -> int:
    """Convert minutes to seconds."""
    return mins * 60


def generate_window(window_in_minutes: int, partition_by: str, timestamp_col: str):
    """Time-based window over the last `window_in_minutes` minutes, per value of `partition_by`.

    Ordering by unix_timestamp() expresses time in seconds, so rangeBetween() covers every row
    of the same partition from `window_in_minutes` before the current row up to the current row.
    """
    return (
        Window.partitionBy(partition_by)
        .orderBy(unix_timestamp(col(timestamp_col)))
        .rangeBetween(-mins_to_secs(window_in_minutes), 0)
    )


def generate_rolling_aggregate(col: str, partition_by: Optional[str] = None, operation: str = "count",
                               timestamp_col: str = "timestamp", window_in_minutes: int = 1):
    """Generate a rolling aggregate of column `col` over a time-based window.

    If `partition_by` is omitted, the window is partitioned by `col` itself.
    Note: inside this function the argument `col` (a column name) shadows pyspark's col().
    """
    window_spec = generate_window(window_in_minutes, partition_by if partition_by else col, timestamp_col)
    operations = {
        "count": F.count,
        "sum": F.sum,
        "avg": F.avg,
        "max": F.max,
        "min": F.min,
        "stddev": F.stddev,
        "variance": F.variance,
        # Extend with other operations as needed
    }

    if operation not in operations:
        raise ValueError(f"Operation {operation} is not supported. Supported operations: {list(operations.keys())}")
    return operations[operation](col).over(window_spec)
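To make the window semantics concrete, the following pure-Python sketch (no Spark required) mimics what a call such as `generate_rolling_aggregate(col="source_ip", operation="count", ...)` computes: for each row, the number of rows with the same key whose timestamp lies between `window_seconds` before the current row and the current row, with rows sharing the current timestamp included, as in a `rangeBetween(-seconds, 0)` frame. The helper `rolling_count` is hypothetical and only for illustration; it assumes timestamps are Unix seconds and that keys are never null.

```python
from bisect import bisect_left, bisect_right
from collections import defaultdict


def rolling_count(events, window_seconds):
    """Count events per key whose timestamp lies in [t - window_seconds, t].

    events: list of (key, unix_seconds) pairs, in any order.
    Returns one count per input event, in input order.
    """
    # Group timestamps by key (the PARTITION BY step) and sort them (the ORDER BY step)
    by_key = defaultdict(list)
    for key, ts in events:
        by_key[key].append(ts)
    for ts_list in by_key.values():
        ts_list.sort()

    counts = []
    for key, ts in events:
        ts_list = by_key[key]
        lo = bisect_left(ts_list, ts - window_seconds)  # first timestamp >= t - window
        hi = bisect_right(ts_list, ts)  # one past the last timestamp <= t (keeps ties)
        counts.append(hi - lo)
    return counts
```

For example, two connections from the same IP at second 30 both get a count of 3 with a 60-second window if there is also a connection at second 0, because the frame includes the peer row with the same timestamp.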


In [6]:
df = df.withColumns({
    "source_ip_count_last_min": generate_rolling_aggregate(col="source_ip", operation="count", timestamp_col="dt", window_in_minutes=1),
    "source_ip_count_last_30_mins": generate_rolling_aggregate(col="source_ip", operation="count", timestamp_col="dt", window_in_minutes=30),
    "source_port_count_last_min": generate_rolling_aggregate(col="source_port", operation="count", timestamp_col="dt", window_in_minutes=1),
    "source_port_count_last_30_mins": generate_rolling_aggregate(col="source_port", operation="count", timestamp_col="dt", window_in_minutes=30),
    "dest_ip_count_last_min": generate_rolling_aggregate(col="dest_ip", operation="count", timestamp_col="dt", window_in_minutes=1),
    "dest_ip_count_last_30_mins": generate_rolling_aggregate(col="dest_ip", operation="count", timestamp_col="dt", window_in_minutes=30),
    "dest_port_count_last_min": generate_rolling_aggregate(col="dest_port", operation="count", timestamp_col="dt", window_in_minutes=1),
    "dest_port_count_last_30_mins": generate_rolling_aggregate(col="dest_port", operation="count", timestamp_col="dt", window_in_minutes=30),
    "source_ip_avg_pkts_last_min": generate_rolling_aggregate(col="orig_pkts", partition_by="source_ip", operation="avg", timestamp_col="dt", window_in_minutes=1),
    "source_ip_avg_pkts_last_30_mins": generate_rolling_aggregate(col="orig_pkts", partition_by="source_ip", operation="avg", timestamp_col="dt", window_in_minutes=30),
    "source_ip_avg_bytes_last_min": generate_rolling_aggregate(col="orig_ip_bytes", partition_by="source_ip", operation="avg", timestamp_col="dt", window_in_minutes=1),
    "source_ip_avg_bytes_last_30_mins": generate_rolling_aggregate(col="orig_ip_bytes", partition_by="source_ip", operation="avg", timestamp_col="dt", window_in_minutes=30),
})

In [7]:
df.show(5)

+-------------------+------------------+-------------+-----------+---------------+---------+-----+-------+---------+----------+----------+----------+-------+---------+-------------+---------+-------------+------+--------------+-------------------+-------------------+-------------------+-------------------+-------------------+------+-------------------+------------------------+----------------------------+--------------------------+------------------------------+----------------------+--------------------------+------------------------+----------------------------+---------------------------+-------------------------------+----------------------------+--------------------------------+
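|                 ts|               uid|    source_ip|source_port|        dest_ip|dest_port|proto|service| duration|orig_bytes|resp_bytes|conn_state|history|orig_pkts|orig_ip_bytes|resp_pkts|resp_ip_bytes| label|detailed-label|                 dt|                day|               hour|             minute|             second|target|          timestamp|source_ip_count_last_min|source_ip_count_last_30_mins|source_port_count_last_min|source_port_count_last_30_mins|dest_ip_count_last_min|dest_ip_count_last_30_mins|dest_port_count_last_min|dest_port_count_last_30_mins|source_ip_avg_pkts_last_min|source_ip_avg_pkts_last_30_mins|source_ip_avg_bytes_last_min|source_ip_avg_bytes_last_30_mins|
+-------------------+------------------+-------------+-----------+---------------+---------+-----+-------+---------+----------+----------+----------+-------+---------+-------------+---------+-------------+------+--------------+-------------------+-------------------+-------------------+-------------------+-------------------+------+-------------------+------------------------+----------------------------+--------------------------+------------------------------+----------------------+--------------------------+------------------------+----------------------------+---------------------------+-------------------------------+----------------------------+--------------------------------+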

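Now execute the transformations and save the resulting table to a new Parquet file. Because Spark evaluates lazily, the window aggregations are only computed when the write runs.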

In [8]:
df.write.mode("overwrite").parquet("feature_engineered.pq")

In [9]:
df_c = spark.read.parquet("feature_engineered.pq")

### Preprocessing

In [10]:
numerical_features = [
    "duration",
    "orig_bytes",
    "resp_bytes",
    "orig_pkts",
    "orig_ip_bytes",
    "resp_pkts",
    "resp_ip_bytes",
    "source_ip_count_last_min",
    "source_ip_count_last_30_mins",
    "source_port_count_last_min",
    "source_port_count_last_30_mins",
    "source_ip_avg_pkts_last_min",
    "source_ip_avg_pkts_last_30_mins",
    "source_ip_avg_bytes_last_min",
    "source_ip_avg_bytes_last_30_mins",
]
categorical_features = ["proto", "service", "conn_state", "history"]
categorical_features_indexed = [c + "_index" for c in categorical_features]

input_features = numerical_features + categorical_features_indexed

In [11]:
from pyspark.sql.functions import col, when, lit

# Bucket rare categories of each categorical feature in df_c into a single "Other" level
for c in categorical_features:
    # Values of feature 'c' that occur more than 100 times are considered frequent
    frequent_values = (
        df_c.groupby(c)
        .count()
        .filter(col("count") > 100)
        .select(c)
    )

    # Collect the frequent values into a Python list on the driver
    frequent_values_list = [row[c] for row in frequent_values.collect()]

    # Replace rare categories (and nulls) with 'Other'
    df_c = df_c.withColumn(
        c,
        when(col(c).isin(frequent_values_list), col(c)).otherwise(lit("Other"))
    )
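The loop above applies a simple frequency threshold. Here is the same rule as a plain-Python sketch on an in-memory list (the function name `bucket_rare_categories` is hypothetical): values seen more than `min_count` times are kept and everything else becomes `"Other"`. Missing values are also mapped to `"Other"`, matching what `when(col(c).isin(...)).otherwise(lit("Other"))` does with nulls.

```python
from collections import Counter


def bucket_rare_categories(values, min_count=100, other="Other"):
    """Keep values occurring more than `min_count` times; map the rest to `other`."""
    counts = Counter(values)
    # None is never treated as frequent, mirroring how isin() evaluates nulls in Spark
    frequent = {v for v, n in counts.items() if v is not None and n > min_count}
    return [v if v in frequent else other for v in values]
```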


In [14]:
df_c.select([F.count_distinct(c) for c in categorical_features]).show()

+---------------------+-----------------------+--------------------------+-----------------------+
|count(DISTINCT proto)|count(DISTINCT service)|count(DISTINCT conn_state)|count(DISTINCT history)|
+---------------------+-----------------------+--------------------------+-----------------------+
|                    3|                      4|                         8|                     23|
+---------------------+-----------------------+--------------------------+-----------------------+



## Train/Test Split
The train/test split must be done by source IP address. Otherwise, rows from the same host, which share the rolling per-IP features, end up in both sets and leak information into the evaluation. A reliable way to do this is to split the distinct IP addresses at random and then filter the DataFrame by IP address.
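Here is a minimal sketch of an IP-grouped split in plain Python, built around the hypothetical helper `split_by_group`: each distinct IP is assigned to exactly one side, so no host contributes rows to both sets. As with `DataFrame.sample(False, 0.8)`, each group is kept independently with probability 0.8, so the training share is only approximately 80%. The `force_train`/`force_test` arguments mirror the hand-picked malicious IPs used later in this section.

```python
import random


def split_by_group(groups, train_fraction=0.8, seed=42, force_train=(), force_test=()):
    """Assign every distinct group (e.g. a source IP) wholly to "train" or "test"."""
    rng = random.Random(seed)
    assignment = {}
    # Sort the distinct groups so the result does not depend on row order
    for group in sorted(set(groups)):
        if group in force_train:
            assignment[group] = "train"
        elif group in force_test:
            assignment[group] = "test"
        else:
            # Independent Bernoulli draw per group, like DataFrame.sample(False, train_fraction)
            assignment[group] = "train" if rng.random() < train_fraction else "test"
    return assignment
```

Rows are then kept or dropped by looking up their IP in `assignment`, which is what the join on a `train_flag` column achieves in Spark.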

In [15]:
df_c.groupby("source_ip").agg(F.sum(F.col("target")).alias("bad_sum")).orderBy("bad_sum", ascending=False).show(5)

+---------------+-------+
|      source_ip|bad_sum|
+---------------+-------+
|192.168.100.103| 539473|
|    192.168.2.5| 151566|
|    192.168.2.1|      1|
|  192.168.100.1|      0|
|  219.250.49.64|      0|
+---------------+-------+
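only showing top 5 rows

The output shows that almost all malicious traffic comes from two source IPs, 192.168.100.103 and 192.168.2.5, and only one other IP (192.168.2.1) has any malicious rows at all. A purely random split could put both main sources in the same set. The next cell therefore assigns 192.168.100.103 to training, assigns 192.168.2.5 and 192.168.2.1 to testing, and samples about 80% of the remaining IPs for training.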

In [16]:
from pyspark.sql.functions import col, lit, when

# IPs with malicious traffic are assigned by hand so that both sets contain attacks
malicious_ip_for_training = "192.168.100.103"
malicious_ip_for_testing = "192.168.2.5"
additional_ip_for_testing = "192.168.2.1"

# Randomly sample ~80% of the remaining distinct source IPs for training
excluded_ips = [malicious_ip_for_training, malicious_ip_for_testing, additional_ip_for_testing]
train_ips_sample = (
    df_c.filter(~col("source_ip").isin(excluded_ips))
    .select("source_ip")
    .distinct()
    .sample(False, 0.8, seed=42)  # Bernoulli sampling with a seed: the fraction is approximate
    .withColumn("train_flag", lit(1))
)

# Flag rows whose source IP was sampled (train_flag is null for all other IPs)
df_c = df_c.join(train_ips_sample, on="source_ip", how="left_outer")

# Force the hand-picked malicious IPs into the training or test set
df_c = df_c.withColumn(
    "train_flag",
    when(col("source_ip") == malicious_ip_for_training, lit(1))
    .when(col("source_ip").isin([malicious_ip_for_testing, additional_ip_for_testing]), lit(0))
    .otherwise(col("train_flag"))
)

# Every source IP now belongs to exactly one set; drop the helper column afterwards
df_train = df_c.where(col("train_flag") == 1).drop("train_flag")
df_test = df_c.where(col("train_flag").isNull() | (col("train_flag") == 0)).drop("train_flag")


# Building a Machine Learning Pipeline in PySpark

This guide outlines the construction of a machine learning pipeline using PySpark, specifically for binary classification tasks. The pipeline integrates preprocessing, model training, and hyperparameter tuning stages tailored for datasets with both categorical and numerical features.

## Pipeline Components

The stages run in the following order:

1. **String Indexer**:
   - `StringIndexer` converts each categorical feature into a numerical index. With `handleInvalid='keep'`, categories not seen during fitting are mapped to an extra index instead of raising an error.

2. **Vector Assembler**:
   - The first `VectorAssembler` stage combines the raw numerical features into a single vector, the input format Spark's ML algorithms require.

3. **Standard Scaler**:
   - `StandardScaler` rescales each numerical feature. With `withStd=True` and `withMean=False`, as configured here, it divides each feature by its standard deviation without subtracting the mean. Scaling matters for scale-sensitive models such as logistic regression; tree-based models such as RandomForest are largely insensitive to it.

4. **Second Vector Assembler**:
   - A second `VectorAssembler` stage builds the final feature vector from the scaled numerical features and the indexed categorical features. This vector is used for model training.

5. **RandomForest Classifier**:
   - `RandomForestClassifier` is the model, configured to predict the binary `target` column. It is initialized with `numTrees=100`, so the ensemble consists of 100 decision trees.
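To see what `withStd=True, withMean=False` does, here is a sketch of the same transformation on a single numeric column in plain Python. It divides by the corrected sample standard deviation (n - 1 in the denominator), which is how Spark documents its "unit std", and it does not subtract the mean, so the values keep their sign and zeros stay zero. Returning 0.0 for a constant column is our assumption about Spark's handling of zero-variance features, and the helper name `scale_to_unit_std` is hypothetical.

```python
import statistics


def scale_to_unit_std(column):
    """Divide each value by the column's sample standard deviation, without centering."""
    std = statistics.stdev(column)  # corrected sample std: n - 1 in the denominator
    if std == 0.0:
        # Assumption: a zero-variance feature is mapped to 0.0
        return [0.0 for _ in column]
    return [x / std for x in column]
```

Scaling `[2.0, 4.0, 6.0]` gives `[1.0, 2.0, 3.0]`: the spread becomes 1, but the mean stays at 2.0 rather than 0.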

## Hyperparameter Tuning and Model Evaluation

- **Parameter Grid**:
  - A `ParamGridBuilder` defines the grid of hyperparameters to try for the RandomForest model. The grid lists `numTrees` and `maxDepth`, but because `numTrees` has a single value (100), only `maxDepth` (5 or 10) actually varies, giving two candidate configurations.

- **Cross Validator**:
  - `CrossValidator` performs hyperparameter tuning and model selection by k-fold cross-validation over the parameter grid. It splits the training data into `numFolds=3` folds, trains on `k-1` folds, and evaluates on the remaining fold. It repeats this for every fold and every parameter combination, then selects the combination with the best average metric.

- **Model Training**:
  - Fitting the `CrossValidator` fits the whole pipeline, preprocessing stages included, for each fold and parameter combination, then refits the best combination on the full training data. The resulting `cvModel` applies the same preprocessing before scoring new data.

- **Binary Classification Evaluator**:
  - `BinaryClassificationEvaluator` scores the candidate models during cross-validation and evaluates the best model afterwards. It supports two metrics: area under the ROC curve (`areaUnderROC`, the default) and area under the precision-recall curve (`areaUnderPR`).
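The amount of work `CrossValidator` does follows from the grid and the fold count. The sketch below expands a grid into one parameter map per combination, as `ParamGridBuilder.build()` does, and counts the pipeline fits: one per combination and fold, plus the final refit of the best combination on the full training data. The helper names are hypothetical. For simplicity, `k_fold_indices` assigns rows to folds round-robin, whereas Spark assigns them randomly.

```python
from itertools import product


def expand_param_grid(grid):
    """Expand {param: [values]} into one {param: value} dict per combination."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[name] for name in names))]


def k_fold_indices(n_rows, k):
    """Yield (train_idx, val_idx) pairs; every row is held out in exactly one fold.

    Rows are assigned round-robin here for simplicity; Spark assigns them randomly.
    """
    for fold in range(k):
        val_idx = [i for i in range(n_rows) if i % k == fold]
        train_idx = [i for i in range(n_rows) if i % k != fold]
        yield train_idx, val_idx


def count_fits(grid, num_folds):
    """One fit per (combination, fold), plus a final refit of the best combination."""
    return len(expand_param_grid(grid)) * num_folds + 1


# The grid used in this tutorial: numTrees has one value, maxDepth has two
tutorial_grid = {"numTrees": [100], "maxDepth": [5, 10]}
```

With this grid and `numFolds=3`, `count_fits(tutorial_grid, 3)` is 7 pipeline fits, each training a 100-tree forest.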



In [17]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.functions import col
from pyspark.ml.feature import StandardScaler

# Define the stages of the pipeline
rf = RandomForestClassifier(featuresCol="features", labelCol="target", numTrees=100)

ind = StringIndexer(inputCols=categorical_features, outputCols=categorical_features_indexed, handleInvalid='keep')
# Assemble numerical features and standardize
va = VectorAssembler(inputCols=numerical_features, outputCol="numerical_features")
scaler = StandardScaler(inputCol="numerical_features", outputCol="scaled_numerical_features", withStd=True, withMean=False)

# Updated VectorAssembler to include standardized numerical features and indexed categorical features
va2 = VectorAssembler(inputCols=["scaled_numerical_features"] + categorical_features_indexed, outputCol="features")

# Define the stages of the pipeline including the scaler
pipeline = Pipeline(stages=[ind, va, scaler, va2, rf])


# Define a parameter grid for hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [100]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .build()

# Define a cross-validator for hyperparameter tuning
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="target"),
                          numFolds=3)
# Fit the cross-validator to the training data
cvModel = crossval.fit(df_train)




In [19]:
# Access the best model from the cross-validation process
bestModel = cvModel.bestModel

# Assuming the RandomForestClassifier is the last stage in your pipeline,
# you can access it like this:
rfModel = bestModel.stages[-1]  # Adjust index based on your pipeline's structure

# Now, print the hyperparameters of the RandomForest model
# For example, to print the number of trees and max depth:
print(f"Best number of trees: {rfModel.getNumTrees}")
print(f"Best max depth: {rfModel.getMaxDepth()}")


Best number of trees: 100
Best max depth: 10


In [21]:
# Predictions on the training data
predictions = cvModel.transform(df_train)

# Evaluate the model; BinaryClassificationEvaluator reports area under the ROC curve by default
evaluator = BinaryClassificationEvaluator(labelCol="target")
train_roc_auc = evaluator.evaluate(predictions)

print("Training ROC AUC:", train_roc_auc)



Training ROC AUC: 0.956762461463573


In [27]:
import pandas as pd

# Feature importance (assuming RandomForest is the last stage in the best model)

# Access the best model from CrossValidator
bestPipelineModel = cvModel.bestModel

# RandomForest is the last stage in the pipeline
rfModel = bestPipelineModel.stages[-1]

# Feature names: VectorAssembler does not expose readable names for the slots of the
# feature vector, so rebuild the list in the order used by va2: the scaled numerical
# features first, then one slot per indexed categorical feature
featureNames = numerical_features + categorical_features

# Retrieve feature importances (one value per slot of the feature vector)
importances = rfModel.featureImportances

# Match each importance to its feature name by position
importanceList = [(featureNames[i], float(importances[i])) for i in range(len(featureNames))]

# Create a pandas DataFrame sorted by importance for easier reading
importanceDF = pd.DataFrame(importanceList, columns=["Feature", "Importance"]).sort_values(by="Importance", ascending=False)

importanceDF


|    | Feature                        | Importance |
|----|--------------------------------|------------|
| 18 | history                        | 0.314674   |
| 15 | proto                          | 0.233254   |
| 10 | source_port_count_last_30_mins | 0.193774   |
| 9  | source_port_count_last_min     | 0.141115   |
| 4  | orig_ip_bytes                  | 0.075309   |
| 3  | orig_pkts                      | 0.016193   |
| 0  | duration                       | 0.011925   |
| 2  | resp_bytes                     | 0.005079   |
| 1  | orig_bytes                     | 0.003662   |
| 17 | conn_state                     | 0.001763   |
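The name matching above relies on the slot order of the assembled vector: `va2` places the 15 scaled numerical features first and then one slot per indexed categorical column. Below is a small plain-Python sketch of the same pairing. The helper `rank_importances` is hypothetical, and it adds a length check that fails loudly if names and vector slots ever get out of step, for example if a categorical column were one-hot encoded into several slots.

```python
def rank_importances(feature_names, importances, top_n=None):
    """Pair each slot of the assembled feature vector with its name, most important first."""
    importances = list(importances)
    if len(feature_names) != len(importances):
        raise ValueError(
            f"{len(feature_names)} names for {len(importances)} vector slots; check the assembler order"
        )
    ranked = sorted(zip(feature_names, importances), key=lambda pair: pair[1], reverse=True)
    return ranked if top_n is None else ranked[:top_n]
```

With Spark, `rfModel.featureImportances.toArray()` supplies the importance values to pass in.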


### Evaluation on Test Data

In [28]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# df_test is the held-out test set and cvModel is the trained cross-validation model

# Make predictions on the test data
test_preds = cvModel.transform(df_test)

# Evaluate ROC AUC and PR AUC
roc_evaluator = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderROC")
pr_evaluator = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderPR")

roc_auc = roc_evaluator.evaluate(test_preds)
pr_auc = pr_evaluator.evaluate(test_preds)

# Display the evaluation metrics
print(f"ROC AUC: {roc_auc:.4f}")
print(f"PR AUC: {pr_auc:.4f}")





ROC AUC: 0.8825
PR AUC: 0.9819
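These two numbers respond differently to class balance. ROC AUC equals the probability that a randomly chosen malicious row scores higher than a randomly chosen benign one, so it does not depend on how many positives there are. The PR AUC of a scorer that ranks at random is roughly the share of positive rows. The test set includes 192.168.2.5 with 151,566 malicious rows, so a high PR AUC should be compared against that baseline; the test-set positive rate is not computed here. Here is a plain-Python sketch of both quantities, using hypothetical helper names (the pairwise loop is quadratic, so it is only meant for toy data):

```python
def roc_auc(labels, scores):
    """Probability that a random positive outscores a random negative (ties count 1/2)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


def positive_rate(labels):
    """Share of positive labels: roughly the PR AUC of a scorer that ranks at random."""
    return sum(labels) / len(labels)
```

Duplicating every negative example leaves `roc_auc` unchanged but lowers `positive_rate`, and with it the PR AUC a random scorer would reach.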


# Comparing Model Performance in PySpark

To enhance our machine learning pipeline and select the best model for our binary classification task, we aim to compare the performance of RandomForest with three other popular algorithms: CatBoost, XGBoost, and Logistic Regression. This comparison will provide insights into which model is most suitable for our dataset, considering various performance metrics.

## Preparing the Data

Ensure that the dataset is preprocessed appropriately, handling both categorical and numerical features to meet the requirements of each model. Consistent data preparation is crucial for a fair comparison.

## Setting Up Models

### RandomForest
- Available in PySpark MLlib (`pyspark.ml.classification.RandomForestClassifier`) and already part of the initial pipeline.

### Logistic Regression
- Use `pyspark.ml.classification.LogisticRegression`.
- Can replace RandomForest as the final stage of the same pipeline, reusing the preprocessing stages.

### XGBoost
- Requires integrating the [Spark XGBoost](https://github.com/dmlc/xgboost/tree/master/jvm-packages) project.
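- In Scala or Java, use `ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier`. From Python, XGBoost 1.7 and later ship a PySpark estimator, `xgboost.spark.SparkXGBClassifier`, which can be configured much like RandomForest.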

### CatBoost
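- Not part of Spark MLlib, but the CatBoost project provides a separate [CatBoost Spark package](https://github.com/catboost/catboost/tree/master/catboost/spark).
- Alternatively, train CatBoost locally on the driver using the collected (or sampled) training data, then score the Spark DataFrame with a UDF.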

## Hyperparameter Tuning

Define a parameter grid for each model using the hyperparameters relevant to it. Use `CrossValidator` for Spark models, and model-specific tuning methods for external libraries such as CatBoost and XGBoost.

## Evaluation

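Evaluate each model on the same test set. `BinaryClassificationEvaluator` reports the area under the ROC curve and the area under the precision-recall curve. Threshold-based metrics such as accuracy, precision, and recall are available from `MulticlassClassificationEvaluator`, for example `metricName="accuracy"`, or `"precisionByLabel"` and `"recallByLabel"` with `metricLabel=1.0`. Comparing several metrics shows each model's strengths and weaknesses.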
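Threshold-based metrics depend on first turning scores into 0/1 predictions. The following sketch shows what accuracy, precision, and recall measure for the malicious class, assuming a 0.5 threshold on the predicted probability. The helper `threshold_metrics` is hypothetical.

```python
def threshold_metrics(labels, scores, threshold=0.5):
    """Accuracy, precision and recall for the positive class after thresholding scores."""
    preds = [1 if s >= threshold else 0 for s in scores]
    tp = sum(1 for y, p in zip(labels, preds) if y == 1 and p == 1)
    fp = sum(1 for y, p in zip(labels, preds) if y == 0 and p == 1)
    fn = sum(1 for y, p in zip(labels, preds) if y == 1 and p == 0)
    tn = sum(1 for y, p in zip(labels, preds) if y == 0 and p == 0)
    return {
        "accuracy": (tp + tn) / len(labels),
        # Precision and recall are defined as 0.0 when their denominator is empty
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "recall": tp / (tp + fn) if tp + fn else 0.0,
    }
```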




### Logistic Regression

In [30]:
from pyspark.ml.classification import LogisticRegression

# Logistic Regression model
lr = LogisticRegression(featuresCol="features", labelCol="target")

# Pipeline with Logistic Regression instead of RandomForest
pipeline_lr = Pipeline(stages=[ind, va, scaler, va2, lr])

# Hyperparameter tuning and cross-validation could be set up as for RandomForest

# Fit the pipeline and score the test set
lrModel = pipeline_lr.fit(df_train)
test_preds = lrModel.transform(df_test)


In [32]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

roc = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderROC")
print("ROC AUC", roc.evaluate(test_preds))

pr = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderPR")
print("PR AUC", pr.evaluate(test_preds))

ROC AUC 0.8698669049284795
PR AUC 0.990320848399635


### XGBoost 

In [58]:
!!pip install --user sparkxgb


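Collecting sparkxgb
  Using cached sparkxgb-0.1-py3-none-any.whl
Collecting pyspark==3.1.1 (from sparkxgb)
  Using cached pyspark-3.1.1-py2.py3-none-any.whl
Installing collected packages: pyspark, sparkxgb
Successfully installed pyspark-3.1.1 sparkxgb-0.1

As the log shows, `sparkxgb` pins `pyspark==3.1.1`, so installing it can replace the PySpark version already in the environment. A `--user` install run from a shell command may also be invisible to the running kernel until it restarts, which is one possible cause of the `ModuleNotFoundError` below. `%pip install` installs into the kernel's own environment.
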
In [64]:
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from sparkxgb import XGBoostClassifier

# Create an XGBoostClassifier
xgb = XGBoostClassifier(
    featuresCol="features", 
    labelCol="target", 
    objective="binary:logistic"
)

# Create a pipeline; the scaler and second assembler produce the "features" column xgb expects
pipeline = Pipeline(stages=[ind, va, scaler, va2, xgb])

# Fit the pipeline to the training data
model = pipeline.fit(df_train)

# Make predictions on the test data
test_preds = model.transform(df_test)

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(test_preds)
print("ROC AUC:", roc_auc)


ModuleNotFoundError: No module named 'sparkxgb'

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from sparkxgb import XGBoostClassifier

# Define categorical features, numerical features, and indexed categorical features as you did before
# ...

# Create a feature vector assembler
va = VectorAssembler(inputCols=input_features, outputCol="features", handleInvalid='skip')

# Create an XGBoostClassifier
xgb = XGBoostClassifier(featuresCol="features", labelCol="target", numRound=100)

# Create a pipeline with the stages
pipeline = Pipeline(stages=[ind] + [va, xgb])


ModuleNotFoundError: No module named 'sparkxgb'

In [None]:
from mmlspark.xgboost import XGBoostClassifier

xgb = XGBoostClassifier(
    featuresCol="features", 
    labelCol="target", 
    objective="binary:logistic"
)

# Proceed with pipeline and cross-validation setup


ModuleNotFoundError: No module named 'mmlspark'

### CatBoost

In [None]:
#!pip install catboost  # optional: required only for this section

In [None]:
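import numpy as np
from catboost import CatBoostClassifier
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Define categorical features, numerical features, and indexed categorical features as before
categorical_features = ["proto", "service", "conn_state", "history"]
categorical_features_indexed = [c + "_index" for c in categorical_features]
input_features = numerical_features + categorical_features_indexed

# Fit the Spark preprocessing (string indexing + assembly into a "features" vector) on the training data
va_cb = VectorAssembler(inputCols=input_features, outputCol="features", handleInvalid="skip")
prep_model = Pipeline(stages=[ind, va_cb]).fit(df_train)

# CatBoost trains in memory on the driver, so collect the prepared training data
# (sample df_train first if it does not fit in driver memory)
train_rows = prep_model.transform(df_train).select("features", "target").collect()
X_train = np.array([row["features"].toArray() for row in train_rows])
y_train = np.array([row["target"] for row in train_rows])

# Train the CatBoost model locally; an unfitted model cannot produce predictions
catboost = CatBoostClassifier(iterations=100, depth=6, learning_rate=0.1, loss_function="Logloss", verbose=False)
catboost.fit(X_train, y_train)


class CatBoostClassifierWrapper(Transformer):
    """Scores a fitted CatBoost model on a Spark DataFrame with a row-wise UDF."""

    def __init__(self, model, inputCol="features", outputCol="rawPrediction"):
        super().__init__()
        self.model = model
        self._input_col = inputCol
        self._output_col = outputCol

    def _transform(self, dataset):
        model = self.model  # shipped to the executors inside the UDF closure
        predict_udf = udf(
            lambda features: float(model.predict_proba([features.toArray().tolist()])[0][1]),
            DoubleType(),
        )
        return dataset.withColumn(self._output_col, predict_udf(dataset[self._input_col]))


# The probability of class 1 goes into "rawPrediction", the column BinaryClassificationEvaluator reads by default
catboost_wrapper = CatBoostClassifierWrapper(model=catboost, inputCol="features", outputCol="rawPrediction")
test_preds = catboost_wrapper.transform(prep_model.transform(df_test))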



In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

roc = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderROC")
print("ROC AUC", roc.evaluate(test_preds))

pr = BinaryClassificationEvaluator(labelCol="target", metricName="areaUnderPR")
print("PR AUC", pr.evaluate(test_preds))

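ROC AUC 0.8698650740921258
PR AUC 0.9903206178941372

These recorded numbers match the logistic regression results almost exactly because, in the run that produced them, `test_preds` still held the logistic regression predictions: the CatBoost cell as originally run never produced predictions of its own. They should not be read as CatBoost results.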
