In [35]:
import pandas as pd
import ast
import json
import re

In [36]:
# Define general functions.

def find_paths(variable, df, path):
    # Add the current path of the variable
    path.append(variable)
    
    # Find all variables that depend on the current variable
    next_steps = df[df['Dependency'] == variable]
    
    # If no further dependencies are found, return the current path
    if next_steps.empty:
        return [path]
    
    # List to store all paths
    all_paths = []
    
    # For each next variable, recursively continue following the path
    for _, row in next_steps.iterrows():
        new_paths = find_paths(row['Variable'], df, path.copy())  # Copy the path to allow branching
        all_paths.extend(new_paths)
    
    return all_paths

def extract_code_from_notebook(file_path):
    with open(file_path, 'r') as f:
        notebook = json.load(f)
    
    # Combine all code lines from each cell
    code_cells = ["".join(cell['source']) for cell in notebook['cells'] if cell['cell_type'] == 'code']
    return "\n".join(code_cells)

In [37]:
# Define Test Functions
def check_row_count(df):
    # Überprüft, ob der DataFrame mehr als 0 Zeilen hat
    return 1 if df.shape[0] > 0 else 0



In [38]:
#Load Data 1
df1_read = pd.read_csv(r"C:\Users\Robin\OneDrive\Blog\Detect Workflow Errows\order_table_v2.csv")

# Load Data 2
df2_read = pd.read_csv(r"C:\Users\Robin\OneDrive\Blog\Detect Workflow Errows\customer_table.csv")

# 1.2 Adjust column
df1_change_datatype_orderdate = df1_read.assign(order_date=pd.to_datetime(df1_read['order_date']))

#1.3 Add new column
df1_add_column_year = df1_change_datatype_orderdate.assign(year=lambda x: x['order_date'].dt.year)

#1.4 Filter year 2023
df1_filtered_year = df1_add_column_year[df1_add_column_year['year']==2023]

#1.5 Aggregate
df1_aggregated = df1_filtered_year.groupby(['customer_id']).agg(
    total_price=('total_price', 'sum'),
    unique_order=('order_id', 'nunique')
).reset_index()

# 1.6 merge
merged_df1_df2 = pd.merge(df1_aggregated,df2_read,left_on='customer_id',right_on='id')


In [39]:
# Requires a specific structure of the DataFrames. 
# 1. When making transformations on existing DataFrames, they must be saved in new DataFrames.
# 2. All DataFrames must have a unique name and start with 'df' followed by the original number.

file_path = r"C:\Users\Robin\OneDrive\Blog\Detect Workflow Errows\01_simple_testing_framework.ipynb"
json_text = extract_code_from_notebook(file_path)
json_splitted = json_text.split('\n')

# Regular expression pattern to filter 'df' followed by a number
pattern = r'(df\d+)'  # 'df' followed by one or more digits

# Filtering the variables
filtered_variables = [line for line in json_splitted if re.search(pattern, line)]
variables_dataframe = pd.DataFrame(filtered_variables)
variables_dataframe= variables_dataframe[variables_dataframe[0].str.contains('=')]

iteration = 0
variables_dataframe['variable'] = ""
variables_dataframe['function'] = ""
for i, r in variables_dataframe.iterrows():
    variables_dataframe['variable'][iteration] = r[0].split('=', 1)[0]
    variables_dataframe['function'][iteration] = r[0].split('=', 1)[1]
    iteration += 1

variable_without_duplicates = variables_dataframe[['variable']].drop_duplicates()

variable = []
dependency = []

for i, r in variables_dataframe.iterrows():
    for j, s in variable_without_duplicates.iterrows():
        if s['variable'].strip() in r['function'].strip():
            dependency += [s['variable'].strip()]
        else:
            dependency += [None]
        variable += [r['variable'].strip()]
        
relationship = pd.DataFrame({
    'Variable': variable,
    'Dependency': dependency
})

relationship_filtered = relationship[
    relationship['Dependency'].notnull() | 
    relationship['Variable'].str.contains('read')
]
relationship_dropduplicates = relationship_filtered.drop_duplicates()
relationship_dropduplicates = relationship_dropduplicates[relationship_dropduplicates['Variable'] != 'tested_dataframes']

relationship_dropduplicates_readonly = relationship_dropduplicates[relationship_dropduplicates['Dependency'].isnull()]

stage = []
variable_list = []

for i, r in relationship_dropduplicates_readonly.iterrows():
    start = relationship_dropduplicates[relationship_dropduplicates['Variable'] == r['Variable']]
    stage_num = 0
    while True:
        stage_num += 1
        stage += [stage_num] * len(start)  # Add the current stage number for all rows
        variable_list += start['Variable'].tolist()  # Add all variable values as a list
        start = start[['Variable']]
        start = start.rename(columns={'Variable': 'Dependency'})
        start = pd.merge(relationship_dropduplicates, start, on='Dependency')
        if len(start) == 0:
            break

result_stages = pd.DataFrame({'Variable': variable_list, 'Stages': stage})
result_stages = result_stages.groupby('Variable').agg({'Stages': 'max'}).reset_index()

# Create Coordinates

# List to store all paths
result_paths = []

# Recursively traverse the path for each starting point
for start in relationship_dropduplicates_readonly[relationship_dropduplicates_readonly['Dependency'].isnull()]['Variable']:
    paths = find_paths(start, relationship_dropduplicates, [])
    result_paths.extend(paths)
branch = []
Variable = []

# Output results
for i, path in enumerate(result_paths, 1):
    Variable += [path]
    branch += [i]

result_branch = pd.DataFrame({'Variable': Variable, 'Branch': branch})
result_branch = result_branch.explode('Variable').reset_index(drop=True)
result_branch = result_branch.groupby('Variable').agg({'Branch': 'min'}).reset_index()

result_branch_stage = pd.merge(result_branch, result_stages, on='Variable')

# Line Construction
relationship_dropduplicates_excluderead = relationship_dropduplicates[relationship_dropduplicates['Dependency'].notnull()]
relationship_dropduplicates_excluderead = relationship_dropduplicates_excluderead.reset_index(drop=True)
relationship_dropduplicates_excluderead['RowNumber'] = relationship_dropduplicates_excluderead.index + 1

# Transform the DataFrame to bring the other columns into rows
relationship_melted = relationship_dropduplicates_excluderead.melt(id_vars=['RowNumber'], var_name='Variable', value_name='Value')
relationship_melted = relationship_melted[['RowNumber', 'Value']]
relationship_merged = pd.merge(result_branch_stage, relationship_melted, left_on='Variable', right_on='Value')

relationship_merged = relationship_merged[['Variable', 'Branch', 'Stages', 'RowNumber']]



In [40]:
test_variable = []  # List to store the names of the tested variables
test_result = []    # List to store the results of the tests

# List of DataFrames to be tested
tested_dataframes = [
    'df1_read', 
    'df1_change_datatype_orderdate', 
    'df1_add_column_year', 
    'df1_add_column_year', 
    'df1_filtered_year', 
    'df1_aggregated', 
    'merged_df1_df2', 
    'df2_read'
]

# Iterate over each DataFrame name in the tested_dataframes list
for i in tested_dataframes:
    # Append the result of the row count check for the current DataFrame to the test_result list
    test_result += [check_row_count(eval(i))]
    # Append the current DataFrame name to the test_variable list
    test_variable += [i]

# Create a DataFrame to store the test results with variable names and their corresponding results
result_test_dataframe = pd.DataFrame({'Variable': test_variable, 'Test_Result': test_result})

# Summarize the test results by taking the minimum result for each variable
result_test_dataframe_summarized = result_test_dataframe.groupby('Variable').agg({'Test_Result': 'min'}).reset_index()

# Merge the summarized test results with the relationship_merged DataFrame
endresult = pd.merge(relationship_merged, result_test_dataframe_summarized, on='Variable', how='left')

endresult['Test_Result'] = endresult['Test_Result'].fillna(0)


# Save the final result to an Excel file
endresult.to_excel(r"C:\Users\Robin\OneDrive\Blog\Detect Workflow Errows\line_coordninates.xlsx", index=False)
