
###*Pontificia Universidad Javeriana*

**Fecha**: 3 de abril 2024

**Profesor**: John Corredor, PhD

**Materia**: Procesamiento de Alto Volumen de Datos

**Objetivo**: Primera entrega proyectyo

###**Nombres de los Estudiantes**: 

- Juan Sebastian Cordoba Valderrama

- Samuel Peña Garcia

- Tomas de Aza Marquez

In [21]:
##Importar librerias a utilizar

from pyspark import SparkContext
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import os
import spark

In [22]:
##Se instancia Pyspark
sc = SparkContext.getOrCreate()
sql_sc = SQLContext(sc)
sc



In [23]:
from pyspark.sql import SparkSession

# Iniciar una SparkSession
spark = SparkSession.builder \
    .appName("Mi Aplicación") \
    .getOrCreate()


In [24]:
path = 'https://raw.githubusercontent.com/TommyDS2005/Proyecto-Procesamiento-de-Datos/main/NYPD_Arrest_Data__Year_to_Date__20240322.csv'

df = pd.read_csv(path, sep=',')

df = spark. createDataFrame(df)

# Muestra las primeras filas del DataFrame de Spark para verificar
df.show()

+----------+-----------+-----+--------------------+-----+--------------------+----------+----------+-----------+---------------+-----------------+---------+--------+---------+----------+----------+----------------+-----------------+------------------------+
|ARREST_KEY|ARREST_DATE|PD_CD|             PD_DESC|KY_CD|           OFNS_DESC|  LAW_CODE|LAW_CAT_CD|ARREST_BORO|ARREST_PRECINCT|JURISDICTION_CODE|AGE_GROUP|PERP_SEX|PERP_RACE|X_COORD_CD|Y_COORD_CD|        Latitude|        Longitude|New Georeferenced Column|
+----------+-----------+-----+--------------------+-----+--------------------+----------+----------+-----------+---------------+-----------------+---------+--------+---------+----------+----------+----------------+-----------------+------------------------+
| 261265483| 01/03/2023|397.0|ROBBERY,OPEN AREA...|105.0|             ROBBERY|PL 1600500|         F|          B|             49|                0|    18-24|       M|    BLACK|   1027430|    251104|       40.855793|       -73.8

In [25]:
##Tipos de datos.
df.printSchema()

root
 |-- ARREST_KEY: long (nullable = true)
 |-- ARREST_DATE: string (nullable = true)
 |-- PD_CD: double (nullable = true)
 |-- PD_DESC: string (nullable = true)
 |-- KY_CD: double (nullable = true)
 |-- OFNS_DESC: string (nullable = true)
 |-- LAW_CODE: string (nullable = true)
 |-- LAW_CAT_CD: string (nullable = true)
 |-- ARREST_BORO: string (nullable = true)
 |-- ARREST_PRECINCT: long (nullable = true)
 |-- JURISDICTION_CODE: long (nullable = true)
 |-- AGE_GROUP: string (nullable = true)
 |-- PERP_SEX: string (nullable = true)
 |-- PERP_RACE: string (nullable = true)
 |-- X_COORD_CD: long (nullable = true)
 |-- Y_COORD_CD: long (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- New Georeferenced Column: string (nullable = true)




###Significado de cada atributo

ARREST_KEY: Identificador único y persistente generado aleatoriamente para cada arresto. Tipo de dato texto plano.

ARREST_DATE: Fecha exacta del arresto del evento reportado. Tipo de dato fecha y hora.

PD_CD: Código de clasificación interna de tres dígitos (más detallado que el Código Clave). Tipo de dato número.

PD_DESC: Descripción de la clasificación interna que corresponde con el código PD (más detallada que la Descripción de la Ofensa). Tipo de dato texto plano.

KY_CD: Código de clasificación interna de tres dígitos (categoría más general que el código PD). Tipo de dato número.

OFNS_DESC: Descripción de la clasificación interna que corresponde con el código KY (categoría más general que la descripción PD). Tipo de dato texto plano.

LAW_CODE: Códigos de la ley correspondientes al Código Penal de Nueva York, VTL y otras leyes locales diversas. Tipo de dato texto plano.

LAW_CAT_CD: Nivel del delito: delito mayor (felony), delito menor (misdemeanor), infracción (violation). Tipo de dato texto plano.

ARREST_BORO: Barrio del arresto. B (Bronx), S (Staten Island), K (Brooklyn), M (Manhattan), Q (Queens). Tipo de dato texto plano.

ARREST_PRECINCT: Precinto donde ocurrió el arresto. Tipo de dato número.

JURISDICTION_CODE: Código de jurisdicción responsable del arresto. Los códigos de jurisdicción 0 (Patrulla), 1 (Tránsito) y 2 (Vivienda) representan al NYPD, mientras que los códigos 3 en adelante representan jurisdicciones ajenas al NYPD. Tipo de dato número.

AGE_GROUP: Edad del perpetrador dentro de una categoría establecida. Tipo de dato texto plano.

PERP_SEX: Descripción del sexo del perpetrador. Tipo de dato texto plano.

PERP_RACE: Descripción de la raza del perpetrador. Tipo de dato texto plano.

X_COORD_CD: Coordenada X del punto medio de la cuadra para el Sistema de Coordenadas Planas del Estado de Nueva York, Zona de Long Island, NAD 83, unidades en pies (FIPS 3104). Tipo de dato número.

Y_COORD_CD: Coordenada Y del punto medio de la cuadra para el Sistema de Coordenadas Planas del Estado de Nueva York, Zona de Long Island, NAD 83, unidades en pies (FIPS 3104). Tipo de dato número.

Latitude: Coordenada de latitud para el Sistema de Coordenadas Global, WGS 1984, grados decimales (EPSG 4326). Tipo de dato número.

Longitude: Coordenada de longitud para el Sistema de Coordenadas Global, WGS 1984, grados decimales (EPSG 4326). Tipo de dato número.


###Descripcion del dataset

Este conjunto de datos contiene registros detallados de arrestos efectuados por el Departamento de Policía de Nueva York (NYPD) durante el año en curso, incluyendo información demográfica del detenido, el tipo de delito, la ubicación exacta y la fecha del arresto, así como el código y descripción de la ley asociada con el arresto.


In [26]:
####Exploracionde datos

##Identificar a forma del dataset
print("El conjunto de datos cuenta con " + str(df.count()) + " filas y " + str(len(df.columns)) + " columnas")

El conjunto de datos cuenta con 226872 filas y 19 columnas


In [27]:
###Verificar si existen valores nulos en el dataset
df.select([count(when(isnan(c) | col(c).isNull() , c)).alias(c) for c in df.columns]).show()

+----------+-----------+-----+-------+-----+---------+--------+----------+-----------+---------------+-----------------+---------+--------+---------+----------+----------+--------+---------+------------------------+
|ARREST_KEY|ARREST_DATE|PD_CD|PD_DESC|KY_CD|OFNS_DESC|LAW_CODE|LAW_CAT_CD|ARREST_BORO|ARREST_PRECINCT|JURISDICTION_CODE|AGE_GROUP|PERP_SEX|PERP_RACE|X_COORD_CD|Y_COORD_CD|Latitude|Longitude|New Georeferenced Column|
+----------+-----------+-----+-------+-----+---------+--------+----------+-----------+---------------+-----------------+---------+--------+---------+----------+----------+--------+---------+------------------------+
|         0|          0|    2|      0|   17|        0|       0|      1599|          0|              0|                0|        0|       0|        0|         0|         0|       0|        0|                       0|
+----------+-----------+-----+-------+-----+---------+--------+----------+-----------+---------------+-----------------+---------+------

In [28]:
#Se busca un patron dentro de las variables directamente relacionadas en el dataset para rellenar valores nulos de la variable "LAW_CAT_CD"

df.filter(isnan("LAW_CAT_CD") | col("LAW_CAT_CD").isNull()).select("PD_CD","PD_DESC","LAW_CODE","LAW_CAT_CD").show()


+-----+--------------------+----------+----------+
|PD_CD|             PD_DESC|  LAW_CODE|LAW_CAT_CD|
+-----+--------------------+----------+----------+
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 15.0|FUGITIVE/OTHER JU...|FOA9000015|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 29.0|NYS PAROLE VIOLATION|FOA9000029|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 16.0|FUGITIVE/OTHER ST...|FOA9000016|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA9000049|       NaN|
| 49.0|U.S. CODE UNCLASS...|FOA

In [29]:
#Nos remitimos a la dcoumentacion y observamos que para este tipo de ley, son leyes para autoridades fuera de la policia de nueva york, por lo tanto se le asignara un valor (NC: not classified) consiguiente,ya que no se conoce con certeza el nivel de gravedad del delito

df = df.withColumn("LAW_CAT_CD", when((isnan("LAW_CAT_CD") | col("LAW_CAT_CD").isNull()),"NC").otherwise(df["LAW_CAT_CD"]))


In [30]:
##Eliminamos valores faltantes nulos irrelevantes
df = df.na.drop(subset = ["PD_CD" , "KY_CD"])


###Verificar si existen valores nulos en el dataset
df.select([count(when(isnan(c) | col(c).isNull() , c)).alias(c) for c in df.columns]).show()


+----------+-----------+-----+-------+-----+---------+--------+----------+-----------+---------------+-----------------+---------+--------+---------+----------+----------+--------+---------+------------------------+
|ARREST_KEY|ARREST_DATE|PD_CD|PD_DESC|KY_CD|OFNS_DESC|LAW_CODE|LAW_CAT_CD|ARREST_BORO|ARREST_PRECINCT|JURISDICTION_CODE|AGE_GROUP|PERP_SEX|PERP_RACE|X_COORD_CD|Y_COORD_CD|Latitude|Longitude|New Georeferenced Column|
+----------+-----------+-----+-------+-----+---------+--------+----------+-----------+---------------+-----------------+---------+--------+---------+----------+----------+--------+---------+------------------------+
|         0|          0|    0|      0|    0|        0|       0|         0|          0|              0|                0|        0|       0|        0|         0|         0|       0|        0|                       0|
+----------+-----------+-----+-------+-----+---------+--------+----------+-----------+---------------+-----------------+---------+------

In [31]:
#Se hace un resumen estadisticos de la informacion
display(df.describe())

DataFrame[summary: string, ARREST_KEY: string, ARREST_DATE: string, PD_CD: string, PD_DESC: string, KY_CD: string, OFNS_DESC: string, LAW_CODE: string, LAW_CAT_CD: string, ARREST_BORO: string, ARREST_PRECINCT: string, JURISDICTION_CODE: string, AGE_GROUP: string, PERP_SEX: string, PERP_RACE: string, X_COORD_CD: string, Y_COORD_CD: string, Latitude: string, Longitude: string, New Georeferenced Column: string]

In [32]:
#Parece exisitir un error de digitacion en la variable "LAW_CAT_CD" se debe buscar
filtered_df = df.filter(df["LAW_CAT_CD"]=="9").select("PD_CD","PD_DESC","LAW_CODE","LAW_CAT_CD")
filtered_df.show()

+-----+--------------------+----------+----------+
|PD_CD|             PD_DESC|  LAW_CODE|LAW_CAT_CD|
+-----+--------------------+----------+----------+
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL5700600|         9|
|849.0|NY STATE LAWS,UNC...|CPL

In [33]:
#Se mira la cantidad de registros con el error

filtered_df.count()

611

In [34]:
#Se decide asignar el valor asociado a la descripcion "UV:uncalssified violnece" Ya que no esta dentro de la misma serie de leyes que NC: not classified, y no se puede realizar una asociacion directa de una con otra 

df = df.withColumn("LAW_CAT_CD", when(df["LAW_CAT_CD"] == "9", "UV").otherwise(df["LAW_CAT_CD"]))

In [35]:
#Ya que los datos son del año 2023 se extraera el mes para manejar los arrestos en diferentes periodos del año

df = df.withColumn("Mes", regexp_extract(col('ARREST_DATE'), "^(0[1-9]|1[0-2])\/", 1))

df.select("Mes").distinct().show()

+---+
|Mes|
+---+
| 07|
| 11|
| 01|
| 09|
| 05|
| 08|
| 03|
| 02|
| 06|
| 10|
| 12|
| 04|
+---+



In [36]:
##Se evidencian la necesidad de realizar imputacion a variables categoricas tales como: "LAW_CAT_CD, ARREST_BORO, ARREST_PRECINT, AGE_GRUOP, PERP_SEX y PERP_RACE"
#Tambien paran simplicidad se separara la fecha para tomr solo el mes en una nueva variable llamda "MONTH"

from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


columnas = ["LAW_CAT_CD", "ARREST_BORO", "ARREST_PRECINCT", "AGE_GROUP", "PERP_SEX", "PERP_RACE"]
for columna in columnas:
    indexer = StringIndexer(inputCol=columna, outputCol=columna + "Ind")
    model = indexer.fit(df)
    df = model.transform(df)
    print(f"Mapping de {columna} a índices: {model.labels}")


Mapping de LAW_CAT_CD a índices: ['M', 'F', 'NC', 'V', 'UV', 'I']
Mapping de ARREST_BORO a índices: ['K', 'B', 'M', 'Q', 'S']
Mapping de ARREST_PRECINCT a índices: ['14', '75', '44', '40', '103', '46', '52', '43', '73', '120', '113', '110', '47', '25', '109', '42', '48', '105', '67', '18', '114', '115', '41', '60', '79', '84', '102', '70', '5', '72', '45', '77', '106', '13', '34', '83', '49', '121', '90', '71', '32', '23', '1', '63', '107', '28', '62', '6', '68', '108', '33', '7', '9', '104', '61', '81', '19', '10', '24', '69', '101', '78', '30', '122', '112', '50', '88', '66', '26', '94', '76', '20', '100', '123', '17', '111', '22']
Mapping de AGE_GROUP a índices: ['25-44', '45-64', '18-24', '<18', '65+']
Mapping de PERP_SEX a índices: ['M', 'F', 'U']
Mapping de PERP_RACE a índices: ['BLACK', 'WHITE HISPANIC', 'BLACK HISPANIC', 'WHITE', 'ASIAN / PACIFIC ISLANDER', 'UNKNOWN', 'AMERICAN INDIAN/ALASKAN NATIVE']
