In [13]:
"""
Part 1: Data Fetch & Preparation
--------------------------------
This script:
 1) Fetches data from the GitHub API (commits, issues, PRs, etc.) using GitHubFetcher.
 2) Builds metrics and time-series data via GitHubMetricsBuilder.
 3) Writes out multiple CSV files (commits, issues, pulls, timeseries, etc.) into a 'data/' folder.
"""

import requests
import time
from datetime import datetime, timezone
from typing import Optional, Dict, List, Tuple
import pandas as pd
import numpy as np
from op_analytics.coreutils.logger import structlog
from op_analytics.coreutils.request import new_session, get_data
from op_analytics.coreutils.threads import run_concurrently

# -------------------------------------
# Rate Limit Helpers
# -------------------------------------
def get_rate_limit_info(session: requests.Session, token: Optional[str] = None) -> dict:
    """
    Query GitHub's /rate_limit endpoint and summarise the 'core' REST quota.

    Args:
        session: Session used to issue the GET request.
        token: Optional access token; when truthy it is sent as a Bearer credential.

    Returns:
        A dict with keys 'limit', 'remaining' and 'reset'. 'reset' is the reset
        instant rendered in UTC as 'YYYY-MM-DD HH:MM:SS UTC' (a string, not an epoch).

    Raises:
        Whatever ``raise_for_status()`` raises when the response has an error status.
    """
    request_headers = {"Accept": "application/vnd.github+json"}
    if token:
        request_headers["Authorization"] = f"Bearer {token}"

    response = session.get("https://api.github.com/rate_limit", headers=request_headers)
    response.raise_for_status()

    core = response.json()["resources"]["core"]
    # The API reports the reset instant as epoch seconds; render it human-readable in UTC.
    reset_at = datetime.fromtimestamp(core["reset"], tz=timezone.utc)
    formatted_reset = reset_at.strftime('%Y-%m-%d %H:%M:%S %Z')
    return {
        "limit": core["limit"],
        "remaining": core["remaining"],
        "reset": formatted_reset,  # Human-readable time
    }

def wait_for_rate_limit(session: requests.Session, token: Optional[str] = None) -> None:
    """
    Block until the 'core' REST rate limit has calls remaining.

    Polls get_rate_limit_info(); if remaining == 0, sleeps until the reported
    reset time (plus a small buffer) and checks again. Returns as soon as a
    poll reports a non-zero remaining count.

    Args:
        session: Session used for the /rate_limit requests.
        token: Optional access token forwarded to get_rate_limit_info().
    """
    # Pause used when the quota is exhausted but the reset time has already passed,
    # so the loop does not hammer /rate_limit while GitHub catches up.
    stale_reset_pause_seconds = 2

    while True:
        rate_info = get_rate_limit_info(session, token=token)
        limit = rate_info["limit"]
        remaining = rate_info["remaining"]
        reset_time_str = rate_info["reset"]

        now_utc = datetime.now(timezone.utc)
        print(f"[RateLimit] Current UTC time: {now_utc.strftime('%Y-%m-%d %H:%M:%S %Z')}")
        print(f"[RateLimit] Core usage: {remaining}/{limit} calls remaining.")
        print(f"[RateLimit] Reset time: {reset_time_str}")

        if remaining != 0:
            return

        # get_rate_limit_info() returns the reset time as a formatted UTC string; parse it back.
        reset_dt = datetime.strptime(reset_time_str, '%Y-%m-%d %H:%M:%S %Z').replace(tzinfo=timezone.utc)
        wait_seconds = (reset_dt - now_utc).total_seconds()
        if wait_seconds > 0:
            print(f"[RateLimit] Exhausted. Waiting ~{wait_seconds:.0f}s until reset at {reset_dt}.")
            print(f"[RateLimit] ETA on reset: {reset_time_str}")
            time.sleep(wait_seconds + 2)  # small buffer
        else:
            print("[RateLimit] Wait time is zero or negative, presumably just reset.")
            # Previously this branch re-polled immediately, producing a tight request loop.
            time.sleep(stale_reset_pause_seconds)


# -------------------------------------
# GitHubFetcher
# -------------------------------------
class GitHubFetcher:
    """Fetch data from the GitHub API using concurrency and custom request logic."""

    def __init__(self, token: str, owner: str, repos: List[str]):
        self.token = token
        self.owner = owner
        self.repos = repos
        # We'll reuse a single session across all fetches
        self.session = new_session()
        self.session.headers.update({"Authorization": f"token {self.token}"})
   
    def _fetch_all_pages(self, initial_url: str) -> List:
        """
        Fetch all pages for a given endpoint concurrently:
        1) Fetch the initial page (serially).
        2) Parse the Link header for rel="last" to determine total pages (if available).
        3) Generate URLs for all pages (2..N) or fetch a single 'next' page if there's no 'last' link.
        4) Fetch them concurrently and combine results.
        """
        # Single-page (initial) fetch
        first_page_data = self._fetch_single_page_with_retry(initial_url)
        all_items = first_page_data if isinstance(first_page_data, list) else [first_page_data]

        link_header = self.session.head(initial_url).headers.get("Link", None)
        if not link_header:
            # Possibly the HEAD approach was not needed or didn't find pagination, fallback
            return all_items

        # Attempt to find 'rel="last"'
        last_link = None
        for part in link_header.split(","):
            if 'rel="last"' in part:
                start = part.find('<') + 1
                end = part.find('>')
                last_link = part[start:end]
                break

        if not last_link:
            # If no 'last', maybe there's just 'next'
            next_link = None
            for part in link_header.split(","):
                if 'rel="next"' in part:
                    start = part.find('<') + 1
                    end = part.find('>')
                    next_link = part[start:end]
                    break
            if not next_link:
                return all_items  # no next link => only one page

            # Single next page scenario
            return all_items + self._fetch_pages_in_parallel([next_link])

        # There's a 'last' link. Let's parse out total pages from it.
        from urllib.parse import urlparse, parse_qs
        parsed_last = urlparse(last_link)
        qs_last = parse_qs(parsed_last.query)
        last_page = int(qs_last.get('page', [1])[0])

        if last_page <= 1:
            return all_items  # only one page total

        # Build pages for 2..N
        parsed_init = urlparse(initial_url)
        qs_init = parse_qs(parsed_init.query)
        qs_init.pop('page', None)

        def build_page_url(page_num: int) -> str:
            new_qs = qs_init.copy()
            new_qs['page'] = str(page_num)
            from urllib.parse import urlencode
            query_str = urlencode(new_qs, doseq=True)
            return f"{parsed_init.scheme}://{parsed_init.netloc}{parsed_init.path}?{query_str}"

        all_page_urls = [build_page_url(p) for p in range(2, last_page + 1)]
        results = self._fetch_pages_in_parallel(all_page_urls)
        all_items.extend(results)
        return all_items

    def _fetch_pages_in_parallel(self, urls: List[str]) -> List:
        """Helper to fetch multiple page URLs concurrently with built-in wait-and-retry logic."""
        if not urls:
            return []

        def fetch_func(url: str) -> List:
            data = self._fetch_single_page_with_retry(url)
            return data if isinstance(data, list) else [data]

        # Prepare concurrency targets
        targets = {f"page_{i+1}": url for i, url in enumerate(urls)}
        concurrency_results = run_concurrently(fetch_func, targets=targets, max_workers=4)

        merged_items = []
        for _, items in concurrency_results.items():
            merged_items.extend(items)
        return merged_items

    def _fetch_single_page_with_retry(self, page_url: str) -> List | Dict:
        """
        Manually handle retries by looping until we succeed or 
        decide to give up. Each iteration calls wait_for_rate_limit.
        """
        max_attempts = 5
        attempt = 0
        backoff_seconds = 5

        while True:
            attempt += 1
            try:
                wait_for_rate_limit(self.session, token=self.token)
                return get_data(session=self.session, url=page_url, retry_attempts=1)
            except Exception as e:
                print(f"Error in fetching {page_url}. Attempt {attempt}/{max_attempts}: {e}")
                if attempt >= max_attempts:
                    print("[_fetch_single_page_with_retry] Exceeded max attempts. Raising.")
                    raise
                else:
                    print(f"Sleeping {backoff_seconds}s before retrying.")
                    time.sleep(backoff_seconds)
                    backoff_seconds *= 2  # exponential backoff

    def fetch_commits(self, repo: str, since: Optional[str] = None) -> List[Dict]:
        url = f'https://api.github.com/repos/{self.owner}/{repo}/commits?state=all'
        if since:
            url += f'&since={since}'
        return self._fetch_all_pages(url)

    def fetch_issues(self, repo: str, since: Optional[str] = None) -> List[Dict]:
        url = f'https://api.github.com/repos/{self.owner}/{repo}/issues?state=all'
        if since:
            url += f'&since={since}'
        return self._fetch_all_pages(url)

    def fetch_pulls(self, repo: str, since: Optional[str] = None) -> List[Dict]:
        url = f'https://api.github.com/repos/{self.owner}/{repo}/pulls?state=all&sort=updated&direction=desc'
        if since:
            url += f'&since={since}'
        return self._fetch_all_pages(url)

    def fetch_releases(self, repo: str) -> List[Dict]:
        url = f'https://api.github.com/repos/{self.owner}/{repo}/releases?state=all'
        return self._fetch_all_pages(url)

    def fetch_pr_comments(self, repo: str, pr_num: int) -> List[Dict]:
        url = f'https://api.github.com/repos/{self.owner}/{repo}/pulls/{pr_num}/comments'
        return self._fetch_all_pages(url)

    def fetch_pr_reviews(self, repo: str, pr_num: int) -> List[Dict]:
        url = f'https://api.github.com/repos/{self.owner}/{repo}/pulls/{pr_num}/reviews'
        return self._fetch_all_pages(url)


# -------------------------------------
# GitHubMetricsBuilder
# -------------------------------------
# Module-level structured logger; used by GitHubMetricsBuilder to report per-PR fetch failures.
log = structlog.get_logger()

class GitHubMetricsBuilder:
    """
    Build DataFrames and compute metrics for GitHub repos, using concurrency and 'since' logic.

    Raw data comes from a GitHubFetcher. get_metrics() runs the whole pipeline:
    fetch -> per-PR review/comment enrichment -> release processing ->
    developer-experience columns -> daily time series.
    """

    def __init__(self, token: str, owner: str, repos: List[str]):
        """
        Args:
            token: GitHub access token handed to the GitHubFetcher.
            owner: Repository owner handed to the GitHubFetcher.
            repos: Repository names to process.
        """
        self.fetcher = GitHubFetcher(token, owner, repos)
        # DataFrame placeholders
        self.commits = pd.DataFrame()
        self.issues = pd.DataFrame()
        self.pulls = pd.DataFrame()
        self.releases = pd.DataFrame()
        self.pr_comments = pd.DataFrame()

    def _fetch_data_for_repo(self, repo: str, since: Optional[str]) -> Dict[str, pd.DataFrame]:
        """Fetch commits, issues, pulls and releases for one repo; each non-empty frame gets a 'repo' column."""
        raw = {
            'commits': self.fetcher.fetch_commits(repo, since=since),
            'issues': self.fetcher.fetch_issues(repo, since=since),
            'pulls': self.fetcher.fetch_pulls(repo, since=since),
            'releases': self.fetcher.fetch_releases(repo),
        }

        def df_with_repo(data_list, repo_str):
            df = pd.DataFrame(data_list) if data_list else pd.DataFrame()
            if not df.empty:
                df['repo'] = repo_str
            return df

        return {name: df_with_repo(items, repo) for name, items in raw.items()}

    def fetch_all_data(self, since: Optional[str] = None) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
        """
        Fetch data for every configured repo concurrently and stack the results.

        Returns:
            (commits, issues, pulls, releases); each is an empty DataFrame when no
            repo produced rows for that dataset.
        """
        # Create concurrency targets
        targets = {repo: repo for repo in self.fetcher.repos}

        def fetch_repo_func(r: str) -> Dict[str, pd.DataFrame]:
            return self._fetch_data_for_repo(r, since)

        concurrency_results = run_concurrently(fetch_repo_func, targets=targets, max_workers=32)

        dataset_names = ('commits', 'issues', 'pulls', 'releases')
        collected: Dict[str, List[pd.DataFrame]] = {name: [] for name in dataset_names}
        for repo_dict in concurrency_results.values():
            for name in dataset_names:
                frame = repo_dict[name]
                if not frame.empty:
                    collected[name].append(frame)

        # Concatenate once per dataset instead of growing a frame inside the loop.
        commits_df, issues_df, pulls_df, releases_df = (
            pd.concat(collected[name], ignore_index=True) if collected[name] else pd.DataFrame()
            for name in dataset_names
        )
        return commits_df, issues_df, pulls_df, releases_df

    def _extract_approval_info(self, reviews: List[Dict]) -> Tuple[pd.Timestamp, Optional[str]]:
        """
        Find the first 'APPROVED' review's time and user.

        Returns:
            (submitted_at as a timestamp, reviewer login). (NaT, None) when no review
            is approved; the login is None when the review has no user object.
        """
        for review in reviews:
            if (review.get('state') or '').upper() != 'APPROVED':
                continue
            submitted_at = pd.to_datetime(review.get('submitted_at'), errors='coerce')
            if submitted_at is None:
                submitted_at = pd.NaT
            # 'user' can be null in the payload; do not crash on it.
            reviewer = review.get('user') or {}
            return submitted_at, reviewer.get('login')
        return pd.NaT, None

    def _comments_to_df(self, comments: List[Dict]) -> pd.DataFrame:
        """
        Transform raw comment JSON into a DataFrame, marking is_bot.

        'user' is reduced to the login string (None when the user object is missing),
        'created_at' is parsed to datetimes, and 'is_bot' is True when the login
        contains the literal text "[bot]" (case-insensitive).
        """
        if not comments:
            return pd.DataFrame()
        df = pd.DataFrame(comments)
        df['user'] = df['user'].apply(lambda u: u.get('login') if isinstance(u, dict) else None)
        df['created_at'] = pd.to_datetime(df['created_at'], errors='coerce')
        # regex=False: as a regex, "[bot]" is a character class matching any of b/o/t.
        df['is_bot'] = df['user'].str.contains("[bot]", case=False, na=False, regex=False)
        return df

    def _add_duration_metrics(self, pulls: pd.DataFrame) -> pd.DataFrame:
        """Add time_to_merge_days and time_to_approval_days (in days since created_at) to `pulls`."""
        seconds_per_day = 3600 * 24
        # utc=True keeps the column tz-aware even when every value is NaT, so the
        # subtraction from a tz-aware created_at cannot fail on a naive/aware mix.
        pulls['approval_time'] = pd.to_datetime(pulls['approval_time'], errors='coerce', utc=True)
        pulls['time_to_merge_days'] = (
            pulls['merged_at'] - pulls['created_at']
        ).dt.total_seconds() / seconds_per_day
        pulls['time_to_approval_days'] = (
            pulls['approval_time'] - pulls['created_at']
        ).dt.total_seconds() / seconds_per_day
        return pulls

    def _attach_first_non_bot_comment(self, pulls: pd.DataFrame, comments_df: pd.DataFrame) -> pd.DataFrame:
        """
        Add first_non_bot_comment_user and time_to_first_non_bot_comment_days to `pulls`.

        The earliest non-bot comment per pull_request_url is joined onto `pulls` via
        its 'url' column. PRs without such a comment get None / NaN.
        """
        non_bot_comments = comments_df[~comments_df['is_bot']].copy() if not comments_df.empty else comments_df
        if non_bot_comments.empty:
            pulls['time_to_first_non_bot_comment_days'] = np.nan
            pulls['first_non_bot_comment_user'] = None
            return pulls

        non_bot_comments.sort_values(by='created_at', inplace=True)
        first_non_bot_comments = non_bot_comments.groupby('pull_request_url', as_index=False).first()

        # Rename on the comment side BEFORE merging. The PR frame has its own 'user'
        # column (the author), so renaming 'user' after the merge hit the wrong column.
        first_non_bot_comments = first_non_bot_comments[['pull_request_url', 'user', 'created_at']].rename(
            columns={'user': 'first_non_bot_comment_user', 'created_at': 'created_at_first_non_bot'}
        )
        pulls = pulls.merge(
            first_non_bot_comments,
            left_on='url',
            right_on='pull_request_url',
            how='left',
        )
        pulls['time_to_first_non_bot_comment_days'] = (
            pulls['created_at_first_non_bot'] - pulls['created_at']
        ).dt.total_seconds() / (3600 * 24)
        return pulls

    def process_pull_requests(self, pulls: pd.DataFrame, since: Optional[str] = None) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """
        Process PR concurrency => fetch reviews/comments => metrics.

        Args:
            pulls: Raw pull-request frame; needs 'created_at', 'merged_at', 'repo',
                'number' and 'url' columns.
            since: Optional timestamp string; PRs created before it are dropped.

        Returns:
            (enriched pulls, all PR comments). When no PR remains, the pulls frame is
            returned without the metric columns and the comments frame is empty.
        """
        if pulls.empty:
            return pulls, pd.DataFrame()

        # utc=True keeps both columns tz-aware even if one of them holds only missing values.
        pulls['created_at'] = pd.to_datetime(pulls['created_at'], errors='coerce', utc=True)
        pulls['merged_at'] = pd.to_datetime(pulls['merged_at'], errors='coerce', utc=True)
        # Object dtype so tz-aware timestamps can be written cell by cell below.
        pulls['approval_time'] = pd.Series(pd.NaT, index=pulls.index, dtype='object')
        pulls['approver'] = None

        since_dt = pd.to_datetime(since, utc=True, errors='coerce') if since else None
        if since_dt is not None:
            pulls = pulls[pulls['created_at'] >= since_dt].copy()
        if pulls.empty:
            return pulls, pd.DataFrame()

        def build_target(row_idx, row):
            return {
                'row_idx': row_idx,
                'repo': row['repo'],
                'pr_number': row['number'],
                'pr_url': row['url']
            }

        targets = {f"pr_{i}": build_target(i, pr_row) for i, pr_row in pulls.iterrows()}

        def fetch_pr_data(pr_info: dict) -> dict:
            """
            Each PR fetches reviews & comments (also with concurrency-level wait & retry),
            then returns the relevant approval times & comments DF.
            """
            row_idx = pr_info['row_idx']
            repo = pr_info['repo']
            pr_number = pr_info['pr_number']
            pr_url = pr_info['pr_url']

            output = {
                'row_idx': row_idx,
                'approval_time': pd.NaT,
                'approver': None,
                'comments_df': pd.DataFrame()
            }
            try:
                reviews = self.fetcher.fetch_pr_reviews(repo, pr_number)
                approval_time, approver = self._extract_approval_info(reviews)
                output['approval_time'] = approval_time
                output['approver'] = approver

                raw_comments = self.fetcher.fetch_pr_comments(repo, pr_number)
                cdf = self._comments_to_df(raw_comments)
                if not cdf.empty:
                    cdf['pull_request_number'] = pr_number
                    cdf['repo'] = repo
                    cdf['pull_request_url'] = pr_url
                    output['comments_df'] = cdf

            except Exception as ex:
                # Best effort: a failing PR keeps its defaults instead of aborting the whole run.
                log.error(f"Failed to process PR #{pr_number} in {repo}", exc_info=ex)

            return output

        concurrency_results = run_concurrently(fetch_pr_data, targets, max_workers=32)

        all_comments_list = []
        for pr_dict in concurrency_results.values():
            idx = pr_dict['row_idx']
            pulls.at[idx, 'approval_time'] = pr_dict['approval_time']
            pulls.at[idx, 'approver'] = pr_dict['approver']
            cdf = pr_dict['comments_df']
            if not cdf.empty:
                all_comments_list.append(cdf)

        comments_df = pd.concat(all_comments_list, ignore_index=True) if all_comments_list else pd.DataFrame()

        pulls = self._add_duration_metrics(pulls)
        pulls = self._attach_first_non_bot_comment(pulls, comments_df)
        return pulls, comments_df

    def process_releases(self, releases: pd.DataFrame) -> pd.DataFrame:
        """
        Add major_version, version_change and major_version_change_time columns to `releases`.

        major_version is the first component of the first 'x.y.z' match in tag_name
        (0 when there is no match). version_change flags rows whose major version
        differs from the previous release of the same repo. major_version_change_time
        is the created_at difference to the previous release of the same repo.
        """
        if releases.empty:
            return releases
        releases['created_at'] = pd.to_datetime(releases['created_at'], errors='coerce')
        releases['major_version'] = (
            releases['tag_name']
            .str.extract(r'(\d+\.\d+\.\d+)')[0]
            .str.split('.')
            .str[0]
            .astype(float, errors='ignore')
        ).fillna(0)
        releases.sort_values(by=['repo', 'created_at'], inplace=True)
        per_repo = releases.groupby('repo')
        releases['version_change'] = per_repo['major_version'].diff().fillna(0) != 0
        releases['major_version_change_time'] = per_repo['created_at'].diff()
        return releases

    def build_developer_experience_metrics(self, pulls: pd.DataFrame, comments_df: pd.DataFrame) -> pd.DataFrame:
        """
        Add comments_count (non-bot comments per PR) and approved (bool) to `pulls`.

        Also works on the frame that process_pull_requests() returns when no PR is
        left, which lacks 'time_to_approval_days'; in that case approved is False.
        """
        if comments_df.empty or pulls.empty:
            pulls['comments_count'] = 0
        else:
            non_bot_comments = comments_df[~comments_df['is_bot']]
            comment_counts = non_bot_comments.groupby('pull_request_url').size().reset_index(name='comments_count')
            pulls = pulls.merge(comment_counts, left_on='url', right_on='pull_request_url', how='left')
            pulls['comments_count'] = pulls['comments_count'].fillna(0)

        if 'time_to_approval_days' in pulls.columns:
            pulls['approved'] = pulls['time_to_approval_days'].notna()
        else:
            # Early-return frames carry no approval data; avoid a KeyError.
            pulls['approved'] = False
        return pulls

    def build_time_series_metrics(self, pulls: pd.DataFrame) -> pd.DataFrame:
        """Aggregate PR metrics per (repo, creation date); returns an empty frame for empty input."""
        if pulls.empty:
            return pd.DataFrame()

        pulls['date'] = pulls['created_at'].dt.date
        grouped = pulls.groupby(['repo', 'date'])
        timeseries = grouped.agg(
            number_of_prs=('number', 'count'),
            avg_time_to_merge_days=('time_to_merge_days', 'mean'),
            median_time_to_merge_days=('time_to_merge_days', 'median'),
            min_time_to_merge_days=('time_to_merge_days', 'min'),
            max_time_to_merge_days=('time_to_merge_days', 'max'),
            avg_time_to_first_non_bot_comment_days=('time_to_first_non_bot_comment_days', 'mean'),
            median_time_to_first_non_bot_comment_days=('time_to_first_non_bot_comment_days', 'median'),
            avg_time_to_approval_days=('time_to_approval_days', 'mean'),
            median_time_to_approval_days=('time_to_approval_days', 'median'),
            avg_comments_per_pr=('comments_count', 'mean'),
            approval_ratio=('approved', 'mean')
        ).reset_index()
        return timeseries

    def get_metrics(self, since: Optional[str] = None) -> Dict[str, pd.DataFrame]:
        """
        Run the full pipeline and return every resulting frame.

        Returns:
            Dict with keys 'commits', 'issues', 'pull_requests', 'pr_comments',
            'releases' and 'timeseries_metrics'.
        """
        self.commits, self.issues, self.pulls, self.releases = self.fetch_all_data(since=since)
        self.pulls, self.pr_comments = self.process_pull_requests(self.pulls, since=since)
        self.releases = self.process_releases(self.releases)
        self.pulls = self.build_developer_experience_metrics(self.pulls, self.pr_comments)
        timeseries_metrics = self.build_time_series_metrics(self.pulls)
        return {
            'commits': self.commits,
            'issues': self.issues,
            'pull_requests': self.pulls,
            'pr_comments': self.pr_comments,
            'releases': self.releases,
            'timeseries_metrics': timeseries_metrics
        }

## Perform data fetch & store CSV

In [14]:
# Notebook setup cell. `os` is needed by the fetch-and-store cell (environment lookup, directory creation).
from op_analytics.coreutils.env.vault import init
import os
# NOTE(review): presumably loads vault-managed secrets (e.g. GITHUB_TOKEN) into the environment — confirm against op_analytics.
init()

In [16]:
# Fetch-and-store cell: for each repo, wait for enough rate-limit headroom, build all
# metrics, then write one CSV per dataset into the 'data/' folder.

# Provide your GH personal access token
TOKEN = os.getenv("GITHUB_TOKEN")
if not TOKEN:
    # Without a token every request would carry "Authorization: token None"; fail early instead.
    raise RuntimeError("GITHUB_TOKEN is not set; cannot authenticate against the GitHub API.")
owner = "ethereum-optimism"

# Repos you want to process
all_repos = [
    "optimism",
    "supersim",
    "superchainerc20-starter",
    "superchain-registry",
    "superchain-ops",
    "docs",
    "specs",
    "design-docs",
    "infra",
]

# Create a session for rate-limit checks
session = new_session()
wait_for_rate_limit(session, token=TOKEN)

# If you want to limit data from a certain date:
since_date = "2024-01-01T00:00:00Z"

# Create data directory if it doesn't exist
os.makedirs("data", exist_ok=True)

for repo in all_repos:
    print(f"Processing {repo} ...")
    repos = [repo]
    rate_limit = get_rate_limit_info(session, token=TOKEN)
    # Require 2000 spare calls before starting a repo, but never more than the
    # account's total limit; otherwise the loop below could never finish.
    required_remaining = min(2000, rate_limit['limit'])
    while rate_limit['remaining'] < required_remaining:
        print(f"Rate limit remaining: {rate_limit['remaining']}. Waiting for reset...")
        time.sleep(30)
        rate_limit = get_rate_limit_info(session, token=TOKEN)
        print(f"Rate limit remaining: {rate_limit['remaining']}")
    # Build metrics
    builder = GitHubMetricsBuilder(TOKEN, owner, repos)
    metrics = builder.get_metrics(since=since_date)

    commits_df = metrics['commits']
    issues_df = metrics['issues']
    pulls_df = metrics['pull_requests']
    pr_comments_df = metrics['pr_comments']
    releases_df = metrics['releases']
    timeseries_df = metrics['timeseries_metrics']

    print("Commits shape:", commits_df.shape)
    print("Issues shape:", issues_df.shape)
    print("Pull Requests shape:", pulls_df.shape)
    print("Timeseries shape:", timeseries_df.shape)

    # Save all DataFrames under data/ folder
    commits_df.to_csv(f"data/{repo}_commits.csv", index=False)
    issues_df.to_csv(f"data/{repo}_issues.csv", index=False)
    pulls_df.to_csv(f"data/{repo}_pulls.csv", index=False)
    pr_comments_df.to_csv(f"data/{repo}_pr_comments.csv", index=False)
    releases_df.to_csv(f"data/{repo}_releases.csv", index=False)
    timeseries_df.to_csv(f"data/{repo}_timeseries.csv", index=False)