In [8]:
# python 3.11
# pip install azure-storage-blob
# pip install snowflake-snowpar0k-python

In [9]:
from snowflake.snowpark import Session
import os
import xml.etree.ElementTree as ET
import sys
import datetime
from azure.storage.blob import BlobServiceClient
import re

connection_parameters = {
   "account": "kd66798.ca-central-1.aws",
   "user": "PRAVEEN11001",
   "password": "XXXXXXXXXXXXX",
   "role": "ACCOUNTADMIN",  # optional
   "warehouse": "COMPUTE_WH",  # optional
   "database": "DEMO",  # optional
   "schema": "PUBLIC"  # optional
}

session = Session.builder.configs(connection_parameters).create()

In [10]:
# Azure Blob Storage configuration
container_name = "gold"
blob_folder_name = "xmls"
chunked_folder_name = "chunked_xmls"
file_type = "xml"
storage_account_name = "myadlsatic"
sas_token = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
storage_account_key = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX=="
connection_string = f"DefaultEndpointsProtocol=https;AccountName={storage_account_name};AccountKey={storage_account_key};EndpointSuffix=core.windows.net"
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container_name)

In [11]:
def put_xml(xmlstring):
    
    tree = ET.ElementTree(ET.fromstring(xmlstring))
    root = tree.getroot()

    for _, child in enumerate(root.findall("./*")):
        child_str = ET.tostring(child, encoding='utf-8').decode('utf-8')
        xml_size = sys.getsizeof(child_str)

        if xml_size > 16000000:
            print(f"working for child {child.tag}")
            child_size = len(child)
            number_of_chunks = xml_size // 16000000 + 1
            grand_children_per_child = child_size // number_of_chunks
            remainder = child_size % number_of_chunks
            start_idx = 0

            for i in range(number_of_chunks):
                new_root = ET.Element(child.tag)
                end_idx = start_idx + grand_children_per_child + (1 if i < remainder else 0)

                for selected_children in child[start_idx:end_idx]:
                    new_root.append(selected_children)

                grand_root = ET.Element(root.tag)
                grand_root.append(new_root)
                new_tree = ET.ElementTree(grand_root)
                new_root = new_tree.getroot()
                xml_string = ET.tostring(new_root, encoding='utf-8')
                blob_name = f"{chunked_folder_name}/{child.tag}_{i}.xml"
                print(f"putting {child.tag}_{i} at {blob_name}")
                blob_client = container_client.get_blob_client(blob=blob_name)
                blob_client.upload_blob(xml_string, overwrite=True)

        else:
            print(f"working for child {child.tag}")
            new_root = ET.Element(root.tag)
            new_root.append(child)
            new_tree = ET.ElementTree(new_root)
            new_root = new_tree.getroot()
            xml_string = ET.tostring(new_root, encoding='utf-8')
            blob_name = f"{chunked_folder_name}/{child.tag}.xml"
            print(f"putting {child.tag} at {blob_name}")
            blob_client = container_client.get_blob_client(blob=blob_name)
            blob_client.upload_blob(xml_string, overwrite=True)

        

In [12]:
def copy_into_sf(session, folder_name,  filename):

    copy_date = datetime.datetime.utcnow()

    copy_result = (session.sql(f"""
    copy into xml_table(filename, xml_data, copy_date)
    from (select '{filename}' as filename, $1 as xml_data, '{copy_date}' as copy_date from @azure_stage/{folder_name}/{filename}.xml)
    on_error='skip_file'
    file_format= (format_name=my_xml_ff);
    """).collect())

    print(copy_result)


In [13]:
blob_service_client = BlobServiceClient.from_connection_string(connection_string)
yesterday_date = datetime.datetime.utcnow().date() - datetime.timedelta(days=1)
container_client = blob_service_client.get_container_client(container_name)
blobs = container_client.list_blobs(name_starts_with=blob_folder_name)
file_pattern = re.compile(rf'^{re.escape(blob_folder_name)}\/.*\.{file_type}$')
count_more_than_size = 0
for blob in blobs:
    blob_name = blob.name
    blob_creation_date = blob.creation_time.date()
    print(blob_name)
    if file_pattern.match(blob_name) and blob_creation_date <= yesterday_date:
        print(f'working on {blob_name}')
        blob_client = container_client.get_blob_client(blob_name)
        blob_data = blob_client.download_blob()
        file_data = blob_data.readall().decode('utf-8')
        filename = blob_name.split("/")[-1].split(".")[0]
        print(filename)
        if blob.size > 16000000:
            count_more_than_size = 1
            put_xml(file_data)
        else:
            copy_into_sf(session, blob_folder_name, filename)
else:
    print("No file found in the specified container and folder.")



xmls
xmls/0006XO4MWT.xml
working on xmls/0006XO4MWT.xml
0006XO4MWT
[Row(file='azure://myadlsatic.blob.core.windows.net/gold/xmls/0006XO4MWT.xml', status='LOADED', rows_parsed=1, rows_loaded=1, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]
xmls/0006XOB39Z.xml
working on xmls/0006XOB39Z.xml
0006XOB39Z
working for child pb001BackgroundReportData
putting pb001BackgroundReportData at chunked_xmls/pb001BackgroundReportData.xml
working for child pb001ClaimNonDetailData
putting pb001ClaimNonDetailData_0 at chunked_xmls/pb001ClaimNonDetailData_0.xml
putting pb001ClaimNonDetailData_1 at chunked_xmls/pb001ClaimNonDetailData_1.xml
working for child pb001Comments
putting pb001Comments at chunked_xmls/pb001Comments.xml
working for child pb001CommentsClaimLevel
putting pb001CommentsClaimLevel at chunked_xmls/pb001CommentsClaimLevel.xml
working for child pb001CustomerServiceData
putting pb001CustomerServiceData at chun

In [14]:
# delete folder in azure blob
delete_folder = f'{chunked_folder_name}/'


def delete_blobs_in_folder(container_client, folder):
    blob_list = container_client.list_blobs(name_starts_with=folder)
    
    for blob in blob_list:
        container_client.delete_blob(blob.name)

def del_chunked_xmls():
    blob_service_client = BlobServiceClient.from_connection_string(connection_string)
    container_client = blob_service_client.get_container_client(container_name)

    # Delete blobs within the folder
    delete_blobs_in_folder(container_client, delete_folder)

    # After deleting blobs, attempt to delete the folder
    container_client.delete_blob(chunked_folder_name)

    
    try:
        container_client.delete_blob(delete_folder)
        print(f"Folder '{delete_folder}' deleted successfully.")
    except Exception as e:
        print(f"Error deleting folder '{delete_folder}': {str(e)}")

In [15]:
if count_more_than_size == 1:
    chunked_blobs = container_client.list_blobs(name_starts_with=chunked_folder_name)
    for blob in chunked_blobs:
        blob_name = blob.name
        print(f'working on {blob_name}')
        blob_client = container_client.get_blob_client(blob_name)
        blob_data = blob_client.download_blob()
        file_data = blob_data.readall().decode('utf-8')
        filename = blob_name.split("/")[-1].split(".")[0]
        print(filename)
        copy_into_sf(session, chunked_folder_name, filename)

del_chunked_xmls()
        

working on chunked_xmls
chunked_xmls
[Row(status='Copy executed with 0 files processed.')]
working on chunked_xmls/PrintOnceTable.xml
PrintOnceTable
[Row(file='azure://myadlsatic.blob.core.windows.net/gold/chunked_xmls/PrintOnceTable.xml', status='LOADED', rows_parsed=1, rows_loaded=1, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]
working on chunked_xmls/pb001BackgroundReportData.xml
pb001BackgroundReportData
[Row(file='azure://myadlsatic.blob.core.windows.net/gold/chunked_xmls/pb001BackgroundReportData.xml', status='LOADED', rows_parsed=1, rows_loaded=1, error_limit=1, errors_seen=0, first_error=None, first_error_line=None, first_error_character=None, first_error_column_name=None)]
working on chunked_xmls/pb001ClaimNonDetailData_0.xml
pb001ClaimNonDetailData_0
[Row(file='azure://myadlsatic.blob.core.windows.net/gold/chunked_xmls/pb001ClaimNonDetailData_0.xml', status='LOADED', rows_parsed=1, rows_loade