In [1]:
import time
import pandas as pd
import numpy as np
from sklearn.preprocessing import KBinsDiscretizer
from multiprocessing import Pool
import pyspark

In [2]:
# Simulate the input data: an 'Id' column (0 .. N_ROWS-1) followed by
# N_FEATURES continuous features 'feature_1' .. 'feature_<N_FEATURES>', each
# drawn with np.random.uniform on [0, 10).
N_ROWS = 1_000_000
N_FEATURES = 10

# Fixed seed so every Restart & Run All produces the same data. The features
# are drawn in column order (feature_1 first), so the values are identical to
# drawing each column with its own explicit call.
np.random.seed(1)
input_data = pd.DataFrame({
    'Id': range(N_ROWS),
    **{f'feature_{i}': np.random.uniform(0, 10, N_ROWS)
       for i in range(1, N_FEATURES + 1)},
})

In [3]:
# Pipeline 1 - pandas for the data report + sequential (column-by-column) discretization.
# Wall-clock start of pipeline 1. The elapsed time is only taken after the
# discretization loop in a later cell, so when cells are run one at a time the
# measurement also includes the pauses between cell executions.
start_time = time.time()

In [4]:
# Shared discretizer for pipeline 1: 3 equal-width ('uniform') bins per
# feature, returned as ordinal bin indices instead of one-hot columns.
est = KBinsDiscretizer(strategy='uniform', encode='ordinal', n_bins=3)

In [15]:
# Empty report skeleton: one column per statistic; the discretization loop
# adds one row per feature (indexed by the feature name).
report_columns = [
    'Number of rows with non-empty values',
    'Number of rows with empty values',
    'Number of distinct values',
    'Top 2 frequent values',
    'Top 2 frequent counts',
]
data_report = pd.DataFrame(columns=report_columns)

In [16]:
# Sequential discretization + data report, one feature column at a time.
# Only the original feature columns are processed: the 'Bin_' filter keeps the
# cell idempotent, since a re-run would otherwise also bin the Bin_* columns
# written by a previous run (creating Bin_Bin_* columns).
feature_cols = [c for c in input_data.columns[1:] if not c.startswith('Bin_')]
for col in feature_cols:
    # discretize: refit the shared discretizer on this column, store ordinal bin ids
    input_data['Bin_' + col] = est.fit_transform(input_data[[col]]).flatten().astype(int)
    # report: value_counts over 1M rows is expensive, so compute it once per
    # column and reuse it for the distinct count and the top-2 statistics
    counts = input_data[col].value_counts()
    data_report.loc[col, 'Number of rows with non-empty values'] = input_data[col].count()
    data_report.loc[col, 'Number of rows with empty values'] = input_data[col].isna().sum()
    data_report.loc[col, 'Number of distinct values'] = len(counts)
    data_report.loc[col, 'Top 2 frequent values'] = counts.index[:2].tolist()
    data_report.loc[col, 'Top 2 frequent counts'] = counts.values[:2].tolist()

time_pipeline_1 = time.time() - start_time
def discretization(df, col, n_bins=3, strategy='uniform'):
    """Discretize one column of ``df`` and return a one-row data report for it.

    Side effect: adds an integer column ``'Bin_' + col`` to ``df`` holding the
    ordinal bin index of every row. When this runs inside a
    ``multiprocessing.Pool`` worker, ``df`` is an unpickled copy, so that new
    column is not visible in the parent process -- only the returned report is.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame containing ``col``. ``col`` should not contain NaN:
        ``KBinsDiscretizer`` does not accept missing values, so ``fit_transform``
        would fail before the report is built.
    col : str
        Name of the column to discretize and profile.
    n_bins : int, default 3
        Number of bins passed to ``KBinsDiscretizer``.
    strategy : str, default 'uniform'
        Binning strategy passed to ``KBinsDiscretizer``.

    Returns
    -------
    pandas.DataFrame
        One row indexed by ``col`` with object-dtype columns
        'Number of rows with non-empty values', 'Number of rows with empty values',
        'Number of distinct values', 'Top 2 frequent values' (list of up to two
        values, most frequent first) and 'Top 2 frequent counts' (their counts).
    """
    est = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy=strategy)
    # KBinsDiscretizer expects 2-D input, hence the (n_rows, 1) reshape
    df['Bin_' + col] = est.fit_transform(df[col].values.reshape(-1, 1)).flatten().astype(int)

    series = df[col]
    value_counts = series.value_counts()  # computed once; sorted by count, descending
    record = {
        'Number of rows with non-empty values': series.count(),
        'Number of rows with empty values': series.isna().sum(),
        'Number of distinct values': series.nunique(),
        # slicing [:2] already yields fewer items when fewer distinct values exist
        'Top 2 frequent values': value_counts.index[:2].tolist(),
        'Top 2 frequent counts': value_counts.values[:2].tolist(),
    }
    # dtype=object keeps list-valued cells intact and matches a report built
    # from an empty pd.DataFrame(columns=...) skeleton
    return pd.DataFrame([record], index=[col], columns=list(record), dtype=object)

In [17]:
# Start the Spark session, or reuse the active one: getOrCreate returns the
# existing session if there is one, so re-running this cell is harmless.
session_builder = pyspark.sql.SparkSession.builder
spark = session_builder.getOrCreate()

In [18]:
# Parameters
# Number of worker processes for the multiprocessing Pool in pipeline 2.
num_cores = 4

In [19]:
# Build the Spark DataFrame from the raw simulated data (Id + feature columns).
# The Bin_* columns written into input_data by pipeline 1 are excluded, so
# pipeline 2 starts from the same input that pipeline 1 did.
# Arrow makes the pandas <-> Spark conversions of this 1M-row frame columnar
# instead of row-by-row; with Spark's default settings it falls back to the
# non-Arrow path if Arrow cannot be used.
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
raw_columns = [c for c in input_data.columns if not c.startswith('Bin_')]
df_spark = spark.createDataFrame(input_data[raw_columns])

# Convert the Spark dataframe back to a pandas dataframe for the Pool workers
df = df_spark.toPandas()

In [20]:
# Pipeline 2 - parallel discretization + data report with a multiprocessing Pool.
# Only the raw feature columns of df are processed (Id and any Bin_* columns
# are skipped). Each task receives just its own single-column frame instead of
# the whole 1M-row df, because every starmap argument is pickled and sent to a
# worker process.
# NOTE(review): workers must be able to import `discretization`; with the
# 'spawn' start method (default on Windows and macOS) a function defined in the
# notebook is not importable, so it would have to move to a .py module there.
pool_columns = [c for c in df.columns if c != 'Id' and not c.startswith('Bin_')]
start_time = time.time()
with Pool(num_cores) as pool:
    data_reports = pool.starmap(discretization, [(df[[col]], col) for col in pool_columns])
time_pipeline_2 = time.time() - start_time

In [21]:
# Combine the per-column reports returned by the Pool workers into one table.
# NOTE: this rebinds `data_report`, replacing the report built by pipeline 1.
data_report = pd.concat(data_reports, axis=0)
# Pipeline 1 footprint: input_data at this point also holds the Bin_* columns
# that pipeline 1 added.
print('Time for Pipeline 1: ',time_pipeline_1,'seconds')
print('Memory usage for Pipeline 1: ', input_data.memory_usage().sum()/ (1024 * 1024),'MB')
print('Time for Pipeline 2: ',time_pipeline_2,'seconds')
# Pipeline 2 footprint: the pandas frame handed to the workers, measured the
# same way as pipeline 1 (column + index bytes, converted to MiB).
print('Memory usage for Pipeline 2: ', df.memory_usage().sum()/ (1024 * 1024),'MB')