In [60]:
import pandas as pd

In [61]:
# Use pandas to preview the first 3 records.
# nrows=3 stops parsing after three rows instead of loading the whole file just to show its head.
pd.read_csv("./baby-names.csv", nrows=3)

Unnamed: 0,Year,First Name,County,Sex,Count
0,2013,DAVID,KINGS,M,272
1,2013,JAYDEN,KINGS,M,268
2,2013,JAYDEN,QUEENS,M,219


> *For Big Data, loading the whole dataset into a single system this way may not be possible. This is where `Apache Spark` comes in.*

# Create new RDD 

In [65]:
# Create a new RDD whose elements are the text lines of the CSV file
baby_names = sc.textFile("./baby-names.csv")
baby_names

MapPartitionsRDD[137] at textFile at NativeMethodAccessorImpl.java:-2

# Spark Transformation Concepts
Calling the transformation `map` on an RDD creates another new RDD.

In [66]:
# `map` is a transformation: it derives a new RDD from an existing one.
# Each text line of baby_names becomes a list of its comma-separated fields,
# and the resulting RDD is marked for caching because later cells reuse it.
rows = baby_names.map(lambda text: text.split(",")).cache()
rows

PythonRDD[138] at RDD at PythonRDD.scala:43

In [68]:
# Loop over the data and print each row as => First Name, (Year)   e.g. DAVID, (2013)
# row[0] is the year and row[1] is the first name.
# toLocalIterator() streams the rows to the driver one partition at a time, instead of
# running count() as an extra job and then pulling every row back at once with take().
for row in rows.toLocalIterator():
    print("{}, ({})".format(row[1], row[0]))

First Name, (Year)
DAVID, (2013)
JAYDEN, (2013)
JAYDEN, (2013)
MOSHE, (2013)
ETHAN, (2013)
SOPHIA, (2013)
DANIEL, (2013)
JACOB, (2013)
ESTHER, (2013)
ETHAN, (2013)
ISABELLA, (2013)
DANIEL, (2013)
JACOB, (2013)
AIDEN, (2013)
LEAH, (2013)
NOAH, (2013)
JOSEPH, (2013)
MATTHEW, (2013)
JAYDEN, (2013)
RACHEL, (2013)
MICHAEL, (2013)
CHAYA, (2013)
SARAH, (2013)
SOPHIA, (2013)
ALEXANDER, (2013)
ETHAN, (2013)
EMILY, (2013)
MICHAEL, (2013)
AIDEN, (2013)
DYLAN, (2013)
EMMA, (2013)
MICHAEL, (2013)
LUCAS, (2013)
MATTHEW, (2013)
OLIVIA, (2013)
RYAN, (2013)
MIRIAM, (2013)
DAVID, (2013)
LIAM, (2013)
ABRAHAM, (2013)
MIA, (2013)
ISABELLA, (2013)
SAMUEL, (2013)
ISABELLA, (2013)
JACOB, (2013)
MATTHEW, (2013)
CHANA, (2013)
JOSHUA, (2013)
LIAM, (2013)
JOSEPH, (2013)
OLIVIA, (2013)
ANTHONY, (2013)
SOPHIA, (2013)
ALEXANDER, (2013)
CHAIM, (2013)
EMMA, (2013)
NICHOLAS, (2013)
RYAN, (2013)
ALEXANDER, (2013)
NOAH, (2013)
ANTHONY, (2013)
ISAAC, (2013)
EMILY, (2013)
JOSHUA, (2013)
MATTHEW, (2013)
SOFIA, (2013)
ANTHON

# Actions

* `Actions` aggregate the elements of an `RDD`. Such functions include `reduce()` and `collect()`.
* An action returns its final result to the driver program.

In [21]:
# flatMap: the list returned for each element is flattened into the result
sc.parallelize([2, 3, 4]).flatMap(lambda number: [number] * 3).collect()

[2, 2, 2, 3, 3, 3, 4, 4, 4]

In [22]:
# map: the list returned for each element is kept as one (nested) result element
sc.parallelize([2, 3, 4]).map(lambda number: [number] * 3).collect()

[[2, 2, 2], [3, 3, 3], [4, 4, 4]]

## `filter()`

In [75]:
# Each element of rows is a list of fields, so `in` keeps the rows that have a field equal to "RICHARD"
rows.filter(lambda fields: "RICHARD" in fields).collect()

[[u'2013', u'RICHARD', u'KINGS', u'M', u'35'],
 [u'2013', u'RICHARD', u'QUEENS', u'M', u'25'],
 [u'2013', u'RICHARD', u'SUFFOLK', u'M', u'21'],
 [u'2013', u'RICHARD', u'NEW YORK', u'M', u'14'],
 [u'2013', u'RICHARD', u'NASSAU', u'M', u'11'],
 [u'2013', u'RICHARD', u'ERIE', u'M', u'6'],
 [u'2013', u'RICHARD', u'WESTCHESTER', u'M', u'6'],
 [u'2013', u'RICHARD', u'ULSTER', u'M', u'5'],
 [u'2012', u'RICHARD', u'KINGS', u'M', u'44'],
 [u'2012', u'RICHARD', u'BRONX', u'M', u'26'],
 [u'2012', u'RICHARD', u'NEW YORK', u'M', u'20'],
 [u'2012', u'RICHARD', u'QUEENS', u'M', u'20'],
 [u'2012', u'RICHARD', u'SUFFOLK', u'M', u'16'],
 [u'2012', u'RICHARD', u'NASSAU', u'M', u'10'],
 [u'2012', u'RICHARD', u'WESTCHESTER', u'M', u'9'],
 [u'2012', u'RICHARD', u'ERIE', u'M', u'7'],
 [u'2012', u'RICHARD', u'ORANGE', u'M', u'6'],
 [u'2011', u'RICHARD', u'KINGS', u'M', u'33'],
 [u'2011', u'RICHARD', u'QUEENS', u'M', u'31'],
 [u'2011', u'RICHARD', u'BRONX', u'M', u'18'],
 [u'2011', u'RICHARD', u'NASSAU', u'M',

## Map Partitions
> A way to apply an operation to each partition of an `RDD` as a whole, so that the partitions can be processed in parallel.

Example:
Create the numbers 1 through 9. Distribute the numbers over three partitions and sum the numbers in each partition.
* Partition 1: 1+2+3 = 6
* Partition 2: 4+5+6 = 15
* Partition 3: 7+8+9 = 24

In [26]:
# Create the numbers 1 through 9
one_thru_9 = range(1,10)

# Distribute the numbers over 3 partitions
parallel = sc.parallelize(one_thru_9, 3)

# Define a function that adds up the elements of a single partition
def add_numbers(iterator):
    """Yield exactly one value: the sum of every element produced by `iterator`."""
    running_total = 0
    for element in iterator:
        running_total = running_total + element
    yield running_total
    
# Apply add_numbers once per partition and collect the per-partition sums
parallel.mapPartitions(add_numbers).collect()

[6, 15, 24]

In [32]:
# If we do not specify the number of partitions, a default is used:
parallel = sc.parallelize(one_thru_9)

# The result below has 4 sums, i.e. the numbers were split into 4 partitions:
# 1+2 = 3,    3+4 = 7,   5+6 = 11,   7+8+9 = 24

parallel.mapPartitions(add_numbers).collect()


[3, 7, 11, 24]

In [34]:
# Default parallelism of this SparkContext (prints 4 here, matching the 4 partitions above)
print(sc.defaultParallelism)

4


In [38]:
# Build two RDDs of consecutive integers: 1..9 and 10..20
one = sc.parallelize(range(1, 10))
two = sc.parallelize(range(10, 21))

# Show the contents of each RDD
print(one.collect())
print(two.collect())

[1, 2, 3, 4, 5, 6, 7, 8, 9]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]


In [40]:
# Find the union of both RDDs: the elements of `one` together with the elements of `two`
one.union(two).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

In [49]:
# Find the intersection: the elements that `one` and `three` have in common
three = sc.parallelize(range(5, 15))

print(one.collect())
print(three.collect())

one.intersection(three).collect()

[1, 2, 3, 4, 5, 6, 7, 8, 9]
[5, 6, 7, 8, 9, 10, 11, 12, 13, 14]


[8, 9, 5, 6, 7]

In [50]:
# Find distinct values: union() keeps the elements shared by both RDDs twice, distinct() removes the duplicates
one.union(three).distinct().collect()

[8, 1, 9, 2, 10, 11, 3, 12, 4, 5, 13, 14, 6, 7]

In [78]:
# Pair each row's first name (column 1) with its county (column 2)
names_in_counties = rows.map(
    lambda fields: (str(fields[1]), str(fields[2]))
)
names_in_counties.collect()

[('First Name', 'County'),
 ('DAVID', 'KINGS'),
 ('JAYDEN', 'KINGS'),
 ('JAYDEN', 'QUEENS'),
 ('MOSHE', 'KINGS'),
 ('ETHAN', 'QUEENS'),
 ('SOPHIA', 'QUEENS'),
 ('DANIEL', 'QUEENS'),
 ('JACOB', 'KINGS'),
 ('ESTHER', 'KINGS'),
 ('ETHAN', 'KINGS'),
 ('ISABELLA', 'QUEENS'),
 ('DANIEL', 'KINGS'),
 ('JACOB', 'QUEENS'),
 ('AIDEN', 'KINGS'),
 ('LEAH', 'KINGS'),
 ('NOAH', 'KINGS'),
 ('JOSEPH', 'KINGS'),
 ('MATTHEW', 'QUEENS'),
 ('JAYDEN', 'BRONX'),
 ('RACHEL', 'KINGS'),
 ('MICHAEL', 'QUEENS'),
 ('CHAYA', 'KINGS'),
 ('SARAH', 'KINGS'),
 ('SOPHIA', 'KINGS'),
 ('ALEXANDER', 'QUEENS'),
 ('ETHAN', 'BRONX'),
 ('EMILY', 'QUEENS'),
 ('MICHAEL', 'KINGS'),
 ('AIDEN', 'QUEENS'),
 ('DYLAN', 'QUEENS'),
 ('EMMA', 'QUEENS'),
 ('MICHAEL', 'SUFFOLK'),
 ('LUCAS', 'QUEENS'),
 ('MATTHEW', 'KINGS'),
 ('OLIVIA', 'QUEENS'),
 ('RYAN', 'QUEENS'),
 ('MIRIAM', 'KINGS'),
 ('DAVID', 'QUEENS'),
 ('LIAM', 'QUEENS'),
 ('ABRAHAM', 'KINGS'),
 ('MIA', 'QUEENS'),
 ('ISABELLA', 'KINGS'),
 ('SAMUEL', 'KINGS'),
 ('ISABELLA', 'BRONX'

In [86]:
# Key each row by first name (column 1) with the county (column 2) as the value,
# then gather all the counties recorded for the same name.
names_in_counties = (
    rows
    .map(lambda fields: (str(fields[1]), str(fields[2])))
    .groupByKey()
)
names_in_counties.collect()

[('GRIFFIN', <pyspark.resultiterable.ResultIterable at 0x12101de50>),
 ('KALEB', <pyspark.resultiterable.ResultIterable at 0x121029450>),
 ('JOHNNY', <pyspark.resultiterable.ResultIterable at 0x121029150>),
 ('SAGE', <pyspark.resultiterable.ResultIterable at 0x121029110>),
 ('MIKE', <pyspark.resultiterable.ResultIterable at 0x121029310>),
 ('NAYELI', <pyspark.resultiterable.ResultIterable at 0x121029290>),
 ('ERIN', <pyspark.resultiterable.ResultIterable at 0x121029390>),
 ('DONOVAN', <pyspark.resultiterable.ResultIterable at 0x1210292d0>),
 ('LUCIANA', <pyspark.resultiterable.ResultIterable at 0x121029210>),
 ('LANDEN', <pyspark.resultiterable.ResultIterable at 0x121029350>),
 ('JAELYNN', <pyspark.resultiterable.ResultIterable at 0x121029250>),
 ('BARBARA', <pyspark.resultiterable.ResultIterable at 0x121029190>),
 ('GREGORY', <pyspark.resultiterable.ResultIterable at 0x1210293d0>),
 ('CHRISTIAN', <pyspark.resultiterable.ResultIterable at 0x1210291d0>),
 ('MENDY', <pyspark.resultiterab

In [87]:
# Each grouped element is a pair: pair[0] is the name, pair[1] is the collection of its counties.
# Turn the collection into a list and display every pair as a {name: [counties]} dict.
names_in_counties.map(lambda pair: {pair[0]: list(pair[1])}).collect()

[{'GRIFFIN': ['ERIE',
   'ONONDAGA',
   'NEW YORK',
   'ERIE',
   'SUFFOLK',
   'MONROE',
   'NEW YORK',
   'ERIE',
   'MONROE',
   'ONONDAGA',
   'SUFFOLK',
   'NEW YORK',
   'NASSAU',
   'SUFFOLK',
   'ERIE',
   'MONROE',
   'MONROE',
   'NEW YORK',
   'SUFFOLK',
   'ERIE',
   'WESTCHESTER',
   'NEW YORK',
   'SUFFOLK',
   'MONROE',
   'SCHENECTADY',
   'ONONDAGA',
   'ERIE',
   'NEW YORK',
   'SUFFOLK',
   'NASSAU',
   'ONONDAGA']},
 {'KALEB': ['QUEENS',
   'SUFFOLK',
   'MONROE',
   'BRONX',
   'KINGS',
   'ERIE',
   'JEFFERSON',
   'SUFFOLK',
   'NASSAU',
   'ERIE',
   'SUFFOLK',
   'MONROE',
   'OSWEGO',
   'STEUBEN',
   'KINGS',
   'ERIE',
   'MONROE',
   'NASSAU',
   'JEFFERSON',
   'ONONDAGA',
   'SUFFOLK',
   'MONROE',
   'ERIE',
   'MONROE']},
 {'JOHNNY': ['KINGS',
   'QUEENS',
   'KINGS',
   'QUEENS',
   'SUFFOLK',
   'KINGS',
   'QUEENS',
   'KINGS',
   'QUEENS',
   'SUFFOLK',
   'KINGS',
   'QUEENS',
   'KINGS',
   'SUFFOLK',
   'QUEENS',
   'KINGS',
   'NASSAU']},
 {'SAG

In [101]:
# n[1] is 'First Name' in col 1 & n[4] is 'Count' in col 4.

# Filter out the header row. Compare against the actual header line rather than testing
# for the substring "Count", which would also drop any data line containing that text.
header = baby_names.first()
filtered_rows = baby_names.filter(lambda line: line != header).map(lambda line: line.split(","))

# Sum up all the counts (col 4) per name, sort by name and display the first 5.
filtered_rows.map(lambda n: (str(n[1]), int(n[4]) )).reduceByKey(lambda v1,v2: v1+v2).sortByKey().take(5)

[('AADEN', 39),
 ('AALIYAH', 1407),
 ('AARAV', 6),
 ('AARON', 3110),
 ('AAYAN', 22)]

## Displaying Results : `collect(), take(), takeSample()`
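> `collect()` is highly expensive: it returns every element of the `RDD` to the driver program, so prefer `take()` or `takeSample()` for a quick look.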

In [102]:
# Take the first 5 (name, total count) pairs, with names in sorted order.
(filtered_rows
    .map(lambda fields: (str(fields[1]), int(fields[4])))
    .reduceByKey(lambda left, right: left + right)
    .sortByKey()
    .take(5))

[('AADEN', 39),
 ('AALIYAH', 1407),
 ('AARAV', 6),
 ('AARON', 3110),
 ('AAYAN', 22)]

In [105]:
# Take any random 5 (name, total count) pairs from the results.
# withReplacement=False so the same pair cannot be drawn more than once;
# no sortByKey() here because ordering is irrelevant for a random sample.
(filtered_rows
    .map(lambda n: (str(n[1]), int(n[4])))
    .reduceByKey(lambda v1, v2: v1 + v2)
    .takeSample(False, 5))

[('ANNIE', 137),
 ('CULLEN', 7),
 ('CAMDEN', 100),
 ('TYRESE', 13),
 ('JOHANNA', 44)]

In [106]:
# Display all results: every (name, total count) pair, with names in sorted order.
(filtered_rows
    .map(lambda fields: (str(fields[1]), int(fields[4])))
    .reduceByKey(lambda left, right: left + right)
    .sortByKey()
    .collect())

[('AADEN', 39),
 ('AALIYAH', 1407),
 ('AARAV', 6),
 ('AARON', 3110),
 ('AAYAN', 22),
 ('ABBY', 61),
 ('ABDIEL', 10),
 ('ABDOULAYE', 22),
 ('ABDUL', 38),
 ('ABDULLAH', 147),
 ('ABEL', 47),
 ('ABIGAIL', 4797),
 ('ABRAHAM', 1491),
 ('ABRAM', 5),
 ('ADA', 80),
 ('ADALYN', 5),
 ('ADALYNN', 6),
 ('ADAM', 2490),
 ('ADDISON', 1468),
 ('ADELAIDE', 21),
 ('ADELE', 58),
 ('ADELINE', 44),
 ('ADELYN', 17),
 ('ADEN', 158),
 ('ADINA', 122),
 ('ADITYA', 10),
 ('ADONIS', 103),
 ('ADRIAN', 2195),
 ('ADRIANA', 1236),
 ('ADRIANNA', 811),
 ('ADRIEL', 76),
 ('ADYAN', 10),
 ('AHARON', 75),
 ('AHMAD', 60),
 ('AHMED', 290),
 ('AHRON', 29),
 ('AHUVA', 89),
 ('AICHA', 12),
 ('AIDAN', 2616),
 ('AIDEN', 5953),
 ('AILEEN', 70),
 ('AISHA', 196),
 ('AISSATOU', 11),
 ('AJANI', 11),
 ('AKIVA', 88),
 ('ALAINA', 82),
 ('ALAN', 807),
 ('ALANA', 603),
 ('ALANI', 11),
 ('ALANNA', 29),
 ('ALAYNA', 15),
 ('ALBERT', 319),
 ('ALBERTO', 14),
 ('ALEC', 32),
 ('ALEENA', 49),
 ('ALEJANDRA', 52),
 ('ALEJANDRO', 722),
 ('ALESSANDRA',