Skip to content

Commit

Permalink
Add support pre-sorted CSV tables
Browse files Browse the repository at this point in the history
Signed-off-by: Byron Ruth <b@devel.io>
  • Loading branch information
bruth committed Oct 25, 2017
1 parent a9b06fc commit e8257ba
Show file tree
Hide file tree
Showing 3 changed files with 286 additions and 62 deletions.
80 changes: 45 additions & 35 deletions README.md
@@ -1,6 +1,6 @@
# diff-table

A tool to compare two tables of data. Currently the tool only supports executing queries against a Postgres database, but there are plans to add more SQL drivers and flat file support.
A tool to compare two tables of data. Currently the tool supported tables from Postgres and pre-sorted CSV files.

The primary use case is to compare the output of a query executed at different points in time. For example, in a batch ETL process that runs every night, you can compare the previous batch with the new batch.

Expand All @@ -14,40 +14,6 @@ go get -u github.com/chop-dbhi/diff-table/cmd/diff-table

## Usage

This is a minimum example which shows the required options.

```
diff-table \
-db postgres://localhost:5432/postgres \
-table1 data_v1 \
-table2 data_v2 \
-key id
```

Here are the full set of options.

```
Usage of diff-table:
-db string
Database 1 connection URL.
-db2 string
Database 2 connection URL. Defaults to db option.
-schema string
Name of the first schema.
-schema2 string
Name of the second schema. Defaults to schema option.
-table1 string
Name of the first table.
-table2 string
Name of the second table.
-key string
Comma-separate list of columns representing the natural key of a record.
-diff
Diff row values and output new rows and changes.
```

## Example

```
diff-table \
-db postgres://localhost:5432/postgres \
Expand Down Expand Up @@ -89,4 +55,48 @@ The output is a JSON encoded value which various information about the table dif
}
```

## Examples

Two tables in the same database.

```
diff-table \
-db postgres://localhost:5432/postgres \
-table1 data_v1 \
-table2 data_v2 \
-key id
```

Two tables from different servers/databases.

```
diff-table \
-db postgres://localhost:5432/postgres \
-db2 postgres://localhost:5435/other \
-table1 data_v1 \
-table2 data_v2 \
-key id
```

Two CSV files.

*Note: at this time records must be pre-sorted by the key columns.*

```
diff-table \
-csv1 data_v1.csv \
-csv2 data_v2.csv \
-key id
```

A CSV file and a database table (o.O).

*Note: at this time records must be pre-sorted by the key columns.*

```
diff-table \
-csv1 data_v1.csv \
-db2 postgres://localhost:5432/postgres \
-table2 data_v2 \
-key id
```
166 changes: 139 additions & 27 deletions cmd/diff-table/main.go
@@ -1,10 +1,13 @@
package main

import (
"bytes"
"database/sql"
"encoding/csv"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"os"
"strings"
Expand All @@ -18,6 +21,12 @@ func main() {
keyList string
diffRows bool

csv1 string
csv1delim string

csv2 string
csv2delim string

url1 string
schema1 string
table1 string
Expand All @@ -30,6 +39,12 @@ func main() {
flag.StringVar(&keyList, "key", "", "Required comma-separate list of columns.")
flag.BoolVar(&diffRows, "diff", false, "Diff row values and output changes.")

flag.StringVar(&csv1, "csv1", "", "Path to CSV file.")
flag.StringVar(&csv1delim, "csv1.delim", ",", "CSV delimiter.")

flag.StringVar(&csv2, "csv2", "", "Path to CSV file.")
flag.StringVar(&csv2delim, "csv2.delim", ",", "CSV delimiter.")

flag.StringVar(&url1, "db", "", "Database 1 connection URL.")
flag.StringVar(&schema1, "schema", "", "Name of the first schema.")
flag.StringVar(&table1, "table1", "", "Name of the first table.")
Expand All @@ -55,45 +70,108 @@ func main() {
schema2 = schema1
}

// TODO: remove hard-coded postgres dependency
db1, err := sql.Open("postgres", url1)
if err != nil {
log.Printf("db1 open: %s", err)
var (
t1, t2 difftable.Table
err error
)

if csv1 != "" && url1 != "" {
log.Print("can't both a csv and db source defined")
return
}
defer db1.Close()

db2, err := sql.Open("postgres", url2)
if err != nil {
log.Printf("db2 open: %s", err)
if csv2 != "" && url2 != "" {
log.Print("can't both a csv and db target defined")
return
}
defer db2.Close()

rows1, err := runQuery(db1, schema1, table1, key)
if err != nil {
log.Printf("db1 query: %s", err)
return
if csv1 != "" {
f1, err := os.Open(csv1)
if err != nil {
log.Printf("csv1 open: %s", err)
return
}
defer f1.Close()

cr1 := csv.NewReader(&uniReader{f1})

cr1.Comma = rune(csv1delim[0])
cr1.LazyQuotes = true
cr1.TrimLeadingSpace = true
cr1.ReuseRecord = true

t1, err = difftable.CSVTable(cr1, key)
if err != nil {
log.Printf("csv1 table: %s", err)
return
}
}
defer rows1.Close()

rows2, err := runQuery(db2, schema2, table2, key)
if err != nil {
log.Printf("db2 query: %s", err)
return
if csv2 != "" {
f2, err := os.Open(csv2)
if err != nil {
log.Printf("csv2 open: %s", err)
return
}
defer f2.Close()

cr2 := csv.NewReader(&uniReader{f2})

cr2.Comma = rune(csv2delim[0])
cr2.LazyQuotes = true
cr2.TrimLeadingSpace = true
cr2.ReuseRecord = true

t2, err = difftable.CSVTable(cr2, key)
if err != nil {
log.Printf("csv2 table: %s", err)
return
}
}
defer rows2.Close()

t1, err := difftable.SQLTable(rows1, key)
if err != nil {
log.Printf("db1 table: %s", err)
return
if url1 != "" {
// TODO: remove hard-coded postgres dependency
db1, err := sql.Open("postgres", url1)
if err != nil {
log.Printf("db1 open: %s", err)
return
}
defer db1.Close()

rows1, err := runQuery(db1, schema1, table1, key)
if err != nil {
log.Printf("db1 query: %s", err)
return
}
defer rows1.Close()

t1, err = difftable.SQLTable(rows1, key)
if err != nil {
log.Printf("db1 table: %s", err)
return
}
}

t2, err := difftable.SQLTable(rows2, key)
if err != nil {
log.Printf("db2 table: %s", err)
return
if url2 != "" {
db2, err := sql.Open("postgres", url2)
if err != nil {
log.Printf("db2 open: %s", err)
return
}
defer db2.Close()

rows2, err := runQuery(db2, schema2, table2, key)
if err != nil {
log.Printf("db2 query: %s", err)
return
}
defer rows2.Close()

t2, err = difftable.SQLTable(rows2, key)
if err != nil {
log.Printf("db2 table: %s", err)
return
}
}

diff, err := difftable.Diff(t1, t2, diffRows)
Expand Down Expand Up @@ -137,3 +215,37 @@ func runQuery(db *sql.DB, schema, table string, key []string) (*sql.Rows, error)

return db.Query(stmt)
}

var bom = []byte{0xef, 0xbb, 0xbf}

// uniReader wraps an io.Reader to replace carriage returns with newlines.
// This is used with the csv.Reader so it can properly delimit lines.
type uniReader struct {
r io.Reader
}

func (r *uniReader) Read(buf []byte) (int, error) {
n, err := r.r.Read(buf)

// Detect and remove BOM.
if bytes.HasPrefix(buf, bom) {
copy(buf, buf[len(bom):])
n -= len(bom)
}

// Replace carriage returns with newlines
for i, b := range buf {
if b == '\r' {
buf[i] = '\n'
}
}

return n, err
}

func (r *uniReader) Close() error {
if rc, ok := r.r.(io.Closer); ok {
return rc.Close()
}
return nil
}

0 comments on commit e8257ba

Please sign in to comment.