In [1]:
import requests

# Exercise 1

**Explain concisely how you would fetch the data from bitflyer.**

To collect the data from BitFlyer, I would write a Python script that calls the BitFlyer API using the `requests` library. Looking at a sample of the results below, the `id` values have gaps, but I can still use the API's pagination to make sure I get all the data. On the first (manual) run, I would want to go back in time as far as possible; the API only returns executions from the last 31 days. I would raise `count` to the maximum the API allows, or to whatever suits my bandwidth. By default it is `100` and the maximum appears to be `500` (at roughly 254 bytes per execution, a full page is about $254\,\text{B} \times 500 = 127{,}000\,\text{B} \approx 124\,\text{KiB}$). With a manageable `count`, I would loop until the API returns an empty page (all 31 days retrieved), passing the last (smallest) `id` of each response as the `before` parameter of the next request. After each request, I would store and commit the results to the database. To keep my IP from being blocked, I would make sure the script never exceeds `500` requests every `5` minutes, enforced by a timer-and-sleep wrapper function.
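The backfill loop could be sketched as follows. This is a minimal sketch, not the author's script: it assumes pages come back newest first (so the smallest `id` on a page is the cursor for the next, older page) and uses a fixed pause of 300 s / 500 = 0.6 s between calls to stay under the rate limit. `fetch_executions`, `backfill`, and `PAUSE_SECONDS` are illustrative names; `requests` is imported inside the fetcher so the pagination logic can be exercised offline with a fake fetcher.

```python
import time

API_URL = "https://api.bitflyer.com/v1/executions"
MAX_COUNT = 500             # largest page size observed from the API
PAUSE_SECONDS = 300 / 500   # 0.6 s between calls keeps us under 500 requests per 5 minutes


def fetch_executions(before=None, product_code="BTC_JPY", count=MAX_COUNT):
    """Fetch one page of executions with ids lower than `before` (newest first)."""
    import requests  # imported here so backfill() can be tested without network access

    params = {"product_code": product_code, "count": count}
    if before is not None:
        params["before"] = before
    response = requests.get(API_URL, params=params, timeout=10)
    response.raise_for_status()
    return response.json()


def backfill(fetch=fetch_executions, pause=PAUSE_SECONDS, sleep=time.sleep):
    """Yield pages going back in time until the API returns an empty page."""
    before = None
    while True:
        page = fetch(before=before)
        if not page:
            return
        yield page
        # The smallest id on this page becomes the cursor for the next, older page
        before = min(execution["id"] for execution in page)
        sleep(pause)
```

Each yielded page would then be written and committed before the next request, e.g. `for page in backfill(): save_page(page)`, where `save_page` is a hypothetical database writer.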

Once all the historical data is loaded, I would hand the script over to a scheduled job (for example, an AWS Lambda function) that fetches the latest executions. It would work almost identically, but instead of going backwards in time it would page forward. It would query the database for the latest stored `id` and call the API with that value as the `after` parameter, using the same `count` as above. It would also loop until the data is caught up, relying on self-imposed rate limiting so that it never exceeds the API's limits and risks an IP ban. The amount of data and the run time would determine how often the job runs.
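The "timer and sleep wrapper" used by both the backfill and the scheduled job could look like the sketch below. It is a swapped-in design (a rolling-window limiter), not necessarily what the author built: it allows at most `max_calls` calls in any `period`-second window, defaulting to the 500-requests-per-5-minutes limit. `RateLimiter` is a hypothetical name, and `clock`/`sleep` are injectable so it can be tested without actually waiting.

```python
import functools
import time
from collections import deque


class RateLimiter:
    """Allow at most `max_calls` calls in any rolling window of `period` seconds."""

    def __init__(self, max_calls=500, period=300.0, clock=time.monotonic, sleep=time.sleep):
        self.max_calls = max_calls
        self.period = period
        self.clock = clock
        self.sleep = sleep
        self.calls = deque()  # timestamps of recent calls, oldest first

    def wait(self):
        """Block until another call is allowed, then record it."""
        now = self.clock()
        # Forget calls that have left the rolling window
        while self.calls and now - self.calls[0] >= self.period:
            self.calls.popleft()
        if len(self.calls) >= self.max_calls:
            # Sleep until the oldest call in the window expires
            self.sleep(self.period - (now - self.calls[0]))
            now = self.clock()
            self.calls.popleft()
        self.calls.append(now)

    def __call__(self, func):
        """Use the limiter as a decorator on any function that calls the API."""
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            self.wait()
            return func(*args, **kwargs)
        return wrapper
```

Decorating the API-calling function with `@RateLimiter(max_calls=500, period=300)` makes every request pass through the limiter, so neither the backfill nor the incremental job needs its own timing logic.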

Errors would also need to be handled gracefully. The script must make sure that no batch is skipped when writing to the database, since a skipped batch leaves a gap in the data. It should also fail gracefully by reporting exceptions to a log file, a notification system, or both.
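A minimal sketch of that error handling, assuming failures are mostly transient (network errors, HTTP 5xx) and that each batch is written in a single transaction. `with_retries` and the logger name are hypothetical: the helper logs every failure with its traceback via `logging.exception`, backs off exponentially, and re-raises after the last attempt, so the job stops rather than moving on and leaving a gap.

```python
import logging
import time

logger = logging.getLogger("bitflyer_sync")  # hypothetical logger name


def with_retries(func, attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call func() and return its result, retrying on any exception.

    Each failure is logged with its traceback. Delays double between attempts
    (1 s, 2 s, ...). After the last attempt the exception is re-raised, so the
    caller never silently skips a batch.
    """
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            logger.exception("Attempt %d/%d failed", attempt, attempts)
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
```

Calling `logging.basicConfig(filename="sync.log")` once at start-up would send these reports to a log file; a notification hook could be attached as another logging handler.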

In [2]:
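# Default count is 100 records; the maximum appears to be 500
# API limit is 500 requests per 5 minutes

endpoint = 'https://api.bitflyer.com/v1/'
path = 'executions'
# Ask for more than the maximum to see how many records the API actually returns
query = {"product_code": "BTC_JPY", "market_type": "Spot", "count": 999}

response = requests.get(f'{endpoint}{path}', params=query, timeout=10)
response.raise_for_status()  # stop on HTTP errors instead of parsing an error body
json = response.json()
len(json)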

500

# Exercise 2

**Define the data structures you would use for executions and candles and write functions to process executions and build candles (assume that the data has been already fetched from bitflyer). Please add tests for these functions.**

Since I'm working in Python, I am using Pandas DataFrames for the execution and candle data. I created a function `build_bitflyer_candles` that takes a JSON list of dicts and returns a Pandas DataFrame of candles of the given size (default 1 minute).

One caveat is that this may not be the ideal approach for this pipeline. It might be better to store the executions in the database first, then query the database for a given period and build the candles from the result, storing them in a separate candle table (described below). If I use this function directly on an API response, the first and last candles may be incomplete, because the response can begin or end partway through a candle. There are a few other options, such as truncating the JSON results or dropping the first and last candles.

This function pools buy and sell executions together and treats each one as a separate trade; the buy/sell direction is not used when building the candles.
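For comparison, the same data structures can be expressed without pandas. This is a swapped-in alternative, not the approach used in this notebook: `Execution`, `Candle`, and `build_candles_plain` are hypothetical names, and the builder mirrors the pandas function's rules (time-ordered open/close, volume-weighted `average`, empty candles filled with the previous close and an `average` of 0). It assumes candle widths that divide a day evenly, aligned to midnight.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass(frozen=True)
class Execution:
    id: int
    side: str            # 'BUY' or 'SELL' (not used for candles)
    price: float
    size: float
    exec_date: datetime


@dataclass
class Candle:
    open_time: datetime
    open: float
    high: float
    low: float
    close: float
    volume: float
    average: float       # volume-weighted average price, 0.0 when there were no trades
    number_of_trades: int


def floor_time(ts, width):
    """Round ts down to the start of its candle, counting from midnight."""
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + ((ts - midnight) // width) * width


def build_candles_plain(executions, width=timedelta(minutes=1)):
    """Group executions into fixed-width candles, filling empty candles with the previous close."""
    if not executions:
        return []
    ordered = sorted(executions, key=lambda e: (e.exec_date, e.id))
    buckets = {}
    for e in ordered:
        buckets.setdefault(floor_time(e.exec_date, width), []).append(e)

    candles, prev_close = [], None
    t, last = min(buckets), max(buckets)
    while t <= last:
        trades = buckets.get(t)
        if trades:
            prices = [e.price for e in trades]
            volume = sum(e.size for e in trades)
            vwap = sum(e.price * e.size for e in trades) / volume if volume else 0.0
            candle = Candle(open_time=t, open=prices[0], high=max(prices), low=min(prices),
                            close=prices[-1], volume=volume, average=vwap,
                            number_of_trades=len(trades))
        else:
            # No trades: carry the previous close forward (the first bucket always has trades)
            candle = Candle(open_time=t, open=prev_close, high=prev_close, low=prev_close,
                            close=prev_close, volume=0.0, average=0.0, number_of_trades=0)
        candles.append(candle)
        prev_close = candle.close
        t += width
    return candles
```

A reference implementation like this is also handy for cross-checking the pandas version on small hand-built inputs.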

In [3]:
import pandas as pd

def build_bitflyer_candles(json, candle_size='1min'):
    '''
    Builds candles dataframe from BitFlyer API json input.
    
    Parameters
    ----------
    json : list(dict())
        The json response from bitflyer
    
    candle_size : string, optional
        Candle size.  (Default is 1 minute)
        Uses Panda's offset aliases. For more details, read here:
        https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#offset-aliases
    
    Returns
    -------
    DataFrame
        Columns: high,low,open,number_of_trades,close,volume,average
        Index: open_time (of each candle)
    
    Examples
    --------
    >>> build_bitflyer_candles(json, '1min')
     <Pandas DataFrame with 1 minute candles>
     
    >>> build_bitflyer_candles(json, '15min')
     <Pandas DataFrame with 15 minute candles>
     
    >>> build_bitflyer_candles(json, '1h')
     <Pandas DataFrame with 1 hour candles>
     
    >>> build_bitflyer_candles(json, '1D')
     <Pandas DataFrame with 1 day candles>
    '''
    
    # Build a Pandas DataFrame with exec_date as the index
    df = pd.DataFrame(json)
    df['exec_date'] = pd.to_datetime(df['exec_date'])
    df.set_index('exec_date', inplace=True)
    df.sort_index(inplace=True)  # executions arrive newest first; 'first'/'last' need time order

    # Open, high, low, close prices and trade count per candle.
    # A list (not a set) keeps the column order deterministic.
    group = df.groupby(pd.Grouper(freq=candle_size))
    minute_df = group['price'].agg(['max', 'min', 'first', 'count', 'last'])
    minute_df.rename(columns={'max': 'high', 'min': 'low', 'first': 'open',
                              'count': 'number_of_trades', 'last': 'close'}, inplace=True)

    # Volume traded in each candle (0 for candles with no executions)
    minute_df['volume'] = group['size'].sum()

    # Fill N/A prices of empty candles with the previous close
    minute_df['close'] = minute_df['close'].ffill()
    for c in ['open', 'high', 'low']:
        minute_df[c] = minute_df[c].fillna(minute_df['close'])

    # Volume-weighted average price: sum(price * size) / sum(size); 0 for empty candles
    notional = (df['price'] * df['size']).groupby(pd.Grouper(freq=candle_size)).sum()
    minute_df['average'] = (notional / minute_df['volume']).fillna(0)

    # Rename index as "open_time"
    minute_df.index.name = 'open_time'

    return minute_df

df = build_bitflyer_candles(json, '1min')
display(df[:50])

validate_columns = ['high','low','open','close','number_of_trades','volume','average']

assert len(df)>0, "Dataframe is empty"
assert df.index.name == 'open_time', "Index is not open_time"
assert isinstance(df.index, pd.DatetimeIndex), "Index is not of type pandas DatetimeIndex"
assert len(df.columns) == len(validate_columns), "Number of columns doesn't match"
assert all(c in df.columns for c in validate_columns), "Missing necessary columns"
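assert df.notnull().all().all(), "There are null values"
# With 1-minute candles, every minute from the first to the last candle has a row
assert len(df) == (df.index.max() - df.index.min()) // pd.Timedelta('1min') + 1, "Number of records doesn't match the number of minutes between the first and last timestamp"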
assert all(df.high>=df.low), "The high price has values lower than the low price"
assert all(df.high>=df.open), "The high price has values lower than the open price"
assert all(df.high>=df.close), "The high price has values lower than the close price"
assert all(df.high>=df.average), "The high price has values lower than the average price"
assert all(df.low<=df.open), "The low price has values higher than the open price"
assert all(df.low<=df.close), "The low price has values higher than the close price"
assert all((df.low<=df.average) | (df.volume==0)), "The low price has values higher than the average price"

| open_time | high | low | open | number_of_trades | close | volume | average |
|---|---|---|---|---|---|---|---|
| 2021-07-12 05:42:00 | 3778572.0 | 3776049.0 | 3778422.0 | 7 | 3776049.0 | 0.583183 | 3778232.0 |
| 2021-07-12 05:43:00 | 3777720.0 | 3776314.0 | 3777592.0 | 9 | 3777720.0 | 0.619797 | 3777563.0 |
| 2021-07-12 05:44:00 | 3778650.0 | 3774549.0 | 3778650.0 | 21 | 3775115.0 | 2.005997 | 3777383.0 |
| 2021-07-12 05:45:00 | 3775500.0 | 3769996.0 | 3775500.0 | 19 | 3769996.0 | 1.236068 | 3772550.0 |
| 2021-07-12 05:46:00 | 3771044.0 | 3769568.0 | 3769996.0 | 14 | 3769568.0 | 1.084558 | 3770142.0 |
| 2021-07-12 05:47:00 | 3770974.0 | 3768369.0 | 3768871.0 | 22 | 3770776.0 | 1.2108 | 3770428.0 |
| 2021-07-12 05:48:00 | 3769082.0 | 3767418.0 | 3769000.0 | 8 | 3768176.0 | 0.602 | 3768052.0 |
| 2021-07-12 05:49:00 | 3773251.0 | 3769624.0 | 3769624.0 | 86 | 3771146.0 | 9.412095 | 3772091.0 |
| 2021-07-12 05:50:00 | 3772993.0 | 3769631.0 | 3770625.0 | 17 | 3772632.0 | 1.140688 | 3771959.0 |
| 2021-07-12 05:51:00 | 3777039.0 | 3772666.0 | 3772666.0 | 15 | 3777039.0 | 0.549522 | 3775825.0 |


# Exercise 3
**Suggest a postgresql schema and sql queries to store the executions and candles. The schema must take into account the queries of the following question.**


The following PostgreSQL table would hold all the executions for `BTC_JPY` from the BitFlyer API. The `side` column is reduced to a single character and could be reduced further to a boolean if desired (e.g. `buy`). The `buy_child_id` and `sell_child_id` columns are nullable, so they can be left empty (or dropped later) if they prove not to be useful, and so the table tolerates changes to BitFlyer's data structure. The `create_date` timestamp provides a minimal amount of change tracking across all tables; columns like `create_user`, `mod_date`, and `mod_user` could be added if more tracking were needed. The primary key is the `id` from the BitFlyer API.

```postgresql
CREATE TABLE public.bitflyer_executions_btcjpy (
    id int8 NOT NULL,                  -- BitFlyer execution id
    side char(1) NOT NULL,             -- 'B' = buy | 'S' = sell
    exec_date timestamp NOT NULL,
    price numeric(20, 8) NOT NULL,
    volume numeric(20, 8) NOT NULL,    -- size of execution
    buy_child_id VARCHAR(30) NULL,
    sell_child_id VARCHAR(30) NULL,
    create_date timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT bitflyer_executions_btcjpy_pkey PRIMARY KEY (id)
);

-- The daily queries filter and group on exec_date, so index it
CREATE INDEX bitflyer_executions_btcjpy_exec_date_idx
    ON public.bitflyer_executions_btcjpy (exec_date);
```
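A sketch of mapping one API execution onto a row of this table. The field names `buy_child_order_acceptance_id` and `sell_child_order_acceptance_id` are assumed from BitFlyer's execution response, and `parse_exec_date`, `to_row`, and `INSERT_SQL` are illustrative names. `ON CONFLICT (id) DO NOTHING` makes re-inserting a batch harmless, which matters when a failed batch is retried.

```python
from datetime import datetime
from decimal import Decimal

SIDE_CODES = {"BUY": "B", "SELL": "S"}  # any other side value raises KeyError

# %s placeholders follow the psycopg2 parameter style
INSERT_SQL = """
INSERT INTO public.bitflyer_executions_btcjpy
    (id, side, exec_date, price, volume, buy_child_id, sell_child_id)
VALUES (%s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (id) DO NOTHING
"""


def parse_exec_date(value):
    """Parse a naive UTC timestamp like '2021-07-12T05:42:13.57', tolerating any fraction length."""
    base, _, fraction = value.partition(".")
    parsed = datetime.strptime(base, "%Y-%m-%dT%H:%M:%S")
    micros = int((fraction + "000000")[:6]) if fraction else 0
    return parsed.replace(microsecond=micros)


def to_row(execution):
    """Convert one execution dict from the API into a tuple matching INSERT_SQL."""
    return (
        execution["id"],
        SIDE_CODES[execution["side"]],
        parse_exec_date(execution["exec_date"]),
        Decimal(str(execution["price"])),   # via str() to avoid binary float artifacts
        Decimal(str(execution["size"])),    # API 'size' is stored as 'volume'
        execution.get("buy_child_order_acceptance_id"),
        execution.get("sell_child_order_acceptance_id"),
    )
```

With psycopg2, for example, a page could then be written in one transaction with `cursor.executemany(INSERT_SQL, [to_row(e) for e in page])`.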

The following PostgreSQL table would hold all the candlestick data for `BTC_JPY` calculated from the BitFlyer executions. All candle sizes share one table because candlestick data is lightweight. Growth is most likely to come from adding currency pairs, so that dimension is split across tables rather than kept in one. Different data sources could also be added later (for example, Binance data would go in its own table). This mirrors the `bitflyer_executions_btcjpy` table. The `size_id` column identifies the candle size; for data integrity and to limit space consumption, the sizes themselves are stored in a separate table referenced by a foreign key.

```postgresql
CREATE TABLE public.bitflyer_candlesticks_btcjpy (
    size_id int2 NOT NULL,             -- e.g. 0 = 1min, 1 = 15min, 2 = 1h, etc..
    open_time timestamp NOT NULL,
    "open" numeric(20, 8) NOT NULL,
    high numeric(20, 8) NOT NULL,
    low numeric(20, 8) NOT NULL,
    "close" numeric(20, 8) NOT NULL,
    "average" numeric(20, 8) NOT NULL,
    volume numeric(20, 8) NOT NULL,
    number_of_trades int4 NOT NULL,
    create_date timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT bitflyer_candlesticks_btcjpy_pkey PRIMARY KEY (size_id, open_time),
    CONSTRAINT bitflyer_candlesticks_btcjpy_size_fk FOREIGN KEY (size_id) REFERENCES public.candlestick_sizes(id)
);
```

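To support the foreign key and data integrity, the following lookup table would also be created. It must exist before `bitflyer_candlesticks_btcjpy` is created, because that table's foreign key references it: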

```postgresql
CREATE TABLE public.candlestick_sizes (
    id int2 NOT NULL,                  -- e.g. 0, 1, 2, etc..
    "size" VARCHAR(30) NOT NULL,         -- e.g. 1min, 15min, 1h, etc...
    create_date timestamptz NOT NULL DEFAULT now(),
    CONSTRAINT candlestick_sizes_pkey PRIMARY KEY (id)
);
```

# Exercise 4

**Suggest efficient sql queries to handle the following:**

1. find the daily minimum and maximum price over the last 30 days
2. find the daily volume of executions that happened over and under a given price (parameter) for the last 30 days

**Query 1**

Suggested approach:
```postgresql
SELECT date_trunc('day', exec_date) as date,
       MIN(price) as min_price,
       MAX(price) as max_price
 FROM bitflyer_executions_btcjpy
WHERE exec_date >= (CURRENT_DATE - INTERVAL '30 days')
GROUP BY date
ORDER BY date
```

Alternative approach:
```postgresql
SELECT date_trunc('day', open_time) as date,
       MIN(low) as min_price,
       MAX(high) as max_price
 FROM bitflyer_candlesticks_btcjpy
WHERE open_time >= (CURRENT_DATE - INTERVAL '30 days')
  AND size_id = 0  -- Assume 0 = 1min
GROUP BY date
ORDER BY date
```

**Query 2**

```postgresql
SELECT DATE_TRUNC('day', exec_date) as date,
       COALESCE(SUM(CASE WHEN price <= :price THEN volume END),0) as "under_volume",
       COALESCE(SUM(CASE WHEN price > :price THEN volume END),0) as "over_volume"
 FROM bitflyer_executions_btcjpy
WHERE exec_date >= (CURRENT_DATE - INTERVAL '30 days')
GROUP BY date
ORDER BY date
```

*Assumption:* Executions matching the price parameter will be placed in the `under_volume` bucket.
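To sanity-check the bucketing logic, including the tie rule above, the query can be run from Python against a throwaway SQLite database. This is a swapped-in stand-in, not PostgreSQL: `date()` replaces `DATE_TRUNC('day', ...)` and the 30-day filter is dropped so the sample rows are always included. `daily_volume_split` is a hypothetical helper. With psycopg2, the `:price` placeholder would instead be written `%(price)s` and bound the same way from a dict.

```python
import sqlite3

# SQLite stand-in for Query 2: date() replaces DATE_TRUNC('day', ...), and the
# 30-day filter is dropped so the sample rows are always included.
QUERY_2_SQLITE = """
SELECT date(exec_date) AS day,
       COALESCE(SUM(CASE WHEN price <= :price THEN volume END), 0) AS under_volume,
       COALESCE(SUM(CASE WHEN price >  :price THEN volume END), 0) AS over_volume
  FROM bitflyer_executions_btcjpy
 GROUP BY day
 ORDER BY day
"""


def daily_volume_split(rows, price):
    """Load (exec_date, price, volume) rows into an in-memory table and run the query."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE bitflyer_executions_btcjpy (exec_date TEXT, price REAL, volume REAL)"
        )
        conn.executemany("INSERT INTO bitflyer_executions_btcjpy VALUES (?, ?, ?)", rows)
        # The price is bound as a named parameter, never formatted into the SQL string
        return conn.execute(QUERY_2_SQLITE, {"price": price}).fetchall()
    finally:
        conn.close()
```

Binding the price as a parameter, rather than formatting it into the SQL text, also keeps the real query safe from injection.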