In [20]:
import multiprocessing
import os
import sys
import time
from datetime import timedelta

from py2neo import Graph
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Number of CPU cores reported by the multiprocessing module for this machine.
cpu_cores = multiprocessing.cpu_count()
_cores_banner = f"CPU cores disponibles: {cpu_cores}"
print(_cores_banner)

CPU cores disponibles: 8


# Lectura de DataFrames desde PostgreSQL

In [2]:
# === Connection settings ===
# SECURITY: secrets must not live in the notebook. The two passwords below can
# be supplied through the PG_PASS / NEO4J_PASS environment variables; the
# literal fallbacks are kept only so existing runs keep working. Rotate these
# passwords and delete the fallbacks before sharing or committing this file.
PG_URL  = 'jdbc:postgresql://localhost:5432/graphs'
PG_USER = 'spark_ingest'
PG_PASS = os.environ.get("PG_PASS", 'GYleZAI2pTBKJYl9W1PL')
PG_SCHEMA = 'saml_d'
PG_TABLE1 = 'accounts'
PG_TABLE2 = 'transferences'
PG_TABLE3 = 'statements'

JDBC_JAR = r"C:\spark\spark-4.0.1-bin-hadoop3\jars\postgresql-42.7.4.jar"
JDBC_BATCHSIZE = 10000  # NOTE(review): not referenced by any cell visible in this notebook
JDBC_FETCHSIZE = 10000  # passed to the JDBC reader as "fetchsize"

# NOTE(review): the jar name says "for_spark_3" while it lives under a Spark 4.0.1
# install — confirm the connector build matches the running Spark/Scala version.
NEO4J_JAR  = r"C:\spark\spark-4.0.1-bin-hadoop3\jars\neo4j-connector-apache-spark_2.13-5.3.11-SNAPSHOT_for_spark_3.jar"
NEO4J_URI  = "bolt://localhost:7687"
NEO4J_USER = "neo4j"
NEO4J_PASS = os.environ.get("NEO4J_PASS", "Banco.69")
NEO4J_DDBB = "saml-d"

PYTHON = sys.executable  # interpreter of the Jupyter kernel

# "spark.jars" takes a comma-separated list; the extraClassPath settings take a
# platform classpath, so join with os.pathsep (";" on Windows, ":" elsewhere)
# instead of hard-coding the Windows separator.
_SPARK_JARS = f"{JDBC_JAR},{NEO4J_JAR}"
_EXTRA_CLASSPATH = os.pathsep.join([JDBC_JAR, NEO4J_JAR])

spark = (
    SparkSession.builder
    .appName("postgres-to-neo4j-graph")
    .master("local[*]")
    # === Local JARs ===
    .config("spark.jars", _SPARK_JARS)
    .config("spark.driver.extraClassPath", _EXTRA_CLASSPATH)
    .config("spark.executor.extraClassPath", _EXTRA_CLASSPATH)
    # === Same Python for driver/worker + Windows fixes ===
    .config("spark.pyspark.driver.python", PYTHON)
    .config("spark.pyspark.python", PYTHON)
    .config("spark.executorEnv.PYSPARK_PYTHON", PYTHON)
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.python.use.daemon", "false")
    .config("spark.local.dir", r"C:\spark\tmp")
    .config("spark.sql.shuffle.partitions", "128")
    .config("spark.driver.memory", "8g")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")  # optional: intended as a performance aid
    .getOrCreate()
)
# Keep the notebook output readable: only warnings and errors from Spark.
spark.sparkContext.setLogLevel("WARN")

In [3]:
# Connection properties shared by every JDBC read in this cell.
jdbc_props = {
    "user": PG_USER,
    "password": PG_PASS,
    "driver": "org.postgresql.Driver",
    "fetchsize": str(JDBC_FETCHSIZE)
}


def _read_pg_table(table, partition_column, lower_bound, upper_bound, num_partitions):
    """Load `PG_SCHEMA.<table>` through JDBC, split into `num_partitions` range
    partitions of `partition_column` between `lower_bound` and `upper_bound`."""
    reader = spark.read.format("jdbc").options(
        url=PG_URL,
        dbtable=f"{PG_SCHEMA}.{table}",
        partitionColumn=partition_column,
        lowerBound=lower_bound,
        upperBound=upper_bound,
        numPartitions=num_partitions,
    )
    return reader.options(**jdbc_props).load()


# Accounts
# NOTE(review): the 1..2000000 bounds are hard-coded; if real account numbers
# fall far outside that range most rows end up in the edge partitions — confirm.
accounts_df = _read_pg_table(PG_TABLE1, "account", 1, 2000000, 16)

# Actual min/max account number, used as JDBC partition bounds for statements.
acc_bounds = accounts_df.agg(
    F.min("account").cast("long").alias("lo"),
    F.max("account").cast("long").alias("hi"),
).first()
acc_lo = int(acc_bounds["lo"])
acc_hi = int(acc_bounds["hi"])

# Transferences
tx_df = _read_pg_table(PG_TABLE2, "id", 1, 9500000, 64)

# Statements: partitioned by account; tune the partition count to the machine/cluster.
_stm_columns = [
    F.col("account").cast("long").alias("account"),
    F.col("date_time").alias("date_time"),
    F.col("txn_id").cast("long").alias("txn_id"),
    F.col("direction").alias("direction"),
    F.col("delta_amount").cast("double").alias("delta_amount"),
    F.col("running_balance").cast("double").alias("running_balance"),
]
stm_df = _read_pg_table(PG_TABLE3, "account", acc_lo, acc_hi, 64).select(*_stm_columns)

In [4]:
# Print the schema of each source table: accounts, transfers, statements.
for _source_df in (accounts_df, tx_df, stm_df):
    _source_df.printSchema()

root
 |-- account: long (nullable = true)
 |-- location: string (nullable = true)

root
 |-- id: long (nullable = true)
 |-- date_time: timestamp (nullable = true)
 |-- sender_account: long (nullable = true)
 |-- receiver_account: long (nullable = true)
 |-- amount: double (nullable = true)
 |-- payment_currency: string (nullable = true)
 |-- received_currency: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- is_laundering: integer (nullable = true)
 |-- laundering_type: string (nullable = true)

root
 |-- account: long (nullable = true)
 |-- date_time: timestamp (nullable = true)
 |-- txn_id: long (nullable = true)
 |-- direction: string (nullable = true)
 |-- delta_amount: double (nullable = true)
 |-- running_balance: double (nullable = true)



# Preparación de Dataframes para Neo4j

In [5]:
# Nodes: one row per account number, hash-partitioned by that key.
nodes_df = (
    accounts_df
    .select(
        F.col("account").cast("long").alias("account_number"),
        F.col("location").alias("location"),
    )
    .dropDuplicates(["account_number"])
    .repartition(64, "account_number")
)

# Edges: one row per transfer, with explicit types for the Neo4j writer.
_edge_columns = [
    F.col("id").cast("long").alias("id"),
    F.col("date_time").alias("timestamp"),
    F.col("sender_account").cast("long").alias("src"),
    F.col("receiver_account").cast("long").alias("dst"),
    F.col("amount").cast("double").alias("amount"),
    F.col("payment_currency"),
    F.col("received_currency"),
    F.col("payment_type"),
    F.col("is_laundering").cast("int"),
    F.col("laundering_type"),
]
edges_df = (
    tx_df
    .select(*_edge_columns)
    # masked ~ Bernoulli(0.2): 1 when the seeded uniform draw is below 0.2
    .withColumn("masked", (F.rand(seed=42) < F.lit(0.2)).cast("int"))
    # Optional: partition by the source account (src) for stable parallelism
    .repartition(256, "src")
)

In [6]:
# Per-movement markers derived from the statements table.
_movement_markers = [
    (F.col("delta_amount") > 0).cast("int").alias("is_credit"),
    (F.col("delta_amount") < 0).cast("int").alias("is_debit"),
    F.abs("delta_amount").alias("abs_amount"),
    F.col("date_time").cast("long").alias("ts_long"),  # numeric ordering key for RANGE windows expressed in seconds
]
mov = (
    stm_df
    .select("*", *_movement_markers)
    .repartition(128, "account")  # tune to the hardware/cluster
    .persist()
)
_ = mov.count()  # forces the cache to be materialized


In [7]:
def add_window_feats(df, sec):
    """Append trailing-window aggregates per account to `df`.

    The window covers, within each `account`, every row whose `ts_long` lies in
    [current - sec, current], so the current row is included. Six columns are
    added, all suffixed with `_{sec}`: cnt, cred_cnt, debt_cnt, cred_sum,
    debt_sum and net_sum.
    """
    trailing = (
        Window
        .partitionBy("account")
        .orderBy(F.col("ts_long"))
        .rangeBetween(-sec, 0)
    )

    def _amount_when(flag_col):
        # abs_amount for rows where the flag is 1, otherwise 0.0
        return F.when(F.col(flag_col) == 1, F.col("abs_amount")).otherwise(0.0)

    aggregates = (
        ("cnt",      F.count("*")),
        ("cred_cnt", F.sum("is_credit")),
        ("debt_cnt", F.sum("is_debit")),
        ("cred_sum", F.sum(_amount_when("is_credit"))),
        ("debt_sum", F.sum(_amount_when("is_debit"))),
        ("net_sum",  F.sum("delta_amount")),
    )

    enriched = df
    for label, agg in aggregates:
        enriched = enriched.withColumn(f"{label}_{sec}", agg.over(trailing))
    return enriched

DAY = 86400  # seconds in one day
windows_s = [n_days * DAY for n_days in (7, 15, 30)]

# Stack the 7/15/30-day window features on top of the cached movements.
mov_w = mov
for window_sec in windows_s:
    mov_w = add_window_feats(mov_w, window_sec)
mov_w = mov_w.persist()
_ = mov_w.count()  # forces the cache to be materialized


In [8]:
# Keep the latest row per account (newest date_time, then highest txn_id);
# its window columns are already computed "up to the last event".
w_last = Window.partitionBy("account").orderBy(F.desc("date_time"), F.desc("txn_id"))
_ranked_movements = mov_w.withColumn("rn", F.row_number().over(w_last))
last_rows = _ranked_movements.where(F.col("rn") == 1)

def pick_node_cols(df, sec, tag):
    """Select `account` plus the six `*_{sec}` window columns of `df`, renamed
    with the readable prefix `tag` (e.g. `cnt_604800` -> `w7_tx_count`)."""
    renames = (
        ("cnt",      "tx_count"),
        ("cred_cnt", "credit_count"),
        ("debt_cnt", "debit_count"),
        ("cred_sum", "credit_sum"),
        ("debt_sum", "debit_sum"),
        ("net_sum",  "net_flow"),
    )
    picked = [F.col(f"{old}_{sec}").alias(f"{tag}_{new}") for old, new in renames]
    return df.select("account", *picked)

# Per-window projections, kept (lazily) for interactive inspection.
node_w7   = pick_node_cols(last_rows, 7*DAY,  "w7")
node_w15  = pick_node_cols(last_rows, 15*DAY, "w15")
node_w30  = pick_node_cols(last_rows, 30*DAY, "w30")

# `last_rows` holds exactly one row per account (rn == 1) and already carries
# every window column, so all node features are projected in a single select.
# This replaces three self-joins on `account`, which re-evaluated the
# row_number window once per join branch. Column order is unchanged:
# account, w7_*, w15_*, w30_*, current_balance, last_seen.
_NODE_WINDOWS = ((7*DAY, "w7"), (15*DAY, "w15"), (30*DAY, "w30"))
_NODE_RENAMES = (
    ("cnt",      "tx_count"),
    ("cred_cnt", "credit_count"),
    ("debt_cnt", "debit_count"),
    ("cred_sum", "credit_sum"),
    ("debt_sum", "debit_sum"),
    ("net_sum",  "net_flow"),
)
nodes_win = last_rows.select(
    "account",
    *[
        F.col(f"{old}_{sec}").alias(f"{tag}_{new}")
        for sec, tag in _NODE_WINDOWS
        for old, new in _NODE_RENAMES
    ],
    F.col("running_balance").alias("current_balance"),
    F.col("date_time").alias("last_seen"),
)

# Enrich the original nodes; accounts without any statement keep null features.
nodes_enriched_df = (nodes_df.alias("n")
    .join(nodes_win.alias("f"), F.col("n.account_number")==F.col("f.account"), "left")
    .drop("account")  # only the feature side has a column named "account"
    .repartition(64, "account_number")
)

## Features por arista (contexto previo de 7/15/30 días para src y dst)

Para cada transacción, queremos el histórico de la cuenta justo antes de esa transacción.

Como la ventana que calculamos incluye el evento actual, hacemos un pequeño ajuste por dirección:

Para el emisor (DEBIT): `prev_cnt = cnt - 1`, `prev_debt_cnt = debt_cnt - 1`, `prev_debt_sum = debt_sum - amount` y `prev_net = net_sum - delta_amount` (aquí `delta_amount = -amount`).

Para el receptor (CREDIT): `prev_cnt = cnt - 1`, `prev_cred_cnt = cred_cnt - 1`, `prev_cred_sum = cred_sum - amount` y `prev_net = net_sum - delta_amount` (aquí `delta_amount = +amount`).

In [9]:
# Snapshot of the window features at the moment of each transaction, taken
# from the matching statement movement: one DEBIT row (sender side) and one
# CREDIT row (receiver side) per txn_id.
_MOV_METRICS = ("cnt", "cred_cnt", "debt_cnt", "cred_sum", "debt_sum", "net_sum")


def _txn_side_view(flag_col, account_alias, prefix):
    """Project the movements flagged by `flag_col` into id / account / amount /
    delta plus every window metric renamed with `prefix` (metric-major order)."""
    feature_cols = [
        F.col(f"{metric}_{s}").alias(f"{prefix}_{metric}_{s}")
        for metric in _MOV_METRICS
        for s in windows_s
    ]
    flagged = mov_w.filter(F.col(flag_col) == 1)
    return flagged.select(
        F.col("txn_id").alias("id"),
        F.col("account").alias(account_alias),
        F.col("abs_amount").alias("amt"),
        F.col("delta_amount").alias("delta"),
        *feature_cols,
    )


# Sender (DEBIT)
send_mov = _txn_side_view("is_debit", "src", "s")

# Receiver (CREDIT)
recv_mov = _txn_side_view("is_credit", "dst", "r")

def adjust_sender_prev(df):
    """Add `s_*_prev_{sec}` columns that remove the current DEBIT event from the
    sender's inclusive window aggregates, for every window in `windows_s`."""
    adjusted = df
    for sec in windows_s:
        prev_columns = (
            (f"s_cnt_prev_{sec}",      F.col(f"s_cnt_{sec}") - 1),
            (f"s_debt_cnt_prev_{sec}", F.col(f"s_debt_cnt_{sec}") - 1),
            # credit-side aggregates are untouched: the current event is a debit
            (f"s_cred_cnt_prev_{sec}", F.col(f"s_cred_cnt_{sec}")),
            (f"s_debt_sum_prev_{sec}", F.col(f"s_debt_sum_{sec}") - F.col("amt")),
            (f"s_cred_sum_prev_{sec}", F.col(f"s_cred_sum_{sec}")),
            (f"s_net_prev_{sec}",      F.col(f"s_net_sum_{sec}") - F.col("delta")),
        )
        for col_name, expr in prev_columns:
            adjusted = adjusted.withColumn(col_name, expr)
    return adjusted

def adjust_receiver_prev(df):
    """Add `r_*_prev_{sec}` columns that remove the current CREDIT event from the
    receiver's inclusive window aggregates, for every window in `windows_s`."""
    adjusted = df
    for sec in windows_s:
        prev_columns = (
            (f"r_cnt_prev_{sec}",      F.col(f"r_cnt_{sec}") - 1),
            (f"r_cred_cnt_prev_{sec}", F.col(f"r_cred_cnt_{sec}") - 1),
            # debit-side aggregates are untouched: the current event is a credit
            (f"r_debt_cnt_prev_{sec}", F.col(f"r_debt_cnt_{sec}")),
            (f"r_cred_sum_prev_{sec}", F.col(f"r_cred_sum_{sec}") - F.col("amt")),
            (f"r_debt_sum_prev_{sec}", F.col(f"r_debt_sum_{sec}")),
            (f"r_net_prev_{sec}",      F.col(f"r_net_sum_{sec}") - F.col("delta")),
        )
        for col_name, expr in prev_columns:
            adjusted = adjusted.withColumn(col_name, expr)
    return adjusted

_PREV_FIELDS = ("cnt_prev", "cred_cnt_prev", "debt_cnt_prev",
                "cred_sum_prev", "debt_sum_prev", "net_prev")


def _prev_column_names(prefix):
    """Names of the `*_prev_*` columns for one side, field-major then by window."""
    return [f"{prefix}_{field}_{s}" for field in _PREV_FIELDS for s in windows_s]


send_prev = adjust_sender_prev(send_mov).select("id", "src", *_prev_column_names("s"))
recv_prev = adjust_receiver_prev(recv_mov).select("id", "dst", *_prev_column_names("r"))

# Attach both sides to the edges. Joining on (id, src) and (id, dst) also
# checks that the statement account matches the transfer's endpoints.
_with_sender = edges_df.join(send_prev, on=["id","src"], how="left")
_with_both = _with_sender.join(recv_prev, on=["id","dst"], how="left")
edges_enriched = _with_both.repartition(256, "src")  # same partitioning as edges_df


# Preparación para escritura en Neo4j + funciones helpers

In [10]:
# Driver-side py2neo connection, used here to create the uniqueness
# constraints (if missing) before any data is written.
graph = Graph(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASS), name=NEO4J_DDBB)
_constraint_statements = (
    """
CREATE CONSTRAINT account_unique IF NOT EXISTS
FOR (a:Account) REQUIRE a.account_number IS UNIQUE
""",
    """
CREATE CONSTRAINT tx_unique IF NOT EXISTS
FOR ()-[r:TX]-() REQUIRE r.id IS UNIQUE
""",
)
for _statement in _constraint_statements:
    graph.run(_statement)

In [11]:
NEO4J_BATCHSIZE = 1000  # rows per UNWIND batch sent by the py2neo partition writers
RETRIES = 3             # connection attempts made by _get_graph()
SLEEP = 1.0             # seconds to wait between failed connection attempts

def _get_graph():
    """Open a py2neo `Graph` connection, retrying on failure.

    Makes up to `RETRIES` attempts (at least one, even if `RETRIES` is set to
    zero or less) and waits `SLEEP` seconds between attempts.

    Returns:
        The connected `Graph`.

    Raises:
        Exception: whatever the last failed attempt raised, with its original
        traceback.
    """
    attempts = max(1, RETRIES)
    for attempt in range(1, attempts + 1):
        try:
            return Graph(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASS), name=NEO4J_DDBB)
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the real error, no pointless final sleep
            time.sleep(SLEEP)

def write_nodes_partition(rows_iter):
    """Write one partition of account rows to Neo4j through py2neo.

    Rows are buffered and sent in batches of `NEO4J_BATCHSIZE`; each batch
    MERGEs `Account` nodes on `account_number` and sets `location`.
    """
    graph = _get_graph()
    cypher = """
        UNWIND $rows AS row
        MERGE (a:Account {account_number: row.account_number})
        SET a.location = row.location
        """
    pending = []
    for row in rows_iter:
        pending.append({
            "account_number": int(row["account_number"]),
            "location": row["location"],
        })
        if len(pending) >= NEO4J_BATCHSIZE:
            graph.run(cypher, rows=pending)
            pending.clear()
    # send whatever is left after the last full batch
    if pending:
        graph.run(cypher, rows=pending)
        pending.clear()

def write_edges_partition(rows_iter):
    """Write one partition of transfer rows to Neo4j through py2neo.

    Rows are buffered and sent in batches of `NEO4J_BATCHSIZE`. Each batch
    MERGEs both endpoint `Account` nodes and the `TX` relationship (keyed by
    `id`), then sets the relationship properties.

    The endpoints are matched on `account_number`, the same key used by
    `write_nodes_partition`, the `account_unique` constraint and the Spark
    connector options. The previous version merged on a property named
    `account`, which no node carries, so it would have created a second,
    disconnected set of `Account` nodes.

    Note that MERGE creates an endpoint node when it does not exist yet.
    """
    graph = _get_graph()
    buf = []
    run = graph.run
    cypher = """
    UNWIND $rows AS row
    MERGE (s:Account {account_number: row.src})
    MERGE (t:Account {account_number: row.dst})
    MERGE (s)-[r:TX {id: row.id}]->(t)
    SET  r.timestamp = row.timestamp,
         r.amount = row.amount,
         r.payment_currency = row.payment_currency,
         r.received_currency = row.received_currency,
         r.payment_type = row.payment_type,
         r.is_laundering = row.is_laundering,
         r.laundering_type = row.laundering_type,
         r.masked = row.masked
    """

    def flush():
        # Send the buffered rows (if any) as one UNWIND batch and reset the buffer.
        if not buf:
            return
        run(cypher, rows=buf)
        buf.clear()

    for row in rows_iter:
        buf.append({
            "id": int(row["id"]),
            "src": int(row["src"]),
            "dst": int(row["dst"]),
            "timestamp": row["timestamp"],
            # nullable columns: keep None instead of failing on float(None)/int(None)
            "amount": float(row["amount"]) if row["amount"] is not None else None,
            "payment_currency": row["payment_currency"],
            "received_currency": row["received_currency"],
            "payment_type": row["payment_type"],
            "is_laundering": int(row["is_laundering"]) if row["is_laundering"] is not None else None,
            "laundering_type": row["laundering_type"],
            "masked": int(row["masked"])
        })
        if len(buf) >= NEO4J_BATCHSIZE:
            flush()
    flush()

In [13]:
# --- StayAwake: evita suspensión en Windows ---
import ctypes, platform, time

class StayAwake:
    """Context manager that calls the Windows `SetThreadExecutionState` API on
    entry and resets it on exit. On any other platform it does nothing.

    NOTE(review): only the "system required" and "away mode" flags are set; no
    display-related flag is passed, so this presumably does not keep the screen
    on — confirm against the Win32 documentation.
    """
    ES_CONTINUOUS = 0x80000000
    ES_SYSTEM_REQUIRED = 0x00000001
    ES_AWAYMODE_REQUIRED = 0x00000040  # optional "Away Mode" flag

    def __enter__(self):
        if platform.system() != "Windows":
            return self
        flags = self.ES_CONTINUOUS | self.ES_SYSTEM_REQUIRED | self.ES_AWAYMODE_REQUIRED
        ctypes.windll.kernel32.SetThreadExecutionState(flags)
        return self

    def __exit__(self, exc_type, exc, tb):
        if platform.system() != "Windows":
            return
        # Back to the normal state: only ES_CONTINUOUS, no requirement flags.
        ctypes.windll.kernel32.SetThreadExecutionState(self.ES_CONTINUOUS)


In [14]:
import math, json, os, time
from datetime import timedelta

# Checkpoint directory: holds one empty `<prefix>_<bucket>._DONE` marker per
# bucket already written. It can be overridden with the AML_CHK_DIR environment
# variable so the notebook is not tied to one machine; the default is the
# original location.
CHK_DIR = os.environ.get(
    "AML_CHK_DIR",
    r"E:\Felpipe\Trabajo\Ciencias de datos en general\KaggleChallenges\Anti Money Laundering Transaction Data\checkpoints",
)
os.makedirs(CHK_DIR, exist_ok=True)

def write_done(path):
    """Create (or truncate) an empty marker file at `path`.

    Missing parent directories are created first. A bare filename (no
    directory part) is written to the current working directory.
    """
    parent = os.path.dirname(path)
    if parent:  # os.makedirs("") raises, so skip it for bare filenames
        os.makedirs(parent, exist_ok=True)
    with open(path, "w"):
        pass  # the file only needs to exist; the context manager closes it

def is_done(path):
    """Return True when something exists at `path` (the bucket's marker file)."""
    marker_present = os.path.exists(path)
    return marker_present

def count_df(df):
    """Return the result of `df.count()`."""
    n_rows = df.count()
    return n_rows

def estimate_eta(done, total, start_ts):
    """Estimate progress of a running job.

    Args:
        done: units of work finished so far.
        total: total units of work.
        start_ts: `time.time()` value taken when the job started.

    Returns:
        A tuple `(pct, eta, elapsed)`: percentage done as a float, estimated
        time remaining and time elapsed, both as `timedelta` truncated to whole
        seconds. When nothing is left to do the ETA is zero. When the rate is
        unknown (nothing done yet, or no measurable time elapsed) the ETA is
        `timedelta.max`, used as an "unknown" sentinel.

    The previous version raised ZeroDivisionError when no time had elapsed
    (possible on the checkpoint-skip path with a coarse clock) and
    OverflowError when the rate was zero (`int(float("inf"))`).
    """
    elapsed = max(0.0, time.time() - start_ts)  # clamp: a clock step must not go negative
    remaining = total - done
    rate = (done / elapsed) if (done > 0 and elapsed > 0) else 0.0

    if remaining <= 0:
        eta = timedelta(0)
    elif rate > 0:
        try:
            eta = timedelta(seconds=int(remaining / rate))
        except OverflowError:
            eta = timedelta.max  # estimate too large to represent
    else:
        eta = timedelta.max      # no rate yet -> ETA unknown

    pct = (done / total * 100.0) if total else 0.0
    return pct, eta, timedelta(seconds=int(elapsed))


In [15]:
def ingest_nodes(
    nodes_df,
    buckets=8,                 # 8-16 works well for ~0.85M nodes
    writers_per_bucket=2,      # partitions (concurrent writers) per bucket
    batch_size=20000,          # connector "batch.size"
    chk_prefix="nodes_hashbuck"
):
    """Write `nodes_df` to Neo4j as `:Account` nodes, one hash bucket at a time.

    Rows are assigned to `buckets` buckets by hashing `account_number`. Each
    bucket is written with the Neo4j Spark connector using at most
    `writers_per_bucket` partitions. After a bucket is written, an empty marker
    file `<chk_prefix>_<bucket>._DONE` is created in `CHK_DIR`; buckets whose
    marker already exists are skipped, so an interrupted run can be resumed.
    Progress and ETA are printed per bucket.

    NOTE(review): markers are only meaningful for the same `buckets` value and
    `chk_prefix`; changing `buckets` reshuffles rows between buckets.
    NOTE(review): the bucket expression is abs(hash) % buckets; a hash equal to
    the minimum 32-bit integer cannot be negated, so that single value may
    error or yield a negative bucket depending on Spark's ANSI setting — confirm.
    It is left unchanged so existing markers stay valid.
    NOTE(review): confirm how mode "Append" combined with `node.keys` behaves
    when a partially written bucket is re-run.
    """
    # 1) Uniform hash buckets -> large, balanced batches
    nodes_buck = (nodes_df
        .withColumn("bucket", (F.abs(F.hash("account_number")) % F.lit(buckets)))
        .repartition(buckets, "bucket")           # roughly one partition per bucket
        .persist())
    try:
        _ = nodes_buck.count()                    # materialize the cache once

        # 2) Bucket sizes (for progress), computed once instead of per iteration
        sizes = nodes_buck.groupBy("bucket").count().collect()
        bucket_sizes = {int(r["bucket"]): int(r["count"]) for r in sizes}
        total = sum(bucket_sizes.values())
        print(f"[NODOS] total={total}  buckets={buckets}")

        done, start = 0, time.time()

        # 3) Write bucket by bucket with few writers (limits lock contention)
        for b in range(buckets):
            size_b = bucket_sizes.get(b, 0)
            tag = os.path.join(CHK_DIR, f"{chk_prefix}_{b}._DONE")

            if size_b == 0:
                write_done(tag)   # empty bucket: nothing to write
                continue
            if is_done(tag):
                done += size_b
                pct, eta, elapsed = estimate_eta(done, total, start)
                print(f"[NODOS] Skip bucket {b} ({size_b} filas). "
                      f"done={done}/{total} ({pct:0.2f}%) ETA={eta} elapsed={elapsed}")
                continue

            batch_df = nodes_buck.filter(F.col("bucket")==b).drop("bucket")

            t0 = time.time()
            (batch_df
                .coalesce(writers_per_bucket)     # caps the number of writers to Neo4j
                .write
                .format("org.neo4j.spark.DataSource")
                .mode("Append")
                .option("url", NEO4J_URI)         # same endpoint as the py2neo helpers
                .option("authentication.type","basic")
                .option("authentication.basic.username", NEO4J_USER)
                .option("authentication.basic.password", NEO4J_PASS)
                .option("database", NEO4J_DDBB)
                .option("labels", ":Account")
                .option("node.keys", "account_number")
                # .option("node.save.strategy","keys")   # if the jar supports this option
                .option("batch.size", str(batch_size))
                .option("transaction.retries", "20")
                .option("transaction.retry.timeout", "60000")
                .save())
            t1 = time.time()

            done += size_b
            write_done(tag)       # marker is written only after a successful save
            pct, eta, elapsed = estimate_eta(done, total, start)
            print(f"[NODOS] bucket {b} -> {size_b} filas en {timedelta(seconds=int(t1-t0))}. "
                  f"done={done}/{total} ({pct:0.2f}%) ETA={eta} elapsed={elapsed}")
    finally:
        # Release the cached buckets even when a write fails midway.
        nodes_buck.unpersist()


In [16]:
def ingest_edges(
    edges_df,
    buckets=16,                  # number of hash(src) buckets; 8-32 usually works well
    writers_per_bucket=1,        # 1 (safe) or 2 if no deadlocks are observed
    batch_size=20000,            # connector "batch.size"
    chk_prefix="rels_srcbuck",   # prefix of the _DONE marker files
    rel_properties=None,         # columns written as TX properties (None -> base set)
):
    """Write `edges_df` to Neo4j as `TX` relationships, one hash bucket at a time.

    Rows are assigned to `buckets` buckets by hashing `src` and sorted by
    (`src`, `id`) inside each partition. Each bucket is written with the Neo4j
    Spark connector using at most `writers_per_bucket` partitions; endpoints
    are matched (not created) on `account_number`. After a bucket is written an
    empty marker file `<chk_prefix>_<bucket>._DONE` is created in `CHK_DIR`;
    buckets whose marker exists are skipped, so an interrupted run can resume.

    Args:
        rel_properties: column names to store on the relationship, either an
            iterable of names or a ready comma-separated string. `None` keeps
            the original fixed list (timestamp, amount, currencies, payment
            type, laundering flags, masked). Any other column of `edges_df`,
            such as the `s_*_prev_*` / `r_*_prev_*` features, is NOT written
            unless it is listed here.

    NOTE(review): markers are only meaningful for the same `buckets` value and
    `chk_prefix`; changing `buckets` reshuffles rows between buckets.
    NOTE(review): the bucket expression is abs(hash) % buckets; a hash equal to
    the minimum 32-bit integer cannot be negated, so that single value may
    error or yield a negative bucket depending on Spark's ANSI setting — confirm.
    It is left unchanged so existing markers stay valid.
    """
    if rel_properties is None:
        rel_props = ("timestamp,amount,payment_currency,received_currency,"
                     "payment_type,is_laundering,laundering_type,masked")
    elif isinstance(rel_properties, str):
        rel_props = rel_properties
    else:
        rel_props = ",".join(rel_properties)

    # 1) Balanced buckets by hash of src
    edges_buck = (edges_df
        .withColumn("bucket", (F.abs(F.hash("src")) % F.lit(buckets)))
        .repartition(buckets, "bucket")              # roughly one partition per bucket
        .sortWithinPartitions("src", "id")           # consistent write order within a partition
        .persist())
    try:
        # 2) Count rows per bucket once (this also materializes the cache)
        counts_by_bucket = (edges_buck
            .groupBy("bucket").count()
            .collect())
        bucket_sizes = {int(r["bucket"]): int(r["count"]) for r in counts_by_bucket}
        total = sum(bucket_sizes.values())
        print(f"[RELS] total={total}  buckets={buckets}")

        done = 0
        start = time.time()

        # 3) Write bucket by bucket with few writers (limits collisions)
        for b in range(buckets):
            size_b = bucket_sizes.get(b, 0)
            tag = os.path.join(CHK_DIR, f"{chk_prefix}_{b}._DONE")

            if size_b == 0:
                write_done(tag)   # empty bucket: nothing to write
                continue
            if is_done(tag):
                done += size_b
                pct, eta, elapsed = estimate_eta(done, total, start)
                print(f"[RELS] Skip bucket {b} ({size_b} filas). done={done}/{total} ({pct:0.2f}%) ETA={eta} elapsed={elapsed}")
                continue

            batch_df = edges_buck.filter(F.col("bucket")==b).drop("bucket")

            t0 = time.time()
            (batch_df
                .coalesce(writers_per_bucket)        # caps the number of writers to Neo4j
                .write
                .format("org.neo4j.spark.DataSource")
                .mode("Append")
                .option("url", NEO4J_URI)            # same endpoint as the py2neo helpers
                .option("authentication.type", "basic")
                .option("authentication.basic.username", NEO4J_USER)
                .option("authentication.basic.password", NEO4J_PASS)
                .option("database", NEO4J_DDBB)
                .option("relationship", "TX")
                .option("relationship.save.strategy", "keys")
                .option("relationship.keys", "id")
                .option("relationship.source.labels", ":Account")
                .option("relationship.target.labels", ":Account")
                .option("relationship.source.node.keys", "src:account_number")
                .option("relationship.target.node.keys", "dst:account_number")
                .option("relationship.source.save.mode", "Match")
                .option("relationship.target.save.mode", "Match")
                .option("relationship.properties", rel_props)
                .option("batch.size", str(batch_size))
                .option("transaction.retries", "20")           # raised retry count
                .option("transaction.retry.timeout", "60000")  # and retry timeout (ms)
                .save())
            t1 = time.time()

            done += size_b
            write_done(tag)       # marker is written only after a successful save
            pct, eta, elapsed = estimate_eta(done, total, start)
            # Fixed: the message used to end with a stray ")".
            print(f"[RELS] bucket {b} -> {size_b} filas en {timedelta(seconds=int(t1-t0))}. "
                  f"done={done}/{total} ({pct:0.2f}%) ETA={eta} elapsed={elapsed}")
    finally:
        # Release the cached buckets even when a write fails midway.
        edges_buck.unpersist()


# Ingesta

In [17]:
# Settings for the two ingestion passes.
# NOTE(review): writers_per_bucket=4 for relationships is above the "1 or 2"
# suggested in ingest_edges — confirm this is intentional.
_node_opts = dict(buckets=8, writers_per_bucket=2, batch_size=20000)
_edge_opts = dict(buckets=16, writers_per_bucket=4, batch_size=20000,
                  chk_prefix="rels_srcbuck")

inicio = time.time()
with StayAwake():
    # Nodes first, so relationship endpoints can be matched afterwards
    ingest_nodes(nodes_enriched_df, **_node_opts)

    # Relationships
    ingest_edges(edges_enriched, **_edge_opts)

fin = time.time()
print(f"Tiempo total: {timedelta(seconds=int(fin - inicio))}")


[NODOS] total=855460  buckets=8
[NODOS] bucket 0 -> 106320 filas en 0:00:16. done=106320/855460 (12.43%) ETA=0:02:01 elapsed=0:00:17
[NODOS] bucket 1 -> 107115 filas en 0:00:18. done=213435/855460 (24.95%) ETA=0:01:48 elapsed=0:00:35
[NODOS] bucket 2 -> 106586 filas en 0:00:10. done=320021/855460 (37.41%) ETA=0:01:18 elapsed=0:00:46
[NODOS] bucket 3 -> 107158 filas en 0:00:08. done=427179/855460 (49.94%) ETA=0:00:55 elapsed=0:00:54
[NODOS] bucket 4 -> 106972 filas en 0:00:09. done=534151/855460 (62.44%) ETA=0:00:39 elapsed=0:01:04
[NODOS] bucket 5 -> 107299 filas en 0:00:09. done=641450/855460 (74.98%) ETA=0:00:24 elapsed=0:01:14
[NODOS] bucket 6 -> 106439 filas en 0:00:20. done=747889/855460 (87.43%) ETA=0:00:13 elapsed=0:01:35
[NODOS] bucket 7 -> 107571 filas en 0:00:13. done=855460/855460 (100.00%) ETA=0:00:00 elapsed=0:01:48
[RELS] total=9504852  buckets=16
[RELS] bucket 0 -> 606777 filas en 0:01:24. done=606777/9504852 (6.38%) ETA=0:20:45 elapsed=0:01:24)
[RELS] bucket 1 -> 589278

In [18]:
# Ask the driver JVM which Scala version Spark runs on (useful for choosing
# connector jars built for the matching Scala version).
sc = spark.sparkContext
_scala_properties = sc._jvm.scala.util.Properties
scala_version = _scala_properties.versionNumberString()
print(f"Scala version: {scala_version}")

Scala version: 2.13.16


In [19]:
# graph = Graph(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASS), name=NEO4J_DDBB)
# graph.run("""
# CREATE CONSTRAINT account_unique IF NOT EXISTS
# FOR (a:Account) REQUIRE a.account_number IS UNIQUE
# """)
# graph.run("""
# CREATE CONSTRAINT tx_unique IF NOT EXISTS
# FOR ()-[r:TX]-() REQUIRE r.id IS UNIQUE
# """)