# Library

In [None]:
#import sys
import pandas as pd
import numpy as np
import random
import pyspark
import itertools
import matplotlib.pyplot as plt
import itertools
import seaborn as sns
import pickle
import statsmodels.api as sm

from pyspark import SparkContext, SQLContext

from math import sqrt
from time import time as ttt

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier as DTC_spark
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import joblib
from joblib import parallel_backend
from joblib import Parallel, delayed
from joblib import parallel_backend


# Upload

In [None]:
spark = SparkSession.builder.master("Yarn").appName("spark_app_1234").getOrCreate()
sc = spark.sparkContext
spark
d0 = (
    spark
    .read
    .format("csv") 
    .option("header","true") 
    .option("inferSchema","true") 
    .load("gs://mas-a5-storage-1/notebooks/jupyter/application_train.csv")
)

d1 = d0.filter(d0.DAYS_EMPLOYED != 365243).select('TARGET','DAYS_EMPLOYED')

print(d1.rdd.getNumPartitions())

In [None]:
sc

# Function

In [None]:
def prepare_spark_data(n_part, k_mult):
    '''
    takes 'DAYS_EMPLOYED and 'TARGET'  from d1 (alreay filtered)
    rearrane in n partitions (if n==0 keeps initial number of partitions)
    prints final shape/ partition
    returns d2 - spark df
    '''
    data = d1
    data_new = data # first step in the cycle
    for i in range(k_mult-1):
        data_tmp = data.select('TARGET', \
                     f.col('DAYS_EMPLOYED')*(f.lit(0.9995) + f.rand()/1000)).\
                   toDF('TARGET','DAYS_EMPLOYED')
        data_tmp = data_tmp.select('TARGET', f.floor('DAYS_EMPLOYED'))
        data_new = data_new.union(data_tmp)

    assembler = VectorAssembler(inputCols=["DAYS_EMPLOYED"], 
                        outputCol="DAYS_EMPLOYED_vect")
    d2 = assembler.transform(data_new)
    if n_part != 0:
        d2 = d2.repartition(n_part, "DAYS_EMPLOYED_vect")       
    #print(f'n-partitions initial: {d2.rdd.getNumPartitions()}; df size: {d2.count()}\n')

    return d2

# Params an run

In [None]:
## !! mind the fn NOT TO rewrite results
fn = '4cpu_by_12n__4m_yarn_ssd_repart'
size_mult = [1, 2, 5, 10, 15, 20, 30, 50, 100]
partitions = [8, 12, 16, 24, 36, 72] 
# initial, 2**i incl n_nodes till n_cpu*n_nodes and n_cpu*n_nodes
n_iter = 3

print('d1-size', d1.count())
n_part_base = d1.rdd.getNumPartitions()
print('n-partitions initial', n_part_base, '\n')

rd1 = {}
for k_size_mult in size_mult:
    rd2 = {}
    for n_part in partitions:
        df = prepare_spark_data(n_part, k_size_mult)
        df.cache()
        print('======================================================================')
        print(f'=== size_mult={k_size_mult}; \
        n_part_req={n_part}; n_part_act={df.rdd.getNumPartitions()}; df_count={df.count()} ===\n')
        times = [0 for i in range(n_iter)]
        models = {}
        for i in range(n_iter):
            dt = DTC_spark(labelCol="TARGET",
                       featuresCol="DAYS_EMPLOYED_vect",
                       minInfoGain=0.0001,
                       impurity='entropy',
                       maxDepth=14, maxBins=2**14, # it differs from scikit learn - it means number of canidate split points
                       #minInstancesPerNode = 1,
                       #checkpointInterval = 10
                       )
           
            t0 = ttt()
            model = dt.fit(df)
            t1 = ttt()
            times[i] = t1-t0
            models[i] = model 
            print('model', i, 'build time', round(times[i],2), '\n', model)
        rd2[n_part] = (models, times)
    rd1[k_size_mult] = rd2

result = rd1  

# Results

In [None]:
di = result
df_res_time = pd.DataFrame()
df_res_nodes = pd.DataFrame()
for p1 in di.keys():
    for p2 in di[p1].keys():
        df_res_time.loc[p1, p2] = np.round(np.median(di[p1][p2][1]), 1)
        m_tmp = di[p1][p2][0]
        n_nodes = []
        for i,_ in enumerate(m_tmp):
            tmp = f'{m_tmp[i]}'.split(' ')
            print(i, tmp)
            n_nodes.append([int(x.split('=')[1][:-1]) for x in tmp if x[:4]== 'numN'][0])
        df_res_nodes.loc[p1, p2] = np.round(np.mean(n_nodes), 1)

df_res_time.to_csv(f'gs://mas-a5-storage-1/notebooks/jupyter/obj/{fn}_t.csv')
df_res_nodes.to_csv(f'gs://mas-a5-storage-1/notebooks/jupyter/obj/{fn}_n.csv')

display(df_res_time)      
display(df_res_nodes)     

In [None]:
#fn = '4cpu_by_8n__4m_yarn_ssd'
tmp = pd.read_csv(f'gs://mas-a5-storage-1/notebooks/jupyter/obj/{fn}_t.csv')
print(fn)
tmp

In [None]:
#fn = '4cpu_by_8n__4m_yarn_ssd'
tmp = pd.read_csv(f'gs://mas-a5-storage-1/notebooks/jupyter/obj/{fn}_n.csv')
print(fn)
tmp