In [69]:
# Extract

import os

import psycopg2
import requests
from psycopg2 import connect, sql

# Configure your PostgreSQL connection string.
# Every setting can be overridden with an environment variable so credentials do
# not have to be edited into the notebook; the fallbacks reproduce the original
# local development values, so the notebook runs unchanged when nothing is set.
# NOTE(review): the password fallback is still a secret stored in the notebook -
# remove it and export ETL_DB_PASSWORD instead before sharing or committing.
_db_settings = {
    "dbname": os.environ.get("ETL_DB_NAME", "etl_bites"),
    "user": os.environ.get("ETL_DB_USER", "ilhaam.ahmed"),
    "password": os.environ.get("ETL_DB_PASSWORD", "etl_proj"),
    "host": os.environ.get("ETL_DB_HOST", "localhost"),
    "port": os.environ.get("ETL_DB_PORT", "5432"),
}
# Rendered as space-separated key='value' pairs, the same shape as the original literal.
conn_string = " ".join(f"{key}='{value}'" for key, value in _db_settings.items())

def get_data_from_api(url, timeout=10):
    """Send an HTTP GET to `url` and return the decoded JSON body.

    Args:
        url: Endpoint to request.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, `requests` waits indefinitely and a stalled server would
            hang the notebook kernel.

    Returns:
        Whatever `response.json()` decodes from the response body.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.Timeout: If no response arrives within `timeout` seconds.
    """
    response = requests.get(url, timeout=timeout)
    # Fail loudly on an error status instead of parsing an error body as data.
    response.raise_for_status()
    return response.json()

# Endpoints for the two collections extracted in this walkthrough.
posts_url = "https://jsonplaceholder.typicode.com/posts"
users_url = "https://jsonplaceholder.typicode.com/users"

# Fetch both collections now; later cells read posts_data and users_data.
posts_data = get_data_from_api(posts_url)
users_data = get_data_from_api(users_url)

The above code fetches data from two different API endpoints (posts and users) using the 'requests' library, which I am already familiar with. It then stores the retrieved JSON data in the 'posts_data' and 'users_data' variables.

Reminder:
- 'requests' - sends HTTP requests to web servers and receives responses.

In [70]:
# Transform

def join_posts_and_users(posts, users):
    """Attach the author's name to every post that has a matching user.

    Each post is compared against every user; when the post's 'userId' equals
    the user's 'id', the user's 'name' is written to the post under 'author'.
    Posts are updated in place, and a post with no matching user is left
    without an 'author' key.

    Args:
        posts: List of post dicts, each carrying a 'userId'.
        users: List of user dicts, each carrying 'id' and 'name'.

    Returns:
        The same `posts` list that was passed in.
    """
    for entry in posts:
        for account in users:
            is_author = entry['userId'] == account['id']
            if is_author:
                entry['author'] = account['name']
    return posts

# join_posts_and_users updates the dicts in posts_data in place and returns the
# same list, so combined_data and posts_data refer to the same objects.
combined_data = join_posts_and_users(posts_data, users_data)

Above, a join_posts_and_users() function is defined which takes 'posts' and 'users' as parameters. Within the function, a simple join operation is performed: for each post, the function iterates over each element in the users list and checks whether the post's 'userId' matches the user's 'id'. On a match, the user's name is stored on the post under the 'author' key.

Then the data is combined and stored in the 'combined_data' variable.

In [71]:
# Load

# Create tables in analytical DB
# This could also be done manually via a GUI (e.g. TablePlus) or with a SQL script
def execute_query_postgresql(conn_string, query):
    """Execute one SQL statement against PostgreSQL and commit it.

    Args:
        conn_string: libpq connection string passed to `connect`.
        query: SQL statement to execute.

    Raises:
        psycopg2.Error: If connecting or executing fails; the transaction is
            rolled back before the error propagates.
    """
    conn = connect(conn_string)
    try:
        # `with conn` only scopes the transaction (commit on success, rollback
        # on error); it does not close the connection, hence the `finally`.
        with conn:
            with conn.cursor() as cur:
                cur.execute(query)
    finally:
        conn.close()

# check if table exists before creating them
def table_exists(conn_string, table_name):
    """Report whether information_schema.tables lists a table named `table_name`.

    The lookup is not restricted to a schema, so a table of that name in any
    schema listed by information_schema counts as existing.

    Args:
        conn_string: libpq connection string passed to `psycopg2.connect`.
        table_name: Table name to look for.

    Returns:
        The boolean produced by the `SELECT EXISTS (...)` query.
    """
    conn = psycopg2.connect(conn_string)
    try:
        # `with conn` ends the transaction but leaves the connection open, so it
        # is closed explicitly in the `finally` below.
        with conn:
            with conn.cursor() as cur:
                # Bind the name as a query parameter rather than formatting it
                # into the SQL text.
                cur.execute(
                    "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = %s)",
                    (table_name,),
                )
                return cur.fetchone()[0]
    finally:
        conn.close()

# Create new tables

# Only issue CREATE TABLE when table_exists finds no table called api_data, so
# re-running this cell does not fail on an already-existing table.
if not table_exists(conn_string, "api_data"):
    create_api_data_table = '''
    CREATE TABLE api_data (
        post_id INTEGER NOT NULL,
        title TEXT NOT NULL,
        body TEXT NOT NULL,
        user_id INTEGER NOT NULL,
        author TEXT NOT NULL
    );
    '''
    execute_query_postgresql(conn_string, create_api_data_table)

The code above defines the execute_query_postgresql() function, which connects to the Postgres database, uses a cursor to execute a SQL query, and commits the change to the database. It also defines table_exists(), which checks whether a table with the given name is already present.

Then an 'api_data' table is created with the specific columns.

Finally, the table-creation query is executed using the execute_query_postgresql() function.

In [72]:
# insert the data - Load

def insert_data_to_postgresql(conn_string, table_name, data):
    """Insert joined post records into `table_name`, one row per item.

    Args:
        conn_string: libpq connection string passed to `connect`.
        table_name: Target table; quoted as an SQL identifier.
        data: Iterable of dicts with the keys 'id', 'title', 'body', 'userId'
            and 'author'.

    Raises:
        KeyError: If an item lacks one of the expected keys.
        psycopg2.Error: If connecting or inserting fails; nothing is committed
            in that case because the transaction is rolled back.
    """
    # The statement is the same for every row, so compose it once up front.
    query = sql.SQL(
        "INSERT INTO {} (post_id, title, body, user_id, author) VALUES (%s, %s, %s, %s, %s)"
    ).format(sql.Identifier(table_name))

    conn = connect(conn_string)
    try:
        # Leaving `with conn` commits all inserts (or rolls back on error); it
        # does not close the connection, so that happens in `finally`.
        with conn:
            with conn.cursor() as cur:
                for item in data:
                    cur.execute(
                        query,
                        (item['id'], item['title'], item['body'], item['userId'], item['author']),
                    )
    finally:
        conn.close()

# Load the joined posts. This is a plain INSERT, and the api_data table as
# created above has no key constraint, so re-running this cell appends the same
# rows again.
table_name = "api_data"
insert_data_to_postgresql(conn_string, table_name, combined_data)

This last part defines the insert_data_to_postgresql() function, which is used to insert the fetched API data into a postgres database table called 'api_data'.

'connect' is used to establish a connection, then a cursor is used to execute the sql queries.

A breakdown of the 'query' variable:
- The 'sql.SQL' function is used to format the query and prevent issues (malicious SQL statements being inserted).
- The 'sql.Identifier(table_name)' part safely formats the table name as an identifier, which helps prevent SQL injection attacks on the table name.
- The 'cur.execute' method executes the query and inserts the data into the table.
- The changes are then committed.

# Exercise

### Extract the todos and users data from the API, and calculate the number of completed tasks for each user.

### Load the result into a new table in the local PostgreSQL database.

In [73]:
# Extract 

# Reuses get_data_from_api from the first Extract cell to fetch the todos collection.
todos_url = "https://jsonplaceholder.typicode.com/todos"
todos_data = get_data_from_api(todos_url)

In [74]:
# Transform

def calculate_completed_tasks(todos, users):
    """Count the completed todos of each user.

    Args:
        todos: Iterable of todo dicts, each with 'userId' and 'completed'.
        users: Iterable of user dicts, each with 'id' and 'name'.

    Returns:
        A list with one dict per user, in the order of `users`, holding
        'userId', 'name' and 'completedTasks'. Users without any completed
        todo are reported with a count of 0.
    """
    completed_tasks = {}

    for todo in todos:
        user_id = todo['userId']
        if todo['completed']:
            completed_tasks[user_id] = completed_tasks.get(user_id, 0) + 1

    return [
        {
            'userId': user['id'],
            'name': user['name'],
            # A user with no completed todo has no entry in the tally, so
            # default to 0 instead of raising KeyError.
            'completedTasks': completed_tasks.get(user['id'], 0),
        }
        for user in users
    ]

# users_data was fetched in the first Extract cell; todos_data in the exercise's Extract cell.
user_completed_tasks = calculate_completed_tasks(todos_data, users_data)

In [75]:
# Load

def execute_query_postgresql(conn_string, query):
    """Execute one SQL statement against PostgreSQL and commit it.

    NOTE: this re-defines the helper of the same name from the earlier Load
    cell; once this cell runs, this definition is the one in effect.

    Args:
        conn_string: libpq connection string passed to `connect`.
        query: SQL statement to execute.

    Raises:
        psycopg2.Error: If connecting or executing fails; the transaction is
            rolled back before the error propagates.
    """
    conn = connect(conn_string)
    try:
        # `with conn` only scopes the transaction (commit on success, rollback
        # on error); it does not close the connection, hence the `finally`.
        with conn:
            with conn.cursor() as cur:
                cur.execute(query)
    finally:
        conn.close()

# check if table exists before creating them
def table_exists(conn_string, table_name):
    """Report whether information_schema.tables lists a table named `table_name`.

    NOTE: this re-defines the helper of the same name from the earlier Load
    cell; once this cell runs, this definition is the one in effect.

    The lookup is not restricted to a schema, so a table of that name in any
    schema listed by information_schema counts as existing.

    Args:
        conn_string: libpq connection string passed to `psycopg2.connect`.
        table_name: Table name to look for.

    Returns:
        The boolean produced by the `SELECT EXISTS (...)` query.
    """
    conn = psycopg2.connect(conn_string)
    try:
        # `with conn` ends the transaction but leaves the connection open, so it
        # is closed explicitly in the `finally` below.
        with conn:
            with conn.cursor() as cur:
                # Bind the name as a query parameter rather than formatting it
                # into the SQL text.
                cur.execute(
                    "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = %s)",
                    (table_name,),
                )
                return cur.fetchone()[0]
    finally:
        conn.close()

# Create new tables

# Only issue CREATE TABLE when table_exists finds no table called
# user_completed_tasks, so re-running this cell does not fail on an existing table.
if not table_exists(conn_string, "user_completed_tasks"):
    create_user_completed_tasks_table = '''
    CREATE TABLE user_completed_tasks (
        user_id INTEGER NOT NULL,
        name TEXT NOT NULL,
        completed_tasks INTEGER NOT NULL
    );
    '''
    execute_query_postgresql(conn_string, create_user_completed_tasks_table)

In [76]:
# Insert into table

def insert_data_to_postgresql(conn_string, table_name, data):
    """Insert per-user completed-task counts into `table_name`, one row per item.

    NOTE: this shadows the posts-specific function of the same name defined
    earlier in the notebook; after this cell runs, the name refers to this
    version with the (user_id, name, completed_tasks) column list.

    Args:
        conn_string: libpq connection string passed to `connect`.
        table_name: Target table; quoted as an SQL identifier.
        data: Iterable of dicts with the keys 'userId', 'name' and
            'completedTasks'.

    Raises:
        KeyError: If an item lacks one of the expected keys.
        psycopg2.Error: If connecting or inserting fails; nothing is committed
            in that case because the transaction is rolled back.
    """
    # The statement is the same for every row, so compose it once up front.
    query = sql.SQL(
        "INSERT INTO {} (user_id, name, completed_tasks) VALUES (%s, %s, %s)"
    ).format(sql.Identifier(table_name))

    conn = connect(conn_string)
    try:
        # Leaving `with conn` commits all inserts (or rolls back on error); it
        # does not close the connection, so that happens in `finally`.
        with conn:
            with conn.cursor() as cur:
                for item in data:
                    cur.execute(query, (item['userId'], item['name'], item['completedTasks']))
    finally:
        conn.close()

# Load the per-user counts. This is a plain INSERT, and the user_completed_tasks
# table as created above has no key constraint, so re-running this cell appends
# the same rows again. Note that table_name is rebound from its earlier value.
table_name = "user_completed_tasks"
insert_data_to_postgresql(conn_string, table_name, user_completed_tasks)

# CHALLENGE

### Extract all repositories from the GitHub API for a specific user, and calculate the total number of stars they received. Use pagination when fetching the data from the API. Load the result into a new table in the local PostgreSQL database.