In [1]:
!pip install pyspark
import pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 35 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 37.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=571e1928af0f376db350222c81d9fcf6f69088d231912cef1be9b9332eaa4a61
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('cuse_binary').getOrCreate()
sc = spark.sparkContext

In [6]:
cuse = spark.read.csv('/content/cuse_binary.csv', header=True, inferSchema=True)
cuse.show(5)

+---+---------+---------+---+
|age|education|wantsMore|  y|
+---+---------+---------+---+
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
|<25|      low|      yes|  0|
+---+---------+---------+---+
only showing top 5 rows



In [7]:
cuse.columns[0:3]
# cuse.select('age').distinct().show()
cuse.select('age').rdd.countByValue()
# cuse.select('education').rdd.countByValue()

defaultdict(int,
            {Row(age='25-29'): 404,
             Row(age='30-39'): 612,
             Row(age='40-49'): 194,
             Row(age='<25'): 397})

In [8]:
# string index each categorical string columns
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
indexers = [StringIndexer(inputCol=column, outputCol="indexed_"+column) for column in ('age', 'education', 'wantsMore')]
pipeline = Pipeline(stages=indexers)
indexed_cuse = pipeline.fit(cuse).transform(cuse)
indexed_cuse.select('age', 'indexed_age').distinct().show(5)

+-----+-----------+
|  age|indexed_age|
+-----+-----------+
|30-39|        0.0|
|  <25|        2.0|
|25-29|        1.0|
|40-49|        3.0|
+-----+-----------+



In [9]:
# onehotencode each indexed categorical columns
from pyspark.ml.feature import OneHotEncoder
columns = indexed_cuse.columns[0:3]
onehoteencoders = [OneHotEncoder(inputCol="indexed_"+column, outputCol="onehotencode_"+column) for column in columns]
pipeline = Pipeline(stages=onehoteencoders)
onehotencode_columns = ['onehotencode_age', 'onehotencode_education', 'onehotencode_wantsMore', 'y']
onehotencode_cuse = pipeline.fit(indexed_cuse).transform(indexed_cuse).select(onehotencode_columns)
onehotencode_cuse.distinct().show(5)

+----------------+----------------------+----------------------+---+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|  y|
+----------------+----------------------+----------------------+---+
|   (3,[1],[1.0])|             (1,[],[])|         (1,[0],[1.0])|  0|
|   (3,[2],[1.0])|         (1,[0],[1.0])|             (1,[],[])|  1|
|   (3,[0],[1.0])|         (1,[0],[1.0])|         (1,[0],[1.0])|  0|
|       (3,[],[])|         (1,[0],[1.0])|         (1,[0],[1.0])|  1|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|  0|
+----------------+----------------------+----------------------+---+
only showing top 5 rows



In [10]:
# assemble all feature columns into on single vector column
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=['onehotencode_age', 'onehotencode_education', 'onehotencode_wantsMore'], outputCol='features')
cuse_df_2 = assembler.transform(onehotencode_cuse).withColumnRenamed('y', 'label')
cuse_df_2.show(5)

+----------------+----------------------+----------------------+-----+-------------------+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|label|           features|
+----------------+----------------------+----------------------+-----+-------------------+
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
|   (3,[2],[1.0])|             (1,[],[])|         (1,[0],[1.0])|    0|(5,[2,4],[1.0,1.0])|
+----------------+----------------------+----------------------+-----+-------------------+
only showing top 5 rows



In [11]:
# split data into training and test datasets
training, test = cuse_df_2.randomSplit([0.8, 0.2], seed=1234)
training.show(5)


+----------------+----------------------+----------------------+-----+---------+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|label| features|
+----------------+----------------------+----------------------+-----+---------+
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|
+----------------+----------------------+----------------------+-----+---------+
only showing top 5 rows



In [12]:
## ======= build cross validation model ===========

# estimator
from pyspark.ml.regression import GeneralizedLinearRegression
glm = GeneralizedLinearRegression(featuresCol='features', labelCol='label', family='binomial')

# parameter grid
from pyspark.ml.tuning import ParamGridBuilder
param_grid = ParamGridBuilder().\
    addGrid(glm.regParam, [0, 0.5, 1, 2, 4]).\
    build()
    
# evaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='prediction')

# build cross-validation model
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=glm, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=4)

In [13]:
# fit model
# cv_model = cv.fit(training)
cv_model = cv.fit(cuse_df_2)

In [14]:
# prediction
pred_training_cv = cv_model.transform(training)
pred_test_cv = cv_model.transform(test)

pred_training_cv.show(5)
pred_test_cv.show(5, truncate=False)

+----------------+----------------------+----------------------+-----+---------+------------------+
|onehotencode_age|onehotencode_education|onehotencode_wantsMore|label| features|        prediction|
+----------------+----------------------+----------------------+-----+---------+------------------+
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151407|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151407|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151407|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151407|
|       (3,[],[])|             (1,[],[])|             (1,[],[])|    0|(5,[],[])|0.5140024065151407|
+----------------+----------------------+----------------------+-----+---------+------------------+
only showing top 5 rows

+----------------+----------------------+----------------------+-----+-----

In [15]:
cv_model.bestModel.coefficients

DenseVector([-0.2806, -0.7999, -1.1892, 0.325, -0.833])

In [16]:
cv_model.bestModel.intercept

0.05602427516928616

In [18]:
evaluator.evaluate(pred_training_cv)

0.6716478245974649

In [19]:
evaluator.evaluate(pred_test_cv)

0.6830864197530864

In [27]:
cv_model.bestModel.params

[Param(parent='GeneralizedLinearRegression_0dc4122b6cdd', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'),
 Param(parent='GeneralizedLinearRegression_0dc4122b6cdd', name='family', doc='The name of family which is a description of the error distribution to be used in the model. Supported options: gaussian (default), binomial, poisson, gamma and tweedie.'),
 Param(parent='GeneralizedLinearRegression_0dc4122b6cdd', name='featuresCol', doc='features column name.'),
 Param(parent='GeneralizedLinearRegression_0dc4122b6cdd', name='fitIntercept', doc='whether to fit an intercept term.'),
 Param(parent='GeneralizedLinearRegression_0dc4122b6cdd', name='labelCol', doc='label column name.'),
 Param(parent='GeneralizedLinearRegression_0dc4122b6cdd', name='link', doc='The name of link function which provides the relationship between the linear predictor and the mean of the distribution function. Supported options: identity, log, inverse, logit, probit, cloglog and sqrt.'),


In [21]:
import pandas as pd
pdf = pd.DataFrame({
        'x1': ['a','a','b','c'],
        'x2': ['apple', 'orange', 'orange', 'peach'],
        'x3': [1, 1, 2, 4],
        'x4': [2.4, 2.5, 3.5, 1.4],
        'y1': [1, 0, 0, 1],
        'y2': ['yes', 'no', 'no', 'yes']
    })

df = spark.createDataFrame(pdf)

In [22]:
df.show()

+---+------+---+---+---+---+
| x1|    x2| x3| x4| y1| y2|
+---+------+---+---+---+---+
|  a| apple|  1|2.4|  1|yes|
|  a|orange|  1|2.5|  0| no|
|  b|orange|  2|3.5|  0| no|
|  c| peach|  4|1.4|  1|yes|
+---+------+---+---+---+---+

