# MS Teams 채팅 추출
- 기업 아이디 로그인의 경우 채팅 목록 내보내기 기능 사용이 불가능합니다.
- Azure Portal API 를 사용해서 해결하였습니다.

## 라이브러리 import

In [None]:
# 필요시 설치
# !pip install requests
# !pip install flask

In [None]:
import requests
import webbrowser
import os
import sys
import json
import csv
import threading
import re
from flask import Flask, request
from threading import Timer
from datetime import datetime, timedelta

## Azure key 입력

In [None]:
# 환경 변수를 저장할 딕셔너리
config = {}

# config.txt 파일 읽기
config_file_path = "config.txt"

with open(config_file_path, "r", encoding="utf-8") as f:
    for line in f:
        key, value = line.strip().split("=", 1)  # '=' 기준으로 key, value 분리, value 내부에서 '='가 포함될 것을 염려하여 maxsplit=1 로 지정
        config[key] = value.strip()  # 양쪽 공백 제거 후 저장

# 환경 변수에 설정
os.environ.update(config)

# 환경 변수 사용
TENANT_ID = os.getenv("TENANT_ID")
CLIENT_ID = os.getenv("CLIENT_ID")
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
REDIRECT_URI = os.getenv("REDIRECT_URI")

print("[DEBUG] 환경 변수 설정 완료")

## Access Token 발급
- token 발급 완료 후 flask 서버를 강제 종료
- 코드 셀이 무한히 돌아가는 문제는 threading 을 통해 flask를 백그라운드에 실행함으로써 해결
    - access_token 저장 후 자동 종료

In [None]:
# Flask 앱 생성
app = Flask(__name__)

# OAuth 2.0 인증 URL 생성
AUTH_URL = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/authorize?" \
           f"client_id={CLIENT_ID}&response_type=code&redirect_uri={REDIRECT_URI}" \
           f"&response_mode=query&scope=User.Read%20Chat.Read"

@app.route("/callback")
def callback():
    auth_code = request.args.get("code")
    if not auth_code:
        return "[DEBUG] Authorization Code 받기 실패"

    # Access Token 요청
    token_url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
    data = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "grant_type": "authorization_code",
        "code": auth_code,
        "redirect_uri": REDIRECT_URI,
        "scope": "User.Read Chat.Read"
    }
    
    response = requests.post(token_url, data=data)
    token_data = response.json()
    
    access_token = token_data.get("access_token")  

    if access_token:
        # Access Token을 파일에 저장
        with open("token.txt", "w") as f:
            f.write(access_token)

        # 인증 완료 메시지 표시 후 서버 종료
        Timer(1.0, shutdown_server).start()  
        return "[DEBUG] Access Token 발급 성공! 서버를 종료합니다."
    else:
        return f"[DEBUG] Access Token 발급 실패!<br>{token_data}"

def shutdown_server():
    print("[DEBUG] Flask 서버를 종료합니다.")
    func = request.environ.get('werkzeug.server.shutdown')
    if func is None:
        sys.exit()
    func()

# Flask를 백그라운드에서 실행
def run_flask():
    app.run(port=5000)

if __name__ == "__main__":
    print("[DEBUG] 브라우저에서 인증을 진행하세요:", AUTH_URL)
    webbrowser.open(AUTH_URL)  # 기본 브라우저에서 인증 페이지 열기

    # Flask를 별도의 쓰레드에서 실행하여 코드 셀이 계속 돌아가지 않도록 함
    flask_thread = threading.Thread(target=run_flask)
    flask_thread.start()


## 채팅방 목록 불러오기

In [None]:
# Access Token을 파일에서 불러오기
if not os.path.exists("token.txt"):
    raise FileNotFoundError("[DEBUG] Access Token 파일이 없습니다! 먼저 인증을 수행하세요.")

# Access Token 파일 불러오기
with open("token.txt", "r") as f:
    access_token = f.read().strip()

# get 요청을 위한 headers
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json"
}

CHATS_URL = "https://graph.microsoft.com/v1.0/me/chats" # 전체 채팅목록을 가져옴
response = requests.get(CHATS_URL, headers=headers)
chats_data = response.json()


# 체팅방 목록 가져오기
if "value" in chats_data:
    for chat in chats_data["value"]:
        chat_id = chat["id"]

        # 채팅 참여자 정보 가져오기
        MEMBERS_URL = f"https://graph.microsoft.com/v1.0/me/chats/{chat_id}/members"
        members_response = requests.get(MEMBERS_URL, headers=headers)
        members_data = members_response.json()

        participants = []
        if "value" in members_data:
            for member in members_data["value"]:
                if member.get("displayName"):  # 사용자 이름이 있는 경우만 추가
                    participants.append(member["displayName"])

        # 채팅방 정보 저장
        chat_info = {
            "채팅방 ID": chat_id,
            "참여자": participants
        }

        print(f"[DEBUG] 채팅방 ID: {chat_id}, 참여자: {', '.join(participants)}")
else:
    print("[DEBUG] 채팅방 목록 가져오기 실패:", chats_data)


## 특정 채팅방의 모든 채팅을 페이지를 돌면서 처음부터 끝까지 가져오기 

In [None]:
# 특정 채팅방 ID (위에서 얻은 값으로 변경)
CHAT_ID = "19:meeting_NjY2ZTk0OTktYTRjZi00ZTVkLWJlYTctYTA4NDU3ZWEyYzg5@thread.v2"

In [None]:
# 처음 요청할 URL
# top 인자는 한 번의 요청에 몇 개의 메시지를 가져올지 설정하기 위함 (최대 50)
MESSAGES_URL = f"https://graph.microsoft.com/v1.0/me/chats/{CHAT_ID}/messages?$top=50"

# 모든 메시지를 저장할 리스트
all_messages = []

while MESSAGES_URL:
    # API 요청
    response = requests.get(MESSAGES_URL, headers=headers)
    messages_data = response.json()

    # 응답에서 메시지 저장
    if "value" in messages_data:
        all_messages.extend(messages_data["value"])
    
    # 다음 페이지가 있으면 URL 변경, 없으면 종료
    MESSAGES_URL = messages_data.get("@odata.nextLink", None)
    
    if MESSAGES_URL:
        print(f"[DEBUG] 다음 페이지 요청: {MESSAGES_URL}")
    else:
        print("[DEBUG] 모든 메시지 가져오기 완료!")

# 중복 제거 전 메시지 수
print(f"[DEBUG] 중복 제거 전 총 메시지 개수: {len(all_messages)}")

# 메시지 ID 기준으로 중복 제거
unique_messages = {msg["id"]: msg for msg in all_messages}
all_messages = list(unique_messages.values())

# 중복 제거된 메시지 수와 중복된 메시지 수 계산
duplicates_count = len(all_messages) - len(unique_messages)
print(f"[DEBUG] 중복 제거 후 총 메시지 개수: {len(all_messages)}")
print(f"[DEBUG] 제거된 중복 메시지 수: {duplicates_count}")

# 결과 출력
print(f"[DEBUG] 최종 메시지 개수: {len(all_messages)}")

# 메시지 내용 출력 (최대 5개)
print("[DEBUG] 첫 메시지 5개를 표시합니다.")

for message in all_messages[:5]:  # 처음 5개만 표시
    print(f"[{message['createdDateTime']}] {message['from']['user']['displayName']}: {message['body']['content']}")


## 메시지id, 답글id, 보낸시간, 보낸사람, 채팅, 이모지만 저장

In [None]:
# 채팅 메시지 & 이모지 반응 정리 함수 (메시지 ID + 답글 대상 ID 포함)
def extract_chat_summary(messages):
    chat_summary = []

    for message in messages:
        # 메시지를 보낸 사용자가 없을 경우 대비 (None 체크 추가)
        if message.get("from") and message["from"].get("user"):
            user_name = message["from"]["user"].get("displayName", "알 수 없음")
        else:
            user_name = "알 수 없음"

        # 기본 메시지 정보
        message_id = message.get("id", "알 수 없음")
        message_time = message.get("createdDateTime", "시간 정보 없음")
        message_content = message.get("body", {}).get("content", "")

        # HTML 태그 제거 (특히 <p>, <attachment> 제거)
        message_content = re.sub(r"<p>|</p>", "", message_content).strip()
        message_content = re.sub(r"<div>|</div>", "", message_content).strip()
        message_content = re.sub(r'<attachment id=".*?"></attachment>', "", message_content).strip()

        # 답글 대상 메시지 ID 찾기 (attachments에서 가져오기)
        reply_to_id = None
        if "attachments" in message and isinstance(message["attachments"], list) and message["attachments"]:
            reply_to_id = message["attachments"][0].get("id", None)  # 첫 번째 attachment의 id를 사용

        # 메시지 기록 저장
        chat_entry = {
            "메시지 ID": message_id,
            "답글 대상 ID": reply_to_id,
            "시간": message_time,
            "보낸 사람": user_name,
            "메시지": message_content,
            "이모지 반응": []
        }

        # 이모지 반응 기록 정리
        for reaction in message.get("reactions", []):
            if reaction.get("user") and reaction["user"].get("user"):
                reaction_user = reaction["user"]["user"].get("displayName", "알 수 없음")
            else:
                reaction_user = "알 수 없음"

            reaction_type = reaction.get("reactionType", "알 수 없음")
            reaction_time = reaction.get("createdDateTime", "시간 정보 없음")

            chat_entry["이모지 반응"].append({
                "이모지": reaction_type,
                "반응한 사람": reaction_user,
                "반응 시간": reaction_time
            })

        chat_summary.append(chat_entry)

    return chat_summary

# 채팅 내용 정리 실행
chat_summary = extract_chat_summary(all_messages)

# 깔끔하게 JSON 형식으로 출력
print(json.dumps(chat_summary, ensure_ascii=False, indent=4))


## json, csv 파일을 현재 폴더에 저장

In [None]:
# 바탕화면 경로 가져오기
current_path = os.getcwd()
print("[DEBUG] 현재 경로 :",current_path)

### JSON 저장

In [None]:
json_file_path = os.path.join(current_path, "chat_data.json")

# JSON 파일로 저장하는 함수 (UTC → KST 변환 추가)
def save_to_json(data, file_path):
    converted_data = []  # 변환된 데이터를 저장할 리스트

    for chat in data:
        chat_copy = chat.copy()  # 원본 데이터 변형 방지

        # UTC → KST 변환
        raw_time = chat_copy["시간"].split(".")[0].replace("T", " ").replace("Z", "")  
        utc_time = datetime.strptime(raw_time, "%Y-%m-%d %H:%M:%S")  
        kst_time = utc_time + timedelta(hours=9)  # 9시간 추가

        chat_copy["시간"] = kst_time.strftime("%Y-%m-%d %H:%M:%S")  # KST 변환 적용

        converted_data.append(chat_copy)  # 변환된 데이터 리스트에 추가

    # JSON 파일로 저장
    with open(file_path, "w", encoding="utf-8") as f:
        json.dump(converted_data, f, ensure_ascii=False, indent=4)

    print(f"[DEBUG] JSON 파일로 저장 완료: {file_path}")

# JSON 파일 저장 실행
save_to_json(chat_summary, json_file_path)

### CSV 저장

In [None]:
def save_to_csv(data, file_path):
    with open(file_path, "w", encoding="utf-8-sig", newline="") as f:
        reversed_data = data[::-1]
        
        writer = csv.writer(f)

        # 헤더 작성
        writer.writerow(["메시지 ID", "답글 대상 ID", "시간 (KST)", "보낸 사람", "메시지", "이모지 반응"])

        # 데이터 작성
        for chat in reversed_data:
            # UTC 시간 → KST(UTC+9) 변환
            raw_time = chat["시간"].split(".")[0].replace("T", " ").replace("Z", "")  # 밀리초 및 'T' 제거
            utc_time = datetime.strptime(raw_time, "%Y-%m-%d %H:%M:%S")  # UTC 시간 변환
            kst_time = utc_time + timedelta(hours=9)  # 9시간 추가

            reaction_summary = ", ".join(
                [f"{reaction['이모지']} ({reaction['반응한 사람']})" for reaction in chat["이모지 반응"]]
            )
            writer.writerow([
                chat["메시지 ID"], 
                chat["답글 대상 ID"] if chat["답글 대상 ID"] and chat["답글 대상 ID"].isdigit() else None,
                kst_time.strftime("%Y-%m-%d %H:%M:%S"),  # KST 시간으로 저장
                chat["보낸 사람"], 
                chat["메시지"], 
                reaction_summary
            ])

    print(f"[DEBUG] CSV 파일로 저장 완료: {file_path}")

csv_file_path = os.path.join(current_path, "chat_data.csv")

# CSV 파일 저장 실행
save_to_csv(chat_summary, csv_file_path)