In [0]:
# List the secret scopes visible to this notebook; the result shows scope names only.
dbutils.secrets.listScopes()

[SecretScope(name='spotify-scope'), SecretScope(name='spotify-secret-scope')]

In [0]:
import datetime
from delta.tables import DeltaTable
import pyspark.sql.functions as F

In [0]:
# Build the option dict handed to the Snowflake Spark writer. Every value is
# pulled from the "spotify-secret-scope" secret scope, and each secret key
# is deliberately named after the option it fills.
snowflake_args = {
    option_name: dbutils.secrets.get(scope="spotify-secret-scope", key=option_name)
    for option_name in (
        "sfURL",
        "sfUser",
        "sfPassword",
        "sfDatabase",
        "sfSchema",
        "sfWarehouse",
        "sfRole",
        "dbtable",
    )
}

# Statements passed as the 'preactions' option: select the target database and schema.
snowflake_args["preactions"] = (
    f"USE DATABASE {snowflake_args['sfDatabase']};"
    f"USE SCHEMA {snowflake_args['sfSchema']};"
)

# ADLS connection details, read from the same secret scope.
(
    ADLS_STORAGE_ACCOUNT_NAME,
    ADLS_ACCOUNT_KEY,
    ADLS_CONTAINER_NAME,
    ADLS_FOLDER_PATH,
) = (
    dbutils.secrets.get(scope="spotify-secret-scope", key=secret_key)
    for secret_key in (
        "adls-account-name",
        "adls-account-key",
        "adls-container-name",
        "adls-folder-path",
    )
)

# abfss:// URI of the Delta table that the change feed is read from.
DELTA_SOURCE = (
    f"abfss://{ADLS_CONTAINER_NAME}@{ADLS_STORAGE_ACCOUNT_NAME}"
    f".dfs.core.windows.net/{ADLS_FOLDER_PATH}"
)

In [0]:
# Text file holding the timestamp from which the next change-feed read should start.
# NOTE(review): this is a relative path used with the built-in open(), so it resolves
# against the Python process's current working directory — confirm that location
# persists between runs / cluster restarts.
CHECKPOINT_FILE_PATH = "./delta_checkpoint.txt"

In [0]:
# Configure the existing `spark` session (no new session is created here).
# Register the storage-account key under the account's dfs.core.windows.net host,
# the same host DELTA_SOURCE points at.
spark.conf.set(
    f"fs.azure.account.key.{ADLS_STORAGE_ACCOUNT_NAME}.dfs.core.windows.net",
    ADLS_ACCOUNT_KEY,
)
# NOTE(review): presumably lets a change-data-feed read whose start timestamp is later
# than the newest commit return an empty result instead of failing — confirm against
# the Delta Lake change data feed documentation.
spark.conf.set("spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled", "true")

In [0]:
# Decide where the change-feed read starts.
# Use the timestamp stored in the checkpoint file when it exists and is non-empty;
# otherwise fall back to the earliest commit timestamp in the Delta table history.
delta_table = DeltaTable.forPath(spark, DELTA_SOURCE)
try:
    with open(CHECKPOINT_FILE_PATH, "r") as checkpoint_file:
        # Strip so a trailing newline (e.g. from a manual edit) never reaches Spark.
        LAST_READ_TIMESTAMP = checkpoint_file.read().strip()
except FileNotFoundError:
    LAST_READ_TIMESTAMP = ""

if LAST_READ_TIMESTAMP:
    print("Getting Latest timtestamp from checkpoint file: ")
else:
    # Missing file, or a file left empty by an interrupted write: an empty string is
    # not a usable startingTimestamp, so start from the first commit instead.
    print("Getting Latest timtestamp from delta history: ")
    earliest_commit_rows = (
        delta_table.history().select(F.min("timestamp").alias("timestamp")).collect()
    )
    LAST_READ_TIMESTAMP = str(earliest_commit_rows[0]["timestamp"])

In [0]:
# Show the timestamp that the change-feed read will start from.
print("LAST_READ_TIMESTAMP: ", LAST_READ_TIMESTAMP)

LAST_READ_TIMESTAMP:  2023-12-13 20:50:44


In [0]:
def save_delta_to_snowflake():
    """Append Delta change-feed rows committed since the checkpoint to Snowflake.

    Reads the change data feed of ``DELTA_SOURCE`` from ``LAST_READ_TIMESTAMP``
    up to the newest commit known when the function starts, drops the change
    feed metadata columns, and appends the rows to the Snowflake table described
    by ``snowflake_args``. On success the checkpoint (newest commit timestamp
    plus one second) is written to ``CHECKPOINT_FILE_PATH`` and stored back in
    the global ``LAST_READ_TIMESTAMP``, so calling the function again in the
    same session does not append the same rows twice.

    Uses the notebook globals ``spark``, ``delta_table``, ``DELTA_SOURCE``,
    ``snowflake_args``, ``CHECKPOINT_FILE_PATH`` and ``LAST_READ_TIMESTAMP``.

    Returns:
        None. Progress is reported with ``print``.
    """
    global LAST_READ_TIMESTAMP

    # Pin the upper bound of this batch BEFORE reading. The checkpoint written
    # below is derived from this same value, so a commit that lands while the
    # job is running is picked up by the next run instead of being skipped.
    latest_commit = (
        delta_table.history()
        .select(F.max("timestamp").alias("timestamp"))
        .collect()[0]["timestamp"]
    )

    # Checkpoints are written as str(datetime), which fromisoformat() parses.
    # If the value has some other shape, skip this shortcut and let Spark
    # interpret the string itself.
    try:
        checkpoint_ts = datetime.datetime.fromisoformat(LAST_READ_TIMESTAMP.strip())
    except ValueError:
        checkpoint_ts = None

    # Nothing committed since the checkpoint: avoid a read whose start would
    # lie after its end.
    if checkpoint_ts is not None and checkpoint_ts > latest_commit:
        print("Latest delta files already ingested to snowflake")
        return

    delta_df = (
        spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingTimestamp", LAST_READ_TIMESTAMP)
        .option("endingTimestamp", str(latest_commit))
        .load(DELTA_SOURCE)
    )

    # take(1) only needs a single row, unlike count(), which scans every change.
    if not delta_df.take(1):
        print("Latest delta files already ingested to snowflake")
        return

    # NOTE(review): every change row is appended regardless of `_change_type`, so
    # update pre-images and deletes presumably land in Snowflake as ordinary rows —
    # confirm this is intended.
    excluded_columns = ["_change_type", "_commit_version", "_commit_timestamp"]
    selected_columns = [
        column for column in delta_df.columns if column not in excluded_columns
    ]

    # Write DataFrame to Snowflake table
    (
        delta_df.select(selected_columns)
        .write.format("net.snowflake.spark.snowflake")
        .options(**snowflake_args)
        .mode("append")
        .save()
    )

    # Next run starts one second after the newest commit included in this batch.
    updated_timestamp = str(latest_commit + datetime.timedelta(seconds=1))
    print("Updated timestamp: ", updated_timestamp)
    with open(CHECKPOINT_FILE_PATH, "w") as file:
        file.write(updated_timestamp)

    # Keep the in-memory start point in step with the file.
    LAST_READ_TIMESTAMP = updated_timestamp


In [0]:
# Run one export pass: push any change-feed rows newer than the checkpoint to Snowflake.
save_delta_to_snowflake()

Latest delta files already ingested to snowflake
