In [4]:
import duckdb
import dagster as dg
import io
from minimal_lakehouse.ressources import OpenF1Api, MinioStorage
import json
from dagster_duckdb import DuckDBResource


In [2]:
daily_partition = dg.DailyPartitionsDefinition(start_date= '2025-01-01')
monthly_partition = dg.MonthlyPartitionsDefinition(start_date= '2025-01-01', end_offset= 1)

In [None]:
def openf1_interface(context: dg.AssetExecutionContext, api: OpenF1Api, minio: MinioStorage, request_params:dict[str, str] = {}) -> dg.MaterializeResult:
          
    context.log.info(f"Requesting endpoint: {api.base_url}/{api.endpoint}")
    res = api.request(**request_params)
    if res.status_code != 200:
        return dg.MaterializeResult(metadata= {'result': dg.MetadataValue.text(f'Request for {api.endpoint} on date {context.partition_key} failed with status {res.status_code}')})
    
    data = res.json()

    if not data:
        return dg.MaterializeResult(metadata= {'result': dg.MetadataValue.text(f'No data for date {context.partition_key}')})
    
    data_bytes = json.dumps(data).encode('utf-8')

    data_stream = io.BytesIO(data_bytes)

    file_name = f'{api.endpoint}/{context.partition_key[:4]}/{context.partition_key[:-3]}/{context.partition_key}.json'

    minio_client = minio.get_client()
    minio_client.put_object(
        minio.dest_bucket, 
        file_name, 
        data=data_stream, 
        length=len(data_bytes), 
        content_type='application/json'
        )
    
    return dg.MaterializeResult(metadata= {'result': dg.MetadataValue.text(f"{file_name} salvo no storage")})


@dg.asset(
        partitions_def = daily_partition,
        compute_kind = 'python',
        group_name="openf1_raw",
        automation_condition= dg.AutomationCondition.on_cron('@daily')
)
def openf1_sessions(context: dg.AssetExecutionContext, sessions_api: OpenF1Api, minio: MinioStorage):
    request_params = {'date_start': context.partition_key}
    return openf1_interface(context, sessions_api, minio, request_params)

In [None]:
@dg.asset(
    partitions_def = monthly_partition,
    compute_kind = 'python',
    group_name="openf1_bronze",
    deps = [openf1_sessions],
    automation_condition= dg.AutomationCondition.all_deps_updated_since_cron('@daily')
)
def bronze_sessions(context: dg.AssetExecutionContext, duckdb: DuckDBResource) -> dg.MaterializeResult:
    
    with duckdb.get_connection() as cnn:

In [None]:
duckdb.sql(f"""
            SET s3_region='us-east-1';
            SET s3_endpoint='{dg.EnvVar('MINIO_API_HOST').get_value()}';
            SET s3_access_key_id='{dg.EnvVar('MINIO_ACCESS_KEY').get_value()}' ;
            SET s3_secret_access_key='{dg.EnvVar('MINIO_SECRET_KEY').get_value()}';
            """)
duckdb.sql(
    f"""
    CREATE OR REPLACE TEMP VIEW sessions_stg AS
    SELECT * FROM 's3://lakehouse/sessions/2025/2025-01/*.json'
    """
    )

IOException: IO Error: SSL connection failed error for HTTP GET to '/lakehouse/?encoding-type=url&list-type=2&prefix=sessions%2F2025%2F2025-01%2F'