/
AbstractQueueProviderSpec.cfc
139 lines (118 loc) · 6.03 KB
/
AbstractQueueProviderSpec.cfc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/**
 * Shared behavioural spec for queue providers.
 *
 * This component is abstract: `getProviderMapping()` throws until a subclass
 * overrides it to return the WireBox mapping of the provider under test.
 * Every spec below then runs against that provider.
 */
component extends="tests.resources.ModuleIntegrationSpec" appMapping="/app" {

    /**
     * Registers the provider specs with the test runner.
     */
    function run() {
        describe( "queue provider - #getProviderMapping()#", function() {
            beforeEach( function() {
                // Clear the lifecycle flags so each spec starts from a clean application scope.
                structDelete( application, "jobBeforeCalled" );
                structDelete( application, "jobAfterCalled" );
            } );

            it( "can manually release a job back on to a queue with a given delay", function() {
                var provider = getProvider();
                $spy( provider, "releaseJob" );
                var workerPool = makeWorkerPool( provider );
                // Attempt 1 of 2: one attempt is still available.
                var job = getInstance( "ReleaseTestJob" )
                    .setCurrentAttempt( 1 )
                    .setId( randRange( 1, 1000 ) )
                    .setMaxAttempts( 2 );

                // work the job
                var jobFuture = provider.marshalJob( job, workerPool );
                // if it is an async operation, wait for it to finish
                if ( !isNull( jobFuture ) ) {
                    jobFuture.get();
                }

                // inspect the spy
                expect( provider.$once( "releaseJob" ) ).toBeTrue( "releaseJob should have been called on the provider" );
                // First recorded call to releaseJob; its first two arguments are fed to getBackoffForJob.
                var callLog = provider.$callLog()[ "releaseJob" ][ 1 ];
                expect( provider.getBackoffForJob( callLog[ 1 ], callLog[ 2 ] ) ).toBe( 2, "The delay [2] should have been passed to the provider" );
            } );

            it( "does not manually release a job back on to a queue if the maximum attempts have been reached", function() {
                var provider = getProvider();
                $spy( provider, "releaseJob" );
                var workerPool = makeWorkerPool( provider );
                // Attempt 1 of 1: the job is already on its final attempt.
                var job = getInstance( "ReleaseTestJob" )
                    .setCurrentAttempt( 1 )
                    .setId( randRange( 1, 1000 ) )
                    .setMaxAttempts( 1 );

                // work the job
                // The wait for an async provider happens inside the try so that an
                // exception raised by marshalJob or by the future is handled here.
                try {
                    var jobFuture = provider.marshalJob( job, workerPool );
                    // if it is an async operation, wait for it to finish
                    if ( !isNull( jobFuture ) ) {
                        jobFuture.get();
                    }
                } catch ( cbq.MaxAttemptsReached e ) {
                    // ignore
                } catch ( any e ) {
                    fail( "Unexpected exception: #e.message#", e.detail );
                }

                // inspect the spy
                expect( provider.$never( "releaseJob" ) ).toBeTrue( "releaseJob should not have been called on the provider since the job was at its max attempts." );
                expect( job ).toHaveKey( "onFailureCalled" );
                expect( job.onFailureCalled ).toBeTrue( "onFailure should have been called on the job" );
            } );

            it( "will always release a job with a maxAttempts of 0 regardless of the currentAttempt count", function() {
                var provider = getProvider();
                $spy( provider, "releaseJob" );
                var workerPool = makeWorkerPool( provider );
                var job = getInstance( "ReleaseTestJob" )
                    .setCurrentAttempt( 1000000 ) // one million
                    .setId( randRange( 1, 1000 ) )
                    .setMaxAttempts( 0 );

                // work the job
                var jobFuture = provider.marshalJob( job, workerPool );
                // if it is an async operation, wait for it to finish
                if ( !isNull( jobFuture ) ) {
                    jobFuture.get();
                }

                // inspect the spy
                expect( provider.$once( "releaseJob" ) ).toBeTrue( "releaseJob should have been called on the provider" );
                var callLog = provider.$callLog()[ "releaseJob" ][ 1 ];
                expect( provider.getBackoffForJob( callLog[ 1 ], callLog[ 2 ] ) ).toBe( 2, "The delay [2] should have been passed to the provider" );
            } );

            it( "calls a before and after method if present as lifecycle methods", function() {
                var provider = getProvider();
                var workerPool = makeWorkerPool( provider );
                var job = getInstance( "BeforeAndAfterJob" )
                    .setId( randRange( 1, 1000 ) );

                // beforeEach removed these keys, so param re-creates them as false.
                param application.jobBeforeCalled = false;
                param application.jobAfterCalled = false;
                expect( application.jobBeforeCalled ).toBeFalse();
                expect( application.jobAfterCalled ).toBeFalse();

                // work the job
                var jobFuture = provider.marshalJob( job, workerPool );
                // if it is an async operation, wait for it to finish
                if ( !isNull( jobFuture ) ) {
                    jobFuture.get();
                }

                // NOTE(review): presumably BeforeAndAfterJob sets these application flags
                // from its before/after methods; the job is not visible in this file.
                expect( application.jobBeforeCalled ).toBeTrue( "The before method should have been called" );
                expect( application.jobAfterCalled ).toBeTrue( "The after method should have been called" );
            } );
        } );
    }

    /**
     * Fetches the provider under test from WireBox and returns a duplicate of it.
     *
     * NOTE(review): the duplicate presumably keeps spies applied in one spec from
     * leaking into a shared instance; confirm against the provider's scope.
     *
     * @return A copy of the provider registered under `getProviderMapping()`.
     */
    function getProvider() {
        return duplicate( getWireBox().getInstance( getProviderMapping() ) );
    }

    /**
     * Abstract hook: subclasses override this to return the WireBox mapping
     * of the provider that the specs should run against.
     *
     * @throws MissingAbstractMethod Always, unless overridden in a subclass.
     */
    function getProviderMapping() {
        throw(
            type = "MissingAbstractMethod",
            message = "This is an abstract method and must be implemented in a subclass."
        );
    }

    /**
     * Builds a queue connection around the given provider, wires it into a
     * worker pool, and calls `startWorkers()` on that pool.
     *
     * @provider       The provider the connection should use. Defaults to `getProvider()`.
     * @connectionName The name given to the connection and recorded on the worker pool.
     * @workerPoolName The name given to the worker pool.
     *
     * @return The result of `startWorkers()`; callers in this spec treat it as the worker pool.
     */
    function makeWorkerPool(
        any provider = getProvider(),
        string connectionName = "TestConnection",
        string workerPoolName = ""
    ) {
        var connection = getInstance( "QueueConnection@cbq" )
            .setName( arguments.connectionName )
            .setProvider( arguments.provider );

        return getInstance( "WorkerPool@cbq" )
            .setName( arguments.workerPoolName )
            .setConnection( connection )
            .setConnectionName( connection.getName() )
            .startWorkers();
    }

}