In [1]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

schema = StructType([
    StructField("id", StringType(), True),
    StructField("state", StringType(), True),
    StructField("title", StringType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("updated_at", TimestampType(), True),
    StructField("closed_at", TimestampType(), True),
    StructField("merged_at", TimestampType(), True),
    StructField("user", StructType([
        StructField("login", StringType(), True),
    ]), True),
    StructField("head", StructType([
        StructField("repo", StructType([
            StructField("id", StringType(), True),
            StructField("name", StringType(), True),
            StructField("full_name", StringType(), True),
        ]), True),
    ]), True),
])


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, max, split

# Initialize SparkSession
spark = SparkSession.builder.appName("Process GitHub Repos").getOrCreate()

# Specify the folder containing JSON files
folder_path = "../data/*.json"  # Use wildcard to select all JSON files

# Read all JSON files in the folder using the defined schema
df = spark.read.schema(schema).json(folder_path)

# Filter out empty DataFrames by checking if there are any rows
if df.rdd.isEmpty():
    print("No data found in any file.")
else:
    # Proceed with processing if the DataFrame is not empty
    transformed_df = df.withColumn("Organization Name", split(col("head.repo.full_name"), "/")[0]) \
        .withColumn("repository_id", col("head.repo.id")) \
        .withColumn("repository_name", col("head.repo.name")) \
        .withColumn("repository_owner", col("user.login")) \
        .groupBy("Organization Name", "repository_id", "repository_name", "repository_owner") \
        .agg(
            count("id").alias("num_prs"),
            count(when(col("state") == "MERGED", True)).alias("num_prs_merged"),
            max("merged_at").alias("merged_at")
        ) \
        .withColumn("is_compliant", 
                    ((col("num_prs") == col("num_prs_merged")) & col("repository_owner").contains("scytale")).cast("boolean"))

    # Save the transformed DataFrame to a Parquet file


24/02/04 13:13:39 WARN Utils: Your hostname, Sinethembas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 172.16.2.0 instead (on interface en0)
24/02/04 13:13:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/04 13:13:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
transformed_df.head()

                                                                                

Row(Organization Name='Scytale-exercise', repository_id='721612130', repository_name='scytale-repo3', repository_owner='aviyashar', num_prs=4, num_prs_merged=0, merged_at=datetime.datetime(2023, 11, 21, 14, 29, 7), is_compliant=False)

In [4]:
# After your transformations are complete
transformed_df.show(truncate=False)


+-----------------+-------------+---------------+----------------+-------+--------------+-------------------+------------+
|Organization Name|repository_id|repository_name|repository_owner|num_prs|num_prs_merged|merged_at          |is_compliant|
+-----------------+-------------+---------------+----------------+-------+--------------+-------------------+------------+
|Scytale-exercise |721612130    |scytale-repo3  |aviyashar       |4      |0             |2023-11-21 14:29:07|false       |
|Scytale-exercise |724133322    |Scytale_repo   |aviyashar       |2      |0             |NULL               |false       |
|Scytale-exercise |724140378    |scytale-repo2  |aviyashar       |1      |0             |2023-11-27 15:34:05|false       |
+-----------------+-------------+---------------+----------------+-------+--------------+-------------------+------------+



In [5]:
transformed_df.write.mode("overwrite").parquet("../output/prs_aggregated.parquet")

                                                                                

In [7]:
#Check if the parquet file has been correctly output and can be read back into a dataframe
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("ParquetFileCheck").getOrCreate()

# Path to your parquet file
parquet_file_path = '../output/prs_aggregated.parquet'

# Read the parquet file into a DataFrame
parquet_df = spark.read.parquet(parquet_file_path)

# Show the DataFrame schema to verify structure
parquet_df.printSchema()

# Show the first few rows of the DataFrame
parquet_df.show()

# If you just want to verify the row count
print("Row count:", parquet_df.count())


root
 |-- repository_id: string (nullable = true)
 |-- repository_name: string (nullable = true)
 |-- repository_owner: string (nullable = true)
 |-- num_prs: long (nullable = true)
 |-- num_prs_merged: long (nullable = true)
 |-- latest_merged_at: timestamp (nullable = true)
 |-- is_compliant: boolean (nullable = true)
 |-- Organization Name: string (nullable = true)

+-------------+---------------+----------------+-------+--------------+-------------------+------------+-----------------+
|repository_id|repository_name|repository_owner|num_prs|num_prs_merged|   latest_merged_at|is_compliant|Organization Name|
+-------------+---------------+----------------+-------+--------------+-------------------+------------+-----------------+
|    721612130|  scytale-repo3|       aviyashar|      4|             0|2023-11-21 14:29:07|       false| Scytale-exercise|
|    724133322|   Scytale_repo|       aviyashar|      2|             0|               NULL|       false| Scytale-exercise|
|    72414037