实验环境初始化

In [136]:
import os
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from boto3.session import Session
import botocore
from tqdm.notebook import tqdm
from ratelimiter import RateLimiter

# 准备密钥
aws_access_key_id = 'PG3XVLGJ0PI4EWF0SWP3'
aws_secret_access_key = 'QCyuKS2FIq40RpTiRushOh5NblkjmTGxt0hmDoav'

# 本地S3服务地址
local_s3 = 'http://master:8080'

# 建立会话
session = Session(aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)

# 连接到服务
s3 = session.resource('s3', endpoint_url=local_s3)

查看所有bucket

In [133]:
for bucket in s3.buckets.all():
    print('bucket name:%s' % bucket.name)

bucket name:loadgen


新建一个实验用 bucket (注意："bucket name" 中不能有下划线)

In [105]:
bucket_name = 'loadgen'
if s3.Bucket(bucket_name) not in s3.buckets.all():
    print(f'bucket {bucket_name} not found, creating new one...')
    s3.create_bucket(Bucket=bucket_name)
else:
    print(f'skip bucket {bucket_name}')

bucket loadgen not found, creating new one...


查看此 bucket 下的所有 object (若之前实验没有正常结束，则不为空)

In [135]:
bucket = s3.Bucket(bucket_name)
for obj in bucket.objects.all():
    print('obj name:%s' % obj.key)

准备负载，可以按照几种不同请求到达率 (Inter-Arrival Time, IAT) 设置。

In [121]:
## 初始化本地数据文件
local_file = "_test_4K.bin"
test_bytes = [0xFF for i in range(1024*4)] # 填充至所需大小
bytes_obj = io.BytesIO(bytearray(test_bytes))

with open(local_file, "wb") as lf:
    lf.write(bytearray(test_bytes))

def direct_request(action, res, i, localfile):
    start = time.time()
    action(res, i, localfile)
    end = time.time()
    system_time = end - start
    return system_time * 1000 # 换算为毫秒
    
def hedged_request(action, res, i, localfile):
    start = time.time()
    nThreads = 2
    with ThreadPoolExecutor(max_workers=nThreads) as executor: # 通过 max_workers 设置并发线程数
        futures = [executor.submit(arrival_rate_max, res, i, localfile + str(i)) for i in range(nThreads)]
        for future in as_completed(futures):
            # 只要第一个结果
            end = time.time()
            system_time = end - start
            return system_time * 1000 # 换算为毫秒

def tied_request(action, res, i, localfile):
    start = time.time()
    _95_percentile = 0.3 # 95%分位数时间，超过这个时间就丢弃这个请求，发送新的
    with ThreadPoolExecutor(max_workers=1) as executor: # 通过 max_workers 设置并发线程数
        future = executor.submit(arrival_rate_max, res, i, localfile)
        time.sleep(_95_percentile)
        while not future.done():
            future.cancel()
            future = executor.submit(arrival_rate_max)
            time.sleep(_95_percentile)
        end = time.time()
        system_time = end - start
        return system_time * 1000 # 换算为毫秒

    
# 发起请求和计算系统停留时间
def request_timing(s3res, i, localfile): # 使用独立 session.resource 以保证线程安全
    obj_name = "testObj%08d"%(i,) # 所建对象名
    # temp_file = '.tempfile'
    s3res.Object(bucket_name, obj_name).download_file(localfile)

# 按照请求到达率限制来执行和跟踪请求
def arrival_rate_max(s3res, i, *args, **kwargs): # 不进行限速
    return request_timing(s3res, i, *args, **kwargs)

@RateLimiter(0.1, 2) # 0.1s 内不超过 2 个请求，下同……
def arrival_rate_2(s3res, i, *args, **kwargs):
    return request_timing(s3res, i, *args, **kwargs)

@RateLimiter(0.1, 4)
def arrival_rate_4(s3res, i, *args, **kwargs):
    return request_timing(s3res, i, *args, **kwargs)

@RateLimiter(0.1, 8)
def arrival_rate_8(s3res, i, *args, **kwargs):
    return request_timing(s3res, i, *args, **kwargs)

@RateLimiter(0.1, 16)
def arrival_rate_16(s3res, i, *args, **kwargs):
    return request_timing(s3res, i, *args, **kwargs)

按照预设IAT发起请求

In [67]:
latency = []
failed_requests = []

with tqdm(desc="Accessing S3", total=100) as pbar:      # 进度条设置，合计执行 100 项上传任务 (见 submit 部分)，进度也设置为 100 步
    with ThreadPoolExecutor(max_workers=100) as executor: # 通过 max_workers 设置并发线程数
        futures = [
            executor.submit(
                direct_request,
                arrival_rate_max,
                session.resource('s3', endpoint_url=local_s3),
                i, bytes_obj) for i in range(100) # 为保证线程安全，应给每个任务申请一个新 resource
            ]
        for future in as_completed(futures):
            if future.exception():
                failed_requests.append(future)
            else:
                latency.append(future.result()) # 正确完成的请求，采集延迟
            pbar.update(1)

Accessing S3:   0%|          | 0/100 [00:00<?, ?it/s]

In [78]:
try:
    failed_requests[0].result()
except Exception as e:
    print(e)

list index out of range


清理实验环境

In [None]:
try:
    # 删除bucket下所有object
    bucket.objects.all().delete()

    # 删除bucket下某个object
    # bucket.objects.filter(Prefix=obj_name).delete()
except botocore.exceptions.ClientError as e:
    print(e)

删除本地测试文件

In [None]:
os.remove(local_file)

记录延迟到CSV文件

In [None]:
with open("latency.csv", "w+") as tracefile:
    tracefile.write("latency\n")
    tracefile.writelines([str(l) + '\n' for l in latency])

benchmark计划：
1. 明确要测的指标（请求延迟）及目的（测量不同访问特征下的请求延迟，分析访问特征对请求延迟的同质影响（类正态分布、尾延迟）和异质影响（分布的具体参数））
2. 明确影响待测指标的参数（文件大小、请求速率（每秒多少个请求）、访问策略）
3. 设定想测试的参数，运行自动化测试并收集数据

In [131]:
file_size_kb = [4, 256, 1024, 4096]
rate_limits = {
    'max': arrival_rate_max
    #'4by100ms': arrival_rate_4,
    #'16by100ms': arrival_rate_16
}
policies = {
    #'direct': direct_request,
    #'hedged': hedged_request,
    'tied': tied_request
}
client_count = [1, 16, 64]

def bench(fileBuffer=None, rate_limit=None, policy=None, n_clients=1, nRequests=128):
    assert(fileBuffer is not None)
    assert(rate_limit is not None)
    assert(policy is not None)
    
    _latency = []
    _failed = []

    with tqdm(desc="Accessing S3", total=nRequests) as pbar:      # 进度条设置，合计执行128项上传任务
        with ThreadPoolExecutor(max_workers=n_clients) as executor: # 通过 max_workers 设置并发线程数
            futures = [
                executor.submit(
                    policy,
                    rate_limit,
                    session.resource('s3', endpoint_url=local_s3),
                    i, fileBuffer) for i in range(nRequests) # 为保证线程安全，应给每个任务申请一个新 resource
                ]
            for future in as_completed(futures):
                if future.exception():
                    _failed.append(future)
                else:
                    _latency.append(future.result()) # 正确完成的请求，采集延迟
                pbar.update(1)
        
    return _latency, _failed

In [132]:
for file_size in file_size_kb:
    byte_file = '_test_{0}K.bin'.format(file_size)
    bytez = bytearray([0 for i in range(file_size * 1024)])
    with open(byte_file, 'wb') as f:
        f.write(bytez)
        
    print('preparing file size {0}K ...'.format(file_size))
    for i in range(128):
        obj_name = "testObj%08d"%(i,) # 所建对象名
        s3.Object(bucket_name, obj_name).upload_file(byte_file)
        
    for limit_name, limit in rate_limits.items():
        for policy_name, policy in policies.items():
            for n_client in client_count:

                print('config: {0}KB-{1}-{2}-{3}clients'.format(file_size, limit_name, policy_name, n_client))
                ltcy, failed = bench(byte_file, limit, policy, n_client)

                if len(failed) > 0:
                    try:
                        failed[0].result()
                    except Exception as e:
                        print(e)

                with open("download-latency-{0}KB-{1}-{2}-{3}clients.csv".format(file_size, limit_name, policy_name, n_client), "w") as tracefile:
                    tracefile.writelines([str(l) + '\n' for l in ltcy])
                    
    bucket.objects.filter().delete()

preparing file size 4K ...
config: 4KB-max-tied-1clients


Accessing S3: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 128/128 [00:50<00:00,  2.52it/s]


config: 4KB-max-tied-16clients


Accessing S3: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 128/128 [00:03<00:00, 39.03it/s]


config: 4KB-max-tied-64clients


Accessing S3: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 128/128 [00:01<00:00, 64.09it/s]


preparing file size 256K ...
config: 256KB-max-tied-1clients


Accessing S3: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 128/128 [00:39<00:00,  3.24it/s]


config: 256KB-max-tied-16clients


Accessing S3: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 128/128 [00:02<00:00, 44.86it/s]


config: 256KB-max-tied-64clients


Accessing S3: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 128/128 [00:02<00:00, 61.94it/s]


preparing file size 1024K ...
config: 1024KB-max-tied-1clients


Accessing S3: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 128/128 [00:39<00:00,  3.21it/s]


config: 1024KB-max-tied-16clients


Accessing S3: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 128/128 [00:03<00:00, 41.94it/s]


config: 1024KB-max-tied-64clients


Accessing S3: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 128/128 [00:02<00:00, 53.04it/s]


preparing file size 4096K ...
config: 4096KB-max-tied-1clients


Accessing S3: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 128/128 [00:39<00:00,  3.22it/s]


config: 4096KB-max-tied-16clients


Accessing S3: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 128/128 [00:03<00:00, 34.30it/s]


config: 4096KB-max-tied-64clients


Accessing S3: 100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 128/128 [00:03<00:00, 40.03it/s]
