diff --git a/load_tests/load_test.py b/load_tests/load_test.py index 7ac79fc13..cb765fc25 100644 --- a/load_tests/load_test.py +++ b/load_tests/load_test.py @@ -9,20 +9,25 @@ from datetime import datetime, timezone import create_testing_resources.kinesis_s3_firehose.resource_resolver as resource_resolver -WAITER_SLEEP = 300 -MAX_WAITER_ATTEMPTS = 24 +WAITER_SLEEP = 30 +MAX_WAITER_ATTEMPTS = 240 MAX_WAITER_DESCRIBE_FAILURES = 2 IS_TASK_DEFINITION_PRINTED = True PLATFORM = os.environ['PLATFORM'].lower() OUTPUT_PLUGIN = os.environ['OUTPUT_PLUGIN'].lower() +LOG_GROUP_NAME = os.environ.get('CW_LOG_GROUP_NAME', "unavailable") +AWS_REGION = os.environ['AWS_REGION'] +S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME', "unavailable") TESTING_RESOURCES_STACK_NAME = os.environ['TESTING_RESOURCES_STACK_NAME'] PREFIX = os.environ['PREFIX'] EKS_CLUSTER_NAME = os.environ['EKS_CLUSTER_NAME'] LOGGER_RUN_TIME_IN_SECOND = 600 -BUFFER_TIME_IN_SECOND = 1000 NUM_OF_EKS_NODES = 4 +BUFFER_TIME_IN_SECOND = 600 if OUTPUT_PLUGIN == 'cloudwatch': THROUGHPUT_LIST = json.loads(os.environ['CW_THROUGHPUT_LIST']) + # Cloudwatch requires more waiting for all log events to show up in the stream. + BUFFER_TIME_IN_SECOND = 2400 else: THROUGHPUT_LIST = json.loads(os.environ['THROUGHPUT_LIST']) @@ -49,6 +54,10 @@ "cloudwatch": "cloudwatch_logs", } +def __sleep(duration, reason): + print("Sleeping for {}s, ts=[{}] reason=[{}]".format(duration, datetime.now().isoformat(), reason), flush=True) + time.sleep(duration) + # Return the approximate log delay for each ecs load test # Estimate log delay = task_stop_time - task_start_time - logger_image_run_time def get_log_delay(log_delay_epoch_time): @@ -58,7 +67,9 @@ def get_log_delay(log_delay_epoch_time): def set_buffer(stop_epoch_time): curr_epoch_time = time.time() if (curr_epoch_time - stop_epoch_time) < BUFFER_TIME_IN_SECOND: - time.sleep(int(BUFFER_TIME_IN_SECOND - curr_epoch_time + stop_epoch_time)) + __sleep(int(BUFFER_TIME_IN_SECOND - curr_epoch_time + stop_epoch_time), + "Waiting for all logs to be sent to destination. "+ + f"destination={OUTPUT_PLUGIN} logGroupName={LOG_GROUP_NAME} s3Bucket={S3_BUCKET_NAME} prefix={PREFIX}") # convert datetime to epoch time def parse_time(time): @@ -92,7 +103,7 @@ def generate_task_definition(throughput, input_logger, s3_fluent_config_arn): # App Container Environment Variables '$APP_IMAGE': input_logger['logger_image'], '$LOGGER_RUN_TIME_IN_SECOND': str(LOGGER_RUN_TIME_IN_SECOND), - + # Firelens Container Environment Variables '$FLUENT_BIT_IMAGE': os.environ['FLUENT_BIT_IMAGE'], '$INPUT_NAME': input_logger['name'], @@ -110,7 +121,7 @@ def generate_task_definition(throughput, input_logger, s3_fluent_config_arn): # Plugin Specific Environment Variables 'cloudwatch': { - '$CW_LOG_GROUP_NAME': os.environ['CW_LOG_GROUP_NAME'], + '$CW_LOG_GROUP_NAME': LOG_GROUP_NAME, '$STD_LOG_STREAM_NAME': resource_resolver.resolve_cloudwatch_logs_stream_name(std_config), '$CUSTOM_LOG_STREAM_NAME': resource_resolver.resolve_cloudwatch_logs_stream_name(custom_config) }, @@ -123,7 +134,7 @@ def generate_task_definition(throughput, input_logger, s3_fluent_config_arn): '$CUSTOM_STREAM_PREFIX': resource_resolver.resolve_kinesis_delivery_stream_name(custom_config), }, 's3': { - '$S3_BUCKET_NAME': os.environ['S3_BUCKET_NAME'], + '$S3_BUCKET_NAME': S3_BUCKET_NAME, '$STD_S3_OBJECT_NAME': resource_resolver.resolve_s3_object_name(std_config), }, } @@ -141,16 +152,16 @@ def generate_task_definition(throughput, input_logger, s3_fluent_config_arn): # Register task definition task_def = json.loads(task_def_formatted) - + if IS_TASK_DEFINITION_PRINTED: - print("Registering task definition:") - print(json.dumps(task_def, indent=4)) + print("Registering task definition:", flush=True) + print(json.dumps(task_def, indent=4), flush=True) client = boto3.client('ecs') client.register_task_definition( **task_def ) else: - print("Registering task definition") + print("Registering task definition", flush=True) # With multiple codebuild projects running parallel, # Testing resources only needs to be created once @@ -171,11 +182,11 @@ def create_testing_resources(): StackName=TESTING_RESOURCES_STACK_NAME ) else: - # scale up eks cluster + # scale up eks cluster if PLATFORM == 'eks': os.system(f'eksctl scale nodegroup --cluster={EKS_CLUSTER_NAME} --nodes={NUM_OF_EKS_NODES} ng') while True: - time.sleep(90) + __sleep(90, "Waiting for EKS cluster nodes") number_of_nodes = subprocess.getoutput("kubectl get nodes --no-headers=true | wc -l") if(int(number_of_nodes) == NUM_OF_EKS_NODES): break @@ -191,11 +202,12 @@ def wait_ecs_tasks(ecs_cluster_name, task_arn): running = True attempts = 0 failures = 0 - print(f'waiting on task_arn={task_arn}') + print(f'Waiting on task_arn={task_arn}', flush=True) client = boto3.client('ecs') while running: - time.sleep(WAITER_SLEEP) + if attempts > 0: + __sleep(WAITER_SLEEP, "Waiting to poll for task status, taskarn={}".format(task_arn)) attempts += 1 response = client.describe_tasks( cluster=ecs_cluster_name, @@ -203,21 +215,21 @@ def wait_ecs_tasks(ecs_cluster_name, task_arn): task_arn, ] ) - print(f'describe_task_wait_on={response}') + print(f'describe_task_wait_on={response}', flush=True) if len(response['failures']) > 0: # above we print the full actual reponse for debugging - print('decribe_task failure') + print('decribe_task failure', flush=True) failures += 1 if failures >= MAX_WAITER_DESCRIBE_FAILURES: break continue status = response['tasks'][0]['lastStatus'] - print(f'task {task_arn} is {status}') + print(f'task {task_arn} is {status}', flush=True) # https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-lifecycle.html if status == 'STOPPED' or status == 'DELETED': running = False if attempts >= MAX_WAITER_ATTEMPTS: - print(f'stopped tasks waiter failed after {MAX_WAITER_ATTEMPTS}') + print(f'stopped tasks waiter failed after {MAX_WAITER_ATTEMPTS}', flush=True) running = False @@ -247,20 +259,20 @@ def run_ecs_tests(): launchType='EC2', taskDefinition=f'{PREFIX}{OUTPUT_PLUGIN}-{throughput}-{input_logger["name"]}' ) - print(f'run_task_response={response}') + print(f'run_task_response={response}', flush=True) names[f'{OUTPUT_PLUGIN}_{input_logger["name"]}_{throughput}_task_arn'] = response['tasks'][0]['taskArn'] - + # Validation input type banner - print(f'\nTest {input_logger["name"]} to {OUTPUT_PLUGIN} in progress...') + print(f'\nTest {input_logger["name"]} to {OUTPUT_PLUGIN} in progress...', flush=True) # Tasks need time to run - time.sleep(600) + __sleep(LOGGER_RUN_TIME_IN_SECOND, "Waiting for tasks to have time to run") # wait for tasks and validate for input_logger in INPUT_LOGGERS: # Wait until task stops and start validation processes = [] - + for throughput in THROUGHPUT_LIST: client = boto3.client('ecs') task_arn = names[f'{OUTPUT_PLUGIN}_{input_logger["name"]}_{throughput}_task_arn'] @@ -271,8 +283,8 @@ def run_ecs_tests(): task_arn, ] ) - print(f'task_arn={task_arn}') - print(f'describe_tasks_response={response}') + print(f'task_arn={task_arn}', flush=True) + print(f'describe_tasks_response={response}', flush=True) input_record = calculate_total_input_number(throughput) if len(response['failures']) == 0: check_app_exit_code(response) @@ -284,7 +296,7 @@ def run_ecs_tests(): # missing tasks might mean the task stopped some time ago # and ECS already reaped/deleted it # try skipping straight to validation - log_delay = 'not supported' # we don't actually use this right now in results + log_delay = 'unavailable' # we don't actually use this right now in results # Validate logs os.environ['LOG_SOURCE_NAME'] = input_logger["name"] @@ -295,33 +307,38 @@ def run_ecs_tests(): "input_configuration": input_configuration, } - validator_env = { - **os.environ.copy(), - } - if OUTPUT_PLUGIN == 'cloudwatch': - validator_env['LOG_PREFIX'] = resource_resolver.get_destination_cloudwatch_prefix(test_configuration["input_configuration"]) - validator_env['DESTINATION'] = 'cloudwatch' + log_prefix = resource_resolver.get_destination_cloudwatch_prefix(test_configuration["input_configuration"]) else: - validator_env['LOG_PREFIX'] = resource_resolver.get_destination_s3_prefix(test_configuration["input_configuration"], OUTPUT_PLUGIN) - validator_env['DESTINATION'] = 's3' - + log_prefix = resource_resolver.get_destination_s3_prefix(test_configuration["input_configuration"], OUTPUT_PLUGIN) + + exec_args = ['go', 'run', './load_tests/validation/validate.go', + '-input-record', input_record, + '-log-delay', log_delay, + '-region', AWS_REGION, + '-bucket', S3_BUCKET_NAME, + '-log-group', LOG_GROUP_NAME, + '-prefix', log_prefix, + '-destination', OUTPUT_PLUGIN, + ] + print("Running validator process. cmd=[{}]".format(' '.join(exec_args)), flush=True) processes.append({ "input_logger": input_logger, "test_configuration": test_configuration, - "process": subprocess.Popen(['go', 'run', './load_tests/validation/validate.go', input_record, log_delay], stdout=subprocess.PIPE, - env=validator_env - ) + "process": subprocess.Popen(exec_args, stdout=subprocess.PIPE) }) # Wait until all subprocesses for validation completed for p in processes: + print("Waiting for validator process to complete cmd=[{}]".format(' '.join(p["process"].args)), flush=True) p["process"].wait() stdout, stderr = p["process"].communicate() - print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator stdout: {stdout}') - print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator stderr: {stderr}') + return_code = p["process"].returncode + print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator stdout: {stdout}', flush=True) + print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator stderr: {stderr}', flush=True) + print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator return code: {return_code}', flush=True) p["result"] = stdout - print(f'Test {input_logger["name"]} to {OUTPUT_PLUGIN} complete.') + print(f'Test {input_logger["name"]} to {OUTPUT_PLUGIN} complete.', flush=True) parsedValidationOutputs = list(map(lambda p: { **p, @@ -331,15 +348,15 @@ def run_ecs_tests(): test_results.extend(parsedValidationOutputs) # Print output - print("\n\nValidation results:\n") - print(format_test_results_to_markdown(test_results)) + print("\n\nValidation results:\n", flush=True) + print(format_test_results_to_markdown(test_results), flush=True) # Bar check if not validation_bar.bar_raiser(test_results): - print("Failed validation bar.") + print("Failed validation bar.", flush=True) sys.exit("Failed to pass the test_results validation bar") else: - print("Passed validation bar.") + print("Passed validation bar.", flush=True) def parse_validation_output(validationResultString): return { x[0]: x[1] for x in list( @@ -427,30 +444,38 @@ def parse_json_template(template, dict): # Returns s3 arn def publish_fluent_config_s3(input_logger): - bucket_name = os.environ['S3_BUCKET_NAME'] s3 = boto3.client('s3') s3.upload_file( input_logger["fluent_config_file_path"], - bucket_name, + S3_BUCKET_NAME, f'{OUTPUT_PLUGIN}-test/{PLATFORM}/fluent-{input_logger["name"]}.conf', ) - return f'arn:aws:s3:::{bucket_name}/{OUTPUT_PLUGIN}-test/{PLATFORM}/fluent-{input_logger["name"]}.conf' + return f'arn:aws:s3:::{S3_BUCKET_NAME}/{OUTPUT_PLUGIN}-test/{PLATFORM}/fluent-{input_logger["name"]}.conf' -# The following method is used to clear data after all tests run +# The following method is used to clear data after all tests run. +# We set retention/expiration policies so that tests do not interfere with each other, and so that +# we can debug and run validation manually if necessary. def delete_testing_data(session): - # Delete associated cloudwatch log streams - client = session.client('logs') - response = client.describe_log_streams( - logGroupName=os.environ['CW_LOG_GROUP_NAME'] - ) - for stream in response["logStreams"]: - client.delete_log_stream( - logGroupName=os.environ['CW_LOG_GROUP_NAME'], - logStreamName=stream["logStreamName"] + print("Setting auto-delete policies for CW log groups, deleting S3 bucket") + retention_days = 5 + + logs_client = session.client('logs') + try: + # Set the retention policy for the log group + response = logs_client.put_retention_policy( + logGroupName=LOG_GROUP_NAME, + retentionInDays=retention_days ) + print(f"Retention policy set successfully for log group. logGroupName={LOG_GROUP_NAME} retentionDays={retention_days}") + except Exception as e: + print(f"Error setting retention policy: {e}") + # Empty s3 bucket + # lifecycle config cannot currently be used because the bucket name + # is reused between tests, so it must be completely deleted after each test. s3 = session.resource('s3') - bucket = s3.Bucket(os.environ['S3_BUCKET_NAME']) + bucket = s3.Bucket(S3_BUCKET_NAME) + print(f"Deleting all objects in s3 bucket: {S3_BUCKET_NAME}") bucket.objects.all().delete() def generate_daemonset_config(throughput): @@ -459,12 +484,12 @@ def generate_daemonset_config(throughput): '$FLUENT_BIT_IMAGE': os.environ['FLUENT_BIT_IMAGE'], '$APP_IMAGE': os.environ['EKS_APP_IMAGE'], '$TIME': str(LOGGER_RUN_TIME_IN_SECOND), - '$CW_LOG_GROUP_NAME': os.environ['CW_LOG_GROUP_NAME'], + '$CW_LOG_GROUP_NAME': LOG_GROUP_NAME, } fin = open(f'./load_tests/daemonset/{OUTPUT_PLUGIN}.yaml', 'r') data = fin.read() for key in daemonset_config_dict: - data = data.replace(key, daemonset_config_dict[key]) + data = data.replace(key, daemonset_config_dict[key]) fout = open(f'./load_tests/daemonset/{OUTPUT_PLUGIN}_{throughput}.yaml', 'w') fout.write(data) fout.close() @@ -478,14 +503,14 @@ def run_eks_tests(): generate_daemonset_config(throughput) os.system(f'kubectl apply -f ./load_tests/daemonset/{OUTPUT_PLUGIN}_{throughput}.yaml') # wait (10 mins run + buffer for setup/log delivery) - time.sleep(1000) + __sleep(1000, "Waiting 10 minutes+buffer to setup and log delivery") for throughput in THROUGHPUT_LIST: input_record = calculate_total_input_number(throughput) response = client.describe_log_streams( - logGroupName=os.environ['CW_LOG_GROUP_NAME'], + logGroupName=LOG_GROUP_NAME, logStreamNamePrefix=f'{PREFIX}kube.var.log.containers.ds-cloudwatch-{throughput}', orderBy='LogStreamName' - ) + ) for log_stream in response['logStreams']: if 'app-' not in log_stream['logStreamName']: continue @@ -495,12 +520,13 @@ def run_eks_tests(): os.environ['LOG_PREFIX'] = log_stream['logStreamName'] os.environ['DESTINATION'] = 'cloudwatch' processes.add(subprocess.Popen(['go', 'run', './load_tests/validation/validate.go', input_record, log_delay])) - + # Wait until all subprocesses for validation completed for p in processes: p.wait() def delete_testing_resources(): + print("Deleting test resources") # Create sts session session = get_sts_boto_session() @@ -508,13 +534,15 @@ def delete_testing_resources(): # delete all S3 config files delete_testing_data(session) - # All related testing resources will be destroyed once the stack is deleted + print(f"Deleting cloudformation stack. stackName={TESTING_RESOURCES_STACK_NAME}", flush=True) + # All related testing resources will be destroyed once the stack is deleted client = session.client('cloudformation') client.delete_stack( StackName=TESTING_RESOURCES_STACK_NAME ) # scale down eks cluster if PLATFORM == 'eks': + print("Scaling down EKS cluster", flush=True) os.system('kubectl delete namespace load-test-fluent-bit-eks-ns') os.system(f'eksctl scale nodegroup --cluster={EKS_CLUSTER_NAME} --nodes=0 ng') @@ -541,7 +569,7 @@ def get_sts_boto_session(): DurationSeconds=3600 ) - # From the response that contains the assumed role, get the temporary + # From the response that contains the assumed role, get the temporary # credentials that can be used to make subsequent API calls credentials=assumed_role_object['Credentials'] diff --git a/load_tests/task_definitions/cloudwatch.json b/load_tests/task_definitions/cloudwatch.json index 0a9648b4a..3e08e14d3 100644 --- a/load_tests/task_definitions/cloudwatch.json +++ b/load_tests/task_definitions/cloudwatch.json @@ -37,7 +37,7 @@ } }, "cpu": 512, - "memoryReservation": 50 + "memoryReservation": 10240 }, { "essential": true, @@ -50,7 +50,7 @@ "containerName": "log_router" } ], - "environment" : [ + "environment" : [ { "name" : "ITERATION", "value" : "$THROUGHPUT" }, { "name" : "TIME", "value" : "$LOGGER_RUN_TIME_IN_SECOND" }, { "name" : "LOGGER_PORT", "value": "$LOGGER_PORT" }, @@ -61,4 +61,4 @@ "memoryReservation": 100 } ] -} \ No newline at end of file +} diff --git a/load_tests/task_definitions/firehose.json b/load_tests/task_definitions/firehose.json index 33b1fb436..a27b00a8a 100644 --- a/load_tests/task_definitions/firehose.json +++ b/load_tests/task_definitions/firehose.json @@ -37,7 +37,7 @@ } }, "cpu": 512, - "memoryReservation": 50 + "memoryReservation": 10240 }, { "essential": true, @@ -50,7 +50,7 @@ "containerName": "log_router" } ], - "environment" : [ + "environment" : [ { "name" : "ITERATION", "value" : "$THROUGHPUT" }, { "name" : "TIME", "value" : "$LOGGER_RUN_TIME_IN_SECOND" }, { "name" : "LOGGER_PORT", "value": "$LOGGER_PORT" }, @@ -61,4 +61,4 @@ "memoryReservation": 100 } ] -} \ No newline at end of file +} diff --git a/load_tests/task_definitions/kinesis.json b/load_tests/task_definitions/kinesis.json index 5ce6f8e0f..7513fc7db 100644 --- a/load_tests/task_definitions/kinesis.json +++ b/load_tests/task_definitions/kinesis.json @@ -37,7 +37,7 @@ } }, "cpu": 512, - "memoryReservation": 50 + "memoryReservation": 10240 }, { "essential": true, @@ -50,7 +50,7 @@ } ], "name": "app", - "environment" : [ + "environment" : [ { "name" : "ITERATION", "value" : "$THROUGHPUT" }, { "name" : "TIME", "value" : "$LOGGER_RUN_TIME_IN_SECOND" }, { "name" : "LOGGER_PORT", "value": "$LOGGER_PORT" }, @@ -61,4 +61,4 @@ "memoryReservation": 100 } ] -} \ No newline at end of file +} diff --git a/load_tests/task_definitions/s3.json b/load_tests/task_definitions/s3.json index a25b487c7..162025285 100644 --- a/load_tests/task_definitions/s3.json +++ b/load_tests/task_definitions/s3.json @@ -37,7 +37,7 @@ } }, "cpu": 512, - "memoryReservation": 50 + "memoryReservation": 10240 }, { "essential": true, @@ -50,7 +50,7 @@ "containerName": "log_router" } ], - "environment" : [ + "environment" : [ { "name" : "ITERATION", "value" : "$THROUGHPUT" }, { "name" : "TIME", "value" : "$LOGGER_RUN_TIME_IN_SECOND" }, { "name" : "LOGGER_PORT", "value": "$LOGGER_PORT" }, @@ -61,4 +61,4 @@ "memoryReservation": 100 } ] -} \ No newline at end of file +} diff --git a/load_tests/validation/validate.go b/load_tests/validation/validate.go index 461edc216..e06e48add 100644 --- a/load_tests/validation/validate.go +++ b/load_tests/validation/validate.go @@ -2,8 +2,9 @@ package main import ( "encoding/json" + "flag" "fmt" - "io/ioutil" + "io" "os" "strconv" "strings" @@ -16,12 +17,11 @@ import ( ) const ( - envAWSRegion = "AWS_REGION" - envS3Bucket = "S3_BUCKET_NAME" - envCWLogGroup = "CW_LOG_GROUP_NAME" - envLogPrefix = "LOG_PREFIX" - envDestination = "DESTINATION" - idCounterBase = 10000000 + idCounterBase = 10000000 +) + +var ( + inputMap map[uint32]struct{} ) type Message struct { @@ -29,67 +29,63 @@ type Message struct { } func main() { - region := os.Getenv(envAWSRegion) - if region == "" { - exitErrorf("[TEST FAILURE] AWS Region required. Set the value for environment variable- %s", envAWSRegion) + // Define flags + region := flag.String("region", "", "AWS Region") + bucket := flag.String("bucket", "", "S3 Bucket Name") + logGroup := flag.String("log-group", "", "CloudWatch Log Group Name") + prefix := flag.String("prefix", "", "Log Prefix") + destination := flag.String("destination", "", "Log Destination (s3 or cloudwatch)") + inputRecord := flag.Int("input-record", 0, "Total input record number") + logDelay := flag.String("log-delay", "", "Log delay") + + // Parse flags + flag.Parse() + + // Validate required flags + if *region == "" { + exitErrorf("[TEST FAILURE] AWS Region required. Use the -region flag.") } - - bucket := os.Getenv(envS3Bucket) - if bucket == "" { - exitErrorf("[TEST FAILURE] Bucket name required. Set the value for environment variable- %s", envS3Bucket) + if *bucket == "" { + exitErrorf("[TEST FAILURE] Bucket name required. Use the -bucket flag.") } - - logGroup := os.Getenv(envCWLogGroup) - if logGroup == "" { - exitErrorf("[TEST FAILURE] Log group name required. Set the value for environment variable- %s", envCWLogGroup) + if *logGroup == "" { + exitErrorf("[TEST FAILURE] Log group name required. Use the -log-group flag.") } - - prefix := os.Getenv(envLogPrefix) - if prefix == "" { - exitErrorf("[TEST FAILURE] Object prefix required. Set the value for environment variable- %s", envLogPrefix) + if *prefix == "" { + exitErrorf("[TEST FAILURE] Object prefix required. Use the -prefix flag.") } - - destination := os.Getenv(envDestination) - if destination == "" { - exitErrorf("[TEST FAILURE] Log destination for validation required. Set the value for environment variable- %s", envDestination) + if *destination == "" { + exitErrorf("[TEST FAILURE] Log destination for validation required. Use the -destination flag.") } - - inputRecord := os.Args[1] - if inputRecord == "" { - exitErrorf("[TEST FAILURE] Total input record number required. Set the value as the first argument") + if *inputRecord == 0 { + exitErrorf("[TEST FAILURE] Total input record number required. Use the -input-record flag.") } - totalInputRecord, _ := strconv.Atoi((inputRecord)) - // Map for counting unique records in corresponding destination - inputMap := make(map[string]bool) - for i := 0; i < totalInputRecord; i++ { - recordId := strconv.Itoa(idCounterBase + i) - inputMap[recordId] = false + if *logDelay == "" { + exitErrorf("[TEST FAILURE] Log delay required. Use the -log-delay flag.") } - logDelay := os.Args[2] - if logDelay == "" { - exitErrorf("[TEST FAILURE] Log delay required. Set the value as the second argument") - } + // Map for counting unique records in corresponding destination + inputMap = make(map[uint32]struct{}, *inputRecord) totalRecordFound := 0 - if destination == "s3" { - s3Client, err := getS3Client(region) + if *destination == "s3" { + s3Client, err := getS3Client(*region) if err != nil { exitErrorf("[TEST FAILURE] Unable to create new S3 client: %v", err) } - totalRecordFound, inputMap = validate_s3(s3Client, bucket, prefix, inputMap) - } else if destination == "cloudwatch" { - cwClient, err := getCWClient(region) + totalRecordFound = validate_s3(s3Client, *bucket, *prefix) + } else if *destination == "cloudwatch" { + cwClient, err := getCWClient(*region) if err != nil { exitErrorf("[TEST FAILURE] Unable to create new CloudWatch client: %v", err) } - totalRecordFound, inputMap = validate_cloudwatch(cwClient, logGroup, prefix, inputMap) + totalRecordFound = validate_cloudwatch(cwClient, *logGroup, *prefix) } // Get benchmark results based on log loss, log delay and log duplication - get_results(totalInputRecord, totalRecordFound, inputMap, logDelay) + get_results(*inputRecord, totalRecordFound, *logDelay) } // Creates a new S3 Client @@ -109,14 +105,12 @@ func getS3Client(region string) (*s3.S3, error) { // Log format generated by our producer: 8CharUniqueID_13CharTimestamp_RandomString (10029999_1639151827578_RandomString). // Both of the Kinesis Streams and Kinesis Firehose try to send each log maintaining the "at least once" policy. // To validate, we need to make sure all the log records from input file are stored at least once. -func validate_s3(s3Client *s3.S3, bucket string, prefix string, inputMap map[string]bool) (int, map[string]bool) { +func validate_s3(s3Client *s3.S3, bucket string, prefix string) int { var continuationToken *string var input *s3.ListObjectsV2Input s3RecordCounter := 0 s3ObjectCounter := 0 - // Returns all the objects from a S3 bucket with the given prefix. - // This approach utilizes NextContinuationToken to pull all the objects from the S3 bucket. for { input = &s3.ListObjectsV2Input{ Bucket: aws.String(bucket), @@ -126,7 +120,7 @@ func validate_s3(s3Client *s3.S3, bucket string, prefix string, inputMap map[str response, err := s3Client.ListObjectsV2(input) if err != nil { - exitErrorf("[TEST FAILURE] Error occured to get the objects from bucket: %q., %v", bucket, err) + exitErrorf("[TEST FAILURE] Error occurred to get the objects from bucket: %q., %v", bucket, err) } for _, content := range response.Contents { @@ -134,39 +128,38 @@ func validate_s3(s3Client *s3.S3, bucket string, prefix string, inputMap map[str Bucket: aws.String(bucket), Key: content.Key, } - obj := getS3Object(s3Client, input) - s3ObjectCounter++ - - dataByte, err := ioutil.ReadAll(obj.Body) + obj, err := s3Client.GetObject(input) if err != nil { - exitErrorf("[TEST FAILURE] Error to parse GetObject response. %v", err) + exitErrorf("[TEST FAILURE] Error to get S3 object. %v", err) } + s3ObjectCounter++ - data := strings.Split(string(dataByte), "\n") - - for _, d := range data { - if d == "" { - continue - } - + // Directly unmarshal the JSON objects from the S3 object body + decoder := json.NewDecoder(obj.Body) + for { var message Message - - decodeError := json.Unmarshal([]byte(d), &message) - if decodeError != nil { - fmt.Println("[TEST ERROR] Malform log entry. Unmarshal Error:", decodeError) - fmt.Println(" Malform entry: %s", d) - // Skip malform log entries (count them as lost logs) + err := decoder.Decode(&message) + if err == io.EOF { + break + } + if err != nil { + fmt.Println("[TEST ERROR] Malform log entry. Unmarshal Error:", err) continue } - // First 8 char is the unique record ID recordId := message.Log[:8] - s3RecordCounter += 1 - if _, ok := inputMap[recordId]; ok { - // Setting true to indicate that this record was found in the destination - inputMap[recordId] = true + s3RecordCounter++ + value, err := strconv.ParseUint(recordId, 10, 32) + if err != nil { + fmt.Println("[TEST ERROR] Malform log entry. ParseUint Error:", err) + continue } + recordIdUint := uint32(value) + inputMap[recordIdUint] = struct{}{} } + + // Close the S3 object body + obj.Body.Close() } if !aws.BoolValue(response.IsTruncated) { @@ -177,18 +170,43 @@ func validate_s3(s3Client *s3.S3, bucket string, prefix string, inputMap map[str fmt.Println("total_s3_obj, ", s3ObjectCounter) - return s3RecordCounter, inputMap + return s3RecordCounter } -// Retrieves an object from a S3 bucket -func getS3Object(s3Client *s3.S3, input *s3.GetObjectInput) *s3.GetObjectOutput { - obj, err := s3Client.GetObject(input) - +func processFile(file *os.File, filePath string) (int, error) { + var err error + file, err = os.Open(filePath) if err != nil { - exitErrorf("[TEST FAILURE] Error occured to get s3 object: %v", err) + return 0, fmt.Errorf("error opening file: %v", err) } + defer file.Close() + var message Message + var recordId string + + localCounter := 0 + // Directly unmarshal the JSON objects from the S3 object body + decoder := json.NewDecoder(file) + for { + err = decoder.Decode(&message) + if err == io.EOF { + break + } + if err != nil { + fmt.Println("[TEST ERROR] Malform log entry. Unmarshal Error:", err) + continue + } - return obj + recordId = message.Log[:8] + value, err := strconv.ParseUint(recordId, 10, 32) + if err != nil { + fmt.Println("[TEST ERROR] Malform log entry. ParseUint Error:", err) + continue + } + recordIdUint := uint32(value) + inputMap[recordIdUint] = struct{}{} + } + + return localCounter, nil } // Creates a new CloudWatch Client @@ -206,7 +224,7 @@ func getCWClient(region string) (*cloudwatchlogs.CloudWatchLogs, error) { // Validate logs in CloudWatch. // Similar logic as S3 validation. -func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup string, logStream string, inputMap map[string]bool) (int, map[string]bool) { +func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup string, logStream string) int { var forwardToken *string var input *cloudwatchlogs.GetLogEventsInput cwRecoredCounter := 0 @@ -219,6 +237,7 @@ func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup strin LogGroupName: aws.String(logGroup), LogStreamName: aws.String(logStream), StartFromHead: aws.Bool(true), + Limit: aws.Int64(10000), } } else { input = &cloudwatchlogs.GetLogEventsInput{ @@ -226,22 +245,17 @@ func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup strin LogStreamName: aws.String(logStream), NextToken: forwardToken, StartFromHead: aws.Bool(true), + Limit: aws.Int64(10000), } + // Sleep between GetLogEvents calls to avoid throttling + time.Sleep(100 * time.Millisecond) } - /* - * In testing we have found that CW GetLogEvents results are highly inconsistent - * Re-running validation long after tests shows that fewer events were lost than - * first calculated. So we sleep between calls to ensure we never exceed 1 TPS - * load_test.py also has a sleep before validation runs. - */ - time.Sleep(1 * time.Second) - response, err := cwClient.GetLogEvents(input) for err != nil { // retry for throttling exception if strings.Contains(err.Error(), "ThrottlingException: Rate exceeded") { - time.Sleep(1 * time.Second) + time.Sleep(5 * time.Second) response, err = cwClient.GetLogEvents(input) } else { exitErrorf("[TEST FAILURE] Error occured to get the log events from log group: %q., %v", logGroup, err) @@ -253,11 +267,14 @@ func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup strin // First 8 char is the unique record ID recordId := log[:8] - cwRecoredCounter += 1 - if _, ok := inputMap[recordId]; ok { - // Setting true to indicate that this record was found in the destination - inputMap[recordId] = true + value, err := strconv.ParseUint(recordId, 10, 32) + if err != nil { + fmt.Println("Error:", err) + continue } + recordIdUint := uint32(value) + cwRecoredCounter += 1 + inputMap[recordIdUint] = struct{}{} } // Same NextForwardToken will be returned if we reach the end of the log stream @@ -268,17 +285,11 @@ func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup strin forwardToken = response.NextForwardToken } - return cwRecoredCounter, inputMap + return cwRecoredCounter } -func get_results(totalInputRecord int, totalRecordFound int, recordMap map[string]bool, logDelay string) { - uniqueRecordFound := 0 - // Count how many unique records were found in the destination - for _, v := range recordMap { - if v { - uniqueRecordFound++ - } - } +func get_results(totalInputRecord int, totalRecordFound int, logDelay string) { + uniqueRecordFound := len(inputMap) fmt.Println("total_input, ", totalInputRecord) fmt.Println("total_destination, ", totalRecordFound)