In [1]:
import pandas as pd
import numpy as np
import json
import subprocess
import re
import os
import shutil
from itertools import chain
import datetime


In [2]:
class ArqManipulation:
    """
    A utility class for file operations and data manipulation.

    All members are static helpers: Parquet read/write, removal of ANSI escape
    sequences from CLI output, and conversion of parsed JSON into a DataFrame.
    """

    @staticmethod
    def read_parquet_file(parquet_file_name: str) -> pd.DataFrame:
        """
        Reads a Parquet file and returns a DataFrame.

        :param parquet_file_name: Path to the Parquet file.
        :return: DataFrame with file contents, or an empty DataFrame when the
                 file does not exist (a message is printed in that case).
        :raises RuntimeError: If the file exists but reading it fails.
        """
        try:
            if not os.path.exists(parquet_file_name):
                print(f"File '{parquet_file_name}' does not exist.")
                return pd.DataFrame()

            return pd.read_parquet(parquet_file_name)
        except Exception as e:
            raise RuntimeError(f"Error reading Parquet file '{parquet_file_name}': {e}") from e

    @staticmethod
    def save_df_to_parquet(df: pd.DataFrame, parquet_file_name: str):
        """
        Saves a DataFrame to a Parquet file, creating the parent directory if needed.

        :param df: DataFrame to save.
        :param parquet_file_name: Destination path of the Parquet file.
        :raises RuntimeError: If the directory cannot be created or the write fails.
        """
        try:
            parent_dir = os.path.dirname(parquet_file_name)
            # A bare file name has no directory part and os.makedirs('') raises,
            # so only create the directory when there is one.
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            df.to_parquet(parquet_file_name)
            print(f"DataFrame successfully saved to {parquet_file_name}")
        except Exception as e:
            raise RuntimeError(f"Error saving DataFrame to Parquet file '{parquet_file_name}': {e}") from e

    @staticmethod
    def clean_ansi_escape(base_str: str) -> str:
        """
        Removes ANSI escape sequences (ESC [ ... letter) from a string.

        :param base_str: String possibly containing ANSI escape sequences.
        :return: Cleaned string.
        """
        return re.sub(r'\x1B\[[0-9;]*[A-Za-z]', '', base_str)

    @staticmethod
    def parse_stdout_json(base_str: str) -> dict:
        """
        Parses JSON output from GitHub CLI after cleaning ANSI escape sequences.

        Line breaks are removed before decoding, so multi-line output is
        decoded as a single document.

        :param base_str: The raw output string from the GitHub CLI.
        :return: Parsed JSON value.
        :raises json.JSONDecodeError: If the cleaned text is not valid JSON.
        """
        cleaned = ArqManipulation.clean_ansi_escape(base_str)
        str_output = ''.join(cleaned.splitlines())
        return json.loads(str_output)

    @staticmethod
    def json_to_df(parsed_json: dict) -> pd.DataFrame:
        """
        Converts parsed JSON to a sorted DataFrame with specific columns.

        :param parsed_json: Parsed JSON data.
        :return: Pandas DataFrame restricted to the required columns and sorted
                 by the 'createdAt' column (converted to datetime).
        :raises ValueError: If any required column is missing.
        :raises RuntimeError: On any other failure while building the frame.
        """
        try:
            df_json = pd.DataFrame(parsed_json)
            required_columns = ['name', 'createdAt', 'conclusion', 'status', 'databaseId', 'workflowDatabaseId']

            if not all(col in df_json.columns for col in required_columns):
                raise KeyError(f"Missing required columns in JSON data: {set(required_columns) - set(df_json.columns)}")

            df_json['createdAt'] = pd.to_datetime(df_json['createdAt'])
            return df_json[required_columns].sort_values(by="createdAt")
        except KeyError as e:
            raise ValueError(f"Error processing JSON to DataFrame: {e}") from e
        except Exception as e:
            raise RuntimeError(f"Unexpected error in json_to_df: {e}") from e

In [3]:
class ActionsWorkflow:
    """
    A class to extract GitHub Actions workflows using the GitHub CLI, generating a dataframe with returned data
    """

    def __init__(self, repository, query_size):
        """
        Initializes the ActionsWorkflow class and immediately runs the query.

        :param repository: GitHub repository in the format "owner/repo".
        :param query_size: Number of workflows to retrieve.
        """
        self.repository = repository
        self.json_attributes = '--json name,status,conclusion,createdAt,databaseId,workflowDatabaseId'
        self.query_size = query_size
        self.df = self.__gh_list_query__()

    def __gh_list_query__(self):
        """
        Calls the GitHub API via the GitHub CLI (`gh run list`) and retrieves
        a specified number of workflows.

        The parsed result is also written to ``./bin/actionsWorflow.parquet``.

        :return: A DataFrame containing the parsed workflow data indexed by
                 'name', or an empty DataFrame when the CLI call fails.
        """
        try:
            # Argument list instead of a shell string: the repository value is
            # passed verbatim to gh and is never interpreted by a shell.
            list_command = [
                'gh', 'run', '--repo', str(self.repository), 'list',
                *self.json_attributes.split(),
                '-L', str(self.query_size),
            ]

            output_json = subprocess.run(
                list_command, text=True, check=True, capture_output=True
            ).stdout

            parsed_json = ArqManipulation.parse_stdout_json(output_json)
            df = ArqManipulation.json_to_df(parsed_json)

            ArqManipulation.save_df_to_parquet(df = df, parquet_file_name="./bin/actionsWorflow.parquet")

            return df.set_index('name')

        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            # FileNotFoundError: the gh executable is not on PATH (without a
            # shell this no longer surfaces as a non-zero exit status).
            print(f"Error executing GitHub CLI command: {e}")
            return pd.DataFrame()  # Return an empty DataFrame on error



In [4]:
class ActionsJobs:
    """
    A class to interact with GitHub Actions jobs using the GitHub CLI.

    Parsed jobs are cached in ``./bin/actionsJobs.parquet``; a run is only
    fetched from the CLI when its id is not yet present in that file.
    """

    def __init__(self, repository, workflow):
        """
        Initializes the ActionsJobs class.

        :param repository: GitHub repository in the format "owner/repo".
        :param workflow: Workflow associated with the jobs.
        """
        self.repository = repository
        self.workflow = workflow

    def __retrieve_jobs__(self, database_id: int):
        """
        Runs `gh run view <database_id>` for the repository.

        :param database_id: The ID of the workflow run.
        :return: Raw stdout of the command.
        :raises subprocess.CalledProcessError: If gh exits with a non-zero status.
        """
        # Argument list (no shell) so the values are never shell-interpreted.
        command = ['gh', 'run', '--repo', str(self.repository), 'view', str(database_id)]
        return subprocess.run(command, text=True, check=True, capture_output=True).stdout

    def get_jobs(self, database_id: int) -> pd.DataFrame:
        """
        Retrieves job data from the GitHub CLI and processes it.

        :param database_id: The ID of the workflow run.
        :return: The full cached jobs DataFrame (all runs fetched so far, including
                 this one), or an empty DataFrame when anything fails.
        """
        cache_path = "./bin/actionsJobs.parquet"

        try:
            # Normalise once so the membership test and the stored value agree.
            run_id = int(database_id)
            jobs_df = ArqManipulation.read_parquet_file(parquet_file_name=cache_path)

            if jobs_df.empty or run_id not in jobs_df['databaseId'].values:
                data = self.__retrieve_jobs__(database_id=run_id)
                data_df = self.__clean_job_text__(data)
                data_df["databaseId"] = run_id

                if jobs_df.empty:
                    jobs_df = data_df
                else:
                    jobs_df = pd.concat([jobs_df, data_df], ignore_index=True)

                ArqManipulation.save_df_to_parquet(jobs_df, parquet_file_name=cache_path)

            return jobs_df

        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            print(f"Error executing GitHub CLI command: {e}")
            return pd.DataFrame()

        except Exception as e:
            print(f"Unexpected error: {e}")
            return pd.DataFrame()

    def __split_string__(self, job_list):
        """
        Splits every line of a section into its components.

        The first entry of ``job_list`` (the section title line) is dropped
        from the result.

        :param job_list: Lines of one section, title first.
        :return: A list with one list of stripped, non-empty fragments per remaining line.
        """
        delimiters = r" \| | / build in | \(ID |\| in| / cleanup in | /| in "
        jobs = []

        for job in job_list:
            splitted_job = re.split(delimiters, job)
            splitted_job = [s.strip() for s in splitted_job if s.strip()]
            jobs.append(splitted_job)

        jobs.pop(0)

        return jobs

    def __build_cleaned_df__(self, data):
        """
        Builds the jobs DataFrame from the grouped sections.

        A section holding a line with both "ID" and a PASSED/FAILED marker is
        parsed as a list of jobs. Any other section holding a FAILED line
        stores the text after "FAILED | " in the 'failedAt' cell of the last
        job collected so far.

        :param data: Sections as produced by ``__find_jobs__``.
        :return: DataFrame with columns conclusion, test, buildTime (sec), jobId, failedAt.
        """
        # Define columns
        columns = ["conclusion", "test", "buildTime (sec)", "jobId"]
        jobs_df = pd.DataFrame(columns=columns)
        jobs_df["failedAt"] = None

        for job in data:
            if any("ID" in item and ("PASSED" in item or "FAILED" in item) for item in job):
                temp_df = pd.DataFrame(self.__split_string__(job), columns=columns)

                temp_df['buildTime (sec)'] = temp_df['buildTime (sec)'].apply(str_time_to_int)
                jobs_df = pd.concat([jobs_df, temp_df], ignore_index=True)

            elif any("FAILED" in item for item in job):
                failed = next(item for item in job if "FAILED" in item).split("FAILED | ")
                if not jobs_df.empty:
                    jobs_df.at[jobs_df.index[-1], "failedAt"] = failed[1]

        # The id fragment still carries the closing parenthesis of "(ID 123)".
        jobs_df["jobId"] = jobs_df["jobId"].str.rstrip(")").astype('int')
        return jobs_df

    def __find_jobs__(self, base_str: str) -> list[list[str]]:
        """
        Groups the lines of the CLI output into sections.

        A new group starts at every all-uppercase or blank line; only groups
        whose first line is all-uppercase are returned.

        :param base_str: Cleaned CLI output.
        :return: List of sections, each a list of lines starting with its title.
        """
        lines = base_str.splitlines()
        arr = []  # Stores grouped sections
        current_group = []  # Temporary storage for the current section

        for line in lines:
            if line.isupper() or not line.strip():  # New section (uppercase or empty line)
                if current_group:  # Avoid adding empty groups
                    arr.append(current_group)
                current_group = [line]  # Start a new group
            else:
                current_group.append(line)

        if current_group:  # Append the last group
            arr.append(current_group)

        # Filter out groups that do not start with an uppercase title
        filtered_arr = [group for group in arr if group and group[0].isupper()]
        return filtered_arr

    def __clean_job_text__(self, base_str: str) -> pd.DataFrame:
        """
        Cleans and structures GitHub job data from CLI output.

        :param base_str: Raw job text output from the GitHub CLI.
        :return: A Pandas DataFrame with structured job data; an empty
                 DataFrame when no JOBS/ANNOTATIONS section is found or when
                 parsing fails.
        """
        try:
            # Remove ANSI escape sequences and unwanted characters
            ansi_cleaned = ArqManipulation.clean_ansi_escape(base_str)
            cleaned = ansi_cleaned.replace("✓", "PASSED |")
            # Only an "X" that opens a line (after optional indentation) is a
            # failure marker; replacing every "X" would corrupt job names.
            # NOTE(review): assumes gh always prints the marker at line start - confirm.
            cleaned = re.sub(r'(?m)^([ \t]*)X(?=\s)', r'\1FAILED |', cleaned)

            stripped_list = self.__find_jobs__(cleaned)

            # Each element is a section (list of lines); inspect its title line.
            has_known_section = any(
                'JOBS' in group[0] or 'ANNOTATIONS' in group[0]
                for group in stripped_list
            )
            if not has_known_section:
                return pd.DataFrame()

            jobs_df = self.__build_cleaned_df__(stripped_list)

            return jobs_df

        except Exception as e:
            print(f"Error processing job text: {e}")
            return pd.DataFrame()

    

def str_time_to_int(time_str: str) -> int:
    """
    Converts a duration string such as ``"1h2m3s"`` to a number of seconds.

    Units are consumed in the fixed order d, h, m, s. For each unit letter
    present, the text before its first occurrence is read as an integer count
    and the text that follows is kept for the next unit.

    :param time_str: Duration text built from integer counts followed by unit letters.
    :return: Total number of seconds (0 when no unit letter is present).
    """
    unit_seconds = {'d': 86400, 'h': 3600, 'm': 60, 's': 1}

    remaining = time_str
    elapsed = 0

    for unit, factor in unit_seconds.items():
        if unit not in remaining:
            continue
        pieces = remaining.split(unit)
        elapsed += factor * int(pieces[0])
        remaining = pieces[1]

    return elapsed


In [5]:
class ActionsArtifacts:
    """
    A class to handle downloading, retrieving, and deleting GitHub Actions artifacts.
    """

    def __init__(self, repository: str):
        """
        Initializes the ActionsArtifacts object.

        :param repository: The GitHub repository in the format "owner/repo".
        """
        self.repository = repository
        self.folder = './artifacts/'  # Default storage dir
        # Snapshot of the files present at construction time; it is not
        # refreshed by later downloads or deletions.
        self.paths = self.retrieve_downloaded_artifacts()

    def download_artifact(self, database_id: str):
        """
        Downloads the artifacts of a run from GitHub Actions using the GitHub CLI.

        Files are placed in ``<folder>/<database_id>``. Failures are reported
        with a printed message and are not raised.

        :param database_id: The database ID passed to `gh run download`.
        """
        try:
            # Ensure the folder exists before downloading
            os.makedirs(self.folder, exist_ok=True)

            # Argument list (no shell): repository and target path are passed
            # verbatim, so spaces or shell metacharacters cannot alter the command.
            command = [
                'gh', 'run', '--repo', str(self.repository),
                'download', str(database_id),
                '--dir', os.path.join(self.folder, str(database_id)),
            ]

            # Execute the command
            subprocess.run(command, text=True, check=True)
            print("Download Successful")
        except (subprocess.CalledProcessError, FileNotFoundError) as e:
            print(f"Error during artifact download: {e}")
        except Exception as e:
            print(f"Unexpected error: {e}")

    def retrieve_downloaded_artifacts(self) -> list[str]:
        """
        Retrieves all downloaded artifacts file paths.

        :return: Paths of every file found below the artifacts folder
                 (empty when the folder does not exist).
        """
        # Walk through the artifacts folder and collect all file paths
        return [
            os.path.join(path, file)
            for path, _, files in os.walk(self.folder)
            for file in files
        ]

    def delete_downloaded_artifacts(self):
        """
        Deletes all downloaded artifacts recursively and prints the outcome.
        """
        try:
            shutil.rmtree(self.folder)
            if os.path.exists(self.folder):
                print("Error: Failed to delete artifacts directory.")
            else:
                print("Artifacts directory deleted successfully.")
        except FileNotFoundError:
            print("Artifacts directory not found, nothing to delete.")
        except Exception as e:
            print(f"Error while deleting artifacts: {e}")


In [6]:
class PytestArtifactLogExtractor:
    """
    A class to extract and process test status and timing information from a pytest artifact log.
    """

    def __init__(self, path: str):
        """
        Initializes the PytestArtifactLogExtractor object.

        :param path: Path to the pytest artifact log file.
        """
        self.path = path
        self.data = self.__read_file__()

    def __read_file__(self):
        """
        Reads the contents of the log file and returns it as a string.

        :return: String containing the file content.
        """
        with open(self.path, "r") as file: 
            data = file.read()

        return ArqManipulation.clean_ansi_escape(data)

    def log_to_df(self):
        """
        Parses the log file to extract test results and performance metrics.

        :return: A DataFrame combining test statuses with time metrics.
        """

        df_parquet = ArqManipulation.read_parquet_file(parquet_file_name='pytest.log.parquet')   

        databaseId = self.__extract_self_path_info__().get('databaseId').get(0)
        databaseId = int(databaseId) if databaseId else None 
    
        if not df_parquet.empty and (databaseId in df_parquet['databaseId']):
            return df_parquet

        tests, categories, failures = self.__extract_all_categories__()
        
        # Creating dataframes test status and categories
        status_df = pd.DataFrame(tests, columns=["status", "name", "category", "arguments"]).set_index('name')
        categories_df = self.__create_df__(categories)
        failures_df = pd.DataFrame(failures,columns=['name', 'category', 'arguments', 'error', 'error_details']).set_index('name')

        # Labeling the dfs
        status_df.index.name = 'pytest_tests_status'
        categories_df.index.name = 'pytest_run_times'
        failures_df.index.name = 'pytest_failures_errors'

        # Applying individual id for each table
        status_df['databaseId'] = databaseId
        categories_df['databaseId'] = databaseId
        failures_df['databaseId'] = databaseId


        return status_df, categories_df, failures_df

    def __extract_all_categories__(self):
        """
        Converts extracted timing data into DataFrames.

        :param values: A list of lists with extracted time metrics.
        :type values: list[list]
        :return: A list of DataFrames with execution time statistics.
        :rtype: list[pandas.DataFrame]
        """
        header = []
        # Filtering out irrelevant categories
        keywords = ('deselected', 'passed in', 'grand total', 'live log')

        values = self.data.splitlines()
        for value in values:
            if any(k in value for k in keywords):
                continue   
            elif re.match(r'=+|-+', value): # Divide by headers demarked by '=' or '-' (logging)
                value = value.replace("=", "")  
                value = value.replace("-", "")  
                header.append([value]) 
            else:
                # Populate each category and break in the case of the pytest-durations tables while ignoring empty values
                value = re.split(r"\s+", value) 
                if list(filter(None, value)):
                    header[-1].append(list(filter(None, value)))

        headers = [['live_log','live_log','live_log']]
        if not 'live log' in self.data:
            headers = self.__extract_test_status_names__(self.__get_list_by_name__(header, 'session')[0])
            
        categories = self.__get_list_by_name__(header, 'duration top')
        failures = self.__extract_failures_errors__(self.__get_list_by_name__(header, 'summary')[0])

        return headers, categories, failures

    def __get_list_by_name__(self, data: list, name: str):
        """
        Find the sublist containing the specified name in the first element.

        :param data: A list of sublists to search through.
        :type data: list[list]
        :param name: The name to search for in the first element of each sublist.
        :type name: str
        :return: A list of sublists where the first element matches the name.
        :rtype: list[list]
        """
        matching_sublists = []
        
        for sublist in data:
            if re.search(name, sublist[0]):  # Converte os itens para string
                matching_sublists.append(sublist)
        
        return matching_sublists

    def __extract_test_status_names__(self, data):
        """
        Extracts the status and the tests names out of the pytest log, breaking them down to a list of lists.

        :param data: A list of lines containing test results.
        :type data: list[str]
        :return: list[list[str]]: A list of lists with test names, statuses (PASSED, FAILED, ERROR), and additional details.
        """

        tests = []
        keywords = ('PASSED', 'FAILED', 'ERROR')

        for line in data:
            line = ''.join(line).strip()
            
            if any(k in line for k in keywords):
                line = re.sub(r'\[.*?\d%\]', '', line)
                parts = line.split('::', maxsplit=1)

                match = re.search(r'(PASSED|FAILED|ERROR)', parts[0])
                if match:
                    test_name = parts[0][:match.start()].strip()
                    status = match.group(0)
                else:
                    test_name, status = parts[0], None

                tmp = [test_name, status]

                if len(parts) > 1:
                    values = list(filter(None, re.split(r'\[(.*?)\]', parts[1])))
                    tmp += values
                
                while len(tmp) < 4:
                    tmp.append(None)

                tests.append(tmp)
        
        return tests

    def __extract_failures_errors__(self, data):

        """
        Extracts from the pytest log the details of tests with failures or errors cleaning the data to make it ready to a dataframe.

        :param data: A list of strings containing test results.
        :type data: list[str]
        :return: list[list]: A list of lists containing details of tests with failures and/or errors.
        """

        # Regex asks for a string, cleaning it and concatening the list
        data_str = ''.join(list(''.join(d) for d in data[1:]))
        data_str = list(filter(None, re.split(r'(FAILED|ERROR)', data_str)))

        splitted_data = []

        # Splitting test from error
        for d in data_str:
            if d and ('FAILED' or 'ERROR') not in d:
                splitted_data.append(list(filter(None, re.split(r'\[(.*?)\]-|::|(\w+):([\w=*]+)', d))))   

        return splitted_data

    def __create_df__(self, values):
        """
        Converts extracted timing information into DataFrames.

        :param values: A list of lists containing extracted time metrics.
        :return: A list of DataFrames with execution time statistics.
        """
        dfs = pd.DataFrame()
        
        for h in values:
            time_df = pd.DataFrame(h[2:], columns=h[1])

            # Converting time-related columns to datetime.time format
            time_columns = ['avg', 'min', 'total']
            for col in time_columns:
                if col in time_df.columns:
                    time_df[col] = pd.to_timedelta(time_df[col], errors='coerce').dt.total_seconds().round(3)
                    
            # Assigning a 'durationType' column for metric categorization
            time_df['durationType'] = h[0].replace('top', '').replace('test', '')

            dfs = pd.concat([time_df, dfs], ignore_index=True)

        if 'name' in dfs.columns:
            dfs = dfs.set_index('name') 

        return dfs

    def __extract_self_path_info__(self):
        """
        Extracts test and database ID information from the log file path.

        :return: A DataFrame containing 'test' and 'databaseId' information.
        """
        # Extract filename without extension
        stripped = self.path.split('/')[-1].split('.')
        stripped.pop()  # Remove the file extension

        # Ensure there are exactly three elements (fill missing ones with None)
        while len(stripped) < 3:
            stripped.append(None)  # Fill missing values with NaN

        # Create DataFrame
        df = pd.DataFrame([stripped], columns=['test', 'region', 'databaseId'])

        return df

    def __merge_artifact_dfs__(self, times_df, status_df):
        """
        Merges test execution time data with test status information.

        :param times_df: A list of DataFrames containing time-related data.
        :param status_df: A DataFrame containing test statuses.
        :return: A combined DataFrame containing execution metrics and test results.
        """
        databaseId_df = self.__extract_self_path_info__()  
        order = ['category', 'durationType', 'databaseId', 'status', 'num', 'avg', 'min', 'total']
        dfs = []

        for h in times_df:
            joined_df = h.join(status_df)  # Merging time metrics with test statuses

            # Adding database ID to each row
            for col in databaseId_df.columns.values:
                joined_df[col] = databaseId_df[col].values[0]  

            # Reordering columns
            joined_df = joined_df[order]  
            dfs.append(joined_df)

        return pd.concat(dfs)  

### "Main"

Instantiate each class defined above and build its corresponding DataFrame.

In [7]:
# GitHub repository ("owner/repo") whose Actions runs are inspected.
repo_path = 'MagaluCloud/s3-specs'
# Number of runs requested from `gh run list` (passed as -L).
query_size = 10

# Constructing ActionsWorkflow runs the query right away and saves the result to
# ./bin/actionsWorflow.parquet; `df` holds the runs indexed by workflow name.
workflow = ActionsWorkflow(repository=repo_path, query_size=query_size)
workflow.df

DataFrame successfully saved to ./bin/actionsWorflow.parquet


Unnamed: 0_level_0,createdAt,conclusion,status,databaseId,workflowDatabaseId
name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
Pull Request Essential Tests,2025-02-11 14:42:01+00:00,success,completed,13265481705,132962917
Pull Request Extra Tests,2025-02-11 14:42:01+00:00,success,completed,13265481700,142271933
Pull Request Extra Tests,2025-02-11 17:37:25+00:00,success,completed,13269014124,142271933
Pull Request Essential Tests,2025-02-11 17:37:25+00:00,success,completed,13269014120,132962917
Pull Request Essential Tests,2025-02-11 17:39:58+00:00,success,completed,13269057739,132962917
Pull Request Extra Tests,2025-02-11 17:39:58+00:00,success,completed,13269057738,142271933
Pull Request Extra Tests,2025-02-11 17:45:26+00:00,success,completed,13269149128,142271933
Pull Request Essential Tests,2025-02-11 17:45:26+00:00,success,completed,13269149127,132962917
Pull Request Essential Tests,2025-02-11 17:52:13+00:00,success,completed,13269265728,132962917
Pull Request Extra Tests,2025-02-11 17:52:13+00:00,success,completed,13269265723,142271933


In [8]:
# get_jobs returns the whole jobs cache (./bin/actionsJobs.parquet), so rows of
# previously fetched runs are displayed alongside the requested run id.
jobs = ActionsJobs(repo_path, workflow)
jobs.get_jobs(13269014124)


Unnamed: 0,conclusion,test,buildTime (sec),jobId,failedAt,databaseId
0,PASSED,"extra_tests_debug (locking, ../params/br-se1.y...",2168,37043704535,,13269014124
1,PASSED,"extra_tests_debug (locking, ../params/br-ne1.y...",246,37043705230,,13269014124
2,PASSED,"extra_tests_dist (acl, ../params/br-ne1.yaml, ...",398,37043705967,,13269014124
3,PASSED,"extra_tests_dist (bucket_versioning, ../params...",118,37043706472,,13269014124
4,PASSED,"extra_tests_dist (policy, ../params/br-ne1.yam...",165,37043706867,,13269014124
5,PASSED,cleanup_tests,113,37045803570,,13269014124
6,PASSED,"run_tests (basic, basic, ../params.example.yam...",87,37031740475,,13265481705
7,PASSED,"run_tests (presign, presign, ../params.example...",73,37031741335,,13265481705
8,PASSED,"run_tests (cold_storage, cold_storage, ../para...",83,37031742054,,13265481705
9,PASSED,tests-success,0,37031853981,,13265481705


In [64]:
from itertools import chain


# Sample progress line used to prototype the status / file / test-name split.
line = '[gw0] [ 80%] PASSED cold_storage_test.py::test_boto_change_object_class_to_cold_storage[dasdasdasdsad]'


# Keep everything from the status keyword onwards, then split file from test id.
match = re.search(r'(PASSED|FAILED|ERROR).*', line).group()
match = re.split(r'::', match, maxsplit=1)
# Raw string: '\s' in a plain literal is an invalid escape sequence.
tmp = re.split(r'\s', match[0])
# The closing bracket of the parameter id is left in the last fragment.
tmp += re.split(r'\[', match[1], maxsplit=1)

print(tmp)


['PASSED', 'cold_storage_test.py', 'test_boto_change_object_class_to_cold_storage', 'dasdasdasdsad]']


In [66]:
# List the artifact files already present under ./artifacts/.
artifacts = ActionsArtifacts(repository=repo_path)
# Download step left disabled; enable it to fetch the artifacts of a run first.
#a = artifacts.download_artifact(13269014124)
# `a` (list of file paths) is consumed by the next cell.
a = artifacts.retrieve_downloaded_artifacts()
a

['./artifacts/13269014120/output_artifact_cold_storage_example.13269014120.13269014120/pytest_output_cold_storage_example.13269014120.log',
 './artifacts/13269014120/output_artifact_presign_example.13269014120.13269014120/pytest_output_presign_example.13269014120.log',
 './artifacts/13269014120/output_artifact_basic_example.13269014120.13269014120/pytest_output_basic_example.13269014120.log',
 './artifacts/13269265728/output_artifact_basic.example.13269265728/pytest_output_basic.example.13269265728.log',
 './artifacts/13269265728/output_artifact_presign.example.13269265728/pytest_output_presign.example.13269265728.log',
 './artifacts/13269265728/output_artifact_cold_storage.example.13269265728/pytest_output_cold_storage.example.13269265728.log',
 './artifacts/13269057739/output_artifact_basic_example.13269057739/pytest_output_basic_example.13269057739.log',
 './artifacts/13269057739/output_artifact_cold_storage_example.13269057739/pytest_output_cold_storage_example.13269057739.log',
 '

In [67]:
# Parse the first downloaded log into its three tables:
# test statuses, run times and failure/error details.
artifact = PytestArtifactLogExtractor(path = a[0])
pytest_tests_status	, pytest_run_times, pytest_failures_errors = artifact.log_to_df()

# TODO: replace databaseId with jobId
display(pytest_tests_status)
display(pytest_run_times)
display(pytest_failures_errors)

File 'pytest.log.parquet' does not exist.


IndexError: list index out of range

In [None]:
# The index of pytest_tests_status carries the status keyword (PASSED/FAILED/ERROR),
# so passed tests are the rows whose index equals 'PASSED' (the previous `!=`
# selected the non-passed rows instead). Counted per category.
total_passed = pytest_tests_status[pytest_tests_status.index.values == 'PASSED'].set_index('category')
total_passed.index.value_counts()

In [None]:
# Categories present in the status table; every metric below is keyed by them.
test_categories = pytest_tests_status['category'].unique()

# Per-category sum of each timing column of pytest_run_times (rows are matched
# on the run-times index).
metric_sums = {
    metric: pd.Series({
        category: sum(pytest_run_times[pytest_run_times.index == category].get(metric))
        for category in test_categories
    })
    for metric in ('total', 'avg', 'min')
}
total_times = metric_sums['total']
avg_time_test = metric_sums['avg']
min_test_time = metric_sums['min']

total_nums = pytest_tests_status['category'].value_counts()
# The index of pytest_tests_status carries the status keyword, so passed rows
# are those equal to 'PASSED' (the previous `!=` counted the non-passed rows).
# Categories without any passed test count as 0 instead of NaN.
total_passed = (
    pytest_tests_status[pytest_tests_status.index.values == 'PASSED']
    .set_index('category')
    .index.value_counts()
    .reindex(total_nums.index, fill_value=0)
)
time_count_df = pd.concat([total_passed, total_nums - total_passed, total_nums, min_test_time, avg_time_test, total_times], axis=1)
time_count_df.columns = ['num_passed', 'num_failed', 'total_runs', 'min_test_time', 'avg_test_time', 'total_duration']

report_df = pd.DataFrame()
report_df['name'] = pytest_tests_status['category'].unique()
report_df = report_df.set_index('name')

report_df = pd.concat([report_df, time_count_df], axis=1)
report_df

In [None]:
# Failure details keyed by category (kept for later use; not displayed here).
failed_details_df = pytest_failures_errors.set_index('category').drop(columns=['arguments', 'databaseId'])
# Table handed to the PDF report: report_df with 'name' as a regular column,
# numbers rounded to 3 decimals. (A previous assignment of `df` from
# failed_details_df was immediately overwritten and has been removed.)
df = report_df.reset_index().round(3)
df

Blocos que vão existir

Topo: contém as informações básicas do relatório, se possível alguns campos em branco, e a data em que o código foi executado

Texto: Informações gerais do número de acertos e erros

Tabela: contém o dataframe report_df, mas estilizado

Gráficos: ficam para depois

In [None]:
from reportlab.lib.pagesizes import A4
from reportlab.lib import colors
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Table, TableStyle, Spacer, ListFlowable, ListItem
from datetime import datetime

# Function to create PDF
def create_pdf(df, filename="report_v0.pdf"):
    """
    Builds the PDF test report from the summary DataFrame.

    The document is an A4 page with 10% margins and is made of four parts, in
    order: title, execution summary, detailed results and errors summary.

    :param df: Report DataFrame passed on to the summary and detail sections.
    :param filename: Path of the PDF file to write (defaults to "report_v0.pdf").
    """
    # A4 size dimensions
    width, height = A4

    # Set 10% margin
    margin = 0.1 * width

    # Create PDF with margins
    doc = SimpleDocTemplate(filename, pagesize=A4,
                            leftMargin=margin, rightMargin=margin, topMargin=0.1*height, bottomMargin=0.1*height)

    # Styles
    styles = getSampleStyleSheet()
    heading_style = styles['Heading1']
    normal_style = styles['Normal']
    normal_style.alignment = 0  # 0 for left alignment

    bold_style = ParagraphStyle(
        name="Bold",
        parent=normal_style,
        fontName="Helvetica-Bold",
        fontSize=12
    )

    # Create the story (content) for the PDF
    story = []

    # Add title with fields
    story.extend(create_title(heading_style,normal_style))

    # Add each section to the story
    story.extend(create_execution_summary(df, normal_style, bold_style))
    story.extend(create_detailed_results(df, normal_style, bold_style, width, margin))
    story.extend(create_errors_summary(normal_style, bold_style, width, margin))

    # Build PDF
    doc.build(story)

# 
def create_title(heading_style, normal_style):
    """
    Builds the report header.

    The header holds the title, the current date/time right-aligned, and three
    label-only fields (execution date, system version, environment) whose
    values are left blank.

    :param heading_style: Paragraph style used for the title.
    :param normal_style: Base paragraph style for the remaining lines.
    :return: List of flowables to append to the document story.
    """
    # Imported locally: the notebook binds the name `datetime` both to the
    # module (`import datetime`) and to the class (`from datetime import
    # datetime`), so the global depends on which cell ran last.
    from datetime import datetime as _datetime

    # Initialize the story list
    story = []

    # Get current date and time
    agora = _datetime.now()
    horario_dia = agora.strftime("%d/%m/%Y %H:%M:%S")

    # Create the title
    title_text = "Sumário de Resultados dos Testes"
    title_paragraph = Paragraph(f"<b>{title_text}</b>", heading_style)

    # Create the formatted text for the execution date on the right side
    right_date_style = ParagraphStyle(
        "RightDateStyle", parent=normal_style, alignment=2, fontSize=10
    )
    date_paragraph = Paragraph(horario_dia, right_date_style)

    # Add title and date to the story as separate elements
    story.append(title_paragraph)
    story.append(date_paragraph)

    # Label-only fields for execution date, system version, and environment
    execution_paragraph = Paragraph("Data da Execução: ", normal_style)
    version_paragraph = Paragraph("Versão do Sistema: ", normal_style)
    environment_paragraph = Paragraph("Ambiente: ", normal_style)

    # Add other paragraphs to the story
    story.append(execution_paragraph)
    story.append(Spacer(1, 12))  # Spacer between execution and version
    story.append(version_paragraph)
    story.append(Spacer(1, 12))  # Spacer between version and environment
    story.append(environment_paragraph)

    story.append(Spacer(1, 24))  # Add space at the end

    # Return the complete story
    return story



# Build the "Resumo Geral" section as a bulleted list of aggregate figures
def create_execution_summary(df, normal_style, bold_style):
    """
    Build the overall-summary section of the report.

    :param df: DataFrame providing the columns 'total_runs', 'num_passed',
        'num_failed', 'min_test_time', 'avg_test_time' and 'total_duration'.
    :param normal_style: ParagraphStyle for the bullet text.
    :param bold_style: ParagraphStyle for the section heading.
    :return: List of flowables: heading, spacer, bulleted list, spacer.
    """
    # (label, value) pairs in display order; timings are shown with two decimals
    labelled_figures = [
        ('Total de Testes:', df['total_runs'].sum()),
        ('Testes Bem-Sucedidos:', df['num_passed'].sum()),
        ('Testes com Falha:', df['num_failed'].sum()),
        ('Tempo Mínimo de Execução:', f"{df['min_test_time'].min():.2f} s"),
        ('Tempo Médio de Execução:', f"{df['avg_test_time'].mean():.2f} s"),
        ('Duração Total dos Testes:', f"{df['total_duration'].sum():.2f} s"),
    ]

    # One list item per figure, with the label in bold
    items = []
    for label, figure in labelled_figures:
        line = Paragraph(f"<b>{label}</b> {figure}", normal_style)
        items.append(ListItem(line, leftIndent=20, spaceAfter=6))

    return [
        Paragraph("Resumo Geral", bold_style),
        Spacer(1, 6),
        ListFlowable(items, bulletType='bullet', leftIndent=20),
        Spacer(1, 24),
    ]

def _format_seconds(value):
    """Render a duration as '<value with 2 decimals> sec'; non-numeric values are shown as-is."""
    try:
        return f"{value:.2f} sec"
    except (TypeError, ValueError):
        # Not a number (e.g. None or a string): fall back to plain str()
        return f"{value} sec"


# Function to create detailed results table
def create_detailed_results(df, normal_style, bold_style, width, margin):
    """
    Build the "Detalhamento dos Testes" section: a heading plus a per-test table.

    The columns of `df` are relabelled by position, so `df` must have exactly
    seven columns in the order of the labels assigned below; pandas raises
    ValueError on a length mismatch. Two of the relabelled columns are then
    dropped, leaving five columns in the table.

    :param df: DataFrame with one row per test (not modified; a copy is used).
    :param normal_style: ParagraphStyle for the body cells.
    :param bold_style: ParagraphStyle for the section heading.
    :param width: Page width; the table is sized from `width - 2 * margin`.
    :param margin: Left/right page margin.
    :return: List of flowables: heading, spacer, table, spacer.
    """
    story = [Paragraph("Detalhamento dos Testes", bold_style), Spacer(1, 12)]

    table_df = df.copy()
    table_df.columns = [
        'Nome do Teste',
        'Testes Bem-Sucedidos',
        'Falhas',
        'Execuções',
        'Tempo Mínimo de Execução',
        'Tempo Médio',
        'Duração Total'
    ]

    # These two columns are not shown in the detailed table
    table_df = table_df.drop(columns=['Testes Bem-Sucedidos', 'Tempo Mínimo de Execução'])

    # Two decimals, like the figures in the summary section; the raw str() of a
    # float (e.g. 0.30000000000000004) does not fit the narrow columns.
    for duration_column in ('Tempo Médio', 'Duração Total'):
        table_df[duration_column] = [_format_seconds(v) for v in table_df[duration_column]]

    # Header row stays as plain strings; body cells are wrapped in Paragraphs
    detailed_tests_data = [table_df.columns.tolist()]
    detailed_tests_data.extend(
        [Paragraph(str(value), normal_style) for value in row]
        for row in table_df.values.tolist()
    )

    # Width left for the table once both side margins are removed
    available_width = width - 2 * margin

    # Share of the available width given to each of the five columns
    proportions = [0.3, 0.15, 0.15, 0.15, 0.2]

    # Scale down only if the shares add up to more than the full width
    total_proportion = sum(proportions)
    if total_proportion > 1:
        proportions = [p / total_proportion for p in proportions]

    col_widths = [available_width * p for p in proportions]

    detailed_table = Table(detailed_tests_data, colWidths=col_widths)
    detailed_table.setStyle(TableStyle([('ALIGN', (0, 0), (-1, -1), 'CENTER'),
                                        ('GRID', (0, 0), (-1, -1), 0.5, colors.black)]))
    story.append(detailed_table)
    story.append(Spacer(1, 24))

    return story

# Function to create errors summary as a numbered list
def create_errors_summary(normal_style, bold_style, width, margin, errors=None):
    """
    Build the "Resumo dos Erros" section as a numbered list.

    :param normal_style: ParagraphStyle for each error line.
    :param bold_style: ParagraphStyle for the section heading.
    :param width: Page width (currently unused; kept for signature compatibility).
    :param margin: Page margin; list items are indented by `margin` and the
        list itself by half of it.
    :param errors: Optional iterable of error descriptions. When omitted
        (None), the built-in hard-coded list below is used, as before.
    :return: List of flowables: heading, spacer, numbered list (if any
        errors), spacer.
    """
    story = [Paragraph("Resumo dos Erros", bold_style), Spacer(1, 12)]

    if errors is None:
        # Built-in default entries, used when the caller supplies no errors
        errors = [
            "Timeout na comunicação com a API (Testes: TC_002)",
            "Falha de autenticação com banco de dados (Testes: TC_007)",
            "Erro de processamento de dados em alta carga (Testes: TC_015, TC_016)"
        ]

    # One list item per error, with extra space after each
    numbered_items = [
        ListItem(Paragraph(str(error), normal_style), leftIndent=margin, spaceAfter=10)
        for error in errors
    ]

    # Skip the list entirely when an explicitly empty collection was passed
    if numbered_items:
        story.append(
            ListFlowable(
                numbered_items,
                bulletType='1',          # numbered list (1., 2., 3.)
                bulletFontSize=10,       # smaller font for the numbers
                leftIndent=margin * 0.5  # indentation relative to the margin
            )
        )

    story.append(Spacer(1, 24))

    return story

# Generate the PDF report from the `df` DataFrame
create_pdf(df)


In [None]:
import pandas as pd
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.lib.units import cm  # Import cm from reportlab
from reportlab.platypus import SimpleDocTemplate, Spacer, Table, TableStyle, Paragraph


# Turn the DataFrame into the nested list that Table takes as data, with every
# cell (header included) wrapped in a Paragraph.
styles = getSampleStyleSheet()
normal_style = styles['Normal']

# Header row: one Paragraph per column label
header = [Paragraph(col, normal_style) for col in df.columns]

# Body rows: each value is converted to str before being wrapped
table_data = [header]
table_data.extend(
    [Paragraph(str(value), normal_style) for value in row]
    for _, row in df.iterrows()
)

# Equal-share column width for a table laid out on a page of the given width
def calculate_column_widths(page_width, num_columns, margin=1*cm):
    """
    Return the width of ONE column when all columns share the page equally.

    The usable width is `page_width` minus four times `margin`; that width is
    divided evenly by `num_columns`.

    :param page_width: Total page width.
    :param num_columns: Number of columns sharing the usable width.
    :param margin: Amount subtracted four times from the page width
        (defaults to 1 cm).
    :return: A single number (the width of each column), not a list.
    """
    return (page_width - 4 * margin) / num_columns

def get_table_style():
    """
    Return the TableStyle applied to the DataFrame table.

    Header-only commands cover the first row; every other command covers the
    whole table. ALIGN controls horizontal alignment; no VALIGN command is set
    here.
    """
    # Cell ranges as (start, stop) pairs of (col, row) coordinates
    whole_table = ((0, 0), (-1, -1))
    header_row = ((0, 0), (-1, 0))

    light_grey = (0.8, 0.8, 0.8)
    mid_grey = (0.5, 0.5, 0.5)

    commands = [
        ('BACKGROUND', *header_row, light_grey),        # header background
        ('GRID', *whole_table, 0.5, mid_grey),          # grey grid lines
        ('ALIGN', *whole_table, 'CENTER'),              # horizontal centring, all cells
        ('FONTNAME', *header_row, 'Helvetica-Bold'),    # bold font for the header
        ('FONTSIZE', *whole_table, 10),                 # font size for all cells
        ('BOTTOMPADDING', *whole_table, 5),
        ('TOPPADDING', *whole_table, 5),
        ('LEFTPADDING', *whole_table, 10),
        ('RIGHTPADDING', *whole_table, 10),
        ('ALIGN', *header_row, 'CENTER'),               # repeats the table-wide ALIGN for the header
    ]
    return TableStyle(commands)


# Page callback for the first page
def myFirstPage(canvas, doc):
    """
    Draw a one-line layout readout on the first page.

    The line reports the A4 page width, the number of columns of the global
    `df`, and the per-column width computed from them.

    :param canvas: Canvas to draw on.
    :param doc: Document being built (unused).
    """
    page_w = A4[0]
    n_cols = len(df.columns)
    col_w = calculate_column_widths(page_w, n_cols)
    readout = f"sizePage{page_w}, num={n_cols} column={col_w}"

    # Save/restore so the font change does not leak into the page content
    canvas.saveState()
    canvas.setFont("Helvetica", 12)
    canvas.drawString(100, 750, readout)
    canvas.restoreState()

# Page callback for every page after the first
def myLaterPages(canvas, doc):
    """
    Draw a fixed marker line on a later page.

    :param canvas: Canvas to draw on.
    :param doc: Document being built (unused).
    """
    font_name, font_size = "Helvetica", 12
    x, y = 100, 750
    marker = "This is a later page"

    # Save/restore so the font change does not leak into the page content
    canvas.saveState()
    canvas.setFont(font_name, font_size)
    canvas.drawString(x, y, marker)
    canvas.restoreState()

# Build and write the PDF that contains the DataFrame table
def go():
    """
    Render the module-level `table_data` as a table in an A4 PDF.

    The output file is "hello_with_dataframe_as_table_dynamic_widths_cm.pdf".
    The column width is derived from the A4 width and the number of columns
    of the global `df`; `myFirstPage` / `myLaterPages` are used as page
    callbacks.
    """
    doc = SimpleDocTemplate("hello_with_dataframe_as_table_dynamic_widths_cm.pdf", pagesize=A4)

    # A single number is passed as the column widths.
    # NOTE(review): presumably Table applies it to every column - confirm.
    column_width = calculate_column_widths(A4[0], len(df.columns))
    table = Table(table_data, column_width)
    table.setStyle(get_table_style())

    # Top spacer, the table, then a small trailing spacer
    flowables = [
        Spacer(1, 2 * cm),
        table,
        Spacer(1, 0.2 * cm),
    ]

    doc.build(flowables, onFirstPage=myFirstPage, onLaterPages=myLaterPages)

# Build the table PDF (go() writes hello_with_dataframe_as_table_dynamic_widths_cm.pdf)
go()


In [None]:
# 1. Pedir o id do workflow para o qual se quer um relatório (ou vários ids)
# 2. Colocar todos os jobs do workflow dentro de um dataframe
# 3. Para cada job, gerar os 3 dataframes necessários


# Data de criação do workflow e seu Id
# JobId | Nome | Tempo | Falhas/Acertos | Erros (sem detalhes)

# Plotting

## Workflow Df

In [None]:
# Faz o teste -> gera gráficos com tempo e taxa de falhas por tipo de teste
#
# Workflow -> Job -> Passos -> Resultados pytest



In [None]:
import matplotlib.pyplot as plt

# Colour for each workflow-run conclusion.
# Deliberately NOT named `colors`: the PDF code earlier in this notebook uses
# that name as `colors.black`, and rebinding it here would break that code on
# a re-run.
conclusion_colors = {
    'failure': 'firebrick',
    'cancelled': 'darkgray',
    'startup_failure': 'darkorange',
    'success': 'darkgreen'
}
# Used for any conclusion that has no entry in the map above
DEFAULT_CONCLUSION_COLOR = 'lightgray'

# Only runs that have finished are counted
completed_runs = workflow.df[workflow.df['status'] == 'completed']

# Number of completed runs per conclusion
value_counts = completed_runs['conclusion'].value_counts()

# One colour per category, in the same order as value_counts
bar_colors = [
    conclusion_colors.get(cat, DEFAULT_CONCLUSION_COLOR)
    for cat in value_counts.index
]

# Two panels side by side: bar chart (left) and pie chart (right)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

value_counts.plot.bar(color=bar_colors, ax=ax1)
ax1.set_xlabel('Conclusion')
ax1.set_ylabel('Count')
ax1.set_title('Bar Chart: Conclusion Counts')

value_counts.plot.pie(colors=bar_colors, autopct='%1.1f%%', ax=ax2)
ax2.set_ylabel('')  # the pie chart needs no y-label
ax2.set_title('Pie Chart: Conclusion Distribution')

plt.tight_layout()
plt.show()

## Jobs Df

In [None]:
# Collect the jobs of every workflow run into a single DataFrame
jobs = ActionsJobs(repository=repo_path, workflow=workflow)

# One get_jobs() call per distinct run id found in the workflow frame
ids = workflow.df['databaseId'].unique()
all_job_dfs = list(map(jobs.get_jobs, ids))

jobs_df = pd.concat(all_job_dfs)
jobs_df

In [None]:
import matplotlib.pyplot as plt

def plot_failed_passed_jobs_bars(df):
    """
    Plot, per test, how many results fall under each conclusion (grouped bars).

    Tests are numbered on the x-axis and a side legend maps each number back
    to the test name.

    :param df: DataFrame with a 'Test' column and a 'Conclusion' column.
    :return: None. The figure is shown with plt.show(); nothing is drawn when
        `df` has no rows to count.
    """
    # Rows: tests, columns: conclusions, values: occurrence counts
    counts = df.groupby(['Test', 'Conclusion']).size().unstack(fill_value=0)
    if counts.empty:
        print("No job results to plot.")
        return

    # Number the tests in the order the bars are drawn, so the tick labels
    # read 1, 2, 3, ... from left to right and match the side legend.
    test_to_number = {test: i + 1 for i, test in enumerate(counts.index)}

    # Colour each series by its conclusion NAME rather than by position, so a
    # frame with only PASSED rows (or with extra conclusions) is still
    # coloured correctly.
    conclusion_colors = {
        'FAILED': 'firebrick',
        'PASSED': 'darkgreen'
    }
    bar_colors = [conclusion_colors.get(conclusion, 'gray') for conclusion in counts.columns]

    ax = counts.plot.bar(color=bar_colors, figsize=(8, 4))

    ax.set_xlabel('Test')
    ax.set_ylabel('Count')
    ax.set_title('FAILED vs PASSED by Test')

    # Show the short test numbers instead of the full test names
    ax.set_xticklabels([test_to_number[test] for test in counts.index], rotation=0)

    # Number -> name legend, placed to the right of the axes
    test_legend = [f"{num}. {test}" for test, num in test_to_number.items()]
    plt.figtext(1.05, 0.5, "\n".join(test_legend), va='center', fontsize=10, wrap=True)

    plt.tight_layout()
    plt.show()

# Plot every job result. To plot only the failures instead, use:
#     plot_failed_passed_jobs_bars(jobs_df[jobs_df['Conclusion'] == 'FAILED'])
plot_failed_passed_jobs_bars(jobs_df)