# Spark Examples

This notebook walks through a few examples of Spark functionality.  It's meant to be run on a local machine, but could easily be adapted to run on a real cluster.

## Starting Spark

This section shows how we start a `SparkContext`, which is the entry point into Spark's core (RDD) operations.  Setting the `master` to `local[*]` tells Spark to run in local mode and to use all available cores.

In [1]:
from pyspark import SparkContext

sc = SparkContext(master='local[*]', appName='example')

## Reading Data as an RDD

In my previous presentation on Pandas, I used a dataset about crime in New Jersey, broken down by city.  I'll use that same dataset here.

In [3]:
rdd = sc.textFile('./new-jersey-crime.csv')
rdd.count()

487

In [4]:
rdd.sample(False, 0.05).collect()

['Belleville,36258,62,0,3,32,27,600,93,419,88,1',
 'Buena,4520,19,0,4,7,8,77,21,54,2,1',
 'Cinnaminson Township,16871,25,0,3,7,15,305,38,258,9,0',
 'Closter,8674,2,1,1,0,0,34,6,27,1,0',
 'Deal,735,1,0,0,0,1,32,9,23,0,0',
 'Delanco Township,4592,5,0,0,4,1,97,22,68,7,0',
 'Eatontown,12264,39,0,3,17,19,401,34,363,4,0',
 'Edgewater,12079,4,0,1,1,2,141,12,128,1,0',
 'Englewood,28677,61,0,8,19,34,325,47,267,11,0',
 'Fairview,14497,15,0,1,10,4,118,26,83,9,0',
 'Haworth,3469,2,0,0,0,2,15,2,13,0,0',
 'Jamesburg,6028,6,0,2,3,1,51,16,32,3,0',
 'Jersey City,266179,1276,24,86,547,619,4483,953,2979,551,40',
 'Lavallette,1789,1,0,0,0,1,29,7,22,0,1',
 'Linden,42100,136,0,19,39,78,982,123,785,74,1',
 'Manalapan Township,40407,27,0,4,4,19,241,39,195,7,0',
 '"Mansfield Township, Warren County",7461,2,0,0,1,1,125,12,111,2,0',
 'Middletown Township,65919,17,0,3,4,10,436,62,364,10,1',
 'New Hanover Township,8218,2,0,0,1,1,4,3,1,0,0',
 'Pennington,2586,0,0,0,0,0,8,1,7,0,0',
 'Perth Amboy,52767,210,0,7,68,135

In [6]:
rdd.map(lambda x: x.split(',')).sample(False, 0.05).collect()

[['Bradley Beach', '4261', '6', '0', '0', '3', '3', '79', '7', '72', '0', '0'],
 ['Deal', '735', '1', '0', '0', '0', '1', '32', '9', '23', '0', '0'],
 ['Egg Harbor Township',
  '43971',
  '86',
  '1',
  '13',
  '21',
  '51',
  '746',
  '166',
  '555',
  '25',
  '5'],
 ['Franklin Lakes',
  '10902',
  '5',
  '0',
  '0',
  '3',
  '2',
  '75',
  '13',
  '56',
  '6',
  '0'],
 ['Hackettstown', '9549', '8', '0', '0', '2', '6', '94', '14', '77', '3', '0'],
 ['Harvey Cedars', '340', '0', '0', '0', '0', '0', '14', '0', '14', '0', '0'],
 ['Highlands', '4838', '8', '0', '0', '2', '6', '22', '4', '17', '1', '0'],
 ['Hopatcong', '14388', '7', '0', '0', '0', '7', '104', '25', '79', '0', '1'],
 ['Leonia', '9224', '2', '0', '0', '0', '2', '47', '14', '27', '6', '0'],
 ['Lumberton Township',
  '12400',
  '17',
  '2',
  '1',
  '6',
  '8',
  '217',
  '16',
  '193',
  '8',
  '1'],
 ['Manville',
  '10424',
  '11',
  '0',
  '2',
  '2',
  '7',
  '265',
  '34',
  '214',
  '17',
  '1'],
 ['Marlboro Township',
 

You can do all kinds of interesting things with RDDs.  If we wanted to, we could properly parse this data and run maps, reduces, joins, etc.  This is especially useful when our data comes in a format that Spark doesn't understand.  For instance, I once had to read in a bunch of Excel files.  Since Spark doesn't understand Excel, I read them in as an RDD of binary blobs and used mapper functions to parse each one and emit its contents.  The article describing that approach is a bit dated, but you can [read it here](http://brianstempin.com/2017/10/05/dealing-with-excel-data-in-pyspark/).
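
One caveat with the `split(',')` call used earlier: it also splits inside quoted fields, such as `"Mansfield Township, Warren County"` in the sampled output.  A minimal sketch of a more careful parser, using Python's standard `csv` module, might look like the following.  The name `parse_csv_line` is made up for illustration, and the snippet runs without Spark; the commented-out last line shows how it could be plugged into `rdd.map`.

```python
import csv

def parse_csv_line(line):
    """Parse one CSV line into a list of fields, honoring quoted commas."""
    # csv.reader accepts any iterable of strings, so wrap the single line in a list.
    return next(csv.reader([line]))

# A line taken from the sampled RDD output earlier in this notebook.
line = '"Mansfield Township, Warren County",7461,2,0,0,1,1,125,12,111,2,0'

naive = line.split(',')          # splits inside the quoted city name
parsed = parse_csv_line(line)    # keeps the city name as one field

print(len(naive), len(parsed))   # 13 12
print(parsed[0])                 # Mansfield Township, Warren County

# Assuming the `rdd` defined above, the same function could be mapped over it:
# rdd.map(parse_csv_line).sample(False, 0.05).collect()
```

Note that every field still comes back as a string, so numeric columns would need an explicit conversion before doing any arithmetic on them.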

## DataFrame and Dataset Operations

The `DataFrame` builds on top of the RDD and provides something that looks and feels like a table with records, columns, and aggregate functions.  If you are working in a statically typed language like Scala, the `Dataset` API builds on this by adding compile-time type checking to `DataFrame`-style operations.  PySpark exposes only the `DataFrame`.

To use `DataFrame` functionality, we need to start a `SparkSession`, which is the main entry point for the higher-level Spark functionality.

In [15]:
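from pyspark.sql import SparkSession

# Wrap the existing SparkContext in a SparkSession.
spark = SparkSession(sc)
# Render DataFrames as tables in the notebook without calling .show().
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)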

In [18]:
df = spark.read.csv('./new-jersey-crime.csv', header=True, inferSchema=True)
df.head(10)

[Row(City='Aberdeen Township', Population=18562, Violent Crime=18, Murder=0, Rape=2, Robbery=5, Aggravated Assault=11, Property Crime=147, Burglary=33, Larceny-theft=106, Motor Vehicle Theft=8, Arson=0),
 Row(City='Absecon', Population=8298, Violent Crime=8, Murder=0, Rape=0, Robbery=2, Aggravated Assault=6, Property Crime=242, Burglary=33, Larceny-theft=203, Motor Vehicle Theft=6, Arson=0),
 Row(City='Allendale', Population=6849, Violent Crime=1, Murder=0, Rape=0, Robbery=0, Aggravated Assault=1, Property Crime=37, Burglary=9, Larceny-theft=26, Motor Vehicle Theft=2, Arson=0),
 Row(City='Allenhurst', Population=490, Violent Crime=0, Murder=0, Rape=0, Robbery=0, Aggravated Assault=0, Property Crime=16, Burglary=1, Larceny-theft=15, Motor Vehicle Theft=0, Arson=0),
 Row(City='Allentown', Population=1824, Violent Crime=1, Murder=0, Rape=0, Robbery=0, Aggravated Assault=1, Property Crime=9, Burglary=3, Larceny-theft=6, Motor Vehicle Theft=0, Arson=0),
 Row(City='Alpha', Population=2291, V

In [17]:
df.summary()

summary,City,Population,Violent Crime,Murder,Rape,Robbery,Aggravated Assault,Property Crime,Burglary,Larceny-theft,Motor Vehicle Theft,Arson
count,486,486.0,486.0,486.0,486.0,486.0,486.0,486.0,486.0,486.0,486.0,486.0
mean,,17655.62962962963,39.2962962962963,0.6296296296296297,2.592592592592593,16.551440329218106,19.52263374485597,262.25925925925924,47.90534979423868,192.68312757201647,21.670781893004115,0.9444444444444444
stddev,,25480.58278814219,165.79395567641058,4.921095622987243,7.966338114549649,79.97244646625155,75.17629110556926,517.341270708231,109.36049953213738,329.5755383327342,114.84333860913544,3.220904656167848
min,Aberdeen Township,5.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,,4654.0,2.0,0.0,0.0,0.0,1.0,40.0,6.0,30.0,1.0,0.0
50%,,9483.0,6.0,0.0,0.0,2.0,3.0,101.0,17.0,80.0,4.0,0.0
75%,,21669.0,21.0,0.0,2.0,7.0,11.0,265.0,45.0,211.0,11.0,1.0
max,Wyckoff Township,281450.0,2637.0,100.0,99.0,1334.0,1104.0,6308.0,1138.0,3048.0,2122.0,41.0


In [19]:
df.groupBy().sum('Population')

sum(Population)
8580636


In [20]:
df.groupBy().sum('Robbery')

sum(Robbery)
8044


In [22]:
from pyspark.sql.functions import col

df.filter(col('Burglary') > 150)

City,Population,Violent Crime,Murder,Rape,Robbery,Aggravated Assault,Property Crime,Burglary,Larceny-theft,Motor Vehicle Theft,Arson
Brick Township,74818,71,0,11,18,42,1059,180,861,18,1
Bridgeton,24982,225,4,13,113,95,952,269,649,34,5
Cherry Hill Township,71079,92,1,2,49,40,1949,216,1678,55,1
Clifton,86324,202,0,7,83,112,1308,202,991,115,1
Deptford Township,30400,61,0,7,30,24,1170,193,943,34,5
East Orange,64787,437,6,22,179,230,898,205,462,231,10
Edison Township,102679,104,0,7,37,60,1088,165,844,79,1
Egg Harbor Township,43971,86,1,13,21,51,746,166,555,25,5
Elizabeth,129096,1054,5,33,528,488,4134,730,2536,868,5
Franklin Township...,66791,62,1,10,37,14,791,190,570,31,2


In [25]:
df.filter((col('Burglary') / col('Population')) > 0.01)

City,Population,Violent Crime,Murder,Rape,Robbery,Aggravated Assault,Property Crime,Burglary,Larceny-theft,Motor Vehicle Theft,Arson
Avalon,1273,3,0,0,0,3,191,30,159,2,0
Belvidere,2593,0,0,0,0,0,88,54,30,4,0
Bridgeton,24982,225,4,13,113,95,952,269,649,34,5
Brooklawn,1919,20,0,4,8,8,209,34,167,8,0
Chesilhurst,1626,8,0,0,1,7,42,19,19,4,0
Clementon,4913,14,0,1,9,4,169,59,104,6,1
Deal,735,1,0,0,0,1,32,9,23,0,0
Loch Arbour,186,0,0,0,0,0,9,3,6,0,0
Millville,28177,223,4,22,82,115,1714,304,1352,58,3
Runnemede,8322,10,0,1,4,5,358,145,199,14,0


In [28]:
# Add a per-capita burglary rate column, then list towns above 1%, lowest rate first.
df2 = df.withColumn('Burglary Rate', col('Burglary') / col('Population'))
df2.filter(col('Burglary Rate') > 0.01).orderBy(col('Burglary Rate'))

City,Population,Violent Crime,Murder,Rape,Robbery,Aggravated Assault,Property Crime,Burglary,Larceny-theft,Motor Vehicle Theft,Arson,Burglary Rate
Seaside Heights,2886,26,0,0,5,21,173,29,139,5,0,0.01004851004851
Wildwood Crest,3162,15,0,2,1,12,108,32,74,2,0,0.0101201771030993
Trenton,83644,1127,21,59,427,620,2186,883,930,373,17,0.0105566448280809
Bridgeton,24982,225,4,13,113,95,952,269,649,34,5,0.010767752782003
Millville,28177,223,4,22,82,115,1714,304,1352,58,3,0.0107889413351314
Chesilhurst,1626,8,0,0,1,7,42,19,19,4,0,0.0116851168511685
Clementon,4913,14,0,1,9,4,169,59,104,6,1,0.0120089558314675
Deal,735,1,0,0,0,1,32,9,23,0,0,0.0122448979591836
Wildwood,5115,67,0,6,26,35,403,63,334,6,2,0.0123167155425219
West Wildwood,566,2,0,0,0,2,18,8,9,1,0,0.0141342756183745
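
As a sanity check on the derived column, the same ratio can be reproduced in plain Python for a few rows copied from the output above.  This is only an illustrative sketch: the helper `burglary_rate` and the per-1,000 scaling are additions for readability, not part of the original notebook.

```python
# (city, population, burglaries) copied from the output above
rows = [
    ('Trenton', 83644, 883),
    ('Deal', 735, 9),
    ('West Wildwood', 566, 8),
]

def burglary_rate(population, burglaries):
    """Burglaries per resident, the same ratio as the 'Burglary Rate' column."""
    return burglaries / population

for city, population, burglaries in rows:
    rate = burglary_rate(population, burglaries)
    # Multiplying by 1,000 gives burglaries per 1,000 residents, which is easier to read.
    print(f'{city}: {rate:.4f} ({rate * 1000:.1f} per 1,000 residents)')
```

Very small towns can rank high on this measure with only a handful of incidents: West Wildwood has the highest rate shown above with just 8 burglaries.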
