# Use Feature Discovery SQL in other Spark clusters

Authors: Harry Dinh, John Edwards
Date: 15/10/2024

This notebook provides a framework for running Feature Discovery SQL in a new Spark cluster on Docker. It walks through setting up the cluster, registering custom user-defined functions (UDFs), and executing complex SQL queries for feature engineering across multiple datasets. The same approach applies to other Spark environments, such as GCP Dataproc, Amazon EMR, and Cloudera CDP, so you can run Feature Discovery on the Spark platform that suits your infrastructure.

## Problem framing


Features are commonly split across multiple data assets. Bringing these assets together takes a lot of work, because you have to join them before you can train machine learning models on the result. It is even harder when the datasets have different granularities (for example, one row per customer in one dataset and one row per transaction in another), because the finer-grained data must be aggregated before it can be joined.

[Feature Discovery](https://docs.datarobot.com/en/docs/data/transform-data/feature-discovery/enrich-data-using-feature-discovery.html) solves this problem by automating the procedure of joining and aggregating your datasets. After you define how the datasets need to be joined, DataRobot handles feature generation and modeling.
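
To make the granularity problem concrete, here is a minimal sketch of the aggregate-then-join step that Feature Discovery automates. The toy rows are invented for illustration; only the column names `CustomerID`, `Amount`, and `BadLoan` are borrowed from the datasets used later in this notebook, and the feature names are hypothetical rather than what DataRobot generates.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical toy data: one row per customer in the primary dataset,
# many rows per customer in the transactions dataset.
primary = [
    {"CustomerID": "C1", "BadLoan": "No"},
    {"CustomerID": "C2", "BadLoan": "Yes"},
]
transactions = [
    {"CustomerID": "C1", "Amount": 10.0},
    {"CustomerID": "C1", "Amount": 30.0},
    {"CustomerID": "C2", "Amount": 5.0},
]


def aggregate_then_join(primary_rows, transaction_rows):
    """Aggregate transactions to one row per customer, then left-join."""
    amounts = defaultdict(list)
    for row in transaction_rows:
        amounts[row["CustomerID"]].append(row["Amount"])

    joined = []
    for row in primary_rows:
        values = amounts.get(row["CustomerID"], [])
        joined.append({
            **row,
            "Amount_count": len(values),
            "Amount_avg": mean(values) if values else None,
            "Amount_max": max(values) if values else None,
        })
    return joined


features = aggregate_then_join(primary, transactions)
```

Each added aggregation or time window multiplies the number of derived columns, which is why writing this by hand does not scale.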

<img src="img/FD_graph.png" width="800">

<img src="img/FD_SQL.png" width="800">

Feature Discovery uses Spark to perform joins and aggregations, generating Spark SQL at the end of the process. In some cases, you may want to run this Spark SQL in other Spark clusters to gain more flexibility and scalability for handling larger datasets, without the need to load data directly into the DataRobot environment. This approach allows you to leverage external Spark clusters for more resource-intensive tasks.

## File overview

The file structure is outlined below:

```bash
.
├── Using Feature Discovery SQL in other Spark clusters.ipynb
├── apps
│    ├── DataRobotRunSSSQL.py
│    ├── LC_FD_SQL.sql
│    ├── LC_profile.csv
│    ├── LC_train.csv
│    └── LC_transactions.csv
├── data
├── libs
│    ├── spark-udf-assembly-0.1.0.jar
│    └── venv.tar.gz
├── docker-compose.yml
├── Dockerfile
├── start-spark.sh
└── utils.py
```
- `Using Feature Discovery SQL in other Spark clusters.ipynb` is the notebook that provides a framework for running Feature Discovery SQL in a new Spark cluster on Docker.
- `docker-compose.yml`, `Dockerfile`, and `start-spark.sh` are the files Docker uses to build and start the containers that run Spark.
- `utils.py` includes a helper function to download the datasets and the UDFs jar.
- The `apps` directory includes:
  - The Spark SQL (a file with a `.sql` extension)
  - The datasets (files with a `.csv` extension)
  - A helper script (a file with a `.py` extension) that parses and executes the SQL
- The `libs` directory includes:
  - A user-defined functions (UDFs) jar file
  - An environment file (only required if the datasets include Japanese text, which needs the MeCab tokenizer)
- The `data` directory starts empty and is used to store the output.


## Download datasets and .jar files

Use the code below to download the datasets, the UDFs .jar file, and the environment file used in this accelerator.

In [1]:
from utils import DATASETS, download_files_from_public_s3, ENV_AND_JARS

In [2]:
# Download datasets
download_files_from_public_s3(files_dict=DATASETS, to_folder="apps/")

# Download UDFs .jars and environment file
download_files_from_public_s3(files_dict=ENV_AND_JARS, to_folder="libs/")

Downloaded LC_train.csv and saved to apps/LC_train.csv
Downloaded LC_profile.csv and saved to apps/LC_profile.csv
Downloaded LC_transactions.csv and saved to apps/LC_transactions.csv
libs/spark-udf-assembly-0.1.0.jar already exists
libs/venv.tar.gz already exists
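
The output above suggests that the helper skips files that are already present. The implementation in `utils.py` is not shown here, so the following is only a minimal sketch of that skip-if-present pattern. The function name `download_files_sketch` is hypothetical, and the sketch assumes that `files_dict` maps a file name to a URL, which may not match the real structure of `DATASETS` and `ENV_AND_JARS`.

```python
import os
import urllib.request


def download_files_sketch(files_dict, to_folder):
    """Download each file unless it already exists; return status messages.

    Assumption: files_dict maps a file name to a downloadable URL.
    """
    os.makedirs(to_folder, exist_ok=True)
    messages = []
    for filename, url in files_dict.items():
        target = os.path.join(to_folder, filename)
        if os.path.exists(target):
            # Skip the download so re-running the cell is cheap.
            messages.append(f"{target} already exists")
            continue
        urllib.request.urlretrieve(url, target)
        messages.append(f"Downloaded {filename} and saved to {target}")
    return messages
```

Skipping existing files keeps the cell safe to re-run, which matters for large artifacts such as `venv.tar.gz`.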


## Start the Spark cluster on Docker and execute SQL

### Run Docker

Make sure that Docker is installed and running on the system where Jupyter is hosted. Run the following code to check the Docker version:

In [3]:
# Check Docker installation
!docker --version

Docker version 24.0.7, build afdd53b


If Docker is not installed, follow the [Docker installation guide](https://docs.docker.com/get-started/get-docker/).
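
Note that `docker --version` only confirms that the CLI is installed; it does not show whether the Docker daemon is running. As a rough sketch, the check below relies on `docker info` exiting with a non-zero status when the daemon cannot be reached. The function name `docker_is_ready` is hypothetical.

```python
import shutil
import subprocess


def docker_is_ready(executable="docker", timeout_seconds=30):
    """Return True only if the CLI is on PATH and `docker info` succeeds."""
    if shutil.which(executable) is None:
        return False  # CLI not installed or not on PATH
    try:
        result = subprocess.run(
            [executable, "info"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            timeout=timeout_seconds,
        )
    except (subprocess.TimeoutExpired, OSError):
        return False
    # A non-zero exit code usually means the daemon is not reachable.
    return result.returncode == 0
```

Calling `docker_is_ready()` before the build step gives an early, explicit failure instead of a harder-to-read build error.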

### Build the Docker image

Next, build the Docker image using the Dockerfile in your repository. Ensure that the Dockerfile is in the directory where the Jupyter notebook is located and run the following code.

In [4]:
!docker build -t apache-spark:3.2.1 .

[+] Building 0.2s (2/3)                                    docker:desktop-linux
 => [internal] load build definition from Dockerfile                       0.0s
 => => transferring dockerfile: 1.49kB                                     0.0s
 => [internal] load .dockerignore                                          0.0s
 => => transferring context: 2B                                            0.0s
 => [internal] load metadata for docker.io/library/openjdk:8-jre-slim-bus  0.2s

### Start the Spark cluster using Docker Compose

Next, use `docker-compose` to start the Spark master and worker containers. Ensure that the `docker-compose.yml` file is in the same directory as your notebook.

In [5]:
!docker-compose up -d

[+] Running 4/4
 ✔ Network safer-recipe-sql_default  Created                               0.0s
 ✔ Container spark-master            Created                               0.0s
 ✔ Container spark-worker-b          Created                               0.0s
 ✔ Container spark-worker-a          Created                               0.0s

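Because `docker-compose up -d` runs detached, it returns once the containers have been started, which can be before Spark is ready to accept work. If the compose file publishes a master port to the host, a simple TCP poll like the sketch below can act as a readiness check. The helper name `wait_for_port` is hypothetical, and the host and port in the usage note are placeholders: check `docker-compose.yml` for the ports that are actually published.

```python
import socket
import time


def wait_for_port(host, port, timeout_seconds=60.0, poll_interval=1.0):
    """Poll until a TCP connection to (host, port) succeeds or time runs out."""
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=poll_interval):
                return True  # something is listening on the port
        except OSError:
            time.sleep(poll_interval)  # not ready yet; try again
    return False
```

For example, `wait_for_port("localhost", 7077)` would wait up to a minute, assuming (hypothetically) that the master port 7077 is mapped to the same port on the host.
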
### Submit Spark jobs using spark-submit

To run a specific Spark job (e.g., the `DataRobotRunSSSQL.py` script), you can use the following command from the notebook:

In [None]:
!docker exec -it spark-master /opt/spark/bin/spark-submit \
  --conf "spark.sql.legacy.timeParserPolicy=LEGACY" \
  --master spark://spark-master:7077 \
  --jars /opt/spark-libs/spark-udf-assembly-0.1.0.jar \
  -c spark.sql.caseSensitive=true \
  /opt/spark-apps/DataRobotRunSSSQL.py \
    /opt/spark-apps/LC_FD_SQL.sql \
    --input=csv,primary_dataset=/opt/spark-apps/LC_train.csv \
    --input=csv,LC_profile=/opt/spark-apps/LC_profile.csv \
    --input=csv,LC_transactions=/opt/spark-apps/LC_transactions.csv \
    --output=csvfile,/opt/spark/spark-warehouse/result.csv  # output directory
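
Each `--input` argument in the command above follows the pattern `format,name=path`. The source of `DataRobotRunSSSQL.py` is not reproduced in this notebook, so the following is only a sketch of how such a script could register each input as a temporary view and then run the SQL. The function names are hypothetical, the sketch handles only CSV inputs, and it assumes the SQL file contains a single statement. `spark` is expected to be an existing `SparkSession`.

```python
def parse_input_spec(spec):
    """Split 'csv,primary_dataset=/path/file.csv' into (format, view name, path)."""
    file_format, _, remainder = spec.partition(",")
    view_name, _, path = remainder.partition("=")
    if not (file_format and view_name and path):
        raise ValueError(f"Expected 'format,name=path', got: {spec!r}")
    return file_format, view_name, path


def run_sql_on_inputs(spark, sql_text, input_specs):
    """Register each input as a temporary view, then run the SQL.

    Assumptions: `spark` is a SparkSession, every input is a CSV file with
    a header row, and `sql_text` holds exactly one SQL statement.
    """
    for spec in input_specs:
        file_format, view_name, path = parse_input_spec(spec)
        if file_format != "csv":
            raise ValueError(f"This sketch only handles csv, got: {file_format}")
        df = spark.read.csv(path, header=True, inferSchema=True)
        # The view name must match the table name referenced in the SQL.
        df.createOrReplaceTempView(view_name)
    return spark.sql(sql_text)
```

The view names (`primary_dataset`, `LC_profile`, `LC_transactions`) matter because the generated SQL refers to the datasets by those names.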

### Check the results

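Once the job has finished, you can view the result directly from the notebook. The job writes its output to `/opt/spark/spark-warehouse/result.csv` inside the container, while the code below reads `data/result.csv`; this presumably works because `docker-compose.yml` mounts the local `data` directory at that container path. To view the first few rows of the result, use the following code: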

In [7]:
import pandas as pd

In [8]:
df = pd.read_csv("data/result.csv")
df.head()

Unnamed: 0,LC_transactions (days since previous event by CustomerID) (30 days max),LC_transactions[Date] (Day of Week) (1 week most frequent),LC_transactions[Date] (Day of Week) (30 days latest),LC_transactions (days since previous event by CustomerID) (30 days avg),BadLoan,LC_transactions[Amount] (30 days std),LC_transactions[Amount] (1 week min),LC_transactions[Description] (30 days unique count),LC_transactions[Date] (Day of Month) (30 days latest),LC_transactions[Amount] (30 days min),...,LC_transactions[Date] (Day of Week) (30 days most frequent),date (days from LC_transactions[Date]) (1 week std),LC_transactions[Amount] (30 days median),LC_transactions[Amount] (30 days max),LC_transactions[AccountID] (1 week tokens),LC_profile[emp_length],LC_transactions[AccountID] (word count) (30 days sum),LC_transactions[Description] (1 week counts),LC_transactions[Date] (Day of Month) (1 week most frequent),LC_profile[zip_code]
0,5.0,5.0,5.0,1.714286,No,23.653558,14.99,12.0,23.0,10.25,...,5.0,1.118034,10.25,8.42,"{""a355056969"" : 6.0}",< 1 year,15.0,"{""internet payment"" : 2.0, ""international tran...",23.0,782xx
1,3.0,6.0,6.0,0.310345,No,597.19078,1.49,33.0,3.0,0.83,...,5.0,1.542778,32.25,95.0,"{""a366458676"" : 12.0, ""a547274624"" : 4.0, ""a38...",3 years,88.0,"{""amortisation"" : 1.0, ""internet payment"" : 2....",3.0,010xx
2,4.0,3.0,3.0,0.241071,No,439.61339,131.5,44.0,18.0,0.28,...,2.0,1.114924,61.5,,"{""a643390547"" : 2.0, ""a346560403"" : 10.0}",10+ years,113.0,"{""internet payment"" : 1.0, ""telco"" : 1.0, ""sup...",18.0,890xx
3,14.0,1.0,1.0,6.25,No,74.973167,205.29,5.0,12.0,2.5,...,4.0,0.0,55.98,,"{""a458669971"" : 1.0}",9 years,5.0,"{""internet payment"" : 1.0}",12.0,277xx
4,3.0,0.0,1.0,0.675,No,316.57698,1000.0,25.0,12.0,1.45,...,0.0,1.699673,39.0,73.3,"{""a249156803"" : 4.0, ""a448123747"" : 1.0, ""a505...",5 years,41.0,"{""internet payment"" : 1.0, ""retail furniture"" ...",11.0,939xx
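
Some generated features, such as the `tokens` and `counts` columns above, hold key-to-count maps stored as strings. Assuming the full (untruncated) values are valid JSON, which the preview suggests but does not guarantee, a small helper like this hypothetical `parse_count_map` can turn a cell into a dictionary.

```python
import json


def parse_count_map(cell):
    """Turn a cell such as '{"internet payment" : 2.0}' into a dict.

    Missing values (for example NaN from pandas) and blank strings
    are returned as an empty dict.
    """
    if not isinstance(cell, str) or not cell.strip():
        return {}
    return {key: float(value) for key, value in json.loads(cell).items()}


# Illustrative value shaped like the cells in the preview above.
example = parse_count_map('{"a366458676" : 12.0, "a547274624" : 4.0}')
top_token = max(example, key=example.get)
```

With pandas, the helper could be applied to a whole column, for example `df["LC_transactions[AccountID] (1 week tokens)"].apply(parse_count_map)`.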


### Stop the cluster and clean up

When you have finished with the previous tasks, stop and remove the Spark cluster containers. You can also remove the Docker image if you no longer need it.

In [None]:
# Stop the running containers
!docker-compose down

# Remove the Spark Docker image
!docker image rm apache-spark:3.2.1

## Conclusion

This accelerator provides a step-by-step guide for running SQL generated by Feature Discovery in an external Spark cluster, using a Docker-based Spark environment as the example. It demonstrates how to execute complex SQL queries, including custom user-defined functions (UDFs), outside the DataRobot environment. By following this framework, you can offload resource-intensive SQL tasks to larger Spark clusters and handle larger datasets. The same approach can be adapted to other Spark platforms, such as GCP Dataproc, Amazon EMR, and Cloudera CDP.