In [None]:
import concurrent.futures
import logging
import queue
import random
import threading
import time

class Pipeline(queue.Queue):
    """Bounded FIFO (capacity 10) whose put/get wrappers emit debug log lines."""

    def __init__(self):
        queue.Queue.__init__(self, maxsize=10)

    def get_message(self, name):
        """Block until an item is available and return it.

        ``name`` is only used to label the debug log lines.
        """
        logging.debug("%s:about to get from queue", name)
        item = self.get()
        logging.debug("%s:got %d from queue", name, item)
        return item

    def set_message(self, value, name):
        """Block until there is room, then enqueue ``value``.

        ``name`` is only used to label the debug log lines.
        """
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)

# Unique marker object. NOTE(review): nothing in the visible cells references it.
SENTINEL = object()

def producer(pipeline, event):
    """Feed random integers into ``pipeline`` until ``event`` is set.

    Stands in for a network reader: each loop draws an integer with
    ``random.randint(1, 101)``, logs it and hands it to
    ``pipeline.set_message``.
    """
    who = "Producer"
    while True:
        if event.is_set():
            break
        number = random.randint(1, 101)
        logging.info("Producer got message: %s", number)
        pipeline.set_message(number, who)

    logging.info("Producer received EXIT event. Exiting")
    
    
def consumer(pipeline, event, poll_interval=0.1):
    """Pretend we're saving a number in the database.

    Drains ``pipeline`` until ``event`` is set AND the queue is empty.

    The queue is polled with ``get(timeout=poll_interval)`` instead of a
    blocking ``get()``. With a blocking get, the consumer could pass the loop
    condition while the event was still clear, block on an empty queue, and
    then never wake up if the producer exited without enqueuing anything
    else.

    Args:
        pipeline: a ``queue.Queue`` (or subclass) to read from.
        event: ``threading.Event`` signalling that production has stopped.
        poll_interval: seconds to wait for an item before re-checking the
            exit condition.
    """
    while not event.is_set() or not pipeline.empty():
        try:
            message = pipeline.get(timeout=poll_interval)
        except queue.Empty:
            # Nothing arrived in time; loop to re-check the exit condition.
            continue
        logging.debug("%s:got %s from queue", "Consumer", message)
        logging.info(
            "Consumer storing message: %s  (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")
    
    
if __name__ == "__main__":
    # Named log_format rather than `format`: in a notebook this is a kernel-wide
    # global, and `format` would shadow the builtin for every later cell.
    log_format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        # Keep the futures so worker exceptions are not silently dropped.
        futures = [
            executor.submit(producer, pipeline, event),
            executor.submit(consumer, pipeline, event),
        ]

        # Let both workers run briefly, then ask them to stop.
        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()

    # Leaving the `with` block waits for both workers to finish; result()
    # re-raises anything a worker raised.
    for future in futures:
        future.result()

In [None]:
import threading
import time
import logging
import random
import queue

# Log every record at DEBUG and above, prefixed with the emitting thread's name.
logging.basicConfig(
    format='(%(threadName)-9s) %(message)s',
    level=logging.DEBUG,
)

# Capacity of the queue shared by the producer and consumer threads.
BUF_SIZE = 10
q = queue.Queue(maxsize=BUF_SIZE)

class ProducerThread(threading.Thread):
    """Thread that keeps putting random integers (1..10) on the shared queue ``q``.

    ``group``, ``args``, ``kwargs`` and ``verbose`` are accepted for signature
    compatibility but are not used. ``target`` is stored but never called.
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        # Pass the name to Thread so that name=None keeps the default
        # "Thread-N" name instead of becoming the string "None".
        super(ProducerThread, self).__init__(name=name)
        self.target = target
        self._stop_requested = threading.Event()

    def stop(self):
        """Ask ``run`` to return at its next wake-up."""
        self._stop_requested.set()

    def run(self):
        """Produce items until ``stop()`` is called."""
        while not self._stop_requested.is_set():
            item = random.randint(1, 10)
            try:
                # Block while the queue is full instead of spinning on
                # q.full(); the timeout lets a stop request be noticed.
                q.put(item, timeout=0.5)
            except queue.Full:
                continue
            logging.debug('Putting ' + str(item)
                          + ' : ' + str(q.qsize()) + ' items in queue')
            # Random pause of up to 1s that ends early on stop().
            self._stop_requested.wait(random.random())

class ConsumerThread(threading.Thread):
    """Thread that keeps taking items off the shared queue ``q`` and logging them.

    ``group``, ``args``, ``kwargs`` and ``verbose`` are accepted for signature
    compatibility but are not used. ``target`` is stored but never called.
    """

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, verbose=None):
        # Pass the name to Thread so that name=None keeps the default
        # "Thread-N" name instead of becoming the string "None".
        super(ConsumerThread, self).__init__(name=name)
        self.target = target
        self._stop_requested = threading.Event()

    def stop(self):
        """Ask ``run`` to return at its next wake-up."""
        self._stop_requested.set()

    def run(self):
        """Consume items until ``stop()`` is called."""
        while not self._stop_requested.is_set():
            try:
                # Block while the queue is empty instead of spinning on
                # q.empty(); the timeout lets a stop request be noticed.
                item = q.get(timeout=0.5)
            except queue.Empty:
                continue
            logging.debug('Getting ' + str(item)
                          + ' : ' + str(q.qsize()) + ' items in queue')
            # Random pause of up to 1s that ends early on stop().
            self._stop_requested.wait(random.random())

if __name__ == '__main__':

    p = ProducerThread(name='producer')
    c = ConsumerThread(name='consumer')

    # Start the producer, wait two seconds, start the consumer, wait two more.
    for worker in (p, c):
        worker.start()
        time.sleep(2)
# Final pause; sits outside the __main__ guard.
time.sleep(2)

In [None]:
import websocket
import bitmex_websocket as bm_ws
from bitmex_websocket import Instrument
from bitmex_websocket.constants import InstrumentChannels
from threading import Thread
import time
import pickle

class BitmexSubscriber:
    """Buffer BitMEX 'trade' and 'quote' messages and pickle them to disk in batches.

    Each incoming message is a dict read via ``msg['table']`` and
    ``msg['data']``. The ``data`` payload is appended as one buffer entry for
    the message's table. After every handled message, any buffer holding more
    than ``FLUSH_THRESHOLD`` entries is pickled (appended) to its file in
    ``ARCHIVE_FILES`` and cleared.
    """

    # A buffer is flushed once it holds MORE than this many entries.
    FLUSH_THRESHOLD = 10
    # Append-mode pickle file that each table's buffer is flushed to.
    ARCHIVE_FILES = {
        'trade': 'XBTUSD2-trades.pkl',
        'quote': 'XBTUSD2-quotes.pkl',
    }

    def __init__(self):
        # symbol -> {'channels': ..., 'thread': Thread}
        self.subscriptions = {}

        self._message_handlers = {'trade': self._log_trade, 'quote': self._log_quote}
        self._buffers = {'trade': [], 'quote': []}

    def _flush(self, table):
        """Pickle and clear the buffer for ``table`` if it exceeds the threshold."""
        pending = self._buffers[table]
        if len(pending) > self.FLUSH_THRESHOLD:
            with open(self.ARCHIVE_FILES[table], 'ab', buffering=10*1024) as f:
                pickle.dump(pending, f, pickle.HIGHEST_PROTOCOL)
            # Clear only after the dump succeeded so a failed write loses nothing.
            pending.clear()

    def archive(self):
        """Flush every buffer that has grown past ``FLUSH_THRESHOLD``."""
        for table in self._buffers:
            self._flush(table)

    def ingest_data(self, filename, data_dict):
        """Append ``data_dict`` as one pickle record to ``<filename>.pkl``."""
        with open(f'{filename}.pkl', 'ab') as f:
            pickle.dump(data_dict, f, pickle.HIGHEST_PROTOCOL)

    def _log_trade(self, msg):
        """Buffer the ``data`` payload of a trade message."""
        self._buffers['trade'].append(msg['data'])

    def _log_quote(self, msg):
        """Buffer the ``data`` payload of a quote message."""
        self._buffers['quote'].append(msg['data'])

    def on_message(self, msg):
        """Route one message to its table handler, then archive full buffers.

        Messages whose table has no registered handler are reported and
        skipped instead of raising ``KeyError``.
        """
        print(len(self._buffers['trade']), len(self._buffers['quote']))

        table = msg.get('table')
        handler = self._message_handlers.get(table)
        if handler is None:
            print(f'Ignoring message for unhandled table: {table!r}')
            return
        handler(msg)
        self.archive()

    def subscribe(self, symbol, channels):
        """Start a thread that streams ``channels`` for ``symbol`` into ``on_message``."""
        instrument = bm_ws.Instrument(symbol=symbol, channels=channels)
        instrument.on('action', self.on_message)
        thread = Thread(target=instrument.run_forever)
        self.subscriptions[symbol] = {'channels': channels, 'thread': thread}
        thread.start()


def bitmex_subscribe():
    """Turn on websocket tracing and subscribe to XBTUSD trades and quotes."""
    websocket.enableTrace(True)

    subscriber = BitmexSubscriber()
    subscriber.subscribe(
        'XBTUSD',
        [InstrumentChannels.trade, InstrumentChannels.quote],
    )

# Runs when the cell executes: sets up the XBTUSD subscription, which starts a thread.
bitmex_subscribe()

In [None]:
import pickle
import gzip
import numpy as np
import datetime
import itertools

# Base time of the simulated feed; each record is stamped 3 hours after the last.
timezero = datetime.datetime(2021, 1, 1, 8, 0, 0)

# How many records rec_data() has produced so far.
ncalls = 0

def rec_data():
    """Return the next simulated record.

    The record has a ``'Timestamp'`` (``str`` of ``timezero`` plus 3 hours per
    call so far, this call included) and ``'Hdr1'`` (one ``np.random.rand()``
    draw).
    """
    global ncalls
    ncalls = ncalls + 1
    stamp = timezero + datetime.timedelta(hours=3) * ncalls
    return dict(Timestamp=str(stamp), Hdr1=np.random.rand())


# Layout of the 'Timestamp' strings produced by rec_data(), which uses
# str(datetime). The previous '%Y%m%d' pattern could not parse them and
# raised ValueError on the first pass.
TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'


def _record_day(record):
    """Return the calendar date of a record's 'Timestamp' string."""
    return datetime.datetime.strptime(record['Timestamp'], TIMESTAMP_FORMAT).date()


buffer = []
cnt = 0
while True:
    buffer.append(rec_data())

    # Every 10th record (including the very first), show the buffer grouped by
    # day. groupby only merges adjacent items, which is enough here because
    # records arrive in chronological order.
    if cnt % 10 == 0:
        for key, group in itertools.groupby(buffer, _record_day):
            # Materialize the group: printing the lazy grouper shows only its repr.
            print(key, list(group))

    cnt += 1
    if cnt > 20:
        break

In [None]:
import pickle
#import gzip
#import numpy as np
import datetime
import itertools
import time
import glob
import os
import shutil

# Start of the simulated clock; curr_time is the "now" that rec_data() reads.
timezero = datetime.datetime(2021, 1, 1, 8, 0, 0)
curr_time = timezero
last_upload = None
# Layout of the 'Timestamp' strings carried by the simulated records.
DATE_FORMAT = '%Y%m%d %H:%M:%S'


def rec_data():
    """Return one simulated record stamped with the current simulated time.

    ``'Timestamp'`` is ``curr_time`` rendered with ``DATE_FORMAT``; ``'Hdr1'``
    is one ``np.random.rand()`` draw.
    """
    stamp = curr_time.strftime(DATE_FORMAT)
    return dict(Timestamp=stamp, Hdr1=np.random.rand())

# Date of the most recent cleanup pass; None until the first one has run.
last_cleanup = None
class Buffer:
    """In-memory record buffer that is flushed to one pickle file per calendar day.

    Args:
        path_template: ``str.format`` template for the output file. ``{}`` is
            replaced by the day key (``YYYYMMDD``). Defaults to the path that
            was previously hard-coded.
        date_format: ``strptime`` layout of each record's ``'Timestamp'``.
            ``None`` (the default) means use the module-level ``DATE_FORMAT``
            at flush time.
    """

    def __init__(self, path_template='C:\\temp\\2BUPL\\{}_XBT_quote', date_format=None):
        self._buff = []
        self._path_template = path_template
        self._date_format = date_format

    def _day_key(self, record):
        """Return the ``YYYYMMDD`` key of a record's 'Timestamp'."""
        fmt = self._date_format if self._date_format is not None else DATE_FORMAT
        return datetime.datetime.strptime(record['Timestamp'], fmt).strftime('%Y%m%d')

    def write_buffer(self):
        """Append the buffered records to their per-day files, then empty the buffer."""
        # group buffer records on date (assumed to be chronologically sorted)
        for key, group in itertools.groupby(self._buff, self._day_key):
            records = list(group)
            with open(self._path_template.format(key), 'ab') as f:
                print('writing {} records to {}'.format(len(records), key))
                pickle.dump(records, f, pickle.HIGHEST_PROTOCOL)
        self._buff = [] # empty buffer

    def append(self, records):
        """Add one record to the buffer (stored as a single entry)."""
        self._buff.append(records)

    def size(self):
        """Return the number of buffered entries."""
        return len(self._buff)

# Simulation driver: runs 1001 iterations (cnt 0..1000). Each iteration appends
# one record stamped with curr_time, then advances curr_time by 3 hours.
cnt = 0
buffer = Buffer()
while True:
    print('----------------')
    print('Time is now {}'.format(curr_time))
    buffer.append(rec_data())
    print('size(buffer) = {}'.format(buffer.size()))
    
    # Only write to local disk in chunks: every 10th iteration, including cnt == 0.
    if cnt % 10 == 0:
        buffer.write_buffer()
        
        
    # Housekeeping runs on the first iteration and then on the first record of
    # each new calendar day.
    if last_cleanup is None or curr_time.date() > last_cleanup:
        # Flush first so every buffered record is on disk before files are moved.
        buffer.write_buffer()
        # Check if upload time (buffer must be empty here)..
        for fullpath in glob.glob('C:\\temp\\2BUPL\\*_XBT_quote*'):
            filename = os.path.basename(fullpath)
            # The first 8 characters of the file name are the YYYYMMDD day key.
            fileanchor = datetime.datetime.strptime(filename[0:8], '%Y%m%d')
            if curr_time.date() > fileanchor.date():
                # Upload and move to uploaded local dir...
                # NOTE(review): no upload happens here; the file is only moved.
                new_path = '{}{}'.format('C:\\temp\\UPL\\', filename)
                print('moving from {} to {}'.format(fullpath, new_path))
                shutil.move(fullpath, new_path)
                
        # Check if delete time: remove moved files dated more than 3 days before today.
        for fullpath in glob.glob('C:\\temp\\UPL\\*_XBT_quote*'):
            filename = os.path.basename(fullpath)
            fileanchor = datetime.datetime.strptime(filename[0:8], '%Y%m%d')
            if fileanchor.date() < curr_time.date() - datetime.timedelta(days=3):
                print('deleting old file {}'.format(fullpath))
                os.remove(fullpath)
                
        last_cleanup = curr_time.date()
                
                
    # Advance the simulated clock; rec_data() reads this global.
    curr_time += datetime.timedelta(hours=3)
    
    cnt += 1
    if cnt > 1000:
        break

In [None]:

def clear(paths=('C:\\TEMP\\foo1.txt', 'C:\\TEMP\\foo2.txt')):
    """Truncate each file in ``paths`` to zero bytes, creating it if missing.

    Defaults to the two benchmark output files. NOTE(review): a later cell
    defines another function named ``clear``; once that cell has run, the name
    refers to that definition.
    """
    for path in paths:
        # Opening in 'w' mode truncates; the context manager closes the handle.
        with open(path, 'w'):
            pass

def foo1(path='C:\\TEMP\\foo1.txt', n=100):
    """Append the integers ``0..n-1`` to ``path``, one per line.

    The file is deliberately reopened for every line: this is the
    "open each time" side of the timing comparison.
    """
    for i in range(n):
        with open(path, 'a+') as f:
            f.write(str(i) + '\n')
            
def foo2(path='C:\\TEMP\\foo2.txt', n=100):
    """Append the integers ``0..n-1`` to ``path``, one per line.

    The file is opened once for the whole loop: this is the "open once" side
    of the timing comparison. The context manager guarantees the handle is
    flushed and closed; it was previously left open.
    """
    with open(path, 'a+') as f:
        for i in range(n):
            f.write(str(i) + '\n')
    

In [None]:
# Truncate the output files, then time foo1 against foo2.
# Both functions open their file in append mode, so repeated %timeit runs keep
# growing the files.
clear()
%timeit foo1()
%timeit foo2()

In [1]:
import pickle

# File holding a sequence of pickled objects written back to back.
src = r"C:\Users\Pontus\Documents\GitHub\microprice\XBTUSD-quotes.pickle"
data = []
cnt = 0
with open(src, 'rb') as f:
    # Keep unpickling until the stream is exhausted; pickle.load signals that
    # with EOFError.
    while True:
        try:
            record = pickle.load(f)
        except EOFError:
            print('End of File')
            break
        data.append(record)
        cnt += 1




End of File


In [10]:
import gzip
import pandas as pd
import glob
import time
from datetime import datetime

def clear():
    """Truncate every file matched by the ``*.pklz`` glob below to zero bytes."""
    for path in glob.glob('C:\\TEMP\\*.pklz'):
        # Opening in 'w' mode truncates the file; nothing is written.
        with open(path, 'w'):
            pass
    
def chunks(lst, n):
    """Generate consecutive slices of ``lst`` of length ``n`` (the last may be shorter)."""
    yield from (lst[start:start + n] for start in range(0, len(lst), n))

def bar1(seq, n, path='C:\\TEMP\\default.pklz'): # open once, chunk dicts gzip
    """Append ``seq`` to a gzip file as a series of pickled ``n``-sized chunks.

    The file is opened once in append mode and each chunk from
    ``chunks(seq, n)`` is written as its own pickle record. The context
    manager closes the handle even if pickling fails part-way.
    """
    with gzip.open(path, 'ab') as f_hnd:
        for cnk in chunks(seq, n):
            pickle.dump(cnk, f_hnd)

def bar1R(path='C:\\TEMP\\default.pklz'): # read back every pickled chunk from gzip
    """Read every pickle record from the gzip file at ``path`` and discard them.

    Used only for timing, so nothing is returned. The end of the stream is
    detected via ``EOFError``; any other error (for example corrupt data) now
    propagates instead of being silently swallowed.

    NOTE(review): ``pickle.load`` can execute arbitrary code; only use this on
    files this notebook wrote itself.
    """
    bla = []
    with gzip.open(path, 'rb') as f_hnd:
        while True:
            try:
                bla.append(pickle.load(f_hnd))
            except EOFError:
                break
    return

def bar2(seq, n, path='C:\\TEMP\\default_json.pklz'): # open once, one text line per item
    """Write ``str(x)`` for each item of ``seq`` to a gzip text file, one per line.

    ``n`` is accepted so the signature matches ``bar1`` but is not used.

    Each record is terminated with a newline. The previous
    ``writelines(str(x))`` wrote the records back to back with no separator,
    so the file could not be split into records again.
    """
    with gzip.open(path, 'wt') as f_hnd:
        for x in seq:
            f_hnd.write(str(x) + '\n')


In [None]:
# Chunk length handed to the writers as `n`; bar2 accepts `n` but never uses it.
CHUNKSIZE = 1000
# NOTE(review): two functions named `clear` are defined in this notebook; this
# call runs whichever definition was executed most recently.
clear()
# Single-shot timings (-n1 -r1: one loop, one repeat) of the two writers.
%timeit -n1 -r1 bar1(data, CHUNKSIZE)
%timeit -n1 -r1 bar2(data, CHUNKSIZE)
#%timeit -n1 -r1 bar4(data, CHUNKSIZE)
#%timeit -n1 -r1 bar5(data[0:10000], CHUNKSIZE) # Open each time

In [11]:
#bar2(data[0:10], 1)
# NOTE(review): bar2R is not defined in any visible cell, so this call depends
# on kernel state and would fail after a restart.
bar2R()

malformed node or string: <ast.Subscript object at 0x00000207FA69EAC0>
0
<class 'list'>


In [6]:
from ast import literal_eval

# Round-trip the first two records through str() and ast.literal_eval().
literal_eval(str(data[0:2]))

[{'timestamp': '2020-12-14T00:07:24.057Z',
  'symbol': 'XBTUSD',
  'bidSize': 83434,
  'bidPrice': 19077,
  'askPrice': 19077.5,
  'askSize': 890861},
 {'timestamp': '2020-12-14T00:07:24.072Z',
  'symbol': 'XBTUSD',
  'bidSize': 83434,
  'bidPrice': 19077,
  'askPrice': 19077.5,
  'askSize': 902353}]

In [None]:
from datetime import datetime, timedelta

class Resource:
    """Demo resource that owns an open file and counts live instances.

    ``Resource.counter`` is incremented once construction succeeds and
    decremented when the instance is finalized, which also prints the change.

    Args:
        path: file to open for reading. Defaults to the path that was
            previously hard-coded.
    """

    # Number of successfully constructed instances not yet finalized.
    counter = 0

    def __init__(self, path='C:\\TEMP\\ABC'):
        # Set first so __del__ stays safe if open() raises below.
        self._counted = False
        self.dt = datetime.now()
        # Stored as _file: an instance attribute named `handle` would hide the
        # handle() method.
        self._file = open(path)
        type(self).counter += 1
        self._counted = True

    def update(self):
        """Placeholder: this method had no body before and still does nothing."""

    def handle(self):
        """Refresh ``self.dt`` when more than 5 seconds have passed since it was set."""
        now = datetime.now()
        if now - self.dt > timedelta(seconds=5):
            self.dt = now

    def close(self):
        """Close the underlying file if it is open; safe to call repeatedly."""
        file_obj = getattr(self, '_file', None)
        if file_obj is not None and not file_obj.closed:
            file_obj.close()

    def __del__(self):
        self.close()
        # Only instances that were counted in __init__ are un-counted, and
        # only once.
        if getattr(self, '_counted', False):
            print(f'- ({type(self).counter} -> {type(self).counter-1})')
            type(self).counter -= 1
            self._counted = False


class Animal:
    """A named animal that creates one ``Resource`` when it is constructed."""

    def __init__(self):
        species = 'Monkey'
        self.name = species
        self.resource = Resource()

    def make_sound(self):
        """Print the animal's call."""
        call = 'HoHoHo'
        print(call)
        
def foo():
    """Create an ``Animal``, trigger a ``ZeroDivisionError`` and drop the animal in the handler."""
    anm1 = Animal()
    try:
        1 / 0  # deliberately raises ZeroDivisionError
    except ZeroDivisionError:
        # Catch only the expected error; a bare except would also swallow
        # KeyboardInterrupt and real bugs.
        del anm1
    return
        

# Run the demo when the cell executes.
foo()


In [15]:
# Load the first pickled object from the file; one pickle.load call reads one record.
# NOTE(review): the file name says BCHUSD but the record displayed below has
# symbol 'XBTUSD' - confirm the writer names its files correctly.
# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
with open(r"C:\Users\Pontus\Documents\bitmexdata\2021-02-04%BCHUSD%quote", 'rb') as f:
    gg = pickle.load(f)

In [16]:
# Display the loaded record.
gg

{'timestamp': '2021-02-04T19:33:57.020Z',
 'symbol': 'XBTUSD',
 'bidSize': 616147,
 'bidPrice': 37228.5,
 'askPrice': 37229,
 'askSize': 229550}