In [1]:
# BROADCAST VARIABLES ARE USED IN PROCESSES TO ACCESS SHARED DATA 
# ACCUMULATORS ALLOW MULTIPLE PROCESSES TO UPDATE SHARED VARIABLES
# SPARK CONTEXT (SC)
# A SPARK SESSION COMPRISES A SPARK CONTEXT (SC) AND A HIVE CONTEXT (HC)

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Build the SparkSession for this notebook. getOrCreate() hands back the
# already-running session when one exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Analyzing London crime data")
    .getOrCreate()
)

In [7]:
# Load the London crime CSV. header=True tells Spark that the first line of
# the file holds the column names. No schema is supplied or inferred, so
# every column comes back as a string.
data = spark.read.csv("../datasets/london_crime_by_lsoa.csv", header=True)

In [8]:
# Print the column names, types and nullability of the DataFrame that was
# just read in.
data.printSchema()

root
 |-- lsoa_code: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- major_category: string (nullable = true)
 |-- minor_category: string (nullable = true)
 |-- value: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)



In [9]:
# Total number of rows loaded from the CSV.
data.count()

13490604

In [10]:
# Preview the first five rows.
data.limit(5).show()

+---------+----------+--------------------+--------------------+-----+----+-----+
|lsoa_code|   borough|      major_category|      minor_category|value|year|month|
+---------+----------+--------------------+--------------------+-----+----+-----+
|E01001116|   Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646| Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|   Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774| Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
+---------+----------+--------------------+--------------------+-----+----+-----+



In [11]:
# Drop every row that contains a null value. Spark DataFrames are immutable:
# dropna() returns a new DataFrame rather than modifying `data`, so the
# result has to be bound to a name or the call has no effect on later cells.
data = data.dropna()
# Bare expression so the cell still displays the resulting DataFrame.
data

DataFrame[lsoa_code: string, borough: string, major_category: string, minor_category: string, value: string, year: string, month: string]

In [12]:
# Trim the data by dropping columns that are not useful for the analysis.
data = data.drop('lsoa_code')

# Preview the remaining columns.
data.show(5)

+----------+--------------------+--------------------+-----+----+-----+
|   borough|      major_category|      minor_category|value|year|month|
+----------+--------------------+--------------------+-----+----+-----+
|   Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
| Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|   Bromley|Violence Against ...|      Other violence|    0|2015|    5|
| Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
+----------+--------------------+--------------------+-----+----+-----+
only showing top 5 rows



In [13]:
# distinct() collapses the selected column down to its unique values.
total_boroughs = (
    data
    .select('borough')
    .distinct()
)
total_boroughs.show()

+--------------------+
|             borough|
+--------------------+
|             Croydon|
|          Wandsworth|
|              Bexley|
|             Lambeth|
|Barking and Dagenham|
|              Camden|
|           Greenwich|
|              Newham|
|       Tower Hamlets|
|            Hounslow|
|              Barnet|
|              Harrow|
|Kensington and Ch...|
|           Islington|
|               Brent|
|            Haringey|
|             Bromley|
|              Merton|
|         Westminster|
|             Hackney|
+--------------------+
only showing top 20 rows



In [14]:
# Number of distinct boroughs in the dataset.
total_boroughs.count()

33

In [17]:
# Keep only the records whose borough is Hackney (where() is an alias of filter()).
hackney_data = data.where(data.borough == 'Hackney')

hackney_data.show(5)

+-------+--------------------+--------------------+-----+----+-----+
|borough|      major_category|      minor_category|value|year|month|
+-------+--------------------+--------------------+-----+----+-----+
|Hackney|     Criminal Damage|Criminal Damage T...|    0|2011|    6|
|Hackney|Violence Against ...|          Harassment|    1|2013|    2|
|Hackney|     Criminal Damage|Other Criminal Da...|    0|2011|    7|
|Hackney|Violence Against ...|        Wounding/GBH|    0|2013|   12|
|Hackney|  Theft and Handling|  Other Theft Person|    0|2016|    8|
+-------+--------------------+--------------------+-----+----+-----+
only showing top 5 rows



In [18]:
# Keep only the 2015 and 2016 records. isin() matches the column against a
# list of allowed values; the years are strings because `year` was loaded
# as a string column.
data_2015_2016 = data.filter(data['year'].isin(['2015', '2016']))

# sample() draws a random subset of roughly the given fraction of rows.
# The fixed seed keeps the preview repeatable across re-runs instead of
# showing different rows every time the cell executes.
data_2015_2016.sample(fraction=0.1, seed=42).show()

+--------------------+--------------------+--------------------+-----+----+-----+
|             borough|      major_category|      minor_category|value|year|month|
+--------------------+--------------------+--------------------+-----+----+-----+
|          Hillingdon|  Theft and Handling|Theft/Taking Of M...|    0|2016|    2|
|           Southwark|  Theft and Handling|    Theft From Shops|    4|2016|    8|
|             Croydon|             Robbery|   Personal Property|    0|2016|    1|
|      Waltham Forest|Violence Against ...|      Other violence|    0|2016|    9|
|Kensington and Ch...|             Robbery|   Personal Property|    1|2015|    7|
|      Waltham Forest|            Burglary|Burglary in Other...|    0|2015|    6|
|              Bexley|Violence Against ...| Assault with Injury|    0|2015|    6|
|         Westminster|  Theft and Handling|Theft From Motor ...|    6|2016|    8|
|               Brent|  Theft and Handling|Theft/Taking of P...|    0|2015|    1|
|              B

In [20]:
# Keep the records from 2014 onwards. `year` was loaded as a string, so cast
# it to an integer explicitly rather than relying on Spark's implicit
# string-to-number coercion when comparing against the int literal.
data_2014_onwards = data.filter(data['year'].cast('int') >= 2014)

# Fixed seed keeps the sampled preview repeatable across re-runs.
data_2014_onwards.sample(fraction=0.1, seed=42).show()

+--------------------+--------------------+--------------------+-----+----+-----+
|             borough|      major_category|      minor_category|value|year|month|
+--------------------+--------------------+--------------------+-----+----+-----+
|             Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|             Lambeth|Violence Against ...|      Other violence|    0|2015|    4|
|            Havering|    Fraud or Forgery|  Counted per Victim|    0|2015|   11|
|           Southwark|  Theft and Handling|    Theft From Shops|    4|2016|    8|
|              Newham|     Criminal Damage|Criminal Damage T...|    0|2015|    1|
|             Hackney|             Robbery|   Business Property|    0|2016|    7|
|            Havering|            Burglary|Burglary in a Dwe...|    1|2016|    8|
|             Croydon|Violence Against ...|      Common Assault|    1|2014|   11|
|             Bromley|     Criminal Damage|Criminal Damage T...|    0|2014|    6|
|           Isli

In [21]:
# Aggregations on DataFrames: groupBy() works like SQL's GROUP BY, and
# count() yields the number of rows in each group.
borough_crime_count = (
    data
    .groupBy('borough')
    .count()
)

borough_crime_count.show(5)

+--------------------+------+
|             borough| count|
+--------------------+------+
|             Croydon|602100|
|          Wandsworth|498636|
|              Bexley|385668|
|             Lambeth|519048|
|Barking and Dagenham|311040|
+--------------------+------+
only showing top 5 rows



In [22]:
# agg() accepts a {column: aggregate} mapping; "sum" is one of Spark's
# built-in aggregations and is applied here to `value` within each borough.
borough_conviction_sum = (
    data
    .groupBy('borough')
    .agg({'value': 'sum'})
)

borough_conviction_sum.show(5)

+--------------------+----------+
|             borough|sum(value)|
+--------------------+----------+
|             Croydon|  260294.0|
|          Wandsworth|  204741.0|
|              Bexley|  114136.0|
|             Lambeth|  292178.0|
|Barking and Dagenham|  149447.0|
+--------------------+----------+
only showing top 5 rows



In [24]:
# Sum `value` per borough and rename the auto-generated "sum(value)" column
# to the more readable "convictions".
borough_conviction_sum = (
    data
    .groupBy('borough')
    .agg({'value': 'sum'})
    .withColumnRenamed("sum(value)", "convictions")
)

borough_conviction_sum.show(5)

+--------------------+-----------+
|             borough|convictions|
+--------------------+-----------+
|             Croydon|   260294.0|
|          Wandsworth|   204741.0|
|              Bexley|   114136.0|
|             Lambeth|   292178.0|
|Barking and Dagenham|   149447.0|
+--------------------+-----------+
only showing top 5 rows



In [27]:
# Calling agg() directly on the DataFrame (no groupBy) aggregates over all
# rows, producing a one-row result holding the grand total of `convictions`.
total_borough_convictions = borough_conviction_sum.agg({"convictions" : "sum"})

total_borough_convictions.show()

+----------------+
|sum(convictions)|
+----------------+
|       6447758.0|
+----------------+



In [28]:
# Pull the grand total out of the one-row, one-column result as a plain
# Python value so it can be used in column arithmetic below.
total_convictions = total_borough_convictions.first()[0]

In [29]:
import pyspark.sql.functions as func

In [31]:
# Add a "% contribution" column: each borough's share of the overall total,
# expressed as a percentage and rounded to 2 decimal places (the second
# argument passed to func.round).
share_of_total = borough_conviction_sum["convictions"] / total_convictions * 100
borough_percentage_contribution = borough_conviction_sum.withColumn(
    "% contribution", func.round(share_of_total, 2)
)

borough_percentage_contribution.printSchema()

root
 |-- borough: string (nullable = true)
 |-- convictions: double (nullable = true)
 |-- % contribution: double (nullable = true)



In [32]:
# Rank boroughs by their percentage contribution (the third column),
# largest first, and show the top 10.
contribution_desc = borough_percentage_contribution["% contribution"].desc()
borough_percentage_contribution.orderBy(contribution_desc).show(10)

+-------------+-----------+--------------+
|      borough|convictions|% contribution|
+-------------+-----------+--------------+
|  Westminster|   455028.0|          7.06|
|      Lambeth|   292178.0|          4.53|
|    Southwark|   278809.0|          4.32|
|       Camden|   275147.0|          4.27|
|       Newham|   262024.0|          4.06|
|      Croydon|   260294.0|          4.04|
|       Ealing|   251562.0|           3.9|
|    Islington|   230286.0|          3.57|
|Tower Hamlets|   228613.0|          3.55|
|        Brent|   227551.0|          3.53|
+-------------+-----------+--------------+
only showing top 10 rows

