In [62]:
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
from slack_sdk.http_retry import RateLimitErrorRetryHandler, ConnectionErrorRetryHandler
from http.client import IncompleteRead
import httpx
import urllib3
import datetime
import time
import os
from dotenv import load_dotenv, find_dotenv
from zoneinfo import ZoneInfo
from random import random

# Load variables from the ".env" file located by find_dotenv (usecwd=True is passed so
# the lookup is based on the current working directory). override=True is passed so that
# values from the file replace variables that are already set in the environment.
load_dotenv(find_dotenv(".env", usecwd=True), override=True)

# Time zone used for the configured date range and for formatting message timestamps.
JST = ZoneInfo("Asia/Tokyo")

def backoff_sleep(base=1.5, factor=1.8, attempt=0, max_sleep=30):
    """Sleep for an exponentially growing, jittered delay.

    The un-jittered delay is ``base * factor ** attempt``. It is multiplied by a
    random jitter in ``[0.5, 1.5)`` and the result is capped at ``max_sleep``.

    Args:
        base: Delay in seconds for ``attempt == 0`` (before jitter).
        factor: Multiplier applied once per attempt.
        attempt: Zero-based retry counter.
        max_sleep: Upper bound, in seconds, of the time actually slept.

    Returns:
        None.
    """
    jitter = 0.5 + random()
    # Cap AFTER applying jitter so max_sleep is a true upper bound. Capping first
    # would allow sleeps of up to 1.5 * max_sleep.
    delay = min(max_sleep, base * (factor ** attempt) * jitter)
    time.sleep(delay)

class SlackMessageFetcher:
    """Collect the authenticated user's own Slack messages, including thread replies.

    Channel history is requested for the window between ``start_date`` and
    ``end_date``. For every thread parent found in that window the thread's
    replies are requested as well, and only messages whose ``user`` field equals
    the authenticated user's ID are kept.

    NOTE(review): the replies request is not bounded by the time window, so
    replies posted outside the window are presumably included — confirm this is
    intended. A reply that also appears in channel history may be collected twice.
    """

    def __init__(self, token, channels, start_date, end_date):
        """Create the Slack client and resolve the authenticated user's ID.

        Args:
            token: Slack API token passed to ``WebClient``.
            channels: Iterable of channel IDs to scan.
            start_date: ``datetime`` marking the start of the window.
            end_date: ``datetime`` marking the end of the window.
        """
        retry_handlers = [
            RateLimitErrorRetryHandler(max_retry_count=3),
            ConnectionErrorRetryHandler(max_retry_count=3),
        ]
        self.client = WebClient(token=token, retry_handlers=retry_handlers, timeout=30)
        self.channels = channels
        # Keep the float timestamps and render them as strings so the window
        # bounds are not truncated to whole seconds.
        self.start_ts_str = f"{start_date.timestamp():.3f}"
        self.end_ts_str = f"{end_date.timestamp():.3f}"
        self.user_id = self._get_user_id()

    def _get_user_id(self):
        """Return the ``user_id`` reported by ``auth_test``, or ``None`` on a Slack API error."""
        try:
            response = self.client.auth_test()
            return response["user_id"]
        except SlackApiError as e:
            print(f"Slack API Error: {e.response.get('error')}")
            return None

    def _is_own_message(self, msg):
        """Return True when ``msg`` was posted by the authenticated user.

        The explicit ``None`` check matters: without it, a failed ``auth_test``
        (``user_id is None``) would match every message that has no ``user``
        field.
        """
        return self.user_id is not None and msg.get("user") == self.user_id

    def fetch_my_messages(self):
        """Return the user's messages from all configured channels.

        A channel whose history cannot be fetched is skipped (keeping whatever
        was already collected for it); the remaining channels are still
        processed.

        Returns:
            A list of dicts as produced by ``_format_message``. Empty when the
            user ID could not be resolved.
        """
        if self.user_id is None:
            # Without a user ID no message can be attributed to "me".
            print("user_id を取得できなかったため、メッセージ取得を中止します。")
            return []

        messages = []
        for channel in self.channels:
            messages.extend(self._fetch_channel_messages(channel))
        return messages

    def _fetch_channel_messages(self, channel):
        """Return the user's messages (and thread replies) for a single channel.

        Pages through the channel history. On a Slack API error, or after the
        manual retry limit is exceeded, the messages collected so far for this
        channel are returned.
        """
        collected = []
        cursor = None
        while True:
            # Manual retries on top of the SDK retry handlers, covering
            # IncompleteRead and similar transport errors.
            attempt = 0
            while True:
                try:
                    response = self.client.conversations_history(
                        channel=str(channel),
                        oldest=self.start_ts_str,
                        latest=self.end_ts_str,
                        limit=200,  # page size; noted by the original author as Slack's recommended value
                        cursor=cursor,
                        inclusive=True,
                    )
                    break
                except (IncompleteRead, httpx.ReadTimeout, httpx.RemoteProtocolError, urllib3.exceptions.ProtocolError) as e:
                    if attempt >= 3:
                        print(f"[{channel}] 再試行上限。historyスキップ: {e}")
                        return collected
                    attempt += 1
                    print(f"[{channel}] 断片読込/接続エラー。{attempt}回目のリトライ…")
                    backoff_sleep(attempt=attempt)
                except SlackApiError as e:
                    print(f"[{channel}] Slack Api Error: {e.response.get('error')}")
                    return collected

            for msg in response.get("messages", []):
                # A message is a thread parent when its thread_ts equals its own ts.
                is_parent = ("thread_ts" in msg and msg["ts"] == msg["thread_ts"])

                # Keep only the user's own posts.
                if self._is_own_message(msg):
                    collected.append(self._format_message(msg, channel, is_parent))

                # For thread parents (by anyone), also collect the user's replies.
                if is_parent:
                    collected.extend(self._fetch_thread_replies(channel, msg))

            cursor = response.get("response_metadata", {}).get("next_cursor")
            if not cursor:
                break
            # Pause between pages.
            time.sleep(1.2)

        return collected

    def _fetch_thread_replies(self, channel, parent_msg):
        """Return the user's replies in the thread started by ``parent_msg``.

        The parent itself is excluded (the caller handles it). On a Slack API
        error, or after the manual retry limit is exceeded, the replies
        collected so far are returned.
        """
        replies_list = []
        cursor = None
        while True:
            attempt = 0
            while True:
                try:
                    replies = self.client.conversations_replies(
                        channel=str(channel),
                        ts=parent_msg["thread_ts"],
                        limit=200,
                        cursor=cursor,
                        inclusive=True,
                    )
                    break
                except (IncompleteRead, httpx.ReadTimeout, httpx.RemoteProtocolError, urllib3.exceptions.ProtocolError) as e:
                    if attempt >= 3:
                        print(f"[{channel}] thread再試行上限。スレッドスキップ: {e}")
                        return replies_list
                    attempt += 1
                    print(f"[{channel}] thread 断片読込/接続エラー。{attempt}回目のリトライ…")
                    backoff_sleep(attempt=attempt)
                except SlackApiError as e:
                    print(f"[{channel}] thread SlackApiError: {e.response.get('error')}")
                    return replies_list

            for reply in replies.get("messages", []):
                # Skip the parent message; only actual replies are wanted here.
                if reply["ts"] == parent_msg["ts"]:
                    continue
                if self._is_own_message(reply):
                    replies_list.append(self._format_message(reply, channel, is_thread_parent=False))

            cursor = replies.get("response_metadata", {}).get("next_cursor")
            if not cursor:
                break
            # Pause between pages.
            time.sleep(1.0)
        return replies_list

    @staticmethod
    def _format_message(msg, channel, is_thread_parent):
        """Convert a raw Slack message dict into a flat record.

        Args:
            msg: Slack message dict; must contain ``ts``.
            channel: Channel ID the message was read from.
            is_thread_parent: Whether the message is the first message of a thread.

        Returns:
            Dict with keys ``channel``, ``text``, ``timestamp`` (formatted in
            JST), ``thread_ts`` and ``is_thread_parent``.
        """
        ts = float(msg["ts"])
        # Timestamps are rendered in JST; swap in ZoneInfo("UTC") to operate in UTC.
        dt = datetime.datetime.fromtimestamp(ts, JST)
        return {
            "channel": channel,
            "text": msg.get("text", ""),
            "timestamp": dt.strftime('%Y-%m-%d %H:%M:%S %Z'),
            "thread_ts": msg.get("thread_ts", None),
            "is_thread_parent": is_thread_parent,
        }



In [None]:
# --- Settings ---
# The API token comes from the environment; an unset or empty value is a hard error.
SLACK_TOKEN = os.environ.get("SLACK_TOKEN")
if SLACK_TOKEN is None or SLACK_TOKEN == "":
    raise RuntimeError("SLACK_TOKEN が未設定です。")

# IDs of the channels to scan.
CHANNELS = ["C"]  # channel id

# Time window to collect, expressed in JST.
START_DATE = datetime.datetime(2025, 9, 1, tzinfo=JST)
END_DATE = datetime.datetime(2025, 9, 7, 23, 59, 59, tzinfo=JST)

In [82]:
import pandas as pd

# Fetch my own messages (including thread replies) for the configured channels and window.
fetcher = SlackMessageFetcher(token=SLACK_TOKEN, channels=CHANNELS, start_date=START_DATE, end_date=END_DATE)

my_msgs = fetcher.fetch_my_messages()
print(f"取得件数: {len(my_msgs)}")
# One row per message; columns come from the keys of the dicts in my_msgs.
df = pd.DataFrame(my_msgs)

# Save to CSV (without the DataFrame index column).
df.to_csv("my_slack_logs.csv", index=False)

取得件数: 3
