In [None]:
from msfabricpysdkcore import FabricClientCore
try:
    import sempy.fabric as fabric
    local = False
except:
    local = True
    from azure.identity import DefaultAzureCredential
import pandas as pd
import snowflake.connector
from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient, DataLakeDirectoryClient
import os
import pyodbc
import struct
from azure.identity import ClientSecretCredential
import requests
import pyarrow as pa
import pyarrow.parquet as pq


In [None]:
# Name of the Fabric SQL database that stores the mirroring metadata table.
sql_database_name = "metadatamirroring"

In [None]:
# Resolve the workspace (and, for local runs, an Azure credential), then look up
# the metadata SQL database to build the ODBC server/database values.
if not local:
    workspace_id = fabric.get_workspace_id()
else:
    # Hard-coded workspace id used when running outside Fabric.
    workspace_id = "14110asdfasfa79577"
    local_credentials = DefaultAzureCredential()

fcc = FabricClientCore()
sql_db = fcc.get_sql_database(workspace_id=workspace_id, sql_database_name=sql_database_name)

# Drop the last five characters of the reported FQDN.
# NOTE(review): presumably a ",1433" port suffix — confirm against the Fabric API response.
server = sql_db.properties["serverFqdn"][:-5]
databasename = sql_db.properties["databaseName"]
# Brace-wrap the database name for the ODBC connection string (presumably so special
# characters in the name are not misparsed).
database = f"{{{databasename}}}"

In [None]:
id = None  # Metadata ID to process; None means every active row.

In [None]:
# Load the active mirroring jobs from the metadata SQL database into `df`
# (all active rows, or only the row whose ID equals `id`).

# Get an Entra ID access token for Azure SQL: DefaultAzureCredential locally, notebookutils in Fabric.
if local:
    token_bytes = local_credentials.get_token("https://database.windows.net/.default").token.encode("UTF-16-LE")
else:
    token_bytes = notebookutils.credentials.getToken("https://database.windows.net/.default").encode("UTF-16-LE")
# The driver expects the UTF-16-LE token prefixed by its byte length as a little-endian uint32.
token_struct = struct.pack(f'<I{len(token_bytes)}s', len(token_bytes), token_bytes)
SQL_COPT_SS_ACCESS_TOKEN = 1256  # This connection option is defined by microsoft in msodbcsql.h

# Connection parameters
connection_string = f"Driver={{ODBC Driver 18 for SQL Server}};Server={server};Database={database};"

if id is None:
    query = "SELECT * FROM Metadata WHERE STATUS = 'active' ORDER BY ID DESC"
    query_params = ()
else:
    # Bind the id as a parameter instead of formatting it into the SQL text.
    query = "SELECT * FROM Metadata WHERE STATUS = 'active' AND ID = ?"
    query_params = (id,)

# Connect with Entra ID (Azure AD) token
conn = pyodbc.connect(connection_string, attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct})
try:
    cursor = conn.cursor()
    try:
        cursor.execute(query, *query_params)
        rows = cursor.fetchall()
        column_names = [column[0] for column in cursor.description]
    finally:
        cursor.close()
finally:
    # Release the connection even when the query fails.
    conn.close()

df = pd.DataFrame.from_records(rows, columns=column_names)

if df.empty:
    print("Nothing active")

# Show the first job to be processed; evaluates to None (nothing displayed) when no job is active,
# instead of raising IndexError on an empty frame.
df.iloc[0, :] if not df.empty else None

In [None]:
def run_query_(query, snowflakeaccount, snowflakeuser, snowflakepassword):
    """Run a single SQL statement on Snowflake and return all result rows.

    A new connection is opened for every call and closed when the call returns.

    Args:
        query: SQL text to execute.
        snowflakeaccount: Snowflake account identifier.
        snowflakeuser: Snowflake user name.
        snowflakepassword: Password for ``snowflakeuser``.

    Returns:
        The list returned by ``fetchall()`` on the executed cursor.
    """
    login = dict(account=snowflakeaccount, user=snowflakeuser, password=snowflakepassword)
    with snowflake.connector.connect(**login) as connection:
        with connection.cursor() as cursor:
            fetched = cursor.execute(query).fetchall()
    return fetched

def get_service_client_token_credential_(credential) -> DataLakeServiceClient:
    """Create a DataLakeServiceClient for the OneLake DFS endpoint.

    Args:
        credential: Azure credential used to authenticate the client.

    Returns:
        A ``DataLakeServiceClient`` bound to ``https://onelake.dfs.fabric.microsoft.com/``.
    """
    onelake_endpoint = "https://onelake.dfs.fabric.microsoft.com/"
    return DataLakeServiceClient(onelake_endpoint, credential=credential)


def list_directory_contents(file_system_client: FileSystemClient, directory_name: str):
    """Return whatever ``file_system_client.get_paths`` yields for ``directory_name``.

    The result is passed through unchanged; callers iterate it to get path entries.
    """
    return file_system_client.get_paths(path=directory_name)
# %%
def upload_file_to_directory(directory_client: DataLakeDirectoryClient, local_path: str, file_name: str):
    """Upload ``<local_path>/<file_name>`` into ``directory_client`` under the same name.

    An existing remote file with that name is overwritten.
    """
    target_file = directory_client.get_file_client(file_name)
    source_path = os.path.join(local_path, file_name)
    with open(source_path, "rb") as payload:
        target_file.upload_data(payload, overwrite=True)

def get_secret_(secret_name, keyvault_name, timeout=30):
    """Read a secret from Azure Key Vault through its REST API (used for local runs).

    Relies on the module-level ``local_credentials`` object, which this notebook only
    creates when running outside Fabric.

    Args:
        secret_name: Name of the secret in the vault.
        keyvault_name: Vault name; requests go to ``https://{keyvault_name}.vault.azure.net``.
        timeout: Seconds to wait for Key Vault before ``requests`` raises a timeout error.

    Returns:
        The ``value`` field of the Key Vault JSON response.

    Raises:
        Exception: If Key Vault answers with a status other than 200.
    """
    vaultBaseUrl = f"https://{keyvault_name}.vault.azure.net"
    # NOTE(review): the token is requested with the vault URL as its scope; the documented Key Vault
    # scope is "https://vault.azure.net/.default" — confirm this works for every credential in the chain.
    keyvault_token = local_credentials.get_token(vaultBaseUrl).token

    request_headers = {
        "Authorization": f"Bearer {keyvault_token}",
        "Content-Type": "application/json"
    }
    keyvault_url = f"{vaultBaseUrl}/secrets/{secret_name}?api-version=7.4"
    # Without a timeout a stalled connection would block the notebook forever.
    response = requests.get(keyvault_url, headers=request_headers, timeout=timeout)
    if response.status_code == 200:
        return response.json()["value"]
    raise Exception(f"Failed to retrieve secret: {response.status_code} - {response.text}")

def get_full_data_type_definition_(snowflakeaccount, snowflakeuser, snowflakepassword, database, schema, table):
    """Read a Snowflake table's column types and translate them into a pyarrow schema.

    Column metadata comes from ``<database>.INFORMATION_SCHEMA.COLUMNS`` for the given
    schema and the upper-cased table name, in ordinal order.

    Type mapping (first match wins, substring matches on FULL_DATA_TYPE unless noted):
      * NUMBER(p,s) with s > 0     -> decimal128(p, s)
      * NUMBER(p,s<=0) / NUMBER(p) -> int64
      * exactly FLOAT              -> float64
      * TIMESTAMP... or DATE       -> timestamp('ns')
      * BINARY                     -> binary
      * BOOLEAN                    -> bool
      * TIME                       -> time64('ns')
      * anything else              -> string

    The ``HASH_`` column is left out of the schema; ``METADATAROW_ID`` (string) and
    ``__rowMarker__`` (int64) are appended as the last two fields.

    Returns:
        tuple: ``(pyarrow.Schema, columns)`` where ``columns`` is the raw list of
        ``(COLUMN_NAME, FULL_DATA_TYPE)`` rows from Snowflake, including ``HASH_``.
    """
    query = f"""
    SELECT 
        COLUMN_NAME,
        CASE
            WHEN DATA_TYPE IN ('NUMBER', 'DECIMAL', 'NUMERIC') AND NUMERIC_SCALE IS NOT NULL 
                THEN DATA_TYPE || '(' || NUMERIC_PRECISION || ',' || NUMERIC_SCALE || ')'
            WHEN DATA_TYPE IN ('NUMBER', 'DECIMAL', 'NUMERIC') AND NUMERIC_SCALE IS NULL 
                THEN DATA_TYPE || '(' || NUMERIC_PRECISION || ')'
            WHEN DATA_TYPE IN ('VARCHAR', 'CHAR', 'CHARACTER', 'STRING', 'TEXT') AND CHARACTER_MAXIMUM_LENGTH IS NOT NULL 
                THEN DATA_TYPE || '(' || CHARACTER_MAXIMUM_LENGTH || ')'
            ELSE DATA_TYPE
        END AS FULL_DATA_TYPE
    FROM 
        {database}.INFORMATION_SCHEMA.COLUMNS 
    WHERE 
        TABLE_SCHEMA = '{schema}'
        AND TABLE_NAME = '{table.upper()}'
    ORDER BY 
        ORDINAL_POSITION
    """

    columns = run_query_(query, snowflakeaccount, snowflakeuser, snowflakepassword)

    schema_list = []
    for col in columns:
        column_name, full_type = col[0], col[1]
        if column_name == "HASH_":
            # The row hash is carried separately as METADATAROW_ID.
            continue

        if "NUMBER" in full_type:
            # FULL_DATA_TYPE is NUMBER(p,s), or NUMBER(p) when the scale is NULL.
            precision_scale = full_type.split("(")[1].rstrip(")").split(",")
            precision = int(precision_scale[0])
            scale = int(precision_scale[1]) if len(precision_scale) > 1 else 0
            if scale > 0:
                data_type = pa.decimal128(precision, scale)
            else:
                data_type = pa.int64()
        elif full_type == "FLOAT":
            # Snowflake reports FLOAT/DOUBLE/REAL as FLOAT; Python floats cannot be stored in pa.string().
            data_type = pa.float64()
        elif "TIMESTAMP" in full_type or "DATE" in full_type:
            # DATE is intentionally stored as a timestamp as well.
            data_type = pa.timestamp('ns')
        elif "BINARY" in full_type:
            data_type = pa.binary()
        elif "BOOLEAN" in full_type:
            data_type = pa.bool_()
        elif "TIME" in full_type:
            data_type = pa.time64('ns')
        else:
            data_type = pa.string()

        schema_list.append((column_name, data_type))

    schema_list.append(("METADATAROW_ID", pa.string()))
    schema_list.append(("__rowMarker__", pa.int64()))

    arrow_schema = pa.schema(schema_list)

    return arrow_schema, columns



In [None]:
def run_for_one_table(df, local):
    """Push pending changes of one Snowflake view into a Fabric mirrored-database landing zone.

    ``df`` is one metadata row (as yielded by ``DataFrame.iterrows``). It must provide
    ``workspace_id``, ``mirrored_db_id``, ``snowflake_db``, ``snowflake_schema``,
    ``view_name``, ``streamable`` and ``keyvault``.

    Credentials are read from the Key Vault named in ``keyvault``: through the REST helper
    when ``local`` is true, through ``notebookutils`` otherwise.

    Only non-streamable rows (``streamable`` equal to "False") upload anything. For those,
    the following loop repeats until a query returns neither inserts nor deletes:
      1. read up to ``batch_size`` rows from ``<view>_MIRROR_NEW`` (``__rowMarker__`` = 0);
      2. fill the rest of the batch with hashes from ``<view>_MIRROR_OLD`` (``__rowMarker__`` = 2);
      3. write the batch as the next numbered parquet file and upload it to
         ``<mirrored_db_id>/Files/LandingZone/<view_name>``;
      4. add inserted hashes to, and remove deleted hashes from, ``<view>_MIRROR_HASHES``.

    NOTE(review): the loop only terminates if the _MIRROR_NEW / _MIRROR_OLD views are
    driven by _MIRROR_HASHES — confirm the view definitions in Snowflake.
    """
    import datetime  # needed only for the TIME placeholder in delete rows

    workspace_id = df["workspace_id"]
    mirrored_db_id = df["mirrored_db_id"]

    snowflake_db = df["snowflake_db"]
    snowflake_schema = df["snowflake_schema"]
    view_name = df["view_name"]

    streamable = df["streamable"]

    keyvault_url = f'https://{df["keyvault"]}.vault.azure.net/'

    def get_secret(secret_name):
        # Locally, call the Key Vault REST API; inside Fabric, use notebookutils.
        if local:
            return get_secret_(secret_name, df["keyvault"])
        return notebookutils.credentials.getSecret(keyvault_url, secret_name)

    mirroringspntenantid = get_secret('mirroringspntenantid')
    mirroringspnclientid = get_secret('mirroringspnclientid')
    mirroringspnclientsecret = get_secret('mirroringspnclientsecret')

    snowflakeaccount = get_secret('snowflakeaccount')
    snowflakeuser = get_secret('snowflakeuser')
    snowflakepassword = get_secret('snowflakepassword')

    def run_query(query):
        return run_query_(query, snowflakeaccount, snowflakeuser, snowflakepassword)

    def get_full_data_type_definition(view_name):
        return get_full_data_type_definition_(snowflakeaccount, snowflakeuser, snowflakepassword, snowflake_db, snowflake_schema, view_name)

    def partition_file_name(number):
        # Landing-zone files are named by a zero-padded 20-digit sequence number.
        return str(number).zfill(20) + ".parquet"

    credential = ClientSecretCredential(tenant_id=str(mirroringspntenantid),
                                        client_id=mirroringspnclientid,
                                        client_secret=mirroringspnclientsecret)

    mirrored_db_path = f"{mirrored_db_id}/Files/LandingZone"

    dlsc = get_service_client_token_credential_(credential)
    fsc = dlsc.get_file_system_client(workspace_id)
    table_path = mirrored_db_path + f"/{view_name}"
    contents = list_directory_contents(fsc, directory_name=table_path)
    content_names = [content["name"] for content in contents]
    partition_numbers = [int(name.split("/")[-1].split(".")[0]) for name in content_names if name.endswith(".parquet")]
    # Continue numbering after the highest parquet file already uploaded.
    next_partition = max(partition_numbers, default=0) + 1

    # str() so that a boolean False from the metadata table is treated like the string "False".
    if str(streamable) != "False":
        # Only the batch (non-streamable) path is implemented here.
        return

    # NOTE(review): Snowflake caps literal IN lists / VALUES lists at 16,384 expressions, so the hash
    # bookkeeping statements are split into chunks of at most this many values.
    hash_chunk_size = 16384

    def chunked(values):
        # Yield consecutive slices of at most hash_chunk_size values.
        values = list(values)
        for start in range(0, len(values), hash_chunk_size):
            yield values[start:start + hash_chunk_size]

    def df_to_delete_routine(df_to_delete, columns):
        # Delete rows only need METADATAROW_ID. Every other column gets a placeholder whose type
        # fits the parquet schema.
        df_to_delete["__rowMarker__"] = 2

        for col in columns:
            if col[0] == "HASH_":
                continue
            if "NUMBER" in col[1] or "FLOAT" in col[1]:
                df_to_delete[col[0]] = 123
            elif col[1] == "BOOLEAN":
                df_to_delete[col[0]] = True
            elif "DATE" in col[1] or "TIMESTAMP" in col[1]:
                df_to_delete[col[0]] = "2023-10-01 12:00:00.000"
                df_to_delete[col[0]] = pd.to_datetime(df_to_delete[col[0]])
            elif "TIME" in col[1]:
                # TIME columns are time64 in the schema and cannot hold the string placeholder.
                df_to_delete[col[0]] = datetime.time(12, 0)
            else:
                df_to_delete[col[0]] = "Placeholder"

        df_to_delete["METADATAROW_ID"] = df_to_delete["METADATAROW_ID"].astype(str)

        return df_to_delete

    def df_inserts_routine(df_inserts):
        df_inserts = df_inserts.rename(columns={"HASH_": "METADATAROW_ID"})
        df_inserts["__rowMarker__"] = 0
        df_inserts["METADATAROW_ID"] = df_inserts["METADATAROW_ID"].astype(str)
        return df_inserts

    def query_routine(snowflake_db, snowflake_schema, view_name, batch_size, columns):
        column_names = [col[0] for col in columns]

        column_names_str = ",".join(column_names)
        query = f"SELECT {column_names_str} FROM {snowflake_db}.{snowflake_schema}.{view_name}_MIRROR_NEW LIMIT {batch_size};"
        results = run_query(query)

        df_inserts = pd.DataFrame(results, columns=column_names, dtype="object")

        # len() counts every fetched row; counting non-null values of the first column
        # under-counted rows that have NULL there.
        rest = batch_size - len(df_inserts)

        if rest > 0:
            query = f"SELECT HASH_ FROM {snowflake_db}.{snowflake_schema}.{view_name}_MIRROR_OLD LIMIT {rest};"
            results = run_query(query)
            df_to_delete = pd.DataFrame(results, columns=["METADATAROW_ID"])
        else:
            df_to_delete = None

        inserts = int(df_inserts["HASH_"].count()) > 0
        if inserts:
            df_inserts = df_inserts_routine(df_inserts)

        deletes = df_to_delete is not None and int(df_to_delete["METADATAROW_ID"].count()) > 0
        if deletes:
            df_to_delete = df_to_delete_routine(df_to_delete, columns)

        print(f"Inserts: {inserts}, Deletes: {deletes}")
        if inserts:
            print(f"Insert count: {df_inserts['METADATAROW_ID'].count()}")
        if deletes:
            print(f"Delete count: {df_to_delete['METADATAROW_ID'].count()}")

        return inserts, df_inserts, deletes, df_to_delete

    batch_size = 100000

    schema, columns = get_full_data_type_definition(f"{view_name.upper()}_MIRROR_NEW")

    os.makedirs('upload', exist_ok=True)
    dldc = fsc.get_directory_client(table_path)

    inserts, df_inserts, deletes, df_to_delete = query_routine(snowflake_db, snowflake_schema, view_name,
                                                               batch_size, columns)

    while inserts or deletes:
        if inserts and deletes:
            df_concat = pd.concat([df_inserts, df_to_delete], ignore_index=True, sort=False)
        elif inserts:
            df_concat = df_inserts
        else:
            df_concat = df_to_delete

        latest_partition = partition_file_name(next_partition)
        local_file = os.path.join("upload", latest_partition)

        table = pa.Table.from_pandas(df_concat, schema=schema)
        pq.write_table(table, local_file)
        upload_file_to_directory(dldc, local_path="upload", file_name=latest_partition)
        # The batch is now in OneLake; drop the local copy so files do not pile up.
        os.remove(local_file)

        count = df_concat["METADATAROW_ID"].count()
        print(f"Uploaded {latest_partition} with row count {count}")

        # Update the hash bookkeeping only after a successful upload.
        # NOTE(review): hashes are embedded as SQL literals; this assumes they never contain quotes.
        if inserts:
            for chunk in chunked(df_inserts["METADATAROW_ID"]):
                hash_value_str = ",".join([f"('{hash_}')" for hash_ in chunk])
                query = f"""INSERT INTO {snowflake_db}.{snowflake_schema}.{view_name.upper()}_MIRROR_HASHES (hash_) VALUES {hash_value_str}"""
                run_query(query)
        if deletes:
            for chunk in chunked(df_to_delete["METADATAROW_ID"]):
                hash_value_str = ",".join([f"('{hash_}')" for hash_ in chunk])
                query = f"""DELETE FROM {snowflake_db}.{snowflake_schema}.{view_name}_MIRROR_HASHES WHERE HASH_ in ({hash_value_str})"""
                run_query(query)

        next_partition += 1

        inserts, df_inserts, deletes, df_to_delete = query_routine(snowflake_db, snowflake_schema, view_name, batch_size, columns)


In [None]:
# Mirror every active metadata row, one view at a time.
for _, metadata_row in df.iterrows():
    current_view = metadata_row["view_name"]
    print(current_view)
    run_for_one_table(metadata_row, local)
    print(f"Finished {current_view}")

