
"""
Scenario:

You're building a monitoring system for a data pipeline in a data lakehouse architecture.
Your job is to analyze the metadata logs from multiple data sources and determine the
health status of each pipeline run.

Each pipeline run is logged in a JSON-like format that will be provided below.

You are given a list of such logs for different pipelines. Your task is to analyze each log and apply the
following complex conditional rules to determine the pipeline health status:

✅ Evaluation Rules:

Assign a health_status field with one of the following values:

    - "HEALTHY"
    - "WARNING"
    - "CRITICAL"

Based on the following logic:

1. HEALTHY if:

    - status_code is 200 AND
    - errors is empty AND
    - warnings is empty or only includes "late data arrival" AND
    - duration_seconds is less than 600 AND
    - max_latency_seconds is less than 10

2. WARNING if any of the following:

    - status_code is 200 AND
        - duration_seconds is between 600 and 1200 OR
        - max_latency_seconds is between 10 and 30 OR
        - warnings contains non-late data warning messages
    - OR there are fewer than 100 records ingested (record_count < 100) but no errors

3. CRITICAL if:

    - status_code is not 200
    - OR there are one or more errors
    - OR duration_seconds > 1200
    - OR max_latency_seconds > 30
    - OR record_count == 0

🎯 Your Task:

1. Write a function evaluate_pipeline_health(log) that takes a single log dictionary and returns the same dictionary with a new key health_status assigned based on the above rules.

2. Write a function evaluate_all_pipelines(logs: List[Dict]) -> List[Dict] to apply this to a list of logs.

3. Print a summary:

    - Total pipelines evaluated

    - Count of each health status category

🧪 Bonus Challenge (optional):

    - Add a rule: if the ingestion time is between midnight and 4 AM UTC, and the pipeline is "CRITICAL",
    mark it for "High Priority Alert".
"""

In [6]:
# Example logs for testing
pipeline_log = {
    "pipeline_name": "user_event_ingestion",
    "status_code": 200,
    "errors": [],
    "warnings": ["late data arrival"],
    "duration_seconds": 452,
    "max_latency_seconds": 5.6,
    "record_count": 124500,
    "ingestion_time_utc": "2025-10-22T05:28:00Z",
    "source" : "kafka"
}


# List of logs in the same format as the example above
logs = [
    {
        "pipeline_name": "user_event_ingestion",
        "status_code": 200,
        "errors": [],
        "warnings": ["late data arrival"],
        "duration_seconds": 452,
        "max_latency_seconds": 5.6,
        "record_count": 124500,
        "ingestion_time_utc": "2025-10-22T05:28:00Z",
        "source": "kafka"
    },
    {
        "pipeline_name": "transaction_data_load",
        "status_code": 500,
        "errors": ["Database connection timeout"],
        "warnings": [],
        "duration_seconds": 1300,
        "max_latency_seconds": 45.2,
        "record_count": 0,
        "ingestion_time_utc": "2025-10-22T14:15:00Z",
        "source": "s3"
    },
    {
        "pipeline_name": "product_catalog_sync",
        "status_code": 200,
        "duration_seconds": 800,
        "record_count": 80,
        "max_latency_seconds": 15.0,
        "errors": [],
        "warnings": ["schema mismatch"],
        "ingestion_time": "2025-10-08T09:00:00Z",
        "source": "api"
    },
    {
        "pipeline_name": "inventory_update",
        "status_code": 200,
        "duration_seconds": 300,
        "record_count": 1500,
        "max_latency_seconds": 8.0,
        "errors": [],
        "warnings": [],
        "ingestion_time": "2025-10-08T03:45:00Z",
        "source": "ftp"
    }
]

***QUESTION 1***

Write a function evaluate_pipeline_health(log) that takes a single log dictionary and returns the same dictionary with a new key health_status assigned based on the above rules.

In [19]:
# Assign a health_status to a single pipeline log
LATE_DATA_WARNING = "late data arrival"

def evaluate_pipeline_health(log):
    # True when warnings is empty or holds nothing but "late data arrival"
    only_late_warnings = all(w == LATE_DATA_WARNING for w in log["warnings"])

    # CRITICAL is checked first so that a hard failure (for example
    # record_count == 0) is never masked by a softer rule
    if (
        log["status_code"] != 200
        or log["errors"] != []
        or log["duration_seconds"] > 1200
        or log["max_latency_seconds"] > 30
        or log["record_count"] == 0
    ):
        log["health_status"] = "CRITICAL"

    # HEALTHY: status 200 and no errors are already guaranteed at this point
    elif (
        only_late_warnings
        and log["duration_seconds"] < 600
        and log["max_latency_seconds"] < 10
    ):
        log["health_status"] = "HEALTHY"

    # WARNING: whatever remains has status 200, no errors, and either
    # duration in [600, 1200], latency in [10, 30], or a non-late-data warning
    else:
        log["health_status"] = "WARNING"

    return log

evaluate_pipeline_health(pipeline_log)

{'pipeline_name': 'user_event_ingestion',
 'status_code': 200,
 'errors': [],
 'warnings': ['late data arrival'],
 'duration_seconds': 452,
 'max_latency_seconds': 5.6,
 'record_count': 124500,
 'ingestion_time_utc': '2025-10-22T05:28:00Z',
 'source': 'kafka',
 'health_status': 'HEALTHY'}
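
The threshold boundaries in the rules (600, 1200, 10, 30) are easy to get wrong by one comparison operator, so a small table of hand-built cases can help. The sketch below is self-contained: `classify_run` and `make_log` are hypothetical helper names, not part of the notebook, and the classifier assumes that CRITICAL rules take precedence and that HEALTHY is tested before WARNING. The logs it builds are made up purely to probe the boundaries.

```python
LATE = "late data arrival"

def classify_run(log):
    """Hypothetical compact classifier; assumes CRITICAL rules win over the rest."""
    if (
        log["status_code"] != 200
        or log["errors"]
        or log["duration_seconds"] > 1200
        or log["max_latency_seconds"] > 30
        or log["record_count"] == 0
    ):
        return "CRITICAL"
    only_late = all(w == LATE for w in log["warnings"])
    if only_late and log["duration_seconds"] < 600 and log["max_latency_seconds"] < 10:
        return "HEALTHY"
    return "WARNING"

def make_log(**overrides):
    """Build a made-up healthy log, then override selected fields."""
    base = {
        "status_code": 200,
        "errors": [],
        "warnings": [],
        "duration_seconds": 100,
        "max_latency_seconds": 1.0,
        "record_count": 1000,
    }
    base.update(overrides)
    return base

# (log, expected status) pairs that sit on or next to each threshold
cases = [
    (make_log(), "HEALTHY"),
    (make_log(duration_seconds=599), "HEALTHY"),
    (make_log(duration_seconds=600), "WARNING"),
    (make_log(duration_seconds=1200), "WARNING"),
    (make_log(duration_seconds=1201), "CRITICAL"),
    (make_log(max_latency_seconds=10), "WARNING"),
    (make_log(max_latency_seconds=30.1), "CRITICAL"),
    (make_log(warnings=["schema mismatch"]), "WARNING"),
    (make_log(record_count=0), "CRITICAL"),
    (make_log(status_code=500), "CRITICAL"),
]

for log, expected in cases:
    assert classify_run(log) == expected, (log, expected)
```

The same table can be pointed at any other evaluator by swapping out the function called in the loop.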

***QUESTION 2***

Write a function evaluate_all_pipelines(logs: List[Dict]) -> List[Dict] to apply this to a list of logs.



In [9]:
# Evaluate every pipeline log in a list
from typing import Dict, List

def evaluate_all_pipelines(logs: List[Dict]) -> List[Dict]:
    # Build the result list inside the function so repeated calls
    # do not accumulate entries from earlier runs
    health_evaluation_logs = []

    for log in logs:
        # Apply CRITICAL condition first so hard failures are never masked
        if (
            log["status_code"] != 200
            or log["errors"] != []
            or log["duration_seconds"] > 1200
            or log["max_latency_seconds"] > 30
            or log["record_count"] == 0
        ):
            log["health_status"] = "CRITICAL"

        # Apply HEALTHY condition
        elif (
            (log["warnings"] == [] or log["warnings"] == ["late data arrival"])
            and log["duration_seconds"] < 600
            and log["max_latency_seconds"] < 10
        ):
            log["health_status"] = "HEALTHY"

        # Apply WARNING condition: everything left has status 200, no errors,
        # and a duration, latency, or warning message in the WARNING band
        else:
            log["health_status"] = "WARNING"

        health_evaluation_logs.append(log)

    return health_evaluation_logs

evaluated_logs = evaluate_all_pipelines(logs)
print(evaluated_logs)



In [10]:
# Summary: total pipelines evaluated and the count of each health status category
from collections import Counter

status_counts = Counter(log["health_status"] for log in evaluated_logs)

print(f"Total pipelines evaluated: {len(evaluated_logs)}")
for status in ("HEALTHY", "WARNING", "CRITICAL"):
    print(f"{status}: {status_counts[status]}")

Total pipelines evaluated: 4
HEALTHY: 2
WARNING: 1
CRITICAL: 1


In [None]:
#Bonus Challenge -
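
One possible approach to the bonus rule is sketched below. It rests on several assumptions: "between midnight and 4 AM UTC" is read as the half-open window 00:00 to 03:59:59, the flag is stored under a hypothetical `alert` key, and the timestamp may appear under either `ingestion_time_utc` or `ingestion_time`, since the sample logs use both spellings. The function names are illustrative, and the record at the end is made up for demonstration.

```python
from datetime import datetime, timezone

ALERT_LABEL = "High Priority Alert"

def needs_high_priority_alert(log):
    """Return True for a CRITICAL run ingested between 00:00 and 04:00 UTC."""
    if log.get("health_status") != "CRITICAL":
        return False
    # Assumption: accept either spelling of the timestamp key
    raw = log.get("ingestion_time_utc") or log.get("ingestion_time")
    if raw is None:
        return False
    # Older Python versions reject a trailing "Z" in fromisoformat(),
    # so rewrite it as an explicit UTC offset first
    ts = datetime.fromisoformat(raw.replace("Z", "+00:00")).astimezone(timezone.utc)
    # Assumption: the window is half-open, so 04:00:00 itself is excluded
    return 0 <= ts.hour < 4

def flag_high_priority(logs):
    """Add an `alert` key (hypothetical name) to every log that qualifies."""
    for log in logs:
        if needs_high_priority_alert(log):
            log["alert"] = ALERT_LABEL
    return logs

# Made-up record for illustration only; it is not one of the sample logs
demo = {
    "pipeline_name": "demo_run",
    "health_status": "CRITICAL",
    "ingestion_time_utc": "2025-10-22T02:10:00Z",
}
flagged = flag_high_priority([demo])
print(flagged[0].get("alert"))
```

Because the check reads `health_status`, it is meant to run on logs that have already been evaluated.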

