This notebook analyses the workflow files we found in the downloaded repositories and extracts information about the workflows, jobs and steps for each distinct workflow file. 

It requires `data/workflow_files.csv.gz` and a local directory containing all the distinct workflow files extracted from the repositories. These files are generated by the `Extract workflows` notebook.

The notebook generates the following files: 

 - workflows.csv.gz: contains one entry for each workflow file we found. For each workflow, it gives the name of the workflow, the list of events that trigger it (including reusable workflows) and the number of jobs.
 - jobs.csv.gz: contains one entry for each job in the workflow files. For each job, we indicate the workflow file, the job id and name (if any), a hash of the job, the other workflow it calls through its "uses:" field (if any), and the number of steps.
 - steps.csv.gz: contains one entry for each step we found in jobs. For each step, we report the workflow file, the job id, the step name (if any), a hash of the step, the step position in the job, the name of the action (i.e., the "uses:" field) if any, the number of lines in the "run:" field (0 if there is none), a hash of these commands (if any), the number of parameters (0 if there are none) and a hash of these parameters (if any).

In [1]:
import pandas as pd
import ruamel.yaml as yaml
import multiprocessing
import os

from tqdm import tqdm 

from pathlib import Path
from functools import partial
from hashlib import sha256

In [2]:
# Path to workflow files.
# Each workflow is looked up in this directory as "<workflow>.yaml" by the job() wrapper.
WORKFLOW_DIR = Path('/data/ghactions/workflows')

# Path to data folder.
# The csv.gz files generated at the end of this notebook are written here.
DATA_DIR = Path('../data/')

In [3]:
# List of workflow files found in the repositories. Read from DATA_DIR so that
# the input and the generated outputs always live in the same folder.
df_input = (
    pd.read_csv(DATA_DIR / 'workflow_files.csv.gz')
)

Let's define a function that will extract the "interesting parts" of the yaml files.

In [70]:
def extract_workflow(path):
    """
    Given a path to a workflow file, extract parts of its content and return a dictionary mimicking the parts
    of the y(a)ml file that are of interest (see documentation in this notebook for more details).

    The returned dictionary has the following keys:
     - 'name': the value of the workflow "name" field (None if absent);
     - 'events': the list of events found in the "on" field (empty if the field is absent);
     - 'jobs': the output of extract_jobs for the "jobs" field.

    An empty dictionary is returned when the parser yields nothing for the file (e.g. an empty file).

    Raises:
        TypeError: if the "on" field is neither a string, a list nor a mapping.
        ValueError: if the workflow has no "jobs" field.
    Errors raised while opening or parsing the file are not caught here.
    """
    output = dict()

    # See https://yaml.readthedocs.io/en/latest/api.html#duplicate-keys
    # Some workflows have duplicated keys (e.g. "if" or "env"), so we tolerate them instead of failing.
    parser = yaml.YAML()
    parser.allow_duplicate_keys = True

    with open(path) as f:
        workflow = parser.load(f)

    if workflow is None:
        return output

    # Name of the workflow
    output['name'] = workflow.get('name')

    # List of events that trigger the workflow.
    # "on" can be a single event, a list of events, or a mapping from events to their configuration.
    on = workflow.get('on', None)
    if isinstance(on, str):
        output['events'] = [on]
    elif isinstance(on, list):
        output['events'] = list(on)
    elif isinstance(on, dict):
        output['events'] = list(on.keys())
    elif on is None:
        output['events'] = list()
    else:
        # Explicit raise rather than "assert False", which is stripped when running with -O.
        raise TypeError(f'Unsupported type {type(on)} for workflow.on field')

    # List of jobs
    jobs = workflow.get('jobs')
    if jobs is None:
        raise ValueError('No job defined', workflow)
    output['jobs'] = extract_jobs(jobs)

    return output
    
    
def extract_jobs(jobs):
    """
    Summarise the jobs of a workflow.

    `jobs` is the mapping found under the "jobs" field of a workflow (job id -> job definition).
    The result is a dictionary with the same job ids as keys. Each value is a dictionary with:
     - 'name': the "name" field of the job (None if absent);
     - 'hash': the sha256 hex digest of the string representation of the job;
     - 'uses': the "uses" field of the job (None if absent);
     - 'steps': the output of extract_steps for the "steps" field (no steps if absent).
    """
    summaries = dict()

    for job_id, definition in jobs.items():
        summaries[job_id] = {
            'name': definition.get('name'),
            'hash': sha256(str(definition).encode()).hexdigest(),
            'uses': definition.get('uses'),
            'steps': extract_steps(definition.get('steps', [])),
        }

    return summaries


def extract_steps(steps):
    """
    Summarise the steps of a job.

    `steps` is the sequence found under the "steps" field of a job. The result is a list with one
    dictionary per step, in the same order, containing:
     - 'name': the "name" field of the step (None if absent);
     - 'hash': the sha256 hex digest of the string representation of the step;
     - 'position': the 1-based position of the step in the job;
     - 'uses': the "uses" field of the step (None if absent);
     - 'run' and 'run_hash': only if the step has a "run" field, the number of lines of that field
       and the sha256 hex digest of its content;
     - 'parameters' and 'parameters_hash': only if the step has a "with" field, the number of
       entries of that field and the sha256 hex digest of its string representation.
    """
    summaries = []

    for position, step in enumerate(steps, start=1):
        summary = {
            'name': step.get('name'),
            'hash': sha256(str(step).encode()).hexdigest(),
            'position': position,
            'uses': step.get('uses'),
        }

        # Shell commands
        commands = step.get('run', None)
        if commands is not None:
            commands = str(commands)
            # Number of newline-separated chunks, i.e. one more than the number of newlines
            summary['run'] = commands.count('\n') + 1
            summary['run_hash'] = sha256(commands.encode()).hexdigest()

        # Action parameters
        parameters = step.get('with')
        if parameters is not None:
            summary['parameters'] = len(parameters)
            summary['parameters_hash'] = sha256(str(parameters).encode()).hexdigest()

        summaries.append(summary)

    return summaries

Let's define a thin wrapper to handle outputs and errors.

In [71]:
def job(filepath):
    """
    Run extract_workflow on the workflow identified by `filepath`.

    The file is looked up as WORKFLOW_DIR / "<filepath>.yaml". The function returns:
     - None if that file does not exist;
     - the dictionary produced by extract_workflow on success;
     - the exception instance (returned, not raised) if the extraction fails.
    """
    target = WORKFLOW_DIR / (filepath + '.yaml')

    if target.exists():
        try:
            return extract_workflow(target)
        except Exception as error:
            # Hand the error back to the caller instead of raising it
            return error

    return None

In [72]:
# (filepath, result) pairs appended by the processing loop below
output = []
# Workflows already processed; the processing loop filters its inputs against this list
done = []

In [73]:
# Skip the workflows that were already processed. The set is built once so that each
# membership test is O(1); testing against the `done` list directly is quadratic when
# this cell is re-run after a previous (partial) run.
already_done = set(done)
inputs = [x for x in df_input.workflow.drop_duplicates().to_list() if x not in already_done]

with multiprocessing.Pool() as pool:
    # imap yields the results lazily and in the order of `inputs`, so zipping both
    # associates each result with its workflow.
    jobs = pool.imap(job, inputs)
    for filepath, r in tqdm(zip(inputs, jobs), total=len(inputs)):
        output.append((filepath, r))
        done.append(filepath)

100%|██████████████████████████████████| 229024/229024 [04:35<00:00, 832.21it/s]


Now we can export these results as csv files. 

In [74]:
# Lists to store data (they will be converted to DataFrames afterward)
m_workflows = []
m_jobs = []
m_steps = []

for filepath, workflow in output:
    # Skip workflows whose file was not found (None) or whose extraction failed (Exception)
    if workflow is None or isinstance(workflow, Exception):
        continue

    # One row per workflow: file, name, comma-separated events, number of jobs
    m_workflows.append((
        filepath,
        workflow.get('name'),
        ', '.join(workflow.get('events', [])),
        len(workflow.get('jobs', [])),
    ))

    # The loop variable is deliberately not called `job`: that would overwrite the
    # job() wrapper defined above and break any re-run of the processing cell.
    for job_id, job_info in workflow.get('jobs', dict()).items():
        # One row per job: file, job id, hash, name, "uses" field, number of steps
        m_jobs.append((
            filepath,
            job_id,
            job_info['hash'],
            job_info.get('name'),
            job_info.get('uses'),
            len(job_info.get('steps', [])),
        ))

        for step in job_info.get('steps', []):
            # One row per step; 'run' and 'parameters' default to 0 when the step has none
            m_steps.append((
                filepath,
                job_id,
                step['hash'],
                step.get('name'),
                step['position'],
                step.get('uses'),
                step.get('run', 0),
                step.get('run_hash'),
                step.get('parameters', 0),
                step.get('parameters_hash'),
            ))

In [75]:
# Number of rows collected for workflows, jobs and steps, respectively
tuple(len(rows) for rows in (m_workflows, m_jobs, m_steps))

(228519, 432659, 2579227)

In [79]:
# One row per workflow file, indexed by workflow
df_workflows = pd.DataFrame(
    data=m_workflows,
    columns=['workflow', 'name', 'events', 'jobs'],
).set_index(keys=['workflow'])

# One row per job, indexed by (workflow, job id)
df_jobs = pd.DataFrame(
    data=m_jobs,
    columns=['workflow', 'id', 'hash', 'name', 'uses', 'steps'],
).set_index(keys=['workflow', 'id'])

# One row per step, indexed by (workflow, job id, position of the step in the job)
df_steps = pd.DataFrame(
    data=m_steps,
    columns=[
        'workflow', 'job', 'hash', 'name', 'pos',
        'uses', 'run', 'run_hash', 'parameters', 'parameters_hash',
    ],
).set_index(keys=['workflow', 'job', 'pos'])

In [80]:
# Write each DataFrame as a gzip-compressed csv file in the data folder
for frame, filename in (
    (df_workflows, 'workflows.csv.gz'),
    (df_jobs, 'jobs.csv.gz'),
    (df_steps, 'steps.csv.gz'),
):
    frame.to_csv(DATA_DIR / filename, compression='gzip')