In [1]:
from pathlib import Path


CONNECTION_STRING_PATH = Path('../appsettings.Production.json')
CONNECTION_STRING_NAME = 'Database.SQL'

USERS_TABLE_NAME = 'users'
EOULLIM_BOOTHS_TABLE_NAME = 'EoullimBooths'
EOULLIM_BALANCES_TABLE_NAME = 'EoullimBalances'
EOULLIM_TRANSACTIONS_TABLE_NAME = 'EoullimTransactions'
EOULLIM_PAYMENTS_TABLE_NAME = 'EoullimPayments'

BACKEND_URL = 'http://localhost:5213'

TEST_USER_COUNT = 100
TEST_BOOTH_COUNT = 10

In [4]:
import pymysql, json

# ConnectionString 로드
with CONNECTION_STRING_PATH.open(encoding='utf-8') as f:
    connection_keys = {kv[0].strip().lower(): kv[1] for pair in json.load(f)['ConnectionStrings'][CONNECTION_STRING_NAME].split(';') if pair and (kv := pair.split('=')) and len(kv) == 2}
    
class Connection:
    def __enter__(self):
        self.conn = pymysql.connect(
            host=connection_keys['server'],
            port=int(connection_keys['port']),
            user=connection_keys['uid'],
            password=connection_keys['pwd'],
            db=connection_keys['database'],
            charset=connection_keys['charset'],
            cursorclass=pymysql.cursors.DictCursor
        )

        return self.conn.cursor()
    
    def __exit__(self, type, value, traceback):
        self.conn.close()

def connection():
    return Connection()

def execute(sql, params=None):
    c = connection()
    with c as cursor:
        cursor.execute(sql, params)
        c.conn.commit()

def fetch_scalar(sql, params=None):
    with connection() as cursor:
        cursor.execute(sql, params)
        item = cursor.fetchone()
        return item and item[list(item.keys())[0]]
    
def fetch_all(sql, params=None):
    with connection() as cursor:
        cursor.execute(sql, params)
        return cursor.fetchall()

In [24]:
def yn(message):
    while True:
        answer = input(message + ' (y/n): ').lower()
        if answer in ('y', 'n'):
            return answer == 'y'
        
def warn_exists(table_name, name):
    if fetch_scalar(f"SELECT COUNT(*) FROM `{table_name}`"):
        print(f'경고: 데이터베이스에 {name}이가 존재합니다. 테스트 진행에 문제가 발생 할 수 있습니다.')

def exec_or_clear(table_name, name):
    if fetch_scalar(f"SELECT COUNT(*) FROM `{table_name}`"):
        print(f'오류: 데이터베이스에 {name}이가 존재하여 테스트 진행이 불가능합니다.')

        if not yn(f'{name} 테이블을 초기화 하시겠습니까?'):
            exit()

        execute(f'DELETE FROM `{table_name}`')


# 환경 확인
def check_environment():
    print(f'Server Version: {fetch_scalar("SELECT @@version;")}')
    print()

    # 유저 수 확인
    warn_exists(USERS_TABLE_NAME, '사용자')
    # 부스 수 확인
    warn_exists(EOULLIM_BOOTHS_TABLE_NAME, '부스')

    # 결제 존재 확인
    exec_or_clear(EOULLIM_PAYMENTS_TABLE_NAME, '결제이력')
    # 거래 존재 확인
    exec_or_clear(EOULLIM_TRANSACTIONS_TABLE_NAME, '거래이력')
    # 잔고 존재 확인
    exec_or_clear(EOULLIM_BALANCES_TABLE_NAME, '잔고')

    print('환경 확인이 완료되었습니다.')

check_environment()

Server Version: 10.6.14-MariaDB

경고: 데이터베이스에 사용자이가 존재합니다. 테스트 진행에 문제가 발생 할 수 있습니다.
경고: 데이터베이스에 부스이가 존재합니다. 테스트 진행에 문제가 발생 할 수 있습니다.
환경 확인이 완료되었습니다.


In [5]:
import string, random


def grs(num_strings, string_length):
    random_strings = set()
    
    while len(random_strings) < num_strings:
        random_string = ''.join(random.choice(string.ascii_letters) for _ in range(string_length))
        random_strings.add(random_string)
    
    return list(random_strings)


def create_user(id, name, phone):
    execute(f"INSERT IGNORE INTO `{USERS_TABLE_NAME}` (`id`, `name`, `phone`) VALUES (%s, %s, %s)", (id, name, phone))

def create_booth(id, token, class_, name, location):
    execute(f"INSERT IGNORE INTO `{EOULLIM_BOOTHS_TABLE_NAME}` (`id`, `token`, `class`, `name`, `location`) VALUES (%s, %s, %s, %s, %s)", (id, token, class_, name, location))

def create_booth_balance(id, booth_id):
    execute(f"INSERT IGNORE INTO `{EOULLIM_BALANCES_TABLE_NAME}` (`id`, `boothId`, `type`) VALUES (%s, %s, %s)", (id, booth_id, 'booth'))

user_ids = list(range(100000, 100000 + TEST_USER_COUNT))
booth_ids = list(range(200000, 200000 + TEST_BOOTH_COUNT))
booth_balance_ids = list(range(300000, 300000 + TEST_BOOTH_COUNT))

for uid, name, phone in zip(
    user_ids,
    grs(TEST_USER_COUNT, 5),
    random.sample(range(0, 99999999), TEST_USER_COUNT)
):
    phone = f'010{phone:08}'
    create_user(uid, name, phone)

print(f'{TEST_USER_COUNT}명의 사용자가 생성되었습니다.')

for bid, token, class_, name, location in zip(
    booth_ids,
    grs(TEST_BOOTH_COUNT, 16),
    grs(TEST_BOOTH_COUNT, 5),
    grs(TEST_BOOTH_COUNT, 5),
    grs(TEST_BOOTH_COUNT, 5)
):
    create_booth(bid, token, class_, name, location)

print(f'{TEST_BOOTH_COUNT}개의 부스가 생성되었습니다.')

for bid, bbid in zip(
    booth_ids,
    booth_balance_ids,
):
    create_booth_balance(bbid, bid)

print(f'{TEST_BOOTH_COUNT}개의 부스 잔고가 생성되었습니다.')

100명의 사용자가 생성되었습니다.
10개의 부스가 생성되었습니다.
10개의 부스 잔고가 생성되었습니다.


In [6]:
import requests
from concurrent.futures import ThreadPoolExecutor

def amount_integrity():
    exchange = fetch_scalar(f'SELECT SUM(amount) FROM `{EOULLIM_TRANSACTIONS_TABLE_NAME}` WHERE senderId IS NULL')
    balance = fetch_scalar(f'SELECT SUM(amount) FROM `{EOULLIM_BALANCES_TABLE_NAME}`')

    print(f'환전금액: {exchange}, 잔고총액: {balance}')
    print(f'환전금액과 잔고총액이 일치합니다.') if exchange == balance else print(f'환전금액과 잔고총액이 일치하지 않습니다!!!')
    return exchange == balance, exchange, balance

def exchange(user_id, amount, message=None):
    res = requests.post(
        f'{BACKEND_URL}/eoullim/exchange/transfer',
        json={
            'userId': user_id,
            'amount': amount,
            'message': message
        },
        headers={
            'authorization': 'Bearer 3'
        }
    )
    res.raise_for_status()
    return res.json()

def payment(user_id, booth_id, amount):
    res = requests.post(
        f'{BACKEND_URL}/eoullim/balance/payment',
        json={
            "boothId": booth_id,
            "amount": amount
        },
        headers={
            'authorization': f'Bearer {user_id}'
        }
    )
    res.raise_for_status()
    return res.json()


In [86]:
# 다중 환전 테스트
with ThreadPoolExecutor(64) as pool:
    for uid in user_ids:
        pool.submit(exchange, uid, 100000)

In [5]:
# 집중 결제 테스트
target_booth_id = booth_ids[0]

with ThreadPoolExecutor(64) as pool:
    for uid in user_ids:
        pool.submit(payment, uid, target_booth_id, 1000)


In [8]:
# 분산 결제 테스트

with ThreadPoolExecutor(64) as pool:
    print(f'예상 결제 횟수: {TEST_USER_COUNT * TEST_BOOTH_COUNT}')

    for bid in booth_ids:
        for uid in user_ids:
            pool.submit(payment, uid, bid, 1000)

예상 결제 횟수: 1000


In [7]:
amount_integrity()

환전금액: 10000300, 잔고총액: 10000300
환전금액과 잔고총액이 일치합니다.


(True, Decimal('10000300'), Decimal('10000300'))

In [30]:
# 결제 취소
# with ThreadPoolExecutor(64) as pool:
for id in fetch_all(f'SELECT id FROM `{EOULLIM_PAYMENTS_TABLE_NAME}` WHERE status = "paid"'):
    id = id['id']
    with connection() as cursor:
        cursor.execute('SET @a = NULL, @b = NULL, @c = NULL, @d = NULL, @e = NULL, @f = NULL, @g = NULL, @h = NULL, @i = NULL, @j = NULL')
        cursor.execute('CALL EoullimPaymentCancel(%s, %s, @a, @b, @c, @d, @e, @f, @g, @h, @i, @j);', (id, None))