### Bibliotecas:

In [1]:
import ftplib
import pandas as pd
import traceback
import logging
import psycopg2
from sqlalchemy import exc, create_engine

### Cargar los datos en un dataframe:

In [None]:
# Datos del usuario del servidor:
HOST_NAME = "172.17.0.1"
USER_NAME = "xal"
PASSWORD = "xal"

ftp_server = ftplib.FTP(HOST_NAME, USER_NAME, PASSWORD)
filename = "sample.csv"

# Escribir en modo binario:
try:
    with open(filename, "wb") as file:
        # Intentar descargar del servidor:
        ftp_server.retrbinary(f"RETR {filename}", file.write)
    ftp_server.quit()
except Exception as e:
    # Imprimir en caso de error:
    print("Error: ", str(e))
    logging.error(traceback.format_exc())

In [2]:
df = pd.read_csv("sample.csv")
df.head()

Unnamed: 0,first_name,last_name,company_name,address,city,state,zip,phone1,phone2,email,department
0,James,Butt,"Benton, John B Jr",6649 N Blue Gum St,New Orleans,LA,70116,504-621-8927,504-845-1427,jbutt@gmail.com,Sales
1,James,Butt,"Benton, John B Jr",6649 N Blue Gum St,New Orleans,LA,70116,504-621-8927,504-845-1427,jbutt@gmail.com,Marketing
2,Josephine,Darakjy,"Chanay, Jeffrey A Esq",4 B Blue Ridge Blvd,Brighton,MI,48116,810-292-9388,810-374-9840,josephine_darakjy@darakjy.org,Human Resources
3,Art,Venere,"Chemel, James L Cpa",8 W Cerritos Ave #54,Bridgeport,NJ,8014,856-636-8749,856-264-4130,art@venere.org,Purchasing
4,Lenna,Paprocki,Feltz Printing Service,639 Main St,Anchorage,AK,99501,907-385-4412,907-921-2010,lpaprocki@hotmail.com,Marketing


### ¿Tiene la estructura correcta?

In [3]:
schema_df = pd.read_csv("schema.csv", sep="|")
schema_df

Unnamed: 0,column_name,type,nullable
0,first_name,string,False
1,last_name,string,False
2,company_name,string,False
3,address,string,False
4,city,string,False
5,state,string,False
6,zip,int,False
7,phone1,string,False
8,phone2,string,True
9,email,string,False


In [4]:
# ¿Tiene el mismo número de campos?
data_len = len(df.columns)
schema_len = len(schema_df)
bandera_validacion = 0

if data_len > schema_len:
    print("El set de datos tiene más columnas que la estructura original.")
    bandera_validacion = 1
    
elif data_len < schema_len:
    print("El set de datos tiene menos columnas que la estructura original.")
    bandera_validacion = 2
else: 
    print("El set de datos coincide con la estructura")
    bandera_validacion = 3
    

El set de datos coincide con la estructura


In [5]:
# ¿El header es correcto?
columns = df.columns
schema_columns = schema_df.column_name

for index in range(schema_len):
    if columns[index] != schema_columns[index]:
        raise ValueError(f'Se esperaba nombre de columna {schema_columns[index]}.')

In [6]:
# En este momento solo se validan cadenas y enteros.
if bandera_validacion == 3:
    for row in schema_df.itertuples():
        column_name = row[1]
        column_type = row[2]
        print(column_name, column_type)
        
        if column_type == 'string' and df[column_name].dtypes == 'object':
            print("\tTipo string correcto")
        elif column_type == 'int' and df[column_name].dtypes == 'int64':
            print("\tTipo entero correcto")
        else:
            print(f'\t{column_type} {df[column_name].dtypes}')
            raise ValueError('Error de estructura.')
            # Implementar log de malformed.

first_name string
	Tipo string correcto
last_name string
	Tipo string correcto
company_name string
	Tipo string correcto
address string
	Tipo string correcto
city string
	Tipo string correcto
state string
	Tipo string correcto
zip int
	Tipo entero correcto
phone1 string
	Tipo string correcto
phone2 string
	Tipo string correcto
email string
	Tipo string correcto
department string
	Tipo string correcto


### Crear la base de datos: 

In [None]:
try:
    # Connect to an existing database
    connection = psycopg2.connect(user='xaluser',
                                  password='123',
                                  host='192.168.100.204',
                                  port='5432',
                                  database='xaldatabase')

    cursor = connection.cursor()
    # SQL query:
    create_table_query = '''CREATE TABLE company
          (ID INT PRIMARY KEY     NOT NULL,
          MODEL           TEXT    NOT NULL,
          PRICE         REAL); '''
    # Execute a command: this creates a new table
    cursor.execute(create_table_query)
    connection.commit()
    print("Table created successfully in PostgreSQL ")

except (Exception, Error) as error:
    print("Error while connecting to PostgreSQL", error)
finally:
    if connection:
        cursor.close()
        connection.close()
        print("PostgreSQL connection is closed")

### Insertar en la base de datos:

In [7]:
insert_sql = """INSERT INTO ${TABLE_NAME}
                VALUES(${VALUES_STRING});"""

In [8]:
company_elements = ['company_name','address','city','state','zip']
department_elements = ['department', 'company_name', 'address']
employee_elements = ['first_name', 'last_name', 'phone1', 'phone2', 'email']
department_employee = ['department', 'company_name', 'address', 'email']

In [12]:
engine = create_engine('postgresql+psycopg2://xaluser:123@192.168.0.22:5432/xaldatabase')
connection = engine.connect()
# Insert query base para ejecutar sustituyendo el nombre de la tabla y los valores:
insert_sql = """INSERT INTO ${TABLE_NAME} VALUES(${VALUES_STRING});"""
#Para cada una de las rows en el Dataframe:
for row in df[company_elements].iterrows():
    try:
        # Row es una tupla que contiene el index y la Serie con la información del row.
        # .values obtiene los valores de una Serie, que resulta un npArray.
        # list convierte el npArray a una lista común.
        # la función lambda obtiene los valores en cadenas de texto en formato
        #            '{valor}´,
        # y los concatena.
        # El [:-1] quita la última coma de la cadena.
        values = ''.join(f"'{str(x)}'," for x in list(row[1].values))[:-1]
        actual_insert_sql = insert_sql.replace('${TABLE_NAME}', 'company').replace('${VALUES_STRING}', values)
        print(actual_insert_sql)
        connection.execute(actual_insert_sql)
    except exc.IntegrityError as e:
        print(e,'\n\n')

INSERT INTO company VALUES('Benton, John B Jr','6649 N Blue Gum St','New Orleans','LA','70116');
(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "company_pkey"
DETAIL:  Key (company_name, address)=(Benton, John B Jr, 6649 N Blue Gum St) already exists.

[SQL: INSERT INTO company VALUES('Benton, John B Jr','6649 N Blue Gum St','New Orleans','LA','70116');]
(Background on this error at: https://sqlalche.me/e/14/gkpj) 


INSERT INTO company VALUES('Benton, John B Jr','6649 N Blue Gum St','New Orleans','LA','70116');
(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "company_pkey"
DETAIL:  Key (company_name, address)=(Benton, John B Jr, 6649 N Blue Gum St) already exists.

[SQL: INSERT INTO company VALUES('Benton, John B Jr','6649 N Blue Gum St','New Orleans','LA','70116');]
(Background on this error at: https://sqlalche.me/e/14/gkpj) 


INSERT INTO company VALUES('Chanay, Jeffrey A Esq','4 B Blue Ridge Blvd','Brighton','MI','

In [None]:
for row in df[department_elements].iterrows():
    try:
        values = ''.join(f"'{str(x)}'," for x in list(row[1].values))[:-1]
        actual_insert_sql = insert_sql.replace('${TABLE_NAME}', 'department').replace('${VALUES_STRING}', values)
        print(actual_insert_sql)
        connection.execute(actual_insert_sql)
    except exc.IntegrityError as e:
        print(e,'\n\n')

In [None]:
for row in df[employee_elements].iterrows():
    try:
        values = ''.join(f"'{str(x)}'," for x in list(row[1].values))[:-1]
        actual_insert_sql = insert_sql.replace('${TABLE_NAME}', 'employee').replace('${VALUES_STRING}', values)
        print(actual_insert_sql)
        connection.execute(actual_insert_sql)
    except exc.IntegrityError as e:
        print(e,'\n\n')

In [None]:
for row in df[department_employee].iterrows():
    try:
        values = ''.join(f"'{str(x)}'," for x in list(row[1].values))[:-1]
        actual_insert_sql = insert_sql.replace('${TABLE_NAME}', 'department_employee').replace('${VALUES_STRING}', values)
        print(actual_insert_sql)
        connection.execute(actual_insert_sql)
    except exc.IntegrityError as e:
        print(e,'\n\n')