So far we've used toy examples to introduce the RDD API along with a few of its Transformations and Actions. Now let's look at a more real-life example: let's wrangle a fairly big "semi-structured" file and turn it into something a Data Scientist would be ready to work with. In fact, let's ask a few Data Science-y questions of this data and use Spark itself to answer them while we are at it!

In order to continue with this lesson, first download the required data files and put it in your data folder inside your working directory.

This example file is a standard Apache web server log. It's the logs from a month's worth of requests to NASA's website, in the distant year of 1995, combined into one fairly big file to be more specific.

This log contains the following information:

* The IP Address or the DNS name performing a request
* A time stamp of the form: "dd/Mon/YYYY:hh:mm:ss Timezone"
* The request type (HTTP verb), the resource being requested and the Protocol used
* The code returned by the server (200 OK, 400 Not Found etc...)
* The Size of the resource being requested

We will use the textFile method to read in this file. This, like the parallelize method, turns the data inside this file into an RDD. There are two important things you need to know about this method: 

* In a real-life Spark Cluster, the location of the file (the argument you will pass to textFile) must be visible/accessible to all nodes of the Cluster. In practice, a lot of the time this location will be a path on a Hadoop Distributed File System (HDFS), but this can be any Network File System, or a location mounted on all nodes, or Amazon S3... as long as it's visible and accessible on all nodes! 

* This method turns each line of the input file into an element in a Partition. So ,no matter what the format of the file is, when it gets turned into an RDD, each line (as delimited by a newline a.k.a. "\n") becomes an element.


In [1]:
#let's import pyspark, initialize a Spark connection
import pyspark
sc = pyspark.SparkContext()

#Let's read NASA logs as a textfile into a variable and find out the type of variable we defined here
nasa_logs = sc.textFile('data/NASA_access_log_Jul95.gz')
type(nasa_logs)

pyspark.rdd.RDD

The first step in any data problem is to look at the data to get a sense of what we are dealing with. A good practice is to find out how many elements we have to get a sense of what we are dealing with. The RDD API has the count method for that: 

In [2]:
# use count() method to see how many elements (lines) are in the NASA logs
nasa_logs.count()


1891715

The RDD API has the take Action, that brings a number of elements (remember, an element here is a line of the original file) back to the Driver so we can see them. The important thing here is to be careful not to bring too many elements back to Driver and blow up its memory capacity!

In [3]:
# Use take() action to bring a number of elements from Cluster back to the Driver!
nasa_logs.take(1)

['199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245']

Now that we can see what the data looks like, a reasonable first step seems to be to split the data on the " " (space) character:

In [6]:
# Split the data on space characters
nasa_logs.map(lambda line : line.split(" ")).take(1)

[['199.72.81.55',
  '-',
  '-',
  '[01/Jul/1995:00:00:01',
  '-0400]',
  '"GET',
  '/history/apollo/',
  'HTTP/1.0"',
  '200',
  '6245']]

Next, for the sake of this example, let's say we are not interested in lines where there is data missing. In other words, we are only interested in lines that have all 10 elements. We will use the filter method to filter any lines that don't have all 10 elements out of our RDD:

In [8]:
# Filter the data and count the lines that have 10 exactly elements

nasa_logs.map(lambda line : line.split(" ")).\
filter(lambda line : len(line)==10).count()

1886891


This line of code in PySpark performs a series of operations to count the number of lines in the 
nasa_logs RDD that have exactly 10 words (or elements) separated by spaces.

You might be asking yourself whether using the take method all the time to check if we are doing things right is the best practice... and the answer is no. Everytime you call it, you are computing a new RDD and thus having the Spark Cluster do work for you. In real-life you will rarely have a Cluster all for yourself, so you should expect your computations to get queued and competing for resources with other users. in this scenario, minimizing the amount of times you move things back and forth between the Driver and the Executors is a good idea.

So in practice, one approach would be to use the RDD API method sample to extract a sample of your data to examine in the driver and figure out what you need to do before farming out computations to the cluster. The take method also works here, but getting a random sample (using sample() method) instead of the first N elements of your RDD is almost always a better plan.


In [9]:
# Make sure you know how much data 0.01% of your dataset is! 
#It might look like a small fraction, but in the Big Data world 
#even that might be too much for your local computer!

local_sample = \
nasa_logs.sample(withReplacement=False,fraction=0.0001).\
collect()

print(local_sample)

['pm1-09.magicnet.net - - [01/Jul/1995:14:02:01 -0400] "GET /images/ksclogosmall.gif HTTP/1.0" 200 3635', 'ad03-027.compuserve.com - - [01/Jul/1995:16:20:57 -0400] "GET /shuttle/missions/sts-78/mission-sts-78.html HTTP/1.0" 200 4378', 'line10.pm1.abb.mindlink.net - - [01/Jul/1995:17:07:53 -0400] "GET /images/launchmedium.gif HTTP/1.0" 200 11853', 'slsyd2p64.ozemail.com.au - - [01/Jul/1995:19:34:46 -0400] "GET /history/history.html HTTP/1.0" 304 0', 'dd08-018.compuserve.com - - [01/Jul/1995:20:49:11 -0400] "GET /images/USA-logosmall.gif HTTP/1.0" 200 234', 'cslip6.irb.hr - - [01/Jul/1995:22:00:32 -0400] "GET /shuttle/missions/sts-71/images/KSC-95EC-0874.txt HTTP/1.0" 200 541', 'dialup1.starg.com - - [02/Jul/1995:01:03:06 -0400] "GET /facilities/lc39a.html HTTP/1.0" 200 7008', 'dyn-267.direct.ca - - [02/Jul/1995:01:56:00 -0400] "GET /history/apollo/apollo.html HTTP/1.0" 200 3258', 'bobjoy.pr.mcs.net - - [02/Jul/1995:16:56:37 -0400] "GET /images/KSC-logosmall.gif HTTP/1.0" 200 1204', 'ste



Web server logs like this are called 'semi-structured' for a reason: we can be pretty sure that every line will be formatted the same way. This means every element in each of our Partitions looks pretty much the same after our first step. We can be confident that the same unwanted characters ended up inside the elements of all partitions of our RDD. So our next step takes care of removing them:

In [10]:
# Data cleaning: remove the unwanted characters
# The dictionary maps three unwanted characters ([, ], and ") to empty strings ('')

replacement_dict = {"[":'',"]":'',"\"":''}

nasa_logs_structured = nasa_logs.map(lambda line : line.split(" ")).\
filter(lambda line : len(line)==10).\
map(lambda line : [element.translate(str.maketrans(replacement_dict)) \
               	for element in line])

In summary, this code cleaned the data by removing square brackets and double quotes from each element in lines that have exactly 10 elements.

Ok, so now our RDD has the following elements: 

IP/NAME_OF_ORIGIN 
DATE/TIME, TIMEZONE
 REQUEST_METHOD
 RESOURCE_REQUESTED
 PROTOCOL
 STATUS_CODE, SIZE_OF_RESOURCE

That looks pretty much like a CSV (or a Dataframe) a Data Scientist could work with! We aim to take advantage of our now-structured dataset and see if we can do a bit of Data Science using the RDD API directly. Let's find out where most requests to the NASA web server came from on our dataset. To do this, let's do a little bit of Map-Reduce.


In [11]:
# Take each line of our structured log and return a Key-Value Pair

nasa_logs_structured.map(lambda line : (line[0],1) ).take(5)


[('199.72.81.55', 1),
 ('unicomp6.unicomp.net', 1),
 ('199.120.110.21', 1),
 ('burger.letters.com', 1),
 ('199.120.110.21', 1)]

In [12]:
# identify and return the five most frequent encounters 
# like the count program, we create a tuple containing the encounters and a count of 1 
# representing its initial occurrence, then compute the total count.

nasa_logs_structured.map(lambda line : (line[0],1) ).\
reduceByKey(lambda a,b : a+b).\
map(lambda kv_pair : (kv_pair[1],kv_pair[0])).\
sortByKey(ascending=False).take(5)

# The second map() transforms the DataFrame to have the count as the first element 
# for sorting purposes and the word as the second element.

[(17572, 'piweba3y.prodigy.com'),
 (11591, 'piweba4y.prodigy.com'),
 (9868, 'piweba1y.prodigy.com'),
 (7852, 'alyssa.prodigy.com'),
 (7573, 'siltb10.orl.mmc.com')]


Exercise 3.1 - Word count in NASA log

If we take the element containing NASA's website resource names and we replace the "/"s and "."s by " "s, we sort of get words. Write a word count program to find the top 5 most frequent words.


In [13]:
# The first step essentially should splits the resource name based on
# / and . characters and treats them as word separators.
# The result should be a new DataFrame containing these "cleaned-up" website resource names.

words = nasa_logs_structured.\
map(lambda line : line[6].\
    replace('/',' ').\
    replace('.',' '))

In [14]:
# This step effectively should create a new DataFrame
# where each row is a single word extracted from the website resource names.
# Use flatMap() method to flattening the list of words into a single level.
# Then use a map() transformation to create your count tuple
# Finally use a reduceByKey action to calculate the total count for each word

words.\
flatMap(lambda line: line.split(" ")).\
map(lambda word : (word, 1)).\
reduceByKey(lambda a,b : a+b).\
take(5)

[('', 2024051),
 ('history', 296076),
 ('apollo', 250775),
 ('shuttle', 655862),
 ('countdown', 184637)]


Reading a CSV file with Core Spark API (RDD API):

The RDD API is very powerful, but on its own it has some serious limitations. Ironically, one of its biggest limitations is its usefulness on structured data... like CSV files.

We had caught a glimpse of that on the NASA website example, but now let's look at a real-life CSV to illustrate this and introduce the Pandas on Spark API - a powerful API for which the RDD API can work as a beautiful complement.

The file below contains data about all pieces owned/maintained by the Metropolitan Museum of Art in New York City. As we've seen before, the RDD API only allows us to load it as a plain text file:


In [15]:
sample_data = sc.textFile('data/surveys.csv')

In [16]:
sample_data.count()

35550

In [17]:
sample_data.take(1)

['record_id,month,day,year,plot_id,species_id,sex,hindfoot_length,weight']

In [19]:
sample_data_split = sample_data.map(lambda line : line.split(","))

In [21]:
sample_data_split.take(2)

[['record_id',
  'month',
  'day',
  'year',
  'plot_id',
  'species_id',
  'sex',
  'hindfoot_length',
  'weight'],
 ['1', '7', '16', '1977', '2', 'NL', 'M', '32', '']]

Spark Pandas API:

The Spark Core RDD API is a powerful tool for operating on very large Data. However, the RDD API and its Functional Programming flavor are not for everyone. Most people dealing with heavy-duty data analytics problems are used to far more structured data types. Whether they're R users or Python users, data people love data that is in a tabular format - a Table in database or a DataFrame in R or Pandas. In most data analysis situations, it is important to be able to mimic some functionality and design choices from the Pandas package as one of the most powerful python packages to analyze data. To cater to this particular user base, Spark maintainers have introduced a new API in Spark v3: Pandas on Spark. As the name suggests, the idea behind this API is to reproduce the user experience from the Pandas package with as many of its methods and operators as possible, but on a very large scale distributed DataFrames. Note that as this API is actively being developed, you might encounter some errors with some functions. Usually, downgrading PySpark or Pandas version can fix those issues.

To get started, first let's import the module pyspark.pandas:

In [22]:
#import PySpark Pandas API
#ignore the warning!

import pyspark.pandas as ps




A handy way of using Pandas on Spark is by converting an actual Pandas DataFrame into a Pandas on Spark DataFrame. In this scenario, you would have a regular Pandas DataFrame, created without any calls to Spark that you wish to perform work on in a parallelized or even distributed fashion.


In [2]:
#let's create a Pandas dataframe from a CSV file and then convert it to Pandas on Spark DataFrame
import pandas as pd
survey_df_local = pd.read_csv("data/surveys.csv")
type(survey_df_local)


pandas.core.frame.DataFrame

In [38]:
#Let's look at the data:
survey_df_local.head()

Unnamed: 0,record_id,month,day,year,plot_id,species_id,sex,hindfoot_length,weight
0,1,7,16,1977,2,NL,M,32.0,
1,2,7,16,1977,3,NL,M,33.0,
2,3,7,16,1977,2,DM,F,37.0,
3,4,7,16,1977,7,DM,M,36.0,
4,5,7,16,1977,3,DM,M,35.0,


In [3]:
# Now, let's create a Spark DataFrame from a Pandas DataFrame!

survey_df_distributed = ps.from_pandas(survey_df_local)
type(survey_df_distributed)

pyspark.pandas.frame.DataFrame

Now we have a parallelized or distributed DataFrame that looks and behaves just like a regular Pandas DataFrame.

In [4]:
# Let's look at this DataFrame using some familiar methods: first head()
survey_df_distributed.head()


Unnamed: 0,record_id,month,day,year,plot_id,species_id,sex,hindfoot_length,weight
0,1,7,16,1977,2,NL,M,32.0,
1,2,7,16,1977,3,NL,M,33.0,
2,3,7,16,1977,2,DM,F,37.0,
3,4,7,16,1977,7,DM,M,36.0,
4,5,7,16,1977,3,DM,M,35.0,


Next, we will go through a few examples of how to use Pandas on Spark DataFrame. Accessing columns and rows, as well as slicing a DataFrame works just like in Pandas.

In [5]:

survey_df_pandas = survey_df_distributed # make a copy of the Spark DataFrame

# Access columns by name with two different syntaxes:
survey_df_pandas['weight'].head()
#survey_df_pandas.weight.head()




0   NaN
1   NaN
2   NaN
3   NaN
4   NaN
Name: weight, dtype: float64

In [40]:
# Use the .iloc() method to access a row by index
survey_df_pandas.iloc[10]

record_id            11
month                 7
day                  16
year               1977
plot_id               5
species_id           DS
sex                   F
hindfoot_length    53.0
weight              NaN
Name: 10, dtype: object

In [41]:
# Use conditionals to find subsets of a DataFrame that match a condition
survey_df_pandas[survey_df_pandas.weight > 40].head()

Unnamed: 0,record_id,month,day,year,plot_id,species_id,sex,hindfoot_length,weight
63,64,8,19,1977,7,DM,M,37.0,48.0
65,66,8,19,1977,4,DM,F,35.0,46.0
67,68,8,19,1977,8,DO,F,32.0,52.0
78,79,8,19,1977,7,DM,F,34.0,42.0
81,82,8,19,1977,4,DM,F,35.0,41.0


In [42]:
# summary statistics
survey_df_pandas.describe()


Unnamed: 0,record_id,month,day,year,plot_id,hindfoot_length,weight
count,35549.0,35549.0,35549.0,35549.0,35549.0,31438.0,32283.0
mean,17775.0,6.474022,16.105966,1990.475231,11.397001,29.287932,42.672428
std,10262.256696,3.396583,8.256691,7.493355,6.799406,9.564759,36.631259
min,1.0,1.0,1.0,1977.0,1.0,2.0,4.0
25%,8888.0,4.0,9.0,1984.0,5.0,21.0,20.0
50%,17772.0,6.0,16.0,1990.0,11.0,32.0,37.0
75%,26661.0,9.0,23.0,1997.0,17.0,36.0,48.0
max,35549.0,12.0,31.0,2002.0,24.0,70.0,280.0


In [44]:
# location measures
survey_df_pandas.weight.mean()
#survey_df_pandas.weight.median()
#survey_df_pandas.weight.quantile()



42.672428212991356

In [45]:
# dispersion measures:
#survey_df_pandas.weight.std()
survey_df_pandas.weight.var()

1341.8491706942696

Last, but not least, Pandas on Spark introduces the plot class along with its subclasses that allow you to easily create different types of plots. Before Pandas on Spark, it would have been necessary to bring data over from the cluster to the Driver in order to visualize it. Of course, this would have been impractical with very large datasets that do not fit in the Driver's memory. With this new API, plot objects are generated directly on the cluster and only then returned to the Driver for you to see!


Notes: 

* Parallelizing a DataFrame does not necessarily mean any arbitrary operation will run faster. In general, you can expect Pandas on Spark to outperform Pandas as the size of a DataFrame grows, even if you are running PySpark on a single node. That being said, you should always reason about scalability before choosing to parallelize work over multiple cores, or multiple nodes. See this article for more about scalability: https://docs.alliancecan.ca/wiki/Scalability

* Pandas on Spark is not a 100% perfect clone of Pandas - some Pandas functionalities have not yet been implemented, some probably never will be, and Pandas on Spark have a few features that do not exist on Pandas. See the complete API reference for more details: https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html

Exercise 4.1 

Use pandas on Spark API:

* Create a parallel query that finds all rows with a weight value greater than 50 and hindfoot_length larger than 52, and then calculate the summary statistics of these rows.

* Hint: You can use where() method to introduce two different conditions in your search and dropna() method to remove rows with missing values in weight or hindfoot_length


In [21]:
#Exercise 4.1 Solution

filtered_df = (
    survey_df_pandas
    .where((survey_df_pandas.weight > 50) & (survey_df_pandas.hindfoot_length > 52))
    .dropna(subset=["weight", "hindfoot_length"])  # Remove rows with missing values in weight or hindfoot_length
  
)
filtered_df.describe()

Unnamed: 0,record_id,month,day,year,plot_id,hindfoot_length,weight
count,189.0,189.0,189.0,189.0,189.0,189.0,189.0
mean,8102.560847,6.391534,13.232804,1983.380952,7.328042,53.534392,137.740741
std,5436.685915,3.163141,7.784244,3.902108,5.801532,0.866107,17.657543
min,392.0,1.0,1.0,1977.0,1.0,53.0,51.0
25%,3914.0,4.0,7.0,1981.0,2.0,53.0,129.0
50%,6844.0,6.0,12.0,1982.0,8.0,53.0,140.0
75%,12973.0,9.0,20.0,1987.0,9.0,54.0,150.0
max,28927.0,12.0,31.0,1998.0,22.0,58.0,172.0
