![Logo de AA1](Logo.png) 
# Documentación Python
Hemos creado un proyecto basado en la detección de palabras clave en redes sociales como Twitter o Telegram, tratando de analizar si puede tener un impacto como moneda en páginas `DeFi`, y todo a base del `Machine Learning`. De esta manera, relacionamos el trading de `criptomonedas` con las `redes sociales`, algo innovador y que a día de hoy aún no tiene una solución exacta. 

Para ello lo que hemos usado principalmente es código de Python, 2 APIs (una de Telegram y otra de Twitter), 1 bot de Telegram y Airflow, para automatizar y optimizar el proceso de ejecución mediante un Docker.

## Previo

Antes de nada vamos a hablar de todos los imports de bibliotecas que tenemos que hacer para empezar.

Yendo de arriba abajo, empecemos:

1. `sqlite3` es la base de datos dinámica
2. `tweepy` APi de twitter
3. `pandas` una biblioteca para manipulación y análisis de datos
4. `datetime` maneja fechas y horas
5. `collections` permite contar elementos en una lista o colección de datos.
6. `re` biblioteca para trabajar con expresiones regulares
7. `RandomForestClassifier` clasificador de Machine Learninng elegido
8. `SentimentIntensityAnalyzer` herramienta para análisis de sentimiento basada en texto
9. `bot` para importar el bot que se usará para mandar mensajes
10. `TelegramClient` API de Telegram
11. `smtplib` proporciona herramientas para enviar correos electrónicos mediante el protocolo SMTP
12. `MIMEText` permite crear el contenido del correo electrónico en formato texto
13. `asyncio` se usa para definir funciones asíncronas como las de Telegram

In [None]:
import sqlite3
import tweepy
import pandas as pd
from datetime import datetime, timedelta
from collections import Counter
import re
from sklearn.ensemble import RandomForestClassifier
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from telegram import Bot
from telethon import TelegramClient
import smtplib
from email.mime.text import MIMEText
import asyncio


## 1. Creación base de datos dinámica

Esta primera parte se encarga como dije antes de crear una base de datos SQLite en la que se guardan las palabras clave que vamos filtrando. Con el 3º comando se ejecuta siguiendo las instrucciones en rojo.

In [None]:
# ----------------------------
# 1. CONFIGURACIÓN DE BASE DE DATOS
# ----------------------------

# Conexión SQLite
conn = sqlite3.connect('crypto_trends.db')
cursor = conn.cursor()

# Crear tabla para palabras clave
cursor.execute('''
    CREATE TABLE IF NOT EXISTS keywords (
        id INTEGER PRIMARY KEY,
        keyword TEXT UNIQUE,
        last_checked TIMESTAMP,
        frequency INTEGER,
        sentiment REAL
    )
''')
conn.commit()

## 2. Conectar las APIs con nustro código

Aquí conectamos las API que utilizaremos para la recogida de mensajes y posteriormente, filtrar las `Keyword`. 

Por un lado tenemos `Tweepy` para Twitter: para usarla debemos de crear una cuenta de Twitter creator y crear un proyecto. En ese momento te darán un Key y una clave secret. Tendrás que solicitar un Token y a su vez, te darán otra clave secret para ese Token. Una vez sabiendo todo eso, se pone donde pusimos los mensajes entre <> y listo.

Para la parte de `Telegram`, es parecido, necesitas tu Id y una clave Hash. Estas se consiguen yendo a las herramientas de la API y haciendo un form para pedirlas. Importante recordar que las funciones a implementar en Telegram son asíncronas, como ya mencioné antes.

In [None]:
# ----------------------------
# 2. CONEXIÓN A APIs
# ----------------------------

API_KEY = 'your_api_key'
API_SECRET = 'your_api_secret'
ACCESS_TOKEN = 'your_access_token'
ACCESS_TOKEN_SECRET = 'your_access_token_secret'

# Autenticación de Twitter con Tweepy
auth = tweepy.OAuthHandler(API_KEY, API_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)

def fetch_tweets(keyword, max_tweets=100):
    """Captura tweets desde Twitter API"""
    tweets_data = []
    for tweet in tweepy.Cursor(api.search_tweets, q=keyword, lang='en', tweet_mode='extended').items(max_tweets):
        tweets_data.append(tweet.full_text)
    return tweets_data


# Configuración de la API de Telegram
api_id = 123456
api_hash = "abcdef1234567890abcdef1234567890"

async def fetch_messages(chat, limit=10):
    async with TelegramClient("my_session", api_id, api_hash) as client:
        messages = await client.get_messages(chat, limit=limit)
        return [msg.message for msg in messages if msg.message]

## 3. Análisis de sentimiento (subjetividad)

En esta parte mediante la biblioteca implmementada podremos ver el `sentimiento` promedio de un texto/palabra, sacando así un número para usar como parámetro posteriormente al clasificar las palabras en clave o no.

In [None]:
# ----------------------------
# 3. ANÁLISIS DE SENTIMIENTO
# ----------------------------

analyzer = SentimentIntensityAnalyzer()

def analyze_sentiment(texts):
    """Calcula el sentimiento promedio usando VADER."""
    scores = [analyzer.polarity_scores(text)['compound'] for text in texts]
    return sum(scores) / len(scores) if scores else 0


## 4. Filtro de palabras clave

Aquí simplemente lo que se hace es coger los textos sacados mediante las APIs y ver que palabras se repiten y cuántas veces, llegando a un tope que lo que hace es meterlas a una lista como palabras relevantes. El número se puede modificar en el `most_common` según cómo de estricto quieres ser.

In [None]:
# ----------------------------
# 4. FILTRO DE PALABRAS CLAVE
# ----------------------------

def extract_keywords(texts, existing_keywords):
    """Extrae palabras clave nuevas."""
    all_words = ' '.join(texts).lower()
    words = re.findall(r'\b[a-zA-Z]+\b', all_words)
    common_words = Counter(words).most_common(20)
    new_keywords = [word for word, count in common_words if word not in existing_keywords]
    return new_keywords

## 5. Entrenamiento del modelo

Lo primero de todo es la base de datos que puesto de ejemplo con la frecuencia de palabras, su sentimiento y si es exitosa o no, sin embargo, se podría implementar otra `base de datos real` en la que se guardara información sobre criptos exitosas, o palabras que hayan podido tener un success.

Después de eso, procedemos a entrenar el modelo, en este caso usando random forest, pero se podrían usar otros modelos de clasificación que quizás sean más eficientes. Es importante entrenar el modelo con datos reales y de criptos para que se acostumbre a ese entorno, y que no sufra de Overfitting o Underfitting. En caso de haber problema, siempre está la opción de hacer balance de pesoss entre entreno y test, para que el modelo sea mejor. (X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=1234, stratify=y))



In [None]:
# ----------------------------
# 5. ENTRENAMIENTO DEL MODELO
# ----------------------------

# Datos históricos simulados
training_data = pd.DataFrame({
    'frequency': [100, 200, 1500, 800, 50],
    'sentiment': [0.8, 0.6, 0.7, 0.9, 0.4],
    'success': [1, 1, 1, 1, 0]
})

# Entrenamiento del modelo
X_train = training_data[['frequency', 'sentiment']]
y_train = training_data['success']
model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

## 6. Guardado y análisis de palabras clave

En este apartado lo que se hace es guardar y actualizar palabras clave previamente filtradas, usando las funciones ya definidas sobre el sentimiento, frecuencia y predicción mediante el modelo entrenado. Por último, se establece que si la predicción es buena, entoncess la palabra será exitosa.

In [None]:
# ----------------------------
# 6. GUARDADO Y ANÁLISIS DE PALABRAS CLAVE
# ----------------------------

def save_keyword(keyword, frequency, sentiment):
    """Guarda o actualiza palabras clave en la base de datos."""
    now = datetime.now()
    cursor.execute('''
        INSERT INTO keywords (keyword, last_checked, frequency, sentiment)
        VALUES (?, ?, ?, ?)
        ON CONFLICT(keyword) DO UPDATE SET 
            last_checked=excluded.last_checked,
            frequency=excluded.frequency,
            sentiment=excluded.sentiment
    ''', (keyword, now, frequency, sentiment))
    conn.commit()

def analyze_keyword(keyword, max_tweets=100):
    """Analiza una palabra clave con frecuencia, sentimiento y predicción."""
    tweets = fetch_tweets(keyword, max_tweets)
    frequency = len(tweets)
    sentiment = analyze_sentiment(tweets)
    prediction = model.predict([[frequency, sentiment]])[0]
    save_keyword(keyword, frequency, sentiment)
    return {
        'keyword': keyword,
        'frequency': frequency,
        'sentiment': sentiment,
        'prediction': 'Exitoso' if prediction == 1 else 'No exitoso'
    }

## 7. Notificaciones a correo y Telegram

Aquí lo que hay que hacer es crear un bot en Telegram mediante BotFather, pedir su ID de token y meterlo en neustro código, para así configurarse como un bot que se dedica a mandar notificaciones según los requisitos del Pipeline. 

Lo mismo con las notificaciones Mail pero sin un bot, en este caso usamos la función MIMEText, y así solo necesitaremos nuestro correo y contraseña (del que la envía). Si no quieres que se vean se pueden meter esas claves en un archivo .env y traerlas desde allí.

In [None]:
# ----------------------------
# 7. NOTIFICACIONES
# ----------------------------
# Función asincrónica que envía el mensaje
async def send_telegram_update(chat_id, message):
    """Envía un mensaje a través de Telegram utilizando async."""
    token = 'tu_bot_token_aqui'  # Reemplaza con tu token de bot
    bot = Bot(token=token)
    
    try:
        # Usamos 'await' para llamar a la función asincrónica
        await bot.send_message(chat_id=chat_id, text=message)
        print("Mensaje enviado con éxito.")
    except Exception as e:
        print(f"Error al enviar el mensaje: {e}")




def send_email_notification(subject, message, recipient_email):
    """Envía una notificación por correo electrónico."""
    sender_email = "your_email@gmail.com"
    sender_password = "your_email_password"

    msg = MIMEText(message)
    msg['Subject'] = subject
    msg['From'] = sender_email
    msg['To'] = recipient_email

    with smtplib.SMTP_SSL('smtp.gmail.com', 465) as server:
        server.login(sender_email, sender_password)
        server.sendmail(sender_email, recipient_email, msg.as_string())

## 8. Pipeline que conjunta todo

La última parte del código simplemente junta todo lo que hemos estado usando, estableciendo unos parámetros para enviar lsa notificaciones.

In [None]:
# ----------------------------
# 8. PIPELINE COMPLETO
# ----------------------------

def process_pipeline(max_tweets=100):
    """Pipeline completo de detección y análisis."""
    # Obtener palabras clave existentes
    cursor.execute('SELECT keyword FROM keywords')
    existing_keywords = [row[0] for row in cursor.fetchall()]

    # Captar textos
    tweets = fetch_tweets('crypto', max_tweets)
    new_keywords = extract_keywords(tweets, existing_keywords)

    # Analizar palabras clave nuevas
    for keyword in new_keywords:
        analysis = analyze_keyword(keyword)
        print(f"Análisis para {keyword}: {analysis}")

        # Notificar si es relevante
        if analysis['frequency'] > 100 and analysis['sentiment'] > 0.8:
            
            # Función principal asincrónica
            async def main():
                chat_id = '123456789'  # Reemplaza con el chat_id correcto
                message = f"La palabra '{keyword}' muestra alta relevancia.\nSentimiento: {analysis['sentiment']}\nFrecuencia: {analysis['frequency']}"
                await send_telegram_update(chat_id, message)
            asyncio.run(main())

            send_email_notification(
                subject=f"Tendencia detectada: {keyword}",
                message=f"La palabra '{keyword}' muestra alta relevancia.\nSentimiento: {analysis['sentiment']}\nFrecuencia: {analysis['frequency']}",
                recipient_email='recipient@example.com'
            )


## Ejecución

In [None]:
# Ejecutar pipeline
process_pipeline()

## DAG

El otro código que tenemos es el del DAG que importa el archivo de Python para usarlo en el Airflow posteriormente.

In [None]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
from pred_crypto_memes import process_pipeline

with DAG('crypto_monitoring', start_date=datetime(2024, 1, 1), schedule_interval='@hourly') as dag:
    monitor_task = PythonOperator(
        task_id='monitor_social_media',
        python_callable=process_pipeline
    )

## Configuración airflow en Docker

Esto es el archivo yaml., que se importa en el Docker para crear los containers y ejecutar el código automáticamente. (Salen como si fuesen errores porque al no ser un archivo .py, el Python no lo detecta.)

Los comandos del 2º código simplemente lo que hacen es instalar el Airflow, crearnos un usuario para usar los logs del webserver, y dentro del Docker, iniciar los container con el últmo de todos.

In [None]:
pip install pyyaml
import yaml
---
version: "3"
x-airflow-common:
  &airflow-common
  
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
   
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0" 
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    
    image: redis:7.2-bookworm
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
  

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always


  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      # yamllint disable rule:line-length
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-init:
    <<: *airflow-common
    # yamllint disable rule:line-length
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    
    command:
      - bash
      - -c
      - airflow

  
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

volumes:
  postgres-db-volume:


In [None]:
python -m venv airflow-env #crear entorno env
source airflow-env/bin/activate  # Linux/MacOS
.\airflow-env\Scripts\activate   # Windows

pip install apache-airflow #instalar airflow

airflow db init #iniciar base de datos

airflow users create \ #crear uusario
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --email admin@example.com \
    --password admin

airflow webserver -p 8080  # Inicia la interfaz gráfica
airflow scheduler          # Inicia el scheduler

curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml' #enlace a archivo yaml adjuntado antes

docker-compose up airflow-init # iniciar Airflow

docker-compose up #iniciar los servers

## Anexo: Explicación preprocesado bases de datos

En caso de que las bases de datos tengan diferentes problemas de tipografía o datos vacíos (NaN), pues aquí se podría ver un ejemplo de cómo modificar esos detalles, además de poner cabeceras, separar los datos en 2 partes o preprocesar los datos en números, si es que son str (datos categóricos)

In [None]:
from scikit.preprocessing import LabelEncoder

cabecera = ['atr'+str(x) for x in range(1,25)] # for en línea 

#para crear elementos de una lista
cabecera.append('clase') # la clase está en la última posición

datos = pd.read_csv('datos.data', names=cabecera, sep=',', na_values='?')
filas, columnas = datos.shape
#eliminar datos desconocidos

datos=datos.dropna()


# la clase está en la última columna 
# separamos los atributos y los almacenamos en X
X = datos.drop(['clase'], axis=1)
print('\n Atributos')
print(X)

# Como la clase toma dos valores diferentes, transformamos la clase con un Label Encoder

class_enc = preprocessing.LabelEncoder()
datos['clase'] = class_enc.fit_transform(datos['clase'])

# separamos la clase y la almacenamos en Y
y = datos['clase']

print('\n Clase ')
print(y)

## Mejoras potenciales a futuro y diversos cambios a hacer

- En los filtros de los tweets que se recogen, se podría restringir a cuentas de X seguidores o que sean verificados, para captar grandes influencias. 

- Otra es poner filtros de letras como un $, que es el carácter que se usa al hablar de criptos, y podría ser interesantes.

- Realizar búsquedas en otras redes sociales que también son influyentes.

- Probar otros métodos de entrenamiento ya sea de `bagging` o `boosting`, incluso redes neuronales, aunque es más complejo.

- Mejorar el código yaml. para modificar los tiempos y que sea más eficiente.

- Filtrar las claves de usadas de los Tokens o de las API en archivos .env para que no puedan ser robadas.