Lets have look at workflow metadata databse

In [None]:
import pandas as pd

# Adjust the path if necessary
metadata_path = '../../data-raw/Workflows-Metadata/workflows.csv.gz'

# Read the first few lines of the compressed CSV
df_workflows = pd.read_csv(metadata_path, compression='gzip')
print(len(df_workflows))
df_workflows.head(10)


In [None]:
print(f"Total number of workflows: {len(df_workflows)} ")
print (f"Unique number of workflows: {df_workflows['uid'].nunique()}")

In [None]:
matching_rows = df_workflows[(df_workflows["file_hash"] == df_workflows["previous_file_hash"]) & (df_workflows["file_path"] == df_workflows["previous_file_path"] )]

matching_rows


Now its time filter valid workflow files. Since we want to only extract path/value pairs from those their validness checked.

In [None]:
import pandas as pd

# Adjust the path if necessary
metadata_path = './data-raw/Workflows-Metadata/workflows.csv.gz'
output_valid_hashes='./data/valid_workflow_hashes.parquet'
# Load the metadata dataset
df_workflows = pd.read_csv(metadata_path, compression='gzip')

# Filter only the rows where the workflow is valid
df_valid_workflows = df_workflows[df_workflows['valid_workflow'] == True] 

# Extract the unique file hashes of valid workflows
valid_workflow_hashes = df_valid_workflows['file_hash'].unique()

df_valid_hashes = pd.DataFrame(valid_workflow_hashes, columns=['file_hash'])
df_valid_hashes.to_parquet(output_valid_hashes, index=False)

# At this point, we have an array of file_hashes that correspond to valid workflows.


print(f"Number of valid workflows: {len(valid_workflow_hashes)} out of: {len(df_workflows)} workflows")
print("Example of valid workflow hashes:", valid_workflow_hashes[:5])


 We can then use `valid_workflow_hashes` in the next steps to:
 1. Load only those workflows from Workflow Files.
 2. Extract (path, value) pairs from them.
 3. Store the results.

Now for this cell we will do these three steps

In [None]:
import os
import pandas as pd
from ruamel.yaml import YAML
import pyarrow as pa
import pyarrow.parquet as pq
import logging

# Paths and configurations
extracted_path = '../data-raw/Workflow-Files/workflows'
valid_hashes_parquet = '../data/valid_workflow_hashes.parquet'
output_file = '../data/workflows_data.parquet'
log_file = 'error_log.txt'

batch_size = 500
yaml = YAML(typ='safe', pure=True)
yaml.allow_duplicate_keys = False

# Set up logging
logging.basicConfig(filename=log_file, level=logging.ERROR, format='%(asctime)s - %(message)s')

# Load valid workflow hashes
df_valid_hashes = pd.read_parquet(valid_hashes_parquet)
valid_hashes = set(df_valid_hashes['file_hash'])

# Get list of YAML files
all_files = os.listdir(extracted_path)
yaml_files = [f for f in all_files if f in valid_hashes]
total_files = len(yaml_files)

# Check for existing log file and remove it
if os.path.exists(log_file):
    os.remove(log_file)

# Helper functions
def check_for_on_key(data):
    if "on" in data:
        w_on = data["on"]
        if not isinstance(w_on, dict):
            if not isinstance(w_on, list):
                w_on = [w_on]
            data["on"] = {k: None for k in w_on}
    return data

def extract_paths(data, prefix=None):
    """
    Recursively extract the "path" of each leaf node as a list of keys,
    and the corresponding value of that leaf node.
    """
    if prefix is None:
        prefix = []  # start empty

    results = []
    if isinstance(data, dict):
        for key, value in data.items():
            new_prefix = prefix + [key]
            if isinstance(value, (dict, list)):
                results.extend(extract_paths(value, new_prefix))
            else:
                results.append((new_prefix, value))
    elif isinstance(data, list):
        for index, item in enumerate(data):
            new_prefix = prefix + [str(index)]
            if isinstance(item, (dict, list)):
                results.extend(extract_paths(item, new_prefix))
            else:
                results.append((new_prefix, item))
    return results

def process_file(file):
    file_path = os.path.join(extracted_path, file)
    workflow_hash = file
    batch_data = []

    try:
        with open(file_path, 'r') as f:
            workflow_data = yaml.load(f)
        workflow_data = check_for_on_key(workflow_data)

        paths_and_values = extract_paths(workflow_data)
        for path, value in paths_and_values:
            # Ensure each path is a proper list
            if not isinstance(path, list):
                path = [str(path)]  # Coerce to a list of strings
            batch_data.append({
                'workflow_hash': workflow_hash,
                'path': path,
                'value': value
            })

    except Exception as e:
        logging.error(f"Error processing file {file}: {str(e)}")
    return batch_data

# Initialize an empty DataFrame to accumulate data
accumulated_data = []

# Process files in batches
for i in range(0, total_files, batch_size):
    batch_files = yaml_files[i: i + batch_size]
    batch_data = []

    # Process each file in the batch
    for file in batch_files:
        file_data = process_file(file)
        batch_data.extend(file_data)

    if batch_data:
        df_batch = pd.DataFrame(batch_data)
        df_batch['workflow_hash'] = df_batch['workflow_hash'].astype(str)
        df_batch['value'] = df_batch['value'].astype(str)
        
        # Accumulate batch data
        accumulated_data.append(df_batch)

    print(f"Processed {min(i + batch_size, total_files)}/{total_files} files...")

# Concatenate all accumulated data into a single DataFrame
final_df = pd.concat(accumulated_data, ignore_index=True)

# Ensure `path` is a list of strings for every row
final_df['path'] = final_df['path'].apply(lambda x: [str(elem) for elem in x] if isinstance(x, list) else [])

# Define the schema for PyArrow
schema = pa.schema([
    ('workflow_hash', pa.string()),
    ('path', pa.list_(pa.string())),  # Explicitly define path as a list of strings
    ('value', pa.string()),
])

# Convert the DataFrame to a PyArrow Table with the schema
table = pa.Table.from_pandas(final_df, schema=schema)

# Write the table to a Parquet file
pq.write_table(table, output_file)

print(f"Processing completed! Data written to {output_file}")


Experimenting: getting number of distinct workflows (to check we successessfully process all the workflows)

In [None]:
"""
Experimenting: getting number of distinct workflows ...

"""

import pandas as pd

# Load the Parquet file
df = pd.read_parquet('../../data/workflows_data.parquet')

# Group by 'workflow_id' and count unique workflows
unique_workflows = df['workflow_hash'].nunique()

# Get total number of rows and columns
total_rows = len(df)
total_columns = len(df.columns)

# Display the information
print(f"Total number of unique workflows: {unique_workflows}")
print(f"Total number of rows: {total_rows}")
print(f"Total number of columns: {total_columns}")



In [None]:
import pandas as pd
import numpy as np

# Load the Parquet file
df = pd.read_parquet('../../data/workflows_data.parquet')

# Convert any string 'None' values in the 'value' column to np.nan
# df['value'] = df['value'].replace('None', None)

# Filter the DataFrame for rows where the 'path' starts with 'on.'
on_key_df = df[df['path'].apply(lambda path_list: path_list[0] == 'on')]

# Identify rows where the 'value' is not in dictionary format or is None
incorrect_values_df = on_key_df[(on_key_df['value'].isnull()) | 
                                (on_key_df['value'].apply(lambda x: isinstance(x, str) and x.lower() == 'none'))]

# Display the results
incorrect_values_df

In [None]:
incorrect_values_df[2000:]

In [None]:
"""
Experimenting: getting number of distinct workflows ...

"""

import pandas as pd

# Load the Parquet files
df = pd.read_parquet('../../data/workflows_data.parquet')

# Group by 'workflow_id' and count unique workflows
unique_workflows = df['workflow_hash'].nunique()

# Get total number of rows and columns
total_rows = len(df)
total_columns = len(df.columns)

# Display the information
print(f"Total number of unique workflows: {unique_workflows}")
print(f"Total number of rows: {total_rows}")
print(f"Total number of columns: {total_columns}")



In [None]:
# Reading and returning the head of data from parquet file containing our path/value pairs

df.head(20)

No we have a databse of workflows path/value pairs which each row is for a path/value pair of a workflow. As we can see each workflow got rows as many as its path/value pairs.

Its time to enrich the database from more features for our evolution analysis. Since we want to study size, complexity, and use of concepts, we need more features (columns) from the metadata database. So we did a join to have an enriched database. we can adjust our columns in columns_to_keep list.

In [None]:
pwd

In [None]:
# import pandas as pd
# import pyarrow.parquet as pq

# # Paths to the databases
# path_value_file = '../../data/workflows_data.parquet'
# metadata_file = '../../data-raw/Workflows-Metadata/workflows.csv.gz'

# # Load the metadata file
# df_metadata = pd.read_csv(metadata_file, compression='gzip')

# # Select columns to keep from metadata
# columns_to_keep = [
#     'repository', 'commit_hash', 'committed_date',
#     'file_hash', 'previous_file_hash', 'uid'
# ]
# df_metadata = df_metadata[columns_to_keep]

# # Read parquet file in smaller batches
# parquet_file = pq.ParquetFile(path_value_file)
# batch_size = 100000  

# df_enhanced_list = []

# for batch in parquet_file.iter_batches(batch_size):
#     df_chunk = batch.to_pandas()
#     df_chunk_enhanced = df_chunk.merge(
#         df_metadata,
#         left_on='workflow_hash',
#         right_on='file_hash',
#         how='inner'
#     )
#     df_enhanced_list.append(df_chunk_enhanced)

# # Concatenate all chunks into a single DataFrame
# df_enhanced = pd.concat(df_enhanced_list, ignore_index=True)

# # Save the enhanced dataset
# enhanced_output_file = './data/enhanced_workflows_data.parquet'
# # df_enhanced.to_parquet(enhanced_output_file, index=False, engine='pyarrow')

# # Display summary
# print(f"Enhanced dataset created with {len(df_enhanced)} rows.")
# df_enhanced.head()


In [None]:
pwd

In [None]:
duplicate_file_hashes = df_metadata[df_metadata.duplicated('file_hash', keep=False)]
print(f"Number of duplicate file_hashes: {len(duplicate_file_hashes)}")
duplicate_file_hashes.head(10)


In the next cell, we will detect which concepts are used in a given workflow path (if any).

Concepts List:

Matrix Strategy,
Permissions,
Triggers,
Reusable Actions,
Reusable Workflows,
Secrets,
Environment Variables,
Conditional Statements,
Job Dependencies,

These concepts are identified in the detect_concept function using predefined regex patterns.

In [None]:
import pandas as pd
import re
from tqdm import tqdm
import time
import math
import pyarrow as pa
import pyarrow.parquet as pq

# Read the parquet file
file_path = '../../data/workflows_data.parquet'
output_file_path = '../../data/workflows_path_value_concepts.parquet'



import re

DELIM = r"::"

# IDENT = letters, digits, underscore, dot, hyphen, slash, colon,
# dollar sign, question mark, plus spaces. 

IDENT = r"(?:[^:]|:(?!:))+"


DIGIT = r"\d+"
STEP_INDEX = rf"steps(?:{DELIM}{DIGIT})?"
INDEX = rf"{DELIM}{DIGIT}" 


patterns = [
    # 1) TRIGGER (global level)
    (
        "trigger",
        rf"^on{DELIM}{IDENT}(?:{DELIM}{IDENT}(?:{INDEX})?)*$"
    ),
    # 2) PERMISSIONS (global & job-level)
    (
        "permissions_global",
        # Global: can be just [permissions] or [permissions, something, ...]
        rf"^permissions(?:{DELIM}{IDENT})*$"
    ),
    (
        "permissions_job_level",
        # Job-level: jobs::X::permissions(...) 
        rf"^jobs{DELIM}{IDENT}{DELIM}permissions(?:{DELIM}{IDENT})*$"
    ),

    # 3) ENVIRONMENT VARIABLE (global, job, or step level)
    (
        "environment_variable_global",
        # Global env: env or env::KEY1::KEY2...
        rf"^env(?:{DELIM}{IDENT})*$"
    ),
    (
        "environment_variable_job_level",
        # Job-level: jobs::X::env(...)
        rf"^jobs{DELIM}{IDENT}{DELIM}env(?:{DELIM}{IDENT})*$"
    ),
    (
        "environment_variable_step_level",
        # Step-level: jobs::X::steps(::\d+)?::env(...)
        rf"^jobs{DELIM}{IDENT}{DELIM}{STEP_INDEX}{DELIM}env(?:{DELIM}{IDENT})*$"
    ),

    # 4) WITH USAGE (job-level & step-level)
    (
        "with_usage_job_level",
        # Job-level: jobs::X::with(...)
        rf"^jobs{DELIM}{IDENT}{DELIM}with(?:{DELIM}{IDENT})*$",
    ),
    (
        "with_usage_step_level",
        # Step-level: jobs::X::steps(::\d+)?::with(...)
        rf"^jobs{DELIM}{IDENT}{DELIM}{STEP_INDEX}{DELIM}with(?:{DELIM}{IDENT})*$"
    ),


    # 13) ENTRYPOINT (step-level & container-level)
    (
        "entrypoint_step_level",
        rf"^jobs{DELIM}{IDENT}{DELIM}{STEP_INDEX}{DELIM}with{DELIM}entrypoint$"
    ),
    (
        "entrypoint_container_level",
        rf"^jobs{DELIM}{IDENT}{DELIM}container{DELIM}entrypoint$"
    ),


    # 5) ARGS (step-level)
    (
        "args",
        rf"^jobs{DELIM}{IDENT}{DELIM}{STEP_INDEX}{DELIM}with{DELIM}args$"
    ),

    # 6) CONDITIONAL STATEMENT (job-level or step-level)
    (
        "conditional_statement",
        rf"^jobs{DELIM}{IDENT}{DELIM}if$",
        rf"^jobs{DELIM}{IDENT}{DELIM}{STEP_INDEX}{DELIM}if$"
    ),

    # 7) REUSABLE ACTION (step-level)
    (
        "reusable_action",
        rf"^jobs{DELIM}{IDENT}{DELIM}{STEP_INDEX}{DELIM}uses$"
    ),

    # 8) REUSABLE WORKFLOW (job-level)
    (
        "reusable_workflow",
        rf"^jobs{DELIM}{IDENT}{DELIM}uses$"
    ),

    # 9) MATRIX STRATEGY (job-level)
    (
        "matrix_strategy",
        rf"^jobs{DELIM}{IDENT}{DELIM}strategy{DELIM}matrix(?:{DELIM}{IDENT})*$"
    ),

    # 10) MATRIX INCLUDE (job-level)
    (
        "matrix_include",
        rf"^jobs{DELIM}{IDENT}{DELIM}strategy{DELIM}matrix{DELIM}include(?:{DELIM}{IDENT})*$"
    ),

    # 11) MATRIX EXCLUDE (job-level)
    (
        "matrix_exclude",
        rf"^jobs{DELIM}{IDENT}{DELIM}strategy{DELIM}matrix{DELIM}exclude(?:{DELIM}{IDENT})*$"
    ),

    # 12) MAX PARALLEL (job-level)
    (
        "max_parallel",
        rf"^jobs{DELIM}{IDENT}{DELIM}strategy{DELIM}max-parallel$"
    ),

    # 14) WORKING DIRECTORY (job-level defaults or step-level) 
    (
        "working_directory_job-level",
        rf"^jobs{DELIM}{IDENT}{DELIM}defaults{DELIM}run{DELIM}working-directory$"
    ),
    (
        "working_directory_step-level",
        rf"^jobs{DELIM}{IDENT}{DELIM}{STEP_INDEX}{DELIM}working-directory$"
    ),
 

    # 15) CONCURRENCY (global & job-level)
    (
        "concurrency_global",
        # Global: concurrency or concurrency::something
        rf"^concurrency(?:{DELIM}{IDENT})*$"
    ),
    (
        "concurrency_job_level",
        # Job-level: jobs::X::concurrency(...)
        rf"^jobs{DELIM}{IDENT}{DELIM}concurrency(?:{DELIM}{IDENT})*$"
    ),

    # 16) JOB OUTPUTS (job-level)
    (
        "job_outputs",
        rf"^jobs{DELIM}{IDENT}{DELIM}outputs(?:{DELIM}{IDENT})*$"
    ),

    # 17) TIMEOUTS (job-level or step-level)
    (
        "timeouts_job_level",
        rf"^jobs{DELIM}{IDENT}{DELIM}timeout-minutes$"
    ),
    (
        "timeouts_step_level",
        rf"^jobs{DELIM}{IDENT}{DELIM}{STEP_INDEX}{DELIM}timeout-minutes$"
    ),

    # 18) ERROR HANDLING (fail-fast or continue-on-error)
    (
        "error_handling_fail_fast",
        rf"^jobs{DELIM}{IDENT}{DELIM}strategy{DELIM}fail-fast$"
    ),
    (
        "error_handling_continue_on_error",
        rf"^jobs{DELIM}{IDENT}{DELIM}{STEP_INDEX}{DELIM}continue-on-error$"
    ),

    # 19) CONTAINER (job-level)
    (
        "container",
        rf"^jobs{DELIM}{IDENT}{DELIM}container(?:{DELIM}{IDENT})*$"
    ),

    # 20) CONTAINER OPTIONS (job-level)
    (
        "container_options",
        # If you need multiple sub-tokens after 'options', use (?:{DELIM}{IDENT})* instead
        rf"^jobs{DELIM}{IDENT}{DELIM}container{DELIM}options(?:{DELIM}{IDENT})*$"
    ),

    # 21) SERVICES (job-level)
    (
        "services",
        rf"^jobs{DELIM}{IDENT}{DELIM}services{DELIM}{IDENT}(?:{DELIM}{IDENT})*$"
    ),

    # 22) SERVICES OPTIONS (job-level)
    (
        "services_options",
        rf"^jobs{DELIM}{IDENT}{DELIM}services{DELIM}{IDENT}{DELIM}options(?:{DELIM}{IDENT})*$"
    ),

    # 23) COMMANDS (step-level)
    (
        "commands",
        rf"^jobs{DELIM}{IDENT}{DELIM}{STEP_INDEX}{DELIM}run$"
    ),

    # 24) DEFAULTS (global & job-level)
    (
        "defaults_global",
        rf"^defaults(?:{DELIM}{IDENT}(?:{INDEX})?)*$",
    ),
    (
        "defaults_job_level",
        rf"^jobs{DELIM}{IDENT}{DELIM}defaults(?:{DELIM}{IDENT}(?:{INDEX})?)*$"
    ),

    # 25) SHELL (global level)
    (
        "shell_global",
        rf"^defaults{DELIM}run{DELIM}shell$"
    ),
    (
        "shell_job_level",
        rf"^jobs{DELIM}{IDENT}{DELIM}defaults{DELIM}run{DELIM}shell$"
    ),
    (
        "shell_step_level",
        rf"^jobs{DELIM}{IDENT}{DELIM}{STEP_INDEX}{DELIM}shell$"
    ),


    # 27) NEEDS (job-level)
    (
        "needs",
        rf"^jobs{DELIM}{IDENT}{DELIM}needs(?:{INDEX})?$"
    ),

    # 28) RUNS-ON (job-level)
    (
        "runs_on",
        rf"^jobs{DELIM}{IDENT}{DELIM}runs-on(?:{DELIM}{IDENT})*$"
    )
]



def check_presence(path):
    """
    Join the path list with '::' and run regex checks against the joined path.
    Return a dict of booleans for each concept.
    """

    results = {concept: False for concept, *_ in patterns}
    for concept, *regex_list in patterns:
        for regex in regex_list:
            if re.search(regex, "::".join(path)):
                results[concept] = True
                break
    return results


table = pq.read_table(file_path, columns=["workflow_hash", "path", "value"])
df_len = table.num_rows
CHUNK_SIZE = 50_000
n_chunks = math.ceil(df_len / CHUNK_SIZE)

start_time = time.time()
first_write = True  # Track if we are writing the first chunk

# Initialize ParquetWriter
writer = None

for i in tqdm(range(n_chunks), desc="Processing chunks"):
    start_idx = i * CHUNK_SIZE
    end_idx = min((i + 1) * CHUNK_SIZE, df_len)
    
    # Convert slice of the table into a pandas DataFrame
    df_chunk = table.slice(start_idx, end_idx - start_idx).to_pandas()

    # Run concept checks chunk by chunk
    presence_data = [check_presence(path) for path in df_chunk["path"]]
    presence_df = pd.DataFrame(presence_data)

    # Append concept columns to df_chunk
    for col in presence_df.columns:
        df_chunk[col] = presence_df[col]

    # Convert to PyArrow Table
    table_result = pa.Table.from_pandas(df_chunk)

    # Write to Parquet file
    if writer is None:
        writer = pq.ParquetWriter(output_file_path, table_result.schema)
    writer.write_table(table_result)

    # Free memory manually
    del df_chunk, presence_data, presence_df, table_result

# Close the writer
if writer is not None:
    writer.close()

elapsed = time.time() - start_time
print(f"Done. Processed {df_len} rows in {elapsed:.2f} seconds.")


In [None]:
import pandas as pd

path_value_concepts_file_path = '../../data/workflows_path_value_features.parquet'

df=pd.read_parquet(path_value_concepts_file_path)



In [None]:
df