# SPARK
### Group H

https://www.kaggle.com/sumaiaparveenshupti/los-angeles-crime-data-20102020?select=Crime_Data_from_2020_to_Present.csv

In [2]:
import findspark
findspark.init()

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Data source and Spark data abstraction (DataFrame) setup

In [3]:
#We had to manually set the schema to avoid problems with Time column 
  #Time column is in military hour, if spark infers the schema it erases the 00 at the beginning, loosing the format
from pyspark.sql.types import StructType, StructField, IntegerType, DateType, DoubleType, StringType

dfschema = StructType([
    StructField("DR_NO", IntegerType()),
    StructField("Date Rptd", StringType()),
    StructField("DATE OCC", StringType()),
    StructField("TIME OCC", StringType()),
    StructField("AREA", IntegerType()),      
    StructField("AREA NAME", StringType()),
    StructField("Rpt Dist No", IntegerType()),
    StructField("Part 1-2", IntegerType()),
    StructField("Crm Cd", IntegerType()),
    StructField("Crm Cd Desc", StringType()),
    StructField("Mocodes", StringType()),
    StructField("Vict Age", IntegerType()),
    StructField("Vict Sex", StringType()),
    StructField("Vict Descent", StringType()),
    StructField("Premis Cd", IntegerType()),
    StructField("Premis Desc", StringType()),
    StructField("Weapon Used Cd", IntegerType()),
    StructField("Weapon Desc", StringType()),
    StructField("Status", StringType()),
    StructField("Status Desc", StringType()),
    StructField("Crm Cd 1", IntegerType()),
    StructField("Crm Cd 2", IntegerType()),
    StructField("Crm Cd 3", IntegerType()),
    StructField("Crm Cd 4", IntegerType()),
    StructField("LOCATION", StringType()),
    StructField("Cross Street", StringType()),
    StructField("LAT", DoubleType()),
    StructField("LON", DoubleType()),   
])

In [4]:
LA_crimesdf = spark.read \
                 .schema(dfschema) \
                 .option("header", "true") \
                 .csv("crime_LA.csv")
                # .option("inferSchema", "true") \

### Display schema and size of the DataFrame

In [5]:
from IPython.display import display, Markdown

LA_crimesdf.printSchema()
display(Markdown("This DataFrame has **%d rows**." % LA_crimesdf.count()))

root
 |-- DR_NO: integer (nullable = true)
 |-- Date Rptd: string (nullable = true)
 |-- DATE OCC: string (nullable = true)
 |-- TIME OCC: string (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: integer (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: integer (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: integer (nullable = true)
 |-- Crm Cd 2: integer (nullable = true)
 |-- Crm Cd 3: integer (nullable = true)
 |-- Crm Cd 4: integer (nullable = true)
 |-- LO

                                                                                

This DataFrame has **2117589 rows**.

In [6]:
from pyspark.sql.functions import * 

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

df_schema = LA_crimesdf.withColumn("DATE OCC", to_date(col("DATE OCC"),"MM/dd/yyyy hh:mm:ss a")) \
            .withColumn("Date Rptd", to_date(col("Date Rptd"),"MM/dd/yyyy hh:mm:ss a")) \
            .withColumn("TIME OCC", to_timestamp(col("TIME OCC"),"HHmm"))
            #.printSchema()

In [7]:
df_schema.select("Date Rptd").show(2)

+----------+
| Date Rptd|
+----------+
|2010-02-20|
|2010-09-13|
+----------+
only showing top 2 rows



In [8]:
df_schema.printSchema()

root
 |-- DR_NO: integer (nullable = true)
 |-- Date Rptd: date (nullable = true)
 |-- DATE OCC: date (nullable = true)
 |-- TIME OCC: timestamp (nullable = true)
 |-- AREA: integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: integer (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: integer (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: integer (nullable = true)
 |-- Crm Cd 2: integer (nullable = true)
 |-- Crm Cd 3: integer (nullable = true)
 |-- Crm Cd 4: integer (nullable = true)
 |-- LOC

### Random Samples

In [9]:
df_schema.cache() # optimization to make the processing faster
df_sample = df_schema.sample(False, 0.10)
display(Markdown("This DataFrame has **%d rows**." % df_sample.count()))

                                                                                

This DataFrame has **211381 rows**.

In [10]:
#We do have negative ages that we will not take into consideration
df_sample=df_sample.where(col("Vict Age")>0)

display(Markdown("This DataFrame has **%d rows**." % df_sample.count()))

This DataFrame has **174454 rows**.

In [11]:
df_sample.select(year("Date Rptd")).distinct().collect()

                                                                                

[Row(year(Date Rptd)=2018),
 Row(year(Date Rptd)=2015),
 Row(year(Date Rptd)=2013),
 Row(year(Date Rptd)=2014),
 Row(year(Date Rptd)=2019),
 Row(year(Date Rptd)=2020),
 Row(year(Date Rptd)=2012),
 Row(year(Date Rptd)=2016),
 Row(year(Date Rptd)=2010),
 Row(year(Date Rptd)=2011),
 Row(year(Date Rptd)=2017),
 Row(year(Date Rptd)=2021)]

### Basic Profiling

**Variable segmentation:

        DR_NO: Division of Records Number: Official file number made up of a 2 digit year, area ID, and 5 digits.

 - `Where and When Crime Ocurred`
  
        - DATE OCC: MM/DD/YYYY.
        - TIME OCC: In 24 hour military time.
        - AREA: The LAPD has 21 Community Police Stations referred to as Geographic Areas within the department. These Geographic Areas are sequentially numbered from 1-21.
        - AREA NAME: The 21 Geographic Areas or Patrol Divisions are also given a name designation that references a landmark or the surrounding community that it is responsible for. For example 77th Street Division is located at the intersection of South Broadway and 77th Street, serving neighborhoods in South Los Angeles.
        - Rpt Dist No: A four-digit code that represents a sub-area within a Geographic Area. All crime records reference the "RD" that it occurred in for statistical comparisons.
        - LOCATION: Street address of crime incident rounded to the nearest hundred block to maintain anonymity.
        - Cross Street: Cross Street of rounded Address.
        - LAT: Latitude.
        - LON: Longitude.
        
        
 - `Crime Description`
 
        - Crm Cd: Indicates the crime committed. (Same as Crime Code 1)
        - Crm Cd Desc: Defines the Crime Code provided.
        - Mocodes: Modus Operandi: Activities associated with the suspect in commission of the crime.
        - Premis Cd: The type of structure, vehicle, or location where the crime took place.
        - Premis Desc: Defines the Premise Code provided.
        - Weapon Used Cd: The type of weapon used in the crime.
        - Weapon Desc: Defines the Weapon Used Code provided.
        - Status: Status of the case. (IC is the default).
        - Status Desc: Defines the Status Code provided.
        - Crm Cd 1: Indicates the crime committed. Crime Code 1 is the primary and most serious one. Crime Code 2, 3, and 4 are respectively less serious offenses. Lower crime class numbers are more serious.
        - Crm Cd 2: May contain a code for an additional crime, less serious than Crime Code 1.
        - Crm Cd 3: May contain a code for an additional crime, less serious than Crime Code 1.
        - Crm Cd 4: May contain a code for an additional crime, less serious than Crime Code 1.
        
 - `Victim Description` 
        
        - Vict Age: Two character numeric.
        - Vict Sex: F - Female M - Male X - Unknown.
        - Vict Descent: Descent Code: A - Other Asian B - Black C - Chinese D - Cambodian F - Filipino G - Guamanian H - Hispanic/Latin/Mexican I - American Indian/Alaskan Native J - Japanese K - Korean L - Laotian O - Other P - Pacific Islander S - Samoan U - Hawaiian V - Vietnamese W - White X - Unknown Z - Asian Indian.
        
        

In [12]:
#Additional Features
from pyspark.sql.functions import year, month, quarter

df_sample=df_sample.withColumn("Year", year("Date Rptd"))\
                    .withColumn("Month",month("Date Rptd"))\
                    .withColumn("Quarter",quarter("Date Rptd"))
                    


In [13]:
# Where and When Columns
from IPython.display import display, Markdown
from pyspark.sql.functions import when, count, col, countDistinct, desc, first, lit, asc_nulls_last, desc_nulls_last, hour


print('\033[1m' + "Where and When Columns" + '\033[0m')

print ("Summary of columns:")
df_sample.select(hour("TIME OCC").alias("Hour"),"DR_NO","Year","Month","Quarter").summary().show()

print("Checking for nulls:")
df_sample.select([count(when(col(c).isNull(), c)).alias(c) for c in ["Date Rptd","DATE OCC","TIME OCC","AREA","AREA NAME","Rpt Dist No","LOCATION","Cross Street","LAT","LON","Year","Month","Quarter"]]).show()

print("Checking amount of distinct values:")
df_sample.select([countDistinct(c).alias(c) for c in ["AREA NAME","Rpt Dist No","Location","Cross Street"]]).show()

print ("Most and least frequent occurrences:")
freq_AreaName = df_sample.groupBy("AREA NAME").agg(count(lit(1)).alias("Total"))
freq_CrossStreet = df_sample.groupBy("Cross Street").agg(count(lit(1)).alias("Total"))
#freq_TimeOcc = df_sample.groupBy("TIME OCC").agg(count(lit(1)).alias("Total"))
freq_TimeOcc = df_sample.groupBy(hour("TIME OCC").alias("TIME OCC")).agg(count(lit(1)).alias("Total"))

leastFreqAreaName = freq_AreaName.orderBy(col("Total").asc()).first()
mostFreqAreaName = freq_AreaName.orderBy(col("Total").desc()).first()
leastFreqTimeOcc = freq_TimeOcc.orderBy(col("Total").asc()).first()
mostFreqTimeOcc = freq_TimeOcc.orderBy(col("Total").desc()).first()
leastFreqCrossStreet = freq_CrossStreet.orderBy(col("Total").asc()).first()
mostFreqCrossStreet = freq_CrossStreet.orderBy(col("Total").desc()).first()
                                               
display(Markdown(
    
    """
| %s | %s | %s | %s | %s | %s |
|----|----|----|----|----|----|
| %s | %s | %s | %s | %s | %s |
""" % ("leastFreqAreaName", "mostFreqAreaName", "leastFreqTimeOcc", "mostFreqTimeOcc", "leastFreqCrossStreet", "mostFreqCrossStreet", \
       "%s (%d occurrences)" % (leastFreqAreaName["AREA NAME"], leastFreqAreaName["Total"]), \
       "%s (%d occurrences)" % (mostFreqAreaName["AREA NAME"], mostFreqAreaName["Total"]), \
       "%s (%d occurrences)" % (leastFreqTimeOcc["TIME OCC"], leastFreqTimeOcc["Total"]), \
       "%s (%d occurrences)" % (mostFreqTimeOcc["TIME OCC"], mostFreqTimeOcc["Total"]), \
       "%s (%d occurrences)" % (leastFreqCrossStreet["Cross Street"], leastFreqCrossStreet["Total"]), \
       "%s (%d occurrences)" % (mostFreqCrossStreet["Cross Street"], mostFreqCrossStreet["Total"]))))

[1mWhere and When Columns[0m
Summary of columns:


                                                                                

+-------+------------------+--------------------+------------------+------------------+------------------+
|summary|              Hour|               DR_NO|              Year|             Month|           Quarter|
+-------+------------------+--------------------+------------------+------------------+------------------+
|  count|            174454|              174454|            174454|            174454|            174454|
|   mean|13.288293762252513|1.4767144272541186E8|2014.6527222075733| 6.523347128756004|2.5111089456246347|
| stddev| 6.467924275476101|  2.88973274303441E7|2.8889601032877867|3.4115860543236103|1.1082749436753492|
|    min|                 0|            20305364|              2010|                 1|                 1|
|    25%|                 9|           121412019|              2012|                 4|                 2|
|    50%|                14|           150624245|              2015|                 7|                 3|
|    75%|                19|         

                                                                                

+---------+--------+--------+----+---------+-----------+--------+------------+---+---+----+-----+-------+
|Date Rptd|DATE OCC|TIME OCC|AREA|AREA NAME|Rpt Dist No|LOCATION|Cross Street|LAT|LON|Year|Month|Quarter|
+---------+--------+--------+----+---------+-----------+--------+------------+---+---+----+-----+-------+
|        0|       0|       0|   0|        0|          0|       0|      145502|  0|  0|   0|    0|      0|
+---------+--------+--------+----+---------+-----------+--------+------------+---+---+----+-----+-------+

Checking amount of distinct values:


                                                                                

+---------+-----------+--------+------------+
|AREA NAME|Rpt Dist No|Location|Cross Street|
+---------+-----------+--------+------------+
|       21|       1164|   41447|        4413|
+---------+-----------+--------+------------+

Most and least frequent occurrences:


                                                                                


| leastFreqAreaName | mostFreqAreaName | leastFreqTimeOcc | mostFreqTimeOcc | leastFreqCrossStreet | mostFreqCrossStreet |
|----|----|----|----|----|----|
| Hollenbeck (5761 occurrences) | 77th Street (12365 occurrences) | 5 (2122 occurrences) | 12 (14714 occurrences) | BUCKINGHAM                   ST (1 occurrences) | None (145502 occurrences) |


In [12]:
#Crime Description

print('\033[1m' + "Crime Description Columns" + '\033[0m')

print ("Summary of columns:")
df_sample.select("Crm Cd","Crm Cd 1","Crm Cd 2","Crm Cd 3","Crm Cd 4").summary().show()

print("Checking for nulls:")
df_sample.select([count(when(col(c).isNull(), c)).alias(c) for c in ["Crm Cd","Crm Cd Desc","Mocodes","Premis Cd","Premis Desc"]]).show()
df_sample.select([count(when(col(c).isNull(), c)).alias(c) for c in ["Weapon Used Cd","Weapon Desc","Status","Status Desc","Crm Cd 1","Crm Cd 2","Crm Cd 3","Crm Cd 4"]]).show()

print("Checking amount of distinct values:")
df_sample.select([countDistinct(c).alias(c) for c in ["Weapon Used Cd","Weapon Desc","Status","Status Desc","Mocodes"]]).show()

print ("Most and least frequent occurrences:")
freq_CrmCdDesc = df_sample.groupBy("Crm Cd Desc").agg(count(lit(1)).alias("Total"))
freq_StatusDesc = df_sample.groupBy("Status Desc").agg(count(lit(1)).alias("Total"))
freq_WeaponDesc = df_sample.groupBy("Weapon Desc").agg(count(lit(1)).alias("Total"))


leastFreqCrmCdDesc = freq_CrmCdDesc.orderBy(col("Total").asc()).first()
mostFreqCrmCdDesc = freq_CrmCdDesc.orderBy(col("Total").desc()).first()
leastFreqStatusDesc = freq_StatusDesc.orderBy(col("Total").asc()).first()
mostFreqStatusDesc = freq_StatusDesc.orderBy(col("Total").desc()).first()
leastFreqWeaponDesc = freq_WeaponDesc.orderBy(col("Total").asc()).first()
mostFreqWeaponDesc = freq_WeaponDesc.orderBy(col("Total").desc()).first()

display(Markdown(
    
    """
| %s | %s | %s | %s || %s || %s |
|----|----|----|----||----||----|
| %s | %s | %s | %s || %s || %s |
""" % ("leastFreqCrmCdDesc", "mostFreqCrmCdDesc", "leastFreqStatusDesc", "mostFreqStatusDesc","leastFreqWeaponDesc", "mostFreqWeaponDesc", \
       "%s (%d occurrences)" % (leastFreqCrmCdDesc["Crm Cd Desc"], leastFreqCrmCdDesc["Total"]), \
       "%s (%d occurrences)" % (mostFreqCrmCdDesc["Crm Cd Desc"], mostFreqCrmCdDesc["Total"]), \
       "%s (%d occurrences)" % (leastFreqStatusDesc["Status Desc"], leastFreqStatusDesc["Total"]), \
       "%s (%d occurrences)" % (mostFreqStatusDesc["Status Desc"], mostFreqStatusDesc["Total"]), \
       "%s (%d occurrences)" % (leastFreqWeaponDesc["Weapon Desc"], leastFreqWeaponDesc["Total"]), \
       "%s (%d occurrences)" % (mostFreqWeaponDesc["Weapon Desc"], mostFreqWeaponDesc["Total"]))))


[1mCrime Description Columns[0m
Summary of columns:


                                                                                

+-------+-----------------+------------------+------------------+-----------------+--------+
|summary|           Crm Cd|          Crm Cd 1|          Crm Cd 2|         Crm Cd 3|Crm Cd 4|
+-------+-----------------+------------------+------------------+-----------------+--------+
|  count|           175430|            175429|             10612|              188|       8|
|   mean| 505.988114917631| 505.8817299306272| 953.2878816434226|966.1702127659574|   998.0|
| stddev|221.2003135486193|221.13809355867178|124.19112694486192|94.25067704806506|     0.0|
|    min|              110|             110.0|             210.0|            420.0|   998.0|
|    25%|              330|             330.0|             998.0|            998.0|   998.0|
|    50%|              440|             440.0|             998.0|            998.0|   998.0|
|    75%|              627|             627.0|             998.0|            998.0|   998.0|
|    max|              956|             956.0|             999.0|     

                                                                                

+--------------+-----------+------+-----------+-------+
|Weapon Used Cd|Weapon Desc|Status|Status Desc|Mocodes|
+--------------+-----------+------+-----------+-------+
|            75|         75|     6|          6|  53905|
+--------------+-----------+------+-----------+-------+

Most and least frequent occurrences:


                                                                                


| leastFreqCrmCdDesc | mostFreqCrmCdDesc | leastFreqStatusDesc | mostFreqStatusDesc || leastFreqWeaponDesc || mostFreqWeaponDesc |
|----|----|----|----||----||----|
| INCEST (SEXUAL ACTS BETWEEN BLOOD RELATIVES) (1 occurrences) | BATTERY - SIMPLE ASSAULT (19991 occurrences) | UNK (5 occurrences) | Invest Cont (132976 occurrences) || MAC-10 SEMIAUTOMATIC ASSAULT WEAPON (1 occurrences) || None (108930 occurrences) |


In [13]:
#Victim Description

print('\033[1m' + "Victim Description Columns" + '\033[0m')

print ("Summary of columns:")
df_sample.select("Vict Age").summary().show()

print("Checking for nulls:")
df_sample.select([count(when(col(c).isNull(), c)).alias(c) for c in ["Vict Age","Vict Sex","Vict Descent"]]).show()

print("Checking amount of distinct values:")
df_sample.select([countDistinct(c).alias(c) for c in ["Vict Age","Vict Sex","Vict Descent"]]).show()


print ("Most and least frequent occurrences:")
freq_VictAge = df_sample.groupBy("Vict Age").agg(count(lit(1)).alias("Total"))
freq_VictSex = df_sample.groupBy("Vict Sex").agg(count(lit(1)).alias("Total"))
freq_VictDescent = df_sample.groupBy("Vict Descent").agg(count(lit(1)).alias("Total"))


leastFreqVictAge = freq_VictAge.orderBy(col("Total").asc()).first()
mostFreqVictAge = freq_VictAge.orderBy(col("Total").desc()).first()
leastFreqVictSex = freq_VictSex.orderBy(col("Total").asc()).first()
mostFreqVictSex = freq_VictSex.orderBy(col("Total").desc()).first()
leastFreqVictDescent = freq_VictDescent.orderBy(col("Total").asc()).first()
mostFreqVictDescent = freq_VictDescent.orderBy(col("Total").desc()).first()
                                               
display(Markdown(
    
    """
| %s | %s | %s | %s | %s | %s |
|----|----|----|----|----|----|
| %s | %s | %s | %s | %s | %s |
""" % ("leastFreqVictAge", "mostFreqVictAge", "leastFreqVictSex", "mostFreqVictSex","leastFreqVictDescen", "mostFreqVictDescen", \
       "%s (%d occurrences)" % (leastFreqVictAge["Vict Age"], leastFreqVictAge["Total"]), \
       "%s (%d occurrences)" % (mostFreqVictAge["Vict Age"], mostFreqVictAge["Total"]), \
       "%s (%d occurrences)" % (leastFreqVictSex["Vict Sex"], leastFreqVictSex["Total"]), \
       "%s (%d occurrences)" % (mostFreqVictSex["Vict Sex"], mostFreqVictSex["Total"]),\
       "%s (%d occurrences)" % (leastFreqVictDescent["Vict Descent"], leastFreqVictDescent["Total"]), \
       "%s (%d occurrences)" % (mostFreqVictDescent["Vict Descent"], mostFreqVictDescent["Total"]))))

[1mVictim Description Columns[0m
Summary of columns:
+-------+-----------------+
|summary|         Vict Age|
+-------+-----------------+
|  count|           175430|
|   mean|37.83759904235308|
| stddev|16.04160329076748|
|    min|                2|
|    25%|               25|
|    50%|               36|
|    75%|               49|
|    max|               99|
+-------+-----------------+

Checking for nulls:
+--------+--------+------------+
|Vict Age|Vict Sex|Vict Descent|
+--------+--------+------------+
|       0|       6|           7|
+--------+--------+------------+

Checking amount of distinct values:


                                                                                

+--------+--------+------------+
|Vict Age|Vict Sex|Vict Descent|
+--------+--------+------------+
|      98|       5|          19|
+--------+--------+------------+

Most and least frequent occurrences:


                                                                                


| leastFreqVictAge | mostFreqVictAge | leastFreqVictSex | mostFreqVictSex | leastFreqVictDescen | mostFreqVictDescen |
|----|----|----|----|----|----|
| 98 (8 occurrences) | 25 (4877 occurrences) | - (1 occurrences) | F (89506 occurrences) | L (1 occurrences) | H (71091 occurrences) |


# Business Questions

## A. Ratio of crimes commited by time segment

In [14]:
from pyspark.sql.functions import count, round

# Time Segmentation:
#
#   "Early Morning"     [5:00 AM,9:00 AM) 
#   "Late Morning"     [9:00 AM,12:00 AM)
#   "Afternoon"         [12:00 AM,15:00 PM)
#   "Evening"    (15:00 PM,20:00 PM]
#   "Night"     (20:00 PM,00:00 AM)
#   "Dawn"     [00:00 AM,5:00 AM)



# 1. Let's enrich the DF with our categorization
totalCrimes = df_sample.count()
TimeCategorizationDF = df_sample.withColumn("TimeSegment", when(((hour(col("TIME OCC"))>=5) & (hour(col("TIME OCC"))<9)).alias("TIME OCC"),"1.Early Morning")\
                               .when(((hour(col("TIME OCC"))>=9) & (hour(col("TIME OCC"))<12)).alias("TIME OCC"),"2.Late Morning")\
                               .when(((hour(col("TIME OCC"))>=12) & (hour(col("TIME OCC"))<15)).alias("TIME OCC"),"3.Afternoon")\
                               .when(((hour(col("TIME OCC"))>=15) & (hour(col("TIME OCC"))<20)).alias("TIME OCC"),"4.Evening")\
                               .when(((hour(col("TIME OCC"))>=20) & (hour(col("TIME OCC"))<=23)).alias("TIME OCC"),"5.Night")\
                               .otherwise("6.Dawn"))
TimeCategorizationDF.cache() # optimization to make the processing faster

# 2. Ready to answer to this business question
TimeCategorizationDF.select("TimeSegment", "TIME OCC")\
                     .groupBy("TimeSegment")\
                     .agg(count("TimeSegment").alias("NumCrimes"), \
                          (count("TimeSegment")/totalCrimes*100).alias("Ratio"))\
                     .orderBy("TimeSegment")\
                     .select("TimeSegment","NumCrimes",round("Ratio",2).alias("RoundedRatio")).show()



+---------------+---------+------------+
|    TimeSegment|NumCrimes|RoundedRatio|
+---------------+---------+------------+
|1.Early Morning|    16992|        9.74|
| 2.Late Morning|    21985|        12.6|
|    3.Afternoon|    30735|       17.62|
|      4.Evening|    46917|       26.89|
|        5.Night|    35158|       20.15|
|         6.Dawn|    22667|       12.99|
+---------------+---------+------------+



                                                                                

In [39]:
from pyspark.sql.functions import count, round

# Crime Severity Segmentation:
#
#   "Severe"     [0, 200] 
#   "Less Severe"     (200,400]
#   "Moderate"        (400,600]
#   "Mild"            (600,800]
#   "Not Severe"     (800,1000)




# 1. Let's enrich the DF with our categorization
totalCrimes = df_sample.count()
SeverityCategorizationDF = TimeCategorizationDF.withColumn("CrimeSeverity", when(col("Crm Cd")<=200,"1.Severe")\
                               .when((col("Crm Cd")>200) & (col("Crm Cd")<=400),"2.Less Severe")\
                               .when((col("Crm Cd")>400) & (col("Crm Cd")<600),"3.Moderate")\
                               .when((col("Crm Cd")>600) & (col("Crm Cd")<800),"4.Mild")\
                               .otherwise("5.Not Severe"))
SeverityCategorizationDF.cache() # optimization to make the processing faster

# 2. Ready to answer to this business question
SeverityCategorizationDF.select("CrimeSeverity", "Crm Cd")\
                     .groupBy("CrimeSeverity")\
                     .agg(count("CrimeSeverity").alias("NumCrimes"), \
                          (count("CrimeSeverity")/totalCrimes*100).alias("Ratio"))\
                     .orderBy("CrimeSeverity")\
                     .select("CrimeSeverity","NumCrimes",round("Ratio",2).alias("RoundedRatio")).show()



+-------------+---------+------------+
|CrimeSeverity|NumCrimes|RoundedRatio|
+-------------+---------+------------+
|     1.Severe|     1424|        0.82|
|2.Less Severe|    73692|       42.24|
|   3.Moderate|    23427|       13.43|
|       4.Mild|    55388|       31.75|
| 5.Not Severe|    20523|       11.76|
+-------------+---------+------------+



                                                                                

In [24]:
display(Markdown("**Severity of Crime by Time of the day** (Order by 1.Severe):"))

SeverityCategorizationDF.groupBy("TimeSegment")\
                        .pivot("CrimeSeverity")\
                        .agg(count("CrimeSeverity")/totalCrimes*100)\
                        .orderBy(col("`1.Severe`").desc())\
                        .select("TimeSegment",round("`1.Severe`",2).alias("1.Severe"),round("`2.Less Severe`",2).alias("2.Less Severe"),round("`3.Moderate`",2).alias("3.Moderate"),round("`4.Mild`",2).alias("4.Mild"),round("`5.Not Severe`",2).alias("5. Not Severe")).show()

**Severity of Crime by Time of the day** (Order by 1.Severe):



+---------------+--------+-------------+----------+------+-------------+
|    TimeSegment|1.Severe|2.Less Severe|3.Moderate|4.Mild|5. Not Severe|
+---------------+--------+-------------+----------+------+-------------+
|         6.Dawn|    0.26|         5.77|       1.2|  4.45|         1.32|
|        5.Night|    0.21|         8.26|      2.42|  7.27|         1.99|
|      4.Evening|    0.14|        10.65|      4.23|   8.6|         3.28|
|    3.Afternoon|    0.08|         7.86|      2.63|  4.84|         2.21|
|1.Early Morning|    0.08|         4.47|      1.17|  2.73|          1.3|
| 2.Late Morning|    0.05|         5.24|      1.78|  3.86|         1.67|
+---------------+--------+-------------+----------+------+-------------+



                                                                                

In [25]:
display(Markdown("**Severity of Crime by Time of the day** (Order by 3.Moderate):"))

#,col("`2.Less Severe`").desc(),col("`3.Moderate`").desc(),col("`4.Mild`").desc(),col("`5.Not Severe`").desc()

SeverityCategorizationDF.groupBy("TimeSegment")\
                        .pivot("CrimeSeverity")\
                        .agg(count("CrimeSeverity")/totalCrimes*100)\
                        .orderBy(col("`3.Moderate`").desc())\
                        .select("TimeSegment",round("`1.Severe`",2).alias("1.Severe"),round("`2.Less Severe`",2).alias("2.Less Severe"),round("`3.Moderate`",2).alias("3.Moderate"),round("`4.Mild`",2).alias("4.Mild"),round("`5.Not Severe`",2).alias("5. Not Severe")).show()

**Severity of Crime by Time of the day** (Order by 3.Moderate):



+---------------+--------+-------------+----------+------+-------------+
|    TimeSegment|1.Severe|2.Less Severe|3.Moderate|4.Mild|5. Not Severe|
+---------------+--------+-------------+----------+------+-------------+
|      4.Evening|    0.14|        10.65|      4.23|   8.6|         3.28|
|    3.Afternoon|    0.08|         7.86|      2.63|  4.84|         2.21|
|        5.Night|    0.21|         8.26|      2.42|  7.27|         1.99|
| 2.Late Morning|    0.05|         5.24|      1.78|  3.86|         1.67|
|         6.Dawn|    0.26|         5.77|       1.2|  4.45|         1.32|
|1.Early Morning|    0.08|         4.47|      1.17|  2.73|          1.3|
+---------------+--------+-------------+----------+------+-------------+



                                                                                

In [26]:
display(Markdown("**Severity of Crime by Time of the day** (Order by 5.Not Severe):"))

SeverityCategorizationDF.groupBy("TimeSegment")\
                        .pivot("CrimeSeverity")\
                        .agg(count("CrimeSeverity")/totalCrimes*100)\
                        .orderBy(col("`5.Not Severe`").desc())\
                        .select("TimeSegment",round("`1.Severe`",2).alias("1.Severe"),round("`2.Less Severe`",2).alias("2.Less Severe"),round("`3.Moderate`",2).alias("3.Moderate"),round("`4.Mild`",2).alias("4.Mild"),round("`5.Not Severe`",2).alias("5. Not Severe")).show()

**Severity of Crime by Time of the day** (Order by 5.Not Severe):



+---------------+--------+-------------+----------+------+-------------+
|    TimeSegment|1.Severe|2.Less Severe|3.Moderate|4.Mild|5. Not Severe|
+---------------+--------+-------------+----------+------+-------------+
|      4.Evening|    0.14|        10.65|      4.23|   8.6|         3.28|
|    3.Afternoon|    0.08|         7.86|      2.63|  4.84|         2.21|
|        5.Night|    0.21|         8.26|      2.42|  7.27|         1.99|
| 2.Late Morning|    0.05|         5.24|      1.78|  3.86|         1.67|
|         6.Dawn|    0.26|         5.77|       1.2|  4.45|         1.32|
|1.Early Morning|    0.08|         4.47|      1.17|  2.73|          1.3|
+---------------+--------+-------------+----------+------+-------------+



                                                                                

In [27]:
display(Markdown("**Severity of Crime by Year**"))

SeverityCategorizationDF.groupBy("Year")\
                        .pivot("CrimeSeverity")\
                        .agg(count("CrimeSeverity"))\
                        .orderBy(col("Year").desc())\
                        .show()

**Severity of Crime by Year**



+----+--------+-------------+----------+------+------------+
|Year|1.Severe|2.Less Severe|3.Moderate|4.Mild|5.Not Severe|
+----+--------+-------------+----------+------+------------+
|2021|       6|           49|         6|     8|          45|
|2020|      29|          307|        60|    86|         119|
|2019|     152|         7191|      2565|  5853|        2142|
|2018|     171|         7829|      2638|  5905|        2103|
|2017|     163|         7883|      2578|  5975|        2176|
|2016|     155|         7753|      2642|  5743|        2018|
|2015|     157|         7380|      2784|  5529|        2016|
|2014|     131|         6635|      2275|  5320|        1918|
|2013|     111|         6821|      2194|  5165|        1917|
|2012|     126|         7107|      2237|  5154|        1999|
|2011|      99|         7161|      1963|  5243|        2054|
|2010|     124|         7576|      1485|  5407|        2016|
+----+--------+-------------+----------+------+------------+



                                                                                

### B. Is there Age, Gender or Race Discrimination by Crime Severity?

In [28]:
TotalCrimeMale = df_sample.where(col("Vict Sex")=="M").count()

TotalCrimeFemale = df_sample.where(col("Vict Sex")=="F").count()

print("Total Crimes towards Male",TotalCrimeMale)
print("Total Crimes towards Female", TotalCrimeFemale)
print("Ratio Male/Women:", TotalCrimeMale/TotalCrimeFemale)

Total Crimes towards Male 86231
Total Crimes towards Female 87605
Ratio Male/Women: 0.9843159637007021


In [29]:
from pyspark.sql.functions import max, min, avg, stddev

TotalCrimeMale = df_sample.where(col("Vict Sex")=="M").count()

display(Markdown("**Severity of Crime by Gender** (MALE):"))
SeverityCategorizationDF.where(col("Vict Sex")=="M")\
              .groupBy("CrimeSeverity")\
              .agg(count("CrimeSeverity").alias("TotalCrimesGender"), \
                  (count("CrimeSeverity")/TotalCrimeMale*100).alias("Ratio"))\
               .select("CrimeSeverity","TotalCrimesGender",round("Ratio",2).alias("RoundedRatio")).show()

display(Markdown("With total crimes of **%d**." % TotalCrimeMale))

**Severity of Crime by Gender** (MALE):

                                                                                

+-------------+-----------------+------------+
|CrimeSeverity|TotalCrimesGender|RoundedRatio|
+-------------+-----------------+------------+
|     1.Severe|              253|        0.29|
|2.Less Severe|            41633|       48.28|
|   3.Moderate|            12720|       14.75|
| 5.Not Severe|             7187|        8.33|
|       4.Mild|            24438|       28.34|
+-------------+-----------------+------------+



With total crimes of **86231**.

In [30]:
from pyspark.sql.functions import max, min, avg, stddev

display(Markdown("**Severity of Crime by Gender** (FEMALE):"))
SeverityCategorizationDF.where(col("Vict Sex")=="F")\
              .groupBy("CrimeSeverity")\
              .agg(count("CrimeSeverity").alias("TotalCrimesGender"), \
                  (count("CrimeSeverity")/TotalCrimeMale*100).alias("Ratio"))\
               .select("CrimeSeverity","TotalCrimesGender",round("Ratio",2).alias("RoundedRatio")).show()

display(Markdown("With total crimes of **%d**." % TotalCrimeFemale))

**Severity of Crime by Gender** (FEMALE):

+-------------+-----------------+------------+
|CrimeSeverity|TotalCrimesGender|RoundedRatio|
+-------------+-----------------+------------+
|     1.Severe|             1171|        1.36|
|2.Less Severe|            31859|       36.95|
|   3.Moderate|            10503|       12.18|
| 5.Not Severe|            13281|        15.4|
|       4.Mild|            30791|       35.71|
+-------------+-----------------+------------+



With total crimes of **87605**.

In [31]:
import pyspark.sql.functions as F

display(Markdown("**Most Common Crimes**"))

SeverityCategorizationDF.where((col("Vict Sex")=="F") | (col("Vict Sex")=="M"))\
                        .groupBy("Crm Cd Desc")\
                        .pivot("Vict Sex")\
                        .agg(count("Vict Sex"))\
                        .withColumn("F/M ratio",F.col("F")/F.col("M"))\
                        .orderBy(col("F/M ratio").desc())\
                        .select("Crm Cd Desc","F","M",round("F/M ratio",2).alias("Ratio")).show(truncate = False)

**Most Common Crimes**



+--------------------------------------------------------+----+----+-----+
|Crm Cd Desc                                             |F   |M   |Ratio|
+--------------------------------------------------------+----+----+-----+
|RAPE, FORCIBLE                                          |1029|10  |102.9|
|RAPE, ATTEMPTED                                         |102 |2   |51.0 |
|PIMPING                                                 |45  |1   |45.0 |
|PURSE SNATCHING                                         |123 |5   |24.6 |
|BATTERY WITH SEXUAL CONTACT                             |1100|93  |11.83|
|SEXUAL PENETRATION W/FOREIGN OBJECT                     |234 |22  |10.64|
|KIDNAPPING - GRAND ATTEMPT                              |48  |5   |9.6  |
|PANDERING                                               |25  |3   |8.33 |
|HUMAN TRAFFICKING - INVOLUNTARY SERVITUDE               |7   |1   |7.0  |
|SEX,UNLAWFUL(INC MUTUAL CONSENT, PENETRATION W/ FRGN OBJ|390 |59  |6.61 |
|STALKING                

                                                                                

In [32]:
from pyspark.sql.functions import max, min, avg, stddev

display(Markdown("**Severity of Crime by Age**"))
SeverityCategorizationDF.groupBy("CrimeSeverity")\
              .agg(avg("Vict Age").alias("AverageAge"),\
                   min("Vict Age").alias("LowestAge"),\
                   max("Vict Age").alias("HighestAge"),\
                   stddev("Vict Age").alias("StdDevAge"))\
              .orderBy("CrimeSeverity").show()

**Severity of Crime by Age**



+-------------+------------------+---------+----------+------------------+
|CrimeSeverity|        AverageAge|LowestAge|HighestAge|         StdDevAge|
+-------------+------------------+---------+----------+------------------+
|     1.Severe|28.223314606741575|        2|        86|13.703490585248609|
|2.Less Severe| 39.94025131628942|        2|        99| 16.07501815603156|
|   3.Moderate|  39.0078114995518|        3|        99|15.630281838246544|
|       4.Mild| 37.68778435762259|        2|        99|15.511123055128408|
| 5.Not Severe|  35.2440676314379|        2|        99|16.741300616803212|
+-------------+------------------+---------+----------+------------------+



                                                                                

In [43]:
from pyspark.sql.functions import count, round

# Crime Severity Segmentation:
#
#   [0-18)
#   [18-25)
#   [25-35)
#   [35-45)
#   [45-55)
#   [55-65)
#   [65 and older




# 1. Let's enrich the DF with our categorization
totalCrimes = df_sample.count()
SeverityCategorizationDF = SeverityCategorizationDF.withColumn("Age_Seg", when(col("Vict Age")<18,"0-17")\
                               .when((col("Vict Age")>=18) & (col("Vict Age")<25),"18-24")\
                               .when((col("Vict Age")>=25) & (col("Vict Age")<35),"25-34")\
                               .when((col("Vict Age")>=35) & (col("Vict Age")<45),"35-44")\
                               .when((col("Vict Age")>=45) & (col("Vict Age")<55),"45-54")\
                               .when((col("Vict Age")>=55) & (col("Vict Age")<65),"55-64")\
                               .otherwise("65+"))
SeverityCategorizationDF.cache() # optimization to make the processing faster

# 2. Ready to answer to this business question
display(Markdown("**Severity of Crime by Age Segmentation**"))

SeverityCategorizationDF.groupBy("Age_Seg")\
                        .pivot("CrimeSeverity")\
                        .agg(count("CrimeSeverity"))\
                        .orderBy(col("Age_Seg"))\
                        .show()



+-------+--------+-------------+----------+------+------------+
|Age_Seg|1.Severe|2.Less Severe|3.Moderate|4.Mild|5.Not Severe|
+-------+--------+-------------+----------+------+------------+
|   0-17|     283|         2728|       813|  3180|        3347|
|  18-24|     434|        10251|      3776|  8919|        2562|
|  25-34|     329|        18776|      6186| 14620|        4790|
|  35-44|     171|        15112|      4623| 10946|        3897|
|  45-54|     123|        12715|      3906|  9203|        3134|
|  55-64|      65|         8336|      2508|  5536|        1789|
|    65+|      19|         5774|      1615|  2984|        1004|
+-------+--------+-------------+----------+------+------------+



                                                                                

In [44]:
display(Markdown("**Severity of Crime by Time of the day** (Order by 5.Not Severe):"))
SeverityCategorizationDF.groupBy("Age_Seg")\
                        .pivot("CrimeSeverity")\
                        .agg(count("CrimeSeverity")/totalCrimes*100)\
                        .orderBy(col("`5.Not Severe`").desc())\
                        .select("Age_Seg",round("`1.Severe`",2).alias("1.Severe"),round("`2.Less Severe`",2).alias("2.Less Severe"),round("`3.Moderate`",2).alias("3.Moderate"),round("`4.Mild`",2).alias("4.Mild"),round("`5.Not Severe`",2).alias("5. Not Severe")).show()

**Severity of Crime by Time of the day** (Order by 5.Not Severe):



+-------+--------+-------------+----------+------+-------------+
|Age_Seg|1.Severe|2.Less Severe|3.Moderate|4.Mild|5. Not Severe|
+-------+--------+-------------+----------+------+-------------+
|  25-34|    0.19|        10.76|      3.55|  8.38|         2.75|
|  35-44|     0.1|         8.66|      2.65|  6.27|         2.23|
|   0-17|    0.16|         1.56|      0.47|  1.82|         1.92|
|  45-54|    0.07|         7.29|      2.24|  5.28|          1.8|
|  18-24|    0.25|         5.88|      2.16|  5.11|         1.47|
|  55-64|    0.04|         4.78|      1.44|  3.17|         1.03|
|    65+|    0.01|         3.31|      0.93|  1.71|         0.58|
+-------+--------+-------------+----------+------+-------------+



                                                                                

In [47]:
display(Markdown("**Severity of Crime by Time of the day** (Order by 3.Moderate):"))
SeverityCategorizationDF.groupBy("Age_Seg")\
                        .pivot("CrimeSeverity")\
                        .agg(count("CrimeSeverity")/totalCrimes*100)\
                        .orderBy(col("`3.Moderate`").desc())\
                        .select("Age_Seg",round("`1.Severe`",2).alias("1.Severe"),round("`2.Less Severe`",2).alias("2.Less Severe"),round("`3.Moderate`",2).alias("3.Moderate"),round("`4.Mild`",2).alias("4.Mild"),round("`5.Not Severe`",2).alias("5. Not Severe")).show()

**Severity of Crime by Time of the day** (Order by 3.Moderate):



+-------+--------+-------------+----------+------+-------------+
|Age_Seg|1.Severe|2.Less Severe|3.Moderate|4.Mild|5. Not Severe|
+-------+--------+-------------+----------+------+-------------+
|  25-34|    0.19|        10.76|      3.55|  8.38|         2.75|
|  35-44|     0.1|         8.66|      2.65|  6.27|         2.23|
|  45-54|    0.07|         7.29|      2.24|  5.28|          1.8|
|  18-24|    0.25|         5.88|      2.16|  5.11|         1.47|
|  55-64|    0.04|         4.78|      1.44|  3.17|         1.03|
|    65+|    0.01|         3.31|      0.93|  1.71|         0.58|
|   0-17|    0.16|         1.56|      0.47|  1.82|         1.92|
+-------+--------+-------------+----------+------+-------------+



                                                                                

In [48]:
display(Markdown("**Severity of Crime by Time of the day** (Order by 5.Not Severe):"))
SeverityCategorizationDF.groupBy("Age_Seg")\
                        .pivot("CrimeSeverity")\
                        .agg(count("CrimeSeverity")/totalCrimes*100)\
                        .orderBy(col("`5.Not Severe`").desc())\
                        .select("Age_Seg",round("`1.Severe`",2).alias("1.Severe"),round("`2.Less Severe`",2).alias("2.Less Severe"),round("`3.Moderate`",2).alias("3.Moderate"),round("`4.Mild`",2).alias("4.Mild"),round("`5.Not Severe`",2).alias("5. Not Severe")).show()

**Severity of Crime by Time of the day** (Order by 5.Not Severe):



+-------+--------+-------------+----------+------+-------------+
|Age_Seg|1.Severe|2.Less Severe|3.Moderate|4.Mild|5. Not Severe|
+-------+--------+-------------+----------+------+-------------+
|  25-34|    0.19|        10.76|      3.55|  8.38|         2.75|
|  35-44|     0.1|         8.66|      2.65|  6.27|         2.23|
|   0-17|    0.16|         1.56|      0.47|  1.82|         1.92|
|  45-54|    0.07|         7.29|      2.24|  5.28|          1.8|
|  18-24|    0.25|         5.88|      2.16|  5.11|         1.47|
|  55-64|    0.04|         4.78|      1.44|  3.17|         1.03|
|    65+|    0.01|         3.31|      0.93|  1.71|         0.58|
+-------+--------+-------------+----------+------+-------------+



                                                                                

In [25]:
display(Markdown("**Severity of Crime by Race**"))

#Descent Code: 
#A - Other Asian 
#B - Black 
#C - Chinese 
#D - Cambodian 
#F - Filipino 
#G - Guamanian 
#H - Hispanic/Latin/Mexican 
#I - American Indian/Alaskan Native 
#J - Japanese 
#K - Korean 
#L - Laotian 
#O - Other 
#P - Pacific Islander 
#S - Samoan 
#U - Hawaiian 
#V - Vietnamese 
#W - White 
#X - Unknown 
#Z - Asian Indian.


# 1. Let's enrich the DF with our categorization
SeverityCategorizationDF = SeverityCategorizationDF.withColumn("Race Desc", when(col("Vict Descent")=="A","Other Asian")\
                               .when(col("Vict Descent")=="B","Black")\
                               .when(col("Vict Descent")=="C","Chinese")\
                               .when(col("Vict Descent")=="D","Cambodian")\
                               .when(col("Vict Descent")=="F","Filipino")\
                               .when(col("Vict Descent")=="G","Guamanian")\
                               .when(col("Vict Descent")=="H","Hispanic/Latin/Mexican")\
                               .when(col("Vict Descent")=="I","American Indian/Alaskan Native")\
                               .when(col("Vict Descent")=="J","Japanese")\
                               .when(col("Vict Descent")=="K","Korean")\
                               .when(col("Vict Descent")=="L","Laotian")\
                               .when(col("Vict Descent")=="O","Other")\
                               .when(col("Vict Descent")=="P","Pacific Islander")\
                               .when(col("Vict Descent")=="S","Samoan")\
                               .when(col("Vict Descent")=="U","Hawaiian")\
                               .when(col("Vict Descent")=="V","Vietnamese")\
                               .when(col("Vict Descent")=="W","White")\
                               .when(col("Vict Descent")=="Z","Asian Indian")\
                               .otherwise("Unknown"))


SeverityCategorizationDF.groupBy("Race Desc")\
                        .pivot("CrimeSeverity")\
                        .agg(count("CrimeSeverity"))\
                        .orderBy(col("`1.Severe`").desc(),col("`2.Less Severe`").desc(),col("`3.Moderate`").desc(),col("`4.Mild`").desc(),col("`5.Not Severe`").desc())\
                        .show(truncate = False)

**Severity of Crime by Race**



+------------------------------+--------+-------------+----------+------+------------+
|Race Desc                     |1.Severe|2.Less Severe|3.Moderate|4.Mild|5.Not Severe|
+------------------------------+--------+-------------+----------+------+------------+
|Hispanic/Latin/Mexican        |604     |28809        |8226      |23714 |9738        |
|Black                         |350     |12841        |3439      |12583 |4467        |
|White                         |281     |23258        |7100      |12180 |5095        |
|Other                         |59      |7687         |2339      |4273  |1467        |
|Other Asian                   |40      |2469         |703       |1138  |338         |
|Korean                        |4       |461          |70        |265   |87          |
|American Indian/Alaskan Native|2       |29           |21        |16    |8           |
|Unknown                       |1       |197          |362       |161   |88          |
|Filipino                      |1       |11

                                                                                

In [26]:
display(Markdown("**Most Common Crimes against Hispanic/Latin/Mexican**"))

TotalCrimeHisp = SeverityCategorizationDF.where(col("Race Desc")=="Hispanic/Latin/Mexican").count()


SeverityCategorizationDF.where(col("Race Desc")=="Hispanic/Latin/Mexican")\
                        .groupBy("Crm Cd Desc")\
                        .agg(count("Crm Cd Desc").alias("Total Crimes"))\
                        .orderBy(col("Total Crimes").desc())\
                        .select("Crm Cd Desc","Total Crimes",round(col("Total Crimes")/TotalCrimeHisp,2).alias("Ratio")).show(truncate = False)

**Most Common Crimes against Hispanic/Latin/Mexican**

+--------------------------------------------------------+------------+-----+
|Crm Cd Desc                                             |Total Crimes|Ratio|
+--------------------------------------------------------+------------+-----+
|BATTERY - SIMPLE ASSAULT                                |8712        |0.12 |
|INTIMATE PARTNER - SIMPLE ASSAULT                       |5722        |0.08 |
|BURGLARY FROM VEHICLE                                   |5633        |0.08 |
|THEFT OF IDENTITY                                       |4926        |0.07 |
|ROBBERY                                                 |4542        |0.06 |
|THEFT PLAIN - PETTY ($950 & UNDER)                      |4426        |0.06 |
|BURGLARY                                                |4237        |0.06 |
|ASSAULT WITH DEADLY WEAPON, AGGRAVATED ASSAULT          |4068        |0.06 |
|VANDALISM - MISDEAMEANOR ($399 OR UNDER)                |3478        |0.05 |
|VANDALISM - FELONY ($400 & OVER, ALL CHURCH VANDALISMS) |3250  

                                                                                

# Motive

In [29]:
Mocodes_df = spark.read \
                 .option("inferSchema", "true") \
                 .option("header", "true") \
                 .csv("MO_Codes.csv")

In [30]:
Mocodes_df.printSchema()
display(Markdown("This DataFrame has **%d rows**." % Mocodes_df.count()))

root
 |-- MO_CODE: integer (nullable = true)
 |-- MO_DESCRIPTION: string (nullable = true)



This DataFrame has **824 rows**.

In [104]:
SeverityCategorizationDF.select("DR_NO","Mocodes").distinct().show(truncate = False)

+---------+--------------------------------------------+
|DR_NO    |Mocodes                                     |
+---------+--------------------------------------------+
|100105506|344                                         |
|100108119|null                                        |
|100119439|344                                         |
|100120856|344                                         |
|100121327|344                                         |
|100200602|0935 1402                                   |
|100200707|0432 0913 1212                              |
|100204466|1251 1402                                   |
|100209567|0311 0329 0907 1413                         |
|100212216|0369 0408 0416 1243 1813                    |
|100406601|344                                         |
|100217614|344                                         |
|100300781|0342 0416 0417 0422 0421 0432 0906 0913 1414|
|100300790|0421 1402 2000                              |
|100307715|344                 

In [37]:
Mocodes_split_df= SeverityCategorizationDF.select(
    "DR_NO",
    F.split("Mocodes"," ").alias("Mocodes"),
    F.posexplode(F.split("Mocodes", " ")).alias("pos","MO_CODE")
 )\

Mocodes_split_df.show()

+---------+--------------------+---+-------+
|    DR_NO|             Mocodes|pos|MO_CODE|
+---------+--------------------+---+-------+
|  1307355|  [0913, 1814, 2000]|  0|   0913|
|  1307355|  [0913, 1814, 2000]|  1|   1814|
|  1307355|  [0913, 1814, 2000]|  2|   2000|
|100100572|               [416]|  0|    416|
|100100574|               [416]|  0|    416|
|100100622|               [416]|  0|    416|
|100100623|               [329]|  0|    329|
|100100632|        [0400, 2000]|  0|   0400|
|100100632|        [0400, 2000]|  1|   2000|
|100100651|        [0416, 0429]|  0|   0416|
|100100651|        [0416, 0429]|  1|   0429|
|100100672|[0413, 0500, 0536...|  0|   0413|
|100100672|[0413, 0500, 0536...|  1|   0500|
|100100672|[0413, 0500, 0536...|  2|   0536|
|100100672|[0413, 0500, 0536...|  3|   0526|
|100100676|               [329]|  0|    329|
|100100694|[0416, 0411, 0417...|  0|   0416|
|100100694|[0416, 0411, 0417...|  1|   0411|
|100100694|[0416, 0411, 0417...|  2|   0417|
|100100694

In [54]:
display(Markdown("**Most frequent Motives for a Crime**."))

Mocodes_tot_df = Mocodes_split_df\
    .join(Mocodes_df,"MO_CODE")\
    .groupBy("MO_DESCRIPTION")\
    .agg(count("MO_DESCRIPTION").alias("Total Motive in Crimes"))\
    .orderBy(col("Total Motive in Crimes").desc())\
    .show(truncate = False)

**Most frequent Motives for a Crime**.



+--------------------------------------------------------------------------------+----------------------+
|MO_DESCRIPTION                                                                  |Total Motive in Crimes|
+--------------------------------------------------------------------------------+----------------------+
|Removes vict property                                                           |64477                 |
|Hit-Hit w/ weapon                                                               |27225                 |
|Vandalized                                                                      |20338                 |
|Domestic violence                                                               |17951                 |
|Stranger                                                                        |17580                 |
|Force used                                                                      |14154                 |
|Victim knew Suspect                          

                                                                                

## Locations

In [50]:
#display(Markdown("**Most Common Crimes against Hispanic/Latin/Mexican**"))

TotalCrimeLoc = SeverityCategorizationDF.select("AREA NAME").count()


SeverityCategorizationDF.select("AREA NAME")\
                        .groupBy("AREA NAME")\
                        .agg(count("AREA NAME").alias("Total Crimes by Location"))\
                        .orderBy(col("Total Crimes by Location").desc())\
                        .select("AREA NAME","Total Crimes by Location",round(col("Total Crimes by Location")/TotalCrimeLoc,2).alias("Ratio")).show(truncate = False)



+-----------+------------------------+-----+
|AREA NAME  |Total Crimes by Location|Ratio|
+-----------+------------------------+-----+
|77th Street|12938                   |0.07 |
|Southwest  |12039                   |0.07 |
|N Hollywood|9559                    |0.05 |
|Southeast  |9551                    |0.05 |
|Pacific    |9016                    |0.05 |
|Mission    |8629                    |0.05 |
|Topanga    |8623                    |0.05 |
|Van Nuys   |8541                    |0.05 |
|Devonshire |8406                    |0.05 |
|Northeast  |8235                    |0.05 |
|West LA    |7926                    |0.05 |
|Hollywood  |7693                    |0.04 |
|Rampart    |7661                    |0.04 |
|Newton     |7488                    |0.04 |
|Harbor     |7443                    |0.04 |
|Central    |7409                    |0.04 |
|Olympic    |7408                    |0.04 |
|West Valley|7382                    |0.04 |
|Wilshire   |7212                    |0.04 |
|Foothill 

                                                                                

# Machine Learning Model to Predict if crime is done to a hispanic

In [None]:
# Lets see if time segment, age, sex, Crm Cd and first motive can predict correctly a crime against hispanic

### Data Cleansing for model

In [66]:
Firstmotive_df = Mocodes_split_df.where(col("pos")==0)\
                                    .select("DR_NO","MO_CODE")#.show()

Crimes_df=SeverityCategorizationDF\
            .join(Firstmotive_df, "DR_NO")\
            #.select("MO_CODE").show()

In [68]:
Crimes_df.printSchema()

root
 |-- DR_NO: integer (nullable = true)
 |-- _c0: integer (nullable = true)
 |-- Date Rptd: date (nullable = true)
 |-- DATE OCC: date (nullable = true)
 |-- TIME OCC: timestamp (nullable = true)
 |-- AREA : integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: double (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: double (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: double (nullable = true)
 |-- Crm Cd 2: double (nullable = true)
 |-- Crm Cd 3: double (nullable = true)
 |-- Crm Cd 4: do

In [78]:
MLdf = Crimes_df.select(col("TimeSegment"),\
                    col("Vict Age").cast("float"),\
                    col("Vict Sex"),\
                    col("Crm Cd").cast("float"),\
                    col("Vict Descent"),\
                    col("MO_CODE").cast("float"))

MLdf.show()

+--------------+--------+--------+------+------------+-------+
|   TimeSegment|Vict Age|Vict Sex|Crm Cd|Vict Descent|MO_CODE|
+--------------+--------+--------+------+------------+-------+
|   3.Afternoon|    48.0|       M| 900.0|           H|  913.0|
|     4.Evening|    61.0|       F| 230.0|           B|  416.0|
|     4.Evening|    40.0|       M| 624.0|           W|  416.0|
|        6.Dawn|    24.0|       M| 624.0|           O|  416.0|
|        6.Dawn|    22.0|       F| 745.0|           W|  329.0|
|     4.Evening|    22.0|       F| 626.0|           H|  400.0|
|       5.Night|    23.0|       F| 236.0|           H|  416.0|
|        6.Dawn|    28.0|       F| 121.0|           H|  413.0|
|     4.Evening|    44.0|       M| 745.0|           H|  329.0|
|2.Late Morning|    62.0|       M| 230.0|           H|  416.0|
|        6.Dawn|    21.0|       F| 626.0|           H| 2000.0|
|        6.Dawn|    18.0|       F| 745.0|           H|  329.0|
|        6.Dawn|    34.0|       M| 230.0|           W| 

In [81]:
#Check for nulls
MLdf.select([count(when(isnull(c), c)).alias(c) for c in MLdf.columns])\
            .show()

+-----------+--------+--------+------+------------+-------+
|TimeSegment|Vict Age|Vict Sex|Crm Cd|Vict Descent|MO_CODE|
+-----------+--------+--------+------+------------+-------+
|          0|       0|       3|     0|           4|      0|
+-----------+--------+--------+------+------------+-------+



In [82]:
MLdf_clean = MLdf.dropna(how="any")

### Feature Engineering

In [98]:
MLdf_hispanic = MLdf_clean.withColumn("Vict Descent", when(col("Vict Descent")=="H",1)\
                               .otherwise(0))
MLdf_hips_time = MLdf_hispanic.withColumn("TimeSegmentCode",when(col("TimeSegment")=="1.Early Morning",1)\
                                        .when(col("TimeSegment")=="2.Late Morning",2)\
                                        .when(col("TimeSegment")=="3.Afternoon",3)\
                                        .when(col("TimeSegment")=="4.Evening",4)\
                                        .when(col("TimeSegment")=="5.Night",5)\
                                        .otherwise(6))
MLdf_hips_time.show()

+--------------+--------+--------+------+------------+-------+---------------+
|   TimeSegment|Vict Age|Vict Sex|Crm Cd|Vict Descent|MO_CODE|TimeSegmentCode|
+--------------+--------+--------+------+------------+-------+---------------+
|   3.Afternoon|    48.0|       M| 900.0|           1|  913.0|              3|
|     4.Evening|    61.0|       F| 230.0|           0|  416.0|              4|
|     4.Evening|    40.0|       M| 624.0|           0|  416.0|              4|
|        6.Dawn|    24.0|       M| 624.0|           0|  416.0|              6|
|        6.Dawn|    22.0|       F| 745.0|           0|  329.0|              6|
|     4.Evening|    22.0|       F| 626.0|           1|  400.0|              4|
|       5.Night|    23.0|       F| 236.0|           1|  416.0|              5|
|        6.Dawn|    28.0|       F| 121.0|           1|  413.0|              6|
|     4.Evening|    44.0|       M| 745.0|           1|  329.0|              4|
|2.Late Morning|    62.0|       M| 230.0|           

In [99]:
from pyspark.ml.feature import StringIndexer

MLdf_model = StringIndexer(inputCol="Vict Sex",\
                            outputCol='Gender',\
                            handleInvalid='keep').fit(MLdf_hips_time).transform(MLdf_hips_time)


MLdf_model.show()

+--------------+--------+--------+------+------------+-------+---------------+------+
|   TimeSegment|Vict Age|Vict Sex|Crm Cd|Vict Descent|MO_CODE|TimeSegmentCode|Gender|
+--------------+--------+--------+------+------------+-------+---------------+------+
|   3.Afternoon|    48.0|       M| 900.0|           1|  913.0|              3|   1.0|
|     4.Evening|    61.0|       F| 230.0|           0|  416.0|              4|   0.0|
|     4.Evening|    40.0|       M| 624.0|           0|  416.0|              4|   1.0|
|        6.Dawn|    24.0|       M| 624.0|           0|  416.0|              6|   1.0|
|        6.Dawn|    22.0|       F| 745.0|           0|  329.0|              6|   0.0|
|     4.Evening|    22.0|       F| 626.0|           1|  400.0|              4|   0.0|
|       5.Night|    23.0|       F| 236.0|           1|  416.0|              5|   0.0|
|        6.Dawn|    28.0|       F| 121.0|           1|  413.0|              6|   0.0|
|     4.Evening|    44.0|       M| 745.0|           1|

In [100]:
#check for datatypes
MLdf_model=MLdf_model.drop("Vict Sex","TimeSegment")
MLdf_model.dtypes

[('Vict Age', 'float'),
 ('Crm Cd', 'float'),
 ('Vict Descent', 'int'),
 ('MO_CODE', 'float'),
 ('TimeSegmentCode', 'int'),
 ('Gender', 'double')]

In [101]:
#assemnling features into one single list
from pyspark.ml.feature import VectorAssembler

required_features = ['TimeSegmentCode','Vict Age','Gender','Crm Cd','MO_CODE']
assembler = VectorAssembler(inputCols=required_features,\
                            outputCol='features')
ML_featuresDF = assembler.transform(MLdf_model)

ML_featuresDF.show()

+--------+------+------------+-------+---------------+------+--------------------+
|Vict Age|Crm Cd|Vict Descent|MO_CODE|TimeSegmentCode|Gender|            features|
+--------+------+------------+-------+---------------+------+--------------------+
|    48.0| 900.0|           1|  913.0|              3|   1.0|[3.0,48.0,1.0,900...|
|    61.0| 230.0|           0|  416.0|              4|   0.0|[4.0,61.0,0.0,230...|
|    40.0| 624.0|           0|  416.0|              4|   1.0|[4.0,40.0,1.0,624...|
|    24.0| 624.0|           0|  416.0|              6|   1.0|[6.0,24.0,1.0,624...|
|    22.0| 745.0|           0|  329.0|              6|   0.0|[6.0,22.0,0.0,745...|
|    22.0| 626.0|           1|  400.0|              4|   0.0|[4.0,22.0,0.0,626...|
|    23.0| 236.0|           1|  416.0|              5|   0.0|[5.0,23.0,0.0,236...|
|    28.0| 121.0|           1|  413.0|              6|   0.0|[6.0,28.0,0.0,121...|
|    44.0| 745.0|           1|  329.0|              4|   1.0|[4.0,44.0,1.0,745...|
|   

### Model Training - Random Forest Classifier


In [102]:
#Train and test set
from pyspark.ml.classification import RandomForestClassifier

# 1. Split the current data set to get the training (80%) and test (20%) data sets.
(trainingDF, testDF) = ML_featuresDF.randomSplit([0.8,0.2])

# 2. Initialize the algorithm to understand our featured data set.
rfcAlgorithm = RandomForestClassifier(labelCol='Vict Descent',\
                                      featuresCol='features',\
                                      maxDepth=5)

#3. Train the algorithm to build the model and apply it on the test data set.
model = rfcAlgorithm.fit(trainingDF)
predictions = model.transform(testDF)
predictions.dtypes

                                                                                

[('Vict Age', 'float'),
 ('Crm Cd', 'float'),
 ('Vict Descent', 'int'),
 ('MO_CODE', 'float'),
 ('TimeSegmentCode', 'int'),
 ('Gender', 'double'),
 ('features', 'vector'),
 ('rawPrediction', 'vector'),
 ('probability', 'vector'),
 ('prediction', 'double')]

### Model Evaluation

In [113]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='Vict Descent',\
                                              predictionCol='prediction',\
                                              metricName='accuracy')
precision = evaluator.evaluate(predictions)
print('The accuracy of our model is ', precision)

The accuracy of our model is  0.6155682347461574


In [None]:
# Low accuracy

In [None]:
#ANEXOS

from pyspark.sql.functions import * 

df_schema = LA_crimesdf.withColumn("DATE OCC", to_date(col("DATE OCC"))) \
            .withColumn("Date Rptd", to_date(col("Date Rptd"))) \
            .withColumn("TIME OCC", to_timestamp(col("TIME OCC"),"HHmm"))
            #.printSchema()