In [1]:
import findspark
findspark.init()
import pyspark# only run after findspark.init()
from pyspark.sqlimport SparkSession
spark = SparkSession.builder.getOrCreate()
df= spark.sql('''select 'spark' as hello ''')
df.show()

SyntaxError: invalid syntax (<ipython-input-1-edde57d30296>, line 4)

In [2]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [5]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.master("local[2]").appName("word count").getOrCreate()

In [17]:
#read all files of type csv in the location mentioned, when setting "Header" to true, means the first line of files name columns and are not included in data. All types are assumed to be string.
#"inferSchema" automatically infer column types. It requires one extra pass over the data and is false by default. 
#Coalesce it is recommended to use coalesce while reducing no. of partitions, it only reads the rows with description that are not null.
sales = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("Spark-The-Definitive-Guide-master/data/retail-data/by-day/*.csv")\
.coalesce(5)\
.where("Description IS NOT NULL")
#this reads the json file in this specified folder location.
simpleDF = spark.read.json("Spark-The-Definitive-Guide-master/data/simple-ml")
#this reads the parquet file in this specified folder location.
scaleDF = spark.read.parquet("Spark-The-Definitive-Guide-master/data/simple-ml-scaling")

In [18]:
from pyspark.ml.feature import StandardScaler
#StandardScaler standarizes a feature by subtracting the mean and then scaling to unit variance. Unit variance means dividing all the values by the standard deviation.
sScaler = StandardScaler().setInputCol("features")
# Compute summary statistics by fitting the StandardScaler and Normalize each feature to have unit standard deviation.
sScaler.fit(scaleDF).transform(scaleDF).show()

+---+--------------+-----------------------------------+
| id|      features|StandardScaler_c93a10577b2d__output|
+---+--------------+-----------------------------------+
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  1|[3.0,10.1,3.0]|               [3.58568582800318...|
+---+--------------+-----------------------------------+



In [19]:
from pyspark.ml.feature import MinMaxScaler
#Transform features by scaling each feature to a given range, which is from 5 to 10 here.
#replaces the values below 1.0 and 1.0 in features to 5.0 in MinMaxScaler new column. And so on rescale each value in features
#column to fit the range.
minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
fittedminMax = minMax.fit(scaleDF)
#Scale features of fittedminMax according to feature_range.
# rescale each feature to range [min, max].
fittedminMax.transform(scaleDF).show()

+---+--------------+---------------------------------+
| id|      features|MinMaxScaler_a452a61c2e5c__output|
+---+--------------+---------------------------------+
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  1|[3.0,10.1,3.0]|                 [10.0,10.0,10.0]|
+---+--------------+---------------------------------+



In [20]:
from pyspark.ml.feature import Normalizer
#Normalizer is a Transformer which transforms a dataset of Vector rows, normalizing each feature to have unit norm. 
#It takes parameter p, which specifies the p-norm used for normalization. 
#This normalization can help standardize your input data and improve the behavior of learning algorithms.
manhattanDistance = Normalizer().setP(1).setInputCol("features")
# Normalize each feature using norm.
manhattanDistance.transform(scaleDF).show()

+---+--------------+-------------------------------+
| id|      features|Normalizer_ebd61adb20d8__output|
+---+--------------+-------------------------------+
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  1|[3.0,10.1,3.0]|           [0.18633540372670...|
+---+--------------+-------------------------------+



In [21]:
from pyspark.ml.feature import StringIndexer
#StringIndexer encodes a string column of labels to a column of label indices.
#in lab column he replaced each good value to 1.0 in labelInd and each bad value to be 0.0 in labelInd.
#It shows the first 20 rows only.
lblIndxr = StringIndexer().setInputCol("lab").setOutputCol("labelInd")
idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)
idxRes.show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|labelInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|     1.0|
|  red| bad|     1| 38.97187133755819|     0.0|
|  red| bad|     2|14.386294994851129|     0.0|
|  red| bad|    16|14.386294994851129|     0.0|
|  red|good|    45| 38.97187133755819|     1.0|
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|  

In [22]:
#StringIndexer encodes a string column of value1 to a column of value indices.
#here encodes value1 column to valueInd coumn.
#If the input column is numeric, we cast it to string and index the string values. As value1 is a numeric column.
valIndexer = StringIndexer().setInputCol("value1").setOutputCol("valueInd")
valIndexer.fit(simpleDF).transform(simpleDF).show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|valueInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     0.0|
| blue| bad|     8|14.386294994851129|     7.0|
| blue| bad|    12|14.386294994851129|     1.0|
|green|good|    15| 38.97187133755819|     3.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     2.0|
|  red|good|    35|14.386294994851129|     5.0|
|  red| bad|     1| 38.97187133755819|     0.0|
|  red| bad|     2|14.386294994851129|     4.0|
|  red| bad|    16|14.386294994851129|     2.0|
|  red|good|    45| 38.97187133755819|     6.0|
|green|good|     1|14.386294994851129|     0.0|
| blue| bad|     8|14.386294994851129|     7.0|
| blue| bad|    12|14.386294994851129|     1.0|
|green|good|    15| 38.97187133755819|     3.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     2.0|
|  red|good|    35|14.386294994851129|  

In [24]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
#First, StringIndexer encodes a string column of color to a column of color indices.
#Then, One-hot encoding transforms the values in colorInd into a binary vector with at most a single one-value per row,
#that indicates the input color index.
#Since there are three values, the vector is of length 2
lblIndxr = StringIndexer().setInputCol("color").setOutputCol("colorInd")
colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))
ohe = OneHotEncoder().setInputCol("colorInd")
ohe.fit(colorLab).transform(colorLab).show()

+-----+--------+----------------------------------+
|color|colorInd|OneHotEncoder_0b6899e3c10b__output|
+-----+--------+----------------------------------+
|green|     1.0|                     (2,[1],[1.0])|
| blue|     2.0|                         (2,[],[])|
| blue|     2.0|                         (2,[],[])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
| blue|     2.0|                         (2,[],[])|
| blue|     2.0|                         (2,[],[])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     

In [25]:
from pyspark.ml.feature import RegexTokenizer
#RegexTokenizer() takes as input Description column and returns as output DescOut.
#This Function takes String from Description column and make the Capital letters to small, and put the whole string
#into array with comma between each word and another.
rt = RegexTokenizer()\
.setInputCol("Description")\
.setOutputCol("DescOut")\
.setPattern(" ")\
.setToLowercase(True)
rt.transform(sales.select("Description")).show(20, False)

+-----------------------------------+------------------------------------------+
|Description                        |DescOut                                   |
+-----------------------------------+------------------------------------------+
|RABBIT NIGHT LIGHT                 |[rabbit, night, light]                    |
|DOUGHNUT LIP GLOSS                 |[doughnut, lip, gloss]                    |
|12 MESSAGE CARDS WITH ENVELOPES    |[12, message, cards, with, envelopes]     |
|BLUE HARMONICA IN BOX              |[blue, harmonica, in, box]                |
|GUMBALL COAT RACK                  |[gumball, coat, rack]                     |
|SKULLS  WATER TRANSFER TATTOOS     |[skulls, water, transfer, tattoos]        |
|FELTCRAFT GIRL AMELIE KIT          |[feltcraft, girl, amelie, kit]            |
|CAMOUFLAGE LED TORCH               |[camouflage, led, torch]                  |
|WHITE SKULL HOT WATER BOTTLE       |[white, skull, hot, water, bottle]        |
|ENGLISH ROSE HOT WATER BOTT

In [28]:
from pyspark.ml.feature import ChiSqSelector, Tokenizer
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn\
.transform(sales.select("Description", "CustomerId"))\
.where("CustomerId IS NOT NULL")
prechi = fittedCV.transform(tokenized)\
.where("CustomerId IS NOT NULL")
chisq = ChiSqSelector()\
.setFeaturesCol("countVec")\
.setLabelCol("CustomerId")\
.setNumTopFeatures(2)
chisq.fit(prechi).transform(prechi)\
.drop("customerId", "Description", "DescOut").show()

NameError: name 'fittedCV' is not defined