In [2]:
import pandas as pd
sc.master

u'local[*]'

In [8]:
# Use pandas to preview the first rows of the data
pd.read_csv("./baby-names.csv").head(3)

Unnamed: 0,Year,First Name,County,Sex,Count
0,2013,DAVID,KINGS,M,272
1,2013,JAYDEN,KINGS,M,268
2,2013,JAYDEN,QUEENS,M,219


> *With big data, loading petabytes of data onto a single machine is often impractical. This is where a distributed system such as `Apache Spark` comes in.*

# Creating a new RDD - `Creating a recipe for computing a result`
1. By parallelizing an existing Python collection (e.g. a list)
2. By transforming an existing RDD
3. From files (e.g. HDFS, txt, Amazon S3, HBase, RDBMS, JSON)

In [18]:
# Create an RDD with parallelize and split it into 4 partitions. No computation occurs yet.
# Spark only records how to create the RDD with 4 partitions.
data = [1,2,3,4,5]
dataRDD = sc.parallelize(data, 4)
dataRDD

ParallelCollectionRDD[9] at parallelize at PythonRDD.scala:423
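
The idea that an RDD is a recipe rather than a result can be illustrated without Spark. The following is a minimal plain-Python sketch, not Spark's implementation: the hypothetical `LazyList` class only records the functions to apply and runs them when `collect()` is called.

```python
class LazyList(object):
    """Hypothetical stand-in for an RDD: stores a recipe, not results."""

    def __init__(self, source, steps=()):
        self.source = source          # the original collection
        self.steps = tuple(steps)     # recorded transformations, not yet run

    def map(self, func):
        # Transformation: return a new recipe with one more step recorded.
        return LazyList(self.source, self.steps + (func,))

    def collect(self):
        # Action: only now is every recorded step applied to the data.
        result = []
        for item in self.source:
            for func in self.steps:
                item = func(item)
            result.append(item)
        return result


calls = []

def double(x):
    calls.append(x)   # record that work actually happened
    return 2 * x

recipe = LazyList([1, 2, 3, 4, 5]).map(double)
calls_before_action = len(calls)      # nothing has been computed yet
doubled = recipe.collect()            # the action triggers the computation
```

In this sketch `calls` stays empty until `collect()` runs, which mirrors how a transformation by itself does not trigger any work.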

In [77]:
# Create a new RDD from a text file. Spark records that we want at least 5 partitions.
baby_namesRDD = sc.textFile("./baby-names.csv", 5)
baby_namesRDD

MapPartitionsRDD[130] at textFile at NativeMethodAccessorImpl.java:-2

# Spark Transformation Concepts
Calling a transformation such as `map` creates another new RDD.

In [33]:
# Lambda function - single expression function
print("Double these data: {}".format(data))

# Apply the transformation map(), then the action collect()
dataRDD.map(lambda x: 2 * x).collect()

Double these data: [1, 2, 3, 4, 5]


[2, 4, 6, 8, 10]

### `filter()` and `collect()`

In [25]:
# Select or filter data
print("Select even numbers from: {}".format(data))
dataRDD.filter(lambda x: x % 2 == 0).collect()

Select even numbers from: [1, 2, 3, 4, 5]


[2, 4]

### `parallelize()`, `distinct()` and `takeOrdered()`

In [51]:
# Select distinct values
new_data = [1,3,4,3,3,4,5]
print("Select distinct values from: {}".format(new_data))
new_dataRDD = sc.parallelize(new_data)

# Transformation distinct() and action takeOrdered()
new_dataRDD.distinct().takeOrdered(4)

Select distinct values from: [1, 3, 4, 3, 3, 4, 5]


[1, 3, 4, 5]

### `map()` and `flatMap()`

In [48]:
# Transformation map() and action collect()
dataRDD.map(lambda x: [x, x+5]).collect()

[[1, 6], [2, 7], [3, 8], [4, 9], [5, 10]]

In [49]:
# Transformation flatMap() and action collect()
dataRDD.flatMap(lambda x: [x, x+5]).collect()

[1, 6, 2, 7, 3, 8, 4, 9, 5, 10]
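
The difference between the two outputs above can be reproduced in plain Python. This is only an analogy using a list comprehension and `itertools.chain`, not Spark code; the helper name `pair` is made up for the illustration.

```python
from itertools import chain

data = [1, 2, 3, 4, 5]

def pair(x):
    return [x, x + 5]

# map keeps one output element per input element (here, a list each).
mapped = [pair(x) for x in data]

# flatMap applies the same function, then flattens the results by one level.
flat_mapped = list(chain.from_iterable(pair(x) for x in data))
```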

### `reduceByKey()`, `sortByKey()` and `groupByKey()`

* reduceByKey()

In [93]:
# For each key, apply the lambda function
collectionRDD = sc.parallelize([(1,2), (1,4), (3,6), (2,7), (2,5)])
collectionRDD.reduceByKey(lambda x, y: x + y).collect()

[(1, 6), (2, 12), (3, 6)]

In [94]:
# For each key, apply the lambda function
collectionRDD = sc.parallelize([("Dog", 5), ("Dog",4), ("Cat",6), ("Lion",7), ("Lion",5)])
collectionRDD.reduceByKey(lambda x, y: x + y).collect()

[('Dog', 9), ('Lion', 12), ('Cat', 6)]

* sortByKey()

In [100]:
# Sort the pairs by key
collectionRDD = sc.parallelize([("Dog", 5), ("Dog",4), ("Cat",6), ("Lion",7), ("Lion",5)])
collectionRDD.sortByKey().collect()

[('Cat', 6), ('Dog', 5), ('Dog', 4), ('Lion', 7), ('Lion', 5)]

* groupByKey()
`groupByKey()` can move a large amount of data across the network and build large iterables on the workers. Make sure each resulting iterable fits in the memory of a single worker. When the goal is an aggregation, it is advisable to use `reduceByKey()` instead.

In [104]:
# Group the values for each key
collectionRDD = sc.parallelize([("Dog", 5), ("Dog",4), ("Cat",6), ("Lion",7), ("Lion",5)])
collectionRDD.groupByKey().collect()

[('Dog', <pyspark.resultiterable.ResultIterable at 0x1116b2b50>),
 ('Lion', <pyspark.resultiterable.ResultIterable at 0x111676a50>),
 ('Cat', <pyspark.resultiterable.ResultIterable at 0x112e39550>)]
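
Why `reduceByKey()` tends to move less data than `groupByKey()` can be sketched in plain Python. The two-partition layout and the helper `combine_locally` are assumptions made for the illustration; this is not how Spark is implemented, only the general idea of combining values inside each partition before anything crosses the network.

```python
from collections import defaultdict

# Hypothetical layout: the five pairs split across two partitions.
partitions = [
    [("Dog", 5), ("Dog", 4), ("Cat", 6)],
    [("Lion", 7), ("Lion", 5)],
]

def add(x, y):
    return x + y

def combine_locally(partition, func):
    """Merge values that share a key inside one partition."""
    merged = {}
    for key, value in partition:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

# groupByKey-style: every pair is sent across the network as-is.
shuffled_group = [pair for part in partitions for pair in part]
grouped = defaultdict(list)
for key, value in shuffled_group:
    grouped[key].append(value)

# reduceByKey-style: combine inside each partition first, then merge.
shuffled_reduce = [pair for part in partitions
                   for pair in combine_locally(part, add)]
reduced = dict(combine_locally(shuffled_reduce, add))
```

Here five records are moved in the grouping variant and three in the reducing variant; on real data with many repeated keys the gap is usually much larger.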

In [78]:
# Another new RDD created by calling transformation map
# Transform the baby_names RDD into rows RDD by applying function `map`.

rows = baby_namesRDD.map(lambda line: line.split(","))
rows.cache()

PythonRDD[131] at RDD at PythonRDD.scala:43

In [59]:
# Loop over the data and print each row as => FirstName, (Year)
for row in rows.take(rows.count()): 
    print("{}, ({})".format(row[1],row[0]))

First Name, (Year)
DAVID, (2013)
JAYDEN, (2013)
JAYDEN, (2013)
MOSHE, (2013)
ETHAN, (2013)
SOPHIA, (2013)
DANIEL, (2013)
JACOB, (2013)
ESTHER, (2013)
ETHAN, (2013)
ISABELLA, (2013)
DANIEL, (2013)
JACOB, (2013)
AIDEN, (2013)
LEAH, (2013)
NOAH, (2013)
JOSEPH, (2013)
MATTHEW, (2013)
JAYDEN, (2013)
RACHEL, (2013)
MICHAEL, (2013)
CHAYA, (2013)
SARAH, (2013)
SOPHIA, (2013)
ALEXANDER, (2013)
ETHAN, (2013)
EMILY, (2013)
MICHAEL, (2013)
AIDEN, (2013)
DYLAN, (2013)
EMMA, (2013)
MICHAEL, (2013)
LUCAS, (2013)
MATTHEW, (2013)
OLIVIA, (2013)
RYAN, (2013)
MIRIAM, (2013)
DAVID, (2013)
LIAM, (2013)
ABRAHAM, (2013)
MIA, (2013)
ISABELLA, (2013)
SAMUEL, (2013)
ISABELLA, (2013)
JACOB, (2013)
MATTHEW, (2013)
CHANA, (2013)
JOSHUA, (2013)
LIAM, (2013)
JOSEPH, (2013)
OLIVIA, (2013)
ANTHONY, (2013)
SOPHIA, (2013)
ALEXANDER, (2013)
CHAIM, (2013)
EMMA, (2013)
NICHOLAS, (2013)
RYAN, (2013)
ALEXANDER, (2013)
NOAH, (2013)
ANTHONY, (2013)
ISAAC, (2013)
EMILY, (2013)
JOSHUA, (2013)
MATTHEW, (2013)
SOFIA, (2013)
ANTHON

# Actions

* `Actions` aggregate the `RDD` elements. Such functions include `reduce()` and `collect()`.
* They return the final result to the driver program.

In [79]:
baby_namesRDD

MapPartitionsRDD[130] at textFile at NativeMethodAccessorImpl.java:-2

### count() causes Spark to do these steps `every time`:
* Read the data
* Sum within partitions
* Combine the sums in the driver
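
The steps above can be sketched in plain Python. The three partitions and their contents are hypothetical sample lines modeled on the rows shown earlier; real partition boundaries are chosen by Spark.

```python
# Hypothetical partitions of a small file (the header plus three data lines).
partitions = [
    ["Year,First Name,County,Sex,Count", "2013,DAVID,KINGS,M,272"],
    ["2013,JAYDEN,KINGS,M,268"],
    ["2013,JAYDEN,QUEENS,M,219"],
]

# Each partition counts its own lines (in Spark this runs on the workers).
partial_counts = [sum(1 for _ in part) for part in partitions]

# The driver adds the per-partition counts together.
total = sum(partial_counts)
```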

In [80]:
# Using the baby_names RDD, count number of lines in the file
baby_namesRDD.count()

43891

In [81]:
# Any further operation repeats the process above, so we cache the RDD.
baby_namesRDD.cache()

MapPartitionsRDD[130] at textFile at NativeMethodAccessorImpl.java:-2
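
The effect of caching can be illustrated with a plain-Python sketch. `CountingSource` is a hypothetical class that counts how often the data is read; it stands in for the file, not for any Spark API. In Spark, `cache()` only marks the RDD for storage, and the data is kept once the first action has run.

```python
class CountingSource(object):
    """Hypothetical data source that counts how often it is read."""

    def __init__(self, lines):
        self.lines = lines
        self.reads = 0

    def read(self):
        self.reads += 1
        return list(self.lines)


source = CountingSource(["a", "b", "c"])

# Without caching, each action goes back to the source.
first = len(source.read())
second = len(source.read())
reads_without_cache = source.reads

# With caching, the first action stores the data and later ones reuse it.
cached = source.read()
third = len(cached)
fourth = len(cached)
reads_with_cache = source.reads - reads_without_cache
```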

In [57]:
# Action reduce(func) aggregates the elements using func and returns ONE value
print("Add all numbers: {}".format(data))

# Apply action reduce(func)
dataRDD.reduce(lambda a,b: a+b)

Add all numbers: [1, 2, 3, 4, 5]


15
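
Because `reduce()` is applied inside each partition and then across the partial results, the function should be commutative and associative. The plain-Python sketch below, with hypothetical partition splits and a made-up helper `reduce_in_partitions`, shows that addition gives the same answer for any split while subtraction does not.

```python
from functools import reduce

def reduce_in_partitions(partitions, func):
    """Reduce each partition, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)

add = lambda a, b: a + b
subtract = lambda a, b: a - b

# Two hypothetical ways of splitting the same data into partitions.
split_a = [[1, 2], [3, 4, 5]]
split_b = [[1, 2, 3], [4, 5]]

sum_a = reduce_in_partitions(split_a, add)        # (1+2) + (3+4+5)
sum_b = reduce_in_partitions(split_b, add)        # (1+2+3) + (4+5)
diff_a = reduce_in_partitions(split_a, subtract)  # (1-2) - (3-4-5)
diff_b = reduce_in_partitions(split_b, subtract)  # (1-2-3) - (4-5)
```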

In [63]:
# Return elements in decreasing order by applying a key function
another_dataRDD = sc.parallelize([5,3,1,2])

# Apply the key function lambda s: -1 * s and take the highest 3
another_dataRDD.takeOrdered(3, lambda s: -1 *s)

[5, 3, 2]
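
The key-function trick used above has a direct plain-Python analogue in `heapq.nsmallest`. This sketch is only an analogy for the ordering logic, with no Spark involved.

```python
import heapq

values = [5, 3, 1, 2]

# Smallest 3 under the key -s, which are the largest 3 values.
highest_three = heapq.nsmallest(3, values, key=lambda s: -1 * s)

# Without a key, the same call returns the smallest values in ascending order.
lowest_three = heapq.nsmallest(3, values)
```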

In [21]:
sc.parallelize([2, 3, 4]).flatMap(lambda x: [x,x,x]).collect()

[2, 2, 2, 3, 3, 3, 4, 4, 4]

In [22]:
sc.parallelize([2, 3, 4]).map(lambda x: [x,x,x]).collect()

[[2, 2, 2], [3, 3, 3], [4, 4, 4]]

## `filter()`

In [66]:
rows.filter(lambda line: "RICHARD" in line).takeSample(True, 5)

[[u'2007', u'RICHARD', u'ERIE', u'M', u'20'],
 [u'2007', u'RICHARD', u'BRONX', u'M', u'32'],
 [u'2007', u'RICHARD', u'MONROE', u'M', u'9'],
 [u'2008', u'RICHARD', u'QUEENS', u'M', u'41'],
 [u'2008', u'RICHARD', u'ONONDAGA', u'M', u'6']]

## Map Partitions
> `mapPartitions` applies a function once to each partition of an RDD, so the work runs in parallel across the partitions.

Example:
Create the numbers 1 to 9. Distribute the numbers over three partitions and sum each partition.
* Partition 1: 1+2+3 = 6
* Partition 2: 4+5+6 = 15
* Partition 3: 7+8+9 = 24

In [26]:
# Create numbers 
one_thru_9 = range(1,10)

# Distribute the numbers over 3 partitions
parallel = sc.parallelize(one_thru_9, 3)

# Define a function that computes the sum over each partition
def add_numbers(iterator):
    yield sum(iterator)
    
# Apply the function to each partition
parallel.mapPartitions(add_numbers).collect()

[6, 15, 24]

In [32]:
# If we do not specify the number of partitions, Spark uses the default:
parallel = sc.parallelize(one_thru_9)

# With the default of 4 partitions the sums are:
# 1+2 = 3,    3+4 = 7,   5+6 = 11,   7+8+9 = 24

parallel.mapPartitions(add_numbers).collect()


[3, 7, 11, 24]
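
The per-partition sums shown above can be reproduced in plain Python. The slicing rule in `slice_into_partitions` is an assumption chosen because it matches both outputs; it is not copied from Spark's source, and `map_partitions` is a made-up helper that imitates calling a function once per partition.

```python
def slice_into_partitions(items, num_slices):
    """Split items into num_slices contiguous chunks using floor boundaries.

    Assumption: this rule reproduces the outputs shown in the notebook;
    it is not taken from Spark's source.
    """
    items = list(items)
    n = len(items)
    return [items[(i * n) // num_slices:((i + 1) * n) // num_slices]
            for i in range(num_slices)]

def add_numbers(iterator):
    yield sum(iterator)

def map_partitions(partitions, func):
    # Call func once per partition and concatenate what it yields.
    return [out for part in partitions for out in func(iter(part))]

three = slice_into_partitions(range(1, 10), 3)
four = slice_into_partitions(range(1, 10), 4)
sums_three = map_partitions(three, add_numbers)
sums_four = map_partitions(four, add_numbers)
```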

In [34]:
# Default number of partitions (4 on this machine)
print(sc.defaultParallelism)

4


In [38]:
one = sc.parallelize(range(1,10))
print(one.collect())

two = sc.parallelize(range(10,21))
print(two.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]


In [40]:
# Find Union of both
one.union(two).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

In [49]:
# Find intersection
print(one.collect())
three = sc.parallelize(range(5,15))
print(three.collect())

one.intersection(three).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]
[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]


[8, 9, 5, 6, 7]

In [50]:
# Find distinct values
one.union(three).distinct().collect()

[8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]
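
The semantics of the three operations above can be mirrored with plain Python lists and sets. This is an analogy only: `union()` keeps duplicates, while `intersection()` and `distinct()` remove them. The results are sorted here for readability, whereas the Spark outputs above come back in no particular order.

```python
one = list(range(1, 10))
three = list(range(5, 15))

# union behaves like concatenation: shared values appear twice.
union = one + three

# intersection and distinct both remove duplicates.
intersection = sorted(set(one) & set(three))
distinct = sorted(set(union))
```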

In [67]:
names_in_counties = rows.map(lambda n: (str(n[1]), str(n[2]) ))
names_in_counties.takeSample(True, 10)

[('MILA', 'ERIE'),
 ('ALEXIS', 'ONTARIO'),
 ('MIKAELA', 'SUFFOLK'),
 ('CODY', 'NASSAU'),
 ('JARED', 'WESTCHESTER'),
 ('JAMES', 'ALBANY'),
 ('CAMILA', 'NEW YORK'),
 ('EMMA', 'SUFFOLK'),
 ('ALEX', 'ROCKLAND'),
 ('EMMANUEL', 'MONROE')]

In [68]:
# n[1] is 'Name' in col 1, n[2] is 'County' in col 2.
names_in_counties = rows.map(lambda n: (str(n[1]), str(n[2]) )).groupByKey()
names_in_counties.take(10)

[('SELINA', <pyspark.resultiterable.ResultIterable at 0x112026f50>),
 ('YAKOV', <pyspark.resultiterable.ResultIterable at 0x112e39b10>),
 ('ROHAN', <pyspark.resultiterable.ResultIterable at 0x112e394d0>),
 ('DIYA', <pyspark.resultiterable.ResultIterable at 0x112e39c50>),
 ('JOCELYN', <pyspark.resultiterable.ResultIterable at 0x112e39850>),
 ('DELANEY', <pyspark.resultiterable.ResultIterable at 0x112e396d0>),
 ('JAMARI', <pyspark.resultiterable.ResultIterable at 0x112e39dd0>),
 ('NOELLE', <pyspark.resultiterable.ResultIterable at 0x112e39890>),
 ('TZIREL', <pyspark.resultiterable.ResultIterable at 0x112e39d90>),
 ('RICARDO', <pyspark.resultiterable.ResultIterable at 0x112e39a10>)]

In [70]:
# x[0] is 'Name' from col 1, and x[1] is the collection of counties from col 2.
# For each name, convert the grouped counties to a list and display.
names_in_counties.map(lambda x: {x[0]: list(x[1])}).take(3)

[{'SELINA': ['KINGS',
   'KINGS',
   'QUEENS',
   'KINGS',
   'QUEENS',
   'KINGS',
   'KINGS',
   'QUEENS',
   'KINGS',
   'KINGS',
   'QUEENS',
   'NEW YORK']},
 {'YAKOV': ['KINGS',
   'ROCKLAND',
   'KINGS',
   'ROCKLAND',
   'KINGS',
   'ROCKLAND',
   'KINGS',
   'ROCKLAND',
   'KINGS',
   'ROCKLAND',
   'ORANGE',
   'ROCKLAND',
   'KINGS',
   'ORANGE',
   'KINGS',
   'ROCKLAND']},
 {'ROHAN': ['NEW YORK', 'QUEENS', 'QUEENS', 'KINGS', 'WESTCHESTER']}]

In [72]:
# n[1] is 'Name' in col 1 & n[4] is 'Count' in col 4.

# Filter out the header row
filtered_rows = baby_namesRDD.filter(lambda line: "Count" not in line).map(lambda line: line.split(","))

# Sum up all the counts (col 4) of the names. Sort them and display.
filtered_rows.map(lambda n: (str(n[1]), int(n[4]) )).reduceByKey(lambda v1,v2: v1+v2).sortByKey().take(5)

[('AADEN', 39),
 ('AALIYAH', 1407),
 ('AARAV', 6),
 ('AARON', 3110),
 ('AAYAN', 22)]
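
The filter, map, reduceByKey and sortByKey chain above can be traced step by step in plain Python. The input lines are a small sample modeled on the rows shown at the top of the notebook, and the code is an analogy rather than Spark code.

```python
from collections import defaultdict

# Sample lines modeled on the rows shown at the top of the notebook.
lines = [
    "Year,First Name,County,Sex,Count",
    "2013,DAVID,KINGS,M,272",
    "2013,JAYDEN,KINGS,M,268",
    "2013,JAYDEN,QUEENS,M,219",
]

# filter: drop the header; map: split each remaining line into columns.
rows = [line.split(",") for line in lines if "Count" not in line]

# map to (name, count) pairs, then sum the counts per name (reduceByKey).
totals = defaultdict(int)
for row in rows:
    totals[row[1]] += int(row[4])

# sortByKey: order the pairs alphabetically by name.
sorted_totals = sorted(totals.items())
```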

## Displaying Results : `collect(), take(), takeSample()`
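> `collect()` is highly expensive: it returns every element of the RDD to the driver program. Prefer `take()` or `takeSample()` for a quick look at the results.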

In [102]:
# Take the first 5.
filtered_rows.map(lambda n: (str(n[1]), int(n[4]) )).reduceByKey(lambda v1,v2: v1+v2).sortByKey().take(5)

[('AADEN', 39),
 ('AALIYAH', 1407),
 ('AARAV', 6),
 ('AARON', 3110),
 ('AAYAN', 22)]

In [105]:
# Take any random 5 from the results
filtered_rows.map(lambda n: (str(n[1]), int(n[4]) )).reduceByKey(lambda v1,v2: v1+v2).sortByKey().takeSample(True,5)

[('ANNIE', 137),
 ('CULLEN', 7),
 ('CAMDEN', 100),
 ('TYRESE', 13),
 ('JOHANNA', 44)]
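
The first argument of `takeSample()` controls whether sampling is done with replacement. The plain-Python sketch below uses the standard `random` module and a fixed seed (both choices made only for this illustration) to show the difference.

```python
import random

results = [("ANNIE", 137), ("CULLEN", 7), ("CAMDEN", 100)]
rng = random.Random(0)   # fixed seed so the sketch is repeatable

# With replacement: each draw is independent, so repeats are possible
# and the sample may be larger than the data.
with_replacement = [rng.choice(results) for _ in range(5)]

# Without replacement: every element appears at most once.
without_replacement = rng.sample(results, 3)
```

With replacement, asking for 5 items from 3 is valid and necessarily repeats some of them.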

In [73]:
# Take the first 10 results in ascending order.
filtered_rows.map(lambda n: (str(n[1]), int(n[4]) )).reduceByKey(lambda v1,v2: v1+v2).sortByKey().takeOrdered(10)

[('AADEN', 39),
 ('AALIYAH', 1407),
 ('AARAV', 6),
 ('AARON', 3110),
 ('AAYAN', 22),
 ('ABBY', 61),
 ('ABDIEL', 10),
 ('ABDOULAYE', 22),
 ('ABDUL', 38),
 ('ABDULLAH', 147)]