In [None]:
import pyspark
from pyspark.sql import functions as F
import pandas
import pandas as pd
from pyspark.sql.types import *
import matplotlib.pyplot as plot
import matplotlib.pyplot as pt
import seaborn as sns
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark import SparkContext
from pyspark import SparkFiles
from pyspark.sql import SparkSession
import warnings
# Silence every Python warning raised for the rest of the kernel session.
# NOTE(review): this is a blanket filter, so deprecation and runtime warnings
# from all imported libraries are hidden too — confirm that is intended.
warnings.filterwarnings("ignore")

In [None]:
def create_spark_session_get_imports():
    """Build, or fetch the already-running, SparkSession for this notebook.

    The builder requests the application name "Crime Analysis" and then calls
    ``getOrCreate()``.

    Returns:
        The SparkSession returned by ``getOrCreate()``.
    """
    return SparkSession.builder.appName("Crime Analysis").getOrCreate()

In [None]:
# Obtain the SparkSession that the remaining cells use through the name `spark`.
spark = create_spark_session_get_imports()

In [None]:
# Register the remote gzip file with Spark so a local copy can be located
# through SparkFiles.
deprivation_url = "https://jesildaw21052449.blob.core.windows.net/jesilda21052449/deprivation_with_hdr.gz"
spark.sparkContext.addFile(deprivation_url)

# Read the deprivation data as CSV, taking the column names from the header row.
deprivation_local_uri = "file:///" + SparkFiles.get("deprivation_with_hdr.gz")
deprivation_reader = spark.read.option("header", "true")
deprivation_data = deprivation_reader.csv(deprivation_local_uri)

In [None]:
# Register the remote gzip file with Spark so a local copy can be located
# through SparkFiles.
crime_url = "https://jesildaw21052449.blob.core.windows.net/jesilda21052449/all_crimes21_hdr.txt.gz"
spark.sparkContext.addFile(crime_url)

# Read the crime dataset as CSV, taking the column names from the header row.
crime_local_uri = "file:///" + SparkFiles.get("all_crimes21_hdr.txt.gz")
crime_reader = spark.read.option("header", "true")
crime_dataset_ = crime_reader.csv(crime_local_uri)

In [None]:
# printSchema() writes the schema to stdout itself and returns None, so it is
# called directly; wrapping it in print() would emit an extra "None" line.
crime_dataset_.printSchema()

# Preview the first rows with the default row count and truncation.
crime_dataset_.show()

# Show the two LSOA identifier columns without truncating long values.
crime_dataset_.select(F.col("LSOA code"), F.col("LSOA name")).show(truncate=False)

In [None]:
# Keep only the rows whose crime type is "Burglary".
is_burglary = F.col("Crime type") == "Burglary"
burglary_crime_data = crime_dataset_.where(is_burglary)

# Cast the IMD score to double so it can be averaged numerically.
imd_score_as_double = F.col("IMD SCORE").cast("double")
deprivation_df = deprivation_data.withColumn("IMD SCORE", imd_score_as_double)

# Average IMD score per LSOA code; the aggregate column is named "avg(IMD SCORE)".
avg_imd_scores = deprivation_df.groupBy("LSOA CODE").avg("IMD SCORE")

# Order the averages from highest to lowest.
# NOTE(review): the original comment described this as identifying the more
# affluent areas; if a higher IMD score denotes greater deprivation, descending
# order lists the most deprived areas first — confirm the intended direction.
sorted_imd_scores = avg_imd_scores.orderBy(F.col("avg(IMD SCORE)").desc())


In [None]:
# Preview the crime rows without truncating long column values.
crime_dataset_.show(truncate=False)

In [None]:
# Print the column names and types of the crime dataset.
crime_dataset_.printSchema()

In [None]:
# Preview the deprivation data after the IMD score has been cast to double.
deprivation_df.show()

In [None]:
# Print the column names and types of the deprivation data.
deprivation_df.printSchema()

In [None]:
# Preview the crime rows that passed the "Burglary" filter.
burglary_crime_data.show()

In [None]:
# Print the column names and types of the burglary subset.
burglary_crime_data.printSchema()

In [None]:
# Preview the per-LSOA average IMD scores, highest average first.
sorted_imd_scores.show()

In [None]:
# Expose both DataFrames to Spark SQL under the view names the query uses.
sorted_imd_scores.createOrReplaceTempView("imd_scores")
burglary_crime_data.createOrReplaceTempView("burglary_data")

# Join every burglary row to the average IMD score of its LSOA code.
burglary_imd_join_sql = "select * from burglary_data bcd join imd_scores sis on sis.`LSOA CODE` == bcd.`LSOA code`"
burglary_data_with_affluent_areas = spark.sql(burglary_imd_join_sql)

In [None]:
# Preview the burglary rows joined with their average IMD score.
burglary_data_with_affluent_areas.show()

In [None]:
# Print the column names and types of the joined result.
burglary_data_with_affluent_areas.printSchema()

In [None]:
# Register the joined result so it can be aggregated with Spark SQL.
burglary_data_with_affluent_areas.createOrReplaceTempView("burglary_data_with_affluent_areas")

# Count the joined burglary rows per Month value, ordered by Month, and bring
# the aggregated result into pandas for plotting.
monthly_counts_sql = "select Month, count(*) as count from burglary_data_with_affluent_areas group by Month order by Month asc"
monthly_counts_sdf = spark.sql(monthly_counts_sql)
crime_order = monthly_counts_sdf.toPandas()


In [None]:
# Display the first rows of the per-Month burglary counts.
crime_order.head()

In [None]:
# Bar chart with one bar per Month value showing its burglary count.
monthly_bar_options = dict(
    x='Month',
    y='count',
    kind="bar",
    title='Burglaries over the time',
    xlabel="Year_Month",
    ylabel="Burglaries",
    figsize=(15, 10),
)
crime_order.plot(**monthly_bar_options);


In [None]:
# Parse the Month values as datetimes; values that cannot be parsed become NaT.
month_as_datetime = pd.to_datetime(crime_order['Month'], errors='coerce')
crime_order['Month'] = month_as_datetime

# Derive the calendar year of each row from the parsed Month.
crime_order['year'] = month_as_datetime.dt.year

crime_order.head()

In [None]:
# crime_order holds one row per Month value, so plotting it directly against
# 'year' places several points at the same x position. Sum the monthly counts
# per year first so each year contributes exactly one point.
yearly_burglaries = crime_order.groupby('year', as_index=False)['count'].sum()

fig, ax = plot.subplots()
yearly_burglaries.plot.area(x='year', y='count', ax=ax, linewidth=0)
ax.legend()
ax.set_ylabel('Count of Burglaries Crime');
ax.set_xlabel('Year');

In [None]:
# Number of burglary rows for each (LSOA code, LSOA name) pair.
burglary_counts = burglary_crime_data.groupBy("LSOA code", "LSOA name").count()

# Attach the average IMD score to each LSOA; the inner join keeps only the
# LSOA codes present on both sides.
lsoa_codes_match = burglary_counts["LSOA code"] == sorted_imd_scores["LSOA CODE"]
joined_scores = burglary_counts.join(sorted_imd_scores, on=lsoa_codes_match, how="inner")

In [None]:
# Order by average IMD score, highest first, then give the two aggregate
# columns plain names for use in pandas and in the correlation step.
sorted_joined_scores = (
    joined_scores
    .orderBy("avg(IMD SCORE)", ascending=False)
    .withColumnRenamed("count", "burglaries_count")
    .withColumnRenamed("avg(IMD SCORE)", "imd_score")
)

In [None]:
# Collect the sorted per-LSOA burglary counts and IMD scores into a pandas DataFrame for plotting.
pd_dff = sorted_joined_scores.toPandas()

In [None]:
# Display the first rows of the pandas copy.
pd_dff.head()

In [None]:
# Stacked bars of burglary count and IMD score for the first ten rows of the
# sorted frame, labelled by LSOA name.
top_ten_lsoas = pd_dff.head(n=10)
top_ten_lsoas.plot(
    x='LSOA name',
    y=["burglaries_count", "imd_score"],
    kind='bar',
    stacked=True,
    figsize=(10, 8),
);


In [None]:
# Side-by-side bars of burglary count and IMD score for the first fifty rows
# of the sorted frame, labelled by LSOA name.
top_fifty_lsoas = pd_dff.head(n=50)
top_fifty_bar_options = dict(
    x='LSOA name',
    y=['burglaries_count', 'imd_score'],
    kind="bar",
    title='Crime and Affluent Areas',
    xlabel="LSOA Name",
    ylabel="Burglaries and IMD Score",
    figsize=(15, 10),
)
top_fifty_lsoas.plot(**top_fifty_bar_options);


In [None]:
# Pack the two numeric columns into a single vector column, which is the
# input form Correlation.corr works on.
asm = VectorAssembler(
    inputCols=["burglaries_count", "imd_score"],
    outputCol="co-relation",
)
assembled_scores = asm.transform(sorted_joined_scores)
feature_df = assembled_scores.select("co-relation")

# Spearman correlation matrix between burglary count and IMD score.
co_relation_matrixx = Correlation.corr(feature_df, column="co-relation", method="spearman")
co_relation_matrixx.show(truncate=False)

In [None]:
# Correlation coefficient between burglary count and IMD score, computed with
# DataFrame.stat.corr using its default method.
co_relations = sorted_joined_scores.stat.corr('burglaries_count', 'imd_score')

In [None]:
# Display the coefficient computed by stat.corr.
co_relations

In [None]:
# The correlation result is read from the first column of the first collected row.
collected_rows = co_relation_matrixx.collect()
matrixdata = collected_rows[0][0]

# Convert the matrix to nested Python lists so matplotlib can draw it.
matr_co_info = matrixdata.toArray().tolist()
print(matr_co_info)

In [None]:

# Row/column order of the matrix follows the VectorAssembler inputCols.
matrix_labels = ["burglaries_count", "imd_score"]

# Use one full-size axes: add_subplot(2, 2, 2) confined the heatmap to the
# top-right quarter of the figure and left the rest empty.
fig, ax = plot.subplots(figsize=(8, 6))
ax.set_title("Relation between burglaries_count and affluent areas")
cax = ax.matshow(matr_co_info, vmax=1, vmin=-1)

# Label both axes with the variable names so the cells can be identified.
ax.set_xticks(range(len(matrix_labels)))
ax.set_xticklabels(matrix_labels)
ax.set_yticks(range(len(matrix_labels)))
ax.set_yticklabels(matrix_labels)

fig.colorbar(cax)
plot.show()