In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from pandas.api.types import CategoricalDtype
from scipy.stats import entropy, pearsonr

In [2]:
dtype_mix = {
    'algorithm': CategoricalDtype(['BisectingKMeans', 'GBT', 'GMM', 'KMeans', 'Linear', 'Logistic', 'MLP', 'RFR', 'Tree']),
    'dataset': CategoricalDtype(['drift', 'drivface', 'drugs', 'geomagnetic', 'higgs']),
    'family': CategoricalDtype(['classification', 'clustering', 'regression']),
    'platformId': 'int',
    'runId': 'int',
    'splitter': CategoricalDtype(['none', 'random-80-20']),
}

dtype_platform = {
    'jobId': 'int',
    'jobGroup': 'category',
    'stageId': 'int',
    'taskId': 'int',
    'launchTime': 'int',
    'finishTime': 'int',
    'duration': 'int',
    'schedulerDelay': 'int',
    'executorId': 'int',
    'host': 'category',
    'taskLocality': 'category',
    'speculative': 'bool',
    'gettingResultTime': 'int',
    'successful': 'bool',
    'executorRunTime': 'int',
    'executorCpuTime': 'int',
    'executorDeserializeTime': 'int',
    'executorDeserializeCpuTime': 'int',
    'resultSerializationTime': 'int',
    'jvmGCTime': 'int',
    'resultSize': 'int',
    'numUpdatedBlockStatuses': 'int',
    'diskBytesSpilled': 'int',
    'memoryBytesSpilled': 'int',
    'peakExecutionMemory': 'int',
    'recordsRead': 'int',
    'bytesRead': 'int',
    'recordsWritten': 'int',
    'bytesWritten': 'int',
    'shuffleFetchWaitTime': 'int',
    'shuffleTotalBytesRead': 'int',
    'shuffleTotalBlocksFetched': 'int',
    'shuffleLocalBlocksFetched': 'int',
    'shuffleRemoteBlocksFetched': 'int',
    'shuffleWriteTime': 'int',
    'shuffleBytesWritten': 'int',
    'shuffleRecordsWritten': 'int',
    'phase': 'category',
    **dtype_mix
}

dtype_applicative = {
    'transformTime': 'int',
    'features': 'int',
    'fitTime': 'int',
    'testCount': 'int',
    'trainCount': 'int',
    'r2': 'float',
    'mse': 'float',
    'mae': 'float',
    'rmse': 'float', 
    'silhouette': 'float',
    'accuracy': 'float',
    'weightedRecall': 'float',
    'f1': 'float',
    'weightedPrecision': 'float',
    **dtype_mix
}

dtype_configuration = {
    'platformId': 'int',
    'spark.shuffle.compress': 'bool',
    'spark.master': 'category',
    'spark.io.compression.codec': 'category',
    'spark.shuffle.file.buffer': 'int',
    'spark.storage.memoryFraction': 'float',
    'spark.shuffle.io.preferDirectBufs': 'bool',
    'spark.rdd.compress': 'bool',
    'spark.dynamicAllocation.enabled': 'bool',
    'spark.executor.memory': 'int',
    'spark.driver.cores': 'int',
    'spark.executor.cores': 'int',
    'spark.driver.memory': 'int',
    'spark.reducer.maxSizeInFlight': 'int',
    'spark.serializer': 'category',
    'spark.shuffle.spill.compress': 'bool',
    'spark.executor.instances': 'int',
    'spark.locality.wait': 'int'
}

mapping = {
    '32k': 32 * 1000,
    '16k': 16 * 1000,
    '64k': 64 * 1000,
    '5g': 5 * 1000 ** 3,
    '2g': 2 * 1000 ** 3,
    '10g': 10 * 1000 ** 3,
    '3s': 3 * 1000,
    '10ms': 10,
    '1ms': 1,
    '48m': 48 * 1000 ** 2,
    '24m': 24 * 1000 ** 2,
    '72m': 72 * 1000 ** 2,
    '': -1
}

def to_numeric(number):
    return mapping[number] if number in mapping else int(number)

converters_configuration = {
    'spark.shuffle.file.buffer': to_numeric,
    'spark.locality.wait': to_numeric,
    'spark.driver.memory': to_numeric,
    'spark.executor.memory': to_numeric,
    'spark.reducer.maxSizeInFlight': to_numeric,
    'spark.executor.instances': to_numeric
}

In [3]:
platform = pd.read_csv('../results/platform.csv', header=0, engine='c', na_filter=False, dtype=dtype_platform)

In [3]:
applicative = pd.read_csv('../results/applicative.csv', header=0, engine='c', dtype=dtype_applicative)

In [4]:
configuration = pd.read_csv('../results/configuration.csv', header=0, engine='c', na_filter=False, converters=converters_configuration, dtype=dtype_configuration)

  """Entry point for launching an IPython kernel.
  """Entry point for launching an IPython kernel.
  """Entry point for launching an IPython kernel.
  """Entry point for launching an IPython kernel.
  """Entry point for launching an IPython kernel.
  """Entry point for launching an IPython kernel.


In [5]:
applicative_configuration = pd.merge(applicative, configuration, left_on='platformId', right_on='platformId', suffixes=('', '_y'))

In [5]:
data = pd.merge(platform, applicative_configuration, left_on=['algorithm', 'dataset', 'platformId', 'runId'], right_on=['algorithm', 'dataset', 'platformId', 'runId'], suffixes=('', '_y'))

In [6]:
def correlations(data, y):

    return pd.DataFrame({
        algorithm + '-' + dataset:
            correlation(data[(data['algorithm'] == algorithm) & (data['dataset'] == dataset)], y=y)
        #for phase in ['transform']
        for algorithm in data['algorithm'].cat.categories.sort_values()
        for dataset in data['dataset'].cat.categories.sort_values()
    })

def remove_nan(data):
    
    if np.isnan(data):
        return 0

    return data

def correlation(data, y):

    return pd.Series({
        feature: remove_nan(pearsonr(data[feature], data[y])[0])
        for feature in data
        if feature not in ['dataset', 'algorithm', 'duration', 'phase', 'fitTime', 'transformTime']
    })

def count(corr, k):
    
    total = {}

    for couple in corr:

        top = corr[couple].abs().nlargest(k).index

        for metric in top:

            if metric not in total:
                total[metric] = 1
            else:
                total[metric] += 1
    
    return total

In [66]:
data = applicative_configuration

In [6]:
data['spark.io.compression.codec'] = data['spark.io.compression.codec'].cat.codes
data['spark.serializer'] = data['spark.serializer'].cat.codes

In [9]:
mapping = {
    'spark.shuffle.compress': [0, 12],
    'spark.io.compression.codec': [0, 7],
    'spark.shuffle.file.buffer': [0, 13,14],
    'spark.storage.memoryFraction': [0, 17,18],
    'spark.shuffle.io.preferDirectBufs': [0, 15],
    'spark.rdd.compress': [0, 8],
    'spark.executor.memory': [0, 5,6],
    'spark.executor.cores': [0, 1,2],
    'spark.reducer.maxSizeInFlight': [0, 9,10],
    'spark.serializer': [0, 11],
    'spark.shuffle.spill.compress': [0, 16],
    'spark.executor.instances': [0, 3,4],
    'spark.locality.wait': [0, 19,20],
}

In [10]:
from tqdm import tqdm

In [14]:
select = data[(data['algorithm'] == 'MLP') & (data['dataset'] == 'higgs') & ((data['platformId'] == 0) | (data['platformId'] == 13) | (data['platformId'] == 14))]

MemoryError: 

In [16]:
res = []

for algorithm in tqdm(data['algorithm'].cat.categories.sort_values()):

    for dataset in tqdm(data['dataset'].cat.categories.sort_values()):

        for config, ids in tqdm(mapping.items()):

            selection = data[(data['dataset'] == dataset) & (data['algorithm'] == algorithm) & np.isin(data['platformId'], ids)]

            corr = pearsonr(selection['duration'], selection[config])[0]

            res.append(pd.Series({"dataset": dataset, 'algorithm': algorithm, 'config': config, 'corr': corr, 'family': selection['family'].iloc[0] if len(selection) else None}))

res = pd.DataFrame(res)

  0%|          | 0/9 [00:00<?, ?it/s]
  0%|          | 0/5 [00:00<?, ?it/s][A

  0%|          | 0/13 [00:00<?, ?it/s][A[A

MemoryError: 

In [178]:
res['corr'] = res['corr'].abs()

In [179]:
total = res.set_index(['config', 'family']).groupby(['dataset', 'algorithm'])['corr'].nlargest(4, keep='all').reset_index()

In [180]:
total['config'].value_counts()

spark.executor.cores                 32
spark.serializer                     26
spark.rdd.compress                   17
spark.executor.memory                15
spark.io.compression.codec           12
spark.shuffle.spill.compress         11
spark.reducer.maxSizeInFlight        10
spark.shuffle.file.buffer             9
spark.shuffle.io.preferDirectBufs     9
spark.storage.memoryFraction          8
spark.executor.instances              8
spark.shuffle.compress                8
spark.locality.wait                   7
Name: config, dtype: int64

In [181]:
total.groupby('family')['config'].value_counts()

family          config                           
classification  spark.executor.cores                 12
                spark.serializer                     10
                spark.executor.memory                 6
                spark.rdd.compress                    5
                spark.reducer.maxSizeInFlight         4
                spark.storage.memoryFraction          4
                spark.executor.instances              3
                spark.io.compression.codec            3
                spark.locality.wait                   3
                spark.shuffle.file.buffer             3
                spark.shuffle.spill.compress          3
                spark.shuffle.compress                2
                spark.shuffle.io.preferDirectBufs     2
clustering      spark.executor.cores                  8
                spark.executor.memory                 6
                spark.serializer                      6
                spark.executor.instances              

In [190]:
total

Unnamed: 0,dataset,algorithm,config,family,corr
0,drift,BisectingKMeans,spark.serializer,clustering,0.761351
1,drift,BisectingKMeans,spark.reducer.maxSizeInFlight,clustering,0.451746
2,drift,BisectingKMeans,spark.executor.instances,clustering,0.376804
3,drift,BisectingKMeans,spark.executor.cores,clustering,0.364779
4,drift,GBT,spark.rdd.compress,regression,0.485571
5,drift,GBT,spark.locality.wait,regression,0.448519
6,drift,GBT,spark.serializer,regression,0.405312
7,drift,GBT,spark.shuffle.spill.compress,regression,0.379202
8,drift,GMM,spark.io.compression.codec,clustering,0.547946
9,drift,GMM,spark.serializer,clustering,0.383796


In [191]:
total.to_csv('usecase2.csv', index=False)