# Delta Lake for ETL

Project Workflow
1. EXTRACT: Fetch PR data from GitHub
2. TRANSFORM: subset + clean up text with regex
3. LOAD: store as Delta Table

Oh wait, we actually need usernames
1. Get usernames
2. Remove duplicates
3. Join two tables
4. Store as Delta Table: the join adds a new column, which lets us show **schema evolution** + **schema enforcement**

Now we need to scale up
1. Get more repos
2. Partitioning / Query performance / Fetch multiple repos in parallel

Make a mistake
1. Time travel / Roll back

Still to show:
- ACID guarantees
- Time travel

In [1]:
import requests
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from delta import *

In [2]:
# Local Spark session (4 cores, app name "parallel") with the Delta Lake SQL
# extension and the Delta catalog registered; the builder is passed through
# configure_spark_with_delta_pip before the session is created.
builder = (
    SparkSession.builder
    .master("local[4]")
    .appName("parallel")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [21]:
# Delta table location used when reading the table back.
# Kept relative so it points at the same "data/github_data" directory that the
# LOAD step of this notebook writes to (the previous absolute "/data/..." path
# pointed at a different location).
delta_table_path = "data/github_data"

# Step 1: Fetch data from GitHub API
def fetch_github_data():
    """Fetch one page (up to 100) of pull requests for delta-io/delta.

    Returns:
        The decoded JSON body of the response; on success this is a list of
        pull-request dicts.

    Raises:
        requests.HTTPError: if GitHub answers with a 4xx/5xx status (for
            example when the unauthenticated rate limit is exhausted).
            Without this check the error payload, a dict, would be returned
            and iterated as if it were the PR list.
        requests.Timeout: if no response arrives within the timeout.
    """
    repo = "delta-io/delta"
    end_date = datetime.now()
    start_date = end_date - timedelta(days=1)
    params = {
        "state": "all",
        # NOTE(review): GitHub's "List pull requests" endpoint does not appear
        # to document a `since` filter (it is an issues-endpoint parameter),
        # so this value may be ignored by the API -- confirm.
        "since": start_date.isoformat(),
        "per_page": 100,
    }

    prs_url = f"https://api.github.com/repos/{repo}/pulls"
    # A timeout keeps the notebook from hanging forever on a stalled connection.
    prs_response = requests.get(prs_url, params=params, timeout=30)
    prs_response.raise_for_status()

    return prs_response.json()

In [22]:
# Run the extract step; `prs` holds the decoded JSON returned by the GitHub API.
prs = fetch_github_data()

In [25]:
# Number of PR records returned (the request asks for per_page=100).
len(prs)

100

In [23]:
# Inspect the first raw PR record to see what a record looks like.
prs[0]

{'url': 'https://api.github.com/repos/delta-io/delta/pulls/3269',
 'id': 1917635149,
 'node_id': 'PR_kwDOCuYOpM5yTMpN',
 'html_url': 'https://github.com/delta-io/delta/pull/3269',
 'diff_url': 'https://github.com/delta-io/delta/pull/3269.diff',
 'patch_url': 'https://github.com/delta-io/delta/pull/3269.patch',
 'issue_url': 'https://api.github.com/repos/delta-io/delta/issues/3269',
 'number': 3269,
 'state': 'open',
 'locked': False,
 'title': '[Kernel] Add API on TxnBuilder to set the table properties',
 'user': {'login': 'EstherBear',
  'id': 55441375,
  'node_id': 'MDQ6VXNlcjU1NDQxMzc1',
  'avatar_url': 'https://avatars.githubusercontent.com/u/55441375?v=4',
  'gravatar_id': '',
  'url': 'https://api.github.com/users/EstherBear',
  'html_url': 'https://github.com/EstherBear',
  'followers_url': 'https://api.github.com/users/EstherBear/followers',
  'following_url': 'https://api.github.com/users/EstherBear/following{/other_user}',
  'gists_url': 'https://api.github.com/users/EstherBe

In [24]:
# List the top-level fields available on a PR record.
prs[0].keys()

dict_keys(['url', 'id', 'node_id', 'html_url', 'diff_url', 'patch_url', 'issue_url', 'number', 'state', 'locked', 'title', 'user', 'body', 'created_at', 'updated_at', 'closed_at', 'merged_at', 'merge_commit_sha', 'assignee', 'assignees', 'requested_reviewers', 'requested_teams', 'labels', 'milestone', 'draft', 'commits_url', 'review_comments_url', 'review_comment_url', 'comments_url', 'statuses_url', 'head', 'base', '_links', 'author_association', 'auto_merge', 'active_lock_reason'])

In [26]:
# Top-level PR fields of interest.
# NOTE(review): `pr_keys` is not referenced again in the visible cells, which
# define `keys_to_include` with the same values instead -- confirm whether it
# is still needed.
pr_keys = ['id', 'number', 'title', 'body']

In [27]:
# Collect the author (login + id) of every PR, one entry per PR.
prs_users = []
user_keys = ['login', 'id']

for pr in prs:
    try:
        pr_user_sub = {k: pr['user'][k] for k in user_keys}
    except (KeyError, TypeError):
        # Author missing (None) or incomplete: skip this PR. Previously the
        # error was swallowed and the *previous* PR's user was appended again
        # (or a NameError was raised when the very first record failed).
        continue
    prs_users.append(pr_user_sub)

users_df = spark.createDataFrame(prs_users)
# This count is taken before de-duplication, so it is one row per PR.
print(f"{users_df.count()} users made PRs in this timeframe.")

# Remove duplicates
users_df = users_df.dropDuplicates()
print(f"Unique users: {users_df.count()}")

100 users made PRs in this timeframe.
Unique users: 28


In [28]:
# Preview the de-duplicated users (show() prints at most 20 rows by default).
users_df.show()

+---------+----------------+
|       id|           login|
+---------+----------------+
| 89107911|  allisonport-db|
|107926660|       longvu-db|
| 46442880|    tomvanbussel|
|159059218|       sumeet-db|
| 13973764|         zedtang|
| 55441375|      EstherBear|
|171533467|yumingxuanguo-db|
| 93710326|  andreaschat-db|
|  1719945|      vkorukanti|
|169104436|    andrewxue-db|
| 87341375|      linzhou-db|
|134436035|      ChengJi-db|
|  2551496|   prakharjain09|
|170372889|    anniewang-db|
|125324092|    dhruvarya-db|
|143959416|           zzl-7|
| 87336575|     richardc-db|
| 91381303|          xzhseh|
|112876214|       johanl-db|
| 22524871|       horizonzy|
+---------+----------------+
only showing top 20 rows



In [29]:
# Keep a small subset of each PR's fields and tag every record with the id of
# its author so it can later be joined against the users table.
keys_to_include = ['id', 'number', 'title', 'body']
prs_simple = [
    {**{k: pr[k] for k in keys_to_include}, 'user_id': pr['user']['id']}
    for pr in prs
]

# Create DataFrame
prs_df = spark.createDataFrame(prs_simple)

In [30]:
# Preview the first 3 PR rows (includes the user_id column added above).
prs_df.show(3)

+--------------------+----------+------+--------------------+--------+
|                body|        id|number|               title| user_id|
+--------------------+----------+------+--------------------+--------+
|Add API on `Trans...|1917635149|  3269|[Kernel] Add API ...|55441375|
|<!--\r\nThanks fo...|1917578285|  3268|[DO_NOT_MERGE][4....|89107911|
|<!--\r\nThanks fo...|1917566048|  3267|[3.2 branch] Add ...|89107911|
+--------------------+----------+------+--------------------+--------+
only showing top 3 rows



In [31]:
# Preview the first 3 user rows to compare against the PR table before joining.
users_df.show(3)

+---------+--------------+
|       id|         login|
+---------+--------------+
| 89107911|allisonport-db|
|107926660|     longvu-db|
| 46442880|  tomvanbussel|
+---------+--------------+
only showing top 3 rows



In [32]:
# Left-join every PR (alias "a") to its author (alias "b") on the author's id,
# keeping the PR columns plus the author's login.
pr_side = prs_df.alias("a")
user_side = users_df.alias("b")
join_condition = prs_df.user_id == users_df.id
joined = pr_side.join(user_side, join_condition, how="left").select(
    "a.body", "a.id", "a.number", "a.title", "a.user_id", "b.login"
)

In [33]:
# Preview the joined result: PR columns plus the matching login.
joined.show()

+--------------------+----------+------+--------------------+---------+----------------+
|                body|        id|number|               title|  user_id|           login|
+--------------------+----------+------+--------------------+---------+----------------+
|Add API on `Trans...|1917635149|  3269|[Kernel] Add API ...| 55441375|      EstherBear|
|Add an end2end pr...|1915468737|  3262|[Kernel] Add an e...| 55441375|      EstherBear|
|<!--\r\nThanks fo...|1916829021|  3264|Improve documenta...| 93710326|  andreaschat-db|
|## Description\r\...|1916523493|  3263|[Tests] Improve t...| 93710326|  andreaschat-db|
|<!--\r\nThanks fo...|1917578285|  3268|[DO_NOT_MERGE][4....| 89107911|  allisonport-db|
|<!--\r\nThanks fo...|1917566048|  3267|[3.2 branch] Add ...| 89107911|  allisonport-db|
|https://github.co...|1915177976|  3260|[4.0 Preview][INF...| 89107911|  allisonport-db|
|<!--\r\nThanks fo...|1915174253|  3258|[INFRA] Update in...| 89107911|  allisonport-db|
|#### Which Delta ...

In [34]:
# Step 2: Transform data
def transform_data(prs):
    """Turn raw GitHub PR records into a de-duplicated Spark DataFrame.

    Args:
        prs: list of PR dicts. Every record must have the keys 'id',
            'number', 'title' and 'body'; the 'user' entry (a dict with
            'login' and 'id') is only used for the printed statistics.

    Returns:
        A DataFrame with the columns id, number, title and body, with exact
        duplicate rows removed. The users DataFrame built along the way is
        used only for the two printed counts and is not returned.

    Raises:
        KeyError: if a PR record lacks one of the four selected fields.
    """
    # Select relevant fields
    keys_to_include = ['id', 'number', 'title', 'body']
    prs_simple = [{k: pr[k] for k in keys_to_include} for pr in prs]

    # Create DataFrame
    prs_df = spark.createDataFrame(prs_simple)

    # Create DataFrame with usernames and ids
    user_keys = ['login', 'id']
    prs_users = []
    for pr in prs:
        try:
            pr_user_sub = {k: pr['user'][k] for k in user_keys}
        except (KeyError, TypeError):
            # Author missing (None) or incomplete: skip it. Previously the
            # error was swallowed and the previous PR's user was appended
            # again (or a NameError was raised on the first record).
            continue
        prs_users.append(pr_user_sub)

    users_df = spark.createDataFrame(prs_users)
    # Counted before de-duplication, i.e. one row per PR with a usable author.
    print(f"{users_df.count()} users made PRs in this timeframe.")

    # Remove duplicates
    users_df = users_df.dropDuplicates()
    print(f"Unique users: {users_df.count()}")

    return prs_df.dropDuplicates()

In [35]:
# Run the transform step; this rebinds `prs_df` to the 4-column DataFrame
# (id, number, title, body) returned by transform_data.
prs_df = transform_data(prs)

100 users made PRs in this timeframe.
Unique users: 28


In [36]:
# Preview the transformed PR table (show() prints at most 20 rows by default).
prs_df.show()

+--------------------+----------+------+--------------------+
|                body|        id|number|               title|
+--------------------+----------+------+--------------------+
|#### Which Delta ...|1910216852|  3242|[Spark] Use SQL e...|
|<!--\r\nThanks fo...|1910890046|  3243|[Spark][4.0] Add ...|
|<!--\r\nThanks fo...|1909496916|  3232|[Doc] Delta 4.0 -...|
|<!--\r\nThanks fo...|1915174253|  3258|[INFRA] Update in...|
|#### Which Delta ...|1910998991|  3244|[Spark] Throw exc...|
|<!--\r\nThanks fo...|1909984441|  3241|[Spark][4.0] Rena...|
|<!--\r\nThanks fo...|1916936636|  3266|    Add Hudi support|
|<!--\r\nThanks fo...|1909640823|  3234|[Delta Uniform] C...|
|<!--\r\nThanks fo...|1913290396|  3251|[Spark] Support c...|
|\r\n## Descriptio...|1909656417|  3235|[4.0preview][Kern...|
|<!--\r\nThanks fo...|1912719956|  3246|[DELTA] Add logs ...|
|https://github.co...|1915177976|  3260|[4.0 Preview][INF...|
|## Description\r\...|1916523493|  3263|[Tests] Improve t...|
|<!--\r\

In [37]:
# LOAD: replace whatever is stored at data/github_data with the current PR
# DataFrame, written in Delta format.
delta_writer = prs_df.write.format("delta").mode("overwrite")
delta_writer.save("data/github_data")

## CONTINUE HERE
- Run the script again tomorrow
- How to append only new rows?
  - Maybe use the `id` column to detect rows that are already in the table?


- Join → schema evolution
- Scale up
- Mistake → rollback

In [None]:
# Add a `login` column by left-joining the PR table (alias "a") with the users
# table (alias "b") on the author's id.
# NOTE(review): `prs_df` was last assigned from transform_data(prs), which
# selects only id/number/title/body, so `prs_df.user_id` is not available at
# this point unless prs_df is rebuilt with a user_id column -- confirm before
# running this cell.
joined = prs_df.alias("a").join(
    users_df.alias("b"),
    prs_df.user_id == users_df.id,
    how = "left"
).select("a.body", "a.id", "a.number", "a.title", "a.user_id", "b.login")


In [7]:
# Step 4: Demonstrate Benefits
def demonstrate_benefits():
    """Read the Delta table back, count it, add a column and query it.

    Steps:
        1. Load the table at ``delta_table_path`` and register it as the temp
           view ``github_data``.
        2. Print the total row count.
        3. Add a ``processed_at`` column (ISO timestamp string) and overwrite
           the table, allowing the schema to evolve.
        4. Show issues created in the last 7 days, but only when the table
           actually has the ``type`` and ``created_at`` columns the query
           filters on.

    Returns:
        None. Results are printed / shown.
    """
    # Reliability: ACID transactions ensure data consistency
    df = spark.read.format("delta").load(delta_table_path)
    df.createOrReplaceTempView("github_data")

    # Scalability: handle large volumes of data efficiently
    total_count = spark.sql("SELECT COUNT(*) FROM github_data").collect()[0][0]
    print(f"Total records: {total_count}")

    # Schema Enforcement and Evolution: demonstrate adding a new column.
    # Delta rejects a write whose schema differs from the table's unless
    # schema evolution is requested, hence mergeSchema.
    df = df.withColumn("processed_at", lit(datetime.now().isoformat()))
    (
        df.write.format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .save(delta_table_path)
    )

    # Query Performance: optimized for fast data retrieval.
    # The query filters on columns that a table holding only
    # id/number/title/body does not have, so run it only when they exist
    # instead of failing with an unresolved-column error.
    required_columns = {"type", "created_at"}
    if required_columns.issubset(df.columns):
        recent_issues = spark.sql("SELECT * FROM github_data WHERE type = 'issue' AND created_at >= date_sub(current_date(), 7)")
        recent_issues.show()
    else:
        missing = sorted(required_columns.difference(df.columns))
        print(f"Skipping recent-issues query; table has no column(s): {', '.join(missing)}")

In [None]:
if __name__ == "__main__":
    # fetch_github_data() returns a single list of PRs (not an (issues, prs)
    # pair) and transform_data() accepts exactly that one list.
    prs = fetch_github_data()
    df = transform_data(prs)
    # NOTE(review): load_data_to_delta is not defined in the visible part of
    # this file -- confirm it exists before running this entry point.
    load_data_to_delta(df)
    demonstrate_benefits()