In [102]:
import findspark
findspark.find()
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.sql('''select 'spark' as hello ''')
df.show()

IndexError: list index out of range

In [103]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.master("local[2]").appName("Assignment 3").getOrCreate()

In [104]:
sales = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("data/retail-data/by-day/*.csv")\
.coalesce(5)\
.where("Description IS NOT NULL")
simpleDF = spark.read.json("data/simple-ml")
scaleDF = spark.read.parquet("data/simple-ml-scaling")

# spark.read
In the script above, spark.read is used three times: once to read <strong>csv</strong> files, once to read a <strong>json</strong> dataset, and once to read a <strong>parquet</strong> dataset. When reading the csv files, we set the header option to true so that the first row of each file is treated as a header, set inferSchema to true so that Spark infers the column types, and keep only the rows that have a description.

The sales data frame consists of the following attributes: <strong>InvoiceNo</strong>, <strong>StockCode</strong>, <strong>Description</strong>, <strong>Quantity</strong>, <strong>InvoiceDate</strong>, <strong>UnitPrice</strong>, <strong>CustomerID</strong> and <strong>Country</strong>.

The simpleDF data frame consists of the following attributes: <strong>color</strong>, <strong>lab</strong>, <strong>value1</strong> and <strong>value2</strong>.

The scaleDF data frame consists of the following attributes: <strong>id</strong> and <strong>features</strong>.

The next 3 code blocks show this in detail.

In [105]:
sales

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string]

In [106]:
simpleDF

DataFrame[color: string, lab: string, value1: bigint, value2: double]

In [107]:
scaleDF

DataFrame[id: int, features: vector]

In [108]:
from pyspark.ml.feature import StandardScaler
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()

+---+--------------+-----------------------------------+
| id|      features|StandardScaler_7699f4c2e4c6__output|
+---+--------------+-----------------------------------+
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  1|[3.0,10.1,3.0]|               [3.58568582800318...|
+---+--------------+-----------------------------------+



# Standard Scaler
StandardScaler transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation. By default it only divides by the standard deviation and does not center the data around the mean. Here we applied StandardScaler to scaleDF, which is a dataset of vector rows, and set its input column to the features attribute (a vector). Since no output column was specified, the result is written to an auto-generated column (StandardScaler_7699f4c2e4c6__output in the run above), which holds the transformed vectors. Across the rows, each feature in that column now has a standard deviation of 1.
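
As a sanity check on the numbers above, the scaling of a single feature can be reproduced in plain Python. This is only a minimal sketch, not Spark's implementation: it assumes the default settings (divide by the standard deviation, no mean centering) and the sample (n - 1) standard deviation, which is what reproduces the 1.1952... in the first output row. The helper name `scale_to_unit_std` is made up for illustration.

```python
import statistics

# First component of each "features" vector shown in the output above.
feature_0 = [1.0, 2.0, 1.0, 2.0, 3.0]

def scale_to_unit_std(values):
    """Divide each value by the sample standard deviation (no mean centering)."""
    std = statistics.stdev(values)  # sample standard deviation, n - 1 in the denominator
    return [v / std for v in values]

scaled = scale_to_unit_std(feature_0)
print(scaled[0])                 # approximately 1.1952, as in the first output row
print(statistics.stdev(scaled))  # approximately 1.0
```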

In [109]:
from pyspark.ml.feature import MinMaxScaler
minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
fittedminMax = minMax.fit(scaleDF)
fittedminMax.transform(scaleDF).show()

+---+--------------+---------------------------------+
| id|      features|MinMaxScaler_ceb715083c45__output|
+---+--------------+---------------------------------+
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  1|[3.0,10.1,3.0]|                 [10.0,10.0,10.0]|
+---+--------------+---------------------------------+



# Min Max Scaler
MinMaxScaler transforms a dataset of Vector rows, rescaling each feature to a specific range [min, max] using this equation: Rescaled(e) = ((e − Emin) / (Emax − Emin)) ∗ (max − min) + min, where Emin and Emax are the smallest and largest values of that feature in the dataset. In our case we set min to 5 and max to 10 and applied the MinMaxScaler to the scaleDF vector rows dataset with the features attribute as the input. Since no output column was specified, the result is written to an auto-generated column (MinMaxScaler_ceb715083c45__output in the run above). Every feature in the output now lies within the range 5 to 10.
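
The equation above can be checked against the output with a few lines of plain Python. This is a hedged sketch rather than Spark's code: the function name `min_max_rescale` is hypothetical, and it assumes the feature is not constant (otherwise Emax − Emin would be zero).

```python
def min_max_rescale(values, new_min, new_max):
    """Linearly rescale values so the smallest maps to new_min and the largest to new_max."""
    e_min, e_max = min(values), max(values)
    scale = (new_max - new_min) / (e_max - e_min)  # assumes e_max != e_min
    return [(v - e_min) * scale + new_min for v in values]

# First and second components of the "features" vectors in scaleDF.
feature_0 = [1.0, 2.0, 1.0, 2.0, 3.0]
feature_1 = [0.1, 1.1, 0.1, 1.1, 10.1]

rescaled_0 = min_max_rescale(feature_0, 5, 10)
rescaled_1 = min_max_rescale(feature_1, 5, 10)
print(rescaled_0)
print(rescaled_1)
```

Each feature is rescaled independently, which is why 2.0 in the first feature becomes 7.5 while 1.1 in the second feature becomes 5.5.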

In [110]:
from pyspark.ml.feature import Normalizer
manhattanDistance = Normalizer().setP(1).setInputCol("features")
manhattanDistance.transform(scaleDF).show()

+---+--------------+-------------------------------+
| id|      features|Normalizer_b411c80c0ce9__output|
+---+--------------+-------------------------------+
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  1|[3.0,10.1,3.0]|           [0.18633540372670...|
+---+--------------+-------------------------------+



# Normalizer
Normalizer, as its name suggests, is a Transformer that transforms a dataset of Vector rows, normalizing each Vector to have a norm of 1. Unlike the two scalers above, it works on each row separately and does not need to be fitted. It takes a parameter p that specifies which p-norm to use. In our case we applied the normalizer to the scaleDF vector rows dataset with p set to 1 (the L1 or Manhattan norm) and the features attribute as the input. Since no output column was specified, the result is written to an auto-generated column (Normalizer_b411c80c0ce9__output in the run above). The output vectors have unit L1 norm, meaning the absolute values of each vector's entries sum to 1.
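
To make the p = 1 case concrete, here is a small plain-Python sketch of p-norm normalization applied to the first row of scaleDF. It is an illustration under stated assumptions, not Spark's implementation: the function name `normalize` is invented here, and the sketch assumes the vector is not all zeros.

```python
def normalize(vector, p=1):
    """Divide a vector by its p-norm so that the result has p-norm 1."""
    norm = sum(abs(x) ** p for x in vector) ** (1.0 / p)  # assumes norm != 0
    return [x / norm for x in vector]

row = [1.0, 0.1, -1.0]  # first "features" vector in scaleDF
unit = normalize(row, p=1)
print(unit[0])                    # approximately 0.4762, as in the first output row
print(sum(abs(x) for x in unit))  # approximately 1.0
```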

In [111]:
from pyspark.ml.feature import StringIndexer
lblIndxr = StringIndexer().setInputCol("lab").setOutputCol("labelInd")
idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)
idxRes.show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|labelInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|     1.0|
|  red| bad|     1| 38.97187133755819|     0.0|
|  red| bad|     2|14.386294994851129|     0.0|
|  red| bad|    16|14.386294994851129|     0.0|
|  red|good|    45| 38.97187133755819|     1.0|
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|  

# String Indexer on lab attribute
StringIndexer encodes a string column of labels to a column of label indices. In other words, it finds the distinct strings in the column and assigns an index to each one. The indices go from 0 to n-1, where n is the number of distinct strings in the column. By default each string is assigned its index according to how frequently it occurs: the most frequent string gets index 0, the second most frequent gets index 1, and so on. In our case we applied StringIndexer to simpleDF with lab as the input column and labelInd as the output column. As we can see, the word bad took index 0 because it occurred more often than good, which took index 1.
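
The frequency-based assignment can be sketched in plain Python as follows. This is only an illustration of the idea: the function name `fit_string_index` is hypothetical, the label list is copied from the first eleven rows of the output above, and breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter

def fit_string_index(labels):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending frequency; ties are broken alphabetically (assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# "lab" values from the first eleven rows shown above.
labs = ["good", "bad", "bad", "good", "good", "bad",
        "good", "bad", "bad", "bad", "good"]

mapping = fit_string_index(labs)
indexed = [mapping[label] for label in labs]
print(mapping)
print(indexed)
```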

In [112]:
valIndexer = StringIndexer().setInputCol("value1").setOutputCol("valueInd")
valIndexer.fit(simpleDF).transform(simpleDF).show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|valueInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     2.0|
| blue| bad|     8|14.386294994851129|     4.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     5.0|
|green|good|    12|14.386294994851129|     0.0|
|green| bad|    16|14.386294994851129|     1.0|
|  red|good|    35|14.386294994851129|     6.0|
|  red| bad|     1| 38.97187133755819|     2.0|
|  red| bad|     2|14.386294994851129|     7.0|
|  red| bad|    16|14.386294994851129|     1.0|
|  red|good|    45| 38.97187133755819|     3.0|
|green|good|     1|14.386294994851129|     2.0|
| blue| bad|     8|14.386294994851129|     4.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     5.0|
|green|good|    12|14.386294994851129|     0.0|
|green| bad|    16|14.386294994851129|     1.0|
|  red|good|    35|14.386294994851129|  

# String Indexer on value 1 attribute
We did the same thing as in the previous block of code, but applied StringIndexer to the value1 attribute. Although value1 is numeric, StringIndexer casts the values to strings and indexes them like any other labels. So the values in the value1 column were indexed by frequency: the most frequent value, 12, took index 0, the second most frequent value, 16, took index 1, and so on.

In [113]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
lblIndxr = StringIndexer().setInputCol("color").setOutputCol("colorInd")
colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))
ohe = OneHotEncoder().setInputCol("colorInd")
ohe.transform(colorLab).show()

+-----+--------+----------------------------------+
|color|colorInd|OneHotEncoder_3eea2ec35c67__output|
+-----+--------+----------------------------------+
|green|     1.0|                     (2,[1],[1.0])|
| blue|     2.0|                         (2,[],[])|
| blue|     2.0|                         (2,[],[])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
| blue|     2.0|                         (2,[],[])|
| blue|     2.0|                         (2,[],[])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     

# One Hot Encoder
It converts a category index to a one hot encoding, stored in <strong>sparse</strong> form. What is sparse form? A sparse vector is printed as three parts. The first is an integer giving the size (length) of the vector. The second and third are arrays that together describe the non-zero entries: the first array holds the positions of the non-zero values and the second array holds the values themselves.

In our case we applied StringIndexer to the color attribute of the simpleDF data frame, with the output in colorInd. Then we applied OneHotEncoder to colorInd; since no output column was specified, the result is written to an auto-generated column (OneHotEncoder_3eea2ec35c67__output in the run above). Now let's take a look at the results. There are 3 distinct colors, but the vector size is 2 rather than 3 because the encoder drops the last category by default (dropLast is true), and that category is represented by the all-zeros vector. This is still a one hot encoding: each vector contains at most one 1. So index 0 (red) maps to [1, 0], index 1 (green) maps to [0, 1], and index 2 (blue) maps to [0, 0]. Let's take a look at an example. The value 1 is mapped to (2,[1],[1.0]), which means a vector of size 2 whose entry at position 1 is 1.0, that is [0, 1].
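
The sparse notation is easier to read next to its dense equivalent. The plain-Python sketch below mimics the (size, indices, values) triples shown in the output; it is an illustration, not Spark's implementation, and the names `one_hot_sparse` and `to_dense` are made up here. It assumes the last category is dropped, as in the run above.

```python
def one_hot_sparse(index, num_categories, drop_last=True):
    """Return (size, indices, values) describing a one hot vector in sparse form."""
    size = num_categories - 1 if drop_last else num_categories
    i = int(index)
    if i < size:
        return (size, [i], [1.0])
    return (size, [], [])  # the dropped last category is all zeros

def to_dense(sparse):
    """Expand a (size, indices, values) triple into a plain list."""
    size, indices, values = sparse
    dense = [0.0] * size
    for position, value in zip(indices, values):
        dense[position] = value
    return dense

# colorInd values from the output above: red = 0.0, green = 1.0, blue = 2.0
for color_ind in (0.0, 1.0, 2.0):
    sparse = one_hot_sparse(color_ind, num_categories=3)
    print(color_ind, sparse, to_dense(sparse))
```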

In [114]:
from pyspark.ml.feature import RegexTokenizer
rt = RegexTokenizer()\
.setInputCol("Description")\
.setOutputCol("DescOut")\
.setPattern(" ")\
.setToLowercase(True)
rt.transform(sales.select("Description")).show(20, False)

+-----------------------------------+------------------------------------------+
|Description                        |DescOut                                   |
+-----------------------------------+------------------------------------------+
|RABBIT NIGHT LIGHT                 |[rabbit, night, light]                    |
|DOUGHNUT LIP GLOSS                 |[doughnut, lip, gloss]                    |
|12 MESSAGE CARDS WITH ENVELOPES    |[12, message, cards, with, envelopes]     |
|BLUE HARMONICA IN BOX              |[blue, harmonica, in, box]                |
|GUMBALL COAT RACK                  |[gumball, coat, rack]                     |
|SKULLS  WATER TRANSFER TATTOOS     |[skulls, water, transfer, tattoos]        |
|FELTCRAFT GIRL AMELIE KIT          |[feltcraft, girl, amelie, kit]            |
|CAMOUFLAGE LED TORCH               |[camouflage, led, torch]                  |
|WHITE SKULL HOT WATER BOTTLE       |[white, skull, hot, water, bottle]        |
|ENGLISH ROSE HOT WATER BOTT

# Regex Tokenizer
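RegexTokenizer with " " as its pattern splits each sentence into words wherever a space occurs. Consecutive spaces would produce empty tokens, but these are dropped because the minimum token length is 1 by default, so words separated by one or more spaces still come out cleanly (see the row with two spaces after SKULLS). Because toLowercase is set to true, the tokens are also lowercased. In our case the Description column from the sales data frame was used as the input to the RegexTokenizer, which writes to the column DescOut. The output is an array of the words in the sentence.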
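
The same behaviour can be approximated with Python's re module. This is a rough sketch, not Spark's implementation: the function name `regex_tokenize` and its parameter names are invented for illustration, and the defaults are chosen to mirror the settings used in the cell above.

```python
import re

def regex_tokenize(text, pattern=" ", to_lowercase=True, min_token_length=1):
    """Split text on the pattern and drop tokens shorter than min_token_length."""
    if to_lowercase:
        text = text.lower()
    pieces = re.split(pattern, text)  # two spaces in a row yield an empty piece
    return [piece for piece in pieces if len(piece) >= min_token_length]

tokens = regex_tokenize("SKULLS  WATER TRANSFER TATTOOS")
print(tokens)
```

Setting min_token_length to 0 in this sketch keeps the empty string produced by the double space, which shows where the filtering happens.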

In [115]:
from pyspark.ml.feature import ChiSqSelector, Tokenizer
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn\
.transform(sales.select("Description", "CustomerId"))\
.where("CustomerId IS NOT NULL")
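# NOTE: fittedCV is never defined in this notebook, so this line raises a NameError.
# It is presumably a fitted CountVectorizer model that writes the "countVec" column (an assumption).
prechi = fittedCV.transform(tokenized)\
.where("CustomerId IS NOT NULL")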
chisq = ChiSqSelector()\
.setFeaturesCol("countVec")\
.setLabelCol("CustomerId")\
.setNumTopFeatures(2)
chisq.fit(prechi).transform(prechi)\
.drop("customerId", "Description", "DescOut").show()

NameError: name 'fittedCV' is not defined

"You may skip the last block of code as it depends on fitting the model to the data frame. It is ok."