In [1]:
import sys
import json
import subprocess
import os
from tqdm import tqdm

import numpy as np

from numba import jit, vectorize
import pandas as pd

from typing import Tuple

# Running it live

## Set up the arrays and functions used in advance

- Alternatively they can also be set up when "BEGIN" is read.
- `num_data_points` is used to limit the array size to 10000 elements, 10M points is too large for my 8 gigs of RAM :(

In [2]:
num_markets = 12001
# Ideally this would 10M, but this is too large for my 8 gigs of RAM
num_data_points = 10000

volume = np.empty((num_markets * num_data_points), dtype=np.float64) * np.nan
price = np.empty((num_markets * num_data_points), dtype=np.float32) * np.nan
volume_price_product = np.empty((num_markets * num_data_points), dtype=np.float64) * np.nan
is_buy = np.empty((num_markets * num_data_points), dtype=float) * np.nan

market_analysis = np.zeros((num_markets, 5), dtype=np.float64)

## Declare the functions to be used

In [3]:
def get_data(line: dict) -> Tuple[int, int, np.ndarray]:
    idx = line["id"]
    market = line["market"]
    position_idx = market * num_data_points + idx
    
    volume[position_idx] = line["volume"]
    price[position_idx] = line["price"]
    volume_price_product[position_idx] = line["volume"] * line["price"]
    is_buy[position_idx] = line["is_buy"]
    return idx, market, volume, price, volume_price_product, is_buy

In [4]:
@jit(nopython=True)
def analyze_data(
    idx: int, 
    market: int, 
    volume: np.ndarray, 
    price: np.ndarray, 
    volume_price_product: np.ndarray, 
    is_buy: np.ndarray, 
    market_analysis: np.ndarray
):
    start_pos_idx = market * num_data_points
    end_pos_idx = market * num_data_points + idx + 1
    
    # Total volume
    market_analysis[market, 0] = np.nansum(volume[start_pos_idx:end_pos_idx])
    # Mean price
    market_analysis[market, 1] = np.nanmean(price[start_pos_idx:end_pos_idx])
    # Mean volume
    # We can also reuse the total_volume calculated above and just calculate the number of 
    # non-nan elements to get the mean volume, perhaps a slight performance improvement
    market_analysis[market, 2] = np.nanmean(volume[start_pos_idx:end_pos_idx])
    # Volume weighted price mean
    market_analysis[market, 3] = np.nanmean(volume_price_product[start_pos_idx:end_pos_idx])
    # Percentage buys
    not_nan = is_buy[start_pos_idx:end_pos_idx][np.isnan(is_buy[start_pos_idx:end_pos_idx]) == False]
    if not_nan.size > 0:
        market_analysis[market, 4] = np.count_nonzero(not_nan) / not_nan.size

analyze_data(0, 0, volume, price, volume_price_product, is_buy, market_analysis)

In [5]:
# Create a subprocess to read the stdout of the binary file
proc = subprocess.Popen([os.path.join(os.getcwd(), 'stdoutinator_amd64_darwin.bin')], stdout=subprocess.PIPE)

# Currently limiting the range due to memory issues with the full 10M array
# In the final version, the while loop should be used

# while True:
for _ in tqdm(range(num_data_points)):
    line = proc.stdout.readline().decode("utf-8").rstrip()
    
    if not line:
        break
    if line == "BEGIN":
        # Arrays and functions can also be initialized here
        continue
    if line == "END":
        # Some additional cleanup logic can be added here
        break
        
    # Extract relevant data from the json
    # Note that `id` has been renamed to `idx`to prevent built-in conflicts
    idx, market, volume, price, volume_price_product, is_buy = get_data(line=json.loads(line))
    
    # Analyze data, note that since `market_analysis` is declared globally, it can be called at anytime 
    # to retrieve the current analyzed data 
    analyze_data(
        idx=idx, 
        market=market, 
        volume=volume, 
        price=price, 
        volume_price_product=volume_price_product, 
        is_buy=is_buy, 
        market_analysis=market_analysis
    )

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████| 10000/10000 [00:00<00:00, 10500.99it/s]


# Visualize in DataFrame
DataFrame index is `market`

In [8]:
df = pd.DataFrame(columns=["total_volume", "mean_price", "mean_volume", "mean_volume_weighted_price", "percentage_buys"])

for index, element in enumerate(market_analysis):
    df.loc[index] = element

In [7]:
df.head()

Unnamed: 0,total_volume,mean_price,mean_volume,mean_volume_weighted_price,percentage_buys
0,0.0,,,,0.0
1,189.034643,1.280509,189.034643,242.060606,1.0
2,3994.634779,2.577998,3994.634779,10298.162026,1.0
3,2017.032211,3.283191,2017.032211,6622.301427,1.0
4,4372.212173,4.063693,4372.212173,17767.325878,1.0
