
AmazonKinesisClient silent data loss: CLOSE_WAIT connections cause PutRecord to succeed but data never reaches Kinesis #4022

@IIsijola

Description


Describe the bug

We're experiencing silent data loss with AmazonKinesisClient, where CLOSE_WAIT TCP connections cause the SDK to hang indefinitely, ignore cancellation tokens, and eventually either time out or appear to succeed while the data never reaches Kinesis.

We've lost days' worth of data due to this issue.

Environment:

  • AWS SDK Version: AWSSDK.Kinesis (latest)
  • Framework: .NET 9.0
  • OS: Ubuntu (EC2 deployment)
  • Infrastructure: EC2 instances with VPC endpoints for Kinesis

The Hanging/Silent Failure Problem
When underlying TCP connections enter CLOSE_WAIT state:

  1. SDK hangs indefinitely - PutRecordAsync and PutRecordsAsync calls hang for long periods
  2. Cancellation tokens ignored - Calls don't respect CancellationToken timeouts
  3. Eventually errors or false success - After hanging, calls may time out or return success with no data delivery
  4. Data never reaches Kinesis - Even "successful" responses result in lost data
  5. No proper timeout control - Standard .NET async patterns don't work

The SDK's failure to respect cancellation tokens makes it impossible to implement proper timeout management for production systems.

Note: we implemented application-level rate limiting to ensure we don't exceed Kinesis rate limits, and we are using on-demand capacity mode in Kinesis itself.
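For reference, here is a minimal sketch of the kind of application-level throttle we mean (the class name and limits are illustrative, not our production code):

using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative throttle: caps concurrent PutRecord calls so we stay well
// below the stream's limits. Not the actual production implementation.
public sealed class KinesisThrottle
{
    private readonly SemaphoreSlim _slots;

    public KinesisThrottle(int maxConcurrentPuts) =>
        _slots = new SemaphoreSlim(maxConcurrentPuts, maxConcurrentPuts);

    public async Task<T> RunAsync<T>(Func<CancellationToken, Task<T>> put, CancellationToken ct)
    {
        await _slots.WaitAsync(ct); // apply back-pressure instead of bursting
        try { return await put(ct); }
        finally { _slots.Release(); }
    }
}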

[screenshot attached]

Before discovering that the cancellation tokens were not respected, we would be stuck indefinitely trying to send data to Kinesis. To address this, we implemented our own timeouts like so:

var putRecordTask = client.PutRecordAsync(request, cancellationToken);
var timeoutTask = Task.Delay(500, cancellationToken);
var completedTask = await Task.WhenAny(putRecordTask, timeoutTask);

if (completedTask == timeoutTask)
{
    throw new TimeoutException("SDK hung and ignored cancellation token");
}
// ...
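Because the hung PutRecordAsync task is abandoned rather than actually cancelled, this pattern can be factored into a small helper that also observes the orphaned task so its eventual fault doesn't surface as an unobserved exception. A sketch only; the helper name is ours, not an SDK API:

using System;
using System.Threading;
using System.Threading.Tasks;

static class SdkTimeouts
{
    // Hypothetical helper around the Task.WhenAny workaround above.
    public static async Task<T> WithTimeoutAsync<T>(Task<T> sdkCall, TimeSpan timeout, CancellationToken ct)
    {
        var timeoutTask = Task.Delay(timeout, ct);
        if (await Task.WhenAny(sdkCall, timeoutTask) == timeoutTask)
        {
            // The SDK call keeps running in the background; observe its fault later
            // so it doesn't surface as an UnobservedTaskException.
            _ = sdkCall.ContinueWith(t => _ = t.Exception, TaskContinuationOptions.OnlyOnFaulted);
            throw new TimeoutException("SDK hung and ignored cancellation token");
        }
        return await sdkCall;
    }
}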

Originally we thought it could be something infrastructural, like sending data over the public internet instead of AWS's backbone, but I ensured that all the data goes through VPC endpoints, as demonstrated below.

[screenshot attached]

With the timeout workaround in place, we were finally able to detect the failures, and once we had eliminated the above-mentioned factors as potential explanations, I started to investigate the network itself.

I noticed that every time the timeouts start, the Kinesis connection is stuck in a CLOSE_WAIT state that never changes, with exactly 11205 bytes left in Recv-Q:

Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name    
tcp6   11205      0 10.0.0.25:40132         10.0.0.75:443        CLOSE_WAIT  216205/./MyApp
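
To confirm this from inside the process, a check along the following lines (using System.Net.NetworkInformation, which on Linux reads the same /proc/net data netstat does) can count connections stuck in CLOSE_WAIT. This is a diagnostic sketch, not part of the reproduction:

using System.Linq;
using System.Net.NetworkInformation;

static class CloseWaitProbe
{
    // Diagnostic sketch: count TCP connections on port 443 (the Kinesis VPC
    // endpoint) that are stuck in CLOSE_WAIT, mirroring the netstat output above.
    public static int Count() =>
        IPGlobalProperties.GetIPGlobalProperties()
            .GetActiveTcpConnections()
            .Count(c => c.State == TcpState.CloseWait && c.RemoteEndPoint.Port == 443);
}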

To address this, we tried maintaining multiple concurrent connections to Kinesis and round-robining requests: once we experienced timeouts on a client in our pool, we would force its disposal and use another client from the pool, but to no avail (see the sketch below). The prevailing hypothesis at the moment is that internally the Kinesis client is doing some connection pooling of its own.
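A simplified sketch of that pool (names are illustrative); even after disposing the timed-out client and creating a fresh AmazonKinesisClient, new requests kept hitting a stuck connection:

using System;
using System.Threading;
using Amazon.Kinesis;

// Illustrative client pool we rotated through; disposing a timed-out client
// and swapping in a new one did not clear the CLOSE_WAIT socket.
public sealed class KinesisClientPool : IDisposable
{
    private readonly Func<AmazonKinesisClient> _factory;
    private readonly AmazonKinesisClient[] _clients;
    private int _next;

    public KinesisClientPool(int size, Func<AmazonKinesisClient> factory)
    {
        _factory = factory;
        _clients = new AmazonKinesisClient[size];
        for (var i = 0; i < size; i++) _clients[i] = factory();
    }

    // Round-robin selection; the bit mask keeps the index non-negative.
    public AmazonKinesisClient Next()
    {
        var i = (Interlocked.Increment(ref _next) & int.MaxValue) % _clients.Length;
        return _clients[i];
    }

    // Called when a client starts timing out: dispose it and replace it.
    public void Replace(AmazonKinesisClient broken)
    {
        var i = Array.IndexOf(_clients, broken);
        if (i < 0) return;
        _clients[i] = _factory();
        broken.Dispose();
    }

    public void Dispose()
    {
        foreach (var c in _clients) c.Dispose();
    }
}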

We've tried the following approaches to work around the hanging and the ignored cancellation:

  • CancellationToken timeouts - Completely ignored during CLOSE_WAIT hangs
  • AmazonKinesisConfig.Timeout - Also ignored during hangs
  • Task.WhenAny workarounds - Required for any timeout control, but tasks still hang
  • Connection recreation - CLOSE_WAIT connections persist after client disposal
  • UseHttp = true - doesn't work at all
  • Multiple client instances - All exhibit same hanging on CLOSE_WAIT
  • Manual HttpClient configuration - Limited SDK configuration options available (see the sketch after this list)
  • Thread pool monitoring - Can detect resource exhaustion but can't prevent it
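
On the HttpClient front, the main hook the SDK seems to expose is ClientConfig.HttpClientFactory. A hedged sketch of what we mean by manual HttpClient configuration (this assumes the HttpClientFactory override point in AWSSDK.Core; the handler values are illustrative and did not prevent the hangs for us):

using System;
using System.Net.Http;
using Amazon.Runtime;

// Sketch: plug a custom HttpClient into the Kinesis client via
// ClientConfig.HttpClientFactory. The SocketsHttpHandler settings are
// illustrative attempts at recycling dead connections.
public sealed class ShortLivedConnectionHttpClientFactory : HttpClientFactory
{
    public override HttpClient CreateHttpClient(IClientConfig clientConfig)
    {
        var handler = new SocketsHttpHandler
        {
            // Recycle pooled connections aggressively so a dead socket is not reused.
            PooledConnectionLifetime = TimeSpan.FromMinutes(1),
            PooledConnectionIdleTimeout = TimeSpan.FromSeconds(30),
            ConnectTimeout = TimeSpan.FromSeconds(2)
        };
        return new HttpClient(handler);
    }
}

// Usage (sketch):
// var config = new AmazonKinesisConfig
// {
//     RegionEndpoint = RegionEndpoint.USEast1,
//     HttpClientFactory = new ShortLivedConnectionHttpClientFactory()
// };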

Note that things typically function fine for the first 10 minutes to 1 hour, until we hit the issue:

[screenshot attached]

Regression Issue

  • Select this option if this issue appears to be a regression.

Expected Behavior

We expect to be able to send data to Kinesis, particularly to a stream in on-demand capacity mode, and not experience catastrophic failures. And in the event of such failures, we would like the call to fail fast and be recoverable; even when specifying the FastFailure flags, this isn't currently possible.

Current Behavior

Threads hang indefinitely while attempting to send data over an already-closed connection, causing memory exhaustion and a host of other cascading issues.

Reproduction Steps

AWS EC2 instance, VPC endpoints:

using Amazon.Kinesis;
using Amazon.Kinesis.Model;
using Amazon;
using Amazon.Runtime;
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        // Basic AWS credentials (use your own)
        var credentials = new BasicAWSCredentials("your-access-key", "your-secret-key");
        
        var config = new AmazonKinesisConfig
        {
            RegionEndpoint = RegionEndpoint.USEast1, // Use your region
            Timeout = TimeSpan.FromMilliseconds(500) // This will be ignored during CLOSE_WAIT
        };

        var client = new AmazonKinesisClient(credentials, config);
        
        Console.WriteLine("Starting Kinesis reproduction test...");
        
        // Send some initial records to establish connection
        for (int i = 0; i < 10; i++)
        {
            try
            {
                var data = Encoding.UTF8.GetBytes($"test-message-{i}");
                var request = new PutRecordRequest
                {
                    StreamName = "your-stream-name", // Use your stream
                    Data = new MemoryStream(data),
                    PartitionKey = "test-key"
                };

                var start = DateTime.UtcNow;
                var response = await client.PutRecordAsync(request);
                var elapsed = DateTime.UtcNow - start;
                
                Console.WriteLine($"Record {i}: SUCCESS in {elapsed.TotalMilliseconds}ms");
                
                // Small delay between records
                await Task.Delay(100);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Record {i}: ERROR - {ex.Message}");
            }
        }

        Console.WriteLine("\nNow testing cancellation token behavior...");
        
        // Test cancellation token (this will demonstrate the hanging issue)
        for (int i = 0; i < 20; i++)
        {
            try
            {
                var data = Encoding.UTF8.GetBytes($"cancellation-test-{i}");
                var request = new PutRecordRequest
                {
                    StreamName = "your-stream-name",
                    Data = new MemoryStream(data),
                    PartitionKey = "test-key"
                };

                // Create cancellation token with 500ms timeout
                using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(500));
                
                var start = DateTime.UtcNow;
                Console.Write($"Record {i}: ");
                
                try
                {
                    // This will hang during CLOSE_WAIT and ignore the cancellation token
                    var response = await client.PutRecordAsync(request, cts.Token);
                    var elapsed = DateTime.UtcNow - start;
                    Console.WriteLine($"SUCCESS in {elapsed.TotalMilliseconds}ms");
                }
                catch (OperationCanceledException)
                {
                    var elapsed = DateTime.UtcNow - start;
                    Console.WriteLine($"CANCELLED in {elapsed.TotalMilliseconds}ms");
                }
                catch (Exception ex)
                {
                    var elapsed = DateTime.UtcNow - start;
                    Console.WriteLine($"ERROR in {elapsed.TotalMilliseconds}ms - {ex.Message}");
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"OUTER ERROR - {ex.Message}");
            }
        }

        Console.WriteLine("\nTesting Task.WhenAny workaround...");
        
        // Demonstrate the required workaround
        for (int i = 0; i < 10; i++)
        {
            try
            {
                var data = Encoding.UTF8.GetBytes($"workaround-test-{i}");
                var request = new PutRecordRequest
                {
                    StreamName = "your-stream-name",
                    Data = new MemoryStream(data),
                    PartitionKey = "test-key"
                };

                using var cts = new CancellationTokenSource();
                
                var start = DateTime.UtcNow;
                Console.Write($"Workaround {i}: ");
                
                // Required workaround using Task.WhenAny
                var putRecordTask = client.PutRecordAsync(request, cts.Token);
                var timeoutTask = Task.Delay(500, cts.Token);
                
                var completedTask = await Task.WhenAny(putRecordTask, timeoutTask);
                var elapsed = DateTime.UtcNow - start;
                
                if (completedTask == timeoutTask)
                {
                    Console.WriteLine($"TIMEOUT via Task.WhenAny in {elapsed.TotalMilliseconds}ms");
                    // Note: putRecordTask is still running and hanging in background
                }
                else
                {
                    var response = await putRecordTask;
                    Console.WriteLine($"SUCCESS in {elapsed.TotalMilliseconds}ms");
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"ERROR - {ex.Message}");
            }
        }
        
        Console.WriteLine("\nTest complete. Check 'netstat -antp' for CLOSE_WAIT connections.");
        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();
    }
}

Possible Solution

No response

Additional Information/Context

No response

AWS .NET SDK and/or Package version used

AWSSDK.Kinesis

Targeted .NET Platform

.NET 9.0

Operating System and version

AmazonLinux
