# Logistic Regression Classification

In [0]:
# Importing spark_rapids_ml.install lets pyspark.ml imports of estimators that
# have an accelerated implementation resolve to the accelerated versions.
# Leave the import commented out (or skip this cell) for CPU-only runs.
# import spark_rapids_ml.install

In [0]:
import pandas as pd
from gen_data_distributed import ClassificationDataGen, SparseRegressionDataGen
from sklearn.model_selection import train_test_split
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import time

Disable MLflow autologging, as it is resource intensive.

In [0]:
import mlflow
# Switch MLflow autologging off before any estimator is fit.
mlflow.autolog(disable=True)

### Create synthetic dataset

The number of classes in the dataset is set to 2 below.  Larger values for `n_classes` are also supported.

In [0]:
# Number of target classes intended for the synthetic dataset.
# NOTE(review): no visible cell passes n_classes to the data generator or reads
# it afterwards - confirm it is still needed / actually applied.
n_classes = 2

In [0]:
# Synthetic classification dataset: size and output layout.
n_rows = 20000000000
n_cols = 3
n_files = 2000
# NOTE(review): with the values above this evaluates to ".../20bx3", while the
# load cell further down reads from ".../10bx4" - confirm which dataset is intended.
output = f"s3://eordentlich/polynomial-lr/{int(n_rows/1e9)}bx{n_cols}"

# Arguments handed to ClassificationDataGen, one option/value pair per line.
dense_gen_argv = [
    "--no_shutdown",
    "--num_rows", str(n_rows),
    "--num_cols", str(n_cols),
    "--output_num_files", str(n_files),
    "--output_dir", output,
    "--n_redundant", "1",
]
data_generator = ClassificationDataGen(argv=dense_gen_argv)

generated_df = data_generator.gen_dataframe(spark)

Passing {'n_redundant': 1, 'random_state': 1} to make_classification


In [0]:
# Sparse variant of the synthetic dataset. Running this cell overwrites n_rows,
# n_cols, n_files, output, data_generator and generated_df from the previous cell.
n_rows = 20000000000
n_cols = 300
n_files = 2000
output = f"s3://eordentlich/polynomial-lr/{int(n_rows/1e9)}bx{n_cols}_sparse"

# NOTE(review): "--n_redundant" is also passed to ClassificationDataGen above;
# confirm that SparseRegressionDataGen accepts it.
sparse_gen_argv = [
    "--no_shutdown",
    "--num_rows", str(n_rows),
    "--num_cols", str(n_cols),
    "--output_num_files", str(n_files),
    "--output_dir", output,
    "--n_redundant", "1",
    "--density_curve", "Linear",
]
data_generator = SparseRegressionDataGen(argv=sparse_gen_argv)

generated_df = data_generator.gen_dataframe(spark)

In [0]:
# Partition count of the generated DataFrame (first element of what gen_dataframe returned).
generated_df[0].rdd.getNumPartitions()

2000

In [0]:
# Persist the generated DataFrame as Parquet at whatever `output` currently points to
# (the most recently executed generation cell decides which path that is).
generated_df[0].write.parquet(output)

In [0]:
# Display the destination path used for the write.
output

's3://eordentlich/polynomial-lr/10bx4'

### Load dataset

In [0]:
# Read only the part files whose names match these glob patterns, i.e. a subset
# of the stored dataset rather than every file under the directory.
parquet_part_globs = [
    's3://eordentlich/polynomial-lr/10bx4/part-00{0,1}*',
    's3://eordentlich/polynomial-lr/10bx4/part-002{0,1,2,3,4}*',
]
df = spark.read.parquet(*parquet_part_globs)
df.count()

1250000000

In [0]:
# Display the schema of the loaded DataFrame.
df

DataFrame[c0: float, c1: float, c2: float, c3: float, label: float]

In [0]:
# Every column except the target is a model input.
# list.remove raises ValueError if there is no "label" column.
feature_cols = df.columns
feature_cols.remove("label")
feature_cols

['c0', 'c1', 'c2', 'c3']

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PolynomialExpansion

# This is a transform-only pipeline, so no data processing is actually run by fit().
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
poly_expander = PolynomialExpansion(
    inputCol="features",
    outputCol="expanded_features",
    degree=2,
)
pipeline = Pipeline(stages=[assembler, poly_expander]).fit(df)

In [0]:
# Drop the raw input columns and the intermediate assembled vector so that
# only the remaining columns and the expanded features are kept for training.
columns_to_drop = [*feature_cols, "features"]
train_df = pipeline.transform(df).drop(*columns_to_drop)

In [0]:
# Display the schema of the training DataFrame.
train_df

DataFrame[label: float, expanded_features: vector]

In [0]:
# The classifier is configured to read a column named "features", so give the
# expanded vector that name, then peek at a few rows.
train_df = train_df.withColumnRenamed(
    "expanded_features",
    "features",
)
train_df.take(5)

[Row(label=0.0, features=DenseVector([-2.1251, 4.5159, -5.1536, 10.9517, 26.5597, 0.6029, -1.2812, -3.1072, 0.3635, 2.519, -5.353, -12.9818, 1.5187, 6.3452])),
 Row(label=0.0, features=DenseVector([-1.2499, 1.5623, -0.4793, 0.5991, 0.2297, -0.0674, 0.0843, 0.0323, 0.0045, -0.775, 0.9687, 0.3715, 0.0523, 0.6006])),
 Row(label=1.0, features=DenseVector([1.341, 1.7982, -0.4507, -0.6044, 0.2032, 0.2319, 0.311, -0.1045, 0.0538, -0.376, -0.5042, 0.1695, -0.0872, 0.1414])),
 Row(label=0.0, features=DenseVector([-0.7408, 0.5488, 0.7034, -0.5211, 0.4948, -0.2033, 0.1506, -0.143, 0.0413, 0.067, -0.0496, 0.0471, -0.0136, 0.0045])),
 Row(label=0.0, features=DenseVector([-0.5943, 0.3532, 0.4852, -0.2883, 0.2354, -0.15, 0.0891, -0.0728, 0.0225, 0.9012, -0.5356, 0.4372, -0.1352, 0.8121]))]

## Train with CrossValidator

In [0]:
def build_lr_classifier(estimator_class):
    """Instantiate ``estimator_class`` with ``verbose=7`` and apply the benchmark settings.

    NOTE(review): the next cell defines a function with the same name that does
    not pass ``verbose``; when cells are run in order that later definition
    replaces this one.

    Args:
        estimator_class: Estimator class whose constructor accepts ``verbose``
            and which exposes the chained setters used below.

    Returns:
        The value returned by the last setter in the chain.
    """
    lr = estimator_class(verbose=7)
    lr = lr.setFeaturesCol("features")
    lr = lr.setLabelCol("label")
    lr = lr.setRegParam(0.001)
    lr = lr.setElasticNetParam(0.5)
    lr = lr.setMaxIter(100)
    lr = lr.setTol(1.0e-30)
    return lr

In [0]:
def build_lr_classifier(estimator_class, **estimator_kwargs):
    """Instantiate ``estimator_class`` and apply the benchmark settings.

    Optional constructor arguments can be forwarded (for example ``verbose=7``
    for an estimator class that accepts it), so a single definition serves
    both variants instead of redefining the function per run type.

    Args:
        estimator_class: Estimator class exposing the chained setters used below.
        **estimator_kwargs: Extra keyword arguments passed to the constructor.
            With none given, the constructor is called without arguments.

    Returns:
        The value returned by the last setter in the chain.
    """
    return ( estimator_class(**estimator_kwargs)
                .setFeaturesCol("features")
                .setLabelCol("label")
                .setRegParam(0.001)
                .setElasticNetParam(0.5)
                .setMaxIter(100)
                # Tolerance this small presumably keeps the solver from stopping
                # before maxIter - TODO confirm that is the intent.
                .setTol(1.0e-30)
           )

In [0]:
from pyspark.ml.classification import LogisticRegression
# Configure the estimator through the builder defined in an earlier cell.
classifier = build_lr_classifier(LogisticRegression)

In [0]:
# Show the concrete estimator class in use (presumably to confirm whether the
# CPU or the accelerated implementation was picked up - TODO confirm).
type(classifier)

pyspark.ml.classification.LogisticRegression

In [0]:
# Use logLoss as the metric: GPU-accelerated cross-validation does not yet
# support AUC (and similar metrics) and would fall back to suboptimal processing.
# NOTE(review): the name `eval` shadows Python's built-in eval(); it is kept
# because a later cell refers to it.
eval = MulticlassClassificationEvaluator(
    labelCol="label",
    metricName="logLoss",
)

In [0]:
def with_benchmark(phrase, action):
    """Run ``action`` and print how long it took.

    Args:
        phrase: Label placed at the start of the printed timing message.
        action: Zero-argument callable to execute and time.

    Returns:
        Whatever ``action()`` returns.

    Any exception raised by ``action`` propagates to the caller; no timing
    line is printed in that case.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) if the system clock is adjusted during a long run.
    start = time.perf_counter()
    result = action()
    elapsed = time.perf_counter() - start
    print("{} takes {} seconds".format(phrase, elapsed))
    return result

In [0]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Search space: 4 regularization strengths x 4 elastic-net mixing values.
reg_param_values = [0.00005, 0.001, 0.005, 0.01]
elastic_net_values = [0.25, 0.5, 0.75, 0.9]

grid_builder = ParamGridBuilder()
grid_builder = grid_builder.addGrid(classifier.regParam, reg_param_values)
grid_builder = grid_builder.addGrid(classifier.elasticNetParam, elastic_net_values)
grid = grid_builder.build()

# Cross-validate the classifier over the grid with the log-loss evaluator,
# using parallelism=1 and a fixed seed.
cv = CrossValidator(
    estimator=classifier,
    estimatorParamMaps=grid,
    evaluator=eval,
    parallelism=1,
    seed=1,
)



In [0]:

# Show the concrete CrossValidator class in use.
type(cv)

pyspark.ml.tuning.CrossValidator

In [0]:
# CPU run: time the full cross-validated fit, then report the per-candidate
# average metric (one value per entry of the parameter grid).
def _fit_cross_validator():
    return cv.fit(train_df)

model = with_benchmark("Training CV", _fit_cross_validator)
print(f"average metrics: {model.avgMetrics}")

Training CV takes 109644.9176542759 seconds
average metrics: [0.08718461611098609, 0.08718101079723316, 0.08717902272513646, 0.08717718554790733, 0.08921787355291107, 0.08869773480580916, 0.08819102870637596, 0.0879038597796405, 0.10304934341820844, 0.10052843889653003, 0.09715926802152282, 0.09426236794175004, 0.11903779120209218, 0.11525118960826831, 0.10895885516324016, 0.10382720403276104]
