What is Apache Spark?
https://www.guru99.com/pyspark-tutorial.html

Spark is a big data solution that is generally easier to use and faster than Hadoop MapReduce.

Earlier tools like MapReduce were popular but slow. To overcome this issue, Spark offers a solution that is both fast and general-purpose. The main difference between Spark and MapReduce is that Spark runs computations in memory, whereas MapReduce writes intermediate results to the hard disk. In-memory processing allows high-speed data access and processing, which can reduce processing times from hours to minutes.

What is Pyspark?

Spark is the name of the engine that performs cluster computing, while PySpark is the Python library for using Spark.

How Does Spark work?

Spark is built around a computational engine, meaning it takes care of scheduling, distributing and monitoring applications. Each job is split into tasks that run across various worker machines; together, these machines form a computing cluster. One machine performs one task, while the others contribute to the final output through different tasks. In the end, the results of all the tasks are aggregated to produce the output. The Spark admin interface gives a 360-degree overview of the various Spark jobs.
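To make the split-then-aggregate idea concrete, here is a toy, single-machine sketch in plain Python. It is an illustration only, not how Spark is implemented: the data is cut into partitions, a thread pool stands in for the worker machines, each worker computes a partial result on its own partition, and a final step aggregates the partial results. The function names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Divide the data into roughly equal chunks (Spark calls these partitions)."""
    size = max(1, -(-len(data) // num_partitions))  # ceiling division, at least 1
    return [data[i:i + size] for i in range(0, len(data), size)]


def task(partition):
    """Work done by one worker on its own partition: a partial sum of squares."""
    return sum(x * x for x in partition)


def run_job(data, num_workers=4):
    partitions = split_into_partitions(data, num_workers)
    # Threads stand in for worker machines: each one processes a partition.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partial_results = list(pool.map(task, partitions))
    # The driver aggregates the partial results into the final output.
    return sum(partial_results)


print(run_job(list(range(10))))  # 285
```

In Spark itself, the scheduler, not your code, decides which machine runs which task.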

Spark is designed to work with

    Python
    Java
    Scala
    SQL

A significant feature of Spark is its large set of built-in libraries, including MLlib for machine learning. Spark is also designed to work with Hadoop clusters and can read a broad range of file types, including Hive data, CSV, JSON and Cassandra data, among others.

Why use Spark?

As a future data practitioner, you should be familiar with Python's famous libraries: pandas and scikit-learn. These two libraries are excellent for exploring datasets up to mid-size. Regular machine learning projects are built around the following methodology:

    Load the data to the disk
    Import the data into the machine's memory
    Process/analyze the data
    Build the machine learning model
    Store the prediction back to disk

The problem arises when the data scientist wants to process data that is too big for one computer. In the earlier days of data science, practitioners would sample the data, as training on huge datasets was not always needed. The data scientist would find a good statistical sample, perform an additional robustness check and come up with an excellent model.

However, there are some problems with this:

    Is the dataset reflecting the real world?
    Does the data include a specific example?
    Is the model fit for sampling?

Take user recommendations, for instance. Recommenders rely on comparing users with other users to evaluate their preferences. If the data practitioner takes only a subset of the data, there may not be a cohort of users who are very similar to one another. Recommenders need to run on the full dataset or not at all.

What is the solution?

The solution has been evident for a long time: split the problem across multiple computers. Parallel computing, however, comes with problems of its own. Developers often have trouble writing parallel code and end up having to solve a number of complex issues around multiprocessing itself.

PySpark gives the data scientist an API that can be used to solve parallel data processing problems. PySpark handles the complexities of multiprocessing, such as distributing the data, distributing the code and collecting output from the workers on a cluster of machines.

Spark can run standalone, but it most often runs on top of a cluster computing framework such as Hadoop. In test and development, however, a data scientist can efficiently run Spark on a development box or laptop without a cluster.

One of the main advantages of Spark is that it lets you build an architecture that encompasses data streaming management, seamless data queries, machine learning prediction and real-time access to various analyses.

• Spark works closely with the SQL language, i.e., with structured data. It allows the data to be queried in real time.

• A data scientist's main job is to analyze data and build predictive models. In short, a data scientist needs to know how to query data using SQL, produce a statistical report and make use of machine learning to produce predictions. Data scientists spend a significant amount of their time cleaning, transforming and analyzing the data. Once the dataset or data workflow is ready, the data scientist uses various techniques to discover insights and hidden patterns. Data manipulation should be robust and, at the same time, easy to use. Spark is the right tool for this thanks to its speed and rich APIs.

When this tutorial was first written, PySpark was not compatible with Java 9 and above; Spark 3.0 and later support Java 11.

Machine learning with Spark

Now that you have a brief idea of Spark and SQLContext, you are ready to build your first machine learning program.

You will proceed as follows:

    Step 1) Basic operation with PySpark
    Step 2) Data preprocessing
    Step 3) Build a data processing pipeline
    Step 4) Build the classifier
    Step 5) Train and evaluate the model
    Step 6) Tune the hyperparameter

In this tutorial, we will use the adult dataset. The purpose of this tutorial is to learn how to use PySpark. For more information about the dataset, refer to this tutorial.

Note that the dataset is not large, so you may find that the computation takes longer than you would expect. Spark is designed to process considerable amounts of data, and its performance relative to other machine learning libraries improves as the dataset grows.

In [1]:
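# findspark locates your local Spark installation so that pyspark can be imported
import findspark
findspark.init()

import pyspark
from pyspark import SparkFiles
from pyspark.sql import SQLContext

# Start the SparkContext for this application
sc = pyspark.SparkContext(appName="ML")

# Ship the CSV file to the cluster; SparkFiles.get() returns its local path later
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
sc.addFile(url)

# Entry point for DataFrames (deprecated since Spark 3.0 in favor of SparkSession, but still available)
sqlContext = SQLContext(sc)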

Then, you can read the CSV file with sqlContext.read.csv. Set inferSchema to True to tell Spark to infer the data types automatically; by default, it is set to False.

In [2]:
df = sqlContext.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema= True)
df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
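The integer and string types above are what inferSchema produced. As a rough illustration of the idea, the hypothetical helper below looks at each column's raw text values and picks the narrowest type that parses every value. Spark's actual inference handles more types (long, timestamp, boolean, nulls and so on), so treat this only as a sketch; without inferSchema, every column would simply stay a string.

```python
def infer_type(values):
    """Hypothetical helper: return the narrowest of 'integer', 'double' or 'string'
    that every raw text value in a column can be parsed as."""

    def parses(cast, value):
        try:
            cast(value)
            return True
        except ValueError:
            return False

    if all(parses(int, v) for v in values):
        return "integer"
    if all(parses(float, v) for v in values):
        return "double"
    return "string"


header = ["age", "workclass", "fnlwgt"]
rows = [["25", "Private", "226802"], ["38", "Private", "89814"]]  # raw CSV text
columns = zip(*rows)  # transpose rows into columns
schema = {name: infer_type(values) for name, values in zip(header, columns)}
print(schema)  # {'age': 'integer', 'workclass': 'string', 'fnlwgt': 'integer'}
```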



In [3]:
df.show(5, truncate = False)

+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|x  |age|workclass|fnlwgt|education   |educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|1  |25 |Private  |226802|11th        |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private  |89814 |HS-grad     |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
|3  |28 |Local-gov|336951|Assoc-acdm  |12             |Married-civ-spouse|Protective-serv 

If you do not set inferSchema to True, Spark reads every column as a string. With the inferred schema, you can select specific columns, such as age and fnlwgt:

In [5]:
df.select('age','fnlwgt').show(5)

+---+------+
|age|fnlwgt|
+---+------+
| 25|226802|
| 38| 89814|
| 28|336951|
| 44|160323|
| 18|103497|
+---+------+
only showing top 5 rows



In [6]:
df.groupBy("education").count().sort("count",ascending=True).show()

+------------+-----+
|   education|count|
+------------+-----+
|   Preschool|   83|
|     1st-4th|  247|
|     5th-6th|  509|
|   Doctorate|  594|
|        12th|  657|
|         9th|  756|
| Prof-school|  834|
|     7th-8th|  955|
|        10th| 1389|
|  Assoc-acdm| 1601|
|        11th| 1812|
|   Assoc-voc| 2061|
|     Masters| 2657|
|   Bachelors| 8025|
|Some-college|10878|
|     HS-grad|15784|
+------------+-----+



In [7]:
df.describe().show()

+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|summary|                 x|               age|  workclass|            fnlwgt|   education|   educational-num|marital-status|      occupation|relationship|              race|gender|      capital-gain|     capital-loss|    hours-per-week|native-country|income|
+-------+------------------+------------------+-----------+------------------+------------+------------------+--------------+----------------+------------+------------------+------+------------------+-----------------+------------------+--------------+------+
|  count|             48842|             48842|      48842|             48842|       48842|             48842|         48842|           48842|       48842|             48842| 48842|             48842|            48842|  

In [9]:
df.describe('age').show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|             48842|
|   mean| 38.64358543876172|
| stddev|13.710509934443525|
|    min|                17|
|    max|                90|
+-------+------------------+



In [12]:
df.crosstab('age', 'relationship').sort("age_relationship").show()

+----------------+-------+-------------+--------------+---------+---------+----+
|age_relationship|Husband|Not-in-family|Other-relative|Own-child|Unmarried|Wife|
+----------------+-------+-------------+--------------+---------+---------+----+
|              17|      1|           22|            30|      536|        6|   0|
|              18|      5|           74|            47|      721|       10|   5|
|              19|      9|          177|            83|      745|       31|   8|
|              20|     30|          231|            81|      722|       43|   6|
|              21|     50|          241|            81|      666|       44|  14|
|              22|     85|          343|            74|      565|       83|  28|
|              23|    141|          463|            64|      544|       93|  24|
|              24|    182|          436|            55|      383|       96|  54|
|              25|    213|          417|            62|      348|      107|  48|
|              26|    260|  

Step 2) Data preprocessing

Data preprocessing is a critical step in machine learning. After you remove garbage data, you can gain some important insights. For instance, income is not a linear function of age. When people are young, their income is usually lower than in mid-life. After retirement, a household lives on its savings, meaning a decrease in income. To capture this pattern, you can add the square of age as a new feature.
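A quick way to see why the squared term helps: suppose, purely as an illustration with hypothetical coefficients β0, β1 and β2, that income depends on age as follows.

```latex
\text{income} \approx \beta_0 + \beta_1\,\text{age} + \beta_2\,\text{age}^2,
\qquad
\frac{\partial\,\text{income}}{\partial\,\text{age}} = \beta_1 + 2\beta_2\,\text{age} = 0
\;\Longrightarrow\;
\text{age}^{*} = -\frac{\beta_1}{2\beta_2}
```

With β1 > 0 and β2 < 0, predicted income rises with age, peaks at age*, then declines, which is the pattern described above. With age alone (β2 = 0), a linear model can only produce a straight line.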

Add age square

To add a new feature, you need to:

    Select the column
    Apply the transformation and add it to the DataFrame

In [13]:
# Import only what you need; a wildcard import would shadow Python built-ins such as sum and max
from pyspark.sql.functions import col

# 1 Select the column
age_square = df.select(col("age")**2)

# 2 Apply the transformation and add it to the DataFrame
df = df.withColumn("age_square", col("age")**2)

df.printSchema()

root
 |-- x: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: integer (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: integer (nullable = true)
 |-- capital-loss: integer (nullable = true)
 |-- hours-per-week: integer (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)
 |-- age_square: double (nullable = true)



Step 3) Build a data processing pipeline

Similar to scikit-learn, PySpark has a pipeline API. A pipeline is a convenient way to keep the sequence of data transformations organized. You push the data into the pipeline; inside it, the various operations are applied in order, and the output is used to feed the algorithm.
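To make the fit/transform flow concrete, here is a toy pure-Python version of the pattern. All class names are hypothetical and only mimic the shape of pyspark.ml: an estimator's fit() returns a fitted model, each stage's transform() output feeds the next stage, and stages that are already transformers are applied without fitting, as Spark's Pipeline does.

```python
class MeanCenterer:
    """Estimator: fit() learns the column mean and returns a fitted model."""

    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        mean = sum(r[self.col] for r in rows) / len(rows)
        return MeanCentererModel(self.col, mean)


class MeanCentererModel:
    """Fitted model (a transformer): subtracts the learned mean."""

    def __init__(self, col, mean):
        self.col, self.mean = col, mean

    def transform(self, rows):
        return [{**r, self.col + "_centered": r[self.col] - self.mean} for r in rows]


class Squarer:
    """Pure transformer: nothing to learn, so it has no fit() method."""

    def __init__(self, col):
        self.col = col

    def transform(self, rows):
        return [{**r, self.col + "_sq": r[self.col] ** 2} for r in rows]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted on the current data; transformers are used as-is
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)  # output feeds the next stage
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, fitted_stages):
        self.fitted_stages = fitted_stages

    def transform(self, rows):
        for model in self.fitted_stages:
            rows = model.transform(rows)
        return rows


data = [{"age": 20}, {"age": 40}]
pipeline_model = ToyPipeline([MeanCenterer("age"), Squarer("age_centered")]).fit(data)
print(pipeline_model.transform(data))
# [{'age': 20, 'age_centered': -10.0, 'age_centered_sq': 100.0}, {'age': 40, 'age_centered': 10.0, 'age_centered_sq': 100.0}]
```

Keeping the learned statistics inside the fitted model is what lets the same transformations be replayed later on new data, such as a test set.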

For instance, one common transformation in machine learning consists of converting a string column into a one-hot encoding, i.e., one column per category. A one-hot-encoded matrix is usually mostly zeros.

The steps to transform the data are very similar to scikit-learn. You need to:

    Index the string to numeric
    Create the one hot encoder
    Transform the data

Two APIs do the job: StringIndexer and OneHotEncoder.
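Conceptually, the two steps look like the pure-Python sketch below (hypothetical helpers, not Spark's code). StringIndexer's default ordering (stringOrderType="frequencyDesc") gives index 0 to the most frequent label; this sketch breaks ties alphabetically. OneHotEncoder then turns each index into a vector with a single 1.0; with dropLast=True (the default), the last category is encoded as all zeros. That is why workclass, which has 9 categories, produces vectors of size 9 with dropLast=False in the example below and size 8 in the pipeline later on. Spark stores these vectors sparsely, e.g. (9,[0],[1.0]). The sample values are made up.

```python
from collections import Counter


def fit_string_indexer(values):
    """Map each label to a float index: most frequent first, ties broken alphabetically."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot list; with drop_last, the last category is encoded as all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if int(index) < size:
        vector[int(index)] = 1.0
    return vector


workclass = ["Private", "Private", "Local-gov", "Private", "Self-emp-inc", "Local-gov"]
mapping = fit_string_indexer(workclass)
print(mapping)  # {'Private': 0.0, 'Local-gov': 1.0, 'Self-emp-inc': 2.0}
print(one_hot(mapping["Local-gov"], len(mapping), drop_last=False))  # [0.0, 1.0, 0.0]
print(one_hot(mapping["Self-emp-inc"], len(mapping)))  # [0.0, 0.0]
```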

First of all, you select the string column to index. The inputCol is the name of the column in the dataset. outputCol is the new name given to the transformed column. 

In [None]:
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")

Fit the data and transform it

In [None]:
model = stringIndexer.fit(df)		
indexed = model.transform(df)

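Create the new columns based on the categories. For instance, if there are 10 categories in the feature, the new matrix will have 10 columns, one for each category. This holds because dropLast=False; with the default dropLast=True, the last category is dropped and you get 9 columns.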

In [None]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")

In [30]:
### Example encoder
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

stringIndexer = StringIndexer(inputCol="workclass", outputCol="workclass_encoded")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="workclass_encoded", outputCol="workclass_vec")
encoder = encoder.fit(indexed)
encoded = encoder.transform(indexed)
encoded.show(2)

+---+---+---------+------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+----------+-----------------+-------------+
|  x|age|workclass|fnlwgt|education|educational-num|    marital-status|       occupation|relationship| race|gender|capital-gain|capital-loss|hours-per-week|native-country|income|age_square|workclass_encoded|workclass_vec|
+---+---+---------+------+---------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+----------+-----------------+-------------+
|  1| 25|  Private|226802|     11th|              7|     Never-married|Machine-op-inspct|   Own-child|Black|  Male|           0|           0|            40| United-States| <=50K|     625.0|              0.0|(9,[0],[1.0])|
|  2| 38|  Private| 89814|  HS-grad|              9|Married-civ-spouse|  Farming-fishing|     Husband|White|  Ma

Build the pipeline

You will build a pipeline to convert all the features and add them to the final dataset. The pipeline will have four operations, but feel free to add as many operations as you want.

    Encode the categorical data
    Index the label feature
    Add continuous variable
    Assemble the steps.

Each step is stored in a list named stages. This list tells the Pipeline which operations to perform, and in which order.

1. Encode the categorical data

This step is exactly the same as the example above, except that you loop over all the categorical features.

In [32]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer

CATE_FEATURES = ['workclass', 'education', 'marital-status', 'occupation', 'relationship', 'race', 'gender', 'native-country']
stages = []  # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    # Index the strings, then one-hot encode the resulting indices
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()],
                            outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

2. Index the label feature

Spark, like many other libraries, does not accept string values for the label. You convert the label feature with StringIndexer and add it to the stages list.

In [33]:
# Convert label into label indices using the StringIndexer
label_stringIdx =  StringIndexer(inputCol="income", outputCol="newincome")
stages += [label_stringIdx]

3. Add continuous variable

The inputCols parameter of the VectorAssembler is a list of column names. You can create a new list containing all the new columns. The code below populates the list with the encoded categorical features and the continuous features.

In [34]:
# List of continuous features
# (age_square, created in Step 2, is not included, so the model does not use it)
CONTI_FEATURES = ['age', 'fnlwgt', 'capital-gain', 'educational-num', 'capital-loss', 'hours-per-week']

assemblerInputs = [c + "classVec" for c in CATE_FEATURES] + CONTI_FEATURES

4. Assemble the steps.

Finally, you pass the list of input columns to the VectorAssembler and add the assembler to stages.

In [36]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
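VectorAssembler concatenates its input columns, in order, into a single vector. The plain-Python sketch below (a hypothetical assemble helper) reproduces the layout for the first row of the dataset, using the one-hot sizes and category indices that model.take(1) prints further down: 94 one-hot positions followed by the six continuous features, 100 positions in total. Zeros are omitted, as in a sparse vector; Spark itself may store an assembled vector densely or sparsely.

```python
def assemble(one_hot_parts, continuous_values):
    """Concatenate one-hot parts, given as (size, active_index), and continuous
    values into one vector, returned as (total_size, {position: value}) with zeros omitted."""
    entries, offset = {}, 0
    for size, active in one_hot_parts:
        if active < size:  # an index equal to size means the dropped last category
            entries[offset + active] = 1.0
        offset += size
    for value in continuous_values:
        if value != 0:
            entries[offset] = float(value)
        offset += 1
    return offset, entries


# First row: (vector size, category index) for workclass, education, marital-status,
# occupation, relationship, race, gender, native-country, as printed by model.take(1)
parts = [(8, 0), (15, 5), (6, 1), (14, 6), (5, 2), (4, 1), (1, 0), (41, 0)]
# CONTI_FEATURES order: age, fnlwgt, capital-gain, educational-num, capital-loss, hours-per-week
continuous = [25, 226802, 0, 7, 0, 40]

size, entries = assemble(parts, continuous)
print(size)                  # 100
print(sorted(entries)[:10])  # [0, 13, 24, 35, 45, 49, 52, 53, 94, 95]
```

These positions match the start of the features vector in the model.take(1) output, which is a handy way to check which slot of the vector belongs to which feature.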

Now that all the steps are ready, you push the data to the pipeline. 

In [37]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
model = pipelineModel.transform(df)

If you check the new dataset, you can see that it contains all the features, transformed and not transformed. You are only interested in newincome and features. The features column includes all the transformed categorical features and the continuous variables.

In [38]:
model.take(1)

[Row(x=1, age=25, workclass='Private', fnlwgt=226802, education='11th', educational-num=7, marital-status='Never-married', occupation='Machine-op-inspct', relationship='Own-child', race='Black', gender='Male', capital-gain=0, capital-loss=0, hours-per-week=40, native-country='United-States', income='<=50K', age_square=625.0, workclassIndex=0.0, workclassclassVec=SparseVector(8, {0: 1.0}), educationIndex=5.0, educationclassVec=SparseVector(15, {5: 1.0}), marital-statusIndex=1.0, marital-statusclassVec=SparseVector(6, {1: 1.0}), occupationIndex=6.0, occupationclassVec=SparseVector(14, {6: 1.0}), relationshipIndex=2.0, relationshipclassVec=SparseVector(5, {2: 1.0}), raceIndex=1.0, raceclassVec=SparseVector(4, {1: 1.0}), genderIndex=0.0, genderclassVec=SparseVector(1, {0: 1.0}), native-countryIndex=0.0, native-countryclassVec=SparseVector(41, {0: 1.0}), newincome=0.0, features=SparseVector(100, {0: 1.0, 13: 1.0, 24: 1.0, 35: 1.0, 45: 1.0, 49: 1.0, 52: 1.0, 53: 1.0, 94: 25.0, 95: 226802.0, 

Step 4) Build the classifier: logistic

Next, you keep only the label and the features. The code below maps over the RDD underlying model and keeps newincome and features, converting the latter to a DenseVector.

In [48]:
from pyspark.ml.linalg import DenseVector
input_data = model.rdd.map(lambda x: (x["newincome"], DenseVector(x["features"])))

You are now ready to create the training data as a DataFrame, using sqlContext.

In [49]:
df_train = sqlContext.createDataFrame(input_data, ["income", "features"])

In [50]:
df_train.show(2)

+------+--------------------+
|income|            features|
+------+--------------------+
|   0.0|[1.0,0.0,0.0,0.0,...|
|   0.0|[1.0,0.0,0.0,0.0,...|
+------+--------------------+
only showing top 2 rows



Create a train/test set

You split the dataset 80/20 with randomSplit. 

In [51]:
# Split the data into train and test sets
train_data, test_data = df_train.randomSplit([.8,.2],seed=1234)

Let's count how many people have an income below/above 50K in both the training and the test set.

In [52]:
train_data.groupby('income').agg({'income': 'count'}).show()	

+------+-------------+
|income|count(income)|
+------+-------------+
|   0.0|        29731|
|   1.0|         9295|
+------+-------------+



In [53]:
test_data.groupby('income').agg({'income': 'count'}).show()	

+------+-------------+
|income|count(income)|
+------+-------------+
|   0.0|         7424|
|   1.0|         2392|
+------+-------------+



Build the logistic regressor

Last but not least, you can build the classifier. PySpark has an API called LogisticRegression to perform logistic regression.

You initialize lr by indicating the label column and the features column. You set a maximum of 10 iterations and add a regularization parameter with a value of 0.3. Note that in the next section, you will use cross-validation with a parameter grid to tune the model.

In [54]:
# Import `LogisticRegression`
from pyspark.ml.classification import LogisticRegression

# Initialize `lr`
lr = LogisticRegression(labelCol="income",
                        featuresCol="features",
                        maxIter=10,
                        regParam=0.3)

# Fit the data to the model
linearModel = lr.fit(train_data)
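For reference, the Spark documentation for its linear methods describes what fit() minimizes: the average logistic loss plus an elastic-net penalty, with λ = regParam and α = elasticNetParam, and labels written as y_i ∈ {-1, +1}. elasticNetParam defaults to 0.0 (see the parameter map printed at the end of the tutorial), so the penalty here is pure L2. The intercept b is not penalized, and by default (standardization=True) the penalty is applied on standardized features; maxIter=10 only caps the number of optimizer iterations.

```latex
\min_{w,\,b}\;
\frac{1}{n}\sum_{i=1}^{n} \log\!\left(1 + e^{-y_i\,(w^\top x_i + b)}\right)
\;+\; \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
```

With regParam = 0.3 and α = 0, the penalty reduces to 0.15·‖w‖₂²; larger regParam values shrink the coefficients more strongly toward zero.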

In [55]:
# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(linearModel.coefficients))
print("Intercept: " + str(linearModel.intercept))

Coefficients: [-0.062290459463056475,-0.1596092472910736,-0.06575267374093999,-0.15692179895946792,-0.1272492548940923,0.1645720986637153,0.19207847304400286,-0.2634703312602943,-0.19262480707620794,-0.06530587930152512,0.22083028208154978,0.3703503304642222,-0.009029093914692343,-0.2952283126280717,0.016111973903526002,-0.3352553665206933,-0.436755581550605,0.5366020022693624,-0.40896416018289755,-0.18468796676651497,0.603982838076872,-0.34140682703931285,-0.37948075355152383,0.3279778384592752,-0.3479105587484022,-0.20337271152379682,-0.21453545879299532,-0.13431514394370955,-0.15157587937021094,0.19010797769160911,-0.06868428196292928,0.2995150011312705,-0.12428164472354983,0.042102538721418954,-0.28154545879138165,-0.19453938122902198,-0.1582091687368907,-0.1340897761328162,-0.28536922494593336,-0.3580558840220062,0.12797378841052606,0.10738329738201971,-0.3008495785218508,0.2724039232382477,-0.19373670821554728,-0.2895726784347856,-0.24501638986086982,0.4349447725931794,-0.0550928

Step 5) Train and evaluate the model

To generate predictions for the test set, call transform() on linearModel with test_data.

In [56]:
# Make predictions on test data using the transform() method.
predictions = linearModel.transform(test_data)

In [57]:
predictions.printSchema()

root
 |-- income: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



You are interested in the label, the prediction and the probability.

In [59]:
selected = predictions.select("income", "prediction", "probability")
selected.show(20)

+------+----------+--------------------+
|income|prediction|         probability|
+------+----------+--------------------+
|   0.0|       0.0|[0.93098566170068...|
|   0.0|       0.0|[0.92369259506918...|
|   0.0|       1.0|[0.35954621838240...|
|   0.0|       0.0|[0.91064720394777...|
|   0.0|       0.0|[0.88933661204705...|
|   0.0|       0.0|[0.64834756972696...|
|   0.0|       0.0|[0.66262040852769...|
|   0.0|       1.0|[0.44841664127453...|
|   0.0|       0.0|[0.84050868469418...|
|   0.0|       0.0|[0.84999336882262...|
|   0.0|       0.0|[0.72698078269065...|
|   0.0|       0.0|[0.84158214926617...|
|   0.0|       0.0|[0.75952643445408...|
|   0.0|       0.0|[0.83301890740706...|
|   0.0|       0.0|[0.79353029070515...|
|   0.0|       0.0|[0.81356864076497...|
|   0.0|       0.0|[0.85152568791280...|
|   0.0|       0.0|[0.87866998095111...|
|   0.0|       0.0|[0.83846221320186...|
|   0.0|       0.0|[0.63312826371495...|
+------+----------+--------------------+
only showing top 20 rows
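Each probability vector holds [P(income = 0), P(income = 1)], and with the default threshold of 0.5 the prediction is 1.0 when the second entry exceeds 0.5. That explains the third and eighth rows above: their first probabilities (0.3595... and 0.4484...) are below 0.5, so the model predicts 1.0 even though the true label is 0.0. The sketch below shows this in plain Python, starting from the raw margin (intercept + w·x) that logistic regression computes; the function names are hypothetical.

```python
import math


def sigmoid(margin):
    """Logistic function: maps a raw margin to P(label = 1)."""
    return 1.0 / (1.0 + math.exp(-margin))


def predict(margin, threshold=0.5):
    """Return ([p0, p1], prediction) for a binary logistic model."""
    p1 = sigmoid(margin)
    prediction = 1.0 if p1 > threshold else 0.0
    return [1.0 - p1, p1], prediction


# Recover each row's margin from the first probability printed above, then re-predict.
first_probs = [0.93098566170068, 0.35954621838240, 0.44841664127453]
margins = [math.log((1.0 - p0) / p0) for p0 in first_probs]
print([predict(m)[1] for m in margins])  # [0.0, 1.0, 1.0]
```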

Evaluate the model

You need to look at the accuracy metric to see how well (or badly) the model performs. BinaryClassificationEvaluator, which you will use below, does not report accuracy; its default metric is the area under the ROC (receiver operating characteristic) curve, a different metric that takes the false positive rate into account. (MulticlassClassificationEvaluator does offer accuracy through metricName="accuracy".)

Before you look at the ROC, let's compute the accuracy, a metric you are probably more familiar with. Accuracy is the number of correct predictions divided by the total number of observations.
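As a plain-Python illustration with made-up labels (not the tutorial's data), accuracy and the underlying confusion counts can be computed as follows; the helper names are hypothetical. In PySpark, calling groupBy("income", "prediction").count() on a DataFrame holding the label and prediction columns gives the same confusion counts for the real predictions.

```python
from collections import Counter


def confusion_counts(labels, predictions):
    """Count each (label, prediction) pair, like grouping on both columns."""
    return Counter(zip(labels, predictions))


def accuracy(labels, predictions):
    """Number of correct predictions divided by the number of observations."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)


# Made-up labels and predictions, for illustration only
labels      = [0.0, 0.0, 0.0, 1.0, 1.0, 0.0]
predictions = [0.0, 1.0, 0.0, 1.0, 0.0, 0.0]
print(confusion_counts(labels, predictions))
print(accuracy(labels, predictions))  # 4 correct out of 6
```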

You create a DataFrame with the label and the prediction.

In [60]:
cm = predictions.select("income", "prediction")
cm.groupby('income').agg({'income': 'count'}).show()

+------+-------------+
|income|count(income)|
+------+-------------+
|   0.0|         7424|
|   1.0|         2392|
+------+-------------+



In [61]:
cm.groupby('prediction').agg({'prediction': 'count'}).show()	

+----------+-----------------+
|prediction|count(prediction)|
+----------+-----------------+
|       0.0|             8842|
|       1.0|              974|
+----------+-----------------+



For instance, in the test set, there are 2392 households with an income above 50K and 7424 below. The classifier, however, predicted only 974 households with an income above 50K.

You can compute the accuracy by dividing the number of correctly classified rows by the total number of rows.

In [62]:
cm.filter(cm.income == cm.prediction).count() / cm.count()

0.8192746536267319

You can wrap everything together and write a function to compute the accuracy. 

In [65]:
def accuracy_m(model): 
    predictions = model.transform(test_data)
    cm = predictions.select("income", "prediction")
    acc = cm.filter(cm.income == cm.prediction).count() / cm.count()
    print("Model accuracy: %.3f%%" % (acc * 100)) 
accuracy_m(model = linearModel)

Model accuracy: 81.927%


ROC metrics

The BinaryClassificationEvaluator class includes the ROC measures. The receiver operating characteristic (ROC) curve is another common tool used with binary classification. It is very similar to the precision/recall curve, but instead of plotting precision versus recall, the ROC curve shows the true positive rate (i.e., recall) against the false positive rate. The false positive rate is the ratio of negative instances that are incorrectly classified as positive; it is equal to one minus the true negative rate, which is also called specificity. Hence, the ROC curve plots sensitivity (recall) versus 1 - specificity. The evaluator summarizes the curve as the area under it (areaUnderROC): 1.0 for a perfect ranking, 0.5 for random guessing.

In [67]:
### Use ROC 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
evaluator = BinaryClassificationEvaluator(labelCol='income', rawPredictionCol="rawPrediction")
print(evaluator.evaluate(predictions))
print(evaluator.getMetricName())

0.8950386210140153
areaUnderROC
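The value 0.895 above is the area under the ROC curve. One useful reading of it: it is the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one, with ties counting one half. The brute-force pure-Python sketch below (a hypothetical helper, quadratic in the number of rows, run on made-up data) computes AUC that way. Spark instead builds the curve from the scores and integrates it, which gives essentially the same value (Spark may down-sample the curve when there are many distinct scores). Because the sigmoid is monotonic, ranking by rawPrediction or by probability yields the same AUC.

```python
def area_under_roc(labels, scores):
    """AUC as the probability that a random positive outscores a random negative;
    ties count as one half. Brute force: O(#positives x #negatives)."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Made-up labels and scores for illustration
print(area_under_roc([0, 0, 1, 1], [0.1, 0.4, 0.35, 0.8]))  # 0.75
```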


Step 6) Tune the hyperparameter

Last but not least, you can tune the hyperparameters. Similar to scikit-learn, you create a parameter grid and add the parameters you want to tune. To reduce the computation time, you only tune the regularization parameter, with just two values.

In [68]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5])
             .build())

Finally, you evaluate the model using cross-validation with 2 folds (numFolds=2). In the run shown below, training took about 7 minutes (413 seconds); the exact time depends on your machine.

In [69]:
from time import time

start_time = time()

# Create a 2-fold CrossValidator
cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=evaluator, numFolds=2)

# Run cross-validation (this step can take a fair amount of time)
cvModel = cv.fit(train_data)

end_time = time()
elapsed_time = end_time - start_time
print("Time to train model: %.3f seconds" % elapsed_time)

Time to train model: 413.322 seconds
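Roughly, CrossValidator works as follows: for each parameter setting in the grid, it fits on k-1 folds, evaluates on the held-out fold, averages the metric over the k folds, picks the setting with the best average and refits that setting on the whole training set. With two regParam values and numFolds=2, that is 2 × 2 = 4 fits plus one final fit, which is why this cell is slow. The pure-Python sketch below mirrors that loop on a deliberately tiny, hypothetical model: ridge regression with only a constant feature, for which fitting with penalty reg gives c = sum(y) / (n + reg). Unlike Spark, which assigns rows to folds at random, it uses deterministic folds (i % k) for reproducibility.

```python
# Toy target values; the "model" is ridge regression with a single constant feature:
# minimizing sum((y - c)^2) + reg * c^2 over c gives c = sum(y) / (n + reg).
YS = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]


def fit(indices, reg):
    indices = list(indices)
    return sum(YS[i] for i in indices) / (len(indices) + reg)


def score(c, indices):
    """Negative mean squared error, so that larger is better (like areaUnderROC)."""
    return -sum((YS[i] - c) ** 2 for i in indices) / len(indices)


def k_fold(n, k):
    """Yield (train, valid) index lists; fold f validates on rows with i % k == f."""
    for f in range(k):
        train = [i for i in range(n) if i % k != f]
        valid = [i for i in range(n) if i % k == f]
        yield train, valid


def cross_validate(reg_grid, n, k):
    averages = []
    for reg in reg_grid:
        # Fit on k-1 folds, evaluate on the held-out fold, average over folds
        fold_scores = [score(fit(train, reg), valid) for train, valid in k_fold(n, k)]
        averages.append(sum(fold_scores) / k)
    best_reg = reg_grid[averages.index(max(averages))]
    # Refit the best setting on all rows, as CrossValidator does for bestModel
    return best_reg, averages, fit(range(n), best_reg)


best_reg, averages, final_c = cross_validate([0.01, 0.5], n=len(YS), k=2)
print(best_reg, averages)
```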


In [70]:
accuracy_m(model = cvModel)

Model accuracy: 84.658%


You can extract the selected parameters by calling extractParamMap() on cvModel.bestModel.

In [71]:
bestModel = cvModel.bestModel
bestModel.extractParamMap()

{Param(parent='LogisticRegression_ffc9dc059fc1', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_ffc9dc059fc1', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_ffc9dc059fc1', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_ffc9dc059fc1', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_ffc9dc059fc1', name='labelCol', doc='label column name.'): 'income',
 Param(parent='LogisticRegression_ffc9dc059fc1', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='LogisticRegression_ffc9dc059fc1', name='probabilityCol', doc='Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! Thes

Summary

Spark is a fundamental tool for a data scientist. It allows the practitioner to connect an app to different data sources, perform data analysis seamlessly or add a predictive model.

To begin with Spark, you need to initiate a Spark context with:

`SparkContext()`

and an SQL context to connect to a data source:

`SQLContext()`

In the tutorial, you learn how to train a logistic regression:

    Convert the dataset to a DataFrame with:

In [None]:
input_data = model.rdd.map(lambda x: (x["newincome"], DenseVector(x["features"])))
df_train = sqlContext.createDataFrame(input_data, ["income", "features"])

Note that the label's column name is newincome and all the features are gathered in features. Change these values if they differ in your dataset.

    Create the train/test set

In [None]:
train_data, test_data = df_train.randomSplit([.8, .2], seed=1234)

    Train the model

In [None]:
lr = LogisticRegression(labelCol="income", featuresCol="features", maxIter=10, regParam=0.3)
linearModel = lr.fit(train_data)

    Make predictions

In [None]:
predictions = linearModel.transform(test_data)