### Spark-ML

1. Chain Spark DataFrame methods together to do data munging.
2. Be able to describe the Spark-ML API and recognize how it differs from scikit-learn.
3. Chain Spark-ML Transformers and Estimators together to compose ML pipelines.

In [1]:
import pyspark.sql.functions as F
import pyspark as ps
from pyspark import SQLContext  

spark = ps.sql.SparkSession.builder \
    .master('local[2]') \
    .appName('spark-ml') \
    .getOrCreate()

sc = spark.sparkContext
print("imported pyspark")

imported pyspark


In [2]:
sqlContext = SQLContext(sc)
print("defined sqlcontext")

defined sqlcontext


### Find the date on which AAPL's closing stock price was the highest

#### Input DataFrame



In [4]:

# Read the CSV file into a Spark DataFrame
df_aapl = sqlContext.read.csv('data/aapl.csv',
                         header=True,       # first line holds the column names
                         quote='"',         # character used for quoting
                         sep=",",           # field separator
                         inferSchema=True)  # infer column types from the data

df_aapl.show(5)  # pandas analogue: df.head(5)

+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|09/05/2018|186.550003|187.399994|185.220001|187.360001|186.640305|23211200|
|10/05/2018|187.740005|190.369995|187.649994|190.039993|189.309998|27989300|
|11/05/2018|189.490005|190.059998|187.449997|188.589996|188.589996|26212200|
|14/05/2018|189.009995|189.529999|187.860001|188.149994|188.149994|20778800|
|15/05/2018|186.779999|187.070007|185.100006|186.440002|186.440002|23695200|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 5 rows



In [5]:
df_aapl.schema  # roughly analogous to pandas df.info()

StructType(List(StructField(Date,StringType,true),StructField(Open,DoubleType,true),StructField(High,DoubleType,true),StructField(Low,DoubleType,true),StructField(Close,DoubleType,true),StructField(Adj Close,DoubleType,true),StructField(Volume,IntegerType,true)))

### Task

Now, design a pipeline that will:

1. Keep only fields for Date and Close
2. Order by Close in descending order

### Code

In [6]:
df_out = df_aapl.select('Date', 'Close').orderBy('Close', ascending=False)

df_out.show(5)

+----------+----------+
|      Date|     Close|
+----------+----------+
|06/06/2018|193.979996|
|07/06/2018|193.460007|
|05/06/2018|193.309998|
|04/06/2018|191.830002|
|08/06/2018|191.699997|
+----------+----------+
only showing top 5 rows



**Solution**

df_aapl.select("Date", "Close").orderBy("Close", ascending=False).show(5)

**Supervised Machine Learning on DataFrames**


What is the difference between df_aapl and df_vector after running the code below?

In [7]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

# assemble values in a vector
vectorAssembler = VectorAssembler(inputCols=["Close"], outputCol="Features")


df_vector = vectorAssembler.transform(df_aapl)
df_aapl.show(5)

df_vector.show(5)

+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|09/05/2018|186.550003|187.399994|185.220001|187.360001|186.640305|23211200|
|10/05/2018|187.740005|190.369995|187.649994|190.039993|189.309998|27989300|
|11/05/2018|189.490005|190.059998|187.449997|188.589996|188.589996|26212200|
|14/05/2018|189.009995|189.529999|187.860001|188.149994|188.149994|20778800|
|15/05/2018|186.779999|187.070007|185.100006|186.440002|186.440002|23695200|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 5 rows

+----------+----------+----------+----------+----------+----------+--------+------------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|    Features|
+----------+----------+----------+----------+----------+----------+--------+------------+
|09/05/2018|

In [8]:
# MinMaxScaler requires its input column to be a vector, hence the VectorAssembler step above.
scaler = MinMaxScaler(inputCol="Features", outputCol="Scaled Features")

# Compute summary statistics and generate a MinMaxScalerModel
scaler_model = scaler.fit(df_vector)

# Rescale each feature to the range [min, max] (by default [0, 1]).
scaled_data = scaler_model.transform(df_vector)
scaled_data.select("Features", "Scaled Features").show(10)

+------------+--------------------+
|    Features|     Scaled Features|
+------------+--------------------+
|[187.360001]|[0.13689742813492...|
|[190.039993]|[0.48630977478742...|
|[188.589996]|[0.2972618767306078]|
|[188.149994]|[0.23989523856459...|
|[186.440002]|[0.01694967847449...|
|[188.179993]|[0.24380645210076...|
|[186.990005]|[0.08865804137106...|
|[186.309998]|               [0.0]|
|[187.630005]|[0.17210004487615...|
|[187.160004]|[0.11082219317397...|
+------------+--------------------+
only showing top 10 rows
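To make the scaled values above concrete: with the default output range, min-max scaling maps each value x to (x - x_min) / (x_max - x_min). The sketch below is plain Python, not Spark code, and the helper name `min_max_scale` is made up for illustration. It assumes the column minimum is 186.309998 (the row that scales to 0.0 above) and the column maximum is 193.979996 (the highest Close in the sorted output earlier).

```python
def min_max_scale(x, x_min, x_max, new_min=0.0, new_max=1.0):
    """Rescale x linearly so that x_min maps to new_min and x_max to new_max."""
    if x_max == x_min:
        # Degenerate column (all values equal): fall back to the midpoint of the target range.
        return 0.5 * (new_min + new_max)
    return (x - x_min) / (x_max - x_min) * (new_max - new_min) + new_min


# Assumed column statistics, read off the outputs shown in this notebook.
close_min = 186.309998
close_max = 193.979996

first_row_scaled = min_max_scale(187.360001, close_min, close_max)
lowest_scaled = min_max_scale(close_min, close_min, close_max)
highest_scaled = min_max_scale(close_max, close_min, close_max)

print(first_row_scaled, lowest_scaled, highest_scaled)
```

Under those assumptions, the first value agrees with the first row of the Scaled Features column, and the lowest and highest closes map to the ends of the range.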



## Transformers

The VectorAssembler class above is an example of a generic type in Spark, called a Transformer. Important things to know about this type:

* They implement a transform method.
* They convert one DataFrame into another, usually by adding columns.

Examples of Transformers: VectorAssembler, Tokenizer, StopWordsRemover, and many more.
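The two properties above can be mimicked in a few lines of plain Python. This is only a toy model of the idea, not Spark's implementation: `ToyTokenizer` is a hypothetical class, and a list of dicts stands in for a DataFrame.

```python
class ToyTokenizer:
    """Toy transformer: adds a column of lowercase, whitespace-split tokens."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def transform(self, rows):
        # Return new rows with one extra column; the input rows are left untouched.
        return [
            {**row, self.output_col: row[self.input_col].lower().split()}
            for row in rows
        ]


docs = [{"id": 0, "text": "Spark is like Hadoop"}]
tokenized = ToyTokenizer("text", "tokens").transform(docs)
print(tokenized)
```

Note that `transform` needs no training step: everything it needs is fixed when the object is constructed.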

## Estimators

According to the docs: "An Estimator abstracts the concept of a learning algorithm or any algorithm that fits or trains on data". Important things to know about this type:

* They implement a fit method whose argument is a DataFrame.
* The output of fit is another type called Model, which is a Transformer.

Examples of Estimators: LogisticRegression, DecisionTreeRegressor, and many more.
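The fit-returns-a-Model contract is also the main difference from scikit-learn, where `fit` returns the estimator itself with the learned state stored on it. A minimal plain-Python sketch of the Spark-style contract follows; `ToyMinMaxScaler` and `ToyMinMaxScalerModel` are hypothetical names, and a list of dicts stands in for a DataFrame.

```python
class ToyMinMaxScaler:
    """Toy estimator: holds configuration only, learns nothing until fit is called."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def fit(self, rows):
        values = [row[self.input_col] for row in rows]
        # The learned state lives in a separate model object, not on the estimator.
        return ToyMinMaxScalerModel(self.input_col, self.output_col, min(values), max(values))


class ToyMinMaxScalerModel:
    """Toy model: a transformer that carries the statistics learned by fit."""

    def __init__(self, input_col, output_col, lo, hi):
        self.input_col = input_col
        self.output_col = output_col
        self.lo = lo
        self.hi = hi

    def transform(self, rows):
        span = self.hi - self.lo
        return [
            {**row, self.output_col: (row[self.input_col] - self.lo) / span if span else 0.5}
            for row in rows
        ]


prices = [{"Close": 10.0}, {"Close": 20.0}, {"Close": 15.0}]
toy_model = ToyMinMaxScaler("Close", "Scaled").fit(prices)
scaled = [row["Scaled"] for row in toy_model.transform(prices)]
print(scaled)
```

Because the model is a separate object, the same unfitted estimator can be fit on different data sets without one fit overwriting another.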

## Pipelines

Many data science workflows can be described as the sequential application of various Transformers and Estimators.
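The sequential idea can be sketched in plain Python before looking at the real API. Everything below is a toy under stated assumptions: `ToyPipeline`, `ToyPipelineModel`, `AddLength`, and `MeanCenter` are hypothetical classes, rows are plain dicts, and a stage counts as an estimator simply because it has a `fit` method.

```python
class AddLength:
    """Toy transformer: adds the length of the text as a new column."""

    def transform(self, rows):
        return [{**row, "length": len(row["text"])} for row in rows]


class MeanCenter:
    """Toy estimator: learns the mean length, returns a model that subtracts it."""

    def fit(self, rows):
        mean = sum(row["length"] for row in rows) / len(rows)
        return MeanCenterModel(mean)


class MeanCenterModel:
    def __init__(self, mean):
        self.mean = mean

    def transform(self, rows):
        return [{**row, "centered": row["length"] - self.mean} for row in rows]


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):
                stage = stage.fit(rows)  # estimator becomes a fitted model
            rows = stage.transform(rows)  # feed the output to the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


train = [{"text": "hadoop"}, {"text": "spark"}]
pipeline_model = ToyPipeline([AddLength(), MeanCenter()]).fit(train)
result = pipeline_model.transform([{"text": "simba"}])
print(result)
```

Fitting the pipeline yields an object made only of transformers, so new data passes through exactly the same steps that the training data did.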

<img src="http://spark.apache.org/docs/latest/img/ml-Pipeline.png">

Let's see two ways to implement the above flow!



In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import RegexTokenizer, HashingTF

# Prepare training documents from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "spark is like hadoop mapreduce", 1.0),
    (1, "sparks light fire!!!", 0.0),
    (2, "elephants like simba", 0.0),
    (3, "hadoop is an elephant", 1.0),
    (4, "hadoop mapreduce", 1.0)
], ["id", "text", "label"])

print("imported pyspark")

imported pyspark


In [11]:
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W")
hashingTF = HashingTF(inputCol="tokens", outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)

tokens = regexTokenizer.transform(training)
hashes = hashingTF.transform(tokens)
logistic_model = lr.fit(hashes) # Uses columns named features/label by default
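To see roughly what the first two stages do to a document, here is a plain-Python sketch of regex tokenization followed by the hashing trick. It is not Spark's implementation: `regex_tokenize` and `hashing_tf` are hypothetical helpers, `zlib.crc32` is a stand-in hash chosen because it is deterministic across runs, and 16 buckets is an arbitrary small number for readability. To the best of my knowledge, Spark uses a MurmurHash3-based hash and a far larger default number of features.

```python
import re
import zlib
from collections import Counter


def regex_tokenize(text, pattern=r"\W"):
    """Lowercase, split wherever the pattern matches, and drop empty strings."""
    return [tok for tok in re.split(pattern, text.lower()) if tok]


def hashing_tf(tokens, num_features=16):
    """Map each token to a bucket index and count occurrences per bucket."""
    counts = Counter()
    for tok in tokens:
        # Stand-in hash; two different tokens may collide in the same bucket.
        index = zlib.crc32(tok.encode("utf-8")) % num_features
        counts[index] += 1
    return dict(counts)


tokens = regex_tokenize("sparks light fire!!!")
vector = hashing_tf(tokens)
print(tokens)  # ['sparks', 'light', 'fire']
print(vector)  # sparse mapping of bucket index to term count
```

The hashing trick needs no vocabulary, so it involves no fitting step. That is why HashingTF is used above through `transform` alone.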

In [12]:
# Prepare test documents, which are unlabeled (id, text) tuples.
test = spark.createDataFrame([
    (5, "simba has a spark"),
    (6, "hadoop"),
    (7, "mapreduce in spark"),
    (8, "apache hadoop")
], ["id", "text"])

# What do we need to do to this to get a prediction?
preds = logistic_model.transform(hashingTF.transform(regexTokenizer.transform(test)))
preds.select('text', 'prediction', 'probability').show()

+------------------+----------+--------------------+
|              text|prediction|         probability|
+------------------+----------+--------------------+
| simba has a spark|       0.0|[0.78779795057740...|
|            hadoop|       1.0|[0.02996000405249...|
|mapreduce in spark|       1.0|[0.02396543994089...|
|     apache hadoop|       1.0|[0.02996000405249...|
+------------------+----------+--------------------+
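The probability column is truncated in the output above, so only its first entry is visible. That entry is the estimated probability of label 0.0. The sketch below shows how the prediction column relates to it, assuming binary classification and a decision threshold of 0.5; `predict_label` is a hypothetical helper, not part of the Spark API.

```python
def predict_label(prob_label_0, threshold=0.5):
    """Predict 1.0 when the probability of label 1.0 exceeds the threshold."""
    prob_label_1 = 1.0 - prob_label_0
    return 1.0 if prob_label_1 > threshold else 0.0


# First entries of the probability vectors, copied from the output above.
shown = {
    "simba has a spark": 0.78779795057740,
    "hadoop": 0.02996000405249,
    "mapreduce in spark": 0.02396543994089,
    "apache hadoop": 0.02996000405249,
}

predictions = {text: predict_label(p0) for text, p0 in shown.items()}
print(predictions)
```

Only "simba has a spark" has a label-0.0 probability above one half, which is why it is the only row predicted as 0.0.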



**Alternatively**

In [13]:
# Configure an ML pipeline, which consists of three stages: regexTokenizer, hashingTF, and lr.
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="tokens", pattern="\\W")
hashingTF = HashingTF(inputCol="tokens", outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[regexTokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

In [14]:
# How can we apply the fitted pipeline to our test data?
prediction = model.transform(test)
prediction.select(['text', 'prediction', 'probability']).show()

+------------------+----------+--------------------+
|              text|prediction|         probability|
+------------------+----------+--------------------+
| simba has a spark|       0.0|[0.78779795057740...|
|            hadoop|       1.0|[0.02996000405249...|
|mapreduce in spark|       1.0|[0.02396543994089...|
|     apache hadoop|       1.0|[0.02996000405249...|
+------------------+----------+--------------------+

