/
SyncProvider.cfc
141 lines (116 loc) · 3.79 KB
/
SyncProvider.cfc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/**
 * Queue provider that executes a job immediately, in the calling request,
 * at the moment it is pushed instead of handing it to a background worker.
 *
 * The worker pool handed to `startWorker()` is remembered in `variables.pool`
 * and is required before `push()` will run anything.
 *
 * NOTE(review): `deserializeJob`, `releaseJob`, `getMaxAttemptsForJob`, the
 * before/after lifecycle hooks and the batch/chain helpers are not defined in
 * this file; they are presumably inherited from AbstractQueueProvider.
 */
component accessors="true" extends="AbstractQueueProvider" {

	/**
	 * Deserializes the payload into a job and runs it synchronously through `marshalJob()`.
	 * If no worker pool has been registered yet, a warning is logged and the job is skipped.
	 *
	 * @queueName The name of the queue the job was dispatched to. Not read by this provider.
	 * @payload   The serialized job, passed to `deserializeJob()`.
	 * @delay     Accepted for interface compatibility. Not read by this provider.
	 * @attempts  The attempt count forwarded to `deserializeJob()`.
	 *
	 * @return This provider, in every code path, so that calls can be chained.
	 */
	public any function push(
		required string queueName,
		required string payload,
		numeric delay = 0,
		numeric attempts = 0
	) {
		if ( isNull( variables.pool ) ) {
			if ( variables.log.canWarn() ) {
				variables.log.warn( "No worker pools have been defined so this job will not be executed." );
			}
			// Return the provider (not null) so the skipped path behaves like the normal path for callers that chain.
			return this;
		}

		marshalJob(
			deserializeJob(
				arguments.payload,
				// A fresh UUID is supplied as the second argument; presumably the job id - confirm against deserializeJob().
				createUUID(),
				arguments.attempts
			),
			variables.pool
		);

		return this;
	}

	/**
	 * Registers the worker pool that subsequent `push()` calls will run jobs against.
	 *
	 * @pool The worker pool to remember.
	 *
	 * @return A no-op function. Presumably the caller treats it as a "stop worker" callback - confirm against the caller.
	 */
	public function function startWorker( required WorkerPool pool ) {
		variables.pool = arguments.pool;
		return function() {
		};
	}

	/**
	 * No-op for this provider: it performs no work and simply returns the provider.
	 *
	 * @pool The worker pool. Not read by this method.
	 *
	 * @return This provider.
	 */
	public any function listen( required WorkerPool pool ) {
		return this;
	}

	/**
	 * Runs a single job through its full lifecycle: before hooks, `handle()`, then either
	 * manual release, completion (after hooks, batch recording, next job in chain), or
	 * exception handling (release for another attempt, or fail permanently).
	 *
	 * @job  The job to run.
	 * @pool The worker pool, used to resolve the maximum attempts and to release the job.
	 *
	 * @throws cbq.MaxAttemptsReached When the job asks to be released but has no attempts left.
	 *         The exception is raised inside the try block, so it is handled by the catch block
	 *         below and rethrown from there once the failure bookkeeping has run.
	 */
	public void function marshalJob( required AbstractJob job, required WorkerPool pool ) {
		try {
			if ( variables.log.canDebug() ) {
				variables.log.debug( "Marshaling job ###arguments.job.getId()#", arguments.job.getMemento() );
			}

			beforeJobRun( arguments.job );

			// `before` is an optional hook on the job itself.
			if ( structKeyExists( arguments.job, "before" ) ) {
				arguments.job.before();
			}

			variables.interceptorService.announce( "onCBQJobMarshalled", { "job" : arguments.job } );

			if ( variables.log.canDebug() ) {
				variables.log.debug( "Running job ###arguments.job.getId()#", arguments.job.getMemento() );
			}

			var result = arguments.job.handle();

			// The job asked to be put back on the queue rather than be treated as complete.
			if ( arguments.job.getIsReleased() ) {
				variables.log.debug( "Job [#arguments.job.getId()#] requested manual release." );

				var jobMaxAttempts = getMaxAttemptsForJob( arguments.job, arguments.pool );

				// A max of 0 means "no limit"; otherwise refuse the release once attempts are used up.
				if ( jobMaxAttempts != 0 && arguments.job.getCurrentAttempt() >= jobMaxAttempts ) {
					throw(
						type    = "cbq.MaxAttemptsReached",
						message = "Job [#arguments.job.getId()#] requested manual release, but has reached its maximum attempts [#arguments.job.getCurrentAttempt()#]."
					);
				}

				if ( jobMaxAttempts == 0 ) {
					variables.log.debug( "Job ###arguments.job.getId()# has a maxAttempts of 0 and will always be released." );
				}

				variables.log.debug( "Releasing job ###arguments.job.getId()#" );
				releaseJob( arguments.job, arguments.pool );
				variables.log.debug( "Released job ###arguments.job.getId()#" );

				// A released job skips the completion announcement and after hooks.
				return;
			}

			if ( variables.log.canDebug() ) {
				variables.log.debug( "Job ###arguments.job.getId()# completed successfully." );
			}

			variables.interceptorService.announce(
				"onCBQJobComplete",
				{
					"job"    : arguments.job,
					// `handle()` may return nothing; pass an explicit null in that case.
					"result" : isNull( result ) ? javacast( "null", "" ) : result
				}
			);

			// `after` is an optional hook on the job itself.
			if ( structKeyExists( arguments.job, "after" ) ) {
				arguments.job.after();
			}

			afterJobRun( arguments.job );
			ensureSuccessfulBatchJobIsRecorded( arguments.job );
			dispatchNextJobInChain( arguments.job );
		} catch ( any e ) {
			// log failed job
			if ( variables.log.canError() ) {
				variables.log.error(
					"Exception when running job: #e.message#",
					{
						"job"       : arguments.job.getMemento(),
						"exception" : e
					}
				);
			}

			variables.interceptorService.announce(
				"onCBQJobException",
				{ "job" : arguments.job, "exception" : e }
			);

			var jobMaxAttempts = getMaxAttemptsForJob( arguments.job, arguments.pool );

			if ( jobMaxAttempts == 0 || arguments.job.getCurrentAttempt() < jobMaxAttempts ) {
				// Attempts remain (or are unlimited): release the job for another try and swallow the exception.
				if ( jobMaxAttempts == 0 ) {
					variables.log.debug( "Job ###arguments.job.getId()# has a maxAttempts of 0 and will always be released." );
				}

				variables.log.debug( "Releasing job ###arguments.job.getId()#" );
				releaseJob( arguments.job, arguments.pool );
				variables.log.debug( "Released job ###arguments.job.getId()#" );
			} else {
				// Out of attempts: run the failure hooks and bookkeeping, then surface the exception to the caller.
				variables.log.debug( "Maximum attempts reached. Deleting job ###arguments.job.getId()#" );

				// `onFailure` is an optional hook on the job; the exception is passed as the named argument `exception`.
				if ( structKeyExists( arguments.job, "onFailure" ) ) {
					invoke(
						arguments.job,
						"onFailure",
						{ "exception" : e }
					);
				}

				variables.interceptorService.announce(
					"onCBQJobFailed",
					{ "job" : arguments.job, "exception" : e }
				);
				afterJobFailed( arguments.job.getId(), arguments.job );
				ensureFailedBatchJobIsRecorded( arguments.job, e );

				variables.log.debug( "Deleted job ###arguments.job.getId()# after maximum failed attempts." );
				rethrow;
			}
		}
	}

}