# Data Ecosystems and Governance


## Contents
1. **Data Dictionary** – Creating and validating metadata
2. **Data Quality Checks** – Enforcing rule-based validations
3. **Data Lineage** – Logging transformations for audit and compliance
4. **Role-Based Access Control (RBAC)** – Managing permissions via roles
5. **Building a Data Catalog** – Collecting metadata for data discovery
6. **Compliance Checks** – Automating corporate/regulatory policy enforcement
7. **Data Versioning & Change Control** – Tracking dataset versions and logging changes




Generate Sample DataFrames

Below, we define functions to create the DataFrames that we'll use in each exercise:
- `create_employees_df()`
- `create_sales_df()`
- `create_transactions_df()`
- `create_customers_df()`
- `create_employee_info_df()`
- `create_product_info_df()`

In [1]:
import pandas as pd

def create_employees_df():
    data = [
        {"employee_id":1, "first_name":"John",   "last_name":"Smith",  "department":"Sales",   "date_of_hire":"2023-01-02"},
        {"employee_id":2, "first_name":"Sarah",  "last_name":"Johnson","department":"IT",      "date_of_hire":"2022-06-15"},
        {"employee_id":3, "first_name":"Michael","last_name":"Brown", "department":"Finance","date_of_hire":"2021-09-10"},
        {"employee_id":4, "first_name":"Emily",  "last_name":"Davis",  "department":"IT",      "date_of_hire":"2022-10-25"},
        {"employee_id":5, "first_name":"David",  "last_name":"Miller", "department":"Sales",   "date_of_hire":"2020-03-18"}
    ]
    return pd.DataFrame(data)

def create_sales_df():
    data = [
        {"sale_id":101, "product_id":"P001", "region":"North", "sale_amount":300, "sale_date":"2023-01-05"},
        {"sale_id":102, "product_id":"P002", "region":"East",  "sale_amount":150, "sale_date":"2023-01-06"},
        {"sale_id":103, "product_id":"P003", "region":"South", "sale_amount":0,   "sale_date":"2023-01-07"},
        {"sale_id":104, "product_id":"P001", "region":"West",  "sale_amount":450, "sale_date":"2023-01-08"},
        {"sale_id":105, "product_id":"P004", "region":"North", "sale_amount":250, "sale_date":"2023-01-09"}
    ]
    return pd.DataFrame(data)

def create_transactions_df():
    data = [
        {"transaction_id":1, "amount":100, "timestamp":"2023-01-01 10:00:00"},
        {"transaction_id":2, "amount":200, "timestamp":"2023-01-01 10:05:00"},
        {"transaction_id":3, "amount":-50, "timestamp":"2023-01-01 10:10:00"},
        {"transaction_id":4, "amount":300, "timestamp":"2023-01-01 10:15:00"},
        {"transaction_id":5, "amount":150, "timestamp":"2023-01-01 10:20:00"}
    ]
    return pd.DataFrame(data)

def create_customers_df():
    data = [
        {"customer_id":"C001", "name":"John Doe",    "email":"johndoe@example.com",    "phone_number":"1234567890",   "address":"123 Elm Street, Springfield, USA"},
        {"customer_id":"C002", "name":"Jane Smith",  "email":"janesmith@example.com",  "phone_number":"5559998888",   "address":"456 Oak Avenue, Springfield, USA"},
        {"customer_id":"C003", "name":"Peter Parker","email":"peterparker@example.com","phone_number":"7776665555",   "address":"789 Maple Road, Gotham, USA"},
        {"customer_id":"C004", "name":"Mary Johnson","email":"maryj@example.com",      "phone_number":"4443332222",   "address":"101 Pine Lane, Metropolis, USA"}
    ]
    return pd.DataFrame(data)

def create_employee_info_df():
    data = [
        {"employee_id":1, "first_name":"John",    "last_name":"Smith",  "salary":40000, "last_updated":"2023-02-01"},
        {"employee_id":2, "first_name":"Sarah",   "last_name":"Johnson","salary":50000, "last_updated":"2023-02-05"},
        {"employee_id":3, "first_name":"Michael", "last_name":"Brown", "salary":45000, "last_updated":"2023-02-03"}
    ]
    return pd.DataFrame(data)

def create_product_info_df():
    data = [
        {"product_id":"P001", "product_name":"WidgetA", "price":10.99},
        {"product_id":"P002", "product_name":"WidgetB", "price":5.49},
        {"product_id":"P003", "product_name":"WidgetC", "price":14.99}
    ]
    return pd.DataFrame(data)


## 1. Data Dictionary

**Objective**: Automatically derive basic metadata from a DataFrame and validate it.

**Steps**:
1. Create a function to generate column info (type, number of unique values, etc.).
2. Create a validation function that checks data type consistency.
3. Demonstrate with an in-memory DataFrame (employees).

In [2]:
def create_data_dictionary(df):
    """
    Creates a basic data dictionary (column names, data types, unique count, etc.).
    Returns a dict structure.
    """
    data_dict = {}

    for col in df.columns:
        col_data = df[col]
        data_type = str(col_data.dtype)
        unique_vals = col_data.nunique()
        col_info = {
            "data_type": data_type,
            "num_unique_values": unique_vals
        }
        if pd.api.types.is_numeric_dtype(col_data):
            col_info["min"] = float(col_data.min())
            col_info["max"] = float(col_data.max())
        data_dict[col] = col_info
    return data_dict

def validate_data_dictionary(df, data_dict):
    """
    Checks if DataFrame columns match data dictionary definitions (type checks, etc.).
    """
    validation_report = []

    for col, meta in data_dict.items():
        if col not in df.columns:
            validation_report.append(f"Column '{col}' is missing from dataframe.")
        else:
            actual_dtype = str(df[col].dtype)
            expected_dtype = meta["data_type"]
            if actual_dtype != expected_dtype:
                validation_report.append(
                    f"Column '{col}' expected {expected_dtype}, but got {actual_dtype}."
                )

    if not validation_report:
        print("Validation passed! All columns match the data dictionary.")
    else:
        print("Validation issues found:")
        for issue in validation_report:
            print(" -", issue)

# Demonstration
df_employees = create_employees_df()
dictionary = create_data_dictionary(df_employees)
print("Data Dictionary for 'employees':\n", dictionary)
print("\nValidation:")
validate_data_dictionary(df_employees, dictionary)


Data Dictionary for 'employees':
 {'employee_id': {'data_type': 'int64', 'num_unique_values': 5, 'min': 1.0, 'max': 5.0}, 'first_name': {'data_type': 'object', 'num_unique_values': 5}, 'last_name': {'data_type': 'object', 'num_unique_values': 5}, 'department': {'data_type': 'object', 'num_unique_values': 3}, 'date_of_hire': {'data_type': 'object', 'num_unique_values': 5}}

Validation:
Validation passed! All columns match the data dictionary.


## 2. Data Quality Checks

**Objective**: Enforce rule-based validations (e.g., no negative amounts, valid categories) on a DataFrame.

**Steps**:
1. Check multiple rules on `df_sales`.
2. Report violations.
3. Apply a simple remediation (filter out invalid rows).

In [3]:
def check_data_quality(df):
    """
    Rules:
      - sale_amount >= 0
      - region in [North, South, East, West]
      - product_id in [P001, P002, P003, P004]
    """
    known_products = ["P001", "P002", "P003", "P004"]
    valid_regions = ["North", "South", "East", "West"]

    violations = []

    # sale_amount >= 0
    if (df["sale_amount"] < 0).any():
        count_neg = (df["sale_amount"] < 0).sum()
        violations.append(f"Found {count_neg} rows with negative sale_amount.")

    # region in valid list
    invalid_region_mask = ~df["region"].isin(valid_regions)
    if invalid_region_mask.any():
        count_invalid_reg = invalid_region_mask.sum()
        violations.append(f"Found {count_invalid_reg} rows with invalid region.")

    # product_id in known products
    invalid_product_mask = ~df["product_id"].isin(known_products)
    if invalid_product_mask.any():
        count_invalid_prod = invalid_product_mask.sum()
        violations.append(f"Found {count_invalid_prod} rows with unknown product_id.")

    return violations

def remediate_data(df):
    """
    Drop rows that fail the rules.
    """
    known_products = ["P001", "P002", "P003", "P004"]
    valid_regions = ["North", "South", "East", "West"]

    df_clean = df[df["sale_amount"] >= 0]
    df_clean = df_clean[df_clean["region"].isin(valid_regions)]
    df_clean = df_clean[df_clean["product_id"].isin(known_products)]
    return df_clean

# Demonstration
df_sales = create_sales_df()
print("Original Sales Data:\n", df_sales)

dq_violations = check_data_quality(df_sales)
if dq_violations:
    print("\nData Quality Violations Detected:")
    for v in dq_violations:
        print(" -", v)
else:
    print("\nNo data quality violations found.")

df_cleaned = remediate_data(df_sales)
print("\nCleaned DataFrame after remediation:\n", df_cleaned)


Original Sales Data:
    sale_id product_id region  sale_amount   sale_date
0      101       P001  North          300  2023-01-05
1      102       P002   East          150  2023-01-06
2      103       P003  South            0  2023-01-07
3      104       P001   West          450  2023-01-08
4      105       P004  North          250  2023-01-09

No data quality violations found.

Cleaned DataFrame after remediation:
    sale_id product_id region  sale_amount   sale_date
0      101       P001  North          300  2023-01-05
1      102       P002   East          150  2023-01-06
2      103       P003  South            0  2023-01-07
3      104       P001   West          450  2023-01-08
4      105       P004  North          250  2023-01-09


## 3. Data Lineage

**Objective**: Track each transformation step (e.g., load, filter) in an ETL-like flow.

**Steps**:
1. Create a `LineageTracker` class.
2. Log actions (`action`, `details`, `rows_in_dataset`, `timestamp`).
3. Demonstrate by filtering negative `amount` values in `df_transactions`.

In [4]:
import time
import json

class LineageTracker:
    def __init__(self):
        self.steps = []

    def log_step(self, action, details, df):
        timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        step_info = {
            "step_number": len(self.steps) + 1,
            "action": action,
            "details": details,
            "rows_in_dataset": len(df),
            "timestamp": timestamp
        }
        self.steps.append(step_info)

    def save_lineage(self, filename="lineage_report.json"):
        with open(filename, "w") as f:
            json.dump({"steps": self.steps}, f, indent=4)

# Demonstration
df_trans = create_transactions_df()
tracker = LineageTracker()

# Step 1: 'Load'
tracker.log_step(action="load", details="Loaded transactions DataFrame", df=df_trans)

# Step 2: Filter out negative amounts
df_trans_filtered = df_trans[df_trans["amount"] >= 0]
tracker.log_step(action="filter", details="Removed rows where amount < 0", df=df_trans_filtered)

# Print lineage steps
print("Lineage Steps:")
for step in tracker.steps:
    print(step)


Lineage Steps:
{'step_number': 1, 'action': 'load', 'details': 'Loaded transactions DataFrame', 'rows_in_dataset': 5, 'timestamp': '2025-03-06T15:45:11Z'}
{'step_number': 2, 'action': 'filter', 'details': 'Removed rows where amount < 0', 'rows_in_dataset': 4, 'timestamp': '2025-03-06T15:45:11Z'}


## 4. Role-Based Access Control (RBAC)

**Objective**: Show how to manage user roles (e.g., `DataAnalyst`, `DataEngineer`, `Auditor`) and check permissions.

**Steps**:
1. Create a Python class `AccessControlManager` with hard-coded roles.
2. Check if a user can view/modify data.


In [5]:
# We'll store roles and users in memory (no JSON files)

class AccessControlManager:
    def __init__(self):
        # Hardcoded for demonstration
        self.roles = {
            "DataAnalyst": {"can_view_raw_data": True,  "can_modify_data": False},
            "DataEngineer": {"can_view_raw_data": True,  "can_modify_data": True},
            "Auditor":      {"can_view_raw_data": False, "can_modify_data": False}
        }
        self.users = {
            "alice":   "DataAnalyst",
            "bob":     "DataEngineer",
            "charlie": "Auditor"
        }

    def get_user_role(self, user):
        return self.users.get(user, None)

    def can_view_raw_data(self, user):
        role = self.get_user_role(user)
        if role and self.roles[role].get("can_view_raw_data", False):
            return True
        return False

    def can_modify_data(self, user):
        role = self.get_user_role(user)
        if role and self.roles[role].get("can_modify_data", False):
            return True
        return False

# Demonstration
acm = AccessControlManager()
test_users = ["alice", "bob", "charlie", "unknown"]
for u in test_users:
    print(f"User: {u}, Role: {acm.get_user_role(u)}")
    print("  -> can_view_raw_data?", acm.can_view_raw_data(u))
    print("  -> can_modify_data?", acm.can_modify_data(u))
    print()

User: alice, Role: DataAnalyst
  -> can_view_raw_data? True
  -> can_modify_data? False

User: bob, Role: DataEngineer
  -> can_view_raw_data? True
  -> can_modify_data? True

User: charlie, Role: Auditor
  -> can_view_raw_data? False
  -> can_modify_data? False

User: unknown, Role: None
  -> can_view_raw_data? False
  -> can_modify_data? False



## 5. Building a Data Catalog

**Objective**: Simulate scanning multiple DataFrames for metadata (row/column counts), storing results in a small "catalog."**

**Steps**:
1. Create some DataFrames in memory (`employees`, `sales`, `customers`).
2. Build a `catalog` dictionary with metadata.
3. Implement a simple search by keyword (on DataFrame "names" or column names).

In [6]:
def build_catalog(dataframes_dict):
    """
    dataframes_dict: { name_of_df: DataFrame }
    returns a dict of metadata.
    """
    catalog = {}
    for name, df in dataframes_dict.items():
        catalog[name] = {
            "num_rows": len(df),
            "num_columns": len(df.columns),
            "columns": list(df.columns)
        }
    return catalog

def search_catalog(catalog, keyword):
    results = {}
    for df_name, meta in catalog.items():
        # Check df_name or columns
        if (keyword.lower() in df_name.lower()) or any(keyword.lower() in col.lower() for col in meta["columns"]):
            results[df_name] = meta
    return results

# Demonstration
dfs = {
    "employees": create_employees_df(),
    "sales": create_sales_df(),
    "customers": create_customers_df()
}

catalog = build_catalog(dfs)
print("Catalog Metadata:\n", catalog)

keyword = "employee"
hits = search_catalog(catalog, keyword)
print(f"\nSearching catalog for '{keyword}'...")
for df_name, meta in hits.items():
    print(f" - Found match in '{df_name}' with columns: {meta['columns']}")


Catalog Metadata:
 {'employees': {'num_rows': 5, 'num_columns': 5, 'columns': ['employee_id', 'first_name', 'last_name', 'department', 'date_of_hire']}, 'sales': {'num_rows': 5, 'num_columns': 5, 'columns': ['sale_id', 'product_id', 'region', 'sale_amount', 'sale_date']}, 'customers': {'num_rows': 4, 'num_columns': 5, 'columns': ['customer_id', 'name', 'email', 'phone_number', 'address']}}

Searching catalog for 'employee'...
 - Found match in 'employees' with columns: ['employee_id', 'first_name', 'last_name', 'department', 'date_of_hire']


## 6. Compliance Checks

**Objective**: Automate compliance checks based on a set of policies. We'll define a small set of policies in code.

**Steps**:
1. Hard-code sample policies (e.g., "must have `last_updated` column").
2. Check a DataFrame (e.g., `employee_info`) for compliance.
3. Print pass/fail results.

In [7]:
def check_policies(df, policies):
    results = []
    
    for policy in policies:
        pid = policy["id"]
        desc = policy["description"]

        if pid == 1:
            # Must contain 'last_updated'
            if "last_updated" not in df.columns:
                results.append(f"FAIL: {desc}")
            else:
                results.append(f"PASS: {desc}")
        elif pid == 2:
            # Salary must be 'encrypted' (naive check)
            if "salary" in df.columns:
                if pd.api.types.is_numeric_dtype(df["salary"]):
                    results.append(f"FAIL: {desc}")
                else:
                    results.append(f"PASS: {desc} (not numeric, might be encrypted)")
            else:
                results.append(f"PASS: No salary column, policy not applicable.")
        else:
            results.append(f"Policy {pid} not recognized.")
    
    return results

# Demonstration
# Example policies
policies = [
    {"id": 1, "description": "Must contain 'last_updated' column."},
    {"id": 2, "description": "Salary information must be encrypted."}
]

df_employee_info = create_employee_info_df()
results = check_policies(df_employee_info, policies)
print("Employee Info DataFrame:\n", df_employee_info)
print("\nCompliance Check Results:")
for r in results:
    print(" -", r)


Employee Info DataFrame:
    employee_id first_name last_name  salary last_updated
0            1       John     Smith   40000   2023-02-01
1            2      Sarah   Johnson   50000   2023-02-05
2            3    Michael     Brown   45000   2023-02-03

Compliance Check Results:
 - PASS: Must contain 'last_updated' column.
 - FAIL: Salary information must be encrypted.


## 7. Data Versioning & Change Control

**Objective**: Show how to track changes to a dataset by versioning it and logging each change.

**Steps**:
1. Maintain a list of versions in memory (instead of saving to disk).
2. For each 'new version,' log a record of the change.

In [8]:
import time

version_store = []  # We'll store versions here in memory
change_log = []

def save_new_version_in_memory(df, description, user="unknown"):
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    # We'll store the DataFrame itself plus metadata
    version_info = {
        "timestamp": timestamp,
        "user": user,
        "description": description,
        "data": df.copy()
    }
    version_store.append(version_info)
    # Also log the change
    change_log.append({
        "timestamp": timestamp,
        "user": user,
        "description": description,
        "version_index": len(version_store) - 1
    })

# Demonstration
df_product_info = create_product_info_df()

# Initial version
save_new_version_in_memory(df_product_info, "Initial product info", user="system")

# Let's simulate a price update
df_product_info.loc[df_product_info["product_id"]=="P001", "price"] = 11.99
save_new_version_in_memory(df_product_info, "Updated price for P001", user="admin_user")

print("Version Store:")
for i, v in enumerate(version_store):
    print(f"Version {i}: {v['timestamp']} by {v['user']} -> {v['description']}")
    print(v["data"], "\n")

print("Change Log:")
for log_entry in change_log:
    print(log_entry)


Version Store:
Version 0: 2025-03-06 15:45:11 by system -> Initial product info
  product_id product_name  price
0       P001      WidgetA  10.99
1       P002      WidgetB   5.49
2       P003      WidgetC  14.99 

Version 1: 2025-03-06 15:45:11 by admin_user -> Updated price for P001
  product_id product_name  price
0       P001      WidgetA  11.99
1       P002      WidgetB   5.49
2       P003      WidgetC  14.99 

Change Log:
{'timestamp': '2025-03-06 15:45:11', 'user': 'system', 'description': 'Initial product info', 'version_index': 0}
{'timestamp': '2025-03-06 15:45:11', 'user': 'admin_user', 'description': 'Updated price for P001', 'version_index': 1}


# Conclusion

In real-world scenarios, you can connect to databases, files, or big data platforms, but the same **governance concepts** apply: metadata documentation, data quality enforcement, lineage logging, privacy, compliance, and more.