In [None]:
import psycopg2
import os
import pandas as pd
from simpledbf import Dbf5
import re
import multiprocessing
from threading import Thread
import requests

In [None]:
class DBConnection(object):
    """Small wrapper around a psycopg2 connection used by this notebook.

    The helper methods never raise on SQL errors: they print the exception,
    roll back the failed transaction and return ``False`` so that a notebook
    cell can keep going.
    """
    _db = None  # psycopg2 connection object, assigned in __init__

    def __init__(self, host, db, usr, pwd):
        """Open a connection to PostgreSQL database ``db`` on ``host``.

        ``usr``/``pwd`` are the login credentials; errors raised by
        ``psycopg2.connect`` propagate to the caller.
        """
        self._db = psycopg2.connect(host=host,
                                    user=usr,
                                    password=pwd,
                                    database=db)

    def close_connection(self):
        """Close the underlying connection."""
        self._db.close()

    def _rollback(self):
        """Roll back the current transaction, reporting (not raising) failures."""
        # After a failed statement PostgreSQL rejects every further command in
        # the same transaction ("current transaction is aborted") until it is
        # rolled back, so without this one bad statement would break every
        # later call made through this connection.
        try:
            self._db.rollback()
        except Exception as e:
            print(f"SQLException rollback: {e}")

    def dml(self, sql, params=None):
        """Execute one data-modifying statement and commit it.

        Parameters
        ----------
        sql : str
            Statement to run; may contain ``%s`` placeholders.
        params : sequence or mapping, optional
            Values bound to the placeholders by the driver. When ``None``,
            ``sql`` is executed as-is.

        Returns
        -------
        bool
            ``True`` on success, ``False`` if the statement or the commit
            failed (the error is printed and the transaction rolled back).
        """
        try:
            # The cursor context manager closes the cursor even if execute raises.
            with self._db.cursor() as cur:
                cur.execute(sql, params)
            self._db.commit()
        except Exception as e:
            print(f"SQLException dml: {e}")
            self._rollback()
            return False
        return True

    def dml_many(self, sql, tpls):
        """Execute ``sql`` once per parameter tuple in ``tpls`` and commit once.

        All rows are committed together: if any row fails, the whole batch is
        rolled back.

        Returns
        -------
        bool
            ``True`` on success, ``False`` on failure (error printed).
        """
        try:
            with self._db.cursor() as cur:
                cur.executemany(sql, tpls)
            self._db.commit()
        except Exception as e:
            print(f"SQLException dml_many: {e}")
            self._rollback()
            return False
        return True

    def query(self, sql, params=None):
        """Run a query and return every fetched row.

        Parameters
        ----------
        sql : str
            Query text; may contain ``%s`` placeholders.
        params : sequence or mapping, optional
            Values bound to the placeholders by the driver.

        Returns
        -------
        list of tuple or bool
            The rows from ``fetchall()``, or ``False`` if the query failed
            (the error is printed and the transaction rolled back).
        """
        try:
            with self._db.cursor() as cur:
                cur.execute(sql, params)
                return cur.fetchall()
        except Exception as e:
            print(f"SQLException query: {e}")
            self._rollback()
            return False

In [None]:
# Connection settings come from the standard libpq environment variables so the
# password is not stored in the notebook; non-secret settings keep their defaults.
# A missing PGPASSWORD yields None, which psycopg2 omits, letting libpq fall back
# to its own password lookup (PGPASSWORD / ~/.pgpass).
db_conn = DBConnection(host=os.environ.get('PGHOST', 'localhost'),
                       db=os.environ.get('PGDATABASE', 'cdcs_challenge'),
                       usr=os.environ.get('PGUSER', 'postgres'),
                       pwd=os.environ.get('PGPASSWORD'))

In [None]:
# Seed the initial data (unidade_federativa records) from UF.csv when the table is empty.

ufs = db_conn.query("select * from unidade_federativa;")

# query() returns False when the SELECT fails (e.g. the table does not exist);
# stop here with a clear message instead of a TypeError from len(False).
if ufs is False:
    raise RuntimeError("Could not read table unidade_federativa; see the SQLException above.")

if not ufs:
    df = pd.read_csv('UF.csv')
    if len(df):
        # One parameterized batch: the driver quotes every value, so names that
        # contain apostrophes cannot break the statement, and the seed is
        # committed (or rolled back) as a whole. itertuples yields native Python
        # scalars, which psycopg2 can adapt.
        uf_rows = list(df[['UF_COD', 'UF_NOME', 'UF_SIGLA', 'UF_REGIAO']]
                       .itertuples(index=False, name=None))
        db_conn.dml_many("INSERT INTO unidade_federativa values (%s, %s, %s, %s);", uf_rows)

In [None]:
# Reload the UF lookup table (read by get_uf_code via the global `ufs`).
# read_sql_query already returns a DataFrame, so no extra pd.DataFrame(...) wrap is needed.
# NOTE: recent pandas versions only officially support SQLAlchemy connectables or
# sqlite3 here and emit a UserWarning for a raw psycopg2 connection; the query still runs.
ufs_query = pd.read_sql_query("select * from unidade_federativa", db_conn._db)
ufs = ufs_query
ufs

In [None]:
def get_uf_code(fname, ufs_table=None):
    """Return the ``uf_cod`` rows for the state encoded in a ``.dbf`` file name.

    The stem of ``fname`` (the text before ``.dbf``) is split into four
    two-character fields ``<dataset><state><year><month>``, e.g.
    ``'AMSP1901.dbf'`` -> state ``'SP'``.

    Parameters
    ----------
    fname : str
        Bare file name such as ``'AMSP1901.dbf'``.
    ufs_table : pandas.DataFrame, optional
        Lookup table with ``uf_sigla`` and ``uf_cod`` columns. Defaults to the
        global ``ufs`` frame loaded from ``unidade_federativa``.

    Returns
    -------
    pandas.Series
        ``uf_cod`` values of the matching rows (empty if the state is unknown).

    Raises
    ------
    ValueError
        If the stem does not split into exactly four two-character fields.
    """
    table = ufs if ufs_table is None else ufs_table
    stem = fname.split('.dbf')[0]
    fields = re.findall('..', stem)
    if len(fields) != 4:
        raise ValueError(
            f"Unexpected file name {fname!r}: expected <dataset><state><yy><mm>.dbf")
    _dataset, state, _year, _month = fields
    # Boolean mask rather than DataFrame.query, so no expression string is
    # assembled from the file name.
    return table.loc[table['uf_sigla'] == state, 'uf_cod']

In [None]:
f_path = '../A1/dbc_files'


def dbf_file_2_dataframe(filename):
    """Read ``<f_path>/<filename>`` with simpledbf and return it as a DataFrame.

    The file is decoded with the ``'latin'`` codec.
    """
    dbf_path = f'{f_path}/{filename}'
    return Dbf5(dbf_path, codec='latin').to_dataframe()

In [None]:
def persist_data_in_db(filename):
    """Load one ``.dbf`` file and bulk-insert its rows into ``apac_medicamentos``.

    Every DBF column is inserted into the table column of the same name, plus
    a ``UF_ID`` column holding the state code that ``get_uf_code`` derives from
    the file name. The insert goes through the global ``db_conn``.

    Parameters
    ----------
    filename : str
        Bare file name inside ``f_path`` (e.g. ``'AMSP1901.dbf'``).

    Returns
    -------
    The value returned by ``db_conn.dml_many`` (``False`` when the insert failed).

    Raises
    ------
    IndexError
        If the state in the file name has no row in ``ufs``.
    """
    print(f"Reading {filename} data (dbf to dataframe)...")
    df = dbf_file_2_dataframe(filename)
    # First matching uf_cod for the state encoded in the file name.
    uf_code = get_uf_code(filename).iloc[0]
    df['UF_ID'] = uf_code
    # itertuples yields native Python scalars; df.to_numpy() can yield numpy.int64
    # (e.g. when every column is integer-typed), which psycopg2 cannot adapt.
    tpls = list(df.itertuples(index=False, name=None))
    cols = ','.join(df.columns)
    values_refs = ','.join(['%s'] * len(df.columns))
    print(f"making insert into script from {filename}...")
    sql = f"INSERT INTO apac_medicamentos ({cols}) VALUES ({values_refs})"
    print(f"Inserting data from {filename}...")
    return db_conn.dml_many(sql, tpls)

In [None]:
# Sorted so the files are processed in a reproducible order (os.listdir order is arbitrary).
f_names = sorted(file for file in os.listdir(f_path) if file.endswith('.dbf'))


In [None]:
def print_fnames(fname):
    """Echo a single file name to stdout as ``File <fname>``."""
    message = "File {}".format(fname)
    print(message)

In [None]:
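# Disabled alternative: load every file in its own worker process.
# NOTE: all workers would use the global psycopg2 connection `db_conn`, and psycopg2
# connections must not be shared across forked processes. Each worker would need to
# open its own connection before this could be re-enabled.
# def run_paralell():
#     pool = multiprocessing.Pool(processes=len(f_names))
#     pool.map(persist_data_in_db, f_names)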

In [None]:
# run_paralell()

In [None]:
# Load the files one after another.
# `Thread(target=persist_data_in_db(fname))` would call the function right away and
# hand `None` to Thread, so it never ran anything in parallel. Running real threads
# is also unsafe here: every load goes through the single shared `db_conn`, so one
# file's commit or rollback would affect another file's in-flight transaction.
for fname in f_names:
    persist_data_in_db(fname)

In [None]:
# Close the shared PostgreSQL connection once all files have been processed;
# re-run the connection cell before using db_conn again.
db_conn.close_connection()