forked from asonawalla/gazette
/
interfaces.go
97 lines (80 loc) · 3.54 KB
/
interfaces.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package consumer
import (
rocks "github.com/tecbot/gorocksdb"
"github.com/LiveRamp/gazette/pkg/topic"
)
//go:generate mockery -inpkg -name=Shard
// ShardID is the unique identifier of a specific Shard. For the whole lifetime
// of a Consumer, every process must agree on the ShardID of a given Shard.
type ShardID string

// String returns the ID as a plain string, satisfying fmt.Stringer.
func (s ShardID) String() string { return string(s) }
// Shard is the view of a single consumer shard handed to Consumer callbacks.
// It exposes the shard's identity and consumed Partition, its database and
// current transaction, and a slot for consumer-managed in-memory state.
type Shard interface {
	// ID returns the concrete ID of this Shard.
	ID() ShardID

	// Partition returns the Partition consumed by this Shard.
	Partition() topic.Partition

	// Cache and SetCache give a consumer a shard-scoped, consumer-specific
	// local memory context.
	//
	// Consumers often want to keep state in memory for performance, e.g.:
	//  * Objects being reduced over, to avoid excessive database writes.
	//  * An LRU of "hot" objects expected to be referenced again soon.
	//
	// To preserve the required transactionality properties, such state must
	// never be mixed between shards; keeping it in the Shard's cache provides
	// that shard-level isolation.
	Cache() interface{}
	SetCache(cache interface{})

	// Database returns the database of the Shard.
	Database() *rocks.DB

	// Transaction returns the current transaction of the consumer shard.
	//
	// Every write issued through the returned batch commits atomically and is
	// check-pointed together with the consumed Journal offsets, which yields
	// exactly-once processing of Journal content. Note however that Gazette
	// itself is an at-least once system, so Journal writes themselves may be
	// duplicated.
	//
	// Writes may instead be made directly to the database, in which case they
	// are applied at-least once: for example, a Shard may be recovered to a
	// state after such a write was applied but before the corresponding
	// Journal offsets were written.
	Transaction() *rocks.WriteBatch

	// ReadOptions and WriteOptions return initialized read and write options
	// for the database.
	ReadOptions() *rocks.ReadOptions
	WriteOptions() *rocks.WriteOptions
}
// Consumer is the interface through which messages of the consumed Topics
// are delivered and through which consumer transactions are flushed.
type Consumer interface {
	// Topics returns the Topics this Consumer is consuming.
	Topics() []*topic.Description

	// Consume is called when a message becomes available from one of the
	// consumer's joined topics. A non-nil returned error means the Shard is
	// assumed to be in an unhealthy state, and it will be torn down.
	Consume(env topic.Envelope, shard Shard, pub *topic.Publisher) error

	// Flush is called when a consumer transaction is about to complete. Any
	// modified state held in the Shard Cache() must be persisted to
	// Transaction() during this call. As with Consume, a returned error
	// results in the tear-down of the Shard.
	Flush(shard Shard, pub *topic.Publisher) error
}
// Filterer is an optional interface a Consumer may implement when it prunes
// or expires keys & values from the Shard database according to some
// Consumer-specific criteria. Its method set intentionally overlaps with
// `rocks.CompactionFilter`.
type Filterer interface {
	Filter(level int, key []byte, val []byte) (remove bool, newVal []byte)
}
// ShardIniter is an optional interface a Consumer may implement to be
// notified of Shard initialization before the first Consume call. It is
// commonly used to initialize the shard cache.
type ShardIniter interface {
	InitShard(shard Shard) error
}
// ShardHalter is an optional interface a Consumer may implement to be
// notified that the Shard is no longer being processed by this Consumer.
// After the notification there are no further Consume or Flush calls, and no
// further writes to the recovery log. It is commonly used as a hint that
// external resources or connections associated with the Shard should be
// released.
type ShardHalter interface {
	HaltShard(shard Shard)
}
// OptionsIniter is an optional interface a Consumer may implement to
// customize the Shard database options before the database is first opened.
type OptionsIniter interface {
	InitOptions(opts *rocks.Options)
}