Skip to content
Permalink
Browse files
feat(bigtable/cbt): cbt 'import' cmd to parse a .csv file and write to CBT (#5072)

Co-authored-by: Christopher Wilcox <crwilcox@google.com>
  • Loading branch information
markduffett and crwilcox committed Nov 8, 2021
1 parent 516d765 commit 5a2ed6b2cd1c304e0f59daa29959863bff9b5c29
Showing with 634 additions and 0 deletions.
  1. +199 −0 bigtable/cmd/cbt/cbt.go
  2. +413 −0 bigtable/cmd/cbt/cbt_test.go
  3. +22 −0 bigtable/cmd/cbt/cbtdoc.go
199 bigtable/cmd/cbt/cbt.go 100644 → 100755
@@ -32,6 +32,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"text/tabwriter"
"text/template"
"time"
@@ -409,6 +410,23 @@ var commands = []struct {
" Example: cbt help createtable",
Required: cbtconfig.NoneRequired,
},
{
Name: "import",
Desc: "Batch write many rows based on the input file",
do: doImport,
Usage: "cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family-name>] [batch-size=<500>] [workers=<1>]\n" +
" app-profile=<app-profile-id> The app profile ID to use for the request\n" +
" column-family=<family-name> The column family label to use\n" +
" batch-size=<500> The max number of rows per batch write request\n" +
" workers=<1> The number of worker threads\n\n" +
" Import data from a csv file into an existing cbt table that has the required column families.\n" +
" See <example.csv.github.com/cbt-import-sample.csv> for a sample .csv file and formatting.\n" +
" If no column family row is present, use the column-family flag to specify an existing family.\n\n" +
" Examples:\n" +
" cbt import csv-import-table cbt-import-sample.csv\n" +
" cbt import csv-import-table cbt-import-sample.csv app-profile=batch-write-profile column-family=my-family workers=5\n",
Required: cbtconfig.ProjectAndInstanceRequired,
},
{
Name: "listinstances",
Desc: "List instances in a project",
@@ -1534,6 +1552,187 @@ func doDeleteAppProfile(ctx context.Context, args ...string) {
}
}

// importerArgs holds the optional settings of a `cbt import` invocation, as
// filled in by parseImporterArgs.
type importerArgs struct {
// appProfile is the app profile ID passed to the Bigtable client config.
appProfile string
// fam is the column family taken from the column-family flag; when it is
// empty the families are read from the first row of the CSV file instead.
fam string
// sz is the maximum number of rows collected into one batch write.
sz int
// workers is the number of goroutines that read and write concurrently.
workers int
}

// safeReader wraps a csv.Reader so that several import workers can share it.
type safeReader struct {
// mu is held by parseAndWrite while it reads from r and while it updates t.
mu sync.Mutex
// r is the shared CSV source; importCSV consumes the header rows from it
// before handing it to the workers.
r *csv.Reader
// t accumulates the row counts that the workers report as written.
t int // total rows
}

// doImport implements the `cbt import` command. args[0] is the table ID and
// args[1] the path of the CSV file; any further args are key=value options
// understood by parseImporterArgs. It terminates the process via log.Fatalf
// when the arguments are invalid or the file cannot be opened.
func doImport(ctx context.Context, args ...string) {
	// parseImporterArgs rejects fewer than two args, so indexing args[0] and
	// args[1] below is safe once it has succeeded.
	ia, err := parseImporterArgs(ctx, args)
	if err != nil {
		log.Fatalf("error parsing importer args: %s", err)
	}
	f, err := os.Open(args[1])
	if err != nil {
		log.Fatalf("couldn't open the csv file: %s", err)
	}
	// Release the file handle once the import has finished. importCSV exits
	// through log.Fatalf on failure, which skips deferred calls, so this
	// matters for the normal-return path.
	defer f.Close()

	tbl := getClient(bigtable.ClientConfig{AppProfile: ia.appProfile}).Open(args[0])
	importCSV(ctx, tbl, csv.NewReader(f), ia)
}

// parseImporterArgs validates the arguments of `cbt import` and returns the
// optional settings found after the two positional arguments
// (<table-id> <input-file>).
//
// Defaults are batch-size=500 and workers=1. Recognised options are
// app-profile=, column-family=, batch-size= and workers=; arguments that match
// none of these prefixes are ignored. The value of an option is everything
// after the first '=', so values may themselves contain '='.
//
// An error is returned when fewer than two positional arguments are given,
// when column-family is empty, when batch-size is not an integer in
// [1, 100000], or when workers is not a positive integer. ctx is unused.
func parseImporterArgs(ctx context.Context, args []string) (importerArgs, error) {
	ia := importerArgs{
		fam:     "",
		sz:      500,
		workers: 1,
	}
	if len(args) < 2 {
		return ia, fmt.Errorf("usage: cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family-name>] [batch-size=<500>] [workers=<1>]")
	}
	for _, arg := range args[2:] {
		switch {
		case strings.HasPrefix(arg, "app-profile="):
			ia.appProfile = strings.TrimPrefix(arg, "app-profile=")
		case strings.HasPrefix(arg, "column-family="):
			ia.fam = strings.TrimPrefix(arg, "column-family=")
			if ia.fam == "" {
				return ia, fmt.Errorf("column-family cannot be ''")
			}
		case strings.HasPrefix(arg, "batch-size="):
			n, err := strconv.Atoi(strings.TrimPrefix(arg, "batch-size="))
			// The upper bound is inclusive, matching the error message.
			if err != nil || n <= 0 || n > 100000 {
				return ia, fmt.Errorf("batch-size must be > 0 and <= 100000")
			}
			ia.sz = n
		case strings.HasPrefix(arg, "workers="):
			n, err := strconv.Atoi(strings.TrimPrefix(arg, "workers="))
			if err != nil {
				return ia, fmt.Errorf("workers must be > 0, err:%s", err)
			}
			// Reported separately so a nil parse error is never formatted.
			if n <= 0 {
				return ia, fmt.Errorf("workers must be > 0")
			}
			ia.workers = n
		}
	}
	return ia, nil
}

// importCSV reads the header rows from r and then starts ia.workers
// goroutines that share r, each turning data rows into batch writes of at
// most ia.sz rows against tbl. All cells are written with a single timestamp
// taken before the workers start. It blocks until every worker has returned,
// logs the total row count, and terminates the process via log.Fatalf if the
// headers are invalid or any worker reports an error.
func importCSV(ctx context.Context, tbl *bigtable.Table, r *csv.Reader, ia importerArgs) {
	families, columns, err := parseCsvHeaders(r, ia.fam)
	if err != nil {
		log.Fatalf("error parsing headers: %s", err)
	}

	shared := safeReader{r: r}
	stamp := bigtable.Now()

	var pending sync.WaitGroup
	pending.Add(ia.workers)
	for id := 0; id < ia.workers; id++ {
		go func(id int) {
			defer pending.Done()
			werr := shared.parseAndWrite(ctx, tbl, families, columns, stamp, ia.sz, id)
			if werr != nil {
				log.Fatalf("error: %s", werr)
			}
		}(id)
	}
	pending.Wait()

	log.Printf("Done importing %d rows.\n", shared.t)
}

// parseCsvHeaders consumes the header rows from r and returns the column
// family and column name for every CSV column, in that order.
//
// When family is empty the first row supplies the column families and the
// second row the column names. When family is set, only the column-name row is
// read and family is used for every data column. Column 0 is the row key, so
// it must be empty in both header rows; column 1 must be populated in both.
// A blank family cell inherits the family of the column to its left.
//
// An error is returned when a header row cannot be read, when there are fewer
// than two columns, when the family row is shorter than the column-name row,
// or when the first/second column rules above are violated.
func parseCsvHeaders(r *csv.Reader, family string) ([]string, []string, error) {
	var err error
	var fams, cols []string
	if family == "" { // no column-family from flag, using first row
		fams, err = r.Read()
		if err != nil {
			return nil, nil, fmt.Errorf("family header reader error:%s", err)
		}
	}
	cols, err = r.Read() // column names are next row
	if err != nil {
		return nil, nil, fmt.Errorf("columns header reader error:%s", err)
	}
	if family != "" {
		fams = make([]string, len(cols))
		// A single-column header has no slot 1; leave it to the length check
		// below to report that instead of indexing out of range.
		if len(fams) > 1 {
			fams[1] = family
		}
	}
	if len(fams) < 2 || len(cols) < 2 {
		return fams, cols, fmt.Errorf("at least 2 columns are required (rowkey + data)")
	}
	// Only reachable when the reader allows rows of differing width; without
	// this check the fill loop below would index past the end of fams.
	if len(fams) < len(cols) {
		return fams, cols, fmt.Errorf("column family row has %d columns but column name row has %d", len(fams), len(cols))
	}
	if fams[0] != "" || cols[0] != "" {
		return fams, cols, fmt.Errorf("the first column must be empty for column-family and column name rows")
	}
	if fams[1] == "" || cols[1] == "" {
		return fams, cols, fmt.Errorf("the second column (first data column) must have values for column family and column name rows if present")
	}
	for i := 1; i < len(cols); i++ { // fill any blank column families with previous
		if fams[i] == "" {
			fams[i] = fams[i-1]
		}
	}
	return fams, cols, nil
}

// batchWrite applies muts to the rows named by rk in a single ApplyBulk call
// and returns the number of rows written. rk and muts are parallel slices.
// worker is only used to tag the log line.
//
// It returns (0, error) when the bulk call itself fails or when at least one
// individual row mutation is reported as failed; in the latter case the error
// carries the number of failed rows and the first failure. An empty batch is
// a no-op.
func batchWrite(ctx context.Context, tbl *bigtable.Table, rk []string, muts []*bigtable.Mutation, worker int) (int, error) {
	// Nothing to send; also keeps the rk[0] access below in range.
	if len(rk) == 0 {
		return 0, nil
	}
	log.Printf("[%d] Writing batch:: size: %d, firstRowKey: %s, lastRowKey: %s\n", worker, len(rk), rk[0], rk[len(rk)-1])
	errs, err := tbl.ApplyBulk(ctx, rk, muts)
	if err != nil {
		return 0, fmt.Errorf("applying bulk mutations process error: %v", err)
	}
	// NOTE(review): per the ApplyBulk docs the returned errors correspond
	// positionally to rk/muts, so entries for rows that succeeded are expected
	// to be nil. Count only the real failures and report the first of them,
	// rather than the slice length and element 0.
	var failed int
	var first error
	for _, e := range errs {
		if e == nil {
			continue
		}
		if first == nil {
			first = e
		}
		failed++
	}
	if failed > 0 {
		return 0, fmt.Errorf("applying bulk mutations had %d errors, first:%v", failed, first)
	}
	return len(rk), nil
}

// parseAndWrite is the body of one import worker. It repeatedly takes up to
// max data rows from the shared reader, converts them into mutations using
// fams[i]/cols[i] as the family and column of CSV column i (column 0 is the
// row key) and ts as the cell timestamp, and writes each batch with
// batchWrite. When the reader is exhausted it adds the number of rows this
// worker wrote to sr.t and returns nil.
//
// It returns an error when reading the CSV fails, when a row has a value in a
// column that has no header, or when a batch write fails. worker is only used
// to tag log lines.
func (sr *safeReader) parseAndWrite(ctx context.Context, tbl *bigtable.Table, fams, cols []string, ts bigtable.Timestamp, max, worker int) error {
	var (
		rowKeys []string
		muts    []*bigtable.Mutation
		written int
	)
	for {
		var err error
		// Reuse the backing arrays of the previous batch; batchWrite has
		// returned by now, so nothing else still refers to them.
		rowKeys, muts, err = sr.readBatch(rowKeys[:0], muts[:0], fams, cols, ts, max, worker)
		if err != nil {
			return err
		}
		if len(rowKeys) == 0 {
			break // reader exhausted
		}
		// The write happens outside the reader lock so other workers can
		// read their next batch in the meantime.
		n, err := batchWrite(ctx, tbl, rowKeys, muts, worker)
		if err != nil {
			return err
		}
		written += n
	}
	sr.mu.Lock()
	sr.t += written
	sr.mu.Unlock()
	return nil
}

// readBatch appends up to limit-len(rowKeys) usable rows from the shared
// reader to rowKeys/muts while holding sr.mu, and returns the grown slices.
// Rows without any non-empty data cell and rows without a row key are logged
// and skipped. Fewer rows than limit (possibly none) are returned at EOF.
func (sr *safeReader) readBatch(rowKeys []string, muts []*bigtable.Mutation, fams, cols []string, ts bigtable.Timestamp, limit, worker int) ([]string, []*bigtable.Mutation, error) {
	sr.mu.Lock()
	defer sr.mu.Unlock()
	for len(rowKeys) < limit {
		line, err := sr.r.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, nil, err
		}
		mut := bigtable.NewMutation()
		empty := true
		for i, val := range line {
			if i == 0 || val == "" {
				continue
			}
			// A value beyond the header width has no family/column to go to.
			if i >= len(fams) || i >= len(cols) {
				return nil, nil, fmt.Errorf("row %q: column %d has no matching header", line[0], i)
			}
			mut.Set(fams[i], cols[i], ts, []byte(val))
			empty = false
		}
		if empty {
			log.Printf("[%d] RowKey '%s' has no mutations, skipping", worker, line[0])
			continue
		}
		if line[0] == "" {
			log.Printf("[%d] RowKey not present, skipping line", worker)
			continue
		}
		rowKeys = append(rowKeys, line[0])
		muts = append(muts, mut)
	}
	return rowKeys, muts, nil
}

// parseDuration parses a duration string.
// It is similar to Go's time.ParseDuration, except with a different set of supported units,
// and only simple formats supported.

0 comments on commit 5a2ed6b

Please sign in to comment.