#Entrega Proyecto Final Procesamiento de Datos a Gran Escala
#Pontificia Universidad Javeriana
#John Corredor
#Juan Felipe Briñez y Alejandro Barragán
#11/Noviembre/2023





**Objetivo:** Aplicar técnicas de ML a conjuntos de datos abiertos para comprender problemas y responder preguntas de negocio

In [None]:
from pyspark.sql.functions import count
from sklearn.metrics import classification_report, confusion_matrix

In [None]:
# Load the four raw CSV exports. No reader options are passed, so Spark names
# the columns _c0, _c1, ... and reads every value as a string; the header line
# of each file therefore arrives as an ordinary data row (removed further below).
df_education, df_poverty, df_arrests, df_accidents = (
    spark.read.csv(csv_path)
    for csv_path in (
        "/FileStore/tables/2016___2017_Health_Education_Report_20231106.csv",
        "/FileStore/tables/NYCgov_Poverty_Measure_Data__2018__20231106.csv",
        "/FileStore/tables/NYPD_Arrest_Data__Year_to_Date__20231106.csv",
        "/FileStore/tables/Motor_Vehicle_Collisions___Vehicles_20231106.csv",
    )
)

# DATASET HEALTH EDUCATION

In [None]:
# Inspect the schema of the raw education data before any renaming or casting.
df_education.printSchema()

In [None]:
# Preview the raw education rows.
df_education.display()

##TRANSFORMACIONES Y FILTROS HEALTH EDUCATION

###1. Renombrar las columnas con el nombre que les corresponde:
- Se hace con el fin de renombrar las columnas para que tengan nombres con sentido y no _c0, _c1, ...

##### Debido a que algunos nombres de los campos son muy largos o usan caracteres especiales, se escogerá un nombre representativo para cada uno de ellos:
- '# of students in grades 9-12' corresponderá a: Students_grades_9_12
- '# of students in grades 9-12 scheduled for at least one semester of health instruction' corresponderá a: Students_grades_9_12_health
- '%' corresponderá a: Percentage_at_least_one_semester
- '# of 16-17 June and August graduates' corresponderá a: Graduates_16_17_june_august
- '# of 16-17 June and August graduates meeting high school health requirements' corresponderá a: Graduates_16_17_june_august_meeting_requirements
- '% 1' corresponderá a: Percentage_students_meeting_requirements

In [None]:
# The CSV was read without a header, so the columns are positional (_c0.._c9).
# Give each position a descriptive name, in column order.
_education_names = [
    'School_DBN',
    'Community_School_District',
    'City_Council_District',
    'School_Name',
    'Students_grades_9_12',
    'Students_grades_9_12_health',
    'Percentage_at_least_one_semester',
    'Graduates_16_17_june_august',
    'Graduates_16_17_june_august_meeting_requirements',
    'Percentage_students_meeting_requirements',
]
for _position, _new_name in enumerate(_education_names):
    df_education = df_education.withColumnRenamed(f'_c{_position}', _new_name)


###1.1. Eliminar el registro que contiene los nombres de los campos
- Este registro realmente no tiene información valiosa, pues los nombres de los campos solo aportan eso, el nombre de la columna.

In [None]:
# The file's header line was read as data; its School_DBN cell holds the
# literal header text, so excluding that value removes the row.
df_education = df_education.where(df_education["School_DBN"] != "School DBN")
display(df_education)

### 2. Castear las columnas con el tipo de dato que corresponde
- Todos los campos de este dataframe son strings y se necesita tener concordancia con el tipo de dato
  - Como algunos datos tienen valores que no corresponden es necesario hacer antes una limpieza de los mismos


###2.1. Filtrar los datos que tienen una 's' donde corresponde un valor numérico
- El 22% de los datos tienen el valor 's' en lugar de un número, por lo que no aportan información valiosa y, al no considerarse representativos, se decide eliminarlos.

In [None]:
# Drop every row in which any of the numeric fields holds the placeholder 's'
# instead of a number.
for _numeric_col in (
    'Students_grades_9_12',
    'Students_grades_9_12_health',
    'Percentage_at_least_one_semester',
    'Graduates_16_17_june_august',
    'Graduates_16_17_june_august_meeting_requirements',
    'Percentage_students_meeting_requirements',
):
    df_education = df_education.where(df_education[_numeric_col] != 's')
display(df_education)

In [None]:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import IntegerType,FloatType

# Cast every column FROM ITS OWN VALUES. Each target column must be derived
# from itself; deriving the five later columns from Students_grades_9_12
# would silently overwrite them with copies of the enrollment count.
_education_int_columns = [
    "Community_School_District",
    "City_Council_District",
    "Students_grades_9_12",
    "Students_grades_9_12_health",
    "Graduates_16_17_june_august",
    "Graduates_16_17_june_august_meeting_requirements",
]
_education_pct_columns = [
    "Percentage_at_least_one_semester",
    "Percentage_students_meeting_requirements",
]

for _name in _education_int_columns:
    df_education = df_education.withColumn(_name, df_education[_name].cast(IntegerType()))

# Remove a possible '%' sign before casting so values such as "85%" parse
# instead of becoming null (and then being removed by the na.drop() step).
# This is a no-op when the raw values are plain numbers.
# NOTE(review): confirm the raw text format of the percentage columns.
for _name in _education_pct_columns:
    df_education = df_education.withColumn(
        _name, regexp_replace(df_education[_name], "%", "").cast(FloatType())
    )

display(df_education)

In [None]:
# Verify the column types after casting.
df_education.printSchema()


### 3. Eliminar valores nulos

In [None]:
# Remove every row that still contains a null in any column (for example a
# value that could not be cast to its numeric type).
df_education = df_education.dropna()
df_education.display()


#DATASET POVERTY

In [None]:
# Inspect the schema of the raw poverty data before any renaming or casting.
df_poverty.printSchema()

In [None]:
# Preview the raw poverty rows.
df_poverty.display()

##TRANSFORMACIONES Y FILTROS POVERTY



###1. Renombrar las columnas con el nombre que les corresponde:
- Se hace con el fin de renombrar las columnas para que tengan nombres con sentido y no _c0, _c1, ...

In [None]:
# The CSV was read without a header; map each positional column _cN to the
# N-th name below (five names per line, the comment gives the _cN range).
_poverty_names = [
    'SERIALNO', 'SPORDER', 'PWGTP', 'WGTP', 'AGEP',                               # _c0  - _c4
    'CIT', 'REL', 'SCH', 'SCHG', 'SCHL',                                          # _c5  - _c9
    'SEX', 'ESR', 'LANX', 'ENG', 'MSP',                                           # _c10 - _c14
    'MAR', 'WKW', 'WKHP', 'DIS', 'JWTR',                                          # _c15 - _c19
    'NP', 'TEN', 'HHT', 'AgeCateg', 'Boro',                                       # _c20 - _c24
    'CitizenStatus', 'EducAttain', 'EST_Childcare', 'EST_Commuting', 'EST_EITC',  # _c25 - _c29
    'EST_FICAtax', 'EST_HEAP', 'EST_Housing', 'EST_IncomeTax', 'EST_MOOP',        # _c30 - _c34
    'EST_Nutrition', 'EST_PovGap', 'EST_PovGapIndex', 'Ethnicity', 'FamType_PU',  # _c35 - _c39
    'FTPTWork', 'INTP_adj', 'MRGP_adj', 'NYCgov_Income', 'NYCgov_Pov_Stat',       # _c40 - _c44
    'NYCgov_REL', 'NYCgov_Threshold', 'Off_Pov_Stat', 'Off_Threshold', 'OI_adj',  # _c45 - _c49
    'PA_adj', 'Povunit_ID', 'Povunit_Rel', 'PreTaxIncome_PU', 'RETP_adj',         # _c50 - _c54
    'RNTP_adj', 'SEMP_adj', 'SSIP_adj', 'SSP_adj', 'TotalWorkHrs_PU',             # _c55 - _c59
    'WAGP_adj',                                                                   # _c60
]
for _position, _new_name in enumerate(_poverty_names):
    df_poverty = df_poverty.withColumnRenamed(f'_c{_position}', _new_name)


###1.1. Eliminar el registro que contiene los nombres de los campos
- Este registro realmente no tiene información valiosa, pues los nombres de los campos solo aportan eso, el nombre de la columna.

In [None]:
# Remove the header line that was read as a data row.
df_poverty = df_poverty.where(df_poverty["SERIALNO"] != "SERIALNO")
display(df_poverty)


### 2. Castear las columnas con el tipo de dato que corresponde
- Todos los campos de este dataframe son strings y se necesita tener concordancia con el tipo de dato
  - Como algunos datos tienen valores que no corresponden es necesario hacer antes una limpieza de los mismos


###2.1. Eliminar los datos que tienen un valor nulo
- El 0.97% de los datos tienen un valor nulo, por lo que no aportan información valiosa y al no considerarse representativo, se decide eliminar los mismos

In [None]:
# Remove every row with a null in any column.
df_poverty = df_poverty.dropna()
df_poverty.display()

In [None]:
# Cast the string columns of the poverty data to numeric types.
# NOTE(review): Off_Threshold is cast to int while NYCgov_Threshold is cast to
# float, and EST_Childcare / EST_HEAP go to int while the other EST_* columns
# go to float; confirm these choices. With ANSI mode off, a value that cannot
# be parsed as the target type becomes null, and because na.drop() already ran
# before this cell, such nulls are kept.
_poverty_int_columns = (
    "SERIALNO", "SPORDER", "PWGTP", "WGTP", "AGEP", "CIT", "REL", "SCH", "SCHG", "SCHL",
    "SEX", "ESR", "LANX", "ENG", "MSP", "MAR", "WKW", "WKHP", "DIS", "JWTR",
    "NP", "TEN", "HHT", "AgeCateg", "Boro", "CitizenStatus", "EducAttain", "EST_Childcare",
    "EST_HEAP", "Ethnicity", "FamType_PU", "FTPTWork", "NYCgov_Pov_Stat", "NYCgov_REL",
    "Off_Pov_Stat", "Off_Threshold", "Povunit_ID", "Povunit_Rel", "TotalWorkHrs_PU",
)
_poverty_float_columns = (
    "EST_Commuting", "EST_EITC", "EST_FICAtax", "EST_Housing", "EST_IncomeTax",
    "EST_MOOP", "EST_Nutrition", "EST_PovGap", "EST_PovGapIndex",
    "INTP_adj", "MRGP_adj", "NYCgov_Income", "NYCgov_Threshold", "OI_adj", "PA_adj",
    "PreTaxIncome_PU", "RETP_adj", "RNTP_adj", "SEMP_adj", "SSIP_adj", "SSP_adj", "WAGP_adj",
)

for _columns, _spark_type in (
    (_poverty_int_columns, IntegerType()),
    (_poverty_float_columns, FloatType()),
):
    for _name in _columns:
        df_poverty = df_poverty.withColumn(_name, df_poverty[_name].cast(_spark_type))
df_poverty.display()

In [None]:
# Verify the column types after casting.
df_poverty.printSchema()


# DATASET ARRESTS


In [None]:
# Inspect the schema of the raw arrests data before any renaming or casting.
df_arrests.printSchema()

In [None]:
# Preview the raw arrests rows.
df_arrests.display()


###1. Renombrar las columnas con el nombre que les corresponde:
- Se hace con el fin de renombrar las columnas para que tengan nombres con sentido y no _c0, _c1, ...

In [None]:
# Positional columns _c0.._c18 -> descriptive names, in column order.
_arrests_names = [
    'ARREST_KEY', 'ARREST_DATE', 'PD_CD', 'PD_DESC', 'KY_CD',                    # _c0  - _c4
    'OFNS_DESC', 'LAW_CODE', 'LAW_CAT_CD', 'ARREST_BORO', 'ARREST_PRECINCT',     # _c5  - _c9
    'JURISDICTION_CODE', 'AGE_GROUP', 'PERP_SEX', 'PERP_RACE', 'X_COORD_CD',     # _c10 - _c14
    'Y_COORD_CD', 'Latitude', 'Longitude', 'New_Georeferenced_Column',           # _c15 - _c18
]
for _position, _new_name in enumerate(_arrests_names):
    df_arrests = df_arrests.withColumnRenamed(f'_c{_position}', _new_name)


###1.1. Eliminar el registro que contiene los nombres de los campos
- Este registro realmente no tiene información valiosa, pues los nombres de los campos solo aportan eso, el nombre de la columna.

In [None]:
# Remove the header line that was read as a data row.
df_arrests = df_arrests.where(df_arrests["ARREST_KEY"] != 'ARREST_KEY')
df_arrests.display()


### 2. Castear las columnas con el tipo de dato que corresponde
- Todos los campos de este dataframe son strings y se necesita tener concordancia con el tipo de dato
  - Como algunos datos tienen valores que no corresponden es necesario hacer antes una limpieza de los mismos


###2.1. Eliminar los datos que tienen un valor nulo
- El 0.075% de los datos tienen un valor nulo, por lo que no aportan información valiosa y al no considerarse representativo, se decide eliminar los mismos

In [None]:
# Remove every row with a null in any column.
df_arrests = df_arrests.dropna()
df_arrests.display()


###2.2. Transformar el formato de fecha del string para poder hacer el casteo a tipo de dato date posteriormente
- El formato de fecha está en MM/dd/yyyy y necesita estar en yyyy-MM-dd (la función `to_date` hace la conversión directamente a tipo date).


In [None]:
from pyspark.sql.functions import to_date

# Parse the MM/dd/yyyy text of ARREST_DATE into a date column.
_arrest_date = to_date("ARREST_DATE", 'MM/dd/yyyy')
df_arrests = df_arrests.withColumn("ARREST_DATE", _arrest_date)
df_arrests.display()

In [None]:
from pyspark.sql.types import DateType, CharType
from pyspark.sql.types import DoubleType

# Identifier, code and projected-coordinate columns hold whole numbers.
for _name in (
    "ARREST_KEY",
    "PD_CD",
    "KY_CD",
    "ARREST_PRECINCT",
    "JURISDICTION_CODE",
    "X_COORD_CD",
    "Y_COORD_CD",
):
    df_arrests = df_arrests.withColumn(_name, df_arrests[_name].cast(IntegerType()))

# Latitude/longitude use DoubleType: a 32-bit FloatType keeps only about 7
# significant digits, which can drop the trailing decimals of the source
# coordinates.
for _name in ("Latitude", "Longitude"):
    df_arrests = df_arrests.withColumn(_name, df_arrests[_name].cast(DoubleType()))

df_arrests.display()

In [None]:
# Verify the column types after casting.
df_arrests.printSchema()


#DATASET ACCIDENTS

In [None]:
# Inspect the schema of the raw vehicle-collision data before any renaming or casting.
df_accidents.printSchema()

In [None]:
# Preview the raw vehicle-collision rows.
df_accidents.display()


###1. Renombrar las columnas con el nombre que les corresponde:
- Se hace con el fin de renombrar las columnas para que tengan nombres con sentido y no _c0, _c1, ...

In [None]:
# Positional columns _c0.._c24 -> descriptive names, in column order.
_accidents_names = [
    "UNIQUE_ID", "COLLISION_ID", "CRASH_DATE", "CRASH_TIME", "VEHICLE_ID",                     # _c0  - _c4
    "STATE_REGISTRATION", "VEHICLE_TYPE", "VEHICLE_MAKE", "VEHICLE_MODEL", "VEHICLE_YEAR",     # _c5  - _c9
    "TRAVEL_DIRECTION", "VEHICLE_OCCUPANTS", "DRIVER_SEX", "DRIVER_LICENSE_STATUS",
    "DRIVER_LICENSE_JURISDICTION",                                                             # _c10 - _c14
    "PRE_CRASH", "POINT_OF_IMPACT", "VEHICLE_DAMAGE", "VEHICLE_DAMAGE_1", "VEHICLE_DAMAGE_2",  # _c15 - _c19
    "VEHICLE_DAMAGE_3", "PUBLIC_PROPERTY_DAMAGE", "PUBLIC_PROPERTY_DAMAGE_TYPE",
    "CONTRIBUTING_FACTOR_1", "CONTRIBUTING_FACTOR_2",                                          # _c20 - _c24
]
for _position, _new_name in enumerate(_accidents_names):
    df_accidents = df_accidents.withColumnRenamed(f"_c{_position}", _new_name)


###1.1. Eliminar el registro que contiene los nombres de los campos
- Este registro realmente no tiene información valiosa, pues los nombres de los campos solo aportan eso, el nombre de la columna.

In [None]:
# Remove the header line that was read as a data row.
df_accidents = df_accidents.where(df_accidents["UNIQUE_ID"] != "UNIQUE_ID")
df_accidents.display()


### 2. Castear las columnas con el tipo de dato que corresponde
- Todos los campos de este dataframe son strings y se necesita tener concordancia con el tipo de dato
  - Como algunos datos tienen valores que no corresponden es necesario hacer antes una limpieza de los mismos


###2.1. Eliminar los datos que tienen un valor nulo
- El 0.075% de los datos tienen un valor nulo, por lo que no aportan información valiosa y, al no considerarse representativos, se decide eliminarlos. Cabe aclarar que solo se eliminarán los registros que tengan valores nulos en la columna "VEHICLE_TYPE".

In [None]:
# Only rows without a VEHICLE_TYPE are removed; nulls elsewhere are kept.
df_accidents = df_accidents.dropna(subset=["VEHICLE_TYPE"])
df_accidents.display()


###2.2. Transformar el formato de fecha del string para poder hacer el casteo a tipo de dato date posteriormente
- El formato de fecha está en MM/dd/yyyy y necesita estar en yyyy-MM-dd (la función `to_date` hace la conversión directamente a tipo date).


In [None]:
from pyspark.sql.functions import to_timestamp, date_format, date_trunc, hour, split, col
from pyspark.sql.functions import to_date
from pyspark.sql.types import IntegerType, FloatType, DateType, CharType

# CRASH_DATE: MM/dd/yyyy text -> date.
_crash_date = to_date(col("CRASH_DATE"), 'MM/dd/yyyy')
# Hora: integer taken from the part of CRASH_TIME before the first ':'.
_crash_hour = split(col("CRASH_TIME"), ":").getItem(0).cast(IntegerType())

df_accidents = df_accidents.withColumn("CRASH_DATE", _crash_date).withColumn("Hora", _crash_hour)



In [None]:
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import when

# Integer columns that arrive as text.
for _int_col in ("UNIQUE_ID", "COLLISION_ID", "VEHICLE_ID", "VEHICLE_YEAR", "VEHICLE_OCCUPANTS"):
    df_accidents = df_accidents.withColumn(_int_col, df_accidents[_int_col].cast(IntegerType()))

# Merge the label "SPORT UTILITY / STATION WAGON" into
# "Station Wagon/Sport Utility Vehicle"; every other type is left unchanged.
_vehicle_type = df_accidents["VEHICLE_TYPE"]
df_accidents = df_accidents.withColumn(
    "VEHICLE_TYPE",
    when(_vehicle_type == "SPORT UTILITY / STATION WAGON", "Station Wagon/Sport Utility Vehicle")
    .otherwise(_vehicle_type),
)

# Drop rows whose vehicle type is exactly "UNKNOWN" (case-sensitive match),
# then rows missing the vehicle year or the licence status.
df_accidents = df_accidents.filter(df_accidents["VEHICLE_TYPE"] != "UNKNOWN")
df_accidents = df_accidents.dropna(subset=["VEHICLE_YEAR", "DRIVER_LICENSE_STATUS"])

df_accidents.display()

In [None]:
# Verify the column types after casting and filtering.
df_accidents.printSchema()

In [None]:
# Preview the cleaned vehicle-collision rows.
df_accidents.display()



#Respuesta a preguntas de negocio planteadas

En esta sección, se mostrarán las consultas necesarias para dar respuesta a las preguntas establecidas en la entrega anterior

%md

###Pregunta 1: ¿Cuáles son las 5 causas más comunes de arrestos en Nueva York?

In [None]:
# Question 1: the 5 most frequent offence descriptions among arrests.
df_arrests.groupBy("OFNS_DESC").count().orderBy("count", ascending=False).limit(5).display()



###Pregunta 2: ¿Cuáles son los 5 tipos de vehículos que más accidentes viales tienen?

In [None]:
# Question 2: the 5 vehicle types that appear most often in collisions.
# This question is about the accidents data, not the arrests data.
df_accidents.groupBy("VEHICLE_TYPE").count().orderBy("count", ascending=False).limit(5).display()



###Pregunta 3: ¿Cómo se relaciona la criminalidad con características demográficas (edad, sexo, raza) de los perpetradores?

In [None]:
# Question 3: number of arrests per (age group, sex, race) combination of the
# arrested person, most frequent first.
df_arrests.groupBy("AGE_GROUP", "PERP_SEX", "PERP_RACE").count().orderBy("count", ascending=False).display()



###Pregunta 4: ¿Cómo se relacionan las horas de los accidentes con la frecuencia en la que ocurren? Es decir, ¿hay horas en las que es más común que ocurra un accidente vial?

In [None]:
# Question 4: number of vehicles involved in collisions per hour of day
# (Hora is derived from CRASH_TIME), in hour order.
df_accidents.groupBy("Hora").count().orderBy("Hora").display()



###Pregunta 5: ¿Cómo se relaciona el año de manufactura del vehículo con la tasa de accidentalidad?

In [None]:
# Question 5: number of vehicles involved in collisions per model year.
# NOTE(review): this is a raw count; an accident *rate* would also need the
# number of vehicles of each model year on the road, which this data lacks.
df_accidents.groupBy("VEHICLE_YEAR").count().orderBy("VEHICLE_YEAR").display()



###Pregunta 6: ¿Cómo se relacionan las condiciones antes del choque (dirección de viaje y la última acción antes de colisión) con la seguridad vial?

In [None]:
# Question 6: collisions per (travel direction, pre-crash action) pair,
# most frequent first.
df_accidents.groupBy("TRAVEL_DIRECTION", "PRE_CRASH").count().orderBy("count", ascending=False).display()



###Pregunta 7: ¿Cómo se relacionan las características (sexo y licencia de conducción) de los conductores con la frecuencia de accidentes?

In [None]:
# Question 7: collisions per (driver sex, licence status) pair, most frequent first.
df_accidents.groupBy("DRIVER_SEX", "DRIVER_LICENSE_STATUS").count().orderBy("count", ascending=False).display()



#Preparación de los datos


In [None]:
from pyspark.sql.functions import col, lit, row_number
from pyspark.sql.window import Window

# --- Keep only the columns used by the models -------------------------------
# Identifiers, date/time, make/model, jurisdiction and the secondary damage /
# contributing-factor / direction columns are not used as predictors.
columns_to_drop = ["UNIQUE_ID", "COLLISION_ID", "CRASH_DATE", "CRASH_TIME", "VEHICLE_ID", "STATE_REGISTRATION", "VEHICLE_MAKE", "VEHICLE_MODEL", "DRIVER_LICENSE_JURISDICTION", "POINT_OF_IMPACT", "VEHICLE_DAMAGE_1", "VEHICLE_DAMAGE_2", "VEHICLE_DAMAGE_3", "PUBLIC_PROPERTY_DAMAGE", "PUBLIC_PROPERTY_DAMAGE_TYPE", "CONTRIBUTING_FACTOR_1", "CONTRIBUTING_FACTOR_2", "TRAVEL_DIRECTION", "PRE_CRASH"]

df_accidents = df_accidents.select([column for column in df_accidents.columns if column not in columns_to_drop])


# --- Remove records that make no sense --------------------------------------
# NOTE: `!= 0` evaluates to null for a null VEHICLE_OCCUPANTS, so rows with a
# missing occupant count are removed by this filter as well.
df_accidents = df_accidents.filter(df_accidents["VEHICLE_OCCUPANTS"] != 0)
df_accidents = df_accidents.dropna(subset=["VEHICLE_DAMAGE"])

# --- Binary target: 1 = severe damage, 0 = non-severe (arbitrary grouping) --
danio_grave = ["Right Side Doors", "Roof", "Overturned", "Trailer", "Left Side Doors", "Demolished", "Undercarriage"]
danio_no_grave = ["Right Front Bumper", "Left Rear Bumper", "Left Rear Quarter Panel", "Center Back End", "Left Front Quarter Panel", "Right Front Quarter Panel", "Other", "Right Rear Quarter Panel", "Left Front Bumper", "Center Front End", "Right Rear Bumper", "No Damage"]

df_accidents = df_accidents.withColumn(
    "VEHICLE_DAMAGE",
    when(df_accidents["VEHICLE_DAMAGE"].isin(danio_grave), 1)
    .when(df_accidents["VEHICLE_DAMAGE"].isin(danio_no_grave), 0)
    .otherwise(None),
)

# --- Frequency-encode VEHICLE_TYPE ------------------------------------------
# Each type is replaced by the number of rows with that type, so more frequent
# types get larger values.
counts = df_accidents.groupBy("VEHICLE_TYPE").count()
df_accidents = df_accidents.join(counts, on="VEHICLE_TYPE", how="left")
df_accidents = df_accidents.withColumn("VEHICLE_TYPE", df_accidents["count"].cast(IntegerType()))
df_accidents = df_accidents.drop("count")

# DRIVER_SEX: 1 for "F", -1 for any other value (including null/unknown).
df_accidents = df_accidents.withColumn("DRIVER_SEX", when(df_accidents["DRIVER_SEX"] == "F", 1).otherwise(-1))

# DRIVER_LICENSE_STATUS: 1 for "Licensed", 0 for anything else.
df_accidents = df_accidents.withColumn("DRIVER_LICENSE_STATUS", when(df_accidents["DRIVER_LICENSE_STATUS"] == "Licensed", 1).otherwise(0))

# Damage descriptions outside both lists were mapped to null above, and the
# remaining feature columns (e.g. Hora) may also contain nulls. scikit-learn
# rejects a target containing NaN, so drop every incomplete row before the
# data is converted to pandas for modelling.
df_accidents = df_accidents.dropna()

df_accidents.show()




#Aplicación de modelos ML

In [None]:
# Convert the prepared Spark DataFrame into a pandas DataFrame for scikit-learn.
# NOTE(review): toPandas() collects every row onto the driver; make sure the
# driver has enough memory for the full dataset.
import pandas as pd
df = df_accidents.toPandas()
# Print column dtypes, non-null counts and memory usage.
df.info()


##Observación
- Debido a que el dataset contiene 1473066 registros, se van a tomar 3 muestras aleatorias representativas y a partir de allí se hará un análisis general.

In [None]:
# Draw three random samples of the prepared data. A fixed, distinct
# random_state per sample makes each draw reproducible across runs while
# keeping the three samples different from one another.
# NOTE: the samples are drawn independently, so they can share rows.

porcentaje_muestras = 0.33
df_muestra1 = df.sample(frac = porcentaje_muestras, random_state = 1)
df_muestra2 = df.sample(frac = porcentaje_muestras, random_state = 2)
df_muestra3 = df.sample(frac = porcentaje_muestras, random_state = 3)

In [None]:
# df_muestra1 is a pandas DataFrame, which has no .display() method; use the
# notebook's display() function instead.
display(df_muestra1)

In [None]:
# df_muestra2 is a pandas DataFrame, which has no .display() method; use the
# notebook's display() function instead.
display(df_muestra2)

In [None]:
# df_muestra3 is a pandas DataFrame, which has no .display() method; use the
# notebook's display() function instead.
display(df_muestra3)


##Separar los datos

In [None]:
from sklearn.model_selection import train_test_split


def _split_sample(sample, seed):
    """Split one sample into features/target and an 80/20 train/test partition.

    VEHICLE_DAMAGE is the target; every other column is a feature. ``seed``
    fixes the shuffle so the split is reproducible.

    Returns (X, y, X_train, X_test, y_train, y_test).
    """
    features = sample.drop('VEHICLE_DAMAGE', axis = 1)
    target = sample['VEHICLE_DAMAGE']
    return (features, target, *train_test_split(features, target, test_size = 0.2, random_state = seed))


# Split each of the three samples with its own fixed seed.
X1, y1, X1train, X1test, y1train, y1test = _split_sample(df_muestra1, 1)
X2, y2, X2train, X2test, y2train, y2test = _split_sample(df_muestra2, 2)
X3, y3, X3train, X3test, y3train, y3test = _split_sample(df_muestra3, 3)



##Aprendizaje Supervisado Escogido: Random Forest


In [None]:
from sklearn.ensemble import RandomForestClassifier

# Forest for sample 1. These are NOT scikit-learn's default parameters
# (the defaults are n_estimators=100 and max_features="sqrt"): only 5 trees
# are grown, and each split considers a single randomly chosen feature.
# This gives a small, highly randomised baseline model.
model01RF = RandomForestClassifier(
            n_estimators = 5,
            max_depth    = None,
            max_features = 1,
            oob_score    = False,
            n_jobs       = -1,
            random_state = 123)
model01RF.fit(X1train, y1train)
prediction1RF = model01RF.predict(X1test)
print(classification_report(y1test, prediction1RF))
print(confusion_matrix(y1test, prediction1RF))

In [None]:
# Forest for sample 2: 146 trees, same per-split settings as model 1 but a
# different seed. Per the author, 146 is the tree count that minimised the
# out-of-bag error; that analysis is not shown in this cell (oob_score is
# False here).
_rf2_settings = {
    "n_estimators": 146,
    "max_depth": None,
    "max_features": 1,
    "oob_score": False,
    "n_jobs": -1,
    "random_state": 42,
}
model02RF = RandomForestClassifier(**_rf2_settings)
prediction2RF = model02RF.fit(X2train, y2train).predict(X2test)
print(classification_report(y2test, prediction2RF))
print(confusion_matrix(y2test, prediction2RF))

In [None]:
# Forest for sample 3: only the number of trees (250) is set; everything else
# uses scikit-learn's defaults, including an unseeded random_state.
model03RF = RandomForestClassifier(n_estimators=250)
prediction3RF = model03RF.fit(X3train, y3train).predict(X3test)
for _result in (
    classification_report(y3test, prediction3RF),
    confusion_matrix(y3test, prediction3RF),
):
    print(_result)


##Reporte de clasificación promediando los resultados de los modelos aplicados en las 3 muestras


In [None]:
import numpy as np

class_report1 = classification_report(y1test, prediction1RF, output_dict=True)
class_report2 = classification_report(y2test, prediction2RF, output_dict=True)
class_report3 = classification_report(y3test, prediction3RF, output_dict=True)

# Metric names (e.g. precision, recall, f1-score, support) taken from the
# class "0" entry of the first report.
metrics_keys = list(class_report1["0"].keys())
_reports = (class_report1, class_report2, class_report3)

# For each report section ("0", "1", "weighted avg"), average every metric
# over the three runs and round to 3 decimals.
average_class_report = {
    section: {
        metric: np.round(np.mean([report[section][metric] for report in _reports]), decimals=3)
        for metric in metrics_keys
    }
    for section in ("0", "1", "weighted avg")
}

print("Informe de Clasificación Promediado:")
print(average_class_report)



##Matriz de confusión promediando los resultados de los modelos aplicados en las 3 muestras


In [None]:
# Element-wise average of the three confusion matrices (one per sample/model).
_pairs = (
    (y1test, prediction1RF),
    (y2test, prediction2RF),
    (y3test, prediction3RF),
)
_matrices = [np.array(confusion_matrix(y_true, y_pred).tolist()) for y_true, y_pred in _pairs]

matriz_conf_promediada = sum(_matrices) / len(_matrices)
matriz_conf_promediada_redondeada = np.round(matriz_conf_promediada, decimals=3)

# Print with 3 decimals and without scientific notation (global numpy setting).
np.set_printoptions(precision=3, suppress=True)
print(matriz_conf_promediada_redondeada)



##Aprendizaje no supervisado escogido: K-means

In [None]:
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
from sklearn.metrics import silhouette_score

def _mostrar_clusters(X, etiquetas, centroides, conjunto):
    """Scatter the first two feature columns of ``X`` coloured by cluster id, overlay the centroids, and show it.

    Only columns 0 and 1 are drawn, so the figure is a 2-D projection of the
    feature space the model was fitted on.
    """
    plt.scatter(X.iloc[:, 0], X.iloc[:, 1], c=etiquetas, cmap='viridis', s=50, alpha=0.7)
    plt.scatter(centroides[:, 0], centroides[:, 1], c='red', marker='X', s=200)
    plt.title(f'K-Means Clustering (Conjunto de {conjunto})')
    plt.show()


# Two-cluster K-means on the first sample's training features (seed 69).
model01kmeans = KMeans(n_clusters=2, random_state=69).fit(X1train)

centroids_train1 = model01kmeans.cluster_centers_
labels_train1 = model01kmeans.labels_

_mostrar_clusters(X1train, labels_train1, centroids_train1, 'Entrenamiento')

# Rows = true classes, columns = cluster ids. Cluster ids are arbitrary, so a
# model whose cluster 0 matches class 1 puts most counts off the diagonal.
conf_matrix1 = confusion_matrix(y1train, labels_train1)
print(conf_matrix1)

# Assign each test row to the nearest centroid learned on the training data.
labels_test1 = model01kmeans.predict(X1test)

_mostrar_clusters(X1test, labels_test1, centroids_train1, 'Prueba')

conf_matrix_test1 = confusion_matrix(y1test, labels_test1)
print(conf_matrix_test1)

# Within-cluster sum of squared distances of the training data (sklearn inertia_).
inertia1 = model01kmeans.inertia_



In [None]:
def _mostrar_clusters(X, etiquetas, centroides, conjunto):
    """Scatter the first two feature columns of ``X`` coloured by cluster id, overlay the centroids, and show it.

    Only columns 0 and 1 are drawn, so the figure is a 2-D projection of the
    feature space the model was fitted on.
    """
    plt.scatter(X.iloc[:, 0], X.iloc[:, 1], c=etiquetas, cmap='viridis', s=50, alpha=0.7)
    plt.scatter(centroides[:, 0], centroides[:, 1], c='red', marker='X', s=200)
    plt.title(f'K-Means Clustering (Conjunto de {conjunto})')
    plt.show()


# Two-cluster K-means on the second sample's training features (seed 42).
model02kmeans = KMeans(n_clusters=2, random_state=42).fit(X2train)

centroids_train2 = model02kmeans.cluster_centers_
labels_train2 = model02kmeans.labels_

_mostrar_clusters(X2train, labels_train2, centroids_train2, 'Entrenamiento')

# Rows = true classes, columns = cluster ids. Cluster ids are arbitrary, so a
# model whose cluster 0 matches class 1 puts most counts off the diagonal.
conf_matrix2 = confusion_matrix(y2train, labels_train2)
print(conf_matrix2)

# Assign each test row to the nearest centroid learned on the training data.
labels_test2 = model02kmeans.predict(X2test)

_mostrar_clusters(X2test, labels_test2, centroids_train2, 'Prueba')

conf_matrix_test2 = confusion_matrix(y2test, labels_test2)
print(conf_matrix_test2)

# Within-cluster sum of squared distances of the training data (sklearn inertia_).
inertia2 = model02kmeans.inertia_



In [None]:
def _mostrar_clusters(X, etiquetas, centroides, conjunto):
    """Scatter the first two feature columns of ``X`` coloured by cluster id, overlay the centroids, and show it.

    Only columns 0 and 1 are drawn, so the figure is a 2-D projection of the
    feature space the model was fitted on.
    """
    plt.scatter(X.iloc[:, 0], X.iloc[:, 1], c=etiquetas, cmap='viridis', s=50, alpha=0.7)
    plt.scatter(centroides[:, 0], centroides[:, 1], c='red', marker='X', s=200)
    plt.title(f'K-Means Clustering (Conjunto de {conjunto})')
    plt.show()


# Two-cluster K-means on the third sample's training features (seed 13).
model03kmeans = KMeans(n_clusters=2, random_state=13).fit(X3train)

centroids_train3 = model03kmeans.cluster_centers_
labels_train3 = model03kmeans.labels_

_mostrar_clusters(X3train, labels_train3, centroids_train3, 'Entrenamiento')

# Rows = true classes, columns = cluster ids. Cluster ids are arbitrary, so a
# model whose cluster 0 matches class 1 puts most counts off the diagonal.
conf_matrix3 = confusion_matrix(y3train, labels_train3)
print(conf_matrix3)

# Assign each test row to the nearest centroid learned on the training data.
labels_test3 = model03kmeans.predict(X3test)

_mostrar_clusters(X3test, labels_test3, centroids_train3, 'Prueba')

conf_matrix_test3 = confusion_matrix(y3test, labels_test3)
print(conf_matrix_test3)

# Within-cluster sum of squared distances of the training data (sklearn inertia_).
inertia3 = model03kmeans.inertia_


##Matriz de confusión promediando los resultados de los modelos aplicados en las 3 muestras


In [None]:

from itertools import permutations


def _orden_columnas(matriz):
    """Return the column order that puts the largest possible total on the diagonal.

    ``matriz`` is a square confusion matrix whose rows are true classes and
    whose columns are K-means cluster ids. The returned list ``orden`` makes
    ``matriz[:, orden]`` have maximal trace. On ties the identity order wins,
    because ``permutations`` yields it first and ``max`` keeps the first maximum.

    Why: K-means cluster ids are arbitrary, and the three models were fitted
    with different seeds. "Cluster 0" can mean class 0 in one model and
    class 1 in another, so averaging unaligned matrices mixes swapped columns.
    Brute force over n! orders is fine for the 2 clusters used here.
    """
    n = matriz.shape[1]
    filas = np.arange(n)
    mejor = max(permutations(range(n)), key=lambda orden: matriz[filas, list(orden)].sum())
    return list(mejor)


def _alinear(matriz_train, matriz_test):
    """Relabel one model's clusters: learn the mapping on train, apply it to both matrices.

    The same mapping is used for the test matrix because ``predict`` assigns
    test rows to the same fitted centroids (same cluster ids).

    Raises:
        ValueError: if the train and test matrices have different shapes
            (e.g. a label present in one split but not the other).
    """
    if matriz_train.shape != matriz_test.shape:
        raise ValueError(
            f"train/test confusion matrices differ in shape: {matriz_train.shape} vs {matriz_test.shape}"
        )
    orden = _orden_columnas(matriz_train)
    # Fancy indexing returns copies; conf_matrixN / conf_matrix_testN stay untouched.
    return matriz_train[:, orden], matriz_test[:, orden]


# (train, test) pairs of aligned confusion matrices, one per K-means model.
alineadas = [
    _alinear(conf_matrix1, conf_matrix_test1),
    _alinear(conf_matrix2, conf_matrix_test2),
    _alinear(conf_matrix3, conf_matrix_test3),
]

# Global NumPy print setting: 3 decimals, no scientific notation.
np.set_printoptions(precision=3, suppress=True)

# Training samples: element-wise mean of the aligned matrices.
matriz_conf_promediada = np.mean([train for train, _ in alineadas], axis=0)
matriz_conf_promediada_redondeada = np.round(matriz_conf_promediada, decimals=3)
print(matriz_conf_promediada_redondeada)

# Test samples: same averaging, using each model's training-derived alignment.
matriz_conf_promediada2 = np.mean([test for _, test in alineadas], axis=0)
matriz_conf_promediada_redondeada2 = np.round(matriz_conf_promediada2, decimals=3)
print(matriz_conf_promediada_redondeada2)



##Visualizar las 3 gráficas arrojadas en una sola

In [None]:
def _cuadricula_kmeans(paneles, conjunto):
    """Draw one subplot per (features, labels, centroids) triple in a 1x3 figure, then show it.

    Each panel scatters the first two feature columns coloured by cluster id,
    overlays the red centroid markers, and is titled "Modelo NN (<conjunto>)".
    """
    plt.figure(figsize=(15, 5))
    for posicion, (datos, etiquetas, centroides) in enumerate(paneles, start=1):
        plt.subplot(1, 3, posicion)
        plt.scatter(datos.iloc[:, 0], datos.iloc[:, 1], c=etiquetas, cmap='viridis', s=50, alpha=0.7)
        plt.scatter(centroides[:, 0], centroides[:, 1], c='red', marker='X', s=200)
        plt.title(f'Modelo {posicion:02d} ({conjunto})')
    plt.tight_layout()
    plt.show()


# Training-set clusters of the three K-means models, side by side.
_cuadricula_kmeans(
    [
        (X1train, labels_train1, centroids_train1),
        (X2train, labels_train2, centroids_train2),
        (X3train, labels_train3, centroids_train3),
    ],
    'Entrenamiento',
)


In [None]:
def _cuadricula_kmeans(paneles, conjunto):
    """Draw one subplot per (features, labels, centroids) triple in a 1x3 figure, then show it.

    Each panel scatters the first two feature columns coloured by cluster id,
    overlays the red centroid markers, and is titled "Modelo NN (<conjunto>)".
    """
    plt.figure(figsize=(15, 5))
    for posicion, (datos, etiquetas, centroides) in enumerate(paneles, start=1):
        plt.subplot(1, 3, posicion)
        plt.scatter(datos.iloc[:, 0], datos.iloc[:, 1], c=etiquetas, cmap='viridis', s=50, alpha=0.7)
        plt.scatter(centroides[:, 0], centroides[:, 1], c='red', marker='X', s=200)
        plt.title(f'Modelo {posicion:02d} ({conjunto})')
    plt.tight_layout()
    plt.show()


# Test-set cluster assignments of the three K-means models, with the
# centroids learned on the corresponding training sample.
_cuadricula_kmeans(
    [
        (X1test, labels_test1, centroids_train1),
        (X2test, labels_test2, centroids_train2),
        (X3test, labels_test3, centroids_train3),
    ],
    'Prueba',
)


##Promedio de Inercia


In [None]:
# Mean inertia (within-cluster sum of squares) of the three K-means models.
promedio_i = sum((inertia1, inertia2, inertia3)) / 3
print(promedio_i)