# Kinesis 이벤트 데이터 수집 데모 테스트

라이브러리 로드

In [2]:
# 이벤트 데이터를 JSON 문자열로 변환
import json
import time
import datetime
import random
import logging
import boto3
from pprint import pprint
from IPython.display import clear_output

헬퍼 함수 정의

In [3]:
# 자격증명풀 아이디
IDENTITY_POOL_ID = 'ap-northeast-2:161c835f-ec3b-488a-8816-121f6a219c2f'
# 스트림 이름
STREAM_NAME = 'kmu-quiz-stream'

In [4]:
def get_credential(identity_pool_id):
    """
    주어진 Identity Pool ID를 사용하여 AWS Cognito로부터 임시 자격 증명을 얻는 함수입니다.
    
    이 함수는 다음 단계를 수행합니다:
    1. Cognito Identity 클라이언트를 생성합니다.
    2. 주어진 Identity Pool ID로 Identity ID를 얻습니다.
    3. Identity ID를 사용하여 임시 자격 증명을 얻습니다.
    4. 얻은 자격 증명을 반환합니다.

    Args:
        identity_pool_id (str): Cognito Identity Pool ID

    Returns:
        dict: AWS 임시 자격 증명 (AccessKeyId, SecretKey, SessionToken 포함)
    """
    # Cognito Identity 클라이언트 생성
    cognito_identity = boto3.client('cognito-identity', region_name='ap-northeast-2')
    
    # 주어진 Identity Pool ID로 Identity ID 얻기
    response = cognito_identity.get_id(IdentityPoolId=identity_pool_id)
    identity_id = response['IdentityId']
    
    # Identity ID를 사용하여 임시 자격 증명 얻기
    credentials = cognito_identity.get_credentials_for_identity(IdentityId=identity_id)
    
    print("Cognito를 통해 Kinesis 자격 증명이 성공적으로 설정되었습니다.")
    
    # 자격 증명 반환
    return credentials['Credentials']

In [5]:
def get_kinesis_client(credential):
    """
    주어진 자격 증명을 사용하여 Kinesis 클라이언트를 생성하는 함수입니다.
    
    이 함수는 다음 단계를 수행합니다:
    1. 제공된 자격 증명을 사용하여 Kinesis 클라이언트를 생성합니다.
    2. 클라이언트 생성 성공 메시지를 출력합니다.
    3. 생성된 클라이언트를 반환합니다.

    Args:
        credential (dict): AWS 자격 증명 (AccessKeyId, SecretKey, SessionToken 포함)

    Returns:
        boto3.client: 생성된 Kinesis 클라이언트 객체
    """
    # Kinesis 클라이언트 생성
    kinesis_client = boto3.client(
        'kinesis',
        region_name='ap-northeast-2',
        aws_access_key_id=credential['AccessKeyId'],
        aws_secret_access_key=credential['SecretKey'],
        aws_session_token=credential['SessionToken']
    )
    # 클라이언트 생성 성공 메시지 출력
    print("Kinesis 클라이언트가 성공적으로 생성되었습니다.")
    # 생성된 클라이언트 반환
    return kinesis_client

In [6]:
def generate_user_log():
    """
    사용자 로그를 생성하는 함수입니다.
    
    이 함수는 랜덤하게 'click' 또는 'view' 이벤트를 생성합니다.
    'click' 이벤트의 경우 버튼, 이미지, 섹션 중 하나를 선택하고 번호를 추가합니다.
    'view' 이벤트의 경우 기본 URL에 페이지 경로를 추가합니다.
    
    Returns:
        dict: 생성된 사용자 로그 데이터
    """
    # 이벤트 타입 선택 (클릭 또는 뷰)
    event_types = ['click', 'view']
    event_type = random.choice(event_types)
    
    if event_type == 'click':
        # 클릭 이벤트의 경우 버튼, 이미지, 섹션 중 하나를 선택하고 번호 추가
        areas = ['button', 'image', 'section']
        event_area = random.choice(areas) + str(random.randint(1, 5))
        
    elif event_type == 'view':
        # 뷰 이벤트의 경우 기본 URL에 페이지 경로 추가
        base_url = 'https://example.com/'
        page_paths = ['home', 'product', 'cart', 'checkout', 'payment']
        event_area = base_url + random.choice(page_paths)
    
    # 사용자 로그 생성 및 반환
    return {
        'user_id': f'user_{random.randint(1, 1000)}',  # 랜덤 사용자 ID 생성
        'event_time': datetime.datetime.now().isoformat(),  # 현재 시간을 ISO 형식으로
        'event_type': event_type,  # 선택된 이벤트 타입
        'event_area': event_area,  # 이벤트가 발생한 영역
        'event_text': 'this is a test'  # 테스트용 텍스트
    }

In [7]:
def send_log_to_kinesis(client, stream_name, log, partition_key):
    """
    Kinesis 스트림에 로그를 전송하는 함수

    Args:
        client (boto3.client): Kinesis 클라이언트 객체
        stream_name (str): Kinesis 스트림 이름
        log (dict): 전송할 로그 데이터
        partition_key (str): 파티션 키

    Returns:
        dict: Kinesis put_record API 응답
    """
    response = client.put_record(
        StreamName=stream_name,
        Data=json.dumps(log),
        PartitionKey=partition_key
    )
    return response

In [8]:
credentials = get_credential(IDENTITY_POOL_ID)
kinesis_client = get_kinesis_client(credentials)

Cognito를 통해 Kinesis 자격 증명이 성공적으로 설정되었습니다.
Kinesis 클라이언트가 성공적으로 생성되었습니다.


In [12]:
# 초당 1건씩 이벤트 생성 및 전송
for i in range(60):
    log = generate_user_log()
    response = send_log_to_kinesis(kinesis_client,STREAM_NAME, log, partition_key='user_id')
    clear_output(wait=True)  # 주피터 노트북 출력 클리어
    print(f'{i+1}번째 로그 생성')
    pprint(log)
    pprint(response)
    time.sleep(1)  # 1초 대기

60번째 로그 생성
{'event_area': 'https://example.com/product',
 'event_text': 'this is a test',
 'event_time': '2024-09-21T11:15:03.729167',
 'event_type': 'view',
 'user_id': 'user_889'}
{'ResponseMetadata': {'HTTPHeaders': {'connection': 'keep-alive',
                                      'content-length': '110',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Sat, 21 Sep 2024 02:15:03 GMT',
                                      'x-amz-id-2': 'X/ewH+drGVDWlRM3gEaUMdrhauB+/PH1Y5Wl1W4/WCHbFSdO8oM/Aha6mvS7y5U3VOqNAh5uAjc9kuQZiuh+XRMm2Omci3Hf',
                                      'x-amzn-requestid': 'c0869140-3e71-724e-9fee-bf27c8c73296'},
                      'HTTPStatusCode': 200,
                      'RequestId': 'c0869140-3e71-724e-9fee-bf27c8c73296',
                      'RetryAttempts': 0},
 'SequenceNumber': '49656028860076324291158165099251217750606487796657946674',
 'ShardId': 'shardId-000000000003