diff --git a/parsers/financial.go b/parsers/financial.go index 67370b9..cbed8f5 100644 --- a/parsers/financial.go +++ b/parsers/financial.go @@ -108,10 +108,11 @@ func populateTable(db *sql.DB, dataType, file string) (err error) { return r == ';' } header := make(map[string]int) // stores the header item position (e.g., DT_FIM_EXERC:9) + deleteTable := make(map[string]bool) scanner := bufio.NewScanner(dec) count := 0 insert := "" - var stmt *sql.Stmt + var stmt, delStmt *sql.Stmt // Loop thru file, line by line for scanner.Scan() { @@ -136,11 +137,33 @@ func populateTable(db *sql.DB, dataType, file string) (err error) { } defer stmt.Close() + // Prepare delete statement (to avoid duplicated data in case of updated data from CVM) + _, ok1 := header["CNPJ_CIA"] + _, ok2 := header["DT_REFER"] + if ok1 && ok2 { + delete := fmt.Sprintf(`DELETE FROM %s WHERE CNPJ_CIA = ? AND strftime('%%Y', DT_REFER, 'unixepoch') = ?`, table) + delStmt, err = tx.Prepare(delete) + if err != nil { + err = errors.Wrapf(err, "erro ao preparar delete") + return + } + defer delStmt.Close() + } + } else { // VALUES if len(header) != len(fields) { fmt.Fprintf(os.Stderr, "[x] Linha com %d campos ao invés de %d\n", len(fields), len(header)) } else { + // DELETE + cnpj := fields[header["CNPJ_CIA"]] + year := fields[header["DT_REFER"]][:4] + if _, ok := deleteTable[cnpj+year]; !ok { + deleteTable[cnpj+year] = true + delStmt.Exec(cnpj, year) + } + + // INSERT hash := GetHash(line) code := GetHash(fields[header["CD_CONTA"]] + fields[header["DS_CONTA"]]) f, err := prepareFields(header, hash, code, fields) @@ -148,8 +171,6 @@ func populateTable(db *sql.DB, dataType, file string) (err error) { if err != nil { log.Fatal(err) } - // if err = insertLine(tx, dataType, &header, fields); err != nil { - // fmt.Printf("[x] %s: %v\n", dataType, err) } }