In [0]:
from delta.tables import *
from pyspark.sql.functions import *
# Standard-library imports stay after the star imports so a wildcard can never shadow them.
import ast
import json
#%run /Shared/MetaDatarepliaction_Backend_Code/Post_Replication_Test

In [0]:
# Notebook parameter: a Python-literal list of dicts, one per table to replicate.
# Each dict carries a 'source_type' (the driver cell handles 'dbfs_delta_Table',
# 'sql_server' and 'csv') and a 'table_name'.
dbutils.widgets.text('user_input', "[{'source_type':'dbfs_delta_Table','table_name':'Customer'},{'source_type':'sql_server','table_name':'dbo.employee'}]")

# NOTE(review): `input` shadows the Python builtin of the same name for the rest of the notebook.
input = dbutils.widgets.get("user_input")
# Uncomment to drop all widgets from the notebook.
# dbutils.widgets.removeAll()


In [0]:
# Echo the raw widget value so the requested tables are visible in the cell output.
input

Out[13]: "[{'source_type':'dbfs_delta_Table','table_name':'Customer'},{'source_type':'sql_server','table_name':'dbo.Employee'}]"

In [0]:
# Catalogue of source Delta tables: capitalised entry name -> full path.
# Only zero-size entries of the listing are kept, and the last character of each
# entry name is dropped before capitalising.
delta_files_list_dict = {
    entry.name[:-1].capitalize(): entry.path
    for entry in dbutils.fs.ls('dbfs:/databricks-datasets/tpch/delta-001/')
    if entry.size == 0
}

In [0]:
# Display the table-name -> path mapping built from the source folder listing.
delta_files_list_dict

Out[15]: {'Customer': 'dbfs:/databricks-datasets/tpch/delta-001/customer/',
 'Lineitem': 'dbfs:/databricks-datasets/tpch/delta-001/lineitem/',
 'Nation': 'dbfs:/databricks-datasets/tpch/delta-001/nation/',
 'Orders': 'dbfs:/databricks-datasets/tpch/delta-001/orders/',
 'Part': 'dbfs:/databricks-datasets/tpch/delta-001/part/',
 'Partsupp': 'dbfs:/databricks-datasets/tpch/delta-001/partsupp/',
 'Region': 'dbfs:/databricks-datasets/tpch/delta-001/region/',
 'Supplier': 'dbfs:/databricks-datasets/tpch/delta-001/supplier/'}

In [0]:
# Azure Blob Storage details.
# NOTE(review): the account name and access key below are placeholders. Do not commit
# real values here; prefer reading them from a secret store (e.g. a Databricks secret scope).
storage_account_name = "<storage-account-name>"
storage_account_key = "<storage-account-access-key>"  
# NOTE(review): container_name is not referenced by the cells shown in this notebook;
# the listing cell hard-codes the "input" container in its wasbs:// URL.
container_name = "input"

In [0]:
# Register the storage account access key in the Spark session configuration so that
# wasbs:// paths on this storage account can be accessed.
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", storage_account_key)

In [0]:
# Catalogue of source CSV files: capitalised file stem -> full path.
# Only non-empty entries of the listing are kept, and the last four characters of each
# entry name are dropped before capitalising.
csv_tables_list_dict = {
    entry.name[:-4].capitalize(): entry.path
    for entry in dbutils.fs.ls(f"wasbs://input@{storage_account_name}.blob.core.windows.net/Input_DB")
    if entry.size != 0
}

In [0]:
# Display the file-name -> path mapping built from the input container listing.
csv_tables_list_dict

Out[53]: {'Employees': 'wasbs://input@metadatamanagement02.blob.core.windows.net/Input_DB/employees.csv'}

In [0]:
# Name of the blob container that receives replicated data.
# NOTE(review): not referenced by the replication code shown in this notebook, which
# hard-codes the "output" container inside its wasbs:// paths.
output_container_name = "output"


In [0]:
def delta_file_replication(tab_name, delta_files_list_dict):
    """Replicate one source Delta table into the replication folder on Azure Blob Storage.

    If the replication target can be listed, the source rows are merged (upsert) into
    it using the ``Primary_key`` column named in the table's source definition JSON
    file. Otherwise the source is written out as a new Delta table (full load).

    Args:
        tab_name: Key of the table in ``delta_files_list_dict``. Also used as the
            target folder name and as the source definition file name.
        delta_files_list_dict: Mapping of table name -> source Delta table path.

    Returns:
        None. If ``tab_name`` is not a key of ``delta_files_list_dict`` a message is
        printed and nothing is read or written.
    """
    # Guard first: there is nothing to replicate for an unknown table, so do not
    # probe storage at all.
    if tab_name not in delta_files_list_dict:
        print(f"No table {tab_name} found with the path mentioned, Please recheck the list mentioned.")
        return

    # Direct path to the replication folder on Azure Blob Storage (without mounting).
    replication_path = "wasbs://output@metadatamanagement02.blob.core.windows.net/replication_folder_delta_tables/" + tab_name

    try:
        # A successful listing is taken to mean the target table already exists.
        dbutils.fs.ls(replication_path)
        full_load = False
    except Exception:
        # Any listing failure is treated as "target missing". `Exception` (rather than
        # a bare except) keeps KeyboardInterrupt/SystemExit propagating.
        full_load = True

    # The source format is stated explicitly so the read does not depend on the
    # cluster's default data source.
    df1 = spark.read.format("delta").load(delta_files_list_dict[tab_name])

    if full_load:
        # First replication: write as Delta so later runs can open it with DeltaTable.forPath.
        df1.write.format("delta").save(replication_path)
        return

    # Incremental replication: upsert the source rows into the existing Delta table.
    deltaTable = DeltaTable.forPath(spark, replication_path)

    # The source definition file supplies the primary-key column used for matching.
    with open('/Workspace/Users/user-email-id/MetaDataManagement_ProjectPro/SourceDefinitionFiles/Delta_Lake/' + tab_name + '.json', 'r') as f:
        data = json.load(f)

    cond = 'target.' + data['Primary_key'] + '=' + 'updates.' + data['Primary_key']

    deltaTable.alias('target').merge(df1.alias('updates'), cond).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()


In [0]:
def sqlserver_replication(tab_name):
    """Replicate one SQL Server table into the replication folder on Azure Blob Storage.

    The table is read over JDBC. If the replication target can be listed, the rows are
    merged (upsert) into it using the ``Primary_key`` column named in the table's
    source definition JSON file. Otherwise they are written out as a new Delta table.

    Args:
        tab_name: Table name passed to the JDBC reader (e.g. ``"dbo.Employee"``). The
            part after the last ``.`` is used as the source definition file name.

    Returns:
        None. If the JDBC read fails, the error is printed and nothing is written.
    """
    server_name = "jdbc:sqlserver://metadatamanagementreplication.database.windows.net"
    database_name = "metadatamanagementreplication"
    # NOTE(review): 'jdbc-url' looks like a placeholder; presumably the real URL is built
    # from server_name and database_name above - confirm before running.
    url = 'jdbc-url'
    username="metadatamanagementserver@metadatamanagementreplication"
    # NOTE(review): placeholder; load the real password from a secret store, not the notebook.
    password="<Azure-sql-server-password>"
    connectionProperties={"user":username,"password":password,"driver":"com.microsoft.sqlserver.jdbc.SQLServerDriver"}

    # Computed up front so it is always defined, whatever happens during the read.
    replication_path = f"wasbs://output@metadatamanagement02.blob.core.windows.net/sql_server/{tab_name}"

    try:
        # Load the table from SQL Server.
        df1 = spark.read.jdbc(url=url, table=tab_name, properties=connectionProperties)
    except Exception as e:
        # Without a source DataFrame there is nothing to replicate. Stop here instead of
        # falling through to code that needs df1.
        print(f"Error loading table from SQL Server: {e}")
        return

    try:
        # A successful listing is taken to mean the target table already exists.
        dbutils.fs.ls(replication_path)
        replication_folder = True
    except Exception:
        replication_folder = False

    if not replication_folder:
        # First replication: write as Delta so later runs can open it with DeltaTable.forPath.
        df1.write.format("delta").save(replication_path)
        return

    # Incremental replication: upsert into the existing Delta table.
    deltaTable = DeltaTable.forPath(spark, replication_path)

    # Definition files are named after the bare table name. Strip any schema qualifier
    # rather than assuming a fixed 4-character "dbo." prefix.
    definition_name = tab_name.split('.')[-1]
    with open('/Workspace/Users/user-email-id/MetaDataManagement_ProjectPro/SourceDefinitionFiles/SQL_server/' + definition_name + '.json', 'r') as f:
        data = json.load(f)

    cond = f'target.{data["Primary_key"]} = updates.{data["Primary_key"]}'

    deltaTable.alias('target').merge(df1.alias('updates'), cond).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()







    
        
    

In [None]:
# NOTE(review): `file_name` is only a parameter of csv_replication and is not assigned at
# notebook scope in the cells shown, so this cell would raise NameError if run on its own.
# Its execution count is None, i.e. it was not run in this saved state.
csv_replication_path = "wasbs://output@metadatamanagement02.blob.core.windows.net/replication_folder_csv_tables/" + file_name


In [0]:
def csv_replication(file_name, csv_tables_list_dict):
    """Replicate one source CSV file into the replication folder on Azure Blob Storage.

    The CSV is read with a header row and schema inference. If the replication target
    can be listed, the rows are merged (upsert) into it using the ``Primary_key``
    column named in the file's source definition JSON. Otherwise they are written out
    as a new Delta table (full load).

    Args:
        file_name: Key of the file in ``csv_tables_list_dict``. Also used as the target
            folder name and as the source definition file name.
        csv_tables_list_dict: Mapping of file name -> source CSV path.

    Returns:
        None. If ``file_name`` is not a key of ``csv_tables_list_dict`` a message is
        printed and nothing is read or written.
    """
    # Guard first: there is nothing to replicate for an unknown file.
    if file_name not in csv_tables_list_dict:
        print("No table ",file_name," found with the path mentioned, Please recheck the list mentioned")
        return

    # The target path depends on file_name, so it is derived here rather than read from
    # a notebook-level variable.
    csv_replication_path = "wasbs://output@metadatamanagement02.blob.core.windows.net/replication_folder_csv_tables/" + file_name

    try:
        # A successful listing is taken to mean the target table already exists.
        dbutils.fs.ls(csv_replication_path)
        full_load = False
    except Exception:
        full_load = True

    # Both the full-load and merge paths must parse the source as CSV. The generic
    # .load() would use the session's default data source instead.
    df1 = spark.read.option("header", True).option("inferschema", True).csv(csv_tables_list_dict[file_name])

    if full_load:
        # First replication: write as Delta so later runs can open it with DeltaTable.forPath.
        df1.write.format("delta").save(csv_replication_path)
        return

    # Incremental replication: upsert into the existing Delta table.
    deltaTable = DeltaTable.forPath(spark, csv_replication_path)

    # The source definition file supplies the primary-key column used for matching.
    with open('/Workspace/Users/user-email-id/MetaDataManagement_ProjectPro/SourceDefinitionFiles/CSV/'+file_name+'.json', 'r') as f:
        data = json.load(f)

    cond = 'target.' + data['Primary_key'] + '=' + 'updates.' + data['Primary_key']

    deltaTable.alias('target').merge(df1.alias('updates'), cond).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()


In [0]:
# Dispatch every requested table to the replication routine for its source type.
# The widget value is user-supplied text, so it is parsed with ast.literal_eval,
# which accepts only Python literals. eval() would execute arbitrary code typed
# into the widget.
for i in ast.literal_eval(input):
    print(i)
    source_type = i['source_type']
    if source_type == "dbfs_delta_Table":
        tab_name = i["table_name"]
        delta_file_replication(tab_name, delta_files_list_dict)
    elif source_type == "sql_server":
        sqlserver_replication(i["table_name"])
    elif source_type == "csv":
        csv_replication(i["table_name"], csv_tables_list_dict)
    else:
        # Report typos in source_type instead of silently skipping the entry.
        print(f"Unsupported source_type {source_type!r}; entry skipped.")

{'source_type': 'dbfs_delta_Table', 'table_name': 'Customer'}
{'source_type': 'sql_server', 'table_name': 'dbo.Employee'}
