# Training a Random Forest Model and Data Conversion with Apache Spark

In this project, we demonstrate how to train a Random Forest model using Apache Spark. As part of the process, we also convert a Parquet file to CSV.

## Project Structure
The project is divided into two main parts:

- Converting a Parquet file to CSV: this part reads a Parquet file and uses Apache Spark to convert it into CSV, a more widely supported and easily readable format.

- Training a Random Forest Model: in this part, we use the CSV data to train a Random Forest classifier. The model is exported in PMML (Predictive Model Markup Language) format, an XML-based standard for representing trained machine learning models.

Now, let's dive into each part.

### Installation and Initialization

Before starting, we need to install the necessary packages. The package versions depend on the Python version, so the cell below first detects the Python version and then installs matching versions of PySpark, wget and pyspark2pmml.

#### Install dependencies

In [None]:
%%bash
# derive a two-digit version tag from the interpreter, e.g. "Python 3.8.10" -> "38"
export version=`python --version | awk '{print $2}' | awk -F"." '{print $1$2}'`

echo $version

if [ "$version" == '36' ] || [ "$version" == '37' ]; then
    echo 'Starting installation...'
    # Python 3.6/3.7: PySpark 2.4.x; stdout and stderr both go to install.log
    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2>&1
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ "$version" == '38' ] || [ "$version" == '39' ]; then
    echo 'Starting installation...'
    # Python 3.8/3.9: PySpark 3.1.x
    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2>&1
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only Python 3.6, 3.7, 3.8 and 3.9 are supported. If you need a different version, please open an issue at https://github.com/IBM/claimed/issues'
    exit 1
fi
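
The same version dispatch can be expressed as a lookup table in Python, which keeps the supported combinations in one place. This is a sketch, not part of the original notebook; `PINS` and `packages_for` are hypothetical names. Keying on `(major, minor)` tuples rather than slicing the version string also keeps Python 3.10 from being misread as "3.1".

```python
import sys

# package pins from the installation cell, keyed by (major, minor) Python version
_SPARK2 = ['pyspark==2.4.8', 'wget==3.2', 'pyspark2pmml==0.5.1']
_SPARK3 = ['pyspark==3.1.2', 'wget==3.2', 'pyspark2pmml==0.5.1']
PINS = {(3, 6): _SPARK2, (3, 7): _SPARK2, (3, 8): _SPARK3, (3, 9): _SPARK3}


def packages_for(version_info=sys.version_info):
    """Hypothetical helper: return the pip requirements for a Python version, or raise."""
    key = tuple(version_info[:2])
    if key not in PINS:
        raise RuntimeError(f'Python {key[0]}.{key[1]} is not supported')
    return PINS[key]


print(packages_for((3, 8)))  # ['pyspark==3.1.2', 'wget==3.2', 'pyspark2pmml==0.5.1']
```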

#### Initializing

In [1]:
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
import logging
import shutil
import site
import sys
import wget
import glob
import os
import re

In [2]:
# source path and file name (default: data.parquet)
data_parquet = os.environ.get('data_parquet', 'data.parquet')

# destination path and CSV file name (default: data.csv)
output_data_csv = os.environ.get('output_data_csv', 'data.csv')

# url of master (default: local mode)
master = os.environ.get('master', "local[*]")

# temporary data storage for local execution
data_dir = os.environ.get('data_dir', '../../data/')
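
Paths in this notebook are built by plain string concatenation (for example `data_dir + data_parquet`), so an overridden `data_dir` needs a trailing slash. A minimal sketch of a more forgiving alternative uses `os.path.join`; `data_path` is a hypothetical helper, not part of the notebook.

```python
import os


def data_path(data_dir: str, file_name: str) -> str:
    """Hypothetical helper: join a directory and a file name, with or without a trailing '/'."""
    return os.path.join(data_dir, file_name)


# plain concatenation silently drops the separator when data_dir lacks a trailing '/'
concatenated = '../../data' + 'data.csv'
joined = data_path('../../data', 'data.csv')
joined_with_slash = data_path('../../data/', 'data.csv')

print(concatenated)       # ../../datadata.csv
print(joined)             # ../../data/data.csv (on POSIX systems)
print(joined_with_slash)  # ../../data/data.csv (on POSIX systems)
```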

### Parquet to CSV Conversion

Next, we read the Parquet file, convert it to CSV, and save the result. The script first checks whether the CSV file already exists; if it does, the conversion is skipped.

In [4]:
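# skip the conversion if a CSV file with the target name already exists
skip = os.path.isfile(data_dir + output_data_csv)

if not skip:
    sc = SparkContext.getOrCreate(SparkConf().setMaster(master))
    spark = SparkSession.builder.getOrCreate()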

In [5]:
if not skip:
    df = spark.read.parquet(data_dir + data_parquet)

In [6]:
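if not skip:
    # remove leftovers from a previous run: a Spark output directory or an old CSV file
    if os.path.isdir(data_dir + output_data_csv):
        shutil.rmtree(data_dir + output_data_csv)
    elif os.path.isfile(data_dir + output_data_csv):
        os.remove(data_dir + output_data_csv)
    # write from a single partition so Spark produces exactly one part-* file
    df.coalesce(1).write.option("header", "true").csv(data_dir + output_data_csv)
    # Spark writes a directory; move its part file out and give it the target name
    file = glob.glob(data_dir + output_data_csv + '/part-*')
    shutil.move(file[0], data_dir + output_data_csv + '.tmp')
    shutil.rmtree(data_dir + output_data_csv)
    shutil.move(data_dir + output_data_csv + '.tmp', data_dir + output_data_csv)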
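
Spark writes CSV output as a directory holding one `part-*` file per partition plus bookkeeping files such as `_SUCCESS`; `coalesce(1)` reduces this to a single part file, which the cell above then moves to the target name. A standalone sketch of that renaming step, runnable without Spark against a fake output directory; `promote_single_part` is a hypothetical helper name.

```python
import glob
import os
import shutil
import tempfile


def promote_single_part(output_dir: str) -> str:
    """Hypothetical helper: replace a single-partition Spark output directory
    with its one part-* file, keeping the directory's name."""
    target = output_dir.rstrip('/')
    parts = glob.glob(os.path.join(target, 'part-*'))
    if len(parts) != 1:
        raise ValueError(f'expected exactly one part file, found {len(parts)}')
    tmp_path = target + '.tmp'
    shutil.move(parts[0], tmp_path)   # rescue the data file first
    shutil.rmtree(target)             # drop _SUCCESS and any .crc files
    shutil.move(tmp_path, target)     # the file now takes the directory's name
    return target


# usage against a fake Spark output directory in a temporary location
with tempfile.TemporaryDirectory() as root:
    out_dir = os.path.join(root, 'data.csv')
    os.makedirs(out_dir)
    with open(os.path.join(out_dir, 'part-00000-example.csv'), 'w') as f:
        f.write('x,y,z\n1,2,3\n')
    open(os.path.join(out_dir, '_SUCCESS'), 'w').close()

    result = promote_single_part(out_dir)
    is_file = os.path.isfile(result)
    with open(result) as f:
        content = f.read()

print(is_file, repr(content))
```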

### Training a Random Forest Model

Once we have the CSV file, we can use it to train a Random Forest model. First, we download the version of the JPMML-SparkML library that matches the installed PySpark version; it is required to export the trained model to PMML format.

In [7]:
# JPMML-SparkML release paired with the PySpark version installed above:
# 1.7.2 for PySpark 3.1.2 (Python 3.8/3.9), 1.5.12 for PySpark 2.4.8 (Python 3.6/3.7)
py_version = sys.version_info[:2]
if py_version in [(3, 8), (3, 9)]:
    jpmml_version = '1.7.2'
elif py_version in [(3, 6), (3, 7)]:
    jpmml_version = '1.5.12'
else:
    raise Exception('Currently only Python 3.6, 3.7, 3.8 and 3.9 are supported. If '
                    'you need a different version, please open an issue at '
                    'https://github.com/IBM/claimed/issues')

jar = 'jpmml-sparkml-executable-' + jpmml_version + '.jar'
url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/'
       + jpmml_version + '/' + jar)

# wget.download saves a renamed copy if the file already exists, so download only once
if not os.path.exists(jar):
    wget.download(url)

if jpmml_version == '1.7.2':
    # PySpark 3.1: copy the jar into PySpark's bundled jars directory
    shutil.copy(jar, site.getsitepackages()[0] + '/pyspark/jars/')

With the necessary libraries installed, we can now define additional parameters and initialize our Spark context.

In [8]:
# input file name (default: data.csv)
data_csv = os.environ.get('data_csv', 'data.csv')

# url of master (default: local mode)
master = os.environ.get('master', "local[*]")

# model output file name (default: model.xml)
model_target = os.environ.get('model_target', "model.xml")

# temporary directory for data
data_dir = os.environ.get('data_dir', '../../data/')

# input columns to consider, as a Python list literal
input_columns = os.environ.get('input_columns', '["x", "y", "z"]')

In [9]:
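# Override any parameter above from the command line with name=value arguments,
# e.g. data_dir=/tmp/data/. Each matching argument is rewritten to name="value"
# and executed, so overridden values are always strings.
parameters = list(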
    map(lambda s: re.sub('$', '"', s),
        map(
            lambda s: s.replace('=', '="'),
            filter(
                lambda s: s.find('=') > -1 and bool(re.match(r'[A-Za-z0-9_]*=[.\/A-Za-z0-9]*', s)),
                sys.argv
            )
    )))

for parameter in parameters:
    logging.warning('Parameter: ' + parameter)
    exec(parameter)
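
The cell above turns each `name=value` argument into Python source and runs it with `exec`, so every override becomes a string and a crafted argument can inject code. A minimal sketch of a dictionary-based alternative that avoids `exec`; `parse_overrides` is a hypothetical helper, and applying the values stays an explicit step.

```python
import re

# name=value, where name is a Python identifier; the value is kept verbatim
_ARG = re.compile(r'^([A-Za-z_][A-Za-z0-9_]*)=(.*)$')


def parse_overrides(argv):
    """Hypothetical helper: collect name=value arguments into a dict of strings, without exec()."""
    overrides = {}
    for arg in argv:
        match = _ARG.match(arg)
        if match:
            overrides[match.group(1)] = match.group(2)
    return overrides


overrides = parse_overrides(['train.py', '-f', 'master=local[2]', 'model_target=model.xml'])
print(overrides)  # {'master': 'local[2]', 'model_target': 'model.xml'}
```

The values can then be applied one by one, for example `master = overrides.get('master', master)`, which keeps the set of overridable names explicit.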

In [10]:
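conf = SparkConf().setMaster(master)

# Python 3.6/3.7 (PySpark 2.4): add the downloaded JPMML-SparkML 1.5.12 jar to the classpath;
# for Python 3.8/3.9 the 1.7.2 jar was copied into pyspark/jars instead
if sys.version_info[:2] in [(3, 6), (3, 7)]:
    conf.set("spark.jars", 'jpmml-sparkml-executable-1.5.12.jar')

# getOrCreate returns an already running context unchanged (ignoring conf),
# so this configuration only takes effect in a fresh session
sc = SparkContext.getOrCreate(conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession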



After initializing the Spark context, we can read in the CSV file and preprocess the data for model training.

In [11]:
# the CSV written by the conversion step has a header row; all columns are read as strings
df = spark.read.option("header", "true").csv(data_dir + data_csv)

In [12]:
# register a corresponding query table
df.createOrReplaceTempView('df')

In [13]:
from pyspark.sql.types import DoubleType

# the CSV columns are read as strings; cast each feature column to double
for column in eval(input_columns):
    df = df.withColumn(column, df[column].cast(DoubleType()))

In [14]:
splits = df.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]
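
`randomSplit` assigns every row to a split independently, so the 80/20 proportions are only approximate, and because no seed is passed here the split changes on every run (`df.randomSplit([0.8, 0.2], seed=1)` would fix it). A plain-Python sketch of the idea, not Spark's actual implementation; `random_split` is a hypothetical helper.

```python
import random


def random_split(rows, weights, seed=None):
    """Hypothetical helper: assign each row independently to one of len(weights) splits.

    Weights are normalised first, so [0.8, 0.2] and [4, 1] behave the same.
    Because every row is assigned independently, split sizes are only
    approximately proportional to the weights.
    """
    total = float(sum(weights))
    bounds, cumulative = [], 0.0
    for weight in weights:
        cumulative += weight / total
        bounds.append(cumulative)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one uniform draw per row
        for i, bound in enumerate(bounds):
            # the last split also catches draws above a bound that rounded below 1.0
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(1000), [0.8, 0.2], seed=1)
print(len(train), len(test))  # close to 800 and 200, but rarely exactly
```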

We proceed by training the Random Forest model and saving it in PMML format.

In [15]:
indexer = StringIndexer(inputCol="class", outputCol="label")

vectorAssembler = VectorAssembler(inputCols=eval(input_columns),
                                  outputCol="features")

normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")
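
The three stages above are standard Spark ML feature transformers: `StringIndexer` maps the string labels in `class` to numeric indices (by default the most frequent label gets index 0.0), `VectorAssembler` concatenates the input columns into one vector, and `MinMaxScaler` rescales each feature to [0, 1] using that feature's minimum and maximum. A plain-Python sketch of what the first and last stages compute; `string_index`, `min_max_scale` and the labels `walk`, `run`, `sit` are made up for illustration, and the alphabetical tie-break is this sketch's own choice.

```python
from collections import Counter


def string_index(values):
    """Hypothetical helper: index labels by descending frequency (most frequent -> 0.0)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    mapping = {label: float(i) for i, label in enumerate(ordered)}
    return [mapping[v] for v in values], mapping


def min_max_scale(column, new_min=0.0, new_max=1.0):
    """Hypothetical helper: rescale one feature column to [new_min, new_max]."""
    lo, hi = min(column), max(column)
    if hi == lo:
        # constant column: map every value to the middle of the target range
        return [0.5 * (new_max + new_min)] * len(column)
    return [(x - lo) / (hi - lo) * (new_max - new_min) + new_min for x in column]


labels, mapping = string_index(['walk', 'run', 'walk', 'sit', 'walk', 'run'])
print(mapping)                         # {'walk': 0.0, 'run': 1.0, 'sit': 2.0}
print(min_max_scale([2.0, 4.0, 6.0]))  # [0.0, 0.5, 1.0]
```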

In [19]:
seed = 1
best_predic = 0
best_model = None
best_params = {'numTrees': None, 'maxDepth': None, 'seed': None}

# accuracy of the predicted label against the indexed "class" label
binEval = MulticlassClassificationEvaluator(). \
    setMetricName("accuracy"). \
    setPredictionCol("prediction"). \
    setLabelCol("label")

for curNTrees in [10, 20]:
    for curMaxDepth in [5, 7]:
        # rf reads the default "features" column, i.e. the unscaled assembled vector
        rf = RandomForestClassifier(numTrees=curNTrees, maxDepth=curMaxDepth, seed=seed)
        pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, rf])
        model = pipeline.fit(df_train)
        # accuracy is measured on the training split
        prediction = model.transform(df_train)
        predic = binEval.evaluate(prediction)

        # remember the best-scoring fitted pipeline, not just its parameters
        if predic > best_predic:
            best_predic = predic
            best_model = model
            best_params = {'numTrees': curNTrees, 'maxDepth': curMaxDepth, 'seed': seed}

        print('Number of trees', curNTrees, 'Maximum Depth', curMaxDepth, 'seed', seed, 'Prediction', predic)

print('Best parameters:', best_params)

# export the best pipeline below instead of the last one fitted
model = best_model

Number of trees 10 Maximum Depth 5 seed 1 Prediction 0.44228418971337163
Number of trees 10 Maximum Depth 7 seed 1 Prediction 0.4648783038892975
Number of trees 20 Maximum Depth 5 seed 1 Prediction 0.44238486901902
Number of trees 20 Maximum Depth 7 seed 1 Prediction 0.46930819333782664
Best parameters: {'numTrees': 20, 'maxDepth': 7, 'seed': 1}
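
The loop above scores each candidate with `model.transform(df_train)`, on the same rows it was fitted on, so the printed numbers are training accuracy and `df_test` is not used. A common alternative is to compare candidates on held-out rows (in the notebook, by transforming `df_test` instead); Spark ML also provides `pyspark.ml.tuning.TrainValidationSplit` and `CrossValidator` for this. The sketch below shows the pattern in plain Python with a toy 1-D k-nearest-neighbour classifier standing in for the Spark pipeline; `grid_search`, `fit_knn`, `accuracy` and the data are hypothetical.

```python
from itertools import product


def grid_search(fit, score, grid, train, validation):
    """Fit one model per parameter combination and keep the best validation score."""
    names = list(grid)
    best_score, best_params, best_model = float('-inf'), None, None
    for values in product(*(grid[name] for name in names)):
        params = dict(zip(names, values))
        model = fit(train, **params)
        current = score(model, validation)  # scored on rows the model never saw
        if current > best_score:
            best_score, best_params, best_model = current, params, model
    return best_score, best_params, best_model


def fit_knn(train, k):
    """Toy stand-in for the Spark pipeline: 1-D k-nearest-neighbour classifier."""
    def predict(x):
        nearest = sorted(train, key=lambda row: abs(row[0] - x))[:k]
        labels = [label for _, label in nearest]
        return max(set(labels), key=labels.count)
    return predict


def accuracy(model, data):
    return sum(model(x) == y for x, y in data) / len(data)


# (1.4, 1) is a deliberately mislabelled training point
train = [(0.0, 0), (1.0, 0), (1.4, 1), (2.0, 0), (3.0, 1), (4.0, 1), (5.0, 1)]
validation = [(0.5, 0), (1.5, 0), (3.5, 1), (4.5, 1)]

best_score, best_params, _ = grid_search(fit_knn, accuracy, {'k': [1, 3, 5]}, train, validation)
print(best_params, best_score)  # {'k': 3} 1.0
```

With the mislabelled point, `k=1` scores perfectly on its own training rows but only 0.75 on the validation rows, so selecting on training accuracy would favour the model that generalizes worst.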

In [17]:
pmmlBuilder = PMMLBuilder(sc, df_train, model)
pmmlBuilder.buildFile(data_dir + model_target)

'/resources/labs/BD0231EN/component-library-1/component-library/deploy/../../data/model.xml'
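
PMML is plain XML, so the exported file can be inspected with Python's standard library. A small sketch, with hypothetical helper names, that reports the PMML version and the top-level model elements; for a tree ensemble, PMML typically uses a `MiningModel` whose segments are `TreeModel` elements. The embedded example is hand-written and trimmed, not output of this notebook.

```python
import xml.etree.ElementTree as ET

# top-level PMML elements that describe data and metadata rather than a model
NON_MODEL_ELEMENTS = {'Header', 'MiningBuildTask', 'DataDictionary',
                      'TransformationDictionary', 'Extension'}


def local_name(tag):
    """Strip the '{namespace}' prefix ElementTree adds to namespaced tags."""
    return tag.split('}', 1)[-1]


def summarize_pmml(root):
    """Hypothetical helper: return the PMML version and the top-level model element names."""
    models = [local_name(child.tag) for child in root
              if local_name(child.tag) not in NON_MODEL_ELEMENTS]
    return root.get('version'), models


# hand-written, heavily trimmed document for illustration (not notebook output)
example = """<PMML xmlns="http://www.dmg.org/PMML-4_4" version="4.4">
  <Header/>
  <DataDictionary numberOfFields="0"/>
  <MiningModel functionName="classification"/>
</PMML>"""

summary = summarize_pmml(ET.fromstring(example))
print(summary)  # ('4.4', ['MiningModel'])
```

To inspect the real export, pass `ET.parse(data_dir + model_target).getroot()` to `summarize_pmml`.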

That's it! You have now converted a Parquet file to CSV, trained a Random Forest model using Apache Spark, and exported it to PMML.