# Tratamento dos dados

In [1]:
import csv
import ast
from pathlib import Path

# ajuste para o nome do arquivo original
SOURCE = Path("./data/sample.csv")
BATTLES_OUT = Path("./data/battles.csv")
DECKS_OUT = Path("./data/decks.csv")
CLANS_OUT = Path("./data/clans.csv")

WIN_LIST_COL = "winner.cards.list"
LOS_LIST_COL = "loser.cards.list"

In [31]:
def parse_cards_list(raw: str):
    if raw is None:
        return []
    s = raw.strip()
    if not s or s == "[]":
        return []
    try:
        return [int(x) for x in ast.literal_eval(s)]
    except Exception:
        s = s.strip("[]")
        return [int(x) for x in s.split(",") if x.strip()]


def canonical_deck(card_ids):
    # Remove duplicados (caso) e ordena para forma canônica
    return tuple(sorted(set(card_ids)))


def load_rows(path: Path):
    with path.open(encoding="utf-8") as f:
        rdr = csv.DictReader(f)
        for row in rdr:
            yield row

In [32]:
def main():
    deck_index = {}        
    decks = []               
    next_deck_id = 1

    battles_rows = []

    clans_rows = []

    for row in load_rows(SOURCE):
        win_cards = parse_cards_list(row.get(WIN_LIST_COL, ""))
        los_cards = parse_cards_list(row.get(LOS_LIST_COL, ""))

        win_key = canonical_deck(win_cards)
        los_key = canonical_deck(los_cards)

        if win_key not in deck_index:
            deck_index[win_key] = next_deck_id
            decks.append((next_deck_id, win_key))
            next_deck_id += 1
        if los_key not in deck_index:
            deck_index[los_key] = next_deck_id
            decks.append((next_deck_id, los_key))
            next_deck_id += 1

        battle_record = {
            "id": row.get("id"),
            "battleTime": row.get("battleTime"),
            "arena_id": row.get("arena.id"),
            "gameMode_id": row.get("gameMode.id"),
            "winner_tag": row.get("winner.tag"),
            "loser_tag": row.get("loser.tag"),
            "winner_crowns": row.get("winner.crowns"),
            "loser_crowns": row.get("loser.crowns"),
            "winner_startingTrophies": row.get("winner.startingTrophies"),
            "loser_startingTrophies": row.get("loser.startingTrophies"),
            "winner_trophyChange": row.get("winner.trophyChange"),
            "loser_trophyChange": row.get("loser.trophyChange"),
            "winner_deck_id": deck_index[win_key],
            "loser_deck_id": deck_index[los_key],
            "tournamentTag": row.get("tournamentTag"),
            "winner_clan": row.get("winner.clan.tag"),
            "loser_clan": row.get("loser.clan.tag")
        }
        battles_rows.append(battle_record)

        # Clans
        winner_clan_tag = row.get("winner.clan.tag")
        winner_clan_badge = row.get("winner.clan.badgeId")
        loser_clan_tag = row.get("loser.clan.tag")
        loser_clan_badge = row.get("loser.clan.badgeId")

        clans_rows.append({
            "clan_tag": winner_clan_tag,
            "clan_badgeId": winner_clan_badge,
        })

        clans_rows.append({
            "clan_tag": loser_clan_tag,
            "clan_badgeId": loser_clan_badge,
        })

    # Escrever decks.csv
    with DECKS_OUT.open("w", newline="", encoding="utf-8") as f:
        fieldnames = ["deck_id"] + [f"card{i+1}_id" for i in range(8)]
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for deck_id, card_tuple in decks:
            # Pad caso algum deck tenha <8 após remover duplicados (raro)
            cards = list(card_tuple)
            if len(cards) < 8:
                cards += [None] * (8 - len(cards))
            row_out = {"deck_id": deck_id}
            for i, cid in enumerate(cards[:8]):
                row_out[f"card{i+1}_id"] = cid
            w.writerow(row_out)

    # Escrever battles.csv
    with BATTLES_OUT.open("w", newline="", encoding="utf-8") as f:
        fieldnames = list(battles_rows[0].keys()) if battles_rows else []
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for r in battles_rows:
            w.writerow(r)

    # Escrever clans.csv
    with CLANS_OUT.open("w", newline="", encoding="utf-8") as f:
        fieldnames = ["clan_tag", "clan_badgeId"]
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for r in clans_rows:
            w.writerow(r)


if __name__ == "__main__":
    main()


# Populando o banco de dados

In [1]:
%pip install pymysql

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 25.1.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import csv
import pymysql

# CONFIGURAÇÕES DO BANCO
conn =  pymysql.connect(
    host="localhost",
    user="gabriel",
    password="123",
    database="clash_royale",
    charset="utf8mb4"
)

cursor = conn.cursor()

## Cartas

In [4]:
def inserir_carta(cursor, id_carta, nome, custo_elixir, raridade, url_image):
    query = "INSERT INTO carta (id_carta, nome, custo_elixir, raridade, url_image) VALUES (%s, %s, %s, %s, %s)"
    cursor.execute(query, (id_carta, nome, custo_elixir, raridade, url_image))




with open("./data/card_dictionary.csv", encoding="utf-8") as file:
    reader = csv.DictReader(file)

    # Corrigir BOM na primeira coluna
    reader.fieldnames = [
        name.replace("\ufeff", "").strip()
        for name in reader.fieldnames
    ]

    for row in reader:
        inserir_carta(cursor, int(row["id"]), row["card_name"], row["elixir_cost"], row["rarity"], row["image_url"])

conn.commit()

print("Finalizado!")

Finalizado!


## Decks

In [6]:
import csv

def inserir_deck(cursor, id_deck):
    query = "INSERT IGNORE INTO deck (id_deck) VALUES (%s)"
    cursor.execute(query, (id_deck,))


def inserir_deck_carta(cursor, id_deck, id_carta):
    query = "INSERT IGNORE INTO deck_carta (id_deck, id_carta) VALUES (%s, %s)"
    cursor.execute(query, (id_deck, id_carta))


with open("./data/decks.csv", encoding="utf-8") as file:
    reader = csv.DictReader(file)

    # Remove BOM e espaços
    reader.fieldnames = [
        name.replace("\ufeff", "").strip()
        for name in reader.fieldnames
    ]

    for row in reader:
        id_deck = int(row["deck_id"])

        # 1️⃣ Cria o deck
        inserir_deck(cursor, id_deck)

        # 2️⃣ Cria as cartas do deck
        for i in range(1, 9):  # card1_id ... card8_id
            card_key = f"card{i}_id"
            id_carta = int(row[card_key])

            inserir_deck_carta(cursor, id_deck, id_carta)

conn.commit()


## Clãs

In [33]:
def inserir_clan(cursor, tag, badge):
    query = "INSERT IGNORE INTO cla (tag_cla, badge_id) VALUES (%s, %s)"
    cursor.execute(query, (tag, badge))



with open("./data/clans.csv", encoding="utf-8") as file:
    reader = csv.DictReader(file)

    reader.fieldnames = [
        name.replace("\ufeff", "").strip()
        for name in reader.fieldnames
    ]

    for row in reader:
        if(row["clan_tag"] == ''): continue;
        inserir_clan(cursor, row["clan_tag"], int(float(row["clan_badgeId"])))

conn.commit()

## Jogadores

In [38]:
def inserir_jogador(cursor, vencedor):
    query = """INSERT INTO jogador (tag_jogador, trofeus, id_cla, last_update)
            VALUES (%s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                trofeus = IF(VALUES(last_update) > last_update, VALUES(trofeus), trofeus),
                id_cla   = IF(VALUES(last_update) > last_update, VALUES(id_cla), id_cla),
                last_update = IF(VALUES(last_update) > last_update, VALUES(last_update), last_update);
            """
    cursor.execute(query, (
        vencedor["tag_jogador"], 
        vencedor["trophies"],
        vencedor["cla"],
        vencedor["last_update"]
    ))



with open("./data/battles.csv", encoding="utf-8") as file:
    reader = csv.DictReader(file)

    reader.fieldnames = [
        name.replace("\ufeff", "").strip()
        for name in reader.fieldnames
    ]

    for row in reader:
        vencedor = {
            "tag_jogador": row["winner_tag"],
            "trophies": int(float(row["winner_startingTrophies"]) + float(row["winner_trophyChange"])),
            "last_update": row["battleTime"].split("+")[0],
            "cla": row["winner_clan"]
        }
        perdedor = {
            "tag_jogador": row["loser_tag"],
            "trophies": int(float(row["loser_startingTrophies"]) + float(row["loser_trophyChange"])),
            "last_update": row["battleTime"].split("+")[0],
            "cla": row["loser_clan"]
        }
        if not any(not v for v in vencedor.values()):
            inserir_jogador(cursor, vencedor)
        if not any(not v for v in perdedor.values()):
            inserir_jogador(cursor, perdedor)

conn.commit()

## Batalhas

In [None]:
import csv
from datetime import datetime

def inserir_batalha(
    cursor,
    id_batalha,
    data_hora,
    media_trofeus,
    id_vencedor,
    id_perdedor,
    id_deck_vencedor,
    id_deck_perdedor
):
    query = """
        INSERT INTO IGNORE batalha (
            id_batalha,
            data_hora,
            media_trofeus,
            id_vencedor,
            id_perdedor,
            id_deck_vencedor,
            id_deck_perdedor
        )
        VALUES (%s,%s,%s,%s,%s,%s,%s)
    """

    cursor.execute(query, (
        id_batalha,
        data_hora,
        media_trofeus,
        id_vencedor,
        id_perdedor,
        id_deck_vencedor,
        id_deck_perdedor
    ))


with open("./data/battles.csv", encoding="utf-8") as file:
    reader = csv.DictReader(file)

    # Corrigir BOM
    reader.fieldnames = [
        name.replace("\ufeff", "").strip()
        for name in reader.fieldnames
    ]

    for row in reader:

        data = row["battleTime"].split("+")[0]  # remove o timezone
        data_hora = datetime.fromisoformat(data)

        id_batalha = int(row["id"])

        id_vencedor = row["winner_tag"]
        id_perdedor = row["loser_tag"]

        media_trofeus = (
            int(float(row["winner_startingTrophies"])) +
            int(float(row["loser_startingTrophies"]))
        ) // 2

        id_deck_vencedor = int(float(row["winner_deck_id"]))
        id_deck_perdedor = int(float(row["loser_deck_id"]))
        inserir_batalha(
                cursor,
                id_batalha,
                data_hora,
                media_trofeus,
                id_vencedor,
                id_perdedor,
                id_deck_vencedor,
                id_deck_perdedor
            )
        try: 
            inserir_batalha(
                cursor,
                id_batalha,
                data_hora,
                media_trofeus,
                id_vencedor,
                id_perdedor,
                id_deck_vencedor,
                id_deck_perdedor
            )
        except: 
            continue

conn.commit()


In [48]:
conn.close()