# Visualization of crime data 

The spark app is created and the data is read from the S3 Bucket and the data is processed

In [1]:
### Creating a spark context
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("crime analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1670380244949_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
### Loading data
df = spark.read.format("csv").option("header", "true").load("s3://crimedatavisuals/estimated_crimes_1979_2020 (2).csv") 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
### Schema of the data
df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- year: string (nullable = true)
 |-- state_abbr: string (nullable = true)
 |-- state_name: string (nullable = true)
 |-- population: string (nullable = true)
 |-- violent_crime: string (nullable = true)
 |-- homicide: string (nullable = true)
 |-- rape_legacy: string (nullable = true)
 |-- rape_revised: string (nullable = true)
 |-- robbery: string (nullable = true)
 |-- aggravated_assault: string (nullable = true)
 |-- property_crime: string (nullable = true)
 |-- burglary: string (nullable = true)
 |-- larceny: string (nullable = true)
 |-- motor_vehicle_theft: string (nullable = true)
 |-- caveats: string (nullable = true)

In [4]:
### Replacing the null and na values if exists in state_name with United states of America
###  as it is a summary of all the states of a particular year
df = df.na.drop(subset=["state_abbr","state_name"])
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+----------+--------------------+----------+-------------+--------+-----------+------------+-------+------------------+--------------+--------+-------+-------------------+-------+
|year|state_abbr|          state_name|population|violent_crime|homicide|rape_legacy|rape_revised|robbery|aggravated_assault|property_crime|burglary|larceny|motor_vehicle_theft|caveats|
+----+----------+--------------------+----------+-------------+--------+-----------+------------+-------+------------------+--------------+--------+-------+-------------------+-------+
|1979|        AK|              Alaska|    406000|         1994|      54|        292|        null|    445|              1203|         23193|    5616|  15076|               2501|   null|
|1979|        AL|             Alabama|   3769000|        15578|     496|       1037|        null|   4127|              9918|        144372|   48517|  83791|              12064|   null|
|1979|        AR|            Arkansas|   2180000|         7984|     198|   

In [5]:
df = df.drop("caveats")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
df.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

['year', 'state_abbr', 'state_name', 'population', 'violent_crime', 'homicide', 'rape_legacy', 'rape_revised', 'robbery', 'aggravated_assault', 'property_crime', 'burglary', 'larceny', 'motor_vehicle_theft']

In [7]:
### Replacing the null and na values with 0 if exists in remaining columns
cols = df.columns
df = df.na.fill(value = 0, subset=cols)
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+----------+--------------------+----------+-------------+--------+-----------+------------+-------+------------------+--------------+--------+-------+-------------------+
|year|state_abbr|          state_name|population|violent_crime|homicide|rape_legacy|rape_revised|robbery|aggravated_assault|property_crime|burglary|larceny|motor_vehicle_theft|
+----+----------+--------------------+----------+-------------+--------+-----------+------------+-------+------------------+--------------+--------+-------+-------------------+
|1979|        AK|              Alaska|    406000|         1994|      54|        292|        null|    445|              1203|         23193|    5616|  15076|               2501|
|1979|        AL|             Alabama|   3769000|        15578|     496|       1037|        null|   4127|              9918|        144372|   48517|  83791|              12064|
|1979|        AR|            Arkansas|   2180000|         7984|     198|        595|        null|   1626|          

In [8]:
### Trimming the state_abbr column

from pyspark.sql.functions import *
df_cleaned = df.withColumn("state_abbr", trim(df.state_abbr))
df_cleaned.show(truncate = False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+----------+--------------------+----------+-------------+--------+-----------+------------+-------+------------------+--------------+--------+-------+-------------------+
|year|state_abbr|state_name          |population|violent_crime|homicide|rape_legacy|rape_revised|robbery|aggravated_assault|property_crime|burglary|larceny|motor_vehicle_theft|
+----+----------+--------------------+----------+-------------+--------+-----------+------------+-------+------------------+--------------+--------+-------+-------------------+
|1979|AK        |Alaska              |406000    |1994         |54      |292        |null        |445    |1203              |23193         |5616    |15076  |2501               |
|1979|AL        |Alabama             |3769000   |15578        |496     |1037       |null        |4127   |9918              |144372        |48517   |83791  |12064              |
|1979|AR        |Arkansas            |2180000   |7984         |198     |595        |null        |1626   |5565      

In [9]:
### Trimming the state_name column

df_cleaned = df_cleaned.withColumn("state_name", trim(df.state_name))
df_cleaned.show(truncate = False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+----------+--------------------+----------+-------------+--------+-----------+------------+-------+------------------+--------------+--------+-------+-------------------+
|year|state_abbr|state_name          |population|violent_crime|homicide|rape_legacy|rape_revised|robbery|aggravated_assault|property_crime|burglary|larceny|motor_vehicle_theft|
+----+----------+--------------------+----------+-------------+--------+-----------+------------+-------+------------------+--------------+--------+-------+-------------------+
|1979|AK        |Alaska              |406000    |1994         |54      |292        |null        |445    |1203              |23193         |5616    |15076  |2501               |
|1979|AL        |Alabama             |3769000   |15578        |496     |1037       |null        |4127   |9918              |144372        |48517   |83791  |12064              |
|1979|AR        |Arkansas            |2180000   |7984         |198     |595        |null        |1626   |5565      

# Hypothesis 1

To Visualize the crime data across the states, the total crime should be defined and the following code provides the total crimes happening across the different states.

In [10]:
from pyspark.sql.functions import *

df1 = df.groupBy("state_name","year").agg(
    sum(col("homicide")+col("robbery")+col("violent_crime")+col("motor_vehicle_theft")+col("property_crime")+col("burglary")).alias("total_crime")
)

df1.na.drop("any")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[state_name: string, year: string, total_crime: double]

In [11]:
df1.select("year","state_name","total_crime").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-------------+-----------+
|year|   state_name|total_crime|
+----+-------------+-----------+
|1980|       Oregon|   233718.0|
|1993|  Connecticut|   210612.0|
|1984|     Virginia|   277213.0|
|1983|New Hampshire|    42065.0|
|2004|      Alabama|   266650.0|
|1979|     Michigan|   781313.0|
|2008|     Illinois|   582519.0|
|1994|    Wisconsin|   257563.0|
|2006|      Arizona|   453937.0|
|2002|    Louisiana|   301780.0|
|2003|       Alaska|    35083.0|
|1981|     Michigan|   884966.0|
|1996|    Louisiana|   394213.0|
|2000|     Kentucky|   157657.0|
|2008|   New Jersey|   301137.0|
|2010|Massachusetts|   241716.0|
|1986|  Mississippi|   121982.0|
|2010|      Wyoming|    17814.0|
|2011|  Connecticut|   113050.0|
|2017|     Kentucky|   140316.0|
+----+-------------+-----------+
only showing top 20 rows

In [12]:
df1.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- state_name: string (nullable = true)
 |-- year: string (nullable = true)
 |-- total_crime: double (nullable = true)

In [13]:
df1.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+----+-----------+
|   state_name|year|total_crime|
+-------------+----+-----------+
|       Oregon|1980|   233718.0|
|  Connecticut|1993|   210612.0|
|     Virginia|1984|   277213.0|
|New Hampshire|1983|    42065.0|
|      Alabama|2004|   266650.0|
|     Michigan|1979|   781313.0|
|     Illinois|2008|   582519.0|
|    Wisconsin|1994|   257563.0|
|      Arizona|2006|   453937.0|
|    Louisiana|2002|   301780.0|
|       Alaska|2003|    35083.0|
|     Michigan|1981|   884966.0|
|    Louisiana|1996|   394213.0|
|     Kentucky|2000|   157657.0|
|   New Jersey|2008|   301137.0|
|Massachusetts|2010|   241716.0|
|  Mississippi|1986|   121982.0|
|      Wyoming|2010|    17814.0|
|  Connecticut|2011|   113050.0|
|     Kentucky|2017|   140316.0|
+-------------+----+-----------+
only showing top 20 rows

# Hypothesis 2

To visualize crime rate across states, we need to consider state_name, total_crime attributes from the newly created dataframe and the following provides the data.

In [14]:
columns = ['state_name','total_crime']

df1.orderBy(df1["total_crime"].asc()).select(columns).show(100)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+-----------+
|        state_name|total_crime|
+------------------+-----------+
|           Vermont|     9970.0|
|           Vermont|    11210.0|
|           Vermont|    11933.0|
|           Vermont|    12156.0|
|           Wyoming|    12540.0|
|           Vermont|    12708.0|
|           Vermont|    12716.0|
|           Vermont|    12792.0|
|           Wyoming|    13016.0|
|           Wyoming|    13966.0|
|           Wyoming|    14315.0|
|           Wyoming|    14880.0|
|           Wyoming|    14980.0|
|           Wyoming|    15541.0|
|      North Dakota|    16079.0|
|      North Dakota|    16531.0|
|           Wyoming|    16597.0|
|           Wyoming|    16646.0|
|      North Dakota|    16736.0|
|      North Dakota|    16971.0|
|           Wyoming|    17167.0|
|      North Dakota|    17527.0|
|      North Dakota|    17801.0|
|           Wyoming|    17814.0|
|      North Dakota|    17833.0|
|      North Dakota|    17994.0|
|           Vermont|    18258.0|
|      Nor

# Hypothesis 3

To visualize crime type highest relative to the state and we have created the new dataframes according to the maximum crime types across states.

In [15]:
from pyspark.sql.functions import *

df2 = df.groupBy("state_name").agg(
    max(col("homicide"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
df2.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------+
|          state_name|max(homicide)|
+--------------------+-------------+
|             Georgia|          943|
|            New York|          808|
|           Minnesota|           99|
|            Nebraska|           71|
|              Alaska|           69|
|       New Hampshire|           33|
|            Oklahoma|          400|
|District of Columbia|           88|
|            Maryland|          632|
|            Missouri|          599|
|            Maryland|          553|
|            Kentucky|          364|
|       Massachusetts|          254|
|            New York|          960|
|             Florida|         1290|
|            Missouri|          723|
|         Connecticut|           98|
|              Hawaii|           41|
|            Michigan|          754|
|           Wisconsin|          308|
+--------------------+-------------+
only showing top 20 rows

In [17]:
from pyspark.sql.functions import *

df3 = df.groupBy("state_name").agg(
    max(col("violent_crime"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
df3.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+
|          state_name|max(violent_crime)|
+--------------------+------------------+
|             Georgia|             42850|
|            New York|             70339|
|           Minnesota|              9344|
|            Nebraska|              7507|
|              Alaska|              6346|
|       New Hampshire|              2152|
|            Oklahoma|             21770|
|District of Columbia|              9423|
|            Maryland|             49757|
|            Missouri|             39358|
|            Maryland|             27511|
|            Kentucky|              9887|
|       Massachusetts|             48393|
|            New York|             98022|
|             Florida|             83368|
|            Missouri|             33385|
|         Connecticut|              9889|
|              Hawaii|              3745|
|            Michigan|             47641|
|           Wisconsin|             18861|
+--------------------+------------

In [19]:
from pyspark.sql.functions import *

df4 = df.groupBy("state_name").agg(
    max(col("robbery"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
df4.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------+
|          state_name|max(robbery)|
+--------------------+------------+
|             Georgia|        7488|
|            New York|       18165|
|           Minnesota|        5702|
|            Nebraska|         994|
|              Alaska|         826|
|       New Hampshire|         327|
|            Oklahoma|        4376|
|District of Columbia|        9137|
|            Maryland|        9716|
|            Missouri|        9554|
|            Maryland|        9215|
|            Kentucky|        4080|
|       Massachusetts|        9137|
|            New York|       97434|
|             Florida|       16234|
|            Missouri|        4995|
|         Connecticut|        7717|
|              Hawaii|         867|
|            Michigan|        5352|
|           Wisconsin|        3081|
+--------------------+------------+
only showing top 20 rows

In [21]:
from pyspark.sql.functions import *

df5 = df.groupBy("state_name").agg(
    max(col("motor_vehicle_theft"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
df5.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------------+
|          state_name|max(motor_vehicle_theft)|
+--------------------+------------------------+
|             Georgia|                   25506|
|            New York|                   19656|
|           Minnesota|                    9818|
|            Nebraska|                    6490|
|              Alaska|                    2617|
|       New Hampshire|                     943|
|            Oklahoma|                   21456|
|District of Columbia|                    9975|
|            Maryland|                   38197|
|            Missouri|                   29951|
|            Maryland|                   11281|
|            Kentucky|                    9948|
|       Massachusetts|                    9237|
|            New York|                   89900|
|             Florida|                   39091|
|            Missouri|                   24189|
|         Connecticut|                    9951|
|              Hawaii|                  

In [23]:
from pyspark.sql.functions import *

df6 = df.groupBy("state_name").agg(
    max(col("property_crime"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
df6.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+
|          state_name|max(property_crime)|
+--------------------+-------------------+
|             Georgia|             294817|
|            New York|             272788|
|           Minnesota|             192109|
|            Nebraska|              68963|
|              Alaska|              21293|
|       New Hampshire|              16552|
|            Oklahoma|             184342|
|District of Columbia|              53442|
|            Maryland|             267625|
|            Missouri|             240898|
|            Maryland|              97487|
|            Kentucky|              99909|
|       Massachusetts|              97977|
|            New York|             967369|
|             Florida|             460966|
|            Missouri|             163223|
|         Connecticut|              99894|
|              Hawaii|              40617|
|            Michigan|             158317|
|           Wisconsin|              86654|
+----------

In [25]:
long_lat = spark \
.read \
.option("header", "true")\
.option("inferSchema","true")\
.csv("s3://crimedatabigdataproject/statelatlong.csv")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
long_lat = long_lat.drop("City")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
long_lat.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+----------+------------+
|State|  Latitude|   Longitude|
+-----+----------+------------+
|   AL|32.6010112| -86.6807365|
|   AK|61.3025006|-158.7750198|
|   AZ|34.1682185| -111.930907|
|   AR|34.7519275| -92.1313784|
|   CA|37.2718745|-119.2704153|
|   CO|38.9979339| -105.550567|
|   CT|41.5187835|  -72.757507|
|   DE| 39.145251| -75.4189206|
|   DC|38.8993487| -77.0145666|
|   FL|27.9757279| -83.8330166|
|   GA|32.6781248| -83.2229757|
|   HI|     20.46|    -157.505|
|   ID|45.4945756|-114.1424303|
|   IL| 39.739318|  -89.504139|
|   IN|39.7662195|  -86.441277|
|   IA|41.9383166|  -93.389798|
|   KS|38.4987789| -98.3200779|
|   KY|37.8222935| -85.7682399|
|   LA|30.9733766| -91.4299097|
|   ME|45.2185133| -69.0148656|
+-----+----------+------------+
only showing top 20 rows

In [29]:
### Augumenting the Longitude and Latitude to crime_df
crime_df_long_lat_augmented= df_cleaned.join\
                (long_lat,df_cleaned.state_abbr==long_lat.State,"inner")\
                .drop(long_lat.State)
crime_df_long_lat_augmented.show(10, False)
print((crime_df_long_lat_augmented.count(), len(crime_df_long_lat_augmented.columns)))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+----------+--------------------+----------+-------------+--------+-----------+------------+-------+------------------+--------------+--------+-------+-------------------+----------+------------+
|year|state_abbr|state_name          |population|violent_crime|homicide|rape_legacy|rape_revised|robbery|aggravated_assault|property_crime|burglary|larceny|motor_vehicle_theft|Latitude  |Longitude   |
+----+----------+--------------------+----------+-------------+--------+-----------+------------+-------+------------------+--------------+--------+-------+-------------------+----------+------------+
|1979|AK        |Alaska              |406000    |1994         |54      |292        |null        |445    |1203              |23193         |5616    |15076  |2501               |61.3025006|-158.7750198|
|1979|AL        |Alabama             |3769000   |15578        |496     |1037       |null        |4127   |9918              |144372        |48517   |83791  |12064              |32.6010112|-86.68073

In [31]:
df_cleaned.coalesce(1).write.csv("Crimes_DF_Cleaned", header = True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
ACCESS_KEY ='ASIAZXYW6RWYCZFJ4WXO'
SECRET_KEY = 'xcrVAyRI0bmNyDrr+5gvedu9+aWMX6awDYjD7ENl'
SESSION_TOKEN = 'FwoGZXIvYXdzEPX//////////wEaDGzZW8nXC+Adb35E4yK+AW+OK6H60dXpbdSr89NPsrj8gW2jfYeozvnLvBQY54zyPxzW4ex34Yl8t0lQJLyeChx/Ter1z/2IJTi5C+PNdaxQqZy+9MOe/wNmoyfIaDohd6sgIA4eFkVXszK8pfkhHlojDxcNiM5yBrtw14NToLbwiEH69Rop3HHDDBWggHWyKNN85fX9g0VLBiTd+p/DJOFHkVryNbu20T1dN8koMePyWt5AFzlllZAQ9HAoyeUm1dHNo3cWqjXMgg1ypJ8o5u+6nAYyLYwBg/tmPNoUrkGju6lCpL0m2kgo7GPVT4uPrzSHuSUngK9j+aiLZj4ZR+CzmQ=='
import boto3
session = boto3.Session(aws_access_key_id= ACCESS_KEY,aws_secret_access_key= SECRET_KEY,aws_session_token= SESSION_TOKEN )

s3 = session.resource('s3')
BUCKET = "crimedatabigdataproject"

s3.Bucket(BUCKET).upload_file("crime_data_cleaned1.csv", "crime_data_cleaned1")


