/
notify.go
85 lines (73 loc) 路 2.2 KB
/
notify.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package service
import (
"bikefest/pkg/bootstrap"
"bikefest/pkg/model"
"context"
"encoding/json"
"errors"
"log"
"time"
"github.com/hibiken/asynq"
)
// A list of task types.
const (
TypeEventReminder = "reminder"
)
// Task payload for any event notification related tasks.
type eventNotificationPayload struct {
UserID string
EventID string
}
type AsynqServiceImpl struct {
client *asynq.Client
inspector *asynq.Inspector
env *bootstrap.Env
}
func newEventNotification(userId, eventId string) (*asynq.Task, error) {
payload, err := json.Marshal(eventNotificationPayload{UserID: userId, EventID: eventId})
if err != nil {
return nil, err
}
return asynq.NewTask(TypeEventReminder, payload), nil
}
// DeleteEventNotification deletes the task from the queue.
// the taskID is the userID + eventID
func (as *AsynqServiceImpl) DeleteEventNotification(ctx context.Context, taskID string) error {
err := as.inspector.DeleteTask("default", taskID)
switch {
case errors.Is(err, asynq.ErrTaskNotFound):
log.Printf("Task with ID %q not found", taskID)
return nil
case err != nil:
return err
default:
return nil
}
}
func (as *AsynqServiceImpl) EnqueueEventNotification(ctx context.Context, userID, eventID, eventStartTime string) error {
t, err := newEventNotification(userID, eventID)
if err != nil {
return err
}
location, _ := time.LoadLocation(as.env.Server.TimeZone)
//TODO: currently we only set the process time 30 minutes before the event start time
processTime, _ := time.ParseInLocation(model.EventTimeLayout, eventStartTime, location)
processTime = processTime.Add(-time.Minute * 30)
info, err := as.client.Enqueue(t, asynq.ProcessAt(processTime), asynq.TaskID(userID+eventID))
switch {
case errors.Is(err, asynq.ErrTaskIDConflict):
log.Printf("Task with ID %q already exists", userID+eventID)
return nil
case err != nil:
return err
}
log.Printf(" [*] Successfully enqueued task: %+v\nThe task should be executed at %s", info, processTime.String())
return nil
}
func NewAsynqService(client *asynq.Client, inspector *asynq.Inspector, env *bootstrap.Env) model.AsynqNotificationService {
return &AsynqServiceImpl{
client: client,
inspector: inspector,
env: env,
}
}