In [1]:
import logging
# Root logging configuration: DEBUG and above go to a semicolon-separated
# log file that is opened in write mode.
_LOG_SETTINGS = {
    'filename': './information_quality.log',
    'filemode': 'w',
    'format': '%(asctime)s;%(name)s;%(levelname)s;%(message)s',
    'level': logging.DEBUG,
}
logging.basicConfig(**_LOG_SETTINGS)

# Named logger used by the rest of the notebook.
logger = logging.getLogger('MI_defects')
logger.info('Configuration started')

In [6]:
# Import every dependency of the notebook in one place so that a missing
# package is reported once, up front.
try:
    import requests
    import os.path
    import math
    import time
    import datetime
    import pandas as pd
    import matplotlib.pyplot as plt
    import numpy as np
    import yaml
    # NOTE: this rebinds the name `datetime` from the module (imported above)
    # to the class; later code relies on `datetime` being the class.
    from datetime import datetime, timedelta
except Exception as e:
    # Format the exception itself: most exception types have no `.msg`
    # attribute, so `e.msg` could raise AttributeError inside this handler.
    logger.error(f'Error importing modules: {e}')
    # Nothing below can run without these modules, so do not continue.
    raise
else:
    # Only report success when every import actually succeeded.
    logger.info('All modules imported successfully')


In [5]:
# Load the YAML configuration file.
try:
    with open("config.yaml", encoding="utf-8") as file:
        config = yaml.safe_load(file)
except Exception as e:
    # Format the exception itself: OSError and most other exception types
    # have no `.msg` attribute, so `e.msg` would raise AttributeError here.
    logger.error(f'Error loading configuration file: {e}')
    # Without `config` the lookups below cannot work; stop instead of
    # failing later with an unrelated NameError.
    raise

# Pick the active defect data source from the configuration.
try:
    sources = config.get("defect_data_sources")
    selection = config.get("pick_defect_data_from")
    source = sources[selection]
except Exception as e:
    logger.error(f'Error accessing configuration values: {e}')
    raise
else:
    # Only report success when both the file and the lookups succeeded.
    logger.info('Configuration loaded successfully')

In [11]:

# Read the download settings from the selected source and the global config.
url, product = source["url"], source["product"]
#file_name = config.get("raw_defect_data")
# Number of days to look back when querying for issues.
days = config.get("get_defect_data_going_back_days")

# Function to download the raw defect data and store it as a CSV file
def get_raw_data(file_name, url, product, days, timeout=120):
    """Download a Bugzilla bug list as CSV and write it to ``file_name``.

    Args:
        file_name: Path of the CSV file to create or overwrite.
        url: Base URL of the Bugzilla instance; ``/buglist.cgi`` is appended.
        product: Value sent as the ``product`` query parameter.
        days: Size of the look-back window in days, counted from the current
            UTC time; only the resulting calendar date is sent.
        timeout: Seconds to wait for the server before giving up
            (passed to ``requests.get``).

    Raises:
        requests.RequestException: The request failed, timed out, or the
            server answered with an HTTP error status.
        OSError: The CSV file could not be written.
    """
    # Local import: `timezone` is only needed here and is not imported at the
    # top of the notebook.
    from datetime import timezone

    # update url with search thing since no REST API available
    data_url = url + "/buglist.cgi"

    # Date window as an absolute date (safer than "-Xd" for some instances).
    # `datetime.utcnow()` is deprecated; this yields the same UTC date string.
    since = (datetime.now(timezone.utc) - timedelta(days=days)).strftime("%Y-%m-%d")

    # A list of tuples (rather than a dict) lets the same key, `bug_status`,
    # be sent several times; an earlier implementation only returned issues
    # with status NEW.
    data_params = [
        ("product", product),
        ("query_format", "advanced"),
        ("bug_status", "NEW"),
        ("bug_status", "ASSIGNED"),
        ("bug_status", "REOPENED"),
        ("bug_status", "RESOLVED"),
        ("bug_status", "CLOSED"),
        ("chfield", "[Bug creation]"),
        ("chfieldfrom", since),
        ("chfieldto", "Now"),
        # NOTE(review): presumably "0" disables the server-side row cap - confirm.
        ("limit", "0"),
        ("ctype", "csv"),
        ("columnlist", "bug_id,product,component,assigned_to,bug_status,resolution,severity,bug_severity,opendate,changeddate"),
        ("order", "bug_id DESC"),
    ]
    print("Downloading fresh data to {}".format(file_name))

    try:
        r = requests.get(data_url, params=data_params, timeout=timeout)
        # Fail on 4xx/5xx instead of saving an HTML error page as "CSV".
        r.raise_for_status()
    except requests.RequestException as e:
        # Format the exception itself; requests exceptions have no `.msg`.
        logger.error(f'Error downloading raw data: {e}')
        raise
    logger.info('Raw data download complete')

    try:
        # UTF-8 matches the default encoding pandas uses when reading the file
        # back; newline="" writes the server's line endings unchanged.
        with open(file_name, "w", encoding="utf-8", newline="") as f:
            f.write(r.text)
    except OSError as e:
        logger.error(f'Error writing raw data to file: {e}')
        raise
    logger.info('Raw data saved to file successfully')

# Download the raw data and load it into a pandas DataFrame
def get_data(bugzilla_url, product, days):
    """Fetch the issue list for ``product`` and return it as a DataFrame.

    The CSV is written to ``data_<product>_<days>.csv`` by ``get_raw_data``
    and then read back with the ``opendate`` and ``changeddate`` columns
    parsed as dates.
    """
    csv_path = "data_{}_{}.csv".format(product,days)
    get_raw_data(csv_path, bugzilla_url, product, days)
    print('Data download complete')

    date_columns = ["opendate", "changeddate"]
    return pd.read_csv(csv_path, header=0, parse_dates=date_columns)

# Download the configured product's issues once; the cells below compute
# their measures from this frame.
RAW_DATA = get_data(url, product, days)

Downloading fresh data to data_gcc_180.csv
Data download complete


In [12]:
# BASE MEASURES

# Boolean row masks shared by all measures below. They are computed once
# instead of being re-derived inside every expression.
IS_RESOLVED = RAW_DATA["bug_status"].isin(["RESOLVED", "CLOSED"])
IS_ENHANCEMENT = RAW_DATA["bug_severity"] == "enhancement"

# Total number of resolved issues/entries
base_measure_total_resolved_issues = int(IS_RESOLVED.sum())
print("Total number of resolved issues:", base_measure_total_resolved_issues)
if base_measure_total_resolved_issues == 0:
    logger.warning("No resolved issues found in the data!")
else:
    # A row count can never be negative, so no "< 0" branch is needed.
    logger.info("Total resolved issues OK.")

# Total number of issues
base_measure_total_issues = len(RAW_DATA)
print("Total number of issues:", base_measure_total_issues)
if base_measure_total_issues == 0:
    logger.warning("No issues found in the data!")
else:
    logger.info("Total issues OK.")

# Number of resolved issues that were not bugs (i.e., enhancements)
base_measure_total_resolved_non_bugs = int((IS_RESOLVED & IS_ENHANCEMENT).sum())
print("Total number of resolved non-bugs:", base_measure_total_resolved_non_bugs)
if base_measure_total_resolved_non_bugs == 0:
    logger.warning("No resolved non-bugs found in the data!")
else:
    logger.info("Total resolved non-bugs OK.")

# Curated subsets for further analysis: resolved bugs and resolved non-bugs.
# Rows with a missing severity count as bugs, because a missing value is
# "not equal" to "enhancement".
CURATED_BUGS_FILE = "base_measure_bugs_{}_{}.csv".format(product,days)
CURATED_BUGS = RAW_DATA[IS_RESOLVED & ~IS_ENHANCEMENT]

CURATED_NON_BUGS_FILE = "base_measure_non_bugs_{}_{}.csv".format(product,days)
CURATED_NON_BUGS = RAW_DATA[IS_RESOLVED & IS_ENHANCEMENT]

for curated_frame, curated_file in (
    (CURATED_BUGS, CURATED_BUGS_FILE),
    (CURATED_NON_BUGS, CURATED_NON_BUGS_FILE),
):
    curated_frame.to_csv(curated_file, index=False)
    print("Curated data saved to {}".format(curated_file))

# Backward compatibility: these two names previously ended up bound to the
# non-bug subset and its file name, so keep that for any later cell.
CURATED_DATA_FILE = CURATED_NON_BUGS_FILE
CURATED_DATA = CURATED_NON_BUGS

Total number of resolved issues: 1064
Total number of issues: 1735
Total number of resolved non-bugs: 50
Curated data saved to base_measure_bugs_gcc_180.csv
Curated data saved to base_measure_non_bugs_gcc_180.csv


In [13]:
# Function that returns a DataFrame with the resolution time per issue
def get_resolution_time_df(df):
    """Compute the time between opening and last change for finished issues.

    Args:
        df: Issue table with 'bug_id', 'bug_status', 'opendate' and
            'changeddate' columns; subtracting the two date columns must
            yield timedeltas.

    Returns:
        A DataFrame with the columns bug_id, opendate, changeddate and
        resolution_time_days (fractional days) for rows whose status is
        RESOLVED or CLOSED, or None if selecting those columns fails.
        The input frame is not modified.
    """
    # Work on a copy of the finished issues so the caller's frame stays as is.
    is_finished = df['bug_status'].isin(['RESOLVED', 'CLOSED'])
    resolved_df = df[is_finished].copy()
    if not resolved_df.empty:
        logger.info("Resolved issues found for resolution time calculation.")
    else:
        logger.warning("No resolved or closed issues found for resolution time calculation.")

    # Elapsed time expressed in days (seconds divided by seconds per day).
    elapsed = resolved_df['changeddate'] - resolved_df['opendate']
    resolved_df['resolution_time_days'] = elapsed.dt.total_seconds() / (24 * 3600)

    # Sanity-check the computed column; only the first matching condition is logged.
    days_taken = resolved_df['resolution_time_days']
    missing = days_taken.isnull()
    if missing.all():
        logger.warning("Resolution time calculation resulted in all NaN values.")
    elif (days_taken < 0).any():
        logger.error("Resolution time calculation resulted in negative values.")
    elif missing.any():
        logger.warning("Some resolution time values are NaN.")
    else:
        logger.info("Resolution time calculation completed successfully.")

    # Keep only the columns needed downstream.
    wanted_columns = ['bug_id', 'opendate', 'changeddate', 'resolution_time_days']
    try:
        return resolved_df[wanted_columns]
    except Exception as exc:
        logger.error("Error selecting relevant columns: {}".format(exc))
        return None

# Compute per-issue resolution times and persist them unless the helper failed.
RESOLUTION_TIME_DF = get_resolution_time_df(RAW_DATA)
if RESOLUTION_TIME_DF is None:
    logger.error("Resolution time DataFrame is None; skipping save operation.")
else:
    RESOLUTION_TIME_DF.to_csv("resolution_time_{}_{}.csv".format(product,days), index=False)
    print("Resolution time data saved to resolution_time_{}_{}.csv".format(product,days))

Resolution time data saved to resolution_time_gcc_180.csv


In [14]:
# DERIVED MEASURES

# Average resolution time across all rows of a resolution-time table
def calculate_average_resolution_time(df):
    """Return the mean of the 'resolution_time_days' column.

    Any exception raised while reading the column or computing the mean
    (for example when ``df`` is None or the column is missing) is logged
    and turned into a ``None`` return value.
    """
    try:
        return df['resolution_time_days'].mean()
    except Exception as exc:
        logger.error("Error calculating average resolution time: {}".format(exc))
        return None
# Mean resolution time over RESOLUTION_TIME_DF; a value of None means the
# helper logged an error instead of producing a number.
average_resolution_time = calculate_average_resolution_time(RESOLUTION_TIME_DF)
print("Average resolution time (days):", average_resolution_time)

Average resolution time (days): 13.85676186995266


In [15]:
# Function to calculate the fraction of enhancements among all unresolved issues
def calculate_fraction_unresolved_non_bugs(df):
    """Return the share of unresolved issues whose severity is 'enhancement'.

    Args:
        df: Issue table with 'bug_status' and 'bug_severity' columns.

    Returns:
        A float between 0.0 and 1.0. An issue counts as unresolved when its
        status is neither RESOLVED nor CLOSED. Returns 0.0 when there are no
        unresolved issues.
    """
    unresolved_df = df[~df['bug_status'].isin(['RESOLVED', 'CLOSED'])]
    total_unresolved = len(unresolved_df)
    if total_unresolved == 0:
        # This guard makes the division below safe, so no ZeroDivisionError
        # handler is needed.
        return 0.0

    unresolved_non_bugs = int((unresolved_df['bug_severity'] == 'enhancement').sum())
    return unresolved_non_bugs / total_unresolved

# Share of enhancement-severity issues among the issues in RAW_DATA that are
# neither RESOLVED nor CLOSED.
fraction_unresolved_non_bugs = calculate_fraction_unresolved_non_bugs(RAW_DATA)
print("Fraction of unresolved non-bugs among unresolved issues:", fraction_unresolved_non_bugs)

# Function to calculate historical fraction of non-bugs over time
def calculate_historical_non_bug_fraction(df):
    """Return, per opening date, the fraction of issues that are enhancements.

    Args:
        df: Issue table with a datetime 'opendate' column and a
            'bug_severity' column. The frame is not modified.

    Returns:
        A DataFrame with the columns 'date' (calendar date of 'opendate') and
        'fraction_non_bugs' (share of that day's issues whose severity is
        'enhancement'), sorted by date with a fresh 0..n-1 index. Rows without
        an opening date are skipped. If no row has an opening date, an empty
        frame with the same two columns is returned.
    """
    result_columns = ['date', 'fraction_non_bugs']

    opened = df['opendate']
    has_date = opened.notna()
    if not has_date.all():
        # A missing date cannot be assigned to any day. Counting it as its own
        # "day" would add a meaningless 0.0 data point to the history.
        logger.warning("Skipping {} issue(s) without an opening date in historical non-bug fraction.".format(int((~has_date).sum())))
    if not has_date.any():
        # Also covers an empty input frame.
        return pd.DataFrame(columns=result_columns)

    # One grouped pass instead of re-filtering the whole frame for every date.
    per_issue = pd.DataFrame({
        'date': opened[has_date].dt.date.to_numpy(),
        # 1.0 for enhancements and 0.0 otherwise, so the daily mean is the fraction.
        'fraction_non_bugs': (df.loc[has_date, 'bug_severity'] == 'enhancement').to_numpy(dtype=float),
    })
    daily_fractions = per_issue.groupby('date', sort=True)['fraction_non_bugs'].mean().reset_index()

    return daily_fractions.sort_values('date')

# Daily non-bug fractions over the whole download window, summarised by the
# mean and standard deviation of the daily values.
historical_fractions = calculate_historical_non_bug_fraction(RAW_DATA)
historical_summary = (
    f"Historical non-bug fraction - Mean: {historical_fractions['fraction_non_bugs'].mean():.3f}, "
    f"Std: {historical_fractions['fraction_non_bugs'].std():.3f}"
)
print(historical_summary)



Fraction of unresolved non-bugs among unresolved issues: 0.15648286140089418
Historical non-bug fraction - Mean: 0.091, Std: 0.117


In [16]:
# INDICATORS

# Traffic light indicator for the fraction of non-bug work; limits are passed in
def traffic_light_indicator(value, thresholds):
    """
    Map a non-bug fraction to a traffic light colour.

    value: fraction to classify.
    thresholds: mapping with the keys 'green' and 'yellow' holding the upper
        limits (inclusive) of the GREEN and YELLOW bands.

    GREEN:  value <= thresholds['green']  (low non-bug fraction, mostly bug fixes)
    YELLOW: value <= thresholds['yellow'] (moderate non-bug fraction)
    RED:    anything else                 (high non-bug fraction)
    """
    # Check the bands from lowest to highest; the first limit that is not
    # exceeded decides the colour.
    for colour, limit_key in (('GREEN', 'green'), ('YELLOW', 'yellow')):
        if value <= thresholds[limit_key]:
            return colour
    return 'RED'

# Band limits for the non-bug fraction, as configured in the YAML file
thresholds = config['indicators']['non_bug_fraction']

# Classify the current fraction of unresolved non-bug issues
current_indicator = traffic_light_indicator(fraction_unresolved_non_bugs, thresholds)

# Build the report line by line, then print it in one go
report_lines = [
    "=== TRAFFIC LIGHT INDICATORS ===",
    f"Current non-bug fraction: {fraction_unresolved_non_bugs:.3f} ({fraction_unresolved_non_bugs*100:.1f}%) → {current_indicator}",
    f"\nThresholds (for non-bug work):",
    f"  GREEN: ≤ {thresholds['green']*100:.0f}% non-bug work (focus on bug fixes)",
    f"  YELLOW: {thresholds['green']*100:.0f}% - {thresholds['yellow']*100:.0f}% non-bug work (balanced workload)",
    f"  RED: > {thresholds['yellow']*100:.0f}% non-bug work (too much enhancement work)",
]
print("\n".join(report_lines))


=== TRAFFIC LIGHT INDICATORS ===
Current non-bug fraction: 0.156 (15.6%) → YELLOW

Thresholds (for non-bug work):
  GREEN: ≤ 10% non-bug work (focus on bug fixes)
  YELLOW: 10% - 33% non-bug work (balanced workload)
  RED: > 33% non-bug work (too much enhancement work)
