/
fragment_extensions.go
184 lines (165 loc) · 5.98 KB
/
fragment_extensions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
package protocol
import (
	"crypto/sha1"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"path"
	"sort"
	"strconv"
	"strings"
)
// ContentName returns the content-addressed base file name of this Fragment.
//
// The name is Begin and End as 16-digit zero-padded lowercase hex, followed by
// the hex encoding of the Sum digest. These three parts are joined by '-',
// and the CompressionCodec's file extension is appended. It panics if the
// CompressionCodec has no known extension (see ToExtension).
func (m *Fragment) ContentName() string {
	var digest = m.Sum.ToDigest()
	var extension = m.CompressionCodec.ToExtension()

	return fmt.Sprintf("%016x-%016x-%x%s", m.Begin, m.End, digest, extension)
}
// ContentPath returns the content-addressed path of this Fragment.
//
// It is the Journal name, the PathPostfix and the ContentName, joined with
// path.Join. An empty PathPostfix therefore contributes no path component.
func (m *Fragment) ContentPath() string {
	var journalDir = m.Journal.String()
	return path.Join(journalDir, m.PathPostfix, m.ContentName())
}
// ContentLength returns the number of content bytes covered by this Fragment,
// which is the span End - Begin. If compression is used, this differs from the
// stored file size of the Fragment.
func (m *Fragment) ContentLength() int64 {
	var span = m.End - m.Begin
	return span
}
// Validate returns an error if the Fragment is not well-formed.
//
// Checks are applied in this order, and the first failure is returned:
//  1. the Journal must validate;
//  2. Begin must not exceed End;
//  3. the CompressionCodec must validate;
//  4. PathPostfix must be a valid path component of at most
//     maxJournalNameLen bytes (it may be empty).
func (m *Fragment) Validate() error {
	if err := m.Journal.Validate(); err != nil {
		return ExtendContext(err, "Journal")
	}
	if m.Begin > m.End {
		return NewValidationError("expected Begin <= End (have %d, %d)", m.Begin, m.End)
	}
	if err := m.CompressionCodec.Validate(); err != nil {
		return ExtendContext(err, "CompressionCodec")
	}
	if err := ValidatePathComponent(m.PathPostfix, 0, maxJournalNameLen); err != nil {
		return ExtendContext(err, "PathPostfix")
	}
	return nil
}
// ParseFragmentFromRelativePath parses a Fragment from its relative path name,
// under the Journal's storage location within a fragment store. Path components
// contributed by the Journal must have already been stripped from the path
// string, leaving only a path postfix, content name, and compression extension.
//
//	ParseFragmentFromRelativePath("a/journal",
//	    "a=1/b=2/00000000499602d2-7fffffffffffffff-0102030405060708090a0b0c0d0e0f1011121314.gz")
//
// Everything before the final '/' becomes the PathPostfix, cleaned with
// path.Clean; an empty or "." postfix is normalized to "". The remainder must
// have the form "<begin-hex>-<end-hex>-<sha1-hex><extension>". An empty
// extension is accepted and maps to whatever CompressionCodecFromExtension("")
// returns. On any error, the zero Fragment is returned alongside the error.
func ParseFragmentFromRelativePath(journal Journal, name string) (Fragment, error) {
	// Split at the final slash of the raw input. Slicing the raw name by the
	// length of a cleaned path.Dir would misalign whenever cleaning alters the
	// prefix (e.g. "a//b/x", "./x" or "/x").
	var dir, base = path.Split(name)
	var postfix = path.Clean(dir)
	if postfix == "." {
		postfix = ""
	}

	var ext = path.Ext(base)
	var stem = base[:len(base)-len(ext)]

	var fields = strings.Split(stem, "-")
	if len(fields) != 3 {
		return Fragment{}, NewValidationError("wrong Fragment format: %v", stem)
	}
	begin, err := strconv.ParseInt(fields[0], 16, 64)
	if err != nil {
		return Fragment{}, ExtendContext(&ValidationError{Err: err}, "Begin")
	}
	end, err := strconv.ParseInt(fields[1], 16, 64)
	if err != nil {
		return Fragment{}, ExtendContext(&ValidationError{Err: err}, "End")
	}
	sum, err := hex.DecodeString(fields[2])
	if err != nil {
		return Fragment{}, ExtendContext(&ValidationError{Err: err}, "Sum")
	}
	if len(sum) != sha1.Size {
		return Fragment{}, NewValidationError("invalid SHA1Sum length: %x", sum)
	}
	cc, err := CompressionCodecFromExtension(ext)
	if err != nil {
		return Fragment{}, err
	}

	var f = Fragment{
		Journal:          journal,
		Begin:            begin,
		End:              end,
		Sum:              SHA1SumFromDigest(sum),
		CompressionCodec: cc,
		PathPostfix:      postfix,
	}
	// Validate also covers Begin <= End and the PathPostfix constraints.
	if err := f.Validate(); err != nil {
		return Fragment{}, err
	}
	return f, nil
}
// SHA1SumFromDigest converts a SHA1 sum in digest form into a SHA1Sum.
//
// The digest is read big-endian: bytes 0-7 become Part1, bytes 8-15 become
// Part2, and bytes 16-19 become Part3. |r| must be exactly sha1.Size (20)
// bytes long, or it panics.
func SHA1SumFromDigest(r []byte) SHA1Sum {
	if len(r) != sha1.Size {
		panic("invalid slice length")
	}
	return SHA1Sum{
		Part1: binary.BigEndian.Uint64(r[:8]),
		Part2: binary.BigEndian.Uint64(r[8:16]),
		Part3: binary.BigEndian.Uint32(r[16:]),
	}
}
// SHA1SumOf computes the SHA1 of |str| and returns it as a SHA1Sum.
func SHA1SumOf(str string) SHA1Sum {
	digest := sha1.Sum([]byte(str))
	return SHA1SumFromDigest(digest[:])
}
// ToDigest converts the SHA1Sum to a flat, fixed-size array.
//
// This is the inverse of SHA1SumFromDigest. Part1, Part2 and Part3 are written
// big-endian into bytes 0-7, 8-15 and 16-19 respectively.
func (m SHA1Sum) ToDigest() [20]byte {
	var digest [sha1.Size]byte

	binary.BigEndian.PutUint64(digest[:8], m.Part1)
	binary.BigEndian.PutUint64(digest[8:16], m.Part2)
	binary.BigEndian.PutUint32(digest[16:], m.Part3)

	return digest
}
// IsZero reports whether this SHA1Sum equals the zero value. As a special case,
// Fragments having no content are consistently mapped to the zero-valued SHA1Sum
// (rather than the SHA1 of "", which is da39a3ee5e6b4b0d3255bfef95601890afd80709).
func (m SHA1Sum) IsZero() bool {
	var zero SHA1Sum
	return m == zero
}
// CompressionCodecFromExtension matches a file extension (including its leading
// '.') to its corresponding CompressionCodec. Matching is case-insensitive. An
// empty extension maps to CompressionCodec_GZIP_OFFLOAD_DECOMPRESSION, mirroring
// ToExtension. An unknown extension yields CompressionCodec_NONE together with a
// validation error.
func CompressionCodecFromExtension(ext string) (CompressionCodec, error) {
	var codec CompressionCodec
	var lowered = strings.ToLower(ext)

	switch lowered {
	case ".raw":
		codec = CompressionCodec_NONE
	case ".gz", ".gzip":
		codec = CompressionCodec_GZIP
	case ".zst", ".zstandard":
		codec = CompressionCodec_ZSTANDARD
	case ".sz", ".snappy":
		codec = CompressionCodec_SNAPPY
	case "", ".gzod":
		codec = CompressionCodec_GZIP_OFFLOAD_DECOMPRESSION
	default:
		return CompressionCodec_NONE, NewValidationError("unrecognized compression extension: %s", ext)
	}
	return codec, nil
}
// Validate returns an error if the CompressionCodec is not well-formed. This is
// the case when it is CompressionCodec_INVALID, or when its numeric value has no
// entry in CompressionCodec_name.
func (m CompressionCodec) Validate() error {
	if m == CompressionCodec_INVALID {
		return NewValidationError("invalid value (%s)", m)
	}
	if _, known := CompressionCodec_name[int32(m)]; !known {
		return NewValidationError("invalid value (%s)", m)
	}
	return nil
}
// ToExtension returns the file extension of the CompressionCodec, including the
// leading '.'. CompressionCodec_GZIP_OFFLOAD_DECOMPRESSION currently maps to the
// empty extension. It panics for any codec without a mapping.
func (m CompressionCodec) ToExtension() string {
	var ext string

	switch m {
	case CompressionCodec_NONE:
		ext = ".raw"
	case CompressionCodec_GZIP:
		ext = ".gz"
	case CompressionCodec_ZSTANDARD:
		ext = ".zst"
	case CompressionCodec_SNAPPY:
		ext = ".sz"
	case CompressionCodec_GZIP_OFFLOAD_DECOMPRESSION:
		// TODO(johnny): Switch to ".gzod" when v2 broker fully released.
		ext = ""
	default:
		panic("invalid CompressionCodec")
	}
	return ext
}
// MarshalYAML encodes the CompressionCodec as its String() name rather than as
// its numeric value. Presumably this satisfies a YAML library's Marshaler
// interface (NOTE(review): confirm which one). It never returns an error.
func (m CompressionCodec) MarshalYAML() (interface{}, error) {
	var name = m.String()
	return name, nil
}
// UnmarshalYAML decodes a CompressionCodec from a YAML string holding one of
// the names in CompressionCodec_value. Errors from |unmarshal| are returned
// as-is. An unrecognized name yields an error that lists every accepted name in
// sorted order, so the message is deterministic.
func (m *CompressionCodec) UnmarshalYAML(unmarshal func(interface{}) error) error {
	var str string
	if err := unmarshal(&str); err != nil {
		return err
	}

	tag, ok := CompressionCodec_value[str]
	if !ok {
		var names = make([]string, 0, len(CompressionCodec_value))
		for n := range CompressionCodec_value {
			names = append(names, n)
		}
		// Map iteration order is randomized; sort so the message is stable.
		sort.Strings(names)
		return fmt.Errorf("%q is not a valid CompressionCodec (options are %q)", str, names)
	}

	*m = CompressionCodec(tag)
	return nil
}