# ETL Infrastructure Processes

### Importing libraries

In [None]:
import psycopg2
import glob
import os
import openpyxl
import pandas as pd
from itertools import groupby
import re
from google.cloud import storage
from io import BytesIO

import os
import paramiko
from io import StringIO

### Functions

In [None]:
# Get a list of files with a given extension
def get_files(filepath, file_extension):
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root, file_extension))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    return all_files

In [None]:
## Function to open the selected sheet
def get_data_frame(sheet, workbook):
    data = workbook[sheet]
    data_values = data.values
    ## To get the first line in file as a header line
    columns = next(data_values)[0:]
    ## To create a DataFrame based on the second and subsequent lines of data
    data_frame = pd.DataFrame(data_values, columns = columns)
    return data_frame

In [None]:
## Create tables
def create_tables(cur, conn):
    for query in create_table_queries:
        try:
            cur.execute(query)
        except psycopg2.Error as e: 
            print("Error: Issue creating table")
            print (e)
        conn.commit()

In [None]:
# Drop tables
def drop_tables(cur, conn):
    for name_table in drop_table_names:
        drop_query = "DROP table " + name_table
        try: 
            cur.execute(drop_query)
        except psycopg2.Error as e: 
            print("Error: Dropping table")
            print (e)

### Postgres conection 

In [None]:
#conn = psycopg2.connect("host=/cloudsql/bibmbo-maiz:us-central1:bimbo-maiz-db, user=luis password=qaz.wsx1 dbname=postgres")
#cur = conn.cursor()
try: 
    #conn = psycopg2.connect("dbname=bimbomaiz user=luis password=postgres")
    conn = psycopg2.connect(dbname='etldb', user='postgres', password='qaz.wsx1', host='/cloudsql/etl-process-274514:us-central1:etlpg')
except psycopg2.Error as e: 
    print("Error: Could not make connection to the Postgres database")
    print(e)
try: 
    cur = conn.cursor()
except psycopg2.Error as e: 
    print("Error: Could not get cursor to the Database")
    print(e)

conn.set_session(autocommit=True)

### Conection to AWS

In [None]:
usuario = 'ubuntu'
pwd = '/home/luis/Documents/ETL/jupyter/AWS_conection/Jonh-test1.pem'
enlace = '3.138.177.157'

try:
    hostname = enlace
    myuser = usuario
    mySSHK = pwd
    sshcon = paramiko.SSHClient()  # will create the object
    sshcon.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # no known_hosts error
    sshcon.connect(hostname, username=myuser, key_filename=mySSHK) # no passwd needed
    sftp = sshcon.open_sftp()
except:
    print('\n #### No se puede abrir la pagina, comprueba tu conexion a internet #### \n')
    
### Conection to the CRM server
#usuario = 'km'
#pwd = 'H3pEq2n5YRqp5uLn#.'
#enlace = '172.99.119.44'

#try:
#    client = paramiko.SSHClient()
#    client.load_system_host_keys()
#    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#    trans = paramiko.Transport((enlace, 22))     
#    trans.connect(username = usuario, password = pwd)
    
#    sftp = paramiko.SFTPClient.from_transport(trans)
#except:
#    print('\n #### No se puede abrir la pagina, comprueba tu conexion a internet #### \n')

## Extract general data

#### Setting the directory where the data are located and to open the Excel workbook

In [None]:
### Option from local directory
file_xlsx = get_files('/home/luis/Documents/ETL/jupyter', '*.xlsx')
#file_xlsx 
## Opening the Excel Workbook 
workbook = openpyxl.load_workbook(filename = file_xlsx[0], read_only=True)
#print(workbook.sheetnames, '\n')

In [None]:
### Option from google cloud storage
# create storage client
storage_client = storage.Client.from_service_account_json('/home/luis/Documents/ETL/jupyter/key/etl-process-5ef778cd27fb.json')
bucket = storage_client.get_bucket('etl_bem_bitacoras_data')
blob = bucket.blob('EXPORTAR.xlsx')
downloaded_blob = blob.download_as_string()
workbook = openpyxl.load_workbook(filename = BytesIO(downloaded_blob), read_only=True)
#print(workbook.sheetnames, '\n')

# 1. Bitacoras

## Extract bitacoras

### BEM

In [None]:
bitacoras_csv = get_data_frame('01_caracteristicas Bitácora', workbook)
#print(bitacoras_csv.head())
#print(data_bitacoras_raw.columns)
bitacoras_csv.shape

#### Subset of dataframe rows

In [None]:
## Get the id to make the subset from the project bitacoras
#bitacoras_subset = pd.read_csv("bitacoras_subset.csv")
#bitacoras_subset

## Transform the dataframe into a list
#bitacoras_subset = bitacoras_subset['Id_bitacora'].tolist()

## Get the rows to use
#bitacoras_row = bitacoras_csv[bitacoras_csv['ID de la bitácora (clave primaria)'].isin(bitacoras_subset)]

#bitacoras_row.tail()

#### Subset of dataframe columns

In [None]:
## List of the columns to use
bitacoras_columns = ['ID de la bitácora (clave primaria)',
       'Tipo de bitácora (para módulo, áreas de extensión o áreas de impacto)',
       'Año', 'Ciclo agronómico', 'Tipo de producción','ID de la parcela (clave foránea)',
       'ID del Productor (clave foránea)', 'Nombre de la institución']

bitacoras_raw = bitacoras_csv[bitacoras_columns]
bitacoras_raw.shape

### AWS

In [None]:
remotefile = '/var/www/html/reportes/Resumen_bitacorasETL.txt'

# Get the file using the conection 
bitacorasCRM_f_in = sftp.file(remotefile, "r")
bitacorasCRM_c_in = bitacorasCRM_f_in.read()
bitacorasCRM_f_in.close()

#From bites to dataframe
bitacorasCRM_str = str(bitacorasCRM_c_in,'utf-8')
bitacorasCRM_data = StringIO(bitacorasCRM_str) 
bitacorasCRM_df_na = pd.read_csv(bitacorasCRM_data , sep = "\t")
#productores_df_complete.to_csv("productores_df_complete.csv", encoding = 'windows-1252', index = False)


bitacorasCRM_df_na.shape

In [None]:
# only for local connection 
bitacorasCRM_df_na = pd.read_csv("/home/luis/Documents/ETL/jupyter/AWS_files/Resumen_bitacorasETL.txt", sep = "\t")

In [None]:
bitacorasCRM_df_na.columns

In [None]:
bitacorasCRM_columns = ['ID de bitacora', 'Tipo de bitacora',
       'Anio Bitacora', 'Ciclo', 'Regimen hidrico', 'ID parcela',
       'ID del productor', 'Proyecto']

bitacorasCRM_df_raw = bitacorasCRM_df_na[bitacorasCRM_columns]
# Delete NA
bitacorasCRM_raw = bitacorasCRM_df_raw.dropna(subset=['ID de bitacora'])

## Transform bitacoras

### BEM

In [None]:
## Deleting NAs
bitacoras_duplicate = bitacoras_raw.dropna(subset=['ID de la bitácora (clave primaria)'])

## Dropping duplicates
bitacoras_tipo = bitacoras_duplicate.drop_duplicates(subset = 'ID de la bitácora (clave primaria)') 

#bitacoras.shape
#bitacoras.groupby('Tipo de producción').count()[['ID de la bitácora (clave primaria)']]

## Standarize type of production by water availability
bitacoras_tipo.loc[bitacoras_tipo['Tipo de producción'] == 'Punta de riego', 'Tipo de producción'] = 'Riego'
#bitacoras.groupby('Tipo de producción').count()[['ID de la bitácora (clave primaria)']]

## Changing NaN values to None
bitacoras = bitacoras_tipo.where(pd.notnull(bitacoras_tipo), None)
bitacoras.shape

In [None]:
bitacoras

### AWS

In [None]:
bitacorasCRM_raw['Ciclo'].value_counts()

## Load bitacoras

#### Queries

In [None]:
bitacoras_table_create = ("""
    CREATE TABLE IF NOT EXISTS bitacoras(
        bitacora_id int PRIMARY KEY,
        tipo_bitacora varchar NOT NULL, 
        ao int NOT NULL,
        ciclo_agronomico varchar NOT NULL,
        tipo_produccion varchar NOT NULL,
        parcela_id int NOT NULL,
        productor_id int,
        institucion_nombre varchar NOT NULL);
""")

# Insert queries
bitacoras_table_insert = ("""
    INSERT INTO bitacoras (bitacora_id, tipo_bitacora, ao, ciclo_agronomico, tipo_produccion,  parcela_id, 
    productor_id, institucion_nombre)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (bitacora_id) DO NOTHING;
""")

#### Execute create tables queries

In [None]:
# Create tables
create_table_queries = [bitacoras_table_create]
create_tables(cur, conn)

#### Insert values

In [None]:
for i, row in bitacoras.iterrows():
    #print(list(row))
    cur.execute(bitacoras_table_insert, list(row))
    conn.commit()

# 2. Parcelas

## Extract parcelas

In [None]:
parcelas_csv = get_data_frame('04_parcelas', workbook)
#print(parcelas_csv.tail())
#print(parcelas_csv.columns)
parcelas_csv.shape

#### Dropping rows by id

In [None]:
## Get the id to make the subset
file_parcelas_drop = pd.read_csv("infrastructure_parcelas_drop.csv")
#parcelas_drop

## Transform the dataframe into a list
parcelas_drop = file_parcelas_drop['parcela_id'].tolist()

## Drop rows from the list
parcelas_row = parcelas_csv[parcelas_csv['ID de la parcela (clave primaria)'].isin(parcelas_drop)== False]
parcelas_row.reset_index(inplace = True)
#parcelas_row.columns
parcelas_row.shape

#### Subset of rows that are in the bitacoras dataframe

In [None]:
## Drop parcelas are not in bitacoras
bitacoras_par = bitacoras['ID de la parcela (clave foránea)'].tolist()
#bitacoras_row = bitacoras_csv[bitacoras_csv['ID de la bitácora (clave primaria)'].isin(bitacoras_subset)]

parcelas_bit = parcelas_row[parcelas_row['ID de la parcela (clave primaria)'].isin(bitacoras_par)]
parcelas_bit.shape

#### Subset of dataframe columns

In [None]:
## List of the columns to use
parcelas_columns = ['ID de la parcela (clave primaria)','Superficie (ha)', 
                    'Tipo de parcela (módulo, área de extensión o área de impacto)', 'Estado', 
                    'Municipio', 'Localidad',  'Nombre del Hub', 'Latitud N', 'Longitud W']

parcelas_columns = parcelas_bit[parcelas_columns]
parcelas_columns.shape

## Transform parcelas

#### Drop NA and duplicated rows

In [None]:
## Deleting NAs
parcelas_na = parcelas_columns.dropna(subset=['ID de la parcela (clave primaria)'])

## Dropping duplicates
parcelas = parcelas_na.drop_duplicates(subset = 'ID de la parcela (clave primaria)') 

#### Replace incorrect coordinates 

In [None]:
## Get ccorrect coordinates
coordenadas_corection = pd.read_csv('coordenadas_correction.csv')
coordenadas_corection

## Transform dataframe latitude column into dictionary
latitud = dict()
for n in range(0, coordenadas_corection.shape[0]): latitud[coordenadas_corection.iloc[n,0]] = coordenadas_corection.iloc[n,1]
#df['first_name'] = df['ID'].apply(lambda x: first_name.get(x, df.loc[df['ID'] == x, 'first_name'].values[0]))
parcelas['Latitud N'] = parcelas['ID de la parcela (clave primaria)'].apply(lambda x: latitud.get(x, parcelas.loc[parcelas['ID de la parcela (clave primaria)'] == x, 'Latitud N'].values[0]))


## Transform dataframe longitude column into dictionary
longitud = dict()
for n in range(0, coordenadas_corection.shape[0]): longitud[coordenadas_corection.iloc[n,0]] = coordenadas_corection.iloc[n,2]
parcelas['Longitud W'] = parcelas['ID de la parcela (clave primaria)'].apply(lambda x: longitud.get(x, parcelas.loc[parcelas['ID de la parcela (clave primaria)'] == x, 'Longitud W'].values[0]))
    

## Load parcelas

#### Queries

In [None]:
parcelas_table_create = ("""
    CREATE TABLE IF NOT EXISTS parcelas(
        parcela_id int PRIMARY KEY,
        superficie_ha real NOT NULL,
        tipo_parcela varchar NOT NULL, 
        estado varchar NOT NULL,
        municipio varchar NOT NULL,
        localidad varchar NOT NULL,
        hub varchar NOT NULL,
        latitud_n real NOT NULL,
        longitud_w real NOT NULL);
""")

In [None]:
# Queries insert 
parcelas_table_insert = ("""
    INSERT INTO parcelas (parcela_id, superficie_ha, tipo_parcela , estado, municipio, localidad, 
    hub, latitud_n, longitud_w)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
    ON CONFLICT (parcela_id) DO NOTHING;
""")

In [None]:
# Create tables
create_table_queries = [parcelas_table_create]
create_tables(cur, conn)

In [None]:
for i, row in parcelas.iterrows():
    #print(list(row))
    cur.execute(parcelas_table_insert, list(row))
    conn.commit()

In [None]:
# Drop tables
drop_table_names = ['bitacoras']
drop_tables(cur, conn)

# 3. Superficie

## Extract siembra-resiembra

In [None]:
siembra_superficie_csv = get_data_frame('12_siembra Resiembra_general', workbook)
#print(siembra_resiembra_csv.tail())
#print(siembra_resiembra_csv.columns)

#### Dropping rows by filter

In [None]:
# Filter siembra
siembra_superficie_drop = siembra_superficie_csv[(siembra_superficie_csv['Nombre de la sección'] == 'C. SIEMBRA') & (siembra_superficie_csv['Tipo de parcela (testigo o innovación)'] == 'Parcela innovación')]
siembra_superficie_drop.shape

In [None]:
## Drop siembra rows that are not in bitacoras
bitacoras_siem = bitacoras['ID de la bitácora (clave primaria)'].tolist()
#bitacoras_row = bitacoras_csv[bitacoras_csv['ID de la bitácora (clave primaria)'].isin(bitacoras_subset)]

siembra_superficie_bit = siembra_superficie_drop[siembra_superficie_drop['ID de la bitácora (clave foránea)'].isin(bitacoras_siem)]
siembra_superficie_bit.shape

In [None]:
## List of the columns to use
#siembra_superficie_columns = ['ID de la bitácora (clave foránea)',
#       'ID de tipo de bitácora (clave foránea)',
#       'Tipo de parcela (testigo o innovación)',
#       'Nombre de la sección', 'Superficie sembrada ($/ha)']

siembra_superficie_columns = ['ID de la bitácora (clave foránea)', 'Superficie sembrada ($/ha)']

siembra_superficie = siembra_superficie_bit[siembra_superficie_columns]
siembra_superficie.shape

## Transform siembra_resiembra

In [None]:
## Deleting NAs
#siembra_superficie_na = siembra_superficie_columns.dropna(subset=['ID de tipo de bitácora (clave foránea)'])
#siembra_na.shape
#siembra_superficie_na.tail()

## Dropping duplicates
#siembra_superficie = siembra_superficie_na.drop_duplicates(subset = 'ID de tipo de bitácora (clave foránea)') 
#siembra_superficie.columns

## Getting superficie by bitacora

#### Getting the superficie data from parcelas dataframe

In [None]:
bitacoras_superficie = bitacoras[['ID de la bitácora (clave primaria)', 'ID de la parcela (clave foránea)']]
parcelas_superficie = parcelas[['ID de la parcela (clave primaria)', 'Superficie (ha)']]

superficie_bit_par = pd.merge(left=bitacoras_superficie, right= parcelas_superficie, how='inner', left_on =  'ID de la parcela (clave foránea)', right_on = 'ID de la parcela (clave primaria)')
superficie_bit_par.shape

#### Getting the superficie data from siembra when the plot is a modulo

In [None]:
## Transform dataframe superficie column into dictionary
dict_siembra_sup = dict()
for n in range(0, siembra_superficie.shape[0]): dict_siembra_sup[siembra_superficie.iloc[n,0]] = siembra_superficie.iloc[n,1]
    
#df['first_name'] = df['ID'].apply(lambda x: first_name.get(x, df.loc[df['ID'] == x, 'first_name'].values[0]))
superficie_bit_par['Superficie (ha)'] = superficie_bit_par['ID de la bitácora (clave primaria)'].apply(lambda x: dict_siembra_sup.get(x, superficie_bit_par.loc[superficie_bit_par['ID de la bitácora (clave primaria)'] == x, 'Superficie (ha)'].values[0]))
superficie_bit_par[superficie_bit_par['ID de la bitácora (clave primaria)'] == dict_siembra_sup]

superficie_bitacora_parcela = superficie_bit_par[['ID de la bitácora (clave primaria)', 'Superficie (ha)']]
superficie_bitacora_parcela.shape

In [None]:
pd.set_option('display.max_rows', None)
superficie_bitacora_parcela

## Load siembra

#### Queries

In [None]:
superficie_bit_table_create = ("""
    CREATE TABLE IF NOT EXISTS superficie_bit(
        bitacora_id int PRIMARY KEY,
        superficie_sembrada_ha real NOT NULL);
""")

In [None]:
# Queries insert
superficie_bit_table_insert = ("""
    INSERT INTO superficie_bit(bitacora_id, superficie_sembrada_ha)
    VALUES (%s, %s)
    ON CONFLICT (bitacora_id) DO NOTHING;
""")

In [None]:
# Create tables
create_table_queries = [superficie_bit_table_create]
create_tables(cur, conn)

In [None]:
for i, row in superficie_bitacora_parcela.iterrows():
    print(list(row))
    cur.execute(superficie_bit_table_insert, list(row))
    conn.commit()

# 4. Queries

In [None]:
cur.execute("SELECT to_json(infraestructure_q) FROM (SELECT bitacoras.bitacora_id, bitacoras.ao, bitacoras.ciclo_agronomico, \
    bitacoras.tipo_produccion, bitacoras.parcela_id, bitacoras.institucion_nombre, parcelas.parcela_id,\
    parcelas.tipo_parcela,  parcelas.estado,  parcelas.municipio,  parcelas.localidad, parcelas.latitud_n,\
    parcelas.longitud_w, superficie_bit.bitacora_id, superficie_bit.superficie_sembrada_ha \
    FROM bitacoras INNER JOIN parcelas ON bitacoras.parcela_id = parcelas.parcela_id \
    INNER JOIN  superficie_bit ON bitacoras.bitacora_id = superficie_bit.bitacora_id) infraestructure_q LIMIT 5")
# bitacora_id, tipo_bitacora, ao, ciclo_agronomico, tipo_produccion,  parcela_id, productor_id, institucion_nombre

row = cur.fetchall()
    #while row:
#return jsonify(row)
print(row)

In [None]:
cur.execute("SELECT bitacoras.bitacora_id, bitacoras.ao, bitacoras.ciclo_agronomico, \
    bitacoras.tipo_produccion, bitacoras.parcela_id, bitacoras.institucion_nombre, parcelas.parcela_id,\
    parcelas.tipo_parcela,  parcelas.estado,  parcelas.municipio,  parcelas.localidad, parcelas.latitud_n,\
    parcelas.longitud_w, superficie_bit.bitacora_id, superficie_bit.superficie_sembrada_ha \
    FROM bitacoras INNER JOIN parcelas ON bitacoras.parcela_id = parcelas.parcela_id \
    INNER JOIN  superficie_bit ON bitacoras.bitacora_id = superficie_bit.bitacora_id LIMIT 5")
row = cur.fetchall()
    #while row:
#return jsonify(row)
print(row)

### Drop tables

In [None]:
# Drop tables
drop_table_names = ['test_table_2']
drop_tables(cur, conn)

#### Insert values

In [None]:
# Insert queries
bitacoras_table_insert = ("""
    INSERT INTO bitacora (bitacora_id, tipo_bitacora, institucion)
    VALUES (%s, %s, %s)
    ON CONFLICT (bitacora_id) DO NOTHING;
""")

In [None]:
for i, row in df_bitacoras.iterrows():
    print(list(row))
    cur.execute(bitacoras_table_insert, list(row))
    conn.commit()

## test

In [None]:
# Create test table
test_table_create = ("""
    CREATE TABLE IF NOT EXISTS test_table(
        test_id int PRIMARY KEY,
        num real NOT NULL,
        text varchar NOT NULL);
""")

In [None]:
# Create tables
create_table_queries = [test_table_create]
create_tables(cur, conn)

In [None]:
# Create test table
test_table_2_create = ("""
    CREATE TABLE IF NOT EXISTS test_table_2(
        test_id int PRIMARY KEY,
        num real NOT NULL);
""")

In [None]:
# Create tables
create_table_queries = [test_table_2_create]
create_tables(cur, conn)

In [None]:
# insert data
test_table_insert = ("""
    INSERT INTO test_table(test_id, num, text)
    VALUES (%s, %s, %s)
    ON CONFLICT (test_id) DO UPDATE SET (num, text)
            = (EXCLUDED.num, EXCLUDED.text);
""")

In [None]:
row = [3,3,'three']
cur.execute(test_table_insert, list(row))
conn.commit()

In [None]:
# insert data
test_table_2_insert = ("""
    INSERT INTO test_table_2(test_id, num)
    VALUES (%s, %s)
    ON CONFLICT (test_id) DO NOTHING;
""")

In [None]:
row = [1,1]
cur.execute(test_table_2_insert, list(row))
conn.commit()

In [None]:
bitacoras_par = cur.execute('''SELECT parcela_id FROM bitacoras;''')
conn.commit()

In [None]:
bitacoras_par

### Close database connection

In [None]:
# Close the database conection
cur.close()
conn.close()

In [None]:
import psycopg2
conn = psycopg2.connect(user=DATABASE_USER, password=PASSWORD,
                        host='localhost', port='5432')