In [None]:
#!pip install 'dlt[duckdb]'
#!pip install 'dlt[bigquery]'
#!pip install streamlit
#!pip install "dlt[gs]"

In [1]:
import pandas as pd
import numpy as np
import json
from datetime import datetime
import duckdb

import pyarrow
import dlt
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import OffsetPaginator, BasePaginator

from dlt.destinations import filesystem #for GCS



In [2]:
from typing import Any, List, Optional
from dlt.sources.helpers.rest_client.paginators import BasePaginator
from dlt.sources.helpers.requests import Response, Request

class QueryParamPaginator(BasePaginator):
    """Page-number paginator: sends ``?<page_param>=<n>`` and bumps ``n`` after every page.

    Pagination ends at the first page that yields no records.

    Args:
        page_param: Name of the query parameter that carries the page number.
        initial_page: Page number used for the very first request.
    """

    def __init__(self, page_param: str = "page", initial_page: int = 1):
        super().__init__()
        self.page_param = page_param
        self.page = initial_page

    def init_request(self, request: Request) -> None:
        """Put the initial page number (e.g. ``page=1``) on the first request."""
        self.update_request(request)

    def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
        """Advance to the next page, or stop when the current page has no records."""
        # Prefer the records the client already extracted and fall back to the raw body.
        # Checking only response.json() never terminates for APIs that wrap results in an
        # object, because e.g. {"items": []} is a truthy dict.
        records = data if data is not None else response.json()
        if not records:
            self._has_next_page = False
        else:
            self.page += 1

    def update_request(self, request: Request) -> None:
        """Write the current page number into the request's query parameters."""
        if request.params is None:
            request.params = {}
        request.params[self.page_param] = self.page

In [3]:
from typing import Any, List, Optional
from dlt.sources.helpers.rest_client.paginators import BasePaginator
from dlt.sources.helpers.requests import Response, Request

# Source for the custom paginator: https://dlthub.com/docs/general-usage/http/rest-client#paginators
# Built with ChatGPT's help, based on the example on that page.
class TimeRangePaginator(BasePaginator):
    """Page through a time range by sending one request per fixed-size window.

    Every request carries ``startTime``/``endTime`` query parameters. Windows are
    ``[t, t + interval_ms - 1]``, clipped to ``end_time``, so consecutive windows
    do not share a boundary millisecond. Binance documents both bounds of
    ``/api/v3/aggTrades`` as inclusive, so touching windows would fetch boundary
    trades twice.

    Pagination stops once the next window would start after ``end_time``. It does
    NOT stop on an empty response: a window without trades is just a gap, and
    stopping there would silently skip the rest of the range.

    NOTE(review): each window is a single request. If the API caps the rows per
    response, a busy window can be truncated, so keep ``interval_ms`` small enough
    (and the request limit high enough) for the data volume.

    Args:
        start_time: First timestamp of the range (epoch ms, inclusive).
        end_time: Last timestamp of the range (epoch ms, inclusive).
        interval_ms: Window width in milliseconds; must be positive.

    Raises:
        ValueError: If ``interval_ms`` is not positive.
    """

    def __init__(self, start_time: int, end_time: int, interval_ms: int):
        super().__init__()
        if interval_ms <= 0:
            # A non-positive step would never advance current_time, causing endless requests.
            raise ValueError("interval_ms must be a positive number of milliseconds")
        self.current_time = start_time
        self.end_time = end_time
        self.interval_ms = interval_ms

    def init_request(self, request: Request) -> None:
        """Put the first window's startTime/endTime on the initial request."""
        self.update_request(request)

    def update_state(self, response: Response, data: Optional[List[Any]] = None) -> None:
        """Move to the next window; stop once it would start after ``end_time``."""
        self.current_time += self.interval_ms
        if self.current_time > self.end_time:
            self._has_next_page = False

    def update_request(self, request: Request) -> None:
        """Write the current window's bounds into the request's query parameters."""
        if request.params is None:
            request.params = {}
        # The -1 keeps consecutive windows disjoint because endTime is inclusive.
        window_end = min(self.current_time + self.interval_ms - 1, self.end_time)
        request.params['startTime'] = self.current_time
        request.params['endTime'] = window_end


In [4]:
# NOTE: in the saved run this raised "JSONDecodeError: Expecting value: line 1 column 1",
# i.e. aggtrades.txt was empty or not JSON. Regenerate the file before running the
# exploration cells that use `data`.
with open('aggtrades.txt', 'r', encoding='utf-8') as aggtrades_file:
    data = json.load(aggtrades_file)

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [None]:
len(data)

In [None]:
data[0]['T']

In [None]:
type(data[0]['T'])

In [None]:
datetime.fromtimestamp(data[0]['T']/1000)

In [None]:
df = pd.DataFrame(data=data)
df.info()

In [None]:
with open('depth.txt','r') as depth:
    depth = depth.read()
depth = json.loads(depth)
len(depth)

In [None]:
depth

In [None]:
def read_file(file, return_data=False):
    """Load a JSON file and print a quick preview of it.

    Prints the number of top-level items, then the first item. For a JSON array
    that is ``data[0]``. For a JSON object it is the first ``(key, value)`` pair,
    because ``data[0]`` would raise KeyError there. For an empty array or object,
    only the length is printed.

    Args:
        file: Path of the JSON file to read (decoded as UTF-8).
        return_data: When True, also return the parsed JSON. Defaults to False so
            existing calls keep returning None, and Jupyter does not render the whole
            payload as the cell output.

    Returns:
        The parsed JSON if ``return_data`` is True, otherwise None.
    """
    with open(file, 'r', encoding='utf-8') as file_handle:
        data = json.load(file_handle)
    print(len(data))
    if isinstance(data, dict):
        if data:
            print(next(iter(data.items())))
    elif data:
        print(data[0])
    return data if return_data else None

In [None]:
read_file('trades.txt')

In [None]:
read_file('depth.txt')

In [None]:
read_file('aggtrades.txt')

In [None]:
import sys

In [None]:
sys.version

In [None]:
sys.executable

In [None]:
!pip -V

In [None]:
!pip uninstall pyarrow -y

In [None]:
pip install --upgrade pyarrow dlt

In [None]:
pip -V

In [None]:
!pip list

In [None]:
dlt.__version__

In [None]:
pyarrow.__version__

In [4]:
@dlt.resource(name="aggtrades", write_disposition="append")  # aggtrades will be the table name
def binance_api(start_time, end_time, symbol="BTCUSDT", interval_ms=60 * 1000, limit=1000):
    """Yield pages of Binance aggregate trades for ``symbol`` between two timestamps.

    The range is fetched window by window via ``TimeRangePaginator``, one request per
    ``interval_ms`` window.

    Args:
        start_time: Start of the range in epoch ms, e.g. 1672531200000 (2023-01-01 00:00 UTC).
        end_time: End of the range in epoch ms, e.g. 1672617600000 (2023-01-02 00:00 UTC).
        symbol: Trading pair to query.
        interval_ms: Width of each request window in ms. The default is 1 minute.
            NOTE(review): Binance documents that a startTime/endTime query must span
            less than one hour.
        limit: Maximum rows requested per window. Binance's default of 500 can
            truncate busy windows; 1000 is the documented maximum.

    Yields:
        One page (the list of aggregate-trade records) per time window.
    """
    client = RESTClient(
        base_url="https://data-api.binance.vision",
        paginator=TimeRangePaginator(
            start_time=start_time,
            end_time=end_time,
            interval_ms=interval_ms,
        ),
    )
    # The query goes in params rather than the path; the paginator adds
    # startTime/endTime to this same params dict on every request.
    yield from client.paginate(
        "/api/v3/aggTrades",
        params={"symbol": symbol, "limit": limit},
    )

In [None]:
# Time range to load: one UTC day, in epoch milliseconds.
START_MS = 1672531200000  # 2023-01-01 00:00:00 UTC
END_MS = 1672617600000    # 2023-01-02 00:00:00 UTC

# define new dlt pipeline
pipeline = dlt.pipeline(
    destination="duckdb",     # with duckdb, data lands in <pipeline_name>.duckdb in the working directory
    pipeline_name='binance',  # pipeline identifier (also names the duckdb file: binance.duckdb)
    dataset_name='aggtrade'   # dataset (schema) name in the destination
)

# binance_api takes its time range as arguments, so it has to be called: passing the
# bare resource leaves its required arguments unbound. write_disposition here overrides
# the resource's default "append".
load_info = pipeline.run(binance_api(START_MS, END_MS), write_disposition="replace")
print(load_info)

# explore loaded data
pipeline.dataset(dataset_type="default").aggtrades.df()

In [8]:
import os

In [10]:
# Relative paths used in this notebook (the *.txt files, binance.duckdb) and dlt's
# ./.dlt config folder resolve against the working directory.
# The absolute path is only a default: set BINANCE_PROJECT_DIR to run on another machine.
PROJECT_DIR = os.environ.get('BINANCE_PROJECT_DIR', '/Users/koray/Documents/GitHub/deng25/Project/dlt')
os.chdir(PROJECT_DIR)
os.getcwd()

'/Users/koray/Documents/GitHub/deng25/Project/dlt'

In [6]:
# Time range to load: one UTC day, in epoch milliseconds.
START_MS = 1672531200000  # 2023-01-01 00:00:00 UTC
END_MS = 1672617600000    # 2023-01-02 00:00:00 UTC

# define new dlt pipeline
# The bucket comes from dlt configuration (the saved run wrote to gs://elegant-bucket).
# NOTE(review): this reuses pipeline_name 'binance' from the DuckDB cell, so both runs share
# one pipeline state; consider a distinct name.
pipeline = dlt.pipeline(
    destination="filesystem",  # files in a bucket/folder instead of a database
    pipeline_name='binance',   # pipeline identifier
    dataset_name='aggtrade'    # dataset name (top-level folder) in the destination
)

# binance_api must be called with its time range; passing the bare resource leaves
# its required arguments unbound.
load_info = pipeline.run(binance_api(START_MS, END_MS), write_disposition="replace")
print(load_info)

# explore loaded data
# NOTE: in the saved run this line failed with "ModuleNotFoundError: No module named 'sqlglot'";
# install sqlglot in the kernel's environment to query the filesystem destination.
pipeline.dataset(dataset_type="default").aggtrades.df()

Pipeline binance load step completed in 3.62 seconds
1 load package(s) were loaded to destination filesystem and into dataset aggtrade
The filesystem destination used gs://elegant-bucket location to store data
Load package 1744588288.4741309 is LOADED and contains no failed jobs


ModuleNotFoundError: No module named 'sqlglot'

In [None]:
# pendulum is not imported anywhere else in this notebook; without it this cell raises NameError.
import pendulum

# Filesystem destination with a custom path layout: files are written under
# <table_name>/test_value/<timestamp>/<load_id>.<file_id>.<ext>.
pipeline = dlt.pipeline(
    pipeline_name="data_things",
    destination=filesystem(
        layout="{table_name}/{test_placeholder}/{timestamp}/{load_id}.{file_id}.{ext}",
        # Evaluated once, when this cell runs; presumably every load through this
        # destination then uses the same timestamp. Confirm if per-load times are needed.
        current_datetime=pendulum.now(),
        extra_placeholders={
            "test_placeholder": "test_value",
        }
    )
)

In [None]:
dlt pipeline binance show

In [None]:
con = duckdb.connect('binance.duckdb')

In [None]:
con.sql('SELECT * from INFORMATION_SCHEMA.tables').df()

In [None]:
con.sql('SELECT * from aggtrade.aggtrades').df()

In [None]:
# NOTE(review): curl is invoked without a URL or any arguments; complete this command or remove the cell.
!curl 