# Automated Data Observability Pipeline for Tableau Cloud

### Project Overview
In large-scale Analytics environments, relying on users to report stale data is inefficient and damages trust. This project shifts the paradigm from **Reactive Support** to **Proactive Observability**.

Using the **Tableau Server Client (TSC) API**, this pipeline:
1.  **Audits** the entire Tableau Cloud environment automatically.
2.  **Validates** the "Data Freshness" of extracts against defined SLAs.
3.  **Identifies** failures before they impact business decision-making.

### Tech Stack
* **Python 3.x**
* **Tableau Server Client (TSC)** for API interaction
* **Pandas** for structured log analysis
* **Data Governance** concepts (SLA monitoring)

In [None]:
# Install the Tableau Server Client library
# This is required as it is not included in the standard Databricks Runtime.

%pip install tableauserverclient

In [None]:
import tableauserverclient as TSC
import pandas as pd
import datetime
from datetime import timezone
from pyspark.sql import SparkSession

# --- SECURITY CONFIGURATION ---
# Best Practice: Retrieve credentials from Databricks Secrets (Key Vault)
# instead of hardcoding sensitive information in the notebook.
try:
    # Replace 'tableau_secrets' with your actual Secret Scope name
    TOKEN_NAME = dbutils.secrets.get(scope="tableau_secrets", key="pat_name")
    TOKEN_VALUE = dbutils.secrets.get(scope="tableau_secrets", key="pat_value")
    
    # Environment Configuration
    SITE_ID = "corporate-analytics" # Replace with your Site ID
    SERVER_URL = "https://prod-useast-b.online.tableau.com" # Replace with your Server URL

except Exception as e:
    print(f"Warning: Could not retrieve secrets from Databricks. Error: {e}")
    print("Ensure the Secret Scope is mounted and keys are correct.")

# Initialize Authentication
tableau_auth = TSC.PersonalAccessTokenAuth(TOKEN_NAME, TOKEN_VALUE, SITE_ID)
server = TSC.Server(SERVER_URL, use_server_version=True)

In [None]:
# --- DATA EXTRACTION PIPELINE ---
# Goals:
# 1. Connect to Tableau Cloud API
# 2. Iterate through all Workbooks and Published Datasources
# 3. Calculate 'Data Freshness' (Lag)
# 4. Generate a unified audit log

audit_log = []

print(f"Starting Audit on: {SERVER_URL}")

with server.auth.sign_in(tableau_auth):
    
    # 1. Audit Workbooks
    print("Scanning Workbooks...")
    all_workbooks, _ = server.workbooks.get(TSC.RequestOptions())
    
    for wb in all_workbooks:
        last_refresh = wb.updated_at
        # Calculate hours since last refresh. If None (Live connection), set to -1
        hours_since = (datetime.datetime.now(timezone.utc) - last_refresh).total_seconds() / 3600 if last_refresh else -1
        
        audit_log.append({
            'asset_type': 'Workbook',
            'asset_id': wb.id,
            'asset_name': wb.name,
            'owner_id': wb.owner_id,
            'project_name': wb.project_name,
            'last_refresh_utc': last_refresh,
            'hours_since_refresh': float(round(hours_since, 2)),
            'status': 'Critical' if hours_since > 24 else 'Healthy', # SLA Rule: 24h
            'audit_timestamp': datetime.datetime.now(timezone.utc)
        })

    # 2. Audit Published Datasources
    print("Scanning Datasources...")
    all_datasources, _ = server.datasources.get(TSC.RequestOptions())
    
    for ds in all_datasources:
        last_refresh = ds.updated_at
        hours_since = (datetime.datetime.now(timezone.utc) - last_refresh).total_seconds() / 3600 if last_refresh else -1
        
        audit_log.append({
            'asset_type': 'Datasource',
            'asset_id': ds.id,
            'asset_name': ds.name,
            'owner_id': ds.owner_id,
            'project_name': ds.project_name,
            'last_refresh_utc': last_refresh,
            'hours_since_refresh': float(round(hours_since, 2)),
            'status': 'Critical' if hours_since > 24 else 'Healthy', # SLA Rule: 24h
            'audit_timestamp': datetime.datetime.now(timezone.utc)
        })

print(f"Audit Complete. Extracted {len(audit_log)} metadata records.")

In [None]:
# --- STORAGE LAYER (Load) ---
# Convert the audit log to a Spark DataFrame and save it as a Delta Table.
# This enables historical analysis of environment stability over time.

# 1. Convert List of Dicts -> Pandas -> Spark
# (Pandas is used as an intermediate step for easier handling of list of dicts)
df_pandas = pd.DataFrame(audit_log)
df_spark = spark.createDataFrame(df_pandas)

# 2. Define Table Location
table_name = "bronze_tableau_observability_logs"

# 3. Write to Delta Lake
(df_spark.write
    .format("delta")
    .mode("append") # Append ensures we keep history
    .option("mergeSchema", "true") # Handles schema evolution if we add fields later
    .saveAsTable(table_name)
)

print(f"Success! Data written to Delta Table: '{table_name}'")
display(df_spark.limit(5))