Homework

Write a Python script that loads data from the SpaceX API into DuckDB using dlt.

Use:
- @dlt.source
- @dlt.resource
- @dlt.transformer

SpaceX API URL: https://api.spacexdata.com

Docs: https://github.com/r-spacex/SpaceX-API/blob/master/docs/README.md

Endpoints for loading:
- launches
- rockets
- crew

# Install dlt with the DuckDB extension

In [21]:
%%capture
pip install dlt[duckdb]

# Play with SpaceX API

In [None]:
import requests
response = requests.get("https://api.spacexdata.com/v4/launches")
response.json()[0]

# Helper
Run the cell and ignore it.

In [2]:
from dlt.common.pipeline import LoadInfo

def assert_load_info(info: LoadInfo, expected_load_packages: int = 1) -> None:
    """Asserts that expected number of packages was loaded and there are no failed jobs"""
    assert len(info.loads_ids) == expected_load_packages
    # all packages loaded
    assert all(package.state == "loaded" for package in info.load_packages) is True
    # no failed jobs in any of the packages
    info.raise_on_failed_jobs()

# Task 1


Create a pipeline for the SpaceX API that loads the following endpoints: launches, rockets, crew.

- Fill the empty lines in the functions below.
- The `get_rockets` resource should have `table_name="rockets"`.
- Create a [resource](https://dlthub.com/docs/general-usage/resource#declare-a-resource) for the `crew` endpoint from scratch.
- [Run the pipeline](https://dlthub.com/docs/walkthroughs/run-a-pipeline) without errors.

In [2]:
import time
import dlt
import requests


@dlt.resource(table_name="launches")
def get_launches():
    # url to request launches
    url = "https://api.spacexdata.com/v4/launches"
    # make the request and check if succeeded
    response = requests.get(url)
    response.raise_for_status()
    yield response.json()

@dlt.resource(table_name="rockets")
def get_rockets():
    # url to request rockets
    url = "https://api.spacexdata.com/v4/rockets"
    # make the request and check if succeeded
    response = requests.get(url)
    response.raise_for_status()
    yield response.json()

@dlt.resource(table_name="crew")
def get_crew():
    # url to request crew
    url = "https://api.spacexdata.com/v4/crew"
    # make the request and check if succeeded
    response = requests.get(url)
    response.raise_for_status()
    yield response.json()


pipeline = dlt.pipeline(
    pipeline_name='spacex_with_source',
    destination='duckdb',
    dataset_name='spacex_data',
    dev_mode=True,
)

load_info = pipeline.run([get_launches(), get_rockets(), get_crew()])
print(load_info)
assert_load_info(load_info)

Pipeline spacex_with_source load step completed in 1.16 seconds
1 load package(s) were loaded to destination duckdb and into dataset spacex_data_20240827124856
The duckdb destination used duckdb:////Users/alejandrogonzalezbueno/Projects/dlt_workshop/spacex_with_source.duckdb location to store data
Load package 1724762936.673049 is LOADED and contains no failed jobs


Run the code below, then answer the question.
## Answer the Question:
- What is the mass (kg) of the tallest rocket (height in meters)?
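
The query for this question selects `mass__kg` and `height__meters`. Those names come from dlt flattening nested JSON objects into columns and joining the nested keys with a double underscore. A minimal sketch of that naming rule, using a hypothetical `flatten` helper and a made-up sample record (this is not dlt's actual implementation, which also normalizes names and moves nested lists into child tables):

```python
def flatten(record: dict, parent: str = "", sep: str = "__") -> dict:
    """Flatten nested dicts into a single level, joining keys with `sep`."""
    flat = {}
    for key, value in record.items():
        name = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            # recurse into nested objects, carrying the parent key as prefix
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


# Hypothetical record shaped like an item from the rockets endpoint (values are made up).
sample_rocket = {
    "name": "Example",
    "height": {"meters": 10.0, "feet": 32.8},
    "mass": {"kg": 1000},
}
flat_rocket = flatten(sample_rocket)
print(sorted(flat_rocket))
```

Nested keys such as `height.meters` end up as `height__meters`, which is why the SQL in this notebook refers to columns by those flattened names.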

In [7]:
import duckdb
import numpy as np
import pandas as pd
# a database '<pipeline_name>.duckdb' was created in working directory so just connect to it
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")
stats_table = conn.sql(
        """
        SELECT mass__kg, height__meters
        FROM rockets
        ORDER BY height__meters DESC
        LIMIT 1;
        """
).df()
display(stats_table)

Unnamed: 0,mass__kg,height__meters
0,1335000,118.0


# Task 2
- Add pagination; see the [SpaceX API docs](https://github.com/r-spacex/SpaceX-API/blob/master/docs/queries.md).
- Combine all resources into one [source](https://dlthub.com/docs/general-usage/source) using `@dlt.source`.
- Add incremental loading for the `get_launches` resource using the `merge` write disposition, `id` as the primary key, and `dlt.sources.incremental`.
- Run the pipeline [only with](https://dlthub.com/docs/general-usage/source#access-and-select-resources-to-load) the `get_launches` resource.

Read more about [incremental loading](https://dlthub.com/docs/general-usage/incremental-loading).
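
The idea behind a cursor-based incremental load can be sketched without dlt: remember the highest cursor value seen so far, and on the next run keep only the rows above it. This is a simplified stand-in, not how `dlt.sources.incremental` is implemented; `load_new`, the `state` dict, and the sample rows are hypothetical.

```python
def load_new(rows, state, cursor="date_unix"):
    """Return rows whose cursor value exceeds state['last_value'], then advance it."""
    last = state.get("last_value", 0)
    fresh = [row for row in rows if row[cursor] > last]
    if fresh:
        state["last_value"] = max(row[cursor] for row in fresh)
    return fresh


state = {}  # stands in for the state a pipeline keeps between runs
rows = [{"id": "a", "date_unix": 100}, {"id": "b", "date_unix": 200}]

first = load_new(rows, state)    # first run: every row is new
second = load_new(rows, state)   # second run: nothing is new
rows.append({"id": "c", "date_unix": 300})
third = load_new(rows, state)    # third run: only the appended row is new

print(len(first), len(second), len(third))  # prints: 2 0 1
```

The second run returning nothing is the behavior the task expects from the pipeline when it is run again without new launches.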

## Try the POST method to query the SpaceX API

In [None]:
import requests

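response = requests.post(
    "https://api.spacexdata.com/v4/launches/query",
    json={
        # "query" and "options" are sibling keys in the request body
        "query": {
            # date_unix is the numeric Unix timestamp, so it can be compared
            # with a number; date_utc is an ISO date string
            "date_unix": {
                "$gt": 0,
            },
        },
        "options": {
            "page": 1,
        },
    },
)
response.json()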


Use the code above to make your launches resource incremental.
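
One way to structure the paginated request loop, sketched against a fake in-memory endpoint so it runs offline. `build_query`, `fake_post`, and `paginate` are hypothetical names, and the fake response only mimics the `docs` and `totalPages` fields used in this notebook. Note that `query` and `options` are sibling keys in the request body.

```python
from typing import Callable, Iterator


def build_query(cursor_field: str, last_value: int, page: int) -> dict:
    """Build the request body: filter on the cursor field and ask for one page."""
    return {
        "query": {cursor_field: {"$gt": last_value}},
        "options": {"page": page},
    }


def fake_post(body: dict, data=tuple(range(1, 26)), page_size: int = 10) -> dict:
    """Stand-in for requests.post(...).json(): filters and paginates `data`."""
    field, condition = next(iter(body["query"].items()))
    matching = [{"id": str(v), field: v} for v in data if v > condition["$gt"]]
    start = (body["options"]["page"] - 1) * page_size
    total_pages = max(1, -(-len(matching) // page_size))  # ceiling division
    return {"docs": matching[start:start + page_size], "totalPages": total_pages}


def paginate(post: Callable[[dict], dict], cursor_field: str, last_value: int) -> Iterator[list]:
    """Yield one list of documents per page until a page comes back empty."""
    page = 1
    while True:
        docs = post(build_query(cursor_field, last_value, page))["docs"]
        if not docs:
            break
        yield docs
        page += 1


pages = list(paginate(fake_post, "date_unix", 5))
print([len(p) for p in pages])  # prints: [10, 10]
```

In a real resource, `fake_post` would be replaced by a `requests.post(url, json=body)` call followed by `raise_for_status()` and `.json()`.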

In [None]:
import dlt
import requests


@dlt.resource(table_name='launches', 
              write_disposition='merge',
              primary_key="id")
def get_launches(date_unix=dlt.sources.incremental("date_unix", initial_value=0)):
    url = "https://api.spacexdata.com/v4/launches/query"
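    query_launch = {
        "query": {
            # filter on the numeric date_unix field, which is also the
            # incremental cursor (date_utc is an ISO date string)
            "date_unix": {
                "$gt": date_unix.last_value
            },
        },
        "options": {
            "page": 1
        }
    }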
    
    current_page = 1
    while True:
        query_launch["options"]["page"] = current_page
        response = requests.post(url, json=query_launch)
        response.raise_for_status()
        launches = response.json()["docs"]
        if not launches:
            break
        
        yield launches
        
        current_page += 1

@dlt.resource(table_name="rockets")
def get_rockets():
    # url to request rockets
    url = "https://api.spacexdata.com/v4/rockets"
    # make the request and check if succeeded
    response = requests.get(url)
    response.raise_for_status()
    yield response.json()

@dlt.resource(table_name="crew")
def get_crew():
    # url to request crew
    url = "https://api.spacexdata.com/v4/crew"
    # make the request and check if succeeded
    response = requests.get(url)
    response.raise_for_status()
    yield response.json()
    
@dlt.source
def spacex_source():
    return [get_launches, get_rockets, get_crew]


pipeline = dlt.pipeline(
    pipeline_name='spacex_with_source_inc',
    destination='duckdb',
    dataset_name='spacex_data_inc',
    #dev_mode=True,
)

data = spacex_source().with_resources("get_launches")


# Run this cell twice: the first run loads the launches; on the second run
# incremental loading should find no new data, so 0 load packages are expected
load_info = pipeline.run(data)
print(load_info)
assert_load_info(load_info, expected_load_packages=0)

In [13]:
import duckdb
import pandas as pd

# Connect to the database
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# Run the query and get the DataFrame
stats_table = conn.sql(
    """
        SELECT
            distinct rocket
        FROM launches
        WHERE 
            date_utc BETWEEN '2022-11-01 00:00:00+00:00' AND '2022-11-02 00:00:00+00:00'"""
).df()

display(stats_table)

Unnamed: 0,rocket
0,5e9d0d95eda69973a809d1ec
1,5e9d0d95eda69974db09d1ed


# Task 3: Get payloads of launches

Use `@dlt.transformer` to get additional info for your data.

Read more about dlt [transformers](https://dlthub.com/docs/general-usage/resource#process-resources-with-dlttransformer).
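
Conceptually, a transformer consumes what a resource yields and yields new items derived from it. The plain-Python sketch below mimics that data flow with ordinary generators. It does not use dlt, and `fake_launches`, `fake_payload_lookup`, `payloads_for`, and the sample data are hypothetical stand-ins for the API calls.

```python
def fake_launches():
    """Yield pages (lists) of launch records, like a paginated resource."""
    yield [{"id": "L1"}, {"id": "L2"}]
    yield [{"id": "L3"}]


# Made-up payloads keyed by launch id; L2 deliberately has none.
PAYLOADS = {
    "L1": [{"name": "P1", "launch": "L1"}],
    "L3": [{"name": "P3a", "launch": "L3"}, {"name": "P3b", "launch": "L3"}],
}


def fake_payload_lookup(launch_id):
    """Stand-in for a per-launch request to a payloads endpoint."""
    return PAYLOADS.get(launch_id, [])


def payloads_for(pages, lookup):
    """For every launch in every page, yield that launch's payloads (if any)."""
    for page in pages:
        for launch in page:
            payloads = lookup(launch["id"])
            if payloads:
                yield payloads


payload_names = [
    payload["name"]
    for batch in payloads_for(fake_launches(), fake_payload_lookup)
    for payload in batch
]
print(payload_names)  # prints: ['P1', 'P3a', 'P3b']
```

The upstream generator is never modified; the downstream function only reads its items and emits a second stream, which is the same relationship a resource and a transformer have.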

In [73]:
import dlt
import requests

@dlt.resource(table_name='launches', 
              write_disposition='merge',
              primary_key="id")
def get_launches(date_unix=dlt.sources.incremental("date_unix", initial_value=0)):
    url = "https://api.spacexdata.com/v4/launches/query"
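    query_launch = {
        "query": {
            # filter on the numeric date_unix field, which is also the
            # incremental cursor (date_utc is an ISO date string)
            "date_unix": {
                "$gt": date_unix.last_value
            },
        },
        "options": {
            "page": 1
        }
    }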
    
    current_page = 1
    while True:
        query_launch["options"]["page"] = current_page
        response = requests.post(url, json=query_launch)
        response.raise_for_status()
        launches = response.json()["docs"]

        if not launches:
            break
        
        yield launches
        
        current_page += 1


@dlt.transformer(table_name='details')
def get_payloads(items):
    if not items:
        return

    for i in items:
        url = "https://api.spacexdata.com/v4/payloads/query"
        query_payload = {
            "query": {
                "launch": i['id']  # Use the launch ID in the query
            }
        }
        response = requests.post(url, json=query_payload)
        response.raise_for_status()
        yield response.json()["docs"]


# Define the pipeline
pipeline = dlt.pipeline(
    pipeline_name='spacex_with_source_tr__',
    destination='duckdb',  # or any other destination
    dataset_name='spacex_data',
    #dev_mode=True,
)

data = get_launches | get_payloads


# Run the pipeline with launches piped into the `get_payloads` transformer
load_info = pipeline.run(data())
print(load_info)
#assert_load_info(load_info, expected_load_packages=0)

Pipeline spacex_with_source_tr__ load step completed in ---
0 load package(s) were loaded to destination duckdb and into dataset None
The duckdb destination used duckdb:////Users/alejandrogonzalezbueno/Projects/dlt_workshop/spacex_with_source_tr__.duckdb location to store data


In [9]:
import dlt
import requests

@dlt.resource(table_name='launches', 
              write_disposition='merge',
              primary_key="id")
def get_launches(date_unix=dlt.sources.incremental("date_unix", initial_value=0)):
    url = "https://api.spacexdata.com/v4/launches/query"
    query_launch = {
        "query": {
            "date_utc": {
                "$gt":0
            },
        },
        "options": {
            "page": 1
        }
    }
    
    current_page = 1
    while True:
        query_launch["options"]["page"] = current_page
        print(query_launch)
        response = requests.post(url, json=query_launch)
        response.raise_for_status()
        launches = response.json()["docs"]
        print(launches)

        if not launches:
            break
        
        yield launches
        
        current_page += 1


data = get_launches

# Define the pipeline
pipeline = dlt.pipeline(
    pipeline_name='test_11',
    destination='duckdb',  # or any other destination
    dataset_name='spacex_data',
    #dev_mode=True,
)

# Run the pipeline with the launches resource only (no transformer)
load_info = pipeline.run(data())
print(load_info)
#assert_load_info(load_info, expected_load_packages=0)

{'query': {'date_utc': {'$gt': 1670198400}}, 'options': {'page': 22}}
[]
Pipeline test_11 load step completed in 1.07 seconds
1 load package(s) were loaded to destination duckdb and into dataset spacex_data
The duckdb destination used duckdb:////Users/alejandrogonzalezbueno/Projects/dlt_workshop/test_11.duckdb location to store data
Load package 1725288877.4435048 is LOADED and contains no failed jobs


In [10]:
import duckdb
import pandas as pd

# Connect to the database
conn = duckdb.connect(f"{pipeline.pipeline_name}.duckdb")
conn.sql(f"SET search_path = '{pipeline.dataset_name}'")

# Run the query and get the DataFrame
stats_table = conn.sql('show tables').df()

display(stats_table)

Unnamed: 0,name
0,_dlt_loads
1,_dlt_pipeline_state
2,_dlt_version


In [67]:
import requests

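response = requests.post(
    "https://api.spacexdata.com/v4/launches/query",
    json={
        # "query" and "options" are sibling keys in the request body
        "query": {
            # a cursor value far in the future should match no launches
            "date_unix": {
                "$gt": 999999999999,
            },
        },
        "options": {
            "page": 1,
        },
    },
)
result = response.json()
total_pages = result["totalPages"]
print(total_pages)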

1


What regime does the satellite "FalconSAT-2" (launch id: 5eb87cd9ffd86e000604b32a) have?