# Tableau Calculation and Lineage Extractor

This notebook extracts calculations from Tableau workbooks and creates an interactive lineage diagram showing field dependencies.

## Library Imports

In [101]:
# Import required libraries for data processing, file handling, and Tableau workbook analysis

import pandas as pd
import os, re, sys
import string
import webbrowser
import importlib

from tableaudocumentapi import Workbook
from os.path import isfile, join

import Excelcreator as exg
importlib.reload(exg)  # Reload module to get latest changes

pd.set_option('display.max_columns', None)

### Define Input and Output Paths

In [102]:
# Set up input and output directory paths
input_path = "inputs"
output_path = "outputs"

mypath = "./{}".format(input_path)   #./ points to "this path" as a relative path

mypath

'./inputs'

### Scan for Tableau Files

In [103]:
# Get list of all Tableau workbook files (.twb or .twbx) from the inputs folder
input_files = [f for f in os.listdir(mypath) if isfile(join(mypath, f)) and (f.lower().endswith('.twb') or f.lower().endswith('.twbx'))] 
input_files

['Test reports.twb']

### Utility Functions

In [104]:
# Utility functions for string cleaning and file validation

def remove_sp_char_leave_undescore_square_brackets(string_to_convert):
    # Clean special characters from strings while keeping underscores and square brackets
    filtered_string = re.sub(r'[^a-zA-Z0-9\s._\[\]]', '', string_to_convert).replace(' ', "_")
    return filtered_string

def find_tableau_file(inputfile):
    """Return the input filename if it is a .twb or .twbx, else return empty string."""
    if inputfile.lower().endswith('.twb') or inputfile.lower().endswith('.twbx'):
        return inputfile
    return ""

### Select Tableau File

In [105]:
# Select the first valid Tableau file from the inputs folder and prepare output naming
selected_file = None
for i in input_files:
    candidate = find_tableau_file(i)
    if candidate:
        selected_file = candidate
        break

if not selected_file:
    raise FileNotFoundError(f"No .twb or .twbx file found in '{input_path}'")

print('Selected Tableau file: ' + selected_file)

# substring to be used when naming the exported data (strip extension)
tableau_name_substring = os.path.splitext(selected_file)[0][:30]
print('\nOutput docs name: ' + tableau_name_substring)

packagedTableauFile_relPath = os.path.join(input_path, selected_file)
packagedTableauFile_relPath

Selected Tableau file: Test reports.twb

Output docs name: Test reports


'inputs\\Test reports.twb'

## Extract Fields Using Tableau Document API

In [106]:
%%capture 

# Extract all fields from Tableau workbook using Document API
# Handles both .twb and .twbx files (extracts .twb from .twbx if needed)
TWBX_Workbook = None
try:
    TWBX_Workbook = Workbook(packagedTableauFile_relPath)
except Exception:
    # If the file is a packaged workbook (.twbx), try to extract contained .twb
    if packagedTableauFile_relPath.lower().endswith('.twbx'):
        import zipfile, tempfile, shutil

        with zipfile.ZipFile(packagedTableauFile_relPath, 'r') as z:
            twb_name = next((n for n in z.namelist() if n.lower().endswith('.twb')), None)
            if twb_name is None:
                raise RuntimeError(f"No .twb found inside packaged workbook: {packagedTableauFile_relPath}")

            tmpdir = tempfile.mkdtemp(prefix='twbx_extract_')
            try:
                z.extract(twb_name, tmpdir)
                twb_path = os.path.join(tmpdir, twb_name)
                TWBX_Workbook = Workbook(twb_path)
            finally:
                # clean up the extracted files
                try:
                    shutil.rmtree(tmpdir)
                except Exception:
                    pass
    else:
        raise

collator = []
calcID = []
calcID2 = []
calcNames = []

c = 0
    
for datasource in TWBX_Workbook.datasources:
    datasource_name = datasource.name
    datasource_caption = datasource.caption if datasource.caption else datasource_name

    for count, field in enumerate(datasource.fields.values()):
        dict_temp = {
            'counter': c,
            'datasource_name': datasource_name,
            'datasource_caption': datasource_caption,
            'alias': field.alias,
            'field_calculation': field.calculation,
            'field_calculation_bk': field.calculation,
            'field_caption': field.caption,
            'field_datatype': field.datatype,
            'field_def_agg': field.default_aggregation,
            'field_desc': field.description,
            'field_hidden': field.hidden,
            'field_id': field.id,
            'field_is_nominal': field.is_nominal,
            'field_is_ordinal': field.is_ordinal,
            'field_is_quantitative': field.is_quantitative,
            'field_name': field.name,
            'field_role': field.role,
            'field_type': field.type,
            'field_worksheets': field.worksheets,
            'field_WHOLE': field
        }

        if field.calculation is not None:
            calcID.append(field.id)
            calcNames.append(field.name)

            f2 = field.id.replace(']', '').replace('[', '')
            calcID2.append(f2)

        c += 1
        collator.append(dict_temp)

### Helper Functions for Field Processing

In [107]:
# Helper functions for field categorization and name replacement in formulas

def default_to_friendly_names2(formulaList,fieldToConvert, dictToUse):
    # Replace field IDs in formulas with friendly names for readability
    for i in formulaList:
        for tableauName, friendlyName in dictToUse.items():
            try:
                i[fieldToConvert] = (i[fieldToConvert]).replace(tableauName, friendlyName)
            except:
                a = 0
       
    return formulaList


def category_field_type(row):
    if row['datasource_name'] == 'Parameters':
        val = 'Parameters'
    elif row['field_calculation'] == None:
        val = 'Default_Field'
    else:
        val = 'Calculated_Field'
    return val

def compare_fields(row):
    if row['field_id'] == row['field_id2']:
        val = 0
    else:
        val = 1
    return val

### Process and Clean Extracted Fields

In [108]:
# Process and organize all extracted fields into a clean dataframe
# Categorizes fields as: Parameters, Calculated Fields, or Default Fields
calcDict = dict(zip(calcID, calcNames))
calcDict2 = dict(zip(calcID2, calcNames)) #raw fields without any []

collator = default_to_friendly_names2(collator,'field_calculation',calcDict2)

df_API_all = pd.DataFrame(collator)
df_API_all['field_type'] = df_API_all.apply(category_field_type, axis=1)

preference_list=['Parameters', 'Calculated_Field', 'Default_Field']
df_API_all["field_type"] = pd.Categorical(df_API_all["field_type"], categories=preference_list, ordered=True)

#get rid of duplicates for parameters, so only parameters from the explicit Parameters datasource are kept (as they are also listed again under the name of any other datasources)
df_API_all = df_API_all.sort_values(["field_id","field_type"]).drop_duplicates(["field_id", 'field_calculation']) 

df_API_all['field_id2'] = df_API_all['field_id'].str.replace(r'[\[\]]', '', regex=True)

df_API_all['comparison'] = df_API_all.apply(compare_fields, axis=1)
df_API_all = df_API_all[df_API_all['comparison'] == 1]

df_API_all = df_API_all.drop(['field_id2', 'comparison'], axis=1)
df_API_all.sort_values(['datasource_name', 'field_type', 'counter', 'field_name'])

df1 = df_API_all[[ 'field_name', 'field_datatype','field_type',  'field_calculation',   'field_id', 'datasource_caption', 'field_worksheets']].copy()

preference_list=[ 'Default_Field', 'Parameters', 'Calculated_Field']
df1["field_type"] = pd.Categorical(df1["field_type"], categories=preference_list, ordered=True)
df1 = df1.sort_values(['field_type'])

df1.columns = ['Field_Name', 'DataType', 'Type', 'Calculation', 'Field_ID', 'Datasource', 'Worksheets']

df1['Field_Name'] = df1['Field_Name'].str.replace(r'[\[\]]', '', regex=True)

# Clean the Worksheets column - convert list to comma-separated string without brackets
def format_worksheets(ws_list):
    if ws_list and len(ws_list) > 0:
        # Join worksheet names with comma and space
        return ', '.join(ws_list)
    return ''

df1['Worksheets'] = df1['Worksheets'].apply(format_worksheets)

# Add column to indicate if field is used in any worksheet
df1['Used_In_Report'] = df1['Worksheets'].apply(lambda x: 'Yes' if x else 'No')

df1

Unnamed: 0,Field_Name,DataType,Type,Calculation,Field_ID,Datasource,Worksheets,Used_In_Report
103,cb_2016_us_state_5m_AKHIShift-fauxWebMercator.shp,table,Default_Field,,[__tableau_internal_object_id__].[cb_2016_us_s...,sample-superstore,,No
49,City,string,Default_Field,,[City],sample-superstore,,No
50,Country,string,Default_Field,,[Country],sample-superstore,,No
51,Customer ID,string,Default_Field,,[Customer ID],sample-superstore,,No
107,Order ID,string,Default_Field,,[Order ID],sample-superstore,"ship, Top5prod, orders, category, Cat Orders, ...",Yes
...,...,...,...,...,...,...,...,...
56,Metric Swap Diff % - $,real,Calculated_Field,IF [Metric Swap Change YoY]<0 THEN [Metric Swa...,[Customers Diff % - $ (copy)_780248698853298227],sample-superstore,"top state, map",Yes
55,Customers Diff % + $,real,Calculated_Field,IF [Customers Change YoY]>=0 THEN [Customers C...,[Customers Diff % - $ (copy)_780248696752820251],sample-superstore,,No
54,Metric Swap Diff % + $,real,Calculated_Field,IF [Metric Swap Change YoY]>=0 THEN [Metric Sw...,[Customers Diff % + $ (copy)_780248698853298225],sample-superstore,"top state, map",Yes
61,Metric Profit,string,Calculated_Field,"""Profit""",[Metric Sales (copy)_1005428624987471893],sample-superstore,Buttons,Yes


## Excel Export Configuration

In [109]:
# Configure Excel export settings (sheet title, column widths, formatting, etc.)
# Column widths: Field_Name, DataType, Type, Calculation, Field_ID, Datasource, Worksheets, Used_In_Report

dfs_to_use = [{'excelSheetTitle': 'All fields extracted from DOC API', 'df_to_use':df1, 'mainColWidth':'' , 
               'normalColWidth': [25, 12, 18, 60, 30, 25, 40, 15], 'sheetName': 'GeneralDetails', 'footer': 'Data_1 (DOC API)', 'papersize':9, 'color': '#fff0b3'}                
             
             ]

#papersize: a3 = 8, a4 = 9
# Column widths optimized for: Field_Name(25), DataType(12), Type(18), Calculation(60), Field_ID(30), Datasource(25), Worksheets(40), Used_In_Report(15)

### Generate Excel File

In [110]:
# Generate Excel file containing all extracted field information
path_excel_file_to_create = exg.create_new_file_paths(tableau_name_substring+'_calculations_table')

exg.create_excel_from_dfs(dfs_to_use, path_excel_file_to_create)

# Start of lineage diagram module

## Lineage Diagram Setup

In [111]:
# Create abbreviated node IDs for the lineage diagram (AA, AB, AC, etc.)

def first_char_checker(cell_value):
    # Normalize field IDs by wrapping them with double underscores
    if cell_value[0] != '[':
        cell_value = '__' + cell_value + '__'
    else:
        cell_value = cell_value.replace('[', '__')
        cell_value = cell_value.replace(']', '__')
    return cell_value

# Define abc list to use during lineage diagram creation
abc=list(string.ascii_uppercase)
collated_abc = []

for i in abc:
    for j in abc:
        collated_abc.append(i+j)

### Map Default Fields to Abbreviations

In [112]:
# Map default fields to short abbreviations (AA, AB, etc.) for the diagram
# Filter to only include fields that are used in the report
def_fields = df1[(df1['Type'] == 'Default_Field') & (df1['Used_In_Report'] == 'Yes')]['Field_ID'].copy().apply(remove_sp_char_leave_undescore_square_brackets)

abc_touse = collated_abc[0:len(def_fields)]

def_fields_final = pd.DataFrame(list(zip(def_fields.tolist(), abc_touse)))
def_fields_final['aa'] = def_fields_final.apply(lambda row: first_char_checker(row[0]), axis=1)

mapping_dict_friendly_names = dict(zip(def_fields_final[0].tolist(), abc_touse))
mapping_dict = dict(zip(def_fields_final['aa'].tolist(), abc_touse))

print(f"Default fields used in report: {len(def_fields)}")
def_fields_final

Default fields used in report: 15


Unnamed: 0,0,1,aa
0,[Order_ID],AA,__Order_ID__
1,[Profit],AB,__Profit__
2,[Product_Name],AC,__Product_Name__
3,[Geometry],AD,__Geometry__
4,[Person],AE,__Person__
5,[Order_Date],AF,__Order_Date__
6,[Category],AG,__Category__
7,[Region],AH,__Region__
8,[Customer_Name],AI,__Customer_Name__
9,[STATEFP],AJ,__STATEFP__


### Extract and Map Calculated Fields

In [113]:
# Extract calculated fields and parameters, map them to abbreviated IDs (x___AA, x___AB, etc.)
# Filter to only include fields that are used in the report
created_calc = df_API_all[(df_API_all['field_type'] != 'Default_Field') & (df_API_all['field_worksheets'].apply(lambda x: x and len(x) > 0))]\
                [['field_name', 'field_id', 'field_calculation', 'field_calculation_bk']].copy()

nlsi = ['x___' + i for i in collated_abc]
nlsi_to_use = nlsi[0:len(created_calc)]

created_calc['field_name'] = created_calc['field_name'].apply(remove_sp_char_leave_undescore_square_brackets)
created_calc['aa'] = created_calc.apply(lambda row: first_char_checker(row['field_id']), axis=1)
created_calc['field_calculation_bk'] = created_calc['field_calculation_bk'].str.replace(r'[\[\]]', '__', regex=True)

print(f"Calculated fields used in report: {len(created_calc)}")
created_calc

Calculated fields used in report: 50


Unnamed: 0,field_name,field_id,field_calculation,field_calculation_bk,aa
29,2024,[2022 (copy) (copy)_780248699807363141],2024,2024,__2022 (copy) (copy)_780248699807363141__
30,2023,[2022 (copy)_780248699807354948],2023,2023,__2022 (copy)_780248699807354948__
31,Metric_Swap_Calc,[Calculation_1005428624948703232],"CASE [Parameters].[SALES]\r\nWHEN ""Sales"" THEN...","CASE __Parameters__.__Parameter 2__\r\nWHEN ""S...",__Calculation_1005428624948703232__
32,Metric_Sales,[Calculation_1005428624984944657],"""Sales""","""Sales""",__Calculation_1005428624984944657__
33,Sales_Selected_Year,[Calculation_135107990938935296],if YEAR([Order Date])=[Parameters].[2022]\r\nT...,if YEAR(__Order Date__)=__Parameters__.__Param...,__Calculation_135107990938935296__
34,Sales_Previous_Year,[Calculation_135107990942146561],if YEAR([Order Date])=[Parameters].[2022]-1 \r...,if YEAR(__Order Date__)=__Parameters__.__Param...,__Calculation_135107990942146561__
35,Sales_Change_YoY,[Calculation_135107990942695426],(SUM([Sales Selected Year])-SUM([Sales Previou...,(SUM(__Calculation_135107990938935296__)-SUM(_...,__Calculation_135107990942695426__
36,Selected_Year,[Calculation_135107991117291534],[Parameters].[2022],__Parameters__.__Parameter 1__,__Calculation_135107991117291534__
37,Profit_Selected_Year,[Calculation_135107991461892112],if YEAR([Order Date])=[Parameters].[2022]\r\nT...,if YEAR(__Order Date__)=__Parameters__.__Param...,__Calculation_135107991461892112__
38,Profit_Previous_Year,[Calculation_135107991463411729],if YEAR([Order Date])=[Parameters].[2022]-1\r\...,if YEAR(__Order Date__)=__Parameters__.__Param...,__Calculation_135107991463411729__


### Create Calculated Field Mapping Dictionary

In [114]:
# Create mapping dictionary for calculated field abbreviations
calc_map_dict = dict(zip(created_calc['aa'].to_list(), nlsi_to_use))
calc_map_dict

{'__2022 (copy) (copy)_780248699807363141__': 'x___AA',
 '__2022 (copy)_780248699807354948__': 'x___AB',
 '__Calculation_1005428624948703232__': 'x___AC',
 '__Calculation_1005428624984944657__': 'x___AD',
 '__Calculation_135107990938935296__': 'x___AE',
 '__Calculation_135107990942146561__': 'x___AF',
 '__Calculation_135107990942695426__': 'x___AG',
 '__Calculation_135107991117291534__': 'x___AH',
 '__Calculation_135107991461892112__': 'x___AI',
 '__Calculation_135107991463411729__': 'x___AJ',
 '__Calculation_135107991467085842__': 'x___AK',
 '__Calculation_135107991467745299__': 'x___AL',
 '__Calculation_348184552535425024__': 'x___AM',
 '__Calculation_348184552536739841__': 'x___AN',
 '__Calculation_348184552538714114__': 'x___AO',
 '__Calculation_780248696371109901__': 'x___AP',
 '__Customers Change YoY (copy)_348184552558637063__': 'x___AQ',
 '__Customers Diff % + $ (copy)_780248698853298225__': 'x___AR',
 '__Customers Diff % - $ (copy)_780248698853298227__': 'x___AS',
 '__Customer

### Add Abbreviations and Sort

In [115]:
# Add abbreviated IDs to calculated fields dataframe and sort
created_calc['shorthand_abc'] = created_calc['aa'].map(calc_map_dict)
created_calc.sort_values(by='shorthand_abc', inplace = True)
created_calc

Unnamed: 0,field_name,field_id,field_calculation,field_calculation_bk,aa,shorthand_abc
29,2024,[2022 (copy) (copy)_780248699807363141],2024,2024,__2022 (copy) (copy)_780248699807363141__,x___AA
30,2023,[2022 (copy)_780248699807354948],2023,2023,__2022 (copy)_780248699807354948__,x___AB
31,Metric_Swap_Calc,[Calculation_1005428624948703232],"CASE [Parameters].[SALES]\r\nWHEN ""Sales"" THEN...","CASE __Parameters__.__Parameter 2__\r\nWHEN ""S...",__Calculation_1005428624948703232__,x___AC
32,Metric_Sales,[Calculation_1005428624984944657],"""Sales""","""Sales""",__Calculation_1005428624984944657__,x___AD
33,Sales_Selected_Year,[Calculation_135107990938935296],if YEAR([Order Date])=[Parameters].[2022]\r\nT...,if YEAR(__Order Date__)=__Parameters__.__Param...,__Calculation_135107990938935296__,x___AE
34,Sales_Previous_Year,[Calculation_135107990942146561],if YEAR([Order Date])=[Parameters].[2022]-1 \r...,if YEAR(__Order Date__)=__Parameters__.__Param...,__Calculation_135107990942146561__,x___AF
35,Sales_Change_YoY,[Calculation_135107990942695426],(SUM([Sales Selected Year])-SUM([Sales Previou...,(SUM(__Calculation_135107990938935296__)-SUM(_...,__Calculation_135107990942695426__,x___AG
36,Selected_Year,[Calculation_135107991117291534],[Parameters].[2022],__Parameters__.__Parameter 1__,__Calculation_135107991117291534__,x___AH
37,Profit_Selected_Year,[Calculation_135107991461892112],if YEAR([Order Date])=[Parameters].[2022]\r\nT...,if YEAR(__Order Date__)=__Parameters__.__Param...,__Calculation_135107991461892112__,x___AI
38,Profit_Previous_Year,[Calculation_135107991463411729],if YEAR([Order Date])=[Parameters].[2022]-1\r\...,if YEAR(__Order Date__)=__Parameters__.__Param...,__Calculation_135107991463411729__,x___AJ


### Handle Duplicate Field Names

In [116]:
# Handle duplicate field names by adding numeric suffixes (e.g., Index, Index1, Index2)
def differentiate_duplicates(series):
    counts = series.groupby(series).cumcount() 
    return series + counts.astype(str).replace('0', '')

# differentiate field names that have duplicate values (eg. calc field Index appears twice in workbook, now it will be Index, Index1)
created_calc['field_name'] = differentiate_duplicates(created_calc['field_name'])

created_calc

Unnamed: 0,field_name,field_id,field_calculation,field_calculation_bk,aa,shorthand_abc
29,2024,[2022 (copy) (copy)_780248699807363141],2024,2024,__2022 (copy) (copy)_780248699807363141__,x___AA
30,2023,[2022 (copy)_780248699807354948],2023,2023,__2022 (copy)_780248699807354948__,x___AB
31,Metric_Swap_Calc,[Calculation_1005428624948703232],"CASE [Parameters].[SALES]\r\nWHEN ""Sales"" THEN...","CASE __Parameters__.__Parameter 2__\r\nWHEN ""S...",__Calculation_1005428624948703232__,x___AC
32,Metric_Sales,[Calculation_1005428624984944657],"""Sales""","""Sales""",__Calculation_1005428624984944657__,x___AD
33,Sales_Selected_Year,[Calculation_135107990938935296],if YEAR([Order Date])=[Parameters].[2022]\r\nT...,if YEAR(__Order Date__)=__Parameters__.__Param...,__Calculation_135107990938935296__,x___AE
34,Sales_Previous_Year,[Calculation_135107990942146561],if YEAR([Order Date])=[Parameters].[2022]-1 \r...,if YEAR(__Order Date__)=__Parameters__.__Param...,__Calculation_135107990942146561__,x___AF
35,Sales_Change_YoY,[Calculation_135107990942695426],(SUM([Sales Selected Year])-SUM([Sales Previou...,(SUM(__Calculation_135107990938935296__)-SUM(_...,__Calculation_135107990942695426__,x___AG
36,Selected_Year,[Calculation_135107991117291534],[Parameters].[2022],__Parameters__.__Parameter 1__,__Calculation_135107991117291534__,x___AH
37,Profit_Selected_Year,[Calculation_135107991461892112],if YEAR([Order Date])=[Parameters].[2022]\r\nT...,if YEAR(__Order Date__)=__Parameters__.__Param...,__Calculation_135107991461892112__,x___AI
38,Profit_Previous_Year,[Calculation_135107991463411729],if YEAR([Order Date])=[Parameters].[2022]-1\r\...,if YEAR(__Order Date__)=__Parameters__.__Param...,__Calculation_135107991463411729__,x___AJ


### Create Friendly Name Mapping

In [117]:
# Create final mapping of friendly field names to abbreviated IDs
calc_map_dict_friendly_names = dict(zip(created_calc['field_name'], created_calc['shorthand_abc'] ))
calc_map_dict_friendly_names

{'2024': 'x___AA',
 '2023': 'x___AB',
 'Metric_Swap_Calc': 'x___AC',
 'Metric_Sales': 'x___AD',
 'Sales_Selected_Year': 'x___AE',
 'Sales_Previous_Year': 'x___AF',
 'Sales_Change_YoY': 'x___AG',
 'Selected_Year': 'x___AH',
 'Profit_Selected_Year': 'x___AI',
 'Profit_Previous_Year': 'x___AJ',
 'Profit_Change_YoY': 'x___AK',
 'Profit_Change_Sign': 'x___AL',
 'Orders_Selected_Year': 'x___AM',
 'Orders_Previous_Year': 'x___AN',
 'Orders_Change_YoY': 'x___AO',
 'Sales_Diff___arrow': 'x___AP',
 'Customers_Change_Sign': 'x___AQ',
 'Metric_Swap_Diff___': 'x___AR',
 'Metric_Swap_Diff___1': 'x___AS',
 'Metric_Swap_Diff___arrow': 'x___AT',
 'Metric_Swap_Diff__arrow': 'x___AU',
 'Customers_Previous_Year': 'x___AV',
 'Metric_Profit': 'x___AW',
 'Metric_Orders': 'x___AX',
 '2022': 'x___AY',
 'Metric_Swap_Calc_PY_Label': 'x___AZ',
 'Metric_Swap_Calc_Runn_Label': 'x___BA',
 'Metric_Swap_Calc_Label': 'x___BB',
 'Metric_Swap_Calc_PY': 'x___BC',
 'Metric_Swap_Change_YoY': 'x___BD',
 'Metric_Swap_Calc_Run

## Build Lineage Paths

### Define Function to Create Lineage Paths

In [118]:
# Identify dependencies between fields by analyzing which fields are used in calculation formulas
def create_lineage_paths(df, field_type):
    c = 0
    t_collator = []

    for i in df['aa']:
        try:
            tlist = created_calc[created_calc['field_calculation_bk'].str.contains(i, regex=False) == True]['aa'].to_list()
        except:
            tlist = []

        if len(tlist) != 0:
            for x in tlist:
                newdict = {
                    'count': c,
                    'starting': i,
                    'ending': x,
                    'path_mermaid': i + " --> " + x
                }
                t_collator.append(newdict)
                c = c + 1
    
    return t_collator

### Find Dependencies for Default Fields

In [119]:
# Find all dependencies for default fields
t_collator_def_fields = create_lineage_paths(def_fields_final, 'default_field')
t_collator_def_fields

[{'count': 0,
  'starting': '__Profit__',
  'ending': '__Calculation_135107991461892112__',
  'path_mermaid': '__Profit__ --> __Calculation_135107991461892112__'},
 {'count': 1,
  'starting': '__Profit__',
  'ending': '__Calculation_135107991463411729__',
  'path_mermaid': '__Profit__ --> __Calculation_135107991463411729__'},
 {'count': 2,
  'starting': '__Sales__',
  'ending': '__Calculation_135107990938935296__',
  'path_mermaid': '__Sales__ --> __Calculation_135107990938935296__'},
 {'count': 3,
  'starting': '__Sales__',
  'ending': '__Calculation_135107990942146561__',
  'path_mermaid': '__Sales__ --> __Calculation_135107990942146561__'}]

### Find Dependencies for Calculated Fields

In [120]:
# Find all dependencies for calculated fields
t_collator_calcs = create_lineage_paths(created_calc, 'calculation')
t_collator_calcs

[{'count': 0,
  'starting': '__Calculation_135107990938935296__',
  'ending': '__Calculation_1005428624948703232__',
  'path_mermaid': '__Calculation_135107990938935296__ --> __Calculation_1005428624948703232__'},
 {'count': 1,
  'starting': '__Calculation_135107990938935296__',
  'ending': '__Calculation_135107990942695426__',
  'path_mermaid': '__Calculation_135107990938935296__ --> __Calculation_135107990942695426__'},
 {'count': 2,
  'starting': '__Calculation_135107990938935296__',
  'ending': '__Metric Swap Calc (Label) (copy)_780248699614724162__',
  'path_mermaid': '__Calculation_135107990938935296__ --> __Metric Swap Calc (Label) (copy)_780248699614724162__'},
 {'count': 3,
  'starting': '__Calculation_135107990938935296__',
  'ending': '__Metric Swap Calc (copy)_1005428624952950785__',
  'path_mermaid': '__Calculation_135107990938935296__ --> __Metric Swap Calc (copy)_1005428624952950785__'},
 {'count': 4,
  'starting': '__Calculation_135107990938935296__',
  'ending': '__Met

### Replace Field Names with Abbreviations in Paths

In [121]:
# Replace full field names in paths with abbreviated IDs to simplify the diagram
for default_field, mapping_letter in mapping_dict.items():
    for i in t_collator_def_fields:
        i['path_mermaid'] = i['path_mermaid'].replace(default_field, mapping_letter)

for default_field, mapping_letter in calc_map_dict.items():
    for i in t_collator_def_fields:
        i['path_mermaid'] = i['path_mermaid'].replace(default_field, mapping_letter)

for default_field, mapping_letter in mapping_dict.items():
    for i in t_collator_calcs:
        i['path_mermaid'] = i['path_mermaid'].replace(default_field, mapping_letter)

for default_field, mapping_letter in calc_map_dict.items():
    for i in t_collator_calcs:
        i['path_mermaid'] = i['path_mermaid'].replace(default_field, mapping_letter)

print(f"Processed {len(t_collator_def_fields)} default field paths")
print(f"Processed {len(t_collator_calcs)} calculated field paths")

Processed 4 default field paths
Processed 80 calculated field paths


## Create Interactive Lineage Diagram

### Build Nodes and Edges for Network Graph

In [122]:
# Build nodes (fields) and edges (dependencies) for the interactive lineage diagram
import json

nodes = []
edges = []
node_ids = set()

# Add default fields as nodes
for i, d in mapping_dict_friendly_names.items():
    if d not in node_ids:
        nodes.append({
            'id': d,
            'label': i,
            'group': 'default',
            'title': f'Default Field: {i}'
        })
        node_ids.add(d)

# Add calculated fields as nodes
for i, d in calc_map_dict_friendly_names.items():
    if d not in node_ids:
        calc_row = created_calc[created_calc['field_name'] == i]
        calc_formula = ''
        if not calc_row.empty and calc_row['field_calculation'].values[0]:
            calc_formula = str(calc_row['field_calculation'].values[0])
        
        nodes.append({
            'id': d,
            'label': i,
            'group': 'calculated',
            'title': f'{i}\n\nFormula:\n{calc_formula}'
        })
        node_ids.add(d)

# Build edges from the collators
for item in t_collator_def_fields:
    parts = item['path_mermaid'].split(' --> ')
    if len(parts) == 2:
        edges.append({
            'from': parts[0],
            'to': parts[1],
            'arrows': 'to'
        })

for item in t_collator_calcs:
    parts = item['path_mermaid'].split(' --> ')
    if len(parts) == 2:
        edges.append({
            'from': parts[0],
            'to': parts[1],
            'arrows': 'to'
        })

# Convert to JSON for JavaScript
nodes_json = json.dumps(nodes)
edges_json = json.dumps(edges)

print(f"Total nodes: {len(nodes)}")
print(f"Total edges: {len(edges)}")
print("Nodes and edges prepared for lineage diagram")

Total nodes: 65
Total edges: 84
Nodes and edges prepared for lineage diagram


### Generate HTML File and Open in Browser

In [123]:
# Generate interactive HTML lineage diagram using Vis.js library
# Creates a hierarchical network visualization with tooltips showing formulas

html_base = """
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>""" + tableau_name_substring + """ Calculation Lineage</title>
    <script type="text/javascript" src="https://unpkg.com/vis-network/standalone/umd/vis-network.min.js"></script>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
            background-color: #f5f5f5;
        }
        h1 {
            color: #333;
            text-align: center;
        }
        #mynetwork {
            width: 100%;
            height: 800px;
            border: 1px solid #ddd;
            background-color: white;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .controls {
            text-align: center;
            margin: 20px 0;
            padding: 15px;
            background-color: white;
            border-radius: 5px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .controls button {
            margin: 0 5px;
            padding: 10px 20px;
            background-color: #4CAF50;
            color: white;
            border: none;
            border-radius: 4px;
            cursor: pointer;
            font-size: 14px;
        }
        .controls button:hover {
            background-color: #45a049;
        }
        .legend {
            margin-top: 20px;
            padding: 15px;
            background-color: white;
            border-radius: 5px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .legend-item {
            display: inline-block;
            margin-right: 20px;
        }
        .legend-color {
            display: inline-block;
            width: 20px;
            height: 20px;
            border-radius: 50%;
            margin-right: 5px;
            vertical-align: middle;
        }
    </style>
</head>
<body>
    <h1>""" + tableau_name_substring + """ Calculation Lineage</h1>
    
    <div class="controls">
        <button onclick="network.fit()">Fit to Screen</button>
        <button onclick="network.moveTo({scale: 1.0})">Reset Zoom</button>
    </div>
    
    <div id="mynetwork"></div>
    
    <div class="legend">
        <strong>Legend:</strong>
        <div class="legend-item">
            <span class="legend-color" style="background-color: #97C2FC;"></span>
            <span>Default Fields</span>
        </div>
        <div class="legend-item">
            <span class="legend-color" style="background-color: #FB7E81;"></span>
            <span>Calculated Fields</span>
        </div>
    </div>

    <script type="text/javascript">
        // Create nodes and edges data
        var nodes = new vis.DataSet(""" + nodes_json + """);
        var edges = new vis.DataSet(""" + edges_json + """);

        // Create network
        var container = document.getElementById('mynetwork');
        var data = {
            nodes: nodes,
            edges: edges
        };
        
        var options = {
            nodes: {
                shape: 'box',
                margin: 10,
                widthConstraint: {
                    maximum: 200
                },
                font: {
                    size: 14
                }
            },
            edges: {
                arrows: {
                    to: {
                        enabled: true,
                        scaleFactor: 0.5
                    }
                },
                smooth: {
                    type: 'cubicBezier',
                    forceDirection: 'horizontal'
                },
                color: {
                    color: '#848484',
                    highlight: '#2B7CE9'
                }
            },
            groups: {
                default: {
                    color: {
                        background: '#97C2FC',
                        border: '#2B7CE9',
                        highlight: {
                            background: '#D2E5FF',
                            border: '#2B7CE9'
                        }
                    }
                },
                calculated: {
                    color: {
                        background: '#FB7E81',
                        border: '#E92B36',
                        highlight: {
                            background: '#FFB5B8',
                            border: '#E92B36'
                        }
                    }
                }
            },
            layout: {
                hierarchical: {
                    enabled: true,
                    direction: 'LR',
                    sortMethod: 'directed',
                    levelSeparation: 200,
                    nodeSpacing: 150
                }
            },
            physics: {
                enabled: false
            },
            interaction: {
                hover: true,
                tooltipDelay: 100,
                navigationButtons: true,
                keyboard: true
            }
        };
        
        var network = new vis.Network(container, data, options);
        
        // Event listener for node clicks
        network.on("click", function(params) {
            if (params.nodes.length > 0) {
                var nodeId = params.nodes[0];
                var node = nodes.get(nodeId);
                console.log("Clicked node:", node);
            }
        });
        
        // Fit network when loaded
        network.once("stabilizationIterationsDone", function() {
            network.fit();
        });
    </script>
</body>
</html>
"""

print('\n ______________________________ START_OF_HTML ______________________________')
print(html_base[:500] + '...')
print('\n ______________________________ END_OF_HTML ______________________________')


### Output html string to a local file, then open it on the web browser

# Specify the file path
file_path = 'outputs\\{}_lineage_diagram.html'.format(tableau_name_substring)

# Write the string to an HTML file with UTF-8 encoding
with open(file_path, 'w', encoding='utf-8') as file:
    file.write(html_base)

print("HTML content successfully written to {}".format(file_path))

# Open the HTML file in the default web browser
webbrowser.open('file://' + os.path.realpath(file_path))

print("\n✓ Interactive lineage diagram created and opened in browser")


 ______________________________ START_OF_HTML ______________________________

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Test reports Calculation Lineage</title>
    <script type="text/javascript" src="https://unpkg.com/vis-network/standalone/umd/vis-network.min.js"></script>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
            background-color: #f5f5f5;
        }
        h1 {
       ...

 ______________________________ END_OF_HTML ______________________________
HTML content successfully written to outputs\Test reports_lineage_diagram.html

✓ Interactive lineage diagram created and opened in browser

✓ Interactive lineage diagram created and opened in browser
