diff --git a/pb/destination/v1/arrow.go b/pb/destination/v1/arrow.go index 07a1385d..bb775425 100644 --- a/pb/destination/v1/arrow.go +++ b/pb/destination/v1/arrow.go @@ -7,30 +7,7 @@ import ( "github.com/apache/arrow/go/v13/arrow/ipc" ) -const ( - MetadataTableName = "cq:table_name" -) - -type Schemas []*arrow.Schema - -func (s Schemas) Len() int { - return len(s) -} - -func (s Schemas) SchemaByName(name string) *arrow.Schema { - for _, sc := range s { - tableName, ok := sc.Metadata().GetValue(MetadataTableName) - if !ok { - continue - } - if tableName == name { - return sc - } - } - return nil -} - -func NewSchemasFromBytes(b [][]byte) (Schemas, error) { +func NewSchemasFromBytes(b [][]byte) ([]*arrow.Schema, error) { ret := make([]*arrow.Schema, len(b)) for i, buf := range b { rdr, err := ipc.NewReader(bytes.NewReader(buf)) diff --git a/pb/source/v2/arrow.go b/pb/source/v2/arrow.go index a021d66c..b63b5de9 100644 --- a/pb/source/v2/arrow.go +++ b/pb/source/v2/arrow.go @@ -7,32 +7,9 @@ import ( "github.com/apache/arrow/go/v13/arrow/ipc" ) -const ( - MetadataTableName = "cq:table_name" -) - -type Schemas []*arrow.Schema - -func (s Schemas) Len() int { - return len(s) -} - -func (s Schemas) SchemaByName(name string) *arrow.Schema { - for _, sc := range s { - tableName, ok := sc.Metadata().GetValue(MetadataTableName) - if !ok { - continue - } - if tableName == name { - return sc - } - } - return nil -} - -func (s Schemas) Encode() ([][]byte, error) { - ret := make([][]byte, len(s)) - for i, sc := range s { +func SchemasToBytes(schemas []*arrow.Schema) ([][]byte, error) { + ret := make([][]byte, len(schemas)) + for i, sc := range schemas { var buf bytes.Buffer wr := ipc.NewWriter(&buf, ipc.WithSchema(sc)) if err := wr.Close(); err != nil {