# Make Notebook run with IBM Watson

In [1]:
# The code was removed by Watson Studio for sharing.

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200423112957-0004
KERNEL_ID = 7a15a6c5-cf58-4e64-9128-97aa81491851


In [2]:
# START CODE BLOCK
# cos2file - takes an object from Cloud Object Storage and writes it to file on container file system.
# Uses the IBM project_lib library.
# See https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/project-lib-python.html
# Arguments:
# p: project object defined in project token
# data_path: the directory to write the file
# filename: name of the file in COS

import os
def cos2file(p,data_path,filename):
    data_dir = p.project_context.home + data_path
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)
    # use a context manager so the file handle is always closed
    with open(os.path.join(data_dir, filename), 'wb') as f:
        f.write(p.get_file(filename).read())

# file2cos - takes file on container file system and writes it to an object in Cloud Object Storage.
# Uses the IBM project_lib library.
# See https://dataplatform.cloud.ibm.com/docs/content/wsj/analyze-data/project-lib-python.html
# Arguments:
# p: project object defined in project token
# data_path: the directory to read the file from
# filename: name of the file on container file system

def file2cos(p,data_path,filename):
    data_dir = p.project_context.home + data_path
    path_to_file = data_dir + '/' + filename
    if os.path.exists(path_to_file):
        # use a context manager so the file handle is always closed
        with open(path_to_file, 'rb') as file_object:
            p.save_data(filename, file_object, set_project_asset=True, overwrite=True)
    else:
        print("file2cos error: File not found")
# END CODE BLOCK

In [3]:
cos2file(project, '/data', 'aavail-target.csv')

# Spark Machine Learning

Keep the main [Spark ML documentation](https://spark.apache.org/docs/latest/ml-pipeline.html) open as you go through this tutorial. MLlib is Spark’s machine learning (ML) library. **Spark ML** is not an official name, but we will use it to refer to the MLlib DataFrame-based API, which is built around ML pipelines. Before getting into Spark ML with a couple of examples, we will first review Spark DataFrames.

In [4]:
import re
import os
from collections import Counter
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

plt.style.use('seaborn')
%matplotlib inline

## Spark SQL and DataFrames

What is Spark SQL?
- Spark SQL takes basic RDDs and **puts a schema on them**.

What is a DataFrame?
- DataFrames are the primary abstraction in Spark SQL.
- Think of DataFrames as **RDDs with a schema**.

What are **schemas**?
- Schemas are metadata about your data.
- Schema = Table Names + Column Names + Column Types

What are the pros of schemas?
- Schemas enable using **column names** instead of column positions
- Schemas enable **queries** using SQL and DataFrame syntax
- Schemas make your data more **structured**.
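
To make the idea of "an RDD with a schema" concrete, here is a minimal plain-Python sketch (not Spark code). The `SCHEMA` list, the `apply_schema` helper and the sample row are illustrative assumptions; they mimic how a schema supplies column names and types, so that values can be cast and then looked up by name rather than by position.

```python
# Plain-Python illustration only; Spark's own schema machinery is far richer.
# SCHEMA, apply_schema and raw_row are hypothetical names for this sketch.
SCHEMA = [
    ("customer_id", int),
    ("country", str),
    ("age", int),
]

def apply_schema(raw_values, schema):
    """Cast each raw string with its column type and key it by column name."""
    if len(raw_values) != len(schema):
        raise ValueError("row length does not match schema")
    return {name: cast(value) for (name, cast), value in zip(schema, raw_values)}

raw_row = "1,united_states,21".split(",")   # what a schemaless text row looks like
row = apply_schema(raw_row, SCHEMA)

print(raw_row[2])   # positional access, still a string
print(row["age"])   # named access, already an int
```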

See the [Spark SQL documentation](https://spark.apache.org/docs/latest/sql-programming-guide.html) as a main point of reference for Spark SQL, DataFrames and Datasets.

## Creating DataFrames

You can create a DataFrame from an existing RDD, whatever source that RDD was built from, as long as you add a schema.

To build a schema, you will use existing data types provided in the [`pyspark.sql.types`](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types) module.

<center>
<table style="width:50%">
  <tr>
      <th>Types</th>
      <th>Python Equivalent</th>
    </tr>
  <tr>
      <td>StringType</td>
      <td>string</td>
  </tr>
  <tr>
      <td>IntegerType</td>
      <td>integer</td>
  </tr>
  <tr>
      <td>FloatType</td>
      <td>float</td>
  </tr>
  <tr>
      <td>ArrayType</td>
      <td>array or list</td>
  </tr>
    <tr>
      <td>MapType</td>
      <td>dict</td>
   </tr>       
</table>
</center>

First we initialize the Spark environment:

In [5]:
import pyspark as ps

spark = ps.sql.SparkSession.builder \
            .master("local[4]") \
            .appName("spark-ml-examples") \
            .getOrCreate()

sc = spark.sparkContext

The `local[4]` master URL runs Spark locally with 4 worker threads, so the driver can use up to 4 cores. Let's start with a very small file to demonstrate the different ways to create Spark DataFrames.

In [6]:
data_dir = os.path.join(".","data")
def casting_function(args):
    customer_id, is_subscriber, country, age, customer_name, subscriber_type, num_streams = args
    return((int(customer_id), int(is_subscriber), country, int(age), customer_name, subscriber_type, int(num_streams)))

rdd_aavail = sc.textFile(os.path.join(data_dir,'aavail-target.csv'))\
                         .map(lambda rowstr : rowstr.split(","))\
                         .filter(lambda row: not row[0].startswith('c'))\
                         .map(casting_function)

# take(5) returns the first five rows without collecting the whole RDD
rdd_aavail.take(5)

[(1, 1, 'united_states', 21, 'Kasen Todd', 'aavail_premium', 23),
 (2, 0, 'singapore', 30, 'Ensley Garza', 'aavail_unlimited', 12),
 (3, 0, 'united_states', 21, 'Lillian Carey', 'aavail_premium', 22),
 (4, 1, 'united_states', 20, 'Beau Christensen', 'aavail_basic', 19),
 (5, 1, 'singapore', 21, 'Ernesto Gibson', 'aavail_premium', 23)]

You can create a Spark DataFrame using a schema that you have defined, or the schema can be inferred. To define your own:

In [7]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField('customer_id',IntegerType(),True),
    StructField('is_subscriber',IntegerType(),True),
    StructField('country',StringType(),True),
    StructField('age',IntegerType(),True),
    StructField('customer_name',StringType(),True),
    StructField('subscriber_type',StringType(),True),
    StructField('num_streams',IntegerType(),True) ])
    
# feed that into a DataFrame
df = spark.createDataFrame(rdd_aavail,schema)

# show the result
df.show()

# print the schema
df.printSchema()  

+-----------+-------------+-------------+---+----------------+----------------+-----------+
|customer_id|is_subscriber|      country|age|   customer_name| subscriber_type|num_streams|
+-----------+-------------+-------------+---+----------------+----------------+-----------+
|          1|            1|united_states| 21|      Kasen Todd|  aavail_premium|         23|
|          2|            0|    singapore| 30|    Ensley Garza|aavail_unlimited|         12|
|          3|            0|united_states| 21|   Lillian Carey|  aavail_premium|         22|
|          4|            1|united_states| 20|Beau Christensen|    aavail_basic|         19|
|          5|            1|    singapore| 21|  Ernesto Gibson|  aavail_premium|         23|
|          6|            1|united_states| 21|  Deshawn Murray|  aavail_premium|         20|
|          7|            0|    singapore| 48|     Daxton Tate|    aavail_basic|         18|
|          8|            1|united_states| 47|    Tenley Small|  aavail_premium| 

You may also read the data directly from a file and **infer** the schema:

In [8]:
# read CSV
df = spark.read.csv(os.path.join(data_dir,'aavail-target.csv'),
                         header=True,       # use headers or not
                         quote='"',         # char for quotes
                         sep=",",           # char for separation
                         inferSchema=True)  # do we infer schema or not ?

# prints the schema
df.printSchema()

# DataFrames still support familiar actions such as count()
print("line count: {}".format(df.count()))

# show the table in a nice format
df.show()

root
 |-- customer_id: integer (nullable = true)
 |-- is_subscriber: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- subscriber_type: string (nullable = true)
 |-- num_streams: integer (nullable = true)

line count: 1000
+-----------+-------------+-------------+---+----------------+----------------+-----------+
|customer_id|is_subscriber|      country|age|   customer_name| subscriber_type|num_streams|
+-----------+-------------+-------------+---+----------------+----------------+-----------+
|          1|            1|united_states| 21|      Kasen Todd|  aavail_premium|         23|
|          2|            0|    singapore| 30|    Ensley Garza|aavail_unlimited|         12|
|          3|            0|united_states| 21|   Lillian Carey|  aavail_premium|         22|
|          4|            1|united_states| 20|Beau Christensen|    aavail_basic|         19|
|          5|            1|    s

You can turn the DataFrame into a pandas DataFrame, but be careful: `toPandas()` is an action that collects all of the data into the driver's memory.

In [9]:
df.toPandas()

Unnamed: 0,customer_id,is_subscriber,country,age,customer_name,subscriber_type,num_streams
0,1,1,united_states,21,Kasen Todd,aavail_premium,23
1,2,0,singapore,30,Ensley Garza,aavail_unlimited,12
2,3,0,united_states,21,Lillian Carey,aavail_premium,22
3,4,1,united_states,20,Beau Christensen,aavail_basic,19
4,5,1,singapore,21,Ernesto Gibson,aavail_premium,23
5,6,1,united_states,21,Deshawn Murray,aavail_premium,20
6,7,0,singapore,48,Daxton Tate,aavail_basic,18
7,8,1,united_states,47,Tenley Small,aavail_premium,20
8,9,0,united_states,21,Kyra Chase,aavail_premium,24
9,10,0,united_states,26,London Barber,aavail_basic,20


Here are some common operations that you might perform on a DataFrame

In [10]:
# prints the schema
print("--- printSchema()")
df.printSchema()

# prints the table itself
print("--- show()")
df.show()

# show the statistics of all numerical columns
print("--- describe()")
df.describe().show()

# show the statistics of one specific column
print("--- describe(num_streams)")
df.describe("num_streams").show()

--- printSchema()
root
 |-- customer_id: integer (nullable = true)
 |-- is_subscriber: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- subscriber_type: string (nullable = true)
 |-- num_streams: integer (nullable = true)

--- show()
+-----------+-------------+-------------+---+----------------+----------------+-----------+
|customer_id|is_subscriber|      country|age|   customer_name| subscriber_type|num_streams|
+-----------+-------------+-------------+---+----------------+----------------+-----------+
|          1|            1|united_states| 21|      Kasen Todd|  aavail_premium|         23|
|          2|            0|    singapore| 30|    Ensley Garza|aavail_unlimited|         12|
|          3|            0|united_states| 21|   Lillian Carey|  aavail_premium|         22|
|          4|            1|united_states| 20|Beau Christensen|    aavail_basic|         19|
|          5|       

## Transformations on DataFrames

- They are still **lazy**: Spark doesn't apply the transformation right away, it just builds on the **DAG**
- They transform a DataFrame into another because DataFrames are also **immutable**.
- They can be **wide** or **narrow** (whether they shuffle partitions or not).
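
Laziness can be illustrated without Spark. The sketch below uses Python generators as a loose analogy (an assumption of this example, not how Spark is implemented): building the chain of transformations runs nothing, and work only happens when a result is requested, much as an action triggers execution of the DAG. The input list is never modified, which echoes immutability.

```python
# Loose analogy for lazy transformations using generators; not Spark code.
calls = []  # records when the mapping function actually runs

def double(x):
    calls.append(x)
    return 2 * x

data = [1, 2, 3, 4]

# "Transformations": nothing is computed yet, we only describe the work.
mapped = (double(x) for x in data)
filtered = (y for y in mapped if y > 4)
calls_before_action = len(calls)   # no call to double() has happened yet

# "Action": consuming the generator finally runs the whole chain.
result = list(filtered)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```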


Let's read in the AAVAIL dataset that we have been working with to demonstrate the transformations.

In [11]:
# read CSV
df_aavail = spark.read.csv(os.path.join(data_dir,'aavail-target.csv'),
                           header=True,       
                           quote='"',         
                           sep=",",          
                           inferSchema=True)
df_aavail.describe().show()

+-------+-----------------+------------------+-------------+------------------+--------------+----------------+-----------------+
|summary|      customer_id|     is_subscriber|      country|               age| customer_name| subscriber_type|      num_streams|
+-------+-----------------+------------------+-------------+------------------+--------------+----------------+-----------------+
|  count|             1000|              1000|         1000|              1000|          1000|            1000|             1000|
|   mean|            500.5|             0.711|         null|            25.325|          null|            null|           17.695|
| stddev|288.8194360957494|0.4535247343692345|         null|12.184655959067568|          null|            null|4.798020007877829|
|    min|                1|                 0|    singapore|               -50|Aaliyah Duarte|    aavail_basic|                1|
|    max|             1000|                 1|united_states|                50|   Zoie Cor

## Remove one or more columns

In [12]:
columns_to_drop = ['customer_id','customer_name']
df_aavail = df_aavail.drop(*columns_to_drop)
df_aavail.describe().show()
df_aavail.groupBy("subscriber_type").count().show()

+-------+------------------+-------------+------------------+----------------+-----------------+
|summary|     is_subscriber|      country|               age| subscriber_type|      num_streams|
+-------+------------------+-------------+------------------+----------------+-----------------+
|  count|              1000|         1000|              1000|            1000|             1000|
|   mean|             0.711|         null|            25.325|            null|           17.695|
| stddev|0.4535247343692345|         null|12.184655959067568|            null|4.798020007877829|
|    min|                 0|    singapore|               -50|    aavail_basic|                1|
|    max|                 1|united_states|                50|aavail_unlimited|               29|
+-------+------------------+-------------+------------------+----------------+-----------------+

+----------------+-----+
| subscriber_type|count|
+----------------+-----+
|  aavail_premium|  331|
|aavail_unlimited|  302|
|

## Transformations on a feature matrix

The following example demonstrates how to deal with categorical features and scale continuous ones

In [13]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline

## scale the continuous features
va = VectorAssembler(inputCols=["age", "num_streams"], outputCol="cont_features")
ss = StandardScaler(inputCol="cont_features", outputCol="cont_scaled")

## categorical variable transformation
cat_cols = ["country","subscriber_type"]
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in cat_cols]
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_oh") for column in cat_cols]

## assemble the features for input into the ML model
assembler = VectorAssembler(inputCols=["cont_scaled", "country_oh","subscriber_type_oh"], outputCol="features")
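
As a rough picture of what the indexing and encoding stages produce, the plain-Python sketch below mimics their default behaviour as we understand it: `StringIndexer` assigns index 0 to the most frequent label, and `OneHotEncoder` drops the last category by default (`dropLast=True`). The helper names and toy labels are assumptions for illustration. Under those defaults, 2 countries would contribute 1 column and 3 subscriber types would contribute 2, which together with the 2 scaled continuous features gives 5 assembled features.

```python
from collections import Counter

def fit_string_index(labels):
    """Map each label to an index, most frequent label first (ties by name)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: idx for idx, lab in enumerate(ordered)}

def one_hot(index, num_categories, drop_last=True):
    """One-hot encode an index; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[index] = 1.0
    return vec

# Toy labels (hypothetical sample, not the real dataset).
labels = ["aavail_premium", "aavail_basic", "aavail_premium", "aavail_unlimited",
          "aavail_premium", "aavail_basic"]
mapping = fit_string_index(labels)
encoded = [one_hot(mapping[lab], len(mapping)) for lab in labels]

print(mapping)
print(encoded[0], encoded[1], encoded[3])
```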

MLlib Estimators and Transformers use a uniform API for specifying parameters. Two concepts are involved:

* **Param** - a named parameter with self-contained documentation
* **ParamMap** - a set of (parameter, value) pairs

Parameters can be set directly on an instance (for example through constructor arguments) or passed as a ParamMap to `fit()` or `transform()`.
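
The relationship between parameters set on an instance and those passed in a ParamMap can be sketched with plain dictionaries. This is only an analogy, assuming the precedence Spark documents for merged parameters (default values, then values set on the instance, then values supplied in the extra ParamMap); the parameter values shown are made up for the example.

```python
# Dictionary analogy for parameter resolution; not the PySpark API itself.
defaults = {"maxIter": 5, "stepSize": 0.5}   # made-up defaults
instance_params = {"stepSize": 0.2}          # e.g. set in the constructor
param_map = {"maxIter": 20}                  # e.g. passed to fit(data, param_map)

# Later dictionaries win, mirroring: defaults < instance values < ParamMap.
effective = {**defaults, **instance_params, **param_map}

print(effective)
```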

In [14]:
## setup a model
gbt = GBTClassifier(labelCol="is_subscriber", featuresCol="features")
paramMap = {gbt.maxIter: 20}

## Setup the pipeline and train the model

In [15]:
## run the whole pipeline
pipe = Pipeline(stages=indexers+encoders+[va,ss,assembler,gbt])
_result = pipe.fit(df_aavail,paramMap).transform(df_aavail)
result = _result.select("features", "is_subscriber", "rawPrediction", "probability","prediction")
result.show()

+--------------------+-------------+--------------------+--------------------+----------+
|            features|is_subscriber|       rawPrediction|         probability|prediction|
+--------------------+-------------+--------------------+--------------------+----------+
|[1.72347910934425...|            1|[-0.7104551723677...|[0.19451890992497...|       1.0|
|(5,[0,1],[2.46211...|            0|[0.77502044150328...|[0.82491963653378...|       0.0|
|[1.72347910934425...|            0|[-0.7104551723677...|[0.19451890992497...|       1.0|
|[1.64140867556596...|            1|[-0.9291971347680...|[0.13489032271470...|       1.0|
|[1.72347910934425...|            1|[0.00233192525665...|[0.50116596051488...|       0.0|
|[1.72347910934425...|            1|[-0.7703335442768...|[0.17643832065912...|       1.0|
|[3.93938082135830...|            0|[0.74986801956411...|[0.81753510406590...|       0.0|
|[3.85731038758001...|            1|[-0.9215852644165...|[0.13667675103731...|       1.0|
|[1.723479
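
The first two entries of each `features` vector are the scaled `age` and `num_streams`. A quick check, assuming `StandardScaler` was left at its defaults (divide by the sample standard deviation, no mean-centering), reproduces the values shown: an age of 21 divided by the `stddev` reported earlier by `describe()` gives roughly 1.7235. The sketch below is plain Python, with the summary statistics copied from that earlier output and a hypothetical helper name.

```python
# Summary statistics copied from the describe() output for the full dataset.
AGE_STDDEV = 12.184655959067568
STREAMS_STDDEV = 4.798020007877829

def scale_without_centering(value, stddev):
    """Divide by the standard deviation; the mean is deliberately not subtracted."""
    return value / stddev

# First customer in the table: age 21, num_streams 23.
scaled_age = scale_without_centering(21, AGE_STDDEV)
scaled_streams = scale_without_centering(23, STREAMS_STDDEV)

print(round(scaled_age, 6))      # compare with the leading 1.723479... in the output
print(round(scaled_streams, 6))
```

The same arithmetic explains the other rows: an age of 20 maps to about 1.6414 and an age of 48 to about 3.9394.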

## Now the same procedure with a train-test split, cross-validation and grid search

A train-test split can be carried out with `randomSplit()`, as in the cell below; `TrainValidationSplit` is a related tool that tunes hyperparameters on a single train/validation split. Cross-validation is accomplished in Spark MLlib using the `CrossValidator` object. The dataset is split into k folds, which are used as separate training and test datasets. `CrossValidator` computes the average evaluation metric for the k models produced by fitting the Estimator on the k different (training, test) dataset pairs. This identifies the best ParamMap, which is then used to re-fit the Estimator on the entire dataset.
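
To see how much work this implies, the plain-Python sketch below enumerates a parameter grid and k folds. The grid values mirror those used in the next cell; the fold assignment (round-robin by row index) is a simplifying assumption for illustration and is not how `CrossValidator` partitions data internally.

```python
from itertools import product

# Hyperparameter grid: every combination becomes one candidate ParamMap.
grid = {"maxIter": [10, 20], "stepSize": [0.01, 0.1]}
param_maps = [dict(zip(grid, values)) for values in product(*grid.values())]

def k_fold_indices(n_rows, k):
    """Yield (train_idx, test_idx) pairs; rows are assigned to folds round-robin."""
    for fold in range(k):
        test_idx = [i for i in range(n_rows) if i % k == fold]
        train_idx = [i for i in range(n_rows) if i % k != fold]
        yield train_idx, test_idx

num_folds = 3
folds = list(k_fold_indices(n_rows=9, k=num_folds))

# One fit per (ParamMap, fold) pair, plus a final refit of the best ParamMap.
total_fits = len(param_maps) * num_folds + 1

print(len(param_maps), total_fits)
print(folds[0])
```

With a 2 x 2 grid and 3 folds this comes to 12 cross-validation fits plus one final refit, which is why grid size and `numFolds` dominate the running time.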

In [16]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

train, test = df_aavail.randomSplit([0.8, 0.2], seed=42)

gbt = GBTClassifier(labelCol="is_subscriber", featuresCol="features")
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [10, 20]) \
    .addGrid(gbt.stepSize, [0.01, 0.1]) \
    .build()

pipe = Pipeline(stages=indexers+encoders+[va,ss,assembler])
pipeline_model = pipe.fit(train)
prepped_train = pipeline_model.transform(train)
prepped_test = pipeline_model.transform(test)

crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(labelCol="is_subscriber"),
                          numFolds=3)

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(prepped_train)
print("model trained")

model trained


In [17]:
prediction = cvModel.transform(prepped_test)
result = prediction.select("features", "is_subscriber", "rawPrediction", "probability","prediction")
result.show()

+--------------------+-------------+--------------------+--------------------+----------+
|            features|is_subscriber|       rawPrediction|         probability|prediction|
+--------------------+-------------+--------------------+--------------------+----------+
|[-4.0596848816199...|            0|[0.51824301405819...|[0.73817141398644...|       0.0|
|[-3.9733086075428...|            0|[-0.6025984312611...|[0.23055201394382...|       1.0|
|(5,[0,1],[-3.9733...|            0|[0.56437071574429...|[0.75560655995383...|       0.0|
|(5,[0,1],[1.46839...|            0|[0.53723327942381...|[0.74544540342558...|       0.0|
|(5,[0,1],[1.46839...|            0|[0.53723327942381...|[0.74544540342558...|       0.0|
|(5,[0,1],[1.64114...|            0|[0.47340782396592...|[0.72047432841242...|       0.0|
|(5,[0,1],[1.72752...|            0|[0.47340782396592...|[0.72047432841242...|       0.0|
|(5,[0,1],[1.72752...|            0|[0.46519184595527...|[0.71715312177940...|       0.0|
|[1.813901

# Spark Supervised Learning

Spark MLlib has a number of available supervised learning algorithms—specifically those used for classification and regression. Many of the commonly used algorithms have been implemented including: random forests, gradient boosted trees, linear support vector machines and even basic multilayer perceptrons.

Spark MLlib has fewer models and algorithms to choose from than scikit-learn’s supervised learning module, but many of the most popular methods are present. Both random forests and gradient boosted trees are models used in production and should be on your radar when comparing models. They both use decision trees as a base model.

In [18]:
import pyspark as ps
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [19]:
import requests
text = requests.get('https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_libsvm_data.txt').text

with open("sample_libsvm_data.txt", "w") as text_file:
    text_file.write(text)

In [20]:
# Index and parse the data file, converting it to a DataFrame
data = spark.read.format("libsvm").load("sample_libsvm_data.txt")
data.show(5)

# Index labels, adding metadata to the labels columns
# Fit on whole dataset to include all labels in index
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)

# Automatically identify categorical features and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

# Train model. This also runs the Indexers
model = pipeline.fit(trainingData)

# Make predictions
predictions = model.transform(testData)

# Select example rows to display
predictions.select("predictedLabel", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", 
                                              predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

rfModel = model.stages[2]
print(rfModel)  # summary only

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|
+-----+--------------------+
only showing top 5 rows

+--------------+-----+--------------------+
|predictedLabel|label|            features|
+--------------+-----+--------------------+
|           0.0|  0.0|(692,[100,101,102...|
|           0.0|  0.0|(692,[124,125,126...|
|           0.0|  0.0|(692,[126,127,128...|
|           0.0|  0.0|(692,[126,127,128...|
|           0.0|  0.0|(692,[152,153,154...|
+--------------+-----+--------------------+
only showing top 5 rows

Test Error = 0.0333333
RandomForestClassificationModel (uid=RandomForestClassifier_04ee130a95c9) with 10 trees
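
The reported test error is simply one minus accuracy. As a minimal plain-Python sketch with made-up labels (the real test set is not reproduced here), one wrong prediction out of 30 rows gives the same 0.0333333 figure, which is consistent with, though not proof of, a single misclassification in this run.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    if len(y_true) != len(y_pred):
        raise ValueError("label lists must have the same length")
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return correct / len(y_true)

# Hypothetical labels: 30 rows, exactly one of which is misclassified.
y_true = [0.0] * 15 + [1.0] * 15
y_pred = list(y_true)
y_pred[0] = 1.0

test_error = 1.0 - accuracy(y_true, y_pred)
print("Test Error = %g" % test_error)
```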


# Spark Unsupervised Learning

Spark MLlib has several available tools for unsupervised learning—namely dimension reduction and clustering. For clustering, K-means and Gaussian Mixture Models (GMMs) are the main tools. Latent Dirichlet Allocation (LDA) is available as a tool for clustering over documents of natural language. This is a particularly important tool since the size of NLP datasets can often make single-node computation challenging.

For dimension reduction, two of the most frequently used tools are PCA and the Chi-Squared Feature Selector. All of the tools in the unsupervised learning category take the form of a transformer or an estimator and, in keeping with the scikit-learn API, they too can be assembled in pipelines.
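
For intuition about what a clustering estimator does when it is fit, here is a compact sketch of Lloyd's algorithm for k-means on one-dimensional data. It is plain Python with a hypothetical helper name and toy data, and it uses a naive initialisation (the first k points); it is meant as an illustration, not as a description of Spark's distributed implementation.

```python
def kmeans_1d(points, k, n_iter=10):
    """Lloyd's algorithm on 1-D data, initialised with the first k points."""
    centers = list(points[:k])
    for _ in range(n_iter):
        # Assignment step: attach each point to its nearest center.
        clusters = [[] for _ in range(k)]
        for p in points:
            nearest = min(range(k), key=lambda c: abs(p - centers[c]))
            clusters[nearest].append(p)
        # Update step: move each center to the mean of its points.
        new_centers = [
            sum(cl) / len(cl) if cl else centers[i]
            for i, cl in enumerate(clusters)
        ]
        if new_centers == centers:   # converged
            break
        centers = new_centers
    return sorted(centers)

# Toy data with two obvious groups, around 1 and around 9.
points = [1.0, 9.0, 1.2, 0.8, 9.2, 8.8]
centers = kmeans_1d(points, k=2)
print(centers)
```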