In [1]:
from pyspark.sql import SparkSession
# Create the Spark session used by the employee cells below
# (getOrCreate returns a session object bound to the name `spark`).
spark = (
    SparkSession.builder
    .appName('employee')
    .getOrCreate()
)

Dataframes are a table-like data structure with named columns. They are widely used in R and in Python's pandas library, and Spark DataFrames are similar to what is available in both.

In [2]:
# Read the employee file as CSV, taking the column names from the header line.
# No schema is supplied and schema inference is not enabled, so every column
# (including id, salary and region_id) is loaded as a string, and the text
# fields keep the literal single quotes found in the file (e.g. "'Kelley'").
emp_df = spark.read.csv(
    "/employee.txt",
    header=True,
)

In [3]:
# Print each column's name, type and nullability.
emp_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- department: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- salary: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- region_id: string (nullable = true)



In [4]:
# List the column names of emp_df.
emp_df.columns

['id',
 'last_name',
 'email',
 'gender',
 'department',
 'start_date',
 'salary',
 'job_title',
 'region_id']

In [5]:
# Fetch the first five rows as a list of Row objects.
emp_df.take(5)

[Row(id='1', last_name="'Kelley'", email="'rkelley0@soundcloud.com'", gender="'Female'", department="'Computers'", start_date="'10/2/2009'", salary='67470', job_title="'Structural Engineer'", region_id='2'),
 Row(id='2', last_name="'Armstrong'", email="'sarmstrong1@infoseek.co.jp'", gender="'Male'", department="'Sports'", start_date="'3/31/2008'", salary='71869', job_title="'Financial Advisor'", region_id='2'),
 Row(id='3', last_name="'Carr'", email="'fcarr2@woothemes.com'", gender="'Male'", department="'Automotive'", start_date="'7/12/2009'", salary='101768', job_title="'Recruiting Manager'", region_id='3'),
 Row(id='4', last_name="'Murray'", email="'jmurray3@gov.uk'", gender="'Female'", department="'Jewelery'", start_date="'12/25/2014'", salary='96897', job_title="'Desktop Support Technician'", region_id='3'),
 Row(id='5', last_name="'Ellis'", email="'jellis4@sciencedirect.com'", gender="'Female'", department="'Grocery'", start_date="'9/19/2002'", salary='63702', job_title="'Software

In [6]:
# Render the first five rows as a text table; long values are cut off with "...".
emp_df.show(5)

+---+-----------+--------------------+--------+------------+------------+------+--------------------+---------+
| id|  last_name|               email|  gender|  department|  start_date|salary|           job_title|region_id|
+---+-----------+--------------------+--------+------------+------------+------+--------------------+---------+
|  1|   'Kelley'|'rkelley0@soundcl...|'Female'| 'Computers'| '10/2/2009'| 67470|'Structural Engin...|        2|
|  2|'Armstrong'|'sarmstrong1@info...|  'Male'|    'Sports'| '3/31/2008'| 71869| 'Financial Advisor'|        2|
|  3|     'Carr'|'fcarr2@woothemes...|  'Male'|'Automotive'| '7/12/2009'|101768|'Recruiting Manager'|        3|
|  4|   'Murray'|   'jmurray3@gov.uk'|'Female'|  'Jewelery'|'12/25/2014'| 96897|'Desktop Support ...|        3|
|  5|    'Ellis'|'jellis4@scienced...|'Female'|   'Grocery'| '9/19/2002'| 63702|'Software Enginee...|        7|
+---+-----------+--------------------+--------+------------+------------+------+--------------------+---

In [7]:
# Count the rows in emp_df.
emp_df.count()

1000

In [8]:
# Draw a random sample of roughly 10% of the rows, without replacement.
# `fraction` is a per-row inclusion probability rather than an exact size,
# so the sampled count is only approximately 10% of emp_df.
# A fixed seed makes the sample (and therefore its count) repeatable across
# runs; without one, every execution of this cell returns a different sample.
sample_df = emp_df.sample(withReplacement=False, fraction=0.1, seed=42)
sample_df.count()

109

In [9]:
# Keep the employees earning at least 100,000.
# salary was loaded as a string column, so cast it to an integer explicitly
# instead of relying on Spark's implicit string-to-number coercion in the
# comparison, which makes the numeric intent clear and independent of
# coercion settings.
emp_mngrs_df = emp_df.filter("CAST(salary AS INT) >= 100000")
emp_mngrs_df.count()

478

In [10]:
# Show the salary column for the first ten filtered rows.
emp_mngrs_df.select("salary").show(10)

+------+
|salary|
+------+
|101768|
|118497|
|108657|
|108093|
|121966|
|141139|
|106659|
|148952|
|109890|
|115274|
+------+
only showing top 10 rows



# Preprocessing

In [11]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

In [12]:
from pyspark.sql import SparkSession
# Obtain the Spark session for the preprocessing examples.
# NOTE(review): if the session created earlier in this notebook is still
# active, getOrCreate() is documented to return that existing session, so the
# 'preprocessing' app name may not take effect — confirm if the name matters.
spark = (
    SparkSession.builder
    .appName('preprocessing')
    .getOrCreate()
)

## Normalizing numeric data

Normalizing is the process of mapping numeric data from its original range into a range from zero (0) to one (1). This is important because we may have multiple attributes with different ranges. For example, we have salaries, which might range from the tens to the hundreds of thousands. Then we might have another column, for example miles commuted to work, which might be on the order of tens of miles. The reason we want to normalize those attributes to a zero-to-one range is so that algorithms that use distance as a measure don't weight some attributes, like salary, orders of magnitude more heavily than others, like miles commuted to work.

In [13]:
# Three example records: an integer id plus a dense vector of three numeric
# features whose magnitudes differ widely (tens, tens of thousands, units).
feature_rows = [
    (1, Vectors.dense([10.0, 10000.0, 1.0])),
    (2, Vectors.dense([20.0, 30000.0, 2.0])),
    (3, Vectors.dense([30.0, 40000.0, 3.0])),
]
features_df = spark.createDataFrame(feature_rows, ["id", "features"])

Each row of the dataframe will include an identifier and a list of numeric values. The first record will have an ID of one, and then it will have a set of features which we create as a dense vector, and that vector will include the number 10, the number 10,000, and the number 1. And we'll specify the columns for the dataframe; id and features. 

In [14]:
# Print the schema: a long id column and a vector features column.
features_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- features: vector (nullable = true)



In [15]:
# Display every row with the full, untruncated feature vectors.
features_df.show(truncate=False)

+---+------------------+
|id |features          |
+---+------------------+
|1  |[10.0,10000.0,1.0]|
|2  |[20.0,30000.0,2.0]|
|3  |[30.0,40000.0,3.0]|
+---+------------------+



We are going to create a scaler object called "feature_scaler" by instantiating MinMaxScaler, telling it that we want to transform the input column, which is named "features", and that we want the scaled version of that input column to go to a new output column called "sfeatures", which is short for scaled features.

This object will transform the contents of the feature vectors into a scaled version and save it in the "sfeatures" column. First we fit the scaler to the data using the fit function. To do that, we create an object called "smodel", which is the result of calling fit on feature_scaler, and the data we fit is what's loaded into our features dataframe.

The next thing we want to do is call the transform function, which applies the transformation and actually creates the scaled dataset. To do this, I'm going to create a new dataframe called sfeatures_df, built by calling transform on the smodel we just fitted and passing in the features dataframe.
So what we've done is create a MinMaxScaler, fit it to our data, and then use transform to create a new scaled feature set.

In [16]:
# Configure a min-max scaler that reads "features" and writes "sfeatures".
feature_scaler = MinMaxScaler(
    inputCol="features",
    outputCol="sfeatures",
)
# Fit the scaler on features_df to obtain a model...
smodel = feature_scaler.fit(features_df)
# ...then apply that model to the same dataframe to add the scaled column.
sfeatures_df = smodel.transform(features_df)

In [17]:
# Inspect the first row, which now carries the extra sfeatures column.
sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=SparseVector(3, {}))]

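What you'll notice here is that, in addition to the ID and features that we had in our original dataframe, we now have a new column called "sfeatures", which holds the scaled vector in the zero-to-one range. In this first row every feature is the minimum of its column, so every scaled value is zero and Spark displays the result as an empty SparseVector.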

In [18]:
# Compare the original and scaled vectors side by side.
sfeatures_df.select("features", "sfeatures").show()

+------------------+--------------------+
|          features|           sfeatures|
+------------------+--------------------+
|[10.0,10000.0,1.0]|           (3,[],[])|
|[20.0,30000.0,2.0]|[0.5,0.6666666666...|
|[30.0,40000.0,3.0]|       [1.0,1.0,1.0]|
+------------------+--------------------+



The scaled data is in the range from zero to one, and the larger the original value, the larger the scaled value. The smallest value in each column of the feature vector is mapped to zero, and the largest value is mapped to one. Values in between the minimum and maximum are scaled proportionally between zero and one.

## Standardizing numeric data

In addition to normalizing, there's another operation that's often done to numeric data, and that's called standardizing. The idea here is that we may have data that is pretty close to a bell-shaped curve, or normally distributed, but maybe not exactly. With standardization, we rescale the data so that each feature has a mean of 0 and a standard deviation of 1. Unlike normalization, the result is not confined to a fixed range, so values can fall below -1 or above +1.

We do this because some machine learning algorithms, such as support vector machines and some linear models, work better when all of the features have unit variance and zero mean. So when we apply standardization, our data is shifted and rescaled so that it is centered on zero with unit variance; the shape of the distribution itself does not change.

In [19]:
from pyspark.ml.feature import StandardScaler

Now let's create a scaler object. I want each feature scaled to unit standard deviation, so I'm going to set withStd to True. And I want a mean of zero, so I'm going to set withMean to True.

In [20]:
# Configure a standard scaler that reads "features" and writes "sfeatures",
# with both scaling by standard deviation and mean-centering switched on.
feature_stand_scaler = StandardScaler(
    inputCol="features",
    outputCol="sfeatures",
    withStd=True,
    withMean=True,
)

So what we're going to do next is fit the scaler to the features dataframe, which builds a model, and then use that model to transform the data I'm passing in. The data I want to transform is the same features dataframe. Once both steps have run, I've created a scaler, I've fit it to the data, and I've taken that same data and transformed it into this new standardized form.

In [21]:
# Fit the standard scaler on features_df to obtain the model used for transforming.
stand_smodel = feature_stand_scaler.fit(features_df)

In [22]:
# Apply the fitted model to the same dataframe, adding the standardized sfeatures column.
stand_sfeatures_df = stand_smodel.transform(features_df)

We've created a dataframe that has the features with the original data, as well as another column in which each feature has been standardized to a mean of zero and a standard deviation of one. For this small dataset the standardized values run from about -1.09 to 1.

In [23]:
# Inspect the first row together with its standardized vector.
stand_sfeatures_df.take(1)

[Row(id=1, features=DenseVector([10.0, 10000.0, 1.0]), sfeatures=DenseVector([-1.0, -1.0911, -1.0]))]

In [24]:
# Display every row with the full, untruncated original and standardized vectors.
stand_sfeatures_df.show(truncate=False)

+---+------------------+------------------------------+
|id |features          |sfeatures                     |
+---+------------------+------------------------------+
|1  |[10.0,10000.0,1.0]|[-1.0,-1.091089451179962,-1.0]|
|2  |[20.0,30000.0,2.0]|[0.0,0.2182178902359923,0.0]  |
|3  |[30.0,40000.0,3.0]|[1.0,0.8728715609439696,1.0]  |
+---+------------------+------------------------------+



## Bucketizing numeric data

Now let's take a look at how we can organize continuous ranges of data into buckets or partitions.

In [25]:
from pyspark.ml.feature import Bucketizer

Bucketizer allows us to group data based on boundaries, so I need to provide a list of boundaries for Bucketizer to work with. I call those boundaries "splits". At the lower end, I would like anything starting at negative infinity to go in the first bucket. From negative infinity up to -10 will be one bucket, from -10 to 0 will be another bucket, from 0 to 10 will be my next bucket, and everything from 10 up to positive infinity will go in the last bucket. Each bucket includes its lower boundary but not its upper one, so a value of exactly 0 falls in the 0-to-10 bucket.

In [26]:
# Bucket boundaries: four buckets spanning the whole real line, with cut
# points at -10, 0 and 10 and open-ended buckets at both extremes.
infinity = float("inf")
splits = [-infinity, -10.0, 0.0, 10.0, infinity]

What we're going to do here is create a list, but this is going to be a list of single-element tuples. The syntax may look a little odd because of the parentheses and trailing commas; that's because I want each of these to be mapped to a distinct row in the dataframe later.

In [27]:
# Sample values to bucketize; each is wrapped in a one-element tuple so that
# it becomes its own single-column row when turned into a dataframe.
sample_values = [-800.0, -10.5, -1.7, 0.0, 8.2, 90.1]
b_data = [(value,) for value in sample_values]

In [28]:
# Build a one-column dataframe named "features" from the sample tuples.
b_df = spark.createDataFrame(
    b_data,
    schema=["features"],
)

In [29]:
# Display the values that are about to be bucketized.
b_df.show()

+--------+
|features|
+--------+
|  -800.0|
|   -10.5|
|    -1.7|
|     0.0|
|     8.2|
|    90.1|
+--------+



In [30]:
# Configure the bucketizer with the explicit boundaries in `splits`; it reads
# "features" and writes the bucket index to "bfeatures". A value equal to a
# boundary lands in the bucket that starts there (0.0 goes to the 0-to-10 bucket).
bucketizer = Bucketizer(
    splits=splits,
    inputCol="features",
    outputCol="bfeatures",
)

In [31]:
# Apply the bucketizer directly (no fit step), adding the bfeatures column.
bucketed_df = bucketizer.transform(b_df)

In [32]:
# Display each value next to the index of the bucket it was assigned to.
bucketed_df.show()

+--------+---------+
|features|bfeatures|
+--------+---------+
|  -800.0|      0.0|
|   -10.5|      0.0|
|    -1.7|      1.0|
|     0.0|      2.0|
|     8.2|      2.0|
|    90.1|      3.0|
+--------+---------+



Why didn't we have to do a fit? That's because Bucketizing is fairly simple and my splits is the list of the boundaries I want for each bucket, so there's no need to fit data, so we can skip the normal fitting operation and go right to the transformation.

## Tokenizing text data

Now let's shift our focus to working with text data. In this dataframe we'll have three rows, each containing one sentence.

In [33]:
from pyspark.ml.feature import Tokenizer

In [34]:
# Three short example documents, each paired with an integer id.
sentence_rows = [
    (1, "This is an introduction to Spark MLlib"),
    (2, "MLlib includes libraries for classification and regression"),
    (3, "It also contains supporting tools for pipelines"),
]
sentences_df = spark.createDataFrame(sentence_rows, ["id", "sentence"])

In [35]:
# Display the sentences in full, without truncation.
sentences_df.show(truncate=False)

+---+----------------------------------------------------------+
|id |sentence                                                  |
+---+----------------------------------------------------------+
|1  |This is an introduction to Spark MLlib                    |
|2  |MLlib includes libraries for classification and regression|
|3  |It also contains supporting tools for pipelines           |
+---+----------------------------------------------------------+



In [36]:
# Configure a tokenizer that reads "sentence" and writes the word list to "words".
sent_token = Tokenizer(
    inputCol="sentence",
    outputCol="words",
)

In [37]:
# Apply the tokenizer directly (no fit step), adding the words column.
sent_tokenized_df = sent_token.transform(sentences_df)

We now have a third column, "words", in addition to the ID and the sentences. The words column contains lists of words produced by converting each sentence to lowercase and splitting it on white space. Tokenizer does not treat punctuation specially; when you need pattern-based splitting, Spark provides RegexTokenizer. So this is how we can use Tokenizer to split up strings into words.

In [38]:
# Display each sentence next to its lowercase word list, without truncation.
sent_tokenized_df.show(truncate=False)

+---+----------------------------------------------------------+------------------------------------------------------------------+
|id |sentence                                                  |words                                                             |
+---+----------------------------------------------------------+------------------------------------------------------------------+
|1  |This is an introduction to Spark MLlib                    |[this, is, an, introduction, to, spark, mllib]                    |
|2  |MLlib includes libraries for classification and regression|[mllib, includes, libraries, for, classification, and, regression]|
|3  |It also contains supporting tools for pipelines           |[it, also, contains, supporting, tools, for, pipelines]           |
+---+----------------------------------------------------------+------------------------------------------------------------------+



We don't have to call the fit function, because we're not actually fitting data. The Tokenizer already knows how to do its job, which is basically how to split up strings into separate words. We can go right from creating a Tokenizer object, to the transformation process. There's no fit operation needed.

## TF-IDF (term frequency-inverse document frequency)

In [39]:
from pyspark.ml.feature import HashingTF, IDF

So again we start with some text. For example, here's a sentence. We tokenize it so we get a list of words, and then we count the number of times a particular word appears in it. So, for example, if the word "normalizing" appears only once, it has a count of one, and if the word "to" shows up twice, it has a count of two. That count is the term frequency. We go through and do this for all of the documents in our corpus, where corpus is just another name for a collection of documents. We also count how many documents each term appears in across the whole corpus, which is what the inverse document frequency is based on: terms that appear in many documents get less weight.

In [40]:
# Look at the first raw sentence row.
sentences_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib')]

In [41]:
# Look at the same row after tokenizing, now including its words list.
sent_tokenized_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'])]

In [42]:
# Configure the term-frequency hasher: it reads "words" and writes a
# 20-slot count vector to "rawFeatures". With so few slots, different words
# can share an index: the first sentence's seven distinct words occupy only
# six indices, one of which has a count of 2.
hashingTF = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=20,
)

In [43]:
# Apply the hasher directly (no fit step), adding the rawFeatures column.
sent_hfTF_df = hashingTF.transform(sent_tokenized_df)

In [44]:
# Inspect the first row's sparse term-frequency vector.
sent_hfTF_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}))]

In [45]:
# Display all rows; long columns are truncated in this view.
sent_hfTF_df.show()

+---+--------------------+--------------------+--------------------+
| id|            sentence|               words|         rawFeatures|
+---+--------------------+--------------------+--------------------+
|  1|This is an introd...|[this, is, an, in...|(20,[6,8,9,10,13,...|
|  2|MLlib includes li...|[mllib, includes,...|(20,[2,4,11,12,15...|
|  3|It also contains ...|[it, also, contai...|(20,[1,4,6,8,11,1...|
+---+--------------------+--------------------+--------------------+



In [46]:
# Configure the IDF estimator: it reads the raw term counts from
# "rawFeatures" and writes the re-weighted vector to "idf_features".
idf = IDF(
    inputCol="rawFeatures",
    outputCol="idf_features",
)

In [47]:
# Unlike the tokenizer and hasher, IDF must be fit on the data first to obtain a model.
idfModel = idf.fit(sent_hfTF_df)

In [48]:
# Apply the fitted IDF model, adding the idf_features column.
tfidf_df = idfModel.transform(sent_hfTF_df)

In [49]:
# Inspect the first row's raw counts alongside its IDF-weighted values.
tfidf_df.take(1)

[Row(id=1, sentence='This is an introduction to Spark MLlib', words=['this', 'is', 'an', 'introduction', 'to', 'spark', 'mllib'], rawFeatures=SparseVector(20, {6: 2.0, 8: 1.0, 9: 1.0, 10: 1.0, 13: 1.0, 15: 1.0}), idf_features=SparseVector(20, {6: 0.5754, 8: 0.2877, 9: 0.6931, 10: 0.6931, 13: 0.6931, 15: 0.2877}))]

In [50]:
# Display all rows; long columns are truncated in this view.
tfidf_df.show()

+---+--------------------+--------------------+--------------------+--------------------+
| id|            sentence|               words|         rawFeatures|        idf_features|
+---+--------------------+--------------------+--------------------+--------------------+
|  1|This is an introd...|[this, is, an, in...|(20,[6,8,9,10,13,...|(20,[6,8,9,10,13,...|
|  2|MLlib includes li...|[mllib, includes,...|(20,[2,4,11,12,15...|(20,[2,4,11,12,15...|
|  3|It also contains ...|[it, also, contai...|(20,[1,4,6,8,11,1...|(20,[1,4,6,8,11,1...|
+---+--------------------+--------------------+--------------------+--------------------+

