In [None]:
import os
import uuid

import pandas as pd
import sqlalchemy as sa
from delta import configure_spark_with_delta_pip
from dotenv import load_dotenv
from pyspark.sql import SparkSession, types
from tqdm.notebook import tqdm

In [None]:
# Load environment settings via python-dotenv (presumably from a local .env
# file -- confirm its location) before the DB_* variables are read from
# os.environ further down.
load_dotenv()

## Configure Spark

In [None]:
# Spark session with Delta Lake support that talks to an S3-compatible
# endpoint (MinIO) through the Hadoop S3A connector.
builder = (
    SparkSession.builder.appName("DeltaTableMigration")
    # Driver / executor sizing.
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", "2")
    # Enable the Delta Lake SQL extension and catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # S3A settings: plain-HTTP endpoint addressed with path-style URLs.
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:10001")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    # The key must be spelled "fs.s3a..."; the previous "hs.s3a..." spelling
    # was silently ignored by Hadoop, so the setting never took effect.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)
# configure_spark_with_delta_pip wraps the builder for the pip-installed
# Delta package before the session is created (or reused if one exists).
spark = configure_spark_with_delta_pip(builder).getOrCreate()

## Create Database Connection

In [None]:
# Database connection settings come from the environment; a missing variable
# raises KeyError immediately instead of failing later at connect time.
db_user, db_password, db_host, db_name = (
    os.environ[key] for key in ("DB_USER", "DB_PASSWORD", "DB_HOST", "DB_NAME")
)

In [None]:
# Build the connection URL in two steps: host (optionally "host:port") and
# database name are parsed from the usual URL form, then the credentials are
# attached through the URL object. Interpolating the raw username/password
# into the URL string breaks as soon as either contains a reserved character
# such as "@", "/", ":" or "#"; URL.set() stores them verbatim and escapes
# them only when the URL is rendered.
db_url = sa.engine.make_url(f"postgresql://{db_host}/{db_name}").set(
    username=db_user,
    password=db_password,
)
engine = sa.create_engine(db_url)

## Read a single table into a dataframe, then write it to a Delta table

In [None]:
# Copy one Postgres table into a Delta table, streaming it in fixed-size
# chunks so the whole table never has to fit in driver memory.
schema = "public"
table = "state_data"
delta_path = f"s3a://data/data/delta/{schema}/{table}"

# The first non-empty chunk replaces any existing table contents; every later
# chunk is appended to it.
first_chunk = True
for chunk in pd.read_sql(f"select * from {schema}.{table}", engine, chunksize=50000):
    if chunk.empty:
        continue

    # All columns are stored as strings. A plain astype(str) would turn SQL
    # NULLs into the literal text "None" / "nan" / "NaT", so missing values
    # are put back as real nulls after the cast.
    string_chunk = chunk.astype(str).astype(object).where(chunk.notna(), None)

    # An explicit all-string, nullable schema keeps every chunk identical and
    # avoids type-inference failures when a column is entirely NULL in a chunk.
    string_schema = types.StructType(
        [types.StructField(str(name), types.StringType(), True) for name in chunk.columns]
    )
    spark_df = spark.createDataFrame(string_chunk, schema=string_schema)

    mode = "overwrite" if first_chunk else "append"
    spark_df.write.format("delta").mode(mode).save(delta_path)
    first_chunk = False