# Feature Extraction


## TF-IDF

Term frequency-inverse document frequency (TF-IDF) is a feature vectorization method widely used in text mining to reflect the importance of a term to a document in the corpus. 


Both HashingTF and CountVectorizer can be used to generate the term frequency vectors. A few important differences:

partially reversible (CountVectorizer) vs irreversible (HashingTF) 1. since hashing is not reversible you cannot restore original input from a hash vector. From the other hand count vector with model (index) can be used to restore unordered input. As a consequence models created using hashed input can be much harder to interpret and monitor.

2. memory and computational overhead - HashingTF requires only a single data scan and no additional memory beyond original input and vector. CountVectorizer requires additional scan over the data to build a model and additional memory to store vocabulary (index). In case of unigram language model it is usually not a problem but in case of higher n-grams it can be prohibitively expensive or not feasible.

3. hashing depends on a size of the vector , hashing function and a document. Counting depends on a size of the vector, training corpus and a document.
4. a source of the information loss - in case of HashingTF it is dimensionality reduction with possible collisions. CountVectorizer discards infrequent tokens. How it affects downstream models depends on a particular use case and data.

**HashingTF** and **CountVectorizer** are the two popular alogoritms which used to generate term frequency vectors. They basically convert documents into a numerical representation which can be fed directly or with further processing into other algorithms like LDA, MinHash for Jaccard Distance, Cosine Distance.

t: term
d: document
D: corpus
|D|: the number of the elements in corpus
TF(t,d): Term Frequency: the number of times that term t appears in document d
DF(t,D): Document Frequency: the number of documents that contains term t
IDF(t, D): Inverse Document Frequency is a numerical measure of how much information a term provides

![](./image/1.png)

TFIDF(t, d, D) the product of TF and IDF

![](./image/2.png)


In [2]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
from pyspark import SparkContext , SparkConf
from pyspark.sql import SparkSession
try:
    sc.stop()
except:
    pass
sc=SparkContext()
spark = SparkSession(sparkContext=sc)



In [3]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0, "Python python Spark Spark"),
    (1, "Python SQL")],
 ["document", "sentence"])
sentenceData.show(truncate=False)

+--------+-------------------------+
|document|sentence                 |
+--------+-------------------------+
|0       |Python python Spark Spark|
|1       |Python SQL               |
+--------+-------------------------+



![](./image/3.png)


In [4]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[6,8,13,16],[...|
|  0.0|(20,[0,2,7,13,15,...|
|  1.0|(20,[3,4,6,11,19]...|
+-----+--------------------+



### Countvectorizer
 CountVectorizer and CountVectorizerModel aim to help convert a collection of text documents to vectors of token counts. When an a-priori dictionary is not available, CountVectorizer can be used as an Estimator to extract the vocabulary, and generates a CountVectorizerModel. The model produces sparse representations for the documents over the vocabulary, which can then be passed to other algorithms like LDA.

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0, "Python python Spark Spark"),
    (1, "Python SQL")],
 ["document", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
vectorizer  = CountVectorizer(inputCol="words", outputCol="rawFeatures")

idf = IDF(inputCol="rawFeatures", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, vectorizer, idf])

model = pipeline.fit(sentenceData)


In [6]:
import numpy as np

total_counts = model.transform(sentenceData)\
                    .select('rawFeatures').rdd\
                    .map(lambda row: row['rawFeatures'].toArray())\
                    .reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])

vocabList = model.stages[1].vocabulary
d = {'vocabList':vocabList,'counts':total_counts}

spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show()

+---------+------+
|vocabList|counts|
+---------+------+
|   python|   3.0|
|    spark|   2.0|
|      sql|   1.0|
+---------+------+



In [7]:
counts = model.transform(sentenceData).select('rawFeatures').collect()
counts

[Row(rawFeatures=SparseVector(3, {0: 2.0, 1: 2.0})),
 Row(rawFeatures=SparseVector(3, {0: 1.0, 2: 1.0}))]

In [8]:
model.transform(sentenceData).show(truncate=False)


+--------+-------------------------+------------------------------+-------------------+----------------------------------+
|document|sentence                 |words                         |rawFeatures        |features                          |
+--------+-------------------------+------------------------------+-------------------+----------------------------------+
|0       |Python python Spark Spark|[python, python, spark, spark]|(3,[0,1],[2.0,2.0])|(3,[0,1],[0.0,0.8109302162163288])|
|1       |Python SQL               |[python, sql]                 |(3,[0,2],[1.0,1.0])|(3,[0,2],[0.0,0.4054651081081644])|
+--------+-------------------------+------------------------------+-------------------+----------------------------------+



In [9]:
from pyspark.sql.types import ArrayType, StringType

def termsIdx2Term(vocabulary):
    def termsIdx2Term(termIndices):
        return [vocabulary[int(index)] for index in termIndices]
    return udf(termsIdx2Term, ArrayType(StringType()))

vectorizerModel = model.stages[1]
vocabList = vectorizerModel.vocabulary
vocabList

['python', 'spark', 'sql']

In [10]:
rawFeatures = model.transform(sentenceData).select('rawFeatures')
rawFeatures.show()

+-------------------+
|        rawFeatures|
+-------------------+
|(3,[0,1],[2.0,2.0])|
|(3,[0,2],[1.0,1.0])|
+-------------------+



In [11]:
from pyspark.sql.functions import udf
import pyspark.sql.functions as F
from pyspark.sql.types import  StringType, DoubleType, IntegerType

indices_udf = udf(lambda vector: vector.indices.tolist(), ArrayType(IntegerType()))
values_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(DoubleType()))


rawFeatures.withColumn('indices', indices_udf(F.col('rawFeatures')))\
           .withColumn('values', values_udf(F.col('rawFeatures')))\
           .withColumn("Terms", termsIdx2Term(vocabList)("indices")).show()

+-------------------+-------+---------------+---------------+
|        rawFeatures|indices|         values|          Terms|
+-------------------+-------+---------------+---------------+
|(3,[0,1],[2.0,2.0])| [0, 1]|[2.0, 2.0, 0.0]|[python, spark]|
|(3,[0,2],[1.0,1.0])| [0, 2]|[1.0, 0.0, 1.0]|  [python, sql]|
+-------------------+-------+---------------+---------------+



In [12]:
from pyspark.ml.feature import CountVectorizer

# Input data: Each row is a bag of words with a ID.
df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

# fit a CountVectorizerModel from the corpus.
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)

model = cv.fit(df)

result = model.transform(df)
result.show(truncate=False)

+---+---------------+-------------------------+
|id |words          |features                 |
+---+---------------+-------------------------+
|0  |[a, b, c]      |(3,[0,1,2],[1.0,1.0,1.0])|
|1  |[a, b, b, c, a]|(3,[0,1,2],[2.0,2.0,1.0])|
+---+---------------+-------------------------+



### HashingTF

HashingTF is a Transformer which takes sets of terms and converts those sets into fixed-length feature vectors. In text processing, a “set of terms” might be a bag of words. HashingTF utilizes the hashing trick. A raw feature is mapped into an index (term) by applying a hash function. The hash function used here is MurmurHash 3. Then term frequencies are calculated based on the mapped indices. This approach avoids the need to compute a global term-to-index map, which can be expensive for a large corpus, but it suffers from potential hash collisions, where different raw features may become the same term after hashing.

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0, "Python python Spark Spark"),
    (1, "Python SQL")],
 ["document", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
vectorizer  = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=5)

idf = IDF(inputCol="rawFeatures", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, vectorizer, idf])


model = pipeline.fit(sentenceData)
model.transform(sentenceData).show(truncate=28)

+--------+-------------------------+----------------------------+-------------------+----------------------------+
|document|                 sentence|                       words|        rawFeatures|                    features|
+--------+-------------------------+----------------------------+-------------------+----------------------------+
|       0|Python python Spark Spark|[python, python, spark, s...|(5,[1,4],[2.0,2.0])|(5,[1,4],[0.8109302162163...|
|       1|               Python SQL|               [python, sql]|(5,[2,4],[1.0,1.0])|(5,[2,4],[0.4054651081081...|
+--------+-------------------------+----------------------------+-------------------+----------------------------+



+ partially reversible (CountVectorizer) vs irreversible (HashingTF) - since hashing is not reversible you cannot restore original input from a hash vector. From the other hand count vector with model (index) can be used to restore unordered input. As a consequence models created using hashed input can be much harder to interpret and monitor.

+ memory and computational overhead - HashingTF requires only a single data scan and no additional memory beyond original input and vector. CountVectorizer requires additional scan over the data to build a model and additional memory to store vocabulary (index). In case of unigram language model it is usually not a problem but in case of higher n-grams it can be prohibitively expensive or not feasible.

+ hashing depends on a size of the vector , hashing function and a document. Counting depends on a size of the vector, training corpus and a document.
+ a source of the information loss - in case of HashingTF it is dimensionality reduction with possible collisions. CountVectorizer discards infrequent tokens. How it affects downstream models depends on a particular use case and data.

##  Word2Vec
### Word Embeddings

Word2Vec is one of the popupar method to implement the Word Embeddings. Word embeddings (The best tutorial I have read. The following word and images content are from Chris Bail, PhD Duke University. So the copyright belongs to Chris Bail, PhD Duke University.) gained fame in the world of automated text analysis when it was demonstrated that they could be used to identify analogies. Figure 1 illustrates the output of a word embedding model where individual words are plotted in three dimensional space generated by the model. By examining the adjacency of words in this space, word embedding models can complete analogies such as “Man is to woman as king is to queen.” If you’d like to explore what the output of a large word embedding model looks like in more detail, check out this fantastic visualization of most words in the English language that was produced using a word embedding model called GloVE.

![](./image/w2v_1.png)

In [14]:
from pyspark.ml.feature import Word2Vec

# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])

# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)

result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [Hi, I, heard, about, Spark] => 
Vector: [0.02902129157446325,0.02679591476917267,0.008123540971428157]

Text: [I, wish, Java, could, use, case, classes] => 
Vector: [-0.03149755285786731,-0.044931650190846995,0.024991953346346105]

Text: [Logistic, regression, models, are, neat] => 
Vector: [-0.028359790518879893,0.03383468156680465,0.0024358076974749566]



### The Context Window

Word embeddings are created by identifying the words that occur within something called a “Context Window.” The Figure below illustrates context windows of varied length for a single sentence. The context window is defined by a string of words before and after a focal or “center” word that will be used to train a word embedding model. Each center word and context words can be represented as a vector of numbers that describe the presence or absence of unique words within a dataset, which is perhaps why word embedding models are often described as “word vector” models, or “word2vec” models.

![](./image/w2v_2.png)

### Two Types of Embedding Models
 Word embeddings are usually performed in one of two ways: “Continuous Bag of Words” (CBOW) or a “Skip-Gram Model.” The figure below illustrates the differences between the two models. The CBOW model reads in the context window words and tries to predict the most likely center word. The Skip-Gram Model predicts the context words given the center word. The examples above were created using the Skip-Gram model, which is perhaps most useful for people who want to identify patterns within texts to represent them in multimensional space, whereas the CBOW model is more useful in practical applications such as predictive web search.
 
![](./image/w2v_3.png)

### Word Embedding Models in PySpark

In [15]:
from pyspark.ml.feature import Word2Vec

from pyspark.ml import Pipeline

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="feature")

pipeline = Pipeline(stages=[tokenizer, word2Vec])


model = pipeline.fit(sentenceData)
result = model.transform(sentenceData)
result.show()


+--------+--------------------+--------------------+--------------------+
|document|            sentence|               words|             feature|
+--------+--------------------+--------------------+--------------------+
|       0|Python python Spa...|[python, python, ...|[-0.0316445454955...|
|       1|          Python SQL|       [python, sql]|[0.05269111506640...|
+--------+--------------------+--------------------+--------------------+



In [16]:
w2v = model.stages[1]
w2v.getVectors().show()

+------+--------------------+
|  word|              vector|
+------+--------------------+
|python|[0.08114035427570...|
| spark|[-0.1444294452667...|
|   sql|[0.02424187585711...|
+------+--------------------+



In [17]:
from pyspark.sql.functions import format_number as fmt
w2v.findSynonyms("python", 2).select("word", fmt("similarity", 5).alias("similarity")).show()

+-----+----------+
| word|similarity|
+-----+----------+
|  sql|  -0.79374|
|spark|  -0.89169|
+-----+----------+



## FeatureHasher


Feature hashing projects a set of categorical or numerical features into a feature vector of specified dimension (typically substantially smaller than that of the original feature space). This is done using the hashing trick to map features to indices in the feature vector.

The FeatureHasher transformer operates on multiple columns. Each column may contain either numeric or categorical features. Behavior and handling of column data types is as follows:

Numeric columns: For numeric features, the hash value of the column name is used to map the feature value to its index in the feature vector. By default, numeric features are not treated as categorical (even when they are integers). To treat them as categorical, specify the relevant columns using the categoricalCols parameter.
String columns: For categorical features, the hash value of the string “column_name=value” is used to map to the vector index, with an indicator value of 1.0. Thus, categorical features are “one-hot” encoded (similarly to using OneHotEncoder with dropLast=false).
Boolean columns: Boolean values are treated in the same way as string columns. That is, boolean features are represented as “column_name=true” or “column_name=false”, with an indicator value of 1.0.
Null (missing) values are ignored (implicitly zero in the resulting feature vector).

The hash function used here is also the MurmurHash 3 used in HashingTF. Since a simple modulo on the hashed value is used to determine the vector index, it is advisable to use a power of two as the numFeatures parameter; otherwise the features will not be mapped evenly to the vector indices.

Examples

Assume that we have a DataFrame with 4 input columns real, bool, stringNum, and string. These different data types as input will illustrate the behavior of the transform to produce a column of feature vectors.

real| bool|stringNum|string
----|-----|---------|------
 2.2| true|        1|   foo
 3.3|false|        2|   bar
 4.4|false|        3|   baz
 5.5|false|        4|   foo
 
Then the output of FeatureHasher.transform on this DataFrame is:

real|bool |stringNum|string|features
----|-----|---------|------|-------------------------------------------------------
2.2 |true |1        |foo   |(262144,[51871, 63643,174475,253195],[1.0,1.0,2.2,1.0])
3.3 |false|2        |bar   |(262144,[6031,  80619,140467,174475],[1.0,1.0,1.0,3.3])
4.4 |false|3        |baz   |(262144,[24279,140467,174475,196810],[1.0,1.0,4.4,1.0])
5.5 |false|4        |foo   |(262144,[63643,140467,168512,174475],[1.0,1.0,1.0,5.5])


The resulting feature vectors could then be passed to a learning algorithm.

In [18]:
from pyspark.ml.feature import FeatureHasher

dataset = spark.createDataFrame([
    (2.2, True, "1", "foo"),
    (3.3, False, "2", "bar"),
    (4.4, False, "3", "baz"),
    (5.5, False, "4", "foo")
], ["real", "bool", "stringNum", "string"])

hasher = FeatureHasher(inputCols=["real", "bool", "stringNum", "string"],
                       outputCol="features")

featurized = hasher.transform(dataset)
featurized.show(truncate=False)

+----+-----+---------+------+--------------------------------------------------------+
|real|bool |stringNum|string|features                                                |
+----+-----+---------+------+--------------------------------------------------------+
|2.2 |true |1        |foo   |(262144,[174475,247670,257907,262126],[2.2,1.0,1.0,1.0])|
|3.3 |false|2        |bar   |(262144,[70644,89673,173866,174475],[1.0,1.0,1.0,3.3])  |
|4.4 |false|3        |baz   |(262144,[22406,70644,174475,187923],[1.0,1.0,4.4,1.0])  |
|5.5 |false|4        |foo   |(262144,[70644,101499,174475,257907],[1.0,1.0,5.5,1.0]) |
+----+-----+---------+------+--------------------------------------------------------+



## RFormula

RFormula selects columns specified by an R model formula. Currently we support a limited subset of the R operators, including ‘~’, ‘.’, ‘:’, ‘+’, and ‘-‘. The basic operators are:

* ~ separate target and terms
* + concat terms, “+ 0” means removing intercept
* - remove a term, “- 1” means removing intercept
* : interaction (multiplication for numeric values, or binarized categorical values)
* . all columns except target

Suppose a and b are double columns, we use the following simple examples to illustrate the effect of RFormula:

* y ~ a + b means model y ~ w0 + w1 * a + w2 * b where w0 is the intercept and w1, w2 are coefficients.

* y ~ a + b + a:b - 1 means model y ~ w1 * a + w2 * b + w3 * a * b where w1, w2, w3 are coefficients.

RFormula produces a vector column of features and a double or string column of label. Like when formulas are used in R for linear regression, numeric columns will be cast to doubles. As to string input columns, they will first be transformed with StringIndexer using ordering determined by stringOrderType, and the last category after ordering is dropped, then the doubles will be one-hot encoded.

Suppose a string feature column containing values {'b', 'a', 'b', 'a', 'c', 'b'}, we set stringOrderType to control the encoding:


stringOrderType | Category mapped to 0 by StringIndexer |  Category dropped by RFormula
----------------|---------------------------------------|---------------------------------
'frequencyDesc' | most frequent category ('b')          | least frequent category ('c')
'frequencyAsc'  | least frequent category ('c')         | most frequent category ('b')
'alphabetDesc'  | last alphabetical category ('c')      | first alphabetical category ('a')
'alphabetAsc'   | first alphabetical category ('a')     | last alphabetical category ('c')


If the label column is of type string, it will be first transformed to double with StringIndexer using frequencyDesc ordering. If the label column does not exist in the DataFrame, the output label column will be created from the specified response variable in the formula.

Note: The ordering option stringOrderType is NOT used for the label column. When the label column is indexed, it uses the default descending frequency ordering in StringIndexer.

Examples

Assume that we have a DataFrame with the columns id, country, hour, and clicked:

id | country | hour | clicked
---|---------|------|---------
 7 | "US"    | 18   | 1.0
 8 | "CA"    | 12   | 0.0
 9 | "NZ"    | 15   | 0.0
 
 If we use RFormula with a formula string of clicked ~ country + hour, which indicates that we want to predict clicked based on country and hour, after transformation we should get the following DataFrame:
 
 

id | country | hour | clicked | features         | label
---|---------|------|---------|------------------|-------
 7 | "US"    | 18   | 1.0     | [0.0, 0.0, 18.0] | 1.0
 8 | "CA"    | 12   | 0.0     | [0.0, 1.0, 12.0] | 0.0
 9 | "NZ"    | 15   | 0.0     | [1.0, 0.0, 15.0] | 0.0

In [19]:
from pyspark.ml.feature import RFormula

dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "CA", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()

+----------+-----+
|  features|label|
+----------+-----+
|[0.0,18.0]|  1.0|
|[1.0,12.0]|  0.0|
|[1.0,15.0]|  0.0|
+----------+-----+



# Feature Transform
## Tokenizer

Tokenization is the process of taking text (such as a sentence) and breaking it into individual terms (usually words). A simple Tokenizer class provides this functionality. The example below shows how to split sentences into sequences of words.

RegexTokenizer allows more advanced tokenization based on regular expression (regex) matching. By default, the parameter “pattern” (regex, default: "\\s+") is used as delimiters to split the input text. Alternatively, users can set parameter “gaps” to false indicating the regex “pattern” denotes “tokens” rather than splitting gaps, and find all matching occurrences as the tokenization result.

In [24]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
    .withColumn("tokens", countTokens(col("words"))).show(truncate=False)

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case classes |[i, wish, java, could, use, case, classes]|7     |
|Logistic,regression,models,are,neat|[logistic,regression,models,are,neat]     |1     |
+-----------------------------------+------------------------------------------+------+

+-----------------------------------+------------------------------------------+------+
|sentence                           |words                                     |tokens|
+-----------------------------------+------------------------------------------+------+
|Hi I heard about Spark             |[hi, i, heard, about, spark]              |5     |
|I wish Java could use case cla

In [23]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[6,8,13,16],[...|
|  0.0|(20,[0,2,7,13,15,...|
|  1.0|(20,[3,4,6,11,19]...|
+-----+--------------------+



## StopWordsRemover

Stop words are words which should be excluded from the input, typically because the words appear frequently and don’t carry as much meaning.

StopWordsRemover takes as input a sequence of strings (e.g. the output of a Tokenizer) and drops all the stop words from the input sequences. The list of stopwords is specified by the stopWords parameter. Default stop words for some languages are accessible by calling StopWordsRemover.loadDefaultStopWords(language), for which available options are “danish”, “dutch”, “english”, “finnish”, “french”, “german”, “hungarian”, “italian”, “norwegian”, “portuguese”, “russian”, “spanish”, “swedish” and “turkish”. A boolean parameter caseSensitive indicates if the matches should be case sensitive (false by default).

Examples

Assume that we have the following DataFrame with columns id and raw:

id | raw
----|----------
 0  | [I, saw, the, red, balloon]
 1  | [Mary, had, a, little, lamb]
 
Applying StopWordsRemover with raw as the input column and filtered as the output column, we should get the following:

 id | raw                         | filtered
----|-----------------------------|--------------------
 0  | [I, saw, the, red, balloon] |  [saw, red, balloon]
 1  | [Mary, had, a, little, lamb]|[Mary, little, lamb]

In [25]:
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="removeded")
remover.transform(sentenceData).show(truncate=False)

+---+----------------------------+--------------------+
|id |raw                         |removeded           |
+---+----------------------------+--------------------+
|0  |[I, saw, the, red, balloon] |[saw, red, balloon] |
|1  |[Mary, had, a, little, lamb]|[Mary, little, lamb]|
+---+----------------------------+--------------------+



## n-gram
An n-gram is a sequence of n tokens (typically words) for some integer n. The NGram class can be used to transform input features into n-grams.

NGram takes as input a sequence of strings (e.g. the output of a Tokenizer). The parameter n is used to determine the number of terms in each n-gram. The output will consist of a sequence of n-grams where each n-gram is represented by a space-delimited string of n consecutive words. If the input sequence contains fewer than n strings, no output is produced.

In [26]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

from pyspark.ml.feature import NGram

sentenceData = spark.createDataFrame([
    (0.0, "I love Spark"),
    (0.0, "I love python"),
    (1.0, "I think ML is awesome")],
 ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

idf = IDF(inputCol="rawFeatures", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, ngram])

model = pipeline.fit(sentenceData)

model.transform(sentenceData).show(truncate=False)

+-----+---------------------+---------------------------+--------------------------------------+
|label|sentence             |words                      |ngrams                                |
+-----+---------------------+---------------------------+--------------------------------------+
|0.0  |I love Spark         |[i, love, spark]           |[i love, love spark]                  |
|0.0  |I love python        |[i, love, python]          |[i love, love python]                 |
|1.0  |I think ML is awesome|[i, think, ml, is, awesome]|[i think, think ml, ml is, is awesome]|
+-----+---------------------+---------------------------+--------------------------------------+



In [27]:
from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



## Binarizer
Binarization is the process of thresholding numerical features to binary (0/1) features.


Binarizer takes the common parameters inputCol and outputCol, as well as the threshold for binarization. Feature values greater than the threshold are binarized to 1.0; values equal to or less than the threshold are binarized to 0.0. Both Vector and Double types are supported for inputCol.

In [28]:
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2),
    (3,0.5)
], ["id", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()

Binarizer output with Threshold = 0.500000
+---+-------+-----------------+
| id|feature|binarized_feature|
+---+-------+-----------------+
|  0|    0.1|              0.0|
|  1|    0.8|              1.0|
|  2|    0.2|              0.0|
|  3|    0.5|              0.0|
+---+-------+-----------------+



## Bucketizer

Bucketizer transforms a column of continuous features to a column of feature buckets, where the buckets are specified by users. It takes a parameter:

* splits: Parameter for mapping continuous features into buckets. With n+1 splits, there are n buckets. A bucket defined by splits x,y holds values in the range (x,y) except the last bucket, which also includes y. Splits should be strictly increasing. Values at -inf, inf must be explicitly provided to cover all Double values; Otherwise, values outside the splits specified will be treated as errors. Two examples of splits are Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity) and Array(0.0, 1.0, 2.0).

Note that if you have no idea of the upper and lower bounds of the targeted column, you should add Double.NegativeInfinity and Double.PositiveInfinity as the bounds of your splits to prevent a potential out of Bucketizer bounds exception.

Note also that the splits that you provided have to be in strictly increasing order, i.e. s0 < s1 < s2 < ... < sn.

More details can be found in the API docs for Bucketizer.

Examples

The following example demonstrates how to bucketize a column of Doubles into another index-wised column.

In [29]:
from pyspark.ml.feature import QuantileDiscretizer, Bucketizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.0)]
df = spark.createDataFrame(data, ["id", "age"])
print(df.show())

splits = [-float("inf"),3, 10,float("inf")]
result_bucketizer = Bucketizer(splits=splits, inputCol="age",outputCol="result").transform(df)
result_bucketizer.show()

+---+----+
| id| age|
+---+----+
|  0|18.0|
|  1|19.0|
|  2| 8.0|
|  3| 5.0|
|  4| 2.0|
+---+----+

None
+---+----+------+
| id| age|result|
+---+----+------+
|  0|18.0|   2.0|
|  1|19.0|   2.0|
|  2| 8.0|   1.0|
|  3| 5.0|   1.0|
|  4| 2.0|   0.0|
+---+----+------+



In [30]:
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]

data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Transform original data into its bucket index.
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
bucketedData.show()

Bucketizer output with 4 buckets
+--------+----------------+
|features|bucketedFeatures|
+--------+----------------+
|  -999.9|             0.0|
|    -0.5|             1.0|
|    -0.3|             1.0|
|     0.0|             2.0|
|     0.2|             2.0|
|   999.9|             3.0|
+--------+----------------+



## QuantileDiscretizer

QuantileDiscretizer takes a column with continuous features and outputs a column with binned categorical features. The number of bins is set by the numBuckets parameter. It is possible that the number of buckets used will be smaller than this value, for example, if there are too few distinct values of the input to create enough distinct quantiles.

NaN values: NaN values will be removed from the column during QuantileDiscretizer fitting. This will produce a Bucketizer model for making predictions. During the transformation, Bucketizer will raise an error when it finds NaN values in the dataset, but the user can also choose to either keep or remove NaN values within the dataset by setting handleInvalid. If the user chooses to keep NaN values, they will be handled specially and placed into their own bucket, for example, if 4 buckets are used, then non-NaN data will be put into buckets[0-3], but NaNs will be counted in a special bucket[4].

Algorithm: The bin ranges are chosen using an approximate algorithm (see the documentation for approxQuantile for a detailed description). The precision of the approximation can be controlled with the relativeError parameter. When set to zero, exact quantiles are calculated (Note: Computing exact quantiles is an expensive operation). The lower and upper bin bounds will be -Infinity and +Infinity covering all real values.

Examples

Assume that we have a DataFrame with the columns id, hour:

 id | hour
----|------
 0  | 18.0
 1  | 19.0
 2  | 8.0
 3  | 5.0
 4  | 2.2
 
 hour is a continuous feature with Double type. We want to turn the continuous feature into a categorical one. Given numBuckets = 3, we should get the following DataFrame:

 id | hour | result
 ---|------|--------
 0  | 18.0 | 2.0
 1  | 19.0 | 2.0
 2  | 8.0  | 1.0
 3  | 5.0  | 1.0
 4  | 2.2  | 0.0

In [31]:
from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()

+---+----+------+
| id|hour|result|
+---+----+------+
|  0|18.0|   2.0|
|  1|19.0|   2.0|
|  2| 8.0|   1.0|
|  3| 5.0|   1.0|
|  4| 2.2|   0.0|
+---+----+------+



In [32]:
from pyspark.ml.feature import QuantileDiscretizer, Bucketizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.0)]
df = spark.createDataFrame(data, ["id", "age"])
print(df.show())

qds = QuantileDiscretizer(numBuckets=5, inputCol="age", outputCol="buckets",
                               relativeError=0.01, handleInvalid="error")
bucketizer = qds.fit(df)
bucketizer.transform(df).show()
bucketizer.setHandleInvalid("skip").transform(df).show()

+---+----+
| id| age|
+---+----+
|  0|18.0|
|  1|19.0|
|  2| 8.0|
|  3| 5.0|
|  4| 2.0|
+---+----+

None
+---+----+-------+
| id| age|buckets|
+---+----+-------+
|  0|18.0|    4.0|
|  1|19.0|    4.0|
|  2| 8.0|    3.0|
|  3| 5.0|    2.0|
|  4| 2.0|    1.0|
+---+----+-------+

+---+----+-------+
| id| age|buckets|
+---+----+-------+
|  0|18.0|    4.0|
|  1|19.0|    4.0|
|  2| 8.0|    3.0|
|  3| 5.0|    2.0|
|  4| 2.0|    1.0|
+---+----+-------+



If the data has NULL values, then you will get the following results:



In [33]:
from pyspark.ml.feature import QuantileDiscretizer, Bucketizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, None)]
df = spark.createDataFrame(data, ["id", "age"])
print(df.show())

splits = [-float("inf"),3, 10,float("inf")]
result_bucketizer = Bucketizer(splits=splits,
                               inputCol="age",outputCol="result").transform(df)
result_bucketizer.show()

qds = QuantileDiscretizer(numBuckets=5, inputCol="age", outputCol="buckets",
                               relativeError=0.01, handleInvalid="error")
bucketizer = qds.fit(df)
bucketizer.transform(df).show()
bucketizer.setHandleInvalid("skip").transform(df).show()

+---+----+
| id| age|
+---+----+
|  0|18.0|
|  1|19.0|
|  2| 8.0|
|  3| 5.0|
|  4|null|
+---+----+

None
+---+----+------+
| id| age|result|
+---+----+------+
|  0|18.0|   2.0|
|  1|19.0|   2.0|
|  2| 8.0|   1.0|
|  3| 5.0|   1.0|
|  4|null|  null|
+---+----+------+

+---+----+-------+
| id| age|buckets|
+---+----+-------+
|  0|18.0|    3.0|
|  1|19.0|    4.0|
|  2| 8.0|    2.0|
|  3| 5.0|    1.0|
|  4|null|   null|
+---+----+-------+

+---+----+-------+
| id| age|buckets|
+---+----+-------+
|  0|18.0|    3.0|
|  1|19.0|    4.0|
|  2| 8.0|    2.0|
|  3| 5.0|    1.0|
+---+----+-------+



## StringIndexer

StringIndexer encodes a string column of labels to a column of label indices. StringIndexer can encode multiple columns. The indices are in [0, numLabels), and four ordering options are supported: “frequencyDesc”: descending order by label frequency (most frequent label assigned 0), “frequencyAsc”: ascending order by label frequency (least frequent label assigned 0), “alphabetDesc”: descending alphabetical order, and “alphabetAsc”: ascending alphabetical order (default = “frequencyDesc”). Note that in case of equal frequency when under “frequencyDesc”/”frequencyAsc”, the strings are further sorted by alphabet.

The unseen labels will be put at index numLabels if user chooses to keep them. If the input column is numeric, we cast it to string and index the string values. When downstream pipeline components such as Estimator or Transformer make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with setInputCol.

Examples

Assume that we have the following DataFrame with columns id and category:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c

category is a string column with three labels: “a”, “b”, and “c”. Applying StringIndexer with category as the input column and categoryIndex as the output column, we should get the following:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0
 
“a” gets index 0 because it is the most frequent, followed by “c” with index 1 and “b” with index 2.

Additionally, there are three strategies regarding how StringIndexer will handle unseen labels when you have fit a StringIndexer on one dataset and then use it to transform another:

throw an exception (which is the default)
skip the row containing the unseen label entirely
put unseen labels in a special additional bucket, at index numLabels

Examples

Let’s go back to our previous example but this time reuse our previously defined StringIndexer on the following dataset:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | d
 4  | e

If you’ve not set how StringIndexer handles unseen labels or set it to “error”, an exception will be thrown. However, if you had called setHandleInvalid("skip"), the following dataset will be generated:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0

Notice that the rows containing “d” or “e” do not appear.


If you call setHandleInvalid("keep"), the following dataset will be generated:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | d        | 3.0
 4  | e        | 3.0

Notice that the rows containing “d” or “e” are mapped to index “3.0”

In [35]:
from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+



## labelConverter

In [36]:
from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "Yes"), (1, "Yes"), (2, "Yes"), (3, "No"), (4, "No"), (5, "No")],
    ["id", "label"])

indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")

converter = IndexToString(inputCol="labelIndex", outputCol="originalLabel")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "labelIndex", "originalLabel").show()

Transformed string column 'label' to indexed column 'labelIndex'
+---+-----+----------+
| id|label|labelIndex|
+---+-----+----------+
|  0|  Yes|       1.0|
|  1|  Yes|       1.0|
|  2|  Yes|       1.0|
|  3|   No|       0.0|
|  4|   No|       0.0|
|  5|   No|       0.0|
+---+-----+----------+

StringIndexer will store labels in output column metadata

Transformed indexed column 'labelIndex' back to original string column 'originalLabel' using labels in metadata
+---+----------+-------------+
| id|labelIndex|originalLabel|
+---+----------+-------------+
|  0|       1.0|          Yes|
|  1|       1.0|          Yes|
|  2|       1.0|          Yes|
|  3|       0.0|           No|
|  4|       0.0|           No|
|  5|       0.0|           No|
+---+----------+-------------+



In [37]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "Yes"), (1, "Yes"), (2, "Yes"), (3, "No"), (4, "No"), (5, "No")],
    ["id", "label"])

indexer = StringIndexer(inputCol="label", outputCol="labelIndex")
converter = IndexToString(inputCol="labelIndex", outputCol="originalLabel")

pipeline = Pipeline(stages=[indexer, converter])


model = pipeline.fit(df)
result = model.transform(df)

result.show()

+---+-----+----------+-------------+
| id|label|labelIndex|originalLabel|
+---+-----+----------+-------------+
|  0|  Yes|       1.0|          Yes|
|  1|  Yes|       1.0|          Yes|
|  2|  Yes|       1.0|          Yes|
|  3|   No|       0.0|           No|
|  4|   No|       0.0|           No|
|  5|   No|       0.0|           No|
+---+-----+----------+-------------+



## VectorIndexer

VectorIndexer helps index categorical features in datasets of Vectors. It can both automatically decide which features are categorical and convert original values to category indices. Specifically, it does the following:

1. Take an input column of type Vector and a parameter maxCategories.
2. Decide which features should be categorical based on the number of distinct values, where features with at most maxCategories are declared categorical.
3. Compute 0-based category indices for each categorical feature.
4. Index categorical features and transform original feature values to indices.

Indexing categorical features allows algorithms such as Decision Trees and Tree Ensembles to treat categorical features appropriately, improving performance.

Examples

In the example below, we read in a dataset of labeled points and then use VectorIndexer to decide which features should be treated as categorical. We transform the categorical feature values to their indices. This transformed data could then be passed to algorithms such as DecisionTreeRegressor that handle categorical features.

In [39]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

from pyspark.ml.feature import RFormula

df = spark.createDataFrame([
    (0, 2.2, True, "1", "foo", 'CA'),
    (1, 3.3, False, "2", "bar", 'US'),
    (0, 4.4, False, "3", "baz", 'CHN'),
    (1, 5.5, False, "4", "foo", 'AUS')
], ['label',"real", "bool", "stringNum", "string","country"])

formula = RFormula(
    formula="label ~ real + bool + stringNum + string + country",
    featuresCol="features",
    labelCol="label")

# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values
# are treated as continuous.
featureIndexer = VectorIndexer(inputCol="features", \
                               outputCol="indexedFeatures",\
                               maxCategories=2)

pipeline = Pipeline(stages=[formula, featureIndexer])

model = pipeline.fit(df)
result = model.transform(df)

result.show()

+-----+----+-----+---------+------+-------+--------------------+--------------------+
|label|real| bool|stringNum|string|country|            features|     indexedFeatures|
+-----+----+-----+---------+------+-------+--------------------+--------------------+
|    0| 2.2| true|        1|   foo|     CA|(10,[0,1,2,5,8],[...|(10,[0,1,2,5,8],[...|
|    1| 3.3|false|        2|   bar|     US|(10,[0,3,6],[3.3,...|(10,[0,3,6],[3.3,...|
|    0| 4.4|false|        3|   baz|    CHN|(10,[0,4,9],[4.4,...|(10,[0,4,9],[4.4,...|
|    1| 5.5|false|        4|   foo|    AUS|(10,[0,5,7],[5.5,...|(10,[0,5,7],[5.5,...|
+-----+----+-----+---------+------+-------+--------------------+--------------------+



## VectorAssembler

VectorAssembler is a transformer that combines a given list of columns into a single vector column. It is useful for combining raw features and features generated by different feature transformers into a single feature vector, in order to train ML models like logistic regression and decision trees. VectorAssembler accepts the following input column types: all numeric types, boolean type, and vector type. In each row, the values of the input columns will be concatenated into a vector in the specified order.

Examples

Assume that we have a DataFrame with the columns id, hour, mobile, userFeatures, and clicked:

 id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0
 
userFeatures is a vector column that contains three user features. We want to combine hour, mobile, and userFeatures into a single feature vector called features and use it to predict clicked or not. If we set VectorAssembler’s input columns to hour, mobile, and userFeatures and output column to features, after transformation we should get the following DataFrame:


 id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|----------------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]

In [40]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

output = assembler.transform(dataset)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+



## OneHotEncoder

One-hot encoding maps a categorical feature, represented as a label index, to a binary vector with at most a single one-value indicating the presence of a specific feature value from among the set of all feature values. This encoding allows algorithms which expect continuous features, such as Logistic Regression, to use categorical features. For string type input data, it is common to encode categorical features using StringIndexer first.


OneHotEncoder can transform multiple columns, returning an one-hot-encoded output vector column for each input column. It is common to merge these vectors into a single feature vector using VectorAssembler.


OneHotEncoder supports the handleInvalid parameter to choose how to handle invalid input during transforming data. Available options include ‘keep’ (any invalid inputs are assigned to an extra categorical index) and ‘error’ (throw an error).

In [41]:
from pyspark.ml.feature import OneHotEncoder

df = spark.createDataFrame([
    (0.0, 1.0),
    (1.0, 0.0),
    (2.0, 1.0),
    (0.0, 2.0),
    (0.0, 1.0),
    (2.0, 0.0)
], ["categoryIndex1", "categoryIndex2"])

encoder = OneHotEncoder(inputCols=["categoryIndex1", "categoryIndex2"],
                        outputCols=["categoryVec1", "categoryVec2"])
model = encoder.fit(df)
encoded = model.transform(df)
encoded.show()

+--------------+--------------+-------------+-------------+
|categoryIndex1|categoryIndex2| categoryVec1| categoryVec2|
+--------------+--------------+-------------+-------------+
|           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
|           1.0|           0.0|(2,[1],[1.0])|(2,[0],[1.0])|
|           2.0|           1.0|    (2,[],[])|(2,[1],[1.0])|
|           0.0|           2.0|(2,[0],[1.0])|    (2,[],[])|
|           0.0|           1.0|(2,[0],[1.0])|(2,[1],[1.0])|
|           2.0|           0.0|    (2,[],[])|(2,[0],[1.0])|
+--------------+--------------+-------------+-------------+



## Vector Assembler

In [48]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
categoricalCols = ['category']

indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                 for c in categoricalCols ]
# default setting: dropLast=True
encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),
                 outputCol="{0}_encoded".format(indexer.getOutputCol()),dropLast=False)
                 for indexer in indexers ]
assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                            , outputCol="features")
pipeline = Pipeline(stages=indexers + encoders + [assembler])

model=pipeline.fit(df)
data = model.transform(df)
data.show()


+---+--------+----------------+------------------------+-------------+
| id|category|category_indexed|category_indexed_encoded|     features|
+---+--------+----------------+------------------------+-------------+
|  0|       a|             0.0|           (3,[0],[1.0])|[1.0,0.0,0.0]|
|  1|       b|             2.0|           (3,[2],[1.0])|[0.0,0.0,1.0]|
|  2|       c|             1.0|           (3,[1],[1.0])|[0.0,1.0,0.0]|
|  3|       a|             0.0|           (3,[0],[1.0])|[1.0,0.0,0.0]|
|  4|       a|             0.0|           (3,[0],[1.0])|[1.0,0.0,0.0]|
|  5|       c|             1.0|           (3,[1],[1.0])|[0.0,1.0,0.0]|
+---+--------+----------------+------------------------+-------------+



## Application: Get Dummy Variable

In [52]:
def get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol,dropLast=False):

    '''
    Get dummy variables and concat with continuous variables for ml modeling.
    :param df: the dataframe
    :param categoricalCols: the name list of the categorical data
    :param continuousCols:  the name list of the numerical data
    :param labelCol:  the name of label column
    :param dropLast:  the flag of drop last column
    :return: feature matrix

    :author: Wenqiang Feng
    :email:  von198@gmail.com

    >>> df = spark.createDataFrame([
                  (0, "a"),
                  (1, "b"),
                  (2, "c"),
                  (3, "a"),
                  (4, "a"),
                  (5, "c")
              ], ["id", "category"])

    >>> indexCol = 'id'
    >>> categoricalCols = ['category']
    >>> continuousCols = []
    >>> labelCol = []

    >>> mat = get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol)
    >>> mat.show()

    >>>
        +---+-------------+
        | id|     features|
        +---+-------------+
        |  0|[1.0,0.0,0.0]|
        |  1|[0.0,0.0,1.0]|
        |  2|[0.0,1.0,0.0]|
        |  3|[1.0,0.0,0.0]|
        |  4|[1.0,0.0,0.0]|
        |  5|[0.0,1.0,0.0]|
        +---+-------------+
    '''

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col

    indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
                 for c in categoricalCols ]

    # default setting: dropLast=True
    encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),
                 outputCol="{0}_encoded".format(indexer.getOutputCol()),dropLast=dropLast)
                 for indexer in indexers ]

    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]
                                + continuousCols, outputCol="features")

    pipeline = Pipeline(stages=indexers + encoders + [assembler])

    model=pipeline.fit(df)
    data = model.transform(df)

    if indexCol and labelCol:
        # for supervised learning
        data = data.withColumn('label',col(labelCol))
        return data.select(indexCol,'features','label')
    elif not indexCol and labelCol:
        # for supervised learning
        data = data.withColumn('label',col(labelCol))
        return data.select('features','label')
    elif indexCol and not labelCol:
        # for unsupervised learning
        return data.select(indexCol,'features')
    elif not indexCol and not labelCol:
        # for unsupervised learning
        return data.select('features')

## Unsupervised scenario

In [53]:
df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])
df.show()

indexCol = 'id'
categoricalCols = ['category']
continuousCols = []
labelCol = []

mat = get_dummy(df,indexCol,categoricalCols,continuousCols,labelCol)
mat.show()

+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       b|
|  2|       c|
|  3|       a|
|  4|       a|
|  5|       c|
+---+--------+

+---+-------------+
| id|     features|
+---+-------------+
|  0|[1.0,0.0,0.0]|
|  1|[0.0,0.0,1.0]|
|  2|[0.0,1.0,0.0]|
|  3|[1.0,0.0,0.0]|
|  4|[1.0,0.0,0.0]|
|  5|[0.0,1.0,0.0]|
+---+-------------+



## Supervised scenario

In [66]:
import pyspark.sql.functions as F
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.createDataFrame([
    (1, "A", "X1"),
    (2, "B", "X2"),
    (3, "B", "X3"),
    (1, "B", "X3"),
    (2, "C", "X2"),
    (3, "C", "X2"),
    (1, "C", "X1"),
    (1, "B", "X1"),
], ["ID", "TYPE", "CODE"])

types = df.select("TYPE").distinct().rdd.flatMap(lambda x: x).collect()
codes = df.select("CODE").distinct().rdd.flatMap(lambda x: x).collect()
types_expr = [F.when(F.col("TYPE") == ty, 1).otherwise(0).alias("e_TYPE_" + ty) for ty in types]
codes_expr = [F.when(F.col("CODE") == code, 1).otherwise(0).alias("e_CODE_" + code) for code in codes]
df = df.select("ID", "TYPE", "CODE", *types_expr+codes_expr)
df.show()

+---+----+----+--------+--------+--------+---------+---------+---------+
| ID|TYPE|CODE|e_TYPE_B|e_TYPE_C|e_TYPE_A|e_CODE_X1|e_CODE_X3|e_CODE_X2|
+---+----+----+--------+--------+--------+---------+---------+---------+
|  1|   A|  X1|       0|       0|       1|        1|        0|        0|
|  2|   B|  X2|       1|       0|       0|        0|        0|        1|
|  3|   B|  X3|       1|       0|       0|        0|        1|        0|
|  1|   B|  X3|       1|       0|       0|        0|        1|        0|
|  2|   C|  X2|       0|       1|       0|        0|        0|        1|
|  3|   C|  X2|       0|       1|       0|        0|        0|        1|
|  1|   C|  X1|       0|       1|       0|        1|        0|        0|
|  1|   B|  X1|       1|       0|       0|        1|        0|        0|
+---+----+----+--------+--------+--------+---------+---------+---------+



In [None]:
## Scaler

In [67]:
from pyspark.ml.feature import Normalizer, StandardScaler, MinMaxScaler, MaxAbsScaler

scaler_type = 'Normal'
if scaler_type=='Normal':
    scaler = Normalizer(inputCol="features", outputCol="scaledFeatures", p=1.0)
elif scaler_type=='Standard':
    scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                            withStd=True, withMean=False)
elif scaler_type=='MinMaxScaler':
    scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
elif scaler_type=='MaxAbsScaler':
    scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")


In [68]:
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])
df.show()


pipeline = Pipeline(stages=[scaler])

model  =pipeline.fit(df)
data = model.transform(df)
data.show()

+---+--------------+
| id|      features|
+---+--------------+
|  0|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]|
|  2|[4.0,10.0,2.0]|
+---+--------------+

+---+--------------+------------------+
| id|      features|    scaledFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+



## Normalizer

Normalizer is a Transformer which transforms a dataset of Vector rows, normalizing each Vector to have unit norm. It takes parameter p, which specifies the p-norm used for normalization. (p=2 by default.) This normalization can help standardize your input data and improve the behavior of learning algorithms.

Examples

The following example demonstrates how to load a dataset in libsvm format and then normalize each row to have unit L1 norm and unit L∞ norm.

In [69]:
from pyspark.ml.feature import Normalizer
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

# Normalize each Vector using $L^1$ norm.
normalizer = Normalizer(inputCol="features", outputCol="normFeatures", p=1.0)
l1NormData = normalizer.transform(dataFrame)
print("Normalized using L^1 norm")
l1NormData.show()

# Normalize each Vector using $L^\infty$ norm.
lInfNormData = normalizer.transform(dataFrame, {normalizer.p: float("inf")})
print("Normalized using L^inf norm")
lInfNormData.show()

Normalized using L^1 norm
+---+--------------+------------------+
| id|      features|      normFeatures|
+---+--------------+------------------+
|  0|[1.0,0.5,-1.0]|    [0.4,0.2,-0.4]|
|  1| [2.0,1.0,1.0]|   [0.5,0.25,0.25]|
|  2|[4.0,10.0,2.0]|[0.25,0.625,0.125]|
+---+--------------+------------------+

Normalized using L^inf norm
+---+--------------+--------------+
| id|      features|  normFeatures|
+---+--------------+--------------+
|  0|[1.0,0.5,-1.0]|[1.0,0.5,-1.0]|
|  1| [2.0,1.0,1.0]| [1.0,0.5,0.5]|
|  2|[4.0,10.0,2.0]| [0.4,1.0,0.2]|
+---+--------------+--------------+



## StandardScaler

StandardScaler transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation and/or zero mean. It takes parameters:

* withStd: True by default. Scales the data to unit standard deviation.
* withMean: False by default. Centers the data with mean before scaling. It will build a dense output, so take care when applying to sparse input.

StandardScaler is an Estimator which can be fit on a dataset to produce a StandardScalerModel; this amounts to computing summary statistics. The model can then transform a Vector column in a dataset to have unit standard deviation and/or zero mean features.

Note that if the standard deviation of a feature is zero, it will return default 0.0 value in the Vector for that feature.

Examples

The following example demonstrates how to load a dataset in libsvm format and then normalize each feature to have unit standard deviation.

In [70]:
from pyspark.ml.feature import Normalizer, StandardScaler, MinMaxScaler, MaxAbsScaler

from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                            withStd=True, withMean=False)
scaleredData = scaler.fit((dataFrame)).transform(dataFrame)
scaleredData.show(truncate=False)

+---+--------------+------------------------------------------------------------+
|id |features      |scaledFeatures                                              |
+---+--------------+------------------------------------------------------------+
|0  |[1.0,0.5,-1.0]|[0.6546536707079771,0.09352195295828246,-0.6546536707079772]|
|1  |[2.0,1.0,1.0] |[1.3093073414159542,0.18704390591656492,0.6546536707079772] |
|2  |[4.0,10.0,2.0]|[2.6186146828319083,1.8704390591656492,1.3093073414159544]  |
+---+--------------+------------------------------------------------------------+



## MinMaxScaler

MinMaxScaler transforms a dataset of Vector rows, rescaling each feature to a specific range (often [0, 1]). It takes parameters:

min: 0.0 by default. Lower bound after transformation, shared by all features.
max: 1.0 by default. Upper bound after transformation, shared by all features.
MinMaxScaler computes summary statistics on a data set and produces a MinMaxScalerModel. The model can then transform each feature individually such that it is in the given range.

The rescaled value for a feature E is calculated as,


\begin{equation}
  Rescaled(e_i) = \frac{e_i - E_{min}}{E_{max} - E_{min}} * (max - min) + min
\end{equation}


For the case E_{max} == E_{min} , Rescaled(e_i) = 0.5 * (max + min)


Note that since zero values will probably be transformed to non-zero values, output of the transformer will be DenseVector even for sparse input.

Examples

The following example demonstrates how to load a dataset in libsvm format and then rescale each feature to [0, 1].

In [71]:
from pyspark.ml.feature import Normalizer, StandardScaler, MinMaxScaler, MaxAbsScaler

from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")
scaledData = scaler.fit((dataFrame)).transform(dataFrame)
scaledData.show(truncate=False)

+---+--------------+-----------------------------------------------------------+
|id |features      |scaledFeatures                                             |
+---+--------------+-----------------------------------------------------------+
|0  |[1.0,0.5,-1.0]|(3,[],[])                                                  |
|1  |[2.0,1.0,1.0] |[0.3333333333333333,0.05263157894736842,0.6666666666666666]|
|2  |[4.0,10.0,2.0]|[1.0,1.0,1.0]                                              |
+---+--------------+-----------------------------------------------------------+



## MaxAbsScaler

MaxAbsScaler transforms a dataset of Vector rows, rescaling each feature to range [-1, 1] by dividing through the maximum absolute value in each feature. It does not shift/center the data, and thus does not destroy any sparsity.

MaxAbsScaler computes summary statistics on a data set and produces a MaxAbsScalerModel. The model can then transform each feature individually to range [-1, 1].

Examples

The following example demonstrates how to load a dataset in libsvm format and then rescale each feature to [-1, 1].

In [72]:
from pyspark.ml.feature import Normalizer, StandardScaler, MinMaxScaler, MaxAbsScaler

from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.5, -1.0]),),
    (1, Vectors.dense([2.0, 1.0, 1.0]),),
    (2, Vectors.dense([4.0, 10.0, 2.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")
scaledData = scaler.fit((dataFrame)).transform(dataFrame)
scaledData.show(truncate=False)

+---+--------------+----------------+
|id |features      |scaledFeatures  |
+---+--------------+----------------+
|0  |[1.0,0.5,-1.0]|[0.25,0.05,-0.5]|
|1  |[2.0,1.0,1.0] |[0.5,0.1,0.5]   |
|2  |[4.0,10.0,2.0]|[1.0,1.0,1.0]   |
+---+--------------+----------------+



## PCA

PCA is a statistical procedure that uses an orthogonal transformation to convert a set of observations of possibly correlated variables into a set of values of linearly uncorrelated variables called principal components. A PCA class trains a model to project vectors to a low-dimensional space using PCA. The example below shows how to project 5-dimensional feature vectors into 3-dimensional principal components.

In [73]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)

result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)

+-----------------------------------------------------------+
|pcaFeatures                                                |
+-----------------------------------------------------------+
|[1.6485728230883807,-4.013282700516296,-5.524543751369388] |
|[-4.645104331781534,-1.1167972663619026,-5.524543751369387]|
|[-6.428880535676489,-5.337951427775355,-5.524543751369389] |
+-----------------------------------------------------------+



## DCT & Discrete Cosine Transform (DCT)

The Discrete Cosine Transform transforms a length N real-valued sequence in the time domain into another length N real-valued sequence in the frequency domain. A DCT class provides this functionality, implementing the DCT-II and scaling the result by 1/2–√ such that the representing matrix for the transform is unitary. No shift is applied to the transformed sequence (e.g. the 0th element of the transformed sequence is the 0th DCT coefficient and not the N/2th).

Examples

In [74]:
from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")

dctDf = dct.transform(df)

dctDf.select("featuresDCT").show(truncate=False)

+----------------------------------------------------------------+
|featuresDCT                                                     |
+----------------------------------------------------------------+
|[1.0,-1.1480502970952693,2.0000000000000004,-2.7716385975338604]|
|[-1.0,3.378492794482933,-7.000000000000001,2.9301512653149677]  |
|[4.0,9.304453421915744,11.000000000000002,1.5579302036357163]   |
+----------------------------------------------------------------+



## PolynomialExpansion
Polynomial expansion is the process of expanding your features into a polynomial space, which is formulated by an n-degree combination of original dimensions. A PolynomialExpansion class provides this functionality. The example below shows how to expand your features into a 3-degree polynomial space.

Examples

In [80]:
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)

+----------+------------------------------------------+
|features  |polyFeatures                              |
+----------+------------------------------------------+
|[2.0,1.0] |[2.0,4.0,8.0,1.0,2.0,4.0,1.0,2.0,1.0]     |
|[0.0,0.0] |[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]     |
|[3.0,-1.0]|[3.0,9.0,27.0,-1.0,-3.0,-9.0,1.0,3.0,-1.0]|
+----------+------------------------------------------+



## StringIndexer
StringIndexer encodes a string column of labels to a column of label indices. StringIndexer can encode multiple columns. The indices are in (0, numLabels), and four ordering options are supported: “frequencyDesc”: descending order by label frequency (most frequent label assigned 0), “frequencyAsc”: ascending order by label frequency (least frequent label assigned 0), “alphabetDesc”: descending alphabetical order, and “alphabetAsc”: ascending alphabetical order (default = “frequencyDesc”). Note that in case of equal frequency when under “frequencyDesc”/”frequencyAsc”, the strings are further sorted by alphabet.

The unseen labels will be put at index numLabels if user chooses to keep them. If the input column is numeric, we cast it to string and index the string values. When downstream pipeline components such as Estimator or Transformer make use of this string-indexed label, you must set the input column of the component to this string-indexed column name. In many cases, you can set the input column with setInputCol.

Examples

Assume that we have the following DataFrame with columns id and category:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c

category is a string column with three labels: “a”, “b”, and “c”. Applying StringIndexer with category as the input column and categoryIndex as the output column, we should get the following:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0

“a” gets index 0 because it is the most frequent, followed by “c” with index 1 and “b” with index 2.

Additionally, there are three strategies regarding how StringIndexer will handle unseen labels when you have fit a StringIndexer on one dataset and then use it to transform another:

throw an exception (which is the default)
skip the row containing the unseen label entirely
put unseen labels in a special additional bucket, at index numLabels
Examples

Let’s go back to our previous example but this time reuse our previously defined StringIndexer on the following dataset:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | d
 4  | e

If you’ve not set how StringIndexer handles unseen labels or set it to “error”, an exception will be thrown. However, if you had called setHandleInvalid("skip"), the following dataset will be generated:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0

Notice that the rows containing “d” or “e” do not appear.

If you call setHandleInvalid("keep"), the following dataset will be generated:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | d        | 3.0
 4  | e        | 3.0
 
Notice that the rows containing “d” or “e” are mapped to index “3.0”

In [84]:
from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+



## IndexToString

Symmetrically to StringIndexer, IndexToString maps a column of label indices back to a column containing the original labels as strings. A common use case is to produce indices from labels with StringIndexer, train a model with those indices and retrieve the original labels from the column of predicted indices with IndexToString. However, you are free to supply your own labels.

Examples

Building on the StringIndexer example, let’s assume we have the following DataFrame with columns id and categoryIndex:

 id | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0
 
Applying IndexToString with categoryIndex as the input column, originalCategory as the output column, we are able to retrieve our original labels (they will be inferred from the columns’ metadata):

 id | categoryIndex | originalCategory
----|---------------|-----------------
 0  | 0.0           | a
 1  | 2.0           | b
 2  | 1.0           | c
 3  | 0.0           | a
 4  | 0.0           | a
 5  | 1.0           | c

In [85]:
from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = indexer.fit(df)
indexed = model.transform(df)

print("Transformed string column '%s' to indexed column '%s'"
      % (indexer.getInputCol(), indexer.getOutputCol()))
indexed.show()

print("StringIndexer will store labels in output column metadata\n")

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)

print("Transformed indexed column '%s' back to original string column '%s' using "
      "labels in metadata" % (converter.getInputCol(), converter.getOutputCol()))
converted.select("id", "categoryIndex", "originalCategory").show()

Transformed string column 'category' to indexed column 'categoryIndex'
+---+--------+-------------+
| id|category|categoryIndex|
+---+--------+-------------+
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|
+---+--------+-------------+

StringIndexer will store labels in output column metadata

Transformed indexed column 'categoryIndex' back to original string column 'originalCategory' using labels in metadata
+---+-------------+----------------+
| id|categoryIndex|originalCategory|
+---+-------------+----------------+
|  0|          0.0|               a|
|  1|          2.0|               b|
|  2|          1.0|               c|
|  3|          0.0|               a|
|  4|          0.0|               a|
|  5|          1.0|               c|
+---+-------------+----------------+



## Interaction
Interaction is a Transformer which takes vector or double-valued columns, and generates a single vector column that contains the product of all combinations of one value from each input column.

For example, if you have 2 vector type columns each of which has 3 dimensions as input columns, then you’ll get a 9-dimensional vector as the output column.

Examples

Assume that we have the following DataFrame with the columns “id1”, “vec1”, and “vec2”:

  id1|vec1          |vec2          
  ---|--------------|--------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] 
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] 
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] 
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] 
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0]     

Applying Interaction with those input columns, then interactedCol as the output column contains:

  id1|vec1          |vec2          |interactedCol                                         
  ---|--------------|--------------|--------------------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]            
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]     
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]        
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0] 
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]  

In [86]:
from pyspark.ml.feature import Interaction, VectorAssembler

df = spark.createDataFrame(
    [(1, 1, 2, 3, 8, 4, 5),
     (2, 4, 3, 8, 7, 9, 8),
     (3, 6, 1, 9, 2, 3, 6),
     (4, 10, 8, 6, 9, 4, 5),
     (5, 9, 2, 7, 10, 7, 3),
     (6, 1, 1, 4, 2, 8, 4)],
    ["id1", "id2", "id3", "id4", "id5", "id6", "id7"])

assembler1 = VectorAssembler(inputCols=["id2", "id3", "id4"], outputCol="vec1")

assembled1 = assembler1.transform(df)

assembler2 = VectorAssembler(inputCols=["id5", "id6", "id7"], outputCol="vec2")

assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

interaction = Interaction(inputCols=["id1", "vec1", "vec2"], outputCol="interactedCol")

interacted = interaction.transform(assembled2)

interacted.show(truncate=False)

+---+--------------+--------------+------------------------------------------------------+
|id1|vec1          |vec2          |interactedCol                                         |
+---+--------------+--------------+------------------------------------------------------+
|1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]            |
|2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]     |
|3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]        |
|4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]|
|5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0] |
|6  |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]       |
+---+--------------+--------------+------------------------------------------------------+



## RobustScaler

RobustScaler transforms a dataset of Vector rows, removing the median and scaling the data according to a specific quantile range (by default the IQR: Interquartile Range, quantile range between the 1st quartile and the 3rd quartile). Its behavior is quite similar to StandardScaler, however the median and the quantile range are used instead of mean and standard deviation, which make it robust to outliers. It takes parameters:

* lower: 0.25 by default. Lower quantile to calculate quantile range, shared by all features.
* upper: 0.75 by default. Upper quantile to calculate quantile range, shared by all features.
* withScaling: True by default. Scales the data to quantile range.
* withCentering: False by default. Centers the data with median before scaling. It will build a dense output, so take care when applying to sparse input.

RobustScaler is an Estimator which can be fit on a dataset to produce a RobustScalerModel; this amounts to computing quantile statistics. The model can then transform a Vector column in a dataset to have unit quantile range and/or zero median features.

Note that if the quantile range of a feature is zero, it will return default 0.0 value in the Vector for that feature.

Examples

The following example demonstrates how to load a dataset in libsvm format and then normalize each feature to have unit quantile range.

In [88]:
from pyspark.ml.feature import RobustScaler

dataFrame = spark.read.format("libsvm").load("../data/sample_libsvm_data.txt")
scaler = RobustScaler(inputCol="features", outputCol="scaledFeatures",
                      withScaling=True, withCentering=False,
                      lower=0.25, upper=0.75)

# Compute summary statistics by fitting the RobustScaler
scalerModel = scaler.fit(dataFrame)

# Transform each feature to have unit quantile range.
scaledData = scalerModel.transform(dataFrame)
scaledData.show()

+-----+--------------------+--------------------+
|label|            features|      scaledFeatures|
+-----+--------------------+--------------------+
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[124,125,126...|(692,[124,125,126...|
|  1.0|(692,[152,153,154...|(692,[152,153,154...|
|  1.0|(692,[151,152,153...|(692,[151,152,153...|
|  0.0|(692,[129,130,131...|(692,[129,130,131...|
|  1.0|(692,[158,159,160...|(692,[158,159,160...|
|  1.0|(692,[99,100,101,...|(692,[99,100,101,...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[127,128,129...|(692,[127,128,129...|
|  1.0|(692,[154,155,156...|(692,[154,155,156...|
|  0.0|(692,[153,154,155...|(692,[153,154,155...|
|  0.0|(692,[151,152,153...|(692,[151,152,153...|
|  1.0|(692,[129,130,131...|(692,[129,130,131...|
|  0.0|(692,[154,155,156...|(692,[154,155,156...|
|  1.0|(692,[150,151,152...|(692,[150,151,152...|
|  0.0|(692,[124,125,126...|(692,[124,125,126...|


## ElementwiseProduct

ElementwiseProduct multiplies each input vector by a provided “weight” vector, using element-wise multiplication. In other words, it scales each column of the dataset by a scalar multiplier. This represents the Hadamard product between the input vector, v and transforming vector, w, to yield a result vector.

\begin{pmatrix}
v_1 \\
\vdots \\
v_N
\end{pmatrix} \circ \begin{pmatrix}
                    w_1 \\
                    \vdots \\
                    w_N
                    \end{pmatrix}
= \begin{pmatrix}
  v_1 w_1 \\
  \vdots \\
  v_N w_N
  \end{pmatrix}
  
  
 Examples

This example below demonstrates how to transform vectors using a transforming vector value.

In [89]:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors

# Create some vector data; also works for sparse vectors
data = [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.dense([4.0, 5.0, 6.0]),)]
df = spark.createDataFrame(data, ["vector"])
transformer = ElementwiseProduct(scalingVec=Vectors.dense([0.0, 1.0, 2.0]),
                                 inputCol="vector", outputCol="transformedVector")
# Batch transform the vectors to create new column:
transformer.transform(df).show()

+-------------+-----------------+
|       vector|transformedVector|
+-------------+-----------------+
|[1.0,2.0,3.0]|    [0.0,2.0,6.0]|
|[4.0,5.0,6.0]|   [0.0,5.0,12.0]|
+-------------+-----------------+



## SQLTransformer

SQLTransformer implements the transformations which are defined by SQL statement. Currently, we only support SQL syntax like "SELECT ... FROM __THIS__ ..." where "__THIS__" represents the underlying table of the input dataset. The select clause specifies the fields, constants, and expressions to display in the output, and can be any select clause that Spark SQL supports. Users can also use Spark SQL built-in function and UDFs to operate on these selected columns. For example, SQLTransformer supports statements like:

SELECT a, a + b AS a_b FROM __THIS__
SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b
Examples

Assume that we have the following DataFrame with columns id, v1 and v2:

 id |  v1 |  v2
----|-----|-----
 0  | 1.0 | 3.0  
 2  | 2.0 | 5.0

This is the output of the SQLTransformer with statement "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__":

 id |  v1 |  v2 |  v3 |  v4
----|-----|-----|-----|-----
 0  | 1.0 | 3.0 | 4.0 | 3.0
 2  | 2.0 | 5.0 | 7.0 |10.0


In [90]:
from pyspark.ml.feature import SQLTransformer

df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()

+---+---+---+---+----+
| id| v1| v2| v3|  v4|
+---+---+---+---+----+
|  0|1.0|3.0|4.0| 3.0|
|  2|2.0|5.0|7.0|10.0|
+---+---+---+---+----+



## VectorSizeHint
It can sometimes be useful to explicitly specify the size of the vectors for a column of VectorType. For example, VectorAssembler uses size information from its input columns to produce size information and metadata for its output column. While in some cases this information can be obtained by inspecting the contents of the column, in a streaming dataframe the contents are not available until the stream is started. VectorSizeHint allows a user to explicitly specify the vector size for a column so that VectorAssembler, or other transformers that might need to know vector size, can use that column as an input.

To use VectorSizeHint a user must set the inputCol and size parameters. Applying this transformer to a dataframe produces a new dataframe with updated metadata for inputCol specifying the vector size. Downstream operations on the resulting dataframe can get this size using the metadata.

VectorSizeHint can also take an optional handleInvalid parameter which controls its behaviour when the vector column contains nulls or vectors of the wrong size. By default handleInvalid is set to “error”, indicating an exception should be thrown. This parameter can also be set to “skip”, indicating that rows containing invalid values should be filtered out from the resulting dataframe, or “optimistic”, indicating that the column should not be checked for invalid values and all rows should be kept. Note that the use of “optimistic” can cause the resulting dataframe to be in an inconsistent state, meaning the metadata for the column VectorSizeHint was applied to does not match the contents of that column. Users should take care to avoid this kind of inconsistent state.

In [91]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import (VectorSizeHint, VectorAssembler)

dataset = spark.createDataFrame(
    [(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0),
     (0, 18, 1.0, Vectors.dense([0.0, 10.0]), 0.0)],
    ["id", "hour", "mobile", "userFeatures", "clicked"])

sizeHint = VectorSizeHint(
    inputCol="userFeatures",
    handleInvalid="skip",
    size=3)

datasetWithSize = sizeHint.transform(dataset)
print("Rows where 'userFeatures' is not the right size are filtered out")
datasetWithSize.show(truncate=False)

assembler = VectorAssembler(
    inputCols=["hour", "mobile", "userFeatures"],
    outputCol="features")

# This dataframe can be used by downstream transformers as before
output = assembler.transform(datasetWithSize)
print("Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'")
output.select("features", "clicked").show(truncate=False)

Rows where 'userFeatures' is not the right size are filtered out
+---+----+------+--------------+-------+
|id |hour|mobile|userFeatures  |clicked|
+---+----+------+--------------+-------+
|0  |18  |1.0   |[0.0,10.0,0.5]|1.0    |
+---+----+------+--------------+-------+

Assembled columns 'hour', 'mobile', 'userFeatures' to vector column 'features'
+-----------------------+-------+
|features               |clicked|
+-----------------------+-------+
|[18.0,1.0,0.0,10.0,0.5]|1.0    |
+-----------------------+-------+



QuantileDiscretizer
QuantileDiscretizer takes a column with continuous features and outputs a column with binned categorical features. The number of bins is set by the numBuckets parameter. It is possible that the number of buckets used will be smaller than this value, for example, if there are too few distinct values of the input to create enough distinct quantiles.

NaN values: NaN values will be removed from the column during QuantileDiscretizer fitting. This will produce a Bucketizer model for making predictions. During the transformation, Bucketizer will raise an error when it finds NaN values in the dataset, but the user can also choose to either keep or remove NaN values within the dataset by setting handleInvalid. If the user chooses to keep NaN values, they will be handled specially and placed into their own bucket, for example, if 4 buckets are used, then non-NaN data will be put into buckets[0-3], but NaNs will be counted in a special bucket[4].

Algorithm: The bin ranges are chosen using an approximate algorithm (see the documentation for approxQuantile for a detailed description). The precision of the approximation can be controlled with the relativeError parameter. When set to zero, exact quantiles are calculated (Note: Computing exact quantiles is an expensive operation). The lower and upper bin bounds will be -Infinity and +Infinity covering all real values.

Examples

Assume that we have a DataFrame with the columns id, hour:

 id | hour
 ---|-----
 0  | 18.0
 1  | 19.0
 2  | 8.0
 3  | 5.0
 4  | 2.2
    
hour is a continuous feature with Double type. We want to turn the continuous feature into a categorical one. Given numBuckets = 3, we should get the following DataFrame:

 id | hour | result
----|------|------
 0  | 18.0 | 2.0
 1  | 19.0 | 2.0
 2  | 8.0  | 1.0
 3  | 5.0  | 1.0
 4  | 2.2  | 0.0

In [92]:
from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2)]
df = spark.createDataFrame(data, ["id", "hour"])

discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

result = discretizer.fit(df).transform(df)
result.show()

+---+----+------+
| id|hour|result|
+---+----+------+
|  0|18.0|   2.0|
|  1|19.0|   2.0|
|  2| 8.0|   1.0|
|  3| 5.0|   1.0|
|  4| 2.2|   0.0|
+---+----+------+



## Imputer
The Imputer estimator completes missing values in a dataset, either using the mean or the median of the columns in which the missing values are located. The input columns should be of numeric type. Currently Imputer does not support categorical features and possibly creates incorrect values for columns containing categorical features. Imputer can impute custom values other than ‘NaN’ by .setMissingValue(custom_value). For example, .setMissingValue(0) will impute all occurrences of (0).

Note all null values in the input columns are treated as missing, and so are also imputed.

Examples

Suppose that we have a DataFrame with the columns a and b:

      a     |      b      
     -------|-----------
     1.0    |     NaN
     2.0    |     NaN
     NaN    |     3.0   
     4.0    |     4.0   
     5.0    |     5.0  
        
In this example, Imputer will replace all occurrences of Double.NaN (the default for the missing value) with the mean (the default imputation strategy) computed from the other values in the corresponding columns. In this example, the surrogate values for columns a and b are 3.0 and 4.0 respectively. After transformation, the missing values in the output columns will be replaced by the surrogate value for the relevant column.

      a     |      b     | out_a | out_b   
     -------|------------|-------|-------
     1.0    |     NaN    |  1.0  |  4.0 
     2.0    |     NaN    |  2.0  |  4.0 
     NaN    |     3.0    |  3.0  |  3.0 
     4.0    |     4.0    |  4.0  |  4.0
     5.0    |     5.0    |  5.0  |  5.0 

In [93]:
from pyspark.ml.feature import Imputer

df = spark.createDataFrame([
    (1.0, float("nan")),
    (2.0, float("nan")),
    (float("nan"), 3.0),
    (4.0, 4.0),
    (5.0, 5.0)
], ["a", "b"])

imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)

model.transform(df).show()

+---+---+-----+-----+
|  a|  b|out_a|out_b|
+---+---+-----+-----+
|1.0|NaN|  1.0|  4.0|
|2.0|NaN|  2.0|  4.0|
|NaN|3.0|  3.0|  3.0|
|4.0|4.0|  4.0|  4.0|
|5.0|5.0|  5.0|  5.0|
+---+---+-----+-----+



# Feature Selection

##  LASSO
Variable selection and the removal of correlated variables. The Ridge method shrinks the coefficients of correlated variables while the LASSO method picks one variable and discards the others. The elastic net penalty is a mixture of these two; if variables are correlated in groups then \alpha=0.5 tends to select the groups as in or out. If α is close to 1, the elastic net performs much like the LASSO method and removes any degeneracies and wild behavior caused by extreme correlations.

## VectorSlicer

VectorSlicer is a transformer that takes a feature vector and outputs a new feature vector with a sub-array of the original features. It is useful for extracting features from a vector column.

VectorSlicer accepts a vector column with specified indices, then outputs a new vector column whose values are selected via those indices. There are two types of indices,

Integer indices that represent the indices into the vector, setIndices().

String indices that represent the names of features into the vector, setNames(). This requires the vector column to have an AttributeGroup since the implementation matches on the name field of an Attribute.

Specification by integer and string are both acceptable. Moreover, you can use integer index and string name simultaneously. At least one feature must be selected. Duplicate features are not allowed, so there can be no overlap between selected indices and names. Note that if names of features are selected, an exception will be thrown if empty input attributes are encountered.

The output vector will order features with the selected indices first (in the order given), followed by the selected names (in the order given).

Examples

Suppose that we have a DataFrame with the column userFeatures:

 userFeatures
------------------
 [0.0, 10.0, 0.5]
 
userFeatures is a vector column that contains three user features. Assume that the first column of userFeatures are all zeros, so we want to remove it and select only the last two columns. The VectorSlicer selects the last two elements with setIndices(1, 2) then produces a new vector column named features:

 userFeatures     | features
------------------|----------- 
 [0.0, 10.0, 0.5] | [10.0, 0.5]
 
Suppose also that we have potential input attributes for the userFeatures, i.e. ["f1", "f2", "f3"], then we can use setNames("f2", "f3") to select them.

 userFeatures     | features
------------------|--------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]
 ["f1", "f2", "f3"] | ["f2", "f3"]

In [94]:
from pyspark.ml.feature import VectorSlicer
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import Row

df = spark.createDataFrame([
    Row(userFeatures=Vectors.sparse(3, {0: -2.0, 1: 2.3})),
    Row(userFeatures=Vectors.dense([-2.0, 2.3, 0.0]))])

slicer = VectorSlicer(inputCol="userFeatures", outputCol="features", indices=[1])

output = slicer.transform(df)

output.select("userFeatures", "features").show()

+--------------------+-------------+
|        userFeatures|     features|
+--------------------+-------------+
|(3,[0,1],[-2.0,2.3])|(1,[0],[2.3])|
|      [-2.0,2.3,0.0]|        [2.3]|
+--------------------+-------------+



## ChiSqSelector
ChiSqSelector stands for Chi-Squared feature selection. It operates on labeled data with categorical features. ChiSqSelector uses the Chi-Squared test of independence to decide which features to choose. It supports five selection methods: numTopFeatures, percentile, fpr, fdr, fwe:

numTopFeatures chooses a fixed number of top features according to a chi-squared test. This is akin to yielding the features with the most predictive power.
percentile is similar to numTopFeatures but chooses a fraction of all features instead of a fixed number.
fpr chooses all features whose p-values are below a threshold, thus controlling the false positive rate of selection.
fdr uses the Benjamini-Hochberg procedure to choose all features whose false discovery rate is below a threshold.
fwe chooses all features whose p-values are below a threshold. The threshold is scaled by 1/numFeatures, thus controlling the family-wise error rate of selection. By default, the selection method is numTopFeatures, with the default number of top features set to 50. The user can choose a selection method using setSelectorType.
Examples

Assume that we have a DataFrame with the columns id, features, and clicked, which is used as our target to be predicted:

id | features              | clicked
---|-----------------------|---------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0
    
If we use ChiSqSelector with numTopFeatures = 1, then according to our label clicked the last column in our features is chosen as the most useful feature:

id | features              | clicked | selectedFeatures
---|-----------------------|---------|------------------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0     | [1.0]
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0     | [0.0]
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0     | [0.1]

In [95]:
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (7, Vectors.dense([0.0, 0.0, 18.0, 1.0]), 1.0,),
    (8, Vectors.dense([0.0, 1.0, 12.0, 0.0]), 0.0,),
    (9, Vectors.dense([1.0, 0.0, 15.0, 0.1]), 0.0,)], ["id", "features", "clicked"])

selector = ChiSqSelector(numTopFeatures=1, featuresCol="features",
                         outputCol="selectedFeatures", labelCol="clicked")

result = selector.fit(df).transform(df)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())
result.show()

ChiSqSelector output with top 1 features selected
+---+------------------+-------+----------------+
| id|          features|clicked|selectedFeatures|
+---+------------------+-------+----------------+
|  7|[0.0,0.0,18.0,1.0]|    1.0|          [18.0]|
|  8|[0.0,1.0,12.0,0.0]|    0.0|          [12.0]|
|  9|[1.0,0.0,15.0,0.1]|    0.0|          [15.0]|
+---+------------------+-------+----------------+



# Unbalanced data: Undersampling

Since we use PySpark to deal with the big data, Undersampling for Unbalanced Classification is a useful method to deal with the Unbalanced data. Undersampling is a popular technique for unbalanced datasets to reduce the skew in class distributions. However, it is well-known that undersampling one class modifies the priors of the training set and consequently biases the posterior probabilities of a classifier. After you applied the Undersampling, you need to recalibrate the Probability 
https://www3.nd.edu/~dial/publications/dalpozzolo2015calibrating.pdf

![](./image/underSampling.png)

In [75]:
df = spark.createDataFrame([
    (0, "Yes"),
    (1, "Yes"),
    (2, "Yes"),
    (3, "Yes"),
    (4, "No"),
    (5, "No")
], ["id", "label"])
df.show()

+---+-----+
| id|label|
+---+-----+
|  0|  Yes|
|  1|  Yes|
|  2|  Yes|
|  3|  Yes|
|  4|   No|
|  5|   No|
+---+-----+



## Calculate undersampling Ratio

In [77]:
import math
def round_up(n, decimals=0):
    multiplier = 10 ** decimals
    return math.ceil(n * multiplier) / multiplier

  # drop missing value rows
df = df.dropna()
# under-sampling majority set
label_Y = df.filter(df.label=='Yes')
label_N = df.filter(df.label=='No')
sampleRatio = round_up(label_N.count() / df.count(),2)
sampleRatio

0.34

## Undersampling

In [78]:
label_Y_sample = label_Y.sample(False, sampleRatio)
# union minority set and the under-sampling majority set
data = label_N.unionAll(label_Y_sample)
data.show()

+---+-----+
| id|label|
+---+-----+
|  4|   No|
|  5|   No|
|  3|  Yes|
+---+-----+



## Recalibrating Probability
Undersampling is a popular technique for unbalanced datasets to reduce the skew in class distributions. However, it is well-known that undersampling one class modifies the priors of the training set and consequently biases the posterior probabilities of a classifier