This notebook analyses the downloaded repositories and extracts information about the workflows, jobs and steps. 

The notebook generates the following files: 

 - all_repositories.csv: the file contains all the repositories that were considered during the data extraction process. It contains various data about the repositories, such as their name, main branch, creation and update dates, stars, forks, etc. 
 - repositories.csv: the file contains all the repositories that have at least one associated workflow. It's a subset of the previous file.
 - workflows.csv: the file contains an entry for each of the workflow files we found in the repositories. For each workflow, it indicates the repository, the filename, the name of the workflow, the list of events that trigger it (including reusable workflows) and the number of jobs.
 - jobs.csv: the file contains an entry for each of the jobs in workflow files. For each job, we indicate the repository, the workflow file, the job id and name (if any), whether it corresponds to (i.e., use "uses:") another workflow and which one and the number of steps.
 - steps.csv: the file contains an entry for each steps we found in jobs. For each step, we report on the repository, the workflow file, the job id, step name (if any), step position in the job, and the name of the action (i.e., the "uses:" field) if any, the number of lines in the "run:" field, if any.

In [1]:
import pandas as pd
import ruamel.yaml as yaml
from tqdm import tqdm 

from multiprocessing import Pool
from pathlib import Path
from functools import partial

In [2]:
# Path to repositories
REPO_DIR = Path('/data/ghactions')

# Path to data folder
DATA_DIR = Path('../data/')

# Number of parallel jobs
WORKERS = 40

In [3]:
# Initial list of repositories. We will scan the REPO_DIR folder to see which ones 
# were effectively extracted and have a non-empty .github/workflows/ folder

FIELDS = {
    'Name': 'repository',
    'Default Branch': 'branch',
    'Main Language': 'language',
    'Created At': 'created',
    'Last Commit': 'updated',
    'Last Commit SHA': 'commit',
    'Stargazers': 'stars',
    'Watchers': 'watchers',
    'Forks': 'forks',
    'Size': 'size',
    'Branches': 'branches',
    'Commits': 'commits',
    'Contributors': 'contributors',
    'Total Issues': 'issues',
    'Total Pull Requests': 'prs',
}

df_input = (
    pd.read_csv('../data-raw/repositories.csv')
    [FIELDS.keys()]
    .rename(columns=FIELDS)
)

Let's define a function that will extract the "interesting parts" of the yaml files.

In [4]:
def extract_workflow(path):
    """
    Given a path to a workflow file, extract parts of its content and return a dictionary mimicking the parts
    of the y(a)ml file that are of interest (see documentation in this notebook for more details). 
    """
    output = dict()
    with open(path) as f: 
        workflow = yaml.round_trip_load(f)
    
    if workflow is None:
        return output
    
    # Name of the workflow
    output['name'] = workflow.get('name')
    
    # List of events that trigger the workflow
    if isinstance(workflow['on'], str):
        output['events'] = [workflow.get('on')]
    elif isinstance(workflow['on'], list):
        output['events'] = list(workflow['on'])
    elif isinstance(workflow['on'], dict):
        output['events'] = list(workflow['on'].keys())
    else:
        assert False, f'Unsupported type {type(workflow.get("on"))} for workflow.on field'
        
    # List of jobs
    jobs = workflow.get('jobs', dict())
    output['jobs'] = extract_jobs(jobs)
        
    return output
    
    
def extract_jobs(jobs):
    output = dict()
    
    for id, job in jobs.items():
        output[id] = dict()
        
        output[id]['name'] = job.get('name')
        output[id]['uses'] = job.get('uses')
        output[id]['steps'] = extract_steps(job.get('steps', []))
        
    return output


def extract_steps(steps):
    output = []
    
    for i, step in enumerate(steps):
        item = dict()
        
        item['name'] = step.get('name')
        item['position'] = i + 1
        item['uses'] = step.get('uses')
        _run = step.get('run', None)
        item['run'] = len(_run.split('\n')) if _run is not None else 0
        output.append(item)
        
    return output

Let's define a thin wrapper to handle outputs and errors.

In [5]:
def job(repository):
    # Look if repository exists
    path = REPO_DIR / repository.replace('/', '---') / '.github/workflows'
    if not path.exists():
        return None
    
    # Look for workflow files
    workflows = dict()
    for file in path.iterdir():
        if file.suffix in ['.yaml', '.yml']:
            try:
                workflows[file.name] = extract_workflow(file)
            except Exception as e:
                workflows[file.name] = e
    return workflows

In [6]:
output = []
inputs = list(df_input.itertuples())

with Pool(processes=WORKERS) as pool:
    jobs = pool.imap(job, [x.repository for x in inputs])
    for repo, result in tqdm(zip(inputs, jobs), total=len(inputs)):
        output.append((repo, result))

100%|███████████████████████████████████| 69147/69147 [00:53<00:00, 1282.03it/s]


In [7]:
print(f'There are {len([x for x in output if x[1] is not None])} repositories with workflows out of {len(output)}.')
_ = [x for x in output if x[1] is not None and any([isinstance(w, Exception) for w in x[1].values()])]
print(f'There were {len(_)} errors during the process.')

There are 29828 repositories with workflows out of 69147.
There were 121 errors during the process.


Now we can export these results as csv files. 

In [8]:
# Lists to store data (they will be converted to DataFrames afterward)
m_repositories = []
m_workflows = []
m_jobs = []
m_steps = []

for repository, workflows in output:
    # Skip if there is no parsed workflow
    if workflows is None or all([isinstance(e, Exception) for e in workflows.values()]):
        continue
    
    m_repositories.append(tuple([
        getattr(repository, field) for field in FIELDS.values()
    ]))
    
    for filename, workflow in workflows.items():
        if isinstance(workflow, Exception):
            continue
            
        m_workflows.append((
            repository.repository,
            filename,
            workflow.get('name'),
            ', '.join(workflow.get('events', [])),
            len(workflow.get('jobs', [])),
        ))
        
        for job_id, job in workflow.get('jobs', dict()).items():
            m_jobs.append((
                repository.repository,
                filename,
                job_id,
                job.get('name'),
                job.get('uses'),
                len(job.get('steps', [])),
            ))
            
            for step in job.get('steps', []):
                m_steps.append((
                    repository.repository,
                    filename,
                    job_id,
                    step.get('name'),
                    step['position'],
                    step.get('uses'),
                    step['run'],
                ))

In [9]:
len(m_repositories), len(m_workflows), len(m_jobs), len(m_steps)

(29778, 70278, 108500, 576352)

In [10]:
df_repositories = (
    pd.DataFrame(m_repositories, columns=FIELDS.values())
    .set_index('repository')
)
df_workflows = (
    pd.DataFrame(m_workflows, columns=['repository', 'filename', 'name', 'events', 'jobs'])
    .set_index(['repository', 'filename'])
)
df_jobs = (
    pd.DataFrame(m_jobs, columns=['repository', 'filename', 'id', 'name', 'uses', 'steps'])
    .set_index(['repository', 'filename', 'id'])
)
df_steps = (
    pd.DataFrame(m_steps, columns=['repository', 'filename', 'job', 'name', 'pos', 'uses', 'run'])
    .set_index(['repository', 'filename', 'job', 'pos'])
)

In [11]:
len(df_repositories), len(df_workflows), len(df_jobs), len(df_steps)

(29778, 70278, 108500, 576352)

In [15]:
df_input[FIELDS.values()].set_index('repository').to_csv(DATA_DIR / 'all_repositories.csv.gz', compression='gzip')
df_repositories.to_csv(DATA_DIR / 'repositories.csv.gz', compression='gzip')
df_workflows.to_csv(DATA_DIR / 'workflows.csv.gz', compression='gzip')
df_jobs.to_csv(DATA_DIR / 'jobs.csv.gz', compression='gzip')
df_steps.to_csv(DATA_DIR / 'steps.csv.gz', compression='gzip')