# MR Tree - local configuration

Script for testing MR-Tree performance scalability on a single node

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the SparkSession in local mode; "local[5]" runs Spark
# in-process with 5 worker threads
spark = (
    SparkSession.builder
    .master("local[5]")
    .appName("MySparkApp")
    .getOrCreate()
)

## Data Processing

Data is sourced from https://archive.ics.uci.edu/dataset/2/adult

Steps:
- Load data into spark dataframe
- Preprocess data (fill na)
- Create embeddings for categorical values
- Assemble into a single feature vector

### Load Data

In [None]:
# Load the raw CSV into a Spark DataFrame; the file is read without a header
# row and column types are inferred from the values
df = spark.read.csv(
    path='data_a/adult.data',
    inferSchema=True,
    header=False,
)

# Replace nulls with 0
# NOTE(review): a numeric fill value presumably leaves string columns untouched - confirm
df = df.fillna(value=0)

# Preview the first rows
df.show(n=5)

In [None]:
# Split the column list: every column except the last is a feature,
# the last column is the label
all_columns = df.columns
feature_cols = all_columns[:-1]
label_col = all_columns[-1]

print("Feature columns: ", feature_cols)
print("Label column: ", label_col)

### Get embeddings

In [None]:
# Dependencies
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [None]:
# Look up every column's Spark type once, instead of building a one-column
# DataFrame per feature just to inspect its dtype
dtype_by_column = dict(df.dtypes)

# String indexers for categorical columns (the string-typed features).
# The loop variable is deliberately not called `col`, to avoid confusion with
# pyspark.sql.functions.col
str_cols = [name for name in feature_cols if dtype_by_column[name] == 'string']
feature_indexers = [
    StringIndexer(inputCol=name, outputCol=name + '_index') for name in str_cols
]

# Get new feature column names: indexed categorical columns first, followed by
# the remaining (non-string) features in their original order
categorical = set(str_cols)
feature_cols_indexed = [indexer.getOutputCol() for indexer in feature_indexers] + [
    name for name in feature_cols if name not in categorical
]

In [None]:
# Indexer that encodes the label column as numeric indices in "indexedLabel"
labelIndexer = StringIndexer(
    inputCol=label_col,
    outputCol="indexedLabel",
)

In [None]:
# Fit every indexer (features + label) as one pipeline, then apply the fitted
# pipeline to the same data to add the indexed columns
indexing_pipeline = Pipeline(stages=[*feature_indexers, labelIndexer])
indexing_model = indexing_pipeline.fit(df)
df_indexed = indexing_model.transform(df)

In [None]:
# Preview the label and feature columns produced by the indexing pipeline
preview_cols = ["indexedLabel", *feature_cols_indexed]
df_indexed.select(preview_cols).show(n=5, truncate=False)

In [None]:
# Bring the names used below into scope: no visible earlier cell imports them,
# so the cast loop would otherwise fail with a NameError
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# Set all columns to integer type (label and every feature column)
for column_name in feature_cols_indexed + ["indexedLabel"]:
    # withColumn with an existing name replaces that column in place
    df_indexed = df_indexed.withColumn(column_name, col(column_name).cast(IntegerType()))

In [None]:
# Preview the label and feature columns again, after the integer cast
preview_cols = ["indexedLabel", *feature_cols_indexed]
df_indexed.select(preview_cols).show(n=5, truncate=False)

### Assemble

In [None]:
# Assembler that packs all feature columns into a single "features" vector column
assembler = VectorAssembler(
    inputCols=feature_cols_indexed,
    outputCol="features",
)

In [None]:
# Build the feature vector, then keep only the two columns used for training
assembled = assembler.transform(df_indexed)
df_assembled = assembled.select("features", "indexedLabel")

In [None]:
# Preview the assembled feature vectors next to their labels
df_assembled.show(n=5, truncate=False)

## MR Tree training

Measure the time needed to train the MR Tree on different dataset sizes

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier
import time

# Init dataframe: training starts from a single copy of the assembled dataset
df_train = df_assembled.alias('df_train')

# Init variables
performances = []  # one (dataset multiplier, fit time in seconds) tuple per run
max_m = 200        # exclusive upper bound of the range the loop iterates over
step = 10          # number of dataset copies appended after every run

for i in range(0, max_m, step):
    # Create the DecisionTree model
    tree = DecisionTreeClassifier(labelCol='indexedLabel', featuresCol='features')

    # Fit the model to the data and calculate performance.
    # perf_counter() is a monotonic, high-resolution clock, so the measured
    # interval cannot be skewed by system clock adjustments the way
    # time.time() differences can
    start = time.perf_counter()
    model = tree.fit(df_train)
    dt = time.perf_counter() - start

    # Add performance to list; df_train holds i + 1 copies of the dataset here
    # (1 initial copy + `step` copies appended per completed iteration)
    performances.append((i + 1, dt))
    print(performances[-1])

    # Add data to train for next loop
    for _ in range(step):
        df_train = df_train.union(df_assembled)


In [None]:
# Dump every (multiplier, fit time) pair collected by the training loop
print(performances)

In [None]:
# Save to csv
import os

# Create the output directory first: open() does not create missing parent
# directories and would raise FileNotFoundError on a fresh checkout
os.makedirs('out', exist_ok=True)

with open('out/ml_tree_local.csv', 'w') as f:
    # Write header
    f.write('m,dt\n')

    # Write data: one "multiplier,fit time" row per measurement
    for m, dt in performances:
        f.write(f'{m},{dt}\n')

## Visualise

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Size attributed to one copy of the dataset
# NOTE(review): presumably rows * feature columns * 32 bits per value - confirm
segment_size = 32561 * 14 * 32

# One row per run:
# [dataset size, multiplier, fit time, fit time relative to the first run]
rows = []
for multiplier, elapsed in performances:
    relative_time = elapsed / performances[0][1]
    rows.append([multiplier * segment_size, multiplier, elapsed, relative_time])
data_out = np.array(rows)

# Plot the performance: fit time against the dataset multiplier
series = zip(*performances)
plt.plot(*series)
plt.xlabel('Multiplier')
plt.ylabel('Time (s)')
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Plot the performance: both curves share the dataset-size x axis, so each one
# gets a label and a legend is drawn to tell them apart
# Column 1 of data_out is the dataset multiplier (how much the data grew)
plt.plot(data_out[:, 0], data_out[:, 1], '-o', label='Dataset multiplier')
# Column 3 of data_out is the fit time divided by the first run's fit time
plt.plot(data_out[:, 0], data_out[:, 3], '-o', label='Training time relative to first run')
plt.xlabel('Dataset Size')
plt.ylabel('Scale factor')
plt.legend()
plt.show()