In [18]:
import dlt 

from dlt.sources.helpers.rest_client import RESTClient

import duckdb

In [19]:
secret = dlt.secrets.get("access_token")

client = RESTClient(
    base_url="https://api.github.com",
    headers={"Authorization": f"Bearer {secret}",
        "Accept": "application/json",
    },
)


In [20]:
def get_github_issues():

    for page in client.paginate(
        "/repos/dlt-hub/dlt/issues", params={"state": "all", "per_page": 100}
    ):
        for record in page:
            
            yield record

In [21]:
originals=[x for x in get_github_issues()]

print(originals[0].keys())

dict_keys(['url', 'repository_url', 'labels_url', 'comments_url', 'events_url', 'html_url', 'id', 'node_id', 'number', 'title', 'user', 'labels', 'state', 'locked', 'assignee', 'assignees', 'milestone', 'comments', 'created_at', 'updated_at', 'closed_at', 'author_association', 'type', 'sub_issues_summary', 'active_lock_reason', 'body', 'closed_by', 'reactions', 'timeline_url', 'performed_via_github_app', 'state_reason'])


In [23]:
github_issues_resource = dlt.resource(
    get_github_issues(), name="issues", write_disposition="merge", primary_key="id"
)

pipeline = dlt.pipeline(
    pipeline_name="github_issues", destination=dlt.destinations.duckdb("duckdb:////workspaces/dlt_presentation/dlt_presentation/github_issues.duckdb"), dataset_name="github_data"
)

load_info = pipeline.run(github_issues_resource, table_name="get_github_issues")

print(f"Load info: {load_info}")

Load info: Pipeline github_issues load step completed in 6.26 seconds
1 load package(s) were loaded to destination duckdb and into dataset github_data
The duckdb destination used duckdb:////workspaces/dlt_presentation/dlt_presentation/github_issues.duckdb location to store data
Load package 1744236531.1541185 is LOADED and contains no failed jobs


In [24]:
conn = duckdb.connect("./github_issues.duckdb")

query = """
SELECT columns    
FROM github_data.get_github_issues LIMIT 10
"""

query2="SELECT column_name  FROM information_schema.columns WHERE table_name = 'get_github_issues' LIMIT 3"

result = conn.execute(query2).fetchall()

for row in result:
    print(row)

('url',)
('repository_url',)
('labels_url',)


In [25]:

def get_modified_issues():

  for issue in get_github_issues():

    issue["New"] = "True"

    yield issue




In [26]:
originals=[x for x in get_modified_issues()]

print(originals[0].keys())

dict_keys(['url', 'repository_url', 'labels_url', 'comments_url', 'events_url', 'html_url', 'id', 'node_id', 'number', 'title', 'user', 'labels', 'state', 'locked', 'assignee', 'assignees', 'milestone', 'comments', 'created_at', 'updated_at', 'closed_at', 'author_association', 'type', 'sub_issues_summary', 'active_lock_reason', 'body', 'closed_by', 'reactions', 'timeline_url', 'performed_via_github_app', 'state_reason', 'New'])


In [27]:
load_info = pipeline.run(
    get_modified_issues(),
    table_name="get_github_issues",
    write_disposition="merge",
    primary_key="id"
)

print(f"Load info: {load_info}")


Load info: Pipeline github_issues load step completed in 6.17 seconds
1 load package(s) were loaded to destination duckdb and into dataset github_data
The duckdb destination used duckdb:////workspaces/dlt_presentation/dlt_presentation/github_issues.duckdb location to store data
Load package 1744236659.8337326 is LOADED and contains no failed jobs


In [28]:
conn = duckdb.connect("./github_issues.duckdb")

query = """
SELECT count(*)    
FROM github_data.get_github_issues where New is null
"""

query2="SELECT column_name  FROM information_schema.columns WHERE table_name = 'get_github_issues'"

result = conn.execute(query).fetchall()

for i in result:

  print(i)





(9900,)
