In [8]:
import awswrangler as wr
from dotenv import load_dotenv
from requests.adapters import HTTPAdapter
from urllib3 import Retry
from cachecontrol import CacheControl
from cachecontrol.caches.redis_cache import RedisCache
import redis
import requests
import pendulum
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import pandas as pd
from pandas.api.types import CategoricalDtype
from os import environ

import boto3

# Load variables from a local .env file into the process environment so the
# `environ.get` calls below can see them.
load_dotenv()

# S3-compatible endpoint on the local machine (presumably a MinIO server, going
# by the variable names -- confirm). Credentials are read from the environment;
# `environ.get` yields None when a variable is unset.
S3_ENDPOINT="http://localhost:9000"
S3_ACCESS_KEY_ID=environ.get('MINIO_ACCESS_KEY_ID')
S3_SECRET_ACCESS_KEY=environ.get('MINIO_SECRET_ACCESS_KEY')

# Point awswrangler's S3 calls at the endpoint above.
wr.config.s3_endpoint_url = S3_ENDPOINT

# Install the credentials on boto3's default session.
boto3.setup_default_session(
    aws_access_key_id=S3_ACCESS_KEY_ID,
    aws_secret_access_key=S3_SECRET_ACCESS_KEY
)
# Short alias used by the helpers that write Parquet files.
s3 = wr.s3

# Categorical dtype applied to the `range` column of the orders frame: three
# named values followed by the numeric values, stored as strings.
MARKET_ORDER_RANGE_DTYPE = CategoricalDtype(
    categories=[
        "station",
        "region",
        "solarsystem",
        *map(str, (1, 2, 3, 4, 5, 10, 20, 30, 40)),
    ]
)



class ESI:
    """Small HTTP client for the ESI API rooted at ``BASE_URL``.

    One ``requests.Session`` is built per instance and reused for every call,
    so connections are pooled across the worker threads that share the client.
    """

    BASE_URL = "https://esi.evetech.net/latest"

    # (connect, read) timeout in seconds. Without a timeout, requests waits
    # forever on a stalled connection, which would hang a worker thread and
    # therefore the whole page fan-out.
    TIMEOUT = (5, 60)

    def __init__(self) -> None:
        self.http = self.build_client()

    def get_market_structure_orders(self, structure_id, token, page=1):
        """Fetch one page of market orders for a structure.

        Args:
            structure_id: ID placed in the ``/markets/structures/{id}/`` URL.
            token: Access token sent as ``Authorization: Bearer <token>``.
            page: Page number sent as the ``page`` query parameter.

        Returns:
            The ``requests.Response``; callers read both its JSON body and
            its headers.

        Raises:
            requests.HTTPError: If the response status is 4xx/5xx.
            requests.Timeout: If connecting or reading exceeds ``TIMEOUT``.
        """
        res = self.http.get(
            f"{self.BASE_URL}/markets/structures/{structure_id}/",
            headers={"Authorization": f"Bearer {token}"},
            # Sorted so the query string is always built in the same order.
            params=sorted([("order_type", "all"), ("page", page)]),
            timeout=self.TIMEOUT,
        )
        res.raise_for_status()
        return res

    def build_client(self):
        """Create the session used for all requests.

        The HTTPS adapter is configured with a urllib3 ``Retry`` policy (up to
        5 retries, backoff factor 1, no restriction on HTTP method, forced
        retry on 503/504) and a pool of 100 connections, matching the 100
        worker threads used when fetching pages concurrently.
        """
        session = requests.Session()
        adapter = HTTPAdapter(
            max_retries=Retry(
                total=5,
                backoff_factor=1,
                allowed_methods=None,
                status_forcelist=[503, 504],
            ),
            pool_connections=100,
            pool_maxsize=100,
        )
        session.mount("https://", adapter)
        return session


In [9]:
structure_id = 1028858195912

# The ESI access token is a credential and must never be committed in the
# notebook. Put it in the environment (or in the .env file that the setup
# cell loads via `load_dotenv()`) as ESI_ACCESS_TOKEN.
token = environ.get("ESI_ACCESS_TOKEN", "")
if not token:
    print("WARNING: ESI_ACCESS_TOKEN is not set; authenticated ESI requests will fail.")

def path_timestamp(dt):
    """Return the timestamp fragment embedded in output file names.

    `dt` must provide a pendulum-style ``format`` method; the result is
    whatever it renders for the token string "YYYYMMDD_X".
    """
    token_pattern = "YYYYMMDD_X"
    return dt.format(token_pattern)


def path_partition(dt):
    """Return the date-based directory fragment used to partition output paths.

    `dt` must provide a pendulum-style ``format`` method; the result is
    whatever it renders for the token string "YYYY/MM/DD".
    """
    token_pattern = "YYYY/MM/DD"
    return dt.format(token_pattern)

def get_structure_orders_page(esi, structure_id, token, page):
    """Fetch one page of a structure's orders and return it as a DataFrame.

    Delegates the HTTP call to ``esi.get_market_structure_orders`` and builds
    the frame from the decoded JSON body of the response.
    """
    response = esi.get_market_structure_orders(structure_id, token, page)
    payload = response.json()
    return pd.DataFrame.from_dict(payload)

def market_structure_orders_filename(structure_id, last_modified):
    """Build the Parquet file name for one structure's orders snapshot.

    The name combines the structure ID with the timestamp fragment produced by
    `path_timestamp(last_modified)`.
    """
    stamp = path_timestamp(last_modified)
    return f"market_structure_orders_{structure_id}_{stamp}.parquet"


def market_structure_orders_prefix(structure_id, last_modified):
    """Build the key prefix (directory part) for one structure's orders snapshot.

    The prefix nests the structure ID and then the date partition produced by
    `path_partition(last_modified)`.
    """
    partition = path_partition(last_modified)
    return f"market_structure_orders/{structure_id}/{partition}"


def market_structure_orders_path(structure_id, last_modified):
    """Build the full s3:// URL for one structure's orders snapshot.

    Joins the raw-data bucket, the prefix from `market_structure_orders_prefix`
    and the file name from `market_structure_orders_filename`.
    """
    key = "/".join(
        (
            market_structure_orders_prefix(structure_id, last_modified),
            market_structure_orders_filename(structure_id, last_modified),
        )
    )
    return f"s3://evellama-dev-data-raw/{key}"



def load_structure_orders_parquet(structure_id, last_modified, orders_df):
    """Write a structure's orders to S3 as Parquet and return the object path.

    Args:
        structure_id: ID of the structure the orders belong to.
        last_modified: Snapshot time used to derive the partition and file name.
        orders_df: DataFrame of orders to write.

    Returns:
        The s3:// path the frame was written to.
    """
    path = market_structure_orders_path(structure_id, last_modified)
    s3.to_parquet(orders_df, path)
    # The ID is a structure ID; the message previously mislabelled it as a region.
    print(f"Wrote {len(orders_df)} order(s) for structure {structure_id} to {path}")

    return path


# Fetch page 1 first: its headers supply the snapshot time ("last-modified")
# and the total number of pages ("x-pages") needed to fan out the rest.
esi = ESI()
res = esi.get_market_structure_orders(structure_id, token)
last_modified = pendulum.parse(res.headers.get("last-modified"), strict=False)

data = res.json()
if not data:
    print("No orders")

page_dfs = [pd.DataFrame.from_dict(data)]
pages = int(res.headers.get("x-pages", 1))

# Remaining pages are fetched concurrently; any failure is reported with its
# page number and then re-raised so a partial snapshot is never written.
with ThreadPoolExecutor(max_workers=100) as exe:
    futures = {
        exe.submit(get_structure_orders_page, esi, structure_id, token, p): p
        for p in range(2, pages + 1)
    }
    for future in concurrent.futures.as_completed(futures):
        page = futures[future]
        try:
            page_dfs.append(future.result())
        except Exception as exc:
            print(f"Error fetching page {page} for structure {structure_id}: ", exc)
            raise

orders_df = pd.concat(page_dfs, ignore_index=True)

if orders_df.empty:
    # Nothing to convert or write: an empty frame has no "issued"/"range"
    # columns, so the conversions below would fail.
    path = None
else:
    # `assign`/`astype` return new frames, so the result must be bound back to
    # `orders_df`; previously the converted frame was discarded.
    orders_df = orders_df.assign(
        issued=pd.to_datetime(orders_df["issued"], utc=True),
        last_modified=pd.Timestamp(last_modified),
    ).astype({"range": MARKET_ORDER_RANGE_DTYPE})
    path = load_structure_orders_parquet(structure_id, last_modified, orders_df)

print(path)

HTTPError: 404 Client Error: Not Found for url: https://esi.evetech.net/latest/markets/structures/1028858195912/orders/?order_type=all&page=1