In [None]:
from client import Api
from kafka import KafkaProducer
from json import dumps
from kafka.errors import KafkaError
import functools
import time
import os

# Kafka broker addresses (host:port); passed to KafkaProducer as bootstrap_servers.
BROKER_LIST = ["15.164.218.105:9091", "15.164.218.105:9092", "15.164.218.105:9093"]
# Topic the price records are published to.
# NOTE(review): "transction" looks like a misspelling of "transaction" — confirm
# against the topic that actually exists on the brokers before changing it.
TOPIC = "transction"

def retry_with_backoff(max_retries: int = 5,
                       initial_backoff: float = 1.0,
                       max_backoff: float = 30.0):
    """
    Decorator factory that retries a function with exponential backoff
    whenever it raises an exception.

    Args:
        max_retries: Number of retries allowed after the first failed call.
        initial_backoff: Seconds to wait before the first retry.
        max_backoff: Upper bound, in seconds, for the wait between retries.

    Returns:
        A decorator; the decorated function returns whatever the wrapped
        function returns on its first successful call.

    Raises:
        The last exception raised by the wrapped function once more than
        ``max_retries`` failures have occurred.
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            backoff = initial_backoff
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    retries += 1
                    if retries > max_retries:
                        print(f"{func.__name__} failed after {retries} attempts: {e}")
                        # Bare raise keeps the original exception and traceback.
                        raise
                    print(f"{func.__name__} error: {e}, retry {retries}/{max_retries} in {backoff}s")
                    time.sleep(backoff)
                    # Double the wait each time, capped at max_backoff.
                    backoff = min(backoff * 2, max_backoff)
        return wrapper
    return decorator
        
class KafkaDataProducerService:
    """
    Description:
        Fetches price data from the API and publishes it to a Kafka topic.
    Params:
        - topic (str, Required) : Topic name
        - brokers (List[str], Required) : Broker list
        - retries (Optional[int]) : Retry count handed to the Kafka producer
    """
    def __init__(self, topic, brokers, retries=3):
        self.topic = topic
        self.brokers = brokers
        self.retries = retries

        # Backoff settings, overridable through environment variables.
        # They are stored here but not read anywhere else in this class.
        self._max_retries = int(os.getenv("RETRY_MAX", 10))
        self._initial_backoff = float(os.getenv("RETRY_INITIAL_BACKOFF", 1.0))
        self._max_backoff = float(os.getenv("RETRY_MAX_BACKOFF", 30.0))

    def publish_prices(self):
        """
        Publish the current price of every ticker to the configured topic.

        Looks up the ticker list, fetches the price for each ticker, sends
        each record (JSON-encoded as UTF-8) to ``self.topic`` and prints it,
        then flushes the producer.

        Raises:
            Exception: if the ticker lookup returns None.

        A KafkaError raised while sending or flushing is printed and not
        re-raised. The producer is always closed before returning.
        """
        # NOTE(review): with acks=0 the producer does not wait for a broker
        # acknowledgement, so `retries` probably has no effect — confirm
        # against the kafka-python documentation.
        producer = KafkaProducer(
            acks=0,
            bootstrap_servers=self.brokers,
            retries=self.retries,
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )
        try:
            tickers = Api.get_tickers()
            if tickers is None:
                raise Exception("Ticker 조회가 안되었습니다.")

            for ticker in tickers:
                data = Api.get_price(ticker)
                # Send to the attribute that __init__ actually sets.
                producer.send(self.topic, data)
                print(data)

            producer.flush()

        except KafkaError as e:
            print(f"Kafka error occurred: {e}")
        finally:
            # kafka-python's producer does not implement __enter__/__exit__,
            # so a `with` statement cannot be used; close it explicitly.
            producer.close()


if __name__ == "__main__":
    # Build the service for the configured topic and brokers, then publish once.
    generator = KafkaDataProducerService(topic=TOPIC, brokers=BROKER_LIST)
    generator.publish_prices()

{'market': 'KRW-BTC', 'trade_date': '20250510', 'trade_time': '071825', 'trade_date_kst': '20250510', 'trade_time_kst': '161825', 'trade_timestamp': 1746893905126, 'opening_price': 144520000, 'high_price': 145302000, 'low_price': 143702000, 'trade_price': 145016000, 'prev_closing_price': 144520000, 'change': 'RISE', 'change_price': 496000, 'change_rate': 0.0034, 'signed_change_price': 496000, 'signed_change_rate': 0.0034, 'trade_volume': 0.00034394, 'acc_trade_price': 38642624474.42264, 'acc_trade_price_24h': 101456622342.28896, 'acc_trade_volume': 267.83911513, 'acc_trade_volume_24h': 702.04127158, 'highest_52_week_price': 163460000, 'highest_52_week_date': '2025-01-21', 'lowest_52_week_price': 71573000, 'lowest_52_week_date': '2024-08-06', 'timestamp': 1746861506928}
{'market': 'KRW-ETH', 'trade_date': '20250510', 'trade_time': '071822', 'trade_date_kst': '20250510', 'trade_time_kst': '161822', 'trade_timestamp': 1746893902666, 'opening_price': 3234000, 'high_price': 3328000, 'low_pr

In [56]:
class Human:
    """Small demo class holding one underscore-prefixed attribute."""

    def __init__(self):
        self._test = 1

    def fubc(self):
        """Print the instance's ``_test`` value."""
        value = self._test
        print(value)

# Create an instance; the next cell calls its fubc() method.
a = Human()

In [57]:
# Prints the value stored in a._test.
a.fubc()

1


In [55]:
import functools

def retry_with_backoff(max_retries: int = 5,
                       initial_backoff: float = 1.0,
                       max_backoff: float = 30.0):
    """
    Decorator factory that retries a function with exponential backoff and
    returns a sentinel string instead of raising when every attempt fails.

    Args:
        max_retries: Number of retries allowed after the first failed call.
            Negative values are treated as 0 (the function is called once).
        initial_backoff: Seconds to wait before the first retry.
        max_backoff: Upper bound, in seconds, for the wait between retries.

    Returns:
        A decorator; the decorated function returns the wrapped function's
        result on its first successful call, or the string "에러 발생" once
        all attempts have raised.
    """
    # Local import: this cell imports only functools.
    import time

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            backoff = initial_backoff
            last_attempt = max(max_retries, 0)
            for attempt in range(last_attempt + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == last_attempt:
                        # Out of retries: fall through to the sentinel.
                        break
                    time.sleep(backoff)
                    # Double the wait each time, capped at max_backoff.
                    backoff = min(backoff * 2, max_backoff)
            return "에러 발생"
        return wrapper
    return decorator

@retry_with_backoff(max_retries=5, initial_backoff=1.0, max_backoff=3.0)
def add(*args):
    """Return the largest positional argument (despite the name, nothing is summed)."""
    largest = max(args)
    return largest

# Demo call: prints the largest argument, because add() returns max(args).
print(add(1,3,4))

4


In [40]:
def test_func(**kwargs):
    """Print the keyword arguments received, collected into a dict."""
    received = kwargs
    print(received)

# Demo call: the keyword arguments are printed as a dict.
test_func(a=10,b=20)

{'a': 10, 'b': 20}
