### Synapse Lakehouse Sync Functions

* UnifyFolderPaths - Builds a dictionary containing both the abfss and https forms of an ADLS folder path, given either form as input.
* Tracker - Creates the _SynapseLakehouseSyncTracker delta table for the given workspace and pool if it does not exist, and records sync loads in it.

#### UnifyFolderPaths Function
**Builds a dictionary containing both the abfss and https forms of an ADLS folder path, given either form as input.**

In [0]:
import re

def UnifyFolderPaths(key, FolderPathFull) -> dict:
    """
    Return both the abfss and https forms of an ADLS folder path.

    The two URL shapes handled are:

        abfss://<container>@<host>/<folder path>
        https://<host>/<container>/<folder path>

    Whichever form is supplied, the other is derived by moving the container
    between the part before '@' (abfss) and the first path segment (https).
    The host is carried over unchanged in both directions. Trailing slashes
    are stripped from both outputs.

    Parameters:
        - key: Prefix for the dictionary keys.
        - FolderPathFull: Either an abfss:// or an https:// ADLS folder path.

    Output:
        - Dictionary with the keys f'{key}_abfss' and f'{key}_https'.
          An empty dictionary is returned when FolderPathFull starts with
          neither 'abfss://' nor 'https://'.

    Raises:
        - ValueError: the path uses a recognized scheme but has no container
          name or no host.

    Example:
        >>> UnifyFolderPaths('TestLocation', 'abfss://mycontainer@mystorageaccount.dfs.core.windows.net/myFolder/')
        {'TestLocation_abfss': 'abfss://mycontainer@mystorageaccount.dfs.core.windows.net/myFolder', 'TestLocation_https': 'https://mystorageaccount.dfs.core.windows.net/mycontainer/myFolder'}
    """
    abfssPrefix = 'abfss://'
    httpsPrefix = 'https://'

    if FolderPathFull.startswith(abfssPrefix):
        ## abfss => https:
        # '<container>@<host>' ends at the first '/'; everything after it is the folder path.
        authority, _, remaining = FolderPathFull[len(abfssPrefix):].partition('/')
        fileSystemName, _, host = authority.rpartition('@')
        FolderPathFullAbfss = FolderPathFull
        FolderPathFullHttps = f'https://{host}/{fileSystemName}/{remaining}'

    elif FolderPathFull.startswith(httpsPrefix):
        ## https => abfss:
        # The host ends at the first '/'; the next segment is the container, regardless of how
        # many folder levels follow (the previous regex only handled exactly four).
        host, _, pathPart = FolderPathFull[len(httpsPrefix):].partition('/')
        fileSystemName, _, remaining = pathPart.partition('/')
        FolderPathFullAbfss = f'abfss://{fileSystemName}@{host}/{remaining}'
        FolderPathFullHttps = FolderPathFull

    else:
        # Unrecognized scheme: keep the historical behaviour of returning an empty dictionary.
        return {}

    if not fileSystemName or not host:
        raise ValueError(f'Could not determine the container and host of ADLS path: {FolderPathFull!r}')

    return {
        f'{key}_abfss': FolderPathFullAbfss.rstrip('/'),
        f'{key}_https': FolderPathFullHttps.rstrip('/'),
    }



In [0]:
import re

def GetAzureStorageAccountName(FolderPathFull) -> str:
  """
  Extract the storage account name from an ADLS abfss or https path.

  The account name is the part of the host that precedes '.dfs', e.g.
  'adlsbrmyers' for both of:
      abfss://goldzone@adlsbrmyers.dfs.core.windows.net/...
      https://adlsbrmyers.dfs.core.windows.net/goldzone/...

  Parameters:
      - FolderPathFull: Either an abfss:// or an https:// ADLS path.

  Output:
      - The storage account name (str).

  Raises:
      - ValueError: the path is neither abfss:// nor https://, or its host
        does not contain '.dfs'.
  """
  if FolderPathFull.startswith('abfss://'):
    # The host follows the '@' that separates the container from the host.
    patternStorageAccount = r'@([^/]*)(?=\.dfs)'
  elif FolderPathFull.startswith('https://'):
    # The host immediately follows the scheme.
    patternStorageAccount = r'^https://([^/]*)(?=\.dfs)'
  else:
    raise ValueError(f'Unsupported ADLS path (expected abfss:// or https://): {FolderPathFull!r}')

  match = re.search(patternStorageAccount, FolderPathFull)
  if match is None:
    raise ValueError(f'Could not find a storage account name in ADLS path: {FolderPathFull!r}')
  return match.group(1)

#### Tracker Class
**Creates the _SynapseLakehouseSyncTracker delta table for the given workspace and pool if it does not exist.**

In [0]:
import datetime
import hashlib
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, LongType, ArrayType, IntegerType, BooleanType
from delta import *

class Tracker():
    """
    Records Synapse Lakehouse Sync loads in a _SynapseLakehouseSyncTracker delta table.

    The table is stored at
        <TrackerFolderPath (abfss form)>/<SynapseWorkspaceName>_<PoolName>_SynapseLakehouseSync/_SynapseLakehouseSyncTracker
    and is created with `syncSchema` when the tracker is constructed, if it does not exist yet.

    Relies on a SparkSession named `spark` being available in the notebook's global scope.

    __init__:
        Parameters:
            - SynapseWorkspaceName: The Synapse workspace name (part of the table path).
            - PoolName: The dedicated SQL pool name.
            - TrackerFolderPath: abfss or https folder under which the tracker table is stored.

    AddEntry:
        Appends a row for a load that has been staged in ADLS
        (ADLSStagedFlag=True, SynapseLoadedFlag=False).
        Output: the SynapseLakehouseSyncKey of the new row.

    CloseEntry:
        Marks the row with the given key as loaded into Synapse and records the Synapse row count.

    GetLastSuccessfulSynapseLoad:
        Output: a DataFrame with the most recent load for the given table that was loaded into
        Synapse with matching ADLS and Synapse row counts.
    """

    def __init__(self, SynapseWorkspaceName:str, PoolName:str, TrackerFolderPath:str):
        self.syncSchema = StructType([
            StructField('SynapseLakehouseSyncKey', StringType(), False),
            StructField('PoolName', StringType(), False),
            StructField('SchemaName', StringType(), False),
            StructField('TableName', StringType(), False),
            StructField('VersionNumberStart', LongType(), False),
            StructField('VersionNumberEnd', LongType(), False),
            StructField('DateTimeStart', TimestampType(), False),
            StructField('DateTimeEnd', TimestampType(), False),
            StructField('InsertDateTime', TimestampType(), False),
            StructField('LoadType', StringType(), False),
            StructField('ChangeTypes', ArrayType(StringType()), False),
            StructField('TableRowCountADLS', LongType(), True),
            StructField('TableRowCountSynapse', LongType(), True),
            StructField('ADLSStagedFlag', BooleanType(), False),
            StructField('SynapseLoadedFlag', BooleanType(), False),
            StructField('SynapseLoadedDateTime', TimestampType(), True),
        ])
        self.trackerFolderPath = UnifyFolderPaths(key='TrackerFolderPath', FolderPathFull=TrackerFolderPath)
        self.SynapseWorkspaceName = SynapseWorkspaceName
        self.poolName = PoolName
        # Single definition of the tracker table location, shared by every method below.
        self.trackerTablePath = f'{self.trackerFolderPath["TrackerFolderPath_abfss"]}/{SynapseWorkspaceName}_{PoolName}_SynapseLakehouseSync/_SynapseLakehouseSyncTracker'
        # Appending an empty DataFrame creates the delta table with this schema when it is missing,
        # and adds no rows when it already exists.
        spark.createDataFrame([], schema=self.syncSchema).write.format('delta').mode('append').save(self.trackerTablePath)

    def AddEntry(self, SchemaName:str, TableName:str, VersionStart:int, VersionEnd:int, DateTimeStart:datetime.datetime, DateTimeEnd:datetime.datetime, LoadType:str, ChangeTypes:list, TableRowCountADLS:int):
        """
        Append a tracker row for a load that has been staged in ADLS.

        Parameters:
            - SchemaName, TableName: The table being synced.
            - VersionStart, VersionEnd: Stored as VersionNumberStart / VersionNumberEnd.
            - DateTimeStart, DateTimeEnd: datetime values (the columns are TimestampType).
            - LoadType: Free-form load type label.
            - ChangeTypes: List of strings (the column is ArrayType(StringType)).
            - TableRowCountADLS: Row count staged in ADLS.

        Output:
            - The md5 SynapseLakehouseSyncKey of the new row, to be passed to CloseEntry.
        """
        # Whole-second precision, so the stored value matches the timestamp text hashed into the key.
        InsertDateTime = datetime.datetime.now().replace(microsecond=0)
        # NOTE(review): two entries for the same pool/schema/table within one second get the same key.
        SynapseLakehouseSyncKey = hashlib.md5('||'.join([self.poolName, SchemaName, TableName, InsertDateTime.strftime('%Y-%m-%d %H:%M:%S')]).encode()).hexdigest()

        row = (
            SynapseLakehouseSyncKey, self.poolName, SchemaName, TableName,
            VersionStart, VersionEnd, DateTimeStart, DateTimeEnd, InsertDateTime,
            LoadType, ChangeTypes, TableRowCountADLS,
            None,   # TableRowCountSynapse: filled in by CloseEntry
            True,   # ADLSStagedFlag
            False,  # SynapseLoadedFlag
            None,   # SynapseLoadedDateTime: filled in by CloseEntry
        )
        spark.createDataFrame(data=[row], schema=self.syncSchema).write.format('delta').mode('append').save(self.trackerTablePath)

        return SynapseLakehouseSyncKey

    def CloseEntry(self, SynapseLakehouseSyncKey:str, TableRowCountSynapse:int):
        """
        Mark the tracker row identified by SynapseLakehouseSyncKey (for this pool) as loaded into Synapse.

        Sets SynapseLoadedFlag to True, SynapseLoadedDateTime to the current time and
        TableRowCountSynapse to the given count (None is stored as NULL).
        """
        from pyspark.sql import functions as F

        # Use the Delta API rather than an f-string SQL UPDATE, so values are passed as literals:
        # a None row count or a quote in the key/pool name can no longer break the statement.
        DeltaTable.forPath(spark, self.trackerTablePath).update(
            condition=(F.col('PoolName') == F.lit(self.poolName)) & (F.col('SynapseLakehouseSyncKey') == F.lit(SynapseLakehouseSyncKey)),
            set={
                'SynapseLoadedFlag': F.lit(True),
                'SynapseLoadedDateTime': F.lit(datetime.datetime.now()),
                'TableRowCountSynapse': F.lit(TableRowCountSynapse).cast('long'),
            },
        )

    def GetLastSuccessfulSynapseLoad(self, SchemaName:str, TableName:str):
        """
        Return the most recent successful Synapse load of a table for this pool.

        A load counts as successful when SynapseLoadedFlag is True and TableRowCountADLS equals
        TableRowCountSynapse. The most recent one is chosen by InsertDateTime.

        Output:
            - DataFrame with columns PoolName, SchemaName, TableName, VersionNumberEnd, DateTimeEnd
              (at most one row for this pool/schema/table).
        """
        from pyspark.sql import functions as F
        from pyspark.sql.window import Window

        dfAll = spark.read.format('delta').load(self.trackerTablePath)
        # Still register the session temp view that this method has always created.
        dfAll.createOrReplaceTempView('vwTracker')

        latestFirst = Window.partitionBy('PoolName', 'SchemaName', 'TableName').orderBy(F.col('InsertDateTime').desc())

        # DataFrame filters instead of string-built SQL, so names containing quotes are handled safely.
        dfTracker = (
            dfAll
            .where(
                (F.col('SynapseLoadedFlag') == F.lit(True))
                & (F.col('TableRowCountADLS') == F.col('TableRowCountSynapse'))
                & (F.col('PoolName') == F.lit(self.poolName))
                & (F.col('SchemaName') == F.lit(SchemaName))
                & (F.col('TableName') == F.lit(TableName))
            )
            .withColumn('_RN', F.row_number().over(latestFirst))
            .where(F.col('_RN') == 1)
            .select('PoolName', 'SchemaName', 'TableName', 'VersionNumberEnd', 'DateTimeEnd')
        )

        return dfTracker


# if __name__ == '__main__':
#     track = Tracker(PoolName='TestPool', TrackerFolderPath='abfss://bronzezone@adlsstorageaccount.dfs.core.windows.net/')

#     key = track.AddEntry(SchemaName='TestDB', TableName='TestTable', VersionStart=0, VersionEnd=0, DateTimeStart=datetime.datetime.strptime('2022-08-20 21:25:24', '%Y-%m-%d %H:%M:%S'), DateTimeEnd=datetime.datetime.strptime('2022-08-20 21:25:24', '%Y-%m-%d %H:%M:%S'), LoadType='', ChangeTypes=[''])
#     print(key)

#     track.CloseEntry('e22957d685f716b6e0b5e26c4e8be860')

#     display(track.GetLastSuccessfulSynapseLoad('TestDB', 'TestTable'))

    # SchemaName='TestDB', TableName='TestTable', VersionNumberStart=0, VersionNumberEnd=0, DateTimeStart=datetime.datetime.strptime('2022-08-20 21:25:24', '%Y-%m-%d %H:%M:%S'), DateTimeEnd=datetime.datetime.strptime('2022-08-20 21:25:24', '%Y-%m-%d %H:%M:%S'), LoadType='', ChangeTypes=[''], StagedFlag=True, LoadedFlag=False)

    # keyValue = Tracker(PoolName='TestPool', SchemaName='TestDB', TableName='TestTable', VersionNumberStart=0, VersionNumberEnd=0, DateTimeStart=datetime.datetime.strptime('2022-08-20 21:25:24', '%Y-%m-%d %H:%M:%S'), DateTimeEnd=datetime.datetime.strptime('2022-08-20 21:25:24', '%Y-%m-%d %H:%M:%S'), LoadType='', ChangeTypes=[''], StagedFlag=True, LoadedFlag=False)
