In [1]:
import findspark
findspark.init()
import pyspark# only run after findspark.init()
from pyspark.sqlimport SparkSession
spark = SparkSession.builder.getOrCreate()
df= spark.sql('''select 'spark' as hello ''')
df.show()

SyntaxError: invalid syntax (<ipython-input-1-edde57d30296>, line 4)

In [2]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.sql('''select 'spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [5]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder.master("local[2]").appName("word count").getOrCreate()

In [29]:
# read all files of type CSV in the location mentioned, when setting "Header" to true, this means that the first line in the files "name of columns" are not included in data.

#"inferSchema" is automatically inferred column types. It requires an additional pass over the data and is false by default.
#Coalesce is recommended to be used while reducing the number of partitions, it only reads the rows with description that are not null.

sales = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("Spark-The-Definitive-Guide-master/data/retail-data/by-day/*.csv")\
.coalesce(5)\
.where("Description IS NOT NULL")
simpleDF = spark.read.json("Spark-The-Definitive-Guide-master/data/simple-ml")
scaleDF = spark.read.parquet("Spark-The-Definitive-Guide-master/data/simple-ml-scaling")

In [30]:
from pyspark.ml.feature import StandardScaler
# By subtracting the mean and then scaling to unit variance, StandardScaler is standardized. 
#Unit variance means that all the values are divided by the standard deviation.
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()

+---+--------------+-----------------------------------+
| id|      features|StandardScaler_c7920b3830e6__output|
+---+--------------+-----------------------------------+
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  0|[1.0,0.1,-1.0]|               [1.19522860933439...|
|  1| [2.0,1.1,1.0]|               [2.39045721866878...|
|  1|[3.0,10.1,3.0]|               [3.58568582800318...|
+---+--------------+-----------------------------------+



In [31]:
from pyspark.ml.feature import MinMaxScaler
# In a range from 5 to 10, features are Transformed by scaling.
minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
fittedminMax = minMax.fit(scaleDF)
fittedminMax.transform(scaleDF).show()

+---+--------------+---------------------------------+
| id|      features|MinMaxScaler_14707cba26e5__output|
+---+--------------+---------------------------------+
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  0|[1.0,0.1,-1.0]|                    [5.0,5.0,5.0]|
|  1| [2.0,1.1,1.0]|                    [7.5,5.5,7.5]|
|  1|[3.0,10.1,3.0]|                 [10.0,10.0,10.0]|
+---+--------------+---------------------------------+



In [32]:
from pyspark.ml.feature import Normalizer
# Normalizer is a transformer that is used to transform the Vector rows dataset to a normalized standard unit. 
#Parameter p is taken in order to define the normalization of p-norm. 
#This normalization can help standardize the input data and improve learning algorithm behaviour.
manhattanDistance = Normalizer().setP(1).setInputCol("features")
# Normalize each feature using norm.
manhattanDistance.transform(scaleDF).show()

+---+--------------+-------------------------------+
| id|      features|Normalizer_b95ecfe6b187__output|
+---+--------------+-------------------------------+
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  0|[1.0,0.1,-1.0]|           [0.47619047619047...|
|  1| [2.0,1.1,1.0]|           [0.48780487804878...|
|  1|[3.0,10.1,3.0]|           [0.18633540372670...|
+---+--------------+-------------------------------+



In [33]:
from pyspark.ml.feature import StringIndexer
# StringIndexer encodes a label string column to a label index column.
lblIndxr = StringIndexer().setInputCol("lab").setOutputCol("labelInd")
idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)
idxRes.show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|labelInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|     1.0|
|  red| bad|     1| 38.97187133755819|     0.0|
|  red| bad|     2|14.386294994851129|     0.0|
|  red| bad|    16|14.386294994851129|     0.0|
|  red|good|    45| 38.97187133755819|     1.0|
|green|good|     1|14.386294994851129|     1.0|
| blue| bad|     8|14.386294994851129|     0.0|
| blue| bad|    12|14.386294994851129|     0.0|
|green|good|    15| 38.97187133755819|     1.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     0.0|
|  red|good|    35|14.386294994851129|  

In [34]:
# StringIndexer here encodes a label float column to a label index column,
#so that the label float column is casted to string and the indexes these corresponding values.
valIndexer = StringIndexer().setInputCol("value1").setOutputCol("valueInd")
valIndexer.fit(simpleDF).transform(simpleDF).show()

+-----+----+------+------------------+--------+
|color| lab|value1|            value2|valueInd|
+-----+----+------+------------------+--------+
|green|good|     1|14.386294994851129|     0.0|
| blue| bad|     8|14.386294994851129|     7.0|
| blue| bad|    12|14.386294994851129|     1.0|
|green|good|    15| 38.97187133755819|     3.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     2.0|
|  red|good|    35|14.386294994851129|     5.0|
|  red| bad|     1| 38.97187133755819|     0.0|
|  red| bad|     2|14.386294994851129|     4.0|
|  red| bad|    16|14.386294994851129|     2.0|
|  red|good|    45| 38.97187133755819|     6.0|
|green|good|     1|14.386294994851129|     0.0|
| blue| bad|     8|14.386294994851129|     7.0|
| blue| bad|    12|14.386294994851129|     1.0|
|green|good|    15| 38.97187133755819|     3.0|
|green|good|    12|14.386294994851129|     1.0|
|green| bad|    16|14.386294994851129|     2.0|
|  red|good|    35|14.386294994851129|  

In [35]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
# First, StringIndexer encodes the color column string to an index color table. 
#Then, one-hot encoding converts the values of colorInd to binary vectors with a limit of one value per row,
#indicating the color index of the data.  
#The size of the vector is 2; because there are three values
lblIndxr = StringIndexer().setInputCol("color").setOutputCol("colorInd")
colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))
ohe = OneHotEncoder().setInputCol("colorInd")
ohe.fit(colorLab).transform(colorLab).show()

+-----+--------+----------------------------------+
|color|colorInd|OneHotEncoder_81e5529a72ac__output|
+-----+--------+----------------------------------+
|green|     1.0|                     (2,[1],[1.0])|
| blue|     2.0|                         (2,[],[])|
| blue|     2.0|                         (2,[],[])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|  red|     0.0|                     (2,[0],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
| blue|     2.0|                         (2,[],[])|
| blue|     2.0|                         (2,[],[])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     1.0|                     (2,[1],[1.0])|
|green|     

In [36]:
from pyspark.ml.feature import RegexTokenizer
#It separates the string words in the column into an array, each word represents a cell in the array. 
#In addition, it removes the capital letters by changing them to small ones.
rt = RegexTokenizer()\
.setInputCol("Description")\
.setOutputCol("DescOut")\
.setPattern(" ")\
.setToLowercase(True)
rt.transform(sales.select("Description")).show(20, False)

+-----------------------------------+------------------------------------------+
|Description                        |DescOut                                   |
+-----------------------------------+------------------------------------------+
|RABBIT NIGHT LIGHT                 |[rabbit, night, light]                    |
|DOUGHNUT LIP GLOSS                 |[doughnut, lip, gloss]                    |
|12 MESSAGE CARDS WITH ENVELOPES    |[12, message, cards, with, envelopes]     |
|BLUE HARMONICA IN BOX              |[blue, harmonica, in, box]                |
|GUMBALL COAT RACK                  |[gumball, coat, rack]                     |
|SKULLS  WATER TRANSFER TATTOOS     |[skulls, water, transfer, tattoos]        |
|FELTCRAFT GIRL AMELIE KIT          |[feltcraft, girl, amelie, kit]            |
|CAMOUFLAGE LED TORCH               |[camouflage, led, torch]                  |
|WHITE SKULL HOT WATER BOTTLE       |[white, skull, hot, water, bottle]        |
|ENGLISH ROSE HOT WATER BOTT

In [28]:
from pyspark.ml.feature import ChiSqSelector, Tokenizer
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn\
.transform(sales.select("Description", "CustomerId"))\
.where("CustomerId IS NOT NULL")
prechi = fittedCV.transform(tokenized)\
.where("CustomerId IS NOT NULL")
chisq = ChiSqSelector()\
.setFeaturesCol("countVec")\
.setLabelCol("CustomerId")\
.setNumTopFeatures(2)
chisq.fit(prechi).transform(prechi)\
.drop("customerId", "Description", "DescOut").show()

NameError: name 'fittedCV' is not defined