In [0]:
# Notes
# Optimize reads
# Define a schema (look at all the rows you're going to keep, and settle on a datatype)
# Speeds up pipeline

# Reading optimizations
# Loop through the files reading them
# cache it, (in memory for an hour or day)
# trigger an action to actually perform the cache

# Once the dataframe is cached, you don't have to keep re-reading the source files every time it is used.
# Important: call df.cache() to keep it in memory, and then unpersist it when you're done with that batch of
# information

# May make sense to repartition right on read? May not be worth it, something to think about.

# Sampling: Test out files from different months to get a better idea of the data.

# Regarding the actor ids with multiple logins:
# After mentioning to Jordan how they can appear over 80 times, he said to just include the most recent one. This has been resolved.

##### You are tasked with the following aggregations:

- Data aggregated by type of GitHub event per hour
- PushEvent data aggregated by ref type – whether the commit is on the main branch
- Breakdown of events by type and number of commits per event
- User activity should be aggregated so that a filterable chart can be populated with breakdowns of user activity by week or month.
- Breakdown of activity by project – find a unique use case
- Challenge: Based on the commit messages – break down the events by language

#### Schema

In [0]:
from pyspark.sql.functions import col, row_number, collect_set, count, max
from pyspark.sql.window import Window

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType,MapType, BooleanType, LongType
from pyspark.sql import SparkSession

# Upper bound on the bytes packed into one read partition: 128 MB.
# This equals Spark's default, so the call only makes the setting explicit.
# NOTE(review): the files read later in this notebook are .json.gz; gzip input is
# presumably not splittable, so this setting may have no effect on that read - confirm.
spark.conf.set("spark.sql.files.maxPartitionBytes", 128 * 1024 * 1024)

# Open question (Stanley): feed a sample file to a tool to generate the schema?
# Exploding every nested field could produce a very large number of columns.
#
# Decision (Javi): declare only the fields we actually keep. The nesting differs
# between entries, so typing every possible column is not realistic.

# Explicit read schema for the event JSON.
# Every field is nullable (the default of StructType.add).
schema = (
    StructType()
    .add('id', StringType())
    .add('type', StringType())
    .add('created_at', StringType())
    .add('public', BooleanType())
    .add('actor', StructType().add('id', LongType()).add('login', StringType()))
    .add('repo', StructType().add('id', LongType()).add('name', StringType()))
    .add('org', StructType().add('id', LongType()).add('login', StringType()))
    .add('payload', (
        StructType()
        .add('action', StringType())
        .add('size', LongType())
        .add('distinct_size', LongType())
        .add('ref', StringType())
        .add('ref_type', StringType())
        .add('pull_request', (
            StructType()
            .add('closed_at', StringType())
            .add('additions', LongType())
            .add('deletions', LongType())
            .add('changed_files', LongType())
            .add('commits', LongType())
            .add('merged', BooleanType())
            .add('base', (
                StructType()
                .add('ref', StringType())
                .add('repo', (
                    StructType()
                    .add('language', StringType())
                    .add('default_branch', StringType())
                    .add('created_at', StringType())
                    .add('fork', BooleanType())
                    .add('private', BooleanType())
                ))
            ))
            .add('head', StructType().add('ref', StringType()))
        ))
        .add('issue', StructType().add('closed_at', StringType()).add('number', LongType()))
        .add('comment', StructType().add('id', LongType()))
        .add('forkee', (
            StructType()
            .add('id', LongType())
            .add('full_name', StringType())
            .add('language', StringType())
        ))
        .add('member', StructType().add('id', LongType()).add('login', StringType()))
    ))
)

# Stanley: I agree, this is already a lot better than using inferSchema which is automatically True for reading jsons

# After we read in the files, we remove the columns we don't need, and perhaps create a new schema and new df with the columns and their data type

# That or we make every column into a string and fix the types later (Although this is more of a crazy idea)

#### Load in data

In [0]:
# Sample set kept for quick experiments (one day from six different months).
# To use it, read `files` instead of `path` into `df_small`:
# files = [
#     "abfss://gharchive-raw@20251124eyproject2.dfs.core.windows.net/2015-01-15-*.json.gz",
#     "abfss://gharchive-raw@20251124eyproject2.dfs.core.windows.net/2015-03-20-*.json.gz",
#     "abfss://gharchive-raw@20251124eyproject2.dfs.core.windows.net/2015-05-10-*.json.gz",
#     "abfss://gharchive-raw@20251124eyproject2.dfs.core.windows.net/2015-07-25-*.json.gz",
#     "abfss://gharchive-raw@20251124eyproject2.dfs.core.windows.net/2015-09-05-*.json.gz",
#     "abfss://gharchive-raw@20251124eyproject2.dfs.core.windows.net/2015-11-18-*.json.gz",
# ]
# df_small = spark.read.json(files, schema=schema)

# Glob covering every 2015 file in the raw container.
path = "abfss://gharchive-raw@20251124eyproject2.dfs.core.windows.net/2015-*-*-*.json.gz"

# Read with the explicit schema so Spark does not have to infer one.
df = spark.read.json(path, schema=schema)

#### Select Columns into new Dataframe From Flattened JSON

In [0]:
from pyspark.sql.functions import col, to_timestamp

# CHRIS
#
# Flatten the nested event JSON into one wide "silver" dataframe.
# Each pair is (path in the raw JSON, column name in silver_test); the list order is the
# output column order after the three leading columns selected explicitly below.
_SILVER_FIELDS = [
    # actor section
    ("actor.id", "actor_id"),
    ("actor.login", "actor_login"),  # most active users

    # repo section
    ("repo.id", "repo_id"),
    ("repo.name", "repo_name"),  # which repos have the most activity

    # more repo attributes, taken from the PR base repo
    ("payload.pull_request.base.repo.language", "repo_language"),
    ("payload.pull_request.base.repo.default_branch", "repo_default_branch"),
    ("payload.pull_request.base.repo.created_at", "repo_created_at"),
    ("payload.pull_request.base.repo.fork", "repo_is_fork"),
    ("payload.pull_request.base.repo.private", "repo_is_private"),

    # org section
    ("org.id", "organization_id"),
    ("org.login", "org_login"),  # renamed: several different logins exist in the data

    # push events
    ("payload.size", "commit_count"),  # commits per push, commit distribution
    ("payload.distinct_size", "distinct_size"),

    # push, create and delete events
    ("payload.ref", "ref"),
    ("payload.ref_type", "ref_type"),

    # pull request section
    ("payload.pull_request.closed_at", "pr_closed_at"),
    ("payload.pull_request.additions", "pr_additions"),
    ("payload.pull_request.deletions", "pr_deletions"),
    ("payload.pull_request.changed_files", "pr_changed_files"),
    ("payload.pull_request.commits", "pr_commits"),
    ("payload.pull_request.base.ref", "pr_base_ref"),
    ("payload.pull_request.head.ref", "pr_head_ref"),
    ("payload.pull_request.merged", "pr_merged"),

    # issue fields
    ("payload.issue.closed_at", "issue_closed_at"),

    # issue comment events
    ("payload.comment.id", "comment_id"),
    ("payload.issue.number", "issue_number"),

    # pull request + issue
    ("payload.action", "action"),  # opened, closed, reopened, assigned, ...

    # fork events
    ("payload.forkee.id", "forkee_id"),
    ("payload.forkee.full_name", "forkee_name"),
    ("payload.forkee.language", "forkee_language"),

    # member events
    ("payload.member.id", "member_id"),
    ("payload.member.login", "member_login"),
]

silver_test = df.select(
    col("id").alias("event_id"),  # primary key
    col("type").alias("event_type"),  # used by nearly every aggregation
    to_timestamp("created_at").alias("created_at"),  # hourly breakdown, activity over time
    *[col(source).alias(alias) for source, alias in _SILVER_FIELDS],
)

    # Removed Event Types
    # WatchEvent - payload.action is always "started" and no other significant data to capture
    # PullRequestReviewCommentEvent - Same as WatchEvent
    # CommitCommentEvent - Comments are just text descriptions, no analytical value, plus low volume
    # GollumEvent - Wiki Edits, no analytical value
    # ReleaseEvent - Too low volume
    # PublicEvent - Too low volume

    # Removed columns that are notable
    # payload.pusher_type - literally all "user"
    # payload.forkee.stargazers_count - ALL 0
    # payload.forkee.watchers_count - ALL 0
    # payload.pull_request.state - Redundant with action
    # issue_created_at, pr_created_at - Redundant with created_at
    # issue_state - Can get from action field

#### Create Lookup Tables

In [0]:
# Lookup (dimension) tables - Javi
#
# Both tables are keyed by an id that already exists in the data, so each one is built by
# selecting the descriptive columns, discarding rows without the needed value, and then
# keeping a single row per id.
# NOTE(review): dropDuplicates keeps an arbitrary row for each key, so which
# name/language survives for a repo is not guaranteed - confirm this is acceptable.

# Repo table: only rows that carry PR base-repo details have a language to keep.
_repo_columns = [
    'repo_id',
    'repo_name',
    'repo_language',
    'repo_default_branch',
    'repo_created_at',
    'repo_is_fork',
    'repo_is_private',
]
repo_lookup = (
    silver_test
    .select(*_repo_columns)
    .filter(col("repo_language").isNotNull())  # filter first ...
    .dropDuplicates(subset=['repo_id'])  # ... then de-duplicate
)

# Org table: events without an organization must not create a null key.
org_lookup = (
    silver_test
    .select('organization_id', 'org_login')
    .filter(col("organization_id").isNotNull())
    .dropDuplicates(subset=['organization_id'])
)

# Main event table added to other events


# After this, since we already have the IDs for the values in the dataframe, we would just drop the value columns, no need to do a join

#### Actor id,login Section

In [0]:
# Actor dimension input: just the id, the login and the event timestamp.
# Validated; only variable names were updated.
_actor_columns = ['actor_id', 'actor_login', 'created_at']
actor_dim_df = silver_test.select(*_actor_columns)

In [0]:
# One row per distinct (actor_id, actor_login) pair, stamped with the most recent time
# that pair was seen. Aggregating the timestamp (instead of taking distinct rows that
# include it) keeps the row count at the number of id/login combinations.
# `max` here is pyspark.sql.functions.max, imported at the top of the notebook.
actor_login_ts_df = (
    silver_test
    .groupBy('actor_id', 'actor_login')
    .agg(max(col('created_at')).alias('created_at'))
)

In [0]:

# Rank the logins of each actor from newest to oldest.
# created_at alone can tie (two logins whose latest events share a timestamp), and
# row_number() would then pick either one depending on partition order. actor_login is
# added as a secondary key so the winner is the same on every run.
w = Window.partitionBy("actor_id").orderBy(
    col("created_at").desc(),
    col("actor_login").desc(),
)

# Row number 1 is the login with the latest timestamp; keep only that row and drop the
# helper columns so the result is an (actor_id, actor_login) lookup table.
latest_login_df = (
    actor_login_ts_df
    .withColumn("rn", row_number().over(w))
    .filter(col("rn") == 1)
    .drop("rn", "created_at")
)

# After all of this, we can then remove the timestamp column and we have our lookup table!
# How this could work for the whole dataset:
    # Every time we ingest a new batch of data, since we'll probably be ingesting by date order, we can join the previous and new dataframes on ids after having run the code above, that way, we always have the latest login for the id
    # That or we just specify lookup tables for each month or however we'll be ingesting data when we run the pipeline and run separate joins when we have all the lookup tables


#### Create Fact Tables

In [0]:
# Fact tables: one generic events table plus one table per event type that carries
# type-specific columns. Each per-type table is the relevant columns of silver_test,
# restricted to rows of that event type.

# CHRIS

# ALL VALIDATED
''' Tables:
fact_events
fact_pull_request
fact_issue
fact_push_event
fact_create_event
fact_delete_event
fact_fork_event
fact_member_event
fact_issue_comment
'''


def _event_fact(event_type, *columns):
    """Return `columns` of silver_test for the rows whose event_type equals `event_type`."""
    return silver_test.select(*columns).where(silver_test.event_type == event_type)


# Main event table: every event, with the keys to the lookup tables.
_event_keys = ['event_id', 'event_type', 'created_at', 'actor_id', 'repo_id', 'organization_id']
fact_events = silver_test.select(*_event_keys)

# Pull request events
fact_pull_request = _event_fact(
    'PullRequestEvent',
    'event_id', 'action', 'pr_closed_at', 'pr_additions', 'pr_deletions',
    'pr_changed_files', 'pr_commits', 'pr_base_ref', 'pr_head_ref', 'pr_merged',
)

# Issue events
fact_issue = _event_fact('IssuesEvent', 'event_id', 'action', 'issue_closed_at')

# Push events
fact_push_event = _event_fact('PushEvent', 'event_id', 'commit_count', 'distinct_size', 'ref')

# Create events
fact_create_event = _event_fact('CreateEvent', 'event_id', 'ref', 'ref_type')

# Delete events
fact_delete_event = _event_fact('DeleteEvent', 'event_id', 'ref', 'ref_type')

# Fork events
fact_fork_event = _event_fact(
    'ForkEvent', 'event_id', 'forkee_id', 'forkee_name', 'forkee_language'
)

# Member events
fact_member_event = _event_fact(
    'MemberEvent', 'event_id', 'action', 'member_id', 'member_login'
)

# Issue comment events
fact_issue_comment = _event_fact(
    'IssueCommentEvent', 'event_id', 'comment_id', 'issue_number'
)

# This can be done for any other event type that has specific information!

#### Partition by EventType as 128 MB parquet files

In [0]:
# display(dbutils.fs.ls("abfss://team3-project2@20251124eyproject2.dfs.core.windows.net/"))

In [0]:
import pandas as pd

# Every dataframe destined for the silver layer, keyed by its output folder name.
# Lookup tables are included alongside the fact tables since they are computed anyway.
# The "wrong_" prefix was added on purpose as a reminder not to re-run the write.
wrong_tables = dict(
    fact_events=fact_events,
    fact_pull_request=fact_pull_request,
    fact_issue=fact_issue,
    fact_push_event=fact_push_event,
    fact_create_event=fact_create_event,
    fact_delete_event=fact_delete_event,
    fact_fork_event=fact_fork_event,
    fact_member_event=fact_member_event,
    fact_issue_comment=fact_issue_comment,
    repo_lookup=repo_lookup,
    actor_lookup=latest_login_df,
    org_lookup=org_lookup,
)

# History: an earlier version scaled a 144-file sample up to a full year
# (scale_factor = 8760 / 144) and planned a days-per-month table for monthly runs.
# Neither is needed now that the size is estimated from the cleaned dataframe itself.
# TODO: revisit once the pipeline processes one month at a time.

# Desired size of each written parquet file: 128 MB.
TARGET_PARTITION_SIZE = 128 * 1024 ** 2

# Ratio of parquet size on disk to the JSON-encoded size of the cleaned dataframe.
# Calibrated from a test run: the first guess of 0.25 implied 102 MB / 0.25 = 408 MB
# of JSON, while the container actually held 47.77 MB, i.e. 47.77 / 408 ~= 0.117.
parquet_compression = 0.117

# Root folder of the silver layer; each table is written to its own sub-folder.
path_to_silver = "abfss://team3-project2@20251124eyproject2.dfs.core.windows.net/silver/"

# Measures a pandas batch by JSON-encoding each row and summing the UTF-8 byte lengths.
def size_in_bytes(pdf: pd.DataFrame) -> pd.DataFrame:
    """Return a one-row frame holding the JSON-encoded size of `pdf` in bytes.

    Args:
        pdf: a batch of rows from a cleaned dataframe.

    Returns:
        A DataFrame with a single integer column ``total_bytes`` and one row: the sum,
        over all rows of `pdf`, of the UTF-8 length of ``row.to_json()``.
        An empty batch yields 0.
    """
    # An empty frame makes DataFrame.apply return NaN placeholders, whose sum is the
    # float 0.0; return an integer zero so the column always matches a `long` schema.
    if pdf.empty:
        return pd.DataFrame({"total_bytes": [0]})

    row_sizes = pdf.apply(lambda row: len(row.to_json().encode("utf-8")), axis=1)
    return pd.DataFrame({"total_bytes": [int(row_sizes.sum())]})

# Safety switch for the expensive, overwriting silver write.
# The loop used to iterate the undefined name `tables` (the dict is `wrong_tables`), so the
# cell crashed instead of rewriting the silver layer. The flag keeps that protection
# without leaving a cell that raises; set it to True to actually write.
RUN_SILVER_WRITE = False

# Fraction of rows sampled to estimate the average row size. The same value scales the
# sampled row count back up to the full table, so it is defined once.
SAMPLE_FRACTION = 0.01


def _sample_stats(pdf_iter):
    """Yield one (total_bytes, num_rows) row for every pandas batch of the sample."""
    for pdf in pdf_iter:
        yield size_in_bytes(pdf).assign(num_rows=len(pdf))


if RUN_SILVER_WRITE:
    # `table_df` (not `df`) so the raw dataframe loaded earlier is not overwritten.
    for table_name, table_df in wrong_tables.items():
        # Step 1: measure a 1% sample. Bytes and row count come from the same pass, so
        # the sample is evaluated once and both numbers describe the same rows.
        sample_df = table_df.sample(fraction=SAMPLE_FRACTION, seed=42)
        stats = (
            sample_df
            .mapInPandas(_sample_stats, schema="total_bytes long, num_rows long")
            .agg({"total_bytes": "sum", "num_rows": "sum"})
            .collect()[0]
        )
        # Both sums are null when the sample contains no rows at all.
        total_bytes_sample = stats["sum(total_bytes)"] or 0
        num_sample_rows = stats["sum(num_rows)"] or 0

        if num_sample_rows == 0:
            # Tiny or empty table: nothing to estimate from, one file is enough.
            num_partitions = 1
        else:
            avg_row_size = total_bytes_sample / num_sample_rows

            # Step 2: scale the sample up to the full table. Slightly off, but far
            # cheaper than counting every row.
            total_rows_estimate = int(num_sample_rows / SAMPLE_FRACTION)
            # Estimated parquet size, if needed for logging:
            #   avg_row_size * total_rows_estimate * parquet_compression

            # Step 3: number of ~128MB parquet files needed.
            rows_per_partition = TARGET_PARTITION_SIZE / (avg_row_size * parquet_compression)
            num_partitions = int(total_rows_estimate / rows_per_partition)
            # Tables smaller than one target file round down to 0; use a single file.
            # A plain `if` is used because `max` in this notebook is
            # pyspark.sql.functions.max, not the builtin.
            if num_partitions == 0:
                num_partitions = 1

        # Step 4: write to the silver layer.
        # TODO: add the month to the path, e.g. path_to_silver + table_name + month + "/"
        table_df.repartition(num_partitions).write.mode("overwrite").parquet(path_to_silver + table_name + "/")



#### Testing below

In [0]:
# display(df_small.limit(5))

In [0]:

# Archived scratch note, kept as a string literal so none of it executes.
# It records the check that found actor_ids with more than one actor_login and the
# options that were discussed; the chosen solution is to keep the most recent login
# per actor_id (see latest_login_df).
'''
from pyspark.sql.functions import collect_set,count

actor_dim_df \
    .groupBy("actor_id") \
    .agg(
        collect_set("actor_login").alias("actor_logins"),
        count("*").alias("row_count")
    ) \
    .filter("row_count > 1") \
    .orderBy("actor_id") \
    .show(truncate=False)

SOLVED, BY JAVI

Still need to figure out how to deal with multi actor_logins for same actor_id
- most recent one
- column for each actor_login, explode
- if more than 2 actor logins show up, maybe a column for original name and one for current name?

Original:
|actor_id|actor_logins             |row_count
|82479   |[Laith, LaithLaith]      |2 

I don't think we need to change any past rows, just future ones when a new actor_login appears (username change)

So maybe it can look like this? 
We can also just ignore original_actor_login and stick with the current one since we aren't doing too much analytics on actor_login

Option 1:
|actor_id   |original_actor_login   |current_actor_login   |row_count
|82479      |Laith                  |Laith                 |2 
|82479      |Laith                  |LaithLaith            |2                  <- this event happened after new actor_login is added

Option 2:
|actor_id   |current_actor_login   |row_count
|82479      |Laith                 |2 
|82479      |LaithLaith            |2                    <- this event happened after new actor_login is added

Option 3:
'''


'\nfrom pyspark.sql.functions import collect_set,count\n\nactor_dim_df     .groupBy("actor_id")     .agg(\n        collect_set("actor_login").alias("actor_logins"),\n        count("*").alias("row_count")\n    )     .filter("row_count > 1")     .orderBy("actor_id")     .show(truncate=False)\n\nSOLVED, BY JAVI\n\nStill need to figure out how to deal with multi actor_logins for same actor_id\n- most recent one\n- column for each actor_login, explode\n- if more than 2 actor logins show up, maybe a column for original name and one for current name?\n\nOriginal:\n|actor_id|actor_logins             |row_count\n|82479   |[Laith, LaithLaith]      |2 \n\nI don\'t think we need to change any past rows, just future ones when a new actor_login appears (username change)\n\nSo maybe it can look like this? \nWe can also just ignore original_actor_login and stick with the current one since we aren\'t doing too much analytics on actor_login\n\nOption 1:\n|actor_id   |original_actor_login   |current_acto

In [0]:
# Sanity check of the silver layer: read every written table back and show its row
# count plus a three-row preview.
path_to_silver = "abfss://team3-project2@20251124eyproject2.dfs.core.windows.net/silver/"

# Folder names of the tables written under path_to_silver.
tables = [
    "actor_lookup",
    "org_lookup", 
    "repo_lookup",
    "fact_events",
    "fact_push_event",
    "fact_create_event",
    "fact_delete_event",
    "fact_pull_request",
    "fact_issue",
    "fact_fork_event",
    "fact_member_event",
    "fact_issue_comment"
]

for table in tables:
    print(f"=== {table} ===")
    # Use a dedicated name: rebinding `df` here would replace the raw dataframe loaded
    # at the top of the notebook, so re-running an earlier cell would silently operate
    # on the last silver table instead.
    table_df = spark.read.parquet(path_to_silver + table + "/")
    print(f"Row count: {table_df.count():,}")
    table_df.show(3)
    print("\n")

ALL IS WELL

=== actor_lookup ===
Row count: 4,331,563
+--------+----------------+
|actor_id|     actor_login|
+--------+----------------+
|     159|technicalpickles|
|     170|        rbazinet|
|     314|        zmoazeni|
+--------+----------------+
only showing top 3 rows


=== org_lookup ===
Row count: 281,479
+---------------+--------------------+
|organization_id|           org_login|
+---------------+--------------------+
|         362782|              yzalis|
|       10242389|travis-infrastruc...|
|       10772219|          clickberry|
+---------------+--------------------+
only showing top 3 rows


=== repo_lookup ===
Row count: 726,388
+--------+--------------------+-------------+-------------------+--------------------+------------+---------------+
| repo_id|           repo_name|repo_language|repo_default_branch|     repo_created_at|repo_is_fork|repo_is_private|
+--------+--------------------+-------------+-------------------+--------------------+------------+---------------+
|37734226|Te