# Santander Dev Week ETL (CSV no lugar da API de usuarios)

Contexto do desafio (adaptado):
1. Ler os IDs de usuarios do arquivo `data/SDW2023.csv`.
2. Extrair os dados de cada usuario.
3. Gerar uma mensagem personalizada de marketing.
4. Atualizar a lista `news` do usuario.

Adaptacao aplicada:
- No projeto original, `Extract` e `Load` usam endpoint HTTP (`GET/PUT /users/{id}`).
- Aqui, essas etapas usam `data/users_api_mock.csv` como base local.
- A etapa `Transform` continua no mesmo estilo do desafio, com OpenAI opcional.


In [None]:
from pathlib import Path
from copy import deepcopy
import csv
import json
import os
import random


def find_project_root(start: Path) -> Path:
    for candidate in [start.resolve(), *start.resolve().parents]:
        if (candidate / "pyproject.toml").exists():
            return candidate
    return start.resolve()


PROJECT_ROOT = find_project_root(Path.cwd())
DATA_DIR = PROJECT_ROOT / "data"
IDS_CSV = DATA_DIR / "SDW2023.csv"
USERS_MOCK_CSV = DATA_DIR / "users_api_mock.csv"
OUTPUT_CSV = DATA_DIR / "users_api_mock_atualizado.csv"

print(f"PROJECT_ROOT: {PROJECT_ROOT}")
print(f"IDS_CSV: {IDS_CSV}")
print(f"USERS_MOCK_CSV: {USERS_MOCK_CSV}")
print(f"OUTPUT_CSV: {OUTPUT_CSV}")


## Extract

No desafio original: para cada ID, seria feito `GET /users/{id}`.
Aqui: simulamos esse `GET` lendo os dados do CSV local (`users_api_mock.csv`).


In [None]:
def read_user_ids(ids_csv_path: Path) -> list[int]:
    user_ids: list[int] = []
    with ids_csv_path.open("r", encoding="utf-8", newline="") as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            user_ids.append(int(row["UserID"]))
    return user_ids


def load_users_mock_db(users_csv_path: Path) -> dict[int, dict]:
    users_by_id: dict[int, dict] = {}
    with users_csv_path.open("r", encoding="utf-8", newline="") as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            user_id = int(row["UserID"])
            users_by_id[user_id] = {
                "id": user_id,
                "name": row["name"],
                "account": {"limit": float(row["account_limit"])},
                "card": {"limit": float(row["card_limit"])},
                "features": json.loads(row.get("features") or "[]"),
                "news": json.loads(row.get("news") or "[]"),
            }
    return users_by_id


def get_user(user_id: int, users_by_id: dict[int, dict]) -> dict | None:
    user = users_by_id.get(user_id)
    return deepcopy(user) if user is not None else None


user_ids = read_user_ids(IDS_CSV)
users_db = load_users_mock_db(USERS_MOCK_CSV)
users = [get_user(user_id, users_db) for user_id in user_ids]
users = [user for user in users if user is not None]

print(f"IDs lidos do CSV: {user_ids}")
print(f"Usuarios extraidos: {len(users)}")


## Transform

Mesma ideia do desafio: gerar mensagem personalizada para cada usuario.

Como usar OpenAI:
- Defina `OPENAI_API_KEY` no ambiente.
- (Opcional) defina `OPENAI_MODEL`.
- Se a chave nao existir (ou der erro), o notebook usa fallback local com templates.

Se quiser instalar o SDK da OpenAI no projeto:
- `uv add --dev openai`


In [None]:
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
OPENAI_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")

random.seed(7)

FALLBACK_TEMPLATES = [
    "{name}, invista de forma recorrente para transformar seu limite de R$ {account_limit:,.0f} em patrimonio no longo prazo.",
    "{name}, voce pode usar metas mensais para investir com consistencia e evoluir sua saude financeira.",
    "{name}, diversificar seus investimentos e manter disciplina pode acelerar seus objetivos financeiros.",
]


def generate_marketing_message(user: dict) -> str:
    if OPENAI_API_KEY:
        try:
            from openai import OpenAI

            client = OpenAI(api_key=OPENAI_API_KEY)
            prompt = (
                f"Crie uma mensagem curta e personalizada para {user['name']} sobre a importancia de investir. "
                f"Considere que o limite em conta e R$ {user['account']['limit']:.2f}. "
                "Tom amigavel, objetivo e em portugues brasileiro."
            )

            completion = client.chat.completions.create(
                model=OPENAI_MODEL,
                messages=[
                    {
                        "role": "system",
                        "content": "Voce e um especialista em marketing bancario.",
                    },
                    {"role": "user", "content": prompt},
                ],
                temperature=0.6,
                max_tokens=90,
            )
            content = (completion.choices[0].message.content or "").strip()
            if content:
                return content
        except Exception as exc:
            print(f"Falha no OpenAI para user={user['id']}: {exc}")

    template = random.choice(FALLBACK_TEMPLATES)
    return template.format(name=user["name"], account_limit=user["account"]["limit"])


def append_news(user: dict) -> None:
    message = generate_marketing_message(user)
    user["news"].append(
        {
            "icon": "https://digitalinnovationone.github.io/santander-dev-week-2023-api/icons/credit.svg",
            "description": message,
        }
    )


for user in users:
    append_news(user)

print("Transform concluido.")
print(f"Primeira mensagem gerada: {users[0]['news'][-1]['description']}")


## Load

No desafio original: seria `PUT /users/{id}` para atualizar a `news`.
Aqui: simulamos esse `PUT`, atualizamos em memoria e salvamos no CSV de saida.


In [None]:
def update_user(user: dict, users_by_id: dict[int, dict]) -> bool:
    user_id = user["id"]
    if user_id not in users_by_id:
        return False
    users_by_id[user_id] = deepcopy(user)
    return True


def save_users_to_csv(output_csv_path: Path, users_by_id: dict[int, dict]) -> None:
    output_csv_path.parent.mkdir(parents=True, exist_ok=True)
    fieldnames = ["UserID", "name", "account_limit", "card_limit", "features", "news"]

    with output_csv_path.open("w", encoding="utf-8", newline="") as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()

        for user_id in sorted(users_by_id):
            user = users_by_id[user_id]
            writer.writerow(
                {
                    "UserID": user["id"],
                    "name": user["name"],
                    "account_limit": f"{user['account']['limit']:.2f}",
                    "card_limit": f"{user['card']['limit']:.2f}",
                    "features": json.dumps(user["features"], ensure_ascii=False),
                    "news": json.dumps(user["news"], ensure_ascii=False),
                }
            )


success_count = 0
for user in users:
    if update_user(user, users_db):
        success_count += 1

save_users_to_csv(OUTPUT_CSV, users_db)

print(f"Usuarios atualizados: {success_count}/{len(users)}")
print(f"Arquivo de saida: {OUTPUT_CSV}")


In [None]:
with OUTPUT_CSV.open("r", encoding="utf-8", newline="") as csv_file:
    rows = list(csv.DictReader(csv_file))

print("Preview de saida (UserID + news):")
for row in rows:
    print(f"- UserID={row['UserID']} | name={row['name']}")
    print(f"  news={row['news']}")
