-
Notifications
You must be signed in to change notification settings - Fork 0
/
parts_assembler.go
137 lines (111 loc) · 4.15 KB
/
parts_assembler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// Copyright 2017 Factom Foundation
// Use of this source code is governed by the MIT
// license that can be found in the LICENSE file.
package p2p
import (
"fmt"
"time"
log "github.com/sirupsen/logrus"
)
// partsLogger is the log entry used by the parts assembler. It is derived from
// packageLogger and tagged with the field subpack=parts_assembler.
var partsLogger = packageLogger.WithField("subpack", "parts_assembler")
// MaxTimeWaitingForReassembly is the maximum time (10 minutes) we wait for the
// next part of a partial message to arrive. Old entries are cleaned up only
// when a new part arrives.
const MaxTimeWaitingForReassembly time.Duration = time.Second * 60 * 10
// PartialMessage holds the parts of one multi-part message that have been
// received so far, together with the timestamps of the first and latest part.
type PartialMessage struct {
parts []*Parcel // array of message parts we've received so far, indexed by part number; nil where a part is still missing
firstPartReceived time.Time // a timestamp indicating when the first part was received
mostRecentPartReceived time.Time // a timestamp indicating when the most recent part was received
}
// PartsAssembler is responsible for assembling message parts into full messages.
// It keeps one PartialMessage per message until every part has arrived or the
// entry is dropped as stale.
type PartsAssembler struct {
messages map[string]*PartialMessage // a map of app hashes to partial messages
// logger receives the debug and warning output produced while handling parts
logger *log.Entry
}
// Init prepares the assembler for use: it starts with an empty table of
// partial messages and adopts partsLogger as its logger. The receiver is
// returned so that allocation and initialization can be chained.
func (assembler *PartsAssembler) Init() *PartsAssembler {
	assembler.messages = map[string]*PartialMessage{}
	assembler.logger = partsLogger
	return assembler
}
// handlePart feeds one message part into the assembler.
//
// The part is validated against whatever has already been collected for its
// app hash; an invalid part is logged and dropped, and nil is returned without
// touching any state. A valid part is stored in its slot (creating the
// partial message on first sight). When every slot is filled the reassembled
// parcel is returned and the bookkeeping entry is removed; otherwise nil is
// returned. Stale partial messages are purged after every valid part.
func (assembler *PartsAssembler) handlePart(parcel Parcel) *Parcel {
	appHash := parcel.Header.AppHash
	partNo := parcel.Header.PartNo
	assembler.logger.Debugf("Handling message part %s %d/%d", appHash, partNo+1, parcel.Header.PartsTotal)

	pending, known := assembler.messages[appHash]
	if ok, err := validateParcelPart(parcel, pending); !ok {
		assembler.logger.Warnf("Detected invalid parcel: %s, dropping", err.Error())
		return nil
	}

	// First part seen for this hash: allocate the slots and start tracking it.
	if !known {
		pending = createNewPartialMessage(parcel)
		assembler.messages[appHash] = pending
	}

	pending.parts[partNo] = &parcel
	pending.mostRecentPartReceived = time.Now()

	// nil until every part has arrived
	assembled := tryReassemblingMessage(pending)
	if assembled != nil {
		delete(assembler.messages, appHash)
		assembler.logger.Debugf("Fully assembled %s", appHash)
	}

	// sweep out partial messages that have been waiting too long
	assembler.cleanupOldPartialMessages()

	return assembled
}
// validateParcelPart reports whether parcel can be stored as a part of the
// given partial message.
//
// partial is the entry already tracked for the parcel's message, or nil when
// no part of that message has been seen yet. On rejection isValid is false and
// err names the reason; on success isValid is true and err is nil. A panic
// raised while inspecting the parcel is recovered and reported as an invalid
// parcel, with the panic value included in the error.
func validateParcelPart(parcel Parcel, partial *PartialMessage) (isValid bool, err error) {
	defer func() {
		if r := recover(); r != nil {
			isValid = false
			err = fmt.Errorf("Error in validateParcelPart: %v", r)
		}
	}()

	// Both header fields are unsigned (they are compared against a uint16
	// below), so a "less than zero" check can never fire and only the zero and
	// upper-bound cases need testing.
	partNo, partsTotal := parcel.Header.PartNo, parcel.Header.PartsTotal

	switch {
	case partsTotal == 0:
		return false, fmt.Errorf("PartsTotal less or equal 0")
	case partNo >= partsTotal:
		return false, fmt.Errorf("PartNo outside of PartsTotal range")
	case partial != nil && partsTotal != uint16(len(partial.parts)):
		// A later part must agree with the slot count allocated from the
		// first part, otherwise indexing by PartNo could go out of range.
		return false, fmt.Errorf("PartsTotal does not match allocated array of parts")
	}
	return true, nil
}
// cleanupOldPartialMessages drops every partial message that has not received
// a new part for longer than MaxTimeWaitingForReassembly and logs each drop at
// debug level, reporting the idle time and the time since the first part in
// whole seconds.
func (assembler *PartsAssembler) cleanupOldPartialMessages() {
	// One clock reading so every entry is judged against the same instant.
	now := time.Now()
	for appHash, partial := range assembler.messages {
		idle := now.Sub(partial.mostRecentPartReceived)
		if idle <= MaxTimeWaitingForReassembly {
			continue
		}

		// Removing the current key while ranging over a map is safe in Go.
		delete(assembler.messages, appHash)

		sinceFirst := now.Sub(partial.firstPartReceived)
		// appHash is a string (%s); the durations are converted to a plain
		// count of seconds (%d) so they are not rendered as nanoseconds.
		assembler.logger.Debugf("dropping message %s after %d secs, time since first part: %d secs",
			appHash, int64(idle/time.Second), int64(sinceFirst/time.Second))
	}
}
// createNewPartialMessage builds the bookkeeping entry for a message whose
// first part is parcel: one empty slot per expected part (taken from the
// parcel's PartsTotal) and the current time recorded as firstPartReceived.
// The parcel itself is not stored here.
func createNewPartialMessage(parcel Parcel) *PartialMessage {
	return &PartialMessage{
		parts:             make([]*Parcel, parcel.Header.PartsTotal),
		firstPartReceived: time.Now(),
	}
}
// tryReassemblingMessage returns the parcel produced by ReassembleParcel once
// every slot of the partial message is filled, and nil while at least one
// part is still missing.
func tryReassemblingMessage(partial *PartialMessage) *Parcel {
	received := partial.parts
	for idx := range received {
		if received[idx] == nil {
			// still waiting for this part
			return nil
		}
	}
	return ReassembleParcel(received)
}