# Obtener el primera fecha por canal y cluster

## Carga de librerías necesarias

In [1]:
from dotenv import load_dotenv
from ast import literal_eval
from motor.motor_asyncio import AsyncIOMotorClient
from tqdm.asyncio import tqdm_asyncio  # barra de progreso asíncrona

import os
import asyncio
import pandas as pd

## Conexión BD

In [3]:
# Cargar las variables desde el archivo .env.mongo
load_dotenv(dotenv_path="../.env.mongo")

# Leer las variables de entorno
user = os.getenv("MONGO_USER")
password = os.getenv("MONGO_PASSWORD")
host = os.getenv("MONGO_HOST")
port = os.getenv("MONGO_PORT")

# Cliente de conexión a MongoDB
mongo_url = f"mongodb://{user}:{password}@{host}:{port}/?authSource=admin"
client = AsyncIOMotorClient(mongo_url)

# Verificar conexión
try:
    client.admin.command("ping")
    print("Conexión exitosa a MongoDB")

except Exception as e:
    print("Error de conexión:", e)

Conexión exitosa a MongoDB


In [4]:
db = client["tg_channels_cositas"]

## Carga de datos de meta-tópicos

In [5]:
macrot_df = pd.read_csv("../data/meta_topics.csv")
macrot_df.head().iloc[:, 1:]

Unnamed: 0,tid,representative_docs,cluster,cluster_prob
0,0,estracto de la agencia europea del medicamento...,0,0.121201
1,0,""" si me vacunan: 1 . - ¿ puedo dejar de usar l...",0,0.068394
2,0,buenos días la ema avisa de que la segunda do...,0,0.092112
3,1,"los viajes espaciales, la creacion de una pand...",-1,0.522919
4,1,2011...este articulo del periodico sovereign i...,-1,0.141489


In [6]:
print(macrot_df.shape)

(64228, 5)


In [7]:
macrot_df = macrot_df[macrot_df["cluster"] != -1]
print(macrot_df.shape)

(32707, 5)


In [8]:
# dict {(_id, cluster): [tid1, tid2, …]}
col_tids = (
    macrot_df
    .groupby(['_id', 'cluster'])['tid']
    .unique()
    .to_dict()
)

async def procesar_coleccion(collection_name, cluster_id, tids, batch_size=1000):
    coll = db[collection_name]
    out = []
    for i in range(0, len(tids), batch_size):
        batch = [int(t) for t in tids[i:i+batch_size]]
        pipeline = [
            {'$match': {'topic_id': {'$in': batch}}},
            {'$group': {
                '_id': '$topic_id',
                'earliest_date': {'$min': '$date'}
            }}
        ]
        cursor = coll.aggregate(pipeline, allowDiskUse=True)
        async for doc in cursor:
            out.append({
                '_id': collection_name,
                'cluster':     cluster_id,
                'tid':         doc['_id'],
                'earliest_date': pd.to_datetime(doc['earliest_date'])
            })
    return out

async def main():
    tasks = [
        procesar_coleccion(col_name, cluster, tids)
        for (col_name, cluster), tids in col_tids.items()
    ]
    results = []
    for coro in tqdm_asyncio.as_completed(tasks, total=len(tasks),
                                          desc="Procesando colecciones"):
        results.extend(await coro)
    return results

# Ejecuta el loop
results = await main()

Procesando colecciones: 100%|██████████| 5808/5808 [1:01:17<00:00,  1.58it/s]


In [18]:
# Unión con el df de meta-tópicos
earliest_df = pd.DataFrame(results).drop(columns=["tid"])
macrot_df = macrot_df.merge(earliest_df, on=['_id', 'cluster'], how='left')

In [19]:
macrot_df["earliest_date"].describe()

count                                 858743
mean     2021-11-15 11:35:14.466894592+00:00
min                2016-07-07 17:09:19+00:00
25%                2021-04-04 18:52:26+00:00
50%                2021-12-14 22:00:15+00:00
75%                2022-08-21 19:02:13+00:00
max                2024-03-03 13:21:36+00:00
Name: earliest_date, dtype: object

In [20]:
macrot_df.to_csv("../data/metat_tiempos.csv", index=False)

In [23]:
df_netinf = macrot_df[["_id", "cluster", "earliest_date"]]
df_netinf = df_netinf.rename(columns={
    'cluster': 'cascade_id',
    'rep_docs_dates': 'event_time',
    '_id': 'node_name'
})
df_netinf.head().iloc[:, 1:]

Unnamed: 0,cascade_id,earliest_date
439847,0,2016-11-30 11:39:51+00:00
439854,0,2016-11-30 11:39:51+00:00
439861,0,2016-11-30 11:39:51+00:00
439868,0,2016-11-30 11:39:51+00:00
439875,0,2016-11-30 11:39:51+00:00


In [24]:
# Ordenar el DataFrame por cascada y tiempo de evento
df_netinf = df_netinf.sort_values(['cascade_id', 'earliest_date'])
df_netinf.head().iloc[:, 1:]

Unnamed: 0,cascade_id,earliest_date
439847,0,2016-11-30 11:39:51+00:00
439854,0,2016-11-30 11:39:51+00:00
439861,0,2016-11-30 11:39:51+00:00
439868,0,2016-11-30 11:39:51+00:00
439875,0,2016-11-30 11:39:51+00:00


In [26]:
df_netinf = (
    df_netinf
    .groupby(['node_name', 'cascade_id'], as_index=False)
    ['earliest_date']
    .min()
    .sort_values(['cascade_id', 'earliest_date'])
    .reset_index(drop=True)
)
df_netinf.head().iloc[:, 1:]

Unnamed: 0,cascade_id,earliest_date
0,0,2016-11-30 11:39:51+00:00
1,0,2018-02-07 05:29:56+00:00
2,0,2018-05-30 06:46:46+00:00
3,0,2018-06-06 00:25:52+00:00
4,0,2018-07-03 15:13:56+00:00


In [27]:
df_netinf.shape

(5808, 3)

In [28]:
df_netinf.to_csv("../data/netinf_data.csv", index=False)