/
Main.hs
304 lines (261 loc) · 10 KB
/
Main.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
-- Copyright (c) 2013-2015 PivotCloud, Inc.
--
-- Main
--
-- Please feel free to contact us at licensing@pivotmail.com with any
-- contributions, additions, or other feedback; we would love to hear from
-- you.
--
-- Licensed under the Apache License, Version 2.0 (the "License"); you may
-- not use this file except in compliance with the License. You may obtain a
-- copy of the License at http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-- WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-- License for the specific language governing permissions and limitations
-- under the License.
-- |
-- Module: Main
-- Copyright: Copyright (c) 2013-2015 PivotCloud, Inc.
-- license: Apache License, Version 2.0
-- Maintainer: Lars Kuhtz <lars@alephcloud.com>
-- Stability: experimental
--
-- Tests for Haskell Kinesis bindings
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GADTs #-}
module Main
( main
) where
import Aws
import Aws.Kinesis
import Control.Error
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class
import qualified Data.ByteString as B
import qualified Data.List as L
import Data.Monoid
import Data.Proxy
import qualified Data.Text as T
import Test.Tasty
import System.Exit
import System.Environment
import Utils
-- | Base name of the stream used by the single-stream test series
-- (see 'test_stream1').
defaultStreamName :: StreamName
defaultStreamName = "test-stream"
-- -------------------------------------------------------------------------- --
-- Main
-- | Entry point of the test suite.
--
-- Running these tests accesses AWS and may generate costs. Therefore the
-- suite only runs when the user expresses consent by passing the command
-- line option @--run-with-aws-credentials@. That option is removed from the
-- argument list before the remaining arguments are handed to tasty.
--
-- * With @--help@ or @-h@ the tasty main is invoked with the arguments
--   unchanged.
--
-- * Without the consent option a notice is printed to standard output and
--   the process exits with a failure code.
--
main :: IO ()
main = do
    args <- getArgs
    dispatch args
  where
    -- The option through which the user consents to using AWS.
    consentFlag :: String
    consentFlag = "--run-with-aws-credentials"

    dispatch :: [String] -> IO ()
    dispatch args
        | any (`elem` args) ["--help", "-h"] = defaultMain tests
        | consentFlag `elem` args =
            withArgs (filter (/= consentFlag) args) (defaultMain tests)
        | otherwise = do
            putStrLn help
            exitFailure

    -- Notice that is shown when the consent option is missing.
    help :: String
    help = L.intercalate "\n"
        [ ""
        , "NOTE"
        , ""
        , "This test suite accesses the AWS account that is associated with"
        , "the default credentials from the credential file ~/.aws-keys."
        , ""
        , "By running the tests in this test-suite costs for usage of AWS"
        , "services may incur."
        , ""
        , "In order to actually excute the tests in this test-suite you must"
        , "provide the command line option:"
        , ""
        , "    --run-with-aws-credentials"
        , ""
        , "When running this test-suite through cabal you may use the following"
        , "command:"
        , ""
        , "    cabal test kinesis-tests --test-option=--run-with-aws-credentials"
        , ""
        ]
-- | The complete test tree of this suite: the JSON round-trip tests, the
-- stream creation test, and the series of tests on a single stream.
tests :: TestTree
tests = testGroup "Kinesis Tests" groups
  where
    groups =
        [ test_jsonRoundtrips
        , test_createStream
        , test_stream1
        ]
-- -------------------------------------------------------------------------- --
-- Kinesis Utils
-- | Kinesis service configuration used for every request in this suite:
-- the default Kinesis configuration for 'testRegion'.
kinesisConfiguration :: KinesisConfiguration qt
kinesisConfiguration = defaultKinesisConfiguration testRegion
-- | Run a single Kinesis command.
--
-- The request is issued via 'simpleAws' using the result of
-- 'baseConfiguration' together with 'kinesisConfiguration'.
simpleKinesis
    :: (AsMemoryResponse a, Transaction r a, ServiceConfiguration r ~ KinesisConfiguration, MonadIO m)
    => r
        -- ^ the Kinesis command
    -> m (MemoryResponse a)
simpleKinesis command =
    baseConfiguration >>= \cfg -> simpleAws cfg kinesisConfiguration command
-- | Variant of 'simpleKinesis' that runs in 'EitherT' with a textual error:
-- the request is wrapped with 'tryT'.
simpleKinesisT
    :: (AsMemoryResponse a, Transaction r a, ServiceConfiguration r ~ KinesisConfiguration, MonadIO m)
    => r
        -- ^ the Kinesis command
    -> EitherT T.Text m (MemoryResponse a)
simpleKinesisT command = tryT (simpleKinesis command)
-- | Derive the stream name that a test actually uses from a base name.
--
-- The text of the base name is passed through 'testData', truncated to at
-- most 128 characters, and turned back into a 'StreamName' via 'streamName'.
-- If 'streamName' rejects the text, this function calls 'error' with the
-- reported message.
testStreamName :: StreamName -> StreamName
testStreamName base =
    case streamName truncated of
        Left msg -> error (T.unpack msg)
        Right name -> name
  where
    truncated = T.take 128 (testData (streamNameText base))
-- | Run an action between the creation and the deletion of a stream.
--
-- A 'CreateStream' request is issued before the action and a 'DeleteStream'
-- request afterwards; 'bracket_' is used so that the deletion is also
-- requested when the action throws an exception.
--
withStream
    :: StreamName -- ^ Stream Name
    -> Int -- ^ Shard count
    -> IO a
    -> IO a
withStream stream shardCount action = bracket_ acquire release action
  where
    acquire = simpleKinesis (CreateStream shardCount stream)
    release = void (simpleKinesis (DeleteStream stream))
-- | Build a test tree that runs with a freshly created stream.
--
-- The stream is named @'testStreamName' stream@. It is created when tasty
-- acquires the resource and deleted when tasty releases it.
--
-- The function 'withResource' from "Tasty" synchronizes the acquired
-- resource through a 'TVar'. We don't need that for a stream. So instead
-- of passing the 'IO StreamName' from 'withResource' we directly pass
-- 'StreamName' to the inner function.
--
withStreamTest
    :: StreamName -- ^ stream name suffix
    -> Int -- ^ shard count
    -> (StreamName -> TestTree)
    -> TestTree
withStreamTest stream shardCount f =
    withResource acquire release (\_ -> f tstream)
  where
    tstream = testStreamName stream
    acquire = simpleKinesis (CreateStream shardCount tstream) >> return tstream
    release _ = void (simpleKinesis (DeleteStream tstream))
-- | Wait for a stream to become active.
--
-- The stream is described repeatedly (through 'retryT') until its status is
-- 'StreamStatusActive'; the description of the active stream is returned.
-- An attempt on which the stream is not active fails with the message
-- @"Stream is not active"@.
--
waitActiveT
    :: Int
        -- ^ upper bound on the number of seconds to wait.
        -- The actual maximal number of seconds is closest smaller
        -- power of two.
    -> StreamName
    -> EitherT T.Text IO StreamDescription
waitActiveT sec stream = retryT maxRetry $ do
    DescribeStreamResponse d <- simpleKinesisT
        $ DescribeStream Nothing Nothing stream
    unless (streamDescriptionStreamStatus d == StreamStatusActive)
        $ left "Stream is not active"
    return d
  where
    -- Integer floor of the base-2 logarithm of 'sec': the number of times
    -- 'sec' can be halved while it is still at least 2. Computing this on
    -- integers avoids depending on floating-point rounding and yields 0
    -- (instead of an unspecified value) when 'sec' is smaller than 2.
    maxRetry :: Int
    maxRetry = length . takeWhile (>= 2) $ iterate (`div` 2) sec
-- -------------------------------------------------------------------------- --
-- Types
-- | Group of JSON round-trip tests: 'test_jsonRoundtrip' is instantiated
-- once for each of the listed Kinesis types.
test_jsonRoundtrips :: TestTree
test_jsonRoundtrips = testGroup "JSON encoding roundtrips" roundtrips
  where
    roundtrips =
        [ test_jsonRoundtrip (Proxy :: Proxy StreamName)
        , test_jsonRoundtrip (Proxy :: Proxy ShardId)
        , test_jsonRoundtrip (Proxy :: Proxy SequenceNumber)
        , test_jsonRoundtrip (Proxy :: Proxy PartitionHash)
        , test_jsonRoundtrip (Proxy :: Proxy PartitionKey)
        , test_jsonRoundtrip (Proxy :: Proxy ShardIterator)
        , test_jsonRoundtrip (Proxy :: Proxy ShardIteratorType)
        , test_jsonRoundtrip (Proxy :: Proxy Record)
        , test_jsonRoundtrip (Proxy :: Proxy StreamDescription)
        , test_jsonRoundtrip (Proxy :: Proxy StreamStatus)
        , test_jsonRoundtrip (Proxy :: Proxy Shard)
        ]
-- -------------------------------------------------------------------------- --
-- Stream Tests
-- | A series of tests that share one stream with a single shard. The stream
-- is provided by 'withStreamTest' and is derived from 'defaultStreamName'.
test_stream1 :: TestTree
test_stream1 = withStreamTest defaultStreamName 1 streamTests
  where
    streamTests stream =
        testGroup "Perform a series of tests on a single stream"
            [ eitherTOnceTest0 "list streams" (prop_streamList stream)
            , eitherTOnceTest0 "describe stream" (prop_streamDescribe 1 stream)
            , eitherTOnceTest2 "put and get stream" (prop_streamPutGet stream)
            ]
-- | Check that the given stream is contained in the result of a
-- 'ListStreams' request; fails with a message naming the stream otherwise.
--
-- NOTE(review): only a single 'ListStreams' request is issued. If the
-- service returns the listing in several pages the stream could be missed;
-- confirm against the Kinesis API.
prop_streamList :: StreamName -> EitherT T.Text IO ()
prop_streamList stream = do
    ListStreamsResponse _ listed <- simpleKinesisT (ListStreams Nothing Nothing)
    unless (stream `elem` listed) (left notListed)
  where
    notListed = "stream " <> streamNameText stream <> " is not listed"
-- | Wait for the stream to become active (see 'waitActiveT') and check that
-- its description carries the expected stream name and the expected number
-- of shards.
prop_streamDescribe
    :: Int -- ^ expected number of shards
    -> StreamName
    -> EitherT T.Text IO ()
prop_streamDescribe shardNum stream = do
    desc <- waitActiveT 64 stream
    let describedName = streamDescriptionStreamName desc
        shardCount = length (streamDescriptionShards desc)
    unless (describedName == stream) $
        left $ "unexpected stream name in description: "
            <> streamNameText describedName
    unless (shardCount == shardNum) $
        left $ "unexpected number of shards in stream description: "
            <> sshow shardCount
-- | Put a single record into the stream and read it back.
--
-- The steps are:
--
-- 1. wait for the stream to become active,
--
-- 2. put a record with the given data and partition key,
--
-- 3. check that the shard reported for the put is one of the shards from
--    the stream description,
--
-- 4. read from that shard, starting at the trim horizon, until exactly one
--    record is returned (wrapped in @'retryT' 5@), and
--
-- 5. check that data, sequence number, and partition key of the record
--    match what was put.
--
prop_streamPutGet
    :: StreamName
    -> B.ByteString -- ^ Message data
    -> PartitionKey
    -> EitherT T.Text IO ()
prop_streamPutGet stream dat key = do
    desc <- waitActiveT 64 stream
    let knownShardIds = map shardShardId (streamDescriptionShards desc)

    PutRecordResponse putSeqNr putShard <- simpleKinesisT PutRecord
        { putRecordData = dat
        , putRecordExplicitHashKey = Nothing
        , putRecordPartitionKey = key
        , putRecordSequenceNumberForOrdering = Nothing
        , putRecordStreamName = stream
        }
    unless (putShard `elem` knownShardIds) . left
        $ "unexpected shard id: expected on of " <> sshow knownShardIds <> "; got " <> sshow putShard

    record <- retryT 5 (readSingleRecord putShard)

    let gotData = recordData record
        gotSeqNr = recordSequenceNumber record
        gotPartKey = recordPartitionKey record

    unless (gotData == dat) . left
        $ "data does not match: expected " <> sshow dat <> "; got " <> sshow gotData
    unless (gotSeqNr == putSeqNr) . left
        $ "sequence numbers don't match: expected " <> sshow putSeqNr
        <> "; got " <> sshow gotSeqNr
    unless (gotPartKey == key) . left
        $ "partition keys don't match: expected " <> sshow key
        <> "; got " <> sshow gotPartKey
  where
    -- Obtain a fresh trim-horizon iterator for the shard and fetch records
    -- with it. Succeeds only if exactly one record is returned.
    readSingleRecord :: ShardId -> EitherT T.Text IO Record
    readSingleRecord shard = do
        GetShardIteratorResponse it <- simpleKinesisT GetShardIterator
            { getShardIteratorShardId = shard
            , getShardIteratorShardIteratorType = TrimHorizon
            , getShardIteratorStartingSequenceNumber = Nothing
            , getShardIteratorStreamName = stream
            }
        GetRecordsResponse _nextIt records <- simpleKinesisT GetRecords
            { getRecordsLimit = Nothing
            , getRecordsShardIterator = it
            }
        case records of
            [] -> left "no record found in stream"
            [single] -> return single
            several -> left $ "unexpected records found in stream: " <> sshow several
-- -------------------------------------------------------------------------- --
-- Stream Creation Tests
-- | Tests for the creation of streams; currently only the
-- create-list-delete cycle of 'prop_createListDelete'.
test_createStream :: TestTree
test_createStream = testGroup "Stream creation" [createListDelete]
  where
    createListDelete =
        eitherTOnceTest1 "create list delete" prop_createListDelete
-- | Create a stream, check that it shows up in the list of streams, and
-- delete it again.
--
-- The stream that is created is named @'testStreamName' stream@. When the
-- listing check or the deletion fails, a (further) deletion is attempted
-- before the original error is propagated.
--
prop_createListDelete
    :: StreamName -- ^ stream name
    -> EitherT T.Text IO ()
prop_createListDelete stream = do
    CreateStreamResponse <- simpleKinesisT (CreateStream 1 tstream)
    handleT cleanupThenFail (checkListed >> deleteStream)
  where
    tstream = testStreamName stream

    deleteStream = void $ simpleKinesisT (DeleteStream tstream)

    -- Request deletion of the stream, then fail with the original error.
    cleanupThenFail e = deleteStream >> left e

    -- Fail unless the new stream is contained in the listing.
    checkListed = do
        ListStreamsResponse _ allStreams <- simpleKinesisT
            $ ListStreams Nothing Nothing
        unless (tstream `elem` allStreams)
            . left $ "stream " <> streamNameText tstream <> " not listed"