# API's as a Data Source

In [1]:
# API's for batch extraction
# 1. Call an API at scheduled intervals (e.g every hour or day)
# 2. Retrieve all availbale data (e.g last 24 hours of records)
# 3. Store results in a database, data warehouse or file system

In [None]:
import requests
import json

def fetch_batch_data():
    url = "https://api.example.com/daily_reports"
    response = requests.get(url)
    data = response.json()

    with open('daily_reports.json', 'w') as file:
        json.dump(data, file, indent=4)

fetch_batch_data()

In [2]:
import requests

url = 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/events'

response = requests.get(url)
print(response.json())

[{'id': '52602815580', 'type': 'WatchEvent', 'actor': {'id': 66147771, 'login': 'HephWisdom', 'display_login': 'HephWisdom', 'gravatar_id': '', 'url': 'https://api.github.com/users/HephWisdom', 'avatar_url': 'https://avatars.githubusercontent.com/u/66147771?'}, 'repo': {'id': 419661684, 'name': 'DataTalksClub/data-engineering-zoomcamp', 'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'}, 'payload': {'action': 'started'}, 'public': True, 'created_at': '2025-07-28T00:36:16Z', 'org': {'id': 72699292, 'login': 'DataTalksClub', 'gravatar_id': '', 'url': 'https://api.github.com/orgs/DataTalksClub', 'avatar_url': 'https://avatars.githubusercontent.com/u/72699292?'}}, {'id': '52601462680', 'type': 'ForkEvent', 'actor': {'id': 33403958, 'login': 'singh3ss', 'display_login': 'singh3ss', 'gravatar_id': '', 'url': 'https://api.github.com/users/singh3ss', 'avatar_url': 'https://avatars.githubusercontent.com/u/33403958?'}, 'repo': {'id': 419661684, 'name': 'DataTalksClub/

#### APIs for streaming data extraction

- there are API's that allow you to extract data in real-time or near real-time. this method is used for systems that require continuous data updates, such as social media feeds, financial market data, or IoT sensor data.

#### Common streaming API examples:
- Webhooks (Stripe, Github, Slack) - Real-time event notifications
- Social Media APIs (Twitter, Facebook) - Real-time posts and interactions
- Financial Market APIs (Alpha Vantage, IEX Cloud) - Real-time stock prices and trading data

#### How the streaming API extraction works:
1. API sends real-time updates as data changes
2. A webhook  or WebSocket listens for events
3. Data is processed immediately as it arrives instead of being stored in bulk

In [None]:
import websocket

def on_message(ws, message):
    print("Received message:", message)

ws = websocket.WebSocketApp("wss://api.example.com/stream", on_message=on_message)
ws.run_forever()

In [None]:
# Common challenges
import time
# 1. Rate Limiting
# 'requests' is already imported in a previous cell, so we can use it directly
url = 'https://api.github.com/rate_limit'
requests.get(url).json()

# sleep for some time to avoid hitting rate limits
remaining_requests = requests.get(url).json()['rate']['remaining']

if remaining_requests == 0:
    time.sleep(60)

{'resources': {'core': {'limit': 60,
   'remaining': 56,
   'reset': 1753672125,
   'used': 4,
   'resource': 'core'},
  'graphql': {'limit': 0,
   'remaining': 0,
   'reset': 1753672459,
   'used': 0,
   'resource': 'graphql'},
  'integration_manifest': {'limit': 5000,
   'remaining': 5000,
   'reset': 1753672459,
   'used': 0,
   'resource': 'integration_manifest'},
  'search': {'limit': 10,
   'remaining': 10,
   'reset': 1753668919,
   'used': 0,
   'resource': 'search'}},
 'rate': {'limit': 60,
  'remaining': 56,
  'reset': 1753672125,
  'used': 4,
  'resource': 'core'}}

In [4]:
# 2. Authentication
# Most APIs require an API key or token to access data securely. Without authentication, requests may be denied or limited.

# Types of Authentication:
# 1. API Keys - Simple key provided by the API provider, included in request headers or parameters.
# 2. OAuth tokens- More secure, allows users to authorize access without sharing credentials.
# 3. Basic Authentication - Username and password encoded in the request header.(less common)

# Example of using an API key in a request
import os
from dotenv import load_dotenv

load_dotenv()
GITHUB_ACCESS_TOKEN = os.getenv("GITHUB_ACCESS_TOKEN")

headers = {
    'Authorization': f'token {GITHUB_ACCESS_TOKEN}',
}

url = 'https://api.github.com/user'
requests.get(url, headers=headers).json()

{'login': 'TatendaTy',
 'id': 46075417,
 'node_id': 'MDQ6VXNlcjQ2MDc1NDE3',
 'avatar_url': 'https://avatars.githubusercontent.com/u/46075417?v=4',
 'gravatar_id': '',
 'url': 'https://api.github.com/users/TatendaTy',
 'html_url': 'https://github.com/TatendaTy',
 'followers_url': 'https://api.github.com/users/TatendaTy/followers',
 'following_url': 'https://api.github.com/users/TatendaTy/following{/other_user}',
 'gists_url': 'https://api.github.com/users/TatendaTy/gists{/gist_id}',
 'starred_url': 'https://api.github.com/users/TatendaTy/starred{/owner}{/repo}',
 'subscriptions_url': 'https://api.github.com/users/TatendaTy/subscriptions',
 'organizations_url': 'https://api.github.com/users/TatendaTy/orgs',
 'repos_url': 'https://api.github.com/users/TatendaTy/repos',
 'events_url': 'https://api.github.com/users/TatendaTy/events{/privacy}',
 'received_events_url': 'https://api.github.com/users/TatendaTy/received_events',
 'type': 'User',
 'user_view_type': 'public',
 'site_admin': False,

In [5]:
# 3. Pagination
# Many APIs return data in pages to limit the amount of data sent in a single response. 
# You may need to handle pagination to retrieve all available data.

# Example of handling pagination
url = 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/events'

response = requests.get(url)
response


# what the above does
# 1. Starts at page 1 and makes a GET request to the API endpoint.
# 2. Retrieves JSON data from the response.
# 3. Checks if there are more pages by looking for a 'next' link in the response headers.
# 4. If there is a 'next' link,it updates BASE_URL and makes requests until all pages are retrieved.
# NB different API's handle pagination differently (such as offsets, cursors, page numbers, or tokens instead of links), so there's need to adjust the code based on the API's documentation. 



<Response [200]>

In [6]:
url = 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/events'

while True:
    response = requests.get(url)
    data = response.json()
    print(len(data), "events retrieved")

    if 'next' not in response.links:
        break

    url = response.links['next']['url']

30 events retrieved
30 events retrieved
30 events retrieved
30 events retrieved
30 events retrieved
30 events retrieved
30 events retrieved
30 events retrieved
30 events retrieved
27 events retrieved


In [None]:
# 4. Avoiding memory issues while extracting
# to prevent the pipeline from crashing, control memory usage
# Challenges with memory
# 1. many pipelines run on systems with limited memory, i.e serverless functions or shared clusters
# 2. loading all the data into memory at once can lead to memory errors or slow performance

In [7]:
# Solution for batch processing/streaming data
# Here streaming means that data is processed in smaller chunks or batches, rather than loading everything into memory at once. This approach helps to manage memory usage and allows for processing large datasets efficiently.
# Use streaming to transfer data between buffers, such as:
# 1. from API's to local files
# 2. from webhooks to event queries
# 3. from event queries(like Kafka) to storage buckets

def events_getter():
    url = 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp/events'
    
    while True:
        response = requests.get(url)
        data = response.json()
        yield data
        
        if 'next' not in response.links:
            break
        
        url = response.links['next']['url']

In [8]:
events_pages = events_getter()

for events_page in events_pages:
    # Process each page of events
    # print(len(events_getter), "events retrieved")
    # Here you can save the events to a file or database, or process them as needed
    # For example, you could write them to a JSON file:
    # with open('events.json', 'a') as f:
    #     json.dump(events_getter, f)
    #     f.write('\n')  # Write each page on a new line
    # use this for now
    print(events_page)  # Print or process the events as needed

[{'id': '52602815580', 'type': 'WatchEvent', 'actor': {'id': 66147771, 'login': 'HephWisdom', 'display_login': 'HephWisdom', 'gravatar_id': '', 'url': 'https://api.github.com/users/HephWisdom', 'avatar_url': 'https://avatars.githubusercontent.com/u/66147771?'}, 'repo': {'id': 419661684, 'name': 'DataTalksClub/data-engineering-zoomcamp', 'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'}, 'payload': {'action': 'started'}, 'public': True, 'created_at': '2025-07-28T00:36:16Z', 'org': {'id': 72699292, 'login': 'DataTalksClub', 'gravatar_id': '', 'url': 'https://api.github.com/orgs/DataTalksClub', 'avatar_url': 'https://avatars.githubusercontent.com/u/72699292?'}}, {'id': '52601462680', 'type': 'ForkEvent', 'actor': {'id': 33403958, 'login': 'singh3ss', 'display_login': 'singh3ss', 'gravatar_id': '', 'url': 'https://api.github.com/users/singh3ss', 'avatar_url': 'https://avatars.githubusercontent.com/u/33403958?'}, 'repo': {'id': 419661684, 'name': 'DataTalksClub/

In [9]:
# normalization example
event = events_page[0]
event

{'id': '52040127738',
 'type': 'WatchEvent',
 'actor': {'id': 173776489,
  'login': 'LoneMusashi',
  'display_login': 'LoneMusashi',
  'gravatar_id': '',
  'url': 'https://api.github.com/users/LoneMusashi',
  'avatar_url': 'https://avatars.githubusercontent.com/u/173776489?'},
 'repo': {'id': 419661684,
  'name': 'DataTalksClub/data-engineering-zoomcamp',
  'url': 'https://api.github.com/repos/DataTalksClub/data-engineering-zoomcamp'},
 'payload': {'action': 'started'},
 'public': True,
 'created_at': '2025-07-14T06:37:48Z',
 'org': {'id': 72699292,
  'login': 'DataTalksClub',
  'gravatar_id': '',
  'url': 'https://api.github.com/orgs/DataTalksClub',
  'avatar_url': 'https://avatars.githubusercontent.com/u/72699292?'}}

In [10]:
def process_event(event):
    result = {}

    result['id'] = event['id']
    result['type'] = event['type']
    result['public'] = event['public']
    result['created_at'] = event['created_at']

    result['actor__id'] = event['actor']['id']
    result['actor__login'] = event['actor']['login']

    return result

In [11]:
processed_events = [] # Initialize an empty list to store processed events

# Process each event in the current page
for event in events_page:
    processed_events.append(process_event(event))  # Append the processed event to the list

processed_events

[{'id': '52040127738',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-07-14T06:37:48Z',
  'actor__id': 173776489,
  'actor__login': 'LoneMusashi'},
 {'id': '52036538730',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-07-14T04:24:54Z',
  'actor__id': 43141076,
  'actor__login': 'yankunsong'},
 {'id': '52029959989',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-07-13T21:51:35Z',
  'actor__id': 102864198,
  'actor__login': 'MustafaKpn'},
 {'id': '52029395646',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-07-13T21:03:01Z',
  'actor__id': 189120651,
  'actor__login': 'PYcrypto666'},
 {'id': '52028328598',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-07-13T19:33:36Z',
  'actor__id': 61951032,
  'actor__login': 'fyoalharbi'},
 {'id': '52026587240',
  'type': 'WatchEvent',
  'public': True,
  'created_at': '2025-07-13T17:19:55Z',
  'actor__id': 111182377,
  'actor__login': 'JoanData'},
 {'id': '520263438

In [12]:
from datetime import datetime

def process_event(event):
    result = {}

    result['id'] = event['id']
    result['type'] = event['type']
    result['public'] = event['public']

    parsed_timestamp = datetime.strptime(event['created_at'], '%Y-%m-%dT%H:%M:%SZ')
    result['created_at'] = parsed_timestamp.timestamp() # Convert to timestamp

    result['actor__login'] = event['actor']['login']

    return result

process_event(event)

{'id': '52014996769',
 'type': 'WatchEvent',
 'public': True,
 'created_at': 1752358635.0,
 'actor__login': 'BrunoChiconato'}

In [13]:
all_data = []

pages = events_getter()

for page in pages:
    all_data.extend(page) # Extend the all_data list with the current page of events

len(all_data)  # Check the total number of events collected

297

In [14]:
def process_event(event):
    '''
    Process a single event and extract relevant information.
    This function also extracts topics linked to the event.
    It returns a dictionary with the processed event data and a list of topics.
    '''
    result = {}

    result['id'] = event['id']
    result['type'] = event['type']
    result['public'] = event['public']

    parsed_timestamp = datetime.strptime(event['created_at'], '%Y-%m-%dT%H:%M:%SZ')
    result['created_at'] = parsed_timestamp.timestamp()  # Convert to timestamp

    result['actor__id'] = event['actor']['id']
    result['actor__login'] = event['actor']['login']

    topics = event.get('payload', {}).get('pull_request', {}).get('base', {}).get('repo', {}).get('topics', [])

    processed_topics = []
    for topic in topics:
        processed_topic = {
            'event_id': event['id'],
            'topic_name': topic,
        }
        processed_topics.append(processed_topic)

    return result, processed_topics

In [26]:
processed_events = []  # Initialize an empty list to store processed events
processed_topics = []  # Initialize an empty list to store processed topics

for event in all_data:
    # Process each event and extract topics linked to the event
    processed_event, topics = process_event(event)
    processed_events.append(processed_event)  # Append the processed event to the list
    processed_topics.extend(topics)  # Extend the processed_topics list with the current event's topics

print(processed_events[:5]) # Print the first 5 processed events to verify
print(processed_topics[:5]) # Print the processed topics to verify

[{'id': '52602815580', 'type': 'WatchEvent', 'public': True, 'created_at': 1753652176.0, 'actor__id': 66147771, 'actor__login': 'HephWisdom', 'repo__id': 419661684}, {'id': '52601462680', 'type': 'ForkEvent', 'public': True, 'created_at': 1753645940.0, 'actor__id': 33403958, 'actor__login': 'singh3ss', 'repo__id': 419661684}, {'id': '52599931066', 'type': 'IssuesEvent', 'public': True, 'created_at': 1753638143.0, 'actor__id': 42957613, 'actor__login': 'SanathPatil', 'repo__id': 419661684}, {'id': '52599592886', 'type': 'ForkEvent', 'public': True, 'created_at': 1753636486.0, 'actor__id': 217340527, 'actor__login': 'KittyNiuNiu', 'repo__id': 419661684}, {'id': '52599588942', 'type': 'WatchEvent', 'public': True, 'created_at': 1753636467.0, 'actor__id': 217340527, 'actor__login': 'KittyNiuNiu', 'repo__id': 419661684}]
[{'event_id': '52146985881', 'topic_name': 'data-engineering'}, {'event_id': '52146985881', 'topic_name': 'dbt'}, {'event_id': '52146985881', 'topic_name': 'docker'}, {'eve

##### Loading Data

In [22]:
# Using DuckDB script that creates two separate tables
# 1. watch_events -> Stores WatchEvent data
# 2. pull_request_events -> Stores PullRequestEvent data
# 
# A basic pipeline requires:
# 1. Setting up a database connection
# 2. Creating tables and defining schemas
# 3. Writing queries to insert/update data
# 4. Handling schema changes manually

In [16]:
import duckdb

# 1. create a connection to a DuckDB database
conn = duckdb.connect('github_events.db')

In [17]:
processed_events[0]

{'id': '52602815580',
 'type': 'WatchEvent',
 'public': True,
 'created_at': 1753652176.0,
 'actor__id': 66147771,
 'actor__login': 'HephWisdom'}

In [18]:
# 2. Create the `github_events` table
conn.execute("""
CREATE TABLE IF NOT EXISTS github_events (
    id TEXT PRIMARY KEY,
    type TEXT,
    public BOOLEAN,
    created_at DOUBLE,
    actor__id BIGINT,
    actor__login TEXT
);
""")

<duckdb.duckdb.DuckDBPyConnection at 0x28f6399f670>

In [19]:
flattened_data = [
    (
        record['id'],
        record['type'],
        record['public'],
        record['created_at'],
        record['actor__id'],
        record['actor__login']
    )
    for record in processed_events
]

# 3. Insert the processed events into the `github_events` table
conn.executemany("""
INSERT INTO github_events (id, type, public, created_at, actor__id, actor__login)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO NOTHING;
""", flattened_data)

<duckdb.duckdb.DuckDBPyConnection at 0x28f6399f670>

In [23]:
df = conn.execute("SELECT * FROM github_events").df()
df.head()

Unnamed: 0,id,type,public,created_at,actor__id,actor__login
0,52597550818,WatchEvent,True,1753627000.0,223146409,stanlech1922
1,52597445530,WatchEvent,True,1753627000.0,131308557,stanlech97
2,52596512513,WatchEvent,True,1753623000.0,48296646,jia-jiang
3,52594624618,WatchEvent,True,1753615000.0,759158,cleonildo
4,52594229027,WatchEvent,True,1753613000.0,107476202,Augustus2011


In [24]:
conn.close()  # Close the connection when done

#### Dynamic Schema Handling

In [25]:
def process_event(event):
    '''
    Process a single event and extract relevant information.
    This function also extracts topics linked to the event.
    It returns a dictionary with the processed event data and a list of topics.
    '''
    result = {}

    result['id'] = event['id']
    result['type'] = event['type']
    result['public'] = event['public']

    parsed_timestamp = datetime.strptime(event['created_at'], '%Y-%m-%dT%H:%M:%SZ')
    result['created_at'] = parsed_timestamp.timestamp()  # Convert to timestamp

    result['actor__id'] = event['actor']['id']
    result['actor__login'] = event['actor']['login']

    result['repo__id'] = event['repo']['id'] # Extract repository ID

    topics = event.get('payload', {}).get('pull_request', {}).get('base', {}).get('repo', {}).get('topics', [])

    processed_topics = []
    for topic in topics:
        processed_topic = {
            'event_id': event['id'],
            'topic_name': topic,
        }
        processed_topics.append(processed_topic)

    return result, processed_topics

In [27]:
processed_events = []  # Initialize an empty list to store processed events
processed_topics = []  # Initialize an empty list to store processed topics

for event in all_data:
    # Process each event and extract topics linked to the event
    processed_event, topics = process_event(event)
    processed_events.append(processed_event)  # Append the processed event to the list
    processed_topics.extend(topics)  # Extend the processed_topics list with the current event's topics

print(processed_events[:5]) # Print the first 5 processed events to verify
print(processed_topics[:5]) # Print the processed topics to verify

[{'id': '52602815580', 'type': 'WatchEvent', 'public': True, 'created_at': 1753652176.0, 'actor__id': 66147771, 'actor__login': 'HephWisdom', 'repo__id': 419661684}, {'id': '52601462680', 'type': 'ForkEvent', 'public': True, 'created_at': 1753645940.0, 'actor__id': 33403958, 'actor__login': 'singh3ss', 'repo__id': 419661684}, {'id': '52599931066', 'type': 'IssuesEvent', 'public': True, 'created_at': 1753638143.0, 'actor__id': 42957613, 'actor__login': 'SanathPatil', 'repo__id': 419661684}, {'id': '52599592886', 'type': 'ForkEvent', 'public': True, 'created_at': 1753636486.0, 'actor__id': 217340527, 'actor__login': 'KittyNiuNiu', 'repo__id': 419661684}, {'id': '52599588942', 'type': 'WatchEvent', 'public': True, 'created_at': 1753636467.0, 'actor__id': 217340527, 'actor__login': 'KittyNiuNiu', 'repo__id': 419661684}]
[{'event_id': '52146985881', 'topic_name': 'data-engineering'}, {'event_id': '52146985881', 'topic_name': 'dbt'}, {'event_id': '52146985881', 'topic_name': 'docker'}, {'eve

In [28]:
# Create connection to DuckDB
conn = duckdb.connect('github_events.db')

In [29]:
current_columns = {row[1] for row in conn.execute("PRAGMA table_info(github_events)").fetchall()}
print("Current columns in github_events table:", current_columns)

Current columns in github_events table: {'created_at', 'actor__id', 'actor__login', 'type', 'public', 'id'}


In [30]:
# 3. Detect and add new columns dynamically
for record in processed_events[10:]:
    for key in record.keys():
        if key not in current_columns:
            col_type = 'TEXT'  # Default type for new columns
            if isinstance(record[key], bool):
                col_type = 'BOOLEAN'
            elif isinstance(record[key], int):
                col_type = 'BIGINT'
            elif isinstance(record[key], float):
                col_type = 'DOUBLE'
            print(f'ALTER TABLE github_events ADD COLUMN {key} {col_type};')
            alter_query = f"ALTER TABLE github_events ADD COLUMN {key} {col_type};"
            conn.execute(alter_query)
            print(f"Added new column: {key} with type {col_type}")
            current_columns.add(key) # Update schema tracking

ALTER TABLE github_events ADD COLUMN repo__id BIGINT;
Added new column: repo__id with type BIGINT


In [31]:
# 4. Prepare data for insertion (handle missing fields)
columns = sorted(current_columns) # Maintain consistent order
flattened_data = [
    tuple(record.get(col, None) for col in columns)  # # fill missing values wwith NULL
    for record in processed_events
]

# 5. Construct dynamic SQL for insertion
placeholders = ', '.join(['?' for _ in columns])  # Create placeholders for each column
columns_str = ', '.join(columns)  # Create a comma-separated string of column names

insert_query = f"""
INSERT INTO github_events ({columns_str})
VALUES ({placeholders})
ON CONFLICT (id) DO UPDATE SET {", ".join([f"{col}=excluded.{col}" for col in columns if col != 'id'])};
"""

In [32]:
conn.executemany(insert_query, flattened_data)

<duckdb.duckdb.DuckDBPyConnection at 0x28f0b8357b0>

In [33]:
df = conn.execute("SELECT * FROM github_events").df()
df.head()

Unnamed: 0,id,type,public,created_at,actor__id,actor__login,repo__id
0,52597550818,WatchEvent,True,1753627000.0,223146409,stanlech1922,419661684
1,52597445530,WatchEvent,True,1753627000.0,131308557,stanlech97,419661684
2,52596512513,WatchEvent,True,1753623000.0,48296646,jia-jiang,419661684
3,52594624618,WatchEvent,True,1753615000.0,759158,cleonildo,419661684
4,52594229027,WatchEvent,True,1753613000.0,107476202,Augustus2011,419661684
