In [1]:
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
sales = spark.read.format("csv")\
 .option("header", "true")\
 .option("inferSchema", "true")\
 .load("/Users/doaasamer/Downloads/Spark-The-Definitive-Guide/data/retail-data/by-day/*.csv")\
 .coalesce(5)\
 .where("Description IS NOT NULL")

To load a DataFrame we use spark.read, which returns a DataFrameReader. The .format method sets the data source. A source is identified by its fully qualified name (e.g., org.apache.spark.sql.parquet), but the built-in sources also have short names (parquet, json, csv, text, ...). The .option method sets extra options for the load, here header and inferSchema; the same method is available for write operations.

In [3]:
simpleDF = spark.read.json("/Users/doaasamer/Downloads/Spark-The-Definitive-Guide/data/simple-ml")
scaleDF = spark.read.parquet("/Users/doaasamer/Downloads/Spark-The-Definitive-Guide/data/simple-ml-scaling")

read.json infers the schema of a JSON dataset automatically and loads it as a DataFrame (a Dataset[Row] in the Scala API).
read.parquet reads files in Parquet format. The result of loading a Parquet file is also a DataFrame.

In [12]:
from pyspark.ml.feature import StandardScaler
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()

+---+--------------+-----------------------------------+
| id|      features|StandardScaler_210e0a72169b__output|
+---+--------------+-----------------------------------+
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  1|[3.0,10.1,3.0]|               [3.58568582800318...|
+---+--------------+-----------------------------------+



The StandardScaler standardizes each feature (column) individually, using column summary statistics computed on the data it is fit to, before machine learning techniques are applied. It can remove the mean (withMean) and scale to unit standard deviation (withStd). By default only withStd is enabled, so the output here is scaled to unit standard deviation but not centered on 0: the first feature value 1.0 becomes about 1.195. In our example we apply the StandardScaler to scaleDF, and the output column holds the scaled values.
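The first component of each output vector can be checked by hand. The following is a minimal plain-Python sketch, not Spark's implementation. It assumes the default settings (withStd enabled, withMean disabled) and divides each column by its sample standard deviation.

```python
import statistics

# The five feature vectors of scaleDF, copied from the output above.
rows = [
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [3.0, 10.1, 3.0],
]

# Sample standard deviation of each column (n - 1 in the denominator).
columns = list(zip(*rows))
stds = [statistics.stdev(col) for col in columns]

# Scale only: divide by the standard deviation, leave the mean untouched.
scaled = [[x / s for x, s in zip(row, stds)] for row in rows]

for row in scaled:
    print(row)
```

The first entries of the printed rows should agree with the truncated values in the output column above (1.1952..., 2.3904..., 3.5856...).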

In [13]:
from pyspark.ml.feature import MinMaxScaler
minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
fittedminMax = minMax.fit(scaleDF)
fittedminMax.transform(scaleDF).show()

+---+--------------+---------------------------------+
| id|      features|MinMaxScaler_9a3bd61b8321__output|
+---+--------------+---------------------------------+
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  1|[3.0,10.1,3.0]|                 [10.0,10.0,10.0]|
+---+--------------+---------------------------------+



The MinMaxScaler transforms a dataset of Vector rows, rescaling each feature individually and linearly to a specified range (often [0, 1]) using column summary statistics. It takes min and max as parameters. For a feature E, the rescaled value is calculated as $Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} \cdot (max - min) + min$. In this example min is set to 5 and max to 10, and the rescaled values appear in the output column.
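The rescaling formula can be reproduced on the same five vectors. This is a plain-Python sketch of the arithmetic only, not Spark's code; the helper name min_max_rescale is made up for illustration.

```python
def min_max_rescale(column, new_min=0.0, new_max=1.0):
    """Linearly rescale one feature column to [new_min, new_max]."""
    e_min, e_max = min(column), max(column)
    if e_max == e_min:
        # Constant column: the Spark docs specify 0.5 * (max + min) here.
        return [0.5 * (new_max + new_min) for _ in column]
    return [
        (e - e_min) / (e_max - e_min) * (new_max - new_min) + new_min
        for e in column
    ]

# The five feature vectors of scaleDF, copied from the output above.
rows = [
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [1.0, 0.1, -1.0],
    [2.0, 1.1, 1.0],
    [3.0, 10.1, 3.0],
]

# Rescale column by column with min=5 and max=10, then rebuild the rows.
columns = [list(col) for col in zip(*rows)]
rescaled_columns = [min_max_rescale(col, 5.0, 10.0) for col in columns]
rescaled_rows = [list(row) for row in zip(*rescaled_columns)]

for row in rescaled_rows:
    print(row)
```

For instance, the second feature ranges from 0.1 to 10.1, so 1.1 maps to (1.1 - 0.1) / (10.1 - 0.1) * 5 + 5 = 5.5, as in the table.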

In [14]:
from pyspark.ml.feature import Normalizer
manhattanDistance = Normalizer().setP(1).setInputCol("features")
manhattanDistance.transform(scaleDF).show()

+---+--------------+-------------------------------+
| id|      features|Normalizer_f8cbac5a0758__output|
+---+--------------+-------------------------------+
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  1|[3.0,10.1,3.0]|           [0.18633540372670...|
+---+--------------+-------------------------------+



The Normalizer transforms a dataset of Vector rows, normalizing each Vector (each row, not each column) to have unit norm. It takes a parameter p, which specifies the p-norm used for normalization (p=2 by default). This normalization can help standardize input data and improve the behavior of learning algorithms. In this example p is set to 1, so each vector is divided by the sum of the absolute values of its entries (its Manhattan norm).
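A small plain-Python sketch of p-norm normalization shows where the numbers in the output column come from. The function name p_normalize and the zero-vector guard are choices made for this sketch, not taken from the notebook.

```python
def p_normalize(vector, p=2.0):
    """Divide a vector by its p-norm so that the result has unit p-norm."""
    norm = sum(abs(x) ** p for x in vector) ** (1.0 / p)
    if norm == 0.0:
        # Guard chosen for this sketch: leave an all-zero vector unchanged.
        return list(vector)
    return [x / norm for x in vector]

# p=1: the L1 norm of [1.0, 0.1, -1.0] is 1.0 + 0.1 + 1.0 = 2.1.
print(p_normalize([1.0, 0.1, -1.0], p=1))
print(p_normalize([3.0, 10.1, 3.0], p=1))
```

The first entry of the first result is 1.0 / 2.1, which matches the 0.47619... shown in the table.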

In [15]:
from pyspark.ml.feature import StringIndexer
lblIndxr = StringIndexer().setInputCol("lab").setOutputCol("labelInd")
idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)
idxRes.show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|labelInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|     1.0|
|  red| bad|     1| 38.97187133755819|     0.0|
|  red| bad|     2|14.386294994851129|     0.0|
|  red| bad|    16|14.386294994851129|     0.0|
|  red|good|    45| 38.97187133755819|     1.0|
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|  

The StringIndexer encodes a string column of labels to a column of label indices. If the input column is numeric, it is cast to string and the string values are indexed. It supports 4 ordering options (frequencyDesc, alphabetDesc, frequencyAsc, alphabetAsc); under the default, frequencyDesc, the most frequent label is assigned 0. Here we apply it to the "lab" column of simpleDF, and the output shows that "bad" is the most frequent label and is therefore given index 0.
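The frequencyDesc ordering can be sketched in plain Python. This is an illustration of the idea, not Spark's implementation: the function name fit_string_index is made up, the alphabetical tie-break is an assumption of the sketch, and the labels are the first 11 "lab" values from the output above (the table repeats after that).

```python
from collections import Counter

def fit_string_index(labels):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(str(label) for label in labels)
    # Descending frequency; ties broken alphabetically (sketch assumption).
    ordered = sorted(counts, key=lambda lab: (-counts[lab], lab))
    return {lab: float(i) for i, lab in enumerate(ordered)}

# First 11 values of the "lab" column, copied from the output above.
labels = ["good", "bad", "bad", "good", "good", "bad",
          "good", "bad", "bad", "bad", "good"]

mapping = fit_string_index(labels)
print(mapping)
print([mapping[lab] for lab in labels])
```

On these rows "bad" occurs 6 times and "good" 5 times, so "bad" gets 0.0 and "good" gets 1.0, in line with the labelInd column.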

In [16]:
valIndexer = StringIndexer().setInputCol("value1").setOutputCol("valueInd")
valIndexer.fit(simpleDF).transform(simpleDF).show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|valueInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     2.0|
| blue| bad|     8|14.386294994851129|     4.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     5.0|
|green|good|    12|14.386294994851129|     0.0|
|green| bad|    16|14.386294994851129|     1.0|
|  red|good|    35|14.386294994851129|     6.0|
|  red| bad|     1| 38.97187133755819|     2.0|
|  red| bad|     2|14.386294994851129|     7.0|
|  red| bad|    16|14.386294994851129|     1.0|
|  red|good|    45| 38.97187133755819|     3.0|
|green|good|     1|14.386294994851129|     2.0|
| blue| bad|     8|14.386294994851129|     4.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     5.0|
|green|good|    12|14.386294994851129|     0.0|
|green| bad|    16|14.386294994851129|     1.0|
|  red|good|    35|14.386294994851129|  

In the previous cell we applied the StringIndexer to the "lab" column and saw that "bad" is the most frequent label. Here we apply it to the numeric "value1" column, whose values are cast to strings before indexing. The most frequent value in this case is "12", since it is given index 0.

In [18]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
lblIndxr = StringIndexer().setInputCol("color").setOutputCol("colorInd")
colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))
ohe = OneHotEncoder().setInputCol("colorInd")
ohe.transform(colorLab).show()

+-----+--------+----------------------------------+
|color|colorInd|OneHotEncoder_278c4e7ab963__output|
+-----+--------+----------------------------------+
|green|     1.0|                     (2,[1],[1.0])|
| blue|     2.0|                         (2,[],[])|
| blue|     2.0|                         (2,[],[])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
| blue|     2.0|                         (2,[],[])|
| blue|     2.0|                         (2,[],[])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     

The OneHotEncoder maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value among the set of all feature values. This encoding lets algorithms that expect continuous features use categorical ones. String inputs are usually indexed with StringIndexer first, as done here: indexing the "color" column of simpleDF produces "colorInd", and the OneHotEncoder then produces the output column. The output is printed as a sparse vector in the form (size, [indices of nonzero entries], [values]). "colorInd" takes 3 values {0.0, 1.0, 2.0}, but the encoder drops the last category by default (dropLast), so the vectors have length 2 and index 2.0 is encoded as the all-zeros vector (2,[],[]).
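The sparse output format and the dropped last category can be illustrated with a short plain-Python sketch. The helpers one_hot_drop_last and to_dense are hypothetical names for this illustration; they mimic the printed (size, [indices], [values]) form and are not Spark's classes.

```python
def one_hot_drop_last(index, num_categories):
    """Encode a category index as (size, indices, values), dropping the last category."""
    size = num_categories - 1
    i = int(index)
    if i < size:
        return (size, [i], [1.0])
    # The last category is represented by the all-zeros vector.
    return (size, [], [])

def to_dense(sparse):
    """Expand a (size, indices, values) triple into a plain list."""
    size, indices, values = sparse
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# red=0.0, green=1.0, blue=2.0, as in the colorInd column above.
for color_index in (0.0, 1.0, 2.0):
    encoded = one_hot_drop_last(color_index, num_categories=3)
    print(color_index, encoded, to_dense(encoded))
```

Dropping one category keeps the encoded columns from summing to one for every row, which would otherwise make them linearly dependent.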

In [19]:
from pyspark.ml.feature import RegexTokenizer
rt = RegexTokenizer()\
 .setInputCol("Description")\
 .setOutputCol("DescOut")\
 .setPattern(" ")\
 .setToLowercase(True)
rt.transform(sales.select("Description")).show(20, False)

+-----------------------------------+------------------------------------------+
|Description                        |DescOut                                   |
+-----------------------------------+------------------------------------------+
|RABBIT NIGHT LIGHT                 |[rabbit, night, light]                    |
|DOUGHNUT LIP GLOSS                 |[doughnut, lip, gloss]                    |
|12 MESSAGE CARDS WITH ENVELOPES    |[12, message, cards, with, envelopes]     |
|BLUE HARMONICA IN BOX              |[blue, harmonica, in, box]                |
|GUMBALL COAT RACK                  |[gumball, coat, rack]                     |
|SKULLS  WATER TRANSFER TATTOOS     |[skulls, water, transfer, tattoos]        |
|FELTCRAFT GIRL AMELIE KIT          |[feltcraft, girl, amelie, kit]            |
|CAMOUFLAGE LED TORCH               |[camouflage, led, torch]                  |
|WHITE SKULL HOT WATER BOTTLE       |[white, skull, hot, water, bottle]        |
|ENGLISH ROSE HOT WATER BOTT

Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). The simple Tokenizer class provides this functionality. RegexTokenizer allows more advanced tokenization based on regular expression (regex) matching; by default its pattern is treated as the delimiter to split on. In our example the pattern given to the RegexTokenizer is " ", so the output contains the space-separated words of the "Description" column for each entry, lowercased because of setToLowercase(True).
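The splitting behavior can be approximated in plain Python with the re module. This is a rough sketch, not Spark's implementation: the function regex_tokenize is hypothetical, Python and Java regex syntax differ in places, and the min_token_length filter stands in for the tokenizer's minimum token length setting (assumed to be 1).

```python
import re

def regex_tokenize(text, pattern=" ", to_lowercase=True, min_token_length=1):
    """Split text on a delimiter pattern and drop tokens that are too short."""
    if to_lowercase:
        text = text.lower()
    tokens = re.split(pattern, text)
    # Consecutive delimiters produce empty strings; the length filter removes them.
    return [t for t in tokens if len(t) >= min_token_length]

print(regex_tokenize("RABBIT NIGHT LIGHT"))
print(regex_tokenize("SKULLS  WATER TRANSFER TATTOOS"))
```

The second description contains a double space, which is why the length filter matters: without it an empty token would appear between "skulls" and "water".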

In [20]:
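from pyspark.ml.feature import ChiSqSelector, CountVectorizer, Tokenizer

# Split each description into lowercase, whitespace-separated tokens.
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn\
 .transform(sales.select("Description", "CustomerId"))\
 .where("CustomerId IS NOT NULL")

# fittedCV is not defined anywhere in this notebook. Fit a CountVectorizer
# here to produce the "countVec" column; its default settings are an assumption.
cv = CountVectorizer().setInputCol("DescOut").setOutputCol("countVec")
fittedCV = cv.fit(tokenized)
prechi = fittedCV.transform(tokenized)

# Keep the 2 features most associated with the label by the chi-squared test.
chisq = ChiSqSelector()\
 .setFeaturesCol("countVec")\
 .setLabelCol("CustomerId")\
 .setNumTopFeatures(2)
chisq.fit(prechi).transform(prechi)\
 .drop("customerId", "Description", "DescOut").show()

The ChiSqSelector needs a feature vector column, which here comes from fittedCV, a CountVectorizer model fitted on the tokenized DataFrame. This cell depends on that fitting step: if the model has not been fitted first, the cell fails with NameError: name 'fittedCV' is not defined.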