In [1]:
import pyspark 
from pyspark.sql  import SparkSession 
from pyspark.sql.functions import collect_list, col, lit 
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType, DateType, ArrayType
import os 
import json 
from pyspark.sql.functions import to_date, when
from urllib.parse import urlparse
import boto3


In [10]:
pip install prettytable

Collecting prettytable
  Downloading prettytable-3.12.0-py3-none-any.whl.metadata (30 kB)
Downloading prettytable-3.12.0-py3-none-any.whl (31 kB)
Installing collected packages: prettytable
Successfully installed prettytable-3.12.0
Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 23.3.1 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
with open("credentials.json") as f: 
    credentials = json.load(f)
    
aws_access_key = credentials["accessKey"] 
aws_secret_key = credentials["secretKey"] 
aws_s3_endpoint = "http://18.138.236.177:9000" 
aws_s3_region = "us-east-1" 
nessi_warehouse = "s3a://standardize"
nessi_uri = "http://18.138.236.177:19120/api/v1"
raw_csv_path = "s3a://raw/CRMUSER_ACCOUNTS.csv"

In [3]:
conf = (
    pyspark.SparkConf()
        .setAppName('raw_to_standard_etl')

        .set('spark.jars.packages', 
             'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,'
             'org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1,'
             'software.amazon.awssdk:bundle:2.24.8,'
             'org.apache.iceberg:iceberg-nessie:1.4.0,'
             'org.apache.hadoop:hadoop-aws:3.3.4,'  
             'org.apache.hadoop:hadoop-common:3.3.4,'  
             'io.minio:minio:8.5.0,'
             'org.apache.spark:spark-hadoop-cloud_2.13:3.2.1,'
             'software.amazon.awssdk:url-connection-client:2.24.8,'
             'com.amazonaws:aws-java-sdk-bundle:1.12.207,'
             '')
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', nessi_uri)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.nessie.s3.endpoint', aws_s3_endpoint)
        .set('spark.sql.catalog.nessie.warehouse', nessi_warehouse)
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        .set('spark.hadoop.fs.s3a.access.key', aws_access_key)
        .set('spark.hadoop.fs.s3a.secret.key', aws_secret_key)
        .set('spark.hadoop.fs.s3a.endpoint', aws_s3_endpoint)
        .set('spark.hadoop.fs.s3a.region', aws_s3_region)
        .set('spark.hadoop.fs.s3a.path.style.access', 'true')
        .set('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
        .set('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
        .set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
        .set('spark.driver.memory', '512m')
        .set('spark.executor.memory', '512m')
        .set('spark.executor.cores', '1')
)

In [4]:
# Create new SparkSession
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("spark session")

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
org.apache.iceberg#iceberg-nessie added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.hadoop#hadoop-common added as a dependency
io.minio#minio added as a dependency
org.apache.spark#spark-hadoop-cloud_2.13 added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-07ca06c4-a72c-4feb-b978-06cc82ad03bf;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.0 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.77.1 in central
	found

spark session


In [5]:
raw_df = spark.read.csv(raw_csv_path, header=True) 
raw_df.show(20)

24/11/21 04:11:00 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

+---------+-------+------------+---------+-------+--------------+------+---------+------+--------------+-----------------------+
|ACCOUNTID|BANK_ID|CORE_CUST_ID|   ORGKEY|ORGTYPE|          NAME|GENDER|CUST_TYPE|STATUS|PRIMARY_SOL_ID|RELATIONSHIPOPENINGDATE|
+---------+-------+------------+---------+-------+--------------+------+---------+------+--------------+-----------------------+
|  1564505|      1|   113391407|113391407|Account|TRAN T *** HAI|     M|    10001| ACTVE|          1404|             23-02-2018|
|  1564506|      1|   113391457|113391457|Account|TRAN T *** YEN|     F|    10001| ACTVE|          1003|              3/12/2018|
|  1564507|      1|   113391483|113391483|Account|TRAN T *** BAC|     F|    10001| ACTVE|          2201|             15-06-2018|
|   859964|      1|   105260800|105260800|Account|NGUYEN *** UNG|     M|    10001| ACTVE|          1401|             16-11-2018|
|   958930|      1|   106573010|106573010|Account|TRAN N *** ANH|     M|    10001| ACTVE|        

In [7]:
data_df = spark.read.csv("CRMUSER_ACCOUNTS.csv", header = True)
data_df.show(20)

+---------+-------+------------+---------+-------+--------------+------+---------+------+--------------+-----------------------+
|ACCOUNTID|BANK_ID|CORE_CUST_ID|   ORGKEY|ORGTYPE|          NAME|GENDER|CUST_TYPE|STATUS|PRIMARY_SOL_ID|RELATIONSHIPOPENINGDATE|
+---------+-------+------------+---------+-------+--------------+------+---------+------+--------------+-----------------------+
|  1564505|      1|   113391407|113391407|Account|TRAN T *** HAI|     M|    10001| ACTVE|          1404|             23-02-2018|
|  1564506|      1|   113391457|113391457|Account|TRAN T *** YEN|     F|    10001| ACTVE|          1003|              3/12/2018|
|  1564507|      1|   113391483|113391483|Account|TRAN T *** BAC|     F|    10001| ACTVE|          2201|             15-06-2018|
|   859964|      1|   105260800|105260800|Account|NGUYEN *** UNG|     M|    10001| ACTVE|          1401|             16-11-2018|
|   958930|      1|   106573010|106573010|Account|TRAN N *** ANH|     M|    10001| ACTVE|        

### Create table

In [6]:
spark.sql("CREATE NAMESPACE nessie.lamtran;").show()

++
||
++
++



In [7]:
spark.sql("""CREATE TABLE nessie.lamtran.customer(
            ACCOUNTID STRING,
            BANK_ID STRING,
            CORE_CUST_ID STRING,
            ORGKEY STRING,
            ORGTYPE STRING,
            NAME STRING,
            GENDER STRING,
            CUST_TYPE STRING,
            STATUS STRING,
            PRIMARY_SOL_ID STRING,
            RELATIONSHIPOPENINGDATE DATE)
        USING ICEBERG """)

DataFrame[]

In [65]:
# Convert date formats
schema = StructType([
    StructField("ACCOUNTID", StringType(), True),
    StructField("BANK_ID", StringType(), True),
    StructField("CORE_CUST_ID", StringType(), True),
    StructField("ORGKEY", StringType(), True),
    StructField("ORGTYPE", StringType(), True),
    StructField("NAME", StringType(), True),
    StructField("GENDER", StringType(), True),
    StructField("CUST_TYPE", StringType(), True),
    StructField("STATUS", StringType(), True),
    StructField("PRIMARY_SOL_ID", StringType(), True),
    StructField("RELATIONSHIPOPENINGDATE", DateType(), True)
])

# Read CSV with schema
df = spark.read.csv(raw_csv_path, schema=schema, header=True)

df = raw_df.withColumn(
    "RELATIONSHIPOPENINGDATE",
    when(raw_df["RELATIONSHIPOPENINGDATE"].rlike(r"^\d{2}-\d{2}-\d{4}$"), 
         to_date("RELATIONSHIPOPENINGDATE", "dd-MM-yyyy"))
    .when(raw_df["RELATIONSHIPOPENINGDATE"].rlike(r"^\d{1,2}/\d{1,2}/\d{4}$"), 
          to_date("RELATIONSHIPOPENINGDATE", "d/M/yyyy"))
)


In [11]:
# Set procesure case delta table
spark.sql("""ALTER TABLE nessie.lamtran.customer SET TBLPROPERTIES (
    'write.delete.mode'='merge-on-read',
    'write.update.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read'
);
""")

DataFrame[]

In [56]:
df.write.format("iceberg").mode("append").save("nessie.lamtran.customer")

In [67]:
spark.sql("""SELECT COUNT(*) FROM  nessie.lamtran.customer""").show()
spark.sql("""SELECT * FROM  nessie.lamtran.customer""").show(10)

+--------+
|count(1)|
+--------+
|      28|
+--------+

+---------+-------+------------+---------+-------+--------------+------+---------+------+--------------+-----------------------+
|ACCOUNTID|BANK_ID|CORE_CUST_ID|   ORGKEY|ORGTYPE|          NAME|GENDER|CUST_TYPE|STATUS|PRIMARY_SOL_ID|RELATIONSHIPOPENINGDATE|
+---------+-------+------------+---------+-------+--------------+------+---------+------+--------------+-----------------------+
|  1564505|      1|   113391407|113391407|Account|TRAN T *** HAI|     M|    10001| ACTVE|          1404|             2018-02-23|
|  1564506|      1|   113391457|113391457|Account|TRAN T *** YEN|     F|    10001| ACTVE|          1003|             2018-12-03|
|  1564507|      1|   113391483|113391483|Account|TRAN T *** BAC|     F|    10001| ACTVE|          2201|             2018-06-15|
|   859964|      1|   105260800|105260800|Account|NGUYEN *** UNG|     M|    10001| ACTVE|          1401|             2018-11-16|
|   958930|      1|   106573010|106573010

### Update/ Delete / insert table

In [57]:
# Thêm giá trị (1)
spark.sql("""
  INSERT INTO nessie.lamtran.customer (ACCOUNTID, BANK_ID, CORE_CUST_ID, ORGKEY, ORGTYPE, NAME, GENDER, CUST_TYPE, STATUS, PRIMARY_SOL_ID, RELATIONSHIPOPENINGDATE)
  VALUES (80, 2, 123456789, 123456789, 'Account', 'HAHAHIHI', 'M', 10001, 'ACTVE', 1500, to_date('2024-11-07','yyyy-mm-dd'))
""")


DataFrame[]

In [58]:
#(2)
spark.sql("""UPDATE nessie.lamtran.customer  SET STATUS = 'INACTIVE', NAME = 'TRAN THI ***** HANG'
  WHERE ACCOUNTID = 860010
""")

DataFrame[]

In [60]:
#(4)
spark.sql("""UPDATE nessie.lamtran.customer  SET STATUS = 'INACTIVE', NAME = 'TRAN THI ***** HA'
  WHERE ACCOUNTID = 2008140
""")

DataFrame[]

In [24]:
spark.sql("""SELECT * FROM  nessie.lamtran.customer where ACCOUNTID= 80 ;""").show()

+---------+-------+------------+---------+-------+--------+------+---------+------+--------------+-----------------------+
|ACCOUNTID|BANK_ID|CORE_CUST_ID|   ORGKEY|ORGTYPE|    NAME|GENDER|CUST_TYPE|STATUS|PRIMARY_SOL_ID|RELATIONSHIPOPENINGDATE|
+---------+-------+------------+---------+-------+--------+------+---------+------+--------------+-----------------------+
|       80|      2|   123456789|123456789|Account|HAHAHIHI|     M|    10001| ACTVE|          1500|             2024-01-07|
+---------+-------+------------+---------+-------+--------+------+---------+------+--------------+-----------------------+



In [59]:
(3)
spark.sql("""DELETE FROM nessie.lamtran.customer  WHERE ACCOUNTID = 958933
""")

DataFrame[]

In [61]:
# Thêm giá trị (5)
spark.sql("""
  INSERT INTO nessie.lamtran.customer (ACCOUNTID, BANK_ID, CORE_CUST_ID, ORGKEY, ORGTYPE, NAME, GENDER, CUST_TYPE, STATUS, PRIMARY_SOL_ID, RELATIONSHIPOPENINGDATE)
  VALUES (100, 2, 123456789, 123456789, 'Account', 'HOANG THI HUONG', 'M', 10001, 'ACTVE', 1500, to_date('2024-11-14','yyyy-mm-dd'))
""")

DataFrame[]

In [58]:
spark.sql("""SELECT * FROM nessie.lamtran.customer where ACCOUNTID = 860010""").show()

+---------+-------+------------+---------+-------+-------------------+------+---------+--------+--------------+-----------------------+
|ACCOUNTID|BANK_ID|CORE_CUST_ID|   ORGKEY|ORGTYPE|               NAME|GENDER|CUST_TYPE|  STATUS|PRIMARY_SOL_ID|RELATIONSHIPOPENINGDATE|
+---------+-------+------------+---------+-------+-------------------+------+---------+--------+--------------+-----------------------+
|   860010|      1|   105268371|105268371|Account|TRAN THI ***** HANG|     F|    10001|INACTIVE|          1015|             2017-08-25|
+---------+-------+------------+---------+-------+-------------------+------+---------+--------+--------------+-----------------------+



In [63]:
spark.sql("""DROP TABLE nessie.lamtran.customer""").show()

++
||
++
++



In [28]:
spark.sql("""SELECT COUNT(*) FROM  nessie.lamtran.customer""").show()

+--------+
|count(1)|
+--------+
|      29|
+--------+



### Move file data and update metadata

In [74]:
spark.sql("""SELECT * FROM nessie.lamtran.customer.snapshots""").show(truncate = False)

+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                                                        |summary                                                                                                                                                                               

In [75]:
spark.sql("""SELECT * FROM nessie.lamtran.customer.snapshots""").show(truncate = True)

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2024-11-20 04:27:...|3363876608955524645|               NULL|   append|s3a://standardize...|{spark.app.id -> ...|
|2024-11-20 04:28:...|1288987108754526709|3363876608955524645|   append|s3a://standardize...|{spark.app.id -> ...|
|2024-11-20 04:29:...|8245947571210554019|1288987108754526709|overwrite|s3a://standardize...|{spark.app.id -> ...|
|2024-11-20 04:30:...|7114774514001334108|8245947571210554019|overwrite|s3a://standardize...|{spark.app.id -> ...|
|2024-11-20 04:31:...|6913384981637447512|7114774514001334108|overwrite|s3a://standardize...|{spark.app.id -> ...|
|2024-11-20 04:32:...|2674580199128993375|6913384981637447512|   append|s3a://st

In [118]:
spark.sql("""SELECT * FROM nessie.lamtran.customer.all_manifests""").show(truncate = True)

+-------+--------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+---------------------+
|content|                path|length|partition_spec_id|  added_snapshot_id|added_data_files_count|existing_data_files_count|deleted_data_files_count|added_delete_files_count|existing_delete_files_count|deleted_delete_files_count|partition_summaries|reference_snapshot_id|
+-------+--------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+---------------------+
|      0|s3a://standardize...|  7432|                0|3363876608955524645|                     1|                        0|                       0|                       0|          

In [117]:
spark.sql("""SELECT * FROM nessie.lamtran.customer.all_entries""").show(truncate = False)

+------+-------------------+---------------+--------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [88]:
spark.sql("""SELECT * FROM nessie.lamtran.customer.all_files""").show(truncate = False)

+-------+-------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------+------------+------------------+-----------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------+----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Get snapshot information on timestamp 

In [119]:
import pandas as pd
def get_snapshots_by_date(
    spark: SparkSession,
    base_table: str,
    fromDate: str,  # Expected format: "YYYY-MM-DD HH:MM:SS"
    toDate: str  # Expected format: "YYYY-MM-DD HH:MM:SS"
) -> pd.DataFrame:


    # config date time 
    # Đặt chính sách xử lý thời gian để tương thích với định dạng cũ
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

    # Construct table paths for snapshots, manifests, and entries
    snapshot_table = f"{base_table}.snapshots"
    manifests_table = f"{base_table}.all_manifests"
    entries_table = f"{base_table}.all_entries"

    # Filter snapshots within the specified date range with timestamp details
    snapshots = spark.sql(f"""
        SELECT * FROM {snapshot_table}
        WHERE committed_at >= to_timestamp('{fromDate}', 'yyyy-MM-dd HH:mm:ss.SSS') 
          AND committed_at <= to_timestamp('{toDate}', 'yyyy-MM-dd HH:mm:ss.SSS')
    """).collect()

    # Prepare a list to store detailed snapshot information
    snapshot_data = []

    # For each snapshot within the day, gather related metadata
    for snap in snapshots:
        snapshot_id = snap.snapshot_id
        manifest_list_location = snap.manifest_list

        # Get manifest files for this snapshot
        manifests = spark.sql(f"""
            SELECT * FROM {manifests_table}
            WHERE added_snapshot_id = {snapshot_id}
        """).collect()

        # Get data files for each manifest
        manifest_file_path = []
        data_file_paths = []  # Separate list to store file paths

        for manifest in manifests:
            manifest_entry = {
                "manifest_path": manifest.path,  # manifest path for the 'manifest' column
            }
            manifest_file_path.append(manifest_entry)
            
        # Get related data file entries
        entries = spark.sql(f"""
            SELECT * FROM {entries_table}
            WHERE snapshot_id = {snapshot_id}
        """).collect()

        # Extract data file paths and store in the separate list
        for entry in entries:
            data_file_paths.append(entry.data_file[1])  # Append each data_file_path

        # Convert Row object to dictionary and handle datetime conversion
        snap_dict = snap.asDict()
        
        # Append the full data for this snapshot
        snapshot_data.append({
            "snapshot_id": snapshot_id,
            "committed_at": snap_dict.get("committed_at"),
            "operation": snap_dict.get("operation"),
            "manifest_list": manifest_list_location,
            "manifest_file_path": [manifest['manifest_path'] for manifest in manifest_file_path],  # This column will contain manifest_path
            "data_path": data_file_paths  # This column will contain the data_file_path
        })

    # Convert snapshot data into a DataFrame for table-like display
    df = pd.DataFrame(snapshot_data)
    
    # Return the updated DataFrame
    return df

In [85]:
snapshots = spark.sql(f"""
    SELECT * FROM nessie.lamtran.customer.snapshots
    WHERE committed_at > to_timestamp('2024-11-20 04:30:01.468', 'yyyy-MM-dd HH:mm:ss.SSS') 
      AND committed_at < to_timestamp('2024-11-20 04:32:58.397', 'yyyy-MM-dd HH:mm:ss.SSS')
""")
snapshots.show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2024-11-20 04:30:...|7114774514001334108|8245947571210554019|overwrite|s3a://standardize...|{spark.app.id -> ...|
|2024-11-20 04:31:...|6913384981637447512|7114774514001334108|overwrite|s3a://standardize...|{spark.app.id -> ...|
|2024-11-20 04:32:...|2674580199128993375|6913384981637447512|   append|s3a://standardize...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [120]:
df = get_snapshots_by_date(spark, "nessie.lamtran.customer", "2024-11-20 04:30:21.468","2024-11-20 04:32:58.397")

with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.max_colwidth', None):
    print(df)

# df.to_csv('out_test.csv', index=False) 

           snapshot_id            committed_at  operation  \
0  7114774514001334108 2024-11-20 04:30:31.468  overwrite   
1  6913384981637447512 2024-11-20 04:31:11.113  overwrite   
2  2674580199128993375 2024-11-20 04:32:07.397     append   

                                                                                                                                           manifest_list  \
0  s3a://standardize/lamtran/customer_3a443a51-fc4b-4de4-b145-2ae6c721bf3f/metadata/snap-7114774514001334108-1-511e7e3c-16b5-468b-9cb7-9ec142251634.avro   
1  s3a://standardize/lamtran/customer_3a443a51-fc4b-4de4-b145-2ae6c721bf3f/metadata/snap-6913384981637447512-1-86a4a8c5-7e63-4764-9bdc-c6ef628e81ec.avro   
2  s3a://standardize/lamtran/customer_3a443a51-fc4b-4de4-b145-2ae6c721bf3f/metadata/snap-2674580199128993375-1-cdeab601-452a-4711-8b55-62f1413b60c9.avro   

                                                                                                                                  

### Move file and test full snapshot

In [32]:
from pyspark.sql.functions import to_date, when
from urllib.parse import urlparse
import boto3
from botocore.exceptions import ClientError
from py4j.protocol import Py4JJavaError
from datetime import datetime, timedelta
import pytz
import time
import shutil
import re
from typing import List, Dict
import pandas as pd
import fastavro
import logging
from pathlib import Path
import io
from typing import Tuple
import pyarrow.parquet as pq

In [33]:
###################################################### GENERATE CLEANER PATH ########################################################################
# generate Clean path
def parse_s3_path(s3_path: str) -> tuple:
    """Parse an S3 path into bucket and key components."""
    parsed = urlparse(s3_path)
    if parsed.scheme not in ["s3", "s3a"]:
        raise ValueError(f"Expected s3:// or s3a:// path, got {parsed.scheme}://")
    bucket = parsed.netloc
    # Remove leading slash
    key = parsed.path.lstrip('/')
    return bucket, key

def clean_s3_path(path: str) -> str:
    """
    Cleans and normalizes S3 paths to ensure consistent format.
    Removes any '..' or '.' path components that cause MinIO errors.
    """
    # Convert s3a:// to s3:// for consistency
    path = path.replace('s3a://', 's3a://')
    
    # Split into components
    parsed = urlparse(path)
    if parsed.scheme != 's3a':
        raise ValueError(f"Expected s3:// path, got {parsed.scheme}://")
    
    # Clean the path components
    path_parts = [part for part in parsed.path.split('/') if part and part not in ('.', '..')]
    clean_path = '/'.join(path_parts)
    
    # Reconstruct the full path
    return f"s3a://{parsed.netloc}/{clean_path}"

def generate_target_path(source_path: str, source_base: str, target_base: str, is_data: bool = True) -> str:
    """
    Generates a new target path maintaining the relative structure and placing in correct subdirectory.
    
    Args:
        source_path: Original file path
        source_base: Base directory of source files
        target_base: Target base directory
        is_data: If True, file goes in /data, if False, goes in /metadata
    """
    # Clean and normalize all paths
    source_path = clean_s3_path(source_path)
    source_base = clean_s3_path(source_base)
    target_base = clean_s3_path(target_base)
    
    # Get the filename
    filename = os.path.basename(source_path)
    
    # Determine the appropriate subdirectory
    subdir = 'data' if is_data else 'metadata'
    
    # Construct the new path
    return os.path.join(target_base, subdir, filename)

###################################################### CODE TO HANDLE CHANGE IN METADATA AND MOVE TABLE ##############################################

# Update
def is_delete_file(file_path: str) -> bool:
    """Check if the file is a delete file."""
    return file_path.endswith('-deletes.parquet')

def update_manifest_metadata_s3(
    s3_client,
    manifest_file_path: str,
    old_paths: List[str],
    new_paths: List[str],
    snapshot_id: str
):
    """
    Update the file locations in manifest Avro files stored in S3 with improved delete handling
    
    Args:
        s3_client: Boto3 S3 client
        manifest_file_path: Path to the manifest file (s3:// format)
        old_paths: List of original data file paths
        new_paths: List of new data file paths
        snapshot_id: Snapshot ID to identify relevant entries
    """
    path_mapping = dict(zip(old_paths, new_paths))
    bucket, key = parse_s3_path(manifest_file_path)
    
    # Download the Avro file to memory
    response = s3_client.get_object(Bucket=bucket, Key=key)
    avro_bytes = response['Body'].read()
    
    # Read the Avro content
    avro_reader = fastavro.reader(io.BytesIO(avro_bytes))
    records = list(avro_reader)
    schema = avro_reader.writer_schema
    
    # Update file_location in records
    updated_records = []
    for record in records:
        # Check if this is a delete record
        is_delete = False
        if 'data_file' in record and 'content' in record['data_file']:
            is_delete = record['data_file']['content'] == "1"
        
        # Process records that belong to this snapshot
        if 'snapshot_id' in record and str(record['snapshot_id']) == str(snapshot_id):
            if 'data_file' in record:
                old_file_path = record['data_file'].get('file_path', '')
                if old_file_path in path_mapping:
                    record['data_file']['file_path'] = path_mapping[old_file_path]
                    # Preserve the delete status
                    if is_delete:
                        record['data_file']['content'] = "1"
        
        updated_records.append(record)
    
    # Write updated records to a new bytes buffer
    output_buffer = io.BytesIO()
    fastavro.writer(output_buffer, schema, updated_records)
    output_buffer.seek(0)
    
    # Upload the updated Avro file back to S3
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=output_buffer.getvalue()
    )

def move_iceberg_data_and_update_metadata_s3(
    data_paths: List[str],
    manifest_file_paths: List[str],
    target_base_dir: str,
    aws_access_key_id: str = None,
    aws_secret_access_key: str = None,
    endpoint_url: str = None,
    snapshot_id: int = None
) -> bool:
    """
    Move Iceberg data files and update metadata files.
    Now properly handles both regular data files and delete files.
    """
    print("\n=== Starting new data movement operation ===")
    print(f"Snapshot ID: {snapshot_id}")
    print(f"Target base dir: {target_base_dir}")
    
    print("\nData paths to process:")
    for path in data_paths:
        print(f"- {path}")
        
    print("\nManifest files to update:")
    for path in manifest_file_paths:
        print(f"- {path}")
    
    # Initialize S3 client
    s3_client = boto3.client(
        's3',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        endpoint_url=endpoint_url
    )
    
    # Separate data files and delete files for processing
    data_files = [path for path in data_paths if not is_delete_file(path)]
    delete_files = [path for path in data_paths if is_delete_file(path)]
    
    print(f"\nFound {len(data_files)} data files and {len(delete_files)} delete files")
    
    # Generate path mappings for both regular and delete files
    path_mappings = {}
    
    print("\nGenerating path mappings:")
    
    # Process regular data files
    for source_path in data_files:
        target_path = generate_target_path(
            source_path,
            source_base=os.path.commonprefix([clean_s3_path(p) for p in data_paths]),
            target_base=target_base_dir,
            is_data=True
        )
        path_mappings[source_path] = target_path
        print("Regular file mapping:")
        print(f"From: {source_path}")
        print(f"To: {target_path}")
        print("---")
    
    # Process delete files
    for source_path in delete_files:
        target_path = generate_target_path(
            source_path,
            source_base=os.path.commonprefix([clean_s3_path(p) for p in data_paths]),
            target_base=target_base_dir,
            is_data=True  # Keep delete files in the data directory
        )
        path_mappings[source_path] = target_path
        print("Delete file mapping:")
        print(f"From: {source_path}")
        print(f"To: {target_path}")
        print("---")
    
    # Move all files (both regular and delete files)
    print("\nMoving files:")
    for source_path, target_path in path_mappings.items():
        try:
            print("\nMoving file:")
            print(f"From: {source_path}")
            print(f"To: {target_path}")
            
            # Parse paths
            source_bucket, source_key = parse_s3_path(source_path)
            target_bucket, target_key = parse_s3_path(target_path)
            
            # Copy file to new location
            s3_client.copy_object(
                Bucket=target_bucket,
                Key=target_key,
                CopySource={
                    'Bucket': source_bucket,
                    'Key': source_key
                }
            )
            
            # Delete original file
            s3_client.delete_object(
                Bucket=source_bucket,
                Key=source_key
            )
            
            print("Move successful")
            
        except Exception as e:
            print(f"Error moving file {source_path}: {str(e)}")
            return False
    
    # Update manifest files
    print("\nUpdating manifest files:")
    
    for manifest_path in manifest_file_paths:
        try:
            print(f"\nProcessing manifest: {manifest_path}")
            update_manifest_metadata_s3(
                s3_client,
                manifest_path,
                list(path_mappings.keys()),
                list(path_mappings.values()),
                snapshot_id
            )
            
        except Exception as e:
            print(f"Error updating manifest {manifest_path}: {str(e)}")
            return False
    
    print("\nOperation completed successfully")
    return True

################################################### HELPER FUNCTION TO CHECK LOG #####################################################################

def verify_record_counts(
    s3_client,
    data_paths: List[str],
    operation: str
) -> Dict[str, int]:
    """
    Verify record counts in source files before moving.
    
    Args:
        s3_client: Boto3 S3 client
        data_paths: List of data file paths
        operation: Operation type (append, overwrite)
    
    Returns:
        Dict containing file paths and their record counts
    """
    counts = {}
    
    for path in data_paths:
        try:
            # Parse S3 path
            bucket, key = parse_s3_path(path)
            
            # Download parquet file to memory
            buffer = io.BytesIO()
            s3_client.download_fileobj(bucket, key, buffer)
            buffer.seek(0)
            
            # Read parquet metadata
            parquet_file = pq.ParquetFile(buffer)
            row_count = parquet_file.metadata.num_rows
            
            counts[path] = {
                'count': row_count,
                'is_delete': is_delete_file(path)
            }
            
            print(f"File: {path}")
            print(f"Record count: {row_count}")
            print(f"Is delete file: {is_delete_file(path)}")
            print("---")
            
        except Exception as e:
            print(f"Error reading {path}: {str(e)}")
            counts[path] = {'count': 0, 'is_delete': is_delete_file(path)}
    
    return counts

def calculate_expected_records(
    record_counts: Dict[str, Dict],
    operation: str
) -> int:
    """
    Calculate expected record count after operation.
    
    Args:
        record_counts: Dict of file paths and their record counts
        operation: Operation type (append, overwrite)
    
    Returns:
        Expected record count
    """
    total_records = 0
    total_deletes = 0
    
    for file_info in record_counts.values():
        if file_info['is_delete']:
            total_deletes += file_info['count']
        else:
            total_records += file_info['count']
    
    # Fix: Handle delete-only operations correctly
    if operation == 'append':
        return total_records
    elif operation == 'overwrite':
        result = total_records - total_deletes
        # Ensure we don't return negative counts
        return max(0, result)
    else:
        raise ValueError(f"Unknown operation: {operation}")

def move_iceberg_data_with_verification(
    data_paths: List[str],
    manifest_file_paths: List[str],
    target_base_dir: str,
    operation: str,
    aws_access_key_id: str = None,
    aws_secret_access_key: str = None,
    endpoint_url: str = None,
    snapshot_id: int = None
) -> Tuple[bool, Dict]:
    print("\n=== Starting enhanced data movement operation ===")
    print(f"Operation: {operation}")
    print(f"Snapshot ID: {snapshot_id}")
    
    s3_client = boto3.client(
        's3',
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        endpoint_url=endpoint_url
    )
    
    # Verify source record counts
    print("\nVerifying source record counts:")
    source_counts = verify_record_counts(s3_client, data_paths, operation)
    expected_records = calculate_expected_records(source_counts, operation)
    print(f"\nExpected final record count: {expected_records}")
    
    # Perform the move operation for both regular and delete files
    move_success = move_iceberg_data_and_update_metadata_s3(
        data_paths=data_paths,
        manifest_file_paths=manifest_file_paths,
        target_base_dir=target_base_dir,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key,
        endpoint_url=endpoint_url,
        snapshot_id=snapshot_id
    )
    
    if not move_success:
        return False, {
            'source_counts': source_counts,
            'expected_records': expected_records,
            'actual_records': None,
            'success': False
        }
    
    # Verify target record counts (including delete files)
    print("\nVerifying target record counts:")
    target_paths = [
        generate_target_path(
            path,
            source_base=os.path.commonprefix([clean_s3_path(p) for p in data_paths]),
            target_base=target_base_dir,
            is_data=True
        )
        for path in data_paths
    ]
    
    target_counts = verify_record_counts(s3_client, target_paths, operation)
    actual_records = calculate_expected_records(target_counts, operation)
    
    verification_results = {
        'source_counts': source_counts,
        'expected_records': expected_records,
        'actual_records': actual_records,
        'success': expected_records == actual_records
    }
    
    if verification_results['success']:
        print("\nRecord count verification successful!")
        print(f"Expected records: {expected_records}")
        print(f"Actual records: {actual_records}")
    else:
        print("\nRecord count verification failed!")
        print(f"Expected records: {expected_records}")
        print(f"Actual records: {actual_records}")
        print("Consider rolling back the operation.")
    
    return move_success, verification_results

######################################################## FUNCTION TO RUN JOB #########################################################################

def process_snapshots_with_verification(df: pd.DataFrame, **kwargs):
    """
    Process snapshots with verification of record counts.
    """
    all_results = []
    
    for _, row in df.iterrows():
        print(f"\nProcessing snapshot ID: {row['snapshot_id']}")
        print(f"Operation: {row['operation']}")
        print(f"Committed at: {row['committed_at']}")
        
        success, results = move_iceberg_data_with_verification(
            data_paths=row['data_path'],
            manifest_file_paths=row['manifest_file_path'],
            operation=row['operation'],
            snapshot_id=row['snapshot_id'],
            **kwargs
        )
        
        results['snapshot_id'] = row['snapshot_id']
        results['operation'] = row['operation']
        all_results.append(results)
        
        if not success or not results['success']:
            print(f"Failed to process snapshot {row['snapshot_id']}")
            break
    
    return pd.DataFrame(all_results)

In [89]:
################################################# RUN SCRIPT (You can change to suitable for your table) ######################################

results_df = process_snapshots_with_verification(
    df,
    target_base_dir= "s3a://standardize/lamtran_newlocation", # CHANGE HERE
    aws_access_key_id= aws_access_key,
    aws_secret_access_key= aws_secret_key,
    endpoint_url= aws_s3_endpoint 
)

# Display verification results
print("\nVerification Results:")
print(results_df)


Processing snapshot ID: 7114774514001334108
Operation: overwrite
Committed at: 2024-11-20 04:30:31.468000

=== Starting enhanced data movement operation ===
Operation: overwrite
Snapshot ID: 7114774514001334108

Verifying source record counts:

Expected final record count: 0

=== Starting new data movement operation ===
Snapshot ID: 7114774514001334108
Target base dir: s3a://standardize/lamtran_newlocation

Data paths to process:

Manifest files to update:

Found 0 data files and 0 delete files

Generating path mappings:

Moving files:

Updating manifest files:

Operation completed successfully

Verifying target record counts:

Record count verification successful!
Expected records: 0
Actual records: 0

Processing snapshot ID: 6913384981637447512
Operation: overwrite
Committed at: 2024-11-20 04:31:11.113000

=== Starting enhanced data movement operation ===
Operation: overwrite
Snapshot ID: 6913384981637447512

Verifying source record counts:
File: s3a://standardize/lamtran/customer_3

### test full snapshot

In [113]:
spark.sql(f"""            
SELECT *
FROM nessie.lamtran.customer.all_files m 
""").show(truncate = True)

+-------+--------------------+-----------+-------+------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
|content|           file_path|file_format|spec_id|record_count|file_size_in_bytes|        column_sizes|        value_counts|   null_value_counts|nan_value_counts|        lower_bounds|        upper_bounds|key_metadata|split_offsets|equality_ids|sort_order_id|    readable_metrics|
+-------+--------------------+-----------+-------+------------+------------------+--------------------+--------------------+--------------------+----------------+--------------------+--------------------+------------+-------------+------------+-------------+--------------------+
|      0|s3a://standardize...|    PARQUET|      0|           1|              3003|{1 -> 39, 2 -> 37...|{1 -> 1, 2 -> 1, ...|{1 -> 0, 2 -> 0, ...|              {

In [36]:
spark.sql(f"""            
SELECT *
FROM nessie.lamtran.customer.snapshots s 
  
""").show(truncate = True)

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2024-11-20 03:23:...|1246593139439904600|               NULL|   append|s3a://standardize...|{spark.app.id -> ...|
|2024-11-20 03:24:...|5432061827693556251|1246593139439904600|   append|s3a://standardize...|{spark.app.id -> ...|
|2024-11-20 03:25:...|4129731365437905558|5432061827693556251|overwrite|s3a://standardize...|{spark.app.id -> ...|
|2024-11-20 03:27:...|6202517335213052460|4129731365437905558|overwrite|s3a://standardize...|{spark.app.id -> ...|
|2024-11-20 03:28:...|6361270771376132640|6202517335213052460|overwrite|s3a://standardize...|{spark.app.id -> ...|
|2024-11-20 03:29:...|2231548173782421862|6361270771376132640|   append|s3a://st

In [114]:
spark.sql(f"""            
SELECT *
FROM nessie.lamtran.customer.all_entries f 
  
""").show(truncate = True)

+------+-------------------+---------------+--------------------+--------------------+--------------------+
|status|        snapshot_id|sequence_number|file_sequence_number|           data_file|    readable_metrics|
+------+-------------------+---------------+--------------------+--------------------+--------------------+
|     2|8245947571210554019|              1|                   1|{0, s3a://standar...|{{147, 28, 0, NUL...|
|     2|6913384981637447512|              4|                   4|{0, s3a://standar...|{{145, 27, 0, NUL...|
|     1|2674580199128993375|              6|                   6|{0, s3a://standar...|{{39, 1, 0, NULL,...|
|     1|8245947571210554019|              3|                   3|{0, s3a://standar...|{{147, 28, 0, NUL...|
|     2|7114774514001334108|              3|                   3|{0, s3a://standar...|{{147, 28, 0, NUL...|
|     1|1288987108754526709|              2|                   2|{0, s3a://standar...|{{38, 1, 0, NULL,...|
|     1|6913384981637447512|

In [93]:
spark.sql(f"""            
SELECT s.committed_at, s.snapshot_id, m.path, f.data_file
FROM nessie.lamtran.customer.entries f 
JOIN nessie.lamtran.customer.manifests m 
  ON f.snapshot_id = m.added_snapshot_id
JOIN nessie.lamtran.customer.snapshots s 
  ON m.added_snapshot_id = s.snapshot_id 
""").show(20, truncate = True)

+--------------------+-------------------+--------------------+--------------------+
|        committed_at|        snapshot_id|                path|           data_file|
+--------------------+-------------------+--------------------+--------------------+
|2024-11-19 08:31:...|1197200229001663007|s3a://standardize...|{0, s3a://standar...|
|2024-11-19 08:31:...|1197200229001663007|s3a://standardize...|{0, s3a://standar...|
|2024-11-19 08:30:...|4893775950529386665|s3a://standardize...|{0, s3a://standar...|
|2024-11-19 08:31:...|1197200229001663007|s3a://standardize...|{0, s3a://standar...|
|2024-11-19 08:31:...|1197200229001663007|s3a://standardize...|{0, s3a://standar...|
|2024-11-19 08:29:...|7476707695693006581|s3a://standardize...|{0, s3a://standar...|
+--------------------+-------------------+--------------------+--------------------+



## TEST Time Travel 

In [116]:
spark.sql("""SELECT * FROM nessie.lamtran.customer.snapshots""").show(truncate = False)

+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                                                        |summary                                                                                                                                                                               

In [93]:
spark.sql("""SELECT COUNT(*) FROM nessie.lamtran.customer FOR TIMESTAMP AS OF TIMESTAMP '2024-11-20 04:28:30.584'""").show(10)
spark.sql("""SELECT * FROM nessie.lamtran.customer FOR TIMESTAMP AS OF TIMESTAMP '2024-11-20 04:28:30.584' where ACCOUNTID = 80""").show(10)

+--------+
|count(1)|
+--------+
|      29|
+--------+

+---------+-------+------------+---------+-------+--------+------+---------+------+--------------+-----------------------+
|ACCOUNTID|BANK_ID|CORE_CUST_ID|   ORGKEY|ORGTYPE|    NAME|GENDER|CUST_TYPE|STATUS|PRIMARY_SOL_ID|RELATIONSHIPOPENINGDATE|
+---------+-------+------------+---------+-------+--------+------+---------+------+--------------+-----------------------+
|       80|      2|   123456789|123456789|Account|HAHAHIHI|     M|    10001| ACTVE|          1500|             2024-01-07|
+---------+-------+------------+---------+-------+--------+------+---------+------+--------------+-----------------------+



In [96]:
spark.sql("""SELECT COUNT(*) FROM nessie.lamtran.customer FOR TIMESTAMP AS OF TIMESTAMP '2024-11-20 04:29:14.987'""").show(10)
spark.sql("""SELECT * FROM nessie.lamtran.customer FOR TIMESTAMP AS OF TIMESTAMP '2024-11-20 04:29:14.987' where ACCOUNTID = 860010""").show(10)

+--------+
|count(1)|
+--------+
|      29|
+--------+

+---------+-------+------------+---------+-------+-------------------+------+---------+--------+--------------+-----------------------+
|ACCOUNTID|BANK_ID|CORE_CUST_ID|   ORGKEY|ORGTYPE|               NAME|GENDER|CUST_TYPE|  STATUS|PRIMARY_SOL_ID|RELATIONSHIPOPENINGDATE|
+---------+-------+------------+---------+-------+-------------------+------+---------+--------+--------------+-----------------------+
|   860010|      1|   105268371|105268371|Account|TRAN THI ***** HANG|     F|    10001|INACTIVE|          1015|             2017-08-25|
+---------+-------+------------+---------+-------+-------------------+------+---------+--------+--------------+-----------------------+



In [97]:
spark.sql("""SELECT COUNT(*) FROM nessie.lamtran.customer FOR TIMESTAMP AS OF TIMESTAMP '2024-11-20 04:30:31.468'""").show(10)
spark.sql("""SELECT * FROM nessie.lamtran.customer FOR TIMESTAMP AS OF TIMESTAMP '2024-11-20 04:30:31.468' where ACCOUNTID = 860010""").show(10)

+--------+
|count(1)|
+--------+
|      28|
+--------+

+---------+-------+------------+---------+-------+-------------------+------+---------+--------+--------------+-----------------------+
|ACCOUNTID|BANK_ID|CORE_CUST_ID|   ORGKEY|ORGTYPE|               NAME|GENDER|CUST_TYPE|  STATUS|PRIMARY_SOL_ID|RELATIONSHIPOPENINGDATE|
+---------+-------+------------+---------+-------+-------------------+------+---------+--------+--------------+-----------------------+
|   860010|      1|   105268371|105268371|Account|TRAN THI ***** HANG|     F|    10001|INACTIVE|          1015|             2017-08-25|
+---------+-------+------------+---------+-------+-------------------+------+---------+--------+--------------+-----------------------+



In [99]:
spark.sql("""SELECT COUNT(*) FROM nessie.lamtran.customer FOR TIMESTAMP AS OF TIMESTAMP '2024-11-20 04:31:11.113'""").show(10)
spark.sql("""SELECT * FROM nessie.lamtran.customer FOR TIMESTAMP AS OF TIMESTAMP '2024-11-20 04:31:11.113' where ACCOUNTID = 2008140""").show(10)

+--------+
|count(1)|
+--------+
|      28|
+--------+

+---------+-------+------------+---------+-------+-----------------+------+---------+--------+--------------+-----------------------+
|ACCOUNTID|BANK_ID|CORE_CUST_ID|   ORGKEY|ORGTYPE|             NAME|GENDER|CUST_TYPE|  STATUS|PRIMARY_SOL_ID|RELATIONSHIPOPENINGDATE|
+---------+-------+------------+---------+-------+-----------------+------+---------+--------+--------------+-----------------------+
|  2008140|      1|   119821325|119821325|Account|TRAN THI ***** HA|     F|    10001|INACTIVE|          1602|             2018-03-16|
+---------+-------+------------+---------+-------+-----------------+------+---------+--------+--------------+-----------------------+



In [100]:
spark.sql("""SELECT COUNT(*) FROM nessie.lamtran.customer FOR TIMESTAMP AS OF TIMESTAMP '2024-11-20 04:32:07.397'""").show(10)
spark.sql("""SELECT * FROM nessie.lamtran.customer FOR TIMESTAMP AS OF TIMESTAMP '2024-11-20 04:32:07.397' where ACCOUNTID = 100""").show(10)

+--------+
|count(1)|
+--------+
|      29|
+--------+

+---------+-------+------------+---------+-------+---------------+------+---------+------+--------------+-----------------------+
|ACCOUNTID|BANK_ID|CORE_CUST_ID|   ORGKEY|ORGTYPE|           NAME|GENDER|CUST_TYPE|STATUS|PRIMARY_SOL_ID|RELATIONSHIPOPENINGDATE|
+---------+-------+------------+---------+-------+---------------+------+---------+------+--------------+-----------------------+
|      100|      2|   123456789|123456789|Account|HOANG THI HUONG|     M|    10001| ACTVE|          1500|             2024-01-14|
+---------+-------+------------+---------+-------+---------------+------+---------+------+--------------+-----------------------+



## TEST Roll Back

In [114]:
spark.sql("""CALL nessie.system.rollback_to_snapshot('nessie.lamtran.customer', 6913384981637447512)""").show()

+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
| 1197200229001663007|4893775950529386665|
+--------------------+-------------------+



In [None]:
spark.sql("""CALL nessie.system.rollback_to_timestamp('nessie.lamtran.customer', TIMESTAMP '2024-11-20 04:31:11.113')""").show()

In [116]:
spark.sql("""SELECT COUNT(*) FROM nessie.lamtran.customer""").show()

+--------+
|count(1)|
+--------+
|      29|
+--------+



### create new table to test snapshot management

In [8]:
spark.sql("""CREATE TABLE nessie.lamtran.customer_test(
            ACCOUNTID STRING,
            BANK_ID STRING,
            CORE_CUST_ID STRING,
            ORGKEY STRING,
            ORGTYPE STRING,
            NAME STRING,
            GENDER STRING,
            CUST_TYPE STRING,
            STATUS STRING,
            PRIMARY_SOL_ID STRING,
            RELATIONSHIPOPENINGDATE DATE)
        USING ICEBERG """)

DataFrame[]

In [9]:
# Convert date formats
schema = StructType([
    StructField("ACCOUNTID", StringType(), True),
    StructField("BANK_ID", StringType(), True),
    StructField("CORE_CUST_ID", StringType(), True),
    StructField("ORGKEY", StringType(), True),
    StructField("ORGTYPE", StringType(), True),
    StructField("NAME", StringType(), True),
    StructField("GENDER", StringType(), True),
    StructField("CUST_TYPE", StringType(), True),
    StructField("STATUS", StringType(), True),
    StructField("PRIMARY_SOL_ID", StringType(), True),
    StructField("RELATIONSHIPOPENINGDATE", DateType(), True)
])

# Read CSV with schema
df = spark.read.csv(raw_csv_path, schema=schema, header=True)

df = raw_df.withColumn(
    "RELATIONSHIPOPENINGDATE",
    when(raw_df["RELATIONSHIPOPENINGDATE"].rlike(r"^\d{2}-\d{2}-\d{4}$"), 
         to_date("RELATIONSHIPOPENINGDATE", "dd-MM-yyyy"))
    .when(raw_df["RELATIONSHIPOPENINGDATE"].rlike(r"^\d{1,2}/\d{1,2}/\d{4}$"), 
          to_date("RELATIONSHIPOPENINGDATE", "d/M/yyyy"))
)


In [10]:
df.write.format("iceberg").mode("append").save("nessie.lamtran.customer_test")

                                                                                

### Insert/ update/ Delete 

In [11]:
# Thêm giá trị (1)
spark.sql("""
  INSERT INTO nessie.lamtran.customer_test (ACCOUNTID, BANK_ID, CORE_CUST_ID, ORGKEY, ORGTYPE, NAME, GENDER, CUST_TYPE, STATUS, PRIMARY_SOL_ID, RELATIONSHIPOPENINGDATE)
  VALUES (80, 2, 123456789, 123456789, 'Account', 'HAHAHIHI', 'M', 10001, 'ACTVE', 1500, to_date('2024-11-07','yyyy-mm-dd'))
""")


DataFrame[]

In [12]:
#(2)
spark.sql("""UPDATE nessie.lamtran.customer_test  SET STATUS = 'INACTIVE', NAME = 'TRAN THI ***** HANG'
  WHERE ACCOUNTID = 860010
""")

                                                                                

DataFrame[]

In [13]:
#(3)
spark.sql("""DELETE FROM nessie.lamtran.customer_test  WHERE ACCOUNTID = 958933
""")

DataFrame[]

In [14]:
#(4)
spark.sql("""UPDATE nessie.lamtran.customer_test  SET STATUS = 'INACTIVE', NAME = 'TRAN THI ***** HA'
  WHERE ACCOUNTID = 2008140
""")

DataFrame[]

In [15]:
# Thêm giá trị (5)
spark.sql("""
  INSERT INTO nessie.lamtran.customer_test (ACCOUNTID, BANK_ID, CORE_CUST_ID, ORGKEY, ORGTYPE, NAME, GENDER, CUST_TYPE, STATUS, PRIMARY_SOL_ID, RELATIONSHIPOPENINGDATE)
  VALUES (100, 2, 123456789, 123456789, 'Account', 'HOANG THI HUONG', 'M', 10001, 'ACTVE', 1500, to_date('2024-11-14','yyyy-mm-dd'))
""")

DataFrame[]

## Check snapshot

In [43]:
spark.sql("""SELECT * FROM nessie.lamtran.customer_test.snapshots""").show(truncate = False)

+-----------------------+-------------------+-------------------+---------+----------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                                                             |summary                                                                                                                                                                     

In [18]:
spark.sql("""SELECT * FROM nessie.lamtran.customer_test.history""").show(truncate = False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2024-11-21 04:12:09.349|2598649496185647733|NULL               |true               |
|2024-11-21 04:18:17.929|5460143888575482310|2598649496185647733|true               |
|2024-11-21 04:19:30.612|5056947826345476597|5460143888575482310|true               |
|2024-11-21 04:21:24.607|6386723924476925074|5056947826345476597|true               |
|2024-11-21 04:26:05.773|5949622908842454111|6386723924476925074|true               |
|2024-11-21 04:27:18.065|2571846111905831955|5949622908842454111|true               |
+-----------------------+-------------------+-------------------+-------------------+



In [20]:
##1. rollback_to_snapshot
spark.sql("""CALL nessie.system.rollback_to_snapshot('nessie.lamtran.customer_test', 5949622908842454111)""").show()

+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
| 2571846111905831955|5949622908842454111|
+--------------------+-------------------+



In [21]:
spark.sql("""SELECT COUNT(*) FROM nessie.lamtran.customer_test""").show(truncate = False)

+--------+
|count(1)|
+--------+
|28      |
+--------+



In [22]:
##2.  rollback_to_timestamp
spark.sql("""CALL nessie.system.rollback_to_timestamp('nessie.lamtran.customer_test', TIMESTAMP '2024-11-21 04:21:24.607')""").show()

+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
| 5949622908842454111|5056947826345476597|
+--------------------+-------------------+



In [23]:
spark.sql("""SELECT COUNT(*) FROM nessie.lamtran.customer_test""").show(truncate = False)

+--------+
|count(1)|
+--------+
|29      |
+--------+



In [24]:
spark.sql("""SELECT * FROM nessie.lamtran.customer_test WHERE ACCOUNTID = 2008140""").show(truncate = False)

+---------+-------+------------+---------+-------+--------------+------+---------+------+--------------+-----------------------+
|ACCOUNTID|BANK_ID|CORE_CUST_ID|ORGKEY   |ORGTYPE|NAME          |GENDER|CUST_TYPE|STATUS|PRIMARY_SOL_ID|RELATIONSHIPOPENINGDATE|
+---------+-------+------------+---------+-------+--------------+------+---------+------+--------------+-----------------------+
|2008140  |1      |119821325   |119821325|Account|NGUYEN *** ANG|F     |10001    |ACTVE |1602          |2018-03-16             |
+---------+-------+------------+---------+-------+--------------+------+---------+------+--------------+-----------------------+



In [25]:
## 3.	Set_current_snapshot
spark.sql("""CALL nessie.system.set_current_snapshot('nessie.lamtran.customer_test', 5056947826345476597)""").show()

+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
| 5056947826345476597|5056947826345476597|
+--------------------+-------------------+



In [26]:
spark.sql("""SELECT COUNT(*) FROM nessie.lamtran.customer_test""").show(truncate = False)

+--------+
|count(1)|
+--------+
|29      |
+--------+



In [None]:
spark.sql("""CALL nessie.system.set_current_snapshot( table => 'nessie.lamtran.customer', ref => 's1')""").show()

In [27]:
spark.sql("""SELECT * FROM nessie.lamtran.customer.history""").show(truncate = False)

+---------------+-----------+---------+-------------------+
|made_current_at|snapshot_id|parent_id|is_current_ancestor|
+---------------+-----------+---------+-------------------+
+---------------+-----------+---------+-------------------+



In [33]:
#4. cherrypick_snapshot
spark.sql("""CALL nessie.system.cherrypick_snapshot('nessie.lamtran.customer_test',6386723924476925074)""").show()

+-------------------+-------------------+
| source_snapshot_id|current_snapshot_id|
+-------------------+-------------------+
|6386723924476925074|6386723924476925074|
+-------------------+-------------------+



In [40]:
spark.sql("""SELECT * FROM nessie.lamtran.customer_test.history""").show(truncate = False)

24/11/21 07:15:39 WARN NessieUtil: The Iceberg property 'gc.enabled' and/or 'write.metadata.delete-after-commit.enabled' is enabled on table 'lamtran.customer_test' in NessieCatalog. This will likely make data in other Nessie branches and tags and in earlier, historical Nessie commits inaccessible. The recommended setting for those properties is 'false'. Use the 'nessie-gc' tool for Nessie reference-aware garbage collection.


+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2024-11-21 04:12:09.349|2598649496185647733|NULL               |true               |
|2024-11-21 04:18:17.929|5460143888575482310|2598649496185647733|true               |
|2024-11-21 04:19:30.612|5056947826345476597|5460143888575482310|true               |
|2024-11-21 04:21:24.607|6386723924476925074|5056947826345476597|true               |
|2024-11-21 04:26:05.773|5949622908842454111|6386723924476925074|false              |
|2024-11-21 04:27:18.065|2571846111905831955|5949622908842454111|false              |
|2024-11-21 04:29:10.866|5949622908842454111|6386723924476925074|false              |
|2024-11-21 04:29:49.619|5056947826345476597|5460143888575482310|true               |
|2024-11-21 04:33:06.362|6386723924476925074|505694782

In [None]:
## 5.	Publish_changes
spark.sql("""CALL nessie.system.publish_changes('nessie.lamtran.customer','wap_id_1')""").show()

In [None]:
# 6. fast_forward
spark.sql("""CALL nessie.system.fast_forward('nessie.lamtran.customer', 'main', 'audit-branch')""").show()


In [41]:
# 7.	Expire_snapshot
## Xóa 5 các snapshot sau ngày giờ .... nhưng vẫn giữ lại n bản chụp nhanh gần nhất ý 
spark.sql("""CALL nessie.system.expire_snapshots('nessie.lamtran.customer_test', TIMESTAMP '2024-11-21 04:19:24.607', 2)""").show()
#spark.sql("""ALTER TABLE nessie.lamtran.customer_test SET TBLPROPERTIES ('gc.enabled' = 'true')""")

24/11/21 07:22:07 WARN NessieUtil: The Iceberg property 'gc.enabled' and/or 'write.metadata.delete-after-commit.enabled' is enabled on table 'lamtran.customer_test' in NessieCatalog. This will likely make data in other Nessie branches and tags and in earlier, historical Nessie commits inaccessible. The recommended setting for those properties is 'false'. Use the 'nessie-gc' tool for Nessie reference-aware garbage collection.

+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+
|deleted_data_files_count|deleted_position_delete_files_count|deleted_equality_delete_files_count|deleted_manifest_files_count|deleted_manifest_lists_count|deleted_statistics_files_count|
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+
|                       1|                                  0|                                  0|                           1|                           2|                             0|
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+



                                                                                

In [42]:
spark.sql("""SELECT * FROM nessie.lamtran.customer_test.history""").show(truncate = False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2024-11-21 04:19:30.612|5056947826345476597|5460143888575482310|true               |
|2024-11-21 04:21:24.607|6386723924476925074|5056947826345476597|true               |
|2024-11-21 04:26:05.773|5949622908842454111|6386723924476925074|false              |
|2024-11-21 04:27:18.065|2571846111905831955|5949622908842454111|false              |
|2024-11-21 04:29:10.866|5949622908842454111|6386723924476925074|false              |
|2024-11-21 04:29:49.619|5056947826345476597|5460143888575482310|true               |
|2024-11-21 04:33:06.362|6386723924476925074|5056947826345476597|true               |
+-----------------------+-------------------+-------------------+-------------------+



In [44]:
# Thực hiện xóa snapshot tại snapshot_id = 5056947826345476597
spark.sql("""CALL nessie.system.expire_snapshots(table => 'nessie.lamtran.customer_test', snapshot_ids => ARRAY(5056947826345476597))""").show()




+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+
|deleted_data_files_count|deleted_position_delete_files_count|deleted_equality_delete_files_count|deleted_manifest_files_count|deleted_manifest_lists_count|deleted_statistics_files_count|
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+
|                       1|                                  0|                                  0|                           2|                           1|                             0|
+------------------------+-----------------------------------+-----------------------------------+----------------------------+----------------------------+------------------------------+



                                                                                

In [45]:
spark.sql("""SELECT * FROM nessie.lamtran.customer_test.history""").show(truncate = False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2024-11-21 04:33:06.362|6386723924476925074|5056947826345476597|true               |
+-----------------------+-------------------+-------------------+-------------------+



In [67]:
spark.sql("""SELECT count (DISTINCT path) FROM nessie.lamtran.customer_test.all_manifests""").show(truncate = True)

+--------------------+
|count(DISTINCT path)|
+--------------------+
|                   6|
+--------------------+



In [55]:
spark.sql("""SELECT count(DISTINCT manifest_list) AS unique_manifest_count from  nessie.lamtran.customer_test.snapshots""").show(truncate = True)

+---------------------+
|unique_manifest_count|
+---------------------+
|                    3|
+---------------------+



In [66]:
spark.sql("""SELECT count (DISTINCT path)  FROM nessie.lamtran.customer.all_manifests""").show(truncate = False)

+--------------------+
|count(DISTINCT path)|
+--------------------+
|9                   |
+--------------------+

