In [1]:
import re
import s3fs
import pandas as pd
from datetime import datetime

In [2]:
endpoint_url = "http://us-east-1.linodeobjects.com"
bucket = "crypto-data"
book1_dir = "book-lvl1"
book2_dir = "book-lvl2-s1"
stat_dir = "stat"
ticker_dir = "ticker"
trade_dir = "trade"
candle_dir = "candle"

In [4]:
fs_client_kwargs = {
    "endpoint_url" : endpoint_url,
    "aws_access_key_id" : bcs_key,
    "aws_secret_access_key" : bcs_secret,
}

In [5]:
fs = s3fs.S3FileSystem(client_kwargs = fs_client_kwargs)

In [6]:
### obtain all filenames

In [7]:
book1_files = fs.ls("{}/{}".format(bucket, book1_dir))

In [8]:
book2_files = fs.ls("{}/{}".format(bucket, book2_dir))

In [10]:
stat_files = fs.ls("{}/{}".format(bucket, stat_dir))

In [11]:
ticker_files = fs.ls("{}/{}".format(bucket, ticker_dir))

In [12]:
trade_files = fs.ls("{}/{}".format(bucket, trade_dir))

In [13]:
#### Utility functions

In [14]:
def print_epoch(epoch):
    return datetime.fromtimestamp(epoch).strftime("%Y-%m-%dT%H:%M:%S")

In [15]:
def print_date_ranges(ranges):
    for r in ranges:
        print("({} - {})".format(print_epoch(r[0]), print_epoch(r[1])))

In [16]:
### find irregular file epoch ranges

In [17]:
def find_file_epoch_dist(files, prefix, dist):
    short_epoch_dist_files = []
    for file in files:
        result = re.match(r'^crypto-data/{}_[0-9]+_([0-9]+)_([0-9]+).parquet$'.format(prefix), file)
        if result is None:
            print("bad filename {}".format(file))
            continue
        se = int(result.group(1))
        ee = int(result.group(2))
        if ee - se != dist:
            short_epoch_dist_files.append(file)
    return short_epoch_dist_files

In [20]:
book1_bad_files = find_file_epoch_dist(book1_files, 'book-lvl1/book_level1', 21600)
book1_bad_files

[]

In [19]:
book2_bad_files = find_file_epoch_dist(book2_files, 'book-lvl2-s1/book_level2', 21600)
book2_bad_files

['crypto-data/book-lvl2-s1/book_level2_0_1666898280_1666898460.parquet',
 'crypto-data/book-lvl2-s1/book_level2_0_1666898460_1666898640.parquet',
 'crypto-data/book-lvl2-s1/book_level2_0_1666898640_1666898820.parquet',
 'crypto-data/book-lvl2-s1/book_level2_0_1666898820_1666899000.parquet',
 'crypto-data/book-lvl2-s1/book_level2_0_1666899000_1666899180.parquet']

In [21]:
stat_bad_files = find_file_epoch_dist(stat_files, 'stat/stat_exchange', 21600)
stat_bad_files

[]

In [22]:
ticker_bad_files = find_file_epoch_dist(ticker_files, 'ticker/ticker_exchange', 21600)
ticker_bad_files

[]

In [23]:
trade_bad_files = find_file_epoch_dist(trade_files, 'trade/trade_exchange', 21600)
trade_bad_files

[]

In [24]:
### Check File Time continuity

In [25]:
def get_epoch_pairs(files, prefix, exid):
    epochs = []
    for file in files:
        result = re.match(r'^crypto-data/{}_{}_([0-9]+)_([0-9]+).parquet$'.format(prefix, exid), file)
        if result is None:
            continue
        se = int(result.group(1))
        ee = int(result.group(2))
        epochs.append((se, ee))
    return epochs

In [26]:
def get_coverage_epoch(epochs):
    ranges = []
    if not epochs:
        return ranges
    epochs.sort(key=lambda x: x[0])
    start_epoch = epochs[0][0]
    end_epoch = epochs[0][1]
    for i in range(1, len(epochs)):
        if epochs[i][0] > end_epoch:
            ranges.append((start_epoch, end_epoch))
            start_epoch = epochs[i][0]
            end_epoch = epochs[i][1]
        else:
            end_epoch = epochs[i][1]
    ranges.append((start_epoch, end_epoch))
    return ranges

In [27]:
book10_range = get_coverage_epoch(get_epoch_pairs(book1_files, 'book-lvl1/book_level1', 0))

In [28]:
print_date_ranges(book10_range)

(2022-04-29T20:00:00 - 2022-05-03T02:00:00)
(2022-06-11T08:00:00 - 2022-06-13T20:00:00)
(2022-06-23T14:00:00 - 2022-06-25T08:00:00)
(2022-07-02T14:00:00 - 2022-07-03T14:00:00)
(2022-07-09T20:00:00 - 2022-07-10T02:00:00)
(2022-07-17T14:00:00 - 2022-10-30T14:00:00)


In [29]:
book11_range = get_coverage_epoch(get_epoch_pairs(book1_files, 'book-lvl1/book_level1', 1))

In [30]:
print_date_ranges(book11_range)

(2022-04-29T20:00:00 - 2022-05-03T02:00:00)
(2022-06-11T08:00:00 - 2022-06-12T08:00:00)
(2022-07-02T14:00:00 - 2022-07-04T14:00:00)
(2022-07-17T14:00:00 - 2022-08-12T20:00:00)
(2022-08-13T02:00:00 - 2022-10-30T14:00:00)


In [31]:
stat0_range = get_coverage_epoch(get_epoch_pairs(stat_files, 'stat/stat_exchange', 0))

In [32]:
print_date_ranges(stat0_range)

(2022-05-01T08:00:00 - 2022-05-09T02:00:00)
(2022-06-11T08:00:00 - 2022-06-14T14:00:00)
(2022-06-23T20:00:00 - 2022-06-29T08:00:00)
(2022-07-02T14:00:00 - 2022-07-06T02:00:00)
(2022-07-09T20:00:00 - 2022-07-13T02:00:00)
(2022-07-17T14:00:00 - 2022-07-26T02:00:00)
(2022-07-31T14:00:00 - 2022-08-04T02:00:00)
(2022-08-07T14:00:00 - 2022-09-27T08:00:00)
(2022-09-28T20:00:00 - 2022-10-27T20:00:00)
(2022-10-28T08:00:00 - 2022-10-30T14:00:00)


In [33]:
stat1_range = get_coverage_epoch(get_epoch_pairs(stat_files, 'stat/stat_exchange', 1))

In [34]:
print_date_ranges(stat1_range)

(2022-05-01T08:00:00 - 2022-05-03T02:00:00)
(2022-06-11T08:00:00 - 2022-06-14T08:00:00)
(2022-06-23T20:00:00 - 2022-06-27T08:00:00)
(2022-07-02T14:00:00 - 2022-07-06T02:00:00)
(2022-07-09T20:00:00 - 2022-07-11T14:00:00)
(2022-07-17T14:00:00 - 2022-10-28T08:00:00)


In [35]:
ticker0_range = get_coverage_epoch(get_epoch_pairs(ticker_files, 'ticker/ticker_exchange', 0))

In [36]:
print_date_ranges(ticker0_range)

(2022-05-01T08:00:00 - 2022-05-02T20:00:00)
(2022-06-11T08:00:00 - 2022-06-13T08:00:00)
(2022-06-23T20:00:00 - 2022-06-25T02:00:00)
(2022-07-02T14:00:00 - 2022-07-06T02:00:00)
(2022-07-09T20:00:00 - 2022-07-13T20:00:00)
(2022-07-17T14:00:00 - 2022-10-05T14:00:00)
(2022-10-09T08:00:00 - 2022-10-30T14:00:00)


In [37]:
ticker1_range = get_coverage_epoch(get_epoch_pairs(ticker_files, 'ticker/ticker_exchange', 1))

In [38]:
print_date_ranges(ticker1_range)

(2022-05-01T08:00:00 - 2022-05-02T08:00:00)
(2022-06-11T08:00:00 - 2022-06-14T08:00:00)
(2022-06-23T20:00:00 - 2022-06-28T08:00:00)
(2022-07-02T14:00:00 - 2022-07-04T20:00:00)
(2022-07-09T20:00:00 - 2022-07-14T08:00:00)
(2022-07-17T14:00:00 - 2022-10-30T14:00:00)


In [39]:
trade0_range = get_coverage_epoch(get_epoch_pairs(trade_files, 'trade/trade_exchange', 0))

In [40]:
print_date_ranges(trade0_range)

(2022-08-13T20:00:00 - 2022-09-12T20:00:00)
(2022-09-18T20:00:00 - 2022-10-16T02:00:00)
(2022-10-17T20:00:00 - 2022-10-30T14:00:00)


In [41]:
trade1_range = get_coverage_epoch(get_epoch_pairs(trade_files, 'trade/trade_exchange', 1))

In [42]:
print_date_ranges(trade1_range)

(2022-08-13T20:00:00 - 2022-09-12T20:00:00)
(2022-09-18T20:00:00 - 2022-10-30T14:00:00)


In [43]:
### Check As Of Time Continuity

In [44]:
def get_file_epoch_pairs(files, prefix, exid):
    ranges = []
    for file in files:
        result = re.match(r'^crypto-data/{}_{}_([0-9]+)_([0-9]+).parquet$'.format(prefix, exid), file)
        if result is None:
            continue
        se = int(result.group(1))
        ee = int(result.group(2))
        ranges.append((file, se, ee))
    return ranges

In [45]:
def get_file_asoftime_ranges(files_epochs):
    files_epochs.sort(key=lambda x : x[1])
    start_datetime = None
    end_datetime = None
    ranges = []
    for (file, se, ee) in files_epochs:
        with fs.open(file, 'rb') as fd:
            df = pd.read_parquet(fd)
            ag = df['sequence_time'].agg(['min', 'max'])
            if start_datetime is None:
                start_datetime = ag[0]
                end_datetime = ag[1]
            elif ag[0] > end_datetime:
                ranges.append((start_datetime, end_datetime))
                start_datetime = ag[0]
                end_datetime = ag[1]
            else:
                end_datetime = ag[1]
    return ranges

In [None]:
### read each file and do row count statistics

In [44]:
def get_df_row_count(files, prefix, exid):
    rowcounts = []
    for file in files:
        result = re.match(r'^crypto-data/{}_{}_[0-9]+_[0-9]+.parquet$'.format(prefix, exid), file)
        if result is None:
            continue
        with fs.open(file, 'rb') as fd:
            df = pd.read_parquet(fd)
            rowcounts.append((file, len(df)))
    return rowcounts

In [None]:
### read each file and check for NaN

In [46]:
def get_df_nan_count(files, prefix, exid):
    nancounts = []
    for file in files:
        result = re.match(r'^crypto-data/{}_{}_[0-9]+_[0-9]+.parquet$'.format(prefix, exid), file)
        if result is not None:
            continue
        with fs.open(file, 'rb') as fd:
            df = pd.read_parquet(fd)
            nan_count = df.isna().sum().sum()
            nancounts.append((file, nan_count))
    return nancount

In [None]:
book10_nancounts = get_df_nan_count(book1_files, 'book-lvl1/book_level1', 0)