In [None]:
# Parameters (Injected by Papermill)
# Papermill overrides values defined here by injecting a new cell after this one
# (presumably this cell carries the "parameters" tag — confirm in notebook metadata).
# logs_data: list of log-record dicts. The cells below read
#   record["log_metrics"]["Timestamp"], record["log_metrics"]["Pipeline Name"] and
#   record["cost_metrics"]["costs"]["pipeline_cost"].
# The empty default only allows opening the notebook without Papermill; the
# validation cell further down raises ValueError if it is still empty.
logs_data = []  # Papermill will inject this


In [None]:
from datetime import datetime, timezone
import IPython.display as ipython

# Pipeline name for the report header, taken from the first log record.
# `.get` keeps the header cell from crashing on a malformed first record; the
# validation cell further down is what rejects bad input.
# NOTE(review): only the first record is consulted; if logs_data mixes several
# pipelines, the header names just one of them.
pipeline_name = (
    logs_data[0].get("log_metrics", {}).get("Pipeline Name", "Unknown Pipeline")
    if logs_data else "Unknown Pipeline"
)

# Report generation time. It is taken in UTC, so it is labelled as such below;
# an unlabelled wall-clock time would be read as the viewer's local time.
report_timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

# Display title and metadata
ipython.display(ipython.Markdown(f"# PySpark Pipeline Log Report for {pipeline_name}"))
ipython.display(ipython.Markdown(f"**Report Generated On:** {report_timestamp} UTC"))
ipython.display(ipython.Markdown(
    "This report analyses PySpark pipeline logs, "
    "focusing on Resource Estimate Score over time."
))
ipython.display(ipython.Markdown(
    "Lower scores are better and can be used for comparative analysis "
    "within the same Data Pipeline or different Data Pipelines."
))

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Fail fast when Papermill injected nothing (or the notebook is run by hand with
# the empty default): every following cell indexes into the log records and
# would otherwise fail with a far less helpful KeyError/IndexError.
if not logs_data:
    error_msg = (
        "logs_data is empty. Ensure Papermill is passing correct data."
    )
    raise ValueError(error_msg)


## Data Processing

This section extracts the run timestamp and the Resource Estimate Score from each PySpark log record.

In [None]:
# Build a tidy frame with one row per log record (one pipeline run):
#   timestamp      <- record["log_metrics"]["Timestamp"], parsed as epoch milliseconds
#   pipeline_score <- record["cost_metrics"]["costs"]["pipeline_cost"]
# A record missing either field raises KeyError here, so malformed input fails loudly.
# The frame is constructed directly with only the columns the report uses, rather
# than sliced out of a wider frame, so later cells can safely add columns to it.
df = pd.DataFrame(
    {
        "timestamp": [record["log_metrics"]["Timestamp"] for record in logs_data],
        "pipeline_score": [
            record["cost_metrics"]["costs"]["pipeline_cost"] for record in logs_data
        ],
    }
).assign(timestamp=lambda frame: pd.to_datetime(frame["timestamp"], unit="ms"))

# Display processed DataFrame
df

## Resource Estimate Score Over Time

This section visualises the Resource Estimate Score over time as a scatter plot, with one point per pipeline run.

In [None]:
# Plot the Resource Estimate Score of every pipeline run against its timestamp.
# The title reuses `pipeline_name` from the report-header cell, so the plot and
# the report header always name the pipeline (and its fallback) identically.

# Sort chronologically; `df` stays sorted for the summary cell below.
df = df.sort_values(by="timestamp")

# Runs are discrete events, so show them as points rather than a connected line.
fig, ax = plt.subplots(figsize=(10, 5))
df.plot.scatter(x="timestamp", y="pipeline_score", color="b", ax=ax)

ax.set_title(f"Resource Estimate Score Over Time - {pipeline_name}")
ax.set_xlabel("Timestamp")
ax.set_ylabel("Resource Estimate Score")
ax.grid(True)
plt.show()

In [None]:
# Summarise the Resource Estimate Score per calendar week, month, quarter and year.
if not df.empty:
    # Label each run with the calendar period it falls in (pandas Period strings).
    # The labels stay on `df` as columns so later cells can group by them too.
    df["week"] = df["timestamp"].dt.to_period("W").astype(str)
    df["month"] = df["timestamp"].dt.to_period("M").astype(str)
    df["quarter"] = df["timestamp"].dt.to_period("Q").astype(str)
    df["year"] = df["timestamp"].dt.to_period("Y").astype(str)

    # Total score within each individual period (one row per week / month / ...).
    weekly_cost = df.groupby("week")["pipeline_score"].sum().reset_index()
    monthly_cost = df.groupby("month")["pipeline_score"].sum().reset_index()
    quarterly_cost = df.groupby("quarter")["pipeline_score"].sum().reset_index()
    yearly_cost = df.groupby("year")["pipeline_score"].sum().reset_index()

    # Summing any one breakdown always yields the grand total, which is therefore
    # identical on every row. The figures that actually differ by granularity are
    # how many periods the logs span and the average / peak score per period.
    # (Each breakdown is non-empty here because df is non-empty.)
    summary_table = pd.DataFrame(
        [
            {
                "Time Period": label,
                "Periods Covered": len(period_totals),
                "Total Resource Estimate Score": period_totals["pipeline_score"].sum(),
                "Average Score per Period": period_totals["pipeline_score"].mean(),
                "Peak Score in a Single Period": period_totals["pipeline_score"].max(),
            }
            for label, period_totals in (
                ("Weekly", weekly_cost),
                ("Monthly", monthly_cost),
                ("Quarterly", quarterly_cost),
                ("Yearly", yearly_cost),
            )
        ]
    )

    # Display summary table
    ipython.display(ipython.Markdown("## Resource Estimate Score Summary Table"))
    ipython.display(ipython.Markdown(
        "Summarises pipeline Resource Estimate Score over weekly, monthly, "
        "quarterly, and yearly periods: the number of periods covered, the "
        "overall total, and the average and peak score within a single period."
    ))
    ipython.display(summary_table)
else:
    ipython.display(ipython.Markdown("No data available for summary calculations."))