In [18]:
from functions.credentials import get_credentials
from functions.clients import initialize_clients
from datetime import datetime
import pandas as pd 

In [19]:
# Obtain the project helpers; `writer` is the data-lake accessor used below to
# read JSON/parquet inputs and write parquet outputs.
secret_client, writer, logger = get_credentials()
# NOTE(review): `tasks` is not referenced in any of the cells shown here — confirm it is needed.
tasks = initialize_clients()

In [20]:

def extract_access_right(user: dict) -> 'str | None':
    """Extract the access right from a user dictionary.

    The access-right key name varies between entries (anything containing
    ``'UserAccessRight'``, e.g. ``groupUserAccessRight``), so the first key
    matching that substring is used.

    Args:
        user: A single user entry.

    Returns:
        The value stored under the first ``*UserAccessRight*`` key, or ``None``
        when the entry has no such key (previously this raised a bare
        ``StopIteration`` that aborted the whole transformation).
    """
    access_right_key = next((key for key in user if 'UserAccessRight' in key), None)
    if access_right_key is None:
        return None
    return user[access_right_key]

def fill_user_table(user_access: list, object_type: str, object_id: str, list_of_users: list) -> None:
    """
    Append one access row per user to ``user_access`` (the list is mutated in place).

    Every row has the shape ``[graphId, object_id, access_right, object_type]``;
    the access right is looked up with ``extract_access_right``.
    """
    def build_row(entry: dict) -> list:
        # Look up the access right before the graphId, as the original did.
        right = extract_access_right(entry)
        return [entry['graphId'], object_id, right, object_type]

    # Explicit loop on purpose: list.extend(map(...)) would swallow a StopIteration
    # raised while building a row instead of propagating it.
    for entry in list_of_users:
        user_access.append(build_row(entry))

def get_element_id(object_data: dict) -> str:
    """Return the object's ``id``; fall back to ``objectId`` when ``id`` is missing or falsy."""
    element_id = object_data.get('id')
    if element_id:
        return element_id
    return object_data.get('objectId')

def extract_string_values(object_data: dict) -> dict:
    """Return a new dict holding only the entries of ``object_data`` whose value is a ``str``."""
    strings_only = {}
    for field, field_value in object_data.items():
        if isinstance(field_value, str):
            strings_only[field] = field_value
    return strings_only

def fill_dimension_table(workspace_content: list, users: list, all_data: dict, 
                        workspace_id: str, object_type: str, object_data: dict) -> None:
    """
    Register one workspace object (report, dataset, ...) in the output tables.

    Side effects:
        - appends the object's per-user access rows to ``users``
          (NOTE: despite the name, callers pass their user-access list here);
        - appends the object's string-valued fields to ``all_data[object_type]``,
          creating that list on first use;
        - appends ``[workspace_id, element_id, object_type]`` to ``workspace_content``.
    """
    object_id = get_element_id(object_data)
    fill_user_table(users, object_type, object_id, object_data.get('users', []))

    string_fields = extract_string_values(object_data)
    all_data.setdefault(object_type, []).append(string_fields)

    workspace_content.append([workspace_id, object_id, object_type])

def process_workspace_users(user_access: list, users: list, workspace: dict) -> None:
    """
    Handle the workspace-level member list.

    Adds a ``'workspace'`` access row per member to ``user_access`` and then
    collects the raw member dicts into ``users``.
    """
    members = workspace.get('users', [])
    workspace_id = workspace['id']
    fill_user_table(user_access, 'workspace', workspace_id, members)
    users.extend(members)

def process_workspace_objects(workspace_content: list, user_access: list, 
                            all_data: dict, workspace: dict) -> None:
    """
    Feed every object of the workspace into ``fill_dimension_table``.

    Each list-valued field other than ``'users'`` is treated as a collection of
    objects, and the field name is used as the object type.
    """
    object_groups = [
        (field, items)
        for field, items in workspace.items()
        if isinstance(items, list) and field != 'users'
    ]
    for object_type, objects in object_groups:
        for obj in objects:
            fill_dimension_table(workspace_content, user_access, all_data,
                                 workspace['id'], object_type, obj)

def transform_powerbi_data(client: str) -> None:
    """Transform Power BI data from bronze to silver layer.

    Reads today's (``%d%m%Y``) workspace, activity and workspace-content
    extracts for ``client``, flattens the nested workspace content into
    user, user-access, workspace-content and per-object-type tables, and
    writes every resulting DataFrame as parquet to ``'silver'`` under
    ``<today>/<client>/<table name>``.

    Args:
        client: Client identifier used in the input names and the output path.
    """
    today = datetime.now().strftime('%d%m%Y')
    silver_path = f'{today}/{client}'

    # Read input data
    # NOTE(review): workspaces/activities are read from 'test-app' while the
    # workspace content is read from 'bronze' — confirm the container names.
    input_data = {
        'workspaces': pd.DataFrame(writer.read_json_data('test-app', f'{client}_workspaces_{today}')),
        'activities': pd.DataFrame(writer.read_json_data('test-app', f'{client}_activities_{today}')),
        'workspace_content': writer.read_json_data('bronze', f'{client}_workspace_content_{today}')
    }

    # Accumulators filled in place by the helper functions.
    user_access = []
    users = []
    workspace_content = []
    all_data = {}

    # Process each workspace. Both helpers walk the workspace's fields themselves,
    # so each is called exactly once per workspace. (Calling
    # process_workspace_objects once per list-valued field processed every object
    # k times, leaving k-fold duplicates in the non-deduplicated workspace_content.)
    for workspace in input_data['workspace_content']:
        if isinstance(workspace.get('users'), list):
            process_workspace_users(user_access, users, workspace)
        process_workspace_objects(workspace_content, user_access, all_data, workspace)

    # Create and deduplicate dataframes
    output_data = {
        'users': pd.DataFrame(users).drop_duplicates(),
        'user_access': pd.DataFrame(user_access).drop_duplicates(),
        'workspaces': input_data['workspaces'],
        'workspace_content': pd.DataFrame(workspace_content),
        'activities': input_data['activities'],
        **{key: pd.DataFrame(value).drop_duplicates() for key, value in all_data.items()}
    }

    # Write all dataframes to silver layer
    for name, df in output_data.items():
        writer.write_parquet_data(df, 'silver', f'{silver_path}/{name}')


#transform_powerbi_data('vbi')


In [21]:
# NOTE(review): initialize_clients() was already called in an earlier cell (bound to
# `tasks`), and `clients` is not used in the cells shown — consider keeping only one call.
clients = initialize_clients()

In [68]:
def add_metadata_columns(df: pd.DataFrame, client: str, today: str) -> pd.DataFrame:
    """
    Stamp every row of a dataframe with the client name and the load date.

    The columns are assigned on ``df`` itself (the caller's frame is modified),
    and the same object is returned so the call can be chained.

    Args:
        df (pd.DataFrame): Frame to annotate; modified in place
        client (str): Value written to the ``client`` column
        today (str): Value written to the ``dwh_date`` column

    Returns:
        pd.DataFrame: ``df`` itself, now including ``client`` and ``dwh_date``
    """
    metadata = {'client': client, 'dwh_date': today}
    for column_name, column_value in metadata.items():
        df[column_name] = column_value
    return df

def merge_new_rows(current_df: pd.DataFrame, silver_df: pd.DataFrame, id_column) -> pd.DataFrame:
    """
    Append rows from the silver dataframe that are not yet present in the current one.

    Args:
        current_df (pd.DataFrame): Existing dataframe from gold layer
        silver_df (pd.DataFrame): New dataframe from silver layer
        id_column: Key that decides whether a silver row already exists:
            - a single column label (e.g. ``'id'``): a row is skipped when its
              value in that column already occurs in ``current_df``;
            - a list/tuple of labels: composite key, a row is skipped only when
              that combination of values already occurs;
            - ``None``: composite key over every column the two frames share
              (i.e. only exact duplicates of existing rows are skipped).

    Returns:
        pd.DataFrame: ``silver_df`` itself when ``current_df`` is empty, otherwise
        ``current_df`` with the new silver rows appended (fresh RangeIndex)
    """
    if current_df.empty:
        print(f"Adding {len(silver_df)} new rows (initial load)")
        return silver_df

    new_rows = silver_df[~_existing_row_mask(current_df, silver_df, id_column)]
    print(f"Adding {len(new_rows)} new rows")
    return pd.concat([current_df, new_rows], ignore_index=True)


def _existing_row_mask(current_df: pd.DataFrame, silver_df: pd.DataFrame, id_column):
    """Boolean mask over ``silver_df`` rows: True where the row's key already exists in ``current_df``."""
    if id_column is not None and not isinstance(id_column, (list, tuple)):
        # Single-column key: plain membership test.
        return silver_df[id_column].isin(current_df[id_column])

    if id_column is None:
        key_columns = [col for col in silver_df.columns if col in current_df.columns]
    else:
        key_columns = list(id_column)
    if not key_columns:
        # Nothing to compare on: every silver row counts as new.
        return pd.Series(False, index=silver_df.index)

    # Unique right-hand keys keep the left join at exactly one output row per
    # silver row, in silver's order, so the result can be used positionally.
    existing_keys = current_df[key_columns].drop_duplicates()
    origin = silver_df[key_columns].merge(
        existing_keys, on=key_columns, how='left', indicator='_row_origin'
    )['_row_origin']
    return (origin == 'both').to_numpy()


def load_power_bi_data(client):
    """
    Merge today's silver-layer tables for ``client`` into the gold layer.

    For each dimension/fact table, reads ``<ddmmyyyy>/<client>/<name>`` from
    ``'silver'``, appends the rows not yet present in
    ``<kind>/<dim|fact>_<name>`` in ``'gold'`` (see ``merge_new_rows``), and writes
    the result back. If the gold table cannot be read, it is created from the
    silver data. Any table that fails to load or merge is reported and skipped,
    and its gold table is left untouched.

    Args:
        client: Client identifier used in the silver path.
    """
    today = datetime.now().strftime('%d%m%Y')
    silver_path = f'{today}/{client}'
    # Key used to detect rows already in gold (tables not listed use 'id').
    # user_access / workspace_content have no single unique column: keying them on
    # column 0 (graphId / workspace id) dropped every new access right or new object
    # of an already-known user/workspace, so they are keyed on the whole row instead.
    id_columns = {
        'users': 'graphId',
        'activities': 'Id',
        'user_access': None,
        'workspace_content': None,
    }

    all_tables = {
        'dimensions': ['users', 'workspaces', 'reports', 'datasets'],
        'facts': ['user_access', 'workspace_content', 'activities', 'azure_spend'],
    }

    for kind, tables in all_tables.items():
        path_abbreviation = 'dim' if kind == 'dimensions' else 'fact'
        for name in tables:
            gold_path = f"{kind}/{path_abbreviation}_{name}"
            try:
                _load_table(silver_path, gold_path, name, id_columns.get(name, 'id'))
            except Exception as e:
                # Best-effort per table, but report instead of silently passing.
                print(f"Skipping {name}: {e}")


def _load_table(silver_path: str, gold_path: str, name: str, id_column) -> None:
    """Merge one silver table into its gold counterpart and write the result to gold."""
    dataframe = writer.read_parquet_data('silver', f'{silver_path}/{name}')
    if dataframe is None:
        # NOTE(review): unclear whether the writer raises or returns None when a
        # blob is missing; both cases are handled.
        print(f"Skipping {name}: no silver data")
        return

    # Only the gold *read* is allowed to fail as "table not created yet". Merge
    # errors must not fall into this branch, because that used to overwrite the
    # gold table with today's silver data alone.
    # NOTE(review): a transient read failure is still treated as "missing".
    try:
        current_dataframe = writer.read_parquet_data('gold', gold_path)
    except Exception as e:
        print("This table doesnt exist now: ", name, e)
        current_dataframe = None

    if current_dataframe is None:
        df = dataframe
    else:
        df = merge_new_rows(current_dataframe, dataframe, id_column)

    writer.write_parquet_data(df, 'gold', gold_path)





In [69]:
# Merge today's silver tables for client 'vbi' into the gold layer. This needs
# today's silver output to exist; the transform_powerbi_data('vbi') call above is
# commented out, so it must have been produced by another run.
load_power_bi_data('vbi')

dict_items([('client', 'datalake_writer'), ('date', '11-11-2024'), ('time', '17:02:46'), ('operation', 'ERROR'), ('kind', 'Read data'), ('text', 'Error reading parquet data from 27082024/vbi_users: The specified blob does not exist.\nRequestId:a7271c61-f01e-0034-7c53-344e02000000\nTime:2024-11-11T16:02:44.5682015Z\nErrorCode:BlobNotFound\nContent: <?xml version="1.0" encoding="utf-8"?><Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.\nRequestId:a7271c61-f01e-0034-7c53-344e02000000\nTime:2024-11-11T16:02:44.5682015Z</Message></Error>')])
dict_items([('client', 'datalake_writer'), ('date', '11-11-2024'), ('time', '17:02:46'), ('operation', 'ERROR'), ('kind', 'Read data'), ('text', 'Error reading parquet data from 27082024/vbi_workspaces: The specified blob does not exist.\nRequestId:a7271c74-f01e-0034-0d53-344e02000000\nTime:2024-11-11T16:02:44.5832537Z\nErrorCode:BlobNotFound\nContent: <?xml version="1.0" encoding="utf-8"?><Error><Code>BlobNotFound</Code><Messa