# Linkage manual review dataset — Elasticsearch candidates with controlled rank sampling

This notebook takes a Spark DataFrame of linkage attributes and adds the following columns:

- `es_candidates`: array with up to **N** candidates returned by Elasticsearch (via `_msearch`)
- `target_pos`: target position drawn per row according to a **distribution configured in YAML**
- `es_candidate`: candidate picked from `es_candidates[target_pos-1]`, with a configurable fallback

Prerequisites:
- A Spark cluster
- Access to Elasticsearch (network, plus credentials if required)
- Python packages: `pyyaml`, `requests`

> Note: the approach uses `_msearch` to reduce request overhead (one batch per partition).
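
As a rough illustration of what one `_msearch` batch looks like on the wire, the sketch below serializes a list of queries as NDJSON: one header line and one query line per search, ending with a newline. The helper name `build_msearch_body` and the two example queries are hypothetical; the index name `basemaior` and the field `nome_a` are taken from later cells.

```python
import json

def build_msearch_body(index, queries):
    """Serialize queries as NDJSON: one header line and one body line per search."""
    lines = []
    for query in queries:
        lines.append(json.dumps({"index": index}))
        lines.append(json.dumps(query))
    # The _msearch body must end with a newline.
    return "\n".join(lines) + "\n"

# Hypothetical queries, for illustration only
example_queries = [
    {"size": 10, "query": {"match": {"nome_a": "YASMIN MUNIZ MARCELINO"}}},
    {"size": 10, "query": {"match": {"nome_a": "RUAN CESAR COSTA DE JESUS"}}},
]
body = build_msearch_body("basemaior", example_queries)
print(body)
```

Sending N searches this way costs one HTTP round trip instead of N, which is the overhead reduction the note refers to.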


# Importing libs

## Spark

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.storagelevel import StorageLevel

## Elasticsearch

In [27]:
import requests
import elasticsearch

## General

In [20]:
import json
import os
import yaml
from typing import Iterator, Dict, Any

# Starting spark

In [5]:
spark = SparkSession.builder \
    .appName("TrainDataSet") \
    .master("spark://barravento:7077") \
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:8.1.3") \
    .config("spark.es.nodes", "barravento,jardimdealah,stellamaris") \
    .config("spark.es.port", "9200") \
    .config("spark.es.nodes.wan.only", "false") \
    .config("spark.es.resource", "db2") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.shuffle.partitions", 16) \
    .config("spark.sql.files.maxPartitionBytes", "256m") \
    .getOrCreate()

sc = spark.sparkContext
# Set the checkpoint directory on HDFS
sc.setCheckpointDir("hdfs://barravento:9000/spark-checkpoints")


# 0) Remember to index

## Creating the empty 'shell' of the Elasticsearch index

In [6]:
ES="http://barravento:9200"
DST="basemaior"

In [8]:
settings = {
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 2,
    "refresh_interval": "30s"
  }}

In [9]:
r = requests.put(f"{ES}/{DST}", json=settings)
r.raise_for_status()
print(r.json())

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'basemaior'}


## Reading the file and indexing it with Spark

In [10]:
df_a = spark.read.parquet("hdfs://barravento:9000/data/synthetic-dataset-A.parquet")
df_a.limit(2).toPandas()

                                                                                

Unnamed: 0,id_cidacs_a,nome_a,nome_mae_a,dt_nasc_a,sexo_a
0,1,YASMIM VITORIA MATIAS FONSECA,TACIANY DOS SANTOS,20071122,2
1,2,PEDRO HENRIQUE MARTINS DE CARVALHO,FRANCILEIDE DOS SANTOS ALVES,20061102,1


In [21]:
(df_a.write
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "barravento,jardimdealah,stellamaris")
    .option("es.port", "9200")
    .option("es.resource", DST)
    .option("es.batch.size.entries", "500")
    .option("es.batch.size.bytes", "1mb")
    .option("es.batch.write.retry.count", "10")
    .option("es.batch.write.retry.wait", "10s")
    .option("es.mapping.id", "id_cidacs_a")
    .mode("overwrite")
    .save())

                                                                                

## 1) YAML configuration (example)

In [22]:
# Adjust the YAML path to your environment (Databricks / Jupyter / filesystem)
yaml_path = "01_sampling_cfg.yaml"  # Databricks example
# yaml_path = "linkage_cfg.yaml"               # local example
print("YAML path:", yaml_path)


YAML path: 01_sampling_cfg.yaml
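
The YAML file itself is not shown in this notebook. Judging only from the keys that later cells read, it presumably has a shape like the one below, written here as the Python dict that `yaml.safe_load` would return. Only `max_candidates`, `seed` and the fallback mode echo values printed later; the percentages, field mappings and boosts are made up for illustration.

```python
# Hypothetical configuration, shaped like the result of yaml.safe_load.
# Percentages, field mappings and boosts are illustrative assumptions.
example_cfg = {
    "sampling": {
        "seed": 2026,
        "position_distribution_pct": {1: 55, 2: 30, 3: 10, 4: 5},
        "fallback_when_short": {"mode": "last_available"},
    },
    "query": {
        "max_candidates": 10,
        "source_fields": ["nome_a", "nome_mae_a", "dt_nasc_a", "sexo_a"],
        "fields": [
            {"col": "nome_b", "es_field": "nome_a", "type": "match", "boost": 3.0},
            {"col": "nome_mae_b", "es_field": "nome_mae_a", "type": "match",
             "boost": 2.0, "operator": "OR", "fuzziness": "AUTO"},
            {"col": "dt_nasc_b", "es_field": "dt_nasc_a", "type": "term"},
            {"col": "sexo_b", "es_field": "sexo_a", "type": "term", "boost": 0.5},
        ],
    },
}

# Every field entry needs the keys that the query builder reads unconditionally.
required_field_keys = {"col", "es_field", "type"}
for field in example_cfg["query"]["fields"]:
    missing = required_field_keys - field.keys()
    assert not missing, f"field entry is missing {missing}"

print(sum(example_cfg["sampling"]["position_distribution_pct"].values()))
```

The fallback modes handled later in the notebook are `last_available` and `first_available`; any other value leaves `es_candidate` null when the list is shorter than `target_pos`.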


## 2) Load the YAML and validate the distribution

In [23]:
with open(yaml_path, "r") as fh:
    cfg = yaml.safe_load(fh)

dist = cfg["sampling"]["position_distribution_pct"]
seed = int(cfg["sampling"].get("seed", 2026))

# validate that the percentages sum to 100
s = sum(int(v) for v in dist.values())
assert s == 100, f"Distribution must sum to 100 (got: {s})"

max_candidates = int(cfg["query"]["max_candidates"])
fallback_mode = cfg["sampling"]["fallback_when_short"]["mode"]

print("OK! max_candidates =", max_candidates)
print("fallback_mode =", fallback_mode)
print("seed =", seed)

OK! max_candidates = 10
fallback_mode = last_available
seed = 2026
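
The cell above only checks that the percentages sum to 100. A slightly stricter check, sketched below under the assumption that positions should fall within `1..max_candidates` and that percentages should be non-negative, could also flag positions that can never be served. The function name and the sample inputs are hypothetical.

```python
def validate_distribution(dist, max_candidates):
    """Return a list of problems found in a position -> percentage mapping."""
    problems = []
    total = 0
    for key, value in dist.items():
        pos, pct = int(key), int(value)
        if not 1 <= pos <= max_candidates:
            problems.append(f"position {pos} is outside 1..{max_candidates}")
        if pct < 0:
            problems.append(f"position {pos} has a negative percentage ({pct})")
        total += pct
    if total != 100:
        problems.append(f"percentages sum to {total}, expected 100")
    return problems

# Illustrative inputs: a valid mapping and an invalid one
print(validate_distribution({1: 55, 2: 30, 3: 10, 4: 5}, max_candidates=10))
print(validate_distribution({1: 70, 12: 40}, max_candidates=10))
```

Returning a list instead of asserting makes it possible to report every problem in the configuration at once.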


## 3) Input DataFrame (example)

In [24]:
df_b = spark.read.parquet("hdfs://barravento:9000/data/synthetic-datasets-b-1000.parquet")
df_b.limit(2).toPandas()

Unnamed: 0,id_cidacs_b,nome_b,nome_mae_b,dt_nasc_b,sexo_b
0,788,RUAN CESAR COSTA DE JESUS,JUSSARA CAROLINA R ALBUQUERQUE,20080531,1
1,1261,YASMIN MUNIZ MARCELINO,VERA LUCIA RIBEIRO,20080516,2


## 4) Draw the target position `target_pos` according to the YAML

In [25]:
items = sorted([(int(k), int(v)) for k, v in dist.items()], key=lambda x: x[0])

# cumulative thresholds in [0, 1]
cum = []
acc = 0
for pos, pct in items:
    acc += pct
    cum.append((pos, acc / 100.0))

# Materialize one uniform draw per row. Referencing F.rand(seed) directly in every
# branch may let Spark evaluate it once per branch, which would skew the distribution.
df_u = df_b.withColumn("_u", F.rand(seed))
u = F.col("_u")

case_expr = None
for pos, thr in cum:
    cond = (u <= F.lit(thr))
    case_expr = F.when(cond, F.lit(pos)) if case_expr is None else case_expr.when(cond, F.lit(pos))
case_expr = case_expr.otherwise(F.lit(items[-1][0]))

df2 = df_u.withColumn("target_pos", case_expr.cast("int")).drop("_u")
df2.select("target_pos").groupBy("target_pos").count().orderBy("target_pos").show()

+----------+-----+
|target_pos|count|
+----------+-----+
|         1|  561|
|         2|  326|
|         3|  102|
|         4|   11|
+----------+-----+
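
The cell above turns the configured percentages into cumulative thresholds and gives each row the first position whose threshold is at least its uniform draw (inverse-CDF sampling). The plain-Python sketch below shows the same mapping outside Spark. The distribution is hypothetical, since the YAML values are not shown in this notebook, and `position_for` is an illustrative name.

```python
import random
from bisect import bisect_left
from collections import Counter

# Hypothetical distribution (position -> percentage); the notebook reads the real one from YAML.
dist_pct = {1: 55, 2: 30, 3: 10, 4: 5}

positions = sorted(dist_pct)
thresholds = []
acc = 0
for pos in positions:
    acc += dist_pct[pos]
    thresholds.append(acc / 100.0)   # [0.55, 0.85, 0.95, 1.0]

def position_for(u):
    """Return the first position whose cumulative threshold is >= u."""
    idx = bisect_left(thresholds, u)
    return positions[min(idx, len(positions) - 1)]

# Simulate 1000 rows with a fixed seed and count the positions drawn.
rng = random.Random(2026)
counts = Counter(position_for(rng.random()) for _ in range(1000))
print(sorted(counts.items()))
```

With a single draw per row, the share of each position converges to its configured percentage as the number of rows grows.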



## 5) Fetch candidates from Elasticsearch via `_msearch` (per partition)

In [33]:
es_hosts = ["http://barravento:9200/"]
es_index = DST

fields_cfg = cfg["query"]["fields"]
src_fields = cfg["query"].get("source_fields", [])

def build_es_query(row: Dict[str, Any]) -> Dict[str, Any]:
    should = []
    for f in fields_cfg:
        col = f["col"]
        val = row.get(col)

        if val is None or (isinstance(val, str) and val.strip() == ""):
            continue

        qtype = f["type"]
        es_field = f["es_field"]
        boost = float(f.get("boost", 1.0))

        if qtype == "match":
            clause = {
                "match": {
                    es_field: {
                        "query": val,
                        "boost": boost,
                        "operator": f.get("operator", "OR"),
                        "fuzziness": f.get("fuzziness", "AUTO"),
                    }
                }
            }
        elif qtype == "term":
            clause = {"term": {es_field: {"value": val, "boost": boost}}}
        else:
            raise ValueError(f"Unsupported query type: {qtype}")

        should.append(clause)

    return {
        "size": max_candidates,
        "_source": src_fields,
        "query": {
            "bool": {
                "should": should,
                "minimum_should_match": 1
            }
        }
    }

cand_schema = T.ArrayType(T.StructType([
    T.StructField("es_id", T.StringType(), True),
    T.StructField("es_score", T.DoubleType(), True),
    T.StructField("es_source", T.StringType(), True),  # serialized as compact JSON (optional)
]))

out_schema = T.StructType(df2.schema.fields + [
    T.StructField("es_candidates", cand_schema, True),
])

def fetch_partition(rows: Iterator[Any]) -> Iterator[Any]:
    rows_list = list(rows)
    if not rows_list:
        return iter([])

    ndjson_lines = []
    for r in rows_list:
        d = r.asDict(recursive=True)
        header = {"index": es_index}
        query  = build_es_query(d)
        ndjson_lines.append(json.dumps(header))
        ndjson_lines.append(json.dumps(query))

    body = "\n".join(ndjson_lines) + "\n"

    base = es_hosts[0].rstrip("/")
    url = f"{base}/_msearch"

    resp = requests.post(
        url,
        data=body,
        headers={"Content-Type": "application/x-ndjson"},
        timeout=60
    )
    resp.raise_for_status()
    payload = resp.json()

    responses = payload.get("responses", [])
    if len(responses) != len(rows_list):
        raise RuntimeError(f"Misaligned ES response: {len(responses)} != {len(rows_list)}")

    out = []
    for r, pr in zip(rows_list, responses):
        # _msearch can return HTTP 200 even when an individual search fails
        if "error" in pr:
            raise RuntimeError(f"ES search failed: {pr['error']}")
        hits = pr.get("hits", {}).get("hits", [])
        cands = []
        for h in hits:
            cands.append({
                "es_id": h.get("_id"),
                "es_score": float(h.get("_score") or 0.0),
                "es_source": json.dumps(h.get("_source", {}), ensure_ascii=False),
            })
        # append the candidate list as the last column of the row
        out.append(tuple(list(r) + [cands]))

    return iter(out)

df3 = df2.rdd.mapPartitions(fetch_partition).toDF(schema=out_schema)

df3.select("target_pos", F.size("es_candidates").alias("n_cands")).show(10, truncate=False)



+----------+-------+
|target_pos|n_cands|
+----------+-------+
|1         |10     |
|2         |10     |
|1         |10     |
|2         |10     |
|1         |10     |
|1         |10     |
|2         |10     |
|3         |10     |
|2         |10     |
|1         |10     |
+----------+-------+
only showing top 10 rows
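
`fetch_partition` materializes the whole partition and sends it as a single `_msearch` request. For large partitions that could produce a very large request body, so one option is to split the rows into fixed-size batches first. The sketch below shows only the batching step; `chunked` is a hypothetical helper, not something the notebook defines.

```python
from itertools import islice

def chunked(iterable, size):
    """Yield lists of at most `size` items from any iterable."""
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch

# Each batch would become one _msearch request instead of one request per partition.
for batch in chunked(range(7), 3):
    print(batch)
```

Inside `fetch_partition`, a loop such as `for batch in chunked(rows, 500)` could build and post one NDJSON body per batch; 500 is an arbitrary size that would need tuning against the cluster.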



                                                                                

## 6) Create `es_candidate` by picking the drawn position (with fallback)

In [34]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

cand_struct = T.StructType([
    T.StructField("es_id", T.StringType(), True),
    T.StructField("es_score", T.DoubleType(), True),
    T.StructField("es_source", T.StringType(), True),
])

def pick_candidate(cands, target_pos: int):
    if cands is None or len(cands) == 0:
        return None
    idx = (target_pos or 1) - 1
    if idx < len(cands):
        return cands[idx]
    if fallback_mode == "last_available":
        return cands[-1]
    if fallback_mode == "first_available":
        return cands[0]
    return None

pick_udf = F.udf(pick_candidate, cand_struct)

df4 = df3.withColumn("es_candidate", pick_udf(F.col("es_candidates"), F.col("target_pos")))

df4.select(
    "target_pos",
    F.col("es_candidate.es_id").alias("es_id"),
    F.col("es_candidate.es_score").alias("es_score")
).show(20, truncate=False)



+----------+------+--------+
|target_pos|es_id |es_score|
+----------+------+--------+
|1         |946044|1.0     |
|2         |946048|1.0     |
|1         |946044|1.0     |
|2         |946048|1.0     |
|1         |946044|1.0     |
|1         |946044|1.0     |
|2         |946048|1.0     |
|3         |946049|1.0     |
|2         |946048|1.0     |
|1         |946044|1.0     |
|2         |946048|1.0     |
|2         |946048|1.0     |
|3         |946049|1.0     |
|1         |946044|1.0     |
|1         |946044|1.0     |
|1         |946044|1.0     |
|1         |946044|1.0     |
|1         |946044|1.0     |
|2         |946048|1.0     |
|1         |946044|1.0     |
+----------+------+--------+
only showing top 20 rows
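
In the sample output above, every row with the same `target_pos` receives the same `es_id`, and every score is 1.0. That pattern is consistent with a match-all query, which is what a `bool` query with no clauses amounts to. One possible cause (an assumption, since the notebook does not show the YAML) is that the `col` names under `query.fields` do not exist in `df_b`, so `build_es_query` skips every field and sends an empty `should` list. A check along these lines could catch that before querying; the helper name and the example config entries are hypothetical.

```python
def missing_query_columns(fields_cfg, df_columns):
    """Return the `col` names from the query config that the DataFrame does not have."""
    available = set(df_columns)
    return [f["col"] for f in fields_cfg if f["col"] not in available]

# Column names of df_b as shown in step 3
df_b_columns = ["id_cidacs_b", "nome_b", "nome_mae_b", "dt_nasc_b", "sexo_b"]

# Hypothetical config entries, one of them written without the `_b` suffix
fields_cfg_example = [
    {"col": "nome", "es_field": "nome_a", "type": "match"},
    {"col": "dt_nasc_b", "es_field": "dt_nasc_a", "type": "term"},
]

print(missing_query_columns(fields_cfg_example, df_b_columns))
```

In the notebook this would be called as `missing_query_columns(fields_cfg, df_b.columns)`; a non-empty result means some configured fields never reach the query.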



                                                                                

## 7) Audit: observed distribution and null rate

In [None]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

aud = (
    df4
    .withColumn("has_candidate", F.col("es_candidate").isNotNull())
    .groupBy("target_pos")
    .agg(
        F.count("*").alias("n"),
        F.sum(F.col("has_candidate").cast("int")).alias("n_with_candidate")
    )
    .withColumn("pct_rows", F.col("n") / F.sum("n").over(Window.partitionBy()) * 100)
    .withColumn("pct_with_candidate", F.col("n_with_candidate") / F.col("n") * 100)
    .orderBy("target_pos")
)

aud.show(50, truncate=False)

df4.select(F.mean(F.col("es_candidate").isNull().cast("int")).alias("null_rate")).show()
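
To judge whether the observed shares are close to the configured ones, the audit table can be set side by side with the YAML percentages. The sketch below does that in plain Python with the counts printed in step 4; the configured percentages are hypothetical, since the YAML values are not shown in this notebook.

```python
def compare_distribution(observed_counts, dist_pct):
    """Return (position, observed %, configured %, difference) rows."""
    total = sum(observed_counts.values())
    rows = []
    for pos in sorted(set(observed_counts) | set(dist_pct)):
        observed = 100.0 * observed_counts.get(pos, 0) / total
        configured = float(dist_pct.get(pos, 0))
        rows.append((pos, round(observed, 1), configured, round(observed - configured, 1)))
    return rows

# Counts printed in step 4 of this notebook
observed_counts = {1: 561, 2: 326, 3: 102, 4: 11}
# Hypothetical configured percentages (the real ones live in the YAML file)
dist_pct = {1: 55, 2: 30, 3: 10, 4: 5}

for row in compare_distribution(observed_counts, dist_pct):
    print(row)
```

Small differences are expected from sampling noise at 1000 rows; large or systematic ones would suggest a problem in how the draw is evaluated.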

## 8) Export for manual review

In [None]:
# Suggestions:
# - keep the linkage columns + es_candidate (optionally es_candidates)
# - save as parquet/delta for internal review
# - or generate a CSV (mind the size of es_candidates)

# Adjust to your column names; names missing from df4 are dropped by the filter below
cols_out = [
    "id_cidacs_b", "nome_b", "nome_mae_b", "dt_nasc_b", "sexo_b", "municipio_res",
    "target_pos",
    "es_candidate",
    "es_candidates",
]
cols_out = [c for c in cols_out if c in df4.columns]

df_out = df4.select(*cols_out)
df_out.show(5, truncate=False)

# Example write:
# df_out.write.mode("overwrite").parquet("/mnt/data/linkage_manual_review.parquet")
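
For a CSV export, the nested `es_candidate` struct has to be flattened and the `es_candidates` array is probably best left out. The sketch below shows one way to do that on a single row represented as a dict, such as `row.asDict(recursive=True)` would give for a `df4` row. The function name and the example row are made up for illustration.

```python
import csv
import io
import json

def flatten_for_csv(row):
    """Flatten one review row: expand es_candidate, drop the es_candidates array."""
    flat = {k: v for k, v in row.items() if k not in ("es_candidate", "es_candidates")}
    cand = row.get("es_candidate") or {}
    flat["es_id"] = cand.get("es_id")
    flat["es_score"] = cand.get("es_score")
    # es_source is stored as a JSON string in this notebook
    source = json.loads(cand["es_source"]) if cand.get("es_source") else {}
    for key, value in source.items():
        flat[f"es_{key}"] = value
    return flat

# Made-up row for illustration; the candidate values are not real query results
example_row = {
    "id_cidacs_b": 1261,
    "nome_b": "YASMIN MUNIZ MARCELINO",
    "target_pos": 2,
    "es_candidate": {
        "es_id": "2",
        "es_score": 7.5,
        "es_source": json.dumps({"nome_a": "PEDRO HENRIQUE MARTINS DE CARVALHO"}),
    },
    "es_candidates": [],
}

flat = flatten_for_csv(example_row)
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(flat))
writer.writeheader()
writer.writerow(flat)
print(buffer.getvalue())
```

Rows without a candidate keep empty `es_id` and `es_score` cells, so reviewers can still see which records had no match.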
