# Lab - Basic RDD Operations

This lab introduces working with Spark and RDDs, using a Jupyter Notebook and PySpark to interact with Spark.

There are many methods that can be used with RDDs. See [this great cheat sheet by the DataCamp team](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_Cheat_Sheet_Python.pdf). A copy is also in this repository.

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark import SparkContext

In [3]:
sc = SparkContext()

In [5]:
sc

Create an RDD called `A` that reads the following text file: `s3://bigdatateaching/shakespeare/100-0.txt`, the complete works of William Shakespeare.

In [6]:
A = sc.textFile("s3://bigdatateaching/shakespeare/100-0.txt")

Type `A` to display the RDD. The output is a reference to the file in S3; no data has been read yet.

In [7]:
A

s3://bigdatateaching/shakespeare/100-0.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

Display the first 5 elements of `A` by using the `take` command.

In [11]:
A.take(5)

['',
 'Project Gutenberg’s The Complete Works of William Shakespeare, by',
 'William Shakespeare',
 '',
 'This eBook is for the use of anyone anywhere in the United States and']

Now, store the first 5 elements of `A` in a local Python object called `a`.

In [12]:
a = A.take(5)

What kind of object is `a`? Remember, this is a local object within your Python session.

In [15]:
type(a)

list

Display the contents of `a`.

In [14]:
a

['',
 'Project Gutenberg’s The Complete Works of William Shakespeare, by',
 'William Shakespeare',
 '',
 'This eBook is for the use of anyone anywhere in the United States and']

You can index into `a` using standard Python code. What is the second element in `a`?

In [16]:
a[1]

'Project Gutenberg’s The Complete Works of William Shakespeare, by'

Now try indexing into the RDD `A`. It won't work.

In [17]:
A[0]

TypeError: 'RDD' object does not support indexing

How many elements does `A` have?

In [18]:
A.count()

147838

We talked about keeping data in memory to reuse later. To do that, you use the `cache` method on an RDD.

In [20]:
A.cache()

s3://bigdatateaching/shakespeare/100-0.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

The following Python function will be applied to the `A` RDD using the `filter` method.

In [22]:
def hasHamlet(s):
    return "Hamlet" in s

Create a new RDD called `b` that applies the Python `hasHamlet` function to `A` and keeps only the lines that contain the text `Hamlet`.

In [23]:
b = A.filter(hasHamlet)
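
To see what the predicate keeps without touching the cluster, it can be tried on a small local list first. This is only a local sketch: `sample_lines` is hypothetical data (a few lines borrowed from outputs in this lab), and Python's built-in `filter` stands in for the RDD method.

```python
def hasHamlet(s):
    # True when the substring "Hamlet" occurs anywhere in the line
    return "Hamlet" in s

# Hypothetical sample lines; these are not read from the S3 file
sample_lines = [
    "No more, sweet Hamlet.",
    "William Shakespeare",
    "What, Gertrude? How does Hamlet?",
    "",
]

# The built-in filter keeps only the elements for which the predicate is True
kept = list(filter(hasHamlet, sample_lines))
print(kept)
```

The match is case-sensitive, so a line containing only `HAMLET` or `hamlet` would not be kept.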

What is `b`?

In [24]:
type(b)

pyspark.rdd.PipelinedRDD

How many elements does `b` have?

In [26]:
b.count()

106

That took a few seconds, didn't it? Now count `A` again and see that it is much quicker than before, because `A` is now cached.

In [27]:
A.count()

147838
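
The speed-up comes from the fact that `cache` is lazy: it only marks `A` for caching, and the data is stored the first time an action computes it (here, `b.count()`). The toy class below is a rough local analogy, not how Spark is implemented; `LazyLines`, `loads`, and `filter_count` are hypothetical names invented for this sketch.

```python
class LazyLines:
    """Toy stand-in for a cached RDD: data is loaded only when an action runs."""

    def __init__(self, loader):
        self._loader = loader   # function producing the data (the expensive step)
        self._cached = None     # filled in by the first action
        self.loads = 0          # how many times the loader actually ran

    def _data(self):
        # Load once, then reuse the stored copy on every later action
        if self._cached is None:
            self.loads += 1
            self._cached = list(self._loader())
        return self._cached

    def count(self):
        return len(self._data())

    def filter_count(self, predicate):
        return sum(1 for x in self._data() if predicate(x))


# Hypothetical three-line "file"
lines = LazyLines(lambda: ["To be", "Lord Hamlet", "or not to be"])
loads_before = lines.loads                               # nothing loaded yet
hamlet_lines = lines.filter_count(lambda s: "Hamlet" in s)  # first action loads the data
total = lines.count()                                    # second action reuses the cache
print(loads_before, hamlet_lines, total, lines.loads)
```

The loader runs once even though two actions were executed, which mirrors why the second `A.count()` returns faster.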

You can also use the `first` method to get only the first element of an RDD.

In [33]:
A.first()

''

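Now try passing a value to `first`, as if asking for the first 10 records. Neither call below works: `first` takes no arguments, and it is a method of the RDD rather than a standalone function. Use `take(10)` to get the first 10 records.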

In [34]:
b.first(10)

TypeError: first() takes 1 positional argument but 2 were given

In [31]:
first(b)

NameError: name 'first' is not defined

How many partitions does the `A` RDD have? Use the `getNumPartitions` method to find out.

In [35]:
A.getNumPartitions()

2

You can also sample records from an RDD using the `takeSample` method. Sample 10 records from `b` without replacement.

In [36]:
b.takeSample(withReplacement=False, num=10)

[' [_Exeunt all but Hamlet._]',
 'What, Gertrude? How does Hamlet?',
 'I hop’d thou shouldst have been my Hamlet’s wife;',
 'Dar’d to the combat; in which our valiant Hamlet,',
 'You need not tell us what Lord Hamlet said,',
 'Did Hamlet so envenom with his envy',
 'No more, sweet Hamlet.',
 'The GHOST of the late king, Hamlet’s father.',
 'Let not thy mother lose her prayers, Hamlet.',
 'But now, my cousin Hamlet, and my son—']
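
The `withReplacement` flag decides whether the same record can be drawn more than once. A minimal local sketch of the difference, using Python's `random` module in place of Spark; `records` is a hypothetical stand-in for the RDD's contents:

```python
import random

# Hypothetical local stand-in for the records of an RDD
records = ["line-{}".format(i) for i in range(20)]

rng = random.Random(42)  # fixed seed so repeated runs give the same draw

# Without replacement: every sampled record is distinct
without = rng.sample(records, 10)

# With replacement: the same record may be drawn more than once
with_repl = rng.choices(records, k=10)

print(len(set(without)), len(set(with_repl)))
```

Sampling without replacement always yields 10 distinct records here, while sampling with replacement may yield fewer distinct ones.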

Now we will re-do one of the first assignment problems with the `quazyilx` dataset. First, create an RDD called `q1` from the `s3://bigdatateaching/quazyilx/quazyilx2.txt` file.

In [12]:
q1 = sc.textFile("s3://bigdatateaching/quazyilx/quazyilx2.txt")

See how many partitions the RDD has. This number is analogous to the number of blocks the file occupies on disk.

In [13]:
q1.getNumPartitions()

289

Create and cache an RDD called `badrec` that uses a filter statement to find the bad records. Remember that each record is a whole line of text.

In [14]:
badrec = q1.filter(lambda bad: "fnard:-1 fnok:-1 cark:-1 gnuck:-1" in bad).cache()

How many bad records were there?

In [15]:
badrec.count()

710

In [16]:
badrec.take(5)

['2000-01-16 09:56:16 fnard:-1 fnok:-1 cark:-1 gnuck:-1',
 '2000-02-29 11:21:35 fnard:-1 fnok:-1 cark:-1 gnuck:-1',
 '2000-03-01 04:32:38 fnard:-1 fnok:-1 cark:-1 gnuck:-1',
 '2000-03-25 07:48:11 fnard:-1 fnok:-1 cark:-1 gnuck:-1',
 '2000-05-30 00:41:17 fnard:-1 fnok:-1 cark:-1 gnuck:-1']

In [17]:
print(badrec.count())

710


If you want to get all the records of an RDD, use the `collect` method. Be careful, though: `collect` brings every record into your local Python session, so a large dataset could exhaust its memory.

In [19]:
bad_rec = badrec.collect()

Take a look at the first 9 elements of `bad_rec`. The slice `0:9` covers indices 0 through 8.

In [24]:
bad_rec[0:9]

['2000-01-16 09:56:16 fnard:-1 fnok:-1 cark:-1 gnuck:-1',
 '2000-02-29 11:21:35 fnard:-1 fnok:-1 cark:-1 gnuck:-1',
 '2000-03-01 04:32:38 fnard:-1 fnok:-1 cark:-1 gnuck:-1',
 '2000-03-25 07:48:11 fnard:-1 fnok:-1 cark:-1 gnuck:-1',
 '2000-05-30 00:41:17 fnard:-1 fnok:-1 cark:-1 gnuck:-1',
 '2000-07-15 16:04:36 fnard:-1 fnok:-1 cark:-1 gnuck:-1',
 '2000-07-22 06:43:14 fnard:-1 fnok:-1 cark:-1 gnuck:-1',
 '2000-09-19 02:59:19 fnard:-1 fnok:-1 cark:-1 gnuck:-1',
 '2000-09-22 14:39:34 fnard:-1 fnok:-1 cark:-1 gnuck:-1']
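
Python slices exclude the stop index, so `bad_rec[0:9]` returns nine elements, not ten. A small local sketch, with a hypothetical list `items` standing in for `bad_rec`:

```python
# Hypothetical list standing in for the collected records
items = list(range(12))

first_nine = items[0:9]   # indices 0 through 8; the stop index 9 is excluded
first_ten = items[:10]    # indices 0 through 9

print(len(first_nine), len(first_ten))
```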

In [25]:
type(bad_rec)

list

Now we will work with the ForensicsWiki logs dataset and use RDD methods to do the same analysis we did in previous homeworks.

First, create an RDD called `W` pointing to the ForensicsWiki dataset at `s3://bigdatateaching/forensicswiki/2012_logs.txt`.

In [6]:
W = sc.textFile("s3://bigdatateaching/forensicswiki/2012_logs.txt")

The following two cells have Python code that will be run on the RDD.

In [7]:
import re
import datetime
date_re = re.compile(r"(\d\d/[a-zA-Z]+/\d\d\d\d)")  # raw string avoids invalid-escape warnings

In [8]:
def extract(line):
    m = date_re.search(line)
    if m:
        d = datetime.datetime.strptime(m.group(1),"%d/%b/%Y")
        return "{:04}-{:02}".format(d.year,d.month)
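
Before mapping `extract` over the whole RDD, it can be checked locally on one line. The sketch below repeats the two cells above so it runs on its own; `sample` is a hypothetical line in Apache access-log style, and the actual layout of `2012_logs.txt` is an assumption here.

```python
import re
import datetime

date_re = re.compile(r"(\d\d/[a-zA-Z]+/\d\d\d\d)")

def extract(line):
    # Find a date such as 15/Mar/2012 and return it as "YYYY-MM"
    m = date_re.search(line)
    if m:
        d = datetime.datetime.strptime(m.group(1), "%d/%b/%Y")
        return "{:04}-{:02}".format(d.year, d.month)
    # Falls through and returns None when the line has no date

# Hypothetical log line; not taken from the real dataset
sample = '192.0.2.1 - - [15/Mar/2012:10:22:01 -0800] "GET /wiki/Main_Page HTTP/1.1" 200 1234'

with_date = extract(sample)
without_date = extract("no date here")
print(with_date, without_date)
```

Because `extract` returns `None` for a line without a date, any such lines would end up grouped under the key `None` in later steps.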

Create a new RDD called `dates` that runs the `extract` function on every element in the `W` RDD and pairs each result with the value `1`.

In [9]:
dates = W.map(lambda line: [extract(line), 1])

Look at the `dates` RDD.

In [10]:
dates.take(10)

[['2012-01', 1],
 ['2012-01', 1],
 ['2012-01', 1],
 ['2012-01', 1],
 ['2012-01', 1],
 ['2012-01', 1],
 ['2012-01', 1],
 ['2012-01', 1],
 ['2012-01', 1],
 ['2012-01', 1]]

In [29]:
dates.countByKey()

defaultdict(int,
            {'2012-01': 1544100,
             '2012-02': 1325030,
             '2012-03': 1274061,
             '2012-04': 1016456,
             '2012-05': 1173380,
             '2012-06': 1300250,
             '2012-07': 1287187,
             '2012-08': 1450426,
             '2012-09': 1284945,
             '2012-10': 1498895,
             '2012-11': 1397343,
             '2012-12': 1396198,
             '2013-01': 1283})
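
`countByKey` counts how many elements carry each key and returns the result as a local dictionary; the values in the pairs are ignored. A rough local equivalent, using `collections.Counter` on a hypothetical handful of pairs shaped like the elements of `dates`:

```python
from collections import Counter

# Hypothetical [key, 1] pairs, shaped like the elements of `dates`
pairs = [["2012-01", 1], ["2012-01", 1], ["2012-02", 1], [None, 1]]

# Count the occurrences of each key; the second item of each pair is not used
counts = Counter(key for key, _ in pairs)
print(counts)
```

The `None` key in this sketch shows where a line with no recognizable date would be counted.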

In [30]:
dates.cache()

PythonRDD[11] at RDD at PythonRDD.scala:53

In [31]:
count_by_dates = dates.countByKey()

In [32]:
from operator import add
add_by_date = dates.reduceByKey(add)

In [33]:
add_by_date

PythonRDD[19] at RDD at PythonRDD.scala:53
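
Unlike `countByKey`, `reduceByKey` is a transformation, so its result is still an RDD until you collect it. What it computes can be sketched locally: the values of each key are combined pairwise with the supplied function. `reduce_by_key` and `pairs` below are hypothetical names for illustration, not part of PySpark.

```python
from operator import add

# Hypothetical (key, value) pairs, shaped like the elements of `dates`
pairs = [("2012-01", 1), ("2012-02", 1), ("2012-01", 1), ("2012-01", 1)]

def reduce_by_key(func, kv_pairs):
    """Combine the values of each key pairwise with func (local sketch only)."""
    result = {}
    for key, value in kv_pairs:
        # First value for a key is stored as-is; later ones are folded in
        result[key] = func(result[key], value) if key in result else value
    return result

totals = reduce_by_key(add, pairs)
print(totals)
```

With `add` and values that are all `1`, the totals match what `countByKey` reports.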

In [34]:
local_add_by_date = add_by_date.collect()

In [35]:
type(local_add_by_date)

list

In [37]:
local_add_by_date


[('2012-01', 1544100),
 ('2012-09', 1284945),
 ('2012-10', 1498895),
 ('2012-02', 1325030),
 ('2012-05', 1173380),
 ('2012-06', 1300250),
 ('2012-07', 1287187),
 ('2013-01', 1283),
 ('2012-04', 1016456),
 ('2012-12', 1396198),
 ('2012-11', 1397343),
 ('2012-03', 1274061),
 ('2012-08', 1450426)]
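
The collected list comes back in no particular order. Since it is an ordinary Python list, it can be sorted locally. A minimal sketch using five of the tuples shown above:

```python
# A few of the (month, count) tuples from the output above, in the order returned
local_add_by_date = [
    ("2012-01", 1544100),
    ("2012-09", 1284945),
    ("2012-10", 1498895),
    ("2012-02", 1325030),
    ("2013-01", 1283),
]

# Sort by month; "YYYY-MM" keys sort chronologically as plain strings
by_month = sorted(local_add_by_date)

# Sort by count, largest first
by_count = sorted(local_add_by_date, key=lambda kv: kv[1], reverse=True)

print(by_month)
print(by_count)
```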

Before you close the Jupyter Notebook, it is best to close the connection to the Spark cluster. If you don't, you may leave an "orphan" connection that keeps consuming resources.

In [56]:
sc.stop()