In [30]:
# NOTEBOOK CELL 1 - Setup and Imports
import logging
import os
import traceback
from typing import Dict, List, Any
import sqlalchemy
from sqlalchemy import create_engine
import psycopg2
import pandas as pd
# Setup logging for notebook
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Your imports here
from pull_raw.utils import get_tables_to_sync, start_job, end_job
from utils.utils import get_abspath
from utils.worker.dune_extractor import DuneExtractor
from utils.worker.dune_to_pg_worker import DuneToPgWorker
from utils.worker.pg_loader import PgLoader
from datetime import datetime
#from arguments import FULL_REFRESH, INCREMENTAL_VALUE

In [12]:
# NOTEBOOK CELL 2 - Configuration
TARGET_SCHEMA_NAME = 'bitcoin'

# Database connection.
# The connection string embeds the database password, so it is read from the
# DATABASE_URL environment variable instead of being stored in the notebook.
# Expected form: postgresql://<user>:<password>@<host>:<port>/<database>
DATABASE_URL = os.environ.get("DATABASE_URL")
if not DATABASE_URL:
    # Fail here with an actionable message rather than later inside create_engine.
    raise RuntimeError(
        "DATABASE_URL is not set; export it before starting the kernel, e.g. "
        "postgresql://<user>:<password>@localhost:5432/postgres"
    )
engine = create_engine(DATABASE_URL)

In [3]:
# NOTEBOOK CELL 3 - Initialize Components

# Load the sync catalogue through the project helper. Later cells call
# .iterrows() on the result and read the 'name', 'id', 'target_table' and
# 'p_key' entries of each row.
tables_to_sync = get_tables_to_sync()


In [4]:
# Split the sync catalogue into parallel lists (one entry per catalogue row)
# so that a single position - `enum` in the cells below - selects the matching
# job name, query id and target table.
# The columns are read directly instead of being rebuilt row by row with
# iterrows(); the resulting lists hold the same values in the same order.
# NOTE: `id` shadows the Python builtin of the same name. It is kept because
# a later cell indexes it as id[enum].
name = tables_to_sync['name'].tolist()
id = tables_to_sync['id'].tolist()
tar_tbl = tables_to_sync['target_table'].tolist()
pkey = tables_to_sync['p_key'].tolist()

In [26]:
# Show the job names in catalogue order; `enum` below is a position in this list.
name

['bitcoin_inputs',
 'bitcoin_output',
 'prices_usd',
 'bitcoin_transactions',
 'bitcoin_block']

In [41]:
# Position of the table handled by the manual single-table cells below
# (used as the index into name, id and tar_tbl); 0 selects the first entry.
enum = 0

In [42]:
# Initialize Dune extractor.
# The API key is a secret, so it is taken from the DUNE_API_KEY environment
# variable rather than being hardcoded in the notebook.
dune_api_key = os.environ.get("DUNE_API_KEY")
if not dune_api_key:
    raise RuntimeError("DUNE_API_KEY is not set; export it before starting the kernel")
dune_extractor = DuneExtractor(api_key=dune_api_key)

# Project helper called before the extraction for the selected job
# (presumably marks the job as started - confirm against pull_raw.utils).
start_job(name[enum])

# Execute the query and get execution ID.
# NOTE(review): the fixed "2025-01-01" string is passed as the query
# parameters; how the extractor interprets it is not visible here.
execution_id = dune_extractor.execute_query(
    query_id=id[enum],
    parameters="2025-01-01"
)

# Get the results for that execution.
# NOTE(review): the unit of max_wait_time is not visible here - confirm.
raw_data = dune_extractor.get_results(
    execution_id=execution_id,
    max_wait_time=10000
)

Cập nhật thành công cho job: bitcoin_inputs
Query 2177353 executed. Execution ID: 01K6G3YQM3Q6MMFQY5Q1PQFJ1S
Waiting... (state: QUERY_STATE_EXECUTING)
Data crawled successfully!
Retrieved 100000 rows


In [43]:
# Build the frame from the extracted payload and stamp every row with the
# (naive, local-clock) time of this load in the ETL_updated_ts column.
df = pd.DataFrame(raw_data).assign(ETL_updated_ts=datetime.now())

In [44]:
# Append the extracted rows to the selected target table.
# engine.begin() opens a transaction that is committed when the block exits
# normally and rolled back if it raises. With a plain engine.connect() the
# commit depends on pandas starting its own transaction, which it only does
# when the connection is not already inside one; making the transaction
# explicit removes that dependency.
with engine.begin() as connection:
    df.to_sql(
        name=tar_tbl[enum],
        con=connection,
        schema=TARGET_SCHEMA_NAME,
        if_exists='append',
        index=False,
        method='multi',
        # Send bounded multi-row INSERTs instead of one statement holding the
        # whole frame; all chunks still share the single transaction above.
        chunksize=10_000,
    )

# Reached only when the load above did not raise.
end_job(name[enum])

Cập nhật thành công cho job: bitcoin_inputs


In [None]:
# NOTEBOOK CELL 5 - Process Each Table
#
# Runs the Dune -> Postgres worker once for every row of `tables_to_sync`.
#
# This cell relies on names that no visible cell defines (the
# `from arguments import FULL_REFRESH, INCREMENTAL_VALUE` line in the imports
# cell is commented out). Define them in an earlier cell before running:
#   sync_type           'full_refresh', 'sync_incremental' or None
#   last_value          previous incremental value, or None to fall back to a full refresh
#   incremental_column  forwarded to the worker as `incremental_column`
#   FULL_REFRESH, INCREMENTAL_VALUE   load-strategy constants passed as `load_strategy`
required_names = (
    "sync_type",
    "last_value",
    "incremental_column",
    "FULL_REFRESH",
    "INCREMENTAL_VALUE",
)
missing_names = [label for label in required_names if label not in globals()]
if missing_names:
    # Stop before any job is started instead of failing part-way through.
    raise NameError(
        "Define these names in an earlier cell before running the sync: "
        + ", ".join(missing_names)
    )

# Initialize Dune extractor.
# The API key is a secret, so it is taken from the DUNE_API_KEY environment
# variable rather than being hardcoded in the notebook.
dune_api_key = os.environ.get("DUNE_API_KEY")
if not dune_api_key:
    raise RuntimeError("DUNE_API_KEY is not set; export it before starting the kernel")
dune_extractor = DuneExtractor(api_key=dune_api_key)

total_tables = len(tables_to_sync)
failed_tables = []

# One pass per catalogue row. The previous nested loop iterated the DataFrame
# itself, which yields column labels, so every row was handled once per column.
for position, (_, row) in enumerate(tables_to_sync.iterrows(), 1):
    table_name = row['target_table']
    query_id = row['id']
    source_unique_keys = row['p_key']

    start_job(row['name'])

    print(f"\n[{position}/{total_tables}] Processing: {table_name}")
    print(f"Query ID: {query_id}, Sync Type: {sync_type}")

    # NOTE(review): whether the worker commits on `target_con` is not visible
    # here; under SQLAlchemy 2.x uncommitted work is rolled back when the
    # connection closes - confirm against DuneToPgWorker.
    with engine.connect() as connection:
        dune_to_pg_worker = DuneToPgWorker(
            dune_extractor=dune_extractor,
            target_schema_name=TARGET_SCHEMA_NAME,
            target_table=table_name,
            target_con=connection,
        )

        try:
            if sync_type == 'full_refresh':
                print("🔄 Running full refresh...")
                dune_to_pg_worker.run(
                    query_id=query_id,
                    # Series.get returns None when the catalogue has no
                    # 'query_parameters' column.
                    query_parameters=row.get('query_parameters'),
                    source_unique_keys=source_unique_keys,
                    load_strategy=FULL_REFRESH,
                    max_wait_time=300
                )

            elif sync_type is None or sync_type == 'sync_incremental':
                print("🔄 Running incremental sync...")

                if last_value is None:
                    print("No previous data found, running full refresh...")
                    load_strategy = FULL_REFRESH
                    query_parameters = None
                else:
                    print(f"Last incremental value: {last_value}")
                    load_strategy = INCREMENTAL_VALUE
                    query_parameters = str(last_value)

                dune_to_pg_worker.run(
                    query_id=query_id,
                    query_parameters=query_parameters,
                    source_unique_keys=source_unique_keys,
                    incremental_column=incremental_column,
                    incremental_value=last_value,
                    load_strategy=load_strategy,
                    max_wait_time=300
                )
            else:
                raise ValueError(f'Invalid sync_type "{sync_type}"')

            print(f"✅ Successfully processed: {table_name}")
            end_job(row['name'])
        except Exception as e:
            # Best effort: report the failure and move on to the next table.
            # end_job is not called for a failed table.
            print(f"❌ Failed to process {table_name}: {e}")
            print(traceback.format_exc())
            failed_tables.append(table_name)

# Printed once, after the last table, rather than after every iteration.
print("\n🎉 All tables processed!")
if failed_tables:
    print(f"Tables that failed: {failed_tables}")