## Chapter 25, Preprocessing and Feature Engineering

The code in the book and the code in the GitHub repository differ quite a lot, especially at the beginning of the chapter.

**Note:** I had to change the file paths because I load the data from a Databricks Community Edition account. The files are the same as those provided in the *Spark: The Definitive Guide* (STDG) GitHub repository.

In [3]:
sales = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("/databricks-datasets/definitive-guide/data/retail-data/by-day/*.csv")\
  .coalesce(5)\
  .where("Description IS NOT NULL")
fakeIntDF = spark.read.parquet("/databricks-datasets/definitive-guide/data/simple-ml-integers")
simpleDF = spark.read.json("/databricks-datasets/definitive-guide/data/simple-ml")
scaleDF = spark.read.parquet("/databricks-datasets/definitive-guide/data/simple-ml-scaling")

In [4]:
sales.cache()
sales.show()

In [5]:
from pyspark.ml.feature import Tokenizer
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn.transform(sales.select("Description"))
tokenized.show(20, False)

In [6]:
from pyspark.ml.feature import StandardScaler
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()

In [7]:
from pyspark.ml.feature import RFormula

supervised = RFormula(formula="lab ~ . + color:value1 + color:value2")
supervised.fit(simpleDF).transform(simpleDF).show()

In [8]:
from pyspark.ml.feature import SQLTransformer

basicTransformation = SQLTransformer()\
  .setStatement("""
    SELECT sum(Quantity), count(*), CustomerID
    FROM __THIS__
    GROUP BY CustomerID
  """)

basicTransformation.transform(sales).show()

In [9]:
from pyspark.ml.feature import VectorAssembler
va = VectorAssembler().setInputCols(["int1", "int2", "int3"])
va.transform(fakeIntDF).show()

In [10]:
contDF = spark.range(20).selectExpr("cast(id as double)")


In [11]:
from pyspark.ml.feature import Bucketizer
bucketBorders = [-1.0, 5.0, 10.0, 250.0, 600.0]
bucketer = Bucketizer().setSplits(bucketBorders).setInputCol("id")
bucketer.transform(contDF).show()

You may notice that all these transformations follow the same pattern: we create a transformer object, configure it, and call its `.transform` method on our data. The call looks exactly the same each time, but it does something different depending on the object. The `Tokenizer` splits each description into single lowercase words, while the `Bucketizer` assigns a bucket number to each row. Below, in the `QuantileDiscretizer` cell, `.transform` is preceded by an additional method, `.fit`. It is needed because this time the transformation requires extra information from our data, namely the cutoff points for the quantiles, and `.fit` extracts that information. In STDG, an object that has to be fitted to the data in this way is called an estimator.
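To make the transformer side concrete, here is a rough plain-Python sketch (not Spark code) of what `Bucketizer` does with the fixed `bucketBorders`. It needs no statistics from the data, only the splits. The sketch assumes the convention from the Spark documentation that each bucket is half-open, `[x, y)`, except the last one, which also includes its upper border. `bucketize` is a hypothetical helper name.

```python
from bisect import bisect_right


def bucketize(value, splits):
    """Hypothetical helper mimicking Bucketizer's bucket convention.

    Buckets are [splits[i], splits[i+1]); the last bucket also includes
    its upper border. Values outside the splits raise an error, similar
    to Bucketizer's default handleInvalid="error".
    """
    if value < splits[0] or value > splits[-1]:
        raise ValueError(f"{value} is outside the splits {splits}")
    # bisect_right counts how many borders are <= value
    idx = bisect_right(splits, value) - 1
    # the top border belongs to the last bucket
    return min(idx, len(splits) - 2)


bucketBorders = [-1.0, 5.0, 10.0, 250.0, 600.0]
buckets = [bucketize(float(i), bucketBorders) for i in range(20)]
print(buckets)
```

Applied to the ids 0 to 19 of `contDF`, this sketch puts 0-4 in bucket 0, 5-9 in bucket 1 and 10-19 in bucket 2.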

In [13]:
from pyspark.ml.feature import QuantileDiscretizer
discretizer = QuantileDiscretizer(numBuckets=5, inputCol="id", outputCol="quantiles")
result = discretizer.fit(contDF).transform(contDF)
result.show()

`StandardScaler` needs additional information from the data, too: the standard deviation of each feature, plus its mean if centering is switched on with `withMean`.

These transformations are done in two stages because we often need to apply exactly the same transformation to our test set. `.fit` returns a fitted model (for example, a `StandardScalerModel`) that stores the information derived from the training data, so for the test set we only call the model's `.transform` method. With text data, this approach also lets us ignore words in the test set that did not occur in the training set.
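Here is a minimal plain-Python sketch of this two-stage idea for one numeric column. It assumes `StandardScaler`'s defaults (`withStd=True`, `withMean=False`): `fit` learns the corrected sample standard deviation from the training data only, and `transform` reuses it on any later data. The helper names `fit_std_scaler` and `transform` are hypothetical. The zero-variance rule (output 0.0) is my reading of Spark's behavior, so treat it as an assumption.

```python
import statistics


def fit_std_scaler(train_values, with_mean=False):
    """Hypothetical 'fit' step: learn statistics from the training data only."""
    mean = statistics.mean(train_values)
    std = statistics.stdev(train_values)  # corrected (n - 1) sample std
    return {"mean": mean if with_mean else 0.0, "std": std}


def transform(model, values):
    """Hypothetical 'transform' step: reuse the stored statistics."""
    if model["std"] == 0.0:
        # constant column: assumed to map to 0.0, as described above
        return [0.0 for _ in values]
    return [(v - model["mean"]) / model["std"] for v in values]


train = [1.0, 2.0, 3.0, 4.0, 5.0]
test = [2.0, 10.0]

model = fit_std_scaler(train)    # statistics come from train only
print(transform(model, train))
print(transform(model, test))    # test is scaled with train's std
```

The test rows are never used to compute statistics, which is exactly why the fitted model can be applied to them unchanged.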

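By the way, in scikit-learn many transformers also have an `.inverse_transform` method. This matters when you transformed your output (`label`), for example by binarizing it: your predictions then come out in the same encoded format, and you will want them back in the original form. Spark ML transformers have no such method. Instead, Spark provides separate transformers for the reverse direction, such as `IndexToString`, which maps the indices produced by `StringIndexer` back to the original labels.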

In [15]:
from pyspark.ml.feature import StandardScaler
sScaler = StandardScaler().setInputCol("features")
sScaler.fit(scaleDF).transform(scaleDF).show()

In [16]:
from pyspark.ml.feature import MinMaxScaler
minMax = MinMaxScaler().setMin(5).setMax(10).setInputCol("features")
fittedminMax = minMax.fit(scaleDF)
fittedminMax.transform(scaleDF).show()

In [17]:
from pyspark.ml.feature import MaxAbsScaler
maScaler = MaxAbsScaler().setInputCol("features")
fittedmaScaler = maScaler.fit(scaleDF)
fittedmaScaler.transform(scaleDF).show()
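The arithmetic of the two scalers above can be sketched for a single column in plain Python (hypothetical helper names, not the Spark API). `MinMaxScaler` maps the observed range `[E_min, E_max]` linearly onto `[min, max]`, here `[5, 10]` as in the cell above. According to the Spark documentation, constant columns go to the midpoint `0.5 * (max + min)`. `MaxAbsScaler` divides by the largest absolute value, so results land in `[-1, 1]` and zeros stay zeros, which keeps sparse data sparse.

```python
def min_max_scale(values, new_min=5.0, new_max=10.0):
    """Hypothetical helper: MinMaxScaler-style rescaling of one column."""
    e_min, e_max = min(values), max(values)  # learned during fit
    if e_max == e_min:
        # constant column: midpoint of the target range
        return [0.5 * (new_max + new_min) for _ in values]
    return [(v - e_min) / (e_max - e_min) * (new_max - new_min) + new_min
            for v in values]


def max_abs_scale(values):
    """Hypothetical helper: MaxAbsScaler-style rescaling of one column."""
    max_abs = max(abs(v) for v in values)  # learned during fit
    if max_abs == 0.0:
        return [0.0 for _ in values]  # an all-zero column stays zero
    return [v / max_abs for v in values]


column = [-1.0, 0.0, 1.0, 3.0]
print(min_max_scale(column))
print(max_abs_scale(column))
```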

In [18]:
from pyspark.ml.feature import ElementwiseProduct
from pyspark.ml.linalg import Vectors
scaleUpVec = Vectors.dense(10.0, 15.0, 20.0)
scalingUp = ElementwiseProduct()\
  .setScalingVec(scaleUpVec)\
  .setInputCol("features")
scalingUp.transform(scaleDF).show()

In [19]:
from pyspark.ml.feature import Normalizer
manhattanDistance = Normalizer().setP(1).setInputCol("features")
manhattanDistance.transform(scaleDF).show()
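Both cells above work row by row, without a `.fit` step. Below is a plain-Python sketch of the math, with hypothetical helper names and made-up vectors. `ElementwiseProduct` multiplies each component by the matching entry of the scaling vector. `Normalizer` divides the whole vector by its p-norm, so with `setP(1)` the absolute values of the output sum to 1. Leaving an all-zero vector unchanged is an assumption about how Spark handles that edge case.

```python
def elementwise_product(vec, scaling_vec):
    """Hypothetical helper: multiply each component by the matching weight."""
    return [v * w for v, w in zip(vec, scaling_vec)]


def p_normalize(vec, p=1.0):
    """Hypothetical helper: divide the vector by its p-norm.

    p=1 is the Manhattan (L1) norm, p=2 the Euclidean norm.
    """
    norm = sum(abs(v) ** p for v in vec) ** (1.0 / p)
    if norm == 0.0:
        return list(vec)  # assumed: zero vectors are left unchanged
    return [v / norm for v in vec]


features = [2.0, 1.0, -1.0]
print(elementwise_product(features, [10.0, 15.0, 20.0]))
print(p_normalize(features, p=1.0))
```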

In [20]:
from pyspark.ml.feature import StringIndexer
lblIndxr = StringIndexer().setInputCol("lab").setOutputCol("labelInd")
idxRes = lblIndxr.fit(simpleDF).transform(simpleDF)
idxRes.show()

In [21]:
valIndexer = StringIndexer().setInputCol("value1").setOutputCol("valueInd")
valIndexer.fit(simpleDF).transform(simpleDF).show(5)

In [22]:
from pyspark.ml.feature import IndexToString
labelReverse = IndexToString().setInputCol("labelInd")
labelReverse.transform(idxRes).show(5)
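Here is a plain-Python sketch of the round trip from the last three cells. It assumes `StringIndexer`'s default `stringOrderType="frequencyDesc"`: the most frequent label gets index 0, and equal counts are ordered alphabetically (this is how I read the current Spark docs). `IndexToString` simply looks the labels up again. In Spark it finds them in the column metadata written by `StringIndexer`. The helper names and the sample labels are hypothetical.

```python
from collections import Counter


def fit_string_indexer(labels):
    """Hypothetical 'fit': order labels by descending frequency."""
    counts = Counter(labels)
    # most frequent first; equal counts ordered alphabetically
    return sorted(counts, key=lambda s: (-counts[s], s))


def apply_string_indexer(ordered, labels):
    """Hypothetical 'transform': label -> index (as a double, like Spark).

    An unseen label raises KeyError, similar to handleInvalid="error".
    """
    lookup = {s: float(i) for i, s in enumerate(ordered)}
    return [lookup[s] for s in labels]


def index_to_string(ordered, indices):
    """Hypothetical IndexToString: index -> original label."""
    return [ordered[int(i)] for i in indices]


labels = ["good", "bad", "good", "good", "bad", "ugly"]
model = fit_string_indexer(labels)
idx = apply_string_indexer(model, labels)
print(model)
print(idx)
print(index_to_string(model, idx))
```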

In [23]:
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.linalg import Vectors
idxIn = spark.createDataFrame([
  (Vectors.dense(1, 2, 3),1),
  (Vectors.dense(2, 5, 6),2),
  (Vectors.dense(1, 8, 9),3)
]).toDF("features", "label")
indxr = VectorIndexer()\
  .setInputCol("features")\
  .setOutputCol("idxed")\
  .setMaxCategories(2)
indxr.fit(idxIn).transform(idxIn).show()
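Here is a rough plain-Python sketch of the `VectorIndexer` idea with `maxCategories=2`. Every feature position with at most two distinct values is treated as categorical, and its values are replaced by category indices, while the other positions stay continuous. In the `idxIn` data above only the first feature (values 1 and 2) qualifies. Assigning indices in sorted order is a simplification of mine; Spark only promises that the value 0.0, if present, maps to index 0. The helper names are hypothetical.

```python
def fit_vector_indexer(rows, max_categories):
    """Hypothetical 'fit': return {position: {value: category index}}
    for the features treated as categorical."""
    n_features = len(rows[0])
    category_maps = {}
    for j in range(n_features):
        distinct = sorted({row[j] for row in rows})
        if len(distinct) <= max_categories:
            # put 0.0 first (if present) so that it maps to index 0
            if 0.0 in distinct:
                distinct.remove(0.0)
                distinct.insert(0, 0.0)
            category_maps[j] = {v: float(i) for i, v in enumerate(distinct)}
    return category_maps


def transform_vector_indexer(category_maps, rows):
    """Hypothetical 'transform': index categorical positions only."""
    return [[category_maps[j][v] if j in category_maps else v
             for j, v in enumerate(row)] for row in rows]


rows = [[1.0, 2.0, 3.0], [2.0, 5.0, 6.0], [1.0, 8.0, 9.0]]
maps = fit_vector_indexer(rows, max_categories=2)
print(maps)
print(transform_vector_indexer(maps, rows))
```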

In [24]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
lblIndxr = StringIndexer().setInputCol("color").setOutputCol("colorInd")
colorLab = lblIndxr.fit(simpleDF).transform(simpleDF.select("color"))
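ohe = OneHotEncoder().setInputCol("colorInd").setOutputCol("colorVec")
# In Spark 3.x OneHotEncoder is an estimator, so it has to be fitted first
ohe.fit(colorLab).transform(colorLab).show(10)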
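The output of the one-hot step can be sketched in plain Python, assuming the default `dropLast=True`. With k categories each row gets a vector of length k - 1, and the last category is encoded as all zeros, which avoids perfectly collinear columns. Spark returns sparse vectors; the sketch uses dense lists and a hypothetical `one_hot` helper.

```python
def one_hot(index, n_categories, drop_last=True):
    """Hypothetical helper: one-hot encode a category index (dense list)."""
    if not 0 <= index < n_categories:
        raise ValueError(f"index {index} is out of range")
    size = n_categories - 1 if drop_last else n_categories
    vec = [0.0] * size
    if index < size:  # with drop_last, the last category stays all zeros
        vec[int(index)] = 1.0
    return vec


color_indices = [0.0, 1.0, 2.0]  # e.g. three colors after StringIndexer
print([one_hot(i, 3) for i in color_indices])
print([one_hot(i, 3, drop_last=False) for i in color_indices])
```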

In the GitHub repository, the following cell uses `RegexTokenizer` instead of `Tokenizer`; here I follow the book and use `Tokenizer`.

In [26]:
from pyspark.ml.feature import Tokenizer
tkn = Tokenizer()\
  .setInputCol("Description")\
  .setOutputCol("DescOut")
tokenized = tkn.transform(sales.select("Description"))
tokenized.show(20, False)

In [27]:
from pyspark.ml.feature import RegexTokenizer
rt = RegexTokenizer()\
  .setInputCol("Description")\
  .setOutputCol("DescOut")\
  .setPattern(" ")\
  .setToLowercase(True)
rt.transform(sales.select("Description")).show(20, False)

We see a lot of spaces in the left column, as expected.

In [29]:
from pyspark.ml.feature import RegexTokenizer
rt = RegexTokenizer()\
  .setInputCol("Description")\
  .setOutputCol("DescOut")\
  .setPattern(" ")\
  .setGaps(False)\
  .setToLowercase(True)
rt.transform(sales.select("Description")).show(20, False)
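The difference between the last two cells can be approximated with Python's `re` module, assuming Spark's documented behavior. With the default `gaps=True` the pattern describes the separators (like `re.split`). With `setGaps(False)` it describes the tokens themselves (like `re.findall`), so matching `" "` returns the spaces. In both cases tokens shorter than `minTokenLength` (default 1) are dropped, which removes the empty strings produced by double spaces. Spark uses Java regular expressions, so edge cases may differ. `regex_tokenize` and the sample description are hypothetical.

```python
import re


def regex_tokenize(text, pattern=" ", gaps=True, to_lowercase=True,
                   min_token_length=1):
    """Hypothetical approximation of RegexTokenizer using Python's re."""
    if to_lowercase:
        text = text.lower()
    # gaps=True: the pattern matches separators; gaps=False: the tokens
    tokens = re.split(pattern, text) if gaps else re.findall(pattern, text)
    return [t for t in tokens if len(t) >= min_token_length]


description = "WHITE HANGING HEART  T-LIGHT HOLDER"  # note the double space
print(regex_tokenize(description))
print(regex_tokenize(description, gaps=False))
```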

In [30]:
from pyspark.ml.feature import StopWordsRemover
englishStopWords = StopWordsRemover.loadDefaultStopWords("english")
stops = StopWordsRemover()\
  .setStopWords(englishStopWords)\
  .setInputCol("DescOut")
stops.transform(tokenized).show()

The `NGram` code from the book did not work. Maybe that usage was deprecated, so I looked up the `NGram` documentation to fix it.

In [32]:
from pyspark.ml.feature import NGram
unigram = NGram(n=1, inputCol="DescOut", outputCol="unigrams")
unigramDataFrame = unigram.transform(tokenized)
unigramDataFrame.select("unigrams").show(truncate=False)

In [33]:
bigram = NGram(n=2, inputCol="DescOut", outputCol="bigrams")
bigramDataFrame = bigram.transform(tokenized)
bigramDataFrame.select("bigrams").show(truncate=False)
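`NGram` builds n-grams by sliding a window over the token array. The sketch below shows that idea in plain Python. The helper is hypothetical, and it assumes that tokens are joined with a single space and that rows shorter than `n` produce no partial n-grams.

```python
def ngrams(tokens, n):
    """Hypothetical helper: sliding window of n consecutive tokens.

    Each n-gram is joined with a space; fewer than n tokens gives [].
    """
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]


tokens = ["rabbit", "night", "light"]
print(ngrams(tokens, 1))
print(ngrams(tokens, 2))
```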

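Here `CountVectorizer.fit` counts how often each word occurs in the whole column. It keeps up to 500 of the most frequent words among those that appear in at least two rows (`minDF=2.0`), and stores them as the vocabulary. `.transform` then produces, for each row, a vector with the counts of the vocabulary words. Most entries are zero, so Spark stores these as sparse vectors. The vocabulary is kept in the fitted `CountVectorizerModel` (`fittedCV.vocabulary`), so the same transformation can be applied to a test set, and the resulting vectors will use the same positions for the same words.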
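The fit/transform logic described above can be sketched in plain Python, with hypothetical helper names and toy data. The sketch assumes that `minDF` (given as an absolute count) is applied before the top `vocabSize` words are chosen by total frequency. I did not check how Spark breaks ties between equally frequent words, so the alphabetical tie-break belongs to the sketch only. Sparse vectors are represented as `{index: count}` dicts.

```python
from collections import Counter


def fit_count_vectorizer(docs, vocab_size, min_df=1):
    """Hypothetical 'fit': build the vocabulary from the training rows."""
    term_freq = Counter()  # total occurrences over the whole corpus
    doc_freq = Counter()   # number of rows containing the term
    for doc in docs:
        term_freq.update(doc)
        doc_freq.update(set(doc))
    kept = [t for t in term_freq if doc_freq[t] >= min_df]
    # most frequent first (alphabetical tie-break: this sketch only)
    kept.sort(key=lambda t: (-term_freq[t], t))
    return kept[:vocab_size]


def transform_count_vectorizer(vocabulary, docs):
    """Hypothetical 'transform': sparse {index: count} per row."""
    index = {t: i for i, t in enumerate(vocabulary)}
    vectors = []
    for doc in docs:
        counts = Counter(t for t in doc if t in index)  # unknown words ignored
        vectors.append({index[t]: float(c) for t, c in counts.items()})
    return vectors


train = [["red", "heart"], ["red", "mug", "red"], ["blue", "heart"]]
vocab = fit_count_vectorizer(train, vocab_size=500, min_df=2)
print(vocab)
# "teapot" never appeared in training, so it is simply dropped
print(transform_count_vectorizer(vocab, [["red", "heart", "teapot"]]))
```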

In [35]:
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol = "DescOut", outputCol = "countVec", vocabSize = 500, minDF=2.0, minTF =1.0)
fittedCV = cv.fit(tokenized)
result = fittedCV.transform(tokenized)
result.show()

In [36]:
tfIdfIn = tokenized\
  .where("array_contains(DescOut, 'red')")\
  .select("DescOut")\
  .limit(10)
tfIdfIn.show(10, False)

In [37]:
from pyspark.ml.feature import HashingTF, IDF
tf = HashingTF()\
  .setInputCol("DescOut")\
  .setOutputCol("TFOut")\
  .setNumFeatures(10000)
idf = IDF()\
  .setInputCol("TFOut")\
  .setOutputCol("IDFOut")\
  .setMinDocFreq(2)

In [38]:
idf.fit(tf.transform(tfIdfIn)).transform(tf.transform(tfIdfIn)).show(10, False)
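For reference, the Spark ML documentation gives the IDF of a term as `log((m + 1) / (d(t) + 1))` (natural log). Here `m` is the number of documents and `d(t)` is the number of documents containing the term. With `setMinDocFreq(2)`, terms that occur in fewer than two documents get an IDF of 0; this is my understanding of Spark's implementation. The plain-Python sketch below uses the words themselves as keys instead of `HashingTF`'s hashed indices. As a result, it has no hash collisions and will not reproduce Spark's feature positions. The helper names are hypothetical.

```python
import math
from collections import Counter


def fit_idf(docs, min_doc_freq=0):
    """Hypothetical 'fit': IDF per term, 0.0 below min_doc_freq."""
    m = len(docs)
    doc_freq = Counter()
    for doc in docs:
        doc_freq.update(set(doc))
    return {t: (math.log((m + 1) / (df + 1)) if df >= min_doc_freq else 0.0)
            for t, df in doc_freq.items()}


def tf_idf(idf, doc):
    """Hypothetical 'transform': raw term count times IDF."""
    tf = Counter(doc)  # raw counts, like HashingTF without binary=True
    return {t: c * idf.get(t, 0.0) for t, c in tf.items()}


docs = [["red", "heart"], ["red", "mug"], ["red", "heart", "heart"]]
idf = fit_idf(docs, min_doc_freq=2)
print(idf)
print(tf_idf(idf, docs[2]))
```

A word such as "red" that occurs in every document gets an IDF of exactly 0, which matches the intuition that it does not help to tell documents apart.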

In [39]:
from pyspark.ml.feature import Word2Vec
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark.createDataFrame([
    ("Hi I heard about Spark".split(" "), ),
    ("I wish Java could use case classes".split(" "), ),
    ("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text",
  outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for row in result.collect():
    text, vector = row
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

In [40]:
from pyspark.ml.feature import PCA
pca = PCA().setInputCol("features").setK(2)
pca.fit(scaleDF).transform(scaleDF).show(20, False)

In [41]:
from pyspark.ml.feature import PolynomialExpansion
pe = PolynomialExpansion().setInputCol("features").setDegree(2).setOutputCol("polyFeatures")
pe.transform(scaleDF).show()
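A degree-2 expansion contains every product of one or two input features, with no constant term. A vector with n features therefore grows to n + n(n + 1)/2 entries, for example 9 for 3 features. The plain-Python sketch below, with a hypothetical helper, enumerates these monomials. Spark orders the output entries differently, so compare the values as a set rather than position by position.

```python
from itertools import combinations_with_replacement
from math import prod


def polynomial_expansion(features, degree=2):
    """Hypothetical helper: all monomials of degree 1..degree (no constant)."""
    expanded = []
    for d in range(1, degree + 1):
        for idx in combinations_with_replacement(range(len(features)), d):
            expanded.append(prod(features[i] for i in idx))
    return expanded


x = [2.0, 3.0, -1.0]
print(polynomial_expansion(x))
```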

In [42]:
from pyspark.ml.feature import ChiSqSelector, Tokenizer
tkn = Tokenizer().setInputCol("Description").setOutputCol("DescOut")
tokenized = tkn\
  .transform(sales.select("Description", "CustomerId"))\
  .where("CustomerId IS NOT NULL")
prechi = fittedCV.transform(tokenized)\
  .where("CustomerId IS NOT NULL")
chisq = ChiSqSelector()\
  .setFeaturesCol("countVec")\
  .setLabelCol("CustomerId")\
  .setNumTopFeatures(2)
chisq.fit(prechi).transform(prechi)\
  .drop("customerId", "Description", "DescOut").show()

In [43]:
fittedPCA = pca.fit(scaleDF)
fittedPCA.write().overwrite().save("/tmp/fittedPCA")

In [44]:
from pyspark.ml.feature import PCAModel
loadedPCA = PCAModel.load("/tmp/fittedPCA")
loadedPCA.transform(scaleDF).show()

The chapter ends with a custom transformer, but the book gives it only in Scala, not in Python.