In [0]:
%pip install delta-sharing

In [0]:
import delta_sharing
from concurrent.futures import ThreadPoolExecutor
from collections import namedtuple
import multiprocessing
import threading 
import uuid
import re
import sys
import os
import requests

class DeltaShareRecipient:
  """
  Recipient-side helper for consuming Delta Sharing data from a Databricks notebook.

  Two sharing modes are supported, selected by which constructor argument is given:

  * Databricks-to-Open (D2O): tables are listed and read through a share profile file
    with the `delta_sharing` client.
  * Databricks-to-Databricks (D2D): a share of a Unity Catalog provider is mounted as a
    catalog with `CREATE CATALOG ... USING SHARE`.

  Shared tables can be exposed as remote tables, copied in full into a local catalog, or kept
  up to date incrementally by merging the source change data feed. Every table sync is
  recorded in `hive_metastore.default.delta_sharing_sync_runs`.

  The class relies on the notebook globals `spark` and `display`.
  """
  def __init__(self, share_profile_file_loc:str="", provider_sharing_identifier:str="", catalog:str="hive_metastore", table_prefix:str=""):
    """
    Initialise the recipient for exactly one sharing mode.

    Args:
        share_profile_file_loc (str, optional): Share profile for D2O sharing. Either a local/dbfs
            path ("dbfs:/..." or "/dbfs/...") or an http(s) URL, which is downloaded and cached under
            /dbfs/deltasharing/profiles/.
        provider_sharing_identifier (str, optional): Name of the Unity Catalog provider for D2D sharing.
        catalog (str, optional): The catalog to use for the tables. Defaults to "hive_metastore".
        table_prefix (str, optional): Prefix prepended to every target table name. Defaults to "".

    Raises:
        Exception: if neither or both of share_profile_file_loc / provider_sharing_identifier are
            given, or if the share profile cannot be loaded.
    """
    self.__log("", f"DeltaShareRecipient started with share_profile_file_loc={share_profile_file_loc} and provider_sharing_identifier={provider_sharing_identifier}")
    if share_profile_file_loc == "" and provider_sharing_identifier == "":
      raise Exception("You must provide value for either share_profile_file_loc or provider_sharing_identifier.")
    if share_profile_file_loc != "" and provider_sharing_identifier != "":
      raise Exception(f"You must provide value for either share_profile_file_loc={share_profile_file_loc} or provider_sharing_identifier={provider_sharing_identifier}, you cannot add values for both.")

    self.share_profile_file_loc = share_profile_file_loc
    self.provider_sharing_identifier = provider_sharing_identifier
    self.table_prefix = table_prefix
    self.catalog = catalog
    # Serialises writes to the sync-runs table from the worker threads started by __share_sync.
    self.lock = threading.Lock()

    if provider_sharing_identifier != "":
      self.__log("", "Sharing type is Databricks to Databricks (D2D)")
      self.is_D2D_sharing = True # databricks-to-databricks
      # Filled per share by __create_d2d_tables.
      self.tables = list()
      # Shares are mounted here while their tables are copied into self.catalog.
      self.temp_catalog = f"{self.catalog}_temp_"
    else:
      self.__log("", "Sharing type is Databricks to Open (D2O)")
      self.is_D2D_sharing = False # databricks-to-open
      self.temp_catalog = self.catalog
      self.share_profile_file_loc = self.__load_share_profile(self.share_profile_file_loc)
      self.deltasharing_client = delta_sharing.SharingClient(self.share_profile_file_loc.replace("dbfs:", "/dbfs/"))
      self.tables = self.deltasharing_client.list_all_tables()

    self.sync_runs_db = 'hive_metastore.default'
    self.sync_runs_table = f"{self.sync_runs_db}.delta_sharing_sync_runs"
    self.current_user = self.__spark_sql("select current_user() as user;").collect()[0][0]
    self.__log("", f"delta sharing recipient initiated, shared tabled will be stored in catalog {catalog}")

  def summerise(self, sync_ids:list, full_summary=False):
    """
    Display the recorded sync runs for the given sync ids.

    Args:
        sync_ids (list): Sync ids returned by the sync methods; None entries are ignored.
        full_summary (bool, optional): Show every column ordered by completion time instead of the
            condensed view ordered by duration. Defaults to False.
    """
    self.__log("", "starting summurisation of sync operation")
    valid_ids = [sync_id for sync_id in sync_ids if sync_id is not None]
    if not valid_ids:
      # "in ()" is not valid SQL, so there is nothing to query.
      self.__log("", "no sync ids to summarise")
      return
    syncs = ", ".join(f"'{self.__escape_sql_string(sync_id)}'" for sync_id in valid_ids)
    if full_summary:
      query = f"select * from {self.sync_runs_table} where sync_id in ({syncs}) order by completion_time desc;"
    else:
      query = f"""select status, source_table, target_table, num_affected_rows,
      (unix_timestamp(completion_time)-unix_timestamp(started_time)) as duration_seconds,
      message from {self.sync_runs_table} where sync_id in ({syncs}) order by duration_seconds desc;"""
    display(self.__spark_sql(query))

  def discover(self):
    """
    Display what is available to this recipient.

    D2D: displays the shares of the provider (SHOW SHARES IN PROVIDER).
    D2O: displays one row per table with share, schema and table columns, or logs a message when
    the profile exposes no tables.

    Returns:
        None: the result is shown with `display`, not returned.
    """
    if self.is_D2D_sharing:
      display(self.__spark_sql(f'SHOW SHARES IN PROVIDER `{self.provider_sharing_identifier}`;'))
      return
    tables = self.deltasharing_client.list_all_tables()
    if len(tables) > 0:
      display(spark.createDataFrame(data=tables, schema=["table", "schema", "share"]).select("share", "schema", "table"))
    else:
      self.__log("", "sharing file is empty. ask provider to add tables to this share to continue.")

  def create_remotely_linked_tables(self, share:str)->list:
    """
    Expose every table of `share` without copying data (D2O: `deltaSharing` tables; D2D: the share
    mounted as `self.catalog`). Previous results are cleared first. Returns the list of sync ids.
    """
    return self.__share_sync(share, cache_locally=False, refresh_incrementally=False,\
                 clear_previous_cache=True, clear_sync_history=False)

  def create_fully_cached_tables(self, share:str)->list:
    """
    Copy every table of `share` into `self.catalog` as Delta tables, dropping previous copies first.
    Returns the list of sync ids.
    """
    return self.__share_sync(share, cache_locally=True, refresh_incrementally=False,\
                 clear_previous_cache=True, clear_sync_history=False)

  def create_incrementally_cached_tables(self, share:str, primary_keys:dict)->list:
    """
    Keep local copies of every table of `share` up to date by merging the source change data feed
    (CDF must be enabled at the source). A full copy is made when no usable sync history exists.

    Args:
        share (str): The share to sync.
        primary_keys (dict): Maps 'schema.table' (or just 'table') to a comma separated key list,
            e.g. {'sales.orders': 'id', 'customers': 'id1, id2'}.

    Returns:
        list: The sync ids.
    """
    return self.__share_sync(share, cache_locally=True, refresh_incrementally=True,\
                 clear_previous_cache=False, clear_sync_history=False, primary_keys=primary_keys)

  def __share_sync(self, share:str="", cache_locally:bool=False, refresh_incrementally:bool=False,\
                 clear_previous_cache:bool=False, clear_sync_history:bool=False, primary_keys:dict=None, num_threads=-1)->list:
    """
    Sync all tables of `share` (all shares when empty) concurrently and display a summary.

    Args:
        cache_locally (bool, optional): Whether to cache the table locally. Defaults to False.
        refresh_incrementally (bool, optional): Whether to refresh the cache incrementally, note CDF must be enabled on the source table. Defaults to False.
        clear_previous_cache (bool, optional): Whether to clear the previous cache (warning: this will drop the tables and clear all content). Defaults to False.
        clear_sync_history (bool, optional): Whether to clear the sync history. Defaults to False.
        primary_keys (dict, optional): Primary keys needed for incremental updates, keyed by
            'schema.table' (preferred) or by bare table name, e.g. {'schema.table_x':'id1, id2', 'table_y':'idx'}.
            Defaults to no keys.
        num_threads (int, optional): Number of concurrent sync threads; values below 1 mean one per CPU.

    Returns:
        list: A list of sync ids.
    """
    if primary_keys is None:
      primary_keys = dict()
    if num_threads is None or num_threads < 1:
      num_threads = multiprocessing.cpu_count()
    sync_ids = list()
    try:
      self.__log("", f"share sync started and will utilise {num_threads} threads of the assigned cluster driver node")
      self.__clear_cache_and_sync_runs(cache_locally, refresh_incrementally, clear_previous_cache, clear_sync_history)

      if self.is_D2D_sharing:
        self.__create_d2d_tables(share, cache_locally)

      with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = []
        for table in self.tables:
          if share != "" and table.share != share:
            continue
          target_schema = self.__sanitize_identifier(table.schema)
          target_name = self.__sanitize_identifier(table.name)
          table_keys = primary_keys.get(f"{table.schema}.{table.name}", primary_keys.get(table.name))
          futures.append(executor.submit(self.table_sync, table.share, f"`{table.schema}`.`{table.name}`",
                                         f"{self.catalog}.{target_schema}.{self.table_prefix}{target_name}",
                                         table_keys, cache_locally, refresh_incrementally, clear_previous_cache))
        sync_ids = [future.result() for future in futures]

      if len(sync_ids) > 0:
        self.summerise(sync_ids)
      else:
        self.__log("", "sharing file is empty. ask provider to add tables to this share to continue.")
    finally:
      # The temporary D2D catalog is only needed while tables are being copied.
      if self.is_D2D_sharing:
        self.__spark_sql(f'drop catalog if exists {self.temp_catalog} cascade;')
    return sync_ids

  def table_sync(self, share:str, source:str, target:str, primary_keys:str="", cache_locally:bool=False,\
                 refresh_incrementally:bool=False, clear_previous_cache:bool=False)->str:
    """
    Sync a single table and record the run; failures are recorded rather than raised.

    Args:
        share (str): The share name.
        source (str): The source table as '`schema`.`table`'.
        target (str): The fully qualified target table, e.g. 'catalog.schema.table'.
        primary_keys (str, optional): Comma separated primary key columns, required for incremental updates, e.g. 'id1, id2'.
        cache_locally (bool, optional): Whether to cache the table locally. Defaults to False.
        refresh_incrementally (bool, optional): Whether to refresh the cache incrementally, note CDF must be enabled on the source table. Defaults to False.
        clear_previous_cache (bool, optional): Whether to clear the previous cache (warning: this will drop the tables and clear all content). Defaults to False.

    Returns:
        str: The sync id (None only if even recording the failure was impossible).
    """
    # Initialised before the try so the failure path below can always reference them.
    sync_id = None
    starting_from_version = -1
    if not cache_locally:
      sync_type = "remotely-stored"
    elif refresh_incrementally:
      sync_type = "locally-cashed-incrementally-refreshed"
    else:
      sync_type = "locally-cashed-fully-refreshed"
    try:
      self.__log(source, f"sync operation started for share={share} and source={source}")
      if not cache_locally and refresh_incrementally:
        raise Exception("Can not have refresh_incrementally=True while table is cache_locally=False, set refresh_incrementally=False and try again.")

      status = 'STARTED'
      message = "table_sync started"
      for table in self.tables:
        if table.share == share and f"`{table.schema}`.`{table.name}`" == source:
          # Same sanitisation as the target schema built by __share_sync.
          target_schema = self.__sanitize_identifier(table.schema)
          self.__log(source, f"database {self.catalog}.{target_schema} will be created if not exists.")
          self.__spark_sql(f"CREATE DATABASE IF NOT EXISTS {self.catalog}.{target_schema};")
          break

      if clear_previous_cache:
        if refresh_incrementally:
          self.__log("", "warning: refresh_incrementally=True and clear_previous_cache=True, this will practically make incremental updates not effective, set clear_previous_cache=False to clear this warning.")
        # Remote D2D tables live in the shared catalog and must not be dropped here.
        if (not self.is_D2D_sharing) or cache_locally:
          self.__spark_sql(f"DROP TABLE IF EXISTS {target};")

      # Insert the initial record for this run.
      sync_id = self.__log_table_sync_event(sync_id, share, source, starting_from_version, target, sync_type,\
                                            status, message, 'null', 'null', 'null', 'null')

      # For D2D the share is mounted in temp_catalog under the original (unsanitised) names.
      temp_target = f"{self.temp_catalog}.{source}"
      self.__create_table(sync_id, share, source, target, temp_target, primary_keys, sync_type)
    except Exception as e:
      status = "FAILED"
      message = "table_sync failed, exception message: " + str(e).replace('\'','"')
      self.__log(source, message)
      sync_id = self.__log_table_sync_event(sync_id, share, source, starting_from_version, target, sync_type,\
                                            status, message, 'null', 'null', 'null', 'null')
    return sync_id

  def __clear_cache_and_sync_runs(self, cache_locally:bool=False, refresh_incrementally:bool=False,\
                                  clear_previous_cache:bool=False, clear_sync_history:bool=False):
    """
    Prepare the sync-runs database and, for D2D, the target catalog before a share sync.

    Raises:
        Exception: for D2D remote syncs when `self.catalog` already exists and is not being replaced.
    """
    self.__log("", f"cache managment started cache_locally={cache_locally}, refresh_incrementally={refresh_incrementally}, clear_sync_history={clear_sync_history}")

    self.__spark_sql(f"CREATE DATABASE IF NOT EXISTS {self.sync_runs_db};")

    if clear_sync_history:
      if refresh_incrementally:
        self.__log("all", "warning: refresh_incrementally=True and clear_sync_history=True, this will practically make incremental updates not effective, set clear_sync_history=False to clear this warning.")
      self.__spark_sql(f"DROP TABLE IF EXISTS {self.sync_runs_table};")

    if not self.is_D2D_sharing:
      return

    catalog_exists = False
    # temp_catalog is only ever created from a share (__create_d2d_tables); drop any leftover with its
    # schemas, matching the cleanup in __share_sync.
    self.__spark_sql(f"drop catalog if exists {self.temp_catalog} cascade;")
    try:
      catalog_type = self.__spark_sql(f'desc catalog {self.catalog}').where('info_name="Catalog Type"').collect()[0][1]
      if clear_previous_cache and catalog_type == "Delta Sharing":
        self.__spark_sql(f"drop catalog if exists {self.catalog} cascade;")
      else:
        catalog_exists = True
        self.__log("", f"catalog {self.catalog} already exists.")
    except Exception:
      # presumably `desc catalog` failed because the catalog does not exist yet
      self.__log("", f"catalog {self.catalog} accepted to host shared data.")
    if catalog_exists and not cache_locally:
      raise Exception(f"unable to create catalog {self.catalog} because it already exists. You must provide catalog name (other than hive_metastor) that does not exists to proceed. Alternatively, dorp this catalog and try again, you can use this command to drop the catalog: DROP CATALOG {self.catalog} cascade;")

  def __create_d2d_tables(self, share:str, cache_locally:bool):
    """
    Mount `share` as a catalog and register its tables in `self.tables`.

    The share is mounted as `self.catalog` for remote syncs, otherwise as `self.temp_catalog`
    (with the target schemas created in `self.catalog`).
    """
    if share == "":
      raise Exception("You must specify a share if you are using a provider_sharing_identifier. call discover() to list all available shares and pick one that you would like to sync")

    catalog = self.temp_catalog
    if not cache_locally: # no need for temp catalog if we are going to create remotly-stored tables
      catalog = self.catalog
      if catalog == "hive_metastore":
        raise Exception("Cannot use hive_metastore to host delta sharing table, provide another catalog name.")
      self.__log("", f"D2D sharing started, share {share} will be stored in catalog {catalog}")
    else:
      self.__log("", f"D2D sharing started, share {share} will be stored in catalog {catalog} temporarily")
      self.__spark_sql(f'create catalog if not exists {self.catalog};')
    self.__spark_sql(f'create catalog {catalog} using share `{self.provider_sharing_identifier}`.{share};')
    databases = self.__get_database_objects("databases", catalog, 'databaseName')

    # Forget tables registered by an earlier sync of this share so they are not synced twice.
    self.tables = [table for table in self.tables if table.share != share]
    SharedTable = namedtuple('Table', 'name schema share')
    for database in databases:
      if database == "information_schema":
        continue
      if cache_locally:
        self.__spark_sql(f'create database if not exists {self.catalog}.{self.__sanitize_identifier(database)};')
      for table in self.__get_database_objects("tables", f"{catalog}.{database}", 'tableName'):
        self.tables.append(SharedTable(table, database, share))

  def __create_table(self, sync_id:str, share:str, source:str, target:str, temp_target:str, primary_keys:str, sync_type:str):
    """
    Materialise one table according to `sync_type` and record the outcome in the sync-runs table.

    Raises:
        Exception: on any failure; `table_sync` records it as FAILED.
    """
    file_loc = self.share_profile_file_loc.replace("/dbfs", "dbfs:")
    table_url = f"{file_loc}#{share}.{source}".replace('`','') # must be in driver node fs
    (last_sync_type, starting_from_version, last_completion_timestamp) = self.__get_last_sync_version(source, target)
    max_table_version = self.__get_max_table_version(table_url, source)
    self.__log(source, f"table starting_From_version={starting_from_version} and max_table_version={max_table_version}")
    incremental = sync_type == "locally-cashed-incrementally-refreshed"

    if sync_type == "remotely-stored":
      self.__log(source, "table will be created remotely-stored")
      status = "SUCCESS"
      message = "table created as remote table"
      starting_from_version = 'null'
      num_affected_rows = num_updated_rows = num_deleted_rows = num_inserted_rows = "null"
      # D2D tables already exist in the shared catalog; only D2O needs a table created.
      if not self.is_D2D_sharing:
        if spark.catalog.tableExists(target):
          status = "FAILED"
          message = f'Target table {target} already exists. drop it manually before continuing or set clear_previous_cache=True.          Note setting this option will drop this table "{target}" and clear all content'
        else:
          try:
            self.__spark_sql(f"""CREATE TABLE {target}
                        USING deltaSharing
                        LOCATION '{table_url}';""")
          except Exception as e:
            if "Unsupported cloud file system scheme `dbfs`" in str(e):
              raise Exception(f"Unable to create table {target} on catalog {self.catalog}. possible reasons: \n1- you are using a Unity Catalog enabled cluster. --> You are not allowed to create remote table for dbfs location on Unity Catalog cluster, change the provided catalog to hive_metastor to continue. or set cache_locally=True") from e
            # Any other failure must not be recorded as a successful remote table.
            raise
    # Create a full local copy when:
    # 1- sync_type = locally-cashed-fully-refreshed, or an incremental sync where
    # 2- the D2O target table does not exist,
    # 3- there is no previous sync version,
    # 4- the max table version is unknown, or the last synced version is greater than it
    #    (the table seems to have been deleted and recreated at the source),
    # 5- no primary keys were provided.
    elif sync_type == "locally-cashed-fully-refreshed" or (incremental and (
        (not self.is_D2D_sharing and not spark.catalog.tableExists(target))
        or starting_from_version is None
        or isinstance(max_table_version, str)
        or starting_from_version > max_table_version
        or primary_keys is None or primary_keys.strip() == "")):
      self.__log(source, "starting locally-cashed-fully-refreshed")
      if self.is_D2D_sharing:
        spark.table(temp_target).write.saveAsTable(target, format="delta", mode="overwrite")
      else:
        delta_sharing.load_as_spark(table_url).write.saveAsTable(target, format="delta", mode="overwrite")
      status = "SUCCESS"
      message = "table created as fully refreshed local table"
      if incremental:
        # Record the version copied so the next run can continue incrementally.
        starting_from_version = max_table_version
        message = f"table was supposed to be refreshed incrementally, however, it failed and now it is created as full local cached copy. possible reasons for this 1- This is the first time to run the sync, 2- No primary keys specified to perform incremental updates. 3- table was dropped at source and recreated. 4- No previous recorded sync history for this table. 5- the {self.sync_runs_table} was deleted or cleared out."
        self.__log(source, message)
      else:
        starting_from_version = 'null'
      num_affected_rows = self.__get_table_count(target)
      num_updated_rows = "null"
      num_deleted_rows = "null"
      num_inserted_rows = num_affected_rows
    elif incremental:
      self.__log(source, "starting locally-cashed-incrementally-refreshed")
      if starting_from_version == max_table_version:
        self.__log(source, "table is up to date")
        status = "SUCCESS"
        message = f"update skipped, table {target} is up to date and last sync version is {starting_from_version}"
        self.__log("", message)
        num_affected_rows = num_updated_rows = num_deleted_rows = num_inserted_rows = 0
      else:
        if starting_from_version <= 0 or starting_from_version > max_table_version:
          self.__log(source, "table starting_from_version is not applicable, resetting to use max_table_version instead")
          starting_from_version = max_table_version
        (status, message, starting_from_version, num_affected_rows, num_updated_rows, num_deleted_rows, num_inserted_rows) = \
          self.__sync_incrementally(sync_id, share, table_url, source, starting_from_version, max_table_version, target, temp_target,\
                                    primary_keys, last_sync_type, last_completion_timestamp)
    else:
      raise Exception(f"Unknow sync type specified for table {source}")
    self.__log_table_sync_event(sync_id, share, source, starting_from_version, target, sync_type, status,\
                                message, num_affected_rows, num_updated_rows, num_deleted_rows, num_inserted_rows)

  def __sync_incrementally(self, sync_id:str, share:str, table_url, source:str, starting_from_version, max_table_version,\
                           target:str, temp_target:str, primary_keys:str, last_sync_type, last_completion_timestamp):
    """
    Merge the source change data feed from `starting_from_version` onwards into `target`.

    Returns:
        tuple: (status, message, latest merged commit version, num_affected_rows, num_updated_rows,
            num_deleted_rows, num_inserted_rows).

    Raises:
        Exception: if no primary keys are given.
    """
    if primary_keys is None or primary_keys.strip() == "":
      message = "can not perform incremental refresh without specifying table primary keys. you can pass primary keys as dictionary,      for example {'table_x':'pk1, pk2', 'table_y':'pk3, pk4'}"
      self.__log(source, message)
      raise Exception(message)

    on_conditions = " and ".join(f"__source.{key.strip()}=__target.{key.strip()}" for key in primary_keys.split(','))

    # Temp views are shared by all worker threads of the Spark session; a random suffix stops two
    # tables whose sanitised names collide from overwriting each other's view.
    view_name = f"{self.__sanitize_identifier(source)}_{uuid.uuid4().hex}_temp_view"
    try:
      if self.is_D2D_sharing:
        self.__spark_sql(f'create or replace temp view {view_name} as select * from table_changes("{temp_target}", {starting_from_version});')
      else:
        delta_sharing.load_table_changes_as_spark(url=table_url, starting_version=starting_from_version).createOrReplaceTempView(view_name)

      # Keep only the most recent change per primary key, without update pre-images.
      self.__spark_sql(f"""create or replace temp view partitioned_{view_name} as
      (select distinct * from (SELECT *, rank() over (partition by {primary_keys} order by _commit_version desc) as __rank FROM {view_name})
      where _change_type != "update_preimage" and __rank=1);""")

      self.__log(source, "starting merge operation from source to target")
      result = self.__spark_sql(f"""merge into {target} as __target using partitioned_{view_name} as __source on {on_conditions}
      when matched and __source._change_type in ('insert', 'update_postimage') then update set *
      when matched and __source._change_type in('delete','update_preimage') then delete
      when not matched and __source._change_type in ('insert', 'update_postimage') then insert *;""").collect()

      num_affected_rows, num_updated_rows, num_deleted_rows, num_inserted_rows = result[0][0], result[0][1], result[0][2], result[0][3]
      latest_version = self.__spark_sql(f'select max(_commit_version) from {view_name};').collect()[0][0]
      self.__log(source, f"merge operation completed and resulted in {num_affected_rows} num_affected_rows")
      return ("SUCCESS", "table incrementally updated successfully", latest_version,\
              num_affected_rows, num_updated_rows, num_deleted_rows, num_inserted_rows)
    finally:
      self.__spark_sql(f'drop view if exists {view_name};')
      self.__spark_sql(f'drop view if exists partitioned_{view_name};')

  def __get_last_sync_version(self, source:str, target:str):
    """
    Return (sync_type, source_last_sync_version, completion_time) of the most recent successful sync
    of `source` into `target` that recorded a version, or (None, None, None) when there is none or
    the target table no longer exists.
    """
    source_literal = self.__escape_sql_string(source)
    target_literal = self.__escape_sql_string(target)
    # Latest run (not max version) so a source table recreated with lower versions can recover.
    last_version_df = self.__spark_sql(f"""select sync_type, source_last_sync_version, completion_time from {self.sync_runs_table}
    where source_table='{source_literal}' and target_table='{target_literal}' and status='SUCCESS'
    and source_last_sync_version is not null
    order by completion_time desc limit 1;""")
    target_exists = spark.catalog.tableExists(target)
    self.__log(source, f"table exists={target_exists}")
    rows = last_version_df.collect() if target_exists else [] # target must exist for the history to make sense
    if rows:
      self.__log(source, "previous sync found")
      return (rows[0][0], rows[0][1], rows[0][2])
    self.__log(source, "table has no usable sync history to use for incremental sync")
    return (None, None, None)

  def __get_max_table_version(self, table_url:str, source:str)->int:
    """
    Return the latest version of the source table, or the string 'null' when it cannot be determined.

    Reading the change feed from version sys.maxsize is expected to fail; the latest version is
    parsed from the error text "latest version of the table(N)".
    """
    try:
      self.__log(source, "trying to get max_table_history")
      if self.is_D2D_sharing:
        self.__spark_sql(f"select * from table_changes('{self.temp_catalog}.{source}', {sys.maxsize}) limit 1;").limit(1).collect()
      else:
        delta_sharing.load_table_changes_as_spark(url=table_url, starting_version=sys.maxsize).limit(1).collect()
      version = sys.maxsize
      self.__log(source, f"table version is {version}")
    except Exception as e:
      match = re.search(r"latest version of the table\((\d+)\)", str(e))
      version = int(match.group(1)) if match else 'null'
    return version

  def __log_table_sync_event(self, sync_id:str, share:str, source:str, starting_from_version:int, target:str, sync_type:str,\
                             status:str, message:str, num_affected_rows, num_updated_rows,num_deleted_rows, num_inserted_rows):
    """
    Insert a new sync-run record (when `sync_id` is None) or update the existing one.

    String values are escaped for Spark SQL literals and None numeric values are written as null.

    Returns:
        str: The (possibly newly generated) sync id.
    """
    esc = self.__escape_sql_string
    val = self.__sql_value
    with self.lock:
      if not spark.catalog.tableExists(self.sync_runs_table):
        self.__spark_sql(f"""create table {self.sync_runs_table} (sync_id string, share string, source_table string, source_last_sync_version int,
        target_table string, started_by string, started_time timestamp, completion_time timestamp, status string, sync_type string,
        num_affected_rows int, num_updated_rows int, num_deleted_rows int, num_inserted_rows int, message string);""")
      if sync_id is None:
        sync_id = str(uuid.uuid4())
        self.__spark_sql(f"""insert into {self.sync_runs_table} values('{sync_id}', '{esc(share)}', '{esc(source)}', null, '{esc(target)}', '{esc(self.current_user)}',
        current_timestamp(), null, '{esc(status)}', '{esc(sync_type)}', null, null, null, null, '{esc(message)}');""")
      else:
        self.__spark_sql(f"""update {self.sync_runs_table} set source_last_sync_version={val(starting_from_version)},
        completion_time = current_timestamp(), status='{esc(status)}', sync_type='{esc(sync_type)}', num_affected_rows={val(num_affected_rows)},
        num_updated_rows = {val(num_updated_rows)}, num_deleted_rows = {val(num_deleted_rows)}, num_inserted_rows={val(num_inserted_rows)},
        message='{esc(message)}' where sync_id='{esc(sync_id)}';""")
    return sync_id

  @staticmethod
  def __escape_sql_string(value)->str:
    """Escape `value` for use inside a single-quoted Spark SQL string literal."""
    return str(value).replace("\\", "\\\\").replace("'", "\\'")

  @staticmethod
  def __sql_value(value):
    """Render a numeric log value for SQL, mapping None to null."""
    return "null" if value is None else value

  @staticmethod
  def __sanitize_identifier(name:str)->str:
    """Replace every run of characters outside [a-zA-Z0-9_] with a single underscore."""
    return re.sub(r"[^a-zA-Z0-9_]+", "_", name)

  def __get_table_count(self, target):
    """Return the row count of `target`."""
    return self.__spark_sql(f"select count(*) from {target};").collect()[0][0]

  def __spark_sql(self, sql):
    """Run `sql` on the notebook's Spark session and return the DataFrame."""
    return spark.sql(sql)

  def __log(self, context, msg):
    """Print `msg` with an `[info]` prefix, tagged with `context` when it is non-empty."""
    if context == "":
      print(f"[info] {msg}")
    else:
      print(f"[info - {context}] {msg}")

  def __get_database_objects(self, object_type, source, selector):
    """Run `show {object_type} in {source}` and return the `selector` column as a list."""
    return list(self.__spark_sql(f"show {object_type} in {source};").toPandas()[selector])

  def __load_share_profile(self, share_profile_loc:str):
    """
    Resolve the share profile location and return the path to use.

    http(s) locations are downloaded (Databricks activation links are rewritten to the activation
    API) and cached under /dbfs/deltasharing/profiles/; if the download fails, a previously cached
    copy is used. "dbfs:" paths are checked through the /dbfs mount.

    Raises:
        Exception: if no profile file exists at the resolved location.
    """
    file_name = share_profile_loc
    if share_profile_loc.startswith("http"):
      directory_path = "/dbfs/deltasharing/profiles/"
      file_name = directory_path + re.sub(r'[^a-zA-Z0-9_]', '', share_profile_loc.replace(".share", "")) + ".share"
      try:
        url = share_profile_loc
        self.__log("", "loading share profile form online location")
        if 'databricks' in share_profile_loc and "retrieve_config.html" in share_profile_loc:
          url = share_profile_loc.replace("/delta_sharing/retrieve_config.html?", "/api/2.0/unity-catalog/public/data_sharing_activation/")
          self.__log("", f"url is now pointing at {url}")

        content = requests.get(url, timeout=60).text
        if 'shareCredentialsVersion' in content:
          os.makedirs(directory_path, exist_ok=True)
          with open(file_name, 'w') as file:
            file.write(content)
          self.__log("", f"file saved successfully to {file_name}")
        else:
          self.__log("", "file can be read form online location, however it is not a valid delta share profile, will try loading form local cache.")
      except Exception as e:
        # Best effort: fall back to a previously cached copy of the profile below.
        self.__log("", f"unable to download share profile ({e}), will try loading form local cache.")

    # "dbfs:" URIs are not local paths; check them through the /dbfs mount (as the constructor reads them).
    local_path = file_name.replace("dbfs:", "/dbfs/") if file_name.startswith("dbfs:") else file_name
    if not os.path.exists(local_path):
      raise Exception(f"Share profile {share_profile_loc} cannot be loaded from this location. if this an online location, try downloading the file manually, save it to dbfs and try again after setting share_profile_file_loc to the dbfs location of the saved file.")
    self.__log("", f"file is available at {file_name}")
    return file_name
