This repository has been archived by the owner on Oct 17, 2023. It is now read-only.
/
message.go
107 lines (92 loc) · 2.66 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
// Copyright 2014 The Transporter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package message provides wrapper structs and helper methods to pipe
// actual database documents throughout transporter.
package message
import (
"fmt"
"time"
"gopkg.in/mgo.v2/bson"
"github.com/compose/transporter/message/data"
"github.com/compose/transporter/message/ops"
)
// A Msg serves to wrap the actual document to
// provide additional metadata about the document
// being transported. *Base, declared in this file, provides every method
// listed here.
type Msg interface {
// ID returns a string identifier for the message.
ID() string
// OP returns the operation associated with the message.
OP() ops.Op
// Timestamp returns the message's timestamp as an int64.
Timestamp() int64
// Data returns the document carried by the message.
Data() data.Data
// Namespace returns the namespace currently associated with the message.
Namespace() string
// UpdateNamespace changes the namespace associated with the message.
UpdateNamespace(string)
// Confirms returns the channel associated with the message, if any.
Confirms() chan struct{}
}
// From wraps the document d in a new *Base message for the given operation
// and namespace and returns it as a Msg. The timestamp is set to the current
// Unix time in seconds, and no confirmation channel is attached (the confirm
// field is left nil; see WithConfirms to attach one).
func From(op ops.Op, namespace string, d data.Data) Msg {
	msg := new(Base)
	msg.Operation = op
	msg.TS = time.Now().Unix()
	msg.NS = namespace
	msg.MapData = d
	return msg
}
// WithConfirms attaches the confirm channel to msg so that message processing
// can be acknowledged, and returns msg. Only messages whose concrete type is
// *Base are modified; any other Msg implementation is returned unchanged.
func WithConfirms(confirm chan struct{}, msg Msg) Msg {
	if base, ok := msg.(*Base); ok {
		base.confirm = confirm
	}
	return msg
}
// Base represents a standard message format for transporter data.
// If it does not meet your needs, you can embed the struct and override
// whatever methods are needed to accurately represent the data structure.
type Base struct {
// TS is the value returned by Timestamp; From sets it to time.Now().Unix().
TS int64
// NS is the namespace, returned by Namespace and replaced by UpdateNamespace.
NS string
// Operation is the operation type returned by OP.
Operation ops.Op
// MapData holds the document returned by Data; ID reads its "_id" key.
MapData data.Data
// confirm is the channel returned by Confirms. From leaves it nil and
// WithConfirms sets it.
confirm chan struct{}
}
// Timestamp returns the TS field. For messages built with From this is the
// Unix time, in seconds, at which the message was created in transporter
// (i.e. it has no correlation with any time in the database).
func (m *Base) Timestamp() int64 {
return m.TS
}
// Namespace returns the NS field: the combination of
// database/table/collection for the underlying adaptor.
func (m *Base) Namespace() string {
return m.NS
}
// UpdateNamespace replaces the namespace associated with the message with
// newNs. The value is stored as given; no validation is performed here.
func (m *Base) UpdateNamespace(newNs string) {
m.NS = newNs
}
// OP returns the type of operation the message is associated with
// (e.g. insert/update/delete), as stored in the Operation field.
func (m *Base) OP() ops.Op {
return m.Operation
}
// Data returns the internal representation of the document as the data.Data
// type. The MapData field is returned directly; no copy is made here.
func (m *Base) Data() data.Data {
return m.MapData
}
// ID returns a string representation of the document's "_id" field.
//
// It returns "" when the document has no "_id" key. A string value is
// returned unchanged, a bson.ObjectId is rendered with its Hex method, and
// any other value is formatted with fmt's %v verb (so an "_id" key that is
// present but holds nil yields "<nil>").
func (m *Base) ID() string {
	// Look the key up once and reuse the value for the type switch.
	raw, ok := m.MapData["_id"]
	if !ok {
		return ""
	}
	switch id := raw.(type) {
	case string:
		return id
	case bson.ObjectId:
		return id.Hex()
	default:
		return fmt.Sprintf("%v", id)
	}
}
// Confirms provides access to the underlying confirm channel. It is nil
// unless a channel was attached, e.g. with WithConfirms.
func (m *Base) Confirms() chan struct{} {
return m.confirm
}