In [None]:
import dlt
from dlt.sources.helpers import requests
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

# GitHub API token handed to BearerTokenAuth by the github_stargazers resource.
# Left blank here; fill it in before running and do not commit a real token.
access_token = ""

# define new resource - github stargazers
@dlt.resource
def github_stargazers():
    """Yield pages of stargazers of the ``dlt-hub/dlt`` GitHub repository.

    Reads the module-level ``access_token``. When it is empty the requests
    are sent without authentication rather than with a blank bearer token.

    Yields:
        One page of results per iteration, exactly as produced by
        ``RESTClient.paginate``.
    """
    # A blank token would still be sent as a bearer credential; fall back to
    # anonymous access when no token has been provided.
    auth = BearerTokenAuth(token=access_token) if access_token else None
    client = RESTClient(base_url="https://api.github.com", auth=auth)

    # Pages are handed to dlt untouched; nothing is printed so that large
    # repositories do not flood the cell output.
    yield from client.paginate("repos/dlt-hub/dlt/stargazers")


# define new dlt pipeline
# An explicit name keeps this pipeline's state and DuckDB file separate from
# any other pipeline created in this notebook.
pipeline = dlt.pipeline(pipeline_name="github_stargazers", destination="duckdb")


# run the pipeline with the new resource
load_info = pipeline.run(github_stargazers)
print(load_info)


# explore loaded data
pipeline.dataset(dataset_type="default").github_stargazers.df().shape

In [None]:
import duckdb

# Connect to the database
conn = duckdb.connect("fingrid.duckdb")

try:
    # List all schemas
    print("Schemas:")
    print(conn.execute("SELECT schema_name FROM information_schema.schemata").df())

    # List all tables in all schemas
    print("\nTables:")
    print(conn.execute("""
    SELECT table_schema, table_name 
    FROM information_schema.tables
    WHERE table_schema != 'information_schema'
""").df())

    # Once you know the correct schema and table, query your data.
    # A failing query (e.g. unknown table) is reported instead of aborting
    # the cell; only DuckDB's own errors are treated this way.
    try:
        df = conn.execute("""
        SELECT * 
        FROM fingrid  -- replace with your actual schema.table name
        LIMIT 100
    """).df()
        print("\nFirst few rows:")
        print(df.head())
    except duckdb.Error as e:
        print(f"Error querying table: {e}")
finally:
    # Always release the database file, even if a listing query above fails.
    conn.close()

In [None]:
import dlt
from dlt.sources.helpers import requests
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

In [None]:
# Fingrid API key sent as the "x-api-key" header by the fingrid resource.
# Left blank here; fill it in before running and do not commit a real key.
api_key = ""

In [None]:
import dlt
from dlt.sources.helpers import requests
from datetime import datetime, timedelta

# define dlt resources
@dlt.resource
def fingrid(dataset_id: str):
    """Fetch the last hour of one Fingrid dataset and yield its JSON payload.

    Reads the module-level ``api_key`` for the ``x-api-key`` header.

    Args:
        dataset_id: Identifier of the Fingrid dataset to request,
            e.g. ``"181"``.

    Yields:
        The decoded JSON body of the response, as a single item.

    Raises:
        HTTPError: If the API responds with an error status code.
    """
    client = RESTClient(
        base_url="https://data.fingrid.fi/api/",
        headers={
            "x-api-key": api_key,
            "Accept": "application/json",
        },
    )

    # Sample the clock once so both ends of the window share one reference.
    now = datetime.now()
    # NOTE(review): the timestamps are naive local time and the query keys are
    # "start_time"/"end_time" - confirm both against the Fingrid API docs.
    params = {
        "start_time": (now - timedelta(hours=1)).isoformat(),
        "end_time": now.isoformat(),
    }

    # Build the path from the argument so the requested dataset is honoured.
    response = client.get(f"datasets/{dataset_id}/data", params=params)
    # Surface HTTP failures instead of loading an error body as data.
    response.raise_for_status()
    yield response.json()


# define dlt pipeline
# Naming the pipeline "fingrid" makes the DuckDB destination write to
# fingrid.duckdb, the file opened by the database inspection cell.
pipeline = dlt.pipeline(pipeline_name="fingrid", destination="duckdb")

# run dlt pipeline
load_info = pipeline.run(fingrid("181"))
print(load_info)

# explore loaded data
pipeline.dataset(dataset_type="default").fingrid.df()

In [None]:
import dlt
# Confirm the secret resolves without echoing its value into the saved
# notebook output; lookup errors propagate exactly as before.
bool(dlt.secrets["api_secret_key"])

In [None]:
from typing import Any, Optional

import dlt
from dlt.common.pendulum import pendulum
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers import requests
from datetime import datetime, timedelta
from dlt.sources.rest_api import (
    RESTAPIConfig,
    check_connection,
    rest_api_resources,
    rest_api_source,
)


@dlt.source(name="fingrid")
def fingrid_source(api_key: Optional[str] = dlt.secrets.value) -> Any:
    """dlt source exposing one resource per configured Fingrid dataset.

    Args:
        api_key: Fingrid API key, sent as the ``x-api-key`` header on every
            request. Defaults to a value injected from dlt secrets.

    Yields:
        One dlt resource per entry in ``datasets``, named after the entry's
        label. Each resource yields the JSON payload for the last hour of
        that dataset.
    """
    client = RESTClient(
        base_url="https://data.fingrid.fi/api",
        headers={
            "x-api-key": api_key,
            "Accept": "application/json",
        },
    )

    # (dataset id, resource name) pairs to load.
    datasets = [[181, "WindPower"]]

    # Sample the clock once so both ends of the window share one reference.
    now = datetime.now()
    start_time = (now - timedelta(hours=1)).isoformat()
    end_time = now.isoformat()

    def get_resource(dataset):
        """Yield the JSON payload of one dataset id for the shared window."""
        params = {
            "start_time": start_time,
            "end_time": end_time,
        }
        # f-string is required here: without it the literal "{dataset}"
        # would be requested instead of the dataset id.
        endpoint = f"/datasets/{dataset}/data"
        response = client.get(endpoint, params=params)
        # Surface HTTP failures instead of loading an error body as data.
        response.raise_for_status()
        yield response.json()

    for dataset_id, resource_name in datasets:
        yield dlt.resource(get_resource(dataset_id), name=resource_name)



def load_fingrid() -> None:
    """Run the Fingrid source through a MotherDuck pipeline and print the result.

    The API key is read from ``dlt.secrets["api_secret_key"]`` and passed to
    ``fingrid_source`` explicitly.
    """
    fingrid_pipeline = dlt.pipeline(
        pipeline_name="rest_api_fingrid",
        destination="motherduck",
        dataset_name="rest_api_data",
    )

    # Build the source after the pipeline, then report what the run loaded.
    source = fingrid_source(dlt.secrets["api_secret_key"])
    print(fingrid_pipeline.run(source))  # noqa: T201

# Kick off the Fingrid load when this code runs as the main module.
if __name__ == "__main__":
    load_fingrid()
