### Documentation

- Author:      CKS
- Create date: 20240625
- Description:
    - Takes [AccountSchedule], [Account] and [AccountScheduleSetup] as input.
    - Creates Many:Many relationship between Accounts and SubTotals in [AccountMapping].
    - Creates Account-Grouping and SubTotals in [AccountHierarchy].
    - Highlights errors and gaps in user input through [AccountMappingError] and grouping of missing Accounts as "Unmapped" in [AccountHierarchy].
- Assumptions:
    - Any given AccountNo is used in the same way across Companies.
        - For example, a single account number can't be turnover in one company and COGS in another.
- Input:
    1. [AccountSchedule]
        - Any input can work, but make sure to transform your input to the stated ColumnNames and Datatypes.
        - No Primary Key or Uniqueness is assumed in the code.
    2. [Account]
        - Dimension-table holding all Accounts (AccountNo and Income/Balance flag needed)
        - Can include same AccountNo from multiple Companies
    3. [AccountScheduleSetup]
        - Specifies which ScheduleNames should be processed along with which Account Type(s) they each should include (Income and/or Balance).
- Changelog:
    - 20240702-CKS - Added error handling with [AccountMappingError] and grouping of missing Accounts as "Unmapped".
    - 20241014-CKS - Converted to run in Fabric Notebook reading from and writing to Fabric Lakehouse

### Setup

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import *
from delta.tables import *

spark.conf.set('spark.sql.caseSensitive', True)

In [None]:
Account = spark.table("Account")
AccountScheduleSetup = spark.table("AccountScheduleSetup")

### AccountSchedule

#### Pre-Execution Cleanup
Defining AccountSchedule

In [None]:
AccountSchedule = spark.sql("""
    SELECT Schedule.AccountScheduleId AS AccountScheduleId
          ,Schedule.ScheduleName AS ScheduleName
          ,Schedule.SortOrder AS SortOrder
          ,Schedule.RowNo AS RowNo
          ,Schedule.TotalingType AS RowType
          ,Schedule.Description AS Description
          ,REPLACE(REPLACE(REPLACE(REPLACE(Schedule.Totaling, '...', '.'), '..', '.'), '+', '|'), ' ', '') AS Totaling -- To avoid common typos and simplify subsequent code.
          ,CASE
                WHEN COUNT(*) OVER (PARTITION BY Schedule.ScheduleName,Schedule.RowNo) > 1 THEN 1 -- Marking duplicated row numbers for later error handling.
                ELSE 0
                END AS DuplicateRowNo
          ,CASE
                WHEN Schedule.TotalingType NOT IN ('Account', 'Group', 'SubTotal') THEN 1   -- Marking unknown row types for later error handling.
                ELSE 0
                END AS UnknownRowType
          ,CASE
                WHEN ROW_NUMBER() OVER (PARTITION BY Schedule.ScheduleName ORDER BY Schedule.SortOrder DESC) = 1 AND Schedule.TotalingType = 'SubTotal' THEN 1  -- Marking the last subtotal for later exclusion from error handling.
                ELSE 0
                END AS LastSubTotal
    FROM AccountSchedule Schedule
    INNER JOIN AccountScheduleSetup Setup -- Only Schedules specified in Setup will be processed.
        ON Setup.ScheduleName = Schedule.ScheduleName
    """)

#### Unique Account and RowNo
For use with interval specifications

In [None]:
### AccountNo
UniqueAccount = Account.select(
    col("AccountNo"),
    lpad("AccountNo", 20, "0").alias("SortableAccountNo"),   # Added to be able to sort Int AccountNo correctly and still allow non-numeric AccountNo.
    col("IncomeBalance"),
    row_number().over(Window.partitionBy("AccountNo").orderBy("IncomeBalance")).alias("RowNumber")
)

AccountNo = UniqueAccount.where(
        col("RowNumber") == 1   # Defensive check in case AccountNo is not uniquely Income/Balance globally.
    ).select(
        "AccountNo",
        "SortableAccountNo",
        "IncomeBalance"
)

### RowNo
RowNo = AccountSchedule.select("ScheduleName","RowNo").distinct()

#### Finding all Accounts and Rows included in the Totaling

In [None]:
### Account
# Interval and AND -Logic on Totaling Column
AccountScheduleAccounts = AccountSchedule.where(
                AccountSchedule.RowType == "Account"
            ).withColumn(
                "SplitOnPipe", explode(split("Totaling", "\|"))
            ).withColumns({
                "FromAccountNo": split("SplitOnPipe", "\.").getItem(0),
                "ToAccountNo": split("SplitOnPipe", "\.").getItem(1),
                "TooManyIntervals": when(size(split("SplitOnPipe", "\.")) > 2, 1).otherwise(0)
            }).withColumn(
                "ToAccountNo", coalesce("ToAccountNo","FromAccountNo")
            ).withColumns({
                "FromAccountNo": lpad("FromAccountNo", 20, "0"),
                "ToAccountNo": lpad("ToAccountNo", 20, "0")
            })

# Final Account DF
Accounts = AccountScheduleAccounts.alias("Schedule").join(
                AccountScheduleSetup.alias("Setup"),
                [col("Setup.ScheduleName") == col("Schedule.ScheduleName")],
                "inner"
            ).join(
                AccountNo.alias("ANo"),
                (
                      (col("ANo.SortableAccountNo") >= col("Schedule.FromAccountNo"))
                    & (col("ANo.SortableAccountNo") <= col("Schedule.ToAccountNo"))
                )
                & (
                    (
                        (col("ANo.IncomeBalance") == 0) & (col("Setup.IncludeIncomeAccounts") == 1)
                    )
                    | (
                        (col("ANo.IncomeBalance") == 1) & (col("Setup.IncludeBalanceAccounts") == 1)
                    )
                ),
                "inner"
            ).select(
                col("Schedule.AccountScheduleId"),
                col("Schedule.ScheduleName"),
                col("Schedule.SortOrder"),
                col("Schedule.RowNo"),
                col("Schedule.RowType"),
                col("Schedule.Description"),
                col("ANo.AccountNo")
            )

### Group / SubTotal
# Interval and AND -Logic on Totaling Column
AccountScheduleGroupSubTotals = AccountSchedule.where(
                AccountSchedule.RowType.isin(["Group", "SubTotal"])
            ).withColumn(
                "SplitOnPipe", explode(split("Totaling", "\|"))
            ).withColumns({
                "FromRowNo": split("SplitOnPipe", "\.").getItem(0),
                "ToRowNo": split("SplitOnPipe", "\.").getItem(1),
                "TooManyIntervals": when(size(split("SplitOnPipe", "\.")) > 2, 1).otherwise(0)
            }).withColumn(
                "ToRowNo", coalesce("ToRowNo","FromRowNo")
            )

# Final GroupSubTotal DF
GroupSubTotals = AccountScheduleGroupSubTotals.alias("Schedule").join(
                RowNo.alias("RNo"),
                [
                col("RNo.ScheduleName") == col("Schedule.ScheduleName"),
                col("RNo.RowNo") >= col("Schedule.FromRowNo"),
                col("RNo.RowNo") <= col("Schedule.ToRowNo")
                ],
                "inner"
            ).select(
                col("Schedule.AccountScheduleId"),
                col("Schedule.ScheduleName"),
                col("Schedule.SortOrder"),
                col("Schedule.RowNo"),
                col("Schedule.RowType"),
                col("Schedule.Description"),
                col("RNo.RowNo").alias("ChildRowNo")
            )

### AccountHierarchy

#### SubTotal 

In [None]:
SubTotal = AccountSchedule.where(
        col("RowType") == "SubTotal"
    ).select(
        col("ScheduleName"),
        lit(0).alias("CompanyId"),  # Shared Chart of Accounts
        col("RowType"),
        lit("SubTotal").alias("AccountNo"),
        col("Description").alias("Level1"),
        col("Description").alias("Level2"),
        col("Description").alias("Level3"),
        col("SortOrder").alias("Level1Sort"),
        col("SortOrder").alias("Level2Sort"),
        col("SortOrder").alias("Level3Sort"),
        col("RowNo").alias("Level1RowNo"),
        col("RowNo").alias("Level2RowNo"),
        col("RowNo").alias("Level3RowNo")
    )

#### Account incl. (up to) 3 levels of Grouping

In [None]:
AccountWithGrouping = Accounts.alias("A").join(
        GroupSubTotals.alias("G1"),
        [
        col("G1.ScheduleName") == col("A.ScheduleName"),
        col("G1.RowType") == "Group",
        col("G1.ChildRowNo") == col("A.RowNo")
        ],
        "left"
    ).join(
        GroupSubTotals.alias("G2"),
        [
        col("G2.ScheduleName") == col("G1.ScheduleName"),
        col("G2.RowType") == "Group",
        col("G2.ChildRowNo") == col("G1.RowNo")
        ],
        "left"
    ).select(
        col("A.ScheduleName"),
        lit(0).alias("CompanyId"),  # Shared Chart of Accounts
        col("A.RowType"),
        col("A.AccountNo"),
        coalesce(col("G2.Description"), col("G1.Description"), col("A.Description")).alias("Level1"),
        coalesce(col("G1.Description"), col("A.Description")).alias("Level2"),
        col("A.Description").alias("Level3"),
        coalesce(col("G2.SortOrder"), col("G1.SortOrder"), col("A.SortOrder")).alias("Level1Sort"),
        coalesce(col("G1.SortOrder"), col("A.SortOrder")).alias("Level2Sort"),
        col("A.SortOrder").alias("Level3Sort"),
        coalesce(col("G2.RowNo"), col("G1.RowNo"), col("A.RowNo")).alias("Level1RowNo"),
        coalesce(col("G1.RowNo"), col("A.RowNo")).alias("Level2RowNo"),
        col("A.RowNo").alias("Level3RowNo")
    )

#### Any missing Accounts are incl. as "Unmapped"

In [None]:
Unmapped = AccountScheduleSetup.alias("Setup").join(
        AccountNo.alias("ANo"),
        (
            (col("ANo.IncomeBalance") == 0) & (col("Setup.IncludeIncomeAccounts") == 1)
        )
        | (
            (col("ANo.IncomeBalance") == 1) & (col("Setup.IncludeBalanceAccounts") == 1)
        ),
        "inner"
    ).select(
        col("Setup.ScheduleName"),
        lit(0).alias("CompanyId"),  # Shared Chart of Accounts
        lit("Account").alias("RowType"),
        col("ANo.AccountNo"),
        lit("Unmapped").alias("Level1"),
        lit("Unmapped").alias("Level2"),
        lit("Unmapped").alias("Level3"),
        lit(9999999).alias("Level1Sort"),
        lit(9999999).alias("Level2Sort"),
        lit(9999999).alias("Level3Sort"),
        lit("R9999").alias("Level1RowNo"),
        lit("R9999").alias("Level2RowNo"),
        lit("R9999").alias("Level3RowNo")
    )

# Filter out accounts that are already mapped
Unmapped = Unmapped.join(
    Accounts,
    [
    Unmapped.ScheduleName == Accounts.ScheduleName,
    Unmapped.AccountNo == Accounts.AccountNo
    ],
    "left_anti"
)

#### Final AccountHierarchy

In [None]:
AccountHierarchy = SubTotal.unionByName(
        AccountWithGrouping
    ).unionByName(
        Unmapped
    )

AccountHierarchy = AccountHierarchy.withColumn("AccountHierarchyId", monotonically_increasing_id())
AccountHierarchy = AccountHierarchy.select("AccountHierarchyId", *AccountHierarchy.columns[:-1])

AccountHierarchy.write.format("delta").mode("overwrite").saveAsTable("AccountHierarchy")

AccountHierarchy = spark.table("AccountHierarchy")

### AccountMapping
- Many-to-Many Mapping-table between AccountHierarchy and Account Dimension
    - All Accounts are mapped to themselves
    - All relevant Accounts are also mapped to a given SubTotal

In [None]:
# Initiating with "Ground Level"
MappingSource = GroupSubTotals.where(
        col("RowType") == "SubTotal"
    ).select(
        col("ScheduleName"),
        lit(0).alias("LevelNo"),
        col("RowNo"),
        col("RowNo").alias("ParentRowNo"),
        col("ChildRowNo")
    )

# Looping until all Parent-Child references have been resolved
LevelNo = 1
while True:
    NewMapping = MappingSource.alias("G").where(
            col("LevelNo") == LevelNo - 1
        ).join(
            GroupSubTotals.alias("G1"), 
            [
            col("G1.ScheduleName") == col("G.ScheduleName"), 
            col("G1.RowNo") == col("G.ChildRowNo")
            ],
            "inner"
        ).select(
            col("G.ScheduleName"),
            lit(LevelNo).alias("LevelNo"),
            col("G.RowNo"),
            col("G1.RowNo").alias("ParentRowNo"),
            col("G1.ChildRowNo")
        )
    
    if NewMapping.count() == 0:
        break
    
    MappingSource = MappingSource.union(NewMapping)
    LevelNo += 1

In [None]:
# Account-Account Mapping
AccountAccountMapping = AccountHierarchy.alias("AH").where(
        col("AH.RowType") == "Account"
    ).join(
        Account.alias("A"),
        col("A.AccountNo") == col("AH.AccountNo"),
        "inner"
    ).select(
        col("AH.AccountHierarchyId"),
        col("A.AccountId")
    )

# SubTotal-Account Mapping
SubTotalAccountMapping = MappingSource.alias("M").join(
        AccountHierarchy.alias("AH"), 
        [
        col("AH.ScheduleName") == col("M.ScheduleName"),
        col("AH.Level3RowNo") == col("M.RowNo")
        ],
        "inner"
    ).join(
        AccountHierarchy.alias("AM").where(col("AM.RowType") == "Account"), 
        [
        col("AM.ScheduleName") == col("M.ScheduleName"),
        col("AM.Level3RowNo") == col("M.ChildRowNo")
        ],
        "inner"
    ).join(
        Account.alias("A"),
        col("A.AccountNo") == col("AM.AccountNo"),
        "inner"
    ).select(
        col("AH.AccountHierarchyId"),
        col("A.AccountId")
    )

#### Final AccountMapping

In [None]:
AccountMapping = AccountAccountMapping.unionByName(
        SubTotalAccountMapping
    )

AccountMapping.write.format("delta").mode("overwrite").saveAsTable("AccountMapping")

### AccountMappingError
The following six types of errors are addressed:
1. "Unmapped": The row is not referenced in the totaling column of other rows.
2. "Multi Mapping": Accounts or rows referenced in totaling are also referenced by other rows.
3. "Duplicate Row No": The same row number is used twice within the given schedule.
4. "Unknown Row Type": The row type isn't one of the three recognized types (Account, Group, and SubTotal).
5. "Unknown Row No Reference": The totaling doesn't reference any rows defined within the given schedule.
6. "Unknown Account Reference": The totaling doesn't reference any known accounts.


In [None]:
### 1. Unmapped
UnmappedErrors = AccountSchedule.alias("AccSch").where(
        col("LastSubTotal") != 1
    ).join(
        GroupSubTotals.alias("G"),
        [
        col("G.ScheduleName") == col("AccSch.ScheduleName"),
        col("G.ChildRowNo") == col("AccSch.RowNo")
        ],
        "left_anti"
    ).select(
        col("AccSch.AccountScheduleId"),
        lit("1. Unmapped").alias("ErrorDesc")
    )

### 2. Multi Mapping
# For Accounts
MultiMappingErrors_Account = Accounts.withColumn(
    "MultiMapping",
    when(count("*").over(Window.partitionBy("ScheduleName", "AccountNo")) > 1, 1).otherwise(0)
)
# For GroupSubTotals
MultiMappingErrors_GroupSubTotal = GroupSubTotals.withColumn(
    "MultiMapping",
    when(count("*").over(Window.partitionBy("ScheduleName", "ChildRowNo")) > 1, 1).otherwise(0)
)

# Union MultiMapping Errors from both
MultiMappingErrors = MultiMappingErrors_Account.where(col("MultiMapping") == 1).select("AccountScheduleId").union(
        MultiMappingErrors_GroupSubTotal.where(col("MultiMapping") == 1).select("AccountScheduleId")
    ).select(
        col("AccountScheduleId"),
        lit("2. Multi Mapping").alias("ErrorDesc")
    )

### 3. Duplicate RowNo
DuplicateRowNoErrors = AccountSchedule.where(col("DuplicateRowNo") == 1).select(
    col("AccountScheduleId"),
    lit("3. Duplicate RowNo").alias("ErrorDesc")
)

### 4. Unknown RowType
UnknownRowTypeErrors = AccountSchedule.where(col("UnknownRowType") == 1).select(
    col("AccountScheduleId"),
    lit("4. Unknown RowType").alias("ErrorDesc")
)

### 5. Unknown RowNo reference
UnknownRowNoRefErrors = AccountSchedule.alias("AccSch").where(
        col("RowType").isin(["Group", "SubTotal"])
    ).join(
        GroupSubTotals.alias("G"),
        col("G.AccountScheduleId") == col("AccSch.AccountScheduleId"),
        "left_anti"
    ).select(
        col("AccountScheduleId"),
        lit("5. Unknown RowNo reference").alias("ErrorDesc")
    )

### 6. Unknown Account reference
UnknownAccountRefErrors = AccountSchedule.alias("AccSch").where(
        col("RowType") == "Account"
    ).join(
        Accounts.alias("A"),
        col("A.AccountScheduleId") == col("AccSch.AccountScheduleId"),
        "left_anti"
    ).select(
        col("AccountScheduleId"),
        lit("6. Unknown Account reference").alias("ErrorDesc")
    )

#### Final AccountMappingError

In [None]:
AccountMappingErrors = UnmappedErrors.unionByName(
        MultiMappingErrors
    ).unionByName(
        DuplicateRowNoErrors
    ).unionByName(
        UnknownRowTypeErrors
    ).unionByName(
        UnknownRowNoRefErrors
    ).unionByName(
        UnknownAccountRefErrors
    )

AccountMappingErrors.write.format("delta").mode("overwrite").saveAsTable("AccountMappingError")