## Data Preprocessing using PySpark
### Agenda
<hr>
* Understanding How DataFrames are stored internally
* Why Preprocessing?
* Vectorizing Data
* Scaling Techniques
* Binning Techniques
* Encoding Techniques
* Text Processing
* Dimensionality Reduction
* Dealing with missing data using Imputer
* Expanding data to higher-degree polynomial features
* Custom Transformer

<hr>

### 1. Understanding How DataFrames are stored internally
<hr>
* Spark 1.0 to 1.3: Spark started with RDDs, where data is represented as Java objects.
* Spark 1.4 to 1.6: Java objects were deprioritized. DataFrame and Dataset evolved, storing data in a row-based format.
* Spark 2.x: A vectorized Parquet reader was added, which reads data into a columnar in-memory format.

<img src="https://github.com/awantik/machine-learning-slides/blob/master/ML-Pipeline.png?raw=true">

### 2. Why Preprocessing?
<hr>
* Many learning algorithms assume the data follows certain patterns, for example features on comparable scales.
* Unscaled or unstandardized data can lead to poor predictions.
* Learning algorithms understand only numbers, so text and images must first be converted to numbers.
* Preprocessing refers to the transformations applied to data before it is fed to a learning model.
* PySpark ML expects training data to be vectorized: a single column holds all the features of a row as one vector.
* A DataFrame is partitioned by rows, so each partition holds all columns for a subset of the rows. Columnar formats such as Parquet store the values of each column together within those rows.

<hr>

<img src="https://github.com/awantik/machine-learning-slides/blob/master/pipeline-ml.png?raw=true" width="800px">

* Format for Machine Learning

<img src="https://github.com/zekelabs/machine-learning-using-pyspark/blob/master/vec.PNG?raw=true">

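### 3. Vectorizing Data
<hr>
* Preprocessing transformers can be applied to columns of vectors.
* Numeric columns can be fed to VectorAssembler, which combines them into a single vector column. It accepts numeric, boolean and vector input columns; vector inputs are flattened into the output vector.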
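What VectorAssembler produces for each row can be sketched in plain Python. This is an illustration of the idea, not Spark's implementation; the `assemble` helper and the dict-based rows are hypothetical stand-ins for DataFrame rows.

```python
def assemble(row, input_cols):
    """Concatenate the given columns of one row into a flat list of doubles."""
    vec = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # Vector-valued inputs are flattened into the output.
            vec.extend(float(v) for v in value)
        else:
            # Numbers and booleans become doubles (True -> 1.0).
            vec.append(float(value))
    return vec


rows = [{"A": 1, "B": 5, "C": 7}, {"A": 2, "B": 6, "C": 8}, {"A": 3, "B": 7, "C": 9}]
print([assemble(r, ["A", "B", "C"]) for r in rows])
# [[1.0, 5.0, 7.0], [2.0, 6.0, 8.0], [3.0, 7.0, 9.0]]
```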

In [6]:
from pyspark.ml.feature import VectorAssembler
dataFrame = spark.createDataFrame(zip([1,2,3],[5,6,7],[7,8,9]),['A','B','C'])
display(dataFrame)

A,B,C
1,5,7
2,6,8
3,7,9


In [7]:
ve = VectorAssembler(inputCols=['A','B','C'], outputCol='vec_feature')
out = ve.transform(dataFrame)
display(out)

A,B,C,vec_feature
1,5,7,"List(1, 3, List(), List(1.0, 5.0, 7.0))"
2,6,8,"List(1, 3, List(), List(2.0, 6.0, 8.0))"
3,7,9,"List(1, 3, List(), List(3.0, 7.0, 9.0))"


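### 4. Scaling
<hr>
* Scaling transformers bring features to the same scale. StandardScaler and MinMaxScaler are linear scaling techniques.
* If the data is roughly normally distributed, StandardScaler works well.
* Otherwise, MinMaxScaler comes in handy.
* StandardScaler: subtract the column mean and divide by the column's sample standard deviation. Spark's StandardScaler defaults to withMean=False, so pass withMean=True to center the data.
* MinMaxScaler: subtract the column minimum and divide by the difference between the maximum and the minimum, which rescales values to [0, 1] by default (configurable via min and max).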
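The two formulas can be checked by hand with a small pure-Python sketch (not Spark's code; the helper names are hypothetical). It reproduces column A of the output below: min-max gives 0.0, 0.5, 1.0 and standard scaling gives -1.0, 0.0, 1.0, because the sample standard deviation of 1, 2, 3 is exactly 1.

```python
import statistics


def min_max_scale(column, new_min=0.0, new_max=1.0):
    """Rescale a column linearly to [new_min, new_max]."""
    lo, hi = min(column), max(column)
    if hi == lo:
        # Constant column: mapped to the middle of the target range.
        return [0.5 * (new_max + new_min) for _ in column]
    return [(x - lo) / (hi - lo) * (new_max - new_min) + new_min for x in column]


def standard_scale(column, with_mean=True, with_std=True):
    """Optionally center, then divide by the sample (n - 1) standard deviation."""
    mean = statistics.mean(column)
    std = statistics.stdev(column)
    out = []
    for x in column:
        if with_mean:
            x = x - mean
        if with_std:
            x = x / std if std != 0 else 0.0
        out.append(x)
    return out


print(min_max_scale([1.0, 2.0, 3.0]))   # [0.0, 0.5, 1.0]
print(standard_scale([1.0, 2.0, 3.0]))  # [-1.0, 0.0, 1.0]
```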

In [9]:
from pyspark.ml.feature import StandardScaler, MinMaxScaler

mm_scaler = MinMaxScaler(inputCol='vec_feature', outputCol='minmax_scaled')
ss_scaler = StandardScaler(inputCol='vec_feature', outputCol='standard_scaled', withMean=True, withStd=True)

In [10]:
mm = mm_scaler.fit(out)
ss = ss_scaler.fit(out)

In [11]:
res = mm.transform(out)
res = ss.transform(res)
display(res)

A,B,C,vec_feature,minmax_scaled,standard_scaled
1,5,7,"List(1, 3, List(), List(1.0, 5.0, 7.0))","List(1, 3, List(), List(0.0, 0.0, 0.0))","List(1, 3, List(), List(-1.0, -1.0, -1.0))"
2,6,8,"List(1, 3, List(), List(2.0, 6.0, 8.0))","List(1, 3, List(), List(0.5, 0.5, 0.5))","List(1, 3, List(), List(0.0, 0.0, 0.0))"
3,7,9,"List(1, 3, List(), List(3.0, 7.0, 9.0))","List(1, 3, List(), List(1.0, 1.0, 1.0))","List(1, 3, List(), List(1.0, 1.0, 1.0))"


#### Normalization across vectors
* Normalizer scales each row vector to unit p-norm, independently of the other rows.
* This puts every row on a common scale, so rows with large raw magnitudes do not dominate.
* The p-norm is configurable through p (default 2.0).

<img src="https://d2vlcm61l7u1fs.cloudfront.net/media%2Fb14%2Fb140f0cd-b2e2-4f62-a3fe-31c1021f5ae3%2FphpIOG6p7.png">
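A minimal pure-Python sketch of p-norm normalization (the `normalize` helper is hypothetical, not Spark's API). With p=1 the row (2, 6, 8) has norm 16, which gives the 0.125, 0.375, 0.5 seen in the second output row below.

```python
def normalize(vector, p=2.0):
    """Divide a vector by its p-norm; p=inf uses the max absolute value."""
    if p == float("inf"):
        norm = max(abs(v) for v in vector)
    else:
        norm = sum(abs(v) ** p for v in vector) ** (1.0 / p)
    # A zero vector is returned unchanged.
    return [v / norm for v in vector] if norm != 0 else list(vector)


print(normalize([2.0, 6.0, 8.0], p=1.0))  # [0.125, 0.375, 0.5]
```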

In [13]:
from pyspark.ml.feature import Normalizer

In [14]:
normalizer = Normalizer(inputCol="vec_feature", outputCol="normFeatures", p=1.0)

In [15]:
norm_out = normalizer.transform(out)

In [16]:
display(norm_out)

A,B,C,vec_feature,normFeatures
1,5,7,"List(1, 3, List(), List(1.0, 5.0, 7.0))","List(1, 3, List(), List(0.07692307692307693, 0.38461538461538464, 0.5384615384615384))"
2,6,8,"List(1, 3, List(), List(2.0, 6.0, 8.0))","List(1, 3, List(), List(0.125, 0.375, 0.5))"
3,7,9,"List(1, 3, List(), List(3.0, 7.0, 9.0))","List(1, 3, List(), List(0.15789473684210525, 0.3684210526315789, 0.47368421052631576))"


### 5. Binning Techniques
<hr>
* Several transformers convert continuous values into categorical ones.
* Many learning algorithms expect categorical values.
* Binarizer converts continuous values into 0 or 1.
* Bucketizer converts continuous values into buckets defined by configured split points.
* QuantileDiscretizer converts continuous values into a configured number of buckets.

#### Binarizer
* Some learning algorithms, like Bernoulli Naive Bayes, expect feature data to be 0/1.
* We need to transform the data using Binarizer before feeding it to such an algorithm: values strictly greater than threshold (default 0.0) become 1.0, all others become 0.0.
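The thresholding rule fits in one line of plain Python. This sketch (hypothetical `binarize` helper) reproduces the bin_C column below for C = 17, -3, 89.

```python
def binarize(values, threshold=0.0):
    """Values strictly greater than the threshold map to 1.0, all others to 0.0."""
    return [1.0 if v > threshold else 0.0 for v in values]


print(binarize([17.0, -3.0, 89.0]))  # [1.0, 0.0, 1.0]
```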

In [19]:
from pyspark.ml.feature import Binarizer

In [20]:
from pyspark.ml.feature import VectorAssembler
dataFrame = spark.createDataFrame(zip([1,2,3],[5,6,7],[17,-3,89]),['A','B','C'])
display(dataFrame)

A,B,C
1,5,17
2,6,-3
3,7,89


In [21]:
binarizer = Binarizer(threshold=0.0, inputCol='C', outputCol='bin_C')

In [22]:
dataFrame = dataFrame.withColumn('C', dataFrame.C.cast('double'))
bin_out = binarizer.transform(dataFrame)

In [23]:
display(bin_out)

A,B,C,bin_C
1,5,17.0,1.0
2,6,-3.0,0.0
3,7,89.0,1.0


##### Bucketizer
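* Converts continuous data into categorical values: bucket i holds values in [splits[i], splits[i+1]), and the last bucket also includes its upper bound. Values outside the splits are treated as errors, so include -inf and inf to cover all values.
* Handy for algorithms that expect discrete, non-negative features, such as Multinomial Naive Bayes, which models counts.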
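The bucket lookup is a binary search over the sorted split points. A pure-Python sketch using the same splits as the example below (the `bucketize` helper is hypothetical) gives 17 → 4, -3 → 2 and 89 → 5, matching bucket_out.

```python
from bisect import bisect_right


def bucketize(x, splits):
    """Index of the bucket [splits[i], splits[i+1]) that contains x."""
    if not splits[0] <= x <= splits[-1]:
        raise ValueError(f"{x} is outside the range of the splits")
    if x == splits[-1]:
        # The last bucket also includes its upper bound.
        return float(len(splits) - 2)
    return float(bisect_right(splits, x) - 1)


splits = [-float("inf"), -20, -10, 0.0, 10, 20, float("inf")]
print([bucketize(v, splits) for v in [17.0, -3.0, 89.0]])  # [4.0, 2.0, 5.0]
```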

In [25]:
from pyspark.ml.feature import Bucketizer

In [26]:
splits = [-float("inf"), -20, -10, 0.0, 10, 20, float("inf")]

In [27]:
bucket = Bucketizer(splits=splits, inputCol='C',outputCol='bucket_out')

In [28]:
bucket_out = bucket.transform(dataFrame)

In [29]:
display(bucket_out)

A,B,C,bucket_out
1,5,17.0,4.0
2,6,-3.0,2.0
3,7,89.0,5.0


##### QuantileDiscretizer
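* Only the number of buckets (numBuckets) is configured.
* The split points are decided by the algorithm from approximate quantiles of the data (precision controlled by relativeError) rather than by configuration. Fitting returns a Bucketizer with those splits.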
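The two steps can be sketched in pure Python under one stated assumption: exact nearest-rank quantiles stand in for Spark's approximate quantile algorithm, so split points may differ on larger data. For C = 17, -3, 89 and numBuckets=2 the median split is 17, which reproduces the quant_out column below. The helper names are hypothetical.

```python
import math
from bisect import bisect_right


def quantile_splits(values, num_buckets):
    """Split points at the 1/n, 2/n, ... quantiles (exact nearest-rank method)."""
    data = sorted(values)
    inner = []
    for i in range(1, num_buckets):
        rank = math.ceil(i / num_buckets * len(data))  # 1-based nearest rank
        inner.append(data[rank - 1])
    # Drop duplicate split points, then open both ends.
    inner = sorted(set(inner))
    return [-math.inf] + inner + [math.inf]


def bucketize(x, splits):
    if x == splits[-1]:
        return float(len(splits) - 2)
    return float(bisect_right(splits, x) - 1)


values = [17.0, -3.0, 89.0]
splits = quantile_splits(values, num_buckets=2)
print(splits)                                  # [-inf, 17.0, inf]
print([bucketize(v, splits) for v in values])  # [1.0, 0.0, 1.0]
```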

In [31]:
from pyspark.ml.feature import QuantileDiscretizer

In [32]:
quantiledis = QuantileDiscretizer(numBuckets=2, inputCol='C', outputCol='quant_out')

In [33]:
quant = quantiledis.fit(dataFrame)

In [34]:
quant_out = quant.transform(dataFrame)

In [35]:
display(quant_out)

A,B,C,quant_out
1,5,17.0,1.0
2,6,-3.0,0.0
3,7,89.0,1.0


### 6. Encoding Techniques
<hr>
* Learning algorithms understand only numbers.
* Categorical columns containing strings need to be mapped to numbers.
* Nominal values (no natural order, e.g. fruit names) should be one-hot encoded.
* Ordinal values (with a natural order, e.g. low/medium/high) should be assigned numbers that follow that order, if the learning algorithm relies on it.

##### StringIndexer
* Converts strings to numeric indices.
* Indices are assigned by label frequency or alphabetical order, configured with stringOrderType: frequencyDesc (the default; the most frequent label gets 0.0), frequencyAsc, alphabetDesc or alphabetAsc.
* How unseen or invalid labels are handled is controlled by handleInvalid:
  - 'error': throw an exception (the default)
  - 'skip': drop the row containing the unseen label entirely
  - 'keep': put unseen labels in a special additional bucket, at index numLabels
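A pure-Python sketch of frequencyDesc indexing and the three handleInvalid modes. The helper names are hypothetical, and breaking frequency ties alphabetically is this sketch's choice. On the fruit data below, orange (seen twice) gets 0.0 and mango gets 1.0, as in the outcategory column.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Map labels to indices, most frequent first (ties broken alphabetically)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda s: (-counts[s], s))
    return {label: float(i) for i, label in enumerate(ordered)}


def index_labels(labels, mapping, handle_invalid="error"):
    out = []
    for s in labels:
        if s in mapping:
            out.append(mapping[s])
        elif handle_invalid == "keep":
            out.append(float(len(mapping)))  # extra bucket at index numLabels
        elif handle_invalid == "skip":
            continue  # the whole row would be dropped
        else:
            raise ValueError(f"Unseen label: {s}")
    return out


mapping = fit_string_indexer(["orange", "mango", "orange"])
print(mapping)  # {'orange': 0.0, 'mango': 1.0}
print(index_labels(["orange", "mango", "apple"], mapping, handle_invalid="keep"))  # [0.0, 1.0, 2.0]
```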

In [38]:
from pyspark.ml.feature import StringIndexer
string_indexer = StringIndexer(inputCol='category', outputCol='outcategory', stringOrderType="frequencyDesc", handleInvalid="keep")

In [39]:
dataFrame = spark.createDataFrame(zip([1,2,22],[5,6,22],['orange','mango','orange']),['A','B','category'])

In [40]:
display(dataFrame)

A,B,category
1,5,orange
2,6,mango
22,22,orange


In [41]:
string_indexer = string_indexer.fit(dataFrame)
string_indexer_out = string_indexer.transform(dataFrame)

In [42]:
display(string_indexer_out)

A,B,category,outcategory
1,5,orange,0.0
2,6,mango,1.0
22,22,orange,0.0


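##### OneHotEncoderEstimator
* Nominal values need to be converted to one-hot vectors.
* OneHotEncoderEstimator (Spark 2.3/2.4; renamed to OneHotEncoder in Spark 3.0) is the estimator designed to do this.
* String data first has to be converted to numbers (e.g. with StringIndexer) before the encoder can work on it.
* By default dropLast=True, so a column with k categories becomes a vector of size k - 1 and the last category is encoded as all zeros. That is why mango (index 1.0) maps to an empty sparse vector below.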
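The dropLast behaviour is easiest to see in a dense pure-Python sketch (hypothetical `one_hot` helper; Spark itself returns sparse vectors). With two categories the output has size 1: orange (0.0) becomes [1.0] and mango (1.0) becomes [0.0].

```python
def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector for a category index."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    if index < size:
        vec[int(index)] = 1.0
    # With drop_last, the last category stays all zeros.
    return vec


print(one_hot(0.0, 2))  # [1.0]
print(one_hot(1.0, 2))  # [0.0]
```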

In [44]:
try:
    from pyspark.ml.feature import OneHotEncoderEstimator as OneHotEncoder
except ImportError:
    from pyspark.ml.feature import OneHotEncoder

In [45]:
ohe = OneHotEncoder(inputCols=['outcategory'], outputCols=['ohe_outcategory'])

In [46]:
ohe = ohe.fit(string_indexer_out)

In [47]:
ohe_dataframe = ohe.transform(string_indexer_out)

In [48]:
display(ohe_dataframe)

A,B,category,outcategory,ohe_outcategory
1,5,orange,0.0,"List(0, 1, List(0), List(1.0))"
2,6,mango,1.0,"List(0, 1, List(), List())"
22,22,orange,0.0,"List(0, 1, List(0), List(1.0))"


### 7. Text Processing
<hr>
* Again, machine learning models don't understand text.
* We need to convert text into vectors.
* The first step is tokenization: converting text into a sequence of words.
* After tokenization, the word space is reduced, e.g. by removing stop words.
* NGram builds tokens made of several consecutive words.
* The vector representation can hold word counts, using CountVectorizer.
* It can also hold the importance of each word, using TF-IDF.
* Using HashingTF (or FeatureHasher), words can be hashed into a fixed number of buckets.

##### Tokenizer
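* Converts text into a sequence of words.
* Tokenizer lowercases the text and splits it on whitespace only, so punctuation stays attached to words ("tasty." below).
* For more robust behaviour, use RegexTokenizer, which splits on a configurable regular expression (pattern).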
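The difference between the two tokenizers can be sketched with plain Python string and `re` functions. This is an illustration, not Spark's code: splitting on `\W+` roughly mirrors a RegexTokenizer configured with `pattern="\\W+"`, which is an assumed configuration here, not the default.

```python
import re

sentence = "The food is very tasty. I would recommand this to all my friends"

# Lowercase + whitespace split, like the Tokenizer output below.
whitespace_tokens = sentence.lower().split()

# Treat runs of non-word characters as gaps, so "." no longer sticks to words.
regex_tokens = [t for t in re.split(r"\W+", sentence.lower()) if t]

print(whitespace_tokens[4], regex_tokens[4])  # tasty. tasty
```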

In [51]:
from pyspark.ml.feature import Tokenizer

In [52]:
sentenceData = spark.createDataFrame([
    (1.0, "The food is very tasty. I would recommand this to all my friends"),
    (0.0, "I wish they could be more organized. Services seems very much laid back."),
    (1.0, "Loved the ambiance. Good place to be in weekdays.")
], ["label", "sentence"])

In [53]:
tokenizer = Tokenizer(inputCol='sentence',outputCol='tokenized_sentence')

In [54]:
tokenized_sentence = tokenizer.transform(sentenceData)

In [55]:
display(tokenized_sentence)

label,sentence,tokenized_sentence
1.0,The food is very tasty. I would recommand this to all my friends,"List(the, food, is, very, tasty., i, would, recommand, this, to, all, my, friends)"
0.0,I wish they could be more organized. Services seems very much laid back.,"List(i, wish, they, could, be, more, organized., services, seems, very, much, laid, back.)"
1.0,Loved the ambiance. Good place to be in weekdays.,"List(loved, the, ambiance., good, place, to, be, in, weekdays.)"


##### StopWordsRemover
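* Not all words are important.
* StopWordsRemover drops stop words: a built-in English list by default, or your own list via stopWords. Matching is case-insensitive unless caseSensitive=True.
* This also reduces the dimensionality of the data.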
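A pure-Python sketch of case-insensitive stop word removal (hypothetical `remove_stop_words` helper), using the same custom list as the example below. It reproduces the third row of stopwordsremoved_sentence.

```python
def remove_stop_words(tokens, stop_words, case_sensitive=False):
    """Drop tokens that appear in the stop word list."""
    if case_sensitive:
        stop = set(stop_words)
        return [t for t in tokens if t not in stop]
    stop = {w.lower() for w in stop_words}
    return [t for t in tokens if t.lower() not in stop]


tokens = ["loved", "the", "ambiance.", "good", "place", "to", "be", "in", "weekdays."]
print(remove_stop_words(tokens, ["the", "is", "i", "in", "to", "be"]))
# ['loved', 'ambiance.', 'good', 'place', 'weekdays.']
```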

In [57]:
from pyspark.ml.feature import StopWordsRemover

In [58]:
stwr = StopWordsRemover(inputCol='tokenized_sentence', outputCol='stopwordsremoved_sentence', stopWords=['the','is','i','in','to','be'], caseSensitive=False)

In [59]:
stopwords_removed_sentence = stwr.transform(tokenized_sentence)

In [60]:
display(stopwords_removed_sentence)

label,sentence,tokenized_sentence,stopwordsremoved_sentence
1.0,The food is very tasty. I would recommand this to all my friends,"List(the, food, is, very, tasty., i, would, recommand, this, to, all, my, friends)","List(food, very, tasty., would, recommand, this, all, my, friends)"
0.0,I wish they could be more organized. Services seems very much laid back.,"List(i, wish, they, could, be, more, organized., services, seems, very, much, laid, back.)","List(wish, they, could, more, organized., services, seems, very, much, laid, back.)"
1.0,Loved the ambiance. Good place to be in weekdays.,"List(loved, the, ambiance., good, place, to, be, in, weekdays.)","List(loved, ambiance., good, place, weekdays.)"


##### NGrams
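* Sometimes meaning comes from several words together (e.g. "laid back") rather than from single words.
* NGram turns a sequence of tokens into n-grams: each output token joins n consecutive input tokens with a space (n defaults to 2). An input with fewer than n tokens produces no output.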
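The sliding window is short in plain Python (hypothetical `ngrams` helper, not Spark's API). It reproduces the third row of ngram_sentence below.

```python
def ngrams(tokens, n=2):
    """Join every window of n consecutive tokens with a space."""
    # Fewer than n tokens gives an empty range, hence an empty list.
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


print(ngrams(["loved", "ambiance.", "good", "place", "weekdays."]))
# ['loved ambiance.', 'ambiance. good', 'good place', 'place weekdays.']
```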

In [62]:
from pyspark.ml.feature import NGram

In [63]:
ngram = NGram(inputCol='stopwordsremoved_sentence', outputCol='ngram_sentence', n=2)

In [64]:
ngram_sentence = ngram.transform(stopwords_removed_sentence)

In [65]:
display(ngram_sentence)

label,sentence,tokenized_sentence,stopwordsremoved_sentence,ngram_sentence
1.0,The food is very tasty. I would recommand this to all my friends,"List(the, food, is, very, tasty., i, would, recommand, this, to, all, my, friends)","List(food, very, tasty., would, recommand, this, all, my, friends)","List(food very, very tasty., tasty. would, would recommand, recommand this, this all, all my, my friends)"
0.0,I wish they could be more organized. Services seems very much laid back.,"List(i, wish, they, could, be, more, organized., services, seems, very, much, laid, back.)","List(wish, they, could, more, organized., services, seems, very, much, laid, back.)","List(wish they, they could, could more, more organized., organized. services, services seems, seems very, very much, much laid, laid back.)"
1.0,Loved the ambiance. Good place to be in weekdays.,"List(loved, the, ambiance., good, place, to, be, in, weekdays.)","List(loved, ambiance., good, place, weekdays.)","List(loved ambiance., ambiance. good, good place, place weekdays.)"


##### CountVectorizer
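* Learns a vocabulary from the corpus and assigns each word its own index in the output vector; words that are more frequent across the corpus get lower indices.
* The value at a word's index is the number of times the word occurs in that row's text. The vocabulary size can be limited with vocabSize and minDF.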
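A pure-Python sketch of the fit and transform steps (hypothetical helpers; dense output for readability, whereas Spark returns sparse vectors). Breaking frequency ties alphabetically is this sketch's choice, and Spark's tie order may differ.

```python
from collections import Counter


def fit_vocabulary(docs):
    """Vocabulary ordered by corpus-wide term frequency, most frequent first."""
    counts = Counter(token for doc in docs for token in doc)
    return sorted(counts, key=lambda t: (-counts[t], t))


def count_vector(doc, vocab):
    """Count how often each vocabulary term occurs in one document."""
    index = {term: i for i, term in enumerate(vocab)}
    vec = [0.0] * len(vocab)
    for token in doc:
        if token in index:
            vec[index[token]] += 1.0
    return vec


docs = [["food", "very", "tasty"], ["very", "much", "laid", "back"]]
vocab = fit_vocabulary(docs)
print(vocab)                         # ['very', 'back', 'food', 'laid', 'much', 'tasty']
print(count_vector(docs[0], vocab))  # [1.0, 0.0, 1.0, 0.0, 0.0, 1.0]
```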

In [67]:
from pyspark.ml.feature import CountVectorizer

In [68]:
cv = CountVectorizer(inputCol='stopwordsremoved_sentence', outputCol='countvectorizer_sentence')

In [69]:
cv = cv.fit(stopwords_removed_sentence)

In [70]:
cv_sentence = cv.transform(stopwords_removed_sentence)

In [71]:
display(cv_sentence)

label,sentence,tokenized_sentence,stopwordsremoved_sentence,countvectorizer_sentence
1.0,The food is very tasty. I would recommand this to all my friends,"List(the, food, is, very, tasty., i, would, recommand, this, to, all, my, friends)","List(food, very, tasty., would, recommand, this, all, my, friends)","List(0, 24, List(0, 2, 4, 6, 7, 10, 11, 16, 20), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,I wish they could be more organized. Services seems very much laid back.,"List(i, wish, they, could, be, more, organized., services, seems, very, much, laid, back.)","List(wish, they, could, more, organized., services, seems, very, much, laid, back.)","List(0, 24, List(0, 5, 8, 9, 12, 13, 14, 17, 18, 21, 22), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
1.0,Loved the ambiance. Good place to be in weekdays.,"List(loved, the, ambiance., good, place, to, be, in, weekdays.)","List(loved, ambiance., good, place, weekdays.)","List(0, 24, List(1, 3, 15, 19, 23), List(1.0, 1.0, 1.0, 1.0, 1.0))"


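##### TF-IDF
* HashingTF maps each term to a vector index by hashing it modulo numFeatures (default 2^18 = 262144, the vector size shown below); different terms can collide.
* IDF then rescales the term frequencies. It returns a sparse vector whose value for term t is tf × idf, with idf = log((m + 1) / (df + 1)) for a corpus of m documents, df of which contain t. Values are not bounded to [0, 1], and a term that appears in every document gets 0.
* A word's importance grows with how often it appears in the document and shrinks with how many documents in the corpus contain it.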
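The IDF formula can be checked with a pure-Python sketch that works on terms directly instead of hashed indices (hypothetical helpers, not Spark's API). With m = 3 documents, a term in one document gets log(4/2) ≈ 0.693 and a term in two documents gets log(4/3) ≈ 0.288, the two values that appear in idf_sentence below.

```python
import math
from collections import Counter


def fit_idf(docs):
    """idf(t) = log((m + 1) / (df(t) + 1)) for a corpus of m tokenized documents."""
    m = len(docs)
    df = Counter(term for doc in docs for term in set(doc))
    return {term: math.log((m + 1) / (count + 1)) for term, count in df.items()}


def tf_idf(doc, idf):
    """Raw term frequency in the document times the term's idf."""
    tf = Counter(doc)
    return {term: count * idf.get(term, 0.0) for term, count in tf.items()}


docs = [["tasty", "food"], ["food", "laid", "back"], ["tasty", "food", "place"]]
idf = fit_idf(docs)
print(tf_idf(docs[1], idf))  # "food" is in every document, so its weight is 0.0
```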

In [73]:
sentenceData = spark.createDataFrame([
    (1.0, "The food is very tasty . I would recommand this to all my friends"),
    (0.0, "I wish they could be more organized. Food is tasty. Services seems very much laid back."),
    (1.0, "Loved the ambiance. Good place to be in weekdays. Tasty Food.")
], ["label", "sentence"])

In [74]:
tokenized_sentence = tokenizer.transform(sentenceData)

In [75]:
display(tokenized_sentence)

label,sentence,tokenized_sentence
1.0,The food is very tasty . I would recommand this to all my friends,"List(the, food, is, very, tasty, ., i, would, recommand, this, to, all, my, friends)"
0.0,I wish they could be more organized. Food is tasty. Services seems very much laid back.,"List(i, wish, they, could, be, more, organized., food, is, tasty., services, seems, very, much, laid, back.)"
1.0,Loved the ambiance. Good place to be in weekdays. Tasty Food.,"List(loved, the, ambiance., good, place, to, be, in, weekdays., tasty, food.)"


In [76]:
from pyspark.ml.feature import HashingTF,IDF

In [77]:
hashingtf = HashingTF(inputCol='tokenized_sentence', outputCol='hashed_sentence')

In [78]:
hashed_sentence = hashingtf.transform(tokenized_sentence)

In [79]:
display(hashed_sentence)

label,sentence,tokenized_sentence,hashed_sentence
1.0,The food is very tasty . I would recommand this to all my friends,"List(the, food, is, very, tasty, ., i, would, recommand, this, to, all, my, friends)","List(0, 262144, List(1536, 15889, 24417, 37852, 68867, 103838, 108541, 121133, 135560, 194490, 205044, 210040, 222394, 227406), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
0.0,I wish they could be more organized. Food is tasty. Services seems very much laid back.,"List(i, wish, they, could, be, more, organized., food, is, tasty., services, seems, very, much, laid, back.)","List(0, 262144, List(4333, 15889, 20719, 24417, 76764, 100620, 121133, 131149, 147765, 151536, 158129, 167152, 195807, 210040, 224471, 239029), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"
1.0,Loved the ambiance. Good place to be in weekdays. Tasty Food.,"List(loved, the, ambiance., good, place, to, be, in, weekdays., tasty, food.)","List(0, 262144, List(33933, 61231, 66865, 103838, 113432, 167152, 176674, 205044, 216731, 222453, 227406), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))"


In [80]:
idf = IDF(inputCol='hashed_sentence', outputCol='idf_sentence')

In [81]:
idf = idf.fit(hashed_sentence)

In [82]:
idf_sentence = idf.transform(hashed_sentence)

In [83]:
display(idf_sentence)

label,sentence,tokenized_sentence,hashed_sentence,idf_sentence
1.0,The food is very tasty . I would recommand this to all my friends,"List(the, food, is, very, tasty, ., i, would, recommand, this, to, all, my, friends)","List(0, 262144, List(1536, 15889, 24417, 37852, 68867, 103838, 108541, 121133, 135560, 194490, 205044, 210040, 222394, 227406), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(0, 262144, List(1536, 15889, 24417, 37852, 68867, 103838, 108541, 121133, 135560, 194490, 205044, 210040, 222394, 227406), List(0.6931471805599453, 0.28768207245178085, 0.28768207245178085, 0.6931471805599453, 0.6931471805599453, 0.28768207245178085, 0.6931471805599453, 0.28768207245178085, 0.6931471805599453, 0.6931471805599453, 0.28768207245178085, 0.28768207245178085, 0.6931471805599453, 0.28768207245178085))"
0.0,I wish they could be more organized. Food is tasty. Services seems very much laid back.,"List(i, wish, they, could, be, more, organized., food, is, tasty., services, seems, very, much, laid, back.)","List(0, 262144, List(4333, 15889, 20719, 24417, 76764, 100620, 121133, 131149, 147765, 151536, 158129, 167152, 195807, 210040, 224471, 239029), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(0, 262144, List(4333, 15889, 20719, 24417, 76764, 100620, 121133, 131149, 147765, 151536, 158129, 167152, 195807, 210040, 224471, 239029), List(0.6931471805599453, 0.28768207245178085, 0.6931471805599453, 0.28768207245178085, 0.6931471805599453, 0.6931471805599453, 0.28768207245178085, 0.6931471805599453, 0.6931471805599453, 0.6931471805599453, 0.6931471805599453, 0.28768207245178085, 0.6931471805599453, 0.28768207245178085, 0.6931471805599453, 0.6931471805599453))"
1.0,Loved the ambiance. Good place to be in weekdays. Tasty Food.,"List(loved, the, ambiance., good, place, to, be, in, weekdays., tasty, food.)","List(0, 262144, List(33933, 61231, 66865, 103838, 113432, 167152, 176674, 205044, 216731, 222453, 227406), List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0))","List(0, 262144, List(33933, 61231, 66865, 103838, 113432, 167152, 176674, 205044, 216731, 222453, 227406), List(0.6931471805599453, 0.6931471805599453, 0.6931471805599453, 0.28768207245178085, 0.6931471805599453, 0.28768207245178085, 0.6931471805599453, 0.28768207245178085, 0.6931471805599453, 0.6931471805599453, 0.28768207245178085))"


##### FeatureHasher
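* Projects a set of categorical or numerical columns into a feature vector of fixed dimension (numFeatures, default 2^18).
* FeatureHasher works on multiple columns at once.
* Columns can be numeric, string or boolean. A numeric column is hashed by its column name and keeps its value; string and boolean columns are treated as categorical, hashing "column=value" with an indicator value of 1.0.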
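The hashing trick behind FeatureHasher can be sketched in pure Python. Assumptions, stated loudly: MD5 from `hashlib` stands in for Spark's MurmurHash3, so the indices will not match Spark's output, and summing colliding values is this sketch's choice. The helper names are hypothetical.

```python
import hashlib

NUM_FEATURES = 2 ** 18  # same size as Spark's default numFeatures


def bucket(key, num_features=NUM_FEATURES):
    # Stand-in hash (MD5), not Spark's MurmurHash3, so indices differ from Spark.
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16) % num_features


def hash_row(row, num_features=NUM_FEATURES):
    """Sparse {index: value} features for one row (dict of column -> value)."""
    features = {}
    for col, value in row.items():
        if isinstance(value, (bool, str)):
            # Categorical column: hash "column=value" with indicator 1.0.
            text = str(value).lower() if isinstance(value, bool) else value
            key, val = f"{col}={text}", 1.0
        else:
            # Numeric column: hash the column name and keep the value itself.
            key, val = col, float(value)
        idx = bucket(key, num_features)
        features[idx] = features.get(idx, 0.0) + val  # colliding features add up
    return features


print(hash_row({"real": 2.2, "bool": True, "stringNum": "1", "string": "foo"}))
```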

In [85]:
from pyspark.ml.feature import FeatureHasher

dataset = spark.createDataFrame([
    (2.2, True, "1", "foo"),
    (3.3, False, "2", "bar"),
    (4.4, False, "3", "baz"),
    (5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])

hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
                       outputCol="features")

featurized = hasher.transform(dataset)
featurized.show(truncate=False)

##### Word2Vec
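* Learns a dense vector (embedding) for each word; a document is represented by the average of its word vectors.
* The vector size is configurable through vectorSize (default 100).
* The small, fixed vector size makes it far more compact than vocabulary-sized count or TF-IDF vectors, which reduces downstream computation. Training is randomized, so set seed for reproducible vectors.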
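How a document vector is built from word vectors can be sketched in pure Python. The 2-dimensional embeddings below are hypothetical stand-ins for a fitted Word2VecModel, and counting unknown words toward the average (while they add nothing) is this sketch's choice.

```python
def average_word_vectors(tokens, embeddings):
    """Document vector = average of the word vectors of its tokens."""
    dim = len(next(iter(embeddings.values())))
    if not tokens:
        return [0.0] * dim
    total = [0.0] * dim
    for token in tokens:
        vec = embeddings.get(token)
        if vec is None:
            continue  # unknown words add nothing in this sketch
        for i, v in enumerate(vec):
            total[i] += v
    return [t / len(tokens) for t in total]


# Hypothetical embeddings, not learned from the data above.
embeddings = {"good": [1.0, 0.0], "place": [0.0, 1.0], "loved": [1.0, 1.0]}
print(average_word_vectors(["loved", "good", "place"], embeddings))
```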

In [87]:
from pyspark.ml.feature import Word2Vec

In [88]:
wc = Word2Vec(vectorSize=3, minCount=0, inputCol='stopwordsremoved_sentence', outputCol='word2vec_sentence')

In [89]:
wc = wc.fit(stopwords_removed_sentence)

In [90]:
word2vec_sentence = wc.transform(stopwords_removed_sentence)

In [91]:
display(word2vec_sentence)

label,sentence,tokenized_sentence,stopwordsremoved_sentence,word2vec_sentence
1.0,The food is very tasty. I would recommand this to all my friends,"List(the, food, is, very, tasty., i, would, recommand, this, to, all, my, friends)","List(food, very, tasty., would, recommand, this, all, my, friends)","List(1, 3, List(), List(0.001430635547472371, 0.028169848904427554, 0.06262729689478874))"
0.0,I wish they could be more organized. Services seems very much laid back.,"List(i, wish, they, could, be, more, organized., services, seems, very, much, laid, back.)","List(wish, they, could, more, organized., services, seems, very, much, laid, back.)","List(1, 3, List(), List(0.021398486019196836, -0.024643760555508463, 0.02174479878422889))"
1.0,Loved the ambiance. Good place to be in weekdays.,"List(loved, the, ambiance., good, place, to, be, in, weekdays.)","List(loved, ambiance., good, place, weekdays.)","List(1, 3, List(), List(-0.01969942003488541, -0.020766012882813812, -0.022866672463715078))"


### 8. Dimensionality Reduction
<hr>
* Removing noise (unimportant columns) is an important step in getting data ready for machine learning.
* We can use feature selection techniques, which keep a subset of the features (e.g. VectorSlicer, ChiSqSelector), as well as feature extraction techniques that derive new features, like PCA.

##### VectorSlicer
* This takes a feature vector and outputs a new feature vector with a subarray of the original features.
* Pass the positions of the features to keep via indices (or their names via names, when the vector carries attribute metadata); only those features are retained.
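A pure-Python sketch of slicing by position and by name (hypothetical `slice_vector` helper; the explicit `feature_names` list stands in for Spark's attribute metadata). Selected indices come first, followed by selected names, each in the order given. Slicing (1, 5, 17) at index 1 gives [5.0], as in the first output row below.

```python
def slice_vector(vector, indices=(), names=(), feature_names=None):
    """Keep the selected features: indices first, then names, in the given order."""
    picked = list(indices)
    if names:
        # Names are resolved to positions via the (hypothetical) feature name list.
        picked += [feature_names.index(n) for n in names]
    return [vector[i] for i in picked]


print(slice_vector([1.0, 5.0, 17.0], indices=[1]))  # [5.0]
```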

In [94]:
from pyspark.ml.feature import VectorSlicer

In [95]:
dataFrame = spark.createDataFrame(zip([1,2,3],[5,6,7],[17,-3,89]),['A','B','C'])
va = VectorAssembler(inputCols=['A','B','C'], outputCol='vec_sentence')
vec_sentences = va.transform(dataFrame)
display(vec_sentences)

A,B,C,vec_sentence
1,5,17,"List(1, 3, List(), List(1.0, 5.0, 17.0))"
2,6,-3,"List(1, 3, List(), List(2.0, 6.0, -3.0))"
3,7,89,"List(1, 3, List(), List(3.0, 7.0, 89.0))"


In [96]:
vec_slicer = VectorSlicer(inputCol='vec_sentence', outputCol='vec_sliced_sentence',indices=[1])

In [97]:
vec_slice_sentences = vec_slicer.transform(vec_sentences)

In [98]:
display(vec_slice_sentences)

A,B,C,vec_sentence,vec_sliced_sentence
1,5,17,"List(1, 3, List(), List(1.0, 5.0, 17.0))","List(1, 1, List(), List(5.0))"
2,6,-3,"List(1, 3, List(), List(2.0, 6.0, -3.0))","List(1, 1, List(), List(6.0))"
3,7,89,"List(1, 3, List(), List(3.0, 7.0, 89.0))","List(1, 1, List(), List(7.0))"


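##### ChiSqSelector
* Selects the categorical features that are most dependent on a categorical label, using Pearson's chi-squared test of independence.
* With the default selectorType='numTopFeatures', it keeps the numTopFeatures features (default 50) with the smallest p-values.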
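The statistic behind the selection can be computed from a contingency table in pure Python (hypothetical helper, not Spark's API). One simplification: Spark ranks features by p-value, which also accounts for degrees of freedom, while this sketch ranks by the raw statistic. The two rankings agree only when features have the same number of categories, as in the toy data here.

```python
from collections import Counter


def chi_squared(feature, label):
    """Pearson's chi-squared statistic for two categorical sequences."""
    n = len(feature)
    observed = Counter(zip(feature, label))
    f_counts = Counter(feature)
    l_counts = Counter(label)
    stat = 0.0
    for f, fc in f_counts.items():
        for lab, lc in l_counts.items():
            expected = fc * lc / n  # count expected if feature and label were independent
            stat += (observed[(f, lab)] - expected) ** 2 / expected
    return stat


label = [0, 0, 1, 1]
features = {"f_dependent": [0, 0, 1, 1], "f_independent": [0, 1, 0, 1]}
ranked = sorted(features, key=lambda name: chi_squared(features[name], label), reverse=True)
print(ranked)  # ['f_dependent', 'f_independent']
```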

In [100]:
income_data = spark.read.csv('/FileStore/tables/adult.csv',header=True, inferSchema=True)

In [101]:
cat_cols = [name for name, dtype in income_data.dtypes if dtype == 'string']
cat_cols

In [102]:
from pyspark.ml.feature import ChiSqSelector

In [103]:
for col in cat_cols:
  st = StringIndexer(inputCol=col, outputCol=col+'tf').fit(income_data)
  income_data = st.transform(income_data)

In [104]:
display(income_data.head(5))

age,workclass,fnlwgt,education,educational-num,marital-status,occupation,relationship,race,gender,capital-gain,capital-loss,hours-per-week,native-country,income,workclasstf,educationtf,marital-statustf,occupationtf,relationshiptf,racetf,gendertf,native-countrytf,incometf
25,Private,226802,11th,7,Never-married,Machine-op-inspct,Own-child,Black,Male,0,0,40,United-States,<=50K,0.0,5.0,1.0,6.0,2.0,1.0,0.0,0.0,0.0
38,Private,89814,HS-grad,9,Married-civ-spouse,Farming-fishing,Husband,White,Male,0,0,50,United-States,<=50K,0.0,0.0,0.0,10.0,0.0,0.0,0.0,0.0,0.0
28,Local-gov,336951,Assoc-acdm,12,Married-civ-spouse,Protective-serv,Husband,White,Male,0,0,40,United-States,>50K,2.0,6.0,0.0,12.0,0.0,0.0,0.0,0.0,1.0
44,Private,160323,Some-college,10,Married-civ-spouse,Machine-op-inspct,Husband,Black,Male,7688,0,40,United-States,>50K,0.0,1.0,0.0,6.0,0.0,1.0,0.0,0.0,1.0
18,?,103497,Some-college,10,Never-married,?,Own-child,White,Female,0,0,30,United-States,<=50K,3.0,1.0,1.0,7.0,2.0,0.0,1.0,0.0,0.0


In [105]:
vec_cols = [name + 'tf' for name, dtype in income_data.dtypes if dtype == 'string']

In [106]:
vec_cols

In [107]:
vec_cols.remove('incometf')

In [108]:
vec_cols

In [109]:
ve = VectorAssembler(inputCols=vec_cols, outputCol='vec_out')

In [110]:
income_data = ve.transform(income_data)

In [111]:
chi2 = ChiSqSelector(numTopFeatures=4, featuresCol='vec_out', outputCol='chi2_features', labelCol='incometf')

In [112]:
chi2 = chi2.fit(income_data)

In [113]:
chi2_data = chi2.transform(income_data)

In [114]:
display(chi2_data.head(5))

age,workclass,fnlwgt,education,educational-num,marital-status,occupation,relationship,race,gender,capital-gain,capital-loss,hours-per-week,native-country,income,workclasstf,educationtf,marital-statustf,occupationtf,relationshiptf,racetf,gendertf,native-countrytf,incometf,vec_out,chi2_features
25,Private,226802,11th,7,Never-married,Machine-op-inspct,Own-child,Black,Male,0,0,40,United-States,<=50K,0.0,5.0,1.0,6.0,2.0,1.0,0.0,0.0,0.0,"List(1, 8, List(), List(0.0, 5.0, 1.0, 6.0, 2.0, 1.0, 0.0, 0.0))","List(1, 4, List(), List(0.0, 5.0, 1.0, 6.0))"
38,Private,89814,HS-grad,9,Married-civ-spouse,Farming-fishing,Husband,White,Male,0,0,50,United-States,<=50K,0.0,0.0,0.0,10.0,0.0,0.0,0.0,0.0,0.0,"List(0, 8, List(3), List(10.0))","List(0, 4, List(3), List(10.0))"
28,Local-gov,336951,Assoc-acdm,12,Married-civ-spouse,Protective-serv,Husband,White,Male,0,0,40,United-States,>50K,2.0,6.0,0.0,12.0,0.0,0.0,0.0,0.0,1.0,"List(0, 8, List(0, 1, 3), List(2.0, 6.0, 12.0))","List(0, 4, List(0, 1, 3), List(2.0, 6.0, 12.0))"
44,Private,160323,Some-college,10,Married-civ-spouse,Machine-op-inspct,Husband,Black,Male,7688,0,40,United-States,>50K,0.0,1.0,0.0,6.0,0.0,1.0,0.0,0.0,1.0,"List(0, 8, List(1, 3, 5), List(1.0, 6.0, 1.0))","List(0, 4, List(1, 3), List(1.0, 6.0))"
18,?,103497,Some-college,10,Never-married,?,Own-child,White,Female,0,0,30,United-States,<=50K,3.0,1.0,1.0,7.0,2.0,0.0,1.0,0.0,0.0,"List(1, 8, List(), List(3.0, 1.0, 1.0, 7.0, 2.0, 0.0, 1.0, 0.0))","List(1, 4, List(), List(3.0, 1.0, 1.0, 7.0))"


##### PCA
* PCA is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called principal components. 
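* The PCA estimator fits a model that projects vectors onto a lower-dimensional space spanned by the top k principal components.
* Data should be standardized before applying PCA, otherwise features with large values dominate the components; the example below skips this step.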
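The math can be summarized as follows. This is a sketch of the standard procedure, not Spark's source: the components are the top-k eigenvectors of the sample covariance matrix, and each input vector is projected onto them. Judging by the iris output below, Spark's fitted model multiplies the raw, uncentered vectors by the component matrix, which is why the projected values are not centered on zero.

$$
\bar{x} = \frac{1}{m}\sum_{i=1}^{m} x_i,\qquad
\Sigma = \frac{1}{m-1}\sum_{i=1}^{m} (x_i - \bar{x})(x_i - \bar{x})^\top
$$

$$
\Sigma v_j = \lambda_j v_j,\quad \lambda_1 \ge \lambda_2 \ge \dots \ge \lambda_n,\qquad
V_k = [\,v_1 \;\cdots\; v_k\,] \in \mathbb{R}^{n \times k}
$$

$$
z_i = V_k^\top x_i \in \mathbb{R}^{k}
$$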

In [116]:
from sklearn.datasets import load_iris
import pandas as pd

In [117]:
iris_data = load_iris()

In [118]:
df = pd.DataFrame(iris_data.data)

In [119]:
df.columns = ['SL','SW','PL','PW']

In [120]:
dataFrame = spark.createDataFrame(df)

In [121]:
ve = VectorAssembler(inputCols=list(df.columns), outputCol='vec_data')

In [122]:
ve_data = ve.transform(dataFrame)

In [123]:
display(ve_data)

SL,SW,PL,PW,vec_data
5.1,3.5,1.4,0.2,"List(1, 4, List(), List(5.1, 3.5, 1.4, 0.2))"
4.9,3.0,1.4,0.2,"List(1, 4, List(), List(4.9, 3.0, 1.4, 0.2))"
4.7,3.2,1.3,0.2,"List(1, 4, List(), List(4.7, 3.2, 1.3, 0.2))"
4.6,3.1,1.5,0.2,"List(1, 4, List(), List(4.6, 3.1, 1.5, 0.2))"
5.0,3.6,1.4,0.2,"List(1, 4, List(), List(5.0, 3.6, 1.4, 0.2))"
5.4,3.9,1.7,0.4,"List(1, 4, List(), List(5.4, 3.9, 1.7, 0.4))"
4.6,3.4,1.4,0.3,"List(1, 4, List(), List(4.6, 3.4, 1.4, 0.3))"
5.0,3.4,1.5,0.2,"List(1, 4, List(), List(5.0, 3.4, 1.5, 0.2))"
4.4,2.9,1.4,0.2,"List(1, 4, List(), List(4.4, 2.9, 1.4, 0.2))"
4.9,3.1,1.5,0.1,"List(1, 4, List(), List(4.9, 3.1, 1.5, 0.1))"


In [124]:
from pyspark.ml.feature import PCA

In [125]:
pca = PCA(k=3, inputCol='vec_data', outputCol='pca_vec_data')

In [126]:
pca = pca.fit(ve_data)

In [127]:
pca_data = pca.transform(ve_data)

In [128]:
display(pca_data)

SL,SW,PL,PW,vec_data,pca_vec_data
5.1,3.5,1.4,0.2,"List(1, 4, List(), List(5.1, 3.5, 1.4, 0.2))","List(1, 3, List(), List(-2.827135972679023, -5.641331045573361, 0.664276931510384))"
4.9,3.0,1.4,0.2,"List(1, 4, List(), List(4.9, 3.0, 1.4, 0.2))","List(1, 3, List(), List(-2.7959524821488406, -5.145166883252949, 0.8462865195139235))"
4.7,3.2,1.3,0.2,"List(1, 4, List(), List(4.7, 3.2, 1.3, 0.2))","List(1, 3, List(), List(-2.6215235581650553, -5.177378121203946, 0.6180558535094693))"
4.6,3.1,1.5,0.2,"List(1, 4, List(), List(4.6, 3.1, 1.5, 0.2))","List(1, 3, List(), List(-2.764905900474237, -5.00359941505698, 0.6050931192231175))"
5.0,3.6,1.4,0.2,"List(1, 4, List(), List(5.0, 3.6, 1.4, 0.2))","List(1, 3, List(), List(-2.7827501159516568, -5.648648294377426, 0.546535394733807))"
5.4,3.9,1.7,0.4,"List(1, 4, List(), List(5.4, 3.9, 1.7, 0.4))","List(1, 3, List(), List(-3.2314457367733738, -6.062506444034105, 0.46843947549201426))"
4.6,3.4,1.4,0.3,"List(1, 4, List(), List(4.6, 3.4, 1.4, 0.3))","List(1, 3, List(), List(-2.690452415602341, -5.23261921978429, 0.3785140093177235))"
5.0,3.4,1.5,0.2,"List(1, 4, List(), List(5.0, 3.4, 1.5, 0.2))","List(1, 3, List(), List(-2.884861104459153, -5.485129079769262, 0.6585666047727341))"
4.4,2.9,1.4,0.2,"List(1, 4, List(), List(4.4, 2.9, 1.4, 0.2))","List(1, 3, List(), List(-2.6233845324473375, -4.74392570447738, 0.6154296883939168))"
4.9,3.1,1.5,0.1,"List(1, 4, List(), List(4.9, 3.1, 1.5, 0.1))","List(1, 3, List(), List(-2.8374984110638506, -5.208032027056235, 0.8342983942440643))"


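### 9. Imputer for Handling Missing Data
<hr>
* Missing data needs to be handled before it is fed to learning algorithms.
* This becomes even more challenging at large scale.
* PySpark provides the Imputer estimator for this. It replaces missing values in numeric columns (NaN by default, configurable via missingValue; nulls are always treated as missing) with the mean (the default), the median or, in Spark 3.1+, the mode of each column.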
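Mean and median imputation can be sketched in pure Python (hypothetical helpers, not Spark's API). The median here is exact, whereas Spark computes an approximate quantile, so median results may differ, e.g. for an even number of values. Using column a from the example below with the mean strategy, the NaN is replaced by 3.0.

```python
import math
import statistics


def is_missing(x):
    return x is None or math.isnan(x)


def fit_imputer(column, strategy="mean"):
    """Fill value computed from the non-missing entries of one column."""
    observed = [x for x in column if not is_missing(x)]
    if strategy == "mean":
        return statistics.mean(observed)
    if strategy == "median":
        # Exact median; Spark uses an approximate quantile instead.
        return statistics.median(observed)
    raise ValueError(f"unsupported strategy: {strategy}")


def impute(column, fill_value):
    return [fill_value if is_missing(x) else x for x in column]


nan = float("nan")
a = [1.0, 2.0, nan, 4.0, 5.0]
print(impute(a, fit_imputer(a)))  # [1.0, 2.0, 3.0, 4.0, 5.0]
```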

In [130]:
from pyspark.ml.feature import Imputer

In [131]:
df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"], strategy="median")
model = imputer.fit(df)

model.transform(df).show()

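### 10. Expanding data to higher-degree polynomial features
<hr>
* PolynomialExpansion expands a feature vector into all polynomial terms of its features up to the given degree; for example, degree 2 on (x, y) gives (x, x*x, y, x*y, y*y).
* A linear model trained on these expanded features can learn a non-linear decision boundary in the original feature space.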
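A pure-Python sketch of the expansion (hypothetical `polynomial_expand` helper). It produces the same set of terms as Spark but in a different order (all degree-1 terms first, then degree 2, and so on). For n features and degree d there are C(n + d, d) - 1 terms, so the 2-feature, degree-3 example below yields 9 values per row.

```python
from itertools import combinations_with_replacement
from math import prod


def polynomial_expand(features, degree):
    """All products of 1..degree features (with repetition), grouped by degree."""
    expanded = []
    for d in range(1, degree + 1):
        for idx in combinations_with_replacement(range(len(features)), d):
            expanded.append(prod(features[i] for i in idx))
    return expanded


print(polynomial_expand([2.0, 1.0], 2))  # [2.0, 1.0, 4.0, 2.0, 1.0]  (x, y, x*x, x*y, y*y)
```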

In [133]:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)

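### 11. Custom Transformer
<hr>
* What if none of the built-in transformers fits our needs?
* We can create a custom transformer: subclass pyspark.ml.Transformer, declare its parameters with mixins such as HasInputCol and HasOutputCol, and implement _transform.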

In [135]:
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import length


class StringLengthTransformer(Transformer, HasInputCol, HasOutputCol):
    """Adds outputCol holding the character length of the string column inputCol.

    Uses the built-in length() function instead of a Python UDF, which avoids
    UDF serialization overhead and returns null for null inputs.
    """

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(StringLengthTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        return dataset.withColumn(self.getOutputCol(), length(dataset[self.getInputCol()]))

In [136]:
df = spark.createDataFrame([("foo bar",),("hello world",)]).toDF("sentence")
strlength = StringLengthTransformer(inputCol="sentence", outputCol="len")

In [137]:
display(strlength.transform(df))

sentence,len
foo bar,7
hello world,11
