In [1]:
import findspark
# PostgreSQL JDBC driver jar. It is registered before Spark starts so that the
# "org.postgresql.Driver" class used by the JDBC write is on the classpath.
POSTGRES_JDBC_JAR = '/app/postgresql-42.1.4.jar'

findspark.add_jars(POSTGRES_JDBC_JAR)
# Locate the Spark installation and make pyspark importable.
findspark.init()

In [2]:
from pyspark.sql import SparkSession
# Minimal resources: 512 MB and one core for both the driver and executors,
# plus only two shuffle partitions for aggregations/joins.
SPARK_SETTINGS = (
    ("spark.driver.memory", "512m"),
    ("spark.driver.cores", "1"),
    ("spark.executor.memory", "512m"),
    ("spark.executor.cores", "1"),
    ("spark.sql.shuffle.partitions", "2"),
)

_builder = SparkSession.builder.appName("argentinaETL")
for _setting, _setting_value in SPARK_SETTINGS:
    _builder = _builder.config(_setting, _setting_value)
spark = _builder.getOrCreate()

In [3]:
from pyspark.sql.functions import *;
from datetime import datetime, date

In [4]:
from pyspark.sql.types import *

# Explicit schema for Estimaciones.csv. Fields are applied to the CSV columns
# in this order (the header row is skipped via header=True when reading).
#
# A bare DecimalType() means DecimalType(10, 0): scale 0 rounds away any
# fractional part of the measures on read. Use an explicit scale instead while
# keeping the same 10 integer digits of capacity.
# NOTE(review): scale 2 is an assumption about the source data — confirm it
# against the CSV and widen it if the file carries more decimals.
MEASURE_TYPE = DecimalType(12, 2)

schema = (
    StructType()
    .add("ID_Provincia", IntegerType())
    .add("Provincia", StringType())
    .add("ID_Departamento", IntegerType())
    .add("Departamento", StringType())
    .add("Id_Cultivo", IntegerType())
    .add("Cultivo", StringType())
    .add("ID_Campania", IntegerType())
    .add("Campania", StringType())
    .add("Ha_Sembrada", MEASURE_TYPE)
    .add("Ha_Cosechada", MEASURE_TYPE)
    .add("Produccion_Tn", MEASURE_TYPE)
    .add("Rendimiento_KgxHa", MEASURE_TYPE)
)

In [5]:
# Raw crop-estimates file: semicolon-separated, with a header row, and "NA"
# marking missing values (read as null).
# NOTE(review): Spark decodes CSV as UTF-8 unless `encoding` is set. The U+FFFD
# characters matched in the cultivo clean-up cell suggest this file uses
# another encoding (Latin-1?). Confirm before adding `encoding=`, because that
# cell's literals currently depend on the mojibake.
ESTIMACIONES_CSV = "/dataset/argentina-datos/Estimaciones/Estimaciones.csv"

df = spark.read.csv(
    ESTIMACIONES_CSV,
    schema=schema,
    sep=';',
    header=True,
    ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True,
    nullValue='NA',
)

In [6]:
# Rename the schema's column names to the snake_case names used downstream
# (they also become the PostgreSQL column names on write).
#
# Keys must match the schema exactly. withColumnRenamed silently does nothing
# when the source column is not found. The schema spells "Id_Cultivo", so the
# previous "ID_Cultivo" only matched through case-insensitive resolution.
COLUMN_RENAMES = {
    "ID_Provincia": "id_provincia",
    "Provincia": "provincia",
    "ID_Departamento": "id_departamento",
    "Departamento": "departamento",
    "Id_Cultivo": "id_cultivo",
    "Cultivo": "cultivo",
    "ID_Campania": "id_campania",
    "Campania": "campania",
    "Ha_Sembrada": "ha_sembrada",
    "Ha_Cosechada": "ha_cosechada",
    "Produccion_Tn": "produccion_tn",
    "Rendimiento_KgxHa": "rendimiento_kgha",
}

for source_name, target_name in COLUMN_RENAMES.items():
    df = df.withColumnRenamed(source_name, target_name)

# Fail loudly instead of continuing with a column that was never renamed.
_not_renamed = [name for name in COLUMN_RENAMES.values() if name not in df.columns]
assert not _not_renamed, "columns not renamed: {}".format(_not_renamed)



In [7]:
from pyspark.sql.functions import when, lit

# Map crop names containing accented letters to plain-ASCII spellings.
# The "\ufffd" (U+FFFD replacement character) keys are how those letters
# currently arrive, because the CSV is read without an explicit encoding.
# The correctly accented keys keep this cell working if the file is later read
# with its real encoding.
# NOTE(review): 'provincia' and 'departamento' may contain the same
# replacement characters; this cell only normalises 'cultivo'.
CULTIVO_ASCII = {
    "Ca\ufffda de az\ufffdcar": "Cania de azucar",
    "T\ufffd": "Te",
    "Algod\ufffdn": "Algodon",
    "C\ufffdrtamo": "Cartamo",
    "Man\ufffd": "Mani",
    "Ma\ufffdz": "Maiz",
    "Caña de azúcar": "Cania de azucar",
    "Té": "Te",
    "Algodón": "Algodon",
    "Cártamo": "Cartamo",
    "Maní": "Mani",
    "Maíz": "Maiz",
}

# Build one nested CASE WHEN expression. Unmatched values (and nulls) fall
# through to the original 'cultivo' value.
cultivo_ascii = df['cultivo']
for raw_name, ascii_name in CULTIVO_ASCII.items():
    cultivo_ascii = when(df['cultivo'] == raw_name, lit(ascii_name)).otherwise(cultivo_ascii)
df = df.withColumn('cultivo', cultivo_ascii)


In [8]:
from pyspark.sql import functions as F

# Report the number of null values per column, computed in a single Spark job.
# nullValue='NA' on read already turns "NA" tokens into null. The previous
# comparison against the string "NA" therefore effectively always reported 0.
# The loop variable is also no longer named `col`, which used to shadow
# pyspark.sql.functions.col from the star import.
null_counts = df.select(
    [F.count(F.when(df[column_name].isNull(), 1)).alias(column_name) for column_name in df.columns]
).first()

for column_name in df.columns:
    print(column_name, "\t", "Nulls: ", null_counts[column_name])

id_provincia 	 Nulls:  0
provincia 	 Nulls:  0
id_departamento 	 Nulls:  0
departamento 	 Nulls:  0
id_cultivo 	 Nulls:  0
cultivo 	 Nulls:  0
id_campania 	 Nulls:  0
campania 	 Nulls:  0
ha_sembrada 	 Nulls:  0
ha_cosechada 	 Nulls:  0
produccion_tn 	 Nulls:  0
rendimiento_kgha 	 Nulls:  0


In [9]:
# Split the campaign label on "/" and keep the first piece as 'anio'
# (presumably the campaign's starting year, e.g. "2010" from "2010/11").
campania_parts = split(df['campania'], '/')
df = df.withColumn('anio', campania_parts.getItem(0))

In [10]:
# Constant month/day suffix; appended to 'anio' it yields "YYYY-01-01".
JAN_FIRST_SUFFIX = lit("-01-01")
df = df.withColumn(colName='pre', col=JAN_FIRST_SUFFIX)

In [11]:
# 'fecha' = January 1st of the campaign's first year, parsed from "YYYY-01-01".
# Pattern letters are case-sensitive: "MM" is month-of-year, whereas "mm" is
# minute-of-hour. The old 'yyyy-mm-dd' only yielded January because the month
# was never parsed and defaulted to the first month.
df = df.withColumn('fecha', to_date(concat(df['anio'], df['pre']), 'yyyy-MM-dd'))

In [12]:
# Remove the intermediate columns that were only needed to build 'fecha'.
HELPER_COLUMNS = ('anio', 'pre')
df = df.drop(*HELPER_COLUMNS)

In [13]:
### Write the cleaned DataFrame to PostgreSQL

In [14]:
import os
from getpass import getpass

# Connection settings may be overridden through the environment. The defaults
# match the host, database and user this notebook always targeted.
JDBC_URL = os.environ.get("AGRICULTURA_JDBC_URL", "jdbc:postgresql://postgres/agricultura")
DB_USER = os.environ.get("AGRICULTURA_DB_USER", "agricultura")
# Never hard-code the password: take it from the environment, or prompt for it.
# NOTE(review): the password previously committed in this cell should be rotated.
DB_PASSWORD = (
    os.environ.get("AGRICULTURA_DB_PASSWORD")
    or getpass("PostgreSQL password for {}: ".format(DB_USER))
)

(
    df.write
    .format("jdbc")
    .option("url", JDBC_URL)
    .option("dbtable", "agricultura.argentina_datos")
    .option("user", DB_USER)
    .option("password", DB_PASSWORD)
    .option("driver", "org.postgresql.Driver")
    # Overwrite replaces the table on every run. Without the `truncate` option,
    # Spark drops and recreates it, so indexes/grants are not kept.
    .mode('overwrite')
    .save()
)

In [15]:
# Shut down the SparkSession and its underlying SparkContext so the driver and
# executor resources are released once the write has finished.
spark.stop()