<a href="https://colab.research.google.com/github/SxT-Community/python-api-examples/blob/main/sxt_python_api_examples.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook will walk you though key generation, authentication, and running a query using Python to interact with the Space and Time API. You can find raw Python scripts in the [repo](https://github.com/SxT-Community/python-api-examples) or in the [SxT docs](https://docs.spaceandtime.io/docs/api-workflow). Please note, SxT is moving fast, and the API is being updated frequently. ⭐ As of 5/11/2023 these example are all tested and working for the Beta relese ⭐

First, lets install some dependencies:

In [None]:
!pip install eth_account==0.8.0
!pip install PyNaCl==1.5.0
!pip install requests==2.28.2
!pip install cryptography==40.0.2
!pip install colab-env

Second, we'll set up some envars using the colab_env package. Create a file in your google drive called `vars.env` with the following variables:

```
# Required
API_URL = "https://<your-SxT-API-URL>/v1/"
USER_PRIVATE_KEY = "<your-sxt-authentication-private-key>"
USER_PUBLIC_KEY = "your-sxt-authentication-public-key>"
AUTH_SCHEME = "ed25519"
# Optional 
ORG_CODE = "<your-sxt-org-code" # also called Join Code
BISCUIT="<your-biscuit-here>"
BISCUIT_PUBLIC_KEY="<your-biscuit-public-key-here" 
```

Now make sure your variables are loaded. This should return the SxT API URL:

In [None]:
import colab_env
import os

colab_env.RELOAD()
os.getenv("API_URL")

# useful for troubleshooting 
# !more gdrive/My\ Drive/vars.env


Third, lets generate a key pair that you will use to authenticate into Space and Time. In this notebook well generate Ed25529 keys. 

Please note, at the bottom of the script there are commands to automatically write your keys to your vars.env file. **Be aware, this will overwrite any existing keys you have in your `vars.env` file. Please comment those lines out if you simply want to generate keys to the notebook.**
 

In [None]:
# -*- coding: utf-8 -*-
# generate-Ed25519-keys.py

import os
import base64
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

# Generate a random 32-byte seed
seed = os.urandom(32)

# Create a private key from the seed
private_key = Ed25519PrivateKey.from_private_bytes(seed)

# Get the corresponding public key
public_key = private_key.public_key()

# Encode the private and public keys as bytes
private_key_bytes = private_key.private_bytes(
    encoding=serialization.Encoding.Raw,
    format=serialization.PrivateFormat.Raw,
    encryption_algorithm=serialization.NoEncryption()
)
public_key_bytes = public_key.public_bytes(
    encoding=serialization.Encoding.Raw,
    format=serialization.PublicFormat.Raw
)

# Encode the private key, and public key in Base64 format
private_key_base64 = base64.b64encode(private_key_bytes)
public_key_base64 = base64.b64encode(public_key_bytes)

# Print the private key, and public key in Base64 format
print("Private Key (Base64):", private_key_base64.decode())
print("Public Key (Base64):", public_key_base64.decode())

# write keys to your vars.env
colab_env.envvar_handler.add_env("USER_PRIVATE_KEY", private_key_base64.decode(), overwrite=True)
colab_env.envvar_handler.add_env("USER_PUBLIC_KEY", public_key_base64.decode(), overwrite=True)


Get our logger setup to work with colab:

In [3]:
import logging
print(logging.root.handlers) # might show NOTSET
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
print(logging.root.handlers) # should now return []

[<StreamHandler stderr (NOTSET)>]
[]


^^ https://stackoverflow.com/a/74121821

# Register and Authenticate 

Set your user id. If the user exists, it will authenticate. If the user does not exist, it will register the new user and then authenticate. 

In [4]:
user_id = '<your-sxt-username-here>'

In [None]:
# -*- coding: utf-8 -*-
# register-authenticate.py
import requests
import sys
import json
import logging
from nacl.signing import SigningKey
import base64
import colab_env
import os

logger = logging.getLogger('my_logger')
logging.basicConfig(level=logging.INFO, force=True)
headers = {"accept": "application/json"}

try: 
    api_url = os.getenv('API_URL')
except:
    logging.error('Please make sure you set the SxT API_URL value in your .env file!')
    sys.exit()
try: 
    org_code = os.getenv('ORG_CODE')
    user_private_key = os.getenv('USER_PRIVATE_KEY')
    user_public_key = os.getenv('USER_PUBLIC_KEY')
except:
    logging.warning('Without ORG_CODE, and KEYS set, this script will be limited to checking if user_ids exist!')
     
def main():
    #1) Check is user_id exists, if not, register it
    if not user_id_exists():
        logging.info('Lets create your user ID!')
        authenticate()
    #2) If user_id already exists, then authenticate 
    else:
        logging.info('Time to authenticate!')
        authenticate()

# https://docs.spaceandtime.io/reference/user-identifier-check
def user_id_exists():
    url = api_url + "auth/idexists/" + user_id
    logging.info(f'Checking id: {user_id} with URL: {url}')

    resp = requests.get(url, headers=headers)
    
    if resp.status_code == 200 and resp.text == 'true':
        logging.info("UserID exists!")
        return True
    elif resp.status_code == 200 and resp.text == 'false':
        logging.info("UserID doesn't exist!")
        return False
    else: 
        logging.error("We did not connect with the SxT API successfully!")
        logging.error(resp.status_code, resp.text)
        sys.exit()

# https://docs.spaceandtime.io/reference/authentication-code
def authenticate():
    # 1) Request auth code from SxT API 
    auth_code = request_auth_code()

    # 2) Sign the auth code with our private key
    signed_auth_code = sign_message(auth_code)
    
    # 3) Request access token using signed_auth_code 
    access_token, refresh_token = request_token(auth_code, signed_auth_code)
    
    logging.info(f'Authenticaiton to the SxT API has been completed successfully!\nAccess token:\n{access_token}\nRefresh token:\n{refresh_token}')
    return 

# https://docs.spaceandtime.io/reference/authentication-code
def request_auth_code():
    url = api_url + "auth/code"
    # SxT Subscription model recently launched and Join Code is now optional
    """payload = {
        "userId": user_id,
        "joinCode": org_code
    }"""
    payload = {
        "userId": user_id
    }
    resp = requests.post(url, json=payload, headers=headers)
   
    jsonResponse = resp.json()
    logging.debug(f'auth/code response: {jsonResponse}')

    if resp.status_code == 200: 
        auth_code = jsonResponse["authCode"]
    else: 
        logging.error('Non 200 response from the auth/code endpoint! Stopping.')
        logging.error(f'Response Code: {resp.status_code}, Response Text: {resp.text}')
        sys.exit()

    return auth_code 

def sign_message(auth_code):
    # get bytes of the auth code for signing  
    bytes_message = bytes(auth_code, 'utf-8')
    # decode private key for signing 
    key = base64.b64decode(user_private_key)
    # create signing key
    signingkey = SigningKey(key)
    # finally, sign the auth code with our private key
    signed_message = signingkey.sign(bytes_message)

    logging.debug("Signature | hashed message, hex: " + signed_message.hex())
    logging.debug("Signature, hex: " + signed_message[:64].hex())

    return signed_message[:64].hex()

# https://docs.spaceandtime.io/reference/token-request
def request_token(auth_code, signed_auth_code):

    url = api_url + "auth/token"
    payload = {
        "userId": user_id,
        "authCode": auth_code,
        "signature": signed_auth_code,
        "key": user_public_key,
        "scheme": os.getenv('AUTH_SCHEME')
    }

    resp = requests.post(url, json=payload, headers=headers)
    
    if resp.status_code != 200:
        logging.error('Failed to request token from the API!')
        logging.error(resp.status_code, resp.text)
        sys.exit()
    
    jsonResp = resp.json()
    logging.debug(f'auth/token response: {jsonResp}')

    return jsonResp["accessToken"],jsonResp["refreshToken"]


if __name__ == "__main__":
    main()

   

# Query Data


Querying data with the SxT API is straight forward. First, take your access token from above and set that as variable in the script. Please note, access tokens are only good for 25 minutes. If you need a new token you can just re-run the script above. If you're developing a proper application, you'll want to implement a function that uses the refresh token to automatically get a new access token every 25 mins.   

In [7]:
access_token = "<your-sxt-access-token-here>"

In [None]:
# -*- coding: utf-8 -*-
# query-public-data.py

import requests
import sys
import json
import logging
from dotenv import dotenv_values

logger = logging.getLogger('my_logger')
logging.basicConfig(level=logging.INFO, force=True)
headers = {"accept": "application/json"}

if not access_token:
    logging.error('This script requires an SxT access_token: python query.py <your-access-token-here>')
    sys.exit() 

# I will add biscuit support soon
try: 
    biscuit = sys.argv[2]
except:
    biscuit = "" 
    logging.info('No biscuit provided. Non public data queries will require a biscuit.')


try: 
    api_url = os.getenv('API_URL')
except:
    logging.error('Please make sure you set the SxT API_URL value in your .env file!')
    sys.exit()

def main():
    # https://docs.spaceandtime.io/reference/execute-queries-dql
    url = api_url + "sql/dql"
    
    # Get five transactions 
    payload = { 
        "resourceId": "ETHEREUM.TRANSACTIONS",
        "sqlText": "SELECT * FROM ETHEREUM.TRANSACTIONS LIMIT 5"
    }
    
    """ Get the latest Ethereum block 
    payload = {
        "resourceId": "ETHEREUM.TRANSACTIONS",
        "sqlText": "SELECT MAX(BLOCK_NUMBER) FROM ETHEREUM.TRANSACTIONS;"
    }
    """
    """ SxT Docs API workflow example - get dapp user wallets
    payload = {
        "resourceId": "SE_PLAYGROUND.DAPP_USER_WALLETS_1",
        "sqlText": "SELECT * FROM SE_PLAYGROUND.DAPP_USER_WALLETS_1 WHERE USER_SUBSCRIPTION = 'prime';"
    }
    """
    headers = {
        "accept": "application/json",
        "biscuit": biscuit,
        "content-type": "application/json",
        "authorization": f"Bearer {access_token}"
    }

    resp = requests.post(url, json=payload, headers=headers)
    
    try: 
        json_resp = json.loads(resp.text)
        api_resposne = json.dumps(json_resp, indent=2)
    except:
        # if we don't get valid json response from the API
        api_resposne = resp.text
    
    print(f"SxT API response code: {resp.status_code}\nSxT API Response text: {api_resposne}")

if __name__ == "__main__":
    main()



