In [1]:
# https://www.guru99.com/pyspark-tutorial.html
# but fixed so if want to follow - go to compass

# Also see https://towardsdatascience.com/pyspark-data-manipulation-tutorial-8c62652f35fa

In [2]:
# Start the docker container we have downloaded in the Docker exercise using the following command.

# docker run  --rm  -p 8888:8888  -v <YOUR WORKING DIR>:/home/jovyan/work/  jupyter/pyspark-notebook

In [1]:
# Instantiate SparkContext

import pyspark
from pyspark import SparkContext
sc = SparkContext()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/10/27 23:20:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/10/27 23:20:18 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


#### SQLContext
Can use it to create DataFrame.
Allows connecting the engine with different data sources


In [4]:
from pyspark.sql import Row
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

In [6]:
from pyspark.sql import SparkSession
session = SparkSession.builder.getOrCreate()

df = sqlContext.read.option("inferSchema",True).option("header",True).csv("titanic_dataset.csv")

### Machine Learning with PySpark
Following are the steps to build a Machine Learning program with PySpark:

1. Basic operation with PySpark
2. Data preprocessing
3. Build a data processing pipeline
4. Build the classifier: logistic
5. Train and evaluate the model
6. Tune the hyperparameter

### 1. Basic operation with PySpark

In [7]:
# Let's have a look at the data type

df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [18]:
# df['Fare'].values.show()

TypeError: 'Column' object is not callable

In [8]:
# You can see the data with show.

df.show(5, truncate = False)

+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|Name                                               |Sex   |Age |SibSp|Parch|Ticket          |Fare   |Cabin|Embarked|
+-----------+--------+------+---------------------------------------------------+------+----+-----+-----+----------------+-------+-----+--------+
|1          |0       |3     |Braund, Mr. Owen Harris                            |male  |22.0|1    |0    |A/5 21171       |7.25   |null |S       |
|2          |1       |1     |Cumings, Mrs. John Bradley (Florence Briggs Thayer)|female|38.0|1    |0    |PC 17599        |71.2833|C85  |C       |
|3          |1       |3     |Heikkinen, Miss. Laina                             |female|26.0|0    |0    |STON/O2. 3101282|7.925  |null |S       |
|4          |1       |1     |Futrelle, Mrs. Jacques Heath (Lily May Peel)       |female|35.0|1    |0    |113803          |53

To convert the continuous variable in the right format, you can use recast the columns. You can use **withColumn** to tell Spark which column to operate the transformation.



In [14]:
# Import all from `sql.types`
from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

# List of continuous features
CONTI_FEATURES  = ['age', 'Fare']

# Convert the type
df = convertColumn(df, CONTI_FEATURES, FloatType())

# Check the dataset
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- age: float (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



### Select Columns

In [15]:
df.select('age','Fare').show(5)

+----+-------+
| age|   Fare|
+----+-------+
|22.0|   7.25|
|38.0|71.2833|
|26.0|  7.925|
|35.0|   53.1|
|35.0|   8.05|
+----+-------+
only showing top 5 rows



### Count by group

In [16]:
# can chain these
# df.groupBy("education").count().sort("count",ascending=True).show()

### Describe data

In [17]:
df.describe().show()

[Stage 4:>                                                          (0 + 1) / 1]

+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|summary|      PassengerId|           Survived|            Pclass|                Name|   Sex|               age|             SibSp|              Parch|            Ticket|             Fare|Cabin|Embarked|
+-------+-----------------+-------------------+------------------+--------------------+------+------------------+------------------+-------------------+------------------+-----------------+-----+--------+
|  count|              891|                891|               891|                 891|   891|               714|               891|                891|               891|              891|  204|     889|
|   mean|            446.0| 0.3838383838383838| 2.308641975308642|                null|  null| 29.69911764704046|0.5230078563411896|0.38159371492704824|260318.54916792738|32.204208

                                                                                

In [75]:
# if only want statistics of one column:
df.describe('capital_gain').show()

+-------+------------------+
|summary|      capital_gain|
+-------+------------------+
|  count|             32561|
|   mean|1077.6488437087312|
| stddev| 7385.292084840329|
|    min|               0.0|
|    max|           99999.0|
+-------+------------------+



### Crosstab computation
In some occasions, it can be interesting to see the descriptive statistics between two pairwise columns. For instance, you can count the number of people with income below or above 50k by education level. This operation is called a crosstab.


In [34]:
df.crosstab('age', 'label').sort("age_label").show()

# You can see no people have revenue above 50k when they are young.

+---------+------+-----+
|age_label| <=50K| >50K|
+---------+------+-----+
|     17.0|   395|    0|
|     18.0|   550|    0|
|     19.0|   710|    2|
|     20.0|   753|    0|
|     21.0|   717|    3|
|     22.0|   752|   13|
|     23.0|   865|   12|
|     24.0|   767|   31|
|     25.0|   788|   53|
|     26.0|   722|   63|
|     27.0|   754|   81|
|     28.0|   748|  119|
|     29.0|   679|  134|
|     30.0|   690|  171|
|     31.0|   705|  183|
|     32.0|   639|  189|
|     33.0|   684|  191|
|     34.0|   643|  243|
|     35.0|   659|  217|
|     36.0|   635|  263|
+---------+------+-----+
only showing top 20 rows



### Drop column
There are two intuitive commands to drop columns:

* drop(): Drop a column
* dropna(): Drop NA's

In [16]:
df.drop('education_num').columns

['age',
 'workclass',
 'fnlwgt',
 'education',
 'marital',
 'occupation',
 'relationship',
 'race',
 'sex',
 'capital_gain',
 'capital_loss',
 'hours_week',
 'native_country',
 'label']

### Filter data
You can use filter() to apply descriptive statistics in a subset of data. For instance, you can count the number of people above 40:

In [77]:
df.filter(df.age > 40).count()

13443

### Descriptive statistics by group

In [37]:
df.groupby('marital').agg({'capital_gain': 'mean'}).show()

+--------------------+------------------+
|             marital| avg(capital_gain)|
+--------------------+------------------+
|             Widowed| 571.0715005035247|
| Married-spouse-a...| 653.9832535885167|
|   Married-AF-spouse| 432.6521739130435|
|  Married-civ-spouse|1764.8595085470085|
|            Divorced| 728.4148098131893|
|       Never-married|376.58831788823363|
|           Separated| 535.5687804878049|
+--------------------+------------------+



## Step 2: Data preprocessing

For instance, you know that age is not a linear function with the income. When people are young, their income is usually lower than mid-age. After retirement, a household uses their saving, meaning a decrease in income. To capture this pattern, you can add a square to the age feature

### Add age square
To add a new feature, you need to:

* Select the column
* Apply the transformation and add it to the DataFrame

In [17]:
from pyspark.sql.functions import *

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()

root
 |-- age: float (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: float (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: float (nullable = true)
 |-- marital: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: float (nullable = true)
 |-- capital_loss: float (nullable = true)
 |-- hours_week: float (nullable = true)
 |-- native_country: string (nullable = true)
 |-- label: string (nullable = true)
 |-- age_square: double (nullable = true)



In [18]:
# Change column order:
COLUMNS = ['age', 'age_square', 'workclass', 'fnlwgt', 'education', 'education_num', 'marital',
           'occupation', 'relationship', 'race', 'sex', 'capital_gain', 'capital_loss',
           'hours_week', 'native_country', 'label']
df = df.select(COLUMNS)
df.first()

Row(age=39.0, age_square=1521.0, workclass=' State-gov', fnlwgt=77516.0, education=' Bachelors', education_num=13.0, marital=' Never-married', occupation=' Adm-clerical', relationship=' Not-in-family', race=' White', sex=' Male', capital_gain=2174.0, capital_loss=0.0, hours_week=40.0, native_country=' United-States', label=' <=50K')

### Exclude Holand-Netherlands
When a group within a feature has only one observation, it brings no information to the model. On the contrary, it can lead to an error during the cross-validation.


In [19]:
# Let's check the origin of the household

df.filter(df.native_country == 'Holand-Netherlands').count()
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()

+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|            Honduras|                   13|
|             Hungary|                   13|
| Outlying-US(Guam...|                   14|
|          Yugoslavia|                   16|
|                Laos|                   18|
|            Thailand|                   18|
|            Cambodia|                   19|
|     Trinadad&Tobago|                   19|
|                Hong|                   20|
|             Ireland|                   24|
|             Ecuador|                   28|
|              Greece|                   29|
|              France|                   29|
|                Peru|                   31|
|           Nicaragua|                   34|
|            Portugal|                   37|
|                Iran|                   43|
|         

In [20]:
# The feature native_country has only one household coming from Netherland. We can exclude it.

df_remove = df.filter(df.native_country != 'Holand-Netherlands')

## Step 3: Build a data processing pipeline
Similar to scikit-learn, Pyspark has a pipeline API.

The steps to transform the data are very similar to scikit-learn. You need to:

* Index the string to numeric
* Create the one hot encoder
* Transform the data
* Two APIs do the job: StringIndexer, OneHotEncoder

1. First of all, you select the string column to index. The **inputCol** is the name of the column in the dataset. **outputCol** is the new name given to the transformed column.
2. Fit the data and transform it
3. Create the news columns based on the group. For instance, if there are 10 groups in the feature, the new matrix will have 10 columns, one for each group.

In [21]:
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec").fit(indexed)
encoded = encoder.transform(indexed)
encoded.show(2)

                                                                                

+----+----------+-----------------+-------+----------+-------------+-------------------+----------------+--------------+------+-----+------------+------------+----------+--------------+------+-----------------+-------------+
| age|age_square|        workclass| fnlwgt| education|education_num|            marital|      occupation|  relationship|  race|  sex|capital_gain|capital_loss|hours_week|native_country| label|workclass_encoded|workclass_vec|
+----+----------+-----------------+-------+----------+-------------+-------------------+----------------+--------------+------+-----+------------+------------+----------+--------------+------+-----------------+-------------+
|39.0|    1521.0|        State-gov|77516.0| Bachelors|         13.0|      Never-married|    Adm-clerical| Not-in-family| White| Male|      2174.0|         0.0|      40.0| United-States| <=50K|              4.0|(9,[4],[1.0])|
|50.0|    2500.0| Self-emp-not-inc|83311.0| Bachelors|         13.0| Married-civ-spouse| Exec-manage

### Build the pipeline
You will build a pipeline to convert all the precise features and add them to the final dataset. The pipeline will have four operations, but feel free to add as many operations as you want.

1. Encode the categorical data
2. Index the label feature
3. Add continuous variable
4. Assemble the steps.

Each step is stored in a list named stages. This list will tell the VectorAssembler what operation to perform inside the pipeline.

1. Encode the categorical data

This step is very similar to the above example, except that you loop over all the categorical features.

In [36]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = [] # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    print(stringIndexer)
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
#     print(encoder)
    stages += [stringIndexer, encoder]
print(stages)

StringIndexer_3c8c8d4e0202
StringIndexer_f39ad1d15440
StringIndexer_8188add96cbd
StringIndexer_63d0c8058a5b
StringIndexer_eb34314034fc
StringIndexer_621c47d0cfd2
StringIndexer_5d14730ed855
StringIndexer_37e8ff5d2753
[StringIndexer_3c8c8d4e0202, OneHotEncoder_4336bd16f9ff, StringIndexer_f39ad1d15440, OneHotEncoder_6be94051fa2e, StringIndexer_8188add96cbd, OneHotEncoder_f0d50dd4d00b, StringIndexer_63d0c8058a5b, OneHotEncoder_149dcd2f2e37, StringIndexer_eb34314034fc, OneHotEncoder_7c799f02c055, StringIndexer_621c47d0cfd2, OneHotEncoder_6b0047914bd2, StringIndexer_5d14730ed855, OneHotEncoder_893aeddd2290, StringIndexer_37e8ff5d2753, OneHotEncoder_784128f7793b]


2. Index the label feature 

In [23]:
# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="label", outputCol="newlabel")
stages += [label_stringIdx]

3. Add continuous variable

The inputCols of the VectorAssembler is a list of columns. You can create a new list containing all the new columns. The code below populate the list with encoded categorical features and continuous features.

In [24]:
assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Assemble the steps.

Finally, you pass all the steps in the VectorAssembler

In [25]:
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [26]:
# push to pipeline
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df_remove)
model = pipelineModel.transform(df_remove)

If you check the new dataset, you can see that it contains all the features, transformed and not transformed. You are only interested by the newlabel and features. The features includes all the transformed features and the continuous variables.

In [27]:
model.take(1)

21/10/27 22:14:40 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Row(age=39.0, age_square=1521.0, workclass=' State-gov', fnlwgt=77516.0, education=' Bachelors', education_num=13.0, marital=' Never-married', occupation=' Adm-clerical', relationship=' Not-in-family', race=' White', sex=' Male', capital_gain=2174.0, capital_loss=0.0, hours_week=40.0, native_country=' United-States', label=' <=50K', workclassIndex=4.0, workclassclassVec=SparseVector(8, {4: 1.0}), educationIndex=2.0, educationclassVec=SparseVector(15, {2: 1.0}), maritalIndex=1.0, maritalclassVec=SparseVector(6, {1: 1.0}), occupationIndex=3.0, occupationclassVec=SparseVector(14, {3: 1.0}), relationshipIndex=1.0, relationshipclassVec=SparseVector(5, {1: 1.0}), raceIndex=0.0, raceclassVec=SparseVector(4, {0: 1.0}), sexIndex=0.0, sexclassVec=SparseVector(1, {0: 1.0}), native_countryIndex=0.0, native_countryclassVec=SparseVector(41, {0: 1.0}), newlabel=0.0, features=SparseVector(100, {4: 1.0, 10: 1.0, 24: 1.0, 32: 1.0, 44: 1.0, 48: 1.0, 52: 1.0, 53: 1.0, 94: 39.0, 95: 77516.0, 96: 2174.0, 9

### Step 4 Build the Classifier: Logistic

To make it go faster, convert to DenseVector type

In [28]:
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newlabel"], DenseVector(x["features"])))

In [29]:
# You are ready to create the train data as a DataFrame. You use the sqlContext

df_train = sqlContext.createDataFrame(input_data, ["label", "features"])
df_train.show(2)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|[0.0,0.0,0.0,0.0,...|
|  0.0|[0.0,1.0,0.0,0.0,...|
+-----+--------------------+
only showing top 2 rows



### Create train/test set
You split the dataset 80/20 with randomSplit.

In [30]:
# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

In [31]:
# Let's count how many people with income below/above 50k in both training and test set

train_data.groupby('label').agg({'label': 'count'}).show()

[Stage 48:>                                                         (0 + 4) / 4]

+-----+------------+
|label|count(label)|
+-----+------------+
|  0.0|       19841|
|  1.0|        6299|
+-----+------------+





### Build Logistic Regressor
Last but not least, you can build the classifier. Pyspark has an API called LogisticRegression to perform logistic regression.

You initialize lr by indicating the label column and feature columns. You set a maximum of 10 iterations and add a regularization parameter with a value of 0.3. Note that in the next section, you will use cross-validation with a parameter grid to tune the model

In [32]:
# Import `LinearRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="label",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)

                                                                                

In [33]:
# You can see the coefficients from the regression

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Coefficients: [-0.022261695001277962,-0.05032306829157042,0.010203541364465971,-0.14690249353039908,-0.028731828187010432,0.3079565205336943,0.24483571267941676,-0.45163240060880216,-0.13944916605346339,-0.04041437957688153,0.23396170840124259,0.37833938300269637,0.02429212251952236,-0.2237891723229518,-0.017442166499909727,-0.20776352656917138,-0.2635618813554921,0.5080414527391149,-0.25121934098108106,-0.16855515865642703,0.5318433794982383,-0.22298649788601912,-0.22314315429737694,0.4029854393132571,-0.3118249867981298,-0.1717767367800661,-0.18707730975554712,-0.14760345320392115,-0.1266490200267906,0.23604783168497237,-0.03695994909427555,0.34301177276749417,-0.09145629959558013,0.05635663170655678,-0.2544672496517156,-0.15888236142218015,-0.14757909351234558,-0.0703133372682584,-0.23995758879069964,-0.2873179640664358,0.15073914750751394,0.16606724019136448,-0.23281713292315143,0.3272820732356272,-0.18226702580987747,-0.28786648805884874,-0.2161959064074811,0.4789179777821609,0.09

## Step 5: Train and evaluate the model
To generate predictions for your test set,

You can use linearModel with transform() on test_data

In [34]:
# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

In [None]:
# You can print the variables in predictions
predictions.printSchema()

In [None]:
# We are interested in the label, prediction and the probability

selected = predictions.select("label", "prediction", "probability")
selected.show(20)

## Evalutate the model
You need to look at the accuracy metric to see how well (or bad) the model performs. Currently, there is no API to compute the accuracy measure in Spark. The default value is the ROC, receiver operating characteristic curve. It is a different metric that take into account the false positive rate.

Before you look at the ROC, let's construct the accuracy measure. You are more familiar with this metric. The accuracy measure is the sum of the correct prediction over the total number of observations.

In [None]:
# You create a DataFrame with the label and the prediction.
cm = predictions.select("label", "prediction")

In [None]:
# You can check the number of class in the label and the prediction
cm.groupby('label').agg({'label': 'count'}).show()

In [None]:
cm.groupby('prediction').agg({'prediction': 'count'}).show()

For instance, in the test set, there is 1568 household with an income above 50k and 4934 below. The classifier, however, predicted 651 households with income above 50k.

Your numbers can be slightly different!!





In [None]:
# You can compute the accuracy by computing the count when the label 
# is correctly classified over the total number of rows.
cm.filter(cm.label == cm.prediction).count() / cm.count()

In [None]:
# You can wrap everything together and write a function to compute the accuracy.

def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("label", "prediction")
    acc = cm.filter(cm.label == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)

#### ROC metrics
The module BinaryClassificationEvaluator includes the ROC measures. The Receiver Operating Characteristic curve is another common tool used with binary classification. It is very similar to the precision/recall curve, but instead of plotting precision versus recall, the ROC curve shows the true positive rate (i.e. recall) against the false positive rate. The false positive rate is the ratio of negative instances that are incorrectly classified as positive. It is equal to one minus the true negative rate. The true negative rate is also called specificity. Hence the ROC curve plots sensitivity (recall) versus 1 - specificity.



In [None]:
### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())


## Step 6 (Stretch): Tune the hyperparameter
Last but not least, you can tune the hyperparameters. Similar to scikit-learn you create a parameter grid, and you add the parameters you want to tune.

To reduce the time of the computation, you only tune the regularization parameter with only two values.

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

In [None]:
# Finally, you evaluate the model with using the cross-validation method with 5 folds. It takes some time to train.

from time import *
start_time = time()

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(train_data)
# likely take a fair amount of time
end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

In [None]:
# The best regularization hyperparameter is 0.01, with an accuracy of 85.316 percent.

accuracy_m(model = cvModel)

In [None]:
# You can extract the recommended parameter by chaining cvModel.bestModel with extractParamMap()

bestModel = cvModel.bestModel
bestModel.extractParamMap()

## Conclusion
To begin with Spark, you need to initiate a Spark Context with:

SparkContext()

and SQL Context to connect to different data sources:

SQLContext()