# Big Data
# Programming Project

- Professor: Luis Chavarría.

- Students:
    - Felipe Alberto Mejías Loría, Instituto Tecnológico de Costa Rica.
    - María Mora, Instituto Tecnológico de Costa Rica.

- January 21st, 2020

## **1-) PySpark Installation**

In [1]:
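# Locate the local Spark installation so that pyspark can be imported
import findspark

# Set SPARK_HOME, which is needed to initialize Apache Spark.
# The raw string (r'...') keeps the Windows backslashes from being read as escapes.
SPARK_PATH = r'C:\Users\mejiasf\Desktop\Spark\spark-2.4.4-bin-hadoop2.7'
findspark.init(SPARK_PATH)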
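In a regular Python 3 string literal, `\U` starts an eight-digit `\UXXXXXXXX` escape, so an unprefixed Windows path such as `'C:\Users\mejiasf'` fails to compile before `findspark.init` ever runs. The quick check below (plain Python, no Spark needed; the shortened path is illustrative) compiles the assignment with and without the raw-string prefix.

```python
# The two assignments differ only in the r prefix; the path is shortened for illustration.
broken_source = "SPARK_PATH = 'C:\\Users\\mejiasf\\Desktop\\Spark'"
fixed_source = "SPARK_PATH = r'C:\\Users\\mejiasf\\Desktop\\Spark'"


def compiles(source: str) -> bool:
    """Return True if `source` is valid Python, False if it raises SyntaxError."""
    try:
        compile(source, "<spark_path>", "exec")
    except SyntaxError:
        return False
    return True


print(compiles(broken_source))  # False: '\U' starts a truncated unicode escape
print(compiles(fixed_source))   # True: a raw string keeps every backslash literally
```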

# **2-) Import the libraries needed to run the project**

In [196]:
# Imports needed to run TP3
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

from pandas.plotting import scatter_matrix
from datetime import datetime
from pyspark.sql import SparkSession, Row, dataframe
from pyspark.sql.functions import col, date_format, udf, array, explode, trim, lower, ltrim, rtrim
from pyspark.sql.types import DateType
from pyspark.sql.types import (StringType, IntegerType, FloatType, 
                               DecimalType, StructField, StructType)
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import ChiSqSelector
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.stat import Correlation
from pyspark.ml.linalg import DenseVector, SparseVector, Vectors, VectorUDT

# **3-) Utility functions for building DataFrames**

In [197]:
POSTGRESQL_URL = "jdbc:postgresql://localhost/"
POSTGRESQL_USER = "postgres"
POSTGRESQL_PASSWORD = "big_data"

def create_spark_session():
    """
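    This function builds (or reuses) a Spark Session with the PostgreSQL JDBC driver
    on the driver and executor classpaths
    return the SparkSession, the main entry point of the DataFrame API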
    """
    spark = SparkSession \
      .builder \
      .appName("Basic JDBC pipeline") \
      .config("spark.driver.extraClassPath", "postgresql-42.1.4.jar") \
      .config("spark.executor.extraClassPath", "postgresql-42.1.4.jar") \
      .getOrCreate()
    return spark

def join_spark_data_frames(data_frame_1, data_frame_2,
                           using_column_data_frame_1,
                           using_column_data_frame_2):
    """
    This function joins two Spark DataFrames
    data_frame_1: Spark DataFrame 1
    data_frame_2: Spark DataFrame 2
    using_column_data_frame_1: Column from DataFrame 1 to compare
    using_column_data_frame_2: Column from DataFrame 2 to compare
    return the Spark DataFrame from the (inner) JOIN
    """
    using_columns_statement = using_column_data_frame_1 == using_column_data_frame_2
    joint_data_frame = data_frame_1.join(data_frame_2, using_columns_statement)

    # Drop DataFrame 1's copy of the join column so it is not duplicated
    joint_data_frame = joint_data_frame.drop(using_column_data_frame_1)

    return joint_data_frame
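The helper above performs an inner join (Spark's default join type), so OIJ rows whose canton has no INEC counterpart are silently dropped. The plain-Python sketch below, using hypothetical rows and a hypothetical `inner_join_keep_right_key` helper, shows the same behavior on lists of dicts, including keeping a single copy of the join key.

```python
# Hypothetical rows; the real DataFrames carry many more columns.
oij_rows = [
    {"Delito": "delito_x", "Canton": "canton_a"},
    {"Delito": "delito_y", "Canton": "canton_b"},
    {"Delito": "delito_z", "Canton": "canton_without_inec_data"},
]
inec_rows = [
    {"Canton": "canton_a", "Tasa de ocupacion": 1.0},
    {"Canton": "canton_b", "Tasa de ocupacion": 2.0},
]


def inner_join_keep_right_key(left_rows, right_rows, key):
    """Inner-join two lists of dicts on `key`, keeping one copy of the key.

    Rows pair up only when their key values are equal, and the left-hand key
    is dropped, as join_spark_data_frames does with drop().
    """
    # Group the right-hand rows by key so each left row finds its matches directly.
    right_by_key = {}
    for row in right_rows:
        right_by_key.setdefault(row[key], []).append(row)

    joined = []
    for left in left_rows:
        for right in right_by_key.get(left[key], []):
            merged = {k: v for k, v in left.items() if k != key}  # drop the left key
            merged.update(right)  # the right-hand row still carries the key
            joined.append(merged)
    return joined


for row in inner_join_keep_right_key(oij_rows, inec_rows, "Canton"):
    print(row)
# {'Delito': 'delito_x', 'Canton': 'canton_a', 'Tasa de ocupacion': 1.0}
# {'Delito': 'delito_y', 'Canton': 'canton_b', 'Tasa de ocupacion': 2.0}
```

This is one reason both datasets are trimmed and lower-cased before the join: canton names that differ only in spacing or case would otherwise never match.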

# **4-) Input data**

In [198]:
def show_entry_data_description():
    """
    This function shows a description of all the entry data columns
    """
    
    # OIJ Dataset Explanation
    print("\na-) The first dataset contains information taken from the OIJ Police Statistics of Costa Rica.")
    
    print("\nb-) Columns description for OIJ dataset: \n")
    print("Delito: Descripción del Delito")
    print("SubDelito: Descripción del SubDelito")
    print("Fecha: Fecha del Hecho")
    print("Hora: Rango de 3 horas del Hecho")
    print("Victima: Descripción de la Víctima ")
    print("SubVictima: Descripción de la SubVíctima")
    print("Edad: Grupo de Edad que pertenece la Víctima")
    print("Genero: Género de la Víctima")
    print("Nacionalidad: Nacionalidad de la Víctima")
    print("Provincia: Provincia del Lugar del Hecho")
    print("Canton: Cantón del Lugar del Hecho")
    print("Distrito: Distrito del Lugar del Hecho")
    
    # INEC Dataset Explanation
    print("\nc-) The second dataset contains information about Economic Indicators according to province, canton")
    print("    and district taken from INEC.")
    
    print("\nd-) Columns description for INEC dataset: \n")
    print("Columna 1: Provincia, Cantón y Distrito")
    print("Columna 2: Población de 15 años y más")
    print("Columna 3: Tasa neta de participación")
    print("Columna 4: Tasa de ocupación")
    print("Columna 5: Tasa de desempleo abierto")
    print("Columna 6: Porcentaje de poblacion economicamente inactiva")
    print("Columna 7: Relación de dependencia económica")
    
    # Show which column is going to be predicted
    print("\ne-) The column that is going to be predicted is the type of Delito in San Jose province according to the canton.\n")

# **5-) Loading and preprocessing the data before joining it**

In [199]:
OIJ_DATAFRAME_UNNECESSARY_COLUMNS = ["Fecha", "Hora", "SubVictima", "Provincia"]
INEC_DATAFRAME_UNNECESSARY_COLUMNS = ["Porcentaje de poblacion economicamente inactiva",
                                      "Relacion de dependencia economica",
                                      "Porcentaje de poblacion ocupada - Sector Primario",
                                      "Porcentaje de poblacion ocupada - Sector Secundario",
                                      "Porcentaje de poblacion ocupada - Sector Terciario"]

def convert_categorical_values_to_numerical_from_df(spark_df, categorical_columns_list):
    """
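    This function converts the given categorical columns to numerical indexes
    with StringIndexer, keeping the original column names
    spark_df: Spark DataFrame
    categorical_columns_list: List of categorical column names to convert
    return the Spark DataFrame with those columns as numerical values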
    """
    # Convert categorical columns to numerical values
    for categorical_column in categorical_columns_list:
        new_column_name = "{column_name}_index".format(column_name = categorical_column)
        indexer = StringIndexer(inputCol=categorical_column, outputCol=new_column_name)
        spark_df = indexer.fit(spark_df).transform(spark_df)
    
    # Remove categorical columns
    columns_to_drop = categorical_columns_list
    spark_df = spark_df.drop(*columns_to_drop)
    
    # Rename new numerical columns
    for categorical_column in categorical_columns_list:
        new_column_name = "{column_name}_index".format(column_name = categorical_column)
        spark_df = spark_df.withColumnRenamed(new_column_name, categorical_column)
    
    # Show converted data
    print("- Show that the data has been converted successfully from categorical to numerical: \n")
    spark_df.select(categorical_columns_list).show()

    return spark_df

def rename_oij_spark_dataframe_columns(spark_df):
    """
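    This function is necessary because the OIJ column layout used here is outdated:
    the schema does not include the Hora (time) field, so every column name after
    Fecha is shifted one position. The renames below move each name back into place.
    spark_df: Spark DataFrame
    return the Spark DataFrame with the right column names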
    """
    spark_df = spark_df.withColumnRenamed('Victima', 'Hora')
    spark_df = spark_df.withColumnRenamed('SubVictima', 'Victima')
    spark_df = spark_df.withColumnRenamed('Edad', 'SubVictima')
    spark_df = spark_df.withColumnRenamed('Genero', 'Edad')
    spark_df = spark_df.withColumnRenamed('Nacionalidad', 'Genero')
    spark_df = spark_df.withColumnRenamed('Provincia', 'Nacionalidad')
    spark_df = spark_df.withColumnRenamed('Canton', 'Provincia')
    spark_df = spark_df.withColumnRenamed('Distrito', 'Canton')
    return spark_df

def remove_spark_dataframe_trailing_whitespaces(spark_df):
    """
    This function removes leading and trailing spaces from all Spark DataFrame columns
    spark_df: Spark DataFrame
    return the Spark DataFrame
    """
    for column in spark_df.columns:
        spark_df = spark_df.withColumn(column, trim(col(column)).cast(spark_df.schema[column].dataType))
    return spark_df

def convert_spark_dataframe_to_lower_case(spark_df):
    """
    This function converts all Spark DataFrame columns to lower case
    spark_df: Spark DataFrame
    return the Spark DataFrame
    """
    for column in spark_df.columns:
        spark_df = spark_df.withColumn(column, lower(col(column)).cast(spark_df.schema[column].dataType))
    return spark_df
    
def create_and_clean_spark_dataframe_from_csv(spark_session,
                                              csv_file_name,
                                              schema_types_list):
    """
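    This function creates a Spark DataFrame from a dataset CSV file, then
    trims and lower-cases every column
    spark_session: Spark Session
    csv_file_name: Path of the CSV file
    schema_types_list: List of StructFields that defines the schema
    return the Spark DataFrame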
    """
    print("\n- Loading and cleaning CSV input file: {name}".format(name = csv_file_name))
    
    spark_df = spark_session \
      .read \
      .format("csv") \
      .option("path", csv_file_name) \
      .option("header", True) \
      .schema(StructType(schema_types_list)) \
      .load()
    
    # Rename OIJ Spark DataFrame Columns
    spark_df = rename_oij_spark_dataframe_columns(spark_df) \
               if csv_file_name == "datos_delitos_oij_2011.csv" \
               else spark_df
    
    # Clean whitespaces
    spark_df = remove_spark_dataframe_trailing_whitespaces(spark_df)
    
    # Convert to lower case
    spark_df = convert_spark_dataframe_to_lower_case(spark_df)
    
    return spark_df

def remove_unnecessary_columns(spark_df, columns_to_remove_list):
    """
    This function removes unnecessary columns from a DataFrame
    """
    columns_to_drop = columns_to_remove_list
    spark_df = spark_df.drop(*columns_to_drop)
    return spark_df

def preprocessing_oij_data(spark_session):
    """
    This function loads and cleans the OIJ dataset and drops the columns
    that the model does not use
    """
    schema_types_list = [StructField("Delito", StringType()),
                         StructField("SubDelito", StringType()),
                         StructField("Fecha", StringType()),
                         StructField("Victima", StringType()),
                         StructField("SubVictima", StringType()),
                         StructField("Edad", StringType()),
                         StructField("Genero", StringType()),
                         StructField("Nacionalidad", StringType()),
                         StructField("Provincia", StringType()),
                         StructField("Canton", StringType()),
                         StructField("Distrito", StringType())]
    
    oij_spark_df = create_and_clean_spark_dataframe_from_csv(spark_session,
                                                             "datos_delitos_oij_2011.csv",
                                                             schema_types_list)
    
    oij_spark_df = remove_unnecessary_columns(oij_spark_df, OIJ_DATAFRAME_UNNECESSARY_COLUMNS)
    
    return oij_spark_df

def preprocessing_inec_data(spark_session):
    """
    This function loads and cleans the INEC dataset and drops the columns
    that the model does not use
    """
    schema_types_list = [StructField("Canton", StringType()),
                         StructField("Poblacion de 15 anos y mas", IntegerType()),
                         StructField("Tasa neta de participacion", FloatType()),
                         StructField("Tasa de ocupacion", FloatType()),
                         StructField("Tasa de desempleo abierto", FloatType()),
                         StructField("Porcentaje de poblacion economicamente inactiva", FloatType()),
                         StructField("Relacion de dependencia economica", FloatType()),
                         StructField("Porcentaje de poblacion ocupada - Sector Primario", FloatType()),
                         StructField("Porcentaje de poblacion ocupada - Sector Secundario", FloatType()),
                         StructField("Porcentaje de poblacion ocupada - Sector Terciario", FloatType())]
    
    inec_spark_df = create_and_clean_spark_dataframe_from_csv(spark_session,
                                                             "datos_socioeconomicos_inec_2011.csv",
                                                             schema_types_list)
    
    inec_spark_df = remove_unnecessary_columns(inec_spark_df, INEC_DATAFRAME_UNNECESSARY_COLUMNS)
    
    return inec_spark_df

def show_preprocessing_spark_dataframe(spark_df, columns_list):
    """
    This function prints the schema of a Spark DataFrame and its first rows for the given columns
    """
    print("\n1-) Definition of schema: \n")
    spark_df.printSchema()
    
    print("2-) Show that the data has been loaded successfully by selecting a couple of rows: \n")
    spark_df.select(columns_list).show()
    

def data_preprocessing(spark_session):
    """
    This function completes data preprocessing for INEC and OIJ before training model
    """
    # Loading and cleaning OIJ CSV input file data.
    oij_spark_df = preprocessing_oij_data(spark_session)
    
    # Show Preprocessing OIJ DataFrame
    show_preprocessing_spark_dataframe(oij_spark_df,
                                       ["Delito", "SubDelito", "Genero", "Canton"])
    
    # Convert all OIJ Categorical Values to Numerical Values
    oij_spark_df = convert_categorical_values_to_numerical_from_df(oij_spark_df,
                                                                   ["Delito", "SubDelito",
                                                                    "Victima", "Edad", "Genero",
                                                                    "Nacionalidad"])
    
    # Loading and cleaning INEC CSV input file data.
    inec_spark_df = preprocessing_inec_data(spark_session)
    
    # Show Preprocessing INEC DataFrame
    show_preprocessing_spark_dataframe(inec_spark_df,
                                       ["Canton", "Poblacion de 15 anos y mas", "Tasa de desempleo abierto"])
    
    return oij_spark_df, inec_spark_df
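Two details of the loading code are easy to miss. With `header` set to true and an explicit schema, Spark skips the file's first line and takes the column names from the schema; in the default PERMISSIVE mode, a record with more tokens than the schema has fields loses the extra tokens. And because the OIJ schema has no `Hora` field, every name after `Fecha` is off by one, which `rename_oij_spark_dataframe_columns` undoes. The sketch below imitates both steps in plain Python with the `csv` module; the sample CSV text, the `(name, cast)` pairs standing in for `StructField`s, and the helper names are all hypothetical.

```python
import csv
import io

# Hypothetical CSV text with a 4th column that the schema below does not list.
CSV_TEXT = "h1,h2,h3,h4\ncanton_a,10,1.5,extra\ncanton_b,20,2.5,extra\n"

# (name, cast) pairs standing in for StructField(name, type) entries.
SCHEMA = [("Canton", str), ("Poblacion", int), ("Tasa", float)]


def read_csv_with_schema(text, schema):
    """Skip the header row and name/cast each field from `schema`.

    zip() stops at the shorter sequence, so tokens beyond the schema are
    dropped, like PERMISSIVE mode does with extra tokens.
    """
    reader = csv.reader(io.StringIO(text))
    next(reader)  # header=True: the first line holds names, not data
    return [
        {name: cast(raw) for (name, cast), raw in zip(schema, record)}
        for record in reader
    ]


def restore_shifted_names(read_names, anchor="Fecha", missing="Hora"):
    """Insert `missing` right after `anchor` and drop the last name."""
    i = read_names.index(anchor)
    return read_names[: i + 1] + [missing] + read_names[i + 1 : -1]


rows = read_csv_with_schema(CSV_TEXT, SCHEMA)
print(rows[0])  # {'Canton': 'canton_a', 'Poblacion': 10, 'Tasa': 1.5}

oij_schema_names = ["Delito", "SubDelito", "Fecha", "Victima", "SubVictima", "Edad",
                    "Genero", "Nacionalidad", "Provincia", "Canton", "Distrito"]
restored = restore_shifted_names(oij_schema_names)
# old name -> new name, only for the columns that actually change
renames = {old: new for old, new in zip(oij_schema_names, restored) if old != new}
print(renames)  # the eight withColumnRenamed pairs, in the same order
```

The generated pairs are the same eight renames, in the same order, as `rename_oij_spark_dataframe_columns`. That order matters: each name is freed by one rename before the next rename reuses it, so two columns never share a name.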

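Cleaning runs before indexing for a reason: `StringIndexer` assigns one index per distinct string, so `' Canton_B'` and `'canton_b'` would otherwise get different indexes and would not match in the join. By default (`stringOrderType="frequencyDesc"`), the most frequent label gets index 0.0. The plain-Python sketch below mirrors `trim` + `lower` and that ordering; the sample values and helper names are hypothetical, and the alphabetical tie-break is an assumption of the sketch.

```python
from collections import Counter


def clean_text(value):
    """Trim spaces at both ends, then lower-case (trim() followed by lower()).

    Spark's trim() removes space characters, so strip(" ") is used instead of
    strip(), which would also remove tabs and newlines.
    """
    if value is None:  # Spark string functions return null for null input
        return None
    return value.strip(" ").lower()


def frequency_index(values):
    """Map each distinct label to a float index, most frequent label first.

    Ties are broken alphabetically here, which is an assumption of this sketch.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical raw canton values with inconsistent spacing and case.
raw = [" Canton_B", "canton_a", "CANTON_B ", "canton_c", "canton_b", "Canton_A"]
cleaned = [clean_text(v) for v in raw]
index = frequency_index(cleaned)
print(cleaned)  # ['canton_b', 'canton_a', 'canton_b', 'canton_c', 'canton_b', 'canton_a']
print(index)    # {'canton_b': 0.0, 'canton_a': 1.0, 'canton_c': 2.0}
print([index[v] for v in cleaned])  # [0.0, 1.0, 0.0, 2.0, 0.0, 1.0]
```

Accents are not removed by this cleaning, so a value written `josé` in one dataset and `jose` in the other would still not match.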
# **6-) Materialization in PostgreSQL**

In [217]:
FEATURES_LIST = ['SubDelito', 'Victima', 'Edad',
                 'Genero', 'Nacionalidad',
                 'Poblacion de 15 anos y mas', 'Tasa neta de participacion',
                 'Tasa de ocupacion', 'Tasa de desempleo abierto', 'Canton', 'Delito']

def create_joint_spark_data_frames(oij_data_frame, inec_data_frame):
    """
    This function joins the two datasets on the Canton column
    oij_data_frame: DataFrame with the OIJ crime records
    inec_data_frame: DataFrame with the INEC socioeconomic indicators
    return the joined Spark DataFrame
    """

    print("\nThe join of the OIJ and INEC DataFrames on Canton looks like this: \n")
    joint_oij_and_inec_df = join_spark_data_frames(oij_data_frame,
                                                   inec_data_frame,
                                                   oij_data_frame.Canton,
                                                   inec_data_frame.Canton)
    
    joint_oij_and_inec_df.select(["Delito", "Genero", "Canton",
                                  "Poblacion de 15 anos y mas",
                                  "Tasa de desempleo abierto",
                                  "Tasa de ocupacion"]).show(20)

    return joint_oij_and_inec_df

def transform_sparse_vector_df_to_dense_vector_df(spark_df):
    """
    This function converts the features column from sparse to dense vectors
    """
    toDense = lambda v: Vectors.dense(v.toArray())
    toDenseUdf = udf(toDense, VectorUDT())
    spark_df = spark_df.withColumn('features', toDenseUdf('features'))
    return spark_df

def transform_spark_df_to_features_vector_df(spark_df, remove_delito=False):
    """
    This function assembles FEATURES_LIST into a single 'features' vector column
    remove_delito: If True, leave the Delito label out of the features
    """
    assembler = VectorAssembler(
        inputCols=FEATURES_LIST[:-1] if remove_delito else \
                  FEATURES_LIST,
        outputCol='features')
    
    vector_df = assembler.transform(spark_df)
    vector_df = vector_df.select(['features', 'Delito'])
    
    # Convert sparse vectors to dense vectors
    vector_df = transform_sparse_vector_df_to_dense_vector_df(vector_df)

    return vector_df

def normalize_data(spark_df):
    """
    This function normalizes the data before training the model
    """
    vector_df = transform_spark_df_to_features_vector_df(spark_df, remove_delito=True)
    standard_scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
    scale_model = standard_scaler.fit(vector_df)
    scaled_df = scale_model.transform(vector_df)
    
    return scaled_df

def convert_normalized_df_to_valid_table_for_db(normalized_df):
    """
    This function converts the normalized features vector into a flat table
    (one column per feature, plus Delito) that can be written to the DB
    """
    normalized_valid_db_format_df = normalized_df.select('scaled_features', 'Delito')
    # One float column per scaled feature, followed by the Delito label
    normalized_valid_db_format_df = normalized_valid_db_format_df.rdd \
        .map(lambda x: [float(y) for y in x['scaled_features']] + [x['Delito']]) \
        .toDF(FEATURES_LIST)
    
    return normalized_valid_db_format_df

def materialization_to_postgresql(oij_spark_df, inec_spark_df):
    """
    This function joins, encodes and normalizes the data, and materializes it into the DB
    """
    # Joint Spark Data Frames
    joint_data_frame = create_joint_spark_data_frames(oij_spark_df,
                                                      inec_spark_df)
    
    # Convert Canton Categorical Value to Numerical Values
    joint_data_frame = convert_categorical_values_to_numerical_from_df(joint_data_frame,
                                                                       ["Canton"])
    
    # Normalizing the data
    print("\n- After normalizing the data, the DataFrame looks like this: \n")
    normalized_df = normalize_data(joint_data_frame)
    normalized_df.show()
    
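    # Convert to a valid format for the DB
    print("\ng-) Writing to DB: ")
    print("\ng.1-) Before writing the normalized DataFrame, it is necessary to convert it to a valid table for the DB: ")
    normalized_valid_db_df = convert_normalized_df_to_valid_table_for_db(normalized_df)
    print("- After converting it to a valid table for the DB, the DataFrame looks like this: \n")
    normalized_valid_db_df.show()

    # Write the table to PostgreSQL through JDBC ("overwrite" replaces an
    # existing tarea3 table, so the cell can be re-run)
    normalized_valid_db_df \
      .write \
      .format("jdbc") \
      .mode("overwrite") \
      .option("url", POSTGRESQL_URL) \
      .option("user", POSTGRESQL_USER) \
      .option("password", POSTGRESQL_PASSWORD) \
      .option("dbtable", "tarea3") \
      .save()
    print("- To check that this table was written correctly into the DB, go to PostgreSQL and look at the table named tarea3.")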
    print("  It should look exactly the same.\n")
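`VectorAssembler` concatenates the `inputCols` in order into one vector per row and may store a row as a sparse vector when that is more compact, which is why `transform_sparse_vector_df_to_dense_vector_df` expands every row to a dense vector afterwards. Note that calling `transform_spark_df_to_features_vector_df` with `remove_delito=False` would put the `Delito` label inside its own features. The sketch below shows both ideas with plain lists; the shortened column list, the row values, and the helper names are hypothetical.

```python
# Hypothetical, shortened column list; the real FEATURES_LIST has ten features plus Delito.
FEATURE_COLUMNS = ["SubDelito", "Victima", "Genero", "Canton", "Delito"]


def assemble(row, input_cols):
    """Concatenate the numeric columns of `row` into one feature list, in order."""
    return [float(row[c]) for c in input_cols]


def to_sparse(dense):
    """Keep only the non-zero entries as (size, indices, values), SparseVector-style."""
    indices = [i for i, x in enumerate(dense) if x != 0.0]
    return len(dense), indices, [dense[i] for i in indices]


def sparse_to_dense(size, indices, values):
    """Expand (size, indices, values) back into a dense list; missing slots are 0.0."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = float(v)
    return dense


row = {"SubDelito": 1.0, "Victima": 0.0, "Genero": 0.0, "Canton": 3.0, "Delito": 2.0}
features = assemble(row, FEATURE_COLUMNS[:-1])  # leave the label out, like remove_delito=True
print(features, row["Delito"])                  # [1.0, 0.0, 0.0, 3.0] 2.0
sparse = to_sparse(features)
print(sparse)                                   # (4, [0, 3], [1.0, 3.0])
print(sparse_to_dense(*sparse) == features)     # True
```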

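`StandardScaler` with its defaults (`withStd=True`, `withMean=False`) divides each feature by that feature's corrected sample standard deviation without centering it, and returns 0.0 for a feature whose standard deviation is zero. The plain-Python sketch below reproduces that arithmetic on hypothetical rows; `scale_by_std` is a hypothetical helper, not part of PySpark.

```python
import math
import statistics


def scale_by_std(rows):
    """Divide every feature by its column's sample standard deviation.

    Values are not centered, and a feature with zero deviation becomes 0.0.
    """
    stds = [statistics.stdev(column) for column in zip(*rows)]  # n - 1 denominator
    return [[x / s if s != 0.0 else 0.0 for x, s in zip(row, stds)] for row in rows]


# Hypothetical feature rows: the first feature varies, the second is constant.
feature_rows = [[1.0, 5.0], [3.0, 5.0]]
scaled = scale_by_std(feature_rows)
print(scaled)  # roughly [[0.7071, 0.0], [2.1213, 0.0]]
print(math.isclose(scaled[0][0], 1 / math.sqrt(2)))  # True
```

In the normalized output above, a first feature of 1.0 became 0.31499..., so that column's sample standard deviation in this data is roughly 1 / 0.315 ≈ 3.17.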

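As far as we know, Spark's JDBC writer has no column type for ML vector columns, which is why `convert_normalized_df_to_valid_table_for_db` flattens each scaled vector plus `Delito` into plain numeric columns, and why `toDF(FEATURES_LIST)` needs exactly one name per feature plus one for the label (ten plus one). The sketch below illustrates that flattening and then a write followed by a read-back check. It uses Python's built-in `sqlite3` with an in-memory database as a stand-in for PostgreSQL over JDBC, so it is not what the notebook runs; names, values, and helpers are hypothetical.

```python
import sqlite3


def to_table_rows(scaled_vectors, labels, column_names):
    """Flatten each (vector, label) pair into one tuple: features first, label last."""
    if scaled_vectors and len(column_names) != len(scaled_vectors[0]) + 1:
        raise ValueError("need one column name per feature plus one for the label")
    return [tuple(float(x) for x in vec) + (label,)
            for vec, label in zip(scaled_vectors, labels)]


def write_and_read_back(rows, column_names, table="tarea3"):
    """Write rows to an in-memory SQLite table, then read them back for a check."""
    conn = sqlite3.connect(":memory:")
    try:
        # Quoted identifiers allow names with spaces, e.g. "Poblacion de 15 anos y mas".
        columns_sql = ", ".join(f'"{name}" REAL' for name in column_names)
        conn.execute(f'CREATE TABLE "{table}" ({columns_sql})')
        placeholders = ", ".join("?" for _ in column_names)
        conn.executemany(f'INSERT INTO "{table}" VALUES ({placeholders})', rows)
        conn.commit()
        return conn.execute(f'SELECT * FROM "{table}"').fetchall()
    finally:
        conn.close()


# Hypothetical, shortened names and values; the real table has eleven columns.
names = ["SubDelito", "Poblacion de 15 anos y mas", "Delito"]
rows = to_table_rows([[0.31, 1.2], [0.63, 0.8]], [0.0, 1.0], names)
print(rows)                                      # [(0.31, 1.2, 0.0), (0.63, 0.8, 1.0)]
print(write_and_read_back(rows, names) == rows)  # True
```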
# **7-) Prediction models**

In [218]:
def read_dataset_from_db(spark_session):
    """
    This function reads the clean dataset from the database
    """
    spark_df = spark_session \
               .read \
               .format("jdbc") \
               .option("url", POSTGRESQL_URL) \
               .option("user", POSTGRESQL_USER) \
               .option("password", POSTGRESQL_PASSWORD) \
               .option("dbtable", "tarea3") \
               .load()
    spark_df.show()
    return spark_df

def split_train_test_data(spark_df):
    """
    This function splits train and test data
    """
    train, test = spark_df.randomSplit([0.8, 0.2], seed=12345)
    return train, test
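`randomSplit([0.8, 0.2], seed=12345)` samples each row independently, so the two parts are only approximately 80% and 20% of the data. The plain-Python sketch below mimics that per-row sampling with `random.Random`; `random_split` and the row ids are hypothetical, and Python's generator does not produce the same assignment as Spark's sampler for the same seed.

```python
import random


def random_split(rows, train_fraction=0.8, seed=12345):
    """Send each row to train with probability `train_fraction`, else to test.

    Each row is sampled independently, so the split is only approximately
    80/20; the same seed reproduces it for the same input order.
    """
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1000))  # hypothetical row ids
train, test = random_split(rows)
print(len(train), len(test))  # close to 800 and 200, not exactly
```

With Spark, a fixed seed generally reproduces the split only while the input rows and their partitioning stay the same.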

# **8-) main() function to run the program**

In [219]:
def main():
    """
    This function runs the pipeline: it describes the input data, preprocesses it,
    and materializes it into PostgreSQL
    """
    print("Step 1: Entry Data Description")
    show_entry_data_description()
    
    print("Step 2: Data Preprocessing")
    spark_session = create_spark_session()
    oij_spark_df, inec_spark_df = data_preprocessing(spark_session)
    
    print("Step 3: Materialization to PostgreSQL")
    materialization_to_postgresql(oij_spark_df, inec_spark_df)

# Execute main program
main()

Step 1: Entry Data Description

a-) The first dataset contains information taken from the OIJ Police Statistics of Costa Rica.

b-) Columns description for OIJ dataset: 

Delito: Descripción del Delito
SubDelito: Descripción del SubDelito
Fecha: Fecha del Hecho
Hora: Rango de 3 horas del Hecho
Victima: Descripción de la Víctima 
SubVictima: Descripción de la SubVíctima
Edad: Grupo de Edad que pertenece la Víctima
Genero: Género de la Víctima
Nacionalidad: Nacionalidad de la Víctima
Provincia: Provincia del Lugar del Hecho
Canton: Cantón del Lugar del Hecho
Distrito: Distrito del Lugar del Hecho

c-) The second dataset contains information about Economic Indicators according to province, canton
    and district taken from INEC.

d-) Columns description for INEC dataset: 

Columna 1: Provincia, Cantón y Distrito
Columna 2: Población de 15 años y más
Columna 3: Tasa neta de participación
Columna 4: Tasa de ocupación
Columna 5: Tasa de desempleo abierto
Columna 6: Porcentaje de poblacion e

- Show that the data has been converted successfully from categorical to numerical: 

+------+
|Canton|
+------+
|   3.0|
|   0.0|
|   0.0|
|   0.0|
|   5.0|
|   0.0|
|   2.0|
|   3.0|
|   0.0|
|   3.0|
|   9.0|
|   2.0|
|   1.0|
|   1.0|
|   2.0|
|   1.0|
|   0.0|
|   4.0|
|   5.0|
|   1.0|
+------+
only showing top 20 rows


- After normalizing the data, the DataFrame looks like this: 

+--------------------+------+--------------------+
|            features|Delito|     scaled_features|
+--------------------+------+--------------------+
|[1.0,0.0,1.0,0.0,...|   0.0|[0.31499290187266...|
|[1.0,0.0,1.0,1.0,...|   0.0|[0.31499290187266...|
|[1.0,0.0,0.0,0.0,...|   0.0|[0.31499290187266...|
|[1.0,0.0,0.0,0.0,...|   0.0|[0.31499290187266...|
|[1.0,0.0,0.0,0.0,...|   0.0|[0.31499290187266...|
|[1.0,0.0,0.0,0.0,...|   0.0|[0.31499290187266...|
|[1.0,0.0,0.0,1.0,...|   0.0|[0.31499290187266...|
|[1.0,0.0,0.0,0.0,...|   0.0|[0.31499290187266...|
|[1.0,0.0,0.0,0.0,...|   0.0|[0.31499290187266.