# November 5, 2019 lecture notes

This week we'll take a first look at [Apache Spark](http://spark.apache.org/), a powerful system for scaling up data processing.

Spark is installed on your instances, but requires a few steps to use.  Every time you use Spark, you'll have to run the following steps:

In [1]:
import findspark

In [2]:
findspark.init()

The next two steps are required for the simple Python style of using Spark.  We'll see one other approach later on.

In [3]:
from pyspark import SparkContext

In [4]:
spark = SparkContext(appName='20191105')

The `spark` object is all you need to start doing some pretty cool things.  Let's have a quick look at it.

In [5]:
spark

If you have port 4040 open in your EC2 security groups, you can follow the link above to the Spark instance UI on your machine.  There's one caveat though - it maps automatically to an IP address that won't work unless you take extra steps.  The solution is simple:  replace the IP address in the URL with your machine name, the one that starts with "ec2-" that you are probably looking at Jupyter through right now.

**Note**, though, the Spark UI defaults to port 4040, not 8080 like Jupyter, so **be sure you use `:4040`** in your URL.

Have a look now!

## Bikeshare data

Because Capital Bikeshare data is so familiar, let's have a look at it using Spark.  Maybe you've already seen what it looks like at the command line, using CSVKit and XSV, and with SQL.  Now we'll try a functional style.

These first few steps should be familiar.

In [6]:
!wget https://s3.amazonaws.com/capitalbikeshare-data/2017-capitalbikeshare-tripdata.zip

--2019-11-12 04:30:28--  https://s3.amazonaws.com/capitalbikeshare-data/2017-capitalbikeshare-tripdata.zip
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.16.83
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.16.83|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 89576218 (85M) [application/zip]
Saving to: ‘2017-capitalbikeshare-tripdata.zip’


2019-11-12 04:30:30 (78.2 MB/s) - ‘2017-capitalbikeshare-tripdata.zip’ saved [89576218/89576218]



In [7]:
!ls -lh 2017-capitalbikeshare-tripdata.zip

-rw-rw-r-- 1 ubuntu ubuntu 86M Mar 15  2018 2017-capitalbikeshare-tripdata.zip


In [8]:
!unzip 2017-capitalbikeshare-tripdata.zip

Archive:  2017-capitalbikeshare-tripdata.zip
  inflating: 2017Q1-capitalbikeshare-tripdata.csv  
  inflating: 2017Q2-capitalbikeshare-tripdata.csv  
  inflating: 2017Q3-capitalbikeshare-tripdata.csv  
  inflating: 2017Q4-capitalbikeshare-tripdata.csv  


In [9]:
!wc -l 20*.csv

   646511 2017Q1-capitalbikeshare-tripdata.csv
  1104419 2017Q2-capitalbikeshare-tripdata.csv
  1191586 2017Q3-capitalbikeshare-tripdata.csv
   815265 2017Q4-capitalbikeshare-tripdata.csv
  3757781 total


In [10]:
!csvcut -n 2017Q1-capitalbikeshare-tripdata.csv

  1: Duration
  2: Start date
  3: End date
  4: Start station number
  5: Start station
  6: End station number
  7: End station
  8: Bike number
  9: Member type


In [11]:
!csvcut -n 2017Q3-capitalbikeshare-tripdata.csv

  1: Duration
  2: Start date
  3: End date
  4: Start station number
  5: Start station
  6: End station number
  7: End station
  8: Bike number
  9: Member type


So far so good.

### CSVkit and XSV

To keep these command line pipes simple, let's combine and rename the data into one file:

In [None]:
!csvstack 2017Q*-capitalbikeshare-tripdata.csv > 2017.csv

In [None]:
!head 2017.csv | csvlook

In [None]:
!ls -lh *.csv

In [None]:
!wc -l *.csv

Alright, then, we have 429M of raw data comprising 3,757,777 bikeshare trips.  How long does it take to sort?

First, we take a look at a sample of the data to determine its attributes' domains and ranges.

In [None]:
!head -n 1000 2017.csv | csvstat

In [None]:
import os

In [None]:
%time os.system("head -25000 2017.csv | csvsort -c1 | head | csvlook")

Is it faster if we sort only one column?

In [None]:
%time os.system("head -25000 2017.csv | csvcut -c1 | csvsort -c1 | head | csvlook")

What about if we sort using unix `sort` instead of `csvsort`?

In [None]:
%time os.system("head -25000 2017.csv | csvcut -c1 | sort | head")

## Why is `sort` faster than `csvsort`?
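
Part of the answer, offered as a hedged sketch rather than a benchmark: unix `sort` compares lines as raw text, while `csvsort` parses the CSV and, by default, infers column types before sorting, which is more work per row.  The two can also produce different orderings.  The plain-Python example below uses made-up duration values (not taken from the bikeshare files) to contrast text ordering with numeric ordering.

```python
# Made-up duration values, as strings the way they would appear in a CSV file.
durations = ["690", "2762", "134", "99", "1000"]

# Text (lexicographic) ordering, similar in spirit to plain unix `sort`.
text_order = sorted(durations)

# Numeric ordering, which requires parsing every value first.
numeric_order = sorted(durations, key=int)

print(text_order)
print(numeric_order)
```

Parsing every value costs time, but it is what makes the numeric ordering possible.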

## Introducing `xsv`

[`xsv`](https://github.com/BurntSushi/xsv) is another CSV toolkit like CSVKit, but where CSVKit was designed to be easy and consistent, `xsv` was designed to be *fast*.  And it's really fast.

It has a lot of functions similar to CSVKit.

In [None]:
!xsv --help

In [None]:
!head 2017.csv | xsv table

In [None]:
!xsv headers 2017.csv

In [None]:
!xsv search -s5 "Eastern Market / 7th" 2017.csv | xsv select 1,5,7 | head | xsv table

In [None]:
!xsv select 4 2017.csv | head | xsv table

In [None]:
!xsv select 4 2017.csv | xsv frequency | xsv table

In [None]:
!xsv select 1 2017.csv | xsv stats | xsv table

In [None]:
!xsv sort -s4 2017.csv | head | xsv table

That seemed pretty fast... how fast was it?  Let's try to reproduce our test from before.

In [None]:
%time os.system("xsv select 4 2017.csv | xsv sort | uniq -c | head")

Wow!  That's really fast.  Even faster than unix `sort`!  Let's use `%timeit` to try repeated runs and get a better sample for comparison.

In [None]:
%timeit os.system("xsv select 4 2017.csv | xsv sort | uniq -c | head")

In [None]:
%timeit os.system("xsv select 4 2017.csv | sort | uniq -c | head")

Just for fun, let's run that again with `csvcut` instead of `xsv select`.

In [None]:
%timeit os.system("csvcut -c4 2017.csv | sort | uniq -c | head")

Pretty impressive, right?  It was designed to be *fast*.  And it is.

But wait, there's more.  `xsv` supports *indexing*, which can speed up later operations on the same file.

In [None]:
!xsv index 2017.csv

In [None]:
!ls -lh 2017*

In [None]:
!xsv select 5 2017.csv | xsv frequency | xsv table

The moral of the story:  tools with similar goals but different designs can each achieve those goals effectively.  Knowing which to choose for a particular task requires an understanding of the design tradeoffs.

## Looping vs. Vectorization

Vectorization (or array programming) is a critical piece of the data science puzzle.  Fast implementations of array operations take advantage of low-level hardware to make operations on matrices very fast, which is critical for machine learning and other statistical operations on large datasets.

To get a taste of the difference vectorized operations make, let's look at a simple function:  finding the largest value in a matrix.  This should have complexity O(n), or linear, as the number of compute operations increases linearly with the size of the input set.

We can use the [pyheatmagic](https://github.com/csurfer/pyheatmagic) Jupyter extension to look at the performance of two versions of a function that finds the largest value within a matrix of random numbers.  The larger the matrix size, the larger the result should be - or in this case, the closer to 1.

In [None]:
%load_ext heat

In [None]:
%%heat
import random
def arraymax1(m, n):
    ints = [random.random() for _ in range(m * n)]
    maxval = 0
    for i in range(m):
        for j in range(n):
            index = (i * n) + j
            if ints[index] > maxval:
                maxval = ints[index]
    return maxval

print(arraymax1(300, 100))

In [None]:
%%heat
import numpy as np
def arraymax2(m, n):
    a = np.random.random(m * n).reshape(m, n)
    return np.max(a)

print(arraymax2(300, 100))

In [None]:
import random
def arraymax1(m, n):
    ints = [random.random() for _ in range(m * n)]
    maxval = 0
    for i in range(m):
        for j in range(n):
            index = (i * n) + j
            if ints[index] > maxval:
                maxval = ints[index]
    return maxval

In [None]:
import numpy as np
def arraymax2(m, n):
    a = np.random.random(m * n).reshape(m, n)
    return np.max(a)

In [None]:
%timeit arraymax1(1000, 1000)

In [None]:
%timeit arraymax2(1000, 1000)

# Processing data with Spark

To get started, we identify the data we want to work with, in this case one of the unstacked/raw CSV files.

In [None]:
rides1 = spark.textFile('2017Q1-capitalbikeshare-tripdata.csv')

In [None]:
%time rides1.count()

Looks right so far.  Two more details:  first, we could load the second file the same way:

In [None]:
rides2 = spark.textFile('2017Q2-capitalbikeshare-tripdata.csv')

In [None]:
%time rides2.count()

But that seems so tedious.  And besides, Spark makes this easier:  you can use wildcards to load more than one file at a time into a single RDD.

In [12]:
rides = spark.textFile('2017Q*.csv')

In [13]:
%time rides.count()

CPU times: user 4 ms, sys: 8 ms, total: 12 ms
Wall time: 11.5 s


3757781
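
What `count()` does across partitions can be sketched in plain Python.  This is a single-machine illustration with made-up data, not Spark's implementation:  the `partitions` list and the `count_partition` helper are hypothetical stand-ins for how an RDD's lines are divided up and counted by separate tasks.

```python
# Hypothetical stand-in for an RDD: lines grouped into partitions.
partitions = [
    ["header", "row 1", "row 2"],
    ["row 3", "row 4"],
    ["row 5"],
]

def count_partition(lines):
    """Count the lines in one partition (the work a single task would do)."""
    return sum(1 for _ in lines)

# Each partition is counted independently, then the partial counts are summed.
partial_counts = [count_partition(p) for p in partitions]
total = sum(partial_counts)

print(partial_counts, total)
```

Because each partial count depends only on its own partition, the partitions can be processed in parallel and combined at the end.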

This is a good moment to pause and make sure you understand what you've done so far.  We've loaded one or more text files into a Spark RDD and used parallel processing to count the number of lines across them.  If it doesn't seem like you've done that much, take a look at the Spark UI now.

Really, take a look!

Ready to continue?

Okay, let's keep going with a look at the functional style of RDD processing.  This is pretty natural to someone who's used functional languages or is used to doing a lot of list processing in Python.

`first()` will extract the first line of the file, which, in this case, contains our headers.

In [14]:
header = rides.first()
header

'"Duration","Start date","End date","Start station number","Start station","End station number","End station","Bike number","Member type"'

We're going to jump ahead and do a bunch of things at once, and then break them all down so you can see the steps one at a time.

First we load the `add` function; this is a useful shorthand we'll use in a second.

In [15]:
from operator import add

It works just like you'd expect:

In [None]:
add(1, 2)

In [None]:
add(add(add(add(add(add(1, 1), 1), 2), 3), 5), 8)

Make sense?  Good. 
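
The nested calls above are exactly what a "reduce" does:  it folds a sequence down to one value by applying a two-argument function over and over.  Here is a small sketch using the standard library's `functools.reduce`; the `values` list just repeats the numbers from the nested example.

```python
from functools import reduce
from operator import add

# The same numbers used in the nested add() calls.
values = [1, 1, 1, 2, 3, 5, 8]

# reduce() applies add() pairwise, left to right, carrying the running result.
total = reduce(add, values)

print(total)
```

Keep this pattern in mind; the same idea of combining values pairwise comes back shortly.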

Okay, now the leap:

In [16]:
top10 = rides.filter(lambda row: row != header) \
    .map(lambda row: row.replace('"', '')) \
    .map(lambda row: row.split(",")) \
    .map(lambda cols: (cols[4], 1)) \
    .reduceByKey(add) \
    .takeOrdered(10, key=lambda pair: -pair[1])
for station, count in top10:
    print("{}\t{}".format(count, station))

70062	Columbus Circle / Union Station
65884	Lincoln Memorial
59259	Jefferson Dr & 14th St SW
46702	Massachusetts Ave & Dupont Circle NW
43305	15th & P St NW
42525	Jefferson Memorial
42406	Smithsonian-National Mall / Jefferson Dr & 12th St SW
40659	Henry Bacon Dr & Lincoln Memorial Circle NW
37751	4th St & Madison Dr NW
33159	14th & V St NW


Look familiar?  Good.  Let's break it down.

First, we use `filter()`, which does just what you expect it to do.  It lets some things through, but not others.  In this case we use `lambda`, or an "anonymous function", to remove the header from the stream.
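
If `filter()` with a `lambda` is new to you, here is the same idea with Python's built-in `filter()` on an ordinary list.  The rows are shortened, made-up examples, and `is_not_header` is a hypothetical named function shown only for comparison.

```python
# Shortened, made-up rows: a header line followed by two data lines.
header = '"Duration","Start station"'
rows = [
    header,
    '"690","16th & Irving St NW"',
    '"134","15th & P St NW"',
]

def is_not_header(row):
    """Named equivalent of the lambda below."""
    return row != header

# Both calls keep only the rows for which the function returns True.
kept_lambda = list(filter(lambda row: row != header, rows))
kept_named = list(filter(is_not_header, rows))

print(kept_lambda)
```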

`take()` lets us extract values from the RDD - remember that the RDD is just a logical construct until we materialize some sort of result.

In [17]:
rides.take(5)

['"Duration","Start date","End date","Start station number","Start station","End station number","End station","Bike number","Member type"',
 '"2762","2017-07-01 00:01:09","2017-07-01 00:47:11","31289","Henry Bacon Dr & Lincoln Memorial Circle NW","31289","Henry Bacon Dr & Lincoln Memorial Circle NW","W21474","Casual"',
 '"2763","2017-07-01 00:01:24","2017-07-01 00:47:27","31289","Henry Bacon Dr & Lincoln Memorial Circle NW","31289","Henry Bacon Dr & Lincoln Memorial Circle NW","W22042","Casual"',
 '"690","2017-07-01 00:01:45","2017-07-01 00:13:16","31122","16th & Irving St NW","31299","Connecticut Ave & R St NW","W01182","Member"',
 '"134","2017-07-01 00:01:46","2017-07-01 00:04:00","31201","15th & P St NW","31267","17th St & Massachusetts Ave NW","W22829","Member"']

Now the same thing with the header filtered out:

In [16]:
rides.filter(lambda row: row != header) \
    .take(5)

['"2762","2017-07-01 00:01:09","2017-07-01 00:47:11","31289","Henry Bacon Dr & Lincoln Memorial Circle NW","31289","Henry Bacon Dr & Lincoln Memorial Circle NW","W21474","Casual"',
 '"2763","2017-07-01 00:01:24","2017-07-01 00:47:27","31289","Henry Bacon Dr & Lincoln Memorial Circle NW","31289","Henry Bacon Dr & Lincoln Memorial Circle NW","W22042","Casual"',
 '"690","2017-07-01 00:01:45","2017-07-01 00:13:16","31122","16th & Irving St NW","31299","Connecticut Ave & R St NW","W01182","Member"',
 '"134","2017-07-01 00:01:46","2017-07-01 00:04:00","31201","15th & P St NW","31267","17th St & Massachusetts Ave NW","W22829","Member"',
 '"587","2017-07-01 00:02:05","2017-07-01 00:11:52","31099","Madison & N Henry St","31907","Franklin & S Washington St","W22223","Casual"']

See the difference?  Great.  We're only filtering out the header line.

But now all we have is a list of strings, which isn't very useful if we want to operate on the data.  Next, then, we'll remove `"` marks and split the CSV data up by commas using `map`, which applies a function to every row in the RDD.  We'll define a lambda function right inline again, as it's easy, although you could just as easily use a regular named Python function for this.

In [17]:
rides.filter(lambda row: row != header) \
    .map(lambda row: row.replace('"', '')) \
    .map(lambda row: row.split(",")) \
    .take(5)

[['2762',
  '2017-07-01 00:01:09',
  '2017-07-01 00:47:11',
  '31289',
  'Henry Bacon Dr & Lincoln Memorial Circle NW',
  '31289',
  'Henry Bacon Dr & Lincoln Memorial Circle NW',
  'W21474',
  'Casual'],
 ['2763',
  '2017-07-01 00:01:24',
  '2017-07-01 00:47:27',
  '31289',
  'Henry Bacon Dr & Lincoln Memorial Circle NW',
  '31289',
  'Henry Bacon Dr & Lincoln Memorial Circle NW',
  'W22042',
  'Casual'],
 ['690',
  '2017-07-01 00:01:45',
  '2017-07-01 00:13:16',
  '31122',
  '16th & Irving St NW',
  '31299',
  'Connecticut Ave & R St NW',
  'W01182',
  'Member'],
 ['134',
  '2017-07-01 00:01:46',
  '2017-07-01 00:04:00',
  '31201',
  '15th & P St NW',
  '31267',
  '17th St & Massachusetts Ave NW',
  'W22829',
  'Member'],
 ['587',
  '2017-07-01 00:02:05',
  '2017-07-01 00:11:52',
  '31099',
  'Madison & N Henry St',
  '31907',
  'Franklin & S Washington St',
  'W22223',
  'Casual']]

Here's how we'd define and use a named Python function to accomplish the CSV splitting steps.

In [18]:
def split_line(line, sep=','):
    return line.replace('"', '').split(sep)

In [19]:
rides.filter(lambda row: row != header) \
    .map(split_line) \
    .take(5)

[['2762',
  '2017-07-01 00:01:09',
  '2017-07-01 00:47:11',
  '31289',
  'Henry Bacon Dr & Lincoln Memorial Circle NW',
  '31289',
  'Henry Bacon Dr & Lincoln Memorial Circle NW',
  'W21474',
  'Casual'],
 ['2763',
  '2017-07-01 00:01:24',
  '2017-07-01 00:47:27',
  '31289',
  'Henry Bacon Dr & Lincoln Memorial Circle NW',
  '31289',
  'Henry Bacon Dr & Lincoln Memorial Circle NW',
  'W22042',
  'Casual'],
 ['690',
  '2017-07-01 00:01:45',
  '2017-07-01 00:13:16',
  '31122',
  '16th & Irving St NW',
  '31299',
  'Connecticut Ave & R St NW',
  'W01182',
  'Member'],
 ['134',
  '2017-07-01 00:01:46',
  '2017-07-01 00:04:00',
  '31201',
  '15th & P St NW',
  '31267',
  '17th St & Massachusetts Ave NW',
  'W22829',
  'Member'],
 ['587',
  '2017-07-01 00:02:05',
  '2017-07-01 00:11:52',
  '31099',
  'Madison & N Henry St',
  '31907',
  'Franklin & S Washington St',
  'W22223',
  'Casual']]

See how the two approaches are equivalent?  The `map()` function will pass each row as the first parameter to the `split_line()` function we defined.

I think `lambda` reads a little cleaner for simpler operations, so we'll continue with them for now.
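
One caveat about the splitting step, as a hedged aside:  stripping the quotes and splitting on commas only works when no field contains a comma of its own.  The row below is made up (it is not from the bikeshare files) to show how the naive approach compares with Python's standard `csv` module.

```python
import csv

# A made-up row whose third field contains a comma inside the quotes.
line = '"690","2017-07-01 00:01:45","Main St, North Entrance","Member"'

# Naive approach: drop the quotes, then split on every comma.
naive = line.replace('"', '').split(',')

# csv.reader respects the quoting, so the embedded comma stays in its field.
parsed = next(csv.reader([line]))

print(len(naive), naive)
print(len(parsed), parsed)
```

A function built on `csv.reader` could be passed to `map()` in the same way as `split_line`, at the cost of a little more work per row.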

Okay!  Now we're cooking.  Next, let's pull out the departure stations.  Which column was it again?

In [20]:
header

'"Duration","Start date","End date","Start station number","Start station","End station number","End station","Bike number","Member type"'

With Python's zero-based indexing, the "Start station" column is number 4.

In [21]:
rides.filter(lambda row: row != header) \
    .map(lambda row: row.replace('"', '')) \
    .map(lambda row: row.split(",")) \
    .map(lambda cols: cols[4]) \
    .take(5)

['Henry Bacon Dr & Lincoln Memorial Circle NW',
 'Henry Bacon Dr & Lincoln Memorial Circle NW',
 '16th & Irving St NW',
 '15th & P St NW',
 'Madison & N Henry St']

That works - but we want to count them, so we'll need a numeric value to count.  Thus the tuple with "`, 1`".

In [22]:
rides.filter(lambda row: row != header) \
    .map(lambda row: row.replace('"', '')) \
    .map(lambda row: row.split(",")) \
    .map(lambda cols: (cols[4], 1)) \
    .take(5)

[('Henry Bacon Dr & Lincoln Memorial Circle NW', 1),
 ('Henry Bacon Dr & Lincoln Memorial Circle NW', 1),
 ('16th & Irving St NW', 1),
 ('15th & P St NW', 1),
 ('Madison & N Henry St', 1)]

This structure sets us up neatly for another Spark function, `reduceByKey()`.  This is an addition from Spark, whereas `map()`, `filter()`, and `lambda` all have direct counterparts in standard Python.

In [23]:
rides.filter(lambda row: row != header) \
    .map(lambda row: row.replace('"', '')) \
    .map(lambda row: row.split(",")) \
    .map(lambda cols: (cols[4], 1)) \
    .reduceByKey(add) \
    .take(5)

[('Anacostia Library', 709),
 ('15th & Crystal Dr', 5516),
 ('Key West Ave & Diamondback Dr', 23),
 ('King St Metro South', 4714),
 ('River Rd & Landy Ln', 3427)]
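
What `reduceByKey(add)` computes can be sketched in plain Python.  This is only an illustration of the result, not of how Spark distributes the work:  `reduce_by_key` is a hypothetical helper and the pairs are a tiny made-up sample.

```python
from operator import add

def reduce_by_key(pairs, func):
    """Combine all values that share a key using func (single-machine sketch)."""
    result = {}
    for key, value in pairs:
        if key in result:
            result[key] = func(result[key], value)
        else:
            result[key] = value
    return result

# A tiny made-up sample of (station, 1) pairs.
pairs = [
    ("Lincoln Memorial", 1),
    ("15th & P St NW", 1),
    ("Lincoln Memorial", 1),
]

counts = reduce_by_key(pairs, add)
print(counts)
```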

Did you notice that that took a little longer? 

Can you guess why?

Okay, so, this looks pretty good.  But there's one issue with the data.  Do you see it?

In [24]:
top10 = rides.filter(lambda row: row != header) \
    .map(lambda row: row.replace('"', '')) \
    .map(lambda row: row.split(",")) \
    .map(lambda cols: (cols[4], 1)) \
    .reduceByKey(add) \
    .takeOrdered(10, key=lambda pair: -pair[1])
top10

[('Columbus Circle / Union Station', 70062),
 ('Lincoln Memorial', 65884),
 ('Jefferson Dr & 14th St SW', 59259),
 ('Massachusetts Ave & Dupont Circle NW', 46702),
 ('15th & P St NW', 43305),
 ('Jefferson Memorial', 42525),
 ('Smithsonian-National Mall / Jefferson Dr & 12th St SW', 42406),
 ('Henry Bacon Dr & Lincoln Memorial Circle NW', 40659),
 ('4th St & Madison Dr NW', 37751),
 ('14th & V St NW', 33159)]

Note that `top10` is now a Python list, not an RDD.  When we `take` or `takeOrdered` (or `collect` or others) we act on the RDD using the logic we've built up and end up with regular Python data structures.  Until then, we still just have an RDD with more operations.

In [25]:
rdd_top10 = rides.filter(lambda row: row != header) \
    .map(lambda row: row.replace('"', '')) \
    .map(lambda row: row.split(",")) \
    .map(lambda cols: (cols[4], 1)) \
    .reduceByKey(add)
type(rdd_top10)

pyspark.rdd.PipelinedRDD

In [None]:
rdd_top10.takeOrdered(5, key=lambda r: -r[1])

In [None]:
rdd_top10.takeOrdered(10, key=lambda r: -r[1])
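
The difference between building up operations and actually running them has a close analogy in plain Python, where `map()` is lazy too.  The sketch below is an analogy only, under that assumption:  `tag` is a hypothetical function that records each call, and `heapq.nsmallest` with a negated key stands in for `takeOrdered`.

```python
import heapq

calls = []

def tag(row):
    """Pair each row with its length, recording that the work happened."""
    calls.append(row)
    return (row, len(row))

rows = ["aaa", "b", "cc", "dddd"]

# Like a transformation: this only describes the work, nothing runs yet.
lazy = map(tag, rows)
calls_before = len(calls)

# Like an action: consuming the iterator forces every row to be processed.
top2 = heapq.nsmallest(2, lazy, key=lambda pair: -pair[1])
calls_after = len(calls)

print(calls_before, calls_after, top2)
```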

## Computing basic statistics

Let's go a step further.  `csvstat` and `xsv stats` / `xsv frequency` are so useful, you'd expect there to be something similar for Spark, right?  Of course there is.

First let's create an RDD of just the parsed data.

In [18]:
ride_data = rides.filter(lambda row: row != header) \
    .map(lambda row: row.replace('"', '')) \
    .map(lambda row: row.split(","))

In [19]:
ride_data

PythonRDD[10] at RDD at PythonRDD.scala:49

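Now let's extract the ride durations.  Remember how we did this in SQL?  Note that the `Duration` column is in seconds (the first ride runs from 00:01:09 to 00:47:11, which is 2762 seconds), so despite the name `ride_minutes`, the values below are seconds; divide by 60 if you want minutes.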

In [22]:
ride_minutes = ride_data.map(lambda cols: int(cols[0]))

In [23]:
ride_minutes.take(5)

[2762, 2763, 690, 134, 587]

In [None]:
ride_minutes.max()

In [None]:
ride_minutes.min()

Spark's [MLlib](https://spark.apache.org/mllib/) is the foundation for a lot of machine learning functionality.  A simple module inside it computes basic statistics.  It has a very creative name.

In [None]:
from pyspark.mllib.stat import Statistics

To use this module, we need to convert our data values into numpy arrays, which is just an easy `map()` call.

In [None]:
import numpy as np

In [None]:
# Wrap each value in a one-element array so every row is a 1-D vector.
ride_minutes_stats = Statistics.colStats(ride_minutes.map(lambda r: np.array([r])))

In [None]:
ride_minutes_stats.mean()

In [None]:
ride_minutes_stats.min()

In [None]:
ride_minutes_stats.max()

In [None]:
%timeit ride_minutes_stats.count()

In [None]:
ride_minutes_stats.variance()
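
To sanity-check numbers like these on a small scale, Python's standard `statistics` module computes the same kinds of summaries.  The sketch below uses only the five durations shown by `take(5)` earlier, so its results will not match the full dataset.  One assumption to verify in the MLlib docs:  `statistics.variance` is the sample variance, and `colStats` may or may not use the same convention.

```python
import math
import statistics

# The five durations (in seconds) shown by take(5) above.
seconds = [2762, 2763, 690, 134, 587]

# Convert to minutes, since the Duration column is recorded in seconds.
minutes = [s / 60 for s in seconds]

mean_seconds = statistics.mean(seconds)
min_seconds = min(seconds)
max_seconds = max(seconds)
sample_variance = statistics.variance(seconds)       # divides by n - 1
population_variance = statistics.pvariance(seconds)  # divides by n

print(mean_seconds, min_seconds, max_seconds)
print(sample_variance, population_variance)
print([round(m, 1) for m in minutes])
```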

## The DataFrame API

Let's start with the DataFrame API.  It's for array-oriented operations, just like you might already be used to with R or Python's Pandas module.

Note that you can find some introductory docs for this and the SQL API on the [Apache Spark docs page](https://spark.apache.org/docs/latest/sql-programming-guide.html).

The first step is that we load data a little differently.  We'll step away from the bikes this week and look at something else:  social media data from Twitter.

Note that the data in these examples, and more data you can obtain for yourself, came from the GWU Libraries' [Social Feed Manager](https://sfm.library.gwu.edu/) app.  You can log in and use it yourself, though note that access is restricted to campus or VPN connections.

First we obtain a `SQLContext` from our existing `SparkContext`.

In [None]:
from pyspark import SQLContext

In [None]:
sqlc = SQLContext(spark)

In [None]:
sqlc

In [None]:
!wget https://s3.amazonaws.com/2018-dmfa/week-9/solar-eclipse-tweets.csv

In [None]:
!mv solar-eclipse-tweets.csv tweets.csv

In [None]:
!wc -l tweets.csv

In [None]:
!head tweets.csv | csvcut -n

In [None]:
!head -5 tweets.csv | csvlook

The `read.csv()` function on `SQLContext` is very handy.  Take a close look at the parameters.

In [None]:
tweets = sqlc.read.csv("tweets.csv", header=True, inferSchema=True)

In [None]:
tweets.count()

In [None]:
tweets.take(5)

Looks like what we've seen before, yes?  Except that these are `Rows`, not an RDD.

They **do** have an RDD under the hood, though.

In [None]:
tweets.rdd

In [None]:
tweets.rdd.count()

You can do a little more with a `DataFrame` than you can with an `RDD`:

In [None]:
tweets.columns

This is all well and good, but how well did schema inference work?

In [None]:
tweets.printSchema()

Not very well!  This is not uncommon.  You might have to cast some columns to other types, like in this example:

In [None]:
import pyspark
dir(pyspark.sql.types)

In [None]:
from pyspark.sql.types import DateType

In [None]:
tweets = tweets.withColumn("created_at", tweets["created_at"].cast(DateType()))

In [None]:
tweets.printSchema()

In [None]:
tweets.select('created_at').take(5)

All fixed!

Note that you can define a full schema at load time to avoid this problem.  It would be good if the `inferSchema` option were a little more reliable, although as we'll see in a minute, our data isn't exactly clean.


### Operations on DataFrames

DataFrames support many of the kinds of data frame operations you're used to; they are all just a little different.  Use the docs!

In [None]:
tweets.take(2)

In [None]:
tweets.head(2)

In [None]:
tweets.show(2)

In [None]:
tweets.count()

In [None]:
tweets.describe('followers_count').show()

Whoops, looks like we've got some slop in our data.  This might be due to some strange characters in the mix.  Clean that up in a handy wrangling tool...

In [None]:
tweets.select("screen_name", "text").show(5)

In [None]:
tweets.filter("followers_count > 15000") \
    .select("followers_count") \
    .orderBy("followers_count", ascending=False) \
    .show(10)

Whoops, looks like another data type problem.  We can fix that, too.

In [None]:
from pyspark.sql.types import IntegerType
tweets = tweets.withColumn("followers_count", tweets["followers_count"].cast(IntegerType()))

In [None]:
tweets.filter("followers_count > 15000") \
    .select("followers_count") \
    .orderBy("followers_count", ascending=False) \
    .show(10)

Who are these popular tweeters?

In [None]:
tweets.filter("followers_count > 5000000").select("screen_name").show(20)

https://twitter.com/people

Yep - that looks about right.

Now that we have that column sorted out:

In [None]:
tweets.describe("followers_count").show()

In [None]:
tweets.orderBy("created_at", ascending=False).select("created_at").show(10)

### Dataframe-like operations (with '[]')

We can also write code that looks a lot more like Pandas in Python or R data frames.

In [None]:
tweets.select(tweets['created_at']).show(5)

In [None]:
tweets.filter(tweets['followers_count'] > 500000) \
    .select(tweets['screen_name'], tweets['followers_count']) \
    .orderBy(tweets['followers_count'], ascending=False) \
    .show(10)

Note that you can write the same thing somewhat more compactly:

In [None]:
tweets.filter('followers_count > 500000') \
    .select('screen_name', 'followers_count') \
    .orderBy('followers_count', ascending=False) \
    .show(10)

Note that the results are exactly the same!  How you write your code is up to you.

## Using SQL with DataFrames

All you need to do to get going with SQL is to register a table from your data frame, like so:

In [None]:
tweets.createOrReplaceTempView("tweets")

In [None]:
sqlc.sql("SELECT COUNT(*) FROM tweets")

In [None]:
sqlc.sql("SELECT COUNT(*) FROM tweets").show()

In [None]:
sqlc.sql("""
    SELECT followers_count 
    FROM tweets
    ORDER BY followers_count DESC
""").show(10)

In [None]:
sqlc.sql("""
    SELECT screen_name
    FROM tweets
    WHERE followers_count > 5000000
    ORDER BY screen_name
""").show(10)

### Other data types:  JSON

We can load in non-CSV data as well, such as JSON.  Here is a set of tweet data in JSON format, the original source.  It's much less likely to have wrangling issues.

In [None]:
!wget https://s3.amazonaws.com/2018-dmfa/week-9/mlb-world-series/9670f3399f774789b7c3e18975d25611_001.json

In [None]:
!mv 9670f3399f774789b7c3e18975d25611_001.json mlb.json

In [None]:
!wc -l mlb.json

In [None]:
!head -2 mlb.json 

JSON data is pretty common these days, and Python makes it easy to work with.  Here's what it looks like from Python:

In [None]:
!head -1 mlb.json > mlb1.json

In [None]:
import json
mlb = json.load(open("mlb1.json"))

In [None]:
mlb['user']['screen_name']

In [None]:
mlb['user']['followers_count']

In [None]:
print(json.dumps(mlb, indent=2))

Okay, that's a tour of one tweet.  Let's look at a lot more.

In [None]:
mlb = sqlc.read.json("mlb.json")

In [None]:
mlb

In [None]:
mlb.count()

In [None]:
mlb.printSchema()

In [None]:
sample = mlb.sample(False, 0.1, 12345)

In [None]:
sample.count()

There is **hierarchy** in JSON structures like tweets.  We can use `.` to address this:

In [None]:
sample.orderBy("user.followers_count", ascending=False).select('user.name').show(10)
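
The dotted-path idea maps directly onto nested dictionaries in plain Python.  As a rough sketch, with a made-up tweet and a hypothetical `get_path` helper (neither comes from Spark or from the data files):

```python
import json

# A made-up, heavily trimmed tweet with a nested "user" object.
tweet = json.loads(
    '{"text": "Play ball!",'
    ' "user": {"screen_name": "example_fan", "followers_count": 42}}'
)

def get_path(record, dotted):
    """Follow a dotted path such as 'user.followers_count' into nested dicts."""
    value = record
    for key in dotted.split("."):
        value = value[key]
    return value

print(get_path(tweet, "user.screen_name"))
print(get_path(tweet, "user.followers_count"))
```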

In [None]:
small_sample = mlb.sample(False, 0.01, 12345)

In [None]:
small_sample.count()

In [None]:
small_sample.orderBy("user.followers_count", ascending=False).show(10)

In [None]:
small_sample.rdd.take(1)

In [None]:
small_sample.rdd.flatMap(lambda r: r['text'].split(' ')) \
    .map(lambda t: (t, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .takeOrdered(10, key=lambda pair: -pair[1])

In [None]:
sample.rdd.flatMap(lambda r: r['text'].split(' ')) \
    .map(lambda t: (t, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .takeOrdered(10, key=lambda pair: -pair[1])
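
The word counts above follow a classic pattern:  `flatMap` turns each tweet into many words, and the remaining steps count them.  A single-machine sketch of the same result with the standard library follows; the `texts` list is made up, and `Counter` stands in for the map, reduce, and ordering steps.

```python
from collections import Counter
from itertools import chain

# Made-up stand-ins for the 'text' field of a few tweets.
texts = ["go team go", "team go wins"]

# Like flatMap: split every text and flatten the pieces into one sequence.
words = list(chain.from_iterable(text.split(" ") for text in texts))

# Like map + reduceByKey + takeOrdered: count each word, keep the top two.
counts = Counter(words)
top2 = counts.most_common(2)

print(words)
print(top2)
```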

Most importantly, we can do things like this:

In [None]:
sample.createOrReplaceTempView("sample")

In [None]:
sqlc.sql("SELECT * FROM sample").take(1)

In [None]:
sqlc.sql("""
    SELECT user.screen_name, user.followers_count AS fc
    FROM sample
    ORDER BY fc DESC
""").show(5)

## Spark MLlib example

### Fetch and prepare data - Iris classification

Let's use a classic dataset, [Fisher's Iris data](https://en.wikipedia.org/wiki/Iris_flower_data_set).  The best source for this is at its [UCI repository page](https://archive.ics.uci.edu/ml/datasets/iris), where they have a useful [descriptive page](https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.names).  Every ML library has a tutorial that uses this dataset.

The purpose of reviewing this dataset isn't to demonstrate Spark's computing power; we've already seen that.  The goal here is rather to show you how to produce a machine learning model using Spark.  You can compare the process to what you can find with environments like R, scikit-learn, and others.  The process compares pretty well.

In [None]:
!wget https://s3.amazonaws.com/2018-dmfa/week-13/iris.csv

In [None]:
!wc -l iris.csv

In [None]:
!head iris.csv | csvlook

In [None]:
!csvstat iris.csv

In [None]:
iris = sqlc.read.csv('iris.csv', header=True, inferSchema=True)

In [None]:
iris.count()

In [None]:
iris.take(5)

In [None]:
iris.printSchema()

### Explore the data

If you haven't encountered this dataset before, get used to it - you're sure to see it again.

Let's have a quick look before we dig in to get a visual sense of the data and the spreads of its independent variables.

In [None]:
%matplotlib inline

In [None]:
import matplotlib.pyplot as plt

Matplotlib's [style options](https://matplotlib.org/devdocs/gallery/style_sheets/style_sheets_reference.html) are fun to play with.

In [None]:
plt.style.use('ggplot')

In [None]:
from pandas.plotting import scatter_matrix

In [None]:
pd_iris = iris.toPandas()

In [None]:
pd_iris.hist(figsize=(8, 8))

In [None]:
scatter_matrix(pd_iris, figsize=(8, 8))

### Prepping features

It is common to go through stages of feature engineering that involve transformation, scaling, indexing, and similar steps.  MLlib provides a `Pipeline` to assemble these steps.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import IndexToString, StringIndexer, VectorAssembler

First, we pull our features into a feature vector using a `VectorAssembler`.

In [None]:
assembler = VectorAssembler(inputCols=["sepal_length", "sepal_width",
                                       "petal_length", "petal_width"],
                            outputCol="features")

Next, we want to have our various target labels in `class` indexed to numeric category values.  The MLlib `StringIndexer` can turn these string values into a numeric representation of categories.

In [None]:
indexer = StringIndexer(inputCol="class", outputCol="labelIndex").fit(iris)
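
To make the indexing step less of a black box, here is a plain-Python sketch of the idea.  It assumes labels are numbered by descending frequency with ties broken alphabetically, which is my understanding of the default ordering; check the `StringIndexer` docs for your Spark version.  The `fit_string_indexer` helper and the label values are illustrative only.

```python
from collections import Counter

def fit_string_indexer(values):
    """Return labels ordered by descending frequency (ties alphabetical)."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))

# Illustrative class labels, not read from iris.csv.
classes = [
    "Iris-setosa", "Iris-virginica", "Iris-virginica",
    "Iris-versicolor", "Iris-virginica", "Iris-setosa",
]

labels = fit_string_indexer(classes)

# Forward mapping (string -> numeric index), the role StringIndexer plays.
to_index = {label: float(i) for i, label in enumerate(labels)}

# Reverse mapping (index -> string), the role IndexToString plays later.
from_index = [labels[int(to_index[c])] for c in classes]

print(to_index)
```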

Next, we prep our model:

In [None]:
rf = RandomForestClassifier(labelCol="labelIndex", featuresCol="features", numTrees=10)

And we're going to need to be able to get those class labels back:

In [None]:
labeler = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                        labels=indexer.labels)

Finally, we assemble all of these steps into a `Pipeline`:

In [None]:
pipeline = Pipeline(stages=[assembler, indexer, rf, labeler])

### Split data into training and test sets

We always want to hold out some data from training so we can get an honest assessment on how our model will perform with unseen data.  Fortunately there's an easy API call for this.

In [None]:
help(iris.randomSplit)

In [None]:
train, test = iris.randomSplit([0.6, 0.4], 42)

In [None]:
train.count()

In [None]:
test.count()

Looks good.  We'll use `train` to build our model, and leave `test` alone for now.
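
You may have noticed the counts are not exactly 60% and 40% of the rows.  A rough single-machine sketch of a weighted random split shows why:  each row is assigned independently by a random draw, so the split sizes only approximate the weights.  The `random_split` helper below is hypothetical and is not Spark's implementation.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split at random, in proportion to weights."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative boundaries on [0, 1), e.g. [0.6, 1.0] for weights [0.6, 0.4].
    bounds = []
    running = 0.0
    for weight in weights:
        running += weight / total
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point rounding.
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(150))
train_rows, test_rows = random_split(rows, [0.6, 0.4], seed=42)
print(len(train_rows), len(test_rows))
```

Using the same seed reproduces the same split, which is why a fixed seed is handy when you want repeatable results.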

### Train the model and get predictions

In [None]:
model = pipeline.fit(train)

In [None]:
predictions = model.transform(test)

In [None]:
predictions.select("predictedLabel", "class", "features").show()

In [None]:
for metric in ['accuracy', 'f1']:
    evaluator = MulticlassClassificationEvaluator(labelCol="labelIndex",
                                                  predictionCol="prediction",
                                                  metricName=metric)
    print("{}: {}".format(metric, evaluator.evaluate(predictions)))
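
To see what these two metrics measure, here is a plain-Python sketch on a tiny made-up set of labels and predictions.  It assumes `f1` means the F1 score of each class averaged with weights proportional to each class's share of the true labels, which is my understanding of the evaluator's behavior; treat the `weighted_f1` helper as illustrative and confirm against the docs.

```python
from collections import Counter

def accuracy(labels, predictions):
    """Fraction of predictions that match the true label."""
    correct = sum(1 for y, p in zip(labels, predictions) if y == p)
    return correct / len(labels)

def weighted_f1(labels, predictions):
    """Per-class F1, averaged with weights equal to each class's support."""
    support = Counter(labels)
    total = 0.0
    for cls, n in support.items():
        tp = sum(1 for y, p in zip(labels, predictions) if y == cls and p == cls)
        fp = sum(1 for y, p in zip(labels, predictions) if y != cls and p == cls)
        fn = n - tp
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
        total += f1 * n / len(labels)
    return total

# Made-up true labels and predictions for five rows.
labels = ["a", "a", "b", "b", "c"]
predictions = ["a", "b", "b", "b", "c"]

acc = accuracy(labels, predictions)
f1 = weighted_f1(labels, predictions)
print(acc, round(f1, 4))
```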

In [None]:
model.stages

In [None]:
s2 = model.stages[2]

In [None]:
s2.trees

In [None]:
for t in s2.trees:
    print(t.toDebugString)

#### Some test stuff for later

In [None]:
!ls *.json

In [None]:
big_sample = sqlc.read.json("*.json")

In [None]:
big_sample.count()

## Zeppelin code for EMR cluster

Let's see what Spark can do on a real cluster.  We'll run some very similar queries on an AWS EMR cluster after break.

In [None]:
%pyspark
from pyspark import SQLContext
sqlc = SQLContext(spark)
mlb = sqlc.read.json("s3://2018-dmfa/week-9/2018-world-series/*.json")
mlb.count()

In [None]:
%pyspark
mlb.createOrReplaceTempView("mlb")
sqlc.sql("""
    SELECT user.screen_name AS name, user.followers_count AS fc
    FROM mlb
    WHERE user.followers_count > 5000000
    GROUP BY user.screen_name, user.followers_count
    ORDER BY user.followers_count DESC
""").show(10)