In [14]:
from configs import *
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, StructField, TimestampType
from pyspark.sql.functions import col, concat_ws, split
from github import Github
import json,os, datetime



# Pull the GitHub credentials and the target organisation out of the project config.
configs = project_configs
github_token, org_name = configs["token"], configs["org_name"]

# Authenticated PyGithub client.
gh = Github(github_token)

# Directory under the current working directory where the per-repository JSON files go.
files_path = f"{os.getcwd()}/extracted_json_files/"

# Repositories of the configured organisation, as returned by PyGithub.
repos = gh.get_organization(org_name).get_repos()

# Dump the pull requests of every repository to `<files_path>/<repo name>.json`.
# Each file holds a JSON list with one object per pull request.

# Make sure the output directory exists before any file is opened for writing.
os.makedirs(files_path, exist_ok=True)

for repo in repos:
    pr_filename = files_path + repo.name + ".json"
    list_prs = []
    for pr in repo.get_pulls(state='all'):
        pr_data = {
            'organization_name' : repo.full_name,
            'repository_id' : repo.id,
            'repository_name' : repo.name,
            'repository_owner' : repo.owner.login,
            # An unmerged PR has merged_at == None; emit JSON null for it
            # instead of the literal string "None" that str(None) would give.
            'merged_at' : str(pr.merged_at) if pr.merged_at is not None else None,
            'state' : pr.state
            }
        list_prs.append(pr_data)

    # Write the file once per repository, after all of its PRs are collected,
    # rather than rewriting the whole file for every single PR.
    # Repositories without any PR still produce no file.
    if list_prs:
        with open(pr_filename, 'w') as f:
            json.dump(list_prs, f)

In [2]:
# Explicit schema for the extracted pull-request JSON files.
# Every column is nullable; `merged_at` is the only non-string column.
columnsMetaData = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("organization_name", StringType()),
        ("repository_id", StringType()),
        ("repository_name", StringType()),
        ("repository_owner", StringType()),
        ("merged_at", TimestampType()),
        ("state", StringType()),
    )
])

In [3]:
# Load every extracted JSON file into Spark using the explicit schema.
spark = SparkSession.builder.appName("GithubPRsToParquet").getOrCreate()
df = spark.read.json(files_path, columnsMetaData)

# `organization_name` was filled with the repository's full name, so keep only
# the part before the first '/'.
df = df.withColumn('organization_name', split(col('organization_name'), '/')[0])

# Total number of pull requests per repository.
df_num_prs = df.groupBy('organization_name', 'repository_id', 'repository_owner') \
    .count().withColumnRenamed('count', 'num_prs')

# Number of merged pull requests per repository (rows with a non-null merged_at).
df_num_merged_prs = df.filter(col('merged_at').isNotNull()) \
    .groupBy('repository_id') \
    .count().withColumnRenamed('count', 'num_prs_merged')

# Timestamp of the most recent merge per repository (null when nothing was merged).
df_merged_at = df.groupBy('repository_id') \
    .agg(F.max('merged_at').alias('merged_at'))

df.show()
df_num_prs.show()
df_num_merged_prs.show()
df_merged_at.show()

# Combine the three aggregates into one row per repository.
# All three frames derive from the same `df`, so both joins go through the
# column name: a condition such as `a['repository_id'] == b['repository_id']`
# would reference the same underlying column on both sides.
df = df_num_prs \
    .join(df_num_merged_prs, ['repository_id'], how='left') \
    .join(df_merged_at, ['repository_id'], how='left') \
    .select('organization_name', 'repository_id', 'repository_owner',
            'num_prs', 'num_prs_merged', 'merged_at') \
    .fillna(0, subset=['num_prs_merged'])  # no merged PRs -> 0 instead of null

# A repository is compliant when every PR was merged and the owner name contains
# "scytale". LIKE is case-sensitive, so the owner is lower-cased first; otherwise
# an owner such as "Scytale-exercise" would never match.
df = df.withColumn(
    'is_compliant',
    F.when(
        (F.col("num_prs") == F.col("num_prs_merged"))
        & F.lower(F.col("repository_owner")).like('%scytale%'),
        1,
    ).otherwise(0)
)
df.show()


+-----------------+-------------+---------------+----------------+-------------------+------+
|organization_name|repository_id|repository_name|repository_owner|          merged_at| state|
+-----------------+-------------+---------------+----------------+-------------------+------+
| Scytale-exercise|    721612130|  scytale-repo3|Scytale-exercise|2023-11-21 12:29:07|closed|
| Scytale-exercise|    721612130|  scytale-repo3|Scytale-exercise|2023-11-21 12:27:11|closed|
| Scytale-exercise|    721612130|  scytale-repo3|Scytale-exercise|2023-11-21 12:25:14|closed|
| Scytale-exercise|    721612130|  scytale-repo3|Scytale-exercise|2023-11-21 12:23:48|closed|
| Scytale-exercise|    724133322|   Scytale_repo|Scytale-exercise|               NULL|  open|
| Scytale-exercise|    724133322|   Scytale_repo|Scytale-exercise|               NULL|closed|
| Scytale-exercise|    724140378|  scytale-repo2|Scytale-exercise|2023-11-27 13:34:05|closed|
+-----------------+-------------+---------------+-----------

In [5]:
# Persist the per-repository summary as parquet, replacing any previous output.
# NOTE(review): the target sits inside the JSON input directory and the folder name
# is spelled 'ouput_file'; both are kept as-is because readers may depend on the path.
df.write.parquet(files_path + 'ouput_file', mode='overwrite')