# Converts a parquet file to CSV file with header using ApacheSpark

In [3]:
%%bash
export version=`python --version |awk '{print $2}' |awk -F"." '{print $1$2}'`

echo $version

if [ $version == '36' ] || [ $version == '37' ]; then
    echo 'Starting installation...'
    pip3 install pyspark==2.4.8 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
elif [ $version == '38' ] || [ $version == '39' ]; then
    pip3 install pyspark==3.1.2 wget==3.2 pyspark2pmml==0.5.1 > install.log 2> install.log
    if [ $? == 0 ]; then
        echo 'Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)'
    else
        echo 'Installation failed, please check log:'
        cat install.log
    fi
else
    echo 'Currently only python 3.6, 3.7 , 3.8 and 3.9 are supported, in case you need a different version please open an issue at https://github.com/IBM/claimed/issues'
    exit -1
fi

37
Starting installation...
Please <<RESTART YOUR KERNEL>> (Kernel->Restart Kernel and Clear All Outputs)


In [4]:
# @param data_dir temporal data storage for local execution
# @param data_csv csv path and file name (default: data.csv)
# @param data_parquet path and parquet file name (default: data.parquet)
# @param master url of master (default: local mode)

In [5]:
import re
import os
import sys
import site
import wget
import glob
import shutil
import logging
import itertools
import pandas as pd

from pyspark.ml import Pipeline
from pyspark2pmml import PMMLBuilder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark import SparkContext, SparkConf, SQLContext

In [6]:
if sys.version[0:3] == '3.9':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.8':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.7.2/'
           'jpmml-sparkml-executable-1.7.2.jar')
    wget.download(url)
    shutil.copy('jpmml-sparkml-executable-1.7.2.jar',
                site.getsitepackages()[0] + '/pyspark/jars/')
elif sys.version[0:3] == '3.7':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
elif sys.version[0:3] == '3.6':
    url = ('https://github.com/jpmml/jpmml-sparkml/releases/download/1.5.12/'
           'jpmml-sparkml-executable-1.5.12.jar')
    wget.download(url)
else:
    raise Exception('Currently only python 3.6 , 3.7, 3,8 and 3.9 is supported, in case '
                    'you need a different version please open an issue at '
                    'https://github.com/IBM/claimed/issues')

In [7]:
# Creating a spark context class
sc = SparkContext()
# Creating a spark session
spark = SparkSession \
.builder \
.appName("Python Spark Random Forest Classification") \
.getOrCreate()
# .config("spark.some.config.option", "some‐value") \

23/02/17 20:31:17 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/17 20:31:21 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Read Parquet file


In [8]:
data_parquet = 'data.parquet'
data_csv = 'randomforest.csv'
data_dir = './claimed/data/'
df = spark.read.parquet(data_dir + data_parquet)

                                                                                

Convert parquet to csv


In [9]:
skip = False
if os.path.exists(data_dir + data_csv):
    skip = True

if not skip:
    df = spark.read.parquet(data_dir + data_parquet)
    
if not skip:
    if os.path.exists(data_dir + data_csv):
        shutil.rmtree(data_dir + data_csv)
    df.coalesce(1).write.option("header", "true").csv(data_dir + data_csv)
    file = glob.glob(data_dir + data_csv + '/part-*')
    shutil.move(file[0], data_dir + data_csv + '.tmp')
    shutil.rmtree(data_dir + data_csv)
    shutil.move(data_dir + data_csv + '.tmp', data_dir + data_csv)

                                                                                

Load the csv file into a dataframe


In [10]:
df = spark.read.option('header', 'true').csv(data_dir + data_csv)

Train Logistic Regression classifier with Apache SparkML

In [11]:
# casting feature columns to double
df = df.withColumn("x", df.x.cast(DoubleType()))
df = df.withColumn("y", df.y.cast(DoubleType()))
df = df.withColumn("z", df.z.cast(DoubleType()))
# spliting dataframe into training and testing subsets
splits = df.randomSplit([0.8, 0.2], seed=1)
df_train = splits[0]
df_test = splits[1]

Random Forest Model
Hyperparameter combinations:

number of trees : {10, 20}

maximum depth : {5, 7}

In [None]:
# indexing classes
indexer = StringIndexer(inputCol="class", outputCol="label")
input_columns = ['x', 'y', 'z']
# aggregating feature columns into vector
vectorAssembler = VectorAssembler(inputCols=input_columns, outputCol="features")
# normalizing features
normalizer = MinMaxScaler(inputCol="features", outputCol="features_norm")
# creating pandas dataframe to keep predictions accuracy
pd_df = pd.DataFrame(columns = ['n_trees', 'max_depth', 'accuracy'])

# hyperparameter testing
for n_trees in [10, 20]:
    for max_depth in [5, 7]:
        rf = RandomForestClassifier(numTrees=n_trees, maxDepth=max_depth, featuresCol="features")
        pipeline = Pipeline(stages=[indexer, vectorAssembler, normalizer, rf])
        rf_model = pipeline.fit(df_train)
        predictions = rf_model.transform(df_test)
        # evaluate predictions
        evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction") 
        accuracy = evaluator.evaluate(predictions)
        # print accuracy
        print("# Trees = %s" % (n_trees))
        print("Max Depth = %s" % (max_depth))
        print("Accuracy = %s" % (accuracy))
        # add entry to pandas dataframe
        pd_df = pd_df.append({'n_trees' : n_trees, 'max_depth' : max_depth, 'accuracy' : accuracy}, ignore_index = True)

                                                                                

# Trees = 10
Max Depth = 5
Accuracy = 0.530034185370968


[Stage 35:>                                                         (0 + 4) / 4]

Best hyperparameters


In [None]:
pd_df[pd_df['accuracy'] == pd_df['accuracy'].max()]
