In [None]:
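# One-time environment setup. Run these in a terminal before starting Jupyter:
# a `!source ...` line inside a notebook runs in a throwaway subshell and does
# not change the environment of the running kernel.
# !pyenv install 3.12.4
# !pyenv local 3.12.4
# !python -m venv .venv
# !source .venv/bin/activate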


In [None]:
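# Install dependencies (use %pip rather than !pip so the running kernel's environment is targeted).
# %pip install -r requirements.txt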


In [None]:
import os
import csv
import json
import logging
import requests
from urllib.parse import urlparse
from dotenv import load_dotenv, find_dotenv


In [None]:
# Load variables from a `.env` file (located by find_dotenv) into the process
# environment so later cells can read them through os.environ.
load_dotenv(find_dotenv())

# Emit INFO-and-above log records, each prefixed with a timestamp and level name.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)


In [None]:
def get_repo_from_url(github_url):
    """Extract the ``(owner, repo)`` pair from a GitHub repository URL.

    Only the first two non-empty path segments are used, so deeper links such
    as ``https://github.com/owner/repo/tree/main`` resolve to the same pair.
    A trailing ``.git`` (as found in clone URLs) is removed from the repo name.

    Args:
        github_url: Repository URL, e.g. ``"https://github.com/apache/kafka"``.
            ``None`` or an empty string is treated as invalid.

    Returns:
        tuple: ``(owner, repo)`` on success, or ``(None, None)`` when the URL
        does not contain both an owner and a repository segment.
    """
    parsed_url = urlparse(github_url or "")

    # Drop empty segments so leading, trailing or doubled slashes can never
    # produce an empty owner or repository name.
    path_parts = [part for part in parsed_url.path.split('/') if part]

    if len(path_parts) >= 2:
        owner, repo = path_parts[0], path_parts[1]
        # Clone URLs end in ".git", which is not part of the repository name.
        if repo.endswith(".git"):
            repo = repo[:-len(".git")]
        if repo:
            return owner, repo

    # Reported through logging, like every other failure in this notebook.
    logging.error("Invalid GitHub URL")
    return None, None
        

In [None]:
# Repository to analyse. Exactly one `github_url` assignment should be active;
# the commented-out lines are alternative repositories to switch to.
# github_url = "https://github.com/apache/kafka"
# github_url = "https://github.com/midday-ai/v1"
github_url = "https://github.com/midday-ai/midday"
# github_url = "https://github.com/LetsTrie/Grind-75"

# Split the URL into the owner and repository names used by the cells below.
# Both are None when the URL could not be parsed.
owner, repo = get_repo_from_url(github_url)
print(f"Owner: {owner}\nRepository: {repo}")


In [None]:
# GitHub access token, read from the environment (which load_dotenv populated
# from the `.env` file earlier in the notebook). None when the variable is unset.
GITHUB_ACCESS_TOKEN = os.environ.get("GITHUB_ACCESS_TOKEN")

# Surface a missing token here rather than as an opaque HTTP error later on.
if not GITHUB_ACCESS_TOKEN:
    logging.warning(
        "GITHUB_ACCESS_TOKEN is not set; GitHub API requests will not be authenticated correctly."
    )


In [None]:
def is_empty(value):
    """Tell whether `value` is missing or blank.

    Falsy values (None, "", ...) count as empty without `.strip()` being
    called; any other value is empty only if stripping its surrounding
    whitespace leaves an empty string.
    """
    return not value or value.strip() == ""
    

In [None]:
# Directory (relative to the working directory) that holds the CSV files
# written by create_csv_file and read back by read_csv_to_array.
DATA_DIRECTORY = "data"
# Create it up front; exist_ok=True makes re-running this cell a no-op.
os.makedirs(DATA_DIRECTORY, exist_ok=True)

def create_csv_file(data_list, file_name):
    """Write a list of dict rows to ``DATA_DIRECTORY/file_name`` as UTF-8 CSV.

    The header is the union of all row keys in first-seen order, so rows with
    extra keys are accepted and rows lacking a key get an empty cell.

    Args:
        data_list: Sequence of dicts, one per CSV row.
        file_name: File name (not a path) inside ``DATA_DIRECTORY``.

    Returns:
        None. When ``data_list`` is empty nothing is written: there is no row
        to derive a header from, and an empty file would otherwise be picked
        up as a valid cache by anything that checks for the file's existence.
    """
    if not data_list:
        logging.warning(f"No data to write; CSV file '{file_name}' was not created.")
        return

    file_path = os.path.join(DATA_DIRECTORY, file_name)
    # dict.fromkeys de-duplicates the keys while keeping first-seen order.
    fieldnames = list(dict.fromkeys(key for row in data_list for key in row))

    # Explicit UTF-8 so non-ASCII text (author names, commit messages) does not
    # depend on the platform's default encoding; newline='' is what the csv
    # module requires so it can control line endings itself.
    with open(file_path, mode='w', newline='', encoding='utf-8') as csv_file:
        csv_writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        csv_writer.writeheader()
        csv_writer.writerows(data_list)
    logging.info(f"CSV file '{file_path}' created successfully.")

def read_csv_to_array(file_name):
    """Read ``DATA_DIRECTORY/file_name`` into a list of dicts, one per row.

    The file is decoded as UTF-8, matching what ``create_csv_file`` writes.
    Every value comes back as a string because that is what ``csv.DictReader``
    yields.

    Args:
        file_name: File name (not a path) inside ``DATA_DIRECTORY``.

    Returns:
        list[dict]: The parsed rows. Errors are logged rather than raised: a
        missing file gives an empty list, and a failure part-way through
        reading gives the rows parsed up to that point.
    """
    data_list = []
    file_path = os.path.join(DATA_DIRECTORY, file_name)

    try:
        # newline='' lets the csv module handle line breaks embedded in quoted
        # fields (e.g. multi-line commit messages) without translation.
        with open(file_path, mode='r', newline='', encoding='utf-8') as csv_file:
            data_list.extend(csv.DictReader(csv_file))
        logging.info(f"Data successfully read from '{file_path}' into array.")
    except FileNotFoundError:
        logging.error(f"File '{file_path}' not found.")
    except Exception as error:
        logging.error(f"An error occurred while reading '{file_path}': {error}")

    return data_list


In [None]:
def get_data_with_backup(filename, cb):
    """Return cached CSV data if present, otherwise produce it with `cb` and cache it.

    Args:
        filename: Name of the cache file inside DATA_DIRECTORY.
        cb: Zero-argument callable invoked only when the cache file is absent.

    Returns:
        The rows read from the existing CSV file, or whatever `cb()` returned.
        A falsy `cb()` result is returned as-is and is not written to disk.
    """
    cache_path = os.path.join(DATA_DIRECTORY, filename)

    # Cache miss: compute the data, persist it when there is something to persist.
    if not os.path.exists(cache_path):
        logging.info(f"File '{cache_path}' does not exist. Executing GitHub API.")
        fresh_data = cb()
        if fresh_data:
            create_csv_file(fresh_data, filename)
        return fresh_data

    # Cache hit: the callback is never invoked.
    logging.info(f"File '{cache_path}' exists. Reading data from the file.")
    return read_csv_to_array(filename)
        

In [None]:
def fetch_github_data(endpoint, repo, params=None, timeout=30):
    """Fetch every page of a GitHub REST endpoint for one repository.

    Requests ``https://api.github.com/repos/{repo}/{endpoint}`` page by page
    (100 items per page) until an empty page comes back, and concatenates the
    items of all pages.

    Args:
        endpoint: Path below the repository, e.g. ``"commits"`` or ``"issues"``.
        repo: Repository in ``"owner/name"`` form.
        params: Optional extra query parameters. The dict is copied and never
            modified, so it can safely be reused by the caller.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        list | None: All items across pages. ``None`` when a request fails,
        returns a non-200 status, or its body is not valid JSON. A non-list
        JSON payload (an endpoint that is not a paginated collection) is
        returned wrapped in a one-element list.
    """
    url = f"https://api.github.com/repos/{repo}/{endpoint}"

    headers = {
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": "2022-11-28"
    }
    # Only send credentials when a token is configured, instead of sending the
    # literal string "Bearer None".
    if GITHUB_ACCESS_TOKEN:
        headers["Authorization"] = f"Bearer {GITHUB_ACCESS_TOKEN}"

    # Work on a copy: the paging keys must not leak into the caller's dict.
    query = dict(params) if params else {}

    all_data = []
    page = 1
    per_page = 100

    while True:
        query.update({"page": page, "per_page": per_page})

        logging.info(f"Fetching data from {url} with params: {query}")
        try:
            # Without a timeout a stalled connection would hang the notebook forever.
            response = requests.get(url, headers=headers, params=query, timeout=timeout)
        except requests.RequestException as e:
            logging.error("Request to the GitHub API failed")
            logging.exception(e)
            return None

        if response.status_code != 200:
            logging.error(f"Failed to fetch data: HTTP {response.status_code}")
            logging.error(f"Response text: {response.text}")
            # The request headers are deliberately NOT logged: they contain the
            # access token, which would end up in saved notebook output.
            return None

        try:
            data = response.json()
        except requests.JSONDecodeError as e:
            logging.error("Failed to parse JSON response")
            logging.exception(e)
            return None

        if not data:
            break

        if not isinstance(data, list):
            # A non-list payload would be returned unchanged for every page
            # number, so paging over it would never terminate.
            all_data.append(data)
            break

        all_data.extend(data)
        page += 1

    logging.info(f"Total data fetched: {len(all_data)} items")
    return all_data


In [None]:
def fetch_and_process_commits(owner, repo):
    """Download all commits of ``owner/repo`` and flatten them into plain dicts.

    Args:
        owner: Repository owner (user or organisation).
        repo: Repository name.

    Returns:
        list[dict]: One dict per commit with the keys ``title`` (first line of
        the message), ``author_name``, ``author_email``, ``commit_date``,
        ``full_message``, ``commit_hash`` and ``commit_url``. The three author
        fields are ``None`` when the API reports no git author for a commit.
        An empty list is returned (and an error logged) when nothing could be
        fetched.
    """
    commit_data = fetch_github_data(endpoint="commits", repo=f"{owner}/{repo}")

    if not commit_data:
        logging.error("Failed to fetch commits from the GitHub API.")
        return []

    processed_commits = []
    for commit_entry in commit_data:
        commit = commit_entry["commit"]
        message = commit["message"]
        # GitHub's response schema marks `commit.author` as nullable; fall back
        # to an empty mapping so one such commit does not abort the whole run.
        author = commit.get("author") or {}

        processed_commits.append({
            "title": message.split('\n', 1)[0],
            "author_name": author.get("name"),
            "author_email": author.get("email"),
            "commit_date": author.get("date"),
            "full_message": message,
            "commit_hash": commit_entry["sha"],
            "commit_url": commit_entry["html_url"]
        })

    return processed_commits

def get_commits_data(owner, repo):
    """Return the commits of ``owner/repo``, using the per-repository CSV cache.

    The cache file is named ``{owner}_{repo}_commits.csv``; the GitHub API is
    only queried when get_data_with_backup decides to invoke the loader.
    """
    cache_file = f"{owner}_{repo}_commits.csv"

    def load_from_api():
        return fetch_and_process_commits(owner, repo)

    return get_data_with_backup(cache_file, load_from_api)

# Load the commits once (from the CSV cache or the GitHub API) and keep them
# in a variable so later cells can reuse them.
commits_data = get_commits_data(owner, repo)

# Print a bounded preview instead of the whole history: a large repository has
# thousands of commits, and every row carries an author name and e-mail that
# should not be dumped wholesale into saved notebook output.
COMMITS_PREVIEW_COUNT = 5
print(f"Total commits: {len(commits_data)}")
print(json.dumps(commits_data[:COMMITS_PREVIEW_COUNT], indent=4))


In [None]:
# issues = fetch_github_data(endpoint="issues", repo=f"{owner}/{repo}", params={"state": "open"})
# print(json.dumps(issues, indent=4))
