In [27]:
from pathlib import Path

# Directory the notebook runs from (Path() is ".", so this resolves to the CWD).
current_repo_path = Path().parent.resolve()
TARGET_REPO = "https://github.com/developerscope/codeutils"
# Sibling checkout of TARGET_REPO. Only the last URL segment is used as the
# directory name (same rule as _extract_repoName_repoPath_statsPath); joining
# the whole URL would yield a nested ".../https:/github.com/..." path.
repo_path = current_repo_path.parent / TARGET_REPO.rstrip("/").split("/")[-1].removesuffix(".git")

In [28]:
# Remote URL of the repository under analysis; the local checkout name and the
# stats file name are derived from its last path segment.
url = "https://github.com/alfaInsurance/devQ_testData_pythonProject/"

### Stats CRUD 

In [3]:
from developerscope._types import RepositoryStats
from pathlib import Path
import json
import os

def _extract_repoName_repoPath_statsPath(url: str) -> tuple[str, Path, Path]:
    """Derive the repository name and its on-disk locations from a remote URL.

    Returns a ``(repo_name, repo_path, stats_path)`` tuple where:
      * ``repo_name`` is the last URL segment, with trailing slashes and a
        ``.git`` suffix removed;
      * ``repo_path`` is ``<parent of current_repo_path>/<repo_name>``;
      * ``stats_path`` is the relative path ``out/<repo_name>.json``.
    """
    trimmed_url = url.rstrip("/")
    last_segment = trimmed_url.rsplit("/", 1)[-1]
    repo_name = last_segment.removesuffix(".git")

    return (
        repo_name,
        current_repo_path.parent / repo_name,
        Path("out", f"{repo_name}.json"),
    )

def init_repo_stats(url: str) -> RepositoryStats:
    """Load the stats record for ``url``, creating it on first use.

    The local checkout derived from ``url`` must already exist; otherwise
    FileNotFoundError is raised. The directory holding the stats file is
    created if needed. When the stats file is present its JSON content is
    returned as a RepositoryStats; when it is absent a fresh record with
    status "NEW" and no authors is written to disk and returned.
    """
    _, checkout_dir, stats_file = _extract_repoName_repoPath_statsPath(url)
    if not checkout_dir.exists():
        raise FileNotFoundError(f"Expected sibling repository at {checkout_dir}, but it does not exist.")

    stats_file.parent.mkdir(parents=True, exist_ok=True)

    # Existing record: just read it back.
    if stats_file.exists():
        with open(stats_file, 'r', encoding='utf-8') as fh:
            return RepositoryStats(**json.load(fh))

    # First use: persist an empty record so later cells can update it.
    fresh = RepositoryStats(status="NEW", url=url, authors=[])
    with open(stats_file, 'w', encoding='utf-8') as fh:
        json.dump(fresh, fh, indent=4)
    return fresh


In [4]:
def save_repo_stats(stats: RepositoryStats):
    """Overwrite the stats file belonging to ``stats['url']`` with ``stats``.

    The file location is derived from the URL stored in the record itself;
    the content is written as indented UTF-8 JSON without ASCII escaping.
    """
    stats_file = _extract_repoName_repoPath_statsPath(stats['url'])[2]
    with open(stats_file, 'w', encoding='utf-8') as fh:
        json.dump(stats, fh, indent=4, ensure_ascii=False)


In [29]:
# Load the persisted stats for `url`, or create and store a new "NEW" record
# when no stats file exists yet.
stats = init_repo_stats(url=url)

In [30]:
# Write the in-memory stats back to the stats file derived from stats['url'].
save_repo_stats(stats)

In [31]:
from collections import defaultdict
from developerscope._types import RepositoryStats, AuthorStats, BranchStats, CommitStatus
from developerscope.analyzer import get_merge_commits_map, get_all_branches


def extract_repo_commit_stats(stats: RepositoryStats) -> None:
    """Rebuild ``stats["authors"]`` from the branches of the local checkout.

    For every branch reported by ``get_all_branches`` the mapping returned by
    ``get_merge_commits_map`` is walked; each username gets one author entry
    (created on first sight) and one branch entry per branch, listing its
    commit hashes with status "NEW". The previous ``stats["authors"]`` value
    is replaced, so statuses recorded earlier are not carried over.
    """
    _, checkout_dir, _ = _extract_repoName_repoPath_statsPath(stats["url"])

    branch_names = [head.name for head in get_all_branches(checkout_dir)]

    per_user: dict[str, AuthorStats] = {}

    for branch_name in branch_names:
        commits_by_user, identities = get_merge_commits_map(str(checkout_dir), only_in_branch=branch_name)

        for username, hashes in commits_by_user.items():
            # Smallest (email, name) pair keeps the choice stable between runs.
            email, name = sorted(identities[username])[0]

            author_entry = per_user.setdefault(
                username,
                {"name": name, "email": email, "branches": []},
            )
            author_entry["branches"].append({
                "name": branch_name,
                "commits": [{"commitHash": sha, "status": "NEW"} for sha in hashes],
            })

    stats["authors"] = list(per_user.values())


In [32]:
from typing import Literal


def update_repo_commit_status(hexsha: str, status: Literal[
        "NEW",
        "PENDING",
        "DONE",
    ], stats: RepositoryStats):
    """Set ``status`` on every commit entry in ``stats`` whose hash is ``hexsha``.

    Stats hold one commit list per (author, branch), so the same hash can be
    recorded more than once. All occurrences are updated; stopping at the
    first match would leave the remaining entries with a stale status.
    ``stats`` is modified in place and nothing is returned. An unknown hash
    is a no-op.
    """
    for author in stats['authors']:
        for branch in author['branches']:
            for commit in branch['commits']:
                if commit['commitHash'] == hexsha:
                    commit['status'] = status

In [33]:
# Persist the current in-memory stats to the stats file.
save_repo_stats(stats)

### Analysis CRUD

In [34]:
from typing import cast
from developerscope._types import AuthorsAnalysis, DetailedMergeRequestAnalysis
from developerscope.analyzer import extract_username
from report_generator import MergeRequestAnalysis


def get_author_analysis(stats: RepositoryStats, author: str) -> AuthorsAnalysis:
    """Return the stored analysis for ``author``, creating a placeholder if absent.

    ``author`` is lower-cased and matched against the username extracted
    from each author's email in ``stats``. The analysis lives in
    ``out/<repo_name>/<author>.json``; the directory is created if needed.
    When the file is missing, a placeholder with one empty ``mergeRequests``
    list per branch of the matched author is written and returned.

    Raises:
        KeyError: no author in ``stats`` has a matching username.
    """
    author = author.lower()
    repo_name, _, stats_path = _extract_repoName_repoPath_statsPath(stats["url"])
    analysis_dir = stats_path.parent / repo_name
    analysis_dir.mkdir(parents=True, exist_ok=True)
    analysis_file = analysis_dir / f"{author}.json"

    # Locate the author's entry in the repository stats.
    matched = next(
        (
            entry for entry in stats['authors']
            if extract_username(entry['email']).lower() == author
        ),
        None,
    )
    if matched is None:
        raise KeyError(f"Author '{author}' not found in RepositoryStats")

    # An analysis already on disk wins over a fresh placeholder.
    if analysis_file.exists():
        with open(analysis_file, "r", encoding="utf-8") as fh:
            return json.load(fh)

    placeholder: AuthorsAnalysis = {
        "author": author,
        "branches": [
            {"branch": branch_entry["name"], "mergeRequests": []}
            for branch_entry in matched["branches"]
        ],
    }
    with open(analysis_file, "w", encoding="utf-8") as fh:
        json.dump(placeholder, fh, indent=4, ensure_ascii=False)
    return placeholder


def save_author_analysis(stats: RepositoryStats, analysis: AuthorsAnalysis) -> Path:
    """
    Write the given AuthorsAnalysis to:
      out/<repo_name>/<author>.json

    ``<repo_name>`` is derived from ``stats["url"]`` and ``<author>`` is
    ``analysis['author']``. The directory is created if it does not exist
    and an existing file is overwritten.

    Returns:
        The path of the file that was written. (The value was previously
        passed through ``cast(AuthorsAnalysis, ...)``, which mislabelled the
        path as an analysis; the runtime value is unchanged.)
    """
    repo_name, _, stats_path = _extract_repoName_repoPath_statsPath(stats["url"])
    out_dir_repo = stats_path.parent / repo_name
    out_dir_repo.mkdir(parents=True, exist_ok=True)

    author_file = out_dir_repo / f"{analysis['author']}.json"
    with open(author_file, "w", encoding="utf-8") as f:
        json.dump(analysis, f, indent=4, ensure_ascii=False)

    return author_file


def _find_commit_entry(stats: RepositoryStats, username: str, commit_hash: str):
    """Return the first ``(branch, commit)`` pair in ``stats`` recorded for
    ``username`` (case-insensitive) whose hash is ``commit_hash``, or None."""
    wanted_user = username.lower()
    for author in stats['authors']:
        if extract_username(author["email"]).lower() != wanted_user:
            continue
        for branch in author['branches']:
            for commit in branch['commits']:
                if commit["commitHash"] == commit_hash:
                    return branch, commit
    return None


def insert_merge_requests(
        author_analysis: AuthorsAnalysis, 
        stats: RepositoryStats,
        merge_request: DetailedMergeRequestAnalysis,
        ):
    """Attach ``merge_request`` to ``author_analysis`` and mark its commit done.

    The commit is looked up in ``stats`` under the author named by
    ``author_analysis["author"]``; the branch it was found in selects the
    branch of ``author_analysis`` that receives the merge request. Both
    ``author_analysis`` and ``stats`` are modified in place.

    The commit status is switched to "DONE" only after the merge request has
    been stored, so a failed insert no longer leaves the stats claiming the
    commit was processed.

    Raises:
        KeyError: the commit is not recorded for this author in ``stats``,
            or ``author_analysis`` has no entry for the commit's branch.
    """
    located = _find_commit_entry(
        stats, author_analysis["author"], merge_request["commitHash"]
    )
    if located is None:
        raise KeyError('Commit did not find in stats')
    branch, commit = located

    for branch_a in author_analysis['branches']:
        if branch_a['branch'] != branch['name']:
            continue
        branch_a["mergeRequests"].append(merge_request)
        commit["status"] = "DONE"
        return

    raise KeyError('No such branch in the author_analysis')

In [43]:
# Rebuild stats["authors"] from the checkout's branches; every listed commit
# starts with status "NEW".
extract_repo_commit_stats(stats)

In [35]:
import git

from developerscope._types import CommitMetrics
from developerscope.haslted import halstead_effort

def get_metrics(commit: git.Commit) -> CommitMetrics:
    """Collect the metrics stored alongside a commit's analysis.

    The only metric at present is the value returned by
    ``halstead_effort(commit)``, stored under the key ``'halstedEffort'``.
    """
    effort = halstead_effort(commit)
    metrics: CommitMetrics = {'halstedEffort': effort}
    return metrics

In [36]:
# Open the checkout that belongs to the stats being processed. The commit
# hashes later resolved through `git_repo` come from `stats`, so the
# repository location is derived from stats["url"] rather than from the
# module-level `repo_path`, which is built from TARGET_REPO (a different URL).
git_repo = git.Repo(_extract_repoName_repoPath_statsPath(stats["url"])[1])

In [37]:
from developerscope.gpt import anylyze_commit 


async def process_commit(commit: git.Commit, stats: RepositoryStats) -> "DetailedMergeRequestAnalysis | None":
    """Analyze one commit and persist the result.

    Steps:
      1. Await ``anylyze_commit(commit)``. If it raises, the failure is
         reported together with the commit hash and ``None`` is returned, so
         one failing commit does not abort the rest of a batch.
      2. Extend the analysis with the commit hash and ``get_metrics(commit)``.
      3. Insert it into the author's analysis (keyed by the username taken
         from the commit author's email), then save both the author analysis
         and the repository stats.

    Returns:
        The detailed analysis dict, or ``None`` when the analysis step failed.

    Errors raised after the analysis step (for example a KeyError for an
    unknown author or commit) are not caught here.
    """
    # Analyze the commit (asynchronous)
    try:
        analysis: MergeRequestAnalysis = await anylyze_commit(commit)
    except Exception as e:
        # Best-effort: report which commit failed and carry on.
        print(f"Failed to analyze commit {commit.hexsha}: {e!r}")
        return None

    # Add metrics/details
    detailed: DetailedMergeRequestAnalysis = {
        **analysis,
        'commitHash': commit.hexsha,
        'metrics': get_metrics(commit)
    }

    # Save to author analysis and repo stats
    author_username = extract_username(commit.author.email)
    author_analysis: AuthorsAnalysis = get_author_analysis(stats, author_username)

    insert_merge_requests(author_analysis, stats, detailed)
    save_author_analysis(stats, author_analysis)
    save_repo_stats(stats)
    print('done')
    return detailed


In [38]:
import asyncio


async def process_batch(batch: list[git.Commit], stats: RepositoryStats):
    """Run ``process_commit`` concurrently for every commit in ``batch``.

    Returns the list produced by ``asyncio.gather``: one result per commit,
    in the same order as ``batch``.
    """
    pending = [process_commit(item, stats) for item in batch]
    return await asyncio.gather(*pending)


In [17]:
# Display the commits currently held in `batch`.
batch

[<git.Commit "2a2f65f4b7857dc77eb878367eeb7b581186717d">,
 <git.Commit "59661b198d08946e71a426def0f579ecfa8d2512">,
 <git.Commit "a52c340ca357fce955b78186636b699e905bc2e5">,
 <git.Commit "53bd56569b432c87c27b82ca60c596e751a0cfa0">,
 <git.Commit "764b229319e3fef4ea540a0506dc3be6d9e18c71">,
 <git.Commit "0cac8e3b343f224055f2905bddcf2cbfe40bc90d">,
 <git.Commit "10d52c8242699f0e6d81628c7b12c16ea55cca7f">,
 <git.Commit "cbb9b259fddc5381ef0c787479a2c50012689850">,
 <git.Commit "353119ed2c5a7b870e9c0fa646aeedc37338594b">,
 <git.Commit "53bd56569b432c87c27b82ca60c596e751a0cfa0">]

In [46]:
# Work queue of commits waiting to be analyzed; starts empty.
batch: list[git.Commit] = []
# NOTE(review): in the visible cells batch_size is only mentioned in a
# commented-out condition of the processing loop — confirm whether batching
# is meant to be re-enabled.
batch_size = 20

In [47]:
jobs = 0
haslted = 0
success = 0
for author in stats['authors']:
    for branch in author['branches']:
        for commit in branch['commits']:
            jobs += 1
            if commit['status'] != 'NEW':
                continue
            git_commit = git_repo.commit(commit['commitHash'])
            # if halstead_effort(git_commit) > 0:
            haslted += 1
            batch.append(git_commit)
            # if haslted <= batch_size:
            result = await process_batch(batch, stats)
            success += len([x for x in result if isinstance(x, dict)])
            print('SUCESS:', success)
            batch.clear()
            haslted = 0
        else:
            continue
        break
    else:
        continue
    break

jobs, haslted, success

get_file_contents {'files': ['calc_sub.py']}
calc_sub.py
get_file_contents {'files': ['calc_sub.py']}
calc_sub.py
done
SUCESS: 1
get_file_contents {'files': ['string_lower.py', 'string_reverse.py', 'string_upper.py']}
string_lower.py
string_reverse.py
string_upper.py
get_file_contents {'files': ['string_lower.py', 'string_upper.py', 'string_reverse.py']}
string_lower.py
string_reverse.py
string_upper.py
done
SUCESS: 2
get_file_contents {'files': ['unsafe_eval.py']}
unsafe_eval.py
get_file_contents {'files': ['unsafe_eval.py']}
unsafe_eval.py
done
SUCESS: 3
get_file_contents {'files': ['list_flatten.py']}
list_flatten.py
get_file_contents {'files': ['list_flatten.py']}
list_flatten.py
done
SUCESS: 4
get_file_contents {'files': ['admin.html', 'index.html']}
admin.html
index.html
get_file_contents {'files': ['index.html', 'admin.html']}
admin.html
index.html
done
SUCESS: 5
get_file_contents {'files': ['api.py']}
api.py
get_file_contents {'files': ['api.py']}
api.py
done
SUCESS: 6
get_file

(7, 0, 7)

In [None]:
# Process whatever commits are currently queued in `batch`.
await process_batch(batch, stats)

In [21]:
# Spot-check: evaluate halstead_effort for one specific commit of git_repo.
halstead_effort(git_repo.commit('e653be919f10f67e77ccdf9ae877ee73151ae06e'))

1435.41