In [None]:
import sys, subprocess, pkgutil
def ensure(pkg, spec=None):
    name = pkg if spec is None else spec
    if not pkgutil.find_loader(pkg):
        subprocess.check_call([sys.executable, "-m", "pip", "install", name])

ensure("python_dotenv", "python-dotenv")
ensure("requests", "requests>=2.32")
ensure("websockets", "websockets>=12.0")
ensure("cryptography", "cryptography>=42.0")

In [None]:
KALSHI_ENV=PROD
PROD_KEYID=e8b43912-6603-413c-b544-3ca7f47cd06b
PROD_KEYFILE=/Users/Brett/kdata/brettkey.txt

In [None]:
from dotenv import load_dotenv, find_dotenv
import os
load_dotenv(find_dotenv(), override=True)

ENV_NAME = os.getenv("KALSHI_ENV", "DEMO").upper()
PROD_KEYID = os.getenv("PROD_KEYID")
PROD_KEYFILE = os.getenv("PROD_KEYFILE")
DEMO_KEYID = os.getenv("DEMO_KEYID")
DEMO_KEYFILE = os.getenv("DEMO_KEYFILE")

print("ENV:", ENV_NAME)
print("PROD_KEYID set?", bool(PROD_KEYID))
print("PROD_KEYFILE:", PROD_KEYFILE)

In [None]:
ENV_NAME    = "PROD"  # "DEMO" or "PROD"
PROD_KEYID  = "e8b43912-6603-413c-b544-3ca7f47cd06b"
PROD_KEYFILE= "/Users/Brett/kdata/brettkey.txt"
DEMO_KEYID  = None
DEMO_KEYFILE= None

In [None]:
import base64, time, json
from typing import Any, Dict, Optional
from datetime import datetime, timedelta
from enum import Enum

import requests
from requests.exceptions import HTTPError

from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.exceptions import InvalidSignature

import websockets

In [None]:
class Environment(Enum):
    DEMO = "demo"
    PROD = "prod"

class KalshiBaseClient:
    def __init__(
        self,
        key_id: str,
        private_key: rsa.RSAPrivateKey,
        environment: Environment = Environment.PROD,
    ):
        self.key_id = key_id
        self.private_key = private_key
        self.environment = environment
        self.last_api_call = datetime.now()

        if self.environment == Environment.DEMO:
            self.HTTP_BASE_URL = "https://demo-api.kalshi.co"
            self.WS_BASE_URL   = "wss://demo-api.kalshi.co"
        elif self.environment == Environment.PROD:
            # Use production trading host
            self.HTTP_BASE_URL = "https://trading-api.kalshi.com"
            self.WS_BASE_URL   = "wss://trading-api.kalshi.com"
        else:
            raise ValueError("Invalid environment")

    def request_headers(self, method: str, path: str) -> Dict[str, Any]:
        # Timestamp in milliseconds
        ts = str(int(time.time() * 1000))

        # Sign EXACTLY: timestamp + METHOD + PATH (no host; strip query)
        path_only = path.split("?", 1)[0]
        msg = f"{ts}{method.upper()}{path_only}"
        signature = self.sign_pss_text(msg)

        return {
            "Content-Type": "application/json",
            "KALSHI-ACCESS-KEY": self.key_id,
            "KALSHI-ACCESS-TIMESTAMP": ts,
            "KALSHI-ACCESS-SIGNATURE": signature,
        }

    def sign_pss_text(self, text: str) -> str:
        message = text.encode("utf-8")
        signature = self.private_key.sign(
            message,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH,  # important
            ),
            hashes.SHA256(),
        )
        return base64.b64encode(signature).decode("utf-8")

class KalshiHttpClient(KalshiBaseClient):
    def __init__(
        self,
        key_id: str,
        private_key: rsa.RSAPrivateKey,
        environment: Environment = Environment.PROD,
    ):
        super().__init__(key_id, private_key, environment)
        self.host = self.HTTP_BASE_URL
        self.exchange_url  = "/trade-api/v2/exchange"
        self.markets_url   = "/trade-api/v2/markets"
        self.portfolio_url = "/trade-api/v2/portfolio"

    def rate_limit(self) -> None:
        THRESHOLD_MS = 100
        now = datetime.now()
        if now - self.last_api_call < timedelta(milliseconds=THRESHOLD_MS):
            time.sleep(THRESHOLD_MS / 1000)
        self.last_api_call = datetime.now()

    def raise_if_bad_response(self, response: requests.Response) -> None:
        if not (200 <= response.status_code < 300):
            # print Kalshi's error body for clarity
            print("Kalshi error:", response.status_code, response.text[:500])
            response.raise_for_status()

    def post(self, path: str, body: dict) -> Any:
        self.rate_limit()
        r = requests.post(
            self.host + path,
            json=body,
            headers=self.request_headers("POST", path)
        )
        self.raise_if_bad_response(r)
        return r.json()

    def get(self, path: str, params: Dict[str, Any] = None) -> Any:
        self.rate_limit()
        r = requests.get(
            self.host + path,
            headers=self.request_headers("GET", path),
            params=params or {}
        )
        self.raise_if_bad_response(r)
        return r.json()

    def delete(self, path: str, params: Dict[str, Any] = None) -> Any:
        self.rate_limit()
        r = requests.delete(
            self.host + path,
            headers=self.request_headers("DELETE", path),
            params=params or {}
        )
        self.raise_if_bad_response(r)
        return r.json()

    # Convenience
    def get_exchange_status(self) -> Dict[str, Any]:
        return self.get(self.exchange_url + "/status")

    def get_balance(self) -> Dict[str, Any]:
        return self.get(self.portfolio_url + "/balance")

    def get_trades(
        self,
        ticker: Optional[str] = None,
        limit: Optional[int] = None,
        cursor: Optional[str] = None,
        max_ts: Optional[int] = None,
        min_ts: Optional[int] = None,
    ) -> Dict[str, Any]:
        params = {k: v for k, v in {
            "ticker": ticker, "limit": limit, "cursor": cursor,
            "max_ts": max_ts, "min_ts": min_ts,
        }.items() if v is not None}
        return self.get(self.markets_url + "/trades", params=params)

class KalshiWebSocketClient(KalshiBaseClient):
    def __init__(
        self,
        key_id: str,
        private_key: rsa.RSAPrivateKey,
        environment: Environment = Environment.PROD,
    ):
        super().__init__(key_id, private_key, environment)
        self.ws = None
        self.url_suffix = "/trade-api/ws/v2"
        self.message_id = 1

    async def connect(self):
        host = self.WS_BASE_URL + self.url_suffix
        auth_headers = self.request_headers("GET", self.url_suffix)
        # websockets uses extra_headers=
        async with websockets.connect(host, extra_headers=auth_headers) as websocket:
            self.ws = websocket
            await self.on_open()
            await self.handler()

    async def on_open(self):
        print("WebSocket connection opened.")
        await self.subscribe_to_tickers()

    async def subscribe_to_tickers(self):
        # You can fine-tune channels, tickers, etc.
        msg = {"id": self.message_id, "cmd": "subscribe", "params": {"channels": ["ticker"]}}
        await self.ws.send(json.dumps(msg))
        self.message_id += 1

    async def handler(self):
        try:
            async for message in self.ws:
                await self.on_message(message)
        except websockets.ConnectionClosed as e:
            await self.on_close(e.code, e.reason)
        except Exception as e:
            await self.on_error(e)

    async def on_message(self, message):
        # Parse/print; you can route by channel/type here
        print("WS message:", message)

    async def on_error(self, error):
        print("WebSocket error:", error)

    async def on_close(self, code, reason):
        print("WebSocket closed:", code, reason)


In [None]:
# Select environment from ENV_NAME
env = Environment.PROD if ENV_NAME == "PROD" else Environment.DEMO

# Pick the right key vars
if env is Environment.PROD:
    KEYID, KEYFILE = PROD_KEYID, PROD_KEYFILE
else:
    KEYID, KEYFILE = DEMO_KEYID, DEMO_KEYFILE

if not KEYID:
    raise ValueError(f"Missing KEYID for {env.name}. Set in your .env or hardcode above.")
if not KEYFILE:
    raise ValueError(f"Missing KEYFILE for {env.name}. Set in your .env or hardcode above.")

from cryptography.hazmat.primitives import serialization
from pathlib import Path
key_path = Path(KEYFILE).expanduser().resolve()
if not key_path.exists():
    raise FileNotFoundError(f"Private key not found at {key_path}")

with key_path.open("rb") as f:
    private_key = serialization.load_pem_private_key(f.read(), password=None)

print("Using", env.name, "| KEYID:", KEYID[:8] + "â€¦", "| KEYFILE:", str(key_path))


In [None]:
client = KalshiHttpClient(key_id=KEYID, private_key=private_key, environment=env)

# Exchange status (no permissions needed)
status = client.get_exchange_status()
print("Exchange status:", status)

# Balance (requires valid PROD account + key)
try:
    bal = client.get_balance()
    print("Balance:", bal)
except requests.HTTPError as e:
    print("Balance call failed:", e)


In [None]:
ws_client = KalshiWebSocketClient(key_id=KEYID, private_key=private_key, environment=env)

# In notebooks, just await the coroutine:
await ws_client.connect()   # This will run and print incoming messages


In [None]:
# import sys, subprocess, pkgutil

# def ensure(pkg, spec=None):
#     name = pkg if spec is None else spec
#     if not pkgutil.find_loader(pkg):
#         subprocess.check_call([sys.executable, "-m", "pip", "install", name])

# # upgrade packaging bits (helps on macOS/Apple Silicon)
# subprocess.check_call([sys.executable, "-m", "pip", "install", "-U", "pip", "setuptools", "wheel"])

# # install pandas (and numpy if missing)
# ensure("numpy", "numpy>=2.0")
# ensure("pandas", "pandas>=2.2.3")

# import pandas as pd, numpy as np
# print("pandas:", pd.__version__, "| numpy:", np.__version__, "| python:", sys.executable)


In [None]:
# import requests
# import base64
# import time
# from typing import Any, Dict, Optional
# from datetime import datetime, timedelta
# from enum import Enum
# import json

# from requests.exceptions import HTTPError

# from cryptography.hazmat.primitives import serialization, hashes
# from cryptography.hazmat.primitives.asymmetric import padding, rsa
# from cryptography.exceptions import InvalidSignature

# import websockets

# class Environment(Enum):
#     DEMO = "demo"
#     PROD = "prod"

# class KalshiBaseClient:
#     """Base client class for interacting with the Kalshi API."""
#     def __init__(
#         self,
#         key_id: str,
#         private_key: rsa.RSAPrivateKey,
#         environment: Environment = Environment.PROD,
    # ):
    #     """Initializes the client with the provided API key and private key.

    #     Args:
    #         key_id (str): Your Kalshi API key ID.
    #         private_key (rsa.RSAPrivateKey): Your RSA private key.
    #         environment (Environment): The API environment to use (DEMO or PROD).
    #     """
    #     self.key_id = key_id
    #     self.private_key = private_key
    #     self.environment = environment
    #     self.last_api_call = datetime.now()

    #     if self.environment == Environment.DEMO:
    #         self.HTTP_BASE_URL = "https://demo-api.kalshi.co"
    #         self.WS_BASE_URL = "wss://demo-api.kalshi.co"
    #     elif self.environment == Environment.PROD:
    #         self.HTTP_BASE_URL = "https://trading-api.kalshi.com"
    #         self.WS_BASE_URL = "wss://trading-api.kalshi.com"
    #     else:
    #         raise ValueError("Invalid environment")

    # def request_headers(self, method: str, path: str) -> Dict[str, Any]:
    #     """Generates the required authentication headers for API requests."""
    #     current_time_milliseconds = int(time.time() * 1000)
    #     timestamp_str = str(current_time_milliseconds)

    #     # Remove query params from path
    #     path_parts = path.split('?')

    #     msg_string = timestamp_str + method + path_parts[0]
    #     signature = self.sign_pss_text(msg_string)

    #     headers = {
    #         "Content-Type": "application/json",
    #         "KALSHI-ACCESS-KEY": self.key_id,
    #         "KALSHI-ACCESS-SIGNATURE": signature,
    #         "KALSHI-ACCESS-TIMESTAMP": timestamp_str,
    #     }
    #     return headers

    # def sign_pss_text(self, text: str) -> str:
    #     """Signs the text using RSA-PSS and returns the base64 encoded signature."""
    #     message = text.encode('utf-8')
    #     try:
    #         signature = self.private_key.sign(
    #             message,
    #             padding.PSS(
#                     mgf=padding.MGF1(hashes.SHA256()),
#                     salt_length=padding.PSS.MAX_LENGTH,
#                 ),
#                 hashes.SHA256()
#             )
#             return base64.b64encode(signature).decode('utf-8')
#         except InvalidSignature as e:
#             raise ValueError("RSA sign PSS failed") from e

# class KalshiHttpClient(KalshiBaseClient):
#     """Client for handling HTTP connections to the Kalshi API."""
#     def __init__(
#         self,
#         key_id: str,
#         private_key: rsa.RSAPrivateKey,
#         environment: Environment = Environment.DEMO,
#     ):
#         super().__init__(key_id, private_key, environment)
#         self.host = self.HTTP_BASE_URL
#         self.exchange_url = "/trade-api/v2/exchange"
#         self.markets_url = "/trade-api/v2/markets"
#         self.portfolio_url = "/trade-api/v2/portfolio"

    # def rate_limit(self) -> None:
    #     """Built-in rate limiter to prevent exceeding API rate limits."""
    #     THRESHOLD_IN_MILLISECONDS = 100
    #     now = datetime.now()
    #     threshold_in_microseconds = 1000 * THRESHOLD_IN_MILLISECONDS
    #     threshold_in_seconds = THRESHOLD_IN_MILLISECONDS / 1000
    #     if now - self.last_api_call < timedelta(microseconds=threshold_in_microseconds):
    #         time.sleep(threshold_in_seconds)
    #     self.last_api_call = datetime.now()

    # def raise_if_bad_response(self, response: requests.Response) -> None:
    #     """Raises an HTTPError if the response status code indicates an error."""
    #     if response.status_code not in range(200, 299):
    #         response.raise_for_status()

    # def post(self, path: str, body: dict) -> Any:
    #     """Performs an authenticated POST request to the Kalshi API."""
    #     self.rate_limit()
    #     response = requests.post(
    #         self.host + path,
    #         json=body,
    #         headers=self.request_headers("POST", path)
    #     )
    #     self.raise_if_bad_response(response)
    #     return response.json()

    # def get(self, path: str, params: Dict[str, Any] = {}) -> Any:
    #     """Performs an authenticated GET request to the Kalshi API."""
    #     self.rate_limit()
    #     response = requests.get(
    #         self.host + path,
    #         headers=self.request_headers("GET", path),
    #         params=params
    #     )
    #     self.raise_if_bad_response(response)
    #     return response.json()

    # def delete(self, path: str, params: Dict[str, Any] = {}) -> Any:
    #     """Performs an authenticated DELETE request to the Kalshi API."""
    #     self.rate_limit()
    #     response = requests.delete(
    #         self.host + path,
    #         headers=self.request_headers("DELETE", path),
    #         params=params
    #     )
    #     self.raise_if_bad_response(response)
    #     return response.json()

    # def get_balance(self) -> Dict[str, Any]:
    #     """Retrieves the account balance."""
    #     return self.get(self.portfolio_url + '/balance')

    # def get_exchange_status(self) -> Dict[str, Any]:
    #     """Retrieves the exchange status."""
#         return self.get(self.exchange_url + "/status")

#     def get_trades(
#         self,
#         ticker: Optional[str] = None,
#         limit: Optional[int] = None,
#         cursor: Optional[str] = None,
#         max_ts: Optional[int] = None,
#         min_ts: Optional[int] = None,
#     ) -> Dict[str, Any]:
#         """Retrieves trades based on provided filters."""
#         params = {
#             'ticker': ticker,
#             'limit': limit,
#             'cursor': cursor,
#             'max_ts': max_ts,
#             'min_ts': min_ts,
#         }
#         # Remove None values
#         params = {k: v for k, v in params.items() if v is not None}
#         return self.get(self.markets_url + '/trades', params=params)

# class KalshiWebSocketClient(KalshiBaseClient):
#     """Client for handling WebSocket connections to the Kalshi API."""
#     def __init__(
    #     self,
    #     key_id: str,
    #     private_key: rsa.RSAPrivateKey,
    #     environment: Environment = Environment.DEMO,
    # ):
    #     super().__init__(key_id, private_key, environment)
    #     self.ws = None
    #     self.url_suffix = "/trade-api/ws/v2"
    #     self.message_id = 1  # Add counter for message IDs

    # async def connect(self):
    #     """Establishes a WebSocket connection using authentication."""
    #     host = self.WS_BASE_URL + self.url_suffix
    #     auth_headers = self.request_headers("GET", self.url_suffix)
    #     async with websockets.connect(host, extra_headers=auth_headers) as websocket:
    #         self.ws = websocket
    #         await self.on_open()
    #         await self.handler()

    # async def on_open(self):
    #     """Callback when WebSocket connection is opened."""
    #     print("WebSocket connection opened.")
    #     await self.subscribe_to_tickers()

    # async def subscribe_to_tickers(self):
    #     """Subscribe to ticker updates for all markets."""
    #     subscription_message = {
    #         "id": self.message_id,
    #         "cmd": "subscribe",
    #         "params": {
    #             "channels": ["ticker"]
    #         }
    #     }
    #     await self.ws.send(json.dumps(subscription_message))
    #     self.message_id += 1

    # async def handler(self):
    #     """Handle incoming messages."""
    #     try:
    #         async for message in self.ws:
    #             await self.on_message(message)
    #     except websockets.ConnectionClosed as e:
    #         await self.on_close(e.code, e.reason)
    #     except Exception as e:
    #         await self.on_error(e)

    # async def on_message(self, message):
    #     """Callback for handling incoming messages."""
    #     print("Received message:", message)

    # async def on_error(self, error):
    #     """Callback for handling errors."""
    #     print("WebSocket error:", error)

    # async def on_close(self, close_status_code, close_msg):
        # """Callback when WebSocket connection is closed."""
        # print("WebSocket connection closed with code:", close_status_code, "and message:", close_msg)

In [None]:
# import os
# from dotenv import load_dotenv, find_dotenv
# from cryptography.hazmat.primitives import serialization
# import asyncio

# from clients import KalshiHttpClient, KalshiWebSocketClient, Environment

# # Load environment variables
# load_dotenv(find_dotenv(), override=True)
# env = Environment.PROD # toggle environment here
# KEYID = os.getenv('DEMO_KEYID') if env == Environment.DEMO else os.getenv('e8b43912-6603-413c-b544-3ca7f47cd06b')
# KEYFILE = os.getenv('DEMO_KEYFILE') if env == Environment.DEMO else os.getenv('/Users/Brett/kdata/brettkey.txt')

# if not KEYFILE:
#     KEYFILE = "/Users/Brett/kdata/brettkey.txt"
# if not KEYID:
#     KEYID = 'e8b43912-6603-413c-b544-3ca7f47cd06b'
# try:
#     with open(KEYFILE, "rb") as key_file:
#         private_key = serialization.load_pem_private_key(
#             key_file.read(),
#             password=None  # Provide the password if your key is encrypted
#         )
# except FileNotFoundError:
#     raise FileNotFoundError(f"Private key file not found at {KEYFILE}")
# except Exception as e:
#     raise Exception(f"Error loading private key: {str(e)}")

# print(env)
# print(KEYID)
# print(KEYFILE)

# # Initialize the HTTP client
# client = KalshiHttpClient(
#     key_id=KEYID,
#     private_key=private_key,
#     environment=env
# )

# # Get account balance
# balance = client.get_balance()
# print("Balance:", balance)

# # Initialize the WebSocket client
# ws_client = KalshiWebSocketClient(
#     key_id=KEYID,
#     private_key=private_key,
#     environment=env
# )

# # Connect via WebSocket
# asyncio.run(ws_client.connect())

Environment.PROD
e8b43912-6603-413c-b544-3ca7f47cd06b
/Users/Brett/kdata/brettkey.txt
Balance: {'balance': 0, 'portfolio_value': 0, 'updated_ts': 1760764425}


RuntimeError: asyncio.run() cannot be called from a running event loop