# Data Catalog And Lineage

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import os
import warnings
warnings.filterwarnings('ignore')

%matplotlib inline

## Data Catalog and Lineage

This notebook handles the documentation and tracking of data assets, their origins, and transformations.
It establishes a system for maintaining metadata about datasets and tracking how data flows through the pipeline.

In [None]:
# Import necessary libraries
import pandas as pd
import numpy as np
import json
import datetime
import uuid
import os
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Optional, Any, Union
import hashlib
import matplotlib.pyplot as plt
import seaborn as sns


In [None]:
# Define data structures for metadata and lineage tracking
@dataclass
class ColumnMetadata:
    """Metadata for a single column in a dataset."""
    name: str
    data_type: str  # str() of the pandas dtype, e.g. "int64" or "object"
    description: str = ""
    # DataCatalog.extract_column_metadata sets this to "the column contained
    # at least one missing value".
    nullable: bool = True
    unique_values: int = 0  # Series.nunique() (missing values not counted)
    min_value: Optional[Any] = None  # populated for numeric columns only
    max_value: Optional[Any] = None  # populated for numeric columns only
    sample_values: List[Any] = field(default_factory=list)
    tags: List[str] = field(default_factory=list)
    statistics: Dict[str, Any] = field(default_factory=dict)


@dataclass
class DatasetMetadata:
    """Metadata for a dataset."""
    id: str  # catalog key; also the JSON file name under <catalog>/metadata/
    name: str
    description: str
    created_at: str  # ISO-8601 timestamp string
    updated_at: str  # ISO-8601 timestamp string
    version: str
    source: str
    owner: str
    columns: List[ColumnMetadata] = field(default_factory=list)
    row_count: int = 0
    tags: List[str] = field(default_factory=list)
    additional_info: Dict[str, Any] = field(default_factory=dict)


@dataclass
class Transformation:
    """Information about a data transformation."""
    id: str  # uuid4 string assigned by DataCatalog.record_transformation
    name: str
    description: str
    code: str  # source text, stored for reference only; nothing here executes it
    created_at: str  # ISO-8601 timestamp string
    input_datasets: List[str] = field(default_factory=list)
    output_datasets: List[str] = field(default_factory=list)
    parameters: Dict[str, Any] = field(default_factory=dict)


@dataclass
class DataLineage:
    """Data lineage information."""
    dataset_id: str
    # Transformations that produced this dataset.
    transformations: List[Transformation] = field(default_factory=list)
    upstream_datasets: List[str] = field(default_factory=list)  # ids this was derived from
    downstream_datasets: List[str] = field(default_factory=list)  # ids derived from this


class DataCatalog:
    """Manage dataset metadata and lineage, persisted as JSON files.

    On-disk layout::

        <catalog_path>/metadata/<dataset_id>.json   # DatasetMetadata
        <catalog_path>/lineage/<dataset_id>.json    # DataLineage

    Entries are also held in two in-memory dicts keyed by dataset id
    (``metadata_cache`` and ``lineage_cache``). Every mutating method writes
    the entries it touched back to disk immediately.
    """

    def __init__(self, catalog_path="./data_catalog"):
        """Create the catalog directories if needed and load existing entries.

        Args:
            catalog_path: Root directory of the catalog.
        """
        self.catalog_path = catalog_path
        self.metadata_path = os.path.join(catalog_path, "metadata")
        self.lineage_path = os.path.join(catalog_path, "lineage")

        # Create directories if they don't exist
        os.makedirs(self.metadata_path, exist_ok=True)
        os.makedirs(self.lineage_path, exist_ok=True)

        # In-memory cache of metadata and lineage, keyed by dataset id
        self.metadata_cache = {}
        self.lineage_cache = {}

        # Load existing metadata and lineage
        self._load_catalog()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _now():
        """Return the current local time as a ``datetime.datetime``."""
        # Imported locally: the notebook binds the global name ``datetime`` to
        # the class in one cell and to the module in another, so which one a
        # method would see depends on cell execution order.
        import datetime as _datetime
        return _datetime.datetime.now()

    @staticmethod
    def _to_native(value):
        """Return *value* as a plain Python scalar if it is a NumPy scalar.

        NumPy integers and booleans are not JSON serializable, so anything
        stored in the metadata is normalized through this helper.
        """
        if isinstance(value, np.generic):
            return value.item()
        return value

    @staticmethod
    def _json_default(obj):
        """Fallback for ``json.dumps`` on objects the stdlib encoder rejects."""
        if isinstance(obj, np.generic):
            return obj.item()
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if obj is pd.NA:
            return None
        if hasattr(obj, "isoformat"):
            # datetime / date / pandas Timestamp sample values
            return obj.isoformat()
        return str(obj)

    @staticmethod
    def _iter_json_files(directory):
        """Yield ``(dataset_id, parsed_json)`` for each ``*.json`` file in *directory*."""
        if not os.path.isdir(directory):
            return
        suffix = ".json"
        for filename in sorted(os.listdir(directory)):
            if not filename.endswith(suffix):
                continue
            with open(os.path.join(directory, filename), "r", encoding="utf-8") as f:
                payload = json.load(f)
            # Strip only the trailing extension (str.replace would also hit
            # ".json" occurring elsewhere in the name).
            yield filename[: -len(suffix)], payload

    def _write_json(self, directory, dataset_id, record):
        """Serialize dataclass *record* to ``<directory>/<dataset_id>.json``."""
        # Serialize first so a failure cannot leave a truncated file behind
        # that would break the next _load_catalog().
        text = json.dumps(asdict(record), indent=2, default=self._json_default)
        with open(os.path.join(directory, f"{dataset_id}.json"), "w", encoding="utf-8") as f:
            f.write(text)

    def _load_catalog(self):
        """Load existing metadata and lineage from disk into the caches."""
        for dataset_id, metadata_dict in self._iter_json_files(self.metadata_path):
            # Rebuild nested ColumnMetadata objects from their dict form
            metadata_dict["columns"] = [
                ColumnMetadata(**col_dict) for col_dict in metadata_dict.get("columns", [])
            ]
            self.metadata_cache[dataset_id] = DatasetMetadata(**metadata_dict)

        for dataset_id, lineage_dict in self._iter_json_files(self.lineage_path):
            # Rebuild nested Transformation objects from their dict form
            lineage_dict["transformations"] = [
                Transformation(**trans_dict)
                for trans_dict in lineage_dict.get("transformations", [])
            ]
            self.lineage_cache[dataset_id] = DataLineage(**lineage_dict)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def generate_dataset_id(self, df, name):
        """Return a deterministic id built from *name*, ``df.shape`` and ``df.head(5)``.

        Only the first five rows are hashed. Two frames with the same name,
        shape and first five rows therefore get the same id. MD5 is used as a
        content fingerprint, not for security.
        """
        sample_data = df.head(5).to_json()
        hash_input = f"{name}_{sample_data}_{df.shape}"
        return hashlib.md5(hash_input.encode()).hexdigest()

    def extract_column_metadata(self, df, column_name):
        """Profile one column of *df* and return a :class:`ColumnMetadata`.

        Non-boolean numeric columns get min/max plus mean, median, std and
        quartiles. Object and categorical columns get their five most frequent
        values and their mode. Up to five random non-null values are kept as
        samples. Scalars are converted to plain Python types so the result can
        be JSON-serialized.
        """
        col_data = df[column_name]
        dtype = col_data.dtype

        col_metadata = ColumnMetadata(
            name=column_name,
            data_type=str(dtype),
            # bool()/int() unwrap NumPy scalars, which json cannot serialize
            nullable=bool(col_data.isna().any()),
            unique_values=int(col_data.nunique()),
        )

        # pandas' dtype predicates also accept extension and categorical
        # dtypes; np.issubdtype raises TypeError on them.
        is_number = (
            pd.api.types.is_numeric_dtype(dtype)
            and not pd.api.types.is_bool_dtype(dtype)
        )
        if is_number:
            native = self._to_native
            col_metadata.min_value = native(col_data.min())
            col_metadata.max_value = native(col_data.max())
            col_metadata.statistics = {
                "mean": native(col_data.mean()),
                "median": native(col_data.median()),
                "std": native(col_data.std()),
                "25th_percentile": native(col_data.quantile(0.25)),
                "75th_percentile": native(col_data.quantile(0.75)),
            }
        elif pd.api.types.is_object_dtype(dtype) or isinstance(dtype, pd.CategoricalDtype):
            # For string/categorical columns
            top_counts = col_data.value_counts().head(5)
            modes = col_data.mode()
            col_metadata.statistics = {
                "top_values": {
                    self._to_native(k): self._to_native(v) for k, v in top_counts.items()
                },
                "mode": self._to_native(modes.iloc[0]) if not modes.empty else None,
            }

        # Bound the sample by the number of non-null values: sampling more rows
        # than exist (without replacement) raises ValueError.
        non_null = col_data.dropna()
        col_metadata.sample_values = non_null.sample(min(5, len(non_null))).tolist()

        return col_metadata

    def register_dataset(self, df, name, description, source, owner, tags=None, additional_info=None):
        """Register *df* in the catalog and return its dataset id.

        Profiles every column, creates version "1.0" metadata and an empty
        lineage record, and writes both to disk. If the frame hashes to an id
        that is already catalogued, that entry's metadata is overwritten and
        its lineage is reset.
        """
        dataset_id = self.generate_dataset_id(df, name)
        timestamp = self._now().isoformat()

        columns = [self.extract_column_metadata(df, column) for column in df.columns]

        metadata = DatasetMetadata(
            id=dataset_id,
            name=name,
            description=description,
            created_at=timestamp,
            updated_at=timestamp,
            version="1.0",
            source=source,
            owner=owner,
            columns=columns,
            row_count=len(df),
            # Copy so later mutation of the caller's objects doesn't leak in
            tags=list(tags) if tags else [],
            additional_info=dict(additional_info) if additional_info else {},
        )
        lineage = DataLineage(dataset_id=dataset_id)

        self.metadata_cache[dataset_id] = metadata
        self.lineage_cache[dataset_id] = lineage

        self._save_metadata(dataset_id)
        self._save_lineage(dataset_id)
        return dataset_id

    def _save_metadata(self, dataset_id):
        """Write the cached metadata for *dataset_id* to disk (no-op if unknown)."""
        metadata = self.metadata_cache.get(dataset_id)
        if metadata is not None:
            self._write_json(self.metadata_path, dataset_id, metadata)

    def _save_lineage(self, dataset_id):
        """Write the cached lineage for *dataset_id* to disk (no-op if unknown)."""
        lineage = self.lineage_cache.get(dataset_id)
        if lineage is not None:
            self._write_json(self.lineage_path, dataset_id, lineage)

    def get_dataset_metadata(self, dataset_id):
        """Return the cached :class:`DatasetMetadata` for *dataset_id*, or None."""
        return self.metadata_cache.get(dataset_id)

    def get_dataset_lineage(self, dataset_id):
        """Return the cached :class:`DataLineage` for *dataset_id*, or None."""
        return self.lineage_cache.get(dataset_id)

    def record_transformation(self, name, description, code, input_datasets, output_dataset_id, parameters=None):
        """Record that *input_datasets* were transformed into *output_dataset_id*.

        The transformation is appended to the output dataset's lineage, and
        the inputs are added (without duplicates) to its upstream list. The
        output id is added to each known input's downstream list. If the
        output dataset has no lineage entry, the transformation record itself
        is not stored.

        Returns:
            The new transformation's id (a uuid4 string).
        """
        # Materialize once: the inputs are iterated several times below and
        # must not alias the caller's list.
        inputs = list(input_datasets)

        transformation_id = str(uuid.uuid4())
        transformation = Transformation(
            id=transformation_id,
            name=name,
            description=description,
            code=code,
            created_at=self._now().isoformat(),
            input_datasets=list(inputs),
            output_datasets=[output_dataset_id],
            parameters=dict(parameters) if parameters else {},
        )

        # Update lineage for the output dataset
        output_lineage = self.lineage_cache.get(output_dataset_id)
        if output_lineage is not None:
            output_lineage.transformations.append(transformation)
            for ds_id in inputs:
                # Checked per item so duplicates within *inputs* are skipped too
                if ds_id not in output_lineage.upstream_datasets:
                    output_lineage.upstream_datasets.append(ds_id)
            self._save_lineage(output_dataset_id)

        # Update lineage for input datasets
        for input_id in inputs:
            input_lineage = self.lineage_cache.get(input_id)
            if input_lineage is not None and output_dataset_id not in input_lineage.downstream_datasets:
                input_lineage.downstream_datasets.append(output_dataset_id)
                self._save_lineage(input_id)

        return transformation_id

    def update_dataset_metadata(self, dataset_id, df=None, **kwargs):
        """Update a dataset's metadata in place and persist it.

        Each keyword argument naming an existing attribute of the metadata is
        assigned; other keywords are ignored. ``updated_at`` is always
        refreshed. If *df* is given, ``row_count`` and the column profiles are
        recomputed from it.

        Returns:
            True if the dataset exists, False otherwise.
        """
        metadata = self.metadata_cache.get(dataset_id)
        if metadata is None:
            return False

        for key, value in kwargs.items():
            if hasattr(metadata, key):
                setattr(metadata, key, value)

        metadata.updated_at = self._now().isoformat()

        if df is not None:
            metadata.row_count = len(df)
            metadata.columns = [self.extract_column_metadata(df, column) for column in df.columns]

        self._save_metadata(dataset_id)
        return True

    def visualize_lineage(self, dataset_id, depth=2):
        """Draw the lineage graph around *dataset_id* with networkx and matplotlib.

        Walking from *dataset_id* along upstream and downstream links (in any
        combination), every catalogued dataset within *depth* hops is
        expanded. Its direct neighbours are drawn as well. Nodes are labelled
        with dataset names where known, otherwise with their ids. If networkx
        is not installed, an install hint is printed instead.
        """
        try:
            import networkx as nx
        except ImportError:
            print("Please install networkx to visualize lineage: pip install networkx")
            return
        import matplotlib.pyplot as plt

        G = nx.DiGraph()
        # Shallowest depth at which each dataset has been expanded. Expanding
        # it again at the same or a greater depth cannot add nodes or edges, so
        # skipping it avoids re-walking every path (exponential in *depth*).
        expanded_at = {}

        def label_for(ds_id):
            meta = self.metadata_cache.get(ds_id)
            return meta.name if meta else ds_id

        def add_lineage(ds_id, current_depth=0):
            if current_depth > depth or ds_id not in self.lineage_cache:
                return
            previous = expanded_at.get(ds_id)
            if previous is not None and previous <= current_depth:
                return
            expanded_at[ds_id] = current_depth

            lineage = self.lineage_cache[ds_id]
            G.add_node(ds_id, label=label_for(ds_id))

            for upstream_id in lineage.upstream_datasets:
                G.add_node(upstream_id, label=label_for(upstream_id))
                G.add_edge(upstream_id, ds_id)
                add_lineage(upstream_id, current_depth + 1)

            for downstream_id in lineage.downstream_datasets:
                G.add_node(downstream_id, label=label_for(downstream_id))
                G.add_edge(ds_id, downstream_id)
                add_lineage(downstream_id, current_depth + 1)

        add_lineage(dataset_id)

        # Draw the graph; labels come from the node attribute (dataset names)
        plt.figure(figsize=(12, 8))
        pos = nx.spring_layout(G)
        nx.draw(G, pos, with_labels=False, node_color='skyblue', node_size=1500, arrows=True)
        nx.draw_networkx_labels(G, pos, labels=nx.get_node_attributes(G, 'label'))

        plt.title(f"Data Lineage for {label_for(dataset_id)}")
        plt.axis('off')
        plt.tight_layout()
        plt.show()

    def generate_data_catalog_report(self, output_path="data_catalog_report.html"):
        """Write an HTML report of every catalogued dataset and return *output_path*.

        All catalog-supplied text (names, descriptions, tags, sample values,
        ids, ...) is HTML-escaped. This keeps values containing ``<`` or ``&``
        from breaking the markup or injecting elements. The file is written
        as UTF-8.
        """
        from html import escape

        def esc(value):
            return escape(str(value))

        def dataset_name(ds_id):
            meta = self.metadata_cache.get(ds_id)
            return meta.name if meta is not None else ds_id

        style = """
            <style>
                body { font-family: Arial, sans-serif; margin: 20px; }
                h1 { color: #2c3e50; }
                h2 { color: #3498db; margin-top: 30px; }
                table { border-collapse: collapse; width: 100%; margin-top: 10px; }
                th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
                th { background-color: #f2f2f2; }
                tr:nth-child(even) { background-color: #f9f9f9; }
                .metadata-section { margin-bottom: 30px; padding: 15px; border: 1px solid #ddd; border-radius: 5px; }
                .tag { background-color: #e0e0e0; padding: 3px 8px; border-radius: 10px; margin-right: 5px; font-size: 0.8em; }
            </style>"""

        # Collect fragments and join once instead of repeated string +=.
        parts = [
            "<!DOCTYPE html>",
            "<html>",
            "<head>",
            '<meta charset="utf-8">',
            "<title>Data Catalog Report</title>",
            style,
            "</head>",
            "<body>",
            "<h1>Data Catalog Report</h1>",
            f"<p>Generated on: {self._now().strftime('%Y-%m-%d %H:%M:%S')}</p>",
            f"<p>Total Datasets: {len(self.metadata_cache)}</p>",
        ]

        for dataset_id, metadata in self.metadata_cache.items():
            parts.append('<div class="metadata-section">')
            parts.append(f"<h2>{esc(metadata.name)}</h2>")
            for label, value in (
                ("ID", dataset_id),
                ("Description", metadata.description),
                ("Source", metadata.source),
                ("Owner", metadata.owner),
                ("Created", metadata.created_at),
                ("Updated", metadata.updated_at),
                ("Version", metadata.version),
                ("Row Count", metadata.row_count),
            ):
                parts.append(f"<p><strong>{label}:</strong> {esc(value)}</p>")
            tags_html = "".join(f'<span class="tag">{esc(tag)}</span>' for tag in metadata.tags)
            parts.append(f"<p><strong>Tags:</strong> {tags_html}</p>")

            # Column table
            parts.append("<h3>Columns</h3>")
            parts.append("<table>")
            parts.append(
                "<tr><th>Name</th><th>Type</th><th>Nullable</th>"
                "<th>Unique Values</th><th>Sample Values</th></tr>"
            )
            for column in metadata.columns:
                samples = ", ".join(str(val) for val in column.sample_values[:3])
                parts.append(
                    "<tr>"
                    f"<td>{esc(column.name)}</td>"
                    f"<td>{esc(column.data_type)}</td>"
                    f"<td>{'Yes' if column.nullable else 'No'}</td>"
                    f"<td>{esc(column.unique_values)}</td>"
                    f"<td>{esc(samples)}...</td>"
                    "</tr>"
                )
            parts.append("</table>")

            # Lineage information, if available
            lineage = self.lineage_cache.get(dataset_id)
            if lineage is not None:
                parts.append("<h3>Data Lineage</h3>")
                for heading, related_ids in (
                    ("Upstream Datasets", lineage.upstream_datasets),
                    ("Downstream Datasets", lineage.downstream_datasets),
                ):
                    if related_ids:
                        parts.append(f"<h4>{heading}</h4>")
                        parts.append("<ul>")
                        parts.extend(
                            f"<li>{esc(dataset_name(rel_id))} ({esc(rel_id)})</li>"
                            for rel_id in related_ids
                        )
                        parts.append("</ul>")

                if lineage.transformations:
                    parts.append("<h4>Transformations</h4>")
                    parts.append("<table>")
                    parts.append("<tr><th>Name</th><th>Description</th><th>Created</th></tr>")
                    parts.extend(
                        f"<tr><td>{esc(t.name)}</td><td>{esc(t.description)}</td>"
                        f"<td>{esc(t.created_at)}</td></tr>"
                        for t in lineage.transformations
                    )
                    parts.append("</table>")

            parts.append("</div>")

        parts.append("</body>")
        parts.append("</html>")

        # Write the HTML report to file
        with open(output_path, "w", encoding="utf-8") as f:
            f.write("\n".join(parts) + "\n")

        return output_path


In [None]:
# Load the dataset
def load_nz_industry_data():
    """
    Load the New Zealand Industry Financial Dataset.

    This is a placeholder: it builds a small in-memory sample (five 2023
    "All industries" rows) instead of reading the real file.

    Returns:
        pandas.DataFrame with one row per financial variable. ``Year`` is
        int64, and ``Value`` is parsed with ``pd.to_numeric`` (unparseable
        entries become NaN).
    """
    n_rows = 5
    # Create a sample dataframe based on the provided information
    data = {
        'Year': [2023] * n_rows,
        'Industry_aggregation_NZSIOC': ['Level 1'] * n_rows,
        'Industry_code_NZSIOC': ['99999'] * n_rows,
        'Industry_name_NZSIOC': ['All industries'] * n_rows,
        'Units': ['Dollars (millions)'] * n_rows,
        'Variable_code': ['H01', 'H04', 'H05', 'H07', 'H08'],
        'Variable_name': ['Total income', 'Sales, government funding, grants and subsidies', 'Interest, dividends and donations', 'Non-operating income', 'Total expenditure'],
        'Variable_category': ['Financial performance'] * n_rows,
        # Kept as strings, like the source CSV; converted below.
        'Value': ['930995', '821630', '84354', '25010', '832964'],
        'Industry_code_ANZSIC06': ['ANZSIC06 divisions A-S (excluding classes K6330, L6711, O7552, O760, O771, O772, S9540, S9601, S9602, and S9603)'] * n_rows,
    }

    df = pd.DataFrame(data)

    # Convert Year to a fixed-width integer (plain ``int`` maps to int32 on
    # some platforms) and Value to numeric.
    df['Year'] = df['Year'].astype('int64')
    df['Value'] = pd.to_numeric(df['Value'], errors='coerce')

    return df


In [None]:
# Example usage of the data catalog
def main(catalog_path="./data_catalog"):
    """Demonstrate the data catalog end to end on the sample NZ industry data.

    Steps:
    - Registers a raw, a cleaned and a growth-analysis dataset.
    - Records the two transformations that link them.
    - Draws the lineage of the growth dataset.
    - Writes the HTML catalog report and prints a summary.

    Args:
        catalog_path: Directory in which the catalog is stored. The default
            matches ``DataCatalog``'s own default.
    """
    # Initialize the data catalog
    catalog = DataCatalog(catalog_path=catalog_path)

    # Load the raw dataset
    print("Loading raw dataset...")
    raw_df = load_nz_industry_data()

    # Register the raw dataset in the catalog
    raw_dataset_id = catalog.register_dataset(
        df=raw_df,
        name="NZ Industry Financial Data - Raw",
        description="Raw financial data for New Zealand industries from 2013 to 2023",
        source="Annual Enterprise Survey",
        owner="Data Science Team",
        tags=["finance", "new zealand", "industry", "raw"],
        additional_info={
            "data_collection_method": "Annual Enterprise Survey",
            "coverage_period": "2013-2023",
            "confidentiality": "Public"
        }
    )
    print(f"Raw dataset registered with ID: {raw_dataset_id}")

    # Perform a transformation: Convert Value column to numeric and clean up.
    # NOTE: load_nz_industry_data already applies pd.to_numeric to Value, so on
    # the sample data this step is a no-op. It is kept to demonstrate
    # recording a transformation.
    print("Performing data cleaning transformation...")
    cleaned_df = raw_df.copy()
    cleaned_df['Value'] = pd.to_numeric(cleaned_df['Value'], errors='coerce')

    # Register the cleaned dataset
    cleaned_dataset_id = catalog.register_dataset(
        df=cleaned_df,
        name="NZ Industry Financial Data - Cleaned",
        description="Cleaned financial data with numeric values",
        source="Derived from raw NZ Industry Financial Data",
        owner="Data Science Team",
        tags=["finance", "new zealand", "industry", "cleaned"],
        additional_info={
            "parent_dataset": raw_dataset_id,
            "cleaning_steps": "Converted Value column to numeric"
        }
    )
    print(f"Cleaned dataset registered with ID: {cleaned_dataset_id}")

    # Record the transformation
    catalog.record_transformation(
        name="Data Cleaning",
        description="Convert Value column to numeric and handle missing values",
        code="cleaned_df = raw_df.copy()\ncleaned_df['Value'] = pd.to_numeric(cleaned_df['Value'], errors='coerce')",
        input_datasets=[raw_dataset_id],
        output_dataset_id=cleaned_dataset_id,
        parameters={"errors": "coerce"}
    )

    # Perform another transformation: Calculate year-over-year growth.
    # The sample has a single year, so growth is simulated as a flat 5% of
    # Value rather than computed from consecutive years.
    print("Calculating year-over-year growth...")
    growth_df = cleaned_df.copy()
    growth_df['YoY_Growth'] = growth_df['Value'] * 0.05  # Simulated 5% growth

    # Register the growth dataset
    growth_dataset_id = catalog.register_dataset(
        df=growth_df,
        name="NZ Industry Financial Data - Growth Analysis",
        description="Financial data with year-over-year growth calculations",
        source="Derived from cleaned NZ Industry Financial Data",
        owner="Data Science Team",
        tags=["finance", "new zealand", "industry", "growth", "analysis"],
        additional_info={
            "parent_dataset": cleaned_dataset_id,
            "analysis_type": "Year-over-year growth"
        }
    )
    print(f"Growth analysis dataset registered with ID: {growth_dataset_id}")

    # Record the transformation
    catalog.record_transformation(
        name="Growth Calculation",
        description="Calculate year-over-year growth for financial metrics",
        code="growth_df = cleaned_df.copy()\ngrowth_df['YoY_Growth'] = growth_df['Value'] * 0.05",
        input_datasets=[cleaned_dataset_id],
        output_dataset_id=growth_dataset_id,
        parameters={"growth_metric": "year-over-year"}
    )

    # Visualize the lineage
    print("Visualizing data lineage...")
    catalog.visualize_lineage(growth_dataset_id)

    # Generate a data catalog report
    report_path = catalog.generate_data_catalog_report()
    print(f"Data catalog report generated at: {report_path}")

    # Print summary of the catalog
    print("\nData Catalog Summary:")
    print(f"Total datasets: {len(catalog.metadata_cache)}")
    for dataset_id, metadata in catalog.metadata_cache.items():
        print(f"- {metadata.name} (ID: {dataset_id})")


if __name__ == "__main__":
    main()