# Washington Crime Lab (RDDs and DataFrames)

In this lab, we'll explore some of the RDD concepts we've discussed, using a data set of crimes reported in Washington, D.C., from October 3, 2015 through October 2, 2016. The data comes from the [District of Columbia's Open Data Catalog](http://data.octo.dc.gov/). We'll use it to explore some RDD transformations and actions.

## Exercises and Solutions

This notebook contains a number of exercises. Use the API docs for methods
<a href="http://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.rdd.RDD" target="_blank">common to all RDDs</a>,
plus the extra methods for
<a href="http://spark.apache.org/docs/1.6.1/api/scala/index.html#org.apache.spark.rdd.PairRDDFunctions" target="_blank">pair RDDs</a>, to look up transformations and actions. If, at any point, you're struggling with the solution to an exercise, feel free to look in the **Solutions** notebook (in the same folder as this lab).

## Let's get started.

## Load the data

The first step is to load the data. Run the following cell to create an RDD containing the data.

In [2]:
base_rdd = sc.textFile("/data/training/washington_crime_incidents_2013.csv")

**Question**: Does the RDD _actually_ contain the data right now?

## Explore the data

Let's take a look at some of the data.

In [3]:
for line in base_rdd.take(10):
  print(line)

Okay, there's a header. We'll need to remove that. But, since the file will be split into partitions, we can't just drop the first item. Let's figure out another way to do it.
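
To see why dropping "the first item" is unreliable, here is a plain-Python sketch that needs no Spark. The two lists stand in for partitions and the sample rows are invented for illustration; only the `REPORTDATETIME` column name comes from this lab.

```python
# Hypothetical stand-in for a file split into two partitions.
# The rows are invented; only the first partition contains the header.
partitions = [
    ["CCN,REPORTDATETIME,SHIFT,OFFENSE,METHOD",
     "1,10/3/2015 9:00:00 AM,DAY,THEFT,OTHERS"],
    ["2,10/3/2015 11:30:00 PM,MIDNIGHT,ROBBERY,GUN"],
]

def drop_first_per_partition(parts):
    # Naive approach: drop the first row of every partition.
    return [row for part in parts for row in part[1:]]

def drop_header_by_content(parts):
    # Content-based approach: keep every row that is not the header.
    return [row for part in parts for row in part if "REPORTDATETIME" not in row]

naive = drop_first_per_partition(partitions)
filtered = drop_header_by_content(partitions)
print(len(naive))     # 1: a real data row was lost from the second partition
print(len(filtered))  # 2
```

Filtering on content works no matter which partition the header lands in, at the cost of checking every line.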

In [4]:
no_header_rdd = base_rdd.filter(lambda line: "REPORTDATETIME" not in line)

In [5]:
for line in no_header_rdd.take(10):
  print(line)

### Exercise 1

Let's make things a little easier to handle by converting `no_header_rdd` to an RDD containing Python objects (named tuples).

**TO DO**

* Split each line into its individual cells.
* Map the RDD into another RDD of appropriate `CrimeData` objects.
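
As a reminder of how `namedtuple` behaves, here is a small plain-Python sketch. The `Reading` type and the sample line are hypothetical and unrelated to the crime data; they only illustrate the split-then-wrap pattern.

```python
from collections import namedtuple

# Hypothetical record type, used only to illustrate the pattern.
Reading = namedtuple("Reading", ["sensor", "value", "unit"])

def parse_reading(line):
    # Split on commas and keep only the first three cells.
    cells = line.split(",")
    return Reading(*cells[:3])

r = parse_reading("s1,21.5,C,ignored,also ignored")
print(r.sensor, r.value, r.unit)
print(r)
```

Note that every field is still a string after splitting; any numeric or date conversion has to be done explicitly.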

In [6]:
# TODO
# Replace the <FILL-IN> sections with appropriate code.

# TAKE NOTE: We are deliberately only keeping the first five fields of
# each line, since that's all we're using in this lab. There's no sense
# in dragging around more data than we need.
from collections import namedtuple
from pprint import pprint

CrimeData = namedtuple('CrimeData', ['ccn', 'report_time', 'shift', 'offense', 'method'])

def map_line(line):
  columns = line.split(",")
  return CrimeData(columns[0],columns[1],columns[2],columns[3],columns[4])

data_rdd = no_header_rdd.map(map_line)
pprint(data_rdd.take(10))

### Exercise 2

Next, group the data by type of crime (the "OFFENSE" column).

In [7]:
# TODO
# Replace <FILL-IN> with the appropriate code.

grouped_by_offense_rdd = data_rdd.groupBy(lambda data: data.offense)
pprint(grouped_by_offense_rdd.take(10))

### Exercise 3
Next, create an RDD that counts the number of each offense. How many murders were there in the period covered by the data? How many assaults with a dangerous weapon?

In [8]:
# TODO
# Replace <FILL-IN> with the appropriate code.

offense_counts = grouped_by_offense_rdd.map(lambda tup: (tup[0], len(tup[1])))
for offense, count in offense_counts.collect():
  print("{0:30s} {1:d}".format(offense, count))

### Question

Run the following cell. Can you explain what happened? Is `collectAsMap()` a _transformation_ or an _action_?

In [9]:
grouped_by_offense_rdd.collectAsMap()

### Exercise 4

How many partitions does the base RDD have? What about `grouped_by_offense_rdd`? How can you find out?

**Hint**: Check the [API documentation](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.rdd.RDD).

In [10]:
# TODO
# Replace <FILL-IN> with the appropriate code.
print(base_rdd.getNumPartitions())
print(grouped_by_offense_rdd.getNumPartitions())

### Exercise 5

Since we're continually playing around with this data, we might as well cache it, to speed things up.

**Question**: Which RDD should you cache?

1. `base_rdd`
2. `no_header_rdd`
3. `data_rdd`
4. None of them, because they're all still too big.
5. It doesn't really matter.

In [None]:
# TODO
# Replace <FILL-IN> with the appropriate code.
data_rdd.cache().count() # Why am I calling count() here?

### Exercise 6

Display the number of homicides by weapon (method).

In [None]:
# TODO
# Replace <FILL-IN> with the appropriate code.
from operator import add

# First approach: group the homicides by method, then count each group.
result_rdd1 = (data_rdd
                .filter(lambda data: data.offense == "HOMICIDE")
                .groupBy(lambda data: data.method)
                .mapValues(len)
              )
# Even better: map each homicide to (method, 1) and sum the ones per key.
result_rdd1 = (data_rdd
                .filter(lambda data: data.offense == "HOMICIDE")
                .map(lambda data: (data.method, 1))
                .reduceByKey(add)
              )
print(result_rdd1.collect())

# BONUS: Make the output look better, using a for loop or a list comprehension.
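
One possible take on the BONUS, sketched in plain Python. The `results` list is a hypothetical stand-in for what `collect()` might return; the method names and counts are invented, not taken from the data set.

```python
# Hypothetical (method, count) pairs standing in for result_rdd1.collect().
results = [("GUN", 3), ("KNIFE", 12), ("OTHERS", 7)]

# Sort by count, largest first, and align the two columns.
lines = ["{0:10s} {1:5d}".format(method, count)
         for method, count in sorted(results, key=lambda pair: -pair[1])]
for line in lines:
    print(line)
```

The format string pads the method name to 10 characters and right-aligns the count in 5, which keeps the columns lined up for short labels.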

### Exercise 7

During which police shift did the most crimes occur in the period covered by the data?

In [None]:
# TODO
# Replace <FILL-IN> with the appropriate code.

# Hint: Start with the data_rdd
# Approach 1: group by shift, count each group, sort by count (descending).
print(data_rdd.groupBy(lambda data: data.shift).mapValues(len).sortBy(lambda pair: pair[1], False).collect())

# Approach 2: map each record to (shift, 1), sum per key, sort by count (descending).
print(data_rdd
       .map(lambda data: (data.shift, 1))
       .reduceByKey(lambda x, y: x + y)
       .sortBy(lambda pair: -pair[1])
       .collect())
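
Counting per key can be done by grouping or by reducing, and the two do different amounts of work. The plain-Python sketch below uses two invented partitions of shift names: the grouping style gathers every record under its key before counting, while the reduce-by-key style counts within each partition first and then merges the small partial results. This is an analogy under those assumptions, not Spark's actual implementation.

```python
from collections import Counter, defaultdict

# Hypothetical partitions of shift values; the data is invented.
partitions = [
    ["DAY", "EVENING", "DAY"],
    ["MIDNIGHT", "EVENING", "EVENING"],
]

# groupBy style: gather every record under its key, then count.
groups = defaultdict(list)
for part in partitions:
    for shift in part:
        groups[shift].append(shift)
group_counts = {shift: len(rows) for shift, rows in groups.items()}

# reduceByKey style: count within each partition, then merge partial counts.
partials = [Counter(part) for part in partitions]
reduce_counts = sum(partials, Counter())

busiest = max(reduce_counts.items(), key=lambda pair: pair[1])
print(group_counts == dict(reduce_counts))  # True
print(busiest)                              # ('EVENING', 3)
```

Both styles agree on the counts; the difference is that the second only ever moves one small number per key per partition.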

## Let's Switch to DataFrames

### Demonstration

Let's plot murders by month. DataFrames are useful for this one.

To do this properly, we'll need to parse the dates. That will require knowing their format. A quick sampling of the data will help.

In [None]:
data_rdd.map(lambda item: item.report_time).take(30)

Okay. We can now parse the strings into actual `Date` objects.

**NOTE:** The DataFrame API does _not_ support schemas with `Date` objects in them. We'll need to convert the resulting `Date` to a `java.sql.Timestamp`.

Now, we can create the DataFrame. We'll start with `data_rdd`, since it's already cached. Note that we're using the built-in `unix_timestamp` function, which parses
a string according to a Java [SimpleDateFormat](https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html) pattern, returning the number of seconds since
the epoch (January 1, 1970 at midnight UTC), which we can then cast to a SQL timestamp type.
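
The string-to-epoch-seconds-to-timestamp round trip can be sketched in plain Python. This is only an analogy: it uses `datetime.strptime` directives rather than a SimpleDateFormat pattern, the helper name `to_epoch_seconds` and the sample string are hypothetical, and it assumes the input is in UTC.

```python
from datetime import datetime, timezone

def to_epoch_seconds(text, fmt="%m/%d/%Y %I:%M:%S %p"):
    # Parse the string, treat it as UTC, and return whole seconds since the epoch.
    parsed = datetime.strptime(text, fmt).replace(tzinfo=timezone.utc)
    return int(parsed.timestamp())

# Ten seconds past midnight on January 1, 1970 (12-hour clock with AM/PM).
secs = to_epoch_seconds("1/01/1970 12:00:10 AM")
print(secs)  # 10

# Converting the seconds back yields a proper timestamp object.
restored = datetime.fromtimestamp(secs, tz=timezone.utc)
print(restored.isoformat())  # 1970-01-01T00:00:10+00:00
```

The intermediate value is just an integer, which is why a final conversion step (a cast, in the DataFrame case) is needed to get a timestamp back.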

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
df0 = data_rdd.toDF()
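# Pattern note: lowercase 'ss' means seconds; uppercase 'S' denotes fractional
# seconds (milliseconds in SimpleDateFormat), so 'SS' would misread the seconds field.
df = df0.select(unix_timestamp(df0['report_time'], 'M/dd/yyyy KK:mm:ss a').cast('timestamp').alias('report_time'),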
                df0['shift'],
                df0['offense'],
                df0['method'])

In [None]:
df.printSchema()

In [None]:
df.show()

Let's use the built-in
<a href="http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$@month(e:org.apache.spark.sql.Column):org.apache.spark.sql.Column" target="_blank"><tt>month</tt></a>
function to extract the month. 

In [None]:
df_with_month = df.select(df['*'], month(df['report_time']).alias('month'))

In [None]:
(
  df_with_month.filter(df_with_month['offense'] == 'HOMICIDE')
               .select(df_with_month['month'], df_with_month['offense'])
               .groupBy('month')
               .count().show()
)

What about all crimes per month?

In [None]:
df_with_month.select(df_with_month['month']).groupBy('month').count().show()

### Exercise 8

Plot the frequency of crimes by hour of day.

In [None]:
# TODO
# Replace <FILL-IN> with your code.

<FILL-IN>.show()