// Beam.mo
import BeamEscrow "canister:beamescrow";
import Blob "mo:base/Blob";
import Debug "mo:base/Debug";
import Cycles "mo:base/ExperimentalCycles";
import Float "mo:base/Float";
import Int "mo:base/Int";
import Int64 "mo:base/Int64";
import List "mo:base/List";
import Nat "mo:base/Nat";
import Nat32 "mo:base/Nat32";
import Nat64 "mo:base/Nat64";
import Nat8 "mo:base/Nat8";
import Principal "mo:base/Principal";
import R "mo:base/Result";
import Text "mo:base/Text";
import T "mo:base/Time";
import Trie "mo:base/Trie";
import Env "../config/Env";
import Http "../http/Http";
import JSON "../http/JSON";
import BeamRelationStoreHelper "../model/beam/BeamRelationStoreHelper";
import BeamStoreHelper "../model/beam/BeamStoreHelper";
import BeamType "../model/beam/BeamType";
import EscrowType "../model/escrow/EscrowType";
import DateUtil "../utils/DateUtil";
import Guard "../utils/Guard";
import Op "../utils/Operation";
import TextUtil "../utils/TextUtil";
import ZoomUtil "../utils/ZoomUtil";
actor Beam {
// ----- Local aliases for types declared in the imported modules -----
// Beam domain types
type BeamModel = BeamType.BeamModel;
type BeamModelV2 = BeamType.BeamModelV2;
type BeamReadModel = BeamType.BeamReadModel;
type BeamId = BeamType.BeamId;
type Period = BeamType.Period;
type ErrorCode = BeamType.ErrorCode;
type BeamStatus = BeamType.BeamStatus;
type BeamMetric = BeamType.BeamMetric;
type BeamDateMetric = BeamType.BeamDateMetric;
type BeamRelationObjId = BeamType.BeamRelationObjId;
// Escrow domain types
type EscrowId = EscrowType.EscrowId;
type Allocation = EscrowType.Allocation;
// HTTP / JSON helper types
type HttpRequest = Http.HttpRequest;
type HttpResponse = Http.HttpResponse;
type QueryParam = Http.QueryParam;
type KeyValueText = JSON.KeyValueText;
// Base library types
type Time = T.Time;
type Result<Ok, Err> = R.Result<Ok, Err>;
// ----- Stable state (declared `stable`, so it is kept across canister upgrades) -----
// Id handed to the next created beam; incremented by createBeam / createRelationBeam
stable var nextBeamId : BeamId = 0;
// Value reported by canisterVersion()
stable var version : Nat32 = 0;
// BeamId -> BeamModelV2
stable var beamStoreV2 : Trie.Trie<BeamId, BeamModelV2> = Trie.empty();
// EscrowId -> BeamId
stable var escrowBeamStore : Trie.Trie<EscrowId, BeamId> = Trie.empty();
// BeamRelationObjId -> BeamId
stable var beamRelationStore : Trie.Trie<BeamRelationObjId, BeamId> = Trie.empty();
// ----- Constants -----
// Limit passed to BeamStoreHelper.orderBy when processActiveBeams picks the beams to pay
let topNBeams : Nat = 5;
// Shorthand for Guard.require
let require = Guard.require;
// 30 secs in nanoseconds; added to the current time in `timer` to schedule the next run
let timerBeamPaymentEveryN : Nat64 = 30_000_000_000;
// Public func - Create a new Beam for the EscrowContract `escrowId`.
// Traps unless the caller is the beamescrow canister (see requireBeamEscrowCanisters).
// @param escrowId escrow contract the beam is attached to
// @param scheduledEndDate, rate forwarded unchanged to BeamType.createBeam
// @return #ok(beamId) of the newly stored beam; this function has no #err path of its own
public shared ({ caller }) func createBeam(escrowId : EscrowId, scheduledEndDate : Time, rate : Period) : async Result<BeamId, ErrorCode> {
  // allow only beamescrow canister
  requireBeamEscrowCanisters(caller);

  // --- Atomicity starts --- (no await below, so every update lands in the same message)
  let newId = nextBeamId;
  nextBeamId := newId + 1;

  let newBeam = BeamType.createBeam(newId, escrowId, scheduledEndDate, rate);

  // Register the beam in both indexes: escrowId -> beamId and beamId -> beam
  escrowBeamStore := BeamStoreHelper.updateEscrowBeamStore(escrowBeamStore, escrowId, newId);
  beamStoreV2 := BeamStoreHelper.updateBeamStore(beamStoreV2, newBeam);
  // --- Actor state changes committed ---

  #ok(newId)
};
// Public func - Create a new Beam linked to an external object id `objId` (use case: Zoom meeting).
// The beam is built by BeamType.createRelationBeam; per the original note it begins in a paused
// status (NOTE(review): confirm in BeamType).
// Traps unless the caller is the beamescrow canister.
// @return #ok(beamId) of the newly stored beam; this function has no #err path of its own
public shared ({ caller }) func createRelationBeam(escrowId : EscrowId, scheduledEndDate : Time, rate : Period, objId : BeamRelationObjId) : async Result<BeamId, ErrorCode> {
  // allow only beamescrow canister
  requireBeamEscrowCanisters(caller);

  // --- Atomicity starts --- (no await below, so every update lands in the same message)
  let newId = nextBeamId;
  nextBeamId := newId + 1;

  let newBeam = BeamType.createRelationBeam(newId, escrowId, scheduledEndDate, rate, objId);

  // Register the beam in beamStoreV2, escrowBeamStore and beamRelationStore
  beamRelationStore := BeamRelationStoreHelper.updateBeamRelationStore(beamRelationStore, newId, objId);
  escrowBeamStore := BeamStoreHelper.updateEscrowBeamStore(escrowBeamStore, escrowId, newId);
  beamStoreV2 := BeamStoreHelper.updateBeamStore(beamStoreV2, newBeam);
  // --- Actor state changes committed ---

  #ok(newId)
};
// Public func - Stop the beam of `escrowId` from streaming by requesting status #paused.
// Callable by the Beam sender only (enforced in actionOnBeam).
// @return the resulting beam status if #ok, errorCode if #err
public shared ({ caller }) func stopBeam(escrowId : EscrowId) : async Result<BeamStatus, ErrorCode> {
  let outcome = await actionOnBeam(escrowId, #paused, caller);
  outcome
};
// Public func - Restart the beam of `escrowId` by requesting status #active.
// Callable by the Beam sender only (enforced in actionOnBeam).
// @return the resulting beam status if #ok, errorCode if #err
public shared ({ caller }) func restartBeam(escrowId : EscrowId) : async Result<BeamStatus, ErrorCode> {
  let outcome = await actionOnBeam(escrowId, #active, caller);
  outcome
};
// Private func - Apply `status` to the beam attached to `escrowId` on behalf of `caller`.
// @return #ok(new status) on success
//         #err(#permission_denied) if the escrow lookup fails or `caller` is not the escrow buyer
//         #err(#beam_notfound) if no beam is registered for `escrowId`
func actionOnBeam(escrowId : EscrowId, status : BeamStatus, caller : Principal) : async Result<BeamStatus, ErrorCode> {
  // The caller must be the Beam sender: ask BeamEscrow for the contract as seen by `caller`
  let escrow = switch (await BeamEscrow.queryMyBeamEscrowBySender(escrowId, caller)) {
    case (#err reason) return #err(#permission_denied(EscrowType.errorMesg(reason)));
    case (#ok contract) contract
  };
  if (escrow.buyerPrincipal != caller) {
    return #err(#permission_denied("Only beam sender can action on the beam"))
  };

  // The beam is read after the await above, so it reflects the store as of this message
  switch (BeamStoreHelper.findBeamByEscrowId(beamStoreV2, escrowBeamStore, escrowId)) {
    case null #err(#beam_notfound("Cannot find the beam."));
    case (?found) {
      let changed = BeamType.updateBeam(found, T.now(), status);
      // persist beam
      beamStoreV2 := BeamStoreHelper.updateBeamStore(beamStoreV2, changed);
      #ok(changed.status)
    }
  }
};
// Private func - Apply `status` to the beam `beamId` without any caller check.
// Silently does nothing when the beam does not exist; logs and skips the write when the
// beam returned by BeamType.updateBeam has status #completed.
func privateActionOnBeam(beamId : BeamId, status : BeamStatus) : () {
  switch (BeamStoreHelper.findBeamById(beamStoreV2, beamId)) {
    case null {};
    case (?found) {
      let changed = BeamType.updateBeam(found, T.now(), status);
      if (changed.status == #completed) {
        // skip persist beam if status is #completed
        Debug.print("Cannot update status as beam is completed")
      } else {
        // persist beam
        beamStoreV2 := BeamStoreHelper.updateBeamStore(beamStoreV2, changed)
      }
    }
  }
};
// Private func - Find and process active BeamModels; invoked from the `timer` system func.
// Takes at most `topNBeams` active beams ordered by #lastProcessedDate and runs beamPayment
// on each, one after another.
func processActiveBeams() : async () {
  // Snapshot every stored beam as an array
  let allBeams = Trie.toArray<BeamId, BeamModelV2, BeamModelV2>(
    beamStoreV2,
    func(_, model) : BeamModelV2 { model }
  );

  // Keep active beams only, then pick the topNBeams ordered by lastProcessedDate
  let candidates = BeamStoreHelper.orderBy(
    BeamStoreHelper.filterActiveBeams(allBeams),
    #lastProcessedDate,
    topNBeams
  );

  // Pay each selected beam sequentially
  for (candidate in candidates.vals()) {
    await beamPayment(candidate)
  }
};
// Private func - Beam (stream) payment to creator over time.
// Follows the Checks-Effects-Interactions-Rollback pattern:
//   Checks       - skip unless at least `beam.rate` seconds passed since lastProcessedDate
//   Effects      - derive the creator/escrow allocation from the beam progress and persist the beam
//   Interactions - push the allocation to BeamEscrow.updateEscrowAllocation
//   Rollback     - undo the persisted beam update if the escrow call reports #err or is rejected
// @param beam snapshot of the beam to process (as read by processActiveBeams)
func beamPayment(beam : BeamModelV2) : async () {
  // ----- Checks
  // only do beaming (streaming) if numSec(now - lastProcessedDate) >= rate
  let now = T.now();
  if (DateUtil.numSecsBetween(now, beam.lastProcessedDate) < Nat32.toNat(beam.rate)) {
    return
  };

  // ----- Effects
  // --- Atomicity starts ---
  // progress = clamp((now - startDate) / (scheduledEndDate - startDate), 0, 1)
  // - a beam whose startDate is not reached yet has made no progress (a negative ratio must not
  //   be turned into a positive allocation by Int.abs below)
  // - a non-positive duration has no meaningful ratio (division by zero / negative); once the
  //   start date has passed such a beam is treated as fully elapsed
  let elapsed : Int = now - beam.startDate;
  let duration : Int = beam.scheduledEndDate - beam.startDate;
  let progress : Float = if (elapsed <= 0) {
    0.0
  } else if (duration <= 0) {
    1.0
  } else {
    Float.min(1.0, Float.fromInt(elapsed) / Float.fromInt(duration))
  };

  // Allocation is in Nat64 with e6s base e.g 10 = 10/1000000
  let allocationBaseFloat = Float.fromInt64(Int64.fromNat64(EscrowType.allocationBase));
  let allocationInt : Int = Int64.toInt(Float.toInt64(progress * allocationBaseFloat));
  // allocationInt is non-negative because progress is clamped; Int.abs only converts Int -> Nat
  let creatorAllocation : Allocation = Nat64.fromNat(Int.abs(allocationInt));
  assert (creatorAllocation <= EscrowType.allocationBase);
  let escrowAllocation = EscrowType.allocationBase - creatorAllocation;

  // An active beam whose creator allocation reached the full base becomes #completed
  let status : BeamStatus = if (creatorAllocation == EscrowType.allocationBase and beam.status == #active) {
    #completed
  } else {
    beam.status
  };

  // update Beam to BeamStore (same timestamp as the rate check above)
  let updatedBeam = BeamType.updateBeam(beam, now, status);
  if (not BeamType.validateBeam(updatedBeam)) {
    Debug.print("Invalid beam data for beamPayment");
    return
  };
  beamStoreV2 := BeamStoreHelper.updateBeamStore(beamStoreV2, updatedBeam);
  // --- Actor state changes committed ---

  // ----- Interactions
  // Security - note another party can call beamPayment here (incl internally) while
  // updateEscrowAllocation is processing.
  // A rejected call (thrown error) is treated like #err so the rollback below always runs.
  // NOTE(review): the meaning of the trailing 0 argument is defined by BeamEscrow - confirm there.
  let allocationUpdated : Bool = try {
    switch (await BeamEscrow.updateEscrowAllocation(beam.escrowId, escrowAllocation, creatorAllocation, 0)) {
      case (#ok _) true;
      case (#err _) false
    }
  } catch (_) {
    false
  };

  // ----- Rollback or Success
  if (allocationUpdated) {
    return
  };

  // Rollback if updateEscrowAllocation fails
  // --- Atomicity starts ---
  Debug.print("updateEscrowAllocation failed, rolling back beam update");
  // load the beam again due to reentrancy, the beam above may have changed after updateEscrowAllocation
  switch (BeamStoreHelper.findBeamById(beamStoreV2, updatedBeam.id)) {
    case null {
      Debug.print("BeamModel not found")
    };
    case (?currentBeam) {
      let rollbackedBeam = BeamType.undoBeam(currentBeam, updatedBeam, beam);
      beamStoreV2 := BeamStoreHelper.updateBeamStore(beamStoreV2, rollbackedBeam)
    }
  }
  // --- Actor state changes committed ---
};
// Public func - Load the read models of the beams attached to the escrow ids in `idArray`.
// @return whatever BeamStoreHelper.loadBeamReadModelByEscrowIds yields for the current stores
public query func queryBeamByEscrowIds(idArray : [EscrowId]) : async [BeamReadModel] {
  BeamStoreHelper.loadBeamReadModelByEscrowIds(beamStoreV2, escrowBeamStore, idArray)
};
// Trap if the caller is not the BeamEscrow canister configured in Env.beamEscrowCanisterId
func requireBeamEscrowCanisters(caller : Principal) : () {
  let allowed = Principal.fromText(Env.beamEscrowCanisterId);
  assert (caller == allowed)
};
// System timer - re-arms itself timerBeamPaymentEveryN nanoseconds from now,
// then runs processActiveBeams
system func timer(setGlobalTimer : Nat64 -> ()) : async () {
  let nowNanos = Nat64.fromIntWrap(T.now());
  // absolute time in nanoseconds
  setGlobalTimer(nowNanos + timerBeamPaymentEveryN);
  await processActiveBeams()
};
// Public func - simple health check
// @return always true
public query func healthCheck() : async Bool {
  return true
};
// Public func - @return the canister version stored in the stable `version` variable
public query func canisterVersion() : async Nat32 {
  return version
};
// Public func - @return this actor's cycles balance
// NOTE(review): `caller` is bound but never inspected, so no caller restriction is applied here
public query ({ caller }) func getActorBalance() : async Nat {
  Cycles.balance()
};
// Public func - @return canister memory info as reported by Op.getCanisterMemoryInfo
public query func getCanisterMemoryInfo() : async Op.CanisterMemoryInfo {
  Op.getCanisterMemoryInfo()
};
// Message variant received by `inspect`: one tag per public method of this actor, each
// carrying a thunk that yields that method's arguments.
type MesgType = {
// approved canister update - non-anonymous, arg size <= 256 or 128
#createBeam : () -> (EscrowId, Time, Period);
#createRelationBeam : () -> (EscrowId, Time, Period, BeamRelationObjId);
#stopBeam : () -> EscrowId;
#restartBeam : () -> EscrowId;
// admin read - no restriction applied in inspect (falls through to `case _ true`)
#getActorBalance : () -> ();
// public read - no restriction applied in inspect (falls through to `case _ true`)
#queryBeamByEscrowIds : () -> [EscrowId];
#canisterVersion : () -> ();
#getCanisterMemoryInfo : () -> ();
#healthCheck : () -> ();
#http_request : () -> HttpRequest;
#http_request_update : () -> HttpRequest
};
// System inspect - ingress message filter.
// Beam mutations are accepted only from non-anonymous callers whose argument blob passes
// Guard.withinSize: limit 256 for create calls, 128 for stop/restart calls.
// Every other message is accepted.
system func inspect({ arg : Blob; caller : Principal; msg : MesgType }) : Bool {
  switch msg {
    case (#createBeam _ or #createRelationBeam _) {
      not Guard.isAnonymous(caller) and Guard.withinSize(arg, 256)
    };
    case (#stopBeam _ or #restartBeam _) {
      not Guard.isAnonymous(caller) and Guard.withinSize(arg, 128)
    };
    case _ true
  }
};
// Metrics - build the BeamMetric served by the /metric HTTP endpoint:
// {totalNumBeam, groupByDate: [{numBeam, date}]}
func reportMetric() : BeamMetric {
  let total : Nat = BeamStoreHelper.queryTotalBeam(beamStoreV2);
  let perDate : [BeamDateMetric] = BeamStoreHelper.queryBeamDate(beamStoreV2);
  {
    totalNumBeam = total;
    groupByDate = perDate
  }
};
// Public func - HTTP query entry point.
// Routes /metric, /health and /zoom to their handlers; an unparsable URL or any other
// endpoint yields Http.BadRequest().
public query func http_request(req : HttpRequest) : async HttpResponse {
  let (endPoint, queryParams) = switch (Http.parseURL(req.url)) {
    case (#ok(parsed)) parsed;
    case (#err(_)) return Http.BadRequest()
  };

  switch (endPoint) {
    case ("/metric") processMetricRequest(queryParams);
    case ("/health") processHealthRequest(queryParams);
    case ("/zoom") processZoomReadRequest(req, queryParams);
    case _ Http.BadRequest()
  }
};
// Public func - HTTP update entry point.
// Only /zoom is routed (to processZoomUpdateRequest); an unparsable URL or any other
// endpoint yields Http.BadRequest().
public func http_request_update(req : HttpRequest) : async HttpResponse {
  let (endPoint, _) = switch (Http.parseURL(req.url)) {
    case (#ok(parsed)) parsed;
    case (#err(_)) return Http.BadRequest()
  };

  switch (endPoint) {
    case ("/zoom") processZoomUpdateRequest(req);
    case _ Http.BadRequest()
  }
};
// Private func - Serve the /metric endpoint.
// Requires the "clientKey" query parameter to match Env.clientKey, otherwise Http.BadRequest().
// @return the JSON rendering of reportMetric()
func processMetricRequest(queryParams : [QueryParam]) : HttpResponse {
  if (Http.checkKey(queryParams, "clientKey", Env.clientKey)) {
    Http.JsonContent(BeamType.toJSON(reportMetric()), false)
  } else {
    Http.BadRequest()
  }
};
// Private func - Serve the /health endpoint.
// Requires the "clientKey" query parameter to match Env.clientKey, otherwise Http.BadRequest().
// @return a JSON document built from an empty "result" array
func processHealthRequest(queryParams : [QueryParam]) : HttpResponse {
  if (Http.checkKey(queryParams, "clientKey", Env.clientKey)) {
    Http.JsonContent(JSON.createArray("result", List.nil<KeyValueText>()), false)
  } else {
    Http.BadRequest()
  }
};
// Private func - Serve the /zoom endpoint on the query path.
// Requires the "clientKey" query parameter to match Env.clientKey, otherwise Http.BadRequest().
// The body is decoded as UTF-8 and dispatched on the Zoom event name:
//   endpoint.url_validation -> JSON produced by ZoomUtil.processValidationRequest
//   meeting.started         -> Http.TextContentUpgrade("success", true)
//   app_deauthorized        -> acknowledged once ZoomUtil.verifySignature accepts the request
//   anything else           -> "No matching events found"
func processZoomReadRequest(req : HttpRequest, queryParams : [QueryParam]) : HttpResponse {
  if (not Http.checkKey(queryParams, "clientKey", Env.clientKey)) {
    return Http.BadRequest()
  };

  let payload = switch (Text.decodeUtf8(req.body)) {
    case (?decoded) decoded;
    case null return Http.TextContent("Invalid string")
  };

  // Look the event up with extractEvent(_, 2) first and fall back to extractEvent(_, 0)
  // NOTE(review): the meaning of the second argument is defined in ZoomUtil
  let eventName = switch (ZoomUtil.extractEvent(payload, 2)) {
    case (?name) name;
    case null {
      switch (ZoomUtil.extractEvent(payload, 0)) {
        case (?name) name;
        case null return Http.TextContent("No event found")
      }
    }
  };

  switch (eventName) {
    case ("endpoint.url_validation") {
      Http.JsonContent(ZoomUtil.processValidationRequest(payload), false)
    };
    case ("meeting.started") Http.TextContentUpgrade("success", true);
    case ("app_deauthorized") {
      if (ZoomUtil.verifySignature(payload, req.headers)) {
        Http.TextContent("Event processed successfully")
      } else {
        Http.BadRequestWith("Invalid signature")
      }
    };
    case _ Http.TextContent("No matching events found")
  }
};
// Private func - Serve the /zoom endpoint on the update path.
// The body is decoded as UTF-8 and dispatched on the Zoom event name. Only "meeting.started"
// is handled: after ZoomUtil.verifySignature accepts the request, the meeting id is mapped
// to a beam through beamRelationStore and privateActionOnBeam is asked to set it #active.
// Any other event yields "No matching events found".
func processZoomUpdateRequest(req : HttpRequest) : HttpResponse {
  let payload = switch (Text.decodeUtf8(req.body)) {
    case (?decoded) decoded;
    case null return Http.TextContent("Invalid string")
  };

  // Look the event up with extractEvent(_, 2) first and fall back to extractEvent(_, 0)
  // NOTE(review): the meaning of the second argument is defined in ZoomUtil
  let eventName = switch (ZoomUtil.extractEvent(payload, 2)) {
    case (?name) name;
    case null {
      switch (ZoomUtil.extractEvent(payload, 0)) {
        case (?name) name;
        case null return Http.TextContent("No event found")
      }
    }
  };

  switch (eventName) {
    case ("meeting.started") {
      if (not ZoomUtil.verifySignature(payload, req.headers)) {
        return Http.BadRequestWith("Invalid signature")
      };

      // start Beam: meeting id -> beam id -> #active
      let meetingId = switch (ZoomUtil.extractMeetingId(payload)) {
        case (?id) id;
        case null {
          Debug.print("Meeting id not found");
          return Http.BadRequestWith("Meeting Id not found")
        }
      };
      let beamId = switch (BeamRelationStoreHelper.findBeamIdByRelId(beamRelationStore, meetingId)) {
        case (?id) id;
        case null {
          Debug.print("Beam Id not found");
          return Http.BadRequestWith("Beam Id not found")
        }
      };

      privateActionOnBeam(beamId, #active);
      Http.TextContent("Event processed successfully")
    };
    case _ Http.TextContent("No matching events found")
  }
};
}