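# Silver Layer

Loads the two teams' Bronze-layer Parquet event exports from ADLS, merges and de-duplicates them, splits them into one table per event type plus actor, org and repo tables, and writes each table to the Silver layer as Parquet partitioned by `created_at`.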
---

In [0]:
%run "/Shared/20231023 Demos/ADLS Setup Variables_SP"

In [0]:
from pyspark.sql.functions import col, explode
from pyspark.sql.types import StructType, StructField, LongType, IntegerType, StringType, BooleanType, DateType, TimestampType

### Azure Data Lake Storage account and Bronze-layer paths

In [0]:
storage_account = "20231023desa"
container_name1 = "team2-project2"
container_name2 = "team3-project2"

# Bronze-layer input folders, one per team container (container_name1 first).
paths = [
    f"abfss://{container}@{storage_account}.dfs.core.windows.net/BronzeLayer/"
    for container in (container_name1, container_name2)
]

### Authenticate to ADLS with the Azure service principal

In [0]:
def _set_adls_conf(setting, value):
    """Set a Spark conf key scoped to this storage account (`<setting>.<account>.dfs.core.windows.net`)."""
    spark.conf.set(f"{setting}.{storage_account}.dfs.core.windows.net", value)


# OAuth client-credentials (service principal) access to the ADLS Gen2 account.
# client_id / client_secret / tenant_id are presumably defined by the %run setup notebook.
_set_adls_conf("fs.azure.account.auth.type", "OAuth")
_set_adls_conf("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
_set_adls_conf("fs.azure.account.oauth2.client.id", client_id)
_set_adls_conf("fs.azure.account.oauth2.client.secret", client_secret)
_set_adls_conf("fs.azure.account.oauth2.client.endpoint", f"https://login.microsoftonline.com/{tenant_id}/oauth2/token")

### Reading in Parquet files

In [0]:
# Load the two Bronze-layer Parquet sources: paths[0] -> df1, paths[1] -> df2.
df1, df2 = [spark.read.format("parquet").load(paths[i]) for i in (0, 1)]

# df1's own `created_at` is discarded and its `date` column takes that name,
# so both frames expose a single `created_at` column.
df1 = df1.drop("created_at").withColumnRenamed("date", "created_at")

In [0]:
# unionByName aligns columns by name, not position. After the drop/rename of
# `created_at`/`date` on df1, its `created_at` can sit at a different ordinal than
# df2's, and a positional union() would silently put values in the wrong columns.
# A genuine column-name mismatch now raises instead of producing misaligned data.
main_df = df1.unionByName(df2)

### Dropping true duplicates (rows identical in every column)

In [0]:
# Keep a single copy of each row that is identical across all columns.
main_df = main_df.distinct()

### Flattening nested fields and dropping unnecessary columns from the main DataFrame

In [0]:
# Flatten the actor/org/repo structs into `<struct>_<field>` columns, cast the
# event `id` to a 64-bit `event_id`, and keep only the listed columns.
_nested_fields = [
    ("actor", "id"), ("actor", "login"),
    ("org", "id"), ("org", "login"),
    ("repo", "id"), ("repo", "name"),
]
main_df = main_df.select(
    col("id").cast(LongType()).alias("event_id"),
    *[col(f"{parent}.{field}").alias(f"{parent}_{field}") for parent, field in _nested_fields],
    "public", "type", "created_at", "payload",
)

### Creating actor, org and repo DataFrames

In [0]:
# One row per distinct (actor_id, actor_login, created_at). main_df has a row per
# event, so without dropDuplicates() every event by the same actor with the same
# created_at would add an identical row to this table. Rows with a null id or
# login are skipped.
actor_df = (
    main_df.select("actor_id", "actor_login", "created_at")
    .filter(col("actor_login").isNotNull() & col("actor_id").isNotNull())
    .dropDuplicates()
)

In [0]:
# One row per distinct (org_id, org_login, created_at). main_df has a row per
# event, so without dropDuplicates() every event in the same org with the same
# created_at would add an identical row. Rows with a null id or login are skipped.
org_df = (
    main_df.select("org_id", "org_login", "created_at")
    .filter(col("org_login").isNotNull() & col("org_id").isNotNull())
    .dropDuplicates()
)

In [0]:
# One row per distinct (repo_id, repo_name, created_at). main_df has a row per
# event, so without dropDuplicates() every event on the same repo with the same
# created_at would add an identical row. Rows with a null id or name are skipped.
repo_df = (
    main_df.select("repo_id", "repo_name", "created_at")
    .filter(col("repo_id").isNotNull() & col("repo_name").isNotNull())
    .dropDuplicates()
)

### Drop columns already stored in the actor, org and repo tables

In [0]:
# The earlier select already flattened the actor/org/repo structs away, so dropping
# "actor", "org", "repo" matched no columns and was a silent no-op. Drop the
# descriptive columns now held in actor_df / org_df / repo_df instead; the *_id
# columns stay because the event tables below select them.
main_df = main_df.drop("actor_login", "org_login", "repo_name")

### Creating Event tables

In [0]:
def _events_of_type(event_type):
    """Return the rows of main_df whose `type` column equals ``event_type``."""
    return main_df.where(col("type") == event_type)


# NOTE(review): nothing here caches main_df, so each of these tables re-evaluates the
# read/union/dedup lineage when it is written — consider main_df.cache() if that is costly.
commit_comment_event_df = _events_of_type("CommitCommentEvent")
sponsorship_event_df = _events_of_type("SponsorshipEvent")
create_event_df = _events_of_type("CreateEvent")
delete_event_df = _events_of_type("DeleteEvent")
fork_event_df = _events_of_type("ForkEvent")
push_event_df = _events_of_type("PushEvent")
pull_request_review_thread_event_df = _events_of_type("PullRequestReviewThreadEvent")
pull_request_review_comment_event_df = _events_of_type("PullRequestReviewCommentEvent")
pull_request_review_event_df = _events_of_type("PullRequestReviewEvent")
pull_request_event_df = _events_of_type("PullRequestEvent")
gollum_event_df = _events_of_type("GollumEvent")
issue_comment_event_df = _events_of_type("IssueCommentEvent")
issues_event_df = _events_of_type("IssuesEvent")
member_event_df = _events_of_type("MemberEvent")
public_event_df = _events_of_type("PublicEvent")
release_event_df = _events_of_type("ReleaseEvent")
watch_event_df = _events_of_type("WatchEvent")

### Selecting the needed payload fields for each event table

In [0]:
# Columns carried by every event table, ahead of any payload-derived columns.
_event_cols = ["event_id", "type", "actor_id", "org_id", "repo_id", "public", "created_at"]


def _payload_field(path, name):
    """Column `payload.<path>` exposed under the alias ``name``."""
    return col(f"payload.{path}").alias(name)


commit_comment_event_df = commit_comment_event_df.select(*_event_cols, _payload_field("action", "action"))

sponsorship_event_df = sponsorship_event_df.select(*_event_cols, _payload_field("action", "action"))

create_event_df = create_event_df.select(*_event_cols, _payload_field("ref_type", "ref_type"))

delete_event_df = delete_event_df.select(*_event_cols, _payload_field("ref_type", "ref_type"))

fork_event_df = fork_event_df.select(*_event_cols)

push_event_df = push_event_df.select(
    *_event_cols,
    col("payload.size").cast(IntegerType()).alias("num_commits"),
    _payload_field("ref", "payload_ref"),
    _payload_field("commits.author.email", "author_email"),
    _payload_field("commits.author.name", "author_name"),
    _payload_field("commits.message", "message"),
)

pull_request_review_thread_event_df = pull_request_review_thread_event_df.select(*_event_cols, _payload_field("action", "action"))

pull_request_review_comment_event_df = pull_request_review_comment_event_df.select(*_event_cols, _payload_field("action", "action"))

pull_request_review_event_df = pull_request_review_event_df.select(*_event_cols, _payload_field("action", "action"))

pull_request_event_df = pull_request_event_df.select(*_event_cols, _payload_field("action", "action"))

gollum_event_df = gollum_event_df.select(*_event_cols)

issue_comment_event_df = issue_comment_event_df.select(*_event_cols, _payload_field("action", "action"))

issues_event_df = issues_event_df.select(*_event_cols, _payload_field("action", "action"))

member_event_df = member_event_df.select(*_event_cols, _payload_field("action", "action"))

public_event_df = public_event_df.select(*_event_cols)

release_event_df = release_event_df.select(*_event_cols, _payload_field("action", "action"))

watch_event_df = watch_event_df.select(*_event_cols, _payload_field("action", "action"))

### Choosing the number of output partitions for each DataFrame

In [0]:
num_cores = 8  # core count partition numbers are rounded up to; assumed to match the cluster


def _estimated_size_bytes(df):
    """Return Spark's optimizer estimate, in bytes, of the data ``df`` produces.

    The optimized logical plan's statistics are the size estimate Catalyst itself
    uses (e.g. when planning broadcast joins). ``SizeEstimator.estimate(df._jdf)``
    is not a substitute: it measures the JVM object graph reachable from the
    Dataset handle, not the rows the query would return.
    """
    size_in_bytes = df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes()
    # sizeInBytes is a scala.math.BigInt proxied through py4j; its decimal string
    # form converts losslessly to a Python int.
    return int(size_in_bytes.toString())


def find_partition_num(df, target_partition_mb=128, days_per_month=31, cores=None):
    """Choose how many partitions to repartition ``df`` into before writing.

    The estimated size is cut into ``target_partition_mb`` MiB chunks, that count is
    divided by ``days_per_month`` (the heuristic presumably assumes the input spans
    about one month — confirm), and the result is rounded up to a multiple of
    ``cores``.

    Args:
        df: PySpark DataFrame to size.
        target_partition_mb: desired size of one partition, in MiB.
        days_per_month: divisor turning the whole-input count into a per-day count.
        cores: multiple to round up to; defaults to the module-level ``num_cores``.

    Returns:
        A positive multiple of ``cores`` (never less than ``cores``).

    Raises:
        ValueError: if ``cores`` is less than 1.
    """
    if cores is None:
        cores = num_cores
    if cores < 1:
        raise ValueError(f"cores must be >= 1, got {cores}")

    size_mib = _estimated_size_bytes(df) / (1024 ** 2)
    partitions_for_month = max(1, int(size_mib / target_partition_mb))
    partition_num = max(1, int(partitions_for_month / days_per_month))

    # Round up to the next multiple of `cores`.
    remainder = partition_num % cores
    if remainder:
        partition_num += cores - remainder
    return partition_num


### Writing to Silver Layer

In [0]:
# (DataFrame, output folder name) pairs; dict insertion order fixes the write order.
dataframe_list = [
    (frame, table_name)
    for table_name, frame in {
        "commit_comment_event": commit_comment_event_df,
        "sponsorship_event": sponsorship_event_df,
        "create_event": create_event_df,
        "delete_event": delete_event_df,
        "fork_event": fork_event_df,
        "push_event": push_event_df,
        "pull_request_review_thread_event": pull_request_review_thread_event_df,
        "pull_request_review_comment_event": pull_request_review_comment_event_df,
        "pull_request_review_event": pull_request_review_event_df,
        "pull_request_event": pull_request_event_df,
        "gollum_event": gollum_event_df,
        "issue_comment_event": issue_comment_event_df,
        "issues_event": issues_event_df,
        "member_event": member_event_df,
        "public_event": public_event_df,
        "release_event": release_event_df,
        "watch_event": watch_event_df,
        "actor": actor_df,
        "org": org_df,
        "repo": repo_df,
    }.items()
]

In [0]:
# Root URI of the Silver layer, inside the container_name2 container.
silver_conn_str = "abfss://{}@{}.dfs.core.windows.net/SilverLayer".format(container_name2, storage_account)

In [0]:
def save_to_silver(df, event_name, repartitions, base_path=None, partition_col="created_at"):
    """Write ``df`` as Parquet to ``<base_path>/<event_name>``, replacing existing data.

    Args:
        df: PySpark DataFrame to write.
        event_name: table / sub-folder name under ``base_path``.
        repartitions: number of partitions ``df`` is round-robin repartitioned into
            before the write.
        base_path: root output URI; defaults to the notebook's ``silver_conn_str``.
        partition_col: column used for ``col=value`` directory partitioning.
    """
    if base_path is None:
        base_path = silver_conn_str
    # The former .option("header", True) was dropped: `header` is a CSV option and
    # has no effect on the Parquet writer.
    # NOTE(review): with Spark's default static partitionOverwriteMode, "overwrite"
    # replaces every partition under the target path, not only those present in df.
    (
        df.repartition(repartitions)
        .write.format("parquet")
        .partitionBy(partition_col)
        .mode("overwrite")
        .save(f"{base_path}/{event_name}")
    )

In [0]:
# Size each table, then write it to silver_conn_str/<table_name>.
for table_df, table_name in dataframe_list:
    n_partitions = find_partition_num(table_df)
    save_to_silver(table_df, table_name, n_partitions)