In [0]:
import numpy as np
import pandas as pd

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, randn, col

In [0]:
pip install deepchecks

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting deepchecks
  Downloading deepchecks-0.18.1-py3-none-any.whl (7.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 7.8/7.8 MB 38.4 MB/s eta 0:00:00
Collecting tqdm>=4.62.3
  Downloading tqdm-4.66.4-py3-none-any.whl (78 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 78.3/78.3 kB 11.6 MB/s eta 0:00:00
Collecting jsonpickle>=2
  Downloading jsonpickle-3.2.2-py3-none-any.whl (41 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 41.8/41.8 kB 5.6 MB/s eta 0:00:00
Collecting PyNomaly>=0.3.3
  Downloading PyNomaly-0.3.3.tar.gz (8.3 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting plotly>=5.13.1
  Downloading plotly-5.22.0-py3-none-any.whl (16.4 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 16.4/16.4 MB 90.4 MB/s eta 0:00:00
Collecting category-encoders>=2.3.0
  Downloading category_encoders-2.6.3-p

In [0]:
# DDL-style schema applied when reading the CSV: numeric fields are DOUBLE,
# every other field (including the `income` label) is STRING.
schema = """`age` DOUBLE,
`workclass` STRING,
`fnlwgt` DOUBLE,
`education` STRING,
`education_num` DOUBLE,
`marital_status` STRING,
`occupation` STRING,
`relationship` STRING,
`race` STRING,
`sex` STRING,
`capital_gain` DOUBLE,
`capital_loss` DOUBLE,
`hours_per_week` DOUBLE,
`native_country` STRING,
`income` STRING"""

# Read the Databricks sample file with the explicit schema above (no inference).
dataset = spark.read.schema(schema).csv("/databricks-datasets/adult/adult.data")

# Seeded 80/20 split into training and test DataFrames.
trainDF, testDF = dataset.randomSplit(weights=[0.8, 0.2], seed=42)

In [0]:
from deepchecks.tabular import Dataset

# The deepchecks Dataset is built from pandas frames, so collect both Spark
# splits to the driver first.
pd_train = trainDF.toPandas()
pd_test = testDF.toPandas()

# Columns deepchecks should treat as categorical; identical for both splits.
deepchecks_cat_features = [
    'workclass',
    'education',
    'marital_status',
    'occupation',
    'relationship',
    'race',
    'sex',
    'native_country',
]

# Each Dataset gets its own copy of the list so the two objects stay independent.
ds_train = Dataset(pd_train, label='income', cat_features=list(deepchecks_cat_features))
ds_test = Dataset(pd_test, label='income', cat_features=list(deepchecks_cat_features))

In [0]:
from deepchecks.tabular.suites import data_integrity
# Build the data-integrity suite and run it against the training Dataset only.
integrity_suite = data_integrity()
train_res = integrity_suite.run(ds_train)

In [0]:
# Destination of the HTML export of the data-integrity results.
report_path = (
    "/Workspace/Users/tushar.pathak@tigeranalytics.com/Reports/"
    "Data_Integrity_Report.html"
)
# Kept as the cell's last expression so its return value is still displayed.
train_res.save_as_html(report_path)

'/Workspace/Users/tushar.pathak@tigeranalytics.com/Reports/Data_Integrity_Report.html'

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

# String-valued feature columns that are indexed and then one-hot encoded.
categoricalCols = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex"]
# Numeric feature columns passed to the assembler as they are.
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]

# Output column names produced by the indexing and encoding stages.
indexedCols = [name + "Index" for name in categoricalCols]
encodedCols = [name + "OHE" for name in categoricalCols]

# Both objects below are estimators: they have to be fitted before they can
# transform a DataFrame.
# NOTE(review): handleInvalid is left at its default on both stages; confirm how
# categories that were not seen during fitting should be treated at scoring time.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexedCols)
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=encodedCols)

# The "income" label is a string as well (two values: "<=50K" and ">50K");
# a separate StringIndexer turns it into the numeric "label" column.
labelToIndex = StringIndexer(inputCol="income", outputCol="label")

# NOTE(review): this fitted model is not referenced by the visible cells (the
# Pipeline fits the indexer again) - confirm whether it is still needed.
stringIndexerModel = stringIndexer.fit(trainDF)

# Final feature vector = one-hot encoded categorical columns followed by the numeric columns.
assemblerInputs = encodedCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline

# Logistic regression reading the assembled "features" vector and the indexed "label".
lr = LogisticRegression(featuresCol="features", labelCol="label")

# Stage order: index categoricals -> one-hot encode -> index label -> assemble -> classify.
# The label indexer sits at position 2; later cells read its labels via stages[2],
# so keep that position stable when editing this list.
pipeline_stages = [stringIndexer, encoder, labelToIndex, vecAssembler, lr]
pipeline = Pipeline(stages=pipeline_stages)

# Fit all stages on the training split.
pipelineModel = pipeline.fit(trainDF)

In [0]:
import numpy as np
import pyspark
from pyspark.ml.feature import IndexToString

class PySparkModelWrapper:
    """Adapter that exposes a fitted Spark ML pipeline through ``predict``,
    ``predict_proba`` and ``feature_importances_``.

    Instances are handed to the deepchecks suites as the ``model`` argument.
    Both prediction methods rely on a module-level ``spark`` session to turn
    the incoming feature table into a Spark DataFrame.
    """

    def __init__(self, model: "pyspark.ml.pipeline.PipelineModel", label_map):
        """Store the pipeline and prepare the index-to-label mapping.

        Args:
            model: Fitted pipeline whose ``transform`` output contains the
                ``prediction`` and ``probability`` columns.
            label_map: Label strings ordered by label index; used to map
                numeric predictions back to the original label values.
        """
        self.model = model
        self.idx_to_string = IndexToString(inputCol="prediction", outputCol="predictedValue")
        self.idx_to_string.setLabels(label_map)

    def predict(self, X: np.ndarray) -> np.ndarray:
        """Return the predicted label string for every row of ``X``.

        ``X`` is passed straight to ``spark.createDataFrame``, so it must be
        something that call accepts and must carry the column names the
        pipeline expects.
        NOTE(review): callers appear to pass a pandas DataFrame despite the
        ``np.ndarray`` annotation - confirm.

        Returns:
            1-D array with one label string per input row.
        """
        df = spark.createDataFrame(X)
        scored = self.model.transform(df).select('prediction')
        preds = self.idx_to_string.transform(scored).select('predictedValue').collect()
        # Each collected row holds a single value; flatten to shape (n_rows,).
        return np.array(preds).reshape(-1)

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return the class probabilities for every row of ``X``.

        Reads the pipeline's ``probability`` vector column. (Previously the
        ``prediction`` column was selected and reshaped to ``(-1, 2)``, which
        paired up the predicted class indices of consecutive rows instead of
        returning probabilities.)

        Returns:
            2-D float array of shape (n_rows, n_classes); column ``i`` is the
            ``i``-th entry of the model's probability vector.
        """
        df = spark.createDataFrame(X)
        rows = self.model.transform(df).select('probability').collect()
        # Each row carries one ML vector; convert it explicitly so the result
        # is a plain float matrix regardless of the number of classes.
        return np.array([np.asarray(row[0].toArray(), dtype=float) for row in rows])

    @property
    def feature_importances_(self):
        """Uniform placeholder importances: 14 entries, each equal to 1/14.

        The values are constants and are not derived from the fitted model.
        NOTE(review): 14 presumably matches the number of non-label columns in
        the dataset - confirm if the schema changes.
        """
        return np.array([1/14] * 14)

In [0]:
from deepchecks.tabular.suites import model_evaluation, train_test_validation

# Labels learned by the label StringIndexer, which is stage index 2 of the
# fitted pipeline; the wrapper uses them to map predictions back to strings.
label_names = pipelineModel.stages[2].labels

# Build the deepchecks-compatible wrapper once and share it between both suites
# instead of constructing an identical wrapper for each run.
wrapped_model = PySparkModelWrapper(pipelineModel, label_names)

# Model-evaluation suite on the train/test Datasets.
eval_suite = model_evaluation()
model_evaluation_res = eval_suite.run(ds_train, ds_test, wrapped_model)

# Train/test validation suite on the same Datasets and model.
train_test_suite = train_test_validation()
train_test_res = train_test_suite.run(ds_train, ds_test, wrapped_model)





In [0]:
# Export the train/test validation results.
report_path = (
    "/Workspace/Users/tushar.pathak@tigeranalytics.com/Reports/"
    "Test_Train_Validation_Report.html"
)
train_test_res.save_as_html(report_path)

# Export the model-evaluation results; report_path is deliberately re-bound.
# NOTE(review): the file is named "Monitor_Evaluation" although it stores the
# model-evaluation results - confirm the intended name.
report_path = (
    "/Workspace/Users/tushar.pathak@tigeranalytics.com/Reports/"
    "Monitor_Evaluation.html"
)
# Kept as the cell's last expression so its return value is still displayed.
model_evaluation_res.save_as_html(report_path)

'/Workspace/Users/tushar.pathak@tigeranalytics.com/Reports/Monitor_Evaluation.html'