# Spark Labs
## Introduction 
Welcome to the Spark Labs Lab Guide! In this lab, we’ll walk through a hands-on scenario where you'll learn how to leverage Apache Spark to:

- Load data efficiently
- Save data into a table
- Clean and preprocess data using Spark's parallel processing capabilities
- Train a simple Logistic Regression classification model on the processed data

### Spark Labs Benefits:
Before we dive into the lab, let's take a moment to discuss the benefits of using Spark Labs with watsonx.data, especially compared with external notebooks that connect to watsonx.data.


1.  No Dependency Management:
    - External Notebooks: When working in an external notebook, you'd typically run your code with the dependencies installed on your own computer. This can lead to version mismatches, dependency conflicts, or other setup challenges.
    - watsonx.data: Since Spark Labs runs directly within the watsonx.data environment, you don’t need to worry about managing or troubleshooting dependencies. All the necessary packages and configurations are preconfigured on the server, ensuring consistency and saving you time.
  

2.  Easier Configuration:
    - External Notebooks: Setting up an external environment often requires manual configuration, from installing the right versions of Spark and libraries to setting up paths and environment variables.
    - watsonx.data: With watsonx.data, the environment is preconfigured for you. You can jump straight into coding without spending time on setup, making the process much faster and smoother.
  

3.  Integrated Development Environment:
    - External Notebooks: Using external notebooks can require managing different tools and platforms to run Spark, track results, and visualize data, which can be disjointed and time-consuming.
    - watsonx.data: In this environment, everything you need is integrated into a single platform. You can manage your notebooks, run Spark jobs, and view results all in real time, without switching between different tools.


4. Easy Collaboration:
   - External Notebooks: Collaboration can be cumbersome when working in local environments. Sharing notebooks may involve version control, syncing, or exporting and sending files manually.
   - watsonx.data: With notebooks stored directly on the server, collaboration is seamless. You can easily share your work with colleagues and stakeholders, ensuring that everyone is on the same page and able to provide feedback in real-time.

## Spark Session Setup 

### Traditional Method (External Notebooks)

Before we can start using Apache Spark, we need to set up a Spark Session. This session is crucial as it acts as the entry point for all Spark functionality, allowing us to interact with Spark’s powerful distributed computing capabilities.

In external environments, the setup typically involves configuring several options like the application name, cluster manager, memory allocation, and more. For example:
```python
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Parentheses allow an inline comment after each setting (a comment after a backslash continuation is a syntax error).
conf = (
    SparkConf()
    .setAppName("ComplexSparkSessionSetup")
    .setMaster("yarn")  # Use YARN as the cluster manager
    .set("spark.sql.warehouse.dir", "/user/hive/warehouse")  # Custom warehouse location
    .set("spark.sql.shuffle.partitions", "500")  # Increase shuffle partitions for performance
    .set("spark.executor.memory", "8g")  # Allocate 8 GB of memory to each executor
    .set("spark.driver.memory", "4g")  # Allocate 4 GB of memory to the driver
    .set("spark.executor.cores", "4")  # 4 cores per executor
    .set("spark.sql.parquet.compression.codec", "snappy")  # Use Snappy compression for Parquet files
    .set("spark.ui.port", "4041")  # Change the Spark UI port to avoid conflicts
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
```

However, when running within watsonx.data, this extensive configuration is unnecessary because the session is set up automatically. This streamlines the setup process, saves time, and avoids the need to sift through documentation.

### Simplified Setup for Watsonx.data (Spark Labs)

In this lab, the only configuration needed is the `fs.s3a.path.style.access` flag, which should be set to `true`. This is required because we're working with a `MinIO` bucket: the flag tells Spark's S3A connector to address the bucket with path-style URLs instead of the default virtual-hosted style, so that Spark can access it properly.
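The two addressing styles differ only in where the bucket name appears in the URL. The plain-Python sketch below is purely illustrative: `s3_object_url` is a hypothetical helper (not part of Spark, Hadoop, or MinIO), and the endpoint, bucket, and key are made-up values.

```python
def s3_object_url(endpoint: str, bucket: str, key: str, path_style: bool) -> str:
    """Build an S3-style object URL (illustration only, not an S3 client)."""
    if path_style:
        # Path-style: the bucket is the first segment of the URL path.
        return f"http://{endpoint}/{bucket}/{key}"
    # Virtual-hosted style: the bucket becomes part of the hostname, which needs
    # DNS entries for <bucket>.<endpoint> that local MinIO setups often lack.
    return f"http://{bucket}.{endpoint}/{key}"


# Hypothetical endpoint, bucket, and key.
print(s3_object_url("minio.example.local:9000", "lab-bucket", "demo/heart_data.csv", True))
print(s3_object_url("minio.example.local:9000", "lab-bucket", "demo/heart_data.csv", False))
```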

#### Additional Useful Flags:
There are a few other configuration options that can help fine-tune your Spark session, especially for logging:

- `ae.spark.driver.log.level`: Sets the log level for the driver (e.g. `"INFO"`)
- `ae.spark.executor.log.level`: Sets the log level for the executor (e.g. `"INFO"`)

These flags are optional but can be helpful for debugging or performance tuning.

In [1]:
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings('ignore')

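# Start (or reuse) a session only to read the platform's preconfigured settings
spark = SparkSession.builder.appName('sparky').getOrCreate()
conf = spark.sparkContext.getConf()
spark.stop()

# Add the lab-specific settings on top of the preconfigured ones
conf.setAll([
    ("fs.s3a.path.style.access", "true"),  # required to access the MinIO bucket
    # ("ae.spark.driver.log.level", "INFO"),
    # ("ae.spark.executor.log.level", "INFO"),
])

# Recreate the session with the updated configuration and Hive support enabled
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()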


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/10 14:38:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/10 14:38:27 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
25/03/10 14:38:35 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).


To verify the Spark session and check the current Spark version, run:

In [2]:
spark.version

'3.4.2'

## Viewing Catalogs and Creating Schemas

Watsonx.data follows a cascading architecture, structured in a hierarchical form: `Catalog` > `Schema` > `Table`. This topology helps organize the data in a way that is intuitive and easy to navigate.

- `Catalog`: The top level in the architecture. Each Catalog contains multiple Schemas. It acts as a container for related datasets.
- `Schema`: The middle level. A Schema organizes multiple Tables within a catalog. It serves as a namespace for tables, often reflecting logical groupings of data.
- `Table`: The bottom level, where actual data resides. Each Table follows a specific schema definition and contains the data, often in a structured format such as rows and columns.

### Syntax for working with the hierarchy:

In watsonx.data, the hierarchy of `Catalogs`, `Schemas`, and `Tables` is accessed using `dot notation`. This makes it easy to specify and query data at any level of the hierarchy.

For example, if you wanted to run queries on the Table `exampleTable` within the Schema `exampleSchema`, which is contained in the Catalog `exampleCatalog`, you would refer to it using the following syntax:
```sql
exampleCatalog.exampleSchema.exampleTable
```
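Dot notation works as-is for simple names, but in Spark SQL any identifier containing spaces or other special characters (such as the `chest pain type` column used later in this lab) must be wrapped in backticks, with a literal backtick written as two backticks. The helpers below are hypothetical conveniences for building such references as strings; they are not part of the PySpark API.

```python
def quote_identifier(name: str) -> str:
    """Wrap one identifier in backticks, doubling any backtick inside it."""
    return "`" + name.replace("`", "``") + "`"


def qualified_name(*parts: str) -> str:
    """Join catalog, schema, and table names with dots, quoting each part."""
    return ".".join(quote_identifier(p) for p in parts)


table = qualified_name("iceberg_data", "demo", "heart_data")
column = quote_identifier("chest pain type")
query = f"SELECT {column} FROM {table} LIMIT 5"
print(query)
```

The resulting string can be passed to `spark.sql(query)`. Quoting every part is harmless for simple names and protects names that contain spaces.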

### Viewing our Catalogs
To view the available `Catalogs` in watsonx.data, run the following:

In [3]:
spark.sql("""
SHOW CATALOGS
""").show()

# Note: the .show() method displays the output in a human-readable table format, as seen below

25/03/10 14:38:51 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(scavenge), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
25/03/10 14:38:51 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(global, scavenge), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


+-------------+
|      catalog|
+-------------+
|spark_catalog|
+-------------+



### Viewing Schemas within a Catalog:
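The `SHOW CATALOGS` output above lists only `spark_catalog`. Spark typically lists only catalogs that have already been loaded in the current session, so `iceberg_data` can be used even though it does not appear yet. To view the `Schemas` within a catalog such as `iceberg_data`, run: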

In [4]:
spark.sql("""
SHOW SCHEMAS IN iceberg_data
""").show()

25/03/10 14:39:18 WARN HiveConf: HiveConf of name hive.metastore.truststore.type does not exist


+--------------------+
|           namespace|
+--------------------+
|             default|
|wxd_system_data_d...|
+--------------------+



### Creating a New Schema
Before we proceed with data ingestion, we need to create a new `Schema` where the newly created `Table` will reside.

In this case, we will create a `Schema` called `demo` within the `Catalog` `iceberg_data`. Using dot notation, the full reference to this Schema will be `iceberg_data.demo`.

To create the new `Schema`, run the following SQL command:

In [5]:
spark.sql("""
CREATE SCHEMA IF NOT EXISTS iceberg_data.demo
""")

DataFrame[]

Now we'll double-check that the `Schema` was created correctly:

In [6]:
spark.sql("""
SHOW SCHEMAS IN iceberg_data
""").show()

+--------------------+
|           namespace|
+--------------------+
|             default|
|wxd_system_data_d...|
|                demo|
+--------------------+



As you can see, `demo` now appears in the returned table, so the Schema was created successfully and we can move on to the next step.

## Data Ingestion

Now that we have the needed `Catalog` and `Schema` set up, we can move on to ingesting data.

### Dataset Overview

The dataset we'll be using is a combined **Heart Disease Dataset**, which merges 5 popular heart disease datasets:

 - Cleveland
 - Hungarian
 - Switzerland
 - Long Beach VA
 - Statlog (Heart) Data Set.

The dataset consists of 1190 rows, with 11 features and 1 target:

| Column Name         | Description                                                                 | Data Type | Type                  |
|---------------------|-----------------------------------------------------------------------------|-----------|-----------------------|
| age                 | Age of the patient                                                          | int       | Numeric               |
| sex                 | Sex of the patient (1 = male, 0 = female)                                   | int       | Categorical (binary)  |
| chest pain type     | Type of chest pain (values: 1, 2, 3, 4)                                     | int       | Categorical (ordinal) |
| resting bp s        | Resting blood pressure                                                      | int       | Numeric               |
| cholesterol         | Serum cholesterol in mg/dl                                                  | int       | Numeric               |
| fasting blood sugar | Fasting blood sugar > 120 mg/dl (1 = true, 0 = false)                       | int       | Categorical (binary)  |
| resting ecg         | Resting electrocardiographic results (values: 0, 1, 2)                      | int       | Categorical (ordinal) |
| max heart rate      | Maximum heart rate achieved                                                 | int       | Numeric               |
| exercise angina     | Exercise-induced angina (1 = yes, 0 = no)                                   | int       | Categorical (binary)  |
| oldpeak             | ST depression induced by exercise relative to rest                          | float     | Numeric               |
| ST slope            | Slope of the peak exercise ST segment (values: 1, 2, 3)                     | int       | Categorical (ordinal) |
| target              | Presence or absence of heart disease (1 = disease present, 0 = no disease)  | int       | Categorical (binary)  |

The data is stored in `.csv` format.

### Importing Data from a CSV File

First, we'll set up the path to the data file. In this example, the CSV file is stored in the notebook's current working directory, which `os.path.abspath('')` returns:

In [9]:
import os

loc = os.path.abspath('')
data_loc = f"{loc}/heart_data.csv"

The dataset file is named `heart_data.csv`, and its name is appended to that directory above. Next, we read the file into a DataFrame, using `header=True` so that the first line supplies the column names:

In [10]:
# Load Data
df = spark.read.csv(f'{data_loc}', header=True)

25/03/10 14:40:35 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
                                                                                

You can display the first two rows of the DataFrame with the `.show()` method:

In [11]:
df.show(2)

                                                                                

+---+---+---------------+------------+-----------+-------------------+-----------+--------------+---------------+-------+--------+------+
|age|sex|chest pain type|resting bp s|cholesterol|fasting blood sugar|resting ecg|max heart rate|exercise angina|oldpeak|ST slope|target|
+---+---+---------------+------------+-----------+-------------------+-----------+--------------+---------------+-------+--------+------+
| 40|  1|              2|         140|        289|                  0|          0|           172|              0|    0.0|       1|     0|
| 49|  0|              3|         160|        180|                  0|          0|           156|              0|    1.0|       2|     1|
+---+---+---------------+------------+-----------+-------------------+-----------+--------------+---------------+-------+--------+------+
only showing top 2 rows



You can also display the schema of the DataFrame with the `.printSchema()` method:

In [12]:
df.printSchema()

root
 |-- age: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- chest pain type: string (nullable = true)
 |-- resting bp s: string (nullable = true)
 |-- cholesterol: string (nullable = true)
 |-- fasting blood sugar: string (nullable = true)
 |-- resting ecg: string (nullable = true)
 |-- max heart rate: string (nullable = true)
 |-- exercise angina: string (nullable = true)
 |-- oldpeak: string (nullable = true)
 |-- ST slope: string (nullable = true)
 |-- target: string (nullable = true)



One issue we need to address is that every column in the imported DataFrame is of type string. CSV files carry no type information, and by default `spark.read.csv` does not infer types, so every value is read as a string. We need the data types to match those defined in the table above, so we'll cast each column to its appropriate type.

In [13]:
from pyspark.sql.functions import col

# Convert the columns to the appropriate data types
df_transformed = df.withColumn("age", col("age").cast("int")) \
                   .withColumn("sex", col("sex").cast("int")) \
                   .withColumn("chest pain type", col("chest pain type").cast("int")) \
                   .withColumn("resting bp s", col("resting bp s").cast("int")) \
                   .withColumn("cholesterol", col("cholesterol").cast("int")) \
                   .withColumn("fasting blood sugar", col("fasting blood sugar").cast("int")) \
                   .withColumn("resting ecg", col("resting ecg").cast("int")) \
                   .withColumn("max heart rate", col("max heart rate").cast("int")) \
                   .withColumn("exercise angina", col("exercise angina").cast("int")) \
                   .withColumn("oldpeak", col("oldpeak").cast("float")) \
                   .withColumn("target", col("target").cast("int")) \
                   .withColumn("ST slope", col("ST slope").cast("int"))


Now we can print out the new transformed DataFrame, and we see that the types are now correct:

In [14]:
df_transformed.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- chest pain type: integer (nullable = true)
 |-- resting bp s: integer (nullable = true)
 |-- cholesterol: integer (nullable = true)
 |-- fasting blood sugar: integer (nullable = true)
 |-- resting ecg: integer (nullable = true)
 |-- max heart rate: integer (nullable = true)
 |-- exercise angina: integer (nullable = true)
 |-- oldpeak: float (nullable = true)
 |-- ST slope: integer (nullable = true)
 |-- target: integer (nullable = true)



### Creating the Table

To create a table in an Iceberg-managed format, you can use the following SQL statement within Spark. This ensures that the table is created with the correct schema and is managed within your Iceberg catalog.

The following code creates a `Table` called `heart_data` in the `demo` `Schema` of the `iceberg_data` `Catalog`, but only if it doesn't already exist:

In [15]:
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_data.demo.heart_data (
    age INT,
    sex INT,
    `chest pain type` INT,
    `resting bp s` INT,
    cholesterol INT,
    `fasting blood sugar` INT,
    `resting ecg` INT,
    `max heart rate` INT,
    `exercise angina` INT,
    oldpeak FLOAT,
    `ST slope` INT,
    target INT
)
""")


25/03/10 14:41:20 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties

DataFrame[]

Confirm that the table was created successfully:

In [16]:
spark.sql("""
SHOW TABLES IN iceberg_data.demo
""").show()

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|     demo|heart_data|      false|
+---------+----------+-----------+



To write the data from the transformed DataFrame into the Iceberg table we just created, use the DataFrame's `write` interface. The following code appends the rows of `df_transformed` to the `heart_data` table in the `iceberg_data` catalog:

In [17]:
df_transformed.write \
    .format("iceberg") \
    .mode("append") \
    .save("iceberg_data.demo.heart_data")

                                                                                

Now let's query the saved Iceberg table in watsonx.data to confirm that the data was saved successfully:

In [18]:
spark.sql("""
SELECT *
FROM iceberg_data.demo.heart_data
LIMIT 2
""").show()


+---+---+---------------+------------+-----------+-------------------+-----------+--------------+---------------+-------+--------+------+
|age|sex|chest pain type|resting bp s|cholesterol|fasting blood sugar|resting ecg|max heart rate|exercise angina|oldpeak|ST slope|target|
+---+---+---------------+------------+-----------+-------------------+-----------+--------------+---------------+-------+--------+------+
| 40|  1|              2|         140|        289|                  0|          0|           172|              0|    0.0|       1|     0|
| 49|  0|              3|         160|        180|                  0|          0|           156|              0|    1.0|       2|     1|
+---+---+---------------+------------+-----------+-------------------+-----------+--------------+---------------+-------+--------+------+



                                                                                

And voilà! We have successfully ingested data into watsonx.data.

## Data Cleaning

Data quality is crucial for the success of AI models, and a major part of machine learning involves cleaning the data. While our dataset contains only around 1,000 rows, in real-world applications, datasets can contain trillions of rows or tokens.

Traditional disk-based processing frameworks, such as Hadoop MapReduce, typically read their input from disk and write intermediate results back to disk between processing steps. Each read and write adds significant I/O time, and the steps run one after another.

For example, if only 100 MB of memory were available but your dataset was 1 GB (1,000 MB), a disk-based job would load 100 MB of data into memory, transform it, write the results back to disk, and then load the next 100 MB. It would repeat this ten times until the entire dataset had been processed.


### The Value of Spark

Spark revolutionizes data processing by offering two key advantages:

- **In-Memory Computation:** Unlike Hadoop MapReduce, which writes intermediate results to disk, Spark keeps working data in memory whenever possible. Because memory access is much faster than disk access, this can make some workloads 10x to 100x faster. It is especially advantageous for iterative algorithms, such as machine learning model training, which require repeated access to the same data.

- **Parallelism:** To maximize efficiency, Spark is designed to process data in parallel across distributed systems (i.e., across many nodes in a cluster). By splitting the dataset into smaller partitions and performing operations on them simultaneously, Spark avoids the bottleneck of sequential processing. This parallel processing allows Spark to handle very large datasets that don't fit into a single machine's memory by distributing the workload across many machines.

By leveraging these capabilities, Spark can process data much faster, saving both time and resources.

### Viewing the Data

First, we define a DataFrame that selects all rows from the `iceberg_data.demo.heart_data` table:

In [19]:
df_server = spark.sql("""
SELECT *
FROM iceberg_data.demo.heart_data
""")

The quality of a model is heavily influenced by the data it's trained on. To ensure better performance, we need to carefully examine and clean the dataset.

For instance, a dataset may contain inherent biases: if certain groups or outcomes are overrepresented, the model could become biased toward them. Additionally, missing values can disrupt the training process and lead to inaccurate predictions or lower model performance.

While there are many aspects to consider when cleaning a dataset, for the purposes of this lab, we will only focus on handling missing values.

In this lab, we'll walk through each column of the dataset and check for any missing (`null`) values, which we will then address appropriately to ensure a cleaner and more reliable dataset for model training.



In [20]:
from pyspark.sql.functions import col

# List of columns that contain at least one null value
null_columns = [c for c in df_server.columns if df_server.filter(col(c).isNull()).count() > 0]

print("Columns with null values:", null_columns)

                                                                                

Columns with null values: ['chest pain type']


From the output, we can see that the column with null values is the chest pain type column.

Before we address this, there's another important point about Spark. Up until now, we've mainly been running SQL queries to work with the data. However, Spark also provides a rich DataFrame API with built-in data processing methods. SQL queries and DataFrame methods are both compiled by Spark's Catalyst optimizer into the same kind of execution plan, so their performance is usually similar. The DataFrame methods, however, are often more convenient to compose in Python code.

For the task of handling the null values in the chest pain type column, I'll demonstrate how to approach this in two ways: first using SQL query syntax, and second using Spark's built-in data processing methods. This will give us the flexibility to choose the most suitable approach based on the context.

### Cleaning the Data (SQL)

First, we need to see how many `null` values this column contains:

- If it's a small number, we can remove those rows directly
- If it's a large number, we can remove the column altogether

In [22]:
print("Total Samples:")
spark.sql("SELECT count(*) AS count FROM iceberg_data.demo.heart_data").show()
print("Null Samples:")
spark.sql("SELECT count(*) AS count FROM iceberg_data.demo.heart_data AS t WHERE t.`chest pain type` IS NULL").show()

Total Samples:
+-----+
|count|
+-----+
| 1190|
+-----+

Null Samples:
+-----+
|count|
+-----+
|    5|
+-----+



As we can see, only 5 rows contain `null` values, so we'll go with the first option and remove those rows:

In [24]:
df_server_cleaned = spark.sql("""
                      SELECT *
                      FROM iceberg_data.demo.heart_data AS t 
                      WHERE t.`chest pain type` IS NOT NULL
                      """)

And that's it! Next, I'll show the same steps using Spark's built-in data processing methods.

### Cleaning the Data (Spark Methods)

The steps are the same: first check how many rows there are and how many of them contain `null` values:

In [25]:
print("Total Samples:")
print(df_server.count())
print("Null Samples:")
print(df_server.filter(df_server["`chest pain type`"].isNull()).count())

Total Samples:
1190
Null Samples:
5


Then dropping the null values:

In [26]:
df_server = df_server.dropna()

As you can see, Spark's built-in methods result in much shorter and cleaner code. Keep in mind, however, that `df_server` is not stored in memory just because it was assigned to a variable. Spark DataFrames are evaluated lazily, so each action (such as `count()`) re-runs the query against the table unless the DataFrame has been cached with `df_server.cache()` (or `persist()`).

Additionally, Spark provides easy-to-use, built-in functions for common tasks like dropping null values, making the code simpler and more concise. This approach allows us to take full advantage of Spark’s capabilities while keeping the code efficient and readable.

In [27]:
print(f"Current Length: {df_server.count()}")

Current Length: 1185


### Feature Transformation and Encoding

Our classification model will be trained using a single vector that represents the data. The dataset contains two types of data: numerical and categorical.

Categorical data can be further divided into two types: binary and ordinal.

- Binary features take values of either `0` or `1`.
- Ordinal features, on the other hand, take a small set of integer codes (e.g., `0, 1, ..., x`), each representing a category. While these values are numeric, they should not be treated as continuous numbers, because the size of the gap between two codes carries no meaning. With numerical features, the model can learn relationships with the value itself (e.g., a threshold for resting blood pressure that indicates potential heart disease). With ordinal features, it should not infer relationships from the arithmetic differences between category codes.

For example, consider the ordinal feature height with values `0`, `1`, and `2`, corresponding to `short`, `average`, and `tall`. If these codes were used directly, the model would assume that `tall` (2) is exactly twice as far from `short` (0) as `average` (1) is, which the labels do not justify. Instead, we can represent this data using one-hot encoding, where each category becomes a binary vector: `[1, 0, 0]` for short, `[0, 1, 0]` for average, and `[0, 0, 1]` for tall. This ensures that the model sees these categories as mutually exclusive rather than as points on a numeric scale.

The next step is to convert these ordinal categorical features into one-hot encodings for better training.
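The height example can be made concrete with a plain-Python sketch (no Spark needed) that compares distances between integer codes and between one-hot vectors. `one_hot` is a hypothetical helper for illustration, not the Spark encoder used later.

```python
import math


def one_hot(value, categories):
    """Return a one-hot list for `value` over an ordered list of categories."""
    if value not in categories:
        raise ValueError(f"unknown category: {value!r}")
    return [1 if value == c else 0 for c in categories]


HEIGHT = ["short", "average", "tall"]                 # integer codes 0, 1, 2
codes = {h: [i] for i, h in enumerate(HEIGHT)}
encoded = {h: one_hot(h, HEIGHT) for h in HEIGHT}

# With integer codes, "short" appears twice as far from "tall" as from "average".
print(math.dist(codes["short"], codes["average"]), math.dist(codes["short"], codes["tall"]))
# With one-hot vectors, every pair of distinct categories is the same distance apart.
print(math.dist(encoded["short"], encoded["average"]), math.dist(encoded["short"], encoded["tall"]))
```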

From the data representation table given above, we can identify the following features as ordinal categorical features:
- Chest pain type
- Resting ECG
- ST slope

Since the binary features do not require any transformation, we will include them in the list of numerical features. These binary features already take values of 0 or 1, so they don't need further encoding.



In [28]:
cat_cols = ["chest pain type", "resting ecg", "ST slope"]
num_cols = [x for x in df_server.columns if x not in cat_cols and x != "target"]

print(f"Categorical Features: {cat_cols}")
print(f"Numerical Features: {num_cols}")

Categorical Features: ['chest pain type', 'resting ecg', 'ST slope']
Numerical Features: ['age', 'sex', 'resting bp s', 'cholesterol', 'fasting blood sugar', 'max heart rate', 'exercise angina', 'oldpeak']


We can look at the distribution of these ordinal categorical features by doing the following:

In [29]:
for feat in cat_cols:
    print(f"Feature: {feat}")
    df_server.groupBy(feat).count().show()

Feature: chest pain type


                                                                                

+---------------+-----+
|chest pain type|count|
+---------------+-----+
|              2|  215|
|              3|  281|
|              4|  624|
|              1|   65|
+---------------+-----+

Feature: resting ecg


                                                                                

+-----------+-----+
|resting ecg|count|
+-----------+-----+
|          0|  679|
|          1|  181|
|          2|  325|
+-----------+-----+

Feature: ST slope



+--------+-----+
|ST slope|count|
+--------+-----+
|       1|  522|
|       2|  581|
|       3|   81|
|       0|    1|
+--------+-----+



                                                                                

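As seen above, the `chest pain type` feature contains the values `1`, `2`, `3`, and `4`. Conceptually, one-hot encoding converts each value into a binary vector with one position per category; for example, with four categories the value `4` becomes `[0, 0, 0, 1]`. Spark's `OneHotEncoder` differs in two details. First, it treats the input values as 0-based category indices, so values `1` to `4` imply five slots (`0` to `4`). Second, by default (`dropLast=True`) it drops the last slot, so the highest value is encoded as all zeros.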

This way, the model will treat each category as mutually exclusive, avoiding any misleading relationships between the values.

Next, we'll use Spark's `OneHotEncoder` to transform the categorical features into one-hot encoded vectors. `OneHotEncoder` is part of the `pyspark.ml.feature` module, which provides data transformation methods tailored for machine learning workflows.

It's useful because it:
- is optimized for distributed computing.
- is a part of Spark’s MLlib (Machine Learning Library), which allows for seamless integration into machine learning pipelines.
- is easy to use.

In [30]:
from pyspark.ml.feature import OneHotEncoder

one_hot_encoder = [
    OneHotEncoder(inputCols=cat_cols, 
                  outputCols=[f"{x}_OneHotEncoder" for x in cat_cols])
]

The output of the `OneHotEncoder` will have the encoded ordinal categorical features with the suffix `_OneHotEncoder`. These newly created columns represent the one-hot encoded version of each categorical feature, turning them into binary vectors.

Next, we'll combine all the selected features (both the numerical columns and the one-hot encoded categorical columns) into a single vector column using Spark's `VectorAssembler`. This step is necessary because Spark ML estimators expect all input features in a single vector column.

In [31]:
from pyspark.ml.feature import VectorAssembler

assemblerInput = [x for x in num_cols]
assemblerInput += [f"{x}_OneHotEncoder" for x in cat_cols]

print(f"Assembler inputs are: \n{assemblerInput}")

vector_assembler = VectorAssembler(
    inputCols=assemblerInput, outputCol="VectorAssembler_features"
)

Assembler inputs are: 
['age', 'sex', 'resting bp s', 'cholesterol', 'fasting blood sugar', 'max heart rate', 'exercise angina', 'oldpeak', 'chest pain type_OneHotEncoder', 'resting ecg_OneHotEncoder', 'ST slope_OneHotEncoder']


### Splitting the Data into Training and Test Sets

Now we split the data into training and test sets with an approximate `80/20` split: about `80%` for training and `20%` for testing.

In [32]:
train_df, test_df = df_server.randomSplit([0.8, 0.2], seed=1234)

print(f"Train data length: {train_df.count()}")
print(f"Test data length:  {test_df.count()}")

                                                                                

Train data length: 946



Test data length:  239
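The 946/239 result is close to, but not exactly, 80/20 (80% of 1,185 is 948), because each row is assigned to a split at random. The plain-Python sketch below illustrates that idea. `random_split` is a hypothetical helper; Spark's `randomSplit` samples each partition with its own random number generator, so this sketch will not reproduce Spark's actual assignment.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to one split, with probability proportional to its weight."""
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)          # cumulative upper bound for each split
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()            # uniform in [0, 1)
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off
    return splits


train_rows, test_rows = random_split(range(1185), [0.8, 0.2], seed=1234)
print(len(train_rows), len(test_rows), len(train_rows) + len(test_rows))
```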


                                                                                

### Create a Spark Pipeline

Next, we will set up a Spark pipeline to apply a sequence of data transformations.

- `Pipeline`: A pipeline allows you to chain multiple stages of data processing into one object, making it easy to apply the same transformations to both training and test datasets.
- `stages`: This list contains the transformation steps that will be applied in order. In this case, it includes the one-hot encoding of categorical features and the assembly of features into a single vector using `VectorAssembler`.
- `pipeline`: The pipeline object is created and the stages are set. When applied, it will automatically execute the transformations in the correct order (first one-hot encoding, then vector assembly).
  
The benefit of using a pipeline is that it organizes the data processing steps into a streamlined, reusable workflow, ensuring consistency across datasets.

In [33]:
from pyspark.ml import Pipeline

stages = one_hot_encoder + [vector_assembler]

pipeline = Pipeline().setStages(stages)

Now, we'll fit the pipeline to the training data, learning the necessary transformations, and then apply the fitted pipeline to transform the test set, ensuring it undergoes the same preprocessing as the training data.

In [34]:
%%time

fitted_pipeline = pipeline.fit(train_df)

pp_train_df = fitted_pipeline.transform(train_df)
pp_test_df = fitted_pipeline.transform(test_df)

CPU times: user 40.4 ms, sys: 11.6 ms, total: 52.1 ms
Wall time: 4.4 s
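The fit-then-transform pattern can be illustrated without Spark. In the minimal sketch below, the hypothetical classes `CategoryIndexer`, `IndexerModel`, `MiniPipeline`, and `MiniPipelineModel` mimic the idea behind Spark's `Pipeline`: `fit` learns each stage's parameters from the training data only (passing the data through earlier stages first), and the resulting model applies the same learned transformations, in the same order, to any dataset. These classes are assumptions for illustration, not Spark's API.

```python
class CategoryIndexer:
    """Hypothetical estimator: learns a value -> index mapping from the data it is fit on."""

    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        categories = sorted({row[self.col] for row in rows})
        return IndexerModel(self.col, {value: i for i, value in enumerate(categories)})


class IndexerModel:
    """Fitted transformer: applies the mapping learned during fit."""

    def __init__(self, col, mapping):
        self.col = col
        self.mapping = mapping

    def transform(self, rows):
        # Values unseen during fit map to -1 in this sketch
        return [{**row, f"{self.col}_idx": self.mapping.get(row[self.col], -1)} for row in rows]


class MiniPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            model = stage.fit(rows)       # learn from the (already transformed) training data
            rows = model.transform(rows)  # feed the output to the next stage
            fitted.append(model)
        return MiniPipelineModel(fitted)


class MiniPipelineModel:
    def __init__(self, models):
        self.models = models

    def transform(self, rows):
        for model in self.models:  # same order as during fit
            rows = model.transform(rows)
        return rows


train = [{"color": "red"}, {"color": "blue"}]
test = [{"color": "blue"}, {"color": "green"}]

fitted = MiniPipeline([CategoryIndexer("color")]).fit(train)
print(fitted.transform(test))
# [{'color': 'blue', 'color_idx': 0}, {'color': 'green', 'color_idx': -1}]
```

Because the mapping is learned from `train` only, the test set cannot influence the preprocessing; it simply receives the transformations learned on the training data.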


Now, let's check whether our transformed training data, `pp_train_df`, looks correct.

Don't be alarmed by the output format. Instead of a plain list of numbers, some rows are displayed as tuples because Spark stores each assembled vector in either a dense or a sparse format, whichever takes less memory. A sparse vector stores only the non-zero values, yet it represents exactly the same information.

In the sparse vector format, each row is printed as `(size, [indices], [values])`:

- The first element is the size (length) of the full vector.
- The second element lists the positions (indices) of the non-zero values.
- The third element lists the non-zero values themselves.

This representation lets Spark handle large datasets more efficiently by storing only the non-zero values.
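To see that the sparse form carries the same information as a dense one, the short pure-Python sketch below expands the first row printed by the next cell from its `(size, indices, values)` form into a plain list. The helper `sparse_to_dense` is hypothetical; in PySpark, `SparseVector.toArray()` performs the equivalent conversion.

```python
def sparse_to_dense(size, indices, values):
    """Expand a (size, indices, values) sparse vector into a dense list."""
    dense = [0.0] * size  # every position not listed is zero
    for i, v in zip(indices, values):
        dense[i] = v
    return dense


# First row of the VectorAssembler_features column shown below
dense = sparse_to_dense(17, [0, 1, 2, 3, 5, 10, 15], [28.0, 1.0, 130.0, 132.0, 185.0, 1.0, 1.0])
print(dense)
# [28.0, 1.0, 130.0, 132.0, 0.0, 185.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0]
```

Reading the positions against `assemblerInput`, position 0 is `age` (28.0) and position 3 is `cholesterol` (132.0).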

In [35]:
pp_train_df.select(
    'VectorAssembler_features'
).show(truncate=False)

+----------------------------------------------------------------------------------+
|VectorAssembler_features                                                          |
+----------------------------------------------------------------------------------+
|(17,[0,1,2,3,5,10,15],[28.0,1.0,130.0,132.0,185.0,1.0,1.0])                       |
|(17,[0,1,2,3,5,10,15],[29.0,1.0,130.0,204.0,202.0,1.0,1.0])                       |
|(17,[0,1,2,3,5,10,15],[29.0,1.0,130.0,204.0,202.0,1.0,1.0])                       |
|(17,[0,2,3,5,9,13,15],[30.0,170.0,237.0,170.0,1.0,1.0,1.0])                       |
|(17,[0,2,3,5,10,13,15],[31.0,100.0,219.0,150.0,1.0,1.0,1.0])                      |
|(17,[0,1,2,3,5,6,7,12,16],[31.0,1.0,120.0,270.0,153.0,1.0,1.5,1.0,1.0])           |
|(17,[0,2,3,5,10,12,15],[32.0,105.0,198.0,165.0,1.0,1.0,1.0])                      |
|(17,[0,1,2,4,5,7,9,12,15],[32.0,1.0,95.0,1.0,127.0,0.699999988079071,1.0,1.0,1.0])|
|(17,[0,1,2,3,5,10,12,15],[32.0,1.0,110.0,225.0,184.0,1.0,1.0,1.0

## Model Training

### Preparing data for model training

Before training the model, we need to ensure that the data is in the correct format. Specifically, we need to:

- **Select the features:** Use the transformed features in the `VectorAssembler_features` column, renamed to `features`.
- **Rename the target column:** Rename the label column `target` to `label`. Spark MLlib estimators such as `LogisticRegression` read from columns named `features` and `label` by default (configurable through the `featuresCol` and `labelCol` parameters).

In [36]:
from pyspark.sql import functions as F

train_data = pp_train_df.select(F.col("VectorAssembler_features").alias("features"), F.col("target").alias("label"))
test_data = pp_test_df.select(F.col("VectorAssembler_features").alias("features"), F.col("target").alias("label"))

### Training

We'll be using Logistic Regression as the classification model. If you're interested in learning more about what Logistic Regression is, I highly recommend this article by IBM: [Logistic Regression](https://www.ibm.com/think/topics/logistic-regression). Going into detail, however, is out of scope for this lab.

Steps:
1. **Initialize the Model:** We start by creating a Logistic Regression model with `LogisticRegression()` from PySpark's MLlib, using its default parameters.
2. **Fit the Model:** Next, we fit the model to `train_data`, so that it learns the relationship between the features and the label.
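Without going into the theory, the sketch below shows what a fitted logistic regression model computes at prediction time: a weighted sum of the features plus an intercept, passed through the sigmoid function to get a probability, which is then compared with a threshold (Spark's `LogisticRegression` uses a default `threshold` of 0.5 for binary classification). The coefficients and feature values are made-up numbers for illustration, not the ones learned in this lab.

```python
import math


def sigmoid(z):
    """Map any real number to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def predict(features, coefficients, intercept, threshold=0.5):
    """Return (probability of class 1, predicted label) for one feature vector."""
    z = sum(w * x for w, x in zip(coefficients, features)) + intercept
    p = sigmoid(z)
    return p, 1.0 if p > threshold else 0.0


# Hypothetical 3-feature model and input: z = 0.8*1 - 1.2*0 + 0.5*2 - 1.0 = 0.8
prob, label = predict([1.0, 0.0, 2.0], coefficients=[0.8, -1.2, 0.5], intercept=-1.0)
print(f"p={prob:.3f}, label={label}")
# p=0.690, label=1.0
```

Training is the process of choosing the coefficients and intercept that make these probabilities match the labels in `train_data` as closely as possible.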

In [38]:
%%time
from pyspark.ml.classification import LogisticRegression

model = LogisticRegression().fit(train_data)
print(model)

LogisticRegressionModel: uid=LogisticRegression_1a70d05f5db6, numClasses=2, numFeatures=17
CPU times: user 36.8 ms, sys: 13.5 ms, total: 50.3 ms
Wall time: 29 s


To check how well the model fits the training set, we can look at the area under the ROC curve (AUC) reported in the model's training summary:

In [39]:
model.summary.areaUnderROC

0.9250667923935244

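As we can see, the model performs well on the training data, with an AUC of about 0.93. Note that AUC is not accuracy: it measures how well the model ranks positive examples above negative ones across all classification thresholds, where 1.0 is a perfect ranking and 0.5 is no better than random guessing.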
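AUC has a handy interpretation: it is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counting as half. The pure-Python sketch below computes AUC that way on a few made-up scores and labels, and contrasts it with accuracy at a 0.5 threshold to show that the two metrics can differ sharply on the same predictions. This pairwise formulation is an illustration of what the metric means, not how Spark computes `areaUnderROC` internally.

```python
def auc_pairwise(scores, labels):
    """AUC as P(score of a random positive > score of a random negative), ties count 0.5."""
    pos = [s for s, y in zip(scores, labels) if y == 1]
    neg = [s for s, y in zip(scores, labels) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


def accuracy(scores, labels, threshold=0.5):
    """Fraction of examples whose thresholded score matches the label."""
    preds = [1 if s > threshold else 0 for s in scores]
    return sum(p == y for p, y in zip(preds, labels)) / len(labels)


# Made-up predicted probabilities and true labels
scores = [0.45, 0.40, 0.35, 0.30, 0.20, 0.10]
labels = [1, 1, 0, 1, 0, 0]
print(f"AUC: {auc_pairwise(scores, labels):.3f}")   # AUC: 0.889
print(f"Accuracy: {accuracy(scores, labels):.3f}")  # Accuracy: 0.500
```

Here the scores rank positives above negatives in 8 of 9 pairs, yet every score falls below 0.5, so a fixed threshold labels every example negative.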

### Testing

Now that the model is trained and shows good performance on the training data, the next critical step is to evaluate how well the model generalizes. This means testing whether the model performs well on unseen data (the test set), which gives us an indication of how it will perform in real-world scenarios when dealing with new data.

To evaluate the performance of the model on the test data, we'll use the `MulticlassClassificationEvaluator` from `PySpark`. This evaluator helps compute various metrics like accuracy, precision, recall, and F1 score. For this example, we'll focus on accuracy.

Steps to Evaluate the Model:
1. **Get Predictions on the Test Data:** Use the trained model to make predictions on `test_data`.
2. **Evaluate Accuracy:** Use `MulticlassClassificationEvaluator` to compute the accuracy by comparing the predicted labels with the actual labels.
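The evaluator's `accuracy` is simply the fraction of rows where `prediction` equals `label`. The sketch below computes it, along with precision and recall for the positive class, from a few made-up (label, prediction) pairs using a confusion matrix; `confusion_counts` is a hypothetical helper. Note that for other metric names such as `"f1"`, `MulticlassClassificationEvaluator` reports weighted averages across classes, so its values will not necessarily match the single-class precision and recall shown here.

```python
from collections import Counter


def confusion_counts(labels, predictions):
    """Count (label, prediction) pairs for a binary 0/1 problem."""
    counts = Counter(zip(labels, predictions))
    return {
        "tp": counts[(1, 1)],
        "tn": counts[(0, 0)],
        "fp": counts[(0, 1)],
        "fn": counts[(1, 0)],
    }


labels = [1, 0, 1, 1, 0, 0, 1, 0]
predictions = [1, 0, 0, 1, 0, 1, 1, 1]

c = confusion_counts(labels, predictions)
accuracy = (c["tp"] + c["tn"]) / len(labels)
precision = c["tp"] / (c["tp"] + c["fp"])  # of predicted positives, how many are correct
recall = c["tp"] / (c["tp"] + c["fn"])     # of actual positives, how many were found
print(c)
# {'tp': 3, 'tn': 2, 'fp': 2, 'fn': 1}
print(f"accuracy={accuracy:.3f} precision={precision:.3f} recall={recall:.3f}")
# accuracy=0.625 precision=0.600 recall=0.750
```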

In [40]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Make predictions on the test data
predictions = model.transform(test_data)

# Use MulticlassClassificationEvaluator to calculate accuracy
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

# Compute accuracy
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy on test data: {accuracy:.4f}")

Accuracy on test data: 0.8285

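The model also performs well on unseen data, reaching an accuracy of about 83% on the test set. Keep in mind that this test score is accuracy, whereas the training score above was AUC, so the two numbers are not directly comparable; to compare like with like, you could also evaluate the test predictions with `BinaryClassificationEvaluator`, whose default metric is `areaUnderROC`. With that, the lab is concluded. I hope you enjoyed this lab and learned a lot about the advantages of Spark and Spark Labs.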

All the best!