## Basic data research

In [1]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql as pysql

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('spark-police-data-app') \
    .getOrCreate()

23/01/24 19:12:36 WARN Utils: Your hostname, MacBook-Pro-16.local resolves to a loopback address: 127.0.0.1; using 192.168.0.102 instead (on interface en0)
23/01/24 19:12:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/24 19:12:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

### Load Outcome data and check schema. We see that columns can be null, so we should manage this in code while transforming

In [4]:
outcome_df = (spark.read \
    .option("header", "true") \
    .parquet("datasets/intermid_parquets/outcome/*")
    .select('Crime ID', 'Outcome type')
)
print(f"Outcomes count: {outcome_df.count()}")

                                                                                

Outcomes count: 1679624


In [5]:
outcome_df.printSchema()

root
 |-- Crime ID: string (nullable = true)
 |-- Outcome type: string (nullable = true)



In [6]:
outcome_df.sample(fraction=0.3, seed=142).show(5, truncate=False)

+----------------------------------------------------------------+---------------------------------------------+
|Crime ID                                                        |Outcome type                                 |
+----------------------------------------------------------------+---------------------------------------------+
|383a4f6b2ad869210558ea0d9d9b2ae7403aae5599e827eb182712fbf3160eab|Investigation complete; no suspect identified|
|276c9c3992fbf2c031862176b1bba0a50992f49b5ef62f201f3cbfcb2d23e9f5|Investigation complete; no suspect identified|
|448a300ab2c02cac490e2df249e2ba927bccc43f9e3b211462d3040c3a5c460a|Investigation complete; no suspect identified|
|ee8c2aa361b9b310b1195ba9e7afa9220a4d2e42b9a57a0118801493796a693b|Investigation complete; no suspect identified|
|382f1adf7574030fddfbdb671d529717e1746cfe000e5d2d0504a7350a0ec80f|Investigation complete; no suspect identified|
+----------------------------------------------------------------+------------------------------

##### Null checks show we have no such rows

In [7]:
outcome_df.select([F.count(F.when((F.col(c) == 'null') | (F.col(c) == -0.0), c)).alias(c) for c in outcome_df.columns]).show()



+--------+------------+
|Crime ID|Outcome type|
+--------+------------+
|       0|           0|
+--------+------------+



                                                                                

##### Check Outcomes statistics for the whole dataset

In [8]:
outcome_df \
.groupBy('Outcome type') \
.agg(F.count('Outcome type').alias('counted')) \
.orderBy(F.desc('counted')) \
.show(truncate=False)

+---------------------------------------------------+-------+
|Outcome type                                       |counted|
+---------------------------------------------------+-------+
|Investigation complete; no suspect identified      |763014 |
|Unable to prosecute suspect                        |656796 |
|Suspect charged                                    |132653 |
|Local resolution                                   |45686  |
|Action to be taken by another organisation         |21713  |
|Offender given a caution                           |18778  |
|Further investigation is not in the public interest|15319  |
|Further action is not in the public interest       |10435  |
|Formal action is not in the public interest        |10179  |
|Offender given penalty notice                      |2590   |
|Suspect charged as part of another case            |1190   |
+---------------------------------------------------+-------+



### Load Places data and check schema. We see that columns can be null, so we should manage this in code while transforming

In [9]:
places_df = (spark.read \
    .option("header", "true")
    .parquet("datasets/intermid_parquets/places/*")
    .select('Crime ID', 'Month', 'Reported by', 'Longitude', 'Latitude', 'Crime type', 'Last outcome category')
)

print(f"Places count: {places_df.count()}")

Places count: 1746586


In [10]:
places_df.printSchema()

root
 |-- Crime ID: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Reported by: string (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Crime type: string (nullable = true)
 |-- Last outcome category: string (nullable = true)



In [11]:

places_df.sample(fraction=0.3, seed=140).show(5)

+--------------------+-------+--------------------+---------+---------+--------------------+---------------------+
|            Crime ID|  Month|         Reported by|Longitude| Latitude|          Crime type|Last outcome category|
+--------------------+-------+--------------------+---------+---------+--------------------+---------------------+
|fb0683b8f5616de7b...|2022-10|Metropolitan Poli...| 0.136416|51.584898|        Public order|  Under investigation|
|d652b245f882e8aa6...|2022-10|Metropolitan Poli...| 0.135924|51.587353|        Public order|  Under investigation|
|cfc55c544278d27a6...|2022-10|Metropolitan Poli...| 0.140194|51.582356|             Robbery|  Under investigation|
|12dca2a42408aacad...|2022-10|Metropolitan Poli...| 0.135579|51.584913|       Vehicle crime|  Under investigation|
|7817e95e0dd6a84c0...|2022-10|Metropolitan Poli...| 0.140576|51.583419|Violence and sexu...|  Under investigation|
+--------------------+-------+--------------------+---------+---------+---------

##### Null checks show we have nulls for Longitude, Latitude and Last outcome category columns

In [12]:
places_df.select([F.count(F.when((F.col(c) == 'null') | (F.col(c) == -0.0), c)).alias(c) for c in places_df.columns]).show()



+--------+-----+-----------+---------+--------+----------+---------------------+
|Crime ID|Month|Reported by|Longitude|Latitude|Crime type|Last outcome category|
+--------+-----+-----------+---------+--------+----------+---------------------+
|       0|    0|          0|    36060|   36060|         0|                36495|
+--------+-----+-----------+---------+--------+----------+---------------------+



                                                                                

##### Check Last outcome category for places statistics for the whole dataset

In [13]:
places_df \
.groupBy('Last outcome category') \
.agg(F.count('Last outcome category').alias('counted')) \
.orderBy(F.desc('counted')) \
.show(truncate=False)

+---------------------------------------------------+-------+
|Last outcome category                              |counted|
+---------------------------------------------------+-------+
|Under investigation                                |603988 |
|Investigation complete; no suspect identified      |543557 |
|Unable to prosecute suspect                        |427244 |
|Awaiting court outcome                             |59000  |
|null                                               |36495  |
|Local resolution                                   |29297  |
|Action to be taken by another organisation         |14624  |
|Further investigation is not in the public interest|9627   |
|Offender given a caution                           |9282   |
|Further action is not in the public interest       |5718   |
|Formal action is not in the public interest        |4660   |
|Offender given penalty notice                      |1909   |
|Suspect charged as part of another case            |367    |
|Court r

#### Merge Outcomes nad Places and check duplicates

In [14]:
df_all = places_df.join(outcome_df, on='Crime ID', how='left')

In [15]:
df_all.sample(fraction=0.3, seed=142).show(10)



+--------------------+-------+--------------------+---------+---------+--------------------+---------------------+--------------------+
|            Crime ID|  Month|         Reported by|Longitude| Latitude|          Crime type|Last outcome category|        Outcome type|
+--------------------+-------+--------------------+---------+---------+--------------------+---------------------+--------------------+
|0000660af187ccfe0...|2022-11|Suffolk Constabulary| 1.698238|52.458968|Violence and sexu...| Offender given a ...|Offender given a ...|
|0007596164e4dde4e...|2022-08|   Merseyside Police|-2.686276|53.447738|Violence and sexu...| Unable to prosecu...|Unable to prosecu...|
|000d2c4255b4c51cb...|2022-08|Devon & Cornwall ...|-4.135883|50.385401|            Burglary|  Under investigation|                null|
|000ee8509643fb926...|2022-10|West Midlands Police| -1.89286|52.489326|Violence and sexu...| Investigation com...|Investigation com...|
|000fc54e3bb91ca40...|2022-11|Metropolitan Poli.

                                                                                

In [16]:
df_all \
.groupBy('Crime ID') \
.agg(F.count('Crime ID').alias('counted')) \
.orderBy(F.desc('counted')) \
.show(truncate=False)

[Stage 31:>                                                       (0 + 16) / 17]

+----------------------------------------------------------------+-------+
|Crime ID                                                        |counted|
+----------------------------------------------------------------+-------+
|7fcc9b48992d482250ab58aee146da8c1a3c9c41ecc75c97fd0d0b473031a702|42     |
|fd05ba733e6b3b437746184e496bfa8d45bcec8e6e58dc21ae6352fdcc70088a|25     |
|3c6e1f8ede437daf0d68159db6179af0f4880837a00f7e2019dd55c0483b2a4e|25     |
|858e15159a664061856e1a282503efff06370413020382466a8f6bd764a6e752|24     |
|b70ecd551c578cc5a8dfb2d82268e55212297ef55ecd91743ae19ecfc3722a91|20     |
|5cf6d39519b1ab4609286fe89e326ce7a6c7c087a87a0d6010a470f9a6e87805|18     |
|5b97a52e34e897a389a46fa7f039b0f88f5a613b052312e558789577c1d877f9|18     |
|8e4a865bdeb256f49448d2b2c659cdabd384672bde2bd5308e94c2ff89896738|16     |
|34fed62ba224bcbe6188201dc2ae95dccd41e3b61bc5147b11aec8c1a72f16cd|16     |
|e526b855d7e417f819b32f6393cbf639052b91da75f0d4b3fbbd4ab557d19700|15     |
|d290fdad8c1163d3f148d8ae

                                                                                

In [17]:
df_all.filter(F.col('Crime ID') == '7fcc9b48992d482250ab58aee146da8c1a3c9c41ecc75c97fd0d0b473031a702').show()

+--------------------+-------+--------------------+---------+---------+----------+---------------------+---------------+
|            Crime ID|  Month|         Reported by|Longitude| Latitude|Crime type|Last outcome category|   Outcome type|
+--------------------+-------+--------------------+---------+---------+----------+---------------------+---------------+
|7fcc9b48992d48225...|2022-11|Leicestershire Po...|-1.118959|52.639872|   Robbery| Awaiting court ou...|Suspect charged|
|7fcc9b48992d48225...|2022-11|Leicestershire Po...|-1.118959|52.639872|   Robbery| Awaiting court ou...|Suspect charged|
|7fcc9b48992d48225...|2022-11|Leicestershire Po...|-1.118959|52.639872|   Robbery| Awaiting court ou...|Suspect charged|
|7fcc9b48992d48225...|2022-11|Leicestershire Po...|-1.118959|52.639872|   Robbery| Awaiting court ou...|Suspect charged|
|7fcc9b48992d48225...|2022-11|Leicestershire Po...|-1.118959|52.639872|   Robbery| Awaiting court ou...|Suspect charged|
|7fcc9b48992d48225...|2022-11|Le

In [18]:
df_all \
.groupby(['Crime ID', 'Month', 'Reported by', 'Longitude', 'Latitude', 'Crime type', 'Last outcome category', 'Outcome type']) \
.count() \
.where('count > 1') \
.sort('count', ascending=False) \
.show()



23/01/24 19:13:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:42 WARN RowBasedKeyValueBatch: Calling spill() on

[Stage 42:>                                                       (0 + 16) / 17]

23/01/24 19:13:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:13:43 WARN RowBasedKeyValueBatch: Calling spill() on



+--------------------+-------+--------------------+---------+---------+--------------------+---------------------+--------------------+-----+
|            Crime ID|  Month|         Reported by|Longitude| Latitude|          Crime type|Last outcome category|        Outcome type|count|
+--------------------+-------+--------------------+---------+---------+--------------------+---------------------+--------------------+-----+
|7fcc9b48992d48225...|2022-11|Leicestershire Po...|-1.118959|52.639872|             Robbery| Awaiting court ou...|     Suspect charged|   42|
|fd05ba733e6b3b437...|2022-11|Leicestershire Po...|-1.209047|52.772487|        Public order|     Local resolution|    Local resolution|   25|
|3c6e1f8ede437daf0...|2022-10|Metropolitan Poli...|-0.069378|51.592897|               Drugs|  Under investigation|    Local resolution|   20|
|5cf6d39519b1ab460...|2022-08|Metropolitan Poli...|-0.153797|51.320944|         Other crime|  Under investigation|                null|   17|
|5b97a

                                                                                

#### We see for some Crime ID we have many duplicates. Remove them!

In [18]:
print(f"Before dropping of duplicates, count: {df_all.count()}")
df_all = df_all.dropDuplicates()
print(f"Rows remained, count: {df_all.count()}")

                                                                                

Before dropping of duplicates, count: 1795490




Rows remained, count: 1740882


                                                                                

#### Check Outcomes in final DataFrame. Create lastOutcome column and init it as specified:
i. The last outcome should be taken from the <district>-outcomes.csv file
where the crime IDs match. If there is no matching data use the data
listed in the original <district>.csv file.

In [19]:
df_all.withColumn('lastOutcome', F.when(F.col('Outcome type') != 'null', F.col('Outcome type')).otherwise(F.col('Last outcome category'))) \
      .select('Crime ID', 'Crime type', 'Last outcome category', 'Outcome type', 'lastOutcome')\
      .show()



+--------------------+--------------------+---------------------+--------------------+--------------------+
|            Crime ID|          Crime type|Last outcome category|        Outcome type|         lastOutcome|
+--------------------+--------------------+---------------------+--------------------+--------------------+
|0000660af187ccfe0...|Violence and sexu...| Offender given a ...|Offender given a ...|Offender given a ...|
|000149eacb36e3c22...|Violence and sexu...| Awaiting court ou...|     Suspect charged|     Suspect charged|
|0001c071762e069d7...|Violence and sexu...|  Under investigation|                null| Under investigation|
|000246aee3bfa4abb...|       Vehicle crime| Investigation com...|Investigation com...|Investigation com...|
|0002d7d5a5a237ce5...|Violence and sexu...| Unable to prosecu...|Unable to prosecu...|Unable to prosecu...|
|000516cf06f6c1629...|Violence and sexu...| Awaiting court ou...|     Suspect charged|     Suspect charged|
|0005521e6136a6ed6...|Violen

                                                                                

In [20]:
df_all = (df_all.withColumn('lastOutcome', 
                F.when(F.col('Outcome type') != 'null', F.col('Outcome type'))
                .otherwise(F.col('Last outcome category')))            
            .withColumnRenamed('Crime ID', 'crimeId')
            .withColumnRenamed('Reported by', 'districtName')
            .withColumnRenamed('Longitude', 'longitude')
            .withColumnRenamed('Latitude', 'latitude')
            .withColumnRenamed('Crime type', 'crimeType')
            .select('crimeId', 'districtName', 'latitude', 'longitude', 'crimeType', 'lastOutcome')
)

df_all.show(5)



+--------------------+--------------------+---------+---------+--------------------+--------------------+
|             crimeId|        districtName| latitude|longitude|           crimeType|         lastOutcome|
+--------------------+--------------------+---------+---------+--------------------+--------------------+
|0000660af187ccfe0...|Suffolk Constabulary|52.458968| 1.698238|Violence and sexu...|Offender given a ...|
|000149eacb36e3c22...|South Yorkshire P...|53.413767|-1.459857|Violence and sexu...|     Suspect charged|
|0001c071762e069d7...|West Yorkshire Po...|      0.0|      0.0|Violence and sexu...| Under investigation|
|000246aee3bfa4abb...|  South Wales Police|51.496043|-3.146369|       Vehicle crime|Investigation com...|
|0002d7d5a5a237ce5...|Leicestershire Po...|52.638846|-1.055671|Violence and sexu...|Unable to prosecu...|
+--------------------+--------------------+---------+---------+--------------------+--------------------+
only showing top 5 rows



                                                                                

#### Aggregate some statistics on the data

In [21]:
def agg_crime_types_count(df: pysql.DataFrame):
    total = df.count()
    return (df.groupBy('crimeType').agg(F.count('crimeType').alias('cases'))
             .withColumn('percent', F.round(100.0 * F.col('cases')/total, 2))
             .orderBy(F.desc('percent'))
             )

check_df = agg_crime_types_count(df_all)
assert 100.0 == round(check_df.select(F.sum("percent").alias('prc')).collect()[0]['prc'], 0)

check_df.show(truncate=False)



23/01/24 19:02:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:49 WARN RowBasedKeyValueBatch: Calling spill() on

[Stage 79:>                                                       (0 + 16) / 17]

23/01/24 19:02:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:50 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:50 WARN RowBasedKeyValueBatch: Calling spill() on



23/01/24 19:02:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:55 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:55 WARN RowBasedKeyValueBatch: Calling spill() on

[Stage 103:>                                                      (0 + 16) / 17]

23/01/24 19:02:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:02:56 WARN RowBasedKeyValueBatch: Calling spill() on



+----------------------------+------+-------+
|crimeType                   |cases |percent|
+----------------------------+------+-------+
|Violence and sexual offences|715969|41.13  |
|Public order                |178948|10.28  |
|Criminal damage and arson   |166910|9.59   |
|Other theft                 |162290|9.32   |
|Vehicle crime               |133448|7.67   |
|Shoplifting                 |105326|6.05   |
|Burglary                    |86949 |4.99   |
|Drugs                       |56203 |3.23   |
|Other crime                 |36624 |2.1    |
|Theft from the person       |31983 |1.84   |
|Bicycle theft               |26524 |1.52   |
|Robbery                     |22655 |1.3    |
|Possession of weapons       |17053 |0.98   |
+----------------------------+------+-------+



                                                                                

In [22]:
def agg_crime_types_by_disctrict_count(df: pysql.DataFrame):
    _df = df.groupBy('crimeType', 'districtName').agg(F.count('crimeType').alias('cases'))
    total = df.count()

    return (_df.withColumn('percent', F.round(100.0 * F.col('cases')/total, 2))
               .orderBy(F.desc('percent'))
             )

check_df = agg_crime_types_by_disctrict_count(df_all)
assert 100.0 == round(check_df.select(F.sum("percent").alias('prc')).collect()[0]['prc'], 0)

check_df.show(truncate=False)



23/01/24 19:03:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:05 WARN RowBasedKeyValueBatch: Calling spill() on

[Stage 121:>                                                      (0 + 16) / 17]

23/01/24 19:03:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:06 WARN RowBasedKeyValueBatch: Calling spill() on



23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on

[Stage 145:>                                                      (0 + 16) / 17]

23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/01/24 19:03:12 WARN RowBasedKeyValueBatch: Calling spill() on



+----------------------------+----------------------------------+-----+-------+
|crimeType                   |districtName                      |cases|percent|
+----------------------------+----------------------------------+-----+-------+
|Violence and sexual offences|Metropolitan Police Service       |78623|4.52   |
|Violence and sexual offences|West Midlands Police              |53400|3.07   |
|Violence and sexual offences|West Yorkshire Police             |47425|2.72   |
|Other theft                 |Metropolitan Police Service       |43404|2.49   |
|Vehicle crime               |Metropolitan Police Service       |38472|2.21   |
|Violence and sexual offences|Thames Valley Police              |27491|1.58   |
|Violence and sexual offences|Kent Police                       |27305|1.57   |
|Violence and sexual offences|Hampshire Constabulary            |26502|1.52   |
|Violence and sexual offences|Essex Police                      |25142|1.44   |
|Violence and sexual offences|Merseyside

                                                                                

In [19]:
def agg_outcome_by_disctrict_count(df: pysql.DataFrame):
    _df = df.groupBy('lastOutcome', 'districtName').agg(F.count('lastOutcome').alias('cases'))
    total = df.count()

    return (_df.withColumn('percent', F.round(100.0 * F.col('cases')/total, 2))
               .orderBy(F.desc('percent'))
             )

check_df = agg_outcome_by_disctrict_count(df_all)
assert 100.0 == round(check_df.select(F.sum("percent").alias('prc')).collect()[0]['prc'], 0)

check_df.show(truncate=False)

AnalysisException: Column 'lastOutcome' does not exist. Did you mean one of the following? [Latitude, Crime type, Longitude, Outcome type, Month, Crime ID, Reported by, Last outcome category];
'Aggregate ['lastOutcome, 'districtName], ['lastOutcome, 'districtName, count('lastOutcome) AS cases#396]
+- Project [Crime ID#65, Month#66, Reported by#67, Longitude#68, Latitude#69, Crime type#70, Last outcome category#71, Outcome type#1]
   +- Join LeftOuter, (Crime ID#65 = Crime ID#0)
      :- Project [Crime ID#65, Month#66, Reported by#67, Longitude#68, Latitude#69, Crime type#70, Last outcome category#71]
      :  +- Relation [Crime ID#65,Month#66,Reported by#67,Longitude#68,Latitude#69,Crime type#70,Last outcome category#71,__index_level_0__#72L] parquet
      +- Project [Crime ID#0, Outcome type#1]
         +- Relation [Crime ID#0,Outcome type#1,__index_level_0__#2L] parquet


In [25]:
def agg_occurences_by_district(df: pysql.DataFrame):
    _df = df.groupBy('districtName').agg(F.count('crimeType').alias('cases'))
    total = df.count()

    return (_df.withColumn('percent', F.round(100.0 * F.col('cases')/total, 2))
               .orderBy(F.desc('percent'))
             )

check_df = agg_occurences_by_district(df_all)
assert 100.0 == round(check_df.select(F.sum("percent").alias('prc')).collect()[0]['prc'], 0)

check_df.show(truncate=False)


ConnectionRefusedError: [Errno 61] Connection refused

In [None]:
def agg_outcomes_count(df: pysql.DataFrame):
    total = df.count()
    return (df.groupBy('lastOutcome').agg(F.count('lastOutcome').alias('cases'))
             .withColumn('percent', F.round(100.0 * F.col('cases')/total, 2))
             .orderBy(F.desc('percent'))
             )

check_df = agg_outcomes_count(df_all)
assert 100.0 == round(check_df.select(F.sum("percent").alias('prc')).collect()[0]['prc'], 0)

check_df.show(truncate=False)



+---------------------------------------------------+------+-------+
|lastOutcome                                        |cases |percent|
+---------------------------------------------------+------+-------+
|Under investigation                                |211258|51.11  |
|Investigation complete; no suspect identified      |108506|26.25  |
|Unable to prosecute suspect                        |63075 |15.26  |
|Suspect charged                                    |9673  |2.34   |
|null                                               |8852  |2.14   |
|Local resolution                                   |4549  |1.1    |
|Action to be taken by another organisation         |2726  |0.66   |
|Further investigation is not in the public interest|1515  |0.37   |
|Offender given a caution                           |1356  |0.33   |
|Formal action is not in the public interest        |624   |0.15   |
|Further action is not in the public interest       |635   |0.15   |
|Offender given penalty notice    

                                                                                

In [None]:
df_all.show(10)



+--------------------+--------------------+---------+---------+--------------------+--------------------+
|             crimeId|        districtName| latitude|longitude|           crimeType|         lastOutcome|
+--------------------+--------------------+---------+---------+--------------------+--------------------+
|0000660af187ccfe0...|Suffolk Constabulary|52.458968| 1.698238|Violence and sexu...|Offender given a ...|
|000149eacb36e3c22...|South Yorkshire P...|53.413767|-1.459857|Violence and sexu...|     Suspect charged|
|0002d7d5a5a237ce5...|Leicestershire Po...|52.638846|-1.055671|Violence and sexu...|Unable to prosecu...|
|000564cee44fde54a...|Lancashire Consta...|53.747572|-2.379645|       Vehicle crime|Investigation com...|
|000756f1015445e64...|Gloucestershire C...|51.909686|-2.109721|        Public order| Under investigation|
|0009b00e7245ee4ff...|Metropolitan Poli...|51.531715|-0.121637|         Other theft| Under investigation|
|000b3522a56720e5c...|Metropolitan Poli...|51.

                                                                                