# The Power of Electric Snakes!
## An Introduction to PySpark

## Section 0: Initial Data Preparation

In [3]:
from pyspark.sql import Row

lines = sc.textFile("/databricks-datasets/adult/adult.data")
adult_rdd = lines \
  .filter(lambda f: f != '') \
  .map(lambda l: tuple(l.split(', '))) \
  .map(lambda r: ( \
    int(r[0]) \
    ,r[1] \
    ,int(r[2]) \
    ,r[3] \
    ,int(r[4]) \
    ,r[5] \
    ,r[6] \
    ,r[7] \
    ,r[8] \
    ,r[9] \
    ,int(r[10]) \
    ,int(r[11]) \
    ,int(r[12]) \
    ,r[13] \
    ,r[14] \
  ))
adult_df = adult_rdd \
  .map(lambda r: Row( \
    age = r[0] \
    ,workclass = r[1] \
    ,fnlwgt = r[2] \
    ,education = r[3] \
    ,educationnum = r[4] \
    ,maritalstatus = r[5] \
    ,occupation = r[6] \
    ,relationship = r[7] \
    ,race = r[8] \
    ,sex = r[9] \
    ,capitalgain = r[10] \
    ,capitalloss = r[11] \
    ,hoursperweek = r[12] \
    ,nativecountry = r[13] \
    ,income = r[14]) \
  ) \
  .toDF()

## Section 1: RDDs vs. DataFrames

Generally, RDDs are viewed by using .collect() and DataFrames are viewed using .show().

In [6]:
adult_rdd.collect()

In [7]:
adult_df.show()

We see that each record in the RDD is represented as an unnamed tuple of objects.  Tuples can also contain other tuples, allowing us to create RDDs containing tuples of tuples of tuples, creating a multidimensional data type.  DataFrames are what we would typically think of as tables, two-dimensional with fixed data types for each column.
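
To make the difference concrete without needing a cluster, here is a minimal plain-Python sketch.  It assumes one line in the same comma-and-space layout as adult.data; the `Person` type and the choice of three fields are purely illustrative and are not part of the notebook.

```python
from collections import namedtuple

# One raw line in the comma-and-space layout used by adult.data (illustrative sample).
line = ("39, State-gov, 77516, Bachelors, 13, Never-married, Adm-clerical, "
        "Not-in-family, White, Male, 2174, 0, 40, United-States, <=50K")

fields = line.split(', ')

# RDD-style record: an unnamed tuple, so fields are addressed by position.
record = (int(fields[0]), fields[3], int(fields[12]))

# DataFrame-style record: every field has a name (Person is a hypothetical type).
Person = namedtuple('Person', ['age', 'education', 'hoursperweek'])
person = Person(*record)

print(record[2])            # positional access
print(person.hoursperweek)  # named access
```

Both lines print the same value; the only difference is whether we have to remember that hours per week lives in position 2.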

Data transformations are really where RDDs and DataFrames diverge.  Let's start with a simple example, creating a new RDD/DataFrame that contains "age" and "hourspermonth" = hoursperweek * 4.

In [10]:
adult_rdd_t = adult_rdd \
  .map(lambda r: (r[0], r[12] * 4))

adult_rdd_t.top(5)

In [11]:
from pyspark.sql.functions import col

adult_df_t = adult_df \
  .withColumn('hourspermonth', col('hoursperweek') * 4) \
  .select('age', 'hourspermonth')

adult_df_t.show(5)

We can see that RDDs are manipulated using the .map() function, with the fields denoted by the ordinal location in the tuple.  DataFrames on the other hand are treated more like tables, with the ability to create columns with .withColumn() and select columns using .select().  DataFrames also give us the ability to reference columns by name using the col() function.

Next, we'll make it even more complex with aggregations.  Let's try simply calculating the number of rows in the RDD and DataFrame.

In [14]:
adult_rdd_c = adult_rdd \
  .count()

adult_rdd_c

In [15]:
adult_df_c = adult_df \
  .count()

adult_df_c

As it turns out, RDDs and DataFrames can both leverage the .count() function.

Now, let's try to calculate the sum of the "hoursperweek" field.

In [18]:
adult_rdd_s = adult_rdd \
  .map(lambda r: r[12]) \
  .reduce(lambda t, h: t + h)

adult_rdd_s

In [19]:
adult_df_s = adult_df \
  .groupBy() \
  .sum('hoursperweek')

adult_df_s.show()

Now that we're doing more complex calculations, the differences between RDDs and DataFrames become apparent again.  RDDs, along with the .map() function, also leverage a .reduce() function.  The syntax for this function is not intuitive to most data professionals and can be cumbersome.  DataFrames leverage familiar SQL concepts, like .groupBy() and .sum(), making them much more approachable.
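
If the .reduce() lambda looks odd, it may help to see the same idea in plain Python.  This is only a sketch with made-up hours; functools.reduce folds a list pairwise in the same spirit as the RDD version, where t is the running total and h is the next value.

```python
from functools import reduce

hours = [40, 13, 40, 40, 45]  # made-up sample values

# reduce() folds the list pairwise: t is the running total, h is the next element.
total = reduce(lambda t, h: t + h, hours)

# The built-in sum() gives the same answer and reads more like the DataFrame version.
same_as_sum = (total == sum(hours))

print(total, same_as_sum)
```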

Now, let's move on to something even more complex.  Let's calculate the average age for each level of "education".

In [22]:
adult_rdd_a = adult_rdd \
  .map(lambda r: (r[3], r[0])) \
  .aggregateByKey( \
    (0,0) \
    ,lambda w, r: (w[0] + r, w[1] + 1) \
    ,lambda t, p: (t[0] + p[0], t[1] + p[1]) \
  )

adult_rdd_a.top(5)

In [23]:
adult_rdd_a = adult_rdd \
  .map(lambda r: (r[3], r[0])) \
  .aggregateByKey( \
    (0,0) \
    ,lambda w, r: (w[0] + r, w[1] + 1) \
    ,lambda t, p: (t[0] + p[0], t[1] + p[1]) \
  ) \
  .map(lambda r: (r[0], r[1][0] / r[1][1]))

adult_rdd_a.top(5)

In [24]:
adult_df_a = adult_df \
  .groupBy('education') \
  .avg('age')

adult_df_a.show(5)

At this point, RDDs are becoming incredibly complex to manipulate.  The .aggregateByKey() function requires defining Within-Partition and Across-Partition aggregation functions, resulting in extremely complex logic to perform simple transformations.  Again, DataFrames are much more approachable to those with a SQL background, leveraging the .groupBy() and .avg() functions.
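
To see what the two aggregation functions are doing, here is a rough plain-Python simulation.  The two "partitions" and their values are invented for illustration, and the loops only mimic the idea behind .aggregateByKey(); they are not how Spark is implemented.

```python
# Two hypothetical partitions of (education, age) pairs.
partitions = [
    [('Bachelors', 39), ('HS-grad', 38), ('Bachelors', 50)],
    [('HS-grad', 53), ('Bachelors', 28)],
]

zero = (0, 0)                                      # (running sum, running count)
seq_op = lambda w, r: (w[0] + r, w[1] + 1)         # within a partition
comb_op = lambda t, p: (t[0] + p[0], t[1] + p[1])  # across partitions

# Step 1: aggregate each partition independently.
partials = []
for part in partitions:
    acc = {}
    for key, age in part:
        acc[key] = seq_op(acc.get(key, zero), age)
    partials.append(acc)

# Step 2: merge the per-partition results.
merged = {}
for acc in partials:
    for key, pair in acc.items():
        merged[key] = comb_op(merged[key], pair) if key in merged else pair

# Step 3: turn each (sum, count) pair into an average.
averages = {key: s / n for key, (s, n) in merged.items()}
print(averages)
```

The final step corresponds to the extra .map() in the second RDD cell, which divides the sum by the count.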

Finally, let's join the average ages back onto the original RDD and DataFrame.

In [27]:
adult_rdd_j = adult_rdd \
  .map(lambda r: ( \
    r[3] \
    ,( \
      r[0] \
      ,r[1] \
      ,r[2] \
      ,r[4] \
      ,r[5] \
      ,r[6] \
      ,r[7] \
      ,r[8] \
      ,r[9] \
      ,r[10] \
      ,r[11] \
      ,r[12] \
      ,r[13] \
      ,r[14] \
    ) \
  )) \
  .join(adult_rdd_a)

adult_rdd_j.top(5)

In [28]:
adult_rdd_j = adult_rdd \
  .map(lambda r: ( \
    r[3] \
    ,( \
      r[0] \
      ,r[1] \
      ,r[2] \
      ,r[4] \
      ,r[5] \
      ,r[6] \
      ,r[7] \
      ,r[8] \
      ,r[9] \
      ,r[10] \
      ,r[11] \
      ,r[12] \
      ,r[13] \
      ,r[14] \
    ) \
  )) \
  .join(adult_rdd_a) \
  .map(lambda r: ( \
    r[1][0][0] \
    ,r[1][0][1] \
    ,r[1][0][2] \
    ,r[0] \
    ,r[1][0][3] \
    ,r[1][0][4] \
    ,r[1][0][5] \
    ,r[1][0][6] \
    ,r[1][0][7] \
    ,r[1][0][8] \
    ,r[1][0][9] \
    ,r[1][0][10] \
    ,r[1][0][11] \
    ,r[1][0][12] \
    ,r[1][0][13] \
    ,r[1][1] \
  ))

adult_rdd_j.top(5)

In [29]:
adult_df_j = adult_df \
  .join(adult_df_a, on='education')

adult_df_j.show(5)

We see that joining RDDs requires creating Key-Value pairs, which requires some additional mapping.  DataFrames can be joined using the simpler .join(on=) syntax.
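
The Key-Value shape is easier to follow on a tiny example.  The sketch below uses invented records and a plain dictionary as the lookup side, so it assumes each key appears only once on the right, which holds for our one-average-per-education table.

```python
# Left side: (education, rest-of-record) pairs.  Right side: (education, average age).
people = [('Bachelors', (39, 'Male')), ('HS-grad', (53, 'Male')), ('Bachelors', (28, 'Female'))]
avg_age = [('Bachelors', 39.0), ('HS-grad', 45.5)]

lookup = dict(avg_age)

# Inner join: keep each left record whose key exists on the right.
joined = [(k, (v, lookup[k])) for k, v in people if k in lookup]

# Flatten the nested (key, (left, right)) shape back into one tuple per record.
flat = [(v[0], k, v[1], avg) for k, (v, avg) in joined]
print(flat)
```

The nested (key, (left, right)) result is why the second RDD cell needs indexes like r[1][0][3] to put the record back together.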

## Section 2: Data Science Preparation

The first step of any Data Science Analysis is to summarize the data.

In [33]:
adult_df \
  .describe() \
  .show()

The .describe() function is a great way for us to see summary statistics of our DataFrame.  Since the count for each column equals the total count of records, we know there are no Null values.  However, this doesn't mean that there aren't any junk values in the columns, and the table above hints at a few.  Some troubling questions stand out:


* Is the capitalgain of 99999 legitimate?
* Is the hoursperweek of 99 legitimate?
* What's in the text columns?

Let's start with the numeric ones.

In [35]:
c = 'capitalgain'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c).desc()) \
  .show()

In [36]:
c = 'hoursperweek'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c).desc()).show()

Looking at capitalgain, we see a tremendous jump between the highest value (99999) and the second highest value (41310).  The highest value also has a relatively large number of records, which leads us to believe that it is not a legitimate value.  hoursperweek, on the other hand, has a seamless transition from 98 to 99.  99 is a little top-heavy, but this could easily be because any value higher than 99 was reported as 99.  Therefore, we don't see strong evidence that this data is illegitimate.  Let's move on to the string values.

In [38]:
c = 'education'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c)).show()

In [39]:
c = 'income'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c)).show()

In [40]:
c = 'maritalstatus'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c)).show()

In [41]:
c = 'nativecountry'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c)).show()

In [42]:
c = 'occupation'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c)).show()

In [43]:
c = 'race'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c)).show()

In [44]:
c = 'relationship'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c)).show()

In [45]:
c = 'sex'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c)).show()

In [46]:
c = 'workclass'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c)).show()

We can see that most of the values are acceptable.  However, nativecountry, occupation, and workclass all have values of "?" that are obviously illegitimate.

Now that we know where the issues are, let's talk about how to resolve them.  Strings are easiest in these scenarios.  The simplest way to deal with string values is simply to replace all of the illegitimate values with a single "Unknown" value.  In our case, these values are already using a standardized "?" value, so there's really no reason for us to change this.  Since this is a training exercise, let's go ahead and change them to "Unknown" using a loop.

In [49]:
from pyspark.sql.functions import when

cols = ['nativecountry', 'occupation', 'workclass']

for c in cols:
  adult_df = adult_df \
    .withColumn( c + '_u' \
      ,when( col(c) == '?', 'Unknown') \
        .otherwise(col(c)) \
    ) \
    .drop(c)

In [50]:
adult_df.show()

In [51]:
c = 'workclass_u'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c)) \
  .show()

We can see that this process wasn't overly complicated with the use of a loop.  It's important to note that Python is indentation-sensitive: every line in the body of a loop must be indented one level deeper than the for statement.  As we can see, this rule doesn't apply to continuation lines.  Once we use "\" to continue a statement onto the next line, the indentation of the continued lines is up to us.
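
Here is a small plain-Python sketch of both rules.  The deliberately odd indentation on the continuation line is only there to show that Python ignores it; the list contents mirror the column names used above.

```python
cols = ['nativecountry', 'occupation', 'workclass']
renamed = []

for c in cols:
    # Both statements are indented, so both belong to the loop body.
    new_name = c + \
  '_u'          # continuation line: its indentation is ignored
    renamed.append(new_name)

# Back at the left margin: this runs once, after the loop finishes.
print(renamed)
```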

Let's move on to dealing with capitalgain.  As we saw earlier, this field has illegitimate values of 99999.  Unlike strings, there's no "Unknown" value we can provide.  Instead, we need to provide some type of number.  Mean and Median are pretty common alternatives.  Since this field represents money, we'll use median to minimize the skew.

In [54]:
adult_df \
  .approxQuantile('capitalgain', [0.5], 0.25)

The approxQuantile() function will approximate any quantile within our dataset.  The third argument (0.25 here) is the allowed relative error, so smaller values give more precise answers at a higher cost.  As some of you may know, the median is simply another word for the 0.5 quantile (the 50th percentile).  The results of this were not planned and lead us down an interesting tangent.  Is the median legitimately 0 or is there something else going on here?  Let's run a quick test to find out.

In [56]:
c = 'capitalgain'
adult_df \
  .withColumn( c + '_>0' \
    ,when( col(c) > 0, '>0' ) \
      .otherwise( '0' ) \
  ) \
  .groupBy(c + '_>0') \
  .count() \
  .show()

We see that the vast majority of values in this column are 0.  This is very good evidence that we can use 0 as our imputed value.  Let's do that.
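
As a quick sanity check on the reasoning, here is a plain-Python sketch with invented numbers.  It assumes a column where most values are 0 and a few are large, which is the shape we just observed, and compares the median with the mean.

```python
from statistics import mean, median

# Made-up sample: most people report no capital gain, a few report large ones.
capitalgain = [0, 0, 0, 0, 0, 0, 0, 2174, 14084, 99999]

share_zero = capitalgain.count(0) / len(capitalgain)
middle = median(capitalgain)   # middle of the sorted values
average = mean(capitalgain)    # pulled upward by the few large values

print(share_zero, middle, average)
```

Whenever more than half of the values are 0, the median has to be 0, while the mean is dragged up by the handful of large values.  That is why the median is the safer choice for a skewed money column.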

In [58]:
c = 'capitalgain'
adult_df = adult_df \
  .withColumn( c + '_0' \
    ,when( col(c) == 99999, 0 ) \
      .otherwise( col(c) ) \
  ) \
  .drop(c)

In [59]:
c = 'capitalgain_0'
adult_df \
  .groupBy(c) \
  .count() \
  .sort(col(c).desc()) \
  .show()

Now, there are no more values of 99999 in the capitalgain column.

Next, we need to prepare our text columns for modeling.  Most machine learning algorithms will only accept numeric columns.  A common technique for resolving this is to create indicator or dummy variables.  Let's start with the 'sex' column, as it only has a couple distinct values.

The first step is to create an array of the unique values in the column.

In [63]:
c = 'sex'
adult_df \
  .select(c) \
  .distinct() \
  .show()

Creating a DataFrame using the distinct() function is easy.  Now, we need to turn it into an array.

In [65]:
c = 'sex'
vals = adult_df \
  .select(c) \
  .distinct() \
  .rdd \
  .flatMap(lambda x: x) \
#.collect()

vals.collect()

In [66]:
c = 'sex'
vals = adult_df \
  .select(c) \
  .distinct() \
  .rdd \
  .flatMap(lambda x: x) \
  .collect()

vals

We can accomplish this by using the .rdd property to turn the DataFrame into an RDD containing Row objects.  Then, we can use the flatMap() function to turn this RDD of Row objects into an RDD of strings.  Finally, using the collect() function turns the RDD of strings into an array (a Python list) of strings that we can iterate over easily.
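
The flattening step can be pictured in plain Python.  In this sketch each Row is stood in for by a one-element tuple, and itertools.chain plays the role of flatMap(); the values are examples only.

```python
from itertools import chain

# Each "row" is a one-element tuple, similar to a Row with a single column.
rows = [('Male',), ('Female',)]

# Flattening unpacks every row and strings the elements together into one list.
vals = list(chain.from_iterable(rows))
print(vals)
```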

Now, we need to create new indicator columns for each unique value in the 'sex' column.

In [69]:
c = 'sex'
cols = \
  [ \
    when(col(c) == val, 1) \
    .otherwise(0)
    .alias( c + '_' + str(val).lower() ) \
    for val in vals \
  ]

cols

Here we see a loop within an array, which Python calls a list comprehension.  It works like a foreach loop that visits each element in one array and collects the results in another array.  In this case, we are creating a column creation statement for each unique value within the 'sex' column.  Notice that this is a **COLUMN CREATION STATEMENT** (a Column expression), not the column itself.  This is an interesting Spark construct.  Basically, we can programmatically define the column creation as an object, then apply that column creation statement to any DataFrame that has the columns it refers to.
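
For anyone new to this syntax, the sketch below writes the same logic twice in plain Python, once as an ordinary loop and once as a list comprehension.  It only builds the column names, not Spark columns, and the values in vals are examples.

```python
vals = ['Male', 'Female']  # example distinct values
c = 'sex'

# Explicit loop version.
names_loop = []
for val in vals:
    names_loop.append(c + '_' + str(val).lower())

# Equivalent list comprehension.
names_comp = [c + '_' + str(val).lower() for val in vals]

print(names_comp)
```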

Finally, we need to add these columns to our DataFrame and drop the original one.

In [72]:
c = 'sex'
adult_df \
    .select(adult_df.columns + cols) \
    .drop(c).show(5)

While most of this isn't new, we do see a new use for the select() function.  Instead of supplying string values separated by commas, we can supply a single array.  While doing this, we can make use of the + operator to concatenate the original set of columns with the new columns, thereby creating an array that contains all of the columns we want.

Now that we've seen all of the logic, let's wrap it in a loop to iterate through all of the string columns.

The first step is to create an array with all of the string columns.

In [76]:
strings = \
  [ \
    c[0] \
    for c in adult_df.dtypes \
    if c[1].startswith('string') \
  ]

strings

Here, we see the loop within an array construct again.  In this case, we are looping through all of the values in the adult_df.dtypes array, which contains a (name, data type) pair for each column in the adult_df DataFrame.  We add some additional logic to keep the first element (c[0], the column name) only if the second element (c[1], the column data type) starts with 'string'.
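
The filter can be tried on its own in plain Python.  The list below is a hypothetical stand-in shaped like adult_df.dtypes, and unpacking each pair into name and dtype is just a more readable alternative to c[0] and c[1].

```python
# Hypothetical (name, type) pairs shaped like the output of adult_df.dtypes.
dtypes = [('age', 'bigint'), ('education', 'string'), ('hoursperweek', 'bigint'), ('sex', 'string')]

# Keep the name only when the type starts with 'string'.
strings = [name for name, dtype in dtypes if dtype.startswith('string')]
print(strings)
```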

Finally, let's use this array to loop through the entire DataFrame, creating all of the necessary dummy columns.

In [79]:
for c in strings:
  vals = \
    adult_df \
    .select(c) \
    .distinct() \
    .rdd \
    .flatMap(lambda x: x) \
    .collect()
  
  cols = \
    [ \
      when(col(c) == val, 1) \
     .otherwise(0)
     .alias( c + '_' + str(val).lower() ) \
     for val in vals \
    ]

  adult_df = adult_df \
    .select(adult_df.columns + cols) \
    .drop(c)
  
adult_df.columns

Now that we've created all of our new columns, there's one thing we need to take care of.  Mathematically speaking, we don't need all of these columns.  Since we've already dealt with any missing values, we know that each record in the dataset has a value for each original column.  Therefore, for each set of dummy variables, e.g. race\_x, exactly one of the dummy variables will be 1 and the others will be 0, so we only need to keep (n - 1) of the columns.  For example, if we threw away the 'race\_other' column and saw that the other race columns were all 0, then we would know that 'race\_other' must have been 1.  A predictive model can work this out from the remaining columns as well.  If we wanted to, we could add some logic to our dummy variable creation process to remove the unnecessary columns.  However, they aren't hurting anything, so we'll leave them in for now.

But there is one set of columns that we do need to deal with: income.  That's the column that we are ultimately trying to predict, and we wouldn't want to predict 'income\_>50k' using the value of 'income\_<=50k' because we would be correct 100% of the time.  So, we need to remove 'income\_<=50k' from the dataset.
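
The (n - 1) argument can be checked with a few lines of plain Python.  This sketch uses a simplified, made-up set of three race categories; it drops one dummy column and rebuilds it from the others.

```python
races = ['white', 'black', 'other']  # simplified set of categories
rows = ['white', 'other', 'black', 'white']

# Full one-hot encoding: one 0/1 entry per category for every row.
dummies = [{'race_' + r: int(value == r) for r in races} for value in rows]

# Drop one column, then rebuild it from the remaining ones.
rebuilt_ok = []
for d in dummies:
    dropped = d.pop('race_other')
    rebuilt = 1 - sum(d.values())
    rebuilt_ok.append(rebuilt == dropped)

print(dummies)
print(all(rebuilt_ok))
```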

In [81]:
adult_df = adult_df \
  .drop('income_<=50k')

adult_df.columns

Now, there are a few names that contain special characters.  While this might not have any impact, it's good practice to eliminate them whenever possible.  Specifically, these characters are -, >, (, ) and &.

We can start by creating an array of all the column names that need to be changed.

In [84]:
special = \
  [ \
    c \
    for c in adult_df.columns \
    if \
      '-' in c or \
      '>' in c or \
      '(' in c or \
      ')' in c or \
      '&' in c \
  ]

special

We can see that this is relatively simple using the loop within an array construct again.  The only new functionality here is using the 'in' operator to search for strings within strings.

Now, let's replace all of the special characters in these column names.  We want to replace '>' with 'gt' and each of the other special characters with '_'.  (If '<' or '=' were still present, we could replace them with 'lt' and 'e' in the same way.)

In [87]:
newcols = [ \
  c \
    .replace('-', '_') \
    .replace('>', 'gt') \
    .replace('(', '_') \
    .replace(')', '_') \
    .replace('&', '_') \
  for c in special
]

newcols

Again, we see the power of loops within arrays.  Here, we are using the .replace() function to substitute characters within our column names.
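
As an alternative to chaining .replace() calls, the same substitutions can be kept in a single table.  This is only a sketch using Python's built-in str.maketrans() and str.translate(); the column names are hypothetical examples of what the list might contain.

```python
# Hypothetical column names containing the special characters discussed above.
special = [
    'education_hs-grad',
    'income_>50k',
    'nativecountry_outlying-us(guam-usvi-etc)',
    'nativecountry_trinadad&tobago',
]

# Map each special character to its replacement in one table.
table = str.maketrans({'-': '_', '>': 'gt', '(': '_', ')': '_', '&': '_'})

newcols = [c.translate(table) for c in special]
print(newcols)
```

Adding another character later then only means adding one entry to the table.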

Now, let's rename these columns in our DataFrame.

In [90]:
for idx in range(0, len(special)):
  adult_df = adult_df \
    .withColumnRenamed(special[idx], newcols[idx])

adult_df.columns

The only new functionality here is using the withColumnRenamed() function to rename columns in a DataFrame.  Unfortunately, this function only renames one column at a time, so we can't pass the arrays into it directly and need a loop instead.  It would be possible using RDDs, but that would have been more complex.  Perhaps that's a good homework assignment if you want to tinker with it.
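
One small tidy-up for the loop is to pair the old and new names with zip() instead of indexing with range().  The sketch below runs in plain Python with example names; the Spark call it would wrap is shown only as a comment.

```python
special = ['income_>50k', 'education_hs-grad']      # example old names
newcols = ['income_gt50k', 'education_hs_grad']     # example new names

# zip() pairs the two lists element by element, so no index variable is needed.
pairs = list(zip(special, newcols))

for old, new in pairs:
    # In the notebook, this line would be:
    #   adult_df = adult_df.withColumnRenamed(old, new)
    print(old, '->', new)
```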

## Section 3: Data Science Modeling

Unlike many other Predictive Modeling tools, the models in PySpark's ML library expect all of the features to be in a single column.  This means that we have to combine all of our features into a single vector column before we can train our models.  This technique is called vectorization.  Let's give it a shot.

In [94]:
from pyspark.ml.feature import VectorAssembler

adult_vec = \
  VectorAssembler( \
    inputCols=[ \
      c \
      for c in adult_df.columns \
      if c != 'income_gt50k' \
    ] \
    ,outputCol="features" \
  ) \
  .transform(adult_df) \
  .select(['income_gt50k', 'features'])

adult_vec.head(5)

The VectorAssembler() function does most of the work for us here.  All we had to do was give it an array of the column names to vectorize, obviously leaving out the column that we are looking to predict.  This function also supports the .transform() function which can be used to apply it back to our dataset.

Viewing the first few rows of the resulting DataFrame shows us the interesting structure of the Vector.  Basically, the SparseVector() contains a value with the total number of columns, as well as an index-value pair for each non-zero column.
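
The sparse layout is easy to mimic in plain Python.  In this sketch, a size plus a dictionary of non-zero entries stands in for the sparse vector; the numbers are made up, and this is not the Spark class itself.

```python
# A sparse vector stores its length plus only the non-zero entries.
size = 8
nonzero = {0: 39.0, 3: 13.0, 6: 1.0}  # index -> value, made-up numbers

# Expand to the equivalent dense list.
dense = [nonzero.get(i, 0.0) for i in range(size)]
print(dense)
```

With dozens of dummy columns that are mostly 0, storing three entries instead of the full row saves a lot of space.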

Now that our DataFrame has been appropriately transformed, we need to create our training and testing splits.  This is very important so we can accurately evaluate whether our model is good or not.

In [97]:
(adult_train, adult_test) = adult_vec.randomSplit([0.7, 0.3])

{
  'Train': {
    'Rows': adult_train.count()
    ,'Cols': len(adult_train.columns)
  }
  ,'Test': {
    'Rows': adult_test.count()
    ,'Cols': len(adult_test.columns)
  }
}

In this snippet, the .randomSplit() function is doing the hard work.  We do see a new convention in that we can capture two outputs from a single function by unpacking them into a pair of names in parentheses.  Also, since Databricks will not output more than one result per cell, we need to create a dictionary to display all of our results at once.
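
Here is a plain-Python sketch of a random 70/30 split and of unpacking two results at once.  The seed value and the helper function sizes() are assumptions made for illustration.  In Spark, .randomSplit() also accepts an optional seed argument, which is worth setting if you want the same split every time.

```python
import random

rows = list(range(1000))     # stand-in for the rows of a dataset
rng = random.Random(42)      # fixed seed so the split is repeatable

train, test = [], []
for row in rows:
    # Each row independently lands in train with probability 0.7.
    (train if rng.random() < 0.7 else test).append(row)

# A function returning a tuple can be unpacked into several names at once.
def sizes():
    return len(train), len(test)

n_train, n_test = sizes()
print({'Train': n_train, 'Test': n_test})
```

Because every row is assigned at random, the split is only approximately 70/30, which is also why the row counts in the cell above will not be exact.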

Now that we're finally ready to start predictive modeling, let's train a Boosted Decision Tree using the default hyperparameters.

In [100]:
from pyspark.ml.classification import GBTClassifier

gbt = \
  GBTClassifier(featuresCol='features', labelCol='income_gt50k') \
  .fit(adult_train)

gbt

The GBTClassifier() function creates an untrained model when we pass in the names of the label and features columns.  We can then apply the .fit() function to train this model against our training data.

Next, let's use this trained model to generate some predictions.

In [103]:
gbt_pred = gbt \
  .transform(adult_test)

gbt_pred.head(5)

We see that the trained model exposes the .transform() function for creating predictions.  This results in a few additional columns.  The rawPrediction column is a vector that shows how strongly the model favors each result.  Since there were only two options for income_gt50k, there are only two values here.  The probability column is basically the same; however, it's standardized to the 0-1 scale that we are used to when thinking about probabilities.  The prediction column is simply the result with the highest probability.
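
As a rough illustration of how a predicted class relates to a probability vector, the sketch below picks the position of the largest value.  The probabilities are made up and the code is plain Python, not Spark's own logic.

```python
probability = [0.83, 0.17]  # made-up probabilities for classes 0 and 1

# The predicted class is the index of the largest probability.
prediction = max(range(len(probability)), key=lambda i: probability[i])
print(float(prediction))
```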

Next, let's compare the predictions to the testing set to create some of the standard evaluation metrics.

In [106]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol='income_gt50k')
gbt_auc = evaluator.evaluate(gbt_pred, {evaluator.metricName: "areaUnderROC"})
gbt_aupr = evaluator.evaluate(gbt_pred, {evaluator.metricName: "areaUnderPR"})

{
  'Area Under ROC Curve': gbt_auc
  ,'Area Under Precision-Recall Curve': gbt_aupr
}

Using the BinaryClassificationEvaluator() and .evaluate() functions, we see that our Boosted Decision Tree model has an AUC of .91 and an Area Under Precision-Recall Curve of .77.  These are pretty good results, especially for not doing any substantial feature engineering or model tuning.
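
For some intuition about what the AUC number means, here is a small plain-Python sketch.  It uses the pairwise interpretation of the area under the ROC curve: the share of (positive, negative) pairs where the positive record gets the higher score.  The labels and scores are invented, and this brute-force version is meant for understanding only, not as a replacement for the evaluator.

```python
def auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

labels = [1, 0, 1, 0, 1]            # made-up true labels
scores = [0.9, 0.3, 0.4, 0.6, 0.8]  # made-up model scores

print(auc(labels, scores))
```

A model that ranks every positive above every negative scores 1.0, while random guessing lands around 0.5.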

We can use very similar code to create Logistic Regression and Naive Bayes models as well.  Let's check them out.

In [109]:
# Logistic Regression

from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol='features', labelCol='income_gt50k') \
  .fit(adult_train)

lr_pred = lr \
  .transform(adult_test)

lr_auc = evaluator.evaluate(lr_pred, {evaluator.metricName: "areaUnderROC"})
lr_aupr = evaluator.evaluate(lr_pred, {evaluator.metricName: "areaUnderPR"})

# Naive Bayes

from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(featuresCol='features', labelCol='income_gt50k') \
  .fit(adult_train)

nb_pred = nb \
  .transform(adult_test)

nb_auc = evaluator.evaluate(nb_pred, {evaluator.metricName: "areaUnderROC"})
nb_aupr = evaluator.evaluate(nb_pred, {evaluator.metricName: "areaUnderPR"})

# Output Results

{
  'Area Under ROC Curve': {
    'Boosted Decision Tree': gbt_auc
    ,'Logistic Regression': lr_auc
    ,'Naive Bayes': nb_auc
  }
  ,'Area Under Precision-Recall Curve': {
    'Boosted Decision Tree': gbt_aupr
    ,'Logistic Regression': lr_aupr
    ,'Naive Bayes': nb_aupr
  }
}

We see that the Logistic Regression model scored very closely to the Boosted Decision Tree, while the Naive Bayes model was substantially worse.

We hope you learned a lot by working through this Notebook.  We encourage you to continue to build on this knowledge and see what other amazing things you can create.

