Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions src/dipdup/datasources/tzkt/datasource.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
import json
import logging
from contextlib import asynccontextmanager
from functools import partial
from typing import Any, Dict, List, Optional, Tuple, Union

Expand All @@ -11,6 +11,7 @@
from aiosignalrcore.transport.websockets.connection import ConnectionState # type: ignore
from tortoise.transactions import in_transaction

from dipdup import __version__
from dipdup.config import ROLLBACK_HANDLER, BigmapdiffIndexConfig, BlockIndexConfig, OperationHandlerConfig, OperationIndexConfig
from dipdup.datasources.tzkt.cache import OperationCache
from dipdup.datasources.tzkt.enums import TzktMessageType
Expand Down Expand Up @@ -49,6 +50,18 @@
)


@asynccontextmanager
async def tzkt_get():
async with aiohttp.ClientSession() as session:
yield partial(
session.get,
skip_auto_headers={'User-Agent'},
headers={
'User-Agent': f'dupdup/{__version__}',
},
)


class TzktDatasource:
def __init__(self, url: str):
super().__init__()
Expand Down Expand Up @@ -145,8 +158,8 @@ async def subscribe_to_operations(self, address: str, types: List[str]) -> None:

async def _fetch_operations(self, address: str, offset: int, first_level: int, last_level: int) -> List[Dict[str, Any]]:
self._logger.info('Fetching levels %s-%s with offset %s', first_level, last_level, offset)
async with aiohttp.ClientSession() as session:
async with session.get(
async with tzkt_get() as get:
async with get(
url=f'{self._url}/v1/operations/transactions',
params={
"anyof.sender.target.initiator": address,
Expand Down Expand Up @@ -217,8 +230,8 @@ async def _process_operations(address, operations):

async def fetch_jsonschemas(self, address: str) -> Dict[str, Any]:
self._logger.info('Fetching jsonschemas for address `%s', address)
async with aiohttp.ClientSession() as session:
async with session.get(
async with tzkt_get() as get:
async with get(
url=f'{self._url}/v1/contracts/{address}/interface',
) as resp:
jsonschemas = await resp.json()
Expand Down Expand Up @@ -347,8 +360,8 @@ def convert_operation(cls, operation_json: Dict[str, Any]) -> OperationData:

async def get_latest_block(self) -> Dict[str, Any]:
self._logger.info('Fetching latest block')
async with aiohttp.ClientSession() as session:
async with session.get(
async with tzkt_get() as get:
async with get(
url=f'{self._url}/v1/blocks',
params={
"limit": 1,
Expand Down