From bf24935aaa918e36e5076f10032a7cf5970b3b42 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Wed, 18 Dec 2024 16:50:36 -0800 Subject: [PATCH 01/11] wip --- load_tests/load_test.py | 91 +++++++++++---------- load_tests/task_definitions/cloudwatch.json | 6 +- load_tests/task_definitions/firehose.json | 6 +- load_tests/task_definitions/kinesis.json | 6 +- load_tests/task_definitions/s3.json | 6 +- load_tests/validation/validate.go | 14 ++-- 6 files changed, 67 insertions(+), 62 deletions(-) diff --git a/load_tests/load_test.py b/load_tests/load_test.py index 7ac79fc13..aaf757497 100644 --- a/load_tests/load_test.py +++ b/load_tests/load_test.py @@ -9,8 +9,8 @@ from datetime import datetime, timezone import create_testing_resources.kinesis_s3_firehose.resource_resolver as resource_resolver -WAITER_SLEEP = 300 -MAX_WAITER_ATTEMPTS = 24 +WAITER_SLEEP = 30 +MAX_WAITER_ATTEMPTS = 240 MAX_WAITER_DESCRIBE_FAILURES = 2 IS_TASK_DEFINITION_PRINTED = True PLATFORM = os.environ['PLATFORM'].lower() @@ -19,10 +19,12 @@ PREFIX = os.environ['PREFIX'] EKS_CLUSTER_NAME = os.environ['EKS_CLUSTER_NAME'] LOGGER_RUN_TIME_IN_SECOND = 600 -BUFFER_TIME_IN_SECOND = 1000 NUM_OF_EKS_NODES = 4 +BUFFER_TIME_IN_SECOND = 600 if OUTPUT_PLUGIN == 'cloudwatch': THROUGHPUT_LIST = json.loads(os.environ['CW_THROUGHPUT_LIST']) + # Cloudwatch requires more waiting for all log events to show up in the stream. + BUFFER_TIME_IN_SECOND = 2400 else: THROUGHPUT_LIST = json.loads(os.environ['THROUGHPUT_LIST']) @@ -49,6 +51,10 @@ "cloudwatch": "cloudwatch_logs", } +def __sleep(duration, reason): + print("Sleeping for {}s, ts=[{}] reason=[{}]".format(duration, datetime.now().isoformat(), reason), flush=True) + time.sleep(duration) + # Return the approximate log delay for each ecs load test # Estimate log delay = task_stop_time - task_start_time - logger_image_run_time def get_log_delay(log_delay_epoch_time): @@ -58,7 +64,7 @@ def get_log_delay(log_delay_epoch_time): def set_buffer(stop_epoch_time): curr_epoch_time = time.time() if (curr_epoch_time - stop_epoch_time) < BUFFER_TIME_IN_SECOND: - time.sleep(int(BUFFER_TIME_IN_SECOND - curr_epoch_time + stop_epoch_time)) + __sleep(int(BUFFER_TIME_IN_SECOND - curr_epoch_time + stop_epoch_time), "Waiting for all logs to be sent to destination") # convert datetime to epoch time def parse_time(time): @@ -92,7 +98,7 @@ def generate_task_definition(throughput, input_logger, s3_fluent_config_arn): # App Container Environment Variables '$APP_IMAGE': input_logger['logger_image'], '$LOGGER_RUN_TIME_IN_SECOND': str(LOGGER_RUN_TIME_IN_SECOND), - + # Firelens Container Environment Variables '$FLUENT_BIT_IMAGE': os.environ['FLUENT_BIT_IMAGE'], '$INPUT_NAME': input_logger['name'], @@ -141,16 +147,16 @@ def generate_task_definition(throughput, input_logger, s3_fluent_config_arn): # Register task definition task_def = json.loads(task_def_formatted) - + if IS_TASK_DEFINITION_PRINTED: - print("Registering task definition:") - print(json.dumps(task_def, indent=4)) + print("Registering task definition:", flush=True) + print(json.dumps(task_def, indent=4), flush=True) client = boto3.client('ecs') client.register_task_definition( **task_def ) else: - print("Registering task definition") + print("Registering task definition", flush=True) # With multiple codebuild projects running parallel, # Testing resources only needs to be created once @@ -171,11 +177,11 @@ def create_testing_resources(): StackName=TESTING_RESOURCES_STACK_NAME ) else: - # scale up eks cluster + # scale up eks cluster if PLATFORM == 'eks': os.system(f'eksctl scale nodegroup --cluster={EKS_CLUSTER_NAME} --nodes={NUM_OF_EKS_NODES} ng') while True: - time.sleep(90) + __sleep(90, "Waiting for EKS cluster nodes") number_of_nodes = subprocess.getoutput("kubectl get nodes --no-headers=true | wc -l") if(int(number_of_nodes) == NUM_OF_EKS_NODES): break @@ -191,11 +197,12 @@ def wait_ecs_tasks(ecs_cluster_name, task_arn): running = True attempts = 0 failures = 0 - print(f'waiting on task_arn={task_arn}') + print(f'waiting on task_arn={task_arn}', flush=True) client = boto3.client('ecs') while running: - time.sleep(WAITER_SLEEP) + if attempts > 0: + __sleep(WAITER_SLEEP, "Waiting to poll for task status, taskarn={}".format(task_arn)) attempts += 1 response = client.describe_tasks( cluster=ecs_cluster_name, @@ -203,21 +210,21 @@ def wait_ecs_tasks(ecs_cluster_name, task_arn): task_arn, ] ) - print(f'describe_task_wait_on={response}') + print(f'describe_task_wait_on={response}', flush=True) if len(response['failures']) > 0: # above we print the full actual reponse for debugging - print('decribe_task failure') + print('decribe_task failure', flush=True) failures += 1 if failures >= MAX_WAITER_DESCRIBE_FAILURES: break continue status = response['tasks'][0]['lastStatus'] - print(f'task {task_arn} is {status}') + print(f'task {task_arn} is {status}', flush=True) # https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-lifecycle.html if status == 'STOPPED' or status == 'DELETED': running = False if attempts >= MAX_WAITER_ATTEMPTS: - print(f'stopped tasks waiter failed after {MAX_WAITER_ATTEMPTS}') + print(f'stopped tasks waiter failed after {MAX_WAITER_ATTEMPTS}', flush=True) running = False @@ -247,20 +254,20 @@ def run_ecs_tests(): launchType='EC2', taskDefinition=f'{PREFIX}{OUTPUT_PLUGIN}-{throughput}-{input_logger["name"]}' ) - print(f'run_task_response={response}') + print(f'run_task_response={response}', flush=True) names[f'{OUTPUT_PLUGIN}_{input_logger["name"]}_{throughput}_task_arn'] = response['tasks'][0]['taskArn'] - + # Validation input type banner - print(f'\nTest {input_logger["name"]} to {OUTPUT_PLUGIN} in progress...') + print(f'\nTest {input_logger["name"]} to {OUTPUT_PLUGIN} in progress...', flush=True) # Tasks need time to run - time.sleep(600) + __sleep(LOGGER_RUN_TIME_IN_SECOND, "Waiting for tasks to have time to run") # wait for tasks and validate for input_logger in INPUT_LOGGERS: # Wait until task stops and start validation processes = [] - + for throughput in THROUGHPUT_LIST: client = boto3.client('ecs') task_arn = names[f'{OUTPUT_PLUGIN}_{input_logger["name"]}_{throughput}_task_arn'] @@ -271,8 +278,8 @@ def run_ecs_tests(): task_arn, ] ) - print(f'task_arn={task_arn}') - print(f'describe_tasks_response={response}') + print(f'task_arn={task_arn}', flush=True) + print(f'describe_tasks_response={response}', flush=True) input_record = calculate_total_input_number(throughput) if len(response['failures']) == 0: check_app_exit_code(response) @@ -284,7 +291,7 @@ def run_ecs_tests(): # missing tasks might mean the task stopped some time ago # and ECS already reaped/deleted it # try skipping straight to validation - log_delay = 'not supported' # we don't actually use this right now in results + log_delay = 'unavailable' # we don't actually use this right now in results # Validate logs os.environ['LOG_SOURCE_NAME'] = input_logger["name"] @@ -306,22 +313,24 @@ def run_ecs_tests(): validator_env['LOG_PREFIX'] = resource_resolver.get_destination_s3_prefix(test_configuration["input_configuration"], OUTPUT_PLUGIN) validator_env['DESTINATION'] = 's3' + exec_args = ['go', 'run', './load_tests/validation/validate.go', input_record, log_delay] + print("Running validator process. args={}".format(exec_args), flush=True) processes.append({ "input_logger": input_logger, "test_configuration": test_configuration, - "process": subprocess.Popen(['go', 'run', './load_tests/validation/validate.go', input_record, log_delay], stdout=subprocess.PIPE, - env=validator_env - ) + "process": subprocess.Popen(exec_args, stdout=subprocess.PIPE, env=validator_env) }) # Wait until all subprocesses for validation completed + print("Waiting for all validation processes to complete", flush=True) for p in processes: + print("Waiting for validator process to complete {}".format(p["process"].args), flush=True) p["process"].wait() stdout, stderr = p["process"].communicate() - print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator stdout: {stdout}') - print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator stderr: {stderr}') + print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator stdout: {stdout}', flush=True) + print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator stderr: {stderr}', flush=True) p["result"] = stdout - print(f'Test {input_logger["name"]} to {OUTPUT_PLUGIN} complete.') + print(f'Test {input_logger["name"]} to {OUTPUT_PLUGIN} complete.', flush=True) parsedValidationOutputs = list(map(lambda p: { **p, @@ -331,15 +340,15 @@ def run_ecs_tests(): test_results.extend(parsedValidationOutputs) # Print output - print("\n\nValidation results:\n") - print(format_test_results_to_markdown(test_results)) + print("\n\nValidation results:\n", flush=True) + print(format_test_results_to_markdown(test_results), flush=True) # Bar check if not validation_bar.bar_raiser(test_results): - print("Failed validation bar.") + print("Failed validation bar.", flush=True) sys.exit("Failed to pass the test_results validation bar") else: - print("Passed validation bar.") + print("Passed validation bar.", flush=True) def parse_validation_output(validationResultString): return { x[0]: x[1] for x in list( @@ -464,7 +473,7 @@ def generate_daemonset_config(throughput): fin = open(f'./load_tests/daemonset/{OUTPUT_PLUGIN}.yaml', 'r') data = fin.read() for key in daemonset_config_dict: - data = data.replace(key, daemonset_config_dict[key]) + data = data.replace(key, daemonset_config_dict[key]) fout = open(f'./load_tests/daemonset/{OUTPUT_PLUGIN}_{throughput}.yaml', 'w') fout.write(data) fout.close() @@ -478,14 +487,14 @@ def run_eks_tests(): generate_daemonset_config(throughput) os.system(f'kubectl apply -f ./load_tests/daemonset/{OUTPUT_PLUGIN}_{throughput}.yaml') # wait (10 mins run + buffer for setup/log delivery) - time.sleep(1000) + __sleep(1000, "Waiting 10 minutes+buffer to setup and log delivery") for throughput in THROUGHPUT_LIST: input_record = calculate_total_input_number(throughput) response = client.describe_log_streams( logGroupName=os.environ['CW_LOG_GROUP_NAME'], logStreamNamePrefix=f'{PREFIX}kube.var.log.containers.ds-cloudwatch-{throughput}', orderBy='LogStreamName' - ) + ) for log_stream in response['logStreams']: if 'app-' not in log_stream['logStreamName']: continue @@ -495,7 +504,7 @@ def run_eks_tests(): os.environ['LOG_PREFIX'] = log_stream['logStreamName'] os.environ['DESTINATION'] = 'cloudwatch' processes.add(subprocess.Popen(['go', 'run', './load_tests/validation/validate.go', input_record, log_delay])) - + # Wait until all subprocesses for validation completed for p in processes: p.wait() @@ -508,7 +517,7 @@ def delete_testing_resources(): # delete all S3 config files delete_testing_data(session) - # All related testing resources will be destroyed once the stack is deleted + # All related testing resources will be destroyed once the stack is deleted client = session.client('cloudformation') client.delete_stack( StackName=TESTING_RESOURCES_STACK_NAME @@ -541,7 +550,7 @@ def get_sts_boto_session(): DurationSeconds=3600 ) - # From the response that contains the assumed role, get the temporary + # From the response that contains the assumed role, get the temporary # credentials that can be used to make subsequent API calls credentials=assumed_role_object['Credentials'] diff --git a/load_tests/task_definitions/cloudwatch.json b/load_tests/task_definitions/cloudwatch.json index 0a9648b4a..0af0a25ae 100644 --- a/load_tests/task_definitions/cloudwatch.json +++ b/load_tests/task_definitions/cloudwatch.json @@ -37,7 +37,7 @@ } }, "cpu": 512, - "memoryReservation": 50 + "memoryReservation": 20480 }, { "essential": true, @@ -50,7 +50,7 @@ "containerName": "log_router" } ], - "environment" : [ + "environment" : [ { "name" : "ITERATION", "value" : "$THROUGHPUT" }, { "name" : "TIME", "value" : "$LOGGER_RUN_TIME_IN_SECOND" }, { "name" : "LOGGER_PORT", "value": "$LOGGER_PORT" }, @@ -61,4 +61,4 @@ "memoryReservation": 100 } ] -} \ No newline at end of file +} diff --git a/load_tests/task_definitions/firehose.json b/load_tests/task_definitions/firehose.json index 33b1fb436..a7af6c6f7 100644 --- a/load_tests/task_definitions/firehose.json +++ b/load_tests/task_definitions/firehose.json @@ -37,7 +37,7 @@ } }, "cpu": 512, - "memoryReservation": 50 + "memoryReservation": 20480 }, { "essential": true, @@ -50,7 +50,7 @@ "containerName": "log_router" } ], - "environment" : [ + "environment" : [ { "name" : "ITERATION", "value" : "$THROUGHPUT" }, { "name" : "TIME", "value" : "$LOGGER_RUN_TIME_IN_SECOND" }, { "name" : "LOGGER_PORT", "value": "$LOGGER_PORT" }, @@ -61,4 +61,4 @@ "memoryReservation": 100 } ] -} \ No newline at end of file +} diff --git a/load_tests/task_definitions/kinesis.json b/load_tests/task_definitions/kinesis.json index 5ce6f8e0f..114ff6b18 100644 --- a/load_tests/task_definitions/kinesis.json +++ b/load_tests/task_definitions/kinesis.json @@ -37,7 +37,7 @@ } }, "cpu": 512, - "memoryReservation": 50 + "memoryReservation": 20480 }, { "essential": true, @@ -50,7 +50,7 @@ } ], "name": "app", - "environment" : [ + "environment" : [ { "name" : "ITERATION", "value" : "$THROUGHPUT" }, { "name" : "TIME", "value" : "$LOGGER_RUN_TIME_IN_SECOND" }, { "name" : "LOGGER_PORT", "value": "$LOGGER_PORT" }, @@ -61,4 +61,4 @@ "memoryReservation": 100 } ] -} \ No newline at end of file +} diff --git a/load_tests/task_definitions/s3.json b/load_tests/task_definitions/s3.json index a25b487c7..6b65aed9d 100644 --- a/load_tests/task_definitions/s3.json +++ b/load_tests/task_definitions/s3.json @@ -37,7 +37,7 @@ } }, "cpu": 512, - "memoryReservation": 50 + "memoryReservation": 20480 }, { "essential": true, @@ -50,7 +50,7 @@ "containerName": "log_router" } ], - "environment" : [ + "environment" : [ { "name" : "ITERATION", "value" : "$THROUGHPUT" }, { "name" : "TIME", "value" : "$LOGGER_RUN_TIME_IN_SECOND" }, { "name" : "LOGGER_PORT", "value": "$LOGGER_PORT" }, @@ -61,4 +61,4 @@ "memoryReservation": 100 } ] -} \ No newline at end of file +} diff --git a/load_tests/validation/validate.go b/load_tests/validation/validate.go index 461edc216..16dd0e5bb 100644 --- a/load_tests/validation/validate.go +++ b/load_tests/validation/validate.go @@ -219,6 +219,7 @@ func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup strin LogGroupName: aws.String(logGroup), LogStreamName: aws.String(logStream), StartFromHead: aws.Bool(true), + Limit: aws.Int64(10000), } } else { input = &cloudwatchlogs.GetLogEventsInput{ @@ -226,22 +227,17 @@ func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup strin LogStreamName: aws.String(logStream), NextToken: forwardToken, StartFromHead: aws.Bool(true), + Limit: aws.Int64(10000), } + // Sleep between GetLogEvents calls to avoid throttling + time.Sleep(100 * time.Millisecond) } - /* - * In testing we have found that CW GetLogEvents results are highly inconsistent - * Re-running validation long after tests shows that fewer events were lost than - * first calculated. So we sleep between calls to ensure we never exceed 1 TPS - * load_test.py also has a sleep before validation runs. - */ - time.Sleep(1 * time.Second) - response, err := cwClient.GetLogEvents(input) for err != nil { // retry for throttling exception if strings.Contains(err.Error(), "ThrottlingException: Rate exceeded") { - time.Sleep(1 * time.Second) + time.Sleep(5 * time.Second) response, err = cwClient.GetLogEvents(input) } else { exitErrorf("[TEST FAILURE] Error occured to get the log events from log group: %q., %v", logGroup, err) From 35e897817862c5fef2da35cf9858d632315afdf5 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Thu, 19 Dec 2024 22:53:32 -0800 Subject: [PATCH 02/11] change validator env vars to flags --- load_tests/load_test.py | 17 +++++-- load_tests/validation/validate.go | 82 +++++++++++++++---------------- 2 files changed, 53 insertions(+), 46 deletions(-) diff --git a/load_tests/load_test.py b/load_tests/load_test.py index aaf757497..a42d37995 100644 --- a/load_tests/load_test.py +++ b/load_tests/load_test.py @@ -313,12 +313,23 @@ def run_ecs_tests(): validator_env['LOG_PREFIX'] = resource_resolver.get_destination_s3_prefix(test_configuration["input_configuration"], OUTPUT_PLUGIN) validator_env['DESTINATION'] = 's3' - exec_args = ['go', 'run', './load_tests/validation/validate.go', input_record, log_delay] - print("Running validator process. args={}".format(exec_args), flush=True) + log_group_name = os.environ['CW_LOG_GROUP_NAME'] + if len(log_group_name) == 0: + log_group_name = "unavailable" + exec_args = ['go', 'run', './load_tests/validation/validate.go', + '-input-record', input_record, + '-log-delay', log_delay, + '-region', os.environ['AWS_REGION'], + '-bucket', os.environ['S3_BUCKET_NAME'], + '-log-group', log_group_name, + '-prefix', validator_env['LOG_PREFIX'], + '-destination', validator_env['DESTINATION'], + ] + print("Running validator process. cmd=[{}]".format(' '.join(exec_args)), flush=True) processes.append({ "input_logger": input_logger, "test_configuration": test_configuration, - "process": subprocess.Popen(exec_args, stdout=subprocess.PIPE, env=validator_env) + "process": subprocess.Popen(exec_args, stdout=subprocess.PIPE) }) # Wait until all subprocesses for validation completed diff --git a/load_tests/validation/validate.go b/load_tests/validation/validate.go index 16dd0e5bb..ee6429f81 100644 --- a/load_tests/validation/validate.go +++ b/load_tests/validation/validate.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "flag" "fmt" "io/ioutil" "os" @@ -16,12 +17,7 @@ import ( ) const ( - envAWSRegion = "AWS_REGION" - envS3Bucket = "S3_BUCKET_NAME" - envCWLogGroup = "CW_LOG_GROUP_NAME" - envLogPrefix = "LOG_PREFIX" - envDestination = "DESTINATION" - idCounterBase = 10000000 + idCounterBase = 10000000 ) type Message struct { @@ -29,67 +25,67 @@ type Message struct { } func main() { - region := os.Getenv(envAWSRegion) - if region == "" { - exitErrorf("[TEST FAILURE] AWS Region required. Set the value for environment variable- %s", envAWSRegion) + // Define flags + region := flag.String("region", "", "AWS Region") + bucket := flag.String("bucket", "", "S3 Bucket Name") + logGroup := flag.String("log-group", "", "CloudWatch Log Group Name") + prefix := flag.String("prefix", "", "Log Prefix") + destination := flag.String("destination", "", "Log Destination (s3 or cloudwatch)") + inputRecord := flag.Int("input-record", 0, "Total input record number") + logDelay := flag.String("log-delay", "", "Log delay") + + // Parse flags + flag.Parse() + + // Validate required flags + if *region == "" { + exitErrorf("[TEST FAILURE] AWS Region required. Use the -region flag.") } - - bucket := os.Getenv(envS3Bucket) - if bucket == "" { - exitErrorf("[TEST FAILURE] Bucket name required. Set the value for environment variable- %s", envS3Bucket) + if *bucket == "" { + exitErrorf("[TEST FAILURE] Bucket name required. Use the -bucket flag.") } - - logGroup := os.Getenv(envCWLogGroup) - if logGroup == "" { - exitErrorf("[TEST FAILURE] Log group name required. Set the value for environment variable- %s", envCWLogGroup) + if *logGroup == "" { + exitErrorf("[TEST FAILURE] Log group name required. Use the -log-group flag.") } - - prefix := os.Getenv(envLogPrefix) - if prefix == "" { - exitErrorf("[TEST FAILURE] Object prefix required. Set the value for environment variable- %s", envLogPrefix) + if *prefix == "" { + exitErrorf("[TEST FAILURE] Object prefix required. Use the -prefix flag.") } - - destination := os.Getenv(envDestination) - if destination == "" { - exitErrorf("[TEST FAILURE] Log destination for validation required. Set the value for environment variable- %s", envDestination) + if *destination == "" { + exitErrorf("[TEST FAILURE] Log destination for validation required. Use the -destination flag.") } - - inputRecord := os.Args[1] - if inputRecord == "" { - exitErrorf("[TEST FAILURE] Total input record number required. Set the value as the first argument") + if *inputRecord == 0 { + exitErrorf("[TEST FAILURE] Total input record number required. Use the -input-record flag.") } - totalInputRecord, _ := strconv.Atoi((inputRecord)) + if *logDelay == "" { + exitErrorf("[TEST FAILURE] Log delay required. Use the -log-delay flag.") + } + // Map for counting unique records in corresponding destination inputMap := make(map[string]bool) - for i := 0; i < totalInputRecord; i++ { + for i := 0; i < *inputRecord; i++ { recordId := strconv.Itoa(idCounterBase + i) inputMap[recordId] = false } - logDelay := os.Args[2] - if logDelay == "" { - exitErrorf("[TEST FAILURE] Log delay required. Set the value as the second argument") - } - totalRecordFound := 0 - if destination == "s3" { - s3Client, err := getS3Client(region) + if *destination == "s3" { + s3Client, err := getS3Client(*region) if err != nil { exitErrorf("[TEST FAILURE] Unable to create new S3 client: %v", err) } - totalRecordFound, inputMap = validate_s3(s3Client, bucket, prefix, inputMap) - } else if destination == "cloudwatch" { - cwClient, err := getCWClient(region) + totalRecordFound, inputMap = validate_s3(s3Client, *bucket, *prefix, inputMap) + } else if *destination == "cloudwatch" { + cwClient, err := getCWClient(*region) if err != nil { exitErrorf("[TEST FAILURE] Unable to create new CloudWatch client: %v", err) } - totalRecordFound, inputMap = validate_cloudwatch(cwClient, logGroup, prefix, inputMap) + totalRecordFound, inputMap = validate_cloudwatch(cwClient, *logGroup, *prefix, inputMap) } // Get benchmark results based on log loss, log delay and log duplication - get_results(totalInputRecord, totalRecordFound, inputMap, logDelay) + get_results(*inputRecord, totalRecordFound, inputMap, *logDelay) } // Creates a new S3 Client From 945778a25fbbb33736764dbdb1b2f37abb67a26b Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 20 Dec 2024 09:40:30 -0800 Subject: [PATCH 03/11] S3 lifecycle config --- load_tests/load_test.py | 35 +++++++++++++++++++++++++++++------ 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/load_tests/load_test.py b/load_tests/load_test.py index a42d37995..ea394f8fc 100644 --- a/load_tests/load_test.py +++ b/load_tests/load_test.py @@ -460,18 +460,41 @@ def publish_fluent_config_s3(input_logger): def delete_testing_data(session): # Delete associated cloudwatch log streams client = session.client('logs') + log_group_name = os.environ['CW_LOG_GROUP_NAME'] response = client.describe_log_streams( - logGroupName=os.environ['CW_LOG_GROUP_NAME'] + logGroupName=log_group_name ) for stream in response["logStreams"]: + print("Deleting log stream. logGroupName={} logStreamName={}".format(log_group_name, stream["logStreamName"]), flush=True) client.delete_log_stream( - logGroupName=os.environ['CW_LOG_GROUP_NAME'], + logGroupName=log_group_name, logStreamName=stream["logStreamName"] ) - # Empty s3 bucket - s3 = session.resource('s3') - bucket = s3.Bucket(os.environ['S3_BUCKET_NAME']) - bucket.objects.all().delete() + + # Set 5-day retention period for s3 bucket + s3 = session.client('s3') + bucket_name = os.environ['S3_BUCKET_NAME'] + + # Configure the lifecycle rule + lifecycle_configuration = { + 'Rules': [ + { + 'ID': 'Delete after 5 days', + 'Status': 'Enabled', + 'Expiration': {'Days': 5}, + } + ] + } + + try: + # Apply the lifecycle configuration to the bucket + response = s3_client.put_bucket_lifecycle_configuration( + Bucket=bucket_name, + LifecycleConfiguration=lifecycle_configuration + ) + print(f"Lifecycle rule set successfully for bucket: {bucket_name}", flush=True) + except ClientError as e: + print(f"Error setting lifecycle rule: {e}") def generate_daemonset_config(throughput): daemonset_config_dict = { From 028b4909864538fc36009df079e99665c40226c9 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 20 Dec 2024 10:38:53 -0800 Subject: [PATCH 04/11] wip --- load_tests/load_test.py | 89 +++++++++++++++++++---------------------- 1 file changed, 42 insertions(+), 47 deletions(-) diff --git a/load_tests/load_test.py b/load_tests/load_test.py index ea394f8fc..e00712481 100644 --- a/load_tests/load_test.py +++ b/load_tests/load_test.py @@ -15,6 +15,9 @@ IS_TASK_DEFINITION_PRINTED = True PLATFORM = os.environ['PLATFORM'].lower() OUTPUT_PLUGIN = os.environ['OUTPUT_PLUGIN'].lower() +LOG_GROUP_NAME = os.environ.get('CW_LOG_GROUP_NAME', "unavailable") +AWS_REGION = os.environ['AWS_REGION'] +S3_BUCKET_NAME = os.environ.get('S3_BUCKET_NAME', "unavailable") TESTING_RESOURCES_STACK_NAME = os.environ['TESTING_RESOURCES_STACK_NAME'] PREFIX = os.environ['PREFIX'] EKS_CLUSTER_NAME = os.environ['EKS_CLUSTER_NAME'] @@ -64,7 +67,9 @@ def get_log_delay(log_delay_epoch_time): def set_buffer(stop_epoch_time): curr_epoch_time = time.time() if (curr_epoch_time - stop_epoch_time) < BUFFER_TIME_IN_SECOND: - __sleep(int(BUFFER_TIME_IN_SECOND - curr_epoch_time + stop_epoch_time), "Waiting for all logs to be sent to destination") + __sleep(int(BUFFER_TIME_IN_SECOND - curr_epoch_time + stop_epoch_time), + "Waiting for all logs to be sent to destination. "+ + f"destination={OUTPUT_PLUGIN} logGroupName={LOG_GROUP_NAME} s3Bucket={S3_BUCKET_NAME} prefix={PREFIX}") # convert datetime to epoch time def parse_time(time): @@ -116,7 +121,7 @@ def generate_task_definition(throughput, input_logger, s3_fluent_config_arn): # Plugin Specific Environment Variables 'cloudwatch': { - '$CW_LOG_GROUP_NAME': os.environ['CW_LOG_GROUP_NAME'], + '$CW_LOG_GROUP_NAME': LOG_GROUP_NAME, '$STD_LOG_STREAM_NAME': resource_resolver.resolve_cloudwatch_logs_stream_name(std_config), '$CUSTOM_LOG_STREAM_NAME': resource_resolver.resolve_cloudwatch_logs_stream_name(custom_config) }, @@ -129,7 +134,7 @@ def generate_task_definition(throughput, input_logger, s3_fluent_config_arn): '$CUSTOM_STREAM_PREFIX': resource_resolver.resolve_kinesis_delivery_stream_name(custom_config), }, 's3': { - '$S3_BUCKET_NAME': os.environ['S3_BUCKET_NAME'], + '$S3_BUCKET_NAME': S3_BUCKET_NAME, '$STD_S3_OBJECT_NAME': resource_resolver.resolve_s3_object_name(std_config), }, } @@ -197,7 +202,7 @@ def wait_ecs_tasks(ecs_cluster_name, task_arn): running = True attempts = 0 failures = 0 - print(f'waiting on task_arn={task_arn}', flush=True) + print(f'Waiting on task_arn={task_arn}', flush=True) client = boto3.client('ecs') while running: @@ -302,28 +307,19 @@ def run_ecs_tests(): "input_configuration": input_configuration, } - validator_env = { - **os.environ.copy(), - } - if OUTPUT_PLUGIN == 'cloudwatch': - validator_env['LOG_PREFIX'] = resource_resolver.get_destination_cloudwatch_prefix(test_configuration["input_configuration"]) - validator_env['DESTINATION'] = 'cloudwatch' + log_prefix = resource_resolver.get_destination_cloudwatch_prefix(test_configuration["input_configuration"]) else: - validator_env['LOG_PREFIX'] = resource_resolver.get_destination_s3_prefix(test_configuration["input_configuration"], OUTPUT_PLUGIN) - validator_env['DESTINATION'] = 's3' + log_prefix = resource_resolver.get_destination_s3_prefix(test_configuration["input_configuration"], OUTPUT_PLUGIN) - log_group_name = os.environ['CW_LOG_GROUP_NAME'] - if len(log_group_name) == 0: - log_group_name = "unavailable" exec_args = ['go', 'run', './load_tests/validation/validate.go', '-input-record', input_record, '-log-delay', log_delay, - '-region', os.environ['AWS_REGION'], - '-bucket', os.environ['S3_BUCKET_NAME'], - '-log-group', log_group_name, - '-prefix', validator_env['LOG_PREFIX'], - '-destination', validator_env['DESTINATION'], + '-region', AWS_REGION, + '-bucket', S3_BUCKET_NAME, + '-log-group', LOG_GROUP_NAME, + '-prefix', log_prefix, + '-destination', OUTPUT_PLUGIN, ] print("Running validator process. cmd=[{}]".format(' '.join(exec_args)), flush=True) processes.append({ @@ -335,7 +331,7 @@ def run_ecs_tests(): # Wait until all subprocesses for validation completed print("Waiting for all validation processes to complete", flush=True) for p in processes: - print("Waiting for validator process to complete {}".format(p["process"].args), flush=True) + print("Waiting for validator process to complete {}".format(' '.join(p["process"].args)), flush=True) p["process"].wait() stdout, stderr = p["process"].communicate() print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator stdout: {stdout}', flush=True) @@ -447,53 +443,52 @@ def parse_json_template(template, dict): # Returns s3 arn def publish_fluent_config_s3(input_logger): - bucket_name = os.environ['S3_BUCKET_NAME'] s3 = boto3.client('s3') s3.upload_file( input_logger["fluent_config_file_path"], - bucket_name, + S3_BUCKET_NAME, f'{OUTPUT_PLUGIN}-test/{PLATFORM}/fluent-{input_logger["name"]}.conf', ) - return f'arn:aws:s3:::{bucket_name}/{OUTPUT_PLUGIN}-test/{PLATFORM}/fluent-{input_logger["name"]}.conf' + return f'arn:aws:s3:::{S3_BUCKET_NAME}/{OUTPUT_PLUGIN}-test/{PLATFORM}/fluent-{input_logger["name"]}.conf' -# The following method is used to clear data after all tests run +# The following method is used to clear data after all tests run. +# We set retention/expiration policies so that tests do not interfere with each other, and so that +# we can debug and run validation manually if necessary. def delete_testing_data(session): - # Delete associated cloudwatch log streams - client = session.client('logs') - log_group_name = os.environ['CW_LOG_GROUP_NAME'] - response = client.describe_log_streams( - logGroupName=log_group_name - ) - for stream in response["logStreams"]: - print("Deleting log stream. logGroupName={} logStreamName={}".format(log_group_name, stream["logStreamName"]), flush=True) - client.delete_log_stream( - logGroupName=log_group_name, - logStreamName=stream["logStreamName"] + retention_days = 4 + + logs_client = session.client('logs') + try: + # Set the retention policy for the log group + response = logs_client.put_retention_policy( + logGroupName=LOG_GROUP_NAME, + retentionInDays=retention_days ) + print(f"Retention policy set successfully for log group. logGroupName={LOG_GROUP_NAME} retentionDays={retention_days}") + except Exception as e: + print(f"Error setting retention policy: {e}") - # Set 5-day retention period for s3 bucket - s3 = session.client('s3') - bucket_name = os.environ['S3_BUCKET_NAME'] + # Set retention period for s3 bucket + s3_client = session.client('s3') # Configure the lifecycle rule lifecycle_configuration = { 'Rules': [ { - 'ID': 'Delete after 5 days', + 'ID': "Delete after {} days".format(retention_days), 'Status': 'Enabled', - 'Expiration': {'Days': 5}, + 'Expiration': {'Days': retention_days}, } ] } - try: # Apply the lifecycle configuration to the bucket response = s3_client.put_bucket_lifecycle_configuration( - Bucket=bucket_name, + Bucket=S3_BUCKET_NAME, LifecycleConfiguration=lifecycle_configuration ) - print(f"Lifecycle rule set successfully for bucket: {bucket_name}", flush=True) - except ClientError as e: + print(f"Lifecycle rule set successfully for S3 bucket. bucketName={S3_BUCKET_NAME} retentionDays={retention_days}", flush=True) + except Exception as e: print(f"Error setting lifecycle rule: {e}") def generate_daemonset_config(throughput): @@ -502,7 +497,7 @@ def generate_daemonset_config(throughput): '$FLUENT_BIT_IMAGE': os.environ['FLUENT_BIT_IMAGE'], '$APP_IMAGE': os.environ['EKS_APP_IMAGE'], '$TIME': str(LOGGER_RUN_TIME_IN_SECOND), - '$CW_LOG_GROUP_NAME': os.environ['CW_LOG_GROUP_NAME'], + '$CW_LOG_GROUP_NAME': LOG_GROUP_NAME, } fin = open(f'./load_tests/daemonset/{OUTPUT_PLUGIN}.yaml', 'r') data = fin.read() @@ -525,7 +520,7 @@ def run_eks_tests(): for throughput in THROUGHPUT_LIST: input_record = calculate_total_input_number(throughput) response = client.describe_log_streams( - logGroupName=os.environ['CW_LOG_GROUP_NAME'], + logGroupName=LOG_GROUP_NAME, logStreamNamePrefix=f'{PREFIX}kube.var.log.containers.ds-cloudwatch-{throughput}', orderBy='LogStreamName' ) From d262ec293dbfcbeee0e4d0a9bbe8d4a8cf4531a5 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 20 Dec 2024 13:04:40 -0800 Subject: [PATCH 05/11] validate enhancements --- load_tests/load_test.py | 7 +++-- load_tests/validation/validate.go | 43 +++++++++++++------------------ 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/load_tests/load_test.py b/load_tests/load_test.py index e00712481..f6da5b266 100644 --- a/load_tests/load_test.py +++ b/load_tests/load_test.py @@ -329,9 +329,8 @@ def run_ecs_tests(): }) # Wait until all subprocesses for validation completed - print("Waiting for all validation processes to complete", flush=True) for p in processes: - print("Waiting for validator process to complete {}".format(' '.join(p["process"].args)), flush=True) + print("Waiting for validator process to complete cmd=[{}]".format(' '.join(p["process"].args)), flush=True) p["process"].wait() stdout, stderr = p["process"].communicate() print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator stdout: {stdout}', flush=True) @@ -455,6 +454,7 @@ def publish_fluent_config_s3(input_logger): # We set retention/expiration policies so that tests do not interfere with each other, and so that # we can debug and run validation manually if necessary. def delete_testing_data(session): + print("Setting auto-delete policies for CW log groups and S3 buckets") retention_days = 4 logs_client = session.client('logs') @@ -539,6 +539,7 @@ def run_eks_tests(): p.wait() def delete_testing_resources(): + print("Deleting test resources") # Create sts session session = get_sts_boto_session() @@ -546,6 +547,7 @@ def delete_testing_resources(): # delete all S3 config files delete_testing_data(session) + print(f"Deleting cloudformation stack. stackName={TESTING_RESOURCES_STACK_NAME}", flush=True) # All related testing resources will be destroyed once the stack is deleted client = session.client('cloudformation') client.delete_stack( @@ -553,6 +555,7 @@ def delete_testing_resources(): ) # scale down eks cluster if PLATFORM == 'eks': + print("Scaling down EKS cluster", flush=True) os.system('kubectl delete namespace load-test-fluent-bit-eks-ns') os.system(f'eksctl scale nodegroup --cluster={EKS_CLUSTER_NAME} --nodes=0 ng') diff --git a/load_tests/validation/validate.go b/load_tests/validation/validate.go index ee6429f81..37f54a2f1 100644 --- a/load_tests/validation/validate.go +++ b/load_tests/validation/validate.go @@ -4,7 +4,7 @@ import ( "encoding/json" "flag" "fmt" - "io/ioutil" + "io" "os" "strconv" "strings" @@ -111,8 +111,6 @@ func validate_s3(s3Client *s3.S3, bucket string, prefix string, inputMap map[str s3RecordCounter := 0 s3ObjectCounter := 0 - // Returns all the objects from a S3 bucket with the given prefix. - // This approach utilizes NextContinuationToken to pull all the objects from the S3 bucket. for { input = &s3.ListObjectsV2Input{ Bucket: aws.String(bucket), @@ -122,7 +120,7 @@ func validate_s3(s3Client *s3.S3, bucket string, prefix string, inputMap map[str response, err := s3Client.ListObjectsV2(input) if err != nil { - exitErrorf("[TEST FAILURE] Error occured to get the objects from bucket: %q., %v", bucket, err) + exitErrorf("[TEST FAILURE] Error occurred to get the objects from bucket: %q., %v", bucket, err) } for _, content := range response.Contents { @@ -130,39 +128,34 @@ func validate_s3(s3Client *s3.S3, bucket string, prefix string, inputMap map[str Bucket: aws.String(bucket), Key: content.Key, } - obj := getS3Object(s3Client, input) - s3ObjectCounter++ - - dataByte, err := ioutil.ReadAll(obj.Body) + obj, err := s3Client.GetObject(input) if err != nil { - exitErrorf("[TEST FAILURE] Error to parse GetObject response. %v", err) + exitErrorf("[TEST FAILURE] Error to get S3 object. %v", err) } + s3ObjectCounter++ - data := strings.Split(string(dataByte), "\n") - - for _, d := range data { - if d == "" { - continue - } - + // Directly unmarshal the JSON objects from the S3 object body + decoder := json.NewDecoder(obj.Body) + for { var message Message - - decodeError := json.Unmarshal([]byte(d), &message) - if decodeError != nil { - fmt.Println("[TEST ERROR] Malform log entry. Unmarshal Error:", decodeError) - fmt.Println(" Malform entry: %s", d) - // Skip malform log entries (count them as lost logs) + err := decoder.Decode(&message) + if err == io.EOF { + break + } + if err != nil { + fmt.Println("[TEST ERROR] Malform log entry. Unmarshal Error:", err) continue } - // First 8 char is the unique record ID recordId := message.Log[:8] - s3RecordCounter += 1 + s3RecordCounter++ if _, ok := inputMap[recordId]; ok { - // Setting true to indicate that this record was found in the destination inputMap[recordId] = true } } + + // Close the S3 object body + obj.Body.Close() } if !aws.BoolValue(response.IsTruncated) { From 462060e1bd9ea44ddc8dd3f60fb3a05eaed56e59 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 20 Dec 2024 13:26:48 -0800 Subject: [PATCH 06/11] wip --- load_tests/load_test.py | 5 ++++- load_tests/task_definitions/cloudwatch.json | 2 +- load_tests/task_definitions/firehose.json | 2 +- load_tests/task_definitions/kinesis.json | 2 +- load_tests/task_definitions/s3.json | 2 +- 5 files changed, 8 insertions(+), 5 deletions(-) diff --git a/load_tests/load_test.py b/load_tests/load_test.py index f6da5b266..6f6832ba1 100644 --- a/load_tests/load_test.py +++ b/load_tests/load_test.py @@ -333,8 +333,10 @@ def run_ecs_tests(): print("Waiting for validator process to complete cmd=[{}]".format(' '.join(p["process"].args)), flush=True) p["process"].wait() stdout, stderr = p["process"].communicate() + return_code = p["process"].returncode print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator stdout: {stdout}', flush=True) print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator stderr: {stderr}', flush=True) + print(f'{input_logger["name"]} to {OUTPUT_PLUGIN} raw validator return code: {return_code}', flush=True) p["result"] = stdout print(f'Test {input_logger["name"]} to {OUTPUT_PLUGIN} complete.', flush=True) @@ -455,7 +457,7 @@ def publish_fluent_config_s3(input_logger): # we can debug and run validation manually if necessary. def delete_testing_data(session): print("Setting auto-delete policies for CW log groups and S3 buckets") - retention_days = 4 + retention_days = 5 logs_client = session.client('logs') try: @@ -478,6 +480,7 @@ def delete_testing_data(session): 'ID': "Delete after {} days".format(retention_days), 'Status': 'Enabled', 'Expiration': {'Days': retention_days}, + 'Prefix': '' } ] } diff --git a/load_tests/task_definitions/cloudwatch.json b/load_tests/task_definitions/cloudwatch.json index 0af0a25ae..3e08e14d3 100644 --- a/load_tests/task_definitions/cloudwatch.json +++ b/load_tests/task_definitions/cloudwatch.json @@ -37,7 +37,7 @@ } }, "cpu": 512, - "memoryReservation": 20480 + "memoryReservation": 10240 }, { "essential": true, diff --git a/load_tests/task_definitions/firehose.json b/load_tests/task_definitions/firehose.json index a7af6c6f7..a27b00a8a 100644 --- a/load_tests/task_definitions/firehose.json +++ b/load_tests/task_definitions/firehose.json @@ -37,7 +37,7 @@ } }, "cpu": 512, - "memoryReservation": 20480 + "memoryReservation": 10240 }, { "essential": true, diff --git a/load_tests/task_definitions/kinesis.json b/load_tests/task_definitions/kinesis.json index 114ff6b18..7513fc7db 100644 --- a/load_tests/task_definitions/kinesis.json +++ b/load_tests/task_definitions/kinesis.json @@ -37,7 +37,7 @@ } }, "cpu": 512, - "memoryReservation": 20480 + "memoryReservation": 10240 }, { "essential": true, diff --git a/load_tests/task_definitions/s3.json b/load_tests/task_definitions/s3.json index 6b65aed9d..162025285 100644 --- a/load_tests/task_definitions/s3.json +++ b/load_tests/task_definitions/s3.json @@ -37,7 +37,7 @@ } }, "cpu": 512, - "memoryReservation": 20480 + "memoryReservation": 10240 }, { "essential": true, From f2f992097ee1bc1082aa85f54b093d7664f41a1c Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Fri, 20 Dec 2024 23:49:51 -0800 Subject: [PATCH 07/11] uint --- load_tests/validation/validate.go | 67 ++++++++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 5 deletions(-) diff --git a/load_tests/validation/validate.go b/load_tests/validation/validate.go index 37f54a2f1..043bef3c9 100644 --- a/load_tests/validation/validate.go +++ b/load_tests/validation/validate.go @@ -20,6 +20,10 @@ const ( idCounterBase = 10000000 ) +var ( + inputMap *swiss.Map[uint32, struct{}] +) + type Message struct { Log string } @@ -177,7 +181,53 @@ func getS3Object(s3Client *s3.S3, input *s3.GetObjectInput) *s3.GetObjectOutput exitErrorf("[TEST FAILURE] Error occured to get s3 object: %v", err) } - return obj + _, err = downloader.Download(tempFile, input) + if err != nil { + os.Remove(tempFile.Name()) // Clean up the file if download fails + return "", fmt.Errorf("error downloading S3 object: %v", err) + } + + return tempFile.Name(), nil +} + +func processFile(file *os.File, filePath string) (int, error) { + var err error + file, err = os.Open(filePath) + if err != nil { + return 0, fmt.Errorf("error opening file: %v", err) + } + defer file.Close() + var message Message + var recordId string + + localCounter := 0 + // Directly unmarshal the JSON objects from the S3 object body + decoder := json.NewDecoder(file) + for { + err = decoder.Decode(&message) + if err == io.EOF { + break + } + if err != nil { + fmt.Println("[TEST ERROR] Malform log entry. Unmarshal Error:", err) + continue + } + + recordId = message.Log[:8] + value, err := strconv.ParseUint(recordId, 10, 32) + if err != nil { + fmt.Println("Error:", err) + continue + } + recordIdUint := uint32(value) + localCounter++ + inputMap.Put(recordIdUint, struct{}{}) + // if _, ok = inputMap[recordId]; ok { + // inputMap[recordId] = true + // } + } + + return localCounter, nil } // Creates a new CloudWatch Client @@ -238,11 +288,18 @@ func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup strin // First 8 char is the unique record ID recordId := log[:8] - cwRecoredCounter += 1 - if _, ok := inputMap[recordId]; ok { - // Setting true to indicate that this record was found in the destination - inputMap[recordId] = true + value, err := strconv.ParseUint(recordId, 10, 32) + if err != nil { + fmt.Println("Error:", err) + continue } + recordIdUint := uint32(value) + cwRecoredCounter += 1 + inputMap.Put(recordIdUint, struct{}{}) + // if _, ok := inputMap[recordId]; ok { + // // Setting true to indicate that this record was found in the destination + // inputMap[recordId] = true + // } } // Same NextForwardToken will be returned if we reach the end of the log stream From 8747cf47f52e4fc765f0e872f924cbc808ccd5c2 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Sat, 21 Dec 2024 00:01:24 -0800 Subject: [PATCH 08/11] undo swiss --- load_tests/validation/validate.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/load_tests/validation/validate.go b/load_tests/validation/validate.go index 043bef3c9..6ee1c8643 100644 --- a/load_tests/validation/validate.go +++ b/load_tests/validation/validate.go @@ -21,7 +21,7 @@ const ( ) var ( - inputMap *swiss.Map[uint32, struct{}] + inputMap map[uint32]struct{} ) type Message struct { @@ -65,11 +65,11 @@ func main() { } // Map for counting unique records in corresponding destination - inputMap := make(map[string]bool) - for i := 0; i < *inputRecord; i++ { - recordId := strconv.Itoa(idCounterBase + i) - inputMap[recordId] = false - } + inputMap = make(map[uint32]struct{}, *inputRecord) + // for i := 0; i < *inputRecord; i++ { + // recordId := strconv.Itoa(idCounterBase + i) + // inputMap[recordId] = false + // } totalRecordFound := 0 if *destination == "s3" { @@ -221,7 +221,7 @@ func processFile(file *os.File, filePath string) (int, error) { } recordIdUint := uint32(value) localCounter++ - inputMap.Put(recordIdUint, struct{}{}) + inputMap[recordIdUint] = struct{}{} // if _, ok = inputMap[recordId]; ok { // inputMap[recordId] = true // } @@ -295,7 +295,7 @@ func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup strin } recordIdUint := uint32(value) cwRecoredCounter += 1 - inputMap.Put(recordIdUint, struct{}{}) + inputMap[recordIdUint] = struct{}{} // if _, ok := inputMap[recordId]; ok { // // Setting true to indicate that this record was found in the destination // inputMap[recordId] = true @@ -313,8 +313,8 @@ func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup strin return cwRecoredCounter, inputMap } -func get_results(totalInputRecord int, totalRecordFound int, recordMap map[string]bool, logDelay string) { - uniqueRecordFound := 0 +func get_results(totalInputRecord int, totalRecordFound int, logDelay string) { + uniqueRecordFound := len(inputMap) // Count how many unique records were found in the destination for _, v := range recordMap { if v { From 0d4b1c4cf0e3bf5f10c952c4f40dc7aa0547b43c Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Sat, 21 Dec 2024 08:25:50 -0800 Subject: [PATCH 09/11] wip --- load_tests/validation/validate.go | 65 +++++++++---------------------- 1 file changed, 19 insertions(+), 46 deletions(-) diff --git a/load_tests/validation/validate.go b/load_tests/validation/validate.go index 6ee1c8643..742436f81 100644 --- a/load_tests/validation/validate.go +++ b/load_tests/validation/validate.go @@ -66,10 +66,6 @@ func main() { // Map for counting unique records in corresponding destination inputMap = make(map[uint32]struct{}, *inputRecord) - // for i := 0; i < *inputRecord; i++ { - // recordId := strconv.Itoa(idCounterBase + i) - // inputMap[recordId] = false - // } totalRecordFound := 0 if *destination == "s3" { @@ -78,18 +74,18 @@ func main() { exitErrorf("[TEST FAILURE] Unable to create new S3 client: %v", err) } - totalRecordFound, inputMap = validate_s3(s3Client, *bucket, *prefix, inputMap) + totalRecordFound = validate_s3(s3Client, *bucket, *prefix) } else if *destination == "cloudwatch" { cwClient, err := getCWClient(*region) if err != nil { exitErrorf("[TEST FAILURE] Unable to create new CloudWatch client: %v", err) } - totalRecordFound, inputMap = validate_cloudwatch(cwClient, *logGroup, *prefix, inputMap) + totalRecordFound = validate_cloudwatch(cwClient, *logGroup, *prefix) } // Get benchmark results based on log loss, log delay and log duplication - get_results(*inputRecord, totalRecordFound, inputMap, *logDelay) + get_results(*inputRecord, totalRecordFound, *logDelay) } // Creates a new S3 Client @@ -109,7 +105,7 @@ func getS3Client(region string) (*s3.S3, error) { // Log format generated by our producer: 8CharUniqueID_13CharTimestamp_RandomString (10029999_1639151827578_RandomString). // Both of the Kinesis Streams and Kinesis Firehose try to send each log maintaining the "at least once" policy. // To validate, we need to make sure all the log records from input file are stored at least once. -func validate_s3(s3Client *s3.S3, bucket string, prefix string, inputMap map[string]bool) (int, map[string]bool) { +func validate_s3(s3Client *s3.S3, bucket string, prefix string) int { var continuationToken *string var input *s3.ListObjectsV2Input s3RecordCounter := 0 @@ -153,8 +149,14 @@ func validate_s3(s3Client *s3.S3, bucket string, prefix string, inputMap map[str recordId := message.Log[:8] s3RecordCounter++ - if _, ok := inputMap[recordId]; ok { - inputMap[recordId] = true + value, err := strconv.ParseUint(recordId, 10, 32) + if err != nil { + fmt.Println("[TEST ERROR] Malform log entry. ParseUint Error:", err) + continue + } + recordIdUint := uint32(value) + if _, ok := inputMap[recordIdUint]; ok { + inputMap[recordIdUint] = struct{}{} } } @@ -170,24 +172,7 @@ func validate_s3(s3Client *s3.S3, bucket string, prefix string, inputMap map[str fmt.Println("total_s3_obj, ", s3ObjectCounter) - return s3RecordCounter, inputMap -} - -// Retrieves an object from a S3 bucket -func getS3Object(s3Client *s3.S3, input *s3.GetObjectInput) *s3.GetObjectOutput { - obj, err := s3Client.GetObject(input) - - if err != nil { - exitErrorf("[TEST FAILURE] Error occured to get s3 object: %v", err) - } - - _, err = downloader.Download(tempFile, input) - if err != nil { - os.Remove(tempFile.Name()) // Clean up the file if download fails - return "", fmt.Errorf("error downloading S3 object: %v", err) - } - - return tempFile.Name(), nil + return s3RecordCounter } func processFile(file *os.File, filePath string) (int, error) { @@ -216,15 +201,13 @@ func processFile(file *os.File, filePath string) (int, error) { recordId = message.Log[:8] value, err := strconv.ParseUint(recordId, 10, 32) if err != nil { - fmt.Println("Error:", err) + fmt.Println("[TEST ERROR] Malform log entry. ParseUint Error:", err) continue } recordIdUint := uint32(value) - localCounter++ - inputMap[recordIdUint] = struct{}{} - // if _, ok = inputMap[recordId]; ok { - // inputMap[recordId] = true - // } + if _, ok := inputMap[recordIdUint]; ok { + inputMap[recordIdUint] = struct{}{} + } } return localCounter, nil @@ -245,7 +228,7 @@ func getCWClient(region string) (*cloudwatchlogs.CloudWatchLogs, error) { // Validate logs in CloudWatch. // Similar logic as S3 validation. -func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup string, logStream string, inputMap map[string]bool) (int, map[string]bool) { +func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup string, logStream string) int { var forwardToken *string var input *cloudwatchlogs.GetLogEventsInput cwRecoredCounter := 0 @@ -296,10 +279,6 @@ func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup strin recordIdUint := uint32(value) cwRecoredCounter += 1 inputMap[recordIdUint] = struct{}{} - // if _, ok := inputMap[recordId]; ok { - // // Setting true to indicate that this record was found in the destination - // inputMap[recordId] = true - // } } // Same NextForwardToken will be returned if we reach the end of the log stream @@ -310,17 +289,11 @@ func validate_cloudwatch(cwClient *cloudwatchlogs.CloudWatchLogs, logGroup strin forwardToken = response.NextForwardToken } - return cwRecoredCounter, inputMap + return cwRecoredCounter } func get_results(totalInputRecord int, totalRecordFound int, logDelay string) { uniqueRecordFound := len(inputMap) - // Count how many unique records were found in the destination - for _, v := range recordMap { - if v { - uniqueRecordFound++ - } - } fmt.Println("total_input, ", totalInputRecord) fmt.Println("total_destination, ", totalRecordFound) From 9490b37d79916ded300ccc7080efcf30343a9d68 Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Sun, 22 Dec 2024 10:47:48 -0800 Subject: [PATCH 10/11] wip --- load_tests/validation/validate.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/load_tests/validation/validate.go b/load_tests/validation/validate.go index 742436f81..e06e48add 100644 --- a/load_tests/validation/validate.go +++ b/load_tests/validation/validate.go @@ -155,9 +155,7 @@ func validate_s3(s3Client *s3.S3, bucket string, prefix string) int { continue } recordIdUint := uint32(value) - if _, ok := inputMap[recordIdUint]; ok { - inputMap[recordIdUint] = struct{}{} - } + inputMap[recordIdUint] = struct{}{} } // Close the S3 object body @@ -205,9 +203,7 @@ func processFile(file *os.File, filePath string) (int, error) { continue } recordIdUint := uint32(value) - if _, ok := inputMap[recordIdUint]; ok { - inputMap[recordIdUint] = struct{}{} - } + inputMap[recordIdUint] = struct{}{} } return localCounter, nil From da94e3e219e718b32fdd3cc15c57d745e66063fb Mon Sep 17 00:00:00 2001 From: Cameron Sparr Date: Sun, 22 Dec 2024 11:34:29 -0800 Subject: [PATCH 11/11] completely delete s3 bucket --- load_tests/load_test.py | 32 ++++++++------------------------ 1 file changed, 8 insertions(+), 24 deletions(-) diff --git a/load_tests/load_test.py b/load_tests/load_test.py index 6f6832ba1..cb765fc25 100644 --- a/load_tests/load_test.py +++ b/load_tests/load_test.py @@ -456,7 +456,7 @@ def publish_fluent_config_s3(input_logger): # We set retention/expiration policies so that tests do not interfere with each other, and so that # we can debug and run validation manually if necessary. def delete_testing_data(session): - print("Setting auto-delete policies for CW log groups and S3 buckets") + print("Setting auto-delete policies for CW log groups, deleting S3 bucket") retention_days = 5 logs_client = session.client('logs') @@ -470,29 +470,13 @@ def delete_testing_data(session): except Exception as e: print(f"Error setting retention policy: {e}") - # Set retention period for s3 bucket - s3_client = session.client('s3') - - # Configure the lifecycle rule - lifecycle_configuration = { - 'Rules': [ - { - 'ID': "Delete after {} days".format(retention_days), - 'Status': 'Enabled', - 'Expiration': {'Days': retention_days}, - 'Prefix': '' - } - ] - } - try: - # Apply the lifecycle configuration to the bucket - response = s3_client.put_bucket_lifecycle_configuration( - Bucket=S3_BUCKET_NAME, - LifecycleConfiguration=lifecycle_configuration - ) - print(f"Lifecycle rule set successfully for S3 bucket. bucketName={S3_BUCKET_NAME} retentionDays={retention_days}", flush=True) - except Exception as e: - print(f"Error setting lifecycle rule: {e}") + # Empty s3 bucket + # lifecycle config cannot currently be used because the bucket name + # is reused between tests, so it must be completely deleted after each test. + s3 = session.resource('s3') + bucket = s3.Bucket(S3_BUCKET_NAME) + print(f"Deleting all objects in s3 bucket: {S3_BUCKET_NAME}") + bucket.objects.all().delete() def generate_daemonset_config(throughput): daemonset_config_dict = {