In [1]:
import os
import sys
import pandas as pd
import numpy as np
import time
from functools import reduce
import pyarrow as pa
import pyarrow.parquet as pq

# Import other utils:
notebook_path = os.getcwd()
root_path = os.path.split(notebook_path)[0]
sys.path.append(root_path)
import parsing_utils as psu

import pyspark


# Create SparkContext and submit custom jars:
extra_jars_dir = r'C:\spark\extra_jars'
use_jars = ['xgboost4j-spark_3.0-1.2.0-0.1.0.jar', 'xgboost4j_3.0-1.2.0-0.1.0.jar']
use_jars = [os.path.join(extra_jars_dir, jj) for jj in use_jars]
spark_jars = reduce(lambda x,y: x+','+y, use_jars)
extraClassPath_jars = reduce(lambda x,y: x+':'+y, use_jars)

# pyspark has issues with Java 11 and arrow. Set "-Dio.netty.tryReflectionSetAccessible=true"
# https://stackoverflow.com/questions/62109276/errorjava-lang-unsupportedoperationexception-for-pyspark-pandas-udf-documenta


# Create SparkContext (i.e., spark local cluster by default if spark.master not set):
config = pyspark.SparkConf().setAll([
    ('spark.sql.execution.arrow.pyspark.enabled','true'),
    ('spark.driver.memory','4G'), 
    ('spark.executor.memory','6G'), 
    ('spark.jars',spark_jars),
    ('spark.driver.extraClassPath',extraClassPath_jars), 
    ('spark.executor.extraClassPath',extraClassPath_jars),
    ('spark.driver.extraJavaOptions','-Dio.netty.tryReflectionSetAccessible=true'),
    ('spark.executor.extraJavaOptions','-Dio.netty.tryReflectionSetAccessible=true')
])
sc = pyspark.SparkContext(master='local[4]', conf=config)
sc.addPyFile(use_jars[0])
# Use sc.stop() to kill SparkContext.

# Create SparkSession from existing SparkContext:
spark = pyspark.sql.SparkSession(sc)

import ml.dmlc.xgboost4j.scala.spark as xgb
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, OneHotEncoderModel, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col

from pyspark.sql import functions as F
from pyspark.ml.linalg import SparseVector, DenseVector, Vectors, VectorUDT


print('xboost version: ', xgb.__version__)

xboost version:  1.3.0


In [2]:
# Get column names from single file scan:
# data_dir = '/dbfs/FileStore/tables/telco-data' # Databricks
data_dir = 'C:/data/telco_dat'
# tmp_df = pd.read_csv(data_dir+'/csv/perturb_dat_p0000.csv', nrows=1)
tmp_df = pd.read_csv(data_dir+'/csv/perturb_dat_p0000.csv')


all_cols = list(tmp_df.columns)

# Use spark and one hot encoder to create numeric dmatrix for xgboost.
# More details here: https://stackoverflow.com/questions/32982425/encode-and-assemble-multiple-features-in-pyspark
# Categorical string columns:
cat_str_cols = list(tmp_df.select_dtypes('object').columns)

# Categorical numeric columns:
cat_num_cols = ['SeniorCitizen']

# Numeric columns:
num_cols = list(tmp_df.select_dtypes('number').columns)

# Ignore columns:
ignore_cols = ['customerID']

# Determine unique values per column. Used to determine encoding strategy. May require fillna to be invoked.
unique_cnt = tmp_df.nunique(axis=0)
binary_cols = list(unique_cnt[unique_cnt==2].index)

# Remove binary numeric columns:
binary_cols = [cc for cc in binary_cols if cc not in num_cols]

# One hot encode only non-binary, categorical columns:
ohe_cols = list(set(cat_str_cols) - set(binary_cols))

print('Binary columns detected:', binary_cols)
print('Numeric columns:', num_cols)

# TODO: low cardinality numeric cols. These may require one-hot-encoding.

# Estimate total number of columns after encoding data:
ohe_col_cnt = np.sum(unique_cnt[(unique_cnt > 2) & (unique_cnt < 5)].values)
est_col_total = ohe_col_cnt + len(binary_cols) + len(num_cols) - len(ignore_cols)

print()
print('Estimated number of columns after encoding:', est_col_total)

Binary columns detected: ['gender', 'Partner', 'Dependents', 'PhoneService', 'PaperlessBilling', 'Churn']
Numeric columns: ['customerID', 'SeniorCitizen', 'tenure', 'MonthlyCharges', 'TotalCharges']

Estimated number of columns after encoding: 41


In [3]:
# Most columns are string. Use exceptions for other dtypes.
exception_col_dtype = {cc: FloatType() for cc in num_cols}

# # Update dictionary for numeric categorical columns:
# exception_col_dtype.update({cc: StringType() for cc in cat_num_cols})

schema_lit = []
for cc in all_cols:
    if cc in exception_col_dtype.keys():
        dtype = exception_col_dtype[cc]
    else:
        dtype = StringType()
    
    schema_lit.append(StructField(cc, dtype))
    
# For schema generation, need to have same number of columns in python and spark. 
# Don't apply the ignore_cols until after data loaded into spark.
schema_lit = StructType(schema_lit)

schema_lit

StructType(List(StructField(customerID,FloatType,true),StructField(gender,StringType,true),StructField(SeniorCitizen,FloatType,true),StructField(Partner,StringType,true),StructField(Dependents,StringType,true),StructField(tenure,FloatType,true),StructField(PhoneService,StringType,true),StructField(MultipleLines,StringType,true),StructField(InternetService,StringType,true),StructField(OnlineSecurity,StringType,true),StructField(OnlineBackup,StringType,true),StructField(DeviceProtection,StringType,true),StructField(TechSupport,StringType,true),StructField(StreamingTV,StringType,true),StructField(StreamingMovies,StringType,true),StructField(Contract,StringType,true),StructField(PaperlessBilling,StringType,true),StructField(PaymentMethod,StringType,true),StructField(MonthlyCharges,FloatType,true),StructField(TotalCharges,FloatType,true),StructField(Churn,StringType,true)))

In [4]:
# Load and apply schema to csv data:
data_format = 'csv'
# data_format = 'parquet'

# Databricks environment doesn't require "dbfs/" pre-appended.
# data_loc = '/FileStore/tables/telco-data/csv'
# data_loc = '/FileStore/tables/telco-data/csv/perturb_dat_p0000.csv'
# data_loc = '/FileStore/tables/telco-data/WA_Fn_UseC__Telco_Customer_Churn_mod.csv'
data_loc = data_dir + '/'+data_format+'/perturb_dat_p0000.'+data_format

if data_format == 'csv':
    train_data = spark.read.schema(schema_lit).option('header', True).csv(data_loc)
elif data_format == 'parquet':
    train_data = spark.read.parquet(data_loc)

train_data = train_data.drop(*ignore_cols)

# Adapted from: https://stackoverflow.com/questions/36942233/apply-stringindexer-to-several-columns-in-a-pyspark-dataframe
# First pass, convert categorical columns to numeric using StringIndexer:
indexers = [StringIndexer(inputCol=cc, outputCol=cc+'_index').fit(train_data) for cc in cat_str_cols]

# TODO: set to overwrite existing field for better mem management.

# Apply oneHotEncoder. Change the dropLast option to False to correspond with pandas get_dummies() default behavior.
encoder = OneHotEncoder(inputCols=[cc+'_index' for cc in ohe_cols],
                        outputCols=[cc+'_enc' for cc in ohe_cols],
                        dropLast=False
                       )

pipeline = Pipeline(stages=indexers + [encoder])
train_data_enc = pipeline.fit(train_data).transform(train_data)

# Encode sparse to dense format for xgboost4j to recognize:
sparse_to_vector_udf = F.udf(lambda vs: Vectors.dense([float(ii) for ii in vs]), VectorUDT())

# Assembler:
def vectorize(data_frame):
    to_floats = [ col(x.name).cast(FloatType()) for x in data_frame.schema ]
    return (VectorAssembler()
        .setInputCols(use_cols)
        .setOutputCol('features')
#         .setHandleInvalid('keep') # How to handle NaN's
        .transform(data_frame)
        .withColumn('features', sparse_to_vector_udf('features')) # Expand sparse to dense vector format
        .select(col('features'), col(label)))

# Assembler stage. Make sure ignore_cols are removed and label/target column included.
use_cols = [cc+'_enc' for cc in ohe_cols] + [cc+'_index' for cc in binary_cols] + num_cols
use_cols = [cc for cc in use_cols if cc not in ignore_cols]

label = 'Churn_index' # Needs to be numeric encoded (i.e., with StringIndexer)

# Generate the 2 column format expected by cpu version of xgboost:
train_data_enc_vec = vectorize(train_data_enc)

In [5]:
# Show data format:
# Sample encoding from sparkML:
train_data_enc.select(['StreamingMovies','StreamingMovies_enc']).show(15)
print()

# Data formated for xgboost:
# train_data_enc.show(5,False)
train_data_enc_vec.show(5,False)

+-------------------+-------------------+
|    StreamingMovies|StreamingMovies_enc|
+-------------------+-------------------+
|                 No|      (3,[0],[1.0])|
|                 No|      (3,[0],[1.0])|
|                 No|      (3,[0],[1.0])|
|                 No|      (3,[0],[1.0])|
|                 No|      (3,[0],[1.0])|
|                Yes|      (3,[1],[1.0])|
|                 No|      (3,[0],[1.0])|
|                 No|      (3,[0],[1.0])|
|                Yes|      (3,[1],[1.0])|
|                 No|      (3,[0],[1.0])|
|                 No|      (3,[0],[1.0])|
|No internet service|      (3,[2],[1.0])|
|                Yes|      (3,[1],[1.0])|
|                Yes|      (3,[1],[1.0])|
|                Yes|      (3,[1],[1.0])|
+-------------------+-------------------+
only showing top 15 rows


+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
def pd_ohe_spark(filename, ohe_cols, binary_cols, label_col, drop_cols=None, verbose=False):
    """
    Use pandas to one-hot-encode categorical data, then convert results to a spark dataframe using pyarrow.
    Make sure: spark.conf.set("spark.sql.execution.arrow.enabled", "True").

    filename: str
        Fully specified file path.
    ohe_cols: list
        List of strings with categorical columns to be one-hot-encoded.
    binary_cols: list
        List of strings with binary categorical columns to encode. 
    label_col: str
        Name of label/target column (i.e., y). Encoded as numeric.
    drop_cols: list
        List of strings with columns to drop.
    """

    file_type = filename.split('.')[-1]
    
    tic = time.perf_counter()
    if file_type == 'csv':
        data_pd = pd.read_csv(filename)
    elif file_type == 'parquet':
        # Use pyarrow to load parquet data directly. Pandas very slow at loading decimal encoded parquet. Spark appears to read decimal encoded parquet properly. 
        data_pa = pq.read_table(data_loc)
        pa_schema = data_pa.schema
        
        # Convert pa.decimal128 to pa.float64:
        updated_schema = pa.schema([pa.field(dd.name, pa.float64()) if pa.types.is_decimal(dd.type) else dd for dd in pa_schema])

        # Convert arrow table to pandas dataframe:
        data_pd = data_pa.cast(updated_schema).to_pandas()
    else:
        raise TypeError('File type not recognized. Input file must be csv or parquet.')
    
    if drop_cols != None:
        data_pd = data_pd.drop(columns=drop_cols)
    
    toc_load_data = time.perf_counter() - tic
    
    # One hot encoding:
    tic = time.perf_counter()
    # Use drop_fist=True to get k-1 dummies out of k categorical levels by removing the first level. (as done in sparkML OneHotEncoder)
    data_pd_enc = pd.get_dummies(data_pd, columns=[cc for cc in ohe_cols if cc != label_col])

    # Upconvert numeric datatype to supported arrow format. Unsupported formats: uint8
    pd_dtypes = pd.DataFrame(data_pd_enc.dtypes, columns=['dtype'])

    # TODO: write fnc for casting to smallest compatible dtype between spark & arrow.
    # Data type conversion required compatibility for pandas => arrow 
    uint_cols = list(data_pd_enc.select_dtypes('uint8').columns)
    data_pd_enc[uint_cols] = data_pd_enc[uint_cols].astype('int8')
    
    # Encode binary categorical columns (e.g., [False, True] or [No, Yes] => [0, 1]):
    for cc in binary_cols:
        data_pd_enc[cc] = data_pd_enc[cc].astype('category')
        data_pd_enc[cc] = data_pd_enc[cc].cat.codes
    
#     # Encode label column if categorical:
#     if pd.api.types.is_object_dtype(data_pd_enc[label_col].dtypes):
#         data_pd_enc[label_col] = data_pd_enc[label_col].astype('category')
#         data_pd_enc[label_col+'_index'] = data_pd_enc[label_col].cat.codes
#         data_pd_enc = data_pd_enc.drop(columns=[label_col])
    
    toc_ohe = time.perf_counter() - tic
    
    # Create spark dataframe of encoded data. Make sure that spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "True")
    tic = time.perf_counter()
    data_sp_enc = spark.createDataFrame(data_pd_enc)
    toc_spark_df = time.perf_counter() - tic
    
    if verbose:
        print(f'Loading data took: {toc_load_data:0.4f} seconds')
        print(f'One hot encoding and dtype conversion took: {toc_ohe:0.4f} seconds')
        print(f'Converting pandas to spark dataframe took: {toc_spark_df:0.4f} seconds')
    return(data_sp_enc)

# Use pandas to encode X, then pyarrow to convert X into a spark dataframe:
data_sp = pd_ohe_spark(data_loc, ohe_cols, binary_cols, 'Churn', ignore_cols, verbose=True)

print()
print('Total number of columns after encoding:', len(data_sp.columns))

data_sp_vec = (VectorAssembler()
        .setInputCols([cc for cc in data_sp.columns if cc != label])
        .setOutputCol('features')
#         .setHandleInvalid('keep') # How to handle NaN's
        .transform(data_sp)
        .withColumn('features', sparse_to_vector_udf('features'))
        .select(col('features'), col('Churn')))

data_sp

Loading data took: 20.1425 seconds
One hot encoding and dtype conversion took: 15.6191 seconds
Converting pandas to spark dataframe took: 1.4377 seconds

Total number of columns after encoding: 41


DataFrame[gender: tinyint, SeniorCitizen: bigint, Partner: tinyint, Dependents: tinyint, tenure: bigint, PhoneService: tinyint, PaperlessBilling: tinyint, MonthlyCharges: double, TotalCharges: double, Churn: tinyint, InternetService_DSL: tinyint, InternetService_Fiber optic: tinyint, InternetService_No: tinyint, StreamingMovies_No: tinyint, StreamingMovies_No internet service: tinyint, StreamingMovies_Yes: tinyint, PaymentMethod_Bank transfer (automatic): tinyint, PaymentMethod_Credit card (automatic): tinyint, PaymentMethod_Electronic check: tinyint, PaymentMethod_Mailed check: tinyint, Contract_Month-to-month: tinyint, Contract_One year: tinyint, Contract_Two year: tinyint, TechSupport_No: tinyint, TechSupport_No internet service: tinyint, TechSupport_Yes: tinyint, OnlineBackup_No: tinyint, OnlineBackup_No internet service: tinyint, OnlineBackup_Yes: tinyint, DeviceProtection_No: tinyint, DeviceProtection_No internet service: tinyint, DeviceProtection_Yes: tinyint, MultipleLines_No: 

In [7]:
data_sp_vec.show(5,False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                    |Churn|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|[0.0,0.0,1.0,0.0,1.0,0.0,1.0,34.7,34.7,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0]     |0    |
|[1.0,0.0,0.0,0.0,35.0,1.0,0.0,53.47,1810.75,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0]|0    |
|[1.0,0.0,0.0,0.0,2.0,1.0,1.0,51.01,102.01,1.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,

In [8]:
break

SyntaxError: 'break' outside loop (<ipython-input-8-6aaf1f276005>, line 1)

In [None]:
params = { 
    'seed': 0,
    'treeMethod': 'hist',
    'maxDepth': 10,
    'numRound': 10,
    # 'numWorkers': 1,
    # 'nthread': 12,
    # 'verbosity': 3
}

# Use xgboost-4j:
classifier = xgb.XGBoostClassifier(**params).setLabelCol(label).setFeaturesCol('features')

In [None]:
def with_benchmark(phrase, action):
    start = time()
    result = action()
    end = time()
    print('{} takes {} seconds'.format(phrase, round(end - start, 2)))
    return result

loaded_model = with_benchmark('Training', lambda: classifier.fit(train_data_enc))

In [None]:
# Apply model on all data:
def transform():
    result = loaded_model.transform(train_data_enc).cache()
    result.foreachPartition(lambda _: None)
    return result

result = with_benchmark('Transformation', transform)

result.select('features', label).show(5)

In [None]:
# Benchmark scoring/evaluation:
metric_eval = with_benchmark(
    'Evaluation',
    lambda: BinaryClassificationEvaluator().setMetricName('areaUnderROC').setLabelCol(label).evaluate(result))

print('AUC is ' + str(metric_eval))