
awsIteratedList causes a very tight loop on GetRecord when stream is empty #4

Open
ozataman opened this issue Dec 13, 2014 · 3 comments

@ozataman (Contributor)

If the stream is empty, GetRecordsResponse contains an empty list of records but still provides a valid shard iterator for the next request. With a fast connection, this leads to an endless loop with very high request throughput that lasts until more data arrive and slow things down. The immediate effect is ThroughputExceeded exceptions even though the stream is empty.

I'm not sure what the solution should be, since the internals of awsIteratedList are outside this package's control. The right thing to do may be to remove the IteratedTransaction instance and roll our own iteratedList function that delays by, say, one second if the previous list is empty.

I've already implemented this in our app and can confirm it eliminates the throughput exceptions.
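A minimal, dependency-free sketch of the "roll our own iteratedList" idea, assuming the caller supplies the request action, a function that extracts records from a response, and a next-request function (in this thread those roles are played by running GetRecords, getRecordsResRecords and nextIteratedRequest). The names `iterateWithDelay` and `iterateWithDelayIO` and their argument order are hypothetical, not part of the package; the sleep action is a parameter only so the loop can be exercised without actually waiting.

```haskell
import Control.Concurrent (threadDelay)
import Control.Monad (unless, when)

-- | Hypothetical replacement for awsIteratedList: issue requests in a loop,
-- pass every non-empty batch of records to a consumer, and sleep before the
-- next request whenever the previous batch was empty.
iterateWithDelay
  :: Monad m
  => (Int -> m ())                -- ^ sleep action (microseconds)
  -> Int                          -- ^ delay after an empty batch
  -> (req -> m resp)              -- ^ perform a single request
  -> (resp -> [a])                -- ^ extract the records from a response
  -> (req -> resp -> Maybe req)   -- ^ next request; 'Nothing' ends the loop
  -> ([a] -> m ())                -- ^ consume a non-empty batch
  -> req                          -- ^ initial request
  -> m ()
iterateWithDelay sleep delay send records next consume = go
  where
    go r = do
      resp <- send r
      let rs = records resp
      unless (null rs) (consume rs)
      case next r resp of
        Nothing -> pure ()
        Just r' -> do
          -- Back off only when nothing came back, so a busy shard is
          -- still read as fast as records arrive.
          when (null rs) (sleep delay)
          go r'

-- | IO version that really sleeps between polls of an idle shard.
iterateWithDelayIO
  :: Int
  -> (req -> IO resp)
  -> (resp -> [a])
  -> (req -> resp -> Maybe req)
  -> ([a] -> IO ())
  -> req
  -> IO ()
iterateWithDelayIO = iterateWithDelay threadDelay
```

Calling `iterateWithDelayIO 1000000 ...` would give the one-second pause described above. Only empty batches trigger the delay, so throughput on a non-empty stream is unaffected.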

larskuhtz added the bug label Dec 23, 2014
@larskuhtz (Member)

@ozataman good catch. I think that's a serious bug. We are considering addressing it in version 0.2 by changing the IteratedTransaction instance to the following:

```haskell
-- | The request parameter 'getRecordsLimit' is interpreted as the limit for
-- each single request and not for the overall transaction.
--
-- The 'getRecordsResNextShardIterator' is 'Nothing' only if the shard is
-- closed. This instance terminates the iteration as soon as an empty list
-- of records is returned.
--
instance IteratedTransaction GetRecords GetRecordsResponse where
    nextIteratedRequest _ (GetRecordsResponse _ []) = Nothing
    nextIteratedRequest GetRecords{..} GetRecordsResponse{..} =
        GetRecords getRecordsLimit <$> getRecordsResNextShardIterator
```

larskuhtz added this to the 1.2 milestone Dec 23, 2014
@ozataman (Contributor, Author)

I think that's certainly better. However, it may still not fully help the user: even a response with an empty list contains a valid iterator to use in the next query, and this instance gives no way to extract it.

It contains a bit of application context, but here is how I've handled it in the interim:

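```haskell
-------------------------------------------------------------------------------
-- | Produce an infinite stream of records from a shard.
--
-- AppEnv, getStream, runKinesis, toS and whenJust come from the surrounding
-- application (or its dependencies) and are not defined in this snippet.
streamRecords
    :: (Functor n, MonadIO n, MonadReader AppEnv n, MonadCatch n)
    => ShardId
    -> Maybe SequenceNumber   -- ^ resume after this record, else start at the latest
    -> Maybe Int              -- ^ per-request record limit
    -> Producer (ResourceT n) Record
streamRecords sid sn lim = do
    -- Resolve the stream name; any failure becomes a runtime error.
    nm <- either (error . toS) id <$> runEitherT getStream
    -- Start right after the given sequence number, or at the latest record.
    let pos = case sn of
          Nothing -> Latest
          Just _  -> AfterSequenceNumber
        gsi = GetShardIterator sid pos sn nm
    iter <- lift $ getShardIteratorResShardIterator <$> runKinesis 10 gsi
    go (GetRecords lim iter)
  where
    go r = do
      a <- lift $ runKinesis 10 r
      let rs = getRecordsResRecords a
      -- Emit the batch downstream, if there is one.
      unless (null rs) $ C.sourceList rs
      -- Keep polling with the iterator from the response (the current
      -- instance continues after an empty batch), but wait one second
      -- after an empty batch so an idle shard is not hammered.
      whenJust (nextIteratedRequest r a) $ \r' -> do
        when (null rs) $ liftIO (threadDelay 1000000)
        go r'
```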
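The limitation raised above (an empty response still carries a valid iterator, but the proposed instance ends the iteration without exposing it) can be illustrated with a small pure model. The types `Iterator`, `Request` and `Response` and the helper `drainResumable` are hypothetical simplifications, not the package's API; `nextRequest` follows the same stopping rule as the proposed instance.

```haskell
-- Simplified, hypothetical stand-ins for the Kinesis request and response
-- types; only the fields needed for the illustration are modelled.
newtype Iterator = Iterator Int deriving (Eq, Show)

data Request = Request
  { reqLimit    :: Maybe Int
  , reqIterator :: Iterator
  } deriving (Eq, Show)

data Response = Response
  { respNext    :: Maybe Iterator   -- 'Nothing' only when the shard is closed
  , respRecords :: [Int]
  } deriving (Eq, Show)

-- Same rule as the proposed instance: stop as soon as a batch is empty.
nextRequest :: Request -> Response -> Maybe Request
nextRequest _ (Response _ []) = Nothing
nextRequest r (Response next _) = (\i -> r { reqIterator = i }) <$> next

-- Run the iteration and return the collected records together with the
-- iterator carried by the final (empty) response. With only nextRequest and
-- the records, the caller would never see that iterator.
drainResumable
  :: Monad m => (Request -> m Response) -> Request -> m ([Int], Maybe Iterator)
drainResumable send = go []
  where
    go acc r = do
      resp <- send r
      let acc' = acc ++ respRecords resp
      case nextRequest r resp of
        Just r' -> go acc' r'
        Nothing -> pure (acc', respNext resp)
```

Returning the pending iterator alongside the records is one way a caller could resume polling, after a delay, from where the drained iteration stopped instead of requesting a fresh shard iterator.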

@larskuhtz (Member)

Yes, I agree that your version makes more sense.

The reason for my simple solution above is to keep it usable with the ListResponse instance.

I wonder whether, in a production setting, one should just bypass IteratedTransaction; in the end it doesn't provide much. If we keep it as is, I think the ListResponse instance should be deleted. I am fine with either solution.

It would be really cool if Kinesis supported long polling...

larskuhtz modified the milestones: 0.2, 1.2 Jan 28, 2015