This repository has been archived by the owner on Jun 5, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
schema.go
186 lines (164 loc) · 3.88 KB
/
schema.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// Package skiff implements skiff, an efficient serialization format optimized for YT.
package skiff
import (
"encoding/gob"
"fmt"
"github.com/go-faster/yt/schema"
"github.com/go-faster/yt/yson"
)
// WireType identifies how a single value is laid out in a skiff stream.
// The textual name of each constant (as read by UnmarshalYSON and written by
// String) is "nothing", "boolean", "int64", ..., "tuple".
type WireType int

// Skiff wire types. Values are assigned sequentially with iota, so the
// declaration order below determines each constant's numeric value; do not
// reorder or insert in the middle without checking everything that persists
// or compares these numbers.
const (
	// TypeNothing carries no payload; OptionalColumn uses it as the first
	// alternative of a TypeVariant8 (the "value absent" case).
	TypeNothing WireType = iota
	// Simple wire types: IsSimple reports true exactly for these six.
	TypeBoolean
	TypeInt64
	TypeUint64
	TypeDouble
	TypeString32
	TypeYSON32
	// Composite wire types. In this file OptionalColumn builds TypeVariant8
	// schemas and FromTableSchema builds a TypeTuple, both with Children.
	TypeVariant8
	TypeVariant16
	TypeRepeatedVariant16
	TypeTuple
)
// FromYTType maps a YT column type to the skiff wire type used to transfer
// its values:
//
//   - every signed integer width travels as TypeInt64;
//   - every unsigned integer width travels as TypeUint64;
//   - float64 travels as TypeDouble;
//   - bytes and string both travel as TypeString32;
//   - any travels as TypeYSON32;
//   - boolean travels as TypeBoolean.
//
// FromYTType panics for any YT type not listed above.
func FromYTType(typ schema.Type) WireType {
	var wire WireType
	switch typ {
	case schema.TypeInt8, schema.TypeInt16, schema.TypeInt32, schema.TypeInt64:
		wire = TypeInt64
	case schema.TypeUint8, schema.TypeUint16, schema.TypeUint32, schema.TypeUint64:
		wire = TypeUint64
	case schema.TypeBytes, schema.TypeString:
		wire = TypeString32
	case schema.TypeFloat64:
		wire = TypeDouble
	case schema.TypeAny:
		wire = TypeYSON32
	case schema.TypeBoolean:
		wire = TypeBoolean
	default:
		panic(fmt.Sprintf("invalid YT type %s", typ))
	}
	return wire
}
// IsSimple reports whether t is a scalar wire type: boolean, int64, uint64,
// double, string32 or yson32. TypeNothing, the variant types, tuples and any
// undeclared value are not simple.
func (t WireType) IsSimple() bool {
	return t == TypeBoolean ||
		t == TypeInt64 ||
		t == TypeUint64 ||
		t == TypeDouble ||
		t == TypeString32 ||
		t == TypeYSON32
}
// UnmarshalYSON decodes a wire type from its YSON string name, e.g. "int64"
// or "repeated_variant16". It returns the yson decoding error unchanged if
// data is not a string, and an "invalid skiff type" error for unknown names.
// On any error *t is left unmodified.
func (t *WireType) UnmarshalYSON(data []byte) error {
	var name string
	if err := yson.Unmarshal(data, &name); err != nil {
		return err
	}

	var parsed WireType
	switch name {
	case "nothing":
		parsed = TypeNothing
	case "boolean":
		parsed = TypeBoolean
	case "int64":
		parsed = TypeInt64
	case "uint64":
		parsed = TypeUint64
	case "double":
		parsed = TypeDouble
	case "string32":
		parsed = TypeString32
	case "yson32":
		parsed = TypeYSON32
	case "variant8":
		parsed = TypeVariant8
	case "variant16":
		parsed = TypeVariant16
	case "repeated_variant16":
		parsed = TypeRepeatedVariant16
	case "tuple":
		parsed = TypeTuple
	default:
		return fmt.Errorf("invalid skiff type %q", name)
	}

	// Commit only after the name was recognised.
	*t = parsed
	return nil
}
// String returns the skiff name of t, which is the same spelling that
// UnmarshalYSON accepts. Any value that is not one of the declared WireType
// constants yields "invalid".
func (t WireType) String() string {
	// Keyed by constant rather than by position, so the table stays correct
	// regardless of how the constants are numbered.
	names := [...]string{
		TypeNothing:           "nothing",
		TypeBoolean:           "boolean",
		TypeInt64:             "int64",
		TypeUint64:            "uint64",
		TypeDouble:            "double",
		TypeString32:          "string32",
		TypeYSON32:            "yson32",
		TypeVariant8:          "variant8",
		TypeVariant16:         "variant16",
		TypeRepeatedVariant16: "repeated_variant16",
		TypeTuple:             "tuple",
	}
	if t < 0 || int(t) >= len(names) || names[t] == "" {
		return "invalid"
	}
	return names[t]
}
// MarshalYSON writes t to w as a YSON string holding its String() name.
// This method itself always returns nil.
func (t WireType) MarshalYSON(w *yson.Writer) error {
	name := t.String()
	w.String(name)
	return nil
}
// Schema describes the skiff wire format of a single value. Composite wire
// types (variants, tuples) describe their components in Children.
//
// NOTE(review): field order determines the order fields are emitted when the
// struct is serialized; keep it stable.
type Schema struct {
	// Type is the wire type of this value (YSON key "wire_type").
	Type WireType `yson:"wire_type"`
	// Name is the column name; FromTableSchema and OptionalColumn set it for
	// table columns. Omitted from YSON when empty.
	Name string `yson:"name,omitempty"`
	// Children lists nested schemas: the alternatives of a variant (see
	// OptionalColumn) or the elements of a tuple (see FromTableSchema).
	// Omitted from YSON when empty.
	Children []Schema `yson:"children,omitempty"`
}
// IsSystem reports whether c describes one of the system columns in
// systemPrefix. The match is on Name and Type only; Children are not compared.
func (c Schema) IsSystem() bool {
	for i := range systemPrefix {
		// Take a pointer to avoid copying each Schema element.
		if sys := &systemPrefix[i]; sys.Type == c.Type && sys.Name == c.Name {
			return true
		}
	}
	return false
}
// init registers *Schema with encoding/gob, which gob requires for concrete
// types that are transmitted inside interface values.
// NOTE(review): presumably this is for Format.TableSchemas ([]interface{}),
// whose entries may be *Schema — confirm against callers.
func init() {
	gob.Register(&Schema{})
}
// FromTableSchema creates the skiff schema for rows of a table described by ts.
//
// The result is a TypeTuple with these children, in order:
//
//   - the system columns from systemPrefix;
//   - one entry per column of ts, in column order.
//
// A required column is encoded with its plain wire type. Any other column is
// wrapped with OptionalColumn.
//
// The returned schema shares no slice storage with the package-level
// systemPrefix, so callers may freely modify it.
//
// FromTableSchema panics if a column has a YT type that FromYTType cannot map.
func FromTableSchema(ts schema.Schema) Schema {
	columns := make([]Schema, 0, len(systemPrefix)+len(ts.Columns))

	// Deep-copy the system prefix. A plain append would copy the Children
	// slice headers, so a caller mutating the result would silently corrupt
	// the shared systemPrefix and every schema built after it.
	for _, sys := range systemPrefix {
		columns = append(columns, cloneSchema(sys))
	}

	for _, col := range ts.Columns {
		wire := FromYTType(col.Type)
		if col.Required {
			columns = append(columns, Schema{Name: col.Name, Type: wire})
			continue
		}
		columns = append(columns, OptionalColumn(col.Name, wire))
	}

	return Schema{
		Type:     TypeTuple,
		Children: columns,
	}
}

// cloneSchema returns a deep copy of s. The Children slices at every level
// share no backing storage with s; a nil Children stays nil.
func cloneSchema(s Schema) Schema {
	if s.Children == nil {
		return s
	}
	children := make([]Schema, len(s.Children))
	for i, child := range s.Children {
		children[i] = cloneSchema(child)
	}
	s.Children = children
	return s
}
// OptionalColumn returns the schema for a column called name whose value may
// be absent. FromTableSchema uses it for columns that are not required.
// The schema is a TypeVariant8 with two alternatives: TypeNothing first
// (no value), then typ (the value itself). Each call allocates a fresh
// Children slice.
func OptionalColumn(name string, typ WireType) Schema {
	absent := Schema{Type: TypeNothing}
	present := Schema{Type: typ}
	return Schema{
		Type:     TypeVariant8,
		Name:     name,
		Children: []Schema{absent, present},
	}
}
// systemPrefix lists the system columns that FromTableSchema places, in this
// order, before the user columns of every table schema. IsSystem matches a
// Schema against these entries by Name and Type.
// NOTE(review): the order is significant for the wire layout, and this
// package-level slice is shared, so treat it as read-only.
var systemPrefix = []Schema{
	{Type: TypeBoolean, Name: "$key_switch"},
	OptionalColumn("$row_index", TypeInt64),
	OptionalColumn("$range_index", TypeInt64),
}
// Format describes the skiff format specification for a stream. It holds the
// format name plus the per-table skiff schemas, which are carried as YSON
// attributes.
type Format struct {
	// Name is serialized as the YSON node value and must always be the
	// string "skiff".
	Name string `yson:",value"`
	// TableSchemas holds one entry per table. Each entry is either a skiff
	// Schema or a reference into SchemaRegistry.
	// NOTE(review): the reference representation is not visible here —
	// confirm against callers.
	TableSchemas []interface{} `yson:"table_skiff_schemas,attr"`
	// SchemaRegistry maps names to schemas that are shared between multiple
	// tables.
	SchemaRegistry map[string]*Schema `yson:"skiff_schema_registry,attr"`
}