In [3]:
import findspark
findspark.init()

import pyspark as ps
from pyspark.sql import SparkSession
from pyspark import SQLContext

In [2]:
# Start (or reuse) a Spark session on the "local[4]" master, registered
# under the application name "spark-ML".
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("spark-ML")
    .getOrCreate()
)

# Keep a handle on the session's underlying SparkContext.
sc = spark.sparkContext

In [6]:
# Build a SQLContext on top of the SparkContext obtained from the session.
sqlContext = SQLContext(sparkContext=sc)

In [7]:
# Load the AAPL price history from CSV.
# The reader is taken from the SparkSession (`spark.read`) instead of the
# legacy SQLContext entry point; both expose the same DataFrameReader, so the
# resulting DataFrame is unchanged.
df_aapl = spark.read.csv(
    'data/aapl.csv',
    header=True,       # first line of the file holds the column names
    quote='"',         # character used to quote field values
    sep=",",           # field separator
    inferSchema=True,  # let Spark infer column types from the data
)

# Preview the first five rows.
df_aapl.show(5)

+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2018-05-09|186.550003|187.399994|185.220001|187.360001|186.640305|23211200|
|2018-05-10|187.740005|190.369995|187.649994|190.039993|189.309998|27989300|
|2018-05-11|189.490005|190.059998|187.449997|188.589996|188.589996|26212200|
|2018-05-14|189.009995|189.529999|187.860001|188.149994|188.149994|20778800|
|2018-05-15|186.779999|187.070007|185.100006|186.440002|186.440002|23695200|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 5 rows



In [9]:
# Display the column names and types that Spark inferred while reading the CSV
# (the frame was loaded with inferSchema=True).
df_aapl.schema

StructType(List(StructField(Date,StringType,true),StructField(Open,DoubleType,true),StructField(High,DoubleType,true),StructField(Low,DoubleType,true),StructField(Close,DoubleType,true),StructField(Adj Close,DoubleType,true),StructField(Volume,IntegerType,true)))

In [11]:
# Keep only the date and closing price, ordered by closing price from
# highest to lowest, then preview the top five rows.
df_sorted = (
    df_aapl
    .select("Date", "Close")
    .orderBy("Close", ascending=False)
)
df_sorted.show(n=5)

+----------+----------+
|      Date|     Close|
+----------+----------+
|2018-06-06|193.979996|
|2018-06-07|193.460007|
|2018-06-05|193.309998|
|2018-06-04|191.830002|
|2018-06-08|191.699997|
+----------+----------+
only showing top 5 rows



In [14]:
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

In [16]:
# Pack the "Close" column into a vector column named "Features"; the
# MinMaxScaler used later in the notebook takes this vector column as input.
vectorAssembler = VectorAssembler(outputCol="Features", inputCols=["Close"])
df_vector = vectorAssembler.transform(df_aapl)

# Show the source frame first, then the frame with the assembled column.
df_aapl.show(n=5)
df_vector.show(n=5)

+----------+----------+----------+----------+----------+----------+--------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|
+----------+----------+----------+----------+----------+----------+--------+
|2018-05-09|186.550003|187.399994|185.220001|187.360001|186.640305|23211200|
|2018-05-10|187.740005|190.369995|187.649994|190.039993|189.309998|27989300|
|2018-05-11|189.490005|190.059998|187.449997|188.589996|188.589996|26212200|
|2018-05-14|189.009995|189.529999|187.860001|188.149994|188.149994|20778800|
|2018-05-15|186.779999|187.070007|185.100006|186.440002|186.440002|23695200|
+----------+----------+----------+----------+----------+----------+--------+
only showing top 5 rows

+----------+----------+----------+----------+----------+----------+--------+------------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|    Features|
+----------+----------+----------+----------+----------+----------+--------+------------+
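|2018-05-09|186.550003|187.399994|185.220001|187.360001|186.640305|23211200|[187.360001]|
|2018-05-10|187.740005|190.369995|187.649994|190.039993|189.309998|27989300|[190.039993]|
|2018-05-11|189.490005|190.059998|187.449997|188.589996|188.589996|26212200|[188.589996]|
|2018-05-14|189.009995|189.529999|187.860001|188.149994|188.149994|20778800|[188.149994]|
|2018-05-15|186.779999|187.070007|185.100006|186.440002|186.440002|23695200|[186.440002]|
+----------+----------+----------+----------+----------+----------+--------+------------+
only showing top 5 rows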

In [19]:
# Configure a min-max scaler that reads the "Features" vectors and writes
# the rescaled vectors to a new "Scaled Features" column.
scaler = MinMaxScaler(outputCol="Scaled Features", inputCol="Features")

# Fit on the assembled frame, then apply the fitted model to that same frame.
scaler_model = scaler.fit(df_vector)
scaled_data = scaler_model.transform(df_vector)
scaled_data.show(n=5)

+----------+----------+----------+----------+----------+----------+--------+------------+--------------------+
|      Date|      Open|      High|       Low|     Close| Adj Close|  Volume|    Features|     Scaled Features|
+----------+----------+----------+----------+----------+----------+--------+------------+--------------------+
|2018-05-09|186.550003|187.399994|185.220001|187.360001|186.640305|23211200|[187.360001]|[0.13689742813492...|
|2018-05-10|187.740005|190.369995|187.649994|190.039993|189.309998|27989300|[190.039993]|[0.48630977478742...|
|2018-05-11|189.490005|190.059998|187.449997|188.589996|188.589996|26212200|[188.589996]|[0.2972618767306078]|
|2018-05-14|189.009995|189.529999|187.860001|188.149994|188.149994|20778800|[188.149994]|[0.23989523856459...|
|2018-05-15|186.779999|187.070007|185.100006|186.440002|186.440002|23695200|[186.440002]|[0.01694967847449...|
+----------+----------+----------+----------+----------+----------+--------+------------+--------------------+
only showing top 5 rows

In [21]:
# Show the raw and rescaled vectors side by side.
scaled_data.select(["Features", "Scaled Features"]).show(n=5)

+------------+--------------------+
|    Features|     Scaled Features|
+------------+--------------------+
|[187.360001]|[0.13689742813492...|
|[190.039993]|[0.48630977478742...|
|[188.589996]|[0.2972618767306078]|
|[188.149994]|[0.23989523856459...|
|[186.440002]|[0.01694967847449...|
+------------+--------------------+
only showing top 5 rows



In [22]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import RegexTokenizer, HashingTF

# Small labelled corpus for the text classifier: one (id, text, label)
# tuple per document.
training = spark.createDataFrame(
    data=[
        (0, "spark is like hadoop mapreduce", 1.0),
        (1, "sparks light fire!!!", 0.0),
        (2, "elephants like simba", 0.0),
        (3, "hadoop is an elephant", 1.0),
        (4, "hadoop mapreduce", 1.0),
    ],
    schema=["id", "text", "label"],
)

In [26]:
# Feature-extraction stages: tokenise "text" with the regex pattern \W into
# "tokens", then hash the tokens into a "features" vector column.
regexTokenizer = RegexTokenizer(pattern="\\W", inputCol="text", outputCol="tokens")
hashingTF = HashingTF(outputCol="features", inputCol="tokens")

# Run both stages over the training documents.
tokens = regexTokenizer.transform(training)
hashes = hashingTF.transform(tokens)

In [27]:
# Preview the tokenised documents, then the same rows with hashed features.
tokens.show(n=5)
hashes.show(n=5)

+---+--------------------+-----+--------------------+
| id|                text|label|              tokens|
+---+--------------------+-----+--------------------+
|  0|spark is like had...|  1.0|[spark, is, like,...|
|  1|sparks light fire!!!|  0.0|[sparks, light, f...|
|  2|elephants like simba|  0.0|[elephants, like,...|
|  3|hadoop is an elep...|  1.0|[hadoop, is, an, ...|
|  4|    hadoop mapreduce|  1.0| [hadoop, mapreduce]|
+---+--------------------+-----+--------------------+

+---+--------------------+-----+--------------------+--------------------+
| id|                text|label|              tokens|            features|
+---+--------------------+-----+--------------------+--------------------+
|  0|spark is like had...|  1.0|[spark, is, like,...|(262144,[106841,1...|
|  1|sparks light fire!!!|  0.0|[sparks, light, f...|(262144,[91799,10...|
|  2|elephants like simba|  0.0|[elephants, like,...|(262144,[53025,17...|
|  3|hadoop is an elep...|  1.0|[hadoop, is, an, ...|(262144,[1

In [29]:
# Logistic-regression estimator. The input columns are spelled out here, but
# "features" and "label" are the names the estimator uses by default.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label",
    maxIter=10,
    regParam=0.001,
)

# Train on the hashed training documents.
logistic_model = lr.fit(hashes)

In [30]:
# Unlabelled documents to score: (id, text) only.
test = spark.createDataFrame(
    data=[
        (5, "simba has a spark"),
        (6, "hadoop"),
        (7, "mapreduce in spark"),
        (8, "apache hadoop"),
    ],
    schema=["id", "text"],
)

# The model was fitted on hashed token vectors, so the test text must go
# through the same tokenise -> hash steps before it can be scored.
preds = logistic_model.transform(
    hashingTF.transform(
        regexTokenizer.transform(test)
    )
)
preds.select(['text', 'prediction', 'probability']).show()

+------------------+----------+--------------------+
|              text|prediction|         probability|
+------------------+----------+--------------------+
| simba has a spark|       0.0|[0.78779795057740...|
|            hadoop|       1.0|[0.02996000405249...|
|mapreduce in spark|       1.0|[0.02396543994089...|
|     apache hadoop|       1.0|[0.02996000405249...|
+------------------+----------+--------------------+

