In [1]:
# !pip install findspark

In [2]:
import findspark

# Make the local Spark installation importable as `pyspark`, then show the
# Spark home directory findspark resolved (last expression -> cell output).
findspark.init()
findspark.find()

'/usr/lib/spark'

In [3]:
import sys
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType
from pyspark.sql.functions import *
from pyspark.sql import functions as F, Window

In [4]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import StructField, StructType
from pyspark.sql.types import IntegerType, StringType, NumericType

In [5]:
# Local Spark session using every available core. spark.jars.packages pulls in the
# CatBoost-for-Spark artifact (Spark 3.0 / Scala 2.12 build, version 1.2.2).
# NOTE(review): getOrCreate() reuses an already-running session, in which case a
# changed package setting may not take effect — restart the kernel after editing it.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('CatBoostWithSpark')
    .config("spark.jars.packages", "ai.catboost:catboost-spark_3.0_2.12:1.2.2")
    .getOrCreate()
)

    

In [6]:
spark  # show the active SparkSession as the cell output to confirm it started

In [7]:
import catboost_spark

In [8]:
path = "s3a://test-bucket-mlops/train/train.csv"
# The first row holds column names. inferSchema makes Spark take an extra pass over
# the data to pick column types (otherwise every column is read as a string).
df = spark.read.csv(path, header=True, inferSchema=True)

In [9]:
# Label column the classifier is trained on and evaluated against.
TARGET_LABEL = "loss"

In [10]:
def _count_distinct(df, column):
    """Return the number of distinct non-null values in ``column`` (one Spark job)."""
    return df.agg(F.countDistinct(F.col(column)).alias("unique_count")).collect()[0]["unique_count"]


def identify_variable_types(df, unique_threshold=10, id_vars=None):
    """
    Split the columns of a PySpark DataFrame into discrete, categorical,
    continuous and "other" groups.

    :param df: The input PySpark DataFrame.
    :param unique_threshold: Maximum number of distinct (non-null) values for a
        StringType column to count as categorical, or for an IntegerType column
        to count as discrete. Default is 10.
    :param id_vars: Identifier columns (e.g. CustomerId, Names) to exclude from
        classification. They are reported first in the "other" group. The
        caller's list is never modified. Default: no identifier columns.
    :return: A tuple ``(discrete_columns, categorical_columns,
        continuous_columns, other_columns)`` of column-name lists:
        - discrete: IntegerType columns with <= unique_threshold distinct values
        - categorical: StringType columns with <= unique_threshold distinct values
        - continuous: IntegerType columns above the threshold, plus every other
          NumericType column
        - other: id_vars followed by high-cardinality StringType columns
        Columns of any other type (e.g. dates, booleans) appear in no group.

    Each StringType / IntegerType column triggers one Spark aggregation job.
    """
    # Copy so the appends below never mutate the caller's list (or a shared
    # default list that would grow across calls).
    other_columns = list(id_vars) if id_vars else []
    discrete_columns = []
    categorical_columns = []
    continuous_columns = []

    df = df.drop(*other_columns)

    for column in df.columns:
        dtype = df.schema[column].dataType

        if isinstance(dtype, StringType):
            if _count_distinct(df, column) <= unique_threshold:
                categorical_columns.append(column)
            else:
                other_columns.append(column)

        # IntegerType is itself a NumericType, so it must be tested before the
        # generic numeric branch.
        elif isinstance(dtype, IntegerType):
            if _count_distinct(df, column) <= unique_threshold:
                discrete_columns.append(column)
            else:
                continuous_columns.append(column)

        elif isinstance(dtype, NumericType):
            continuous_columns.append(column)

    return discrete_columns, categorical_columns, continuous_columns, other_columns

In [11]:
# Identify variable types
discrete_columns, categorical_columns, continuous_columns, Other_columns = identify_variable_types(df, id_vars=['id'])

print("Discrete columns:", discrete_columns)
print("Categorical columns:", categorical_columns)
print("Continuous columns:", continuous_columns)
print("Other columns:", Other_columns)

In [13]:
# 70/30 train/test split. Without a seed, randomSplit picks a new random seed
# each run, so the split (and the AUC reported below) would change on every
# Restart & Run All.
SPLIT_SEED = 42
train_df, test_df = df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [16]:
# Feature columns fed to the model.
# NOTE(review): hand-picked subset; the selection criteria are not shown in this
# notebook — TODO document how these nine columns were chosen.
features = 'f674 f2 f67 f471 f766 f670 f596 f332 f464'.split()


In [17]:
# Replace nulls in the model features with 0.0 using one dict-based fillna per
# split, instead of stacking a separate fillna projection per feature.
# NOTE(review): 0 is an imputation choice, not a neutral value — confirm it is
# appropriate for every feature.
fill_values = {feature: 0. for feature in features}
train_df = train_df.fillna(fill_values)
test_df = test_df.fillna(fill_values)

In [18]:
# Pack the selected feature columns into one vector column named 'features',
# which the CatBoost pool and classifier read later.
assembler = VectorAssembler(
    inputCols=features,
    outputCol='features',
)

In [19]:
def prepare_vector(df: DataFrame, vector_assembler=None) -> DataFrame:
    """
    Return ``df`` with the assembled feature-vector column added.

    :param df: DataFrame containing every input column of the assembler.
    :param vector_assembler: VectorAssembler to apply. If omitted, falls back to
        the notebook-level ``assembler`` (looked up at call time), so existing
        single-argument calls behave exactly as before.
    :return: The transformed DataFrame (the input DataFrame is not modified).
    """
    if vector_assembler is None:
        vector_assembler = assembler
    return vector_assembler.transform(df)

In [20]:
# Add the assembled 'features' vector column to both splits (train first, then test).
train, test = (prepare_vector(split) for split in (train_df, test_df))

In [21]:
# Wrap only the feature vector and the label in a CatBoost Pool, then declare
# which column plays which role. The last setter's return value is the cell output.
train_pool = catboost_spark.Pool(train.select('features', TARGET_LABEL))
train_pool.setLabelCol(TARGET_LABEL)
train_pool.setFeaturesCol('features')

Pool_a5d05244e8fd

In [22]:
# CatBoost classifier with default hyperparameters (none are set here), reading
# the same feature/label columns the training pool declares.
classifier = catboost_spark.CatBoostClassifier(
    labelCol=TARGET_LABEL,
    featuresCol='features',
)

In [23]:
# Train the classifier on the training pool.
model = classifier.fit(train_pool)

In [24]:
# Score the held-out split; the result carries the model's output columns
# (including 'probability' and 'prediction', which are selected next).
predict = model.transform(test)

In [29]:
# Keep only the columns worth eyeballing and print them. truncate=False shows the
# feature and probability vectors in full; without this show() call the cell
# produces no output, so the recorded table below would not reproduce.
predicted = predict.select("features", "probability", "prediction")
predicted.show(truncate=False)

+-------------------------------------------------------------+------------------------------------------+----------+
|features                                                     |probability                               |prediction|
+-------------------------------------------------------------+------------------------------------------+----------+
|[33.0,9.0,4.9649,1.24,-0.28,381.0,2.845,28.26,4882.19]       |[0.9478083537148293,0.05219164628517063]  |0.0       |
|[97.0,9.0,11.571,1.16,-0.445,887.0,8.7,11.07,1955646.0]      |[0.9176042321300583,0.08239576786994174]  |0.0       |
|[23.0,9.0,11.571,1.13,-0.715,887.0,8.7,812.62,3526328.12]    |[0.8424871148785694,0.15751288512143052]  |0.0       |
|[22.0,9.0,11.571,1.16,-0.31,887.0,8.7,0.0,350509.97]         |[0.9487498965015742,0.051250103498425854] |0.0       |
|[102.0,9.0,11.1023,1.18,-0.795,842.0,8.12,1330.09,109637.45] |[0.8321861840197288,0.16781381598027123]  |0.0       |
|[24.0,9.0,11.1023,1.17,-0.335,842.0,8.12,119.44,2265474

In [30]:
# ROC AUC on the label column. The 'probability' vector is passed as the score
# column; for a vector-valued score the evaluator ranks rows by element [1]
# (the positive-class probability).
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="probability",
    labelCol=TARGET_LABEL,
)
        

In [31]:
# Area under the ROC curve on the held-out split (0.5 = random ranking).
test_auc = evaluator.evaluate(predict)
print(f'Model auc = {test_auc}')

Model auc = 0.687040572902892
