### Spark for Machine Learning & AI
### 02 Data Preprocessing

In [2]:
# findspark.init() makes the local Spark installation importable, so it must
# run before the pyspark imports below.
import findspark
findspark.init()
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Reuse the already-running SparkContext if there is one, so this cell can be
# re-executed without trying to start a second context.
sc = SparkContext.getOrCreate()

# Obtain the session through the builder rather than calling SparkSession(sc)
# directly: getOrCreate() returns the existing session (bound to the context
# above) when the cell is re-run instead of constructing a new one each time.
spark = SparkSession.builder.getOrCreate()

### Normalize numeric data

In [3]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [4]:
# Toy dataset for min-max scaling: three rows of (id, dense 3-element vector).
features_df = spark.createDataFrame(
    [
        (1, Vectors.dense([25.0, 10000.0, 1.0])),
        (2, Vectors.dense([20.0, 30000.0, 2.0])),
        (3, Vectors.dense([30.0, 40000.0, 3.0])),
    ],
    schema=["id", "features"],
)

In [5]:
# Peek at the first row to confirm the (id, features) layout.
features_df.take(1)

[Row(id=1, features=DenseVector([25.0, 10000.0, 1.0]))]

In [6]:
# Rescale every dimension of the "features" vector into the 0..1 range.
# fit() learns the per-dimension minimum and maximum from features_df;
# transform() then appends the rescaled vector as a new "sfeatures" column.
feature_scaler = MinMaxScaler(
    outputCol="sfeatures",
    inputCol="features",
)
smodel = feature_scaler.fit(features_df)
sfeatures_df = smodel.transform(features_df)

# Return all three rows so the original and scaled vectors can be compared.
sfeatures_df.take(3)

[Row(id=1, features=DenseVector([25.0, 10000.0, 1.0]), sfeatures=DenseVector([0.5, 0.0, 0.0])),
 Row(id=2, features=DenseVector([20.0, 30000.0, 2.0]), sfeatures=DenseVector([0.0, 0.6667, 0.5])),
 Row(id=3, features=DenseVector([30.0, 40000.0, 3.0]), sfeatures=DenseVector([1.0, 1.0, 1.0]))]

In [7]:
# Print the original and the min-max scaled vectors side by side (id omitted).
sfeatures_df.select("features","sfeatures").show()

+------------------+--------------------+
|          features|           sfeatures|
+------------------+--------------------+
|[25.0,10000.0,1.0]|       [0.5,0.0,0.0]|
|[20.0,30000.0,2.0]|[0.0,0.6666666666...|
|[30.0,40000.0,3.0]|       [1.0,1.0,1.0]|
+------------------+--------------------+



### Standardize numeric data

In [8]:
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors

In [9]:
# Toy dataset for standardization: three rows of (id, dense 3-element vector).
# NOTE: this rebinds features_df, replacing the frame used in the min-max section.
features_df = spark.createDataFrame(
    [
        (1, Vectors.dense([10.0, 10000.00, 1.0])),
        (2, Vectors.dense([20.0, 30000.00, 2.0])),
        (3, Vectors.dense([30.0, 40000.00, 3.0])),
    ],
    schema=["id", "features"],
)

In [10]:
# Peek at the first row of the re-created features_df.
features_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]))]

In [11]:
# Standardize every dimension of the "features" vector: subtract the mean
# (withMean=True) and divide by the standard deviation (withStd=True), so each
# dimension ends up with mean 0 and standard deviation 1.
# The result is NOT confined to [-1, 1]; values further than one standard
# deviation from the mean fall outside that interval.
feature_stand_scaler = StandardScaler(
    inputCol="features",
    outputCol="sfeatures",
    withMean=True,
    withStd=True,
)
stand_smodel = feature_stand_scaler.fit(features_df)
stand_sfeatures_df = stand_smodel.transform(features_df)

# Return all three rows so the original and standardized vectors can be compared.
stand_sfeatures_df.take(3)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=DenseVector([-1.0, -1.0911, -1.0])),
 Row(id=2, features=DenseVector([20.0, 30000.0, 2.0]), sfeatures=DenseVector([0.0, 0.2182, 0.0])),
 Row(id=3, features=DenseVector([30.0, 40000.0, 3.0]), sfeatures=DenseVector([1.0, 0.8729, 1.0]))]

In [12]:
# Print id, original vector and standardized vector as a table.
stand_sfeatures_df.show()

+---+------------------+--------------------+
| id|          features|           sfeatures|
+---+------------------+--------------------+
|  1|[10.0,10000.0,1.0]|[-1.0,-1.09108945...|
|  2|[20.0,30000.0,2.0]|[0.0,0.2182178902...|
|  3|[30.0,40000.0,3.0]|[1.0,0.8728715609...|
+---+------------------+--------------------+



### Bucketize numeric data

In [13]:
from pyspark.ml.feature import Bucketizer

In [14]:
# Bucket boundaries; the infinite end points make the first and last buckets
# open-ended so that every finite value falls into some bucket.
splits = [
    float("-inf"),
    -10.0,
    0.0,
    10.0,
    float("inf"),
]

In [15]:
# Sample values to bucketize, one single-element tuple (= one row) per value.
b_data = [
    (-800.0,),
    (-10.5,),
    (-1.7,),
    (0.0,),
    (8.2,),
    (90.1,),
]

In [16]:
# Wrap the raw values in a single-column ("features") DataFrame and print it.
b_df = spark.createDataFrame(b_data, schema=["features"])
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [17]:
# Replace each value by the index of the split interval that contains it.
# Intervals are closed on the left and open on the right, which is why 0.0 is
# assigned to bucket 2 ([0, 10)) rather than bucket 1 ([-10, 0)).
bucketizer = Bucketizer(
    inputCol="features",
    outputCol="bfeatures",
    splits=splits,
)
bucketed_df = bucketizer.transform(b_df)
bucketed_df.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+



### Tokenize text data

In [18]:
from pyspark.ml.feature import Tokenizer

In [19]:
# Small text corpus: three rows of (id, sentence).
sentences_df = spark.createDataFrame(
    [
        (1, "This is an introduction to Spark MLlib"),
        (2, "MLlib includes libraries for classification and regression"),
        (3, "It also contains supporting tools for pipelines"),
    ],
    schema=["id", "sentence"],
)
sentences_df.show()

+---+--------------------+
| id|            sentence|
+---+--------------------+
|  1|This is an introd...|
|  2|MLlib includes li...|
|  3|It also contains ...|
+---+--------------------+



In [20]:
# Split each sentence into a list of words, stored in a new "words" column.
# The tokens come out lower-cased (e.g. "This" -> "this"), as the displayed
# output of the tokenized frame shows.
sent_token = Tokenizer(
    outputCol="words",
    inputCol="sentence",
)
sent_tokenized_df = sent_token.transform(sentences_df)

In [21]:
# Print id, sentence and the token list produced by the Tokenizer.
sent_tokenized_df.show()

+---+--------------------+--------------------+
| id|            sentence|               words|
+---+--------------------+--------------------+
|  1|This is an introd...|[this, is, an, in...|
|  2|MLlib includes li...|[mllib, includes,...|
|  3|It also contains ...|[it, also, contai...|
+---+--------------------+--------------------+



### TF-IDF

In [22]:
from pyspark.ml.feature import HashingTF, IDF

In [23]:
# Bare expression: displays the DataFrame's column names and types, not its rows.
sentences_df

DataFrame[id: bigint, sentence: string]

In [24]:
# First raw row (id, sentence) before tokenization.
sentences_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib')]

In [25]:
# Same row after tokenization: now carries the additional "words" list.
sent_tokenized_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'])]

In [28]:
# Term frequencies via the hashing trick: each word is hashed to one of
# numFeatures slots and the per-slot counts form a sparse "rawFeatures" vector.
# With only 20 slots, different words can share a slot (note the count of 2.0
# in the displayed output for a sentence made of seven distinct words).
hashingTF = HashingTF(
    numFeatures=20,
    inputCol="words",
    outputCol="rawFeatures",
)
sent_hfTF_df = hashingTF.transform(sent_tokenized_df)

# Inspect the first row, including its term-frequency vector.
sent_hfTF_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}))]

In [29]:
# Re-weight the raw term frequencies by inverse document frequency.
# fit() scans the whole corpus to learn the per-slot IDF weights;
# transform() then appends the weighted vector as "idf_features".
idf = IDF(
    outputCol="idf_features",
    inputCol="rawFeatures",
)
idfModel = idf.fit(sent_hfTF_df)
tfidf_df = idfModel.transform(sent_hfTF_df)

# Inspect the first row, now with both the raw and the IDF-weighted vectors.
tfidf_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}), idf_features=SparseVector(20, {6: 0.5754, 8: 0.2877, 9: 0.6931, 10: 0.6931, 13: 0.6931, 15: 0.2877}))]