## Automated Data Quality Monitoring
**Objective**: Use Great Expectations to perform data profiling and write validation rules.

1. Data Profiling with Great Expectations

### Profile a JSON dataset with product sales data to check for null values in the 'ProductID' and 'Price' fields.
- Create an expectation suite and connect it to the data context.
- Use the `expect_column_values_to_not_be_null` expectation to profile these fields.
- Review the summary to identify any unexpected null values.
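
As a rough illustration of what the null check in the steps above measures, the following plain-Python sketch counts missing values per field in a list of JSON records. It does not use Great Expectations; the sample records and the helper name `null_report` are hypothetical and exist only for illustration.

```python
import json

# Hypothetical sample records standing in for product_sales.json
SAMPLE_JSON = """
[
  {"ProductID": 1, "Price": 9.99},
  {"ProductID": 2, "Price": null},
  {"ProductID": null, "Price": 4.5},
  {"ProductID": 4, "Price": 12.0}
]
"""

def null_report(records, fields):
    """Return {field: (null_count, null_fraction)} for the given fields."""
    total = len(records)
    report = {}
    for field in fields:
        # A missing key is treated the same as an explicit JSON null
        nulls = sum(1 for row in records if row.get(field) is None)
        report[field] = (nulls, nulls / total if total else 0.0)
    return report

records = json.loads(SAMPLE_JSON)
report = null_report(records, ["ProductID", "Price"])
for field, (count, fraction) in report.items():
    print(f"{field}: {count} null(s), {fraction:.0%} of rows")
```

A non-zero count for either field is the kind of result that the `expect_column_values_to_not_be_null` expectation would report as a failure.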

In [1]:
import os
import pandas as pd
from great_expectations.data_context import get_context

# Setup paths
project_dir = os.getcwd()
ge_root_dir = os.path.join(project_dir, "great_expectations")
json_path = os.path.join(project_dir, "data", "product_sales.json")

# Load JSON file into a DataFrame
df = pd.read_json(json_path)

# Get Great Expectations context
context = get_context(context_root_dir=ge_root_dir)

# Add a Pandas datasource and a DataFrame asset
# (Fluent API; assumes Great Expectations 0.17 or 0.18)
datasource = context.sources.add_or_update_pandas(name="pandas_json_datasource")
data_asset = datasource.add_dataframe_asset(name="product_sales")

# Create or update expectation suite
suite_name = "product_sales_null_check_suite"
context.add_or_update_expectation_suite(expectation_suite_name=suite_name)

# Build a batch request from the in-memory DataFrame
# (Fluent assets do not take a RuntimeBatchRequest or a data connector name)
batch_request = data_asset.build_batch_request(dataframe=df)

# Get validator
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=suite_name
)

# Add expectations to check for null values
validator.expect_column_values_to_not_be_null("ProductID")
validator.expect_column_values_to_not_be_null("Price")

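# Save expectations; keep expectations that failed on this batch,
# otherwise the default setting drops them from the saved suite
validator.save_expectation_suite(discard_failed_expectations=False)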

# Create (or update) a checkpoint from the validator, then run it
# (run_checkpoint alone expects a checkpoint that already exists)
checkpoint = context.add_or_update_checkpoint(
    name="product_sales_null_check_checkpoint",
    validator=validator,
)
results = checkpoint.run()

# Display results
if results.success:
    print("✅ Data quality check PASSED: No nulls in 'ProductID' or 'Price'.")
else:
    print("❌ Data quality check FAILED: Nulls found in one or both fields.")

# Optional: Show report path
html_path = os.path.join(ge_root_dir, "uncommitted", "data_docs", "local_site", "index.html")
print(f"\n📊 View report: file://{html_path}")

FileNotFoundError: File /workspaces/AI_DATA_ANALYSIS_/src/Module 10/Automating Data Quality Checks in Data Pipelines/data/product_sales.json does not exist
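
The `FileNotFoundError` above is raised because the JSON path does not exist on disk, so the cell stops before any expectation runs. One way to make this failure easier to diagnose is to check the path before loading it. The sketch below is a minimal example using only the standard library; the helper name `require_file` is hypothetical.

```python
from pathlib import Path

def require_file(path):
    """Return the resolved path if it is an existing file, else raise a descriptive error."""
    resolved = Path(path).expanduser().resolve()
    if not resolved.is_file():
        raise FileNotFoundError(
            f"Expected input file at {resolved}. "
            f"Current working directory is {Path.cwd()}."
        )
    return resolved

try:
    json_path = require_file(Path("data") / "product_sales.json")
    print(f"Found input file: {json_path}")
except FileNotFoundError as err:
    print(f"Cannot load data: {err}")
```

Printing the working directory alongside the expected path helps when the notebook is launched from a different folder than the one that holds `data/`.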

2. Writing Validation Rules for Data Ingestion

### Define validation rules for an API data source to confirm that 'Status' field contains only predefined statuses ('Active', 'Inactive').

- Apply `expect_column_values_to_be_in_set` to check field values during data ingestion.
- Execute the validation and review any mismatches.
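
To make the rule concrete, here is a plain-Python sketch of the membership check, using the same four records as the simulated API response in this exercise. It does not call Great Expectations; the helper name `find_status_mismatches` is hypothetical.

```python
# Records copied from the simulated API response used in this exercise
api_data = [
    {"UserID": 101, "Status": "Active"},
    {"UserID": 102, "Status": "Inactive"},
    {"UserID": 103, "Status": "Pending"},  # Not in the allowed set
    {"UserID": 104, "Status": "Active"},
]

ALLOWED_STATUSES = {"Active", "Inactive"}

def find_status_mismatches(records, allowed):
    """Return (row_index, value) pairs whose 'Status' is outside `allowed`."""
    return [
        (index, row.get("Status"))
        for index, row in enumerate(records)
        if row.get("Status") not in allowed
    ]

mismatches = find_status_mismatches(api_data, ALLOWED_STATUSES)
print(mismatches)
```

Note that this sketch treats a missing status as a mismatch, whereas `expect_column_values_to_be_in_set` skips null values by default, so a separate not-null expectation is needed to catch them.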

In [2]:
import os
import pandas as pd
from great_expectations.data_context import get_context

# Step 1: Simulate API data response
api_data = [
    {"UserID": 101, "Status": "Active"},
    {"UserID": 102, "Status": "Inactive"},
    {"UserID": 103, "Status": "Pending"},  # Invalid
    {"UserID": 104, "Status": "Active"}
]

# Step 2: Load API data into a DataFrame
df = pd.DataFrame(api_data)

# Step 3: Define Great Expectations context
project_dir = os.getcwd()
ge_root_dir = os.path.join(project_dir, "great_expectations")
context = get_context(context_root_dir=ge_root_dir)

# Step 4: Register a Pandas datasource and a DataFrame asset
# (Fluent API; assumes Great Expectations 0.17 or 0.18)
datasource = context.sources.add_or_update_pandas(name="api_datasource")
data_asset = datasource.add_dataframe_asset(name="api_status_data")

# Step 5: Create or update expectation suite
suite_name = "api_status_validation_suite"
context.add_or_update_expectation_suite(expectation_suite_name=suite_name)

# Step 6: Build a batch request from the in-memory DataFrame
# (Fluent assets do not take a RuntimeBatchRequest or a data connector name)
batch_request = data_asset.build_batch_request(dataframe=df)

# Step 7: Get a validator
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=suite_name
)

# Step 8: Apply validation rule
validator.expect_column_values_to_be_in_set(
    column="Status",
    value_set=["Active", "Inactive"]
)

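# Step 9: Save expectation suite; keep the rule even though it fails on this
# batch, otherwise the default setting drops it from the saved suite
validator.save_expectation_suite(discard_failed_expectations=False)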

# Step 10: Create (or update) a checkpoint from the validator and run the validation
# (run_checkpoint alone expects a checkpoint that already exists)
checkpoint = context.add_or_update_checkpoint(
    name="api_status_validation_checkpoint",
    validator=validator,
)
results = checkpoint.run()

# Step 11: Print result summary
if results.success:
    print("✅ Validation PASSED: All 'Status' values are valid.")
else:
    print("❌ Validation FAILED: Found invalid 'Status' values.")

# Optional: Report path
html_path = os.path.join(ge_root_dir, "uncommitted", "data_docs", "local_site", "index.html")
print(f"\n📊 View validation report: file://{html_path}")

AttributeError: 'FileDataContext' object has no attribute 'sources'
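
The `AttributeError` above suggests that the installed Great Expectations release does not expose the Fluent datasource entry point as `context.sources`. As far as the release notes indicate, that attribute exists in the 0.16 to 0.18 releases and is named `context.data_sources` from 1.0 onward, so checking the installed version is a reasonable first step. The sketch below is a hedged diagnostic using only the standard library; the helper name `fluent_entry_point` is hypothetical, and the version boundaries are assumptions to verify against the release notes for your version.

```python
from importlib import metadata

def fluent_entry_point(version):
    """Map a version string such as '0.18.8' to the assumed datasource attribute name."""
    major, minor = (int(part) for part in version.split(".")[:2])
    if major >= 1:
        return "context.data_sources"  # assumed name in 1.x releases
    if (major, minor) >= (0, 16):
        return "context.sources"  # assumed name in 0.16 to 0.18 releases
    return None  # assumed: no Fluent datasource API before 0.16

try:
    installed = metadata.version("great_expectations")
except metadata.PackageNotFoundError:
    installed = None

if installed is None:
    print("great_expectations is not installed in this environment")
else:
    print(f"great_expectations {installed}: use {fluent_entry_point(installed)}")
```

Other calls in these cells may also differ between major versions, so pinning a 0.18 release or following the migration guide for the installed version is likely simpler than switching attribute names alone.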