-
Notifications
You must be signed in to change notification settings - Fork 4
/
engine-tasks.go
161 lines (137 loc) · 7.35 KB
/
engine-tasks.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package engine
import (
"github.com/alexandre-normand/glukit/app/model"
"github.com/alexandre-normand/glukit/app/store"
"github.com/alexandre-normand/glukit/app/util"
"context"
"google.golang.org/appengine/delay"
"google.golang.org/appengine/log"
"google.golang.org/appengine/taskqueue"
"time"
)
var RunGlukitScoreCalculationChunk = delay.Func(GLUKIT_SCORE_BATCH_CALCULATION_FUNCTION_NAME, func(context context.Context, userEmail string,
lowerBound time.Time) {
log.Criticalf(context, "This function purely exists as a workaround to the \"initialization loop\" error that "+
"shows up because the function calls itself. This implementation defines the same signature as the "+
"real one which we define in init() to override this implementation!")
})
var RunA1CCalculationChunk = delay.Func(A1C_BATCH_CALCULATION_FUNCTION_NAME, func(context context.Context, userEmail string,
lowerBound time.Time) {
log.Criticalf(context, "This function purely exists as a workaround to the \"initialization loop\" error that "+
"shows up because the function calls itself. This implementation defines the same signature as the "+
"real one which we define in init() to override this implementation!")
})
const (
PERIODS_PER_BATCH = 6
BATCH_CALCULATION_QUEUE_NAME = "batch-calculation"
GLUKIT_SCORE_BATCH_CALCULATION_FUNCTION_NAME = "runGlukitScoreCalculationChunk"
A1C_BATCH_CALCULATION_FUNCTION_NAME = "runA1CCalculationChunk"
)
func RunGlukitScoreBatchCalculation(context context.Context, userEmail string, lowerBound time.Time) {
glukitUser, _, _, err := store.GetUserData(context, userEmail)
if _, ok := err.(store.StoreError); err != nil && !ok {
log.Errorf(context, "We're trying to run a batch glukit score calculation for user [%s] that doesn't exist. "+
"Got error: %v", userEmail, err)
return
}
bestScore := glukitUser.BestScore
mostRecentScore := glukitUser.MostRecentScore
glukitScoreBatch := make([]model.GlukitScore, 0)
var periodUpperBound time.Time
log.Debugf(context, "Calculating batch of GlukitScores for user [%s] with current best of [%v] and most recent score of [%v]",
userEmail, glukitUser.BestScore, mostRecentScore)
upperBound := lowerBound.AddDate(0, 0, PERIODS_PER_BATCH*GLUKIT_SCORE_PERIOD)
// Calculate the GlukitScore for every period until now by increment of 1 day. This is a moving score over the last GLUKIT_SCORE_PERIOD that gets a new value every day.
// This will likely go through a few calculations for which we don't have data yet but this seems like the fair
// price to pay for making sure we don't stop processing glukit scores because someone might have stopped using their CGM for a week or so.
for periodUpperBound = lowerBound.AddDate(0, 0, 1); periodUpperBound.Before(time.Now()) && periodUpperBound.Before(upperBound); periodUpperBound = periodUpperBound.AddDate(0, 0, 1) {
glukitScore, err := CalculateGlukitScore(context, glukitUser, periodUpperBound)
if err != nil {
util.Propagate(err)
}
if glukitScore.IsBetterThan(glukitUser.BestScore) {
bestScore = *glukitScore
}
if glukitScore.Value != model.UNDEFINED_SCORE_VALUE {
if periodUpperBound.After(mostRecentScore.UpperBound) {
mostRecentScore = *glukitScore
}
glukitScoreBatch = append(glukitScoreBatch, *glukitScore)
}
}
// Store the batch
store.StoreGlukitScoreBatch(context, userEmail, glukitScoreBatch)
// Update the bestScore/LastScoredRead if one of them is different than what was already there
if bestScore != glukitUser.BestScore || mostRecentScore != glukitUser.MostRecentScore {
glukitUser.BestScore = bestScore
glukitUser.MostRecentScore = mostRecentScore
if _, err := store.StoreUserProfile(context, time.Now(), *glukitUser); err != nil {
util.Propagate(err)
} else {
log.Debugf(context, "Updated glukit user [%s] with an improved GlukitScore of [%v] and most recent score of [%v]",
glukitUser.Email, bestScore, mostRecentScore)
}
}
// Kick off the next chunk of glukit score calculation
if !periodUpperBound.Before(upperBound) {
task, err := RunGlukitScoreCalculationChunk.Task(userEmail, periodUpperBound)
if err != nil {
log.Criticalf(context, "Couldn't schedule the next execution of [%s] for user [%s]. "+
"This breaks batch calculation of glukit scores for that user!: %v", GLUKIT_SCORE_BATCH_CALCULATION_FUNCTION_NAME, userEmail, err)
}
taskqueue.Add(context, task, BATCH_CALCULATION_QUEUE_NAME)
log.Infof(context, "Queued up next chunk of glukit score calculation for user [%s] and lowerBound [%s]", userEmail, periodUpperBound.Format(util.TIMEFORMAT))
} else {
log.Infof(context, "Done with glukit score calculation for user [%s]", userEmail)
}
}
func RunA1CBatchCalculation(context context.Context, userEmail string, lowerBound time.Time) {
glukitUser, _, _, err := store.GetUserData(context, userEmail)
if _, ok := err.(store.StoreError); err != nil && !ok {
log.Errorf(context, "We're trying to run a batch of a1c estimates for user [%s] that doesn't exist. "+
"Got error: %v", userEmail, err)
return
}
mostRecentA1C := glukitUser.MostRecentA1C
a1cBatch := make([]model.A1CEstimate, 0)
var periodUpperBound time.Time
log.Debugf(context, "Calculating batch of A1C estimates for user [%s]", userEmail)
upperBound := lowerBound.AddDate(0, 0, PERIODS_PER_BATCH*A1C_ESTIMATION_SCORE_PERIOD)
// Calculate the GlukitScore for every period until now by increment of 1 day. This is a moving estimate over the last A1C_ESTIMATION_SCORE_PERIOD that gets a new value every day.
// This will likely go through a few calculations for which we don't have data yet but this seems like the fair
// price to pay for making sure we don't stop processing estimates because someone might have stopped using their CGM for a week or so.
for periodUpperBound = lowerBound.AddDate(0, 0, 1); periodUpperBound.Before(time.Now()) && periodUpperBound.Before(upperBound); periodUpperBound = periodUpperBound.AddDate(0, 0, 1) {
a1cEstimate, err := EstimateA1C(context, glukitUser, periodUpperBound)
if err != nil {
log.Warningf(context, "Error trying to calculate a1c for user [%s] with upper bound [%s]: %v", userEmail, periodUpperBound, err)
} else {
a1cBatch = append(a1cBatch, *a1cEstimate)
if periodUpperBound.After(mostRecentA1C.UpperBound) {
mostRecentA1C = *a1cEstimate
}
}
}
// Store the batch
store.StoreA1CBatch(context, userEmail, a1cBatch)
if mostRecentA1C != glukitUser.MostRecentA1C {
glukitUser.MostRecentA1C = mostRecentA1C
if _, err := store.StoreUserProfile(context, time.Now(), *glukitUser); err != nil {
util.Propagate(err)
} else {
log.Debugf(context, "Updated glukit user [%s] with a most recent a1c [%v]",
glukitUser.Email, mostRecentA1C)
}
}
// Kick off the next chunk of glukit score calculation
if !periodUpperBound.Before(upperBound) {
task, err := RunA1CCalculationChunk.Task(userEmail, periodUpperBound)
if err != nil {
log.Criticalf(context, "Couldn't schedule the next execution of [%s] for user [%s]. "+
"This breaks batch calculation of glukit scores for that user!: %v", A1C_BATCH_CALCULATION_FUNCTION_NAME, userEmail, err)
}
taskqueue.Add(context, task, BATCH_CALCULATION_QUEUE_NAME)
log.Infof(context, "Queued up next chunk of a1c calculation for user [%s] and lowerBound [%s]", userEmail, periodUpperBound.Format(util.TIMEFORMAT))
} else {
log.Infof(context, "Done with a1c estimation for user [%s]", userEmail)
}
}