-
Notifications
You must be signed in to change notification settings - Fork 0
/
message.go
161 lines (132 loc) · 4.69 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package repos
import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"time"
"github.com/almerlucke/go-iban/iban"
"github.com/cockroachdb/apd/v3"
)
// ErrAlreadyProcessed is the sentinel error this repo returns for a message
// that has already been processed. Compare against it with errors.Is.
var ErrAlreadyProcessed = errors.New("message already processed")
// UpdateOption is a functional option: a function that adjusts the
// UpdateOptions applied to an update.
type UpdateOption func(*UpdateOptions)

// UpdateOptions holds the settings that UpdateOption functions can modify.
type UpdateOptions struct {
	// Force is the flag toggled by WithForceUpdate.
	Force bool
}

// Message carries a Payload to be processed, together with the headers
// (identifiers, statuses, timestamps and replay counters) used to ensure a
// lossless transmission.
type Message struct {
	ID               string           `json:"id" db:"id"`
	ProducerID       string           `json:"producer_id" db:"producer_id"`
	ConsumerID       string           `json:"consumer_id" db:"consumer_id"`
	MessageStatus    MessageStatus    `json:"message_status" db:"message_status"`
	ProcessingStatus ProcessingStatus `json:"processing_status" db:"processing_status"`
	InceptionTime    time.Time        `json:"inception_time" db:"inception_time"`
	LastTime         time.Time        `json:"last_time" db:"last_time"`
	ProducerReplays  uint             `json:"producer_replays" db:"producer_replays"`
	ConsumerReplays  uint             `json:"consumer_replays" db:"consumer_replays"`

	// Payload is embedded; in JSON it is nested under the "payload" key.
	Payload `json:"payload"`
}

// Payload is the functional content of a Message.
//
// In this example, the payload describes a typical bank transfer.
type Payload struct {
	OperationType   OperationType `json:"operation_type" db:"operation_type"`
	CreditorAccount string        `json:"creditor_account" db:"creditor_account"`
	DebtorAccount   string        `json:"debtor_account" db:"debtor_account"`
	Amount          Decimal       `json:"amount" db:"amount"`
	Currency        string        `json:"currency" db:"currency"`
	BalanceBefore   *Decimal      `json:"balance_before,omitempty" db:"balance_before"`
	BalanceAfter    *Decimal      `json:"balance_after,omitempty" db:"balance_after"`
	Comment         *string       `json:"comment,omitempty" db:"comment"`
	RejectionCause  *string       `json:"rejection_cause,omitempty" db:"rejection_cause"`
}

// InputPayload is the user input accepted to create a Message.
//
// Use AsMessage to convert it into a Message.
type InputPayload struct {
	CorrespondantBank string        `json:"correspondant_bank"`
	OperationType     OperationType `json:"operation_type"`
	CreditorAccount   string        `json:"creditor_account"`
	DebtorAccount     string        `json:"debtor_account"`
	Amount            Decimal       `json:"amount"`
	Currency          string        `json:"currency"`
	Comment           *string       `json:"comment,omitempty"`
}

// MessagePredicate specifies the filters to apply when querying Messages.
//
// NOTE(review): the exact semantics of each filter are defined by the query
// code, which is not part of this file; pointer fields are presumably
// ignored when nil - confirm against the query implementation.
type MessagePredicate struct {
	UpdatedSince         *time.Time
	NotUpdatedSince      *time.Time
	WithMessageStatus    *MessageStatus
	WithProcessingStatus *ProcessingStatus
	MaxMessageStatus     *MessageStatus
	MaxProcessingStatus  *ProcessingStatus
	FromProducer         *string
	FromConsumer         *string
	Limit                uint64
	Unconfirmed          bool

	// Blank field; presumably here to discourage unkeyed struct literals.
	_ struct{}
}
// AsMessage converts the user input into a Message.
//
// CorrespondantBank becomes the message's ConsumerID and the remaining input
// fields are copied into the Payload. Every other Message field is left at
// its zero value. Comment is copied as a pointer, so the returned Message
// shares the comment string with the input.
func (p InputPayload) AsMessage() Message {
	payload := Payload{
		OperationType:   p.OperationType,
		CreditorAccount: p.CreditorAccount,
		DebtorAccount:   p.DebtorAccount,
		Amount:          p.Amount,
		Currency:        p.Currency,
		Comment:         p.Comment,
	}

	var msg Message
	msg.ConsumerID = p.CorrespondantBank
	msg.Payload = payload

	return msg
}
// Validate checks the Message and returns the first validation error found,
// or nil. For now only the embedded Payload is validated.
func (p Message) Validate() error {
	// TODO: validation - check ConsumerID is legit
	err := p.Payload.Validate()

	return err
}
// zero is the decimal value that Payload.Validate compares amounts against:
// an amount must be strictly greater than it.
//
// NOTE(review): apd.New takes (coefficient, exponent), so this is 0 x 10^2.
// The exponent should not affect the numeric comparison done with Cmp, but
// confirm it was not meant to be -2 (two decimal places).
var zero = apd.New(0, 2)
// Validate checks that the required fields are present and well-formed, and
// that both accounts are legitimate IBAN identifiers.
//
// Checks run in a fixed order and the first failure is returned:
// operation type, creditor account present, debtor account present,
// currency (exactly 3 bytes), amount (valid and strictly positive),
// comment (at most 255 bytes, as measured by len), then the IBAN format of
// the creditor and debtor accounts. IBAN errors wrap the error returned by
// the iban package.
func (p Payload) Validate() error {
	// Cheap field checks first; the first matching case wins.
	switch {
	case !p.OperationType.IsValid():
		return fmt.Errorf("invalid operation type: %d", p.OperationType)
	case p.CreditorAccount == "":
		return fmt.Errorf("required creditor account: %q", p.CreditorAccount)
	case p.DebtorAccount == "":
		return fmt.Errorf("required debtor account: %q", p.DebtorAccount)
	case len(p.Currency) != 3:
		return fmt.Errorf("invalid currency: %q", p.Currency)
	case !p.Amount.Valid || p.Amount.Decimal.Cmp(zero) <= 0:
		return fmt.Errorf("invalid amount: %v", p.Amount)
	case p.Comment != nil && len(*p.Comment) > 255:
		return fmt.Errorf("comment is too long: %d chars", len(*p.Comment))
	}

	// Both accounts are known to be non-empty here; now check their format.
	_, err := iban.NewIBAN(p.CreditorAccount)
	if err != nil {
		return fmt.Errorf("creditor account is an invalid IBAN: %q: %w", p.CreditorAccount, err)
	}

	_, err = iban.NewIBAN(p.DebtorAccount)
	if err != nil {
		return fmt.Errorf("debtor account is an invalid IBAN: %q: %w", p.DebtorAccount, err)
	}

	return nil
}
// Bytes returns the gob encoding of the Message.
//
// If encoding fails, the returned slice is nil and the encoder's error is
// returned as is.
func (p Message) Bytes() ([]byte, error) {
	out := new(bytes.Buffer)

	if err := gob.NewEncoder(out).Encode(p); err != nil {
		return nil, err
	}

	return out.Bytes(), nil
}
// WithForceUpdate returns an UpdateOption that sets UpdateOptions.Force to
// enabled.
func WithForceUpdate(enabled bool) UpdateOption {
	setForce := func(opts *UpdateOptions) {
		opts.Force = enabled
	}

	return setForce
}