/
FountainDecoder.swift
239 lines (203 loc) · 7.96 KB
/
FountainDecoder.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
//
// FountainDecoder.swift
//
// Copyright © 2020 by Blockchain Commons, LLC
// Licensed under the "BSD-2-Clause Plus Patent License"
//
import Foundation
// Implements Luby transform code rateless decoding
// https://en.wikipedia.org/wiki/Luby_transform_code
/// Reassembles a message from fountain-coded parts (Luby transform code
/// rateless decoding).
///
/// Hand each incoming part to `receivePart(_:)` and inspect `result`, which
/// stays `nil` until the message has either been reassembled or has failed its
/// checksum. Parts that do not agree with the first accepted part are ignored.
public final class FountainDecoder {
    typealias PartIndexes = Set<Int>
    typealias PartDict = [PartIndexes: Part]

    // Values recorded from the first accepted part. Every later part has to
    // match them to be processed. They are `nil` until a part is accepted.
    var expectedPartIndexes: PartIndexes!
    var expectedFragmentLen: Int!
    var expectedMessageLen: Int!
    var expectedChecksum: UInt32!

    // Indexes of the fragments recovered so far.
    var receivedPartIndexes: PartIndexes = []
    // Parts that carry exactly one fragment, keyed by their index set.
    var simpleParts: PartDict = [:]
    // Parts that still mix two or more fragments, keyed by their index set.
    var mixedParts: PartDict = [:]
    // Parts waiting to be processed, in arrival order.
    var queuedParts: [Part] = []

    /// The number of validated parts that have been passed to `receivePart(_:)`.
    public private(set) var processedPartsCount = 0

    /// `nil` while decoding is in progress; afterwards the reassembled message
    /// or the reason decoding failed.
    public private(set) var result: Result<Data, Error>?

    // NOTE(review): nothing visible in this class assigns this property, so it
    // always reads `nil` — confirm whether callers still rely on it.
    public private(set) var checksum: UInt32?

    /// A rough progress estimate in `0...1`.
    ///
    /// Returns 1 once `result` is set and 0 before any part has been accepted.
    /// In between, the estimate is the number of processed parts divided by
    /// 1.75 times the fragment count, capped at 0.99.
    public var estimatedPercentComplete: Double {
        guard result == nil else { return 1 }
        guard let expectedPartCount = expectedPartIndexes?.count else { return 0 }
        let estimatedInputParts = Double(expectedPartCount) * 1.75
        return min(0.99, Double(processedPartsCount) / estimatedInputParts)
    }

    /// Failures reported through `result`.
    public enum Error: Swift.Error {
        // Not produced anywhere in this class as written.
        case invalidPart
        // The reassembled message's CRC32 differs from the checksum carried by the parts.
        case invalidChecksum
    }

    /// A received part: the set of fragment indexes mixed into it and the
    /// XOR of those fragments' data.
    struct Part {
        let partIndexes: PartIndexes
        let data: Data

        /// The fragment index of a simple part.
        ///
        /// Only meaningful when `isSimple` is true; for a mixed part this is an
        /// arbitrary member of the set, and it traps if the set is empty.
        var index: Int { partIndexes.first! }

        /// Wraps an encoder part, asking `chooseFragments` which fragment
        /// indexes belong to its sequence number, sequence length and checksum.
        init(_ p: FountainEncoder.Part) {
            partIndexes = chooseFragments(seqNum: p.seqNum, seqLen: p.seqLen, checksum: p.checksum)
            data = p.data
        }

        /// Creates a part directly from an index set and its data.
        init(fragmentIndexes: PartIndexes, data: Data) {
            self.partIndexes = fragmentIndexes
            self.data = data
        }

        /// True when the part carries exactly one fragment.
        var isSimple: Bool {
            partIndexes.count == 1
        }
    }

    public init() {
    }

    /// Feeds one encoder part into the decoder.
    ///
    /// Does nothing once `result` is set, and silently drops parts that fail
    /// validation. Otherwise the part is queued and the queue is drained until
    /// it is empty or a result has been produced.
    ///
    /// - Parameter encoderPart: The part to process.
    public func receivePart(_ encoderPart: FountainEncoder.Part) {
        // Don't process the part if we're already done
        guard result == nil else { return }

        // Don't continue if this part doesn't validate
        guard validatePart(encoderPart) else { return }

        // Add this part to the queue
        enqueue(Part(encoderPart))

        // Processing one part can enqueue further (newly simple) parts, so keep
        // going until we're done or the queue is empty
        while result == nil && !queuedParts.isEmpty {
            processQueueItem()
        }

        // Keep track of how many parts we've processed
        processedPartsCount += 1

        // printPartEnd()
    }

    // Appends a part to the end of the processing queue.
    private func enqueue(_ part: Part) {
        queuedParts.append(part)
    }

    // Debug helper: prints the progress counters. Reports 0 expected parts
    // (rather than trapping) if no part has been accepted yet.
    func printPartEnd() {
        let expected = expectedPartIndexes?.count ?? 0
        let received = receivedPartIndexes.count
        let percent = Int((estimatedPercentComplete * 100).rounded())
        print("processed: \(processedPartsCount) expected: \(expected) received: \(received) percent: \(percent)%")
    }

    // Debug helper: prints the sorted fragment indexes of a part.
    func printPart(_ part: Part) {
        let indexes = Array(part.partIndexes).sorted()
        print("part indexes: \(indexes)")
    }

    // Debug helper: prints a summary of the decoder state. Reports 0 parts
    // (rather than trapping) if no part has been accepted yet.
    func printState() {
        let parts = expectedPartIndexes?.count ?? 0
        let received = Array(receivedPartIndexes).sorted()
        let mixed = mixedParts.keys.map { Array($0).sorted() }
        let queued = queuedParts.count
        print("parts: \(parts), received: \(received), mixed: \(mixed), queued: \(queued), result: \(String(describing: result))")
    }

    // Takes the oldest queued part and dispatches it by kind.
    // The caller must ensure the queue is not empty.
    private func processQueueItem() {
        let part = queuedParts.removeFirst()
        // printPart(part)
        if part.isSimple {
            processSimplePart(part)
        } else {
            processMixedPart(part)
        }
        // printState()
    }

    // Reduces every stored mixed part by `part`. Parts that become simple are
    // queued for processing; the rest replace the stored mixed parts.
    private func reduceMixed(by part: Part) {
        var remaining: PartDict = [:]
        for mixed in mixedParts.values {
            let reduced = reducePart(mixed, by: part)
            if reduced.isSimple {
                // Now carries a single fragment, so it goes back through the queue
                enqueue(reduced)
            } else {
                // Still mixed: keep it, keyed by its (possibly smaller) index set
                remaining[reduced.partIndexes] = reduced
            }
        }
        mixedParts = remaining
    }

    // Reduce part `a` by part `b`.
    //
    // If the fragments mixed into `b` are a strict (proper) subset of those in
    // `a`, the result carries the indexes `a` - `b` and the data `a` XOR `b`.
    // Otherwise `a` is not reducible by `b` and is returned unchanged.
    private func reducePart(_ a: Part, by b: Part) -> Part {
        guard b.partIndexes.isStrictSubset(of: a.partIndexes) else {
            return a
        }
        let newIndexes = a.partIndexes.subtracting(b.partIndexes)
        let newData = a.data.xor(with: b.data)
        return Part(fragmentIndexes: newIndexes, data: newData)
    }

    // Records a single-fragment part. When it completes the set of expected
    // fragments the message is reassembled and checked; otherwise the stored
    // mixed parts are reduced by it.
    private func processSimplePart(_ part: Part) {
        // Don't process duplicate parts
        let fragmentIndex = part.index
        guard !receivedPartIndexes.contains(fragmentIndex) else { return }

        // Record this part
        simpleParts[part.partIndexes] = part
        receivedPartIndexes.insert(fragmentIndex)

        // If fragments are still missing, use this one to reduce the mixed parts
        guard receivedPartIndexes == expectedPartIndexes else {
            reduceMixed(by: part)
            return
        }

        // Reassemble the message from its fragments, in index order
        let sortedParts = simpleParts.values.sorted { $0.index < $1.index }
        let fragments = sortedParts.map { $0.data }
        let message = Self.joinFragments(fragments, messageLen: expectedMessageLen)

        // Verify the message checksum and note success or failure
        let messageChecksum = CRC32.checksum(data: message)
        if messageChecksum == expectedChecksum {
            result = .success(message)
        } else {
            result = .failure(.invalidChecksum)
        }
    }

    // Handles a part that mixes several fragments: reduces it by everything
    // already known, then either queues it (if it became simple) or stores it
    // after reducing the other mixed parts by it.
    private func processMixedPart(_ part: Part) {
        // Don't process duplicate parts
        guard !mixedParts.keys.contains(part.partIndexes) else { return }

        // Reduce this part by all the others
        let p = join(simpleParts.values, mixedParts.values).reduce(part) {
            reducePart($0, by: $1)
        }

        // If the part is now simple
        if p.isSimple {
            // Add it to the queue
            enqueue(p)
        } else {
            // Reduce all the mixed parts by this one
            reduceMixed(by: p)
            // Record this new mixed part
            mixedParts[p.partIndexes] = p
        }
    }

    // Decides whether a part should be processed.
    //
    // The first well-formed part fixes the expected sequence length, message
    // length, checksum and fragment length. Every later part must match all
    // four or it is thrown away.
    private func validatePart(_ part: FountainEncoder.Part) -> Bool {
        // If this is the first part we've seen
        if expectedPartIndexes == nil {
            // Refuse to adopt a header that could never describe a real
            // message; later steps would otherwise trap on it.
            guard Self.isWellFormed(part) else { return false }

            // Record the things that all the other parts we see will have to match to be valid.
            expectedPartIndexes = Set(0 ..< part.seqLen)
            expectedMessageLen = part.messageLen
            expectedChecksum = part.checksum
            expectedFragmentLen = part.data.count
        } else {
            // If this part's values don't match the first part's values
            guard expectedPartIndexes.count == part.seqLen,
                  expectedMessageLen == part.messageLen,
                  expectedChecksum == part.checksum,
                  expectedFragmentLen == part.data.count
            else {
                // Throw away the part
                return false
            }
        }
        // This part should be processed
        return true
    }

    // Checks that a part's header is self-consistent: at least one fragment, a
    // non-negative message length, and a message that fits into
    // `seqLen` fragments of this part's size.
    private static func isWellFormed(_ part: FountainEncoder.Part) -> Bool {
        guard part.seqLen > 0, part.messageLen >= 0 else { return false }
        // Use the overflow-reporting multiply so absurd headers are rejected
        // instead of trapping.
        let (capacity, overflowed) = part.seqLen.multipliedReportingOverflow(by: part.data.count)
        return !overflowed && part.messageLen <= capacity
    }

    /// Joins all the fragments of a message together, throwing away any padding.
    ///
    /// - Parameters:
    ///   - fragments: The fragments in message order.
    ///   - messageLen: The length of the original message in bytes. Values
    ///     outside `0...(total fragment length)` are clamped to that range.
    /// - Returns: The first `messageLen` bytes of the concatenated fragments.
    static func joinFragments(_ fragments: [Data], messageLen: Int) -> Data {
        let joined = Data(fragments.joined())
        // Clamp so an inconsistent length can never trap
        let length = min(max(messageLen, 0), joined.count)
        return Data(joined.prefix(length))
    }
}