In [1]:
import pyspark
from pyspark import SparkContext
sc =SparkContext()

In [2]:
nums= sc.parallelize([1,2,3,4])

In [5]:
nums.take(1)

[1]

In [7]:
squared = nums.map(lambda x: x*x).collect()
for num in squared:
    print(f'{num}')

1
4
9
16


# SQLContext

In [8]:
from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

In [12]:
# list of tuple 
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]

In [13]:
# build RDD
rdd = sc.parallelize(list_p)

In [17]:
# convert tuples
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))

In [18]:
# Create Dataframe 
sqlContext.createDataFrame(ppl)
list_p = [('John',19),('Smith',29),('Adam',35),('Henry',50)]
rdd = sc.parallelize(list_p)
ppl = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
DF_ppl = sqlContext.createDataFrame(ppl)

In [20]:
DF_ppl.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



# Machine learning

# Step 1) Basic operation with PySpark

In [47]:
import pandas as pd

In [48]:
from pyspark import SparkFiles
url = "https://raw.githubusercontent.com/sadhana1002/PredictingSalaryClass-Classification/master/adult.csv"
sc.addFile(url)
sqlContext = SQLContext(sc)

In [49]:
df = sqlContext.createDataFrame(pd.read_csv(url, 
                                      names=['Age','workclass',
                                             'fnlwgt','education',
                                             'education_num',
                                             'marital',
                                             'occupation',
                                             'relationship','race',
                                             'sex','capital_gain',
                                             'capital_loss',
                                             'hours_week',
                                             'native_country','label']))

In [50]:
df.printSchema()

root
 |-- Age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: long (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: long (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: long (nullable = true)
 |-- capital_loss: long (nullable = true)
 |-- hours_week: long (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [51]:
df.show(5, truncate = False)

+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------+--------------+------+
|Age|workclass        |fnlwgt|education |education_num|marital            |occupation        |relationship  |race  |sex    |capital_gain|capital_loss|hours_week|native_country|label |
+---+-----------------+------+----------+-------------+-------------------+------------------+--------------+------+-------+------------+------------+----------+--------------+------+
|39 | State-gov       |77516 | Bachelors|13           | Never-married     | Adm-clerical     | Not-in-family| White| Male  |2174        |0           |40        | United-States| <=50K|
|50 | Self-emp-not-inc|83311 | Bachelors|13           | Married-civ-spouse| Exec-managerial  | Husband      | White| Male  |0           |0           |13        | United-States| <=50K|
|38 | Private         |215646| HS-grad  |9            | Divorced          | Hand

In [52]:
df_string = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema=  False)
df_string.printSchema()

root
 |-- x: string (nullable = true)
 |-- age: string (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: string (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: string (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: string (nullable = true)
 |-- capital-loss: string (nullable = true)
 |-- hours-per-week: string (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [54]:
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

# List of continuous features
CONTI_FEATURES  = ['age', 'fnlwgt','capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']
# Convert the type
df_string = convertColumn(df_string, CONTI_FEATURES, FloatType())
# Check the dataset
df_string.printSchema()

root
 |-- x: string (nullable = true)
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: float (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: float (nullable = true)
 |-- capital-loss: float (nullable = true)
 |-- hours-per-week: float (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [55]:
from pyspark.ml.feature import StringIndexer
#stringIndexer = StringIndexer(inputCol="label", outputCol="newlabel")
#model = stringIndexer.fit(df)
#df = model.transform(df)
df.printSchema()

root
 |-- Age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: long (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: long (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: long (nullable = true)
 |-- capital_loss: long (nullable = true)
 |-- hours_week: long (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



### Select column

In [56]:
df.select('age','fnlwgt').show(10)

+---+------+
|age|fnlwgt|
+---+------+
| 39| 77516|
| 50| 83311|
| 38|215646|
| 53|234721|
| 28|338409|
| 37|284582|
| 49|160187|
| 52|209642|
| 31| 45781|
| 42|159449|
+---+------+
only showing top 10 rows



### Count, Groupby

In [57]:
df.groupBy("education").count().sort("count",ascending=True).show()

+-------------+-----+
|    education|count|
+-------------+-----+
|    Preschool|   51|
|      1st-4th|  168|
|      5th-6th|  333|
|    Doctorate|  413|
|         12th|  433|
|          9th|  514|
|  Prof-school|  576|
|      7th-8th|  646|
|         10th|  933|
|   Assoc-acdm| 1067|
|         11th| 1175|
|    Assoc-voc| 1382|
|      Masters| 1723|
|    Bachelors| 5355|
| Some-college| 7291|
|      HS-grad|10501|
+-------------+-----+



### describe data

In [58]:
df.describe().show()

+-------+------------------+------------+------------------+-------------+------------------+---------+-----------------+------------+-------------------+-------+------------------+-----------------+------------------+--------------+------+
|summary|               Age|   workclass|            fnlwgt|    education|     education_num|  marital|       occupation|relationship|               race|    sex|      capital_gain|     capital_loss|        hours_week|native_country| label|
+-------+------------------+------------+------------------+-------------+------------------+---------+-----------------+------------+-------------------+-------+------------------+-----------------+------------------+--------------+------+
|  count|             32561|       32561|             32561|        32561|             32561|    32561|            32561|       32561|              32561|  32561|             32561|            32561|             32561|         32561| 32561|
|   mean| 38.58164675532078|        

In [60]:
df.describe('capital_gain').show()

+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev|7385.2920848403455|
|    min|                 0|
|    max|             99999|
+-------+------------------+



In [61]:
df.printSchema()

root
 |-- Age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: long (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: long (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: long (nullable = true)
 |-- capital_loss: long (nullable = true)
 |-- hours_week: long (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



### Crosstab computation

In [62]:
df.crosstab('age', 'label').sort("age_label").show()

+---------+------+-----+
|age_label| <=50K| >50K|
+---------+------+-----+
|       17|   395|    0|
|       18|   550|    0|
|       19|   710|    2|
|       20|   753|    0|
|       21|   717|    3|
|       22|   752|   13|
|       23|   865|   12|
|       24|   767|   31|
|       25|   788|   53|
|       26|   722|   63|
|       27|   754|   81|
|       28|   748|  119|
|       29|   679|  134|
|       30|   690|  171|
|       31|   705|  183|
|       32|   639|  189|
|       33|   684|  191|
|       34|   643|  243|
|       35|   659|  217|
|       36|   635|  263|
+---------+------+-----+
only showing top 20 rows



### Drop column

In [63]:
df.drop('education_num').columns

['Age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

### Filter data

In [65]:
df.filter(df.Age > 40).count()

13443

### Descriptive statistics by group

In [66]:
df.groupby('marital').agg({'capital_gain': 'mean'}).show()

+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|             Widowed| 571.0715005035247|
| Married-spouse-a...| 653.9832535885167|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
|            Divorced| 728.4148098131893|
|       Never-married|376.58831788823363|
|           Separated| 535.5687804878049|
+--------------------+------------------+



# Step 2) Data preprocessing

### add age squared

In [71]:
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()

root
 |-- Age: long (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: long (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: long (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: long (nullable = true)
 |-- capital_loss: long (nullable = true)
 |-- hours_week: long (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)



In [72]:
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()

Row(age=39, age_square=1521.0, workclass=' State-gov', fnlwgt=77516, education=' Bachelors', education_num=13, marital=' Never-married', occupation=' Adm-clerical', relationship=' Not-in-family', race=' White', sex=' Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country=' United-States', label=' <=50K')

### Exclude Holand-Netherlands

In [79]:
df.filter(df.native_country == 'Holand-Netherlands').count()

0

In [78]:
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()

+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|            Honduras|                   13|
|             Hungary|                   13|
| Outlying-US(Guam...|                   14|
|          Yugoslavia|                   16|
|            Thailand|                   18|
|                Laos|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|         

In [80]:
df_remove = df.filter(df.native_country != 'Holand-Netherlands')

# Step 3) Build a data processing pipeline

In [101]:
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
model2 = encoder.fit(indexed)
encoded = model2.transform(indexed)

encoded.show(2)

+---+----------+-----------------+------+----------+-------------+-------------------+----------------+--------------+------+-----+------------+------------+----------+--------------+------+-----------------+-------------+
|age|age_square|        workclass|fnlwgt| education|education_num|            marital|      occupation|  relationship|  race|  sex|capital_gain|capital_loss|hours_week|native_country| label|workclass_encoded|workclass_vec|
+---+----------+-----------------+------+----------+-------------+-------------------+----------------+--------------+------+-----+------------+------------+----------+--------------+------+-----------------+-------------+
| 39|    1521.0|        State-gov| 77516| Bachelors|           13|      Never-married|    Adm-clerical| Not-in-family| White| Male|        2174|           0|        40| United-States| <=50K|              4.0|(9,[4],[1.0])|
| 50|    2500.0| Self-emp-not-inc| 83311| Bachelors|           13| Married-civ-spouse| Exec-managerial|     

### 1. Encode the categorical data

In [113]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- age_square: double (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: long (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: long (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: long (nullable = true)
 |-- capital_loss: long (nullable = true)
 |-- hours_week: long (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)



In [114]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
CONTI_FEATURES  = ['age', 'fnlwgt','capital_gain', 'education_num', 'capital_loss', 'hours_week']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

### 2. Index the label feature

In [115]:
# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

### 3. Add continuous variable

In [116]:
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

### 4. Assemble the steps.

In [117]:
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [151]:
assembler.show()

AttributeError: 'VectorAssembler' object has no attribute 'show'

In [118]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

In [119]:
model.take(1)

[Row(age=39, age_square=1521.0, workclass=' State-gov', fnlwgt=77516, education=' Bachelors', education_num=13, marital=' Never-married', occupation=' Adm-clerical', relationship=' Not-in-family', race=' White', sex=' Male', capital_gain=2174, capital_loss=0, hours_week=40, native_country=' United-States', label=' <=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(41, {0: 1.0}), newlabel=0.0, features=SparseVector(100, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 94: 39.0, 95: 77516.0, 96: 2174.0, 97: 13.0, 99:

# Step 4) Build the classifier: logistic

In [120]:
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

In [148]:
input_data

PythonRDD[1181] at RDD at PythonRDD.scala:53

In [121]:
df_train = sqlContext.createDataFrame(input_data, ["label", "features"])

In [122]:
df_train.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows



### train_test_split

In [124]:
# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

In [125]:
train_data.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19756|
|  1.0|        6286|
+-----+------------+



In [126]:
test_data.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        4964|
|  1.0|        1555|
+-----+------------+



### Build the logistic regressor

In [127]:
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

In [128]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Coefficients: [-0.06711842757639715,-0.13825934409903343,-0.08503752284030963,-0.1678713460670257,-0.10190090492386701,0.17867818744511813,0.13291092867216758,-0.5930803174699787,-0.1941025553799493,-0.08421936167518224,0.2190412632622957,0.40425828855690227,0.008183282453701701,-0.2877072669403088,-0.02977350018670854,-0.30773537965245923,-0.4217390521098367,0.5338338544306157,-0.3666035918770989,-0.23589338870201382,0.5952773417140702,-0.37280035211927653,-0.4014766500572211,0.3164334262735849,-0.34364379582427634,-0.19524585813304823,-0.26063100869134964,-0.14498892918013967,-0.18446322230208453,0.192420271348638,-0.06267320128287748,0.28888913436714564,-0.12948388764538024,0.0562004680163725,-0.29523557063307587,-0.2191215686423522,-0.1694515569521596,-0.13439368376561714,-0.309888894274416,-0.31536836615107877,0.11634597968091266,0.1552008933347143,-0.2826220928545019,0.2553773123930586,-0.18887324699636818,-0.3042726975179869,-0.24757352559184764,0.4326692828300383,-0.06441164575

# Step 5) Train and evaluate the model

In [129]:
# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

In [131]:
predictions.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [132]:
selected = predictions.select("label", "prediction", "probability")
selected.show(20)

+-----+----------+--------------------+
|label|prediction|         probability|
+-----+----------+--------------------+
|  0.0|       0.0|[0.92776186638293...|
|  0.0|       0.0|[0.95934682271560...|
|  0.0|       0.0|[0.84444079151159...|
|  0.0|       0.0|[0.85431265538796...|
|  0.0|       0.0|[0.89715660422938...|
|  0.0|       0.0|[0.87867879585950...|
|  0.0|       0.0|[0.85660455179766...|
|  0.0|       0.0|[0.59559551795689...|
|  0.0|       0.0|[0.80506822160558...|
|  0.0|       0.0|[0.75910879318479...|
|  0.0|       0.0|[0.51223356814471...|
|  0.0|       0.0|[0.70488143283332...|
|  0.0|       0.0|[0.61465640167466...|
|  0.0|       0.0|[0.68648366948173...|
|  0.0|       0.0|[0.73250362732004...|
|  0.0|       0.0|[0.66110307678915...|
|  0.0|       0.0|[0.68839699814027...|
|  0.0|       0.0|[0.69670558196477...|
|  0.0|       0.0|[0.82569919743838...|
|  0.0|       0.0|[0.71424270202067...|
+-----+----------+--------------------+
only showing top 20 rows



### Evaluate the model

In [133]:
cm = predictions.select("label", "prediction")	

In [135]:
cm.groupby('label').agg({'label': 'count'}).show()

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|        4964|
|  1.0|        1555|
+-----+------------+



In [136]:
cm.groupby('prediction').agg({'prediction': 'count'}).show()

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             5908|
|       1.0|              611|
+----------+-----------------+



In [137]:
cm.filter(cm.label == cm.prediction).count() / cm.count()

0.8263537352354655

In [138]:
def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)

Model accuracy: 82.635%


### ROC metrics

In [139]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8970363465828555
areaUnderROC


In [140]:
print(evaluator.evaluate(predictions))

0.8970360874826065


# Step 6) Tune the hyperparameter

In [152]:
train_data.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,0.0,0.0,0.0,...|
+-----+--------------------+
only showing top 20 rows



In [141]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

In [142]:
from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 337.474 seconds


In [143]:
accuracy_m(model = cvModel)

Model accuracy: 85.688%


In [144]:
bestModel = cvModel.bestModel
bestModel.extractParamMap()

{Param(parent='LogisticRegression_18f8795e4c85', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_18f8795e4c85', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_18f8795e4c85', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_18f8795e4c85', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_18f8795e4c85', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LogisticRegression_18f8795e4c85', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='LogisticRegression_18f8795e4c85', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These