In [1]:
import ptls
import pandas as pd
import os
from ptls.preprocessing import PandasDataPreprocessor
from ptls.preprocessing import PysparkDataPreprocessor
import numpy as np
import torch
from sklearn.preprocessing import KBinsDiscretizer
from scipy.stats import chisquare
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession


# Reuse the active Spark session if one exists; otherwise getOrCreate() starts a new one.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

25/06/16 23:15:20 WARN Utils: Your hostname, everdark-Zenbook-UX3402VA-UX3402VA resolves to a loopback address: 127.0.1.1; using 192.168.1.90 instead (on interface wlo1)
25/06/16 23:15:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/16 23:15:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load the raw transactions as a Spark DataFrame.
# escape='"' makes doubled quotes inside quoted CSV fields read as literal quote characters.
data = (
    spark.read
    .option("escape", "\"")
    .csv("../data/gender/transactions.csv", header=True)
    # '_c0' is Spark's name for the unnamed leading column (presumably a saved pandas index — TODO confirm)
    .drop('_c0', 'term_id')
    .withColumn('amount', F.col('amount').cast('float'))
)

In [12]:
# Show the Spark DataFrame's repr: column names and types only, no rows are collected.
data

DataFrame[customer_id: string, tr_datetime: string, mcc_code: string, tr_type: string, amount: float]

In [2]:
def prepare_data_gender_scenario(data_path='../data/gender'):
    """Load the gender-prediction transactions and labels as pandas frames.

    Parameters
    ----------
    data_path : str, default '../data/gender'
        Directory containing both ``transactions.csv`` and ``gender_train.csv``.

    Returns
    -------
    source_data : pandas.DataFrame
        Transactions with ``customer_id`` renamed to ``client_id``. ``term_id`` is
        dropped, and so is a saved pandas index column ``Unnamed: 0`` if present.
        A new ``time`` column holds the ``HH:MM:SS`` part of ``tr_datetime``.
        ``tr_datetime`` itself is replaced by the float ``day + fraction_of_day``.
    targets : pandas.DataFrame
        Rows of ``gender_train.csv`` for clients that appear in the transactions.
        Clients without a label (and any row with a missing value) are dropped.
    df_params : dict
        Column-role metadata for downstream preprocessing. ``cat_unique`` holds
        the number of distinct values of each categorical column, NaN included.
    """
    source_data = pd.read_csv(os.path.join(data_path, 'transactions.csv'))
    source_data = source_data.drop(columns=['term_id']).rename(columns={'customer_id': 'client_id'})
    if 'Unnamed: 0' in source_data.columns:
        source_data = source_data.drop(columns=['Unnamed: 0'])

    # tr_datetime looks like "<day> HH:MM:SS"; keep the clock part as its own column.
    source_data['time'] = source_data['tr_datetime'].str.split().str[1]

    # Left-pad to the fixed "DDDDDD HH:MM:SS" layout (15 chars) so the slices below are positional.
    padded_time = source_data['tr_datetime'].str.pad(15, 'left', '0')
    day_part = padded_time.str[:6].astype(float)
    # Parsing a bare clock time gives timestamps with a default date. Taking the epoch
    # seconds modulo one day drops that date and leaves the time of day, scaled to [0, 1).
    seconds_per_day = 24 * 60 * 60
    time_part = pd.to_datetime(padded_time.str[7:], format='%H:%M:%S').values.astype('int64') // 1e9
    time_part = time_part % seconds_per_day / seconds_per_day

    source_data['tr_datetime'] = day_part + time_part

    cat_cols = ["mcc_code", "tr_type"]
    df_params = {
        "numeric_cols": ["amount"],
        "cat_cols": cat_cols,
        # unique() counts NaN as a value of its own, so missing codes are included.
        "cat_unique": [source_data[col].unique().shape[0] for col in cat_cols],
        "order_col": "tr_datetime",
        "time_col": "time",
        # NOTE(review): no 'description' column is present in transactions.csv as loaded
        # in this notebook — confirm whether text_cols is actually consumed.
        "text_cols": ['description'],
        "id_col": "client_id",
        "target": "gender",
    }

    # Read labels from the same directory as the transactions.
    targets = pd.read_csv(os.path.join(data_path, 'gender_train.csv')).rename(columns={'customer_id': 'client_id'})
    # Keep only clients that have transactions, then drop those without a label.
    targets = (
        source_data[['client_id']]
        .drop_duplicates()
        .merge(targets, on='client_id', how='left')
        .dropna()
    )

    return source_data, targets, df_params

In [3]:
# Build the pandas transactions frame, the labelled-client targets and the column metadata.
# NOTE: this rebinds `data`, replacing the Spark DataFrame loaded earlier in the notebook.
data, targets, df_params = prepare_data_gender_scenario()

In [24]:
# Pandas preprocessor for the gender data.
# - Categorical columns are frequency-encoded.
# - `amount` is kept as a numeric feature and also gets a 100-bin k-means discretization.
# - return_records=True makes fit_transform return records (presumably one per client)
#   instead of a frame — TODO confirm.
pandas_preprocessor_kwargs = dict(
    col_id=df_params['id_col'],
    col_event_time=df_params['order_col'],
    event_time_transformation='none',
    category_transformation='frequency',
    cols_category=df_params['cat_cols'],
    cols_discretize={'amount': ('kmeans', 100)},
    cols_numerical=['amount'],
    return_records=True,
)
preprocessor = PandasDataPreprocessor(**pandas_preprocessor_kwargs)

Creating Dask Server
Link Dask Server - http://172.19.0.1:57904/status


Perhaps you already have a cluster running?
Hosting the HTTP server on port 57904 instead


In [3]:
# Spark preprocessor for the raw Spark transactions.
# - Frequency encoding is used for categorical columns.
# - `amount` stays numeric and also gets a 100-bin k-means discretization.
# NOTE(review): unlike the pandas variant, no cols_category is passed here —
# confirm how mcc_code / tr_type are picked up.
spark_preprocessor_kwargs = dict(
    col_id='customer_id',
    col_event_time='tr_datetime',
    event_time_transformation='none',
    category_transformation='frequency',
    cols_discretize={'amount': ('kmeans', 100)},
    cols_numerical=['amount'],
)
preprocessor = PysparkDataPreprocessor(**spark_preprocessor_kwargs)

Creating Dask Server
Link Dask Server - http://172.20.10.8:8787/status


In [4]:
# Fit all column transformers on the full dataset (binding the result also suppresses its repr).
# NOTE(review): `a` is not referenced by any later cell in this notebook.
a = preprocessor.fit(data)

25/06/16 14:43:29 WARN MemoryStore: Not enough space to cache rdd_34_8 in memory! (computed 18.4 MiB so far)
25/06/16 14:43:29 WARN BlockManager: Persisting block rdd_34_8 to disk instead.
25/06/16 14:43:29 WARN MemoryStore: Not enough space to cache rdd_34_1 in memory! (computed 18.4 MiB so far)
25/06/16 14:43:29 WARN BlockManager: Persisting block rdd_34_1 to disk instead.
25/06/16 14:43:29 WARN MemoryStore: Not enough space to cache rdd_34_5 in memory! (computed 18.4 MiB so far)
25/06/16 14:43:29 WARN BlockManager: Persisting block rdd_34_5 to disk instead.
25/06/16 14:43:29 WARN MemoryStore: Not enough space to cache rdd_34_13 in memory! (computed 18.4 MiB so far)
25/06/16 14:43:29 WARN BlockManager: Persisting block rdd_34_13 to disk instead.
25/06/16 14:43:29 WARN MemoryStore: Not enough space to cache rdd_34_2 in memory! (computed 18.4 MiB so far)
25/06/16 14:43:29 WARN BlockManager: Persisting block rdd_34_2 to disk instead.
25/06/16 14:43:29 WARN MemoryStore: Not enough space 

In [9]:
# Inspect the fitted preprocessor's per-column transformation mapping (column name -> transformation).
preprocessor.unitary_func

{'tr_datetime': Unitary transformation,
 'mcc_code': Unitary transformation,
 'tr_type': Unitary transformation,
 'amount': Unitary transformation}

In [8]:
# List every column transformer: its class, source column, target column,
# and whether the source column is dropped.
for transformer in preprocessor._all_col_transformers:
    row = (
        type(transformer),
        transformer.col_name_original,
        transformer.col_name_target,
        transformer.is_drop_original_col,
    )
    print(*row)

<class 'ptls.preprocessing.base.transformation.col_identity_transformer.ColIdentityEncoder'> tr_datetime event_time False
<class 'ptls.preprocessing.pandas.pandas_transformation.pandas_freq_transformer.FrequencyEncoder'> mcc_code mcc_code True
<class 'ptls.preprocessing.pandas.pandas_transformation.pandas_freq_transformer.FrequencyEncoder'> tr_type tr_type True
<class 'ptls.preprocessing.pandas.pandas_transformation.discretizer.ColNumericDiscretizer'> amount amount_cat False
<class 'ptls.preprocessing.base.transformation.col_identity_transformer.ColIdentityEncoder'> amount amount False
<class 'ptls.preprocessing.base.transformation.user_group_transformer.UserGroupTransformer'> client_id client_id False


In [11]:
            # Alternative discretization config for `amount`: 100 quantile bins instead of k-means.
# Pass it as `cols_discretize=` when constructing a preprocessor to compare strategies.
cols_discretize_quantile = {'amount': ('quantile', 100)}

In [5]:
# Apply the fitted preprocessor. The result is a Spark DataFrame: it is queried with
# `select`/`toPandas` below.
data1 = preprocessor.transform(data)

In [7]:
# Collect only the discretized `amount_cat` column to the driver as a pandas DataFrame.
aa = data1.select('amount_cat').toPandas()

                                                                                

In [12]:
# Each cell of `amount_cat` holds a sequence of bin codes (presumably one sequence per client).
# Flatten them all into a single array.
np.concatenate([np.array(codes) for codes in aa['amount_cat']])

array([3, 4, 3, ..., 2, 3, 2])

In [70]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import QuantileDiscretizer

def test_distribution():
    """Compare PysparkDataPreprocessor discretization with plain Spark ML.

    For each strategy ('quantile', 'kmeans'), a seeded synthetic frame of
    normally distributed values is discretized twice:

    * by the preprocessor, with frequency category encoding;
    * directly by Spark ML, using QuantileDiscretizer or KMeans.

    The preprocessor's codes are treated as 1-based frequency ranks, so they are
    shifted to 0-based. The reference bin counts are sorted in descending order
    before a chi-square goodness-of-fit comparison. The p-value is printed, and
    AssertionError is raised when it is not above 0.05.
    """
    n_bins_discr = 100
    num_rows = 10000

    def preprocessor_codes(frame, strategy):
        # Discretize with the library under test and flatten the per-row code sequences.
        pipeline = PysparkDataPreprocessor(
            col_id='id',
            col_event_time='event_dt',
            event_time_transformation='none',
            category_transformation='frequency',
            cols_discretize={'num_value': (strategy, n_bins_discr)},
        )
        collected = pipeline.fit_transform(frame).select('num_value_cat').toPandas()
        return np.concatenate([np.array(codes) for codes in collected['num_value_cat']])

    def reference_codes(frame, strategy):
        # Discretize the same column directly with Spark ML as the reference.
        if strategy == 'quantile':
            estimator = QuantileDiscretizer(
                numBuckets=n_bins_discr,
                inputCol='num_value',
                outputCol='num_value_cat',
                handleInvalid="skip",
            )
            features = frame
        else:
            estimator = KMeans(
                k=n_bins_discr,
                featuresCol='num_value_wrapped',
                predictionCol='num_value_cat',
                seed=42,
            )
            # KMeans needs a vector column, so wrap the scalar value.
            assembler = VectorAssembler(inputCols=['num_value'], outputCol='num_value_wrapped')
            features = assembler.transform(frame)
        labelled = estimator.fit(features).transform(features)
        return labelled.select('num_value_cat').toPandas()['num_value_cat'].values.astype(int)

    for discr_type in ['quantile', 'kmeans']:
        np.random.seed(42)
        df = pd.DataFrame({
            'id': np.random.randint(1, 4, size=num_rows),
            'event_dt': np.random.randint(1, 100, size=num_rows),
            'num_value': np.random.normal(loc=0, scale=100, size=num_rows),
        })
        df_spark = spark.createDataFrame(df)

        preproc_cats = preprocessor_codes(df_spark, discr_type)
        kbins_cats = reference_codes(df_spark, discr_type)

        preproc_counts = np.bincount(preproc_cats - 1, minlength=n_bins_discr)
        kbins_counts = sorted(np.bincount(kbins_cats, minlength=n_bins_discr), reverse=True)

        _, p_value = chisquare(f_obs=preproc_counts, f_exp=kbins_counts)
        print(f"Discretization {discr_type}: chi-square p-value is {p_value:.5f}")

        assert p_value > 0.05, f"Discretization {discr_type}: distributions differ significantly (p={p_value:.5f})"


In [29]:
# Scratch check of np.bincount: `minlength` only sets a lower bound.
# The output still grows to max(value) + 1 bins.
demo_samples = ([1, 1, 2, 4, 4, 4, 4, 4], [1, 1, 1, 2, 4, 1, 2, 1])
tuple(np.bincount(sample, minlength=3) for sample in demo_samples)

(array([0, 2, 1, 0, 5]), array([0, 5, 2, 0, 1]))

In [71]:
# Run the Spark discretization comparison defined above.
# It prints one p-value per strategy and asserts p > 0.05.
test_distribution()

Creating Dask Server


Perhaps you already have a cluster running?
Hosting the HTTP server on port 43135 instead


Link Dask Server - http://172.20.10.8:43135/status


25/06/16 18:36:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:36:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:36:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:36:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:36:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:36:11 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Discretization quantile: chi-square p-value is 1.00000
Creating Dask Server


Perhaps you already have a cluster running?
Hosting the HTTP server on port 44935 instead


Link Dask Server - http://172.20.10.8:44935/status


25/06/16 18:36:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:36:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:36:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:36:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:36:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:36:19 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Discretization kmeans: chi-square p-value is 1.00000


In [34]:
def test_add_replace_col():
    """Check that discretization adds or replaces the numeric column (pandas backend).

    With ``cols_numerical=['num_value']``, the raw ``num_value`` must be kept next
    to the discretized ``num_value_cat``. Without it, ``num_value`` must be gone
    and only ``num_value_cat`` remains. The first record returned by
    ``PandasDataPreprocessor(return_records=True).fit_transform`` is inspected.
    Raises AssertionError on failure.
    """
    np.random.seed(42)
    num_rows = 1000
    df = pd.DataFrame({
        'id': np.random.randint(1, 4, size=num_rows),
        'event_dt': np.random.randint(1, 100, size=num_rows),
        'num_value': np.random.normal(loc=0, scale=100, size=num_rows),
    })

    n_bins_discr = 10
    discr_type = 'quantile'

    def first_record(**extra_kwargs):
        # Build a fresh preprocessor (and a fresh cols_discretize dict) on every call,
        # so the two scenarios cannot share mutable configuration.
        preprocessor = PandasDataPreprocessor(
            col_id='id',
            col_event_time='event_dt',
            event_time_transformation='none',
            category_transformation='none',
            cols_discretize={'num_value': (discr_type, n_bins_discr)},
            return_records=True,
            **extra_kwargs,
        )
        return preprocessor.fit_transform(df)[0]

    # "Add" mode: listing the column as numerical keeps the original next to the bins.
    record = first_record(cols_numerical=['num_value'])
    assert 'num_value' in record, "Original numeric expected in preprocessed data but not found"
    assert 'num_value_cat' in record, "Discretized column 'num_value_cat' expected in preprocessed data but not found"

    # "Replace" mode: without cols_numerical only the discretized column survives.
    record = first_record()
    assert 'num_value' not in record, "Original numeric not expected in preprocessed data but found"
    assert 'num_value_cat' in record, "Discretized column expected in preprocessed data but not found"

In [35]:
# Run the pandas add/replace-column check defined above.
# It only asserts, so success produces no output of its own.
test_add_replace_col()

Creating Dask Server
Link Dask Server - http://172.19.0.1:59004/status


Perhaps you already have a cluster running?
Hosting the HTTP server on port 59004 instead


Creating Dask Server


Perhaps you already have a cluster running?
Hosting the HTTP server on port 59011 instead


Link Dask Server - http://172.19.0.1:59011/status




In [63]:
# Compare PandasDataPreprocessor discretization with sklearn's KBinsDiscretizer for both strategies.
# The preprocessor's codes are treated as 1-based frequency ranks, so they are shifted
# to 0-based. The sklearn bin counts are sorted in descending order before the
# chi-square test.
n_bins_discr = 100
num_rows = 10000
for discr_type in ('kmeans', 'quantile'):
    np.random.seed(42)
    df = pd.DataFrame({
        'id': np.random.randint(1, 4, size=num_rows),
        'event_dt': np.random.randint(1, 100, size=num_rows),
        'num_value': np.random.normal(loc=0, scale=100, size=num_rows),
    })

    preprocessor = PandasDataPreprocessor(
        col_id='id',
        col_event_time='event_dt',
        event_time_transformation='none',
        category_transformation='frequency',
        cols_discretize={'num_value': (discr_type, n_bins_discr)},
        return_records=True,
    )
    processed = preprocessor.fit_transform(df)
    # Each record carries a tensor of codes; join them into one flat numpy array.
    code_tensors = [record['num_value_cat'] for record in processed]
    preproc_cats = torch.cat(code_tensors).numpy()

    kbins = KBinsDiscretizer(n_bins=n_bins_discr, encode='ordinal', strategy=discr_type)
    sklearn_cats = kbins.fit_transform(df[['num_value']]).astype(int).flatten()

    preproc_counts = np.bincount(preproc_cats - 1, minlength=n_bins_discr)
    sklearn_counts = sorted(np.bincount(sklearn_cats, minlength=n_bins_discr), reverse=True)

    stat, p_value = chisquare(f_obs=preproc_counts, f_exp=sklearn_counts)
    print(f"Discretization {discr_type}: chi-square p-value is {p_value:.5f}")

    assert p_value > 0.05, f"Discretization {discr_type}: distributions differ significantly (p={p_value:.5f})"


Creating Dask Server


Perhaps you already have a cluster running?
Hosting the HTTP server on port 40837 instead


Link Dask Server - http://172.20.10.8:40837/status
Discretization kmeans: chi-square p-value is 1.00000
Creating Dask Server


Perhaps you already have a cluster running?
Hosting the HTTP server on port 46575 instead


Link Dask Server - http://172.20.10.8:46575/status
Discretization quantile: chi-square p-value is 1.00000


In [None]:
from collections import Counter
# Tally how many events landed in each preprocessor code.
# `preproc_cats` comes from the last iteration of the pandas-vs-sklearn loop (if it was run last).
Counter(preproc_cats)

In [74]:
# Same add/replace-column check as the pandas test, for PysparkDataPreprocessor:
# - with cols_numerical, the raw `num_value` column is kept next to `num_value_cat`;
# - without it, only the discretized column remains.
np.random.seed(42)
num_rows = 1000
df = pd.DataFrame({
    'id': np.random.randint(1, 4, size=num_rows),
    'event_dt': np.random.randint(1, 100, size=num_rows),
    'num_value': np.random.normal(loc=0, scale=100, size=num_rows),
})
df_spark = spark.createDataFrame(df)

n_bins_discr = 10
discr_type = 'quantile'


def make_spark_preprocessor(**extra_kwargs):
    """Build a fresh PysparkDataPreprocessor for the synthetic frame.

    Reads the module-level `discr_type` and `n_bins_discr` at call time;
    `extra_kwargs` are forwarded to the constructor.
    """
    return PysparkDataPreprocessor(
        col_id='id',
        col_event_time='event_dt',
        event_time_transformation='none',
        category_transformation='none',
        cols_discretize={'num_value': (discr_type, n_bins_discr)},
        **extra_kwargs,
    )


preprocessor = make_spark_preprocessor(cols_numerical=['num_value'])
processed = preprocessor.fit_transform(df_spark)

assert 'num_value' in processed.columns, "Original numeric expected in preprocessed data but not found"
assert 'num_value_cat' in processed.columns, "Discretized column 'num_value_cat' expected in preprocessed data but not found"

preprocessor = make_spark_preprocessor()
processed = preprocessor.fit_transform(df_spark)

assert 'num_value' not in processed.columns, "Original numeric not expected in preprocessed data but found"
assert 'num_value_cat' in processed.columns, "Discretized column expected in preprocessed data but not found"


Creating Dask Server


Perhaps you already have a cluster running?
Hosting the HTTP server on port 39095 instead


Link Dask Server - http://172.20.10.8:39095/status




Creating Dask Server


Perhaps you already have a cluster running?
Hosting the HTTP server on port 40627 instead


Link Dask Server - http://172.20.10.8:40627/status


In [2]:
from ptls_tests.test_preprocessing.test_pyspark.test_discretizer import test_add_replace_col, test_distribution 

In [3]:
# Run the pyspark test module's checks in order: add/replace column, then distribution.
for check in (test_add_replace_col, test_distribution):
    check()

Creating Dask Server
Link Dask Server - http://172.20.10.8:8787/status




Creating Dask Server
Link Dask Server - http://172.20.10.8:35641/status


Perhaps you already have a cluster running?
Hosting the HTTP server on port 35641 instead


Creating Dask Server
Link Dask Server - http://172.20.10.8:34439/status


Perhaps you already have a cluster running?
Hosting the HTTP server on port 34439 instead
25/06/16 18:50:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:50:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:50:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:50:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:50:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:50:15 WARN WindowExec: No Partition Defined for Window operation! Moving 

Discretization quantile: chi-square p-value is 1.00000
Creating Dask Server


Perhaps you already have a cluster running?
Hosting the HTTP server on port 33181 instead


Link Dask Server - http://172.20.10.8:33181/status


25/06/16 18:50:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:50:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:50:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:50:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:50:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/16 18:50:20 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Discretization kmeans: chi-square p-value is 1.00000


In [2]:
from ptls_tests.test_preprocessing.test_pandas.test_discretizer import test_add_replace_col, test_distribution 

In [3]:
# Run the pandas test module's checks in order: add/replace column, then distribution.
for check in (test_add_replace_col, test_distribution):
    check()

Creating Dask Server
Link Dask Server - http://192.168.1.90:8787/status
Creating Dask Server
Link Dask Server - http://192.168.1.90:42717/status
Creating Dask Server
Link Dask Server - http://192.168.1.90:40685/status


Perhaps you already have a cluster running?
Hosting the HTTP server on port 42717 instead
Perhaps you already have a cluster running?
Hosting the HTTP server on port 40685 instead


Discretization kmeans: chi-square p-value is 1.00000
Creating Dask Server
Link Dask Server - http://192.168.1.90:36855/status
Discretization quantile: chi-square p-value is 1.00000


Perhaps you already have a cluster running?
Hosting the HTTP server on port 36855 instead
