# Notebook 1 – ETL upload

Lê `df_file.csv`, faz limpeza mínima e insere em **MongoDB** e **Elasticsearch**.

In [None]:
import pandas as pd
import re, csv, os, time
from pymongo import MongoClient
from elasticsearch import Elasticsearch, helpers, exceptions

CSV_PATH = "df_file.csv"

MONGO_URI = "mongodb://admin:123456@localhost:27017/lab_ml?authSource=admin"
MONGO_DB = "lab_ml"
MONGO_COLLECTION_RAW = "tweets_raw"

ES_HOST = "http://localhost:9200"
ES_INDEX_RAW = "tweets_raw"


if not os.path.exists(CSV_PATH):
    raise FileNotFoundError(f"O arquivo {CSV_PATH} não foi encontrado na pasta atual.")

df = pd.read_csv(
    CSV_PATH,
    sep=",",
    quotechar='"',
    quoting=csv.QUOTE_ALL,
    encoding="utf-8",
    engine="python",
)


def clean_text(t):
    if pd.isna(t):
        return ""
    t = t.lower()
    t = re.sub(r"\n", " ", t)
    t = re.sub(r"@[A-Za-z0-9_]+", " ", t)
    t = re.sub(r"http\S+|www\.\S+", " ", t)
    t = re.sub(r"[^a-z0-9 ]", " ", t)
    return re.sub(r"\s+", " ", t).strip()

df["clean_text"] = df["Text"].apply(clean_text)


try:
    mongo_client = MongoClient(MONGO_URI)
    mongo_db = mongo_client[MONGO_DB]
    print("✅ Conectou ao MongoDB com sucesso.")
except Exception as e:
    raise ConnectionError(f"Não foi possível conectar ao MongoDB: {e}")

for _ in range(5):
    try:
        es = Elasticsearch(ES_HOST, timeout=30)
        if es.ping():
            print("✅ Conectou ao Elasticsearch com sucesso.")
            break
    except Exception:
        time.sleep(2)
else:
    raise ConnectionError("Não foi possível conectar ao Elasticsearch em localhost:9200.")


mongo_db[MONGO_COLLECTION_RAW].delete_many({})
mongo_db[MONGO_COLLECTION_RAW].insert_many(df.to_dict("records"))
print(f"✅ Inseriu {len(df)} documentos em MongoDB (coleção '{MONGO_COLLECTION_RAW}').")


try:
    if not es.indices.exists(index=ES_INDEX_RAW):
        es.indices.create(index=ES_INDEX_RAW)
        print(f"✅ Índice '{ES_INDEX_RAW}' criado no Elasticsearch.")
except exceptions.RequestError as retr_exc:
    print(f"⚠️ Erro ao criar índice no Elasticsearch: {retr_exc.info}")

actions = (
    {
        "_index": ES_INDEX_RAW,
        "_id": i,
        "_source": rec,
    }
    for i, rec in df.to_dict("index").items()
)

helpers.bulk(es, actions)
print(f"✅ Inseridos {len(df)} documentos em Elasticsearch (índice '{ES_INDEX_RAW}').")
