In [1]:
from pyspark.sql import SparkSession

# Spark settings for the seed load. spark.jars and spark.driver.memory are read
# when the JVM starts, so they only apply if this is the first session created
# in the kernel.
_SPARK_CONF = {
    "spark.jars": "/home/jovyan/jars/mysql-connector-j.jar,/home/jovyan/jars/postgresql.jar",
    "spark.sql.shuffle.partitions": "8",
    "spark.driver.memory": "4g",    # raise to 6g if the Docker VM has the headroom
    "spark.executor.memory": "2g",  # local mode runs work in the driver JVM; harmless to set
}

_builder = SparkSession.builder.appName("POC-MySQL-Loader-Stable")
for _conf_key, _conf_value in _SPARK_CONF.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()

spark

In [2]:
from pyspark.sql import functions as F

import os

MYSQL_URL = "jdbc:mysql://mysql:3306/pocdb"
# Credentials are read from the environment when set. The defaults are the local
# POC container's values, so nothing changes when the variables are not exported.
MYSQL_USER = os.environ.get("MYSQL_USER", "pocuser")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", "pocpass")

PLAZA_IDS = ["000001", "000002", "000003", "000004"]
plaza_arr = F.array(*[F.lit(x) for x in PLAZA_IDS])
plaza_count = len(PLAZA_IDS)

start_id, end_id = 1001, 51_000   # inclusive; starts after the existing 1,000 rows
SEED_WRITE_PARTITIONS = 4         # KEY: keep small on a laptop (each partition writes over its own JDBC connection)

tid = F.col("Transaction_ID")

# Generate the IDs directly in SEED_WRITE_PARTITIONS partitions. A trailing
# .repartition() would produce the same partition count, but only after a full
# shuffle.
df = (
    spark.range(start_id, end_id + 1, numPartitions=SEED_WRITE_PARTITIONS)
    .withColumnRenamed("id", "Transaction_ID")
    .withColumn("TIPID", (tid % 10_000_000).cast("bigint"))
    .withColumn("UFMID", (tid % 5_000_000).cast("bigint"))
    # rotate plaza ids (element_at is 1-based, hence the +1)
    .withColumn("Plaza_ID", F.element_at(plaza_arr, (F.pmod(tid, plaza_count) + 1).cast("int")))
    .withColumn("Lane_ID", (tid % 20).cast("int"))
    .withColumn("Full_Fare", F.lit(2.50).cast("decimal(6,2)"))
    .withColumn("ETC_Fare", F.lit(2.00).cast("decimal(6,2)"))
    .withColumn("Indicated_Fare", F.lit(2.25).cast("decimal(6,2)"))
    # "now" minus 0..86399 seconds. timestamp_seconds works on epoch seconds
    # directly, so there is no round trip through a local-time string (such a
    # string is ambiguous during a DST fall-back hour).
    .withColumn(
        "Transaction_DateTime",
        F.timestamp_seconds(F.unix_timestamp(F.current_timestamp()) - (tid % 86400).cast("int")),
    )
    .withColumn("Revenue_Type", F.lit("ETC"))
    .withColumn("Exception_Codes", F.lit(None).cast("string"))
    .withColumn("Zero_Fare_Override_Flag", F.lit(0))
    .withColumn("AVC_Class", F.lit(1))
    .withColumn("Vehicle_Speed", F.lit(60))
    .withColumn("Vehicle_Length", F.lit(15))
    .withColumn("Lane_Position", F.lit("N"))
    .withColumn("Exempt_Flag", F.lit(0))
    .withColumn("Transaction_Type", F.lit("COMPLETED_COLLECTABLE"))
    .withColumn("Uncollectable_Reason", F.lit(None).cast("string"))
    .withColumn("Account_ID", (tid % 2_000_000).cast("int"))
    .withColumn("UFM_Send_DateTime", F.current_timestamp())
    .withColumn("UFM_Pay_Method", F.lit("ETC"))
    .withColumn("UFM_Discount_Fare", F.lit(1.75).cast("decimal(6,2)"))
    .withColumn("UFM_Full_Fare", F.lit(2.50).cast("decimal(6,2)"))
    .withColumn("UFM_Indicated_Fare", F.lit(2.25).cast("decimal(6,2)"))
    .withColumn("Region_Code", F.lit(1))
    .withColumn("Tag_Agency_ID", F.lit(10))
    .withColumn("Tag_ID", (tid % 9_000_000).cast("int"))
    .withColumn("Tag_Class", F.lit(2))
    .withColumn("Tag_Status", F.lit("ACTIVE"))
    .withColumn("Plate_Jurisdiction", F.lit("FL"))
    .withColumn("Plate_Number", F.concat(F.lit("P"), F.lpad((tid % 1_000_000).cast("string"), 6, "0")))
    .withColumn("Plate_Type", F.lit("REG"))
    .withColumn("Plate_Type_Prefix", F.lit(None).cast("string"))
    .withColumn("Plate_Type_Suffix", F.lit(None).cast("string"))
    .withColumn("Plate_Country", F.lit("US"))
    .withColumn("Horizontal_Plate_Position", F.lit(1))
    .withColumn("Vertical_Plate_Position", F.lit(1))
    .withColumn("Horizontal_Plate_Size", F.lit(100))
    .withColumn("Vertical_Plate_Size", F.lit(50))
    .withColumn("Plate_Number_Confidence", F.lit(95))
    .withColumn("Jurisdiction_Confidence", F.lit(98))
    .withColumn("Image_Name", F.lit(None).cast("string"))
    .withColumn("Image_Location", F.lit(None).cast("string"))
    .withColumn("Number_of_Front_Images", F.lit(0))
    .withColumn("Number_of_Rear_Images", F.lit(0))
    .withColumn("Image_Disposition", F.lit(0))
    .withColumn("Image_Reason_Code", F.lit(0))
    .withColumn("InsertedOn", F.current_timestamp())
    .withColumn("InsertedBy", F.lit("POC"))
    .withColumn("UpdatedOn", F.current_timestamp())
    .withColumn("UpdatedBy", F.lit("POC"))
    .withColumn("CycleName", F.lit("POC"))
    .withColumn("BatchId", F.lit(1).cast("decimal(38,0)"))
    .withColumn("Soft_Delete", F.lit(0))
)

(df.write.format("jdbc")
 .option("url", MYSQL_URL)
 .option("dbtable", "`Transaction`")
 .option("user", MYSQL_USER)
 .option("password", MYSQL_PASSWORD)
 .option("driver", "com.mysql.cj.jdbc.Driver")
 .option("batchsize", "500")                 # KEY: small batch prevents heap blowup
 .option("rewriteBatchedStatements", "false")# KEY: prevents huge multi-values SQL
 .mode("append")
 .save())

In [4]:
!pip -q install jaydebeapi JPype1

In [7]:
import jaydebeapi

import os

# Credentials are read from the environment when set. The defaults are the local
# POC container's values.
MYSQL_USER = os.environ.get("MYSQL_USER", "pocuser")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", "pocpass")

MYSQL_JDBC_URL = "jdbc:mysql://mysql:3306/pocdb"
MYSQL_JAR = "/home/jovyan/jars/mysql-connector-j.jar"  # driver jar passed to jaydebeapi.connect


def get_last_id(job_name: str = "txn_load") -> int:
    """Return ``last_id`` from ``load_checkpoint`` for ``job_name``.

    Opens a short-lived JDBC connection through jaydebeapi, using the
    ``MYSQL_*`` settings, and always closes it, even when the query fails.

    Args:
        job_name: Checkpoint key to read. Defaults to ``"txn_load"``.

    Returns:
        The stored checkpoint as an ``int``.

    Raises:
        LookupError: If ``load_checkpoint`` has no row for ``job_name``.
    """
    conn = jaydebeapi.connect(
        "com.mysql.cj.jdbc.Driver",
        MYSQL_JDBC_URL,
        [MYSQL_USER, MYSQL_PASSWORD],
        MYSQL_JAR
    )
    try:
        cur = conn.cursor()
        try:
            # Bound parameter: job_name is never spliced into the SQL text.
            cur.execute("SELECT last_id FROM load_checkpoint WHERE job_name=?;", (job_name,))
            row = cur.fetchone()
        finally:
            cur.close()
    finally:
        conn.close()

    if row is None:
        raise LookupError(f"no load_checkpoint row for job_name={job_name!r}")
    return int(row[0])


def update_last_id(new_last_id: int, job_name: str = "txn_load") -> None:
    """Set ``last_id`` in ``load_checkpoint`` for ``job_name`` and commit.

    Runs a single UPDATE inside an explicit transaction. On failure the
    transaction is rolled back and the error is re-raised. The connection is
    always closed.

    An UPDATE that matches no row is not treated as an error: the table is left
    unchanged. Use an upsert if the row may not exist yet.

    Args:
        new_last_id: Value to store as the checkpoint.
        job_name: Checkpoint key to update. Defaults to ``"txn_load"``.
    """
    conn = jaydebeapi.connect(
        "com.mysql.cj.jdbc.Driver",
        MYSQL_JDBC_URL,
        [MYSQL_USER, MYSQL_PASSWORD],
        MYSQL_JAR
    )
    try:
        # Autocommit off so conn.commit() is legal and the write is explicit.
        conn.jconn.setAutoCommit(False)
        cur = conn.cursor()
        try:
            cur.execute(
                "UPDATE load_checkpoint SET last_id=? WHERE job_name=?;",
                (int(new_last_id), job_name)
            )
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cur.close()
    finally:
        conn.close()

In [8]:
SEED_LAST_ID = 51_000  # last Transaction_ID written by the seed load (its end_id)

print("Before:", get_last_id())
update_last_id(SEED_LAST_ID)
print("After :", get_last_id())

Before: 51000
After : 51000


In [9]:
from pyspark.sql import SparkSession, functions as F
import jaydebeapi

# -----------------------------
# SPARK SESSION (stable)
# -----------------------------
# NOTE: getOrCreate() returns a session that is already running in this kernel
# (e.g. one built by an earlier cell). If that happens, JVM-level settings such
# as spark.jars and spark.driver.memory below cannot take effect. Restart the
# kernel before this cell if they need to change.
_SPARK_CONF = {
    "spark.jars": "/home/jovyan/jars/mysql-connector-j.jar,/home/jovyan/jars/postgresql.jar",
    "spark.sql.shuffle.partitions": "8",
    "spark.driver.memory": "4g",   # raise to 6g if the Docker VM has the headroom
}

_builder = SparkSession.builder.appName("POC-Transaction-93M-Loader")
for _conf_key, _conf_value in _SPARK_CONF.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()

# -----------------------------
# MYSQL CONFIG
# -----------------------------
import os

# Credentials are read from the environment when set. The defaults are the local
# POC container's values, so nothing changes when the variables are not exported.
MYSQL_USER = os.environ.get("MYSQL_USER", "pocuser")
MYSQL_PASSWORD = os.environ.get("MYSQL_PASSWORD", "pocpass")
MYSQL_JDBC_URL = "jdbc:mysql://mysql:3306/pocdb"
MYSQL_JAR = "/home/jovyan/jars/mysql-connector-j.jar"

# jaydebeapi (checkpoint) and Spark's JDBC writer (data) target the same
# database. Derive one URL from the other so they cannot drift apart.
MYSQL_SPARK_URL = MYSQL_JDBC_URL

TABLE_NAME = "`Transaction`"   # backtick-quoted: TRANSACTION is a MySQL keyword
CHECKPOINT_JOB = "txn_load"     # job_name key in load_checkpoint

TOTAL_ROWS = 93_000_000         # load Transaction_IDs up to and including this value
BATCH_ROWS = 500_000            # rows per write + checkpoint. If stable, increase to 1_000_000
WRITE_PARTITIONS = 4            # keep small to avoid JDBC heap OOM
JDBC_BATCHSIZE = "500"          # rows per JDBC insert round trip; small keeps the MySQL driver's heap in check

# -----------------------------
# PLAZA IDS (replace with your actual list)
# -----------------------------
PLAZA_IDS = ["000001", "000002", "000003", "000004"]
plaza_count = len(PLAZA_IDS)
# Literal array column of the plaza ids; rows pick an entry with element_at (1-based).
plaza_arr = F.array(*map(F.lit, PLAZA_IDS))

# -----------------------------
# CHECKPOINT helpers (UPSERT)
# -----------------------------
def get_last_id():
    """Return the stored checkpoint for ``CHECKPOINT_JOB``.

    Reads ``last_id`` from ``load_checkpoint`` over a short-lived jaydebeapi
    connection. The cursor and connection are closed even if the query fails.

    Returns:
        ``last_id`` as an ``int``, or ``0`` when the job has no row yet.
    """
    conn = jaydebeapi.connect(
        "com.mysql.cj.jdbc.Driver",
        MYSQL_JDBC_URL,
        [MYSQL_USER, MYSQL_PASSWORD],
        MYSQL_JAR
    )
    try:
        cur = conn.cursor()
        try:
            cur.execute("SELECT last_id FROM load_checkpoint WHERE job_name=?;", (CHECKPOINT_JOB,))
            row = cur.fetchone()
        finally:
            cur.close()
    finally:
        conn.close()
    return int(row[0]) if row else 0

def upsert_last_id(new_last_id: int):
    """Create or overwrite the ``load_checkpoint`` row for ``CHECKPOINT_JOB``.

    Uses MySQL ``INSERT ... ON DUPLICATE KEY UPDATE``. The statement runs in an
    explicit transaction that is committed on success and rolled back on error,
    instead of relying on the driver's autocommit default. The cursor and
    connection are always closed.

    NOTE(review): the upsert only updates in place if ``job_name`` is a PRIMARY
    or UNIQUE key of ``load_checkpoint``; otherwise each call inserts a new
    row. Confirm against the table DDL.
    NOTE: ``VALUES(col)`` in ON DUPLICATE KEY UPDATE is deprecated since MySQL
    8.0.20. It is kept here for compatibility with older servers.

    Args:
        new_last_id: Highest Transaction_ID that has been fully written.
    """
    conn = jaydebeapi.connect(
        "com.mysql.cj.jdbc.Driver",
        MYSQL_JDBC_URL,
        [MYSQL_USER, MYSQL_PASSWORD],
        MYSQL_JAR
    )
    try:
        # Autocommit off so the write is committed explicitly (and can be rolled back).
        conn.jconn.setAutoCommit(False)
        cur = conn.cursor()
        try:
            cur.execute("""
                INSERT INTO load_checkpoint (job_name, last_id)
                VALUES (?, ?)
                ON DUPLICATE KEY UPDATE last_id = VALUES(last_id);
            """, (CHECKPOINT_JOB, int(new_last_id)))
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            cur.close()
    finally:
        conn.close()

# -----------------------------
# BATCH DF generator (ID-derived columns are deterministic; timestamp columns use the current time)
# -----------------------------
def make_batch(start_id: int, end_id: int, num_partitions=None):
    """Build one batch of synthetic ``Transaction`` rows for the JDBC writer.

    Rows cover ``Transaction_ID`` values ``start_id`` through ``end_id``
    inclusive. Every ID-derived column (TIPID, Plaza_ID, Lane_ID,
    Vehicle_Speed, Transaction_Type, Plate_Number, ...) is a pure function of
    ``Transaction_ID``, so rebuilding a batch reproduces those values. The
    timestamp columns (Transaction_DateTime, UFM_Send_DateTime, InsertedOn,
    UpdatedOn) are based on ``current_timestamp()`` and differ between runs.

    Args:
        start_id: First Transaction_ID in the batch (inclusive).
        end_id: Last Transaction_ID in the batch (inclusive).
        num_partitions: Number of partitions to generate. Spark's JDBC writer
            writes each partition over its own connection. Defaults to
            ``WRITE_PARTITIONS``.

    Returns:
        A lazy Spark DataFrame. Nothing is computed until it is written.
    """
    parts = WRITE_PARTITIONS if num_partitions is None else num_partitions
    tid = F.col("Transaction_ID")

    # Generate the IDs directly in `parts` partitions. A trailing
    # .repartition(parts) would reach the same count via a full shuffle per batch.
    base = (
        spark.range(start_id, end_id + 1, numPartitions=parts)
        .withColumnRenamed("id", "Transaction_ID")
    )

    df = (base
      .withColumn("TIPID", (tid % 10_000_000).cast("bigint"))
      .withColumn("UFMID", (tid % 5_000_000).cast("bigint"))

      # rotate plaza ids (element_at is 1-based, hence the +1)
      .withColumn("Plaza_ID", F.element_at(plaza_arr, (F.pmod(tid, plaza_count) + 1).cast("int")))

      .withColumn("Lane_ID", (tid % 20).cast("int"))

      .withColumn("Full_Fare", F.lit(2.50).cast("decimal(6,2)"))
      .withColumn("ETC_Fare", F.lit(2.00).cast("decimal(6,2)"))
      .withColumn("Indicated_Fare", F.lit(2.25).cast("decimal(6,2)"))

      # "now" minus 0..86399 seconds, without INTERVAL. timestamp_seconds works on
      # epoch seconds directly, avoiding a round trip through a local-time string
      # (which is ambiguous during a DST fall-back hour).
      .withColumn(
          "Transaction_DateTime",
          F.timestamp_seconds(F.unix_timestamp(F.current_timestamp()) - (tid % 86400).cast("int"))
      )

      .withColumn("Revenue_Type", F.lit("ETC"))
      .withColumn("Exception_Codes", F.lit(None).cast("string"))

      .withColumn("Zero_Fare_Override_Flag", F.lit(0))
      .withColumn("AVC_Class", F.lit(1))

      .withColumn("Vehicle_Speed", (tid % 90).cast("int"))
      .withColumn("Vehicle_Length", (tid % 30).cast("int"))

      .withColumn("Lane_Position", F.lit("N"))
      .withColumn("Exempt_Flag", F.lit(0))

      .withColumn(
          "Transaction_Type",
          F.when((tid % 3) == 0, "COMPLETED_COLLECTABLE")
           .when((tid % 3) == 1, "COMPLETED_UNCOLLECTABLE")
           .otherwise("INCOMPLETE")
      )
      # NOTE(review): stays NULL even for COMPLETED_UNCOLLECTABLE rows; confirm that is fine for the POC.
      .withColumn("Uncollectable_Reason", F.lit(None).cast("string"))

      .withColumn("Account_ID", (tid % 2_000_000).cast("int"))
      .withColumn("UFM_Send_DateTime", F.current_timestamp())
      .withColumn("UFM_Pay_Method", F.when((tid % 2) == 0, "ETC").otherwise("VIOLATOR"))

      .withColumn("UFM_Discount_Fare", F.lit(1.75).cast("decimal(6,2)"))
      .withColumn("UFM_Full_Fare", F.lit(2.50).cast("decimal(6,2)"))
      .withColumn("UFM_Indicated_Fare", F.lit(2.25).cast("decimal(6,2)"))

      .withColumn("Region_Code", F.lit(1))
      .withColumn("Tag_Agency_ID", F.lit(10))
      .withColumn("Tag_ID", (tid % 9_000_000).cast("int"))
      .withColumn("Tag_Class", (tid % 10).cast("int"))
      .withColumn("Tag_Status", F.lit("ACTIVE"))

      .withColumn("Plate_Jurisdiction", F.lit("FL"))
      .withColumn("Plate_Number", F.concat(F.lit("P"), F.lpad((tid % 1_000_000).cast("string"), 6, "0")))
      .withColumn("Plate_Type", F.lit("REG"))
      .withColumn("Plate_Type_Prefix", F.lit(None).cast("string"))
      .withColumn("Plate_Type_Suffix", F.lit(None).cast("string"))
      .withColumn("Plate_Country", F.lit("US"))

      .withColumn("Horizontal_Plate_Position", F.lit(1))
      .withColumn("Vertical_Plate_Position", F.lit(1))
      .withColumn("Horizontal_Plate_Size", F.lit(100))
      .withColumn("Vertical_Plate_Size", F.lit(50))

      .withColumn("Plate_Number_Confidence", F.lit(95))
      .withColumn("Jurisdiction_Confidence", F.lit(98))

      .withColumn("Image_Name", F.lit(None).cast("string"))
      .withColumn("Image_Location", F.lit(None).cast("string"))
      .withColumn("Number_of_Front_Images", F.lit(0))
      .withColumn("Number_of_Rear_Images", F.lit(0))
      .withColumn("Image_Disposition", F.lit(0))
      .withColumn("Image_Reason_Code", F.lit(0))

      .withColumn("InsertedOn", F.current_timestamp())
      .withColumn("InsertedBy", F.lit("POC"))
      .withColumn("UpdatedOn", F.current_timestamp())
      .withColumn("UpdatedBy", F.lit("POC"))
      .withColumn("CycleName", F.lit("POC"))
      .withColumn("BatchId", F.lit(1).cast("decimal(38,0)"))
      .withColumn("Soft_Delete", F.lit(0))
    )

    return df

# -----------------------------
# RUN LOOP (resume-safe)
# -----------------------------
last_id = get_last_id()
start = last_id + 1

print(f"Checkpoint last_id={last_id:,}. Next start={start:,}. Target={TOTAL_ROWS:,}")

# Each batch is written first and checkpointed second. Spark's JDBC writer
# commits each partition on its own connection, so a failure part-way through
# a write can leave some of the batch's rows in the table without advancing the
# checkpoint. A rerun then re-inserts those IDs, producing duplicate rows or
# duplicate-key errors depending on the table's keys. In that case, delete rows
# with Transaction_ID > last_id before resuming.
while start <= TOTAL_ROWS:
    end = min(start + BATCH_ROWS - 1, TOTAL_ROWS)
    print(f"Loading {start:,} → {end:,}")

    batch_df = make_batch(start, end)

    (batch_df.write.format("jdbc")
     .option("url", MYSQL_SPARK_URL)
     .option("dbtable", TABLE_NAME)
     .option("user", MYSQL_USER)
     .option("password", MYSQL_PASSWORD)
     .option("driver", "com.mysql.cj.jdbc.Driver")
     .option("batchsize", JDBC_BATCHSIZE)
     .option("rewriteBatchedStatements", "false")
     .mode("append")
     .save())

    # Advance the checkpoint only after the whole batch has been written.
    upsert_last_id(end)
    print(f"✅ Updated checkpoint to {end:,}")

    start = end + 1

# Derived from TOTAL_ROWS so the message stays correct if the target changes.
print(f"✅ Finished loading to {TOTAL_ROWS:,} rows.")

Checkpoint last_id=551,000. Next start=551,001. Target=93,000,000
Loading 551,001 → 1,051,000
✅ Updated checkpoint to 1,051,000
Loading 1,051,001 → 1,551,000
✅ Updated checkpoint to 1,551,000
Loading 1,551,001 → 2,051,000
✅ Updated checkpoint to 2,051,000
Loading 2,051,001 → 2,551,000
✅ Updated checkpoint to 2,551,000
Loading 2,551,001 → 3,051,000
✅ Updated checkpoint to 3,051,000
Loading 3,051,001 → 3,551,000
✅ Updated checkpoint to 3,551,000
Loading 3,551,001 → 4,051,000
✅ Updated checkpoint to 4,051,000
Loading 4,051,001 → 4,551,000
✅ Updated checkpoint to 4,551,000
Loading 4,551,001 → 5,051,000
✅ Updated checkpoint to 5,051,000
Loading 5,051,001 → 5,551,000
✅ Updated checkpoint to 5,551,000
Loading 5,551,001 → 6,051,000
✅ Updated checkpoint to 6,051,000
Loading 6,051,001 → 6,551,000
✅ Updated checkpoint to 6,551,000
Loading 6,551,001 → 7,051,000
✅ Updated checkpoint to 7,051,000
Loading 7,051,001 → 7,551,000
✅ Updated checkpoint to 7,551,000
Loading 7,551,001 → 8,051,000
✅ Updated 