# Imports

In [0]:
from pyspark.sql import DataFrame, functions as F
from pyspark.sql.functions import coalesce, lit, row_number, col
from pyspark.sql.window import Window
from functools import reduce

from pyspark.sql.types import *

In [0]:
# Storage container root shared by the silver and gold layers.
_CONTAINER_ROOT = "abfss://team1-project2@20251124eyproject2.dfs.core.windows.net"

# Silver holds the tables this notebook reads; gold is where it writes.
Base_Silver = f"{_CONTAINER_ROOT}/gharchive-silver"
Base_Gold = f"{_CONTAINER_ROOT}/gharchive-gold"

# Schemas

In [0]:
# Declared column layouts for the silver tables; every field is nullable.

# Events table (silver path: event_data).
event_df_schema = StructType([
    StructField('event_id', LongType(), True),
    StructField('event_type', StringType(), True),
    StructField('created_at', StringType(), True),
    StructField('public', BooleanType(), True),
    StructField('actor_id', LongType(), True),
    StructField('org_id', LongType(), True),
    StructField('repo_id', LongType(), True),
    StructField('event_year_month', StringType(), True),
])

# Push events table (silver path: PushEvent).
push_event_df_schema = StructType([
    StructField('event_id', LongType(), True),
    StructField('repo_id', LongType(), True),
    StructField('push_id', LongType(), True),
    StructField('ref', StringType(), True),
    StructField('head_sha', StringType(), True),
    StructField('before_sha', StringType(), True),
    StructField('event_year_month', StringType(), True),
])

# Actor table (silver path: actor).
actor_df_schema = StructType([
    StructField('id', LongType(), True),
    StructField('login', StringType(), True),
    StructField('type', StringType(), True),
    StructField('site_admin', BooleanType(), True),
    StructField('event_year_month', StringType(), True),
])

# Repo table (silver path: repo).
# NOTE(review): these fields are identical to actor_df_schema
# (login/type/site_admin), which looks like a copy-paste — confirm the actual
# repo columns before relying on this schema.
repo_df_schema = StructType([
    StructField('id', LongType(), True),
    StructField('login', StringType(), True),
    StructField('type', StringType(), True),
    StructField('site_admin', BooleanType(), True),
    StructField('event_year_month', StringType(), True),
])

# Data Ingestion

In [0]:
# Load silver events with the declared schema. The schema has to go through
# DataFrameReader.schema(): keyword arguments to parquet() are treated as
# reader options, so the previous `schema=` keyword never reached the reader
# and the layout was taken from the files instead.
# NOTE(review): assumes the stored Parquet types match event_df_schema — confirm.
event_df = spark.read.schema(event_df_schema).parquet(f"{Base_Silver}/event_data")

In [0]:
# Load silver push events with the declared schema. parquet() only forwards
# reader options from its keyword arguments, so the schema must be supplied via
# DataFrameReader.schema() to take effect.
# NOTE(review): assumes the stored Parquet types match push_event_df_schema — confirm.
push_df = spark.read.schema(push_event_df_schema).parquet(f"{Base_Silver}/PushEvent")


In [0]:
# Load silver actors with the declared schema. parquet() only forwards reader
# options from its keyword arguments, so the schema must be supplied via
# DataFrameReader.schema() to take effect.
# NOTE(review): assumes the stored Parquet types match actor_df_schema — confirm.
actor_df = spark.read.schema(actor_df_schema).parquet(f"{Base_Silver}/actor")

In [0]:
# Load silver repos using the schema stored in the Parquet files.
# The former `schema=repo_df_schema` keyword was removed because parquet() only
# forwards reader options, so it had no effect on the read.
# NOTE(review): repo_df_schema is intentionally not enforced here — its fields
# mirror actor_df_schema (login/type/site_admin), which looks like a copy-paste.
# Confirm the real repo columns, fix the schema, then switch to
# spark.read.schema(repo_df_schema).parquet(...).
repo_df = spark.read.parquet(f"{Base_Silver}/repo")


# Gold Layer Table Creation

### Event Type (Dim)

In [0]:
# Event type dimension: one row per distinct event_type with a surrogate key.
#
# The key is a row_number over the alphabetically ordered names, so it is dense
# (1..N) and the same every time this lazy plan is re-evaluated. That matters
# because the frame is recomputed for each fact-table join and again for the
# dimension write; monotonically_increasing_id() depends on partitioning and
# gives no such guarantee, so fact keys could drift from the written dimension.
# The un-partitioned window pulls the distinct names into a single partition;
# presumably only a handful of event types exist — TODO confirm.
event_type_dim = (
    event_df
    .select("event_type")
    .distinct()
    .withColumn(
        "type_id",
        # Cast keeps the key as a long, matching the previous column type.
        F.row_number().over(Window.orderBy("event_type")).cast("long"),
    )
    .select("type_id", "event_type")
)

### DateTime (Dim)

In [0]:
# Hourly calendar dimension covering [start_ts, end_ts] inclusive.
# Facts join to it with inner joins, so events outside this window are dropped
# from those facts.
start_ts = "2015-01-01 00:00:00"
end_ts   = "2015-12-31 23:00:00"

datetime_df = (
    # Single-row frame used only as a carrier for the generated hourly sequence.
    spark.range(1).select(
        F.explode(
            F.sequence(
                F.to_timestamp(F.lit(start_ts)),
                F.to_timestamp(F.lit(end_ts)),
                F.expr("interval 1 hour")
            )
        ).alias("ts")
    )
    .select(
        # Key format yyyyMMddHH, e.g. 2015010100; facts derive the same key.
        F.date_format("ts", "yyyyMMddHH").cast("long").alias("datetime_id"),
        F.year("ts").alias("year"),
        F.month("ts").alias("month"),
        # weekofyear follows ISO 8601 week numbering, while `year` is the calendar year.
        F.weekofyear("ts").alias("week"),
        F.dayofmonth("ts").alias("day"),
        F.date_format("ts", "EEEE").alias("day_of_week"),
        # dayofweek: 1 = Sunday, 7 = Saturday.
        F.dayofweek("ts").isin([1, 7]).alias("is_weekend"),
        F.quarter("ts").alias("quarter"),
        F.hour("ts").alias("hour_24"),
        # 12-hour clock value (1-12). `hour % 12` produced 0 at midnight and
        # noon, which reads as "0 AM"/"0 PM" next to am_pm; pattern "h" yields 12.
        F.date_format("ts", "h").cast("int").alias("hour_12"),
        F.date_format("ts", "a").alias("am_pm"),

        # Coarse part of day: 05-11 Morning, 12-16 Afternoon, 17-20 Evening, else Night.
        F.when((F.hour("ts") >= 5) & (F.hour("ts") < 12), "Morning")
         .when((F.hour("ts") >= 12) & (F.hour("ts") < 17), "Afternoon")
         .when((F.hour("ts") >= 17) & (F.hour("ts") < 21), "Evening")
         .otherwise("Night").alias("day_part")

    )
)

### Event Hourly (Fact)

In [0]:
# Hourly fact: number of events per (hour bucket, event type).
event_hourly_fact = (
    event_df
    # Hour bucket key in yyyyMMddHH form, parsed from the created_at string.
    .withColumn(
        "datetime_id",
        F.date_format(F.to_timestamp("created_at"), "yyyyMMddHH").cast("long"),
    )
    # Swap the event_type string for its surrogate key from the dimension.
    .join(F.broadcast(event_type_dim), on="event_type")
    .groupBy("datetime_id", "type_id")
    .agg(F.count("event_id").cast("bigint").alias("event_count"))
    # Expose the key under the fact-table column name.
    .select(
        "datetime_id",
        F.col("type_id").alias("event_type_id"),
        "event_count",
    )
)

### Breakdown of Weekly Repo Activity (Fact)

In [0]:
# Weekly fact per repository: total events, push events and distinct actors.
weekly_repo_activity = (
    event_df
    # Hour bucket key used to look up year/week in the datetime dimension.
    .withColumn(
        "datetime_id",
        F.date_format(F.to_timestamp("created_at"), "yyyyMMddHH").cast("long"),
    )
    # Inner join: events whose hour is not present in datetime_df are dropped.
    .join(
        datetime_df.select("datetime_id", "year", "week"),
        on="datetime_id",
        how="inner",
    )
    .groupBy("repo_id", "year", "week")
    .agg(
        F.count("*").alias("event_count"),
        # NOTE(review): this counts PushEvent rows, not individual commits,
        # despite the column name — confirm the intended metric.
        F.sum(
            F.when(F.col("event_type") == "PushEvent", 1).otherwise(0)
        ).alias("commit_count"),
        F.countDistinct("actor_id").alias("unique_users_id_count"),
    )
)

### Breakdown of Weekly Event Types (Fact)

In [0]:
# Weekly fact per event type: number of occurrences per (type, year, week).
#
# event_type is carried through the grouping alongside type_id (each type_id
# maps to one event_type), so the second join back to event_type_dim that was
# only used to restore the name is no longer needed.
weekly_event_type_activity = (
    event_df
        .withColumn("created_ts", F.to_timestamp("created_at"))
        .withColumn("datetime_id", F.date_format(F.col("created_ts"), "yyyyMMddHH").cast("long"))
        # Inner join: events whose hour is not present in datetime_df are dropped.
        .join(datetime_df.select("datetime_id", "year", "week"), on="datetime_id", how="inner")
        .join(F.broadcast(event_type_dim), on="event_type", how="inner")
        .groupBy("type_id", "year", "week", "event_type")
        .agg(F.count("*").alias("event_occurrence_count"))
        # Same column order the previous join-based version produced.
        .select("type_id", "year", "week", "event_occurrence_count", "event_type")
)

### User (Dim)

In [0]:
# User dimension projected from the silver actor table.
# NOTE(review): actor_df carries an event_year_month column, so the same id may
# appear once per month; no de-duplication is applied here — confirm whether
# user_id is expected to be unique.
user_dim = actor_df.select(
    F.col("id").cast("bigint").alias("user_id"),
    F.col("login").alias("username"),
    F.col("site_admin").cast("boolean").alias("site_admin"),
    F.col("type").cast("string").alias("user_type"),
)


### User Activity Weekly and Monthly (Fact)

In [0]:
# Per-event user activity rows: (user_id, event_type, created_ts, datetime_id).
#
# Built as a single chain so the cell does not reassign its own output:
#  * created_at is filtered while the column still exists, instead of relying
#    on the analyzer to resolve a column that the projection already dropped;
#  * datetime_id is cast straight to long (the type used by datetime_df)
#    rather than passing through int first.
event_user = (
    event_df
    .where(F.col("created_at").isNotNull())
    .select(
        F.col("actor_id").cast("bigint").alias("user_id"),
        F.col("event_type").cast("string").alias("event_type"),
        F.to_timestamp("created_at").alias("created_ts"),
    )
    # Events without an actor cannot be attributed to a user.
    .where(F.col("user_id").isNotNull())
    # Hour bucket key in yyyyMMddHH form.
    .withColumn("datetime_id", F.date_format(F.col("created_ts"), "yyyyMMddHH").cast("long"))
)

In [0]:
# Daily calendar rows derived from the hourly dimension.
# date_id is the integer yyyyMMdd built from the year/month/day parts.
# NOTE(review): date_dim is not referenced by any other cell visible in this
# notebook — confirm whether it is still needed.
date_dim = (
    datetime_df
    .select(
        (F.col("year") * 10000 + F.col("month") * 100 + F.col("day"))
        .cast("int")
        .alias("date_id"),
        "year",
        "month",
        "week",
    )
    # Collapse the 24 hourly rows of each day into one.
    .distinct()
)

# Bridge from the hourly key (datetime_id) to the daily key (date_id).
date_bridge_dim = datetime_df.select(
    "datetime_id",
    (F.col("year") * 10000 + F.col("month") * 100 + F.col("day"))
    .cast("int")
    .alias("date_id"),
)



In [0]:
# Attach the daily key to each user event. date_bridge_dim holds exactly
# (datetime_id, date_id); the inner join drops events whose hour is not in it.
event_dimtime = event_user.join(
    date_bridge_dim,
    on="datetime_id",
    how="inner",
)

In [0]:
# Daily event counts per user and event type.
user_activity = (
    event_dimtime
    .groupBy("user_id", "date_id", "event_type")
    .agg(F.count(F.lit(1)).cast("bigint").alias("event_count"))
)

In [0]:

# Daily user activity with a month key: yyyyMMdd / 100 truncated to yyyyMM.
user_activity_fact = user_activity.withColumn(
    "month_id",
    (F.col("date_id") / F.lit(100)).cast("int"),
)

In [0]:
# Weekly roll-up of user activity, keyed by week_id = ISO year * 100 + ISO week.
# The output date_id is the yyyyMMdd of the Monday that starts the week.
user_activity_weekly_fact = (
    user_activity
    .withColumn("d", F.to_date(F.col("date_id").cast("string"), "yyyyMMdd"))
    # date_trunc("week", ...) snaps to the Monday of the week.
    .withColumn("week_start", F.date_trunc("week", F.col("d")).cast("date"))
    .withColumn("week_start_date_id", F.date_format(F.col("week_start"), "yyyyMMdd").cast("int"))
    # weekofyear() is an ISO week number, so it must be paired with the ISO
    # year, i.e. the calendar year of the week's Thursday (Monday + 3 days).
    # Using year(week_start) labelled the week starting 2014-12-29 as 201401
    # instead of 201501.
    .withColumn("iso_year", F.year(F.date_add(F.col("week_start"), 3)))
    .withColumn("week", F.weekofyear(F.col("week_start")))
    .withColumn("week_id", (F.col("iso_year") * 100 + F.col("week")).cast("int"))
    .groupBy("week_id", "week_start_date_id", "user_id", "event_type")
    .agg(F.sum("event_count").alias("event_count"))
    .withColumnRenamed("week_start_date_id", "date_id")
)

In [0]:
# Monthly roll-up of user activity, keyed by month_id (yyyyMM).
user_activity_monthly_fact = (
    user_activity
    # yyyyMMdd / 100 truncated to an int gives yyyyMM.
    .withColumn("month_id", (F.col("date_id") / F.lit(100)).cast("int"))
    .groupBy("month_id", "user_id", "event_type")
    .agg(
        F.sum("event_count").alias("event_count"),
        # Earliest active day in the month for this user/event type.
        F.min("date_id").alias("date_id"),
    )
)



### Push Event (Fact)


In [0]:
# Classify every push by the ref it targeted.
push_type = (
    push_df
    .select("ref")
    # Branch name for refs/heads/* refs; regexp_extract yields "" when the
    # pattern does not match (for example tag refs).
    .withColumn(
        "branch_name",
        F.regexp_extract("ref", r"^refs/heads/(.*)$", 1),
    )
    # First matching rule wins; anything unmatched falls into "other_branches".
    .withColumn(
        "ref_type",
        F.when(
            F.col("branch_name").isin("main", "master"),
            F.lit("main_or_master"),
        )
        .when(
            F.col("branch_name") == "gh-pages",
            F.lit("gh-pages"),
        )
        .when(
            F.col("branch_name").isin("dev", "devel", "develop", "development"),
            F.lit("develop_family"),
        )
        .when(
            F.col("ref").rlike("^refs/tags/"),
            F.lit("tags"),
        )
        .otherwise(F.lit("other_branches")),
    )
    .withColumn("is_main_branch", F.col("branch_name").isin("main", "master"))
)

# Push counts per ref category, sorted for stable presentation.
push_event_fact = (
    push_type
    .groupBy("ref_type", "is_main_branch")
    .agg(F.count("*").alias("push_event_count"))
    .orderBy("ref_type", "is_main_branch")
)
                                                          

# Output to Gold Layer

In [0]:
def write_to_gold_part(df: DataFrame, base_gold_path: str, table_name: str, part_col: str, mode: str = "overwrite", max_records_per_file: int = 12000000) -> None:
    """Write ``df`` as partitioned Parquet to ``{base_gold_path}/{table_name}``.

    Args:
        df: DataFrame to persist.
        base_gold_path: Root location of the gold layer.
        table_name: Folder name of the table under ``base_gold_path``.
        part_col: Column passed to ``partitionBy``.
        mode: Save mode handed to the writer; defaults to ``"overwrite"``.
        max_records_per_file: Value for the ``maxRecordsPerFile`` writer
            option. Defaults to the previously hard-coded 12,000,000.

    Returns:
        None. The write is the only effect.
    """
    out_path = f"{base_gold_path}/{table_name}"

    (
        df.write
        .mode(mode)
        .partitionBy(part_col)
        .option("maxRecordsPerFile", max_records_per_file)
        .parquet(out_path)
    )

In [0]:
def write_to_gold(df: DataFrame, base_gold_path: str, table_name: str, mode: str = "overwrite", max_records_per_file: int = 12000000) -> None:
    """Write ``df`` as unpartitioned Parquet to ``{base_gold_path}/{table_name}``.

    Args:
        df: DataFrame to persist.
        base_gold_path: Root location of the gold layer.
        table_name: Folder name of the table under ``base_gold_path``.
        mode: Save mode handed to the writer; defaults to ``"overwrite"``.
        max_records_per_file: Value for the ``maxRecordsPerFile`` writer
            option. Defaults to the previously hard-coded 12,000,000.

    Returns:
        None. The write is the only effect.
    """
    out_path = f"{base_gold_path}/{table_name}"

    (
        df.write
        .mode(mode)
        .option("maxRecordsPerFile", max_records_per_file)
        .parquet(out_path)
    )

In [0]:
# Partitioned gold tables: (table name, frame, partition column), written in order.
for _table_name, _frame, _part_col in (
    ("user_activity_weekly_fact", user_activity_weekly_fact, "week_id"),
    ("user_activity_monthly_fact", user_activity_monthly_fact, "month_id"),
    ("event_hourly_fact", event_hourly_fact, 'event_type_id'),
    ("datetime_dim", datetime_df, "week"),
):
    write_to_gold_part(_frame, Base_Gold, _table_name, _part_col)

# Unpartitioned gold tables: (table name, frame), written in order.
# NOTE(review): repo_dim is the silver repo frame written through unchanged.
for _table_name, _frame in (
    ("event_type_dim", event_type_dim),
    ("user_dim", user_dim),
    ("push_event_fact", push_event_fact),
    ("weekly_event_type_activity", weekly_event_type_activity),
    ("weekly_repo_activity", weekly_repo_activity),
    ("repo_dim", repo_df),
):
    write_to_gold(_frame, Base_Gold, _table_name)


# Gold Layer ERD

[View Silver → Gold ERD (PDF)](https://drive.google.com/file/d/1fQlTiK-wK5K0JsDDUmh6JRIl1L1xVqfV/view?usp=sharing)
<img src="https://raw.githubusercontent.com/TashawnD/gh-data-pipeline-docs/main/docs/erd/GitHub%20Archive%20Data%20-%20Gold%20Layer%20ERD.png"
     width="1000" style="border:1px solid #ccc; object-position: center;">