In [2]:
import os
import dotenv
dotenv.load_dotenv()
import pandas as pd
from config import DATA_FILES, DATABASE_HOST, DATABASE_NAME, DATABASE_PASS, DATABASE_USER
import psycopg2
import io
import traceback
from sqlalchemy import create_engine

In [2]:
# DATABASE_HOST = os.environ['']
DATABASE_NAME = os.environ.get('TF_VAR_DATABASE')
DATABASE_USER = os.environ['TF_VAR_DATABASE_USER']
DATABASE_PASS = os.environ['TF_VAR_DATABASE_PASSWORD']
DATABASE_HOST = 'datachallenge.cwl4757u5g17.us-east-1.rds.amazonaws.com'

In [3]:
engine = create_engine(f'postgresql+psycopg2://{DATABASE_USER}:{DATABASE_PASS}@{DATABASE_HOST}/{DATABASE_NAME}')

In [4]:
def copy_expert(df, table,fields=''):
    """will store dataframe's data into a database table 
    
    Args:
        df (Pandas Dataframe): dataframe to persist
        table (string): destination table

    Returns:
        integer: 1 if a exception is raised
    """
    # save dataframe to an in memory buffer
    conn = psycopg2.connect(host=DATABASE_HOST, database=DATABASE_NAME, user=DATABASE_USER, password=DATABASE_PASS)
    df = df.reset_index(drop=True)
    cur = conn.cursor()
    buffer = io.StringIO()
    df.to_csv(buffer, index = False,header=False, sep='|')
    buffer.seek(0)
    cur.execute(f'SET search_path TO public')
    cur.execute(f'truncate table {table} cascade')
    try:
        copy_sql = f"""
                COPY {table} {fields} FROM stdin WITH CSV HEADER
                DELIMITER as '|'
                """
        cur.copy_expert(sql=copy_sql, file=io.StringIO(df.to_csv(index=False, header=True, sep='|')))
        conn.commit()
        cur.close()
    except Exception:
        conn.rollback()
        cur.close()
        print(traceback.format_exc())
        return 1

In [5]:
def load_to_logs(df_to_logs, table_name):
    df_to_logs['payload'] = df_to_logs.apply(lambda row: row.to_dict(), axis=1)
    df_to_logs['table_name'] = table_name
    df_to_logs = df_to_logs.loc[:,[ 'table_name', 'payload']]
    copy_expert(df_to_logs,'logs','(table_name, payload)')

In [6]:
def load_jobs_file():
    table_name = 'jobs'
    jobs_data = pd.read_excel(os.path.join(DATA_FILES, 'jobs.xlsx'))
    #Format Jobs
    job_columns = ['id', 'job']
    jobs_data.columns = job_columns
    jobs_data['id'] = jobs_data['id'].astype(int)
    jobs_data['job'] = jobs_data['job'].astype(str)
    ###Apply All fields required rule
    jobs_data_to_db = jobs_data.loc[(jobs_data['job'].notnull())&(jobs_data['id'].notnull()),:]
    jobs_data_to_logs = jobs_data.loc[(jobs_data['job'].isnull())|(jobs_data['id'].isnull()),:]
    copy_expert(df=jobs_data_to_db, table=table_name)
    load_to_logs(jobs_data_to_logs,table_name)

In [7]:
def load_departments_file():
    table_name = 'departments'
    departments_data = pd.read_excel(os.path.join(DATA_FILES, 'departments.xlsx'))
    #Format Departments
    departments_columns = ['id', 'department']
    departments_data.columns = departments_columns
    departments_data['id'] = departments_data['id'].astype(int)
    departments_data['department'] = departments_data['department'].astype(str)
    ###Apply All fields required rule
    departments_data_to_db = departments_data.loc[(departments_data['department'].notnull())&(departments_data['id'].notnull()),:]
    departments_data_to_logs = departments_data.loc[(departments_data['department'].isnull())|(departments_data['id'].isnull()),:]
    copy_expert(df=departments_data_to_db, table=table_name)
    load_to_logs(departments_data_to_logs,table_name)

In [8]:

def load_hired_employees():
    table_name = 'hired_employees'
    hired_employees_data = pd.read_excel(os.path.join(DATA_FILES, 'hired_employees.xlsx'))
    loaded_departments = pd.read_sql("select distinct id as dep_id_db from departments",con=engine)
    loaded_jobs = pd.read_sql("select distinct id as job_id_db from jobs",con=engine)

    #Format Hired_employees
    hired_employees_columns = ['id', 'name', 'datetime', 'department_id', 'job_id']
    hired_employees_data.columns = hired_employees_columns
    ##Replace broken relations
    hired_employees_data['id'] = hired_employees_data['id'].fillna(-99)
    hired_employees_data['department_id'] = hired_employees_data['department_id'].fillna(-99)
    hired_employees_data['job_id'] = hired_employees_data['job_id'].fillna(-99)
    hired_employees_data['id'] = hired_employees_data['id'].astype(int)
    hired_employees_data['name'] = hired_employees_data['name'].astype(str)
    hired_employees_data['datetime'] = hired_employees_data['datetime'].astype(str)
    hired_employees_data['department_id'] = hired_employees_data['department_id'].astype(int)
    hired_employees_data['job_id'] = hired_employees_data['job_id'].astype(int)

    hired_employees_data = pd.merge(hired_employees_data, loaded_jobs, how='left', left_on='job_id', right_on='job_id_db')
    hired_employees_data = pd.merge(hired_employees_data, loaded_departments, how='left', left_on='department_id', right_on='dep_id_db')
    hired_employees_data['dep_id_db'] = hired_employees_data['dep_id_db'].fillna(-99)
    hired_employees_data['job_id_db'] = hired_employees_data['job_id_db'].fillna(-99)
    ###Apply All fields required rule
    hired_employees_data_to_logs = hired_employees_data.loc[(hired_employees_data['id']==-99)|(hired_employees_data['name']=='')|(hired_employees_data['datetime']=='')|(hired_employees_data['job_id']==-99)|(hired_employees_data['department_id']==-99)|(hired_employees_data['job_id_db']==-99)|(hired_employees_data['dep_id_db']==-99), :]
    hired_employees_data_to_db = hired_employees_data.loc[(hired_employees_data['id']!=-99)&(hired_employees_data['name']!='')&(hired_employees_data['datetime']!='')&(hired_employees_data['job_id']!=-99)&(hired_employees_data['department_id']!=-99)&(hired_employees_data['job_id_db']!=-99)&(hired_employees_data['dep_id_db']!=-99), :]
    del hired_employees_data_to_db['dep_id_db']
    del hired_employees_data_to_db['job_id_db']
    copy_expert(df=hired_employees_data_to_db, table=table_name)
    load_to_logs(hired_employees_data_to_logs,table_name)
    
    

In [9]:
def load_all_tables_data():
    load_jobs_file()
    load_departments_file()
    load_hired_employees()

In [10]:
load_all_tables_data()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_to_logs['payload'] = df_to_logs.apply(lambda row: row.to_dict(), axis=1)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_to_logs['table_name'] = table_name


In [15]:
import boto3
def download_s3_files():
    """Downloads jobs, departments, hired_employees files from s3
    """
    os.environ['AWS_DEFAULT_REGION']='us-east-1'
    s3_client = boto3.client('s3')
    s3_client.download_file('data-challenge-bucket-oscar','data/departments.xlsx',os.path.join(DATA_FILES,'jobs.xlsx'))
    s3_client.download_file('data-challenge-bucket-oscar','data/departments.xlsx',os.path.join(DATA_FILES,'departments.xlsx'))
    s3_client.download_file('data-challenge-bucket-oscar','data/hired_employees.xlsx',os.path.join(DATA_FILES,'hired_employees.xlsx'))

In [3]:
import psycopg2
import fastavro
from fastavro.schema import load_schema

# Conectar a la base de datos PostgreSQL
conn = psycopg2.connect(
    dbname=DATABASE_NAME,
    user=DATABASE_USER,
    password=DATABASE_PASS,
    host=DATABASE_HOST,
    port='5432'
)

# Crear un cursor para ejecutar consultas
cur = conn.cursor()

# Ejecutar una consulta para obtener los datos de la tabla `users`
cur.execute("select * from jobs")
records = cur.fetchall()

# Cerrar la conexión
cur.close()
conn.close()

In [None]:
schema = {
    "type": "record",
    "name": "jobs",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "job", "type": "string"},
    ]
}

In [6]:
avro_records = [{"id": r[0], "job": r[1]} for r in records]

In [7]:
output_file = 'users.avro'

In [8]:
with open(output_file, 'wb') as out:
    fastavro.writer(out, schema, avro_records)

In [9]:
with open(output_file, 'rb') as file:
    reader = fastavro.reader(file)
    records = [record for record in reader]

In [10]:
df = pd.DataFrame.from_records(records)

In [2]:
from config import DATA_FILES, DATABASE_HOST, DATABASE_NAME, DATABASE_PASS, DATABASE_USER, BUCKET
import boto3
import psycopg2
import fastavro

In [11]:
def create_table_backup(table_name):
    conn = psycopg2.connect(
    dbname=DATABASE_NAME,
    user=DATABASE_USER,
    password=DATABASE_PASS,
    host=DATABASE_HOST,
    port='5432'
    )

    cur = conn.cursor()
    cur.execute(f"select * from {table_name}")
    records = cur.fetchall()
    cur.close()
    conn.close()
    
    if table_name == 'jobs':
        schema = {
                "type": "record",
                "name": "jobs",
                "fields": [
                    {"name": "id", "type": "int"},
                    {"name": "job", "type": "string"},
                ]
            }
        avro_records = [{"id": r[0], "job": r[1]} for r in records]
    elif table_name == 'departments':
        schema = {
                "type": "record",
                "name": "departments",
                "fields": [
                    {"name": "id", "type": "int"},
                    {"name": "department", "type": "string"},
                ]
            }
        avro_records = [{"id": r[0], "department": r[1]} for r in records]
    elif table_name == 'hired_employees':
        schema = {
                "type": "record",
                "name": "jobs",
                "fields": [
                    {"name": "id", "type": "int"},
                    {"name": "name", "type": "string"},
                    {"name": "datetime", "type": "string"},
                    {"name": "department_id", "type": "int"},
                    {"name": "job_id", "type": "int"}
                ]
            }
        avro_records = [{"id": r[0], "name": r[1], "datetime": r[2], "department_id": r[3], "job_id": r[4]} for r in records]
    else:
        pass
    with open(table_name, 'wb') as out:
        fastavro.writer(out, schema, avro_records)
        
    s3Client = boto3.client('s3')    

    # upload_fileobj
    with open(table_name, 'rb') as fileObj:
        response = s3Client.upload_fileobj(fileObj, BUCKET, f'backups/{table_name.txt}')
        print(response)


Unnamed: 0,id,job
0,2,VP Sales
1,3,Biostatistician IV
2,4,Account Representative II
3,5,VP Marketing
4,6,Environmental Specialist
...,...,...
177,179,Software Engineer II
178,180,Statistician IV
179,181,Programmer Analyst I
180,182,Account Representative I
