/
stream.go
860 lines (655 loc) · 27.5 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
package service
import (
"context"
"net/url"
"strings"
"time"
"github.com/EmissarySocial/emissary/model"
"github.com/EmissarySocial/emissary/queries"
"github.com/EmissarySocial/emissary/tools/id"
"github.com/benpate/data"
"github.com/benpate/data/option"
"github.com/benpate/derp"
"github.com/benpate/domain"
"github.com/benpate/exp"
"github.com/benpate/hannibal/vocab"
"github.com/benpate/rosetta/html"
"github.com/benpate/rosetta/list"
"github.com/benpate/rosetta/schema"
"github.com/benpate/rosetta/sliceof"
"github.com/benpate/sherlock"
"github.com/gernest/mention"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// Stream manages all interactions with the Stream collection.
// A zero-value Stream holds no dependencies; they are injected by Refresh.
type Stream struct {
	// collection is the database collection that every query and write in this service goes through.
	collection data.Collection
	// templateService loads the Template that governs each Stream.
	templateService *Template
	// draftService is used to remove drafts when a Stream is deleted.
	draftService *StreamDraft
	outboxService *Outbox
	// attachmentService is used to load and delete Attachments that belong to a Stream.
	attachmentService *Attachment
	// activityService loads ActivityStream documents (reply contexts and mentioned actors).
	activityService *ActivityStream
	contentService *Content
	keyService *EncryptionKey
	followerService *Follower
	ruleService *Rule
	userService *User
	// host is prepended to Stream URLs and compared against incoming URLs in ParsePath.
	host string
	// streamUpdateChannel receives a copy of each Stream after it has been saved.
	streamUpdateChannel chan<- model.Stream
}
// NewStream returns an empty Stream service. None of its dependencies are
// set here; call Refresh to populate them before using the service.
func NewStream() Stream {
	var service Stream
	return service
}
/******************************************
* Lifecycle Methods
******************************************/
// Refresh replaces every dependency and configuration value held by this
// service with the arguments provided. It performs no other work.
func (service *Stream) Refresh(collection data.Collection, templateService *Template, draftService *StreamDraft, outboxService *Outbox, attachmentService *Attachment, activityService *ActivityStream, contentService *Content, keyService *EncryptionKey, followerService *Follower, ruleService *Rule, userService *User, host string, streamUpdateChannel chan model.Stream) {

	// Storage and server configuration
	service.collection = collection
	service.host = host
	service.streamUpdateChannel = streamUpdateChannel

	// Sibling services
	service.templateService = templateService
	service.draftService = draftService
	service.outboxService = outboxService
	service.attachmentService = attachmentService
	service.activityService = activityService
	service.contentService = contentService
	service.keyService = keyService
	service.followerService = followerService
	service.ruleService = ruleService
	service.userService = userService
}
// Startup seeds the Stream collection with the "startup streams" declared by
// the provided Theme. It only does so when the collection contains no
// (non-deleted) Streams at all.
//
// A failure to count existing Streams is returned to the caller. Failures
// while building or saving an individual startup Stream are reported via
// derp.Report and that Stream is skipped, so one bad entry does not block
// the others.
func (service *Stream) Startup(theme *model.Theme) error {

	const location = "service.Stream.Startup"

	// Try to count the number of streams currently in the database
	count, err := service.Count(exp.All())

	if err != nil {
		return derp.Wrap(err, location, "Unable to count streams")
	}

	// If the database is not empty, then do not add more...
	if count > 0 {
		return nil
	}

	streamSchema := service.Schema()

	// NOTE: the loop variable is deliberately not named "data" so that it
	// does not shadow the imported "data" package.
	for _, streamData := range theme.StartupStreams {

		// Create a new Stream and copy the startup values into it
		stream := model.NewStream()

		if err := streamSchema.SetAll(&stream, streamData); err != nil {
			derp.Report(derp.Wrap(err, location, "Unable to set stream data", streamData))
			continue
		}

		// If we have default content, then add that too.
		if content, ok := streamData["content"].(model.Content); ok {
			stream.Content = content
		}

		// Validate with the general-purpose Stream schema
		if err := streamSchema.Validate(&stream); err != nil {
			derp.Report(derp.Wrap(err, location, "Invalid stream data"))
			continue
		}

		// Get/Validate the template for the new stream
		templateID := streamData.GetString("templateId")
		template, err := service.templateService.Load(templateID)

		if err != nil {
			derp.Report(derp.Wrap(err, location, "Unable to load template", templateID))
			continue
		}

		// Validate with the specific Template schema
		if err := template.Schema.Validate(&stream); err != nil {
			derp.Report(derp.Wrap(err, location, "Invalid stream data"))
			continue
		}

		// Save the new Stream to the database
		if err := service.Save(&stream, "Created by Startup"); err != nil {
			derp.Report(derp.Wrap(err, location, "Unable to save stream", stream))
			continue
		}
	}

	return nil
}
// Close stops any background processes controlled by this service.
// The body is currently empty, so calling it has no effect.
func (service *Stream) Close() {
}
/******************************************
* Common Methods
******************************************/
// New returns a freshly initialized Stream whose URL is built from this
// server's host and the Stream's Token. No Template is applied here.
func (service *Stream) New() model.Stream {

	stream := model.NewStream()
	stream.URL = service.host + "/" + stream.Token

	// TODO: HIGH: Use stream Template schema to set default values in the new stream.

	return stream
}
// Query returns a slice containing all of the (non-deleted) Streams that
// match the provided criteria. The slice is never nil, and is returned
// alongside the error even when the underlying query fails.
func (service *Stream) Query(criteria exp.Expression, options ...option.Option) ([]model.Stream, error) {

	streams := make([]model.Stream, 0)

	if err := service.collection.Query(&streams, notDeleted(criteria), options...); err != nil {
		return streams, err
	}

	return streams, nil
}
// QuerySummary returns a slice of StreamSummary records for all of the
// (non-deleted) Streams that match the provided criteria. The slice is never
// nil, and is returned alongside the error even when the query fails.
func (service *Stream) QuerySummary(criteria exp.Expression, options ...option.Option) ([]model.StreamSummary, error) {

	summaries := make([]model.StreamSummary, 0)

	if err := service.collection.Query(&summaries, notDeleted(criteria), options...); err != nil {
		return summaries, err
	}

	return summaries, nil
}
// List returns an iterator over all of the Streams that match the provided
// criteria, after the criteria have been passed through notDeleted.
func (service *Stream) List(criteria exp.Expression, options ...option.Option) (data.Iterator, error) {
	filtered := notDeleted(criteria)
	return service.collection.Iterator(filtered, options...)
}
// Load retrieves a single (non-deleted) Stream from the database into the
// provided stream. Any failure is wrapped and returned.
func (service *Stream) Load(criteria exp.Expression, stream *model.Stream) error {

	err := service.collection.Load(notDeleted(criteria), stream)

	if err == nil {
		return nil
	}

	return derp.Wrap(err, "service.Stream", "Error loading Stream", criteria)
}
// Save adds/updates a Stream in the database.
//
// Before writing, it recalculates several derived fields on the Stream:
// SocialRole and URL, DefaultAllow, Rank (only when zero), Token (only when
// empty), ParentIDs (only when empty) and Context. The Stream is also cleaned
// with both the global Stream schema and the Template-specific schema.
//
// After a successful write, a copy of the Stream is sent to the
// streamUpdateChannel from a background goroutine.
//
// Returns a wrapped error if the Template cannot be loaded, if any
// calculation or cleaning step fails, or if the database write fails.
func (service *Stream) Save(stream *model.Stream, note string) error {

	const location = "service.Stream.Save"

	template, err := service.templateService.Load(stream.TemplateID)

	if err != nil {
		return derp.Wrap(err, location, "Invalid Template", stream.TemplateID)
	}

	// Copy default values from the Template
	stream.SocialRole = template.SocialRole
	stream.URL = service.host + "/" + stream.StreamID.Hex()

	// RULE: Calculate "defaultAllow" groups for this stream.
	defaultTemplate := template.Default()
	defaultRoles := defaultTemplate.AllowedRoles(stream.StateID)
	stream.DefaultAllow = stream.PermissionGroups(defaultRoles...)

	// RULE: Calculate rank
	if stream.Rank == 0 {
		maxRank, err := service.MaxRank(stream.ParentID)

		if err != nil {
			return derp.Wrap(err, location, "Error calculating max rank")
		}

		stream.Rank = maxRank
	}

	// RULE: Default Token
	if stream.Token == "" {
		stream.Token = stream.StreamID.Hex()
	}

	// Clean the value (using the global stream schema) before saving
	if err := service.Schema().Clean(stream); err != nil {
		return derp.Wrap(err, location, "Error cleaning Stream using StreamSchema", stream)
	}

	// Clean the value (using the template-specific schema) before saving
	if err := template.Schema.Clean(stream); err != nil {
		return derp.Wrap(err, location, "Error cleaning Stream using TemplateSchema", stream)
	}

	// RULE: If this stream does not have ParentIDs, then calculate them now.
	if len(stream.ParentIDs) == 0 {
		if err := service.CalcParentIDs(stream); err != nil {
			return derp.Wrap(err, location, "Error calculating parent IDs", stream)
		}
	}

	// RULE: Calculate the stream context
	service.CalcContext(stream)

	// Try to save the Stream to the database
	if err := service.collection.Save(stream, note); err != nil {
		return derp.Wrap(err, location, "Error saving Stream", stream, note)
	}

	// Take the snapshot BEFORE starting the goroutine. Callers (for example,
	// loops that reuse one Stream variable) may overwrite *stream as soon as
	// Save returns, so the goroutine must not read through the pointer.
	updated := *stream

	// NON-BLOCKING: Notify other processes on this server that the stream has been updated
	go func() {
		service.streamUpdateChannel <- updated
	}()

	// One millisecond delay prevents overlapping stream.CreateDates. Deal with it.
	// TODO: There has to be a better way than this...
	time.Sleep(1 * time.Millisecond)

	return nil
}
// Delete removes a Stream from the database (virtual delete).
//
// The Stream itself is deleted synchronously, and any failure there is
// returned to the caller. Related records (child Streams, Attachments and
// Drafts) are then deleted in a background goroutine; failures in that
// goroutine are reported via derp.Report and are not returned.
func (service *Stream) Delete(stream *model.Stream, note string) error {

	const location = "service.Stream.Delete"

	// Delete this Stream
	if err := service.collection.Delete(stream, note); err != nil {
		return derp.Wrap(err, location, "Error deleting Stream", stream, note)
	}

	// Snapshot the Stream before going to the background. Callers such as
	// DeleteMany reuse (and overwrite) the variable behind this pointer as
	// soon as Delete returns, so the goroutine must work from its own copy or
	// it may clean up records that belong to a different Stream.
	deleted := *stream

	// Delete related records -- this can happen in the background
	go func() {

		// RULE: Delete all related Children
		if err := service.DeleteByParent(deleted.StreamID, note); err != nil {
			derp.Report(derp.Wrap(err, location, "Error deleting child streams", &deleted, note))
		}

		// RULE: Delete all related Attachments
		if err := service.attachmentService.DeleteAll(model.AttachmentTypeStream, deleted.StreamID, note); err != nil {
			derp.Report(derp.Wrap(err, location, "Error deleting attachments", &deleted, note))
		}

		// RULE: Delete all related Drafts
		if err := service.draftService.Delete(&deleted, note); err != nil {
			derp.Report(derp.Wrap(err, location, "Error deleting drafts", &deleted, note))
		}
	}()

	// Bueno!!
	return nil
}
// DeleteMany removes (virtual delete) every Stream that matches the provided
// criteria, by calling Delete on each one in turn. It stops at, and returns,
// the first error encountered.
func (service *Stream) DeleteMany(criteria exp.Expression, note string) error {

	const location = "service.Stream.Delete"

	iterator, err := service.List(notDeleted(criteria))

	if err != nil {
		return derp.Wrap(err, location, "Error listing streams to delete", criteria)
	}

	// Each pass starts from a freshly initialized Stream so that values from
	// the previous record cannot leak into the next one.
	current := model.NewStream()

	for iterator.Next(&current) {

		if deleteErr := service.Delete(&current, note); deleteErr != nil {
			return derp.Wrap(deleteErr, location, "Error deleting stream", current)
		}

		current = model.NewStream()
	}

	return nil
}
/******************************************
* Generic Data Methods
******************************************/
// ObjectType returns the name of the object type that this service manages.
func (service *Stream) ObjectType() string {
	const objectType = "Stream"
	return objectType
}
// ObjectNew returns a pointer to a newly initialized model.Stream, typed as
// a data.Object.
func (service *Stream) ObjectNew() data.Object {
	stream := model.NewStream()
	return &stream
}
// ObjectID returns the StreamID of the provided object when it is a
// *model.Stream. For any other type it returns primitive.NilObjectID.
func (service *Stream) ObjectID(object data.Object) primitive.ObjectID {

	stream, isStream := object.(*model.Stream)

	if !isStream {
		return primitive.NilObjectID
	}

	return stream.StreamID
}
// ObjectQuery runs a query against the Stream collection, writing the
// matching (non-deleted) records into result.
func (service *Stream) ObjectQuery(result any, criteria exp.Expression, options ...option.Option) error {
	filtered := notDeleted(criteria)
	return service.collection.Query(result, filtered, options...)
}
// ObjectList is the generic form of List. It returns an iterator over the
// Streams that match the provided criteria.
func (service *Stream) ObjectList(criteria exp.Expression, options ...option.Option) (data.Iterator, error) {
	iterator, err := service.List(criteria, options...)
	return iterator, err
}
// ObjectLoad is the generic form of Load. It always returns a non-nil
// pointer to a model.Stream, even when the load fails, so callers must check
// the error before using the result.
func (service *Stream) ObjectLoad(criteria exp.Expression) (data.Object, error) {

	stream := model.NewStream()

	if err := service.Load(criteria, &stream); err != nil {
		return &stream, err
	}

	return &stream, nil
}
// ObjectSave is the generic form of Save. It returns an internal error when
// the provided object is not a *model.Stream.
func (service *Stream) ObjectSave(object data.Object, note string) error {

	stream, isStream := object.(*model.Stream)

	if !isStream {
		return derp.NewInternalError("service.Stream.ObjectSave", "Invalid object type", object)
	}

	return service.Save(stream, note)
}
// ObjectDelete is the generic form of Delete. It returns an internal error
// when the provided object is not a *model.Stream.
func (service *Stream) ObjectDelete(object data.Object, note string) error {

	stream, isStream := object.(*model.Stream)

	if !isStream {
		return derp.NewInternalError("service.Stream.ObjectDelete", "Invalid object type", object)
	}

	return service.Delete(stream, note)
}
// ObjectUserCan always denies access: it returns an "unauthorized" error
// regardless of the object, authorization or action provided.
func (service *Stream) ObjectUserCan(object data.Object, authorization model.Authorization, action string) error {
	const location = "service.Stream"
	return derp.NewUnauthorizedError(location, "Not Authorized")
}
// Schema returns the general-purpose schema for Streams, built from
// model.StreamSchema and tagged with its canonical schema ID.
func (service *Stream) Schema() schema.Schema {
	streamSchema := schema.New(model.StreamSchema())
	streamSchema.ID = "https://emissary.social/schemas/stream"
	return streamSchema
}
/******************************************
* Custom Queries
******************************************/
// ListNavigation returns all Streams at the top of the hierarchy (those whose
// parentId is the nil ObjectID), sorted by ascending rank.
func (service *Stream) ListNavigation() (data.Iterator, error) {
	topLevel := exp.Equal("parentId", primitive.NilObjectID)
	byRank := option.SortAsc("rank")
	return service.List(topLevel, byRank)
}
// ListByParent returns an iterator over all Streams whose parentId equals
// the provided parentID.
func (service *Stream) ListByParent(parentID primitive.ObjectID) (data.Iterator, error) {
	criteria := exp.Equal("parentId", parentID)
	return service.List(criteria)
}
// ListByTemplate returns an iterator over all `Streams` whose templateId
// equals the provided `Template` identifier.
func (service *Stream) ListByTemplate(template string) (data.Iterator, error) {
	criteria := exp.Equal("templateId", template)
	return service.List(criteria)
}
// QueryByParentAndDate returns a slice of Streams that are DIRECT CHILDREN of
// the provided StreamID and whose publishDate is earlier than publishedDate.
// Results are sorted newest-first and limited to pageSize rows.
func (service *Stream) QueryByParentAndDate(streamID primitive.ObjectID, publishedDate int64, pageSize int) ([]model.Stream, error) {
	newestFirst := option.SortDesc("publishDate")
	limit := option.MaxRows(int64(pageSize))

	return service.Query(
		exp.Equal("parentId", streamID).AndLessThan("publishDate", publishedDate),
		newestFirst,
		limit,
	)
}
// QueryByAncestorAndDate returns a slice of Streams that are ANY DEPTH below
// the provided StreamID (matched against parentIds) and whose publishDate is
// earlier than publishedDate. Results are sorted newest-first and limited to
// pageSize rows.
func (service *Stream) QueryByAncestorAndDate(streamID primitive.ObjectID, publishedDate int64, pageSize int) ([]model.Stream, error) {
	newestFirst := option.SortDesc("publishDate")
	limit := option.MaxRows(int64(pageSize))

	return service.Query(
		exp.Equal("parentIds", streamID).AndLessThan("publishDate", publishedDate),
		newestFirst,
		limit,
	)
}
// LoadByToken returns a single `Stream` that matches a particular `Token`.
// When the token parses as a hex ObjectID, a lookup by ID is attempted first;
// if that lookup fails for any reason, the token lookup is used instead.
func (service *Stream) LoadByToken(token string, result *model.Stream) error {

	// The ID lookup only runs when the token is valid hex (short-circuit).
	streamID, err := primitive.ObjectIDFromHex(token)

	if err == nil && service.LoadByID(streamID, result) == nil {
		return nil
	}

	// Default to Load by Token
	byToken := exp.Equal("token", token)
	return service.Load(byToken, result)
}
// LoadByID loads the single `Stream` whose _id equals the provided streamID.
func (service *Stream) LoadByID(streamID primitive.ObjectID, result *model.Stream) error {
	byID := exp.Equal("_id", streamID)
	return service.Load(byID, result)
}
// LoadByURL loads the single `Stream` addressed by the provided URL. The URL
// is parsed, its token is extracted with ParsePath (which also checks the
// host), and the Stream is then loaded by that token.
func (service *Stream) LoadByURL(streamURL string, result *model.Stream) error {

	const location = "service.Stream.LoadByURL"

	// Verify we have a valid URL
	parsed, parseErr := url.Parse(streamURL)

	if parseErr != nil {
		return derp.Wrap(parseErr, location, "Invalid URL", streamURL)
	}

	// Retrieve the Token from the request path (the actionID is not needed)
	token, _, pathErr := service.ParsePath(parsed)

	if pathErr != nil {
		return derp.Wrap(pathErr, location, "Invalid URL", streamURL)
	}

	return service.LoadByToken(token, result)
}
// LoadParent loads the parent of the provided Stream into parent. It returns
// a "not found" error when the Stream has no parent, and a wrapped error when
// the parent cannot be loaded.
func (service *Stream) LoadParent(stream *model.Stream, parent *model.Stream) error {

	if !stream.HasParent() {
		return derp.NewNotFoundError("service.Stream.LoadParent", "Stream does not have a parent")
	}

	err := service.LoadByID(stream.ParentID, parent)

	if err == nil {
		return nil
	}

	return derp.Wrap(err, "service.stream.LoadParent", "Error loading parent", stream)
}
// LoadNavigationByID loads a single Stream by its ID, but only if it sits at
// the top level of the site hierarchy (its parentId is the nil ObjectID).
func (service *Stream) LoadNavigationByID(streamID primitive.ObjectID, result *model.Stream) error {
	return service.Load(
		exp.Equal("_id", streamID).AndEqual("parentId", primitive.NilObjectID),
		result,
	)
}
// LoadWithOptions loads the FIRST Stream that matches the provided criteria
// into result, after the query options (such as a sort order) are applied.
//
// Returns a wrapped error if the iterator cannot be created, and a
// "not found" error if no Stream matches.
func (service *Stream) LoadWithOptions(criteria exp.Expression, result *model.Stream, options ...option.Option) error {

	const location = "service.stream.LoadWithOptions"

	it, err := service.List(notDeleted(criteria), options...)

	if err != nil {
		return derp.Wrap(err, location, "Error getting iterator")
	}

	// Only the first record is wanted, so a single conditional read replaces
	// a loop that always returned during its first iteration.
	if it.Next(result) {
		return nil
	}

	return derp.NewNotFoundError(location, "collection is empty")
}
// LoadFirstSibling loads the child of parentID that has the lowest rank.
func (service *Stream) LoadFirstSibling(parentID primitive.ObjectID, result *model.Stream) error {
	siblings := exp.Equal("parentId", parentID)
	lowestFirst := option.SortAsc("rank")
	return service.LoadWithOptions(siblings, result, lowestFirst)
}
// LoadPrevSibling loads the child of parentID with the highest rank that is
// still below the provided rank. When rank is zero, or when no lower-ranked
// sibling exists, it wraps around and loads the last sibling instead.
func (service *Stream) LoadPrevSibling(parentID primitive.ObjectID, rank int, result *model.Stream) error {

	// Nothing can rank below zero, so go straight to the wrap-around case.
	if rank == 0 {
		return service.LoadLastSibling(parentID, result)
	}

	lowerRanked := exp.Equal("parentId", parentID).AndLessThan("rank", rank)
	err := service.LoadWithOptions(lowerRanked, result, option.SortDesc("rank"))

	switch {

	case err == nil:
		return nil

	case derp.NotFound(err):
		return service.LoadLastSibling(parentID, result)

	default:
		return derp.Wrap(err, "service.stream.LoadPreviousSibling", "Error loading Previous Sibling")
	}
}
// LoadNextSibling loads the child of parentID with the lowest rank that is
// still above the provided rank. When no higher-ranked sibling exists, it
// wraps around and loads the first sibling instead.
func (service *Stream) LoadNextSibling(parentID primitive.ObjectID, rank int, result *model.Stream) error {

	higherRanked := exp.Equal("parentId", parentID).AndGreaterThan("rank", rank)
	err := service.LoadWithOptions(higherRanked, result, option.SortAsc("rank"))

	switch {

	case err == nil:
		return nil

	case derp.NotFound(err):
		return service.LoadFirstSibling(parentID, result)

	default:
		return derp.Wrap(err, "service.stream.LoadNextSibling", "Error loading Next Sibling")
	}
}
// LoadLastSibling loads the child of parentID that has the highest rank.
func (service *Stream) LoadLastSibling(parentID primitive.ObjectID, result *model.Stream) error {
	siblings := exp.Equal("parentId", parentID)
	highestFirst := option.SortDesc("rank")
	return service.LoadWithOptions(siblings, result, highestFirst)
}
// LoadFirstAttachment asks the Attachment service for the first Attachment
// that belongs to the Stream identified by streamID.
func (service *Stream) LoadFirstAttachment(streamID primitive.ObjectID) (model.Attachment, error) {
	attachment, err := service.attachmentService.LoadFirstByObjectID(model.AttachmentTypeStream, streamID)
	return attachment, err
}
// Count returns the number of (non-deleted) records in the Stream collection
// that match the provided criteria.
func (service *Stream) Count(criteria exp.Expression) (int64, error) {
	filtered := notDeleted(criteria)
	return service.collection.Count(filtered)
}
// MaxRank returns the maximum rank of all children of a stream, as computed
// by queries.MaxRank against this service's collection.
func (service *Stream) MaxRank(parentID primitive.ObjectID) (int, error) {
	ctx := context.TODO()
	return queries.MaxRank(ctx, service.collection, parentID)
}
/******************************************
* Initialization Actions
******************************************/
// SetLocationTop configures a Stream as a top-level navigation item: it has
// no parent, and it is its own navigation root. Returns a "bad request" error
// (and leaves the Stream untouched) when the Template cannot be placed at the
// top level.
func (service *Stream) SetLocationTop(template *model.Template, stream *model.Stream) error {

	const location = "service.Stream.SetLocationTop"

	// RULE: Template must be allowed in the Top
	if allowed := template.CanBeContainedBy("top"); !allowed {
		return derp.NewBadRequestError(location, "Template cannot be contained by 'top'", template)
	}

	// Template and navigation values
	stream.TemplateID = template.TemplateID
	stream.NavigationID = stream.StreamID.Hex()

	// Parent values: a top-level Stream has none
	stream.ParentID = primitive.NilObjectID
	stream.ParentIDs = []primitive.ObjectID{}
	stream.ParentTemplateID = ""

	return nil
}
// SetLocationOutbox places a Stream in a User's outbox: its ParentID becomes
// the userID and its NavigationID becomes "profile". Returns an
// "unauthorized" error when userID is zero, and a "bad request" error when
// the Template cannot be placed in an outbox. The Stream is left untouched in
// both error cases.
func (service *Stream) SetLocationOutbox(template *model.Template, stream *model.Stream, userID primitive.ObjectID) error {

	const location = "service.Stream.SetLocationOutbox"

	switch {

	// RULE: Valid User is Required
	case userID.IsZero():
		return derp.NewUnauthorizedError(location, "User ID is required")

	// RULE: Template must be allowed in the Outbox
	case !template.CanBeContainedBy("outbox"):
		return derp.NewBadRequestError(location, "Template cannot be contained by 'outbox'", template)
	}

	// Template and navigation values
	stream.TemplateID = template.TemplateID
	stream.NavigationID = "profile"

	// Parent values: the User is the parent, with no Stream ancestors
	stream.ParentID = userID
	stream.ParentIDs = make([]primitive.ObjectID, 0)
	stream.ParentTemplateID = ""

	return nil
}
// SetLocationChild configures a Stream to be a child of another Stream. The
// child inherits the parent's NavigationID, and its ParentIDs become the
// parent's ParentIDs followed by the parent's own StreamID.
//
// Returns a wrapped error when the parent's Template cannot be loaded, and a
// "bad request" error when the child's Template is not allowed inside the
// parent's Template role. The Stream is left untouched in both error cases.
func (service *Stream) SetLocationChild(template *model.Template, stream *model.Stream, parent *model.Stream) error {

	const location = "service.Stream.SetLocationChild"

	// Get the Parent Template
	parentTemplate, err := service.templateService.Load(parent.TemplateID)

	if err != nil {
		return derp.Wrap(err, location, "Invalid Parent Template", parent)
	}

	// RULE: Template must be allowed in the Parent
	if !template.CanBeContainedBy(parentTemplate.TemplateRole) {
		return derp.NewBadRequestError(location, "Template cannot be contained by parent", template, parent)
	}

	// Cap the parent's slice at its own length so that append is forced to
	// allocate a new backing array. Without this, a parent slice with spare
	// capacity would be shared with (and later overwritten through) the child.
	ancestors := parent.ParentIDs[:len(parent.ParentIDs):len(parent.ParentIDs)]

	// Set values in the Stream
	stream.TemplateID = template.TemplateID
	stream.NavigationID = parent.NavigationID
	stream.ParentID = parent.StreamID
	stream.ParentIDs = append(ancestors, parent.StreamID)
	stream.ParentTemplateID = parent.TemplateID

	return nil
}
/******************************************
* Custom Actions
******************************************/
// DeleteByParent removes (virtual delete) every Stream whose parentId equals
// the provided parentID.
func (service *Stream) DeleteByParent(parentID primitive.ObjectID, note string) error {
	children := exp.Equal("parentId", parentID)
	return service.DeleteMany(children, note)
}
// DeleteRelatedDuplicate hard deletes every Stream under parentID whose
// data.originalStreamId equals originalStreamID.
func (service *Stream) DeleteRelatedDuplicate(parentID primitive.ObjectID, originalStreamID primitive.ObjectID) error {

	duplicates := exp.Equal("parentId", parentID).AndEqual("data.originalStreamId", originalStreamID)
	err := service.collection.HardDelete(duplicates)

	if err == nil {
		return nil
	}

	return derp.Wrap(err, "service.Stream.DeleteRelatedDuplicate", "Error deleting related duplicate")
}
// RestoreDeleted un-deletes all soft-deleted records underneath a common
// ancestor. Each matching Stream has its DeleteDate reset to zero and is then
// re-saved. Processing stops at, and returns, the first save error.
func (service *Stream) RestoreDeleted(ancestorID primitive.ObjectID) error {

	const location = "service.Stream.RestoreDeleted"

	// The collection is queried directly (not via List) because the records
	// wanted here are exactly the ones that have a deleteDate.
	softDeleted := exp.Equal("parentIds", ancestorID).AndGreaterThan("deleteDate", 0)
	deleted, err := service.collection.Iterator(softDeleted)

	if err != nil {
		return derp.Wrap(err, location, "Error listing soft-deleted streams")
	}

	// Walk every soft-deleted descendant and UnDelete it
	current := model.NewStream()

	for deleted.Next(&current) {

		current.Journal.DeleteDate = 0

		if saveErr := service.Save(&current, "RestoreDeleted stream"); saveErr != nil {
			return derp.Wrap(saveErr, location, "Error restoring deleted stream", current)
		}

		// Start the next pass from a clean value
		current = model.NewStream()
	}

	// No discomfort, no expansion.
	return nil
}
// PurgeDeleted hard deletes all Streams with the given ancestor that have
// already been soft-deleted (their deleteDate is greater than zero).
func (service *Stream) PurgeDeleted(ancestorID primitive.ObjectID) error {

	const location = "service.Stream.PurgeDeleted"

	softDeleted := exp.Equal("parentIds", ancestorID).AndGreaterThan("deleteDate", 0)
	err := service.collection.HardDelete(softDeleted)

	if err == nil {
		return nil
	}

	return derp.Wrap(err, location, "Error purging soft-deleted streams")
}
// ParsePath extracts the Stream token and actionID from a URL.
//
// The first path segment is the token (defaulting to "home" when empty) and
// the second is the actionID (defaulting to "view" when empty).
//
// Returns a "bad request" error when the URL's host, after
// domain.AddProtocol, does not equal this service's host.
func (service *Stream) ParsePath(uri *url.URL) (string, string, error) {

	const location = "service.Stream.ParsePath"

	// Verify the URL matches this service
	if domain.AddProtocol(uri.Host) != service.host {
		return "", "", derp.NewBadRequestError(location, "Hostname must match this server", uri.String())
	}

	// Split the path into its first segment (the token) and everything after it
	segments := list.BySlash(strings.TrimPrefix(uri.Path, "/"))
	token, remainder := segments.Split()

	if token == "" {
		token = "home"
	}

	// The actionID is the next segment, if there is one
	actionID := remainder.Head()

	if actionID == "" {
		actionID = "view"
	}

	return token, actionID, nil
}
// ParseURL parses a URL and resolves the first segment of its path to a
// StreamID. If the segment is a hex ObjectID, it is returned directly without
// a database lookup; otherwise the Stream is loaded by token and its StreamID
// is returned.
//
// NOTE(review): the URL's host is not compared against this server's host
// here (ParsePath does perform that check) -- confirm whether callers rely on
// that before tightening it.
func (service *Stream) ParseURL(streamURL string) (primitive.ObjectID, error) {

	const location = "service.Stream.ParseURL"

	parsedURL, parseErr := url.Parse(streamURL)

	if parseErr != nil {
		return primitive.NilObjectID, derp.Wrap(parseErr, location, "Invalid URL", streamURL)
	}

	// Get the first part of the path (which is the stream ID or token)
	token, _, _ := strings.Cut(strings.TrimPrefix(parsedURL.Path, "/"), "/")

	// If the value looks like an ObjectID, then return it
	streamID, hexErr := primitive.ObjectIDFromHex(token)

	if hexErr == nil {
		return streamID, nil
	}

	// Otherwise, try to load the stream by Token
	stream := model.NewStream()

	if loadErr := service.LoadByToken(token, &stream); loadErr != nil {
		return primitive.NilObjectID, derp.Wrap(loadErr, location, "Invalid Token", token)
	}

	return stream.StreamID, nil
}
// CalcParentIDs walks the parent chain of a Stream and stores the resulting
// "breadcrumbs" slice (every ancestor's ID, ending with the direct parent) in
// stream.ParentIDs.
//
// If the direct parent has no ParentIDs of its own, they are calculated
// recursively and, when that yields a non-empty result, the parent is saved.
// Returns a wrapped error if the parent cannot be loaded, calculated or saved.
func (service *Stream) CalcParentIDs(stream *model.Stream) error {

	const location = "service.Stream.CalcParentIDs"

	// RULE: Notes are always stored under a user's profile, so they have no
	// parents. Likewise, a stream without a parent has no parent IDs.
	if (stream.SocialRole == vocab.ObjectTypeNote) || stream.ParentID.IsZero() {
		stream.ParentIDs = id.NewSlice()
		return nil
	}

	// Otherwise, load the Parent stream and try to use its parentIDs
	parent := model.NewStream()

	if err := service.LoadByID(stream.ParentID, &parent); err != nil {
		return derp.Wrap(err, location, "Unable to load Parent stream", stream.ParentID)
	}

	// If the parent has no parentIDs, then try to calculate them
	if len(parent.ParentIDs) == 0 {

		if err := service.CalcParentIDs(&parent); err != nil {
			return derp.Wrap(err, location, "Unable to calculate ParentIDs for Parent stream", stream.ParentID)
		}

		// If the parent has been changed, then save that calculation.
		if len(parent.ParentIDs) > 0 {
			if err := service.Save(&parent, "CalcParentIDs"); err != nil {
				return derp.Wrap(err, location, "Unable to save Parent stream", stream.ParentID)
			}
		}
	}

	// The breadcrumbs are the parent's breadcrumbs plus the parent itself
	stream.ParentIDs = append(parent.ParentIDs, parent.StreamID)
	return nil
}
// UserCan checks whether the provided authorization permits an action on a
// Stream. It returns nil when the action is allowed. Otherwise the error
// describes why: the Template could not be loaded, the actionID is not
// defined by the Template, or the user is not authorized.
func (service *Stream) UserCan(authorization *model.Authorization, stream *model.Stream, actionID string) error {

	const location = "service.Stream.UserCan"

	// Find the Template used by this stream
	template, loadErr := service.templateService.Load(stream.TemplateID)

	if loadErr != nil {
		return derp.Wrap(loadErr, location, "Invalid Template")
	}

	// Find the action that the user wants to perform
	action, exists := template.Action(actionID)

	if !exists {
		return derp.NewBadRequestError(location, "Invalid Action", actionID)
	}

	// Check permissions on the action
	if action.UserCan(stream, authorization) {
		return nil
	}

	return derp.NewUnauthorizedError(location, "User is not authorized to perform this action", actionID)
}
// CalcContext calculates the conversational context for a given stream and
// stores it in stream.Context.
//
//   - A Stream that is not a reply uses its own ActivityPub URL.
//   - A reply uses the context of the document it replies to, when that
//     document reports a non-empty context.
//   - Otherwise, the InReplyTo value itself is used.
func (service *Stream) CalcContext(stream *model.Stream) {

	// If this is an original stream (not a reply) then its context is itself.
	if stream.InReplyTo == "" {
		stream.Context = stream.ActivityPubURL()
		return
	}

	// Load the "InReplyTo" document from the ActivityStream and use its
	// context. Note: this should have been calculated already via the
	// ascontextmaker client.
	//
	// The load error is deliberately ignored: this is a best-effort lookup,
	// and a failed load falls through to the InReplyTo fallback below.
	// NOTE(review): this presumes the document returned alongside an error
	// reports an empty context -- confirm against ActivityStream.Load.
	document, _ := service.activityService.Load(stream.InReplyTo)

	// Named "replyContext" so that it does not shadow the "context" package.
	if replyContext := document.Context(); replyContext != "" {
		stream.Context = replyContext
		return
	}

	// If a context could not be assigned, then use the InReplyTo value instead.
	stream.Context = stream.InReplyTo
}
// CalcTags rebuilds stream.Tags from the Stream's HTML content. Any existing
// tags are discarded. The content is converted to search text, and then every
// @mention and #hashtag found in it is added as a Tag (mentions first).
//
// For mentions, the actor is looked up through the activity service and, when
// that succeeds, the Tag's Href is set to the actor's ID. A failed lookup
// still produces a Tag, just without an Href.
func (service *Stream) CalcTags(stream *model.Stream) {

	stream.Tags = make(sliceof.Object[model.Tag], 0)

	plainText := html.ToSearchText(stream.Content.HTML)

	// Add all @mentions into the Tags slice
	mentions := mention.GetTags('@', plainText)

	for _, value := range mentions {
		tag := model.NewTag()
		tag.Type = vocab.LinkTypeMention
		tag.Name = string(value.Char) + value.Tag

		// Drop sentence punctuation picked up at the end of the tag
		tag.Name = strings.TrimSuffix(tag.Name, ".")
		tag.Name = strings.TrimSuffix(tag.Name, ",")

		if actor, err := service.activityService.Load(tag.Name, sherlock.AsActor()); err == nil {
			tag.Href = actor.ID()
		}

		stream.Tags = append(stream.Tags, tag)
	}

	// Add all #hashtags into the Tags slice
	hashtags := mention.GetTags('#', plainText)

	for _, value := range hashtags {
		tag := model.NewTag()
		tag.Type = "Hashtag" // TODO: This constant should be defined by Hannibal
		tag.Name = string(value.Char) + value.Tag

		// Drop sentence punctuation picked up at the end of the tag
		tag.Name = strings.TrimSuffix(tag.Name, ".")
		tag.Name = strings.TrimSuffix(tag.Name, ",")

		stream.Tags = append(stream.Tags, tag)
	}
}