# Advanced task: Comparing RDDs vs dataframes

**Author:** Alonso Andrade Blázquez

This exercise compares the running time of two equivalent PySpark solutions, one based on RDDs and the other on DataFrames, to determine which is faster.

Given the dataset of the crimes of Chicago (https://data.cityofchicago.org/Public-Safety/Crimes-2001-to-present/ijzp-q8t2), write two programs, crimeRDDAnalysis.py and crimeDataframeAnalysis.py, that count the number of crimes per location and print the first 10 pairs (location, count) ordered by descending count.

The deliverables of this exercise are the program files and a document describing the computer used and the times obtained when running the code with 1, 2, 4 and 8 cores.

## Laptop features

Laptop features:
- Windows 10 Home
- Processor Intel Core i7-4510U @ 2.00 GHz 2.60 GHz
- 16.0 GB RAM
- Spark 2.3.0
- Java JDK 1.8.0_141

## Data cleaning

First, the file separator is changed from comma to tab. This avoids problems when the file is loaded into an RDD and each line is split on the separator: with commas, any field that itself contains a comma (for example `SCHOOL, PUBLIC, BUILDING`) would be split into several columns.

In [1]:
import pandas as pd

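# Load the original comma-separated file; pandas handles quoted fields that contain commas
pandas_df = pd.read_csv('Crimes_-_2001_to_present.csv')
display(pandas_df.head(2))
# Write the data back tab-separated. The default index=True adds a leading index column,
# which is why 'Location Description' ends up at position 8 in the RDD solution.
pandas_df.to_csv('Crimes_-_2001_to_present.tsv', sep='\t')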

Unnamed: 0,ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,...,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
0,10000092,HY189866,03/18/2015 07:44:00 PM,047XX W OHIO ST,041A,BATTERY,AGGRAVATED: HANDGUN,STREET,False,False,...,28.0,25.0,04B,1144606.0,1903566.0,2015,02/10/2018 03:50:01 PM,41.891399,-87.744385,"(41.891398861, -87.744384567)"
1,10000094,HY190059,03/18/2015 11:00:00 PM,066XX S MARSHFIELD AVE,4625,OTHER OFFENSE,PAROLE VIOLATION,STREET,True,False,...,15.0,67.0,26,1166468.0,1860715.0,2015,02/10/2018 03:50:01 PM,41.773372,-87.665319,"(41.773371528, -87.665319468)"
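
The separator problem described at the start of this section can be reproduced on a single row. The sketch below uses a hypothetical, shortened row (only the location `SCHOOL, PUBLIC, BUILDING` is taken from the dataset) and compares a naive split on commas with a split on tabs after conversion:

```python
import csv
import io

# Hypothetical CSV row: the third field contains commas and is therefore quoted
csv_row = '10000092,BATTERY,"SCHOOL, PUBLIC, BUILDING",False'

# Naive split, as a map with split(',') would do: the quoted field is broken up
naive_fields = csv_row.split(',')

# A CSV-aware parser keeps the quoted field together
parsed_fields = next(csv.reader(io.StringIO(csv_row)))

# Once the row is written with tabs, a plain split('\t') is enough
tsv_row = '\t'.join(parsed_fields)
tsv_fields = tsv_row.split('\t')

print(len(naive_fields), naive_fields)
print(len(tsv_fields), tsv_fields)
```

The naive split yields more fields than the row really has, so any fixed column index would point at the wrong value for such rows.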


## Solution with Spark RDD

In [2]:
import time
from pyspark import SparkConf, SparkContext

In [3]:
def count_crime_location_descriptions_RDD(file_name: str, number_of_threads: str) -> None:
    
    spark_conf = SparkConf().setMaster(number_of_threads)
    spark_context = SparkContext(conf=spark_conf)

    logger = spark_context._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)
    
    start_computing_time = time.time()
    
    # Load the file as an RDD of text lines
    crimes_file = spark_context.textFile(file_name)
    
    # The first line holds the column names and must not be counted
    header = crimes_file.first()
    
    # Column 8 is 'Location Description' (column 0 is the index written by pandas)
    crimes_location_count = crimes_file \
        .filter(lambda line: line != header) \
        .map(lambda line: line.split('\t')[8]) \
        .map(lambda location_description: (location_description, 1)) \
        .reduceByKey(lambda a, b: a + b) \
        .map(lambda pair: (pair[1], pair[0])) \
        .sortByKey(False) \
        .take(10)
    
    total_computing_time = time.time() - start_computing_time
    
    print("Crimes count of the different types of locations")
    
    # Each pair is (count, location_description) because of the swap before sorting
    for (count, location_description) in crimes_location_count:
        print(count, location_description)
        
    print("\nComputing time with ", number_of_threads, ": ", str(total_computing_time))

    spark_context.stop()
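
The logic of the RDD pipeline (drop the header, extract one column, count per key, sort by descending count, take the first entries) can be checked without a Spark installation. The following is a minimal pure-Python sketch of the same steps, not the Spark implementation; the function `top_locations`, the helper `make_row` and the sample rows are hypothetical and exist only for illustration:

```python
from collections import Counter


def top_locations(lines, n=10, column=8, separator='\t'):
    """Return the n most frequent values of one column as (count, value) pairs."""
    header = lines[0]
    # Equivalent of filter + map + reduceByKey: count each location, skipping the header
    counts = Counter(
        line.split(separator)[column]
        for line in lines
        if line != header
    )
    # most_common sorts by descending count; swap to (count, location)
    # so the pairs have the same shape as the RDD result
    return [(count, location) for location, count in counts.most_common(n)]


def make_row(location):
    # Hypothetical row: eight placeholder fields followed by the location
    return '\t'.join(['x'] * 8 + [location])


sample_lines = ['\t'.join('col%d' % i for i in range(9))]  # header line
sample_lines += [make_row('STREET')] * 3
sample_lines += [make_row('RESIDENCE')] * 2
sample_lines += [make_row('ALLEY')]

sample_top = top_locations(sample_lines, n=2)
for count, location in sample_top:
    print(count, location)
```

Running the same small input through both versions is a quick way to confirm that the column index and the header filter behave as intended before launching the full job.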

In [4]:
count_crime_location_descriptions_RDD("Crimes_-_2001_to_present.tsv", "local[1]")

Crimes count of the different types of locations
1732048 STREET
1113617 RESIDENCE
675585 APARTMENT
650510 SIDEWALK
249276 OTHER
188765 PARKING LOT/GARAGE(NON.RESID.)
147384 ALLEY
140283 SCHOOL, PUBLIC, BUILDING
128880 RESIDENCE-GARAGE
114931 RESIDENCE PORCH/HALLWAY

Computing time with  local[1] :  569.3139615058899


In [5]:
count_crime_location_descriptions_RDD("Crimes_-_2001_to_present.tsv", "local[2]")

Crimes count of the different types of locations
1732048 STREET
1113617 RESIDENCE
675585 APARTMENT
650510 SIDEWALK
249276 OTHER
188765 PARKING LOT/GARAGE(NON.RESID.)
147384 ALLEY
140283 SCHOOL, PUBLIC, BUILDING
128880 RESIDENCE-GARAGE
114931 RESIDENCE PORCH/HALLWAY

Computing time with  local[2] :  413.1733124256134


In [6]:
count_crime_location_descriptions_RDD("Crimes_-_2001_to_present.tsv", "local[4]")

Crimes count of the different types of locations
1732048 STREET
1113617 RESIDENCE
675585 APARTMENT
650510 SIDEWALK
249276 OTHER
188765 PARKING LOT/GARAGE(NON.RESID.)
147384 ALLEY
140283 SCHOOL, PUBLIC, BUILDING
128880 RESIDENCE-GARAGE
114931 RESIDENCE PORCH/HALLWAY

Computing time with  local[4] :  339.36436462402344


In [7]:
count_crime_location_descriptions_RDD("Crimes_-_2001_to_present.tsv", "local[8]")

Crimes count of the different types of locations
1732048 STREET
1113617 RESIDENCE
675585 APARTMENT
650510 SIDEWALK
249276 OTHER
188765 PARKING LOT/GARAGE(NON.RESID.)
147384 ALLEY
140283 SCHOOL, PUBLIC, BUILDING
128880 RESIDENCE-GARAGE
114931 RESIDENCE PORCH/HALLWAY

Computing time with  local[8] :  337.79659962654114


## Solution with Spark Dataframes

In [26]:
from pyspark.sql import SparkSession

In [37]:
def count_crime_location_descriptions_DataFrame(file_name: str, number_of_threads: str) -> None:    
    spark_session = SparkSession \
        .builder \
        .master(number_of_threads) \
        .getOrCreate()
    
    start_computing_time = time.time()

    # Load tsv file
    crimes_df = spark_session\
        .read \
        .format("csv") \
        .option("header", "true") \
        .option("delimiter", "\t") \
        .load(file_name)
    
    # Retrieve location descriptions count by using a temporary view
    crimes_df.createOrReplaceTempView("chicago_crimes")

    sql_locations_count = spark_session.sql("""
        SELECT COUNT(`Location Description`) AS `Number of crimes`,
               `Location Description`
        FROM chicago_crimes
        GROUP BY `Location Description`
        ORDER BY COUNT(`Location Description`) DESC
        LIMIT 10
    """)
    
    print("Crimes count of the different types of locations")
    # show() triggers the actual computation, so it has to run before the timer stops
    sql_locations_count.show(truncate=False)
    
    total_computing_time = time.time() - start_computing_time
        
    print("\nComputing time with ", number_of_threads, ": ", str(total_computing_time))
    
    spark_session.stop()
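
The query used in the function is ordinary SQL, so its behaviour can be explored on a tiny table without Spark. The sketch below runs an equivalent query with the standard-library `sqlite3` module; this is SQLite, not Spark SQL, the sample rows are hypothetical, and double quotes replace the backticks for the column names:

```python
import sqlite3

# In-memory database with a single-column stand-in for the crimes table
connection = sqlite3.connect(":memory:")
connection.execute('CREATE TABLE chicago_crimes ("Location Description" TEXT)')

# Hypothetical sample: three locations plus one row with a missing location
sample_rows = [("STREET",)] * 3 + [("RESIDENCE",)] * 2 + [("ALLEY",), (None,)]
connection.executemany("INSERT INTO chicago_crimes VALUES (?)", sample_rows)

query = '''
    SELECT COUNT("Location Description") AS "Number of crimes",
           "Location Description"
    FROM chicago_crimes
    GROUP BY "Location Description"
    ORDER BY COUNT("Location Description") DESC
    LIMIT 10
'''
top_locations_sql = connection.execute(query).fetchall()
connection.close()

for number_of_crimes, location_description in top_locations_sql:
    print(number_of_crimes, location_description)
```

Because `COUNT(column)` ignores NULL values, the group of rows with a missing location is reported with a count of 0 here; `COUNT(*)` would count those rows as well.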

In [38]:
count_crime_location_descriptions_DataFrame("Crimes_-_2001_to_present.tsv", "local[1]")

Crimes count of the different types of locations
+----------------+------------------------------+
|Number of crimes|Location Description          |
+----------------+------------------------------+
|1732048         |STREET                        |
|1113617         |RESIDENCE                     |
|675585          |APARTMENT                     |
|650510          |SIDEWALK                      |
|249276          |OTHER                         |
|188765          |PARKING LOT/GARAGE(NON.RESID.)|
|147384          |ALLEY                         |
|140283          |SCHOOL, PUBLIC, BUILDING      |
|128880          |RESIDENCE-GARAGE              |
|114931          |RESIDENCE PORCH/HALLWAY       |
+----------------+------------------------------+


Computing time with  local[1] :  178.44208574295044


In [39]:
count_crime_location_descriptions_DataFrame("Crimes_-_2001_to_present.tsv", "local[2]")

Crimes count of the different types of locations
+----------------+------------------------------+
|Number of crimes|Location Description          |
+----------------+------------------------------+
|1732048         |STREET                        |
|1113617         |RESIDENCE                     |
|675585          |APARTMENT                     |
|650510          |SIDEWALK                      |
|249276          |OTHER                         |
|188765          |PARKING LOT/GARAGE(NON.RESID.)|
|147384          |ALLEY                         |
|140283          |SCHOOL, PUBLIC, BUILDING      |
|128880          |RESIDENCE-GARAGE              |
|114931          |RESIDENCE PORCH/HALLWAY       |
+----------------+------------------------------+


Computing time with  local[2] :  43.52302169799805


In [40]:
count_crime_location_descriptions_DataFrame("Crimes_-_2001_to_present.tsv", "local[4]")

Crimes count of the different types of locations
+----------------+------------------------------+
|Number of crimes|Location Description          |
+----------------+------------------------------+
|1732048         |STREET                        |
|1113617         |RESIDENCE                     |
|675585          |APARTMENT                     |
|650510          |SIDEWALK                      |
|249276          |OTHER                         |
|188765          |PARKING LOT/GARAGE(NON.RESID.)|
|147384          |ALLEY                         |
|140283          |SCHOOL, PUBLIC, BUILDING      |
|128880          |RESIDENCE-GARAGE              |
|114931          |RESIDENCE PORCH/HALLWAY       |
+----------------+------------------------------+


Computing time with  local[4] :  21.1430242061615


In [41]:
count_crime_location_descriptions_DataFrame("Crimes_-_2001_to_present.tsv", "local[8]")

Crimes count of the different types of locations
+----------------+------------------------------+
|Number of crimes|Location Description          |
+----------------+------------------------------+
|1732048         |STREET                        |
|1113617         |RESIDENCE                     |
|675585          |APARTMENT                     |
|650510          |SIDEWALK                      |
|249276          |OTHER                         |
|188765          |PARKING LOT/GARAGE(NON.RESID.)|
|147384          |ALLEY                         |
|140283          |SCHOOL, PUBLIC, BUILDING      |
|128880          |RESIDENCE-GARAGE              |
|114931          |RESIDENCE PORCH/HALLWAY       |
+----------------+------------------------------+


Computing time with  local[8] :  23.687837839126587


## Conclusions

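The measurements above show that the Spark SQL (DataFrame) solution is faster than the RDD solution for every thread configuration. The RDD version goes from about 569 s with `local[1]` to about 338 s with `local[8]`, while the DataFrame version goes from about 178 s to a minimum of about 21 s with `local[4]`, which makes it roughly 3 to 16 times faster depending on the configuration. Neither solution improves noticeably from `local[4]` to `local[8]`, which is consistent with the i7-4510U having two physical cores and four hardware threads. Only one measurement per configuration is reported, so the figures are indicative.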
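
The comparison can be made explicit with a little arithmetic on the measured times. The sketch below copies the times printed by the runs above and derives, for each configuration, how many times faster the DataFrame solution is and the speedup of each solution relative to its own `local[1]` run; the variable names are chosen here for illustration:

```python
# Measured times in seconds, copied from the runs above (threads -> seconds)
rdd_times = {1: 569.3139615058899, 2: 413.1733124256134,
             4: 339.36436462402344, 8: 337.79659962654114}
dataframe_times = {1: 178.44208574295044, 2: 43.52302169799805,
                   4: 21.1430242061615, 8: 23.687837839126587}

# How many times faster the DataFrame solution is for each configuration
ratios = {threads: rdd_times[threads] / dataframe_times[threads]
          for threads in rdd_times}

# Speedup of each solution relative to its own single-thread run
rdd_speedup = {threads: rdd_times[1] / seconds
               for threads, seconds in rdd_times.items()}
dataframe_speedup = {threads: dataframe_times[1] / seconds
                     for threads, seconds in dataframe_times.items()}

print("threads  RDD/DataFrame  RDD speedup  DataFrame speedup")
for threads in sorted(rdd_times):
    print(f"{threads:7d}  {ratios[threads]:13.2f}  "
          f"{rdd_speedup[threads]:11.2f}  {dataframe_speedup[threads]:17.2f}")
```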