In [5]:
import os
import requests
import json
from dotenv import load_dotenv

# Load variables from a local .env file (if any) into the process environment
# so that the os.getenv lookup below can see them.
load_dotenv()

# GitHub access token, read from the "git_token" environment variable.
# It is None when the variable is not set.
github_token = os.getenv("git_token")

# Default request headers. The Authorization header is only added when a token
# is actually configured: formatting a missing token would otherwise send the
# literal string "token None", which is not a valid credential.
headers = {'Authorization': f'token {github_token}'} if github_token else {}

In [6]:
def fetch_data_from_github(org_name="Scytale-exercise",
                           api_base_url="https://api.github.com",
                           output_folder="git_data"):
    """Download the pull requests of every repository in a GitHub organization.

    The organization record is fetched first, then its repository list, and
    finally the pull requests of each repository are written as JSON files
    into ``output_folder``. The access token is taken from the module-level
    ``github_token``.

    Args:
        org_name: Login of the organization to export.
        api_base_url: API root handed to ``get_org_info`` and
            ``get_repositories``.
        output_folder: Directory that receives the JSON files; it is created
            (including parents) when it does not exist yet.
    """
    os.makedirs(output_folder, exist_ok=True)

    organization_info = get_org_info(api_base_url, github_token, org_name)
    repositories_info = get_repositories(api_base_url, github_token, organization_info)
    save_pull_requests_to_files(repositories_info, github_token, output_folder)


def get_org_info(api_base_url, github_token, org_name):
    """Fetch the organization record for ``org_name``.

    Args:
        api_base_url: API root; the request goes to ``{api_base_url}/orgs/{org_name}``.
        github_token: Access token for the Authorization header. When it is
            empty or None the request is sent without credentials.
        org_name: Login of the organization to look up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    organization_url = f"{api_base_url}/orgs/{org_name}"

    # Build the header from the argument so the function honours the token it
    # is given instead of depending on a module-level variable.
    request_headers = {'Authorization': f'token {github_token}'} if github_token else {}

    # A timeout keeps a stalled connection from hanging the notebook forever.
    response = requests.get(organization_url, headers=request_headers, timeout=30)
    # Fail here with the HTTP error rather than returning an error payload
    # that lacks the keys callers expect.
    response.raise_for_status()
    return response.json()


def get_repositories(api_base_url, github_token, org_info):
    """Return all repositories listed at ``org_info["repos_url"]``.

    The listing is paginated, so pages are requested one after another until
    an empty or short page signals the end; reading only the first response
    would silently drop repositories of larger organizations.

    Args:
        api_base_url: Unused; kept so the signature stays compatible.
        github_token: Access token for the Authorization header. When it is
            empty or None the requests are sent without credentials.
        org_info: Organization record containing a ``"repos_url"`` entry.

    Returns:
        A list with the decoded repository records of every page.

    Raises:
        KeyError: If ``org_info`` has no ``"repos_url"`` entry.
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    repositories_url = org_info["repos_url"]
    request_headers = {'Authorization': f'token {github_token}'} if github_token else {}
    page_size = 100  # largest page size the GitHub REST API accepts

    repositories = []
    page_number = 1
    while True:
        response = requests.get(
            repositories_url,
            headers=request_headers,
            params={"per_page": page_size, "page": page_number},
            timeout=30,
        )
        response.raise_for_status()
        page = response.json()

        if not page:
            break
        repositories.extend(page)

        # A short page is the last one; skip the extra empty-page request.
        if len(page) < page_size:
            break
        page_number += 1

    return repositories


def save_pull_requests_to_files(repositories_info, github_token, output_folder):
    """Write the pull requests of each repository to JSON files, one per page.

    For every repository the pull-request listing (open and closed) is read
    page by page until an empty page is returned. Each non-empty page is
    stored as ``{repo_name}_{repo_owner}_{page_number}_pulls.json`` inside
    ``output_folder``.

    Args:
        repositories_info: Iterable of repository records; each must provide
            ``"name"`` and ``"owner"["login"]``.
        github_token: Access token for the Authorization header. When it is
            empty or None the requests are sent without credentials.
        output_folder: Existing directory that receives the files.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    request_headers = {'Authorization': f'token {github_token}'} if github_token else {}

    for repo in repositories_info:
        repo_name = repo["name"]
        repo_owner = repo["owner"]["login"]
        pull_requests_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/pulls"

        page_number = 1
        while True:
            response = requests.get(
                pull_requests_url,
                headers=request_headers,
                params={"state": "all", "page": page_number},
                timeout=30,
            )
            # An error body is a non-empty JSON object. Without this check it
            # would be written to disk as if it were a page of pull requests
            # and the loop would never reach its empty-page exit.
            response.raise_for_status()
            pull_requests_data = response.json()

            if not pull_requests_data:
                break

            file_name = f"{repo_name}_{repo_owner}_{page_number}_pulls.json"
            file_path = os.path.join(output_folder, file_name)
            with open(file_path, "w", encoding="utf-8") as file:
                json.dump(pull_requests_data, file)

            page_number += 1


# Run the download: organization -> repositories -> pull requests saved as JSON files.
fetch_data_from_github()


In [7]:
import os
import pyspark
from pyspark.sql.functions import *
from functools import reduce
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession


def load_json_files(directory, spark_session: "SparkSession"):
    """Read every ``*.json`` file in ``directory`` into its own DataFrame.

    Only regular files with a ``.json`` extension are loaded, so stray
    entries (sub-directories, checkpoint folders, other file types) are not
    handed to Spark. Files are processed in sorted name order so the result
    order is reproducible.

    Args:
        directory: Folder that contains the JSON files.
        spark_session: Session whose ``read.json`` is used for loading.

    Returns:
        A list with one DataFrame per JSON file (empty if there are none).
    """
    json_dataframes = []
    for filename in sorted(os.listdir(directory)):
        file_path = os.path.join(directory, filename)
        if not (filename.lower().endswith(".json") and os.path.isfile(file_path)):
            continue
        # multiLine=True: each file holds one JSON document rather than
        # one record per line.
        json_dataframes.append(spark_session.read.json(file_path, multiLine=True))
    return json_dataframes


def process_github_data(dataframe):
    """Summarise a DataFrame of pull requests into its latest-merge row.

    Counts are taken over the whole input frame, so the caller is expected to
    pass the pull requests of a single repository.
    NOTE(review): the caller in this notebook passes one frame per downloaded
    page; confirm that a repository never spans several pages.

    A pull request counts as merged only when its ``merged_at`` field is set.
    A pull request that was closed without merging also has state "closed"
    and a ``closed_at`` timestamp, so neither of those identifies a merge.

    Args:
        dataframe: Pull-request records with ``base.repo.*`` and
            ``merged_at`` fields.

    Returns:
        A DataFrame with the columns "Organization Name", repository_id,
        repository_name, repository_owner, merged_at, num_prs,
        num_prs_merged, is_compliant and max_date. It holds the row of the
        most recently merged pull request, and is empty when nothing was
        merged.
    """
    # Explicit namespace instead of relying on a star import, which also
    # shadows the built-in max().
    from pyspark.sql import functions as F

    merged_prs = dataframe.filter(F.col("merged_at").isNotNull())

    total_prs = dataframe.count()
    total_merged_prs = merged_prs.count()

    transformed_df = merged_prs.select(
        F.split(F.col("base.repo.full_name"), '/').getItem(0).alias("Organization Name"),
        F.col("base.repo.id").alias("repository_id"),
        F.col("base.repo.name").alias("repository_name"),
        F.col("base.repo.owner.login").alias("repository_owner"),
        F.col("merged_at").alias("merged_at"),
    )

    transformed_df = (
        transformed_df
        .withColumn("num_prs", F.lit(total_prs))
        .withColumn("num_prs_merged", F.lit(total_merged_prs))
    )

    # Compliant = every pull request was merged AND the owner login contains
    # "Scytale" (case-sensitive substring match).
    transformed_df = transformed_df.withColumn(
        "is_compliant",
        (transformed_df.num_prs == transformed_df.num_prs_merged) &
        (transformed_df.repository_owner.contains('Scytale'))
    )

    latest_merge_date_df = merged_prs.agg(F.max(F.col("merged_at")).alias("max_date"))

    # Keep only the row(s) of the latest merge. Several pull requests may
    # share that timestamp; their rows are identical, so drop the duplicates.
    transformed_df = transformed_df.join(
        latest_merge_date_df,
        transformed_df.merged_at == latest_merge_date_df.max_date,
        how='inner'
    ).dropDuplicates()

    return transformed_df


def save_to_parquet(json_dataframes, output_path="./result.parquet"):
    """Transform each input DataFrame, combine the results and store them.

    Every frame is passed through ``process_github_data``; the results are
    unioned by column name, displayed with ``show()`` and written as Parquet.
    Existing output is overwritten so the notebook can be re-run without
    first deleting the previous result.

    Args:
        json_dataframes: Iterable of DataFrames holding raw pull-request data.
        output_path: Destination of the Parquet output.

    Raises:
        ValueError: If ``json_dataframes`` is empty, since there is nothing
            to combine or write.
    """
    transformed_dataframes = [process_github_data(df) for df in json_dataframes]
    if not transformed_dataframes:
        raise ValueError("No pull-request data to process: json_dataframes is empty")

    final_dataframe = reduce(
        lambda left, right: left.unionByName(right),
        transformed_dataframes,
    )
    final_dataframe.show()
    final_dataframe.write.mode("overwrite").parquet(output_path)


# Spark session running with the "local" master, shared by the steps below.
spark_session = (
    SparkSession.builder
    .master("local")
    .appName("Spark")
    .getOrCreate()
)

# Load the JSON files from ./git_data, then build, show and store the result.
json_dataframes = load_json_files("./git_data", spark_session)
save_to_parquet(json_dataframes)


+-----------------+-------------+---------------+----------------+--------------------+-------+--------------+------------+--------------------+
|Organization Name|repository_id|repository_name|repository_owner|           merged_at|num_prs|num_prs_merged|is_compliant|            max_date|
+-----------------+-------------+---------------+----------------+--------------------+-------+--------------+------------+--------------------+
| Scytale-exercise|    724133322|   Scytale_repo|Scytale-exercise|2023-11-27T13:46:31Z|      2|             1|       false|2023-11-27T13:46:31Z|
| Scytale-exercise|    724140378|  scytale-repo2|Scytale-exercise|2023-11-27T13:34:05Z|      1|             1|        true|2023-11-27T13:34:05Z|
| Scytale-exercise|    721612130|  scytale-repo3|Scytale-exercise|2023-11-21T12:29:07Z|      4|             4|        true|2023-11-21T12:29:07Z|
+-----------------+-------------+---------------+----------------+--------------------+-------+--------------+------------+-------