La siguiente celda se encargara de importar todos los paquetes necesarios para el desarrollo del proyecto. Además, se importa el archivo de configuración de la base de datos.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import ltrim,rtrim,trim,col
from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas
import matplotlib.pyplot as plt
from sqlalchemy import create_engine
import psycopg2

spark = SparkSession.builder.config("spark.jars", "../lib/postgresql-42.5.1.jar") \
	.master("local").appName("PySpark_Postgres_test").getOrCreate()

conn = psycopg2.connect("host=localhost dbname=etl user=postgres password=Legolas00")
engine = create_engine('postgresql://postgres:Legolas00@localhost:5432/etl')

En la siguiente celda se encarga de utilizar spark para cargar en dos diferentes dataframes los datos del INEC y el OIJ para despues limpiarlos y unirlos en un solo dataframe.

In [None]:
#CREATE DATAFRAME FROM CSV FILE
oij_df = spark.read.csv( path="../data/OIJ.csv", sep=";", header=True,quote='"',inferSchema=True,)
inec_df = spark.read.csv( path="../data/INEC.csv", sep=";", header=True,quote='"',inferSchema=True,)

En la siguiente celda se encarga de realizar la limpieza de los datos del INEC y el OIJ. Primero se creo una función para remover todos los espacios en blanco al inicio y al final de cada sub celda del data frame utilizando la funcion trim para ello. Ademas se creo una funcio la cual se encarga de pasar todos los caracteres a letras minusculas para poder realizar la busqueda de los datos en el data frame. Los parametros de las funciones son un dataframe y el retorno es el mismo dataframe pero con los datos limpios.

In [None]:
def remove_spaces(df):
    for col in df.columns:
        if  col == 'Provincia' or col == 'Canton' or col == 'Distrito' or col == 'Provincia, cantón y distrito':
            df = df.withColumn(col, trim(col))
    return df

#Function to parser the string to lowercase
def to_lower_case(df):
    for col in df.columns:
        if  col == 'Provincia' or col == 'Canton' or col == 'Distrito' or col == 'Provincia, cantón y distrito':
            df = df.withColumn(col, lower(col))
    return df


oij_df = remove_spaces(oij_df)
inec_df = remove_spaces(inec_df)

oij_df = to_lower_case(oij_df)
inec_df = to_lower_case(inec_df)

oij_df.show(5)
inec_df.show(5)

En la siguiente celda se encarga de reemplazar todos los caracteres especiales del español por caracteres ascii para poder realizar la busqueda de los datos en el data frame. Y que su taza de concidencia sea mayor.Para este trabajo se utilizo la funcionalidad regex_replace de spark. Y asi cada vez que se encuentre un caracter especial se reemplazara por un caracter ascii. El parametro de la funcion es un dataframe y el retorno es el mismo dataframe pero con los datos limpios es decir sin caracteres especiales.

In [None]:
#Function to replace the accents in column Provincia, cantón y distrito in inec_df
def replace_accents(df):
    df = df.withColumn('Provincia, cantón y distrito', regexp_replace('Provincia, cantón y distrito', 'á', 'a'))
    df = df.withColumn('Provincia, cantón y distrito', regexp_replace('Provincia, cantón y distrito', 'é', 'e'))
    df = df.withColumn('Provincia, cantón y distrito', regexp_replace('Provincia, cantón y distrito', 'í', 'i'))
    df = df.withColumn('Provincia, cantón y distrito', regexp_replace('Provincia, cantón y distrito', 'ó', 'o'))
    df = df.withColumn('Provincia, cantón y distrito', regexp_replace('Provincia, cantón y distrito', 'ú', 'u'))
    df = df.withColumn('Provincia, cantón y distrito', regexp_replace('Provincia, cantón y distrito', 'ñ', 'n'))
    return df

inec_df = replace_accents(inec_df)

En la siguiente celda se encarga de dividir la columna del dataframe del inec en tres columnas diferentes, una para la provincia, otra para el canton y otra para el distrito. Para esto se creo una funcion que basado en la cantidad espacios que se encuentren en la columna se dividira en tres columnas diferentes. Esto para que la comparacion de los datos sea mas exacta. El parametro de la funcion es un dataframe y el retorno es el mismo dataframe pero con las columnas divididas en tres columnas diferentes para la provincia, canton y distrito.

In [None]:
def generate_new_columns(df):
    columns = ["Provincia", "Canton", "Distrito","poblacionMayor15", "tasaParticipacion", "tasaOcupacion", "tasaDesempleo", "poblacionInactiva", "relacionDependencia"]
    new_df = spark.createDataFrame(data =[("","","","","","","","","")], schema = columns)
    Provincia =''
    Canton = ''
    Distrito = ''
    counter = 0
    counter2 = 0
    for row in df.collect():
        if row['Provincia, cantón y distrito'] == None:
            counter += 1
            counter2 += 1
            if counter == 4: 
                counter = 2  

            if counter2 == 2:
                counter = 1
            continue
        if counter == 1:
            Provincia = row[0]
        if counter == 2:
            Canton = row[0]
        if counter == 3:
            Distrito = row[0]
            poblacionMayor15 = row[1]
            tasaParticipacion = row[2]
            tasaOcupacion = row[3]
            tasaDesempleo = row[4]
            poblacionInactiva = row[5]
            relacionDependencia = row[6]
            NewRow = (Provincia, Canton, Distrito, poblacionMayor15, tasaParticipacion, tasaOcupacion, tasaDesempleo, poblacionInactiva, relacionDependencia)
            new_df = new_df.union(spark.createDataFrame(data =[NewRow], schema = columns))
        counter2 = 0
    
    return new_df

new_df = generate_new_columns(inec_df)
new_df.show(5)

En la siguiente celda la funcion se encarga de encontrar los datos que no hacen match entre el dataframe del inec y el dataframe del oij. Para esto se utilizo la funcion de subtract. El parametro de la funcion son dos dataframes y el retorno es una lista con los datos que no hacen match entre los dos dataframes.

In [None]:
def find_non_matches(df1,df2):
    nonMatches = []
    Province = ''
    Canton = ''
    District = ''
    df1Temp = df1.select("Provincia", "Canton", "Distrito")
    df2Temp = df2.select("Provincia", "Canton", "Distrito")
    df1Temp = df1Temp.subtract(df2Temp)
    for row in df1Temp.collect():
        Province = row[0]
        Canton = row[1]
        District = row[2]
        nonMatches.append((Province,Canton,District))
    return nonMatches

non_matches = find_non_matches(oij_df,new_df)

print(non_matches)

En la siguiente celda se encarga de encontrar la cantidad de datos que no hacen match en el distrito entre el dataframe del inec y el dataframe del oij. Para esto se utilizo la funcion count de spark. El parametro de la funcion son dos dataframes y el retorno es un entero con la cantidad de datos que no hacen match entre los dos dataframes.

In [None]:
def numberOfNonMatches(df1,df2):
    df1Temp = df1.select("Distrito")
    df2Temp = df2.select("Distrito")
    df1Temp = df1Temp.subtract(df2Temp)
    return df1Temp.count()

print(numberOfNonMatches(oij_df,new_df))

En la siguiente celda se encarga de la conexion a la base de datos y la creacion de las tablas necesarias para el almacenamiento de los datos ya limpios y procesados. Se crean dos tablas una para el inec y otra para el oij.

In [None]:
#Create sql table from dataframe
url = "jdbc:postgresql://localhost:5432/etl"
mode = "overwrite"
properties = {"user": "postgres", " password": "Legolas00", "driver": "org.postgresql.Driver"}

new_df.write.jdbc(url=url, table="INEC", mode=mode, properties=properties)
oij_df.write.jdbc(url=url, table="OIJ", mode=mode, properties=properties)

In [None]:
#Visualizcion para Compara la cantidad de delitos y la tasa de ocupación para los 10 distritos con más delitos en el país. 
df = pandas.read_sql_query('SELECT "oij"."Distrito", COUNT("oij"."Distrito") AS "Cantidad de delitos", "tasaOcupacion" FROM "oij" INNER JOIN "inec" ON "oij"."Distrito" = "inec"."Distrito" and "oij"."Canton" = "inec"."Canton" and "oij"."Provincia" = "inec"."Provincia" GROUP BY "oij"."Distrito", "tasaOcupacion" ORDER BY COUNT("oij"."Distrito") DESC LIMIT 10', con=engine)
df.plot(x='Distrito', y=['Cantidad de delitos', 'tasaOcupacion'], kind='bar', figsize=(20,10), title='Cantidad de delitos y tasa de ocupación para los 10 distritos con más delitos en el país')
plt.show()

In [None]:
#graph for the days of the week with more crimes
df = pandas.read_sql_query('SELECT "oij"."Fecha" from oij', con=engine)
df['Fecha'] = pandas.to_datetime(df['Fecha'])
df['Fecha'] = df['Fecha'].dt.day_name()
df = df.groupby('Fecha').size().reset_index(name='Cantidad de delitos')
df.plot.bar(x='Fecha', y='Cantidad de delitos', rot=0, figsize=(20,10))
plt.show()


In [None]:
#graph for count of type of delitos by distrito
def generate_graph(distrito):
    df = pandas.read_sql_query('SELECT "oij"."Delito" from oij where "oij"."Distrito" = \'' + distrito + '\' ', con=engine)
    df = df.groupby('Delito').size().reset_index(name='Cantidad de delitos')
    df.plot.bar(x='Delito', y='Cantidad de delitos', rot=0, figsize=(20,10))
    plt.show()
    return

generate_graph('hospital')

In [None]:
#graph for count of delitos by genero
df = pandas.read_sql_query('SELECT "oij"."Genero" from oij', con=engine)
df = df.groupby('Genero').size().reset_index(name='Cantidad de delitos')
df.plot.bar(x='Genero', y='Cantidad de delitos', rot=0, figsize=(20,10))
plt.show()

In [None]:
#graph for count of delitos by provincia
df = pandas.read_sql_query('SELECT "oij"."Provincia" from oij', con=engine)
df = df.groupby('Provincia').size().reset_index(name='Cantidad de delitos')
df.plot.bar(x='Provincia', y='Cantidad de delitos', rot=0, figsize=(30,20))
plt.show()