In [2]:
import os
from dotenv import load_dotenv
from slack_sdk import WebClient

load_dotenv()

SLACK_BOT_TOKEN = os.getenv("SLACK_BOT_TOKEN")
CHANNEL_NAME = os.getenv("CHANNEL_NAME")

# 슬랙 봇 토큰
SLACK_BOT_TOKEN = SLACK_BOT_TOKEN

In [16]:
import python_bithumb as bit

# 환경 변수 가져오기
BITHUMB_API_KEY = os.getenv("BITHUMB_API_KEY")
BITHUMB_SECRET_KEY = os.getenv("BITHUMB_SECRET_KEY")

In [None]:
access_key = BITHUMB_API_KEY 
secret_key = BITHUMB_SECRET_KEY  

bithumb = bit.Bithumb(access_key, secret_key)

krw_balance = bithumb.get_balance("KRW")

send_message(
    f"""
        안녕하세요 오전 8시마다 보내드리는 알림입니다.
    ------------------------------------
    전체 자산 : {krw_balance}원

    전체 수익률 : 

    전날 대비 수익률 : 
    ------------------------------------        
    활기찬 연구생활 되시기를 바랍니다.
    """, CHANNEL_NAME
)

메시지가 전송되었습니다.


In [27]:
import os
import pandas as pd
import python_bithumb

def save_scalping_ohlcv_to_csv(
    ticker: str = "KRW-BTC",
    interval: str = "minute1",
    count: int = 100,
    filename: str = "scalping_data.csv"
):
    """
    스캘핑을 위해 짧은 봉(예: 1분봉)의 OHLCV 데이터를 조회하여 CSV로 저장하는 함수 예시.
    
    Parameters
    ----------
    ticker : str
        'KRW-BTC', 'KRW-ETH', 'KRW-XRP' 등 Bithumb 시장 티커
    interval : str
        'minute1', 'minute3', 'minute5', 'minute30' 등
        python_bithumb가 지원하는 분봉 단위
    count : int
        몇 개의 봉 데이터를 가져올지 설정 (스캘핑 시 100~200 정도 사용 가능)
    filename : str
        저장할 CSV 파일 경로(이름). 예: "btc_scalping_data.csv"
    
    Returns
    -------
    None
        지정한 파일로 CSV 저장 후 함수 종료
    """
    # 1) 데이터 가져오기
    df = python_bithumb.get_ohlcv(ticker, interval=interval, count=count)
    if df is None or df.empty:
        print(f"[WARNING] {ticker} {interval} 데이터가 비어 있습니다. 저장을 중단합니다.")
        return
    
    # 2) CSV 파일 저장할 폴더가 없으면 생성
    dir_name = os.path.dirname(filename)
    if dir_name and not os.path.exists(dir_name):
        os.makedirs(dir_name, exist_ok=True)
    
    # 3) CSV로 저장 (한글 깨짐 방지 위해 utf-8-sig 사용)
    #    index=True 또는 False 선택 가능. 여기서는 True로 저장해 날짜 인덱스 포함
    df.to_csv(filename, encoding='utf-8-sig', index=True)
    
    print(f"[INFO] '{ticker}'의 {interval} 데이터 {count}개를 '{filename}'에 저장했습니다.")


if __name__ == "__main__":
    # 1분봉 기준, 최근 100개 봉 데이터를 "btc_scalping_data.csv"에 저장
    save_scalping_ohlcv_to_csv(
        ticker="KRW-BTC",
        interval="minute1",
        count=100,
        filename="btc_scalping_data.csv"
    )

    # 필요에 따라 3분봉, 5분봉 등도 따로 저장할 수 있음
    save_scalping_ohlcv_to_csv(
        ticker="KRW-BTC",
        interval="minute3",
        count=100,
        filename="btc_scalping_data_3min.csv"
    )


[INFO] 'KRW-BTC'의 minute1 데이터 100개를 'btc_scalping_data.csv'에 저장했습니다.
[INFO] 'KRW-BTC'의 minute3 데이터 100개를 'btc_scalping_data_3min.csv'에 저장했습니다.


In [3]:
import os
import time
import re
import datetime
import pandas as pd
from selenium import webdriver
from selenium.webdriver.common.by import By

# Slack 클라이언트 초기화
client = WebClient(token=SLACK_BOT_TOKEN)  # 직접 토큰 입력 (환경 변수 대신)

In [None]:
def send_message(message: str, channel: str):
    try:
        client.chat_postMessage(channel=channel, text=message)
        print("메시지가 전송되었습니다.")
    except Exception as e:
        print(f"오류 발생: {e}")


def crawl_with_selenium() -> str:
    """
    Selenium으로 coinness.com/article에서 
    특정 CSS 셀렉터(#root > ... > .ArticleListContainer-sc-cj3rkv-0.fkxjqP)
    영역의 텍스트만 추출.
    """
    driver = webdriver.Chrome()
    driver.get("https://coinness.com/article")
    time.sleep(5)  # JS 로딩 대기
    
    css = "#root > div > div.Wrap-sc-v065lx-0.hwmGSB > div > main > div.ContentContainer-sc-91rcal-0.jJHYjq > div.ArticleListContainer-sc-cj3rkv-0.fkxjqP"
    try:
        container = driver.find_element(By.CSS_SELECTOR, css)
        return container.text
    except Exception as e:
        print(f"[ERROR] 요소 찾기 실패: {e}")
        return ""
    finally:
        driver.quit()

# 파싱싱
def parse_news_text(raw_text: str) -> pd.DataFrame:
    """
    1) 'HH:MM' 형식을 만나면 새 기사 시작
    2) 바로 다음 줄 = 날짜
    3) 그 다음 줄 = 제목
    4) 이후로 같은 패턴(시간)이 나오기 전까지 = 본문
    결과: DataFrame[time, date, title, content]
    """
    lines = [line.strip() for line in raw_text.splitlines() if line.strip()]
    time_pattern = re.compile(r'^\d{2}:\d{2}$')

    articles = []
    current_time = None
    current_date = None
    current_title = None
    content_lines = []

    def save_article():
        if current_time and current_date and current_title:
            articles.append({
                "time": current_time,
                "date": current_date,
                "title": current_title,
                "content": "\n".join(content_lines).strip()
            })

    state = "idle"
    for line in lines:
        # (1) 새로운 시간(기사 시작 신호) 감지
        if time_pattern.match(line):
            save_article()
            current_time = line
            current_date = None
            current_title = None
            content_lines = []
            state = "got_time"
            continue
        
        # (2) got_time → 날짜
        if state == "got_time":
            current_date = line
            state = "got_date"
            continue
        
        # (3) got_date → 제목
        if state == "got_date":
            current_title = line
            state = "got_title"
            continue
        
        # (4) 나머지 줄은 본문
        content_lines.append(line)
        state = "collecting_content"

    # 마지막 기사 저장
    save_article()

    df = pd.DataFrame(articles, columns=["time", "date", "title", "content"])
    return df

# CSV 저장장
def save_to_csv(new_df: pd.DataFrame,
                csv_path: str,
                channel_name = CHANNEL_NAME):
    """
    - CSV가 없으면 바로 저장.
    - 있으면 기존 데이터와 결합 후 중복 제거.
    - "새롭게 추가된 기사"만 Slack 메시지로 전송.
    """
    if new_df.empty:
        print("[INFO] 새로 추출된 기사가 없습니다.")
        return
    
    if os.path.exists(csv_path):
        # 기존 CSV 불러오기
        existing_df = pd.read_csv(csv_path)
        before_count = len(existing_df)

        # (1) 두 DF 결합
        merged_df = pd.concat([existing_df, new_df], ignore_index=True)
        merged_df.drop_duplicates(subset=["time", "date", "title", "content"], inplace=True)
        
        after_count = len(merged_df)
        added_count = after_count - before_count
        
        if added_count > 0:
            # (2) 최종 저장
            merged_df.to_csv(csv_path, index=False, encoding="utf-8-sig")
            print(f"[INFO] 새 기사 {added_count}건 추가 (총 {after_count}건).")
            
            # (3) "어떤 기사들이 새로 추가됐는지"만 뽑아서 메시지 보내기
            # 새롭게 추가된 기사 = merged_df.tail(added_count)
            new_articles = merged_df.tail(added_count)
            
            # # Slack으로 보낼 메시지 구성 (간단 예시)
            # message_str = ""
            # for _, row in new_articles.iterrows():
            #     message_str += f"\n[{row['time']}][{row['date']}]\n제목: {row['title']}\n{row['content']}\n---"
            
            # if message_str:
            send_message(new_articles, channel_name)
        else:
            print("[INFO] 새로운 기사 없음. CSV가 최신 상태입니다.")
    else:
        # CSV 파일이 없으면 새로 저장
        new_df.to_csv(csv_path, index=False, encoding="utf-8-sig")
        print(f"[INFO] 새 CSV 파일 생성 (총 {len(new_df)}건).")

# 자동 업데이트
def run_autoupdate(interval: int = 1800, channel_name: str = CHANNEL_NAME):
    """
    1) 30분 간격(기본 1800초)으로 크롤링 & 저장
    2) 매일 자정(날짜가 바뀌는 시점)마다 기존 CSV 초기화
    """
    news_folder = "NEWS"
    os.makedirs(news_folder, exist_ok=True)
    csv_path = os.path.join(news_folder, "my_news.csv")
    
    print(f"[INFO] 자동 업데이트 시작 (간격: {interval}초).")

    # 이전 루프에서 사용했던 'date' (yyyy-mm-dd)
    last_date = datetime.date.today()

    try:
        while True:
            now = datetime.datetime.now()
            today_date = now.date()
            
            # (A) 날짜가 바뀌었으면 CSV 초기화
            if today_date != last_date:
                # 자정이 지난 시점 -> 기존 CSV 삭제
                if os.path.exists(csv_path):
                    os.remove(csv_path)
                    print(f"[INFO] 날짜가 바뀌어 CSV({csv_path})를 초기화했습니다.")
                last_date = today_date

            print(f"\n[INFO] 뉴스 확인 중... ({now.strftime('%Y-%m-%d %H:%M:%S')})")
            
            # (B) 크롤링
            raw_text = crawl_with_selenium()
            # (C) 파싱 -> DataFrame
            new_df = parse_news_text(raw_text)
            # (D) CSV 저장 + 새 기사만 Slack 전송
            save_to_csv(new_df, csv_path, channel_name)

            print(f"[INFO] {interval}초 후 다시 업데이트를 진행합니다...")
            time.sleep(interval)

    except KeyboardInterrupt:
        print("[INFO] 자동 업데이트 종료 (KeyboardInterrupt).")

# 메인 실행
if __name__ == "__main__":
    run_autoupdate(600)


[INFO] 자동 업데이트 시작 (간격: 600초).

[INFO] 뉴스 확인 중... (2025-01-23 00:05:45)
[INFO] 새 CSV 파일 생성 (총 10건).
[INFO] 600초 후 다시 업데이트를 진행합니다...
