In [1]:
from pydataset import data
import pyspark
import pyspark.ml
from pyspark.sql.functions import *

# getOrCreate() returns the already-running SparkSession if there is one,
# so re-running this cell does not start a second session.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# Load the `tips` example dataset via pydataset and hand it to Spark.
tips_source = data('tips')
df = spark.createDataFrame(tips_source)

In [2]:
# Browse what pyspark.ml.feature offers (dir() output is already alphabetical).
sorted(dir(pyspark.ml.feature))

['Binarizer',
 'BucketedRandomProjectionLSH',
 'BucketedRandomProjectionLSHModel',
 'Bucketizer',
 'ChiSqSelector',
 'ChiSqSelectorModel',
 'CountVectorizer',
 'CountVectorizerModel',
 'DCT',
 'DecisionTreeParams',
 'ElementwiseProduct',
 'FeatureHasher',
 'HasAggregationDepth',
 'HasCheckpointInterval',
 'HasCollectSubModels',
 'HasDistanceMeasure',
 'HasElasticNetParam',
 'HasFeaturesCol',
 'HasFitIntercept',
 'HasHandleInvalid',
 'HasInputCol',
 'HasInputCols',
 'HasLabelCol',
 'HasLoss',
 'HasMaxIter',
 'HasNumFeatures',
 'HasOutputCol',
 'HasOutputCols',
 'HasParallelism',
 'HasPredictionCol',
 'HasProbabilityCol',
 'HasRawPredictionCol',
 'HasRegParam',
 'HasSeed',
 'HasSolver',
 'HasStandardization',
 'HasStepSize',
 'HasThreshold',
 'HasThresholds',
 'HasTol',
 'HasVarianceCol',
 'HasWeightCol',
 'HashingTF',
 'IDF',
 'IDFModel',
 'Imputer',
 'ImputerModel',
 'IndexToString',
 'JavaEstimator',
 'JavaMLReadable',
 'JavaMLWritable',
 'JavaModel',
 'JavaParams',
 'JavaTransformer'

In [3]:
# 80/20 train/test split. randomSplit assigns rows by sampling, so the resulting
# sizes are only approximately 80/20; the fixed seed makes the split reproducible.
# (For a train/validate/test split use weights like [0.6, 0.2, 0.2] instead.)
SEED = 123
train, test = df.randomSplit([0.8, 0.2], seed=SEED)

In [4]:
print('train shape')
train_shape = (train.count(), len(train.columns))
train_shape

train shape


(193, 7)

In [5]:
print('test shape')
test_shape = (test.count(), len(test.columns))
test_shape

test shape


(51, 7)

In [6]:
def shape(df: pyspark.sql.DataFrame):
    """Return ``(row_count, column_count)`` for a Spark DataFrame.

    Mirrors the ``(rows, columns)`` tuple printed in the cells above.
    ``df.count()`` is a Spark action, so calling this runs a job over ``df``.
    """
    n_columns = len(df.columns)
    n_rows = df.count()
    return n_rows, n_columns

In [7]:
shape(df=train)

(193, 7)

## Regression

In [8]:
# Preview the first 20 rows (Spark's default for show()).
train.show(n=20)

+----------+----+------+------+---+------+----+
|total_bill| tip|   sex|smoker|day|  time|size|
+----------+----+------+------+---+------+----+
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|
|     10.29| 2.6|Female|    No|Sun|Dinner|   2|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|
|     12.54| 2.5|  Male|    No|Sun|Dinner|   2|
|     13.37| 2.0|  Male|    No|Sat|Dinner|   2|
|     13.94|3.06|  Male|    No|Sun|Dinner|   2|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|
|     15.77|2.23|Female|    No|Sat|Dinner|   2|
|     16.04|2.24|  Male|    No|Sat|Dinner|   3|
|     16.29|3.71|  Male|    No|Sun|Dinner|   3|
|     16.31| 2.0|  Male|    No|Sat|Dinner|   3|
|     16.93|3.07|Female|    No|Sat|Dinner|   3|
|     16.97| 3.5|Female|    No|Sun|Dinne

Candidate models, written as R-style formulas (the syntax Spark's `RFormula` accepts):

- `tip ~ total_bill`: predict tip from the total bill
- `tip ~ total_bill + size`: predict tip from the total bill and party size
- `tip ~ .`: predict tip from all other columns in the dataset

In [9]:
# NB: Spark's RFormula does the encoding for us. It assembles the right-hand-side
# columns into a `features` vector and copies the left-hand side into `label`.
regression_formula = "tip ~ total_bill + size"
rf = pyspark.ml.feature.RFormula(formula=regression_formula).fit(train)

rf.transform(train).show()

+----------+----+------+------+---+------+----+-----------+-----+
|total_bill| tip|   sex|smoker|day|  time|size|   features|label|
+----------+----+------+------+---+------+----+-----------+-----+
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2| [8.77,2.0]|  2.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|[10.27,2.0]| 1.71|
|     10.29| 2.6|Female|    No|Sun|Dinner|   2|[10.29,2.0]|  2.6|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|[10.33,3.0]| 1.67|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|[10.34,3.0]| 1.66|
|     12.54| 2.5|  Male|    No|Sun|Dinner|   2|[12.54,2.0]|  2.5|
|     13.37| 2.0|  Male|    No|Sat|Dinner|   2|[13.37,2.0]|  2.0|
|     13.94|3.06|  Male|    No|Sun|Dinner|   2|[13.94,2.0]| 3.06|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|[14.78,2.0]| 3.23|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|[14.83,2.0]| 3.02|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|[15.04,2.0]| 1.96|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|[15.42,2.0]| 1.57|
|     15.7

In [10]:
# Keep only the two columns the estimator consumes.
train_input = (
    rf.transform(train)
    .select('features', 'label')
)
train_input.show()

+-----------+-----+
|   features|label|
+-----------+-----+
| [8.77,2.0]|  2.0|
|[10.27,2.0]| 1.71|
|[10.29,2.0]|  2.6|
|[10.33,3.0]| 1.67|
|[10.34,3.0]| 1.66|
|[12.54,2.0]|  2.5|
|[13.37,2.0]|  2.0|
|[13.94,2.0]| 3.06|
|[14.78,2.0]| 3.23|
|[14.83,2.0]| 3.02|
|[15.04,2.0]| 1.96|
|[15.42,2.0]| 1.57|
|[15.77,2.0]| 2.23|
|[16.04,3.0]| 2.24|
|[16.29,3.0]| 3.71|
|[16.31,3.0]|  2.0|
|[16.93,3.0]| 3.07|
|[16.97,3.0]|  3.5|
|[17.46,2.0]| 2.54|
|[17.78,2.0]| 3.27|
+-----------+-----+
only showing top 20 rows



In [11]:
# Default params: squared-error loss and regParam 0.0, i.e. no regularisation
# (the full list of defaults is printed by lr.explainParams()).
lr = pyspark.ml.regression.LinearRegression()
lr

LinearRegression_8802e1ff3f23

In [12]:
param_docs = lr.explainParams()
print(param_docs)

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
epsilon: The shape parameter to control the amount of robustness. Must be > 1.0. Only valid when loss is huber (default: 1.35)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
loss: The loss function to be optimized. Supported options: squaredError, huber. (default: squaredError)
maxIter: max number of iterations (>= 0). (default: 100)
predictionCol: prediction column name. (default: prediction)
regParam: regularization parameter (>= 0). (default: 0.0)
solver: The solver algorithm for optimization. Supported options: auto, normal, l-bfgs. (default: auto)
standardization: whether to standardize the training features before fitting the model.

In [13]:
lr_fit = lr.fit(dataset=train_input)
# In-sample predictions, shown next to the true label.
train_predictions = lr_fit.transform(train_input)
train_predictions.show()

+-----------+-----+------------------+
|   features|label|        prediction|
+-----------+-----+------------------+
| [8.77,2.0]|  2.0|1.8431748565237722|
|[10.27,2.0]| 1.71| 1.984690298383021|
|[10.29,2.0]|  2.6|1.9865771709411444|
|[10.33,3.0]| 1.67|2.1519321354884777|
|[10.34,3.0]| 1.66|2.1528755717675394|
|[12.54,2.0]|  2.5|2.1988503337300176|
|[13.37,2.0]|  2.0|2.2771555448921355|
|[13.94,2.0]| 3.06|  2.33093141279865|
|[14.78,2.0]| 3.23|2.4101800602398296|
|[14.83,2.0]| 3.02|2.4148972416351375|
|[15.04,2.0]| 1.96|2.4347094034954324|
|[15.42,2.0]| 1.57|2.4705599820997755|
|[15.77,2.0]| 2.23|2.5035802518669334|
|[16.04,3.0]| 2.24| 2.690634250832685|
|[16.29,3.0]| 3.71|2.7142201578092267|
|[16.31,3.0]|  2.0|2.7161070303673496|
|[16.93,3.0]| 3.07|2.7746000796691725|
|[16.97,3.0]|  3.5|2.7783738247854193|
|[17.46,2.0]| 2.54| 2.663020983028354|
|[17.78,2.0]| 3.27| 2.693210943958327|
+-----------+-----+------------------+
only showing top 20 rows



In [14]:
# The fitted model exposes a training summary with in-sample metrics.
lr_fit.summary

<pyspark.ml.regression.LinearRegressionTrainingSummary at 0x1166207b8>

In [15]:
# List the metrics/attributes available on the regression training summary.
sorted(dir(lr_fit.summary))

['__class__',
 '__del__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_call_java',
 '_create_from_java_class',
 '_java_obj',
 '_new_java_array',
 '_new_java_obj',
 'coefficientStandardErrors',
 'degreesOfFreedom',
 'devianceResiduals',
 'explainedVariance',
 'featuresCol',
 'labelCol',
 'meanAbsoluteError',
 'meanSquaredError',
 'numInstances',
 'objectiveHistory',
 'pValues',
 'predictionCol',
 'predictions',
 'r2',
 'r2adj',
 'residuals',
 'rootMeanSquaredError',
 'tValues',
 'totalIterations']

In [16]:
# Training-set (in-sample) R^2 and RMSE.
train_summary = lr_fit.summary
(train_summary.r2, train_summary.rootMeanSquaredError)

(0.5083641431094579, 0.925413890798643)

How well does the model do on the held-out test data?

In [17]:
# Reuse the formula fitted on train so test rows are encoded the same way.
test_input = rf.transform(test)
test_predictions = lr_fit.transform(test_input)
test_predictions.show(4)

+----------+----+----+------+---+------+----+-----------+-----+------------------+
|total_bill| tip| sex|smoker|day|  time|size|   features|label|        prediction|
+----------+----+----+------+---+------+----+-----------+-----+------------------+
|      9.55|1.45|Male|    No|Sat|Dinner|   2| [9.55,2.0]| 1.45| 1.916762886290582|
|      9.68|1.32|Male|    No|Sun|Dinner|   2| [9.68,2.0]| 1.32|1.9290275579183833|
|      9.94|1.56|Male|    No|Sun|Dinner|   2| [9.94,2.0]| 1.56|1.9535569011739864|
|     11.24|1.76|Male|   Yes|Sat|Dinner|   2|[11.24,2.0]| 1.76| 2.076203617452002|
+----------+----+----+------+---+------+----+-----------+-----+------------------+
only showing top 4 rows



In [18]:
# Name the metric explicitly so the `rmse` variable does not silently depend
# on RegressionEvaluator's default metricName.
evaluator = pyspark.ml.evaluation.RegressionEvaluator(
    metricName='rmse', labelCol='label', predictionCol='prediction'
)
rmse = evaluator.evaluate(lr_fit.transform(test_input))
'{:.4}'.format(rmse)  # 4 significant digits

'1.272'

## Classification

Predict the meal `time` from the total bill and party size using logistic regression.

In [19]:
# Classification: predict the meal `time` from bill total and party size.
# RFormula string-indexes the label column; in the output, Dinner rows get label 0.0
# (presumably the other class maps to 1.0; TODO confirm with
# rf.transform(train).select('time', 'label').distinct()).
classification_formula = 'time ~ total_bill + size'
rf = pyspark.ml.feature.RFormula(formula=classification_formula).fit(train)
train_input = rf.transform(train)
train_input.show(50)

+----------+----+------+------+---+------+----+-----------+-----+
|total_bill| tip|   sex|smoker|day|  time|size|   features|label|
+----------+----+------+------+---+------+----+-----------+-----+
|      8.77| 2.0|  Male|    No|Sun|Dinner|   2| [8.77,2.0]|  0.0|
|     10.27|1.71|  Male|    No|Sun|Dinner|   2|[10.27,2.0]|  0.0|
|     10.29| 2.6|Female|    No|Sun|Dinner|   2|[10.29,2.0]|  0.0|
|     10.33|1.67|Female|    No|Sun|Dinner|   3|[10.33,3.0]|  0.0|
|     10.34|1.66|  Male|    No|Sun|Dinner|   3|[10.34,3.0]|  0.0|
|     12.54| 2.5|  Male|    No|Sun|Dinner|   2|[12.54,2.0]|  0.0|
|     13.37| 2.0|  Male|    No|Sat|Dinner|   2|[13.37,2.0]|  0.0|
|     13.94|3.06|  Male|    No|Sun|Dinner|   2|[13.94,2.0]|  0.0|
|     14.78|3.23|  Male|    No|Sun|Dinner|   2|[14.78,2.0]|  0.0|
|     14.83|3.02|Female|    No|Sun|Dinner|   2|[14.83,2.0]|  0.0|
|     15.04|1.96|  Male|    No|Sun|Dinner|   2|[15.04,2.0]|  0.0|
|     15.42|1.57|  Male|    No|Sun|Dinner|   2|[15.42,2.0]|  0.0|
|     15.7

In [20]:
lr = pyspark.ml.classification.LogisticRegression()
param_docs = lr.explainParams()
print(param_docs)

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The bounds vector size must beequal wi

In [21]:
lr_fit = lr.fit(dataset=train_input)

In [22]:
# List the metrics/attributes available on the logistic-regression training summary.
sorted(dir(lr_fit.summary))

['__class__',
 '__del__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_call_java',
 '_create_from_java_class',
 '_java_obj',
 '_new_java_array',
 '_new_java_obj',
 'accuracy',
 'areaUnderROC',
 'fMeasureByLabel',
 'fMeasureByThreshold',
 'falsePositiveRateByLabel',
 'featuresCol',
 'labelCol',
 'labels',
 'objectiveHistory',
 'pr',
 'precisionByLabel',
 'precisionByThreshold',
 'predictionCol',
 'predictions',
 'probabilityCol',
 'recallByLabel',
 'recallByThreshold',
 'roc',
 'totalIterations',
 'truePositiveRateByLabel',
 'weightedFMeasure',
 'weightedFalsePositiveRate',
 'weightedPrecision',
 'weightedRecall',
 'weightedTruePositiveRate']

In [23]:
# Training-set AUC: the area under the ROC curve, which plots
# TPR (recall) against FPR = FP / (FP + TN).
# https://en.wikipedia.org/wiki/Receiver_operating_characteristic
clf_summary = lr_fit.summary
clf_summary.areaUnderROC

0.6430830039525692

In [24]:
# Name the metric and input columns explicitly so `test_auc` does not silently
# depend on BinaryClassificationEvaluator's defaults.
evaluator = pyspark.ml.evaluation.BinaryClassificationEvaluator(
    metricName='areaUnderROC', rawPredictionCol='rawPrediction', labelCol='label'
)
test_auc = evaluator.evaluate(lr_fit.transform(rf.transform(test)))
test_auc

0.5809716599190283

In [25]:
# NB: this overwrites the regression-section `test_input`. From here on it holds
# the encoding produced by the classification formula now bound to `rf`.
test_input = rf.transform(dataset=test)

In [None]:
# Confusion matrix on the test set: one row per predicted class, one column per
# actual label. groupBy output order is not guaranteed, so the rows are sorted.
# pivot().count() leaves null for label/prediction pairs that never occur,
# so those cells are filled with 0.
(lr_fit.transform(test_input)
 .select('label', 'prediction')
 .groupBy('prediction')   # predicted classes are rows
 .pivot('label')          # actual classes are columns
 .count()
 .fillna(0)
 .orderBy('prediction')
 .show())