importccl: fix target column populating for AVRO
Target columns for AVRO are not properly populated when starting an
IMPORT INTO job, which blocks support for default expressions. This PR
fixes that by extracting the target columns from the AVRO schema and
populating them into `importCtx`. It also extracts the columns that are
present in the table but missing from the AVRO schema, so that an error
can be returned when strict validation is turned on and some table
columns are not specified in the AVRO schema.

The default value for each column follows this precedence:
1. Data for that column specified in an AVRO row;
2. Default value specified in the AVRO schema;
3. Default value of the table column.
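
For example (hypothetical table and schema, for illustration only):
given a table with columns `a INT, b INT, c INT DEFAULT 33`, an AVRO
schema declaring fields `a` and `b` with an AVRO default of `5` for
`b`, and an imported record `{"a": 1}`, the resulting row is
`(1, 5, 33)`: `a` from the row data (1), `b` from the AVRO schema
default (2), and `c` from the table default (3).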

Release note: None
anzoteh96 committed Aug 16, 2020
1 parent 1049eaa commit 909c510
Showing 2 changed files with 158 additions and 46 deletions.
116 changes: 75 additions & 41 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -68,6 +68,34 @@ import (
"github.com/stretchr/testify/require"
)

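// getAvroData encodes the given rows under the given schema and returns
// the AVRO OCF (Object Container File) encoded data as a string.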
func getAvroData(schema map[string]interface{}, rows []map[string]interface{}) (string, error) {
var data bytes.Buffer
schemaStr, err := json.Marshal(schema)
if err != nil {
return "", err
}
codec, err := goavro.NewCodec(string(schemaStr))
if err != nil {
return "", err
}
// Create an AVRO writer from the schema.
ocf, err := goavro.NewOCFWriter(goavro.OCFConfig{
W: &data,
Codec: codec,
})
if err != nil {
return "", err
}
for _, row := range rows {
err = ocf.Append([]interface{}{row})
if err != nil {
return "", err
}
}
// Retrieve the AVRO encoded data.
return data.String(), nil
}

func TestImportData(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -1087,48 +1115,26 @@ SET experimental_enable_enums = true;
CREATE TYPE greeting AS ENUM ('hello', 'hi');
`)

 	// Create some AVRO encoded data.
-	var avroData string
-	{
-		var data bytes.Buffer
-		// Set up a simple schema for the import data.
-		schema := map[string]interface{}{
-			"type": "record",
-			"name": "t",
-			"fields": []map[string]interface{}{
-				{
-					"name": "a",
-					"type": "string",
-				},
-				{
-					"name": "b",
-					"type": "string",
-				},
-			},
-		}
-		schemaStr, err := json.Marshal(schema)
-		require.NoError(t, err)
-		codec, err := goavro.NewCodec(string(schemaStr))
-		require.NoError(t, err)
-		// Create an AVRO writer from the schema.
-		ocf, err := goavro.NewOCFWriter(goavro.OCFConfig{
-			W:     &data,
-			Codec: codec,
-		})
-		require.NoError(t, err)
-		row1 := map[string]interface{}{
-			"a": "hello",
-			"b": "hello",
-		}
-		row2 := map[string]interface{}{
-			"a": "hi",
-			"b": "hi",
-		}
-		// Add the data rows to the writer.
-		require.NoError(t, ocf.Append([]interface{}{row1, row2}))
-		// Retrieve the AVRO encoded data.
-		avroData = data.String()
-	}
+	avroSchema := map[string]interface{}{
+		"type": "record",
+		"name": "t",
+		"fields": []map[string]interface{}{
+			{
+				"name": "a",
+				"type": "string",
+			},
+			{
+				"name": "b",
+				"type": "string",
+			},
+		},
+	}
+	avroRows := []map[string]interface{}{
+		{"a": "hello", "b": "hello"},
+		{"a": "hi", "b": "hi"},
+	}
+	avroData, err := getAvroData(avroSchema, avroRows)
+	require.NoError(t, err)

tests := []struct {
create string
@@ -1159,7 +1165,7 @@ CREATE TYPE greeting AS ENUM ('hello', 'hi');
}

// Set up a directory for the data files.
-	err := os.Mkdir(filepath.Join(baseDir, "test"), 0777)
+	err = os.Mkdir(filepath.Join(baseDir, "test"), 0777)
require.NoError(t, err)
// Test IMPORT INTO.
for _, test := range tests {
@@ -3029,6 +3035,26 @@ func TestImportDefault(t *testing.T) {
}
}))
defer srv.Close()
avroSchema := map[string]interface{}{
"type": "record",
"name": "t",
"fields": []map[string]interface{}{
{
"name": "a",
"type": "int",
},
{
"name": "b",
"type": "int",
},
},
}
avroRows := []map[string]interface{}{
{"a": 1, "b": 2, "c": 10},
{"a": 3, "b": 4},
}
avroData, err := getAvroData(avroSchema, avroRows)
require.NoError(t, err)
tests := []struct {
name string
data string
@@ -3149,6 +3175,14 @@ func TestImportDefault(t *testing.T) {
},
// TODO (anzoteh96): add MySQL and PGDUMP once IMPORT INTO is supported
// for these file formats.
{
name: "avro",
data: avroData,
create: "a INT, b INT, c INT DEFAULT 33",
targetCols: "a, b",
format: "AVRO",
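	// Column "c" is absent from targetCols, so the table default (33) is
	// used for every row, even though one AVRO row carries a "c" value.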
expectedResults: [][]string{{"1", "2", "33"}, {"3", "4", "33"}},
},
}
for _, test := range tests {
if test.sequence != "" {
88 changes: 83 additions & 5 deletions pkg/ccl/importccl/read_import_avro.go
@@ -12,8 +12,11 @@ import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"unicode/utf8"

"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -130,6 +133,48 @@ func nativeToDatum(
return d, nil
}

// getTargetColFromCodec extracts the target columns from the codec's schema
// so that they can be injected into the row converter. It also collects the
// columns that appear in tableColNames but are missing from the schema, so
// that an error can be reported when strict validation is turned on.
func getTargetColFromCodec(
	codec *goavro.Codec, tableColNames map[string]struct{},
) ([]tree.Name, []string, error) {
	schemaStr := codec.Schema()
	var schema map[string]interface{}
	if err := json.Unmarshal([]byte(schemaStr), &schema); err != nil {
		return nil, nil, err
	}
	colMap, ok := schema["fields"].([]interface{})
	if !ok {
		return nil, nil, errors.New("fields not found in schema")
	}
	targetCols := make([]tree.Name, 0)
	targetColsMap := make(map[string]struct{})
	for _, colInterface := range colMap {
		col, ok := colInterface.(map[string]interface{})
		if !ok {
			return nil, nil, errors.New("casting schema field failed")
		}
		colName, ok := col["name"].(string)
		if !ok {
			return nil, nil, errors.New("bad field name description")
		}
		if _, ok := tableColNames[colName]; ok {
			targetCols = append(targetCols, tree.Name(colName))
			targetColsMap[colName] = struct{}{}
		}
	}
	missingCols := make([]string, 0)
	for colName := range tableColNames {
		if _, ok := targetColsMap[colName]; !ok {
			missingCols = append(missingCols, colName)
		}
	}
	return targetCols, missingCols, nil
}
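
// For illustration: given a codec whose schema declares fields "a" and "b",
// and tableColNames containing "a", "b", and "c", getTargetColFromCodec
// returns targetCols = ["a", "b"] and missingCols = ["c"].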

// A mapping from supported types.Family to the list of avro
// type names that can be used to construct our target type.
var familyToAvroT = map[types.Family][]string{
@@ -160,9 +205,10 @@ var familyToAvroT = map[types.Family][]string{

// avroConsumer implements importRowConsumer interface.
type avroConsumer struct {
-	importCtx      *parallelImportContext
-	fieldNameToIdx map[string]int
-	strict         bool
+	importCtx       *parallelImportContext
+	fieldNameToIdx  map[string]int
+	strict          bool
+	missingColNames []string
}

// Converts avro record to datums as expected by DatumRowConverter.
@@ -205,6 +251,13 @@ func (a *avroConsumer) FillDatums(
return err
}

if a.strict && len(a.missingColNames) > 0 {
return fmt.Errorf(
"columns %s were not present in the import",
strings.Join(a.missingColNames, ", "),
)
}

// Set any nil datums to DNull (in case native
// record didn't have the value set at all)
for i := range conv.Datums {
@@ -390,21 +443,33 @@ func newImportAvroPipeline(
avro *avroInputReader, input *fileReader,
) (importRowProducer, importRowConsumer, error) {
fieldIdxByName := make(map[string]int)
-	for idx, col := range avro.importContext.tableDesc.VisibleColumns() {
-		fieldIdxByName[col.Name] = idx
+	tableColNames := make(map[string]struct{})
+	for _, col := range avro.importContext.tableDesc.VisibleColumns() {
+		tableColNames[col.Name] = struct{}{}
}

consumer := &avroConsumer{
importCtx: avro.importContext,
fieldNameToIdx: fieldIdxByName,
strict: avro.opts.StrictMode,
}

if avro.opts.Format == roachpb.AvroOptions_OCF {
ocf, err := goavro.NewOCFReader(bufio.NewReaderSize(input, 64<<10))
if err != nil {
return nil, nil, err
}
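	// Populate the import's target columns from the OCF codec's schema, and
	// record which table columns the schema omits so strict mode can reject them.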
targetCols, missingColNames, err := getTargetColFromCodec(ocf.Codec(), tableColNames)
if err != nil {
return nil, nil, err
}
consumer.importCtx.targetCols = targetCols
consumer.missingColNames = missingColNames
for idx, col := range consumer.importCtx.targetCols {
fieldIdxByName[col.String()] = idx
}
consumer.fieldNameToIdx = fieldIdxByName
producer := &ocfStream{
ocf: ocf,
progress: func() float32 { return input.ReadFraction() },
@@ -416,6 +481,19 @@
if err != nil {
return nil, nil, err
}
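	// As in the OCF case: derive the target and missing columns from the
	// schema codec and inject them into the import context.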
targetCols, missingColNames, err := getTargetColFromCodec(codec, tableColNames)
if err != nil {
return nil, nil, err
}
consumer.importCtx.targetCols = targetCols
consumer.missingColNames = missingColNames
for idx, col := range consumer.importCtx.targetCols {
fieldIdxByName[col.String()] = idx
}
consumer.fieldNameToIdx = fieldIdxByName

producer := &avroRecordStream{
importCtx: avro.importContext,
