In [0]:
license_email = dbutils.widgets.get("license_email")  # Email address linked to your DataHeroes account
tree_path = dbutils.widgets.get("tree_path")  # DBFS path to a volume that will store the coreset data and metadata

# Binary Classification Example using GBTClassifier with DataHeroes Spark Library


The Spark MLlib Pipelines API provides a higher-level API built on top of DataFrames for constructing ML pipelines.
You can read more about the Pipelines API in the [MLlib programming guide](https://spark.apache.org/docs/latest/ml-guide.html).

The **dh_spark** library runs the [`DataHeroes`](https://dataheroes.ai/) library inside a Spark cluster to build coreset trees in a distributed environment.

The following example is derived from the [Databricks Binary Classification Example](https://docs.databricks.com/aws/en/notebooks/source/binary-classification.html).


## Installing the dh_spark Library

To use the `dh_spark` library, you can install it in your Databricks cluster in one of the following ways:

1. **Cluster Library Tab**: Upload the provided `.whl` file to your cluster via the Libraries tab in the Databricks UI.

2. **Magic Command**: Use the `%pip` magic command to install the library directly in your notebook. For example:
```
%pip install /dbfs/path/to/your/dh_spark_library.whl
```

Make sure to replace `/dbfs/path/to/your/dh_spark_library.whl` with the actual path to the `.whl` file in your Databricks File System (DBFS).

## Dataset Review

The Adult dataset is publicly available at the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Adult).
The data is derived from census records and describes 48,842 individuals and their annual income.
You can use this information to predict whether an individual earns **<=50K or >50K** a year.
The dataset consists of both numeric and categorical variables.

Attribute Information:

- age: continuous
- workclass: Private, Self-emp-not-inc, Self-emp-inc, Federal-gov, Local-gov, State-gov, Without-pay, Never-worked
- fnlwgt: continuous
- education: Bachelors, Some-college, 11th, HS-grad, Prof-school, Assoc-acdm, Assoc-voc...
- education-num: continuous
- marital-status: Married-civ-spouse, Divorced, Never-married, Separated, Widowed, Married-spouse-absent...
- occupation: Tech-support, Craft-repair, Other-service, Sales, Exec-managerial, Prof-specialty, Handlers-cleaners...
- relationship: Wife, Own-child, Husband, Not-in-family, Other-relative, Unmarried
- race: White, Asian-Pac-Islander, Amer-Indian-Eskimo, Other, Black
- sex: Female, Male
- capital-gain: continuous
- capital-loss: continuous
- hours-per-week: continuous
- native-country: United-States, Cambodia, England, Puerto-Rico, Canada, Germany...

Target/Label: <=50K, >50K


## Load Data

The Adult dataset is available in Databricks datasets. Read the data with Spark's CSV data source and apply a schema that names the columns.

In [0]:
%fs ls databricks-datasets/adult/adult.data

In [0]:
%fs head databricks-datasets/adult/adult.data

In [0]:
from pyspark.sql.types import DoubleType, StringType, StructField, StructType

schema = StructType([
  StructField("age", DoubleType(), False),
  StructField("workclass", StringType(), False),
  StructField("fnlwgt", DoubleType(), False),
  StructField("education", StringType(), False),
  StructField("education_num", DoubleType(), False),
  StructField("marital_status", StringType(), False),
  StructField("occupation", StringType(), False),
  StructField("relationship", StringType(), False),
  StructField("race", StringType(), False),
  StructField("sex", StringType(), False),
  StructField("capital_gain", DoubleType(), False),
  StructField("capital_loss", DoubleType(), False),
  StructField("hours_per_week", DoubleType(), False),
  StructField("native_country", StringType(), False),
  StructField("income", StringType(), False)
])
dataset = spark.read.format("csv").schema(schema).load("/databricks-datasets/adult/adult.data")
cols = dataset.columns

In [0]:
display(dataset)

In [0]:
# Randomly split the data into training and test sets; set a seed for reproducibility.
# The training data is passed to the coreset tree service to build the tree.
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)
num_instances = trainingData.count()  # Count once and reuse the result
print(num_instances)
print(testData.count())

## Preprocess Data

To use algorithms like Gradient Boosting Classifier, you must first convert the categorical variables in the dataset into numeric variables.
There are two ways to do this:

* Category Indexing

  This assigns each category a numeric value from {0, 1, 2, ..., numCategories-1}.
  It introduces an implicit ordering among the categories, so it is more suitable for ordinal variables (e.g., Poor: 0, Average: 1, Good: 2).

* One-Hot Encoding

  This converts categories into binary vectors with at most one nonzero value (e.g., (Blue: [1, 0]), (Green: [0, 1]), (Red: [0, 0])).
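
The two encodings above can be sketched in plain Python. This is an illustration only, not Spark's implementation: the helper names `index_categories` and `one_hot` are hypothetical. It assumes indices are assigned by descending frequency and that the last category is dropped from the one-hot vector (both are the defaults of Spark's `StringIndexer` and `OneHotEncoder`), which is why Red maps to the all-zero vector.

```python
from collections import Counter


def index_categories(values):
    """Map each category to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda c: (-counts[c], c))
    return {category: i for i, category in enumerate(ordered)}


def one_hot(index, num_categories):
    """Encode an index as a binary vector, dropping the last category."""
    vector = [0] * (num_categories - 1)
    if index < num_categories - 1:
        vector[index] = 1
    return vector


colors = ["Blue", "Green", "Blue", "Red", "Green", "Blue"]
mapping = index_categories(colors)
encoded = {c: one_hot(i, len(mapping)) for c, i in mapping.items()}

for color, vector in encoded.items():
    print(color, mapping[color], vector)
```

Dropping the last category keeps the vector entries from always summing to one, which is why the vectors have "at most" one nonzero value.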


**The dh_spark library already implements this preprocessing in the `build_preprocess_from_df` method.**

In [0]:
from dataheroes.utils import activate_account
from dh_pyspark.services.coreset.dtc import CoresetTreeServiceDTC

# Activate your DataHeroes account
activate_account(license_email)

# Set the parameters for the coreset tree service
data_tuning_params = {
    "coreset_size": [0.2],
}
categoricalColumns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]
label = "income"

# Build the feature list, marking the categorical columns.
# The target column (and any column named "label") is excluded from the features.
features = [
    {"name": col, "categorical": True} if col in categoricalColumns else {"name": col}
    for col in cols
    if col not in (label, "label")
]

# Define data parameters
# These parameters describe the data and its preprocessing
data_params = {
    "target": {"name": label},  # Ensure the label name is unique in the DataFrame
    "features": features,
    "fill_value_cat": "NNN",
}
# Initialize coreset tree service
service = CoresetTreeServiceDTC(
    spark_session=spark,
    data_params=data_params,
    chunk_size=int(num_instances//4),
    n_instances=num_instances,
    data_tuning_params=data_tuning_params,
    dhspark_path=tree_path,
)


input_df = trainingData
service.build_preprocess_from_df(spark_session=spark, input_df=input_df)


## Build the Coreset Tree
This step builds the coreset tree following the preprocessing done in the previous step.


In [0]:
service.build(spark_session=spark)

## Getting the Data from the Coreset Tree
The coreset is returned as a Spark DataFrame, which can be passed to any ML model that follows the Spark interface.

In [0]:
trainingData = service.get_coreset(spark_session=spark)
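
The `GBTClassifier` call later in this notebook passes `weightCol="w"`, which indicates that each coreset row carries a weight. The sketch below shows why weights matter when training on a subset. It is not the DataHeroes algorithm: as a deliberately simple stand-in it draws a uniform sample and gives every row the weight `n / m`, and the helper name `uniform_weighted_sample` is hypothetical.

```python
import random


def uniform_weighted_sample(values, fraction, seed=0):
    """Hypothetical stand-in for a coreset: a uniform sample with equal weights."""
    rng = random.Random(seed)
    m = max(1, int(len(values) * fraction))
    sample = rng.sample(values, m)
    weight = len(values) / m  # each sampled row represents n / m original rows
    return [(value, weight) for value in sample]


rng = random.Random(42)
full = [rng.gauss(40.0, 10.0) for _ in range(10_000)]
subset = uniform_weighted_sample(full, fraction=0.2)

total_weight = sum(w for _, w in subset)
weighted_mean = sum(v * w for v, w in subset) / total_weight
full_mean = sum(full) / len(full)

print(f"rows kept: {len(subset)}, total weight: {total_weight}")
print(f"full mean: {full_mean:.3f}, weighted subset mean: {weighted_mean:.3f}")
```

The weights sum to the size of the full dataset, so weighted statistics computed on the subset stay on the same scale as statistics computed on all rows. A model that ignores the weight column would treat the subset as if it were the whole dataset.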

## Fit and Evaluate Models

This example uses one of MLlib's classification algorithms, the Gradient Boosted Tree Classifier (`GBTClassifier`).



## Gradient Boosting Classifier

You can read more about the [Gradient Boosting Classifier] in the [classification and regression] section of the MLlib Programming Guide.
Gradient boosting is an ensemble learning method that builds a series of decision trees sequentially, with each tree correcting errors made by the previous ones.

[classification and regression]: https://spark.apache.org/docs/latest/ml-classification-regression.html
[Gradient Boosting Classifier]: https://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-classifier
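
The sequential error-correcting idea can be sketched in a few lines of plain Python. This is a simplified illustration under stated assumptions, not MLlib's implementation: it uses one-feature decision stumps and squared-error loss (`GBTClassifier` uses a logistic loss for classification), and the names `fit_stump` and `boost` are hypothetical.

```python
def fit_stump(xs, residuals):
    """Fit a one-split tree on a single feature, minimizing squared error."""
    best = None
    for threshold in sorted(set(xs))[:-1]:  # skip the max so the right side is never empty
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        error = sum((r - left_mean) ** 2 for r in left) + sum((r - right_mean) ** 2 for r in right)
        if best is None or error < best[0]:
            best = (error, threshold, left_mean, right_mean)
    if best is None:  # only one distinct x value: predict the mean residual
        mean = sum(residuals) / len(residuals)
        return lambda x: mean
    _, threshold, left_mean, right_mean = best
    return lambda x: left_mean if x <= threshold else right_mean


def boost(xs, ys, n_rounds=10, learning_rate=0.5):
    """Fit stumps one after another, each on the residuals left by the previous ones."""
    base = sum(ys) / len(ys)
    predictions = [base] * len(xs)
    stumps, losses = [], []
    for _ in range(n_rounds):
        residuals = [y - p for y, p in zip(ys, predictions)]
        stump = fit_stump(xs, residuals)
        stumps.append(stump)
        predictions = [p + learning_rate * stump(x) for x, p in zip(xs, predictions)]
        losses.append(sum((y - p) ** 2 for y, p in zip(ys, predictions)) / len(ys))

    def predict(x):
        return base + learning_rate * sum(stump(x) for stump in stumps)

    return predict, losses


xs = [1, 2, 3, 4, 5, 6]
ys = [0, 0, 0, 1, 1, 1]
predict, losses = boost(xs, ys)
print([round(loss, 6) for loss in losses])
print(round(predict(1), 3), round(predict(6), 3))
```

Each round shrinks the training loss because the new stump is fitted to what the current ensemble still gets wrong. The `maxIter` parameter of `GBTClassifier` plays the role of `n_rounds` here.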

In [0]:
from pyspark.ml.classification import GBTClassifier

# Create the initial GBTClassifier model.
# The income label is a categorical variable with two values: <=50K and >50K.
# dh_pyspark converts it to a numeric index.
# The preprocessed label column is named by appending `_index` to the label column name, and the features are in a column named "features".
# Both column names must be passed to the GBTClassifier.
gbtc = GBTClassifier(labelCol=f"{label}_index", featuresCol="features", maxIter=10, weightCol="w")

# Train model with Training Data
gbtc = gbtc.fit(trainingData)

In [0]:
# Preprocess the test data in the same way the training data was preprocessed
testData = service.auto_preprocessing(spark_session=spark, df=testData)

# Make predictions on test data using the transform() method.
# GBTClassifier.transform() will only use the 'features' column.
predictions = gbtc.transform(testData)

In [0]:
# View the model's predictions and the probabilities of each prediction class.
# You can select any other column of `predictions` to view as well.
selected = predictions.select(f"{label}_index", "prediction", "probability")
display(selected)

### Use `BinaryClassificationEvaluator` and `MulticlassClassificationEvaluator` to evaluate the model.

Use `BinaryClassificationEvaluator` to compute the model's [areaUnderROC] metric.

[areaUnderROC]: https://en.wikipedia.org/wiki/Receiver_operating_characteristic#Area_under_the_curve

Use `MulticlassClassificationEvaluator` to compute the model's [F1](https://en.wikipedia.org/wiki/F-score) metric.
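
To make the two metrics concrete, here is a minimal plain-Python sketch on four hand-made rows. It is an illustration, not Spark's implementation. The function names are hypothetical, AUC is computed from its pairwise-ranking definition, and F1 is averaged over classes weighted by class frequency, which is how the Spark documentation describes the `f1` metric of `MulticlassClassificationEvaluator`.

```python
def area_under_roc(labels, scores):
    """Probability that a random positive is scored above a random negative."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(
        1.0 if p > n else 0.5 if p == n else 0.0
        for p in positives
        for n in negatives
    )
    return wins / (len(positives) * len(negatives))


def f1_for_class(labels, predictions, cls):
    """Harmonic mean of precision and recall, treating `cls` as the positive class."""
    tp = sum(1 for y, p in zip(labels, predictions) if y == cls and p == cls)
    fp = sum(1 for y, p in zip(labels, predictions) if y != cls and p == cls)
    fn = sum(1 for y, p in zip(labels, predictions) if y == cls and p != cls)
    if tp == 0:
        return 0.0
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return 2 * precision * recall / (precision + recall)


def weighted_f1(labels, predictions):
    """Average the per-class F1 scores, weighted by how often each class occurs."""
    return sum(
        f1_for_class(labels, predictions, cls) * labels.count(cls) / len(labels)
        for cls in sorted(set(labels))
    )


labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]  # predicted probability of class 1
predictions = [1 if s >= 0.5 else 0 for s in scores]

auc_value = area_under_roc(labels, scores)
f1_value = weighted_f1(labels, predictions)
print(f"AUC: {auc_value:.4f}, weighted F1: {f1_value:.4f}")
```

AUC depends only on how the scores rank the rows, while F1 depends on the thresholded predictions, so the two metrics can disagree about which of two models is better.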

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Evaluate model - AUC
auc_evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol=f"{label}_index")
auc = auc_evaluator.evaluate(predictions)
print(f"Area under ROC curve: {auc}")

# Evaluate model - F1 Score
f1_evaluator = MulticlassClassificationEvaluator(labelCol=f"{label}_index", predictionCol="prediction", metricName="f1")
f1 = f1_evaluator.evaluate(predictions)
print(f"F1 Score: {f1}")