# UK Crime Analysis

Analysis of UK street-level crime for the City of London, Leicestershire and West Midlands, from 01-2022 to 03-2023.

**1. pyspark Installation**

In [None]:
# install PySpark 
#http://spark.apache.org/docs/latest/api/python/index.html

!pip install pyspark==3.2

**2.Spark session & SQLContext**

In [None]:
import warnings
warnings.filterwarnings("ignore")

# Start a local Spark session (two worker threads) and keep a handle on its
# SparkContext.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.master("local[2]").appName("dat").getOrCreate()
sc = spark.sparkContext

# Legacy SQLContext entry point. Its first positional argument is the
# SparkContext (not the session), so pass `sc` and hand the session over
# explicitly as the second argument.
sqlContext = SQLContext(sc, spark)

**3. Import Main Libraries**

In [None]:
# Import libraries and other functions
from io import StringIO
from collections import namedtuple
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import *

import csv
import matplotlib.pyplot as plt

import pandas as pd
import numpy as np

**4. Create Schema**

In [None]:
# Explicit schema applied when reading the street-level crime CSV files.
# Every field is nullable.
myschema = StructType(
    [StructField("Crime ID", StringType(), True),
     StructField("Month", DateType(), True),
     StructField("Reported by", StringType(), True),
     StructField("Falls within", StringType(), True),
     # Coordinates are read as doubles: a 32-bit FloatType keeps only about
     # seven significant digits and would round the published values.
     StructField("Longitude", DoubleType(), True),
     StructField("Latitude", DoubleType(), True),
     StructField("Location", StringType(), True),
     StructField("LSOA code", StringType(), True),
     StructField("LSOA name", StringType(), True),
     StructField("Crime type", StringType(), True),
     StructField("Last outcome category", StringType(), True),
     StructField("Context", StringType(), True)])

**5.Read UK Crime dataset**

In [None]:
# Build the UKCrime DataFrame from every "*street.csv" file in the dataset
# folder, using the custom schema instead of schema inference.
# Data source: https://data.police.uk/data/
street_csv_glob = "/kaggle/input/uk-crime-dataset/*street.csv"
csv_reader = (
    spark.read
    .options(header="true", delimiter=",", inferSchema="false")
    .schema(myschema)
)
UKCrime = csv_reader.csv(street_csv_glob)

**6.Check for Null Values**

In [None]:
from pyspark.sql.functions import col, sum

# Build one aggregate per column: the null flag cast to 0/1 and summed gives
# the number of null entries in that column.
null_flag_sums = []
for column_name in UKCrime.columns:
    null_flag = col(column_name).isNull().cast("int")
    null_flag_sums.append(sum(null_flag).alias(column_name))

null_counts = UKCrime.select(null_flag_sums)

# Show the single-row result as a pandas table.
null_counts.toPandas()

**7. Analyse null values for Crime ID and Last outcome category**

In [None]:
from pyspark.sql.functions import col

# Keep only the rows where both the crime ID and the last outcome are null.
both_missing = col("Crime ID").isNull() & col("Last outcome category").isNull()
null_data = UKCrime.filter(both_missing)

# Collect the distinct crime types found among those rows.
distinct_crime_types = null_data.select("Crime type").distinct().collect()

# Unpack the collected rows into a plain list of crime-type strings.
crime_types = []
for type_row in distinct_crime_types:
    crime_types.append(type_row["Crime type"])

# Report which crime types have both fields missing.
joined_types = ', '.join(crime_types)
print(f"The crime type with null values in 'Crime ID' and 'Last outcome category' is: {joined_types}")


**8. Analyze Null Values for LSOA_code & LSOA_name**

In [None]:
# Rows of UKCrime that have no LSOA name.
lsoa_name_missing = UKCrime['LSOA name'].isNull()
missing_lsoa_records = UKCrime.filter(lsoa_name_missing)

# List each distinct location / LSOA code / LSOA name combination among those
# rows, without truncating long values.
lsoa_columns = ['Location', 'LSOA code', 'LSOA name']
missing_lsoa_records.select(*lsoa_columns).distinct().show(truncate=False)


**9. Print Schema**

In [None]:
# Print the column names, types and nullability of the loaded DataFrame.
UKCrime.printSchema()

**10. Rename all Necessary columns**

In [None]:
# Tidy up the column names: replace the space in each multi-word name with an
# underscore.
column_renames = {
    'Crime ID': 'Crime_ID',
    'Reported by': 'Reported_by',
    'Falls within': 'Falls_within',
    'LSOA code': 'LSOA_code',
    'LSOA name': 'LSOA_name',
    'Crime type': 'Crime_type',
    'Last outcome category': 'Last_outcome_category',
}
for old_name, new_name in column_renames.items():
    UKCrime = UKCrime.withColumnRenamed(old_name, new_name)

**11.Remove context and crime id**

The `Context` column is null in every row. `Crime ID` is null for anti-social behaviour records and is otherwise a hexadecimal identifier that is not required for any of the analysis, so both columns are removed.

In [None]:
# Remove the two columns that the analysis does not use.
UKCrime = UKCrime.drop("Context", "Crime_ID")

In [None]:
# Print the schema again to check the current set of columns.
UKCrime.printSchema()

**12. Check the data looks good for Analyses**

In [None]:
# Fetch the first three rows as a quick sanity check of the data.
UKCrime.take(3)

**13. Display Total Count**

In [None]:
# Print the total number of rows in the DataFrame.
print(UKCrime.count())

**14. Data Analysis**

**Create Temporary view**

In [None]:
# Register the DataFrame as the SQL view "UKdata". `createTempView` raises if
# the view already exists, which breaks re-running this cell; the
# create-or-replace variant can be run repeatedly.
UKCrime.createOrReplaceTempView("UKdata")

# Analysis of various crime types

**Question 1 : What crime categories does the Police data use in all three cities?**

In [None]:
# Distinct crime categories across the whole dataset, queried through the
# "UKdata" view.
distinct_type_sql = "select distinct Crime_type from UKdata"
UKCrimeType = spark.sql(distinct_type_sql)
UKCrimeType.show()

**Are the crime categories the same in all the cities?**

In [None]:
# Filter the data for each city. The rest of the notebook matches
# "Reported_by" against the full force name (with the " Police" suffix);
# without the suffix all three subsets are empty and the comparison below is
# trivially true.
city_of_london_data = UKCrime.filter(UKCrime['Reported_by'] == 'City of London Police')
leicestershire_data = UKCrime.filter(UKCrime['Reported_by'] == 'Leicestershire Police')
west_midlands_data = UKCrime.filter(UKCrime['Reported_by'] == 'West Midlands Police')

# Get the unique crime categories for each city as plain Python lists
# (flatMap unpacks each single-column Row into its value).
city_of_london_categories = city_of_london_data.select('Crime_type').distinct().rdd.flatMap(lambda x: x).collect()
leicestershire_categories = leicestershire_data.select('Crime_type').distinct().rdd.flatMap(lambda x: x).collect()
west_midlands_categories = west_midlands_data.select('Crime_type').distinct().rdd.flatMap(lambda x: x).collect()

# Compare as sets so that ordering does not matter.
if set(city_of_london_categories) == set(leicestershire_categories) == set(west_midlands_categories):
    print("The crime categories are the same in all three cities.")
else:
    print("The crime categories differ among the cities.")

In [None]:
# Display the distinct crime types as a pandas table.
UKCrimeType.toPandas()

**Question: what is the prevalence of the top 5 crime types in certain locations in each of the three cities?**

In [None]:
from pyspark.sql.functions import col, desc

def get_top_crime_types(city):
    """Return the five most frequent (Location, Crime_type) pairs for one city.

    Parameters
    ----------
    city : str
        Value compared for equality against the "Reported_by" column.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns "Location", "Crime_type" and "count", ordered by "count"
        descending and limited to five rows.
    """
    return (
        UKCrime
        .where(col("Reported_by") == city)
        .groupBy("Location", "Crime_type")
        .count()
        .sort(col("count").desc())
        .limit(5)
    )

# Build the top-five result for each of the three forces.
west_midlands_top_crime_types = get_top_crime_types("West Midlands Police")
city_of_london_top_crime_types = get_top_crime_types("City of London Police")
leicestershire_top_crime_types = get_top_crime_types("Leicestershire Police")

# Print a heading followed by the untruncated table, one force at a time.
for heading, top_types in (
    ("Top 5 Crime Types in West Midlands:", west_midlands_top_crime_types),
    ("Top 5 Crime Types in City of London:", city_of_london_top_crime_types),
    ("Top 5 Crime Types in Leicestershire:", leicestershire_top_crime_types),
):
    print(heading)
    top_types.show(truncate=False)



# Analysis within cities

**Question 4 : What are the types of crimes recorded in West Midlands,City Of London and Leicestershire?**

In [None]:
# Work on the cleaned UKCrime DataFrame under a shorter alias.
crime_df = UKCrime

# One subset per reporting force.
west_midlands_df = crime_df.where(col('Reported_by') == 'West Midlands Police')
city_of_london_df = crime_df.where(col('Reported_by') == 'City of London Police')
leicestershire_df = crime_df.where(col('Reported_by') == 'Leicestershire Police')

# Number of records per crime type within each subset.
west_midlands_crime_counts = west_midlands_df.groupBy('Crime_type').count()
city_of_london_crime_counts = city_of_london_df.groupBy('Crime_type').count()
leicestershire_crime_counts = leicestershire_df.groupBy('Crime_type').count()

# Print each city's heading followed by its untruncated table.
for heading, type_counts in (
    ("West Midlands:", west_midlands_crime_counts),
    ("City of London:", city_of_london_crime_counts),
    ("Leicestershire:", leicestershire_crime_counts),
):
    print(heading)
    type_counts.show(truncate=False)

In [None]:
from pyspark.sql.functions import col, desc, rank
# `Window` must be imported here: this is the first cell that uses it, and
# without the import a fresh "Run All" stops with a NameError.
from pyspark.sql.window import Window

# Group the data by city and crime type, and calculate the count
crime_counts = UKCrime.groupBy("Reported_by", "Crime_type").count()

# Rank the crime types within each city, highest count first. `rank` gives
# tied counts the same rank, so a city can return more than one row below.
window_spec = Window.partitionBy("Reported_by").orderBy(desc("count"))
ranked_crime_types = crime_counts.withColumn("rank", rank().over(window_spec))

# Keep the crime type(s) with rank 1 (highest count) in each city
highest_crime_types = ranked_crime_types.filter(col("rank") == 1)

# Display the result
highest_crime_types.show()



**Question 5 :Are levels of violent crime constant, increasing, or decreasing?**

In [None]:
# Monthly number of "Violence and sexual offences" records between
# 2022-01-01 and 2023-03-31, ordered by month.
filtereddata = spark.sql("select Month, count(Crime_type) as CrimeNumbers FROM UKdata where Crime_type='Violence and sexual offences' and Month >= '2022-01-01' and Month <= '2023-03-31' group by Month order by Month")
filpanda = filtereddata.toPandas()

# Convert the month column to pandas datetimes and also use it as the index.
filpanda.Month = filpanda.Month.astype('datetime64[ns]')
filpanda2 = filpanda.set_index(pd.to_datetime(filpanda.Month))

# Create the axes first and pass them to pandas. Calling plt.figure() and then
# DataFrame.plot() without `ax` makes pandas open a second figure, leaving the
# first one empty.
fig, ax = plt.subplots(figsize=(10, 6))
filpanda2.plot(y='CrimeNumbers', x='Month', marker='.', linestyle='-', linewidth=1,
               subplots=False, color='red',
               label='Monthly Crime Numbers',
               title='Violent Crime Numbers Vs Month',
               ax=ax)

ax.set_ylabel('Crime Numbers')
# The x axis shows months, not years.
ax.set_xlabel('Month')
plt.show()

**Question 2 : How is Last Outcome Categories distributed in each of the three cities?**

In [None]:
# Number of records for every (reporting force, last outcome) pair.
outcome_counts = UKCrime.groupBy('Reported_by', 'Last_Outcome_Category').count()

# Reshape to one row per outcome category and one column per force, summing
# the counts and replacing missing combinations with 0.
pivot_table = (
    outcome_counts
    .groupBy('Last_Outcome_Category')
    .pivot('Reported_by')
    .sum('count')
    .na.fill(0)
)

# Display the pivot table
pivot_table.show()

**Geographical Distribution of Last Outcome Categories**

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Only three columns are plotted, so select them in Spark before converting.
# Calling UKCrime.toPandas() on the full DataFrame would pull every column of
# every record into driver memory for no benefit.
geo_df = UKCrime.select('Longitude', 'Latitude', 'Last_outcome_category').toPandas()

# Scatter plot of record coordinates, coloured by last outcome category.
plt.figure(figsize=(10, 8))
sns.scatterplot(x='Longitude', y='Latitude', hue='Last_outcome_category', data=geo_df)
plt.xlabel('Longitude')
plt.ylabel('Latitude')
plt.title('Geographical Distribution of Outcome Categories')
plt.legend()
plt.show()

**Question 6 : what is the Count of Police Service Who Reported the Crime?**

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count

# Number of records per reporting police service, in a column named "Count".
records_per_service = count('*').alias('Count')
police_service_counts = UKCrime.groupBy('Reported_by').agg(records_per_service)

# Show the result
police_service_counts.show()


In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Records per reporting police service, converted to pandas for plotting.
police_service_counts = UKCrime.groupBy('Reported_by').count()
police_service_counts_pd = police_service_counts.toPandas()

service_names = police_service_counts_pd['Reported_by']
record_totals = police_service_counts_pd['count']

# Horizontal bar chart: one bar per police service.
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(service_names, record_totals)
ax.set_xlabel('Count')
ax.set_ylabel('Police Service')
ax.set_title('Count of Police Service Who Report The Crime')
plt.show()


**Question 7 : what are the top 5 locations by number of crimes, across all categories, in each city?**

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql.functions import col

# The three reporting forces covered by this analysis, in plotting order.
cities = ['West Midlands Police', 'City of London Police', 'Leicestershire Police']

# Count records per (location, force) for the selected forces.
crime_counts = UKCrime.filter(col("Reported_by").isin(cities)) \
    .groupBy("Location", "Reported_by") \
    .count()

# Convert the result to Pandas DataFrame
crime_counts = crime_counts.toPandas()

# Take the five locations with the most records in each city and stack them.
# DataFrame.append was removed in pandas 2.0, so the per-city frames are
# combined with a single pd.concat call.
top_crimes = pd.concat(
    [crime_counts[crime_counts["Reported_by"] == city].nlargest(5, "count") for city in cities]
)

# Create subplots for each city
fig, axes = plt.subplots(3, 1, figsize=(12, 18))

# One bar chart per city showing its top five locations.
for i, city in enumerate(cities):
    city_crimes = top_crimes[top_crimes["Reported_by"] == city]
    sns.barplot(data=city_crimes, x='Location', y='count', ax=axes[i])
    axes[i].set_xlabel('Location')
    axes[i].set_ylabel('Number of Crimes')
    axes[i].set_title(city)

# Adjust layout and display the plots
plt.tight_layout()
plt.show()


**Question 8 : what are the top 10 hot spots (location and crime type) with the highest crime counts?**

In [None]:
from pyspark.sql.functions import desc, row_number
from pyspark.sql.window import Window

# Keep only records reported by one of the three forces of interest.
is_west_midlands = col("Reported_by") == "West Midlands Police"
is_city_of_london = col("Reported_by") == "City of London Police"
is_leicestershire = col("Reported_by") == "Leicestershire Police"
filtered_df = UKCrime.filter(is_west_midlands | is_city_of_london | is_leicestershire)

# Number of records per (force, location, crime type).
crime_counts = filtered_df.groupBy("Reported_by", "Location", "Crime_type").count()

# Number the rows within each force from the highest count downwards.
window_spec = Window.partitionBy("Reported_by").orderBy(desc("count"))
ranked_df = crime_counts.withColumn("rank", row_number().over(window_spec))

# Keep the ten highest-ranked rows of each force and bring them to pandas.
top_hotspots = ranked_df.filter(col("rank") <= 10)
top_hotspots_pandas = top_hotspots.toPandas()

# Print each force's table under its own heading, followed by a blank line.
for city in ('West Midlands Police', 'City of London Police', 'Leicestershire Police'):
    city_hotspots = top_hotspots_pandas.loc[top_hotspots_pandas["Reported_by"] == city]
    print(f"Top 10 Hotspots in {city}:")
    print(city_hotspots)
    print()

In [None]:
# Plot the top hotspots for each city
for city in ['West Midlands Police', 'City of London Police', 'Leicestershire Police']:
    city_hotspots = (
        top_hotspots_pandas[top_hotspots_pandas["Reported_by"] == city]
        .sort_values("rank")
    )

    # Each hotspot is a (Location, Crime_type) pair, so the same location can
    # appear more than once. Labelling bars by Location alone places those
    # rows on the same y position and the bars overwrite each other; a
    # combined label keeps one bar per hotspot.
    bar_labels = (
        city_hotspots["Location"].astype(str)
        + " (" + city_hotspots["Crime_type"].astype(str) + ")"
    )

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.barh(bar_labels, city_hotspots["count"])
    # barh draws the first row at the bottom; flip so rank 1 is at the top.
    ax.invert_yaxis()
    ax.set_xlabel("Crime Count")
    ax.set_ylabel("Location (crime type)")
    ax.set_title(f"Top 10 Hotspots in {city}")
    plt.show()

# Across cities

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import matplotlib.pyplot as plt


def _values_ranked_by_count(df, value_col, alias):
    """Return, per reporting force, the values of `value_col` ordered by frequency.

    The order of a `collect_list` result is not guaranteed after a shuffle, so
    sorting the rows with `orderBy` before the `groupBy` does not reliably
    order the collected list. The (count, value) pairs are collected as
    structs and sorted inside the array instead.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Must contain "Reported_by" and `value_col`.
    value_col : str
        Column whose distinct values are ranked.
    alias : str
        Name of the resulting array column.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns "Reported_by" and `alias` (array of values, most frequent first).
    """
    counts = (
        # Null values are excluded, matching collect_list, which skips nulls.
        df.filter(F.col(value_col).isNotNull())
        .groupBy("Reported_by", value_col)
        .count()
    )
    ranked = counts.groupBy("Reported_by").agg(
        F.sort_array(F.collect_list(F.struct("count", value_col)), asc=False).alias("ranked")
    )
    # Selecting a field of an array of structs yields an array of that field.
    return ranked.select("Reported_by", F.col(f"ranked.{value_col}").alias(alias))


# Compare crime rates: each force's share of all records. The empty
# partitionBy() makes the window span every row.
crime_rates = (
    UKCrime.groupBy("Reported_by")
    .count()
    .withColumn("crime_rate", F.col("count") / F.sum("count").over(Window.partitionBy()))
    .select("Reported_by", "crime_rate")
)
crime_rates.show()

# Analyze crime types: per force, crime types from most to least frequent.
crime_types = _values_ranked_by_count(UKCrime, "Crime_type", "crime_types")
crime_types.show(truncate=False)

# Evaluate outcomes: per force, outcome categories from most to least frequent.
outcomes = _values_ranked_by_count(UKCrime, "Last_outcome_category", "outcomes")
outcomes.show(truncate=False)



In [None]:
from pyspark.sql.functions import col, desc

# (display label, value stored in "Reported_by") for each city, in the order
# they are compared. The subsets are filtered here from UKCrime with the full
# force names rather than reusing DataFrames built in an earlier cell, so the
# cell does not depend on that cell's filters.
city_forces = [
    ("West Midlands", "West Midlands Police"),
    ("City of London", "City of London Police"),
    ("Leicestershire", "Leicestershire Police"),
]

# Running maximum across the cities; stays None if no city has any data.
highest_city = None
highest_crime_type = None
highest_crime_count = None

for city_label, force_name in city_forces:
    # Most frequent crime type for this city; `first()` returns None when the
    # city has no rows.
    top_row = (
        UKCrime.filter(col("Reported_by") == force_name)
        .groupBy("Crime_type")
        .count()
        .orderBy(desc("count"))
        .first()
    )
    if top_row is None:
        continue
    # Strict comparison: on a tie the earlier city in `city_forces` is kept.
    if highest_crime_count is None or top_row["count"] > highest_crime_count:
        highest_city = city_label
        highest_crime_type = top_row["Crime_type"]
        highest_crime_count = top_row["count"]

# Check if any city has crime data
if highest_city is not None:
    print(f"The city with the highest crime type count is {highest_city}.")
    print(f"The highest crime type is {highest_crime_type} with a count of {highest_crime_count}.")
else:
    print("None of the cities have crime data.")


In [None]:
# Bring the pivot table to pandas and index it by outcome category so each
# category becomes one bar group.
pivot_table_pd = pivot_table.toPandas().set_index('Last_Outcome_Category')

# Stacked bars: one bar per outcome category, one segment per city.
outcome_ax = pivot_table_pd.plot(kind='bar', stacked=True, figsize=(10, 6))

# Title and axis labels.
outcome_ax.set_title('Distribution of Last Outcome Categories for Each City')
outcome_ax.set_xlabel('Last Outcome Category')
outcome_ax.set_ylabel('Count')

# Show the plot
plt.show()

In [None]:
import matplotlib.pyplot as plt
import pyspark.sql.functions as F

# `completed_outcomes` was not defined in any cell, so this cell failed with a
# NameError on a fresh run.
# NOTE(review): it is rebuilt here as the records whose last outcome starts
# with "Investigation complete", based on the chart title. Confirm this is the
# intended subset.
completed_outcomes = UKCrime.filter(
    F.col("Last_outcome_category").startswith("Investigation complete")
)

# Prepare the data: number of such records per reporting force.
city_counts = completed_outcomes.groupBy("Reported_by").count().toPandas()

# Plot the pie chart
plt.pie(city_counts["count"], labels=city_counts["Reported_by"], autopct='%1.1f%%')
plt.title("Distribution of Investigations Completed as Last Outcome")

# Display the chart
plt.show()


In [None]:
import pyspark.sql.functions as F

# Records whose month falls in 2023.
crime_2023 = UKCrime.where(F.year("Month") == 2023)

# Records per reporting force, largest total first.
city_crime_counts = (
    crime_2023
    .groupBy("Reported_by")
    .count()
    .sort(F.col("count").desc())
)

# The first row belongs to the force with the most records.
top_city_row = city_crime_counts.select("Reported_by").first()
highest_crime_city = top_city_row[0]

print("City with the highest reported crime in 2023:", highest_crime_city)


# Trend across year

In [None]:
import pyspark.sql.functions as F

# Group the data by city and month-of-year and count the reported crimes.
# NOTE(review): F.month() pools the same calendar month from different years;
# if some months occur in more years than others they will be over-counted.
# Confirm month-of-year is the intended grouping.
city_month_crime_counts = UKCrime.groupBy("Reported_by", F.month("Month").alias("Month")).count()

# Find the month with the highest count for each city. Aggregating
# max("count") and first("Month") separately returns an arbitrary month that
# is unrelated to the maximum. Taking the max of a (count, Month) struct keeps
# the two values paired; ties on count resolve to the later month number.
highest_crime_months = (
    city_month_crime_counts
    .groupBy("Reported_by")
    .agg(F.max(F.struct("count", "Month")).alias("best"))
    .select(
        "Reported_by",
        F.col("best").getField("count").alias("max_count"),
        F.col("best").getField("Month").alias("highest_month"),
    )
)

highest_crime_months.show()



**Question 9 : what is the trend in the number of crimes over time in all cities together?**

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Group the data by month and crime type and count the number of crimes
crime_counts = spark.sql("SELECT Month, Crime_Type, COUNT(*) AS count FROM UKdata GROUP BY Month, Crime_Type")

# Pivot to one row per month and one column per crime type. The rows of a
# grouped/pivoted result come back in no particular order, so sort by month
# explicitly; otherwise the bars are not chronological.
pivoted_counts = (
    crime_counts.groupby("Month")
    .pivot("Crime_Type")
    .sum("count")
    .fillna(0)
    .orderBy("Month")
)

# Convert the PySpark DataFrame to a Pandas DataFrame indexed by month
pandas_df = pivoted_counts.toPandas().set_index("Month")

# Create the axes first and pass them to pandas. Calling plt.figure() and then
# DataFrame.plot() without `ax` makes pandas open a second figure, leaving the
# first one empty.
fig, ax = plt.subplots(figsize=(10, 6))
pandas_df.plot(kind="bar", stacked=True, ax=ax)
ax.tick_params(axis="x", labelrotation=90)
ax.set_xlabel("Month")
ax.set_ylabel("Number of Crimes")
# Place the legend outside the plot area, to the right of the axes.
ax.legend(title="Crime Type", loc="upper left", bbox_to_anchor=(1.0, 1.0))
plt.show()


**Question 10 : what is the forecasted number of crimes for the next 12 months?**

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA

# Count crimes per month inside Spark and bring only the per-month totals to
# pandas. Running "SELECT *" and converting the result to pandas would pull
# every record into driver memory just to count rows.
monthly_totals_pd = spark.sql(
    "SELECT Month, COUNT(*) AS crimes FROM UKdata "
    "WHERE Month IS NOT NULL GROUP BY Month ORDER BY Month"
).toPandas()

# Convert the "Month" column to datetime type
monthly_totals_pd['Month'] = pd.to_datetime(monthly_totals_pd['Month'])

# Re-bin on a month-end frequency so the series has a regular DatetimeIndex
# (months with no rows become 0).
monthly_crime_counts = monthly_totals_pd.groupby(pd.Grouper(key='Month', freq='M'))['crimes'].sum()

# Create the ARIMA model; order is (p, d, q) = (2, 1, 1).
model = ARIMA(monthly_crime_counts, order=(2, 1, 1))
model_fit = model.fit()

# Forecast the number of crimes for the next 12 months
forecast = model_fit.forecast(steps=12)

# Plot the historical data and the forecasted values
plt.figure(figsize=(12, 6))
plt.plot(monthly_crime_counts.index, monthly_crime_counts, label='Historical Data')
plt.plot(forecast.index, forecast, label='Forecast')

plt.xlabel('Month')
plt.ylabel('Crime Numbers')
plt.title('Monthly Crime Numbers Forecast')
plt.legend()
plt.show()
