This repository has been archived by the owner on Jan 28, 2022. It is now read-only.
/
document_extensions.go
104 lines (86 loc) · 3.13 KB
/
document_extensions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package flow
import (
"encoding/binary"
pb "go.gazette.dev/core/broker/protocol"
"go.gazette.dev/core/message"
)
// Arena is a packed memory region into which byte content and strings are written.
// It is a plain []byte: content is appended with Add or AddAll, and read back
// with Bytes or AllBytes using the Slice offsets which those calls return.
type Arena []byte
// Add appends |b| to the end of the Arena, and returns the Slice
// which indexes the newly written bytes.
func (a *Arena) Add(b []byte) Slice {
	var begin = uint32(len(*a))
	*a = append(*a, b...)
	return Slice{Begin: begin, End: uint32(len(*a))}
}
// AddAll appends each of |b| to the Arena in order, returning one
// indexed Slice per input.
func (a *Arena) AddAll(b ...[]byte) []Slice {
	var indexed = make([]Slice, len(b))
	for i := range b {
		indexed[i] = a.Add(b[i])
	}
	return indexed
}
// Bytes returns the region of the Arena spanning [s.Begin, s.End) as a []byte.
// The result is a sub-slice of the Arena, not a copy.
func (a *Arena) Bytes(s Slice) []byte {
	var arena = *a
	return arena[s.Begin:s.End]
}
// AllBytes resolves each of the given Slices against the Arena,
// returning their []byte regions in the same order.
func (a *Arena) AllBytes(s ...Slice) [][]byte {
	var regions = make([][]byte, len(s))
	for i := range s {
		regions[i] = a.Bytes(s[i])
	}
	return regions
}
// NewUUIDParts decomposes |uuid| into its UUIDParts. The producer ID fills
// the first six bytes of an eight-byte buffer and the flags (as a uint16)
// fill the last two; that buffer, read as a big-endian uint64, becomes
// ProducerAndFlags. The clock of |uuid| is carried in Clock.
func NewUUIDParts(uuid message.UUID) UUIDParts {
	var (
		packed   [8]byte
		producer = message.GetProducerID(uuid)
		flags    = uint16(message.GetFlags(uuid))
	)
	copy(packed[:6], producer[:])
	binary.BigEndian.PutUint16(packed[6:], flags)

	var parts UUIDParts
	parts.ProducerAndFlags = binary.BigEndian.Uint64(packed[:])
	parts.Clock = message.GetClock(uuid)
	return parts
}
// Pack reassembles this UUIDParts into a message.UUID. The producer ID is
// read from the first six bytes of the big-endian encoding of
// ProducerAndFlags, and the flags are obtained by converting
// ProducerAndFlags to message.Flags.
func (parts *UUIDParts) Pack() message.UUID {
	var packed [8]byte
	binary.BigEndian.PutUint64(packed[:], parts.ProducerAndFlags)

	var (
		producer message.ProducerID
		flags    = message.Flags(parts.ProducerAndFlags)
	)
	copy(producer[:], packed[:6])

	return message.BuildUUID(producer, parts.Clock, flags)
}
// IndexedShuffleResponse is an implementation of message.Message which
// indexes a specific document within a ShuffleResponse.
type IndexedShuffleResponse struct {
// ShuffleResponse is the embedded response which holds the indexed document.
ShuffleResponse
// Index of the document within the ShuffleResponse.
// GetUUID uses it to select an entry of UuidParts.
Index int
// Shuffle on whose behalf this document was read.
Shuffle *Shuffle
}
// Compile-time check that IndexedShuffleResponse implements message.Message.
var _ message.Message = IndexedShuffleResponse{}
// GetUUID packs and returns the UUID of the document at Index,
// using the corresponding entry of UuidParts.
func (sd IndexedShuffleResponse) GetUUID() message.UUID {
	var parts = sd.UuidParts[sd.Index]
	return parts.Pack()
}
// SetUUID is not supported by IndexedShuffleResponse, and always panics.
func (sd IndexedShuffleResponse) SetUUID(uuid message.UUID) {
	panic("not implemented")
}
// NewAcknowledgement is not supported by IndexedShuffleResponse, and always panics.
func (sd IndexedShuffleResponse) NewAcknowledgement(pb.Journal) message.Message { panic("not implemented") }
// Tailing reports whether the ShuffleResponse is at the tail of the journal's
// available content, which is the case when ReadThrough equals WriteHead.
// A nil ShuffleResponse is never tailing.
func (m *ShuffleResponse) Tailing() bool {
	if m == nil {
		return false
	}
	return m.ReadThrough == m.WriteHead
}
// DocumentUUIDPlaceholder is a unique 36-byte sequence which marks the
// location within a document serialization that holds the document UUID.
// This "magic" value is defined here, and also in crates/derive/src/combiner.rs.
// It is never written anywhere: it's a temporary placeholder generated
// within combined documents returned by Rust, which flow.Mapper immediately
// replaces with a properly sequenced UUID prior to publishing.
var DocumentUUIDPlaceholder = []byte("DocUUIDPlaceholder-329Bb50aa48EAa9ef")