-
Notifications
You must be signed in to change notification settings - Fork 21
/
cron.go
115 lines (104 loc) · 2.95 KB
/
cron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package cron_jobs
import (
"encoding/json"
"fmt"
"github.com/Harry-027/go-notify/api-server/models"
"github.com/Harry-027/go-notify/api-server/repository"
"github.com/segmentio/kafka-go"
"log"
"strings"
)
// Send the scheduled mails ...
func SendScheduledMail(job models.Job) error {
log.Println("Sending scheduled mail ...")
template, err := repository.GetTemplate(job.TemplateID)
if err != nil {
log.Println("An error occurred while fetching template: ", err.Error())
return err
}
clientRecord, err := repository.GetClientById(job.ClientID)
if err != nil {
log.Println("An error occurred while fetching Client: ", err.Error())
return err
}
log.Println("ClientDetails :: ", clientRecord)
clientDetails, _ := repository.GetClientFields(job.ClientID)
log.Println("Client Fields :: ", clientDetails)
subject := template.Subject
bodyContent := template.Body
if len(clientDetails) > 0 {
for _, field := range clientDetails {
oldVal := fmt.Sprintf("{{ %s }}", field.Key)
newVal := field.Value
subject = strings.Replace(subject, oldVal, newVal, -1)
bodyContent = strings.Replace(bodyContent, oldVal, newVal, -1)
}
}
kafkaPayload := models.KafkaPayload{
To: job.To,
From: job.From,
Subject: subject,
Text: bodyContent,
}
log.Println("kafka Payload :: ", kafkaPayload)
payload, err := json.Marshal(kafkaPayload)
if err != nil {
log.Println("An error occurred while marshalling payload: ", err.Error())
return err
}
_, err = KafkaConn.WriteMessages(
kafka.Message{Value: payload},
)
if err != nil {
log.Println("failed to write messages:", err)
return err
}
user, _ := repository.GetUserByName(job.From)
err = repository.UpdateUserCounter(user, 1)
return err
}
// Job scheduler ...
func JobScheduler(jobs []models.Job) {
log.Println("Job scheduler started ...")
cronScheduler := GetNewCron()
for _, job := range jobs {
SetCronJob(cronScheduler, job, SendScheduledMail)
}
go cronScheduler.c.Start()
}
// Server startup job scheduler ...
func ScheduleJobOnServerStart() {
log.Println("Server startup job scheduler initiated ...")
jobs, err := repository.GetActiveJobs()
if err != nil {
log.Println("An error occurred while fetching jobs: ", err)
return
}
log.Println("Server startup job scheduler started ...")
JobScheduler(jobs)
}
// Schedule the daily active jobs ...
func ScheduleDailyJob() {
log.Println("Daily job scheduler initiated ...")
cronScheduler := GetNewCron()
entryId, err := cronScheduler.c.AddFunc("@daily", func() {
DailyJobScheduler()
})
if err != nil {
log.Println("An error occurred while scheduling cron job", err)
}
log.Println("EntryId for Daily Cron Job :: ", entryId)
go cronScheduler.c.Start()
}
// Daily job scheduler ...
func DailyJobScheduler() {
log.Println("Daily Job Scheduler started ...")
jobs, err := repository.GetPendingJobs()
if err != nil {
return
}
for _, job := range jobs {
_ = repository.UpdateJobStatus(job.ID, "ACTIVE")
}
JobScheduler(jobs)
}