Navigation Menu

Skip to content

Commit

Permalink
feat(bigtable/cbt): cbt 'import' cmd to parse a .csv file and write to CBT
Browse files Browse the repository at this point in the history
  • Loading branch information
markduffett committed Nov 3, 2021
1 parent 54cbf4c commit 9d8f2d3
Show file tree
Hide file tree
Showing 3 changed files with 610 additions and 0 deletions.
191 changes: 191 additions & 0 deletions bigtable/cmd/cbt/cbt.go 100644 → 100755
Expand Up @@ -32,6 +32,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"text/tabwriter"
"text/template"
"time"
Expand Down Expand Up @@ -409,6 +410,23 @@ var commands = []struct {
" Example: cbt help createtable",
Required: cbtconfig.NoneRequired,
},
{
Name: "import",
Desc: "Batch write many rows based on the input file (write)",
do: doImport,
Usage: "cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family-name>] [batch-size=<500>] [workers=<1>]\n" +
" app-profile=<app-profile-id> The app profile ID to use for the request\n" +
" column-family=<family-name> The column family label to use\n" +
" batch-size=<500> The max number of rows per batch write request\n" +
" workers=<1> The number of worker threads\n\n" +
" Import data from a csv file into an existing cbt table that has the required column families.\n" +
" See <example.csv.github.com/cbt-import-sample.csv> for a sample .csv file and formatting.\n" +
" If no column family row is present, use the column-family flag to specify an existing family.\n" +
" Examples:\n" +
" cbt import csv-import-table cbt-import-sample.csv\n" +
" cbt import csv-import-table cbt-import-sample.csv app-profile=batch-write-profile column-family=my-family workers=5\n",
Required: cbtconfig.ProjectAndInstanceRequired,
},
{
Name: "listinstances",
Desc: "List instances in a project",
Expand Down Expand Up @@ -1534,6 +1552,179 @@ func doDeleteAppProfile(ctx context.Context, args ...string) {
}
}

// importerArgs holds the parsed command-line settings for the `cbt import`
// command (see parseImporterArgs).
type importerArgs struct {
	appProfile string   // app profile ID to use for requests; "" means the default profile
	fams       []string // column-family names; index 0 is a placeholder for the row-key column
	sz         int      // maximum number of rows per bulk-write batch
	workers    int      // number of concurrent writer goroutines
}

// safeReader wraps a csv.Reader with a mutex so multiple import workers can
// take turns pulling batches of rows from a single input stream.
type safeReader struct {
	mu sync.Mutex
	r  *csv.Reader
	t  int // total rows written; workers fold their counts in under mu
}

// doImport implements the `cbt import` command: it parses the arguments,
// opens the CSV input file, and streams its rows into the target table via
// batched bulk writes.
//
// args[0] is the table ID and args[1] the CSV file path; the remaining args
// are optional key=value flags handled by parseImporterArgs. Any failure is
// fatal (command-line tool semantics).
func doImport(ctx context.Context, args ...string) {
	ia, err := parseImporterArgs(ctx, args)
	if err != nil {
		log.Fatalf("Error parsing importer args: %s", err)
	}
	f, err := os.Open(args[1])
	if err != nil {
		log.Fatalf("Couldn't open the csv file: %s", err)
	}
	// Close the input file when the import finishes; the original version
	// leaked the descriptor for the remainder of the process.
	defer f.Close()

	tbl := getClient(bigtable.ClientConfig{AppProfile: ia.appProfile}).Open(args[0])
	importCSV(ctx, tbl, csv.NewReader(f), ia)
}

// parseImporterArgs validates the positional and key=value arguments for the
// import command and returns the resulting settings.
//
// Defaults: batch size 500, one worker, and no column family (so the family
// row must come from the CSV header). Unrecognized key=value arguments are
// silently ignored, matching the behavior of the other cbt commands.
func parseImporterArgs(ctx context.Context, args []string) (importerArgs, error) {
	var err error
	ia := importerArgs{
		fams:    []string{""},
		sz:      500,
		workers: 1,
	}
	if len(args) < 2 {
		return ia, fmt.Errorf("usage: cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family-name>] [batch-size=<500>] [workers=<1>]")
	}
	for _, arg := range args[2:] {
		switch {
		case strings.HasPrefix(arg, "app-profile="):
			ia.appProfile = strings.Split(arg, "=")[1]
		case strings.HasPrefix(arg, "column-family="):
			ia.fams = append(ia.fams, strings.Split(arg, "=")[1])
		case strings.HasPrefix(arg, "batch-size="):
			ia.sz, err = strconv.Atoi(strings.Split(arg, "=")[1])
			// The documented bound is inclusive; the original check used
			// ">= 100000" and therefore rejected exactly 100000.
			if err != nil || ia.sz <= 0 || ia.sz > 100000 {
				return ia, fmt.Errorf("batch-size must be > 0 and <= 100000")
			}
		case strings.HasPrefix(arg, "workers="):
			ia.workers, err = strconv.Atoi(strings.Split(arg, "=")[1])
			if err != nil || ia.workers <= 0 {
				// %v (not %s) so a nil err — a parsed but non-positive
				// value — formats cleanly instead of as %!s(<nil>).
				return ia, fmt.Errorf("workers must be > 0, err:%v", err)
			}
		}
	}
	return ia, nil
}

// importCSV reads the header row(s) from r, then fans the remaining CSV rows
// out to ia.workers goroutines that write them to tbl in batches of ia.sz
// rows. Every cell written in a run shares one client-side timestamp. Header
// or write failures are fatal; on success the total row count is logged.
func importCSV(ctx context.Context, tbl *bigtable.Table, r *csv.Reader, ia importerArgs) {
	fams, cols, err := parseCsvHeaders(r, ia.fams)
	if err != nil {
		log.Fatalf("Error parsing headers: %s", err)
	}

	sr := safeReader{r: r}
	now := bigtable.Now()

	var wg sync.WaitGroup
	for worker := 0; worker < ia.workers; worker++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if err := sr.parseAndWrite(ctx, tbl, fams, cols, now, ia.sz, id); err != nil {
				log.Fatalf("Error: %s", err)
			}
		}(worker)
	}
	wg.Wait()

	log.Printf("Done importing %d rows.\n", sr.t)
}

func parseCsvHeaders(r *csv.Reader, fams []string) ([]string, []string, error) {
var err error
if len(fams) < 2 { // no column-family from flag, using first row
fams, err = r.Read()
if err != nil {
return fams, nil, fmt.Errorf("Family header reader error:%s", err)
}
}
cols, err := r.Read()
if err != nil {
return fams, cols, fmt.Errorf("Columns header reader error:%s", err)
}
if len(fams) < 2 || fams[1] == "" {
return fams, cols, fmt.Errorf("The first data column requires a non-empty column family or column-family flag set")
} else if len(fams) < len(cols) {
ext := make([]string, len(cols)-len(fams))
fams = append(fams, ext...)
}
for i := range cols[1:] {
if fams[i+1] == "" {
fams[i+1] = fams[i]
}
if fams[i+1] == "" {
return fams, cols, fmt.Errorf("Empty column family for column %d", i)
}
}
return fams, cols, nil
}

// batchWrite applies one bulk-mutation request for the accumulated rows and
// returns the number of rows written.
//
// rk and muts are parallel slices (row keys and their mutations); worker is
// used only to label log output. A non-nil process error or any per-row
// failure aborts with an error and a count of 0 (partial successes within
// the batch are not counted).
func batchWrite(ctx context.Context, tbl *bigtable.Table, rk []string, muts []*bigtable.Mutation, worker int) (int, error) {
	log.Printf("[%d] Writing batch:: size: %d, firstRowKey: %s, lastRowKey: %s\n", worker, len(rk), rk[0], rk[len(rk)-1])
	// "errs" rather than "errors" to avoid shadowing the standard package name.
	errs, err := tbl.ApplyBulk(ctx, rk, muts)
	if err != nil {
		return 0, fmt.Errorf("applying bulk mutations process error: %v", err)
	}
	if errs != nil {
		return 0, fmt.Errorf("applying bulk mutations had %d errors, first: %v", len(errs), errs[0])
	}
	return len(rk), nil
}

// parseAndWrite repeatedly pulls up to max CSV rows from the shared reader
// (under sr.mu), converts them into mutations, then releases the lock while
// issuing the bulk write so other workers can read the next batch. Column 0
// of each line is the row key; fams/cols supply the family and qualifier for
// each remaining column. It returns when the reader is exhausted, after
// folding this worker's written-row count into sr.t (still under the lock).
// Reader errors other than io.EOF are fatal.
func (sr *safeReader) parseAndWrite(ctx context.Context, tbl *bigtable.Table, fams, cols []string, ts bigtable.Timestamp, max, worker int) error {
	var rowKey []string
	var muts []*bigtable.Mutation
	var c int // rows this worker has written so far
	for {
		sr.mu.Lock()
		for len(rowKey) < max {
			line, err := sr.r.Read()
			if err == io.EOF {
				break
			}
			if err != nil {
				log.Fatal(err)
			}
			mut := bigtable.NewMutation()
			empty := true
			for i, val := range line {
				// Skip the row-key column (i == 0) and empty cells.
				if i > 0 && val != "" {
					mut.Set(fams[i], cols[i], ts, []byte(val))
					empty = false
				}
			}
			if empty {
				log.Printf("RowKey [%s] has no mutations, skipping", line[0])
				continue
			}
			if line[0] == "" {
				log.Printf("RowKey not present, skipping line")
				continue
			}
			rowKey = append(rowKey, line[0])
			muts = append(muts, mut)
		}
		if len(rowKey) > 0 {
			// Release the lock before the (slow) RPC so other workers can
			// read from the CSV while this batch is in flight.
			sr.mu.Unlock()
			n, err := batchWrite(ctx, tbl, rowKey, muts, worker)
			if err != nil {
				return err
			}
			c += n
			// Reuse the backing arrays for the next batch.
			rowKey = rowKey[:0]
			muts = muts[:0]
			continue
		}
		// EOF with nothing buffered: record this worker's total and finish.
		sr.t += c
		sr.mu.Unlock()
		return nil
	}
}

// parseDuration parses a duration string.
// It is similar to Go's time.ParseDuration, except with a different set of supported units,
// and only simple formats supported.
Expand Down

0 comments on commit 9d8f2d3

Please sign in to comment.