# Migrating database into MinIO bucket

This notebook loads the root MySQL database (`finance`) into the MinIO `rootdb` bucket as Delta Lake tables.

In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month

In [2]:
# Build the Spark session used by every cell below: Delta Lake enabled,
# S3A pointed at the local MinIO instance, MySQL JDBC driver on the classpath.

# MinIO credentials default to the local development values; export
# MINIO_ACCESS_KEY / MINIO_SECRET_KEY to override them without editing the
# notebook (and without committing real secrets).
minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "minioadmin123")

# Maven coordinates passed to spark.jars.packages as one comma-separated string.
spark_packages = ",".join([
    "org.apache.hadoop:hadoop-aws:3.4.1",
    "com.amazonaws:aws-java-sdk-bundle:1.12.262",
    "io.delta:delta-spark_2.13:4.0.0",
    "com.mysql:mysql-connector-j:8.0.33",
])

builder = (
    SparkSession.builder.appName("MinIO-Delta")
    # Memory configurations for large datasets
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # Jars
    .config("spark.jars.packages", spark_packages)
    # Delta Lake
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # MinIO (S3A)
    .config("spark.hadoop.fs.s3a.endpoint", "http://localhost:9900")
    .config("spark.hadoop.fs.s3a.access.key", minio_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", minio_secret_key)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # S3A performance configs
    .config("spark.hadoop.fs.s3a.connection.timeout", "60000")
    .config("spark.hadoop.fs.s3a.connection.request.timeout", "60000")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.attempts.maximum", "3")
    .config("spark.hadoop.fs.s3a.retry.limit", "3")
)

# NOTE(review): getOrCreate() returns an already-running session if the
# kernel has one; restart the kernel after changing the settings above so
# they are guaranteed to take effect.
spark = builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/27 17:00:18 WARN Utils: Your hostname, bnguyen-lenovo, resolves to a loopback address: 127.0.1.1; using 192.168.1.9 instead (on interface wlp2s0)
25/09/27 17:00:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/bnguyen/Desktop/finance_analytics/venv/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/bnguyen/.ivy2.5.2/cache
The jars for the packages stored in: /home/bnguyen/.ivy2.5.2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
io.delta#delta-spark_2.13 added as a dependency
com.mysql#mysql-connector-j added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-db97c309-7289-4790-a883-45a9be8712ef;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.4.1 

In [3]:
# MySQL connection settings
# The user/password default to the local development values; export
# MYSQL_USER / MYSQL_PASSWORD to override them without editing the notebook.
mysql_url = "jdbc:mysql://localhost:30306/finance"
mysql_properties = {
    "user": os.environ.get("MYSQL_USER", "root"),
    "password": os.environ.get("MYSQL_PASSWORD", "root123"),
    "driver": "com.mysql.cj.jdbc.Driver",
}

# Per-table target: Delta location in the `rootdb` bucket and the columns to
# partition by (None means the table is written unpartitioned).
tables_config = {
    'users': {
        'path': 's3a://rootdb/users/',
        'partitions': None,
    },
    'mcc_codes': {
        'path': 's3a://rootdb/mcc_codes/',
        'partitions': None,
    },
    'cards': {
        'path': 's3a://rootdb/cards/',
        'partitions': ['card_brand'],  # Partition by brand for better queries
    },
    'transactions': {
        'path': 's3a://rootdb/transactions/',
        'partitions': ['year', 'month'],  # Partition by date for performance
    },
    'fraud_labels': {
        'path': 's3a://rootdb/fraud_labels/',
        'partitions': None,
    },
}

# Intended to hold per-table migration results, keyed by table name.
migration_summary = {}

In [4]:
# Migrate users table: read it from MySQL over JDBC and overwrite the Delta
# table at the location configured in tables_config.
try:
    df = spark.read.jdbc(url=mysql_url, table='users', properties=mysql_properties)
    row_count = df.count()

    # Write unconditionally: an empty source must still produce an (empty)
    # Delta table, otherwise the "migrated successfully" message below would
    # be reported without anything having been written.
    df.write.format("delta").mode("overwrite").save(tables_config['users']['path'])
    migration_summary['users'] = {'status': 'success', 'rows': row_count}

    if row_count > 0:
        print(f"users: {row_count} rows migrated successfully")
    else:
        print("users: Empty table migrated successfully")

except Exception as e:
    # Best-effort: report and record the failure so the remaining tables can
    # still be migrated.
    migration_summary['users'] = {'status': 'failed', 'error': str(e)}
    print(f"users: Migration failed - {str(e)}")

25/09/27 17:00:29 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
25/09/27 17:00:31 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/09/27 17:00:36 WARN S3ABlockOutputStream: Application invoked the Syncable API against stream writing to users/part-00000-1c09f5c9-1200-4fe0-a37a-57f4a9c59b8e-c000.snappy.parquet. This is Unsupported
                                                                                

users: 2000 rows migrated successfully


In [5]:
# Migrate mcc_codes table: read it from MySQL over JDBC and overwrite the
# Delta table at the location configured in tables_config.
try:
    df = spark.read.jdbc(url=mysql_url, table='mcc_codes', properties=mysql_properties)
    row_count = df.count()

    # Write unconditionally: an empty source must still produce an (empty)
    # Delta table, otherwise the "migrated successfully" message below would
    # be reported without anything having been written.
    df.write.format("delta").mode("overwrite").save(tables_config['mcc_codes']['path'])
    migration_summary['mcc_codes'] = {'status': 'success', 'rows': row_count}

    if row_count > 0:
        print(f"mcc_codes: {row_count} rows migrated successfully")
    else:
        print("mcc_codes: Empty table migrated successfully")

except Exception as e:
    # Best-effort: report and record the failure so the remaining tables can
    # still be migrated.
    migration_summary['mcc_codes'] = {'status': 'failed', 'error': str(e)}
    print(f"mcc_codes: Migration failed - {str(e)}")

mcc_codes: 109 rows migrated successfully


In [6]:
# Migrate cards table (partitioned by card_brand): read it from MySQL over
# JDBC and overwrite the Delta table configured in tables_config.
try:
    df = spark.read.jdbc(url=mysql_url, table='cards', properties=mysql_properties)
    row_count = df.count()

    # Write unconditionally: an empty source must still produce an (empty)
    # Delta table, otherwise the "migrated successfully" message below would
    # be reported without anything having been written.
    (
        df.write.format("delta")
        .mode("overwrite")
        .partitionBy(*tables_config['cards']['partitions'])
        .save(tables_config['cards']['path'])
    )
    migration_summary['cards'] = {'status': 'success', 'rows': row_count}

    if row_count > 0:
        print(f"✓ cards: {row_count} rows migrated successfully")
    else:
        print("cards: Empty table migrated successfully")

except Exception as e:
    # Best-effort: report and record the failure so the remaining tables can
    # still be migrated.
    migration_summary['cards'] = {'status': 'failed', 'error': str(e)}
    print(f"cards: Migration failed - {str(e)}")

✓ cards: 6146 rows migrated successfully


In [7]:
# Migrate transactions table: parallel JDBC read from MySQL, derive the
# year/month partition columns, and overwrite the Delta table configured in
# tables_config.
try:
    # Use JDBC partitioning for large table.
    # lowerBound/upperBound only determine the partition stride on
    # transaction_id; they do not filter rows, so ids outside the range are
    # still read.
    df = spark.read.jdbc(
        url=mysql_url,
        table='transactions',
        properties=mysql_properties,
        column='transaction_id',
        lowerBound=1,
        upperBound=14000000,
        numPartitions=40
    )

    # Add year and month columns for partitioning
    df = df.withColumn('year', year('trans_date')).withColumn('month', month('trans_date'))
    # NOTE(review): coalesce() does not shuffle, so this likely also limits
    # the JDBC read above to 5 concurrent tasks instead of 40 — confirm this
    # is intended (repartition() would keep the read parallelism).
    df = df.coalesce(5)

    row_count = df.count()

    # Write unconditionally: an empty source must still produce an (empty)
    # Delta table, otherwise the "migrated successfully" message below would
    # be reported without anything having been written.
    (
        df.write.format("delta")
        .mode("overwrite")
        .partitionBy(*tables_config['transactions']['partitions'])
        .save(tables_config['transactions']['path'])
    )
    migration_summary['transactions'] = {'status': 'success', 'rows': row_count}

    if row_count > 0:
        print(f"transactions: {row_count} rows migrated successfully")
    else:
        print("transactions: Empty table migrated successfully")

except Exception as e:
    # Best-effort: report and record the failure so the remaining tables can
    # still be migrated.
    migration_summary['transactions'] = {'status': 'failed', 'error': str(e)}
    print(f"transactions: Migration failed - {str(e)}")

25/09/27 17:01:31 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
25/09/27 17:01:34 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
25/09/27 17:01:36 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
25/09/27 17:01:39 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
25/09/27 17:01:41 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
25/09/27 17:01:44 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
25/09/27 17:01:47 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
25/09/27 17:01:49 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
25/09/27 17:01:51 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
25/09/27 17:01:54 WARN TaskMemoryManager: Failed to allocate a page (134217728 bytes), try again.
25/09/27 17:01:56 WA

transactions: 13305915 rows migrated successfully


In [8]:
# Migrate fraud_labels table (large table with JDBC partitioning): parallel
# JDBC read from MySQL, then overwrite the Delta table configured in
# tables_config.
try:
    # Use JDBC partitioning for large table.
    # lowerBound/upperBound only determine the partition stride on
    # transaction_id; they do not filter rows, so ids outside the range are
    # still read.
    df = spark.read.jdbc(
        url=mysql_url,
        table='fraud_labels',
        properties=mysql_properties,
        column='transaction_id',
        lowerBound=1,
        upperBound=14000000,
        numPartitions=40
    )

    # NOTE(review): coalesce() does not shuffle, so this likely also limits
    # the JDBC read above to 5 concurrent tasks instead of 40 — confirm this
    # is intended (repartition() would keep the read parallelism).
    df = df.coalesce(5)
    row_count = df.count()

    # Write unconditionally: an empty source must still produce an (empty)
    # Delta table, otherwise the "migrated successfully" message below would
    # be reported without anything having been written.
    df.write.format("delta").mode("overwrite").save(tables_config['fraud_labels']['path'])
    migration_summary['fraud_labels'] = {'status': 'success', 'rows': row_count}

    if row_count > 0:
        print(f"fraud_labels: {row_count} rows migrated successfully")
    else:
        print("fraud_labels: Empty table migrated successfully")

except Exception as e:
    # Best-effort: report and record the failure so the remaining tables can
    # still be migrated.
    migration_summary['fraud_labels'] = {'status': 'failed', 'error': str(e)}
    print(f"fraud_labels: Migration failed - {str(e)}")

                                                                                

fraud_labels: 8914963 rows migrated successfully
