# 3. Rapid API 기반 트위터(X) 데이터 수집


### 3-1. 라이브러리 Import
트윗 수집에 필요한 라이브러리들을 가져옵니다.


In [9]:
# HTTP 요청을 위한 라이브러리
import requests

# JSON 데이터 처리를 위한 라이브러리
import json

# CSV 파일 저장을 위한 라이브러리
import csv

# 파일 시스템 작업을 위한 라이브러리
import os

# 지연 시간 처리를 위한 라이브러리
import time

# 타입 힌팅을 위한 라이브러리
from typing import List, Dict, Optional, Any


### 3-2. 설정값 정의
API 키, 사용자 ID, 수집할 트윗 수 등 크롤링에 필요한 설정값들을 정의합니다.


In [10]:
# RapidAPI에서 발급받은 API 키 (보안상 실제 키는 환경변수 사용 권장)
API_KEY = "5fac920861msh988e449f8d91b60p10459bjsnba691d3d2d81"

# 수집할 사용자의 [ @이름 / ID(Twitter 사용자 고유 번호) / 설명 ]
'''
@WhiteHouse / 1879644163769335808 / 백악관 = 도널드 트럼프 대통령
@SecScottBessent / 1889019333960998912 / 스콧베센트 재무장관
@JDVance / 1542228578 / 밴스 부통령
@marcorubio / 15745368 / 마르코 루비오 국무장관
@elonmusk / 44196397 / 일론 머스크 테슬라 CEO
@sundarpichai / 14130366 / 순다르 피차이 구글 CEO
@tim_cook / 1636590253 / 팀 쿡 애플 CEO
@CathieDWood / 2361631088 / ARK Invest CEO, 혁신 성장주 투자, 시장 트렌드 주도
@BillAckman / 880412538625810432 / 빌 액먼 펀드매니저, 행동주의 투자 성향
@RayDalio / 62603893 / 브리지워터 창립자, 거시경제 분석, 투자 전략가
@michaelbatnick / 93529573 / 투자 분석, 금융 인사이트 제공
@LizAnnSonders / 2961589380 / 찰스슈왑 수석 투자전략가, 시장 전망, 투자 전략
@Ajay_Bagga / 86437069 / 글로벌 매크로 전문가, 시장 전망, 투자 전략
'''
USER_ID = "86437069"

# 수집할 최대 트윗 수
MAX_TWEETS = 1000


### 3-3. RapidAPI Twitter 크롤러 클래스 정의
트윗 수집 기능을 담은 클래스를 정의합니다. 모든 메서드가 클래스 내부에 포함되어 있습니다.


In [11]:
class RapidAPITweetCrawler:

    # 크롤러 초기화 함수
    def __init__(self, api_key: str):
        if not api_key:
            raise ValueError("API 키가 제공되지 않았습니다.")
            
        # RapidAPI 인증 정보 설정
        self.api_key = api_key
        self.base_url = "https://twitter241.p.rapidapi.com/user-tweets"
        self.headers = {
            "x-rapidapi-key": self.api_key,
            "x-rapidapi-host": "twitter241.p.rapidapi.com"
        }
        
        # API 요청당 트윗 수 설정 (최대 200까지 가능)
        self.count_per_request = 200
        
        # cursor 중복 방지를 위한 캐시
        self.used_cursors = set()

    # API 응답 JSON에서 트윗 데이터를 파싱하는 함수
    def _parse_tweets_from_response(self, response_json: Dict[str, Any]) -> List[Dict[str, str]]:
        tweets_data = []
        
        try:
            # Twitter API의 timeline 구조에서 instructions 리스트를 찾습니다.
            instructions = response_json.get('result', {}).get('timeline', {}).get('instructions', [])
            
            # 'TimelineAddEntries' 타입의 instruction에서 트윗 entries를 찾습니다.
            timeline_entries = []
            for instruction in instructions:
                if instruction.get('type') == 'TimelineAddEntries':
                    timeline_entries = instruction.get('entries', [])
                    break
            
            if not timeline_entries:
                return []

            # 각 entry를 순회하며 트윗 데이터를 추출합니다.
            for entry in timeline_entries:
                # 'TimelineTweet' 타입의 콘텐츠만 처리
                item_content = entry.get('content', {}).get('itemContent', {})
                if item_content and item_content.get('itemType') == 'TimelineTweet':
                    # 트윗 결과 데이터를 가져옵니다.
                    tweet_results = item_content.get('tweet_results', {})
                    result = tweet_results.get('result', {})
                    
                    # legacy 필드에 실제 트윗 데이터가 있습니다.
                    legacy_data = result.get('legacy', {})
                    
                    if legacy_data:
                        # 트윗 생성 시간 추출
                        created_at = legacy_data.get('created_at', 'N/A')
                        full_text = ""
                        
                        # 리트윗(RT)인 경우 원본 트윗의 full_text를 가져옵니다.
                        if 'retweeted_status_result' in legacy_data:
                            # 원본 트윗의 legacy 데이터를 찾습니다.
                            original_tweet_legacy = legacy_data.get('retweeted_status_result', {}).get('result', {}).get('legacy', {})
                            full_text = original_tweet_legacy.get('full_text', '')
                        else:
                            # 일반 트윗은 기존 방식대로 full_text를 가져옵니다.
                            full_text = legacy_data.get('full_text', '')

                        # 줄바꿈 문자를 공백으로 변환하고 양 끝 공백 제거
                        full_text = full_text.replace('\n', ' ').strip()
                        
                        # 추출한 데이터를 리스트에 추가
                        tweets_data.append({
                            'created_at': created_at,
                            'full_text': full_text
                        })
        except (AttributeError, KeyError, IndexError):
            # 데이터 파싱 중 오류 발생 시 빈 리스트 반환
            pass
            
        return tweets_data

    # API 응답에서 다음 페이지를 위한 cursor 값을 찾는 함수
    def _find_next_cursor(self, response_json: Dict[str, Any]) -> Optional[str]:
        try:
            instructions = response_json.get('result', {}).get('timeline', {}).get('instructions', [])
            
            # 모든 instruction 타입에서 cursor 찾기
            all_cursors = []
            
            for instruction in instructions:
                # TimelineAddEntries에서 cursor 찾기
                if instruction.get('type') == 'TimelineAddEntries':
                    entries = instruction.get('entries', [])
                    for entry in entries:
                        content = entry.get('content', {})
                        if content.get('entryType') == 'TimelineTimelineCursor':
                            cursor_value = content.get('value')
                            cursor_type = content.get('cursorType', '')
                            
                            if cursor_value and cursor_value not in self.used_cursors:
                                all_cursors.append({
                                    'value': cursor_value,
                                    'type': cursor_type,
                                    'priority': 1 if cursor_type == 'Bottom' else 2
                                })
                
                # TimelineReplaceEntry에서도 cursor 찾기
                elif instruction.get('type') == 'TimelineReplaceEntry':
                    entry = instruction.get('entry', {})
                    content = entry.get('content', {})
                    if content.get('entryType') == 'TimelineTimelineCursor':
                        cursor_value = content.get('value')
                        cursor_type = content.get('cursorType', '')
                        
                        if cursor_value and cursor_value not in self.used_cursors:
                            all_cursors.append({
                                'value': cursor_value,
                                'type': cursor_type,
                                'priority': 1 if cursor_type == 'Bottom' else 2
                            })
            
            # cursor를 우선순위에 따라 정렬 (Bottom이 우선)
            if all_cursors:
                all_cursors.sort(key=lambda x: x['priority'])
                selected_cursor = all_cursors[0]['value']
                self.used_cursors.add(selected_cursor)
                return selected_cursor
                
        except (AttributeError, KeyError, IndexError):
            pass
            
        return None

    # 트윗 데이터 리스트와 파일 이름을 저장하는 함수    
    def _save_to_csv(self, tweets_list: List[Dict[str, str]], filename: str):

        try:
            with open(filename, 'w', newline='', encoding='utf-8-sig') as f:
                # 'utf-8-sig'는 Excel에서 한글이 깨지지 않도록 BOM을 추가합니다.
                writer = csv.DictWriter(f, fieldnames=['created_at', 'full_text'])
                writer.writeheader()
                writer.writerows(tweets_list)
            print(f"CSV 파일 저장 완료: {filename}")
        except IOError as e:
            print(f"파일 저장 중 오류 발생: {e}")

    # 특정 사용자의 트윗을 수집하여 CSV 파일로 저장하는 함수
    def fetch_user_tweets(self, user_id: str, max_tweets: int = 1000):
        all_tweets = []
        cursor = None
        request_count = 0
        max_requests = 100  # 무한 루프 방지
        consecutive_empty_responses = 0
        
        # cursor 캐시 초기화
        self.used_cursors.clear()
        
        print(f"사용자 ID {user_id}의 트윗 수집을 시작합니다...")
        
        while len(all_tweets) < max_tweets and request_count < max_requests:
            # count를 동적으로 조정 (남은 트윗 수에 따라)
            remaining_tweets = max_tweets - len(all_tweets)
            current_count = min(self.count_per_request, remaining_tweets)
            
            querystring = {
                "user": user_id,
                "count": str(current_count)
            }
            if cursor:
                querystring["cursor"] = cursor
            
            try:
                response = requests.get(self.base_url, headers=self.headers, params=querystring, timeout=45)
                request_count += 1
                
                if response.status_code == 429:  # Rate limit
                    print("API 요청 한도 초과, 60초 대기...")
                    time.sleep(60)
                    continue
                elif response.status_code != 200:
                    if response.status_code >= 500:  # 서버 에러인 경우 재시도
                        print(f"서버 오류 ({response.status_code}), 10초 후 재시도...")
                        time.sleep(10)
                        continue
                    else:
                        print(f"API 요청 실패: HTTP {response.status_code}")
                        break
                    
                data = response.json()
                
                # 트윗 데이터 파싱
                newly_fetched_tweets = self._parse_tweets_from_response(data)
                
                if not newly_fetched_tweets:
                    consecutive_empty_responses += 1
                    
                    if consecutive_empty_responses >= 3:
                        print("연속으로 빈 응답을 받아 수집을 종료합니다.")
                        break
                else:
                    consecutive_empty_responses = 0
                
                all_tweets.extend(newly_fetched_tweets)
                
                # 중복 제거 (created_at + full_text 기준)
                seen = set()
                unique_tweets = []
                for tweet in all_tweets:
                    tweet_key = (tweet['created_at'], tweet['full_text'])
                    if tweet_key not in seen:
                        seen.add(tweet_key)
                        unique_tweets.append(tweet)
                
                all_tweets = unique_tweets
                
                # 다음 cursor 찾기
                next_cursor = self._find_next_cursor(data)
                if not next_cursor or next_cursor == cursor:
                    print("더 이상 수집할 트윗이 없습니다.")
                    break
                
                cursor = next_cursor

                # API rate limit를 고려한 대기 시간 (요청 수에 따라 조정)
                if request_count % 10 == 0:  # 10번째마다 긴 대기
                    wait_time = 5
                else:
                    wait_time = 1
                    
                time.sleep(wait_time)

            except requests.exceptions.Timeout:
                print("요청 타임아웃, 5초 후 재시도...")
                time.sleep(5)
                continue
            except requests.exceptions.RequestException as e:
                print(f"요청 오류: {e}, 10초 후 재시도...")
                time.sleep(10)
                continue
            except json.JSONDecodeError:
                print("JSON 파싱 오류, 5초 후 재시도...")
                time.sleep(5)
                continue

        if all_tweets:
            # f"user_@Ajay_Bagga_tweets.csv"에서 @이름은 하드코딩으로 넣을것
            # 예시1: f"user_@Ajay_Bagga_tweets.csv"
            # 예시2: f"user_@elonmusk_tweets.csv"
            filename = f"user_@Ajay_Bagga_tweets.csv"
            self._save_to_csv(all_tweets, filename)
            print(f"총 {len(all_tweets)}개의 트윗을 수집했습니다.")
        else:
            print("수집된 트윗이 없습니다.")
            
        return all_tweets


### 3-4. 크롤러 인스턴스 생성 및 트윗 수집 실행
설정된 API 키로 크롤러를 생성하고 트윗 수집을 실행합니다.

In [12]:
# 크롤러 인스턴스 생성
crawler = RapidAPITweetCrawler(api_key=API_KEY)

# 트윗 수집 실행
collected_tweets = crawler.fetch_user_tweets(user_id=USER_ID, max_tweets=MAX_TWEETS)

# 수집 결과 요약 출력
if collected_tweets:
    print(f"총 {len(collected_tweets)}개의 트윗을 수집했습니다.")
    print(f"결과는 user_@Ajay_Bagga_tweets.csv 파일에 저장되었습니다.")
    
    # 처음 3개 트윗 미리보기
    print("\n=== 수집된 트윗 미리보기 (처음 3개) ===")
    for i, tweet in enumerate(collected_tweets[:3], 1):
        print(f"{i}. [{tweet['created_at']}] {tweet['full_text'][:100]}...")
else:
    print("수집된 트윗이 없습니다.")


사용자 ID 86437069의 트윗 수집을 시작합니다...
현재까지 수집된 트윗: 0 + 20
현재까지 수집된 트윗: 20 + 20
현재까지 수집된 트윗: 40 + 20
현재까지 수집된 트윗: 60 + 14
현재까지 수집된 트윗: 74 + 20
현재까지 수집된 트윗: 94 + 20
현재까지 수집된 트윗: 114 + 20
현재까지 수집된 트윗: 134 + 18
현재까지 수집된 트윗: 152 + 18
현재까지 수집된 트윗: 170 + 20
현재까지 수집된 트윗: 190 + 16
현재까지 수집된 트윗: 206 + 17
현재까지 수집된 트윗: 223 + 20
현재까지 수집된 트윗: 243 + 14
현재까지 수집된 트윗: 257 + 20
현재까지 수집된 트윗: 277 + 18
현재까지 수집된 트윗: 295 + 11
현재까지 수집된 트윗: 306 + 17
현재까지 수집된 트윗: 323 + 17
현재까지 수집된 트윗: 340 + 8
현재까지 수집된 트윗: 348 + 11
현재까지 수집된 트윗: 359 + 5
현재까지 수집된 트윗: 364 + 4
현재까지 수집된 트윗: 368 + 20
현재까지 수집된 트윗: 388 + 20
현재까지 수집된 트윗: 408 + 17
현재까지 수집된 트윗: 425 + 3
현재까지 수집된 트윗: 428 + 9
현재까지 수집된 트윗: 437 + 6
현재까지 수집된 트윗: 443 + 12
현재까지 수집된 트윗: 455 + 18
현재까지 수집된 트윗: 473 + 3
현재까지 수집된 트윗: 476 + 9
현재까지 수집된 트윗: 485 + 16
현재까지 수집된 트윗: 501 + 18
현재까지 수집된 트윗: 519 + 18
현재까지 수집된 트윗: 537 + 18
현재까지 수집된 트윗: 555 + 14
현재까지 수집된 트윗: 569 + 9
현재까지 수집된 트윗: 578 + 6
현재까지 수집된 트윗: 584 + 2
현재까지 수집된 트윗: 586 + 20
현재까지 수집된 트윗: 586 + 20
현재까지 수집된 트윗: 586 + 20
현재까지 수집된 트윗: 586 