In [0]:
import dataiku
import pandas as pd
import time
import json

def _as_dict(value):
    """Return *value* when it is a dict, otherwise an empty dict.

    Audit events may carry JSON ``null`` (or another non-object) where a
    nested object is expected; normalising here keeps later ``.get`` calls safe.
    """
    return value if isinstance(value, dict) else {}


def _accumulate_llm_event(stats, raw_line):
    """Parse one log line and add it to *stats* if it is a successful LLM call.

    Args:
        stats: dict keyed by ``(agent, model)`` whose values hold the running
            ``cost``, ``tokens`` and ``calls`` totals. Mutated in place.
        raw_line: one line of text from an audit log.

    Returns:
        True when the line was counted, False when it was skipped (not JSON,
        not a JSON object, not an LLM topic, not ``SUCCESS``, or carrying
        non-numeric usage figures).
    """
    try:
        event = json.loads(raw_line)
    except ValueError:
        # json.JSONDecodeError subclasses ValueError: non-JSON lines are skipped.
        return False
    if not isinstance(event, dict):
        return False

    topic = event.get('topic')
    if not isinstance(topic, str):
        return False
    if 'llm' not in topic and 'external-model' not in topic:
        return False

    data = _as_dict(event.get('data'))
    # Failed calls are deliberately left out of the totals.
    if data.get('outcome') != 'SUCCESS':
        return False

    details = _as_dict(data.get('details'))
    usage = _as_dict(data.get('usage'))
    context = _as_dict(data.get('context'))
    target = _as_dict(data.get('target'))

    # Heuristic lookup order: context name -> details name -> context id.
    agent = (context.get('agentName')
             or details.get('agentName')
             or context.get('agentId')
             or "Direct/Unknown")
    model = details.get('llmId') or target.get('llmId') or "N/A"

    # Convert before touching stats so a bad value cannot leave a zeroed entry.
    try:
        cost = float(usage.get('estimatedCost') or 0)
        tokens = int(usage.get('totalTokens') or 0)
    except (TypeError, ValueError, OverflowError):
        return False

    # str() guarantees a hashable key even if an identifier is not a string.
    entry = stats.setdefault(
        (str(agent), str(model)),
        {'cost': 0.0, 'tokens': 0, 'calls': 0},
    )
    entry['cost'] += cost
    entry['tokens'] += tokens
    entry['calls'] += 1
    return True


def _build_cost_report(stats):
    """Turn the aggregated *stats* dict into the report DataFrame.

    Returns a one-row ``Status`` frame when nothing was aggregated, otherwise
    one row per ``(agent, model)`` sorted by descending total cost.
    """
    if not stats:
        return pd.DataFrame(columns=["Status"], data=["No LLM usage events found in logs"])

    rows = [
        {
            "Agent Name": agent,
            "LLM Model": model,
            "Total Cost ($)": round(metrics['cost'], 4),
            "Total Tokens": int(metrics['tokens']),
            "Call Count": int(metrics['calls']),
        }
        for (agent, model), metrics in stats.items()
    ]
    return pd.DataFrame(rows).sort_values("Total Cost ($)", ascending=False)


def _delete_temp_object(handle, label):
    """Best-effort ``handle.delete()``; a failure is reported, never raised."""
    if handle is None:
        return
    try:
        handle.delete()
    except Exception as e:
        # Cleanup must not mask the real result, but a leftover object
        # should be visible so it can be removed by hand.
        print(f"   WARNING: could not delete temporary {label}: {e}")


def analyze_agent_costs_final(audit_path="/data/dataiku/dss_data/run/audit"):
    """Aggregate LLM cost and token usage per (agent, model) from audit logs.

    Creates a temporary Filesystem connection rooted at *audit_path* and a
    temporary single-column dataset over it, reads every line, and sums the
    usage of successful LLM events. Both temporary objects are deleted before
    returning, whatever the outcome.

    Args:
        audit_path: directory containing the audit log files.

    Returns:
        A pandas DataFrame with one row per (agent, model) sorted by
        descending cost, a one-row ``Status`` DataFrame when no LLM event was
        found, or None when the connection could not be created or
        processing failed.
    """
    client = dataiku.api_client()
    project = client.get_default_project()

    # Timestamp suffix keeps the temporary object names unique per run.
    ts = int(time.time())
    conn_name = f"tmp_audit_conn_{ts}"
    ds_name = f"tmp_audit_logs_{ts}"

    dataset = None

    print(f"1. Creating temporary connection to: {audit_path}")
    try:
        conn = client.create_connection(conn_name, "Filesystem", {
            "root": audit_path
        })
    except Exception as e:
        print(f"Error creating connection. Ensure you have Admin rights. {e}")
        return None

    try:
        print(f"2. Creating temporary dataset: {ds_name} (Raw Line Mode)")
        dataset = project.create_dataset(ds_name, "Filesystem", params={
            "connection": conn_name,
            "path": "/"  # Root of the connection
        }, formatType="csv")

        # Make the CSV reader behave like a line reader: the separator is a
        # character not expected in the logs, and quoting/escaping are
        # disabled, so each whole line lands in the first column.
        settings = dataset.get_settings()
        settings.get_raw()["formatParams"] = {
            "style": "excel",
            "separator": "\x1F",  # Unit Separator (ASCII 31)
            "quoteChar": "",
            "escapeChar": "",
            "parseHeaderRow": False,
            "skipRowsBeforeHeader": 0,
            "charset": "utf8"
        }
        settings.get_raw()["schema"] = {
            "columns": [{"name": "line", "type": "string"}]
        }
        settings.save()

        print("3. Streaming and parsing log lines...")
        stats = {}
        lines_seen = 0
        events_counted = 0
        dku_ds = dataiku.Dataset(ds_name)

        for df in dku_ds.iter_dataframes(chunksize=5000):
            if df.shape[1] == 0:
                continue
            # Take the first column by position rather than by name, in case
            # the column is not called 'line'.
            for raw_line in df[df.columns[0]]:
                if not isinstance(raw_line, str):
                    continue
                lines_seen += 1
                if _accumulate_llm_event(stats, raw_line):
                    events_counted += 1

        print(f"   Scanned {lines_seen} lines, counted {events_counted} LLM events.")
        return _build_cost_report(stats)

    except Exception as e:
        print(f"\nCRITICAL ERROR: {e}")
        import traceback
        traceback.print_exc()
        return None

    finally:
        print("4. Cleaning up temporary artifacts...")
        _delete_temp_object(dataset, "dataset")
        _delete_temp_object(conn, "connection")
        

# --- Execution ---
# Directory holding the audit logs to analyse.
LOG_PATH = "/data/dataiku/dss_data/run/audit"
df_report = analyze_agent_costs_final(LOG_PATH)

# Only print when the analyzer actually handed back a report.
if df_report is not None:
    print("\n--- Agent Cost & Utilization Report ---")
    report_text = df_report.to_string(index=False)
    print(report_text)

In [0]:
# Bare expression: evaluates the report object produced by the run above.
# NOTE(review): presumably this is a notebook cell relying on rich display of
# the last expression — confirm; as a plain script this line has no effect.
df_report

In [0]:
import dataiku
import pandas as pd
import time
import json
import os

def _as_dict(value):
    """Return *value* when it is a dict, otherwise an empty dict.

    Audit events may carry JSON ``null`` (or another non-object) where a
    nested object is expected; normalising here keeps later ``.get`` calls safe.
    """
    return value if isinstance(value, dict) else {}


def _accumulate_llm_event(stats, raw_line):
    """Parse one log line and add it to *stats* if it is a successful LLM call.

    Args:
        stats: dict keyed by ``(agent, model)`` whose values hold the running
            ``cost``, ``tokens`` and ``calls`` totals. Mutated in place.
        raw_line: one line of text from an audit log.

    Returns:
        True when the line was counted, False when it was skipped (not JSON,
        not a JSON object, not an LLM topic, not ``SUCCESS``, or carrying
        non-numeric usage figures).
    """
    try:
        event = json.loads(raw_line)
    except ValueError:
        # json.JSONDecodeError subclasses ValueError: non-JSON lines are skipped.
        return False
    if not isinstance(event, dict):
        return False

    topic = event.get('topic')
    if not isinstance(topic, str):
        return False
    if 'llm' not in topic and 'external-model' not in topic:
        return False

    data = _as_dict(event.get('data'))
    # Failed calls are deliberately left out of the totals.
    if data.get('outcome') != 'SUCCESS':
        return False

    details = _as_dict(data.get('details'))
    usage = _as_dict(data.get('usage'))
    context = _as_dict(data.get('context'))
    target = _as_dict(data.get('target'))

    # Heuristic lookup order: context name -> details name -> context id.
    agent = (context.get('agentName')
             or details.get('agentName')
             or context.get('agentId')
             or "Direct/Unknown")
    model = details.get('llmId') or target.get('llmId') or "N/A"

    # Convert before touching stats so a bad value cannot leave a zeroed entry.
    try:
        cost = float(usage.get('estimatedCost') or 0)
        tokens = int(usage.get('totalTokens') or 0)
    except (TypeError, ValueError, OverflowError):
        return False

    # str() guarantees a hashable key even if an identifier is not a string.
    entry = stats.setdefault(
        (str(agent), str(model)),
        {'cost': 0.0, 'tokens': 0, 'calls': 0},
    )
    entry['cost'] += cost
    entry['tokens'] += tokens
    entry['calls'] += 1
    return True


def _build_cost_report(stats):
    """Turn the aggregated *stats* dict into the report DataFrame.

    Returns a one-row ``Status`` frame when nothing was aggregated, otherwise
    one row per ``(agent, model)`` sorted by descending total cost.
    """
    if not stats:
        return pd.DataFrame(columns=["Status"], data=["No LLM usage events found in logs"])

    rows = [
        {
            "Agent Name": agent,
            "LLM Model": model,
            "Total Cost ($)": round(metrics['cost'], 4),
            "Total Tokens": int(metrics['tokens']),
            "Call Count": int(metrics['calls']),
        }
        for (agent, model), metrics in stats.items()
    ]
    return pd.DataFrame(rows).sort_values("Total Cost ($)", ascending=False)


def _delete_temp_object(handle, label):
    """Best-effort ``handle.delete()``; a failure is reported, never raised."""
    if handle is None:
        return
    try:
        handle.delete()
    except Exception as e:
        # Cleanup must not mask the real result, but a leftover object
        # should be visible so it can be removed by hand.
        print(f"   WARNING: could not delete temporary {label}: {e}")


def analyze_agent_costs_parent_conn(audit_path="/data/dataiku/dss_data/run/audit"):
    """Aggregate LLM cost and token usage per (agent, model) from audit logs.

    Creates a temporary Filesystem connection rooted at the PARENT of
    *audit_path* and a temporary managed folder pointing at the audit
    sub-directory, then streams every file whose path contains ``audit.log``
    line by line. Both temporary objects are deleted before returning,
    whatever the outcome.

    Args:
        audit_path: directory containing the audit log files.

    Returns:
        A pandas DataFrame with one row per (agent, model) sorted by
        descending cost, a one-row ``Status`` DataFrame when no LLM event was
        found, or None when the connection could not be created, the files
        could not be listed, or processing failed.
    """
    client = dataiku.api_client()
    project = client.get_default_project()

    # Split the path into the connection root (parent directory) and the
    # folder path inside that connection ("/<last component>").
    trimmed_path = audit_path.rstrip('/')
    parent_dir = os.path.dirname(trimmed_path)
    target_subfolder = "/" + os.path.basename(trimmed_path)

    # Timestamp suffix keeps the temporary object names unique per run.
    ts = int(time.time())
    conn_name = f"tmp_audit_root_{ts}"
    folder_name = f"tmp_audit_access_{ts}"

    folder = None

    print(f"1. Creating temporary connection to PARENT: {parent_dir}")
    try:
        conn = client.create_connection(conn_name, "Filesystem", {
            "root": parent_dir
        })
    except Exception as e:
        print(f"Error creating connection: {e}")
        return None

    try:
        print(f"2. Creating temporary Folder pointing to: {target_subfolder}")
        folder = project.create_managed_folder(folder_name)
        settings = folder.get_settings()
        settings.get_raw()["type"] = "Filesystem"
        settings.get_raw()["params"] = {
            "connection": conn_name,
            "path": target_subfolder
        }
        settings.save()

        print("3. Listing and streaming log files...")
        dku_folder = dataiku.Folder(folder_name)

        try:
            all_paths = dku_folder.list_paths_in_partition()
        except Exception as e:
            print(f"   Error listing files: {e}")
            return None

        print(f"   Found {len(all_paths)} files.")

        stats = {}

        for file_path in all_paths:
            # Only process audit logs
            if "audit.log" not in file_path:
                continue

            print(f"   Processing: {file_path} ...")

            try:
                with dku_folder.get_download_stream(file_path) as stream:
                    for line_bytes in stream:
                        if not line_bytes:
                            continue
                        # errors='replace' keeps a stray invalid byte from
                        # raising and costing the rest of the file.
                        line_str = line_bytes.decode('utf-8', errors='replace').strip()
                        if not line_str:
                            continue
                        # A malformed line is skipped by the helper instead of
                        # raising, so it can no longer abort the whole file.
                        _accumulate_llm_event(stats, line_str)
            except Exception as e:
                # Only stream-level failures (open/read/decode) end up here.
                print(f"   WARNING: Skipped file {file_path}: {e}")
                continue

        return _build_cost_report(stats)

    except Exception as e:
        print(f"\nCRITICAL ERROR: {e}")
        return None

    finally:
        print("4. Cleaning up temporary artifacts...")
        _delete_temp_object(folder, "folder")
        _delete_temp_object(conn, "connection")

# --- Execution ---
# Directory holding the audit logs to analyse.
LOG_PATH = "/data/dataiku/dss_data/run/audit"

df_report = analyze_agent_costs_parent_conn(LOG_PATH)

# Only print when the analyzer actually handed back a report.
if df_report is not None:
    print("\n--- Agent Cost & Utilization Report ---")
    report_text = df_report.to_string(index=False)
    print(report_text)