In [2]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, date_format
from pyspark.sql.functions import when
from pyspark.sql.functions import col, sum as _sum
from pyspark.sql.functions import coalesce
# Create a SparkSession
spark = SparkSession.builder.appName("Spark SQL").getOrCreate()



In [3]:
newData = spark.read.csv("crimeData2012-2022.csv", header=True, inferSchema=True)
# newData.show()
from pyspark.sql.functions import col, sum as _sum

# Step 1: Identify year-month columns
year_month_columns = [col_name for col_name in newData.columns if col_name.isdigit()]

# Step 2: Extract years from the year-month columns
years = list(set(col_name[:4] for col_name in year_month_columns))

# print(years)
# Step 3: Sum values for each year
for year in years:
    # Select columns for the current year
    year_columns = [col(col_name) for col_name in year_month_columns if col_name.startswith(year)]
    
    # Sum the columns for the current year and add as a new column
    newData = newData.withColumn(year, sum(year_columns))

# Step 4: Drop the original year-month columns if no longer needed
newData = newData.drop(*year_month_columns)

newData = newData.drop('SNTName')
# Show the resulting DataFrame
# newData.show(n= 1000 ,truncate=False)

# Group by SNTBorough and MajorText, and sum the yearly columns
groupedData = newData.groupBy("SNTBorough", "MajorText") \
    .agg(
        *[_sum(year).alias(year) for year in years]  # Sum each year column dynamically
    )

# Show the grouped DataFrame
groupedData = groupedData.drop('2021','2022')
groupedData = groupedData.filter(groupedData["SNTBorough"] != "Aviation Security(SO18)")

groupedData = groupedData.withColumnRenamed("SNTBorough", "Borough")
groupedData = groupedData.withColumnRenamed("MajorText", "CrimeType")

groupedData = groupedData.withColumn(
    "CrimeType",
    when(col("CrimeType") == "Historical Fraud and Forgery", "Fraud and Forgery").otherwise(col("CrimeType"))
)
groupedData.orderBy("Borough").show(n=1000, truncate=False)
# groupedData.select("Borough").distinct().show(n = 1000, truncate=False)


+----------------------+------------------------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|Borough               |CrimeType                           |2014 |2016 |2019 |2018 |2015 |2017 |2020 |2012 |2013 |
+----------------------+------------------------------------+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|Barking and Dagenham  |Drug Offences                       |814  |936  |1065 |880  |946  |739  |1579 |957  |1102 |
|Barking and Dagenham  |Burglary                            |1902 |1288 |1536 |1667 |1630 |1574 |1211 |2448 |2234 |
|Barking and Dagenham  |Arson and Criminal Damage           |1608 |1876 |1477 |1426 |1742 |1709 |1309 |1682 |1598 |
|Barking and Dagenham  |Miscellaneous Crimes Against Society|251  |249  |282  |262  |259  |249  |328  |134  |201  |
|Barking and Dagenham  |Vehicle Offences                    |2002 |2237 |2714 |2572 |1867 |2754 |2301 |2508 |2756 |
|Barking and Dagenham  |Theft                               |3268 |3398 

In [4]:
# Load the CSV data into a DataFrame
df = spark.read.csv("crimeData.csv", header=True, inferSchema=True)
df = df.drop('Financial Year','FY_FYIndex','Refresh Date', 'Area Type')
# df.show()

#in Borough_SNT, keep values that contain Barking and Dagenham
# dfB_D = df.filter(df['Borough_SNT'].contains('Barking and Dagenham'))

# show the full result
# dfB_D.show(n= 100,truncate=False)

df = df.withColumn("formatted_date", to_date(df["Month_Year"], "dd/MM/yyyy"))

# # Extract the month from the formatted date
df = df.withColumn("month", date_format("formatted_date", "MM"))
df = df.withColumn("year", date_format("formatted_date", "yyyy"))

# # Drop the original 'Month_Year' column if you no longer need it
df = df.drop("Month_Year")

# # Show the updated DataFrame
# df.show()

# #group crime groups by offence type in febuary 2021
# df.filter(df['year'] =='2021').groupBy('Offence Group').count().show()


In [7]:
df = df.withColumn(
    "Borough_SNT",
    when(df["Borough_SNT"].contains("Barking and Dagenham"), "Barking and Dagenham")
    .when(df["Borough_SNT"].contains("Westminster"), "Westminster")
    .when(df["Borough_SNT"].contains("Camden"), "Camden")
    .when(df["Borough_SNT"].contains("Hackney"), "Hackney")
    .when(df["Borough_SNT"].contains("Islington"), "Islington")
    .when(df["Borough_SNT"].contains("Tower Hamlets"), "Tower Hamlets")
    .when(df["Borough_SNT"].contains("Southwark"), "Southwark")
    .when(df["Borough_SNT"].contains("Lambeth"), "Lambeth")
    .when(df["Borough_SNT"].contains("Lewisham"), "Lewisham")
    .when(df["Borough_SNT"].contains("Greenwich"), "Greenwich")
    .when(df["Borough_SNT"].contains("Bexley"), "Bexley")
    .when(df["Borough_SNT"].contains("Bromley"), "Bromley")
    .when(df["Borough_SNT"].contains("Croydon"), "Croydon")
    .when(df["Borough_SNT"].contains("Sutton"), "Sutton")
    .when(df["Borough_SNT"].contains("Merton"), "Merton")
    .when(df["Borough_SNT"].contains("Kingston upon Thames"), "Kingston upon Thames")
    .when(df["Borough_SNT"].contains("Richmond upon Thames"), "Richmond upon Thames")
    .when(df["Borough_SNT"].contains("Hounslow"), "Hounslow")
    .when(df["Borough_SNT"].contains("Hillingdon"), "Hillingdon")
    .when(df["Borough_SNT"].contains("Ealing"), "Ealing")
    .when(df["Borough_SNT"].contains("Brent"), "Brent")
    .when(df["Borough_SNT"].contains("Harrow"), "Harrow")
    .when(df["Borough_SNT"].contains("Barnet"), "Barnet")
    .when(df["Borough_SNT"].contains("Enfield"), "Enfield")
    .when(df["Borough_SNT"].contains("Haringey"), "Haringey")
    .when(df["Borough_SNT"].contains("Waltham Forest"), "Waltham Forest")
    .when(df["Borough_SNT"].contains("Redbridge"), "Redbridge")
    .when(df["Borough_SNT"].contains("Havering"), "Havering")
    .when(df["Borough_SNT"].contains("Newham"), "Newham")
    .when(df["Borough_SNT"].contains("Kensington and Chelsea"), "Kensington and Chelsea")
    .when(df["Borough_SNT"].contains("Hammersmith and Fulham"), "Hammersmith and Fulham")
    .when(df["Borough_SNT"].contains("Wandsworth"), "Wandsworth")
    .when(df["Borough_SNT"].contains("City of London"), "City of London")
    .when(df["Borough_SNT"].contains("Aviation Security"), "Unknown")
    .when(df["Borough_SNT"].contains("Other / NK"), "Unknown")
    .when(df["Borough_SNT"].contains("N/K"), "Unknown")
    .otherwise(df["Borough_SNT"])  # Keep the original value if no match
)
# df.show(n = 100000, truncate=False)


df_aggregated = df.groupBy("Borough_SNT", "Offence Group", "Year").agg(
    _sum("Count").alias("Total_Count")
)



# df_pivot.select("Borough").distinct().orderBy("Borough").show(n = 40, truncate=False)
df_pivot = df_aggregated.groupBy("Borough_SNT", "Offence Group").pivot("Year").agg(
    _sum("Total_Count")
)

df_pivot = df_pivot.orderBy('Borough_SNT')
df_pivot = df_pivot.withColumnRenamed("Borough_SNT", "Borough")
df_pivot = df_pivot.withColumnRenamed("Offence Group", "CrimeType")
df_pivot = df_pivot.na.fill(0)
df_pivot.show(n=1000, truncate=False)

+----------------------+------------------------------------+-----+-----+------+------+----+
|Borough               |CrimeType                           |2021 |2022 |2023  |2024  |2025|
+----------------------+------------------------------------+-----+-----+------+------+----+
|Barking and Dagenham  |ARSON AND CRIMINAL DAMAGE           |2870 |3104 |2930  |2895  |224 |
|Barking and Dagenham  |BURGLARY                            |2224 |2356 |2242  |1914  |172 |
|Barking and Dagenham  |MISCELLANEOUS CRIMES AGAINST SOCIETY|710  |768  |646   |683   |55  |
|Barking and Dagenham  |VIOLENCE AGAINST THE PERSON         |13132|14572|15228 |14791 |1220|
|Barking and Dagenham  |DRUG OFFENCES                       |5034 |5342 |4708  |3534  |397 |
|Barking and Dagenham  |POSSESSION OF WEAPONS               |502  |540  |632   |336   |34  |
|Barking and Dagenham  |THEFT                               |5838 |7244 |8972  |8243  |628 |
|Barking and Dagenham  |SEXUAL OFFENCES                     |1048 |134

In [None]:
# Save the DataFrame to a CSV file
df_pivot.toPandas().to_csv("crimeDataAggregated2021-2025.csv", index=False)

#save the grouped data to a csv file
groupedData.toPandas().to_csv("crimeDataAggregated2012-2020.csv", index=False)

In [12]:
#save the grouped data to a csv file
groupedData.toPandas().to_csv("crimeDataAggregated2012-2020.csv", index=False)