In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace, udf
from pyspark.sql.types import FloatType, IntegerType, StringType, StructType

In [2]:
#Helper functions

def init_spark():
    """Return the notebook's SparkSession, obtained through ``getOrCreate``.

    The builder is configured with the application name and the single
    config option below before ``getOrCreate()`` is called.
    """
    builder = SparkSession.builder
    builder = builder.appName("Python Spark SQL crime prediction functions")
    builder = builder.config("spark.some.config.option", "some-value")
    return builder.getOrCreate()

def getNullValues(data):
    """Summarise the null values of every column of ``data``.

    Parameters
    ----------
    data : DataFrame
        The Spark DataFrame to inspect.

    Returns
    -------
    DataFrame
        A two-row DataFrame with one column per input column, named
        ``<column>_null``. The first row holds the number of null values
        (as a float); the second row holds that number as a percentage of
        the total row count (0.0 when ``data`` has no rows).
    """
    spark = init_spark()

    # The total row count does not depend on the column being inspected, so
    # run that Spark action once instead of once per column.
    total_rows = data.count()

    names = []
    null_counts = []
    null_percentages = []
    for name in data.columns:
        nulls = data.where(col(name).isNull()).count()
        names.append(name + "_null")
        null_counts.append(float(nulls))
        # An empty DataFrame has no nulls; report 0% rather than dividing by zero.
        if total_rows:
            null_percentages.append(float((nulls / total_rows) * 100))
        else:
            null_percentages.append(0.0)

    return spark.createDataFrame([tuple(null_counts), tuple(null_percentages)], names)

In [3]:
#Function to pre-process the Wellbeing datasets

def datasetWellbeing(demographicsFilename,economicsFilename,educationFilename, variable):
    """Load the three Wellbeing CSV files and join them into one DataFrame.

    Each file is read with a header row, schema inference and
    ``DROPMALFORMED`` mode, reduced to the columns used downstream, and the
    three results are inner-joined on their ``Neighbourhood Id`` column.

    Parameters
    ----------
    demographicsFilename : str
        Path of the demographics CSV (needs ``Neighbourhood``,
        ``Neighbourhood Id``, ``Total Area``, ``Total Population``).
    economicsFilename : str
        Path of the economics CSV (needs ``Neighbourhood``,
        ``Neighbourhood Id``, ``Home Prices``, ``Local Employment``,
        ``Social Assistance Recipients``).
    educationFilename : str
        Path of the education CSV (needs ``Neighbourhood``,
        ``Neighbourhood Id``, ``Catholic School Graduation``,
        ``Catholic School Literacy``, ``Catholic University Applicants``).
    variable : int
        When equal to 1, the schema, the per-column null summary and the
        ``describe()`` statistics of the joined data are printed.

    Returns
    -------
    DataFrame
        Columns ``neighbourhood_w``, ``neighbourhood_id``, ``total_area``,
        ``total_population``, ``home_prices``, ``local_employment``,
        ``social_assistance_recipients``, ``catholic_school_graduation``,
        ``catholic_school_literacy`` and ``catholic_university_applicants``.
    """
    spark = init_spark()

    def read_csv(filename):
        # All three files are read with identical options.
        return spark.read.csv(filename, header=True, mode="DROPMALFORMED", inferSchema=True)

    def slug(column):
        # Lower-case the name and replace spaces with dashes. Built-in column
        # functions propagate nulls, whereas a Python lambda UDF calling
        # ``x.replace`` fails on a null name, and they avoid shipping every
        # row through the Python worker.
        return lower(regexp_replace(column, " ", "-"))

    def per_hundred(column):
        # Divide by 100 and keep the float result type; nulls stay null
        # instead of raising inside a UDF.
        return (column / 100).cast(FloatType())

    # Selecting features from each dataframe
    demographics = read_csv(demographicsFilename)
    demographics = demographics.select(
        slug(demographics['Neighbourhood']).alias('neighbourhood_d'),
        demographics['Neighbourhood Id'].alias("neighbourhood_id_d"),
        demographics['Total Area'].alias('total_area'),
        demographics['Total Population'].alias('total_population'),
    )

    economics = read_csv(economicsFilename)
    economics = economics.select(
        slug(economics['Neighbourhood']).alias('neighbourhood_e'),
        economics['Neighbourhood Id'].alias("neighbourhood_id_e"),
        economics['Home Prices'].alias('home_prices'),
        economics['Local Employment'].alias('local_employment'),
        economics['Social Assistance Recipients'].alias('social_assistance_recipients'),
    )

    education = read_csv(educationFilename)
    education = education.select(
        slug(education['Neighbourhood']).alias('neighbourhood_ed'),
        education['Neighbourhood Id'].alias("neighbourhood_id_ed"),
        education['Catholic School Graduation'].alias('catholic_school_graduation'),
        per_hundred(education['Catholic School Literacy']).alias('catholic_school_literacy'),
        per_hundred(education['Catholic University Applicants']).alias('catholic_university_applicants'),
    )

    # Inner joins on the neighbourhood id: education -> economics -> demographics.
    data = education.join(
        economics,
        education['neighbourhood_id_ed'] == economics['neighbourhood_id_e'])
    data = data.join(
        demographics,
        data['neighbourhood_id_ed'] == demographics['neighbourhood_id_d'])

    # Keep a single name/id pair (taken from the economics file) plus the features.
    data = data.select(
        data['neighbourhood_e'].alias('neighbourhood_w'),
        data['neighbourhood_id_e'].alias('neighbourhood_id'),
        data['total_area'],
        data['total_population'],
        data['home_prices'],
        data['local_employment'],
        data['social_assistance_recipients'],
        data['catholic_school_graduation'],
        data['catholic_school_literacy'],
        data['catholic_university_applicants'],
    )

    if variable == 1:

        # Show Schema
        data.printSchema()

        # Counting null values
        nullCount = getNullValues(data)
        nullCount.show()

        # Feature metrics
        data.describe().show()

    return data

# Pre-process the Wellbeing files with the diagnostic output enabled (variable=1).
result0 = datasetWellbeing(
    demographicsFilename="./data/WB-Demographics.csv",
    economicsFilename="./data/WB-Economics.csv",
    educationFilename="./data/WB-Education.csv",
    variable=1,
)

root
 |-- neighbourhood_w: string (nullable = true)
 |-- neighbourhood_id: integer (nullable = true)
 |-- total_area: double (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- home_prices: integer (nullable = true)
 |-- local_employment: integer (nullable = true)
 |-- social_assistance_recipients: integer (nullable = true)
 |-- catholic_school_graduation: double (nullable = true)
 |-- catholic_school_literacy: float (nullable = true)
 |-- catholic_university_applicants: float (nullable = true)



+--------------------+---------------------+---------------+---------------------+----------------+---------------------+---------------------------------+-------------------------------+-----------------------------+-----------------------------------+
|neighbourhood_w_null|neighbourhood_id_null|total_area_null|total_population_null|home_prices_null|local_employment_null|social_assistance_recipients_null|catholic_school_graduation_null|catholic_school_literacy_null|catholic_university_applicants_null|
+--------------------+---------------------+---------------+---------------------+----------------+---------------------+---------------------------------+-------------------------------+-----------------------------+-----------------------------------+
|                 0.0|                  0.0|            0.0|                  0.0|             0.0|                  0.0|                              0.0|                            0.0|                          0.0|                     

+-------+------------------+-----------------+-----------------+------------------+------------------+------------------+----------------------------+--------------------------+------------------------+------------------------------+
|summary|   neighbourhood_w| neighbourhood_id|       total_area|  total_population|       home_prices|  local_employment|social_assistance_recipients|catholic_school_graduation|catholic_school_literacy|catholic_university_applicants|
+-------+------------------+-----------------+-----------------+------------------+------------------+------------------+----------------------------+--------------------------+------------------------+------------------------------+
|  count|               140|              140|              140|               140|               140|               140|                         140|                       140|                     140|                           140|
|   mean|              null|             70.5|4.523500000000002|

In [4]:
#Function to pre-process the MCI dataset

def dataSetCreactionCrime(crimeFilename,variable):
    """Load the MCI crime CSV and drop rows with missing occurrence date parts.

    Parameters
    ----------
    crimeFilename : str
        Path of the crime CSV; read with a header row, schema inference and
        ``DROPMALFORMED`` mode.
    variable : int
        When equal to 1, the schema, the per-column null summary and the
        ``describe()`` statistics of the unfiltered data are printed.

    Returns
    -------
    DataFrame
        The rows of the file whose ``occurrenceyear``, ``occurrencemonth``,
        ``occurrenceday``, ``occurrencedayofyear`` and
        ``occurrencedayofweek`` are all non-null.
    """
    spark = init_spark()
    dataCrimes = spark.read.csv(crimeFilename, header=True, mode="DROPMALFORMED",inferSchema=True)

    if variable == 1:

        # Show Schema
        dataCrimes.printSchema()

        # Counting and showing null values. The summary runs count actions for
        # every column, so it is only computed when it is going to be shown.
        getNullValues(dataCrimes).show()

        # Feature metrics
        dataCrimes.describe().show()

    # Eliminate null values: every occurrence date component must be present.
    requiredColumns = (
        "occurrenceyear",
        "occurrencemonth",
        "occurrenceday",
        "occurrencedayofyear",
        "occurrencedayofweek",
    )
    allPresent = dataCrimes[requiredColumns[0]].isNotNull()
    for name in requiredColumns[1:]:
        allPresent = allPresent & dataCrimes[name].isNotNull()

    return dataCrimes.where(allPresent)

# Pre-process the crime file with the diagnostic output enabled (variable=1).
result1 = dataSetCreactionCrime(
    crimeFilename="./data/MCI_2014_to_2017.csv",
    variable=1,
)

root
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- Index_: integer (nullable = true)
 |-- event_unique_id: string (nullable = true)
 |-- occurrencedate: timestamp (nullable = true)
 |-- reporteddate: timestamp (nullable = true)
 |-- premisetype: string (nullable = true)
 |-- ucr_code: integer (nullable = true)
 |-- ucr_ext: integer (nullable = true)
 |-- offence: string (nullable = true)
 |-- reportedyear: integer (nullable = true)
 |-- reportedmonth: string (nullable = true)
 |-- reportedday: integer (nullable = true)
 |-- reporteddayofyear: integer (nullable = true)
 |-- reporteddayofweek: string (nullable = true)
 |-- reportedhour: integer (nullable = true)
 |-- occurrenceyear: integer (nullable = true)
 |-- occurrencemonth: string (nullable = true)
 |-- occurrenceday: integer (nullable = true)
 |-- occurrencedayofyear: integer (nullable = true)
 |-- occurrencedayofweek: string (nullable = true)
 |-- occurrencehour: integer (nullable = true)
 |-- MCI: string

+------+------+-----------+--------------------+-------------------+-----------------+----------------+-------------+------------+------------+-----------------+------------------+----------------+----------------------+----------------------+-----------------+--------------------+--------------------+--------------------+------------------------+------------------------+-------------------+--------+-------------+------------+------------------+--------+---------+--------+
|X_null|Y_null|Index__null|event_unique_id_null|occurrencedate_null|reporteddate_null|premisetype_null|ucr_code_null|ucr_ext_null|offence_null|reportedyear_null|reportedmonth_null|reportedday_null|reporteddayofyear_null|reporteddayofweek_null|reportedhour_null| occurrenceyear_null|occurrencemonth_null|  occurrenceday_null|occurrencedayofyear_null|occurrencedayofweek_null|occurrencehour_null|MCI_null|Division_null|Hood_ID_null|Neighbourhood_null|Lat_null|Long_null|FID_null|
+------+------+-----------+-----------------

+-------+-------------------+-------------------+---------------+---------------+-----------+------------------+------------------+--------------------+------------------+-------------+------------------+------------------+-----------------+------------------+------------------+---------------+------------------+-------------------+-------------------+-----------------+----------+--------+------------------+--------------------+-------------------+-------------------+------------------+
|summary|                  X|                  Y|         Index_|event_unique_id|premisetype|          ucr_code|           ucr_ext|             offence|      reportedyear|reportedmonth|       reportedday| reporteddayofyear|reporteddayofweek|      reportedhour|    occurrenceyear|occurrencemonth|     occurrenceday|occurrencedayofyear|occurrencedayofweek|   occurrencehour|       MCI|Division|           Hood_ID|       Neighbourhood|                Lat|               Long|               FID|
+-------+-------

In [5]:
#All Datasets

def joinDataset(variable,
                crimeFilename="./data/MCI_2014_to_2017.csv",
                demographicsFilename="./data/WB-Demographics.csv",
                economicsFilename="./data/WB-Economics.csv",
                educationFilename="./data/WB-Education.csv"):
    """Join the pre-processed crime data with the Wellbeing data.

    The crime rows are inner-joined to the Wellbeing rows on
    ``Hood_ID == neighbourhood_id`` and the result is reduced to a fixed
    column list with lower-case names.

    Parameters
    ----------
    variable : int
        When equal to 1, the schema of the joined DataFrame is printed.
    crimeFilename, demographicsFilename, economicsFilename, educationFilename : str, optional
        Paths of the input CSV files, passed to ``dataSetCreactionCrime`` and
        ``datasetWellbeing``. The defaults are the files under ``./data``.

    Returns
    -------
    DataFrame
        The joined data: the Wellbeing features (with ``neighbourhood_w``
        renamed to ``neighbourhood_name``) followed by the crime columns.
    """
    # Both loaders are called with 0 so they do not print their diagnostics.
    crime = dataSetCreactionCrime(crimeFilename, 0)
    wellbeing = datasetWellbeing(demographicsFilename, economicsFilename, educationFilename, 0)

    data = crime.join(wellbeing, crime['Hood_ID'] == wellbeing['neighbourhood_id'])

    wellbeingColumns = (
        'neighbourhood_id', 'total_area', 'total_population', 'home_prices',
        'local_employment', 'social_assistance_recipients',
        'catholic_school_graduation', 'catholic_school_literacy',
        'catholic_university_applicants', 'neighbourhood',
    )
    crimeColumns = (
        "event_unique_id", "premisetype", "ucr_code", "ucr_ext", "offence",
        "reportedyear", "reportedmonth", "reportedday", "reporteddayofyear",
        "reporteddayofweek", "reportedhour", "occurrenceyear",
        "occurrencemonth", "occurrenceday", "occurrencedayofyear",
        "occurrencedayofweek", "occurrencehour",
    )

    # Column order matters to callers, so it is assembled in one place.
    selected = [data['neighbourhood_w'].alias("neighbourhood_name")]
    selected += [data[name] for name in wellbeingColumns]
    selected += [data['X'].alias("x"), data['Y'].alias("y"), data['Index_'].alias("index")]
    selected += [data[name] for name in crimeColumns]
    selected += [data["MCI"].alias("mci"), data["Lat"].alias("lat"),
                 data["Long"].alias("long"), data["FID"].alias("fid")]
    data = data.select(*selected)

    if variable == 1:
        data.printSchema()
    return data

def groupingAl():
    """Print exploratory summaries of the joined dataset and return it.

    Printed, in order: the joined schema; row counts per ``mci`` and per
    ``neighbourhood_name`` (largest first); the schema after adding
    ``year_difference`` (``reportedyear - occurrenceyear``); and the
    ``year_difference``/``index`` pairs where the difference exceeds 5,
    largest first.

    Returns
    -------
    DataFrame
        The joined dataset from ``joinDataset(0)``, without the
        ``year_difference`` column.
    """
    joined = joinDataset(0)
    joined.printSchema()

    # Frequency tables, most frequent value first.
    for key in ("mci", "neighbourhood_name"):
        frequencies = joined.groupBy(key).count()
        frequencies.orderBy(col("count").desc()).show()

    # Years elapsed between the occurrence and the report.
    withDelay = joined.withColumn(
        "year_difference", col("reportedyear") - col("occurrenceyear"))
    withDelay.printSchema()

    lateReports = withDelay.select(withDelay["year_difference"], withDelay["index"])
    lateReports = lateReports.where(col("year_difference") > 5)
    lateReports.orderBy(col("year_difference").desc()).show()

    return joined

# Build the joined dataset, print the exploratory summaries, and keep the joined DataFrame.
result2=groupingAl()

root
 |-- neighbourhood_name: string (nullable = true)
 |-- neighbourhood_id: integer (nullable = true)
 |-- total_area: double (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- home_prices: integer (nullable = true)
 |-- local_employment: integer (nullable = true)
 |-- social_assistance_recipients: integer (nullable = true)
 |-- catholic_school_graduation: double (nullable = true)
 |-- catholic_school_literacy: float (nullable = true)
 |-- catholic_university_applicants: float (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- index: integer (nullable = true)
 |-- event_unique_id: string (nullable = true)
 |-- premisetype: string (nullable = true)
 |-- ucr_code: integer (nullable = true)
 |-- ucr_ext: integer (nullable = true)
 |-- offence: string (nullable = true)
 |-- reportedyear: integer (nullable = true)
 |-- reportedmonth: string (nullable = true)
 |-- reportedday: integer (n

+---------------+-----+
|            mci|count|
+---------------+-----+
|        Assault|71406|
|Break and Enter|27353|
|        Robbery|14590|
|     Auto Theft|13477|
|     Theft Over| 4207|
+---------------+-----+



+--------------------+-----+
|  neighbourhood_name|count|
+--------------------+-----+
|church-yonge-corr...| 4699|
|waterfront-commun...| 4362|
|west-humber-clair...| 3282|
| bay-street-corridor| 2744|
|           moss-park| 2653|
|york-university-h...| 2538|
|              woburn| 2530|
|kensington-chinatown| 2440|
|downsview-roding-cfb| 2298|
|               annex| 2283|
|           west-hill| 2263|
|islington-city-ce...| 1999|
|dovercourt-wallac...| 1857|
|    wexford/maryvale| 1837|
|mount-olive-silve...| 1818|
|             bendale| 1812|
| clairlea-birchmount| 1762|
|         black-creek| 1628|
|             malvern| 1604|
|glenfield-jane-he...| 1579|
+--------------------+-----+
only showing top 20 rows

root
 |-- neighbourhood_name: string (nullable = true)
 |-- neighbourhood_id: integer (nullable = true)
 |-- total_area: double (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- home_prices: integer (nullable = true)
 |-- local_employment: integer (nullabl

+---------------+-----+
|year_difference|index|
+---------------+-----+
|             17|    9|
|             17|    5|
|             16|    6|
|             16|    8|
|             15|    7|
|             15|   33|
|             15|   14|
|             15|   34|
|             15|   15|
|             15|   32|
|             15|11748|
|             14|    4|
|             14|   17|
|             14|   31|
|             14|   13|
|             14|11747|
|             14|   19|
|             14|   21|
|             14|   10|
|             14|12410|
+---------------+-----+
only showing top 20 rows

