## Preparação dos Conjuntos de Dados
Para conduzir os testes da atualização incremental e em lote, foram desenvolvidas abordagens distintas na preparação dos conjuntos de dados.  O objetivo foi avaliar o desempenho dessas abordagens na evolução do esquema JSON em diferentes contextos, nos quais os experimentos podem variar quanto ao conjunto de dados utilizado, ao tamanho dos incrementos (quantidade de novos documentos) e à quantidade de atributos por documento (e.g. baixa, média ou alta quantidade de atributos por documento). A seguir, são detalhadas as etapas dessas preparações.

## Preparação dos Dados para Abordagem JSD Evolution

Para os experimentos de evolução incremental, utilizou-se uma progressão geométrica (PG) para dividir as coleções em incrementos de diferentes tamanhos, representando os novos documentos usados para evoluir o esquema JSON. O primeiro incremento foi utilizado para criar o esquema inicial, enquanto os incrementos seguintes foram utilizados para evoluir o esquema atual. Esses incrementos refletem diferentes estágios de crescimento da quantidade de documentos. Por exemplo, é possível observar como o esquema inicial evolui conforme mais documentos são adicionados e como essa evolução afeta o desempenho em termos de tempo de processamento.

Para determinar o primeiro termo (`Dn₁`) e a razão (`r`) da progressão geométrica (PG), foram utilizados apenas números inteiros. Dessa forma, a distribuição dos documentos em incrementos resultou em quantidades inteiras. O conjunto de dados do **Twitter** foi particionado usando uma PG, com primeiro termo de **450** (`Dn₁ = 450`) e razão igual a **2**, simulando incrementos progressivos na quantidade de documentos. Esse conjunto de dados foi dividido considerando a quantidade de documentos da coleção. A soma dos **12** primeiros termos (incrementos `Dnᵢ`, com `i` variando de 1 a `m`, onde `m = 12`) dessa PG resulta em **1.872.500 documentos**.

Para o conjunto de dados **VK**, foram adotados os mesmos valores da PG usados para particionar a coleção de documentos do Twitter (primeiro termo de **450** e razão **2**). A variação do tamanho dos incrementos, proporcional à PG, permite analisar como as abordagens de evolução incremental e em lote se comportam com mesma quantidade de documentos, mas com alta e média quantidade de atributos por documento (Twitter e VK, respectivamente).

No caso do conjunto de dados **Livros**, foi adotada uma PG com primeiro termo igual a **10.000** (`Dn₁ = 10.000`) e razão **2**, refletindo o crescimento progressivo da quantidade de documentos. Os valores da PG foram escolhidos conforme o tamanho de cada conjunto de dados, visando assegurar uma representação adequada do crescimento progressivo do número de documentos em cada coleção.

A Figura abaixo ilustra como a PG foi usada para dividir as coleções em incrementos de diferentes tamanhos.

![Particionamento das coleções em incrementos de diferentes tamanhos](./imagem/JSD_E.png)
*Figura 1 — Particionamento das coleções em incrementos de diferentes tamanhos.*


---

### Coleção usada no esquema inicial
- `twitter_450`
- `vk_450`
- `livros_10000`

### Conjuntos de coleções usadas para atualização

#### Conjunto de dados Twitter
`twitter_900`, `twitter_1800`, `twitter_3600`, `twitter_7200`, `twitter_14400`,  
`twitter_28800`, `twitter_57600`, `twitter_115200`, `twitter_230400`,  
`twitter_460800`, `twitter_921600`

#### Conjunto de dados VK
`vk_900`, `vk_1800`, `vk_3600`, `vk_7200`, `vk_14400`,  
`vk_28800`, `vk_57600`, `vk_115200`, `vk_230400`,  
`vk_460800`, `vk_921600`

#### Conjunto de dados de metadados de livros
`books_10000`, `books_20000`, `books_40000`, `books_80000`,  
`books_160000`, `books_320000`, `books_640000`, `books_1280000`,  
`books_2560000`, `books_5120000`, `books_10240000`, `books_20480000`

In [12]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Particionamento disjunto por PG (sem skip), para grandes volumes.
Gera coleções no formato <out_prefix><tamanho>, ex.: vk_450, vk_900, ...

Uso:
    python partition_pg_disjoint.py [config.json] [dataset]
Ex.:
    python partition_pg_disjoint.py config.json vk
"""

from pymongo import MongoClient
import json
import sys


def termos_pg(a1: int, r: int, m: int):
    vals, v = [], a1
    for _ in range(m):
        vals.append(v)
        v *= r
    return vals


def load_dataset_cfg(path: str, dataset: str):
    with open(path, "r", encoding="utf-8") as f:
        cfg = json.load(f)

    if "mongo" not in cfg or "uri" not in cfg["mongo"]:
        raise ValueError("Config inválida: faltou mongo.uri")
    if "datasets" not in cfg or dataset not in cfg["datasets"]:
        raise ValueError(f"Config inválida: faltou datasets.{dataset}")

    ds = cfg["datasets"][dataset]
    if "partitioning" not in ds:
        raise ValueError(f"Config inválida: faltou datasets.{dataset}.partitioning")

    p = ds["partitioning"]
    for key in ["db_name", "src_collection", "out_prefix", "pg", "performance"]:
        if key not in p:
            raise ValueError(f"Config inválida: faltou partitioning.{key}")
    for key in ["dn1", "r", "m"]:
        if key not in p["pg"]:
            raise ValueError(f"Config inválida: faltou partitioning.pg.{key}")
    for key in ["read_batch_size", "write_chunk_size"]:
        if key not in p["performance"]:
            raise ValueError(f"Config inválida: faltou partitioning.performance.{key}")

    return {
        "MONGO_URI": cfg["mongo"]["uri"],
        "DB_NAME": p["db_name"],
        "SRC_COLL": p["src_collection"],
        "OUT_PREFIX": p["out_prefix"],
        "Dn1": p["pg"]["dn1"],
        "r": p["pg"]["r"],
        "m": p["pg"]["m"],
        "READ_BATCH_SIZE": p["performance"]["read_batch_size"],
        "WRITE_CHUNK_SIZE": p["performance"]["write_chunk_size"],
    }


def main():
    config_path =  "config.json"
    dataset =  "vk"

    cfg = load_dataset_cfg(config_path, dataset)
    MONGO_URI = cfg["MONGO_URI"]
    DB_NAME = cfg["DB_NAME"]
    SRC_COLL = cfg["SRC_COLL"]
    OUT_PREFIX = cfg["OUT_PREFIX"]
    Dn1, r, m = cfg["Dn1"], cfg["r"], cfg["m"]
    READ_BATCH_SIZE = cfg["READ_BATCH_SIZE"]
    WRITE_CHUNK_SIZE = cfg["WRITE_CHUNK_SIZE"]
    #dataset = cfg["default_dataset"]

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    client.admin.command("ping")
    db = client[DB_NAME]

    if SRC_COLL not in db.list_collection_names():
        raise RuntimeError(
            f"Coleção fonte '{SRC_COLL}' não existe no DB '{DB_NAME}'. "
            f"Existentes: {db.list_collection_names()}"
        )

    source = db[SRC_COLL]
    total_docs = source.estimated_document_count()
    if total_docs == 0:
        raise RuntimeError(f"A coleção '{SRC_COLL}' está vazia.")

    part_sizes = termos_pg(Dn1, r, m)
    soma_pg = sum(part_sizes)
    if soma_pg > total_docs:
        print(f"[aviso] Soma dos tamanhos ({soma_pg}) > total de docs ({total_docs}). "
              "Partições finais poderão ficar menores/vazias.")

    # prepara coleções de saída
    outs, out_names = [], []
    for size in part_sizes:
        name = f"{OUT_PREFIX}{size}"  # ex.: vk_450, vk_900, ...
        db[name].drop()
        outs.append(db[name])
        out_names.append(name)

    # uma passada só, ordenado por _id
    cursor = source.find({}, projection=None).sort([("_id", 1)]).batch_size(READ_BATCH_SIZE)

    idx_part = 0
    remaining_in_part = part_sizes[0]
    inserted_counts = [0] * len(outs)
    total_seen = 0
    write_buffer = []

    def flush_buffer():
        nonlocal write_buffer, inserted_counts, idx_part
        if write_buffer:
            outs[idx_part].insert_many(
                write_buffer, ordered=False, bypass_document_validation=True
            )
            inserted_counts[idx_part] += len(write_buffer)
            write_buffer.clear()

    for doc in cursor:
        total_seen += 1
        write_buffer.append(doc)
        remaining_in_part -= 1

        if len(write_buffer) >= WRITE_CHUNK_SIZE and remaining_in_part > 0:
            flush_buffer()

        if remaining_in_part == 0:
            flush_buffer()
            idx_part += 1
            if idx_part >= len(outs):
                break
            remaining_in_part = part_sizes[idx_part]

    if idx_part < len(outs):
        flush_buffer()

    # logs humanos (faixas 1-based esperadas)
    start_h = 1
    for size, name, count in zip(part_sizes, out_names, inserted_counts):
        end_h = start_h + size - 1
        print(f"{name}: esperado {start_h}-{end_h} (tamanho {size}), inseridos {count}")
        start_h = end_h + 1

    print(f"Total percorrido no cursor: {total_seen} documentos (coleção ~{total_docs}).")
    print("Particionamento concluído.")


if __name__ == "__main__":
    main()


vk_450: esperado 1-450 (tamanho 450), inseridos 450
vk_900: esperado 451-1350 (tamanho 900), inseridos 900
vk_1800: esperado 1351-3150 (tamanho 1800), inseridos 1800
vk_3600: esperado 3151-6750 (tamanho 3600), inseridos 3600
vk_7200: esperado 6751-13950 (tamanho 7200), inseridos 7200
vk_14400: esperado 13951-28350 (tamanho 14400), inseridos 14400
vk_28800: esperado 28351-57150 (tamanho 28800), inseridos 28800
vk_57600: esperado 57151-114750 (tamanho 57600), inseridos 57600
vk_115200: esperado 114751-229950 (tamanho 115200), inseridos 115200
vk_230400: esperado 229951-460350 (tamanho 230400), inseridos 230400
vk_460800: esperado 460351-921150 (tamanho 460800), inseridos 460800
vk_921600: esperado 921151-1842750 (tamanho 921600), inseridos 921600
Total percorrido no cursor: 1842750 documentos (coleção ~3036654).
Particionamento concluído.


## Preparação dos Dados para Abordagem em Lote

Adotou-se a estratégia de união dos incrementos particionados durante a preparação dos experimentos na abordagem de evolução incremental, a fim de refletir a natureza do processamento em lote, no qual as atualizações consideram a totalidade dos documentos acumulados até o momento (documentos novos e antigos).

![Preparação dos conjuntos de dados da abordagem em lote](./imagem/JSD.png)  
*Figura 2 — Preparação dos conjuntos de dados da abordagem em lote.*

Seja `Pᵢ` o conjunto de documentos da partição `i`, tal que `i` varia de `1` a `m`, sendo `m` o número total de partições geradas pela PG. Então, para cada partição `Pᵢ`, tem-se:

- **P₁**: Partição inicial contendo `Dn₁`, onde `Dn₁` é o primeiro termo da PG, ou seja, representa o primeiro conjunto de dados particionado para os testes da evolução incremental.  
- **P₂**: `P₁ ∪ Dn₂`, onde `Dn₂` é o segundo termo da PG e representa o segundo conjunto de dados particionado para os testes da evolução incremental.  
- **P₃**: `P₂ ∪ Dn₃`, onde `Dn₃` é o terceiro termo da PG e representa o terceiro conjunto de dados particionado para os testes da evolução incremental.  
- **Pᵢ₊₁**: `Pᵢ ∪ Dnᵢ₊₁`, onde `Dnᵢ₊₁` é o próximo termo da PG.

A Figura 2 ilustra como cada partição representa a união dos documentos das partições anteriores, simulando o crescimento gradual e cumulativo do conjunto de dados em cenários de evolução em lote. Essa abordagem precisa ser aplicada a todos os conjuntos de dados, tanto novos quanto antigos.

---

### Conjuntos de coleções usadas na abordagem em Lote

#### Conjunto de dados Twitter
`twitter_450`, `twitter_1350`, `twitter_3150`, `twitter_6750`,  
`twitter_13950`, `twitter_28350`, `twitter_57150`, `twitter_114750`,  
`twitter_229950`, `twitter_460350`, `twitter_921150`, `twitter_1842750`

#### Conjunto de dados VK
`vk_450`, `vk_1350`, `vk_3150`, `vk_6750`,  
`vk_13950`, `vk_28350`, `vk_57150`, `vk_114750`,  
`vk_229950`, `vk_460350`, `vk_921150`, `vk_1842750`

#### Conjunto de dados de metadados de livros
`books_10000`, `books_30000`, `books_70000`, `books_150000`,  
`books_310000`, `books_630000`, `books_1270000`, `books_2550000`,  
`books_5110000`, `books_10230000`, `books_20470000`, `books_40950000`

In [4]:
#!/usr/bin/env python3 aqui
# -*- coding: utf-8 -*-

from pymongo import MongoClient
import json
import sys
from typing import Tuple, List

def termos_pg(a1: int, r: int, m: int) -> List[int]:
    vals, v = [], a1
    for _ in range(m):
        vals.append(v)
        v *= r
    return vals

def load_dataset_cfg(path: str, dataset: str):
    with open(path, "r", encoding="utf-8") as f:
        cfg = json.load(f)

    if "mongo" not in cfg or "uri" not in cfg["mongo"]:
        raise ValueError("Config inválida: faltou mongo.uri")

    if "datasets" not in cfg or dataset not in cfg["datasets"]:
        raise ValueError(f"Config inválida: faltou datasets.{dataset}")

    ds = cfg["datasets"][dataset]
    if "batch_union" not in ds:
        raise ValueError(f"Config inválida: faltou datasets.{dataset}.batch_union")

    b = ds["batch_union"]
    for key in ["source_db", "part_prefix", "dest_db", "batch_prefix", "pg", "performance"]:
        if key not in b:
            raise ValueError(f"Config inválida: faltou batch_union.{key}")
    for key in ["dn1", "r", "m"]:
        if key not in b["pg"]:
            raise ValueError(f"Config inválida: faltou batch_union.pg.{key}")
    for key in ["read_limit", "read_batch_size", "write_chunk_size"]:
        if key not in b["performance"]:
            raise ValueError(f"Config inválida: faltou batch_union.performance.{key}")

    return {
        "MONGO_URI": cfg["mongo"]["uri"],
        "SOURCE_DB": b["source_db"],
        "PART_PREFIX": b["part_prefix"],
        "DEST_DB": b["dest_db"],
        "BATCH_PREFIX": b["batch_prefix"],
        "Dn1": b["pg"]["dn1"],
        "r": b["pg"]["r"],
        "m": b["pg"]["m"],
        "READ_LIMIT": b["performance"]["read_limit"],
        "READ_BATCH_SIZE": b["performance"]["read_batch_size"],
        "WRITE_CHUNK_SIZE": b["performance"]["write_chunk_size"],
    }

def copy_collection_by_pagination(db_src, coll_src: str, db_dst, coll_dst: str,
                                  read_limit: int, read_batch: int, write_chunk: int) -> int:
    db_dst[coll_dst].drop()
    inserted = 0
    last_id = None
    while True:
        q = {"_id": {"$gt": last_id}} if last_id is not None else {}
        docs = list(
            db_src[coll_src]
            .find(q, no_cursor_timeout=False)
            .sort([("_id", 1)])
            .limit(read_limit)
            .batch_size(read_batch)
        )
        if not docs:
            break
        start = 0
        n = len(docs)
        while start < n:
            end = min(start + write_chunk, n)
            chunk = docs[start:end]
            db_dst[coll_dst].insert_many(
                chunk, ordered=False, bypass_document_validation=True
            )
            inserted += len(chunk)
            start = end
        last_id = docs[-1]["_id"]
    return inserted

def append_collection_by_pagination(db_src, coll_src: str, db_dst, coll_dst: str,
                                    read_limit: int, read_batch: int, write_chunk: int) -> int:
    appended = 0
    last_id = None
    while True:
        q = {"_id": {"$gt": last_id}} if last_id is not None else {}
        docs = list(
            db_src[coll_src]
            .find(q, no_cursor_timeout=False)
            .sort([("_id", 1)])
            .limit(read_limit)
            .batch_size(read_batch)
        )
        if not docs:
            break
        start = 0
        n = len(docs)
        while start < n:
            end = min(start + write_chunk, n)
            chunk = docs[start:end]
            db_dst[coll_dst].insert_many(
                chunk, ordered=False, bypass_document_validation=True
            )
            appended += len(chunk)
            start = end
        last_id = docs[-1]["_id"]
    return appended

def clone_same_db_with_out(db, src_coll: str, dst_coll: str) -> bool:
    try:
        db[dst_coll].drop()
        db[src_coll].aggregate(
            [{"$match": {}}, {"$out": dst_coll}],
            allowDiskUse=True,
        )
        return True
    except Exception as e:
        print(f"[aviso] clone_same_db_with_out falhou ({src_coll} -> {dst_coll}): {e}")
        return False

def resolve_args(default_config="config.json", default_dataset="vk"):
    pos = [a for a in sys.argv[1:] if not a.startswith("-")]
    config_path = pos[0] if len(pos) >= 1 else default_config
    dataset = pos[1] if len(pos) >= 2 else default_dataset
    return config_path, dataset

def main(config_path: str = "config.json", dataset: str = "vk"):
    cfg = load_dataset_cfg(config_path, dataset)
    MONGO_URI = cfg["MONGO_URI"]
    SOURCE_DB = cfg["SOURCE_DB"]
    PART_PREFIX = cfg["PART_PREFIX"]
    DEST_DB = cfg["DEST_DB"]
    BATCH_PREFIX = cfg["BATCH_PREFIX"]
    Dn1, r, m = cfg["Dn1"], cfg["r"], cfg["m"]
    READ_LIMIT = cfg["READ_LIMIT"]
    READ_BATCH_SIZE = cfg["READ_BATCH_SIZE"]
    WRITE_CHUNK_SIZE = cfg["WRITE_CHUNK_SIZE"]

    client = MongoClient(MONGO_URI, serverSelectionTimeoutMS=5000)
    client.admin.command("ping")
    src_db = client[SOURCE_DB]
    dst_db = client[DEST_DB]

    sizes = termos_pg(Dn1, r, m)
    part_names = [f"{PART_PREFIX}{s}" for s in sizes]            # vk_450, vk_900, ...
    batch_names = [f"{BATCH_PREFIX}{i}" for i in range(1, m+1)]  # vk_batch_1, ...

    existing_src = set(src_db.list_collection_names())
    missing = [c for c in part_names if c not in existing_src]
    if missing:
        raise RuntimeError(
            f"As coleções disjuntas não existem em '{SOURCE_DB}': {missing}. "
            "Crie-as primeiro com o script de particionamento."
        )

    # P1 = Dn1
    p1_src = part_names[0]
    p1_dst = batch_names[0]
    c1 = copy_collection_by_pagination(src_db, p1_src, dst_db, p1_dst,
                                       READ_LIMIT, READ_BATCH_SIZE, WRITE_CHUNK_SIZE)
    print(f"P1: {SOURCE_DB}.{p1_src} -> {DEST_DB}.{p1_dst}  (copiados {c1} docs)")

    # P_i = P_{i-1} ∪ Dn_i
    for i in range(2, m + 1):
        prev_dst = batch_names[i-2]
        out_dst = batch_names[i-1]
        part_src = part_names[i-1]

        cloned = clone_same_db_with_out(dst_db, prev_dst, out_dst)
        if not cloned:
            _ = copy_collection_by_pagination(dst_db, prev_dst, dst_db, out_dst,
                                              READ_LIMIT, READ_BATCH_SIZE, WRITE_CHUNK_SIZE)

        appended = append_collection_by_pagination(src_db, part_src, dst_db, out_dst,
                                                   READ_LIMIT, READ_BATCH_SIZE, WRITE_CHUNK_SIZE)
        total_i = dst_db[out_dst].estimated_document_count()
        print(f"P{i}: ({DEST_DB}.{prev_dst} ∪ {SOURCE_DB}.{part_src}) -> {DEST_DB}.{out_dst}  "
              f"(anexados {appended}, total {total_i})")

#     for name in batch_names:
#         cnt = dst_db[name].estimated_document_count()
#         print(f"{DEST_DB}.{name}: {cnt} documentos")

if __name__ == "__main__":
    cfg_path, ds = resolve_args()
    # Em Jupyter, você também pode chamar: main("config.json", "vk")
    cfg_path="config.json"
    ds="vk"
    main(cfg_path, ds)


P1: vk.vk_450 -> vk_batch.vk_batch_1  (copiados 450 docs)
P2: (vk_batch.vk_batch_1 ∪ vk.vk_900) -> vk_batch.vk_batch_2  (anexados 900, total 1350)
P3: (vk_batch.vk_batch_2 ∪ vk.vk_1800) -> vk_batch.vk_batch_3  (anexados 1800, total 3150)
P4: (vk_batch.vk_batch_3 ∪ vk.vk_3600) -> vk_batch.vk_batch_4  (anexados 3600, total 6750)
P5: (vk_batch.vk_batch_4 ∪ vk.vk_7200) -> vk_batch.vk_batch_5  (anexados 7200, total 13950)
P6: (vk_batch.vk_batch_5 ∪ vk.vk_14400) -> vk_batch.vk_batch_6  (anexados 14400, total 28350)
P7: (vk_batch.vk_batch_6 ∪ vk.vk_28800) -> vk_batch.vk_batch_7  (anexados 28800, total 57150)
P8: (vk_batch.vk_batch_7 ∪ vk.vk_57600) -> vk_batch.vk_batch_8  (anexados 57600, total 114750)
P9: (vk_batch.vk_batch_8 ∪ vk.vk_115200) -> vk_batch.vk_batch_9  (anexados 115200, total 229950)
P10: (vk_batch.vk_batch_9 ∪ vk.vk_230400) -> vk_batch.vk_batch_10  (anexados 230400, total 460350)
P11: (vk_batch.vk_batch_10 ∪ vk.vk_460800) -> vk_batch.vk_batch_11  (anexados 460800, total 921150)
