In [4]:
from tp_logger.config import LoggerConfig

In [6]:
import dlt



In [9]:
# Pipeline targeting the DuckDB file "shared.duckdb", dataset "test_logg_data".
# dev_mode is explicitly switched off.
p = dlt.pipeline(
    pipeline_name="test_logg_data_pipeline",
    dataset_name="test_logg_data",
    destination=dlt.destinations.duckdb("shared.duckdb"),
    dev_mode=False,
)

In [20]:
import duckdb
import dlt
from tp_logger.config import get_config

# Load the tp-logger configuration; its pipeline_name and dataset_name are reused below.
# NOTE(review): the database path is hard-coded to "shared.duckdb" rather than taken
# from `config` - confirm this is the file tp-logger actually writes to.
config = get_config()

# Connect to the same DuckDB file that tp-logger is using
db = duckdb.connect("shared.duckdb")

# Create a pipeline that connects to the existing DuckDB file
p = dlt.pipeline(
    pipeline_name=config.pipeline_name,  # Use the same pipeline name
    destination=dlt.destinations.duckdb(db),  # Pass the connected db instance
    dataset_name=config.dataset_name,  # Use the same dataset name
    dev_mode=False
)

# Now you can query the database to see what tables exist
print("Available tables and schemas:")
print(db.sql("DESCRIBE;"))

# You can also query specific tables created by tp-logger.
# Only DuckDB errors are caught (e.g. a missing table); anything else, including
# KeyboardInterrupt, propagates instead of being silently swallowed.
print("\nJob logs table schema:")
try:
    print(db.sql("DESCRIBE tp_logger_logs.job_logs;"))
except duckdb.Error as exc:
    print("job_logs table not found - run main.py first to create logs")
    # Show the underlying error so a different failure is not mistaken for a missing table.
    print(f"  (DuckDB reported: {exc})")

# Query the actual log data
print("\nSample log entries:")
try:
    print(db.sql("SELECT * FROM tp_logger_logs.job_logs LIMIT 5;"))
except duckdb.Error as exc:
    print("No log data found - run main.py first to create logs")
    print(f"  (DuckDB reported: {exc})")

Available tables and schemas:
┌──────────┬────────────────┬─────────────────────┬───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬───────────┐
│ database │     schema     │        name         │                                                                                                       column_names                                                                                                        │                                                                          column_types                                                                          │ temporary │
│ varchar  │    varchar     │       varchar       │                             

In [None]:
import duckdb
import dlt
from typing import Iterator, Dict, Any

# Connect to the source database (keep this connection open).
# The top-level code later in this cell uses this handle for the schema listing,
# the record count and the full-table read of tp_logger_logs.job_logs.
source_db = duckdb.connect("tp_logger_pipeline.duckdb")

def job_logs_resource() -> Iterator[Dict[str, Any]]:
    """
    Yield one ``{column_name: value}`` dict per row of ``tp_logger_logs.job_logs``.

    The generator opens its own connection to ``tp_logger_pipeline.duckdb``,
    selects the ``id`` column ordered by ``timestamp`` descending, prints the
    fetched rows and the column names, and closes the connection when the
    generator finishes or is closed early.
    """
    # Dedicated connection owned by this generator
    connection = duckdb.connect("tp_logger_pipeline.duckdb")

    sql = """
    SELECT 
        id
    FROM tp_logger_logs.job_logs
    ORDER BY timestamp DESC
    """

    try:
        print("Attempting to read from logs.job_logs...")
        rows = connection.execute(sql).fetchall()
        print(rows)
        # First element of each description entry is the column name
        column_names = [meta[0] for meta in connection.description]
        print(column_names)

        print("Found {} records in logs.job_logs".format(len(rows)))
        yield from (dict(zip(column_names, row)) for row in rows)
    finally:
        # Runs on exhaustion, on error and on early generator close
        connection.close()

# Open the DuckDB file that will receive the transferred rows.
target_db = duckdb.connect("transferred_logs.duckdb")
print("Connected to target DuckDB database for transfer.")

# dlt pipeline for the transfer: writes through target_db under its own dataset name.
transfer_pipeline = dlt.pipeline(
    pipeline_name="log_transfer_pipeline",
    dataset_name="logs_data",
    destination=dlt.destinations.duckdb(target_db),
    dev_mode=False,
)
print("Created transfer pipeline for logs_data.")

# Look at the source before reading from it; any failure here is reported, not raised.
print("=== Checking source data ===")
try:
    print("Schemas in source database:")
    print(source_db.sql("DESCRIBE;"))
    print("\nSample data from logs.job_logs:")
    print(source_db.sql("SELECT COUNT(*) as record_count FROM tp_logger_logs.job_logs;"))
except Exception as exc:
    print(f"Error checking source data: {exc}")

# Read every row of tp_logger_logs.job_logs through the shared source connection.
# Unlike the check above, an error here is not caught.
query = """
    SELECT 
        *
    FROM tp_logger_logs.job_logs
    ORDER BY timestamp DESC
    """
print("Attempting to read from logs.job_logs...")
result = source_db.execute(query).fetchall()
print(result)
# First element of each description entry is the column name
columns = [column_meta[0] for column_meta in source_db.description]
print(columns)
print("Found {} records in logs.job_logs".format(len(result)))


Connected to target DuckDB database for transfer.
Created transfer pipeline for logs_data.
=== Checking source data ===
Schemas in source database:
┌────────────────────┬────────────────┬─────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┬───────────┐
│      database      │     schema     │        name         │                                                                                 column_names                                                                                 │                                                                         column_types                                                                         │ temporary │
│      varchar       │    varchar   

SyntaxError: 'yield' outside function (2362381431.py, line 73)

In [54]:

import dlt
import duckdb
from typing import Iterator, Dict, Any

@dlt.resource(name="job_logs", write_disposition="replace")
def job_logs_resource() -> Iterator[Dict[str, Any]]:
    """
    A DLT resource that reads all job logs from the source DuckDB database.

    Opens its own read-only connection to ``tp_logger_pipeline.duckdb``, selects
    every column of ``tp_logger_logs.job_logs`` ordered by ``timestamp``
    descending, and yields one ``{column_name: value}`` dict per row.

    Rows are fetched in fixed-size batches with ``fetchmany`` so that only one
    batch of result tuples is held in Python at a time, instead of the whole
    table as with ``fetchall``.

    Yields:
        Dict[str, Any]: a single row keyed by column name.
    """
    # Number of rows pulled from DuckDB per fetch; bounds the Python-side buffer.
    batch_size = 10_000

    # A resource should be self-contained and create its own connection
    with duckdb.connect("tp_logger_pipeline.duckdb", read_only=True) as conn:

        # Query all columns from the job_logs table
        query = """
        SELECT
            *
        FROM tp_logger_logs.job_logs
        ORDER BY timestamp DESC
        """

        cursor = conn.execute(query)
        # First element of each description entry is the column name
        columns = [desc[0] for desc in cursor.description]

        # Yield each row as a dictionary, one batch at a time.
        # fetchmany returns an empty list once the result set is exhausted.
        while True:
            batch = cursor.fetchmany(batch_size)
            if not batch:
                break
            for row in batch:
                yield dict(zip(columns, row))

# You can now use this resource in a dlt pipeline, for example:

# 1. Create a pipeline to a new destination, disabling Lake Formation
transfer_pipeline = dlt.pipeline(
    pipeline_name="log_transfer",
    destination=dlt.destinations.athena(lakeformation_config=None),
    dataset_name="transferred_logs"
)

# Release the `source_db` handle left over from an earlier cell, if there is one.
# NOTE(review): presumably required because job_logs_resource opens the same file
# with read_only=True, and DuckDB rejects a second connection to a file with a
# different configuration in the same process - confirm.
# On a fresh kernel (or if that cell was skipped) the name does not exist, so there
# is nothing to close and the cell must not fail with NameError.
try:
    source_db.close()
except NameError:
    pass

# 2. Run the pipeline with the resource
load_info = transfer_pipeline.run(job_logs_resource())
print(load_info)



Pipeline log_transfer load step completed in 8.54 seconds
1 load package(s) were loaded to destination athena and into dataset transferred_logs
The filesystem staging destination used s3://prod-ddbb-data/temp/ location to stage data
The athena destination used s3://prod-ddbb-data/temp/ on awsdatacatalog location to store data
Load package 1754385330.507402 is LOADED and contains no failed jobs
