<a href="https://colab.research.google.com/github/Alejog20/Analisis_trafico_local/blob/main/reddit_sentiment_analysis_on_us_tariffs.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import requests
import pandas as pd
import datetime
import time
import re
import csv
import json
import base64
from urllib.parse import quote

def get_reddit_token(client_id, client_secret):
    """
    Obtiene un token de autenticación de Reddit usando OAuth

    Args:
        client_id (str): ID de cliente de la API de Reddit
        client_secret (str): Secret de cliente de la API de Reddit

    Returns:
        str: Token de acceso o None si falla
    """
    auth = base64.b64encode(f"{client_id}:{client_secret}".encode()).decode()
    headers = {
        "Authorization": f"Basic {auth}",
        "User-Agent": "ArancelesAnalysis/1.0"
    }
    data = {
        "grant_type": "client_credentials"
    }

    try:
        response = requests.post(
            "https://www.reddit.com/api/v1/access_token",
            headers=headers,
            data=data
        )

        if response.status_code == 200:
            return response.json().get("access_token")
        else:
            print(f"❌ Error obteniendo token: {response.status_code}")
            print(f"Respuesta: {response.text}")
            return None
    except Exception as e:
        print(f"❌ Error en la solicitud de token: {e}")
        return None

def search_reddit(token, query, subreddit, limit=25, sort="relevance"):
    """
    Busca posts en Reddit usando la API REST

    Args:
        token (str): Token de acceso OAuth
        query (str): Término de búsqueda
        subreddit (str): Subreddit o 'all' para todos
        limit (int): Número máximo de resultados
        sort (str): Método de ordenación (relevance, hot, new, top)

    Returns:
        list: Lista de posts encontrados
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "ArancelesAnalysis/1.0"
    }

    encoded_query = quote(query)

    if subreddit.lower() == 'all':
        url = f"https://oauth.reddit.com/search?q={encoded_query}&sort={sort}&limit={limit}"
    else:
        url = f"https://oauth.reddit.com/r/{subreddit}/search?q={encoded_query}&sort={sort}&restrict_sr=1&limit={limit}"

    try:
        response = requests.get(url, headers=headers)

        if response.status_code == 200:
            data = response.json()
            return data.get("data", {}).get("children", [])
        else:
            print(f"❌ Error en la búsqueda: {response.status_code}")
            print(f"Respuesta: {response.text}")
            return []
    except Exception as e:
        print(f"❌ Error en la solicitud de búsqueda: {e}")
        return []

def get_comments(token, post_id, limit=25):
    """
    Obtiene los comentarios de un post específico

    Args:
        token (str): Token de acceso OAuth
        post_id (str): ID del post
        limit (int): Número máximo de comentarios

    Returns:
        list: Lista de comentarios
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "ArancelesAnalysis/1.0"
    }

    url = f"https://oauth.reddit.com/comments/{post_id}?limit={limit}"

    try:
        response = requests.get(url, headers=headers)

        if response.status_code == 200:
            data = response.json()
            if len(data) >= 2:  # El segundo elemento contiene los comentarios
                return data[1].get("data", {}).get("children", [])
            return []
        else:
            print(f"❌ Error obteniendo comentarios: {response.status_code}")
            return []
    except Exception as e:
        print(f"❌ Error en la solicitud de comentarios: {e}")
        return []

def extract_reddit_data(client_id, client_secret, search_terms, subreddits, post_limit=25, comment_limit=25):
    """
    Extrae datos de Reddit basados en términos de búsqueda y subreddits
    usando requests en lugar de PRAW

    Args:
        client_id (str): ID de cliente de la API de Reddit
        client_secret (str): Secret de cliente de la API de Reddit
        search_terms (list): Lista de términos a buscar
        subreddits (str): Subreddits donde buscar, separados por +
        post_limit (int): Número máximo de posts a extraer por término
        comment_limit (int): Número máximo de comentarios por post

    Returns:
        tuple: DataFrames de posts y comentarios
    """
    # Obtener token de autenticación
    token = get_reddit_token(client_id, client_secret)
    if not token:
        print("❌ No se pudo obtener el token de autenticación. Verifica tus credenciales.")
        return pd.DataFrame(), pd.DataFrame()

    print("✅ Autenticación exitosa en Reddit API")

    all_posts = []
    all_comments = []
    comment_id = 0
    subreddit_list = subreddits.split('+')

    # Procesa cada término de búsqueda
    for term in search_terms:
        for subreddit in subreddit_list:
            print(f"🔍 Buscando '{term}' en r/{subreddit}")

            posts = search_reddit(token, term, subreddit, limit=post_limit)

            for post_data in posts:
                post = post_data.get("data", {})

                # Extrae información del post
                post_id = post.get("id")

                # Evita duplicados
                if any(p.get("post_id") == post_id for p in all_posts):
                    continue

                # Extrae datos del post
                post_info = {
                    "post_id": post_id,
                    "title": post.get("title", ""),
                    "text": post.get("selftext", ""),
                    "score": post.get("score", 0),
                    "upvote_ratio": post.get("upvote_ratio", 0),
                    "created_utc": datetime.datetime.fromtimestamp(post.get("created_utc", 0)),
                    "num_comments": post.get("num_comments", 0),
                    "permalink": f"https://www.reddit.com{post.get('permalink', '')}",
                    "subreddit": post.get("subreddit", ""),
                    "author": post.get("author", "[deleted]"),
                    "search_term": term
                }

                all_posts.append(post_info)

                # Obtiene comentarios
                if post.get("num_comments", 0) > 0:
                    comments = get_comments(token, post_id, limit=comment_limit)

                    for comment_data in comments:
                        comment = comment_data.get("data", {})

                        # Ignora entradas que no son comentarios reales
                        if comment.get("body") is None or comment.get("id") is None:
                            continue

                        comment_info = {
                            "comment_id": comment_id,
                            "post_id": post_id,
                            "text": comment.get("body", ""),
                            "score": comment.get("score", 0),
                            "created_utc": datetime.datetime.fromtimestamp(comment.get("created_utc", 0)),
                            "author": comment.get("author", "[deleted]"),
                            "is_submitter": comment.get("is_submitter", False),
                            "permalink": f"https://www.reddit.com{comment.get('permalink', '')}"
                        }

                        all_comments.append(comment_info)
                        comment_id += 1

                # Pausa para evitar límites de tasa
                time.sleep(1)

            # Pausa entre subreddits
            time.sleep(2)

    # Convierte a DataFrames
    df_posts = pd.DataFrame(all_posts) if all_posts else pd.DataFrame()
    df_comments = pd.DataFrame(all_comments) if all_comments else pd.DataFrame()

    print(f"✅ Se encontraron {len(df_posts)} posts y {len(df_comments)} comentarios.")

    return df_posts, df_comments

def clean_text(text):
    """Limpia el texto de caracteres especiales y formatos Reddit"""
    if not isinstance(text, str):
        return ""

    # Elimina URLs
    text = re.sub(r'http\S+', '', text)
    # Elimina caracteres de formato Reddit
    text = re.sub(r'\[|\]|\(|\)|\*|#|>', '', text)
    # Normaliza espacios
    text = re.sub(r'\s+', ' ', text).strip()

    return text

def save_to_csv(df, filename):
    """Guarda un DataFrame en un archivo CSV manejando errores de codificación"""
    if df.empty:
        print(f"No hay datos para guardar en {filename}")
        return False

    try:
        df.to_csv(filename, index=False, encoding='utf-8-sig', quoting=csv.QUOTE_ALL)
        print(f"✅ Datos guardados en {filename}")
        return True
    except Exception as e:
        print(f"Error guardando {filename}: {e}")
        # Intenta con otra codificación
        try:
            df.to_csv(filename, index=False, encoding='latin1', quoting=csv.QUOTE_ALL)
            print(f"✅ Datos guardados en {filename} con codificación alternativa")
            return True
        except:
            print(f"❌ No se pudo guardar {filename}")
            return False

def get_user_inputs():
    """Solicita al usuario las entradas necesarias para la extracción de datos"""
    print("\n" + "="*60)
    print("📊 EXTRACTOR DE DATOS DE REDDIT SOBRE ARANCELES COMERCIALES 📊")
    print("="*60 + "\n")

    print("Para usar este script, necesitas las credenciales de una aplicación de Reddit.")
    print("Puedes crear una app en: https://www.reddit.com/prefs/apps\n")

    # Solicita credenciales
    client_id = input("🔑 Client ID de tu app de Reddit: ").strip()
    client_secret = input("🔑 Client Secret de tu app de Reddit: ").strip()

    # Solicita términos de búsqueda
    default_terms = ["aranceles México", "aranceles Canadá", "Trump aranceles",
                     "25% arancel", "TMEC aranceles", "tariffs Mexico Canada"]

    print("\nTérminos de búsqueda predeterminados:")
    for i, term in enumerate(default_terms, 1):
        print(f"  {i}. {term}")

    custom_terms = input("\n🔍 ¿Quieres usar términos personalizados? (s/n, default: n): ").strip().lower()

    if custom_terms == 's' or custom_terms == 'si' or custom_terms == 'sí' or custom_terms == 'y' or custom_terms == 'yes':
        terms_input = input("🔍 Ingresa los términos separados por comas: ").strip()
        search_terms = [term.strip() for term in terms_input.split(',') if term.strip()]
        if not search_terms:
            print("⚠️ No se ingresaron términos válidos. Usando términos predeterminados.")
            search_terms = default_terms
    else:
        search_terms = default_terms

    # Solicita subreddits
    default_subreddits = "Economics+Politics+worldnews+news+business+mexico+canada+trade"

    custom_subreddits = input(f"\n📚 ¿Quieres usar subreddits personalizados? (s/n, default: n): ").strip().lower()

    if custom_subreddits == 's' or custom_subreddits == 'si' or custom_subreddits == 'sí' or custom_subreddits == 'y' or custom_subreddits == 'yes':
        subreddits = input(f"📚 Ingresa los subreddits separados por '+': ").strip()
        if not subreddits:
            print("⚠️ No se ingresaron subreddits válidos. Usando subreddits predeterminados.")
            subreddits = default_subreddits
    else:
        subreddits = default_subreddits

    # Límites
    try:
        post_limit = int(input("\n📝 Número máximo de posts por término y subreddit (default: 25): ") or "25")
    except ValueError:
        post_limit = 25
        print("⚠️ Valor no válido. Usando 25 posts por término y subreddit.")

    try:
        comment_limit = int(input("💬 Número máximo de comentarios por post (default: 20): ") or "20")
    except ValueError:
        comment_limit = 20
        print("⚠️ Valor no válido. Usando 20 comentarios por post.")

    return {
        "client_id": client_id,
        "client_secret": client_secret,
        "search_terms": search_terms,
        "subreddits": subreddits,
        "post_limit": post_limit,
        "comment_limit": comment_limit
    }

def main():
    # Obtener entradas del usuario
    inputs = get_user_inputs()

    print("\n" + "-"*60)
    print(f"🚀 Iniciando extracción de datos de Reddit desde r/{inputs['subreddits']}")
    print(f"🔍 Buscando {len(inputs['search_terms'])} términos: {', '.join(inputs['search_terms'])}")
    print("-"*60 + "\n")

    # Extrae los datos
    df_posts, df_comments = extract_reddit_data(
        client_id=inputs["client_id"],
        client_secret=inputs["client_secret"],
        search_terms=inputs["search_terms"],
        subreddits=inputs["subreddits"],
        post_limit=inputs["post_limit"],
        comment_limit=inputs["comment_limit"]
    )

    print(f"\n📊 Extracción completada:")
    print(f"Posts extraídos: {len(df_posts)}")
    print(f"Comentarios extraídos: {len(df_comments)}")

    # Limpia los textos si hay datos
    if not df_posts.empty:
        df_posts['title_clean'] = df_posts['title'].apply(clean_text)
        df_posts['text_clean'] = df_posts['text'].apply(clean_text)

    if not df_comments.empty:
        df_comments['text_clean'] = df_comments['text'].apply(clean_text)

    # Ruta específica para guardar en Google Drive
    save_path = '/content/drive/MyDrive/Development/DataScience/Sentiment_Analysis'

    # Montar Google Drive si aún no está montado
    try:
        from google.colab import drive
        drive.mount('/content/drive', force_remount=False)
        print("✅ Google Drive montado correctamente")
    except:
        print("⚠️ No se pudo montar Google Drive o ya estaba montado")

    # Crear timestamp para los nombres de archivo
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M")

    # Guardar los dataframes en la ruta específica
    if not df_posts.empty:
        posts_filename = f"{save_path}/reddit_posts_aranceles_{timestamp}.csv"
        try:
            df_posts.to_csv(posts_filename, index=False, encoding='utf-8-sig', quoting=csv.QUOTE_ALL)
            print(f"✅ Posts guardados en: {posts_filename}")
        except Exception as e:
            print(f"❌ Error guardando posts: {e}")

    if not df_comments.empty:
        comments_filename = f"{save_path}/reddit_comments_aranceles_{timestamp}.csv"
        try:
            df_comments.to_csv(comments_filename, index=False, encoding='utf-8-sig', quoting=csv.QUOTE_ALL)
            print(f"✅ Comentarios guardados en: {comments_filename}")
        except Exception as e:
            print(f"❌ Error guardando comentarios: {e}")

    print("\n✅ Proceso completado.")

if __name__ == "__main__":
    main()


📊 EXTRACTOR DE DATOS DE REDDIT SOBRE ARANCELES COMERCIALES 📊

Para usar este script, necesitas las credenciales de una aplicación de Reddit.
Puedes crear una app en: https://www.reddit.com/prefs/apps

