In [None]:
import warnings
# Install a global 'once' warnings filter so repeated warnings do not flood
# the notebook output.
warnings.filterwarnings(action='once')

In [None]:
from dask.distributed import Client
import dask

# Set dask's 'temporary-directory' config value to a folder on the E: drive.
# This is done before the Client is created below.
dask.config.set({'temporary-directory': r'E:\dask-cache'})
# Client() with no arguments: per the original note this starts the
# distributed scheduler locally; `client` is kept as a notebook global.
client = Client()  # start distributed scheduler locally.

In [None]:
%%time
import dask.dataframe as dd
from numpy import int64

# Location of the source parquet dataset.
dataset_path = r'E:\out'

# Open the dataset as a dask DataFrame; `ddf` is reused by the cells below.
ddf = dd.read_parquet(dataset_path)
# Sanity check: number of partitions and the column dtypes that were read.
print(ddf.npartitions)
print(ddf.dtypes)

In [None]:
%%time
import os

# Collect the distinct 'instId' values. .compute() executes the dask graph,
# so `instIds` is an in-memory result rather than a lazy collection.
instIds = ddf['instId'].unique().compute()


In [None]:
# Try the per-instrument filter on a single instId first.
test_instId = instIds[0]
# Lazy filter: keep only the rows belonging to the test instrument.
ddf_instId = ddf[ddf['instId'] == test_instId]
# .compute() materialises the filtered rows: from here on `ddf_instId` is an
# in-memory frame, no longer a dask collection (it has no `npartitions`).
ddf_instId = ddf_instId.compute()

In [None]:
import pyarrow as pa 

# Explicit Arrow schema for the output files so every column is written with
# a fixed type instead of whatever is inferred per partition.
schema = pa.schema([
    ('instId', pa.string()),
    ('price', pa.float64()),
    ('size', pa.float64()),
    ('numOrders', pa.int64()),
    ('side', pa.string()),
    ('timestamp', pa.int64()),
    ('prevSeqId', pa.int64()),
    ('seqId', pa.int64()),
    ('action', pa.string()),
    ('checksum', pa.int64()),
])

# The preceding cell calls .compute(), which leaves `ddf_instId` as an
# in-memory frame. `npartitions` and the write_index / write_metadata_file /
# schema keywords below belong to dask's DataFrame, so wrap the frame back
# into a dask collection first. The isinstance guard keeps this cell safe to
# re-run after the conversion has already happened.
if not isinstance(ddf_instId, dd.DataFrame):
    ddf_instId = dd.from_pandas(ddf_instId, chunksize=100_000_000)

print(ddf_instId.npartitions)
ddf_instId.to_parquet(fr'e:\out2\{test_instId}', write_index=False, write_metadata_file=True, schema=schema, compression='gzip')

In [None]:
import pyarrow as pa 

# Arrow schema applied to every per-instrument dataset written below.
schema = pa.schema(
    [
        ('instId', pa.string()),
        ('price', pa.float64()),
        ('size', pa.float64()),
        ('numOrders', pa.int64()),
        ('side', pa.string()),
        ('timestamp', pa.int64()),
        ('prevSeqId', pa.int64()),
        ('seqId', pa.int64()),
        ('action', pa.string()),
        ('checksum', pa.int64()),
    ]
)

# Export one parquet dataset per instrument (currently only the first one).
for instId in instIds[:1]:
    print(instId)

    # Select this instrument's rows and pull them into memory.
    rows_for_inst = ddf[ddf['instId'] == instId]
    ddf_instId = rows_for_inst.compute()
    print(type(ddf_instId))

    # Wrap the in-memory frame back into a dask DataFrame for writing.
    ddf_instId = dd.from_pandas(ddf_instId, chunksize=100_000_000)
    print(type(ddf_instId))

    # Each instrument gets its own directory under E:\out2.
    path = os.path.join(r'E:\out2', instId)
    ddf_instId.to_parquet(
        path,
        write_index=False,
        write_metadata_file=True,
        schema=schema,
        compression='gzip',
    )

In [None]:
import bisect
import zlib
from numpy import int32
from pandas import DataFrame, Series

from typing import List, Tuple


class BooksItem:
    """A single order book level: price, size, order count and side.

    The constructor stores its four arguments unchanged as attributes of the
    same names. ``str()`` of an item is ``"<price>:<size>"``.
    """

    def __init__(self,
                 price: float,
                 size: float,
                 numOrders: int,
                 side: str,
                 ) -> None:
        self.price, self.size, self.numOrders, self.side = (
            price, size, numOrders, side,
        )

    def __str__(self) -> str:
        # Price and size joined by a colon, each with default formatting.
        return '{}:{}'.format(self.price, self.size)


class Books:
    def __init__(self, instId: str) -> None:
        self.instId = instId
        self.bids: List[BooksItem] = []
        self.asks: List[BooksItem] = []
    
    def update(self, rawRecord: Series) -> None:
        if rawRecord['instId'].unique().tolist() != [self.instId]:
            raise ValueError('instId mismatch')
        
        for _, record in rawRecord.iterrows():
            new_booksItem = BooksItem(
                                record['price'], 
                                record['size'], 
                                record['numOrders'], 
                                record['side']
                            )
            if record['side'] not in ['bid', 'ask']:
                raise ValueError('side should be either bid or ask')
            target = self.bids if record['side'] == 'bid' else self.asks
            if record['side'] == 'ask':
                # asks are sorted in descending order
                target.reverse()
            
            dest_level = next(filter(lambda x: x.price == new_booksItem.price, target), None)
            if dest_level is None:
                inserted_point = bisect.bisect_left(target, new_booksItem, key=lambda x: x.price)
                target.insert(inserted_point, new_booksItem)
            else:
                if new_booksItem.size == 0:
                    target.remove(dest_level)
                else:
                    dest_level.size = new_booksItem.size
                    dest_level.numOrders = new_booksItem.numOrders
            
            if record['side'] == 'ask':
                # reverse back
                target.reverse()


    @property
    def checksum(self) -> int32:
        bids = self.bids[:25]
        asks = self.asks[:25]

        checksum_string = ''
        for i in range(max(len(bids), len(asks))):
            if i < len(bids):
                checksum_string = ':'.join([checksum_string, str(bids[i])])
            if i < len(asks):
                checksum_string = ':'.join([checksum_string, str(asks[i])])
        checksum_string = checksum_string.strip(':')
        
        # Calculate checksum with CRC32
        checksum = int32(zlib.crc32(checksum_string.encode('utf-8')))
        
        return checksum

    
def verify(instId: str, df: dd.DataFrame) -> int:
    """Replay ``df`` into a fresh book and count checksum mismatches.

    Rows are applied in ``timestamp`` order. Checksums are only compared once
    the replay has seen a ``'snapshot'`` row followed by an ``'update'`` row;
    from then on every row whose ``checksum`` differs from the book's
    checksum increments the counter.

    Args:
        instId: Instrument the rows are expected to belong to.
        df: Frame with the columns read by ``Books.update`` plus
            ``timestamp``, ``action`` and ``checksum``.

    Returns:
        Number of rows whose checksum did not match the rebuilt book.

    Raises:
        ValueError: Propagated from ``Books.update`` on an instId mismatch
            or an invalid side.
    """
    df = df.sort_values('timestamp')

    books = Books(instId)
    status = 'empty'

    count: int = 0
    for _, row in df.iterrows():
        # Books.update works on a frame of rows (it calls .unique() and
        # .iterrows()), so wrap the row Series in a one-row DataFrame.
        books.update(row.to_frame().T)

        if status == 'empty':
            if row['action'] == 'snapshot':
                status = 'loading'
        elif status == 'loading':
            if row['action'] == 'update':
                status = 'full'
        elif status == 'full':
            # NOTE(review): the comparison runs after every row. If one feed
            # message is spread over several rows, intermediate states would
            # be counted as mismatches -- confirm the row granularity.
            if books.checksum != int32(row['checksum']):
                count += 1

    return count