In [None]:
import sys

sys.path.append('../')

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from probabilistic_covshift.constants.automl_constants import AutoMLConfig
from probabilistic_covshift.constants.automl_constants import H2OServerInfo
from probabilistic_covshift.constants.main_constants import OriginFeatures, WeightFeatures
from probabilistic_covshift.probabilistic_classification_covshift import ProbabilisticClassification

In [None]:
# Local Spark session using 4 worker threads. getOrCreate() returns the already-active
# session if there is one, so re-running this cell does not start a second session.
spark = (
    SparkSession.builder
    .master('local[4]')
    .appName('main')
    .getOrCreate()
)

In [None]:
# Source sample: the same 16 rows repeated twice (32 rows in total).
# col_a holds numeric-looking strings; the AutoML config below lists it as categorical.
source_df = spark.createDataFrame(
    [
        ('38.9', 40.0, 55, 10.0),
        ('88.9', 50.0, 15, 20.0),
        ('38.9', 50.0, 15, 10.0),
        ('48.9', 40.0, 55, 20.0),
        ('38.9', 40.0, 55, 10.0),
        ('98.9', 50.0, 15, 20.0),
        ('88.9', 50.0, 15, 20.0),
        ('18.9', 40.0, 55, 30.0),
        ('48.9', 40.0, 55, 20.0),
        ('58.9', 50.0, 15, 30.0),
        ('98.9', 50.0, 15, 20.0),
        ('38.9', 40.0, 55, 10.0),
        ('18.9', 40.0, 55, 30.0),
        ('38.9', 50.0, 15, 10.0),
        ('58.9', 50.0, 15, 30.0),
        ('38.9', 40.0, 55, 10.0),
    ] * 2,
    ['col_a', 'col_b', 'col_c', 'col_d'],
)

In [None]:
# Target sample (32 rows) with the same columns as source_df. Compared with the source:
# col_a only takes '18.9', '38.9' and '58.9', and col_c also takes 95, which never occurs
# in source_df. Presumably this is the distribution shift the weights should correct.
target_df = spark.createDataFrame(
    data=[
        ('18.9', 40.0, 95, 10.0),
        ('38.9', 50.0, 15, 20.0),
        ('18.9', 50.0, 95, 10.0),
        ('38.9', 40.0, 55, 20.0),
        ('18.9', 40.0, 95, 10.0),
        ('38.9', 50.0, 15, 20.0),
        ('18.9', 50.0, 95, 30.0),
        ('38.9', 40.0, 55, 30.0),
        ('18.9', 40.0, 95, 30.0),
        ('38.9', 50.0, 15, 30.0),
        ('38.9', 50.0, 95, 30.0),
        ('18.9', 40.0, 55, 10.0),
        ('38.9', 40.0, 95, 30.0),
        ('18.9', 50.0, 15, 10.0),
        ('38.9', 50.0, 95, 30.0),
        ('18.9', 40.0, 55, 10.0),
        ('38.9', 40.0, 55, 30.0),
        ('58.9', 50.0, 15, 20.0),
        ('38.9', 50.0, 15, 30.0),
        ('58.9', 40.0, 55, 20.0),
        ('38.9', 40.0, 55, 30.0),
        ('58.9', 50.0, 15, 20.0),
        ('58.9', 50.0, 15, 30.0),
        ('58.9', 40.0, 55, 30.0),
        ('58.9', 40.0, 55, 30.0),
        ('58.9', 50.0, 15, 30.0),
        ('58.9', 50.0, 15, 30.0),
        ('58.9', 40.0, 55, 10.0),
        ('58.9', 40.0, 55, 30.0),
        ('58.9', 50.0, 15, 10.0),
        ('58.9', 50.0, 15, 30.0),
        ('58.9', 40.0, 55, 10.0),
    ],
    schema=['col_a', 'col_b', 'col_c', 'col_d'],
)

# Compute weights

In [None]:
# Configuration passed to ProbabilisticClassification in the next cell. Sections are
# added in a fixed order so the dict's key order stays stable.
conf = {}

# Data: column roles, plus the output paths that later cells read back after the run.
conf[AutoMLConfig.DATA] = {
    AutoMLConfig.CATEGORICAL_VARIABLES: ['col_a'],
    AutoMLConfig.LABEL_COL: 'col_d',
    AutoMLConfig.ORIGIN_COL: OriginFeatures.ORIGIN,
    AutoMLConfig.WEIGHT_COL: WeightFeatures.WEIGHT,
    AutoMLConfig.BASE_TABLE_PATH: 'data/base_table.parquet',
    AutoMLConfig.WEIGHT_PATH: 'data/weight.csv',
}

# H2O server connection (port kept as a string, exactly as before).
conf[AutoMLConfig.SERVER_CONN_INFO] = {
    H2OServerInfo.IP: 'localhost',
    H2OServerInfo.PORT: '54321',
}

# Cross-validation: name of the fold column and number of folds.
conf[AutoMLConfig.CROSS_VAL] = {
    AutoMLConfig.FOLD_COL: 'fold',
    AutoMLConfig.NFOLDS: 8,
}

# AutoML search budget and the metrics used for early stopping / leaderboard ordering.
conf[AutoMLConfig.MODELING] = {
    AutoMLConfig.MAX_RUNTIME_SECS: 3600,
    AutoMLConfig.MAX_MODELS: 10,
    AutoMLConfig.STOPPING_METRIC: 'logloss',
    AutoMLConfig.SORT_METRIC: 'logloss',
}

# Algorithms AutoML should skip.
conf[AutoMLConfig.EXCLUDE_ALGOS] = ['StackedEnsemble', 'DeepLearning']

# Where the selected model is saved.
conf[AutoMLConfig.MODEL] = {
    AutoMLConfig.BEST_MODEL_PATH: 'data/model/',
}

# Fixed seed for reproducibility.
conf[AutoMLConfig.SEED] = 23

In [None]:
# Train the source-vs-target classifier configured by `conf` (H2O AutoML, per the imported
# constants). The cells below read the base table and the weights back from the paths in
# conf[AutoMLConfig.DATA], so run() is presumably what writes them.
# NOTE(review): confirm that contract against ProbabilisticClassification.
# Presumably needs the H2O server in conf[AutoMLConfig.SERVER_CONN_INFO] to be reachable,
# and may run for up to MAX_RUNTIME_SECS — TODO confirm.
pc = ProbabilisticClassification(source_df, target_df, conf)
pc.run()

# Join the computed weights onto the base table

In [None]:
# Load the base table from the configured parquet path (presumably written by pc.run()
# above) and drop the configured origin column, which is not needed downstream.
base_frame_df = (
    spark.read.parquet(conf[AutoMLConfig.DATA][AutoMLConfig.BASE_TABLE_PATH])
    .drop(conf[AutoMLConfig.DATA][AutoMLConfig.ORIGIN_COL])
)
# Preview: limit on the Spark side so only 5 rows are collected to the driver.
# toPandas().head() would collect the entire table first.
base_frame_df.limit(5).toPandas()

In [None]:
# Load the per-row weights from the configured CSV path. Without inferSchema,
# spark.read.csv types every column as string, which would leave the weight values
# (and the row_id key used for the join below) as text instead of numbers.
weight_df = spark.read.csv(
    conf[AutoMLConfig.DATA][AutoMLConfig.WEIGHT_PATH],
    header=True,
    inferSchema=True,
)
# Preview: collect only 5 rows to the driver rather than the whole frame.
weight_df.limit(5).toPandas()

In [None]:
# Attach the weights to the base table by row_id, then drop the key.
# A left join keeps every base-table row; rows with no matching row_id in weight_df
# get null in weight_df's columns.
weighted_base_frame_df = (
    base_frame_df
    .join(weight_df, on='row_id', how='left')
    .drop('row_id')
)
# Preview: collect only 5 rows to the driver rather than the whole joined frame.
weighted_base_frame_df.limit(5).toPandas()