In [0]:
%run "./ADLS Setup Variables_SP"

In [0]:
from pyspark.sql.functions import when, from_json, col, explode, lit, udf, split, count, size, element_at, hour, day, weekofyear, month, year, date_format, count, avg, sum
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F
import pandas as pd
import matplotlib.pyplot as plt
import os


## Loading in the gold layer

In [0]:
org_dim = spark.read.format('parquet').load(gold_path + "OrgDim")
repo_dim = spark.read.format('parquet').load(gold_path + "RepoDim")
actor_dim = spark.read.format('parquet').load(gold_path + "ActorDim")
event_fact = spark.read.format('parquet').load(gold_path + "EventFact")
event_dim = spark.read.format('parquet').load(gold_path + "EventDim")
pe_dim = spark.read.format('parquet').load(gold_path + "PushEventDim")
# commit_dim = spark.read.format('parquet').load(gold_path + "CommitDim")

# Use Case 1 : Data aggregated by type of GitHub event per hour 


In [0]:
ed = spark.read.format('parquet').load('dbfs:/FileStore/TeamFour/Gold/EventDim')

In [0]:

#df = spark.read.format('parquet').load(table_path)
#df = read_table("EventTable")   #the string will need to be changed to what the folder/file is called


In [0]:
ed = ed.withColumn("hour", hour(col("created_at")))
#df = df.repartition("created_day", "hour")  # this made it slower ...


In [0]:
display(ed)

In [0]:
display(ed.groupBy("type","hour").agg(count("type").alias("count")))

# Use Case 2 : PushEvent data aggregated by ref type – whether the commit is on the main branch 


In [0]:

#df = spark.read.format('parquet').load(table_path)
#df = read_table("PushEventTable")   #the string will need to be changed to what the folder/file is called


In [0]:
pe = spark.read.format('parquet').load('dbfs:/FileStore/TeamFour/Gold/PushEventDim')

In [0]:
pe = pe.withColumn("is_main",
    when(col("ref").endswith("main"), "True")
    .when(col("ref").endswith("master"), "True")
    .otherwise("False"))

In [0]:
display(pe)

In [0]:
main_branch = pe.groupBy("is_main").agg(count("is_main").alias("count"))
# maybe cache this??

In [0]:
display(main_branch)

is_main,count
False,11305275
True,37381673


Databricks visualization. Run in Databricks to view.

In [0]:
# Extract data for the chart
labels = main_branch.select('is_main').rdd.flatMap(lambda x: x).collect()
values = main_branch.select('count').rdd.flatMap(lambda x: x).collect()

In [0]:
# Create the bar chart
plt.figure(figsize=(8, 6))
bars = plt.bar(labels, values, color=['blue', 'green'])
plt.xlabel('Commited to Main Branch')
plt.ylabel('count')
plt.yscale('log')
plt.title('Count of Main Branch Committed')
for bar, value in zip(bars, values):
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width() / 2, height, value,
             ha='center', va='bottom')
# Display the chart
plt.show()

# Use Case 3 : Breakdown of events by type and number of commits per event 



In [0]:
event_type = spark.read.format('parquet').load(gold_path + "EventDim").join(event_fact['id','pe_size','pe_distinct_size'],'id', 'left' ).groupby('type').agg(count('id').alias('NumEvents'), sum('pe_size').alias('NumCommits'))

In [0]:
event_type.display()
# event_type.summary().show()

type,NumEvents,NumCommits
PullRequestReviewEvent,2500840,
PushEvent,50904451,189051065.0
GollumEvent,234357,
ReleaseEvent,644171,
CommitCommentEvent,506486,
CreateEvent,14718471,
PullRequestReviewCommentEvent,1438591,
IssueCommentEvent,5329569,
DeleteEvent,3623980,
IssuesEvent,2280423,


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

# Use Case 4 : User activity should be aggregated so that a filterable chart can be populated with breakdowns of user activity by week or month. 

In [0]:
uc4_df = event_fact \
    .join(actor_dim, event_fact.actor_id == actor_dim.id) \
    .join(event_dim, event_fact.event_id == event_dim.id)

In [0]:
activity_by_week_month = uc4_df.groupBy(
    actor_dim.display_login,
    # year(col("created_at")).alias("year"),
    weekofyear(col("created_at")).alias("week"),
    month(col("created_at")).alias("month")
).agg(
    count("*").alias("activity_count")
).orderBy("month", "week")

In [0]:
activity_by_week_month.createOrReplaceTempView("uc4_activity_view")

# potentially cache this?

In [0]:
def display_user_activity(username):
    query = f"""
        SELECT * FROM uc4_activity_view WHERE display_login = '{username}'
    """
    user_activity_df = spark.sql(query)
    display(user_activity_df)


In [0]:
display_user_activity("matttbe")

display_login,week,month,activity_count
matttbe,1,1,171
matttbe,2,1,151
matttbe,3,1,90
matttbe,4,1,219


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.


# Use Case 5 : Comparison of Bot vs. Human activities

In [0]:
joined_df = event_fact.join(actor_dim, event_fact.actor_id == actor_dim.id)

# Create a temporary view
joined_df.createOrReplaceTempView("activity_view")

In [0]:
def display_bot_vs_human_activity():
    query = """
    SELECT 
        is_bot,
        COUNT(*) as activity_count
    FROM 
        activity_view
    GROUP BY 
        is_bot
    """
    
    # Execute the query
    bot_human_activity_df = spark.sql(query)

    # Display the result
    display(bot_human_activity_df)


In [0]:
display_bot_vs_human_activity()

is_bot,activity_count
True,18928026
False,79908081


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.