# Demo 1: Naive Bayes

In [1]:
# Peek at the raw file from the shell: the header row plus the first four records.
! head -5 iris.csv

Id,SepalLengthCm,SepalWidthCm,PetalLengthCm,PetalWidthCm,Species
1,5.1,3.5,1.4,0.2,Iris-setosa
2,4.9,3.0,1.4,0.2,Iris-setosa
3,4.7,3.2,1.3,0.2,Iris-setosa
4,4.6,3.1,1.5,0.2,Iris-setosa


In [2]:
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
# Explicit schema for iris.csv: an integer Id, four double-valued measurements,
# and the species name as a string. Every field is declared nullable.
struct = StructType(
    [StructField('Id', IntegerType(), True)]
    + [
        StructField(column, DoubleType(), True)
        for column in ('SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm')
    ]
    + [StructField('Species', StringType(), True)]
)

In [4]:
# Load the CSV using the explicit schema; the first line of the file is a header row.
df_sp = spark.read.csv(
    'iris.csv',
    schema=struct,
    header=True,
)

In [None]:
# Interactive docstring lookup for the CSV reader; exploratory only, produces no data.
spark.read.csv?

In [5]:
# Display the first five parsed rows.
df_sp.show(5)

+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 5 rows



In [6]:
# Summary statistics (count, mean, stddev, min, max) for every column.
df_sp.describe().show()

+-------+------------------+------------------+-------------------+------------------+------------------+--------------+
|summary|                Id|     SepalLengthCm|       SepalWidthCm|     PetalLengthCm|      PetalWidthCm|       Species|
+-------+------------------+------------------+-------------------+------------------+------------------+--------------+
|  count|               150|               150|                150|               150|               150|           150|
|   mean|              75.5| 5.843333333333335| 3.0540000000000007|3.7586666666666693|1.1986666666666672|          null|
| stddev|43.445367992456916|0.8280661279778637|0.43359431136217375| 1.764420419952262|0.7631607417008414|          null|
|    min|                 1|               4.3|                2.0|               1.0|               0.1|   Iris-setosa|
|    max|               150|               7.9|                4.4|               6.9|               2.5|Iris-virginica|
+-------+------------------+----

In [None]:
# Interactive docstring lookup for VectorAssembler; exploratory only.
VectorAssembler?

In [7]:
# Pack the four measurement columns into a single vector column named
# 'features', which is the column the classifier is later told to read.
vecAssembler = VectorAssembler(
    inputCols=['SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm'],
    outputCol='features',
)
# df_sp is rebound to its own transformed version, so on a second run of this
# cell the output column would already exist and the transform would fail.
# Dropping 'features' first (a no-op when the column is absent) keeps the
# cell re-runnable without changing the result of the first run.
df_sp = vecAssembler.transform(df_sp.drop('features'))
df_sp.show(5)

+---+-------------+------------+-------------+------------+-----------+-----------------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|         features|
+---+-------------+------------+-------------+------------+-----------+-----------------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|
+---+-------------+------------+-------------+------------+-----------+-----------------+
only showing top 5 rows



In [8]:
# List the distinct class names present in the Species column.
df_sp.select('Species').distinct().show()

+---------------+
|        Species|
+---------------+
| Iris-virginica|
|    Iris-setosa|
|Iris-versicolor|
+---------------+



In [9]:
# Total number of rows in the dataset.
df_sp.count()

150

In [10]:
# Transform the species names into numeric class indices in a new 'label' column.
stringIndexer = StringIndexer(inputCol='Species', outputCol='label')
# df_sp is rebound to its own transformed version; drop any 'label' column left
# by a previous run (a no-op when absent) so re-running this cell does not fail
# on an already-existing output column.
df_sp = df_sp.drop('label')
df_sp = stringIndexer.fit(df_sp).transform(df_sp)
df_sp.show(5)

+---+-------------+------------+-------------+------------+-----------+-----------------+-----+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|         features|label|
+---+-------------+------------+-------------+------------+-----------+-----------------+-----+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|[4.9,3.0,1.4,0.2]|  0.0|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|  0.0|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|  0.0|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|  0.0|
+---+-------------+------------+-------------+------------+-----------+-----------------+-----+
only showing top 5 rows



In [16]:
#df_sp.show(150)

In [34]:
# Split into a training set (75%) and a testing set (25%).
# A fixed seed makes the sampling repeatable, so the rows shown here and the
# evaluation score computed later do not change from one run to the next.
df_train, df_test = df_sp.randomSplit([.75, .25], seed=42)
df_train.show(5)

+---+-------------+------------+-------------+------------+-----------+-----------------+-----+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|         features|label|
+---+-------------+------------+-------------+------------+-----------+-----------------+-----+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|[5.1,3.5,1.4,0.2]|  0.0|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|[4.7,3.2,1.3,0.2]|  0.0|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|[4.6,3.1,1.5,0.2]|  0.0|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|[5.0,3.6,1.4,0.2]|  0.0|
|  8|          5.0|         3.4|          1.5|         0.2|Iris-setosa|[5.0,3.4,1.5,0.2]|  0.0|
+---+-------------+------------+-------------+------------+-----------+-----------------+-----+
only showing top 5 rows



In [35]:
# Configure a Naive Bayes classifier on the assembled vectors and numeric
# labels, then fit it on the training split only.
nb = NaiveBayes(
    labelCol='label',
    featuresCol='features',
)
model = nb.fit(df_train)

In [18]:
# Interactive docstring lookup for NaiveBayes; exploratory only.
NaiveBayes?

In [36]:
# Score the held-out rows. Only the feature vector and the true label are
# passed in; the model appends rawPrediction, probability and prediction.
df_predict = model.transform(df_test.select(['features', 'label']))
df_predict.show(n=3)

+-----------------+-----+--------------------+--------------------+----------+
|         features|label|       rawPrediction|         probability|prediction|
+-----------------+-----+--------------------+--------------------+----------+
|[4.9,3.0,1.4,0.2]|  0.0|[-11.395821829877...|[0.64130642673914...|       0.0|
|[5.4,3.9,1.7,0.4]|  0.0|[-14.045362239663...|[0.66914240728118...|       0.0|
|[4.6,3.4,1.4,0.3]|  0.0|[-11.989473142755...|[0.64847840144876...|       0.0|
+-----------------+-----+--------------------+--------------------+----------+
only showing top 3 rows



In [39]:
# Fetch the first three prediction rows as Row objects to see the vector
# columns that show() truncates.
df_predict.take(3)

[Row(features=DenseVector([4.9, 3.0, 1.4, 0.2]), label=0.0, rawPrediction=DenseVector([-11.3958, -12.439, -12.9709]), probability=DenseVector([0.6413, 0.2259, 0.1327]), prediction=0.0),
 Row(features=DenseVector([5.4, 3.9, 1.7, 0.4]), label=0.0, rawPrediction=DenseVector([-14.0454, -15.1871, -15.7872]), probability=DenseVector([0.6691, 0.2136, 0.1172]), prediction=0.0),
 Row(features=DenseVector([4.6, 3.4, 1.4, 0.3]), label=0.0, rawPrediction=DenseVector([-11.9895, -13.0635, -13.5968]), probability=DenseVector([0.6485, 0.2216, 0.13]), prediction=0.0)]

In [37]:
# Look at the rawPrediction column alone for the first three rows.
df_predict.select('rawPrediction').head(3)

[Row(rawPrediction=DenseVector([-11.3958, -12.439, -12.9709])),
 Row(rawPrediction=DenseVector([-14.0454, -15.1871, -15.7872])),
 Row(rawPrediction=DenseVector([-11.9895, -13.0635, -13.5968]))]

In [38]:
# Score the predictions. The evaluator's default settings are spelled out so it
# is visible that 'label' is compared with 'prediction' and that the number
# reported is the F1 measure rather than plain accuracy.
evaluator = MulticlassClassificationEvaluator(
    predictionCol='prediction',
    labelCol='label',
    metricName='f1',
)
evaluator.evaluate(df_predict)

0.9729052360631307

# Demo 2: Logistic Regression

In [55]:
#from pandas.io.data import DataReader
from pandas_datareader import data
from pyspark.sql.functions import udf
from pyspark.ml.classification import LogisticRegression
import time

In [56]:
# Download the AMZN daily price history and turn the date index into an
# ordinary 'Date' column.
# NOTE(review): the 'google' data source may no longer be served by current
# pandas-datareader releases; confirm availability before re-running.
df_pd = data.DataReader(name='AMZN', data_source='google').reset_index()

In [59]:
# First three rows of the downloaded price history.
df_pd.head(3)

Unnamed: 0,Date,Open,High,Low,Close,Volume
0,2010-01-04,136.25,136.61,133.14,133.9,7600543
1,2010-01-05,133.43,135.48,131.81,134.69,8856456
2,2010-01-06,134.6,134.73,131.65,132.25,7180977


In [60]:
# Last three rows, showing how far the history extends.
df_pd.tail(3)

Unnamed: 0,Date,Open,High,Low,Close,Volume
1767,2017-01-11,793.66,799.5,789.51,799.02,2992791
1768,2017-01-12,800.31,814.13,799.5,813.64,4873922
1769,2017-01-13,814.32,821.65,811.4,817.14,3791945


In [62]:
# Convert the pandas frame into a Spark DataFrame.
# Note: this rebinds df_sp, which held the iris DataFrame in Demo 1.
df_sp = spark.createDataFrame(df_pd)

In [65]:
# Inspect the converted frame: Date arrives as a very large integer rather
# than a calendar date (apparently nanoseconds since the epoch).
df_sp.show(3)

+-------------------+------+------+------+------+-------+
|               Date|  Open|  High|   Low| Close| Volume|
+-------------------+------+------+------+------+-------+
|1262563200000000000|136.25|136.61|133.14| 133.9|7600543|
|1262649600000000000|133.43|135.48|131.81|134.69|8856456|
|1262736000000000000| 134.6|134.73|131.65|132.25|7180977|
+-------------------+------+------+------+------+-------+
only showing top 3 rows



In [75]:
# Convert datatype: first attempt, casting the integer Date column straight to
# a timestamp and storing it in a new 'data_time' column.
# NOTE(review): the displayed result has implausible years (e.g. 180282), which
# suggests the nanosecond-scale integers are read in a coarser unit. A later
# cell sidesteps this by converting the dates in pandas before handing them
# to Spark.
df_sp1 = df_sp.withColumn('data_time',df_sp.Date.cast('timestamp'))

In [76]:
# Inspect the result of the direct cast.
df_sp1.show(3)

+-------------------+------+------+------+------+-------+--------------------+
|               Date|  Open|  High|   Low| Close| Volume|           data_time|
+-------------------+------+------+------+------+-------+--------------------+
|1262563200000000000|136.25|136.61|133.14| 133.9|7600543|180282-09-13 05:1...|
|1262649600000000000|133.43|135.48|131.81|134.69|8856456|219406-10-20 22:0...|
|1262736000000000000| 134.6|134.73|131.65|132.25|7180977|34543-07-25 05:54...|
+-------------------+------+------+------+------+-------+--------------------+
only showing top 3 rows



In [67]:
# Interactive docstring lookup for Column.cast; exploratory only.
df_sp.Date.cast?

In [77]:
# Confirm the column types: Date is still a bigint, data_time is a timestamp.
df_sp1.dtypes

[('Date', 'bigint'),
 ('Open', 'double'),
 ('High', 'double'),
 ('Low', 'double'),
 ('Close', 'double'),
 ('Volume', 'bigint'),
 ('data_time', 'timestamp')]

In [80]:
# Convert the dates in pandas first: each value in the Date column is replaced
# by a float number of seconds, computed by time.mktime from the value's time tuple.
# NOTE: this overwrites df_pd.Date in place, so the cell cannot be run twice in
# a row; after the first run the column holds floats, which have no timetuple().
def _to_epoch_seconds(stamp):
    """Return time.mktime() of the given datetime-like value's time tuple."""
    return time.mktime(stamp.timetuple())

df_pd['Date'] = df_pd['Date'].apply(_to_epoch_seconds)

In [81]:
# Rebuild the Spark DataFrame from the converted pandas frame, then derive a
# calendar-date column: numeric seconds -> timestamp -> date.
df_sp = spark.createDataFrame(df_pd)
df_sp = df_sp.withColumn(
    'date_time',
    df_sp['Date'].cast('timestamp').cast('date'),
)
df_sp.show(n=3)

+-----------+------+------+------+------+-------+----------+
|       Date|  Open|  High|   Low| Close| Volume| date_time|
+-----------+------+------+------+------+-------+----------+
| 1.262592E9|136.25|136.61|133.14| 133.9|7600543|2010-01-04|
|1.2626784E9|133.43|135.48|131.81|134.69|8856456|2010-01-05|
|1.2627648E9| 134.6|134.73|131.65|132.25|7180977|2010-01-06|
+-----------+------+------+------+------+-------+----------+
only showing top 3 rows



In [78]:
# Interactive docstring lookup for Series.apply; exploratory only.
?df_pd.Date.apply

In [79]:
# Interactive docstring lookup for time.mktime; exploratory only.
?time.mktime

In [82]:
# create label column
def is_bull(close_price, open_price):
    """Return 1 when the close is strictly above the open, otherwise 0.

    Equal prices and falling prices both give 0.
    """
    return int((close_price - open_price) > 0)

# Wrap is_bull as a Spark UDF with a byte-sized integer result and apply it to
# every row's Close and Open to produce the 'label' column.
price_change = udf(is_bull, returnType=ByteType())
df_sp = df_sp.withColumn(
    'label',
    price_change(df_sp['Close'], df_sp['Open']),
)
df_sp.show(n=3)

+-----------+------+------+------+------+-------+----------+-----+
|       Date|  Open|  High|   Low| Close| Volume| date_time|label|
+-----------+------+------+------+------+-------+----------+-----+
| 1.262592E9|136.25|136.61|133.14| 133.9|7600543|2010-01-04|    0|
|1.2626784E9|133.43|135.48|131.81|134.69|8856456|2010-01-05|    1|
|1.2627648E9| 134.6|134.73|131.65|132.25|7180977|2010-01-06|    0|
+-----------+------+------+------+------+-------+----------+-----+
only showing top 3 rows



In [None]:
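# create new features:
# 1) price range (High - Low)
# 2) volume (scaled)
# NOTE: the stated intent is to use the *previous* day's values, but the cells
# below compute both features from the same row as the label (no lag is applied).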

In [83]:
# 1) price range: the gap between each row's High and Low
df_sp1 = df_sp.withColumn(
    'price_range',
    df_sp['High'] - df_sp['Low'],
)
df_sp1.show(n=3)

+-----------+------+------+------+------+-------+----------+-----+------------------+
|       Date|  Open|  High|   Low| Close| Volume| date_time|label|       price_range|
+-----------+------+------+------+------+-------+----------+-----+------------------+
| 1.262592E9|136.25|136.61|133.14| 133.9|7600543|2010-01-04|    0|3.4700000000000273|
|1.2626784E9|133.43|135.48|131.81|134.69|8856456|2010-01-05|    1|3.6699999999999875|
|1.2627648E9| 134.6|134.73|131.65|132.25|7180977|2010-01-06|    0| 3.079999999999984|
+-----------+------+------+------+------+-------+----------+-----+------------------+
only showing top 3 rows



In [85]:
# Look at the new column on its own; the long decimals are floating-point
# subtraction artefacts.
df_sp1.select('price_range').show(3)

+------------------+
|       price_range|
+------------------+
|3.4700000000000273|
|3.6699999999999875|
| 3.079999999999984|
+------------------+
only showing top 3 rows



In [87]:
# 2) scale volume
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import StandardScaler
from datetime import timedelta

In [89]:
def _as_dense_vector(value):
    """Wrap a single scalar in a one-element dense vector."""
    return Vectors.dense([value])

# Turn the scalar Volume column into a one-element vector column, the input
# form the scaler in a later cell is configured to read.
transfer_to_vector = udf(_as_dense_vector, VectorUDT())
df_sp2 = df_sp1.withColumn(
    'volume_vector',
    transfer_to_vector(df_sp1['Volume']),
)
df_sp2.select('Volume', 'volume_vector').show(n=3)

+-------+-------------+
| Volume|volume_vector|
+-------+-------------+
|7600543|  [7600543.0]|
|8856456|  [8856456.0]|
|7180977|  [7180977.0]|
+-------+-------------+
only showing top 3 rows



In [90]:
# Interactive docstring lookup for StandardScaler; exploratory only.
StandardScaler?

In [91]:
# Standardise the volume vector: withMean=True centres it, and the scaler's
# default unit-variance scaling is left on.
# NOTE: the scaler is fitted on the full dataset, before the train/test split
# made later, so the test rows contribute to the scaling statistics.
standardScaler = StandardScaler(
    withMean=True,
    inputCol='volume_vector',
    outputCol='volume_scaled',
)
# df_sp2 is rebound to its own transformed version; drop any 'volume_scaled'
# column left by a previous run (a no-op when absent) so that re-running this
# cell does not fail on an already-existing output column.
df_sp2 = df_sp2.drop('volume_scaled')
df_sp2 = standardScaler.fit(df_sp2).transform(df_sp2)
df_sp2.show(3)

+-----------+------+------+------+------+-------+----------+-----+------------------+-------------+--------------------+
|       Date|  Open|  High|   Low| Close| Volume| date_time|label|       price_range|volume_vector|       volume_scaled|
+-----------+------+------+------+------+-------+----------+-----+------------------+-------------+--------------------+
| 1.262592E9|136.25|136.61|133.14| 133.9|7600543|2010-01-04|    0|3.4700000000000273|  [7600543.0]|[0.9717404687204194]|
|1.2626784E9|133.43|135.48|131.81|134.69|8856456|2010-01-05|    1|3.6699999999999875|  [8856456.0]|[1.3787982646085433]|
|1.2627648E9| 134.6|134.73|131.65|132.25|7180977|2010-01-06|    0| 3.079999999999984|  [7180977.0]|[0.8357536518074672]|
+-----------+------+------+------+------+-------+----------+-----+------------------+-------------+--------------------+
only showing top 3 rows



In [92]:
# Narrow the view to the date, the two candidate features and the label.
df_sp2.select('date_time','volume_scaled','price_range','label').show(3)

+----------+--------------------+------------------+-----+
| date_time|       volume_scaled|       price_range|label|
+----------+--------------------+------------------+-----+
|2010-01-04|[0.9717404687204194]|3.4700000000000273|    0|
|2010-01-05|[1.3787982646085433]|3.6699999999999875|    1|
|2010-01-06|[0.8357536518074672]| 3.079999999999984|    0|
+----------+--------------------+------------------+-----+
only showing top 3 rows



In [93]:
# Combine the scaled volume vector and the scalar price range into a single
# 'features' vector column for the classifier.
vecAssembler = VectorAssembler(
    inputCols=['volume_scaled', 'price_range'],
    outputCol='features',
)
# df_sp2 is rebound to its own transformed version; dropping 'features' first
# (a no-op when the column is absent) keeps this cell re-runnable instead of
# failing because the output column already exists.
df_sp2 = vecAssembler.transform(df_sp2.drop('features'))
df_sp2.show(3)

+-----------+------+------+------+------+-------+----------+-----+------------------+-------------+--------------------+--------------------+
|       Date|  Open|  High|   Low| Close| Volume| date_time|label|       price_range|volume_vector|       volume_scaled|            features|
+-----------+------+------+------+------+-------+----------+-----+------------------+-------------+--------------------+--------------------+
| 1.262592E9|136.25|136.61|133.14| 133.9|7600543|2010-01-04|    0|3.4700000000000273|  [7600543.0]|[0.9717404687204194]|[0.97174046872041...|
|1.2626784E9|133.43|135.48|131.81|134.69|8856456|2010-01-05|    1|3.6699999999999875|  [8856456.0]|[1.3787982646085433]|[1.37879826460854...|
|1.2627648E9| 134.6|134.73|131.65|132.25|7180977|2010-01-06|    0| 3.079999999999984|  [7180977.0]|[0.8357536518074672]|[0.83575365180746...|
+-----------+------+------+------+------+-------+----------+-----+------------------+-------------+--------------------+--------------------+
only s

In [94]:
# The two columns the classifier will actually consume.
df_sp2.select('features','label').show(3)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.97174046872041...|    0|
|[1.37879826460854...|    1|
|[0.83575365180746...|    0|
+--------------------+-----+
only showing top 3 rows



In [95]:
# Split into a training set (70%) and a testing set (30%).
# A fixed seed makes the sampling repeatable, so the fitted model and its
# predictions do not change from one run to the next.
df_train, df_test = df_sp2.randomSplit([.7, .3], seed=42)

In [96]:
# Configure a logistic regression on the assembled feature vector and the 0/1
# label, then fit it on the training split only.
lr = LogisticRegression(
    labelCol='label',
    featuresCol='features',
)
model = lr.fit(df_train)

In [97]:
# Score the held-out rows, keeping the date alongside the label and features
# so each prediction can be traced back to its trading day.
df_predict = model.transform(df_test.select(['date_time', 'label', 'features']))
df_predict.show(n=3)

+----------+-----+--------------------+--------------------+--------------------+----------+
| date_time|label|            features|       rawPrediction|         probability|prediction|
+----------+-----+--------------------+--------------------+--------------------+----------+
|2010-01-04|    0|[0.97174046872041...|[-0.1528829540082...|[0.46185353297942...|       1.0|
|2010-01-05|    1|[1.37879826460854...|[-0.1885715199592...|[0.45299632203571...|       1.0|
|2010-01-06|    0|[0.83575365180746...|[-0.1453093883969...|[0.46373643866626...|       1.0|
+----------+-----+--------------------+--------------------+--------------------+----------+
only showing top 3 rows



In [None]:
evaluator = MulticlassClassificationEvaluator()
eva