# Standford open policing project

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m11.2 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=ca7e04e4cb3755158189cc0b4f6be75b076449f3ab66b3f89141b48b9f2cf108
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
# si estamos trabajando en colab, 
# debemos correr las siguientes líneas:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
# si estamos trabajando en colab, 
# podemos montar el drive
# from google.colab import drive
# drive.mount('/content/drive')

## Antecedentes

Trabajaremos datos del proyecto abierto _stanford open policing project_, que cuenta con data de 31 estados de los EUA, pero específicamente trabajaremos con datos del estado de Rhod Island.

Este proyecto está relacionado con una serie de investigaciones y publicaciones muy famosas todas hechas en R, pero en este proyecto trabajaremos en spark, si quieren conocer más, pueden visitar la [página oficial](https://openpolicing.stanford.edu/) del proyecto

## Preparación de los datos

Examina y limpia los datos:

In [None]:
from pyspark.sql import functions as F

In [None]:
!wget -O police_activity.csv https://raw.githubusercontent.com/edroga/Datasets_for_projects/main/police_activity.csv

In [None]:
# Carga los datos
ri = spark.read.csv('police_activity.csv',
                    sep=',', 
                    header=True, 
                    inferSchema=True)

In [None]:
# Examina la estructura del DataFrame
ri.show(2)

In [None]:
# Examina el tipo de datos del DataFrame
ri.dtypes

In [None]:
# Examine las dimensiones del DataFrame
ri.count(), len(ri.columns)

In [None]:
# Usa el método describe sobre el DataFrame
ri.describe().show()

In [None]:
# Cuenta el número de missings por cada columna en el DataFrame
(ri
 .select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in ri.columns])
 .show())

In [None]:
# Elimina las columnas 'county_name', 'state'
ri = ri.drop(*['county_name', 'state'])

In [None]:
# Nuevamente examine las dimensiones del DataFrame
ri.count(), len(ri.columns)

In [None]:
# Elimina los registros que tengan missings en la columna 'driver_gender'
ri = ri.dropna(subset=['driver_gender'])

In [None]:
# Nuevament cuenta el número de missings por cada columna en el DataFrame
(ri
 .select([F.sum(F.col(c).isNull().cast('int')).alias(c) for c in ri.columns])
 .show())

In [None]:
# Nuevamente examine las dimensiones del DataFrame
ri.count(), len(ri.columns)

## Tipos de datos correctos

In [None]:
# Examina las primeras 4 filas del DataFrame
ri.show(4)

In [None]:
# Examina los data types del DataFrame
ri.dtypes

In [None]:
# transforma 'stop_date' and 'stop_time' en una nueva columna con formato datetime llamada stop_datetime
ri = (ri.withColumn('stop_datetime', F.concat_ws(' ', 'stop_date', 'stop_time'))
        .withColumn('stop_datetime', F.from_unixtime(F.unix_timestamp(F.col('stop_datetime'), 'yyyy-MM-dd HH:mm'))))

## ¿Hay evidencia de diferentes delitos cometidos por género?

Crea una tabla con el siguiente formato:


|violation| F| M| Total| Fp100_of_Total| Mp100_of_Total|
| --| --| --| --| --| --|
| Equipment| 2501| 8420| 10921| 0.23| 0.77|
| Other| 707| 3702| 4409| 0.16| 0.84|
| ...| ...| ...| ...| ...| ...|

En donde:

+ __Violation__ son las violaciones cometidas
+ __F__ cantidad de mujeres que cometen esa violación
+ __M__ cantidad de hombres que cometen esa violación
+ __Total__ suma de __F__ + __M__
+ __Fp100_of_Total__ división de __F__ / __Total__
+ __Mp100_of_Total__ división de __F__ y __Total__


In [None]:
t1 = (ri.groupBy(['violation', 'driver_gender'])
 .count()
 .groupBy("violation")
 .pivot("driver_gender")
 .sum("count")
 .withColumn('Total', F.col('F') + F.col('M'))
 .withColumn('Fp100_of_Total', F.round(F.col('F')/F.col('Total'), 2))
 .withColumn('Mp100_of_Total', F.round(F.col('M')/F.col('Total'), 2)))

t1.show(2)

Agrega dos columnas más a la tabla anterior y llámalas:

* __Fp100__ es la división de __F__ / __SUMA(columna F)__
* __Mp100__ es la división de __M__ / __SUMA(columna M)__

In [None]:
TF = t1.select((F.sum('F'))).first()[0]
TM = t1.select((F.sum('M'))).first()[0]

In [None]:
t1.withColumn('Fp100', F.round(F.col('F')/TF, 2)).withColumn('Mp100', F.round(F.col('M')/TM, 2)).show(2)

## ¿Impacta el género en quiénes reciben más multas?

## Analiza el género cuando las violaciones son por _speeding_

In [None]:
# Filtra el dataset por violation == 'speedong'
gender_and_speeding = ri.filter(F.col('violation')=='Speeding')
gender_and_speeding.show(2)

In [None]:
# A partir del DataFrame anterior, calcula en qué terminó el encuentro con la policia 'stop_outcome' por género
Outcomes_by_gender = gender_and_speeding.groupBy(['driver_gender', 'stop_outcome']).count().groupBy('stop_outcome').pivot('driver_gender').sum('count')
Outcomes_by_gender.show(2)

A partir del DataFrame anterior, calcula las siguientes variables:

* __Fp100__ = __F__ / __SUMA(columna F)__
* __Mp100__ = __F__ / __SUMA(columna M)__

In [None]:
OTF = Outcomes_by_gender.select((F.sum('F'))).first()[0]
OTM = Outcomes_by_gender.select((F.sum('M'))).first()[0]

In [None]:
Outcomes_by_gender.withColumn('Fp100', F.round(F.col('F')/OTF, 2)).withColumn('Mp100', F.round(F.col('M')/OTM, 2)).show(2)

## Calcula el search-rate

Obtén la siguiente tabla:

|driver_gender|false|true|Rate|
|--|--|--|--|
|F|23318| 456|0.019180617481282074|
|M|59911|2851| 0.04542557598546892|

donde:

* __false__ es la cantidad de veces que no se realizó una 'search_conducted'
* __true__ es la cantidad de veces que se llevó a cabo una 'search_conducted'
* __rate__ es __true__ / (__true__ + __false__)

In [None]:
(ri.groupBy(['search_conducted', 'driver_gender'])
 .count()
 .groupBy("driver_gender")
 .pivot("search_conducted")
 .sum("count")
 .withColumn('Rate', F.col('true')/(F.col('true')+ F.col('false')))
 ).show()

## Análisis de violation

Incorpora al DataFrame anterior la variable violation y comprueba si el search-rate varía por clase y género, y responde si los hombres y las mujeres tienden a cometer distintos delitos.

In [None]:
from pyspark.sql.types import IntegerType

In [None]:
(ri.withColumn('search_conducted', F.col('search_conducted').cast(IntegerType()))
 .groupBy(['violation', 'driver_gender'])
 .agg(F.avg('search_conducted').alias('mean'))
 .withColumn('mean', F.round(F.col('mean'),3 ) )
 .groupBy("violation")
 .pivot("driver_gender")
 .sum("mean")
#  .withColumn('Rate', F.col('true')/(F.col('true')+ F.col('false')))
 ).show()

## Análisis de los frisks

Analiza la frecuencia de las clases en search-rate:

In [None]:
(ri
 .groupBy(['search_type'])
 .count()
 ).show()

### Compara frisk rates por género

1. Elimina los nulos de la variable search_type
2. Detecta dónde hay registros relacionados con 'frisk'
3. Agrupa por género
4. Obtén el promedio por género de los 'frisk'

In [None]:
(ri
 .dropna(subset = ['search_type'])
 .withColumn('frisk', F.regexp_extract(F.lower(F.col('search_type') ), '(frisk)', 1))
 .withColumn('frisk', F.when(F.col('frisk')=='frisk', 1)
                       .otherwise(0))

 .groupBy(['driver_gender'])
 .agg(F.avg('frisk'))
 ).show(100, False)

### Calcula el promedio de las horas de arresto por día de la semana

In [None]:
hr_arrest = (ri
             .withColumn('is_arrested', F.when(F.col('is_arrested')==True, 1)
             .otherwise(0))
             .withColumn('weekday', F.date_format(F.col('stop_datetime'), 'E'))
             .groupBy(['weekday', F.hour('stop_datetime').alias('hr')])
             .agg(F.avg('is_arrested').alias('avg'))
             .groupBy('hr')
             .pivot('weekday')
             .sum('avg')
             .toPandas()
             .sort_values(by='hr', ascending = False)
             )

In [None]:
hr_arrest['hrstr'] = hr_arrest.hr.apply(lambda x: 'hr_' + str(x))
hr_arrest['hr_dummy'] = hr_arrest.hr.apply(lambda x: x - 7 if x - 7 >= 0 else x + 17)
hr_arrest.sort_values(by='hr_dummy', ascending=False, inplace=True)

In [None]:
# Grafica la tabla anterior como un radar chart

In [None]:
import plotly.graph_objects as go

categories = hr_arrest.hrstr

Fri = list(hr_arrest.Fri)
Sat = list(hr_arrest.Sat)
Sun = list(hr_arrest.Sun)
Mon = list(hr_arrest.Mon)
Tue = list(hr_arrest.Tue)
Wed = list(hr_arrest.Wed)
Thu = list(hr_arrest.Thu)

f = go.Figure(
    data=[
        go.Scatterpolar(r=Fri, theta=categories, name='Fri'),
        go.Scatterpolar(r=Sat, theta=categories, name='Sat'),
        go.Scatterpolar(r=Sun, theta=categories, name='Sun'),
        go.Scatterpolar(r=Mon, theta=categories, name='Mon'),
        go.Scatterpolar(r=Tue, theta=categories, name='Tue'),
        go.Scatterpolar(r=Wed, theta=categories, name='Wed'),
        go.Scatterpolar(r=Thu, theta=categories, name='Thu')
    ],
    layout=go.Layout(
        title=go.layout.Title(text='Weekday comparison'),
        polar={'radialaxis': {'visible': True}},
        showlegend=True
    )
)

f.show()