In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import ltrim,rtrim,trim,col
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("MyFirstCSV").getOrCreate()


In [16]:
#CREATE DATAFRAME FROM CSV FILE
oij_df = spark.read.csv( path="OIJ.csv", sep=",", header=True,quote='"',inferSchema=True,)
inec_df = spark.read.csv( path="INEC.csv", sep=";", header=True,quote='"',inferSchema=True,)

In [17]:
#Function to remove spaces from the beginning and end of the string
def remove_spaces(df):
    for col in df.columns:
        if  col == 'Provincia' or col == 'Canton' or col == 'Distrito' or col == 'Provincia, cantón y distrito':
            df = df.withColumn(col, trim(col))
    return df

#Function to parser the string to lowercase
def to_lower_case(df):
    for col in df.columns:
        if  col == 'Provincia' or col == 'Canton' or col == 'Distrito' or col == 'Provincia, cantón y distrito':
            df = df.withColumn(col, lower(col))
    return df


oij_df = remove_spaces(oij_df)
inec_df = remove_spaces(inec_df)

oij_df = to_lower_case(oij_df)
inec_df = to_lower_case(inec_df)

oij_df.show(5)
inec_df.show(5)

+------+-----------+-------------------+-------------------+--------+--------------------+-------------+------+------------+----------+----------+--------+----+
|Delito|  SubDelito|              Fecha|               Hora| Victima|          SubVictima|         Edad|Genero|Nacionalidad| Provincia|    Canton|Distrito|_c12|
+------+-----------+-------------------+-------------------+--------+--------------------+-------------+------+------------+----------+----------+--------+----+
|ASALTO|ARMA BLANCA|2021-06-03 00:00:00|00:00:00 - 02:59:59|VEHICULO|SERVICIO PUBLICO/...|Mayor de edad|HOMBRE|  COSTA RICA|  san jose|alajuelita|    null|null|
|ASALTO|ARMA BLANCA|2021-06-10 00:00:00|15:00:00 - 17:59:59| PERSONA|    PEATON [PERSONA]|Mayor de edad| MUJER|  COSTA RICA|  san jose|    escazu|    null|null|
|ASALTO|ARMA BLANCA|2021-06-14 00:00:00|09:00:00 - 11:59:59| PERSONA|    PEATON [PERSONA]|Mayor de edad|HOMBRE|  COSTA RICA|guanacaste|    nicoya|    null|null|
|ASALTO|ARMA BLANCA|2021-06-14 00:

In [18]:
#Function find list the not match values in the dataframes
def find_non_matches(df1,df2):
    non_matches = []
    for col in df1.columns:
        if  col == 'Provincia' or col == 'Canton' or col == 'Distrito':
            for row in df1.select(col).distinct().collect():
                if not df2.filter(df2['Provincia, cantón y distrito'] == row[col]).collect():
                    non_matches.append(row[col])
    return non_matches

non_matches_oij = find_non_matches(oij_df,inec_df)


#Function to parser the string to lowercase
def to_lower_case(df):
    for col in df.columns:
        if  col == 'Provincia' or col == 'Canton' or col == 'Distrito' or col == 'Provincia, cantón y distrito':
            df = df.withColumn(col, lower(col))
    return df


oij_df = remove_spaces(oij_df)
inec_df = remove_spaces(inec_df)

oij_df = to_lower_case(oij_df)
inec_df = to_lower_case(inec_df)

nonMatches = find_non_matches(oij_df,inec_df)
print(nonMatches)
#oij_df.show(5)
#inec_df.show(5)


['san jose', 'limon', 'islas', 'desconocido', 'pococi', 'rio cuarto', 'guacimo', 'belen', 'la union', 'desconocido', 'puerto jim&#201;nez', 'leon cortes', 'poas', 'tarrazu', 'san jose', 'san ramon', 'aserri', 'vasquez de coronado', 'sarch&#205;', 'paraiso', 'tilaran', 'canas', 'limon', 'perez zeledon', 'jimenez', 'tibas', 'sarapiqui', 'escazu', 'santa barbara', 'monteverde', None]


In [19]:
#Function to replace the accents in column Provincia, cantón y distrito in inec_df
def replace_accents(df):
    df = df.withColumn('Provincia, cantón y distrito', regexp_replace('Provincia, cantón y distrito', 'á', 'a'))
    df = df.withColumn('Provincia, cantón y distrito', regexp_replace('Provincia, cantón y distrito', 'é', 'e'))
    df = df.withColumn('Provincia, cantón y distrito', regexp_replace('Provincia, cantón y distrito', 'í', 'i'))
    df = df.withColumn('Provincia, cantón y distrito', regexp_replace('Provincia, cantón y distrito', 'ó', 'o'))
    df = df.withColumn('Provincia, cantón y distrito', regexp_replace('Provincia, cantón y distrito', 'ú', 'u'))
    df = df.withColumn('Provincia, cantón y distrito', regexp_replace('Provincia, cantón y distrito', 'ñ', 'n'))
    return df

inec_df = replace_accents(inec_df)
nonMatches = find_non_matches(oij_df,inec_df)
print(nonMatches)


['islas', 'desconocido', 'desconocido', 'puerto jim&#201;nez', 'leon cortes', 'vasquez de coronado', 'sarch&#205;', 'monteverde', None]


In [20]:
#Fucntion to separate column Provincia, cantón y distrito in inec_df in three columns
def separate_columns(df):
    df = df.withColumn('ProvinciaUnida', split(col('Provincia, cantón y distrito'), ',').getItem(0))
    df = df.withColumn('CantonUnido', split(col('Provincia, cantón y distrito'), ',').getItem(1))
    df = df.withColumn('DistritoUnido', split(col('Provincia, cantón y distrito'), ',').getItem(2))
    return df

inec_df = separate_columns(inec_df)
inec_df.show(5)

+----------------------------+--------------------------+--------------------------+-----------------+-------------------------+-----------------------------------------------+---------------------------------+------------------------------------------------+--------------------------------------------------+-------------------------------------------------+--------------+-----------+-------------+
|Provincia, cantón y distrito|Población de 15 años y más|Tasa neta de participación|Tasa de ocupación|Tasa de desempleo abierto|Porcentaje de población económicamente inactiva|Relación de dependencia económica|Porcentaje de población ocupada  Sector Primario|Porcentaje de población ocupada  Sector Secundario|Porcentaje de población ocupada  Sector Terciario|ProvinciaUnida|CantonUnido|DistritoUnido|
+----------------------------+--------------------------+--------------------------+-----------------+-------------------------+-----------------------------------------------+--------------------

In [21]:
#Function to remove column Provincia, cantón y distrito in inec_df
def remove_column(df):
    df = df.drop('Provincia, cantón y distrito')
    return df

inec_df = remove_column(inec_df)

In [22]:
#Function to create a new dataframe with inec_df and oij_df when provincia, canton and distrito are equals
def join_dataframes(df1,df2):
    df = df1.join(df2, (df1.Provincia == df2.ProvinciaUnida) | (df1.Canton == df2.CantonUnido) | (df1.Distrito == df2.DistritoUnido), 'inner')
    return df

df = join_dataframes(oij_df,inec_df)
df.show(25)


+------+-----------+-------------------+-------------------+--------+--------------------+-------------+-----------+------------+----------+----------+--------+----+--------------------------+--------------------------+-----------------+-------------------------+-----------------------------------------------+---------------------------------+------------------------------------------------+--------------------------------------------------+-------------------------------------------------+--------------+-----------+-------------+
|Delito|  SubDelito|              Fecha|               Hora| Victima|          SubVictima|         Edad|     Genero|Nacionalidad| Provincia|    Canton|Distrito|_c12|Población de 15 años y más|Tasa neta de participación|Tasa de ocupación|Tasa de desempleo abierto|Porcentaje de población económicamente inactiva|Relación de dependencia económica|Porcentaje de población ocupada  Sector Primario|Porcentaje de población ocupada  Sector Secundario|Porcentaje de pobl

In [30]:
#Create sql table from dataframe
df.createOrReplaceTempView("df")

#Save the dataframe in a postgresql database
df.write.jdbc(url="jdbc:postgresql://localhost:5432/etl", table="etl", mode="overwrite", properties={"user": "postgres", "password": "Legolas00"})


Py4JJavaError: An error occurred while calling o2747.jdbc.
: java.sql.SQLException: No suitable driver
	at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:298)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:107)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:107)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:229)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite.<init>(JDBCOptions.scala:233)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:47)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:757)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [25]:
#Save the dataframe in csv file
df.write.csv(path="output.csv", sep=",", header=True,quote='"',mode="overwrite")
df.show(5)


+------+-----------+-------------------+-------------------+--------+--------------------+-------------+------+------------+---------+----------+--------+----+--------------------------+--------------------------+-----------------+-------------------------+-----------------------------------------------+---------------------------------+------------------------------------------------+--------------------------------------------------+-------------------------------------------------+--------------+-----------+-------------+
|Delito|  SubDelito|              Fecha|               Hora| Victima|          SubVictima|         Edad|Genero|Nacionalidad|Provincia|    Canton|Distrito|_c12|Población de 15 años y más|Tasa neta de participación|Tasa de ocupación|Tasa de desempleo abierto|Porcentaje de población económicamente inactiva|Relación de dependencia económica|Porcentaje de población ocupada  Sector Primario|Porcentaje de población ocupada  Sector Secundario|Porcentaje de población ocupad