Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snowflake Avro sync method #123

Merged
merged 12 commits into from
Jun 12, 2023
8 changes: 8 additions & 0 deletions .github/workflows/flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,17 @@ jobs:
name: "bq_service_account.json"
json: ${{ secrets.GCP_GH_CI_PKEY }}

- name: setup snowflake credentials
id: sf-credentials
uses: jsdaniell/create-json@v1.2.2
with:
name: "snowflake_creds.json"
json: ${{ secrets.SNOWFLAKE_GH_CI_PKEY }}

- name: run tests
run: |
gotestsum --format testname
working-directory: ./flow
env:
TEST_BQ_CREDS: ${{ github.workspace }}/bq_service_account.json
TEST_SF_CREDS: ${{ github.workspace }}/snowflake_creds.json
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(

// Write each QRecord to the OCF file
for _, qRecord := range records.Records {
avroMap, err := qRecord.ToAvroCompatibleMap(&nullable, records.ColumnNames)
avroMap, err := qRecord.ToAvroCompatibleMap(model.QDBTypeBigQuery, &nullable, records.Schema.GetColumnNames())
if err != nil {
return 0, fmt.Errorf("failed to convert QRecord to Avro compatible map: %w", err)
}
Expand Down Expand Up @@ -205,7 +205,7 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
case bigquery.TimestampFieldType:
return map[string]string{
"type": "long",
"logicalType": "timestamp-millis",
"logicalType": "timestamp-micros",
}, nil
case bigquery.DateFieldType:
return map[string]string{
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrep_sync_method.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
numRowsInserted := 0
for _, qRecord := range records.Records {
toPut := QRecordValueSaver{
ColumnNames: records.ColumnNames,
ColumnNames: records.Schema.GetColumnNames(),
Record: qRecord,
PartitionID: partitionID,
RunID: runID,
Expand Down
61 changes: 52 additions & 9 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,55 @@ func (qe *QRepQueryExecutor) ExecuteQuery(query string, args ...interface{}) (pg
return rows, nil
}

func fieldDescriptionToQValueKind(fd pgconn.FieldDescription) model.QValueKind {
switch fd.DataTypeOID {
case pgtype.BoolOID:
return model.QValueKindBoolean
case pgtype.Int2OID:
return model.QValueKindInt16
case pgtype.Int4OID:
return model.QValueKindInt32
case pgtype.Int8OID:
return model.QValueKindInt64
case pgtype.Float4OID:
return model.QValueKindFloat32
case pgtype.Float8OID:
return model.QValueKindFloat64
case pgtype.TextOID, pgtype.VarcharOID:
return model.QValueKindString
case pgtype.ByteaOID:
return model.QValueKindBytes
case pgtype.JSONOID, pgtype.JSONBOID:
return model.QValueKindJSON
case pgtype.UUIDOID:
return model.QValueKindUUID
case pgtype.TimestampOID, pgtype.TimestamptzOID, pgtype.DateOID, pgtype.TimeOID:
return model.QValueKindETime
case pgtype.NumericOID:
return model.QValueKindNumeric
default:
return model.QValueKindInvalid
}
}

// FieldDescriptionsToSchema converts a slice of pgconn.FieldDescription to a QRecordSchema.
func fieldDescriptionsToSchema(fds []pgconn.FieldDescription) *model.QRecordSchema {
qfields := make([]*model.QField, len(fds))
for i, fd := range fds {
cname := fd.Name
ctype := fieldDescriptionToQValueKind(fd)
// there isn't a way to know if a column is nullable or not
// TODO fix this.
cnullable := true
qfields[i] = &model.QField{
Name: cname,
Type: ctype,
Nullable: cnullable,
}
}
return model.NewQRecordSchema(qfields)
}

func (qe *QRepQueryExecutor) ProcessRows(
rows pgx.Rows,
fieldDescriptions []pgconn.FieldDescription,
Expand All @@ -57,16 +106,10 @@ func (qe *QRepQueryExecutor) ProcessRows(
return nil, fmt.Errorf("row iteration failed: %w", rows.Err())
}

// get col names from fieldDescriptions
colNames := make([]string, len(fieldDescriptions))
for i, fd := range fieldDescriptions {
colNames[i] = fd.Name
}

batch := &model.QRecordBatch{
NumRecords: uint32(len(records)),
Records: records,
ColumnNames: colNames,
NumRecords: uint32(len(records)),
Records: records,
Schema: fieldDescriptionsToSchema(fieldDescriptions),
}

return batch, nil
Expand Down