In [9]:
from mpi4py import MPI
import json

# read parameters
params =

comm = MPI.COMM_WORLD
mode = MPI.MODE_RDONLY
fh = MPI.File.Open(comm, "data-big.txt", mode)

# set read size parameter
file_size = fh.Get_size()
num_process = comm.Get_size()
rank = comm.Get_rank()
chunk_size = int(file_size / num_process)
start = rank * chunk_size + 1

# read all
buf = bytearray(chunk_size)
fh.Iread_at(start, buf)

raw = buf.decode()

print("Process", rank, "finish reading. Total size is", len(raw), "bytes.")

Exception: Invalid argument, error stack:
MPI_FILE_IREAD_AT(100): Invalid offset argument

In [22]:
import json
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
fh = MPI.File.Open(comm, "config", MPI.MODE_RDONLY)
file_size = fh.Get_size()
buf = bytearray(file_size)
fh.Iread_at(0, buf)
params = json.loads(buf.decode())

# with open("config", "r") as f:
#     params = json.load(f)

print("Process", rank, "finish", "Answer: ", params)

Process 0 finish Answer:  {'chunk_size': 100000}


In [2]:
ts = [(x[0], float(x[1]), int(x[2])) for row in raw.split("\n") for x in [row.split(",")] if len(x) == 3]
print(len(ts))
print(ts[:5])
print(ts[-5:])

KeyboardInterrupt: 

In [4]:
raw[:100]

'20140804:10:00:00.574914,1173.56,471577\n20140804:10:00:00.898688,1251.60,445361\n20140804:10:00:00.94'

In [133]:
# find out the longest misplacement

tmp = [x[0] for x in ts]
rank = np.argsort(tmp)
rev = np.maximum(rank - np.arange(len(tmp)), 0)
cdf = np.sort(rev)

print(cdf[99000]) # 99% quantile is 871
print(cdf[99900]) # 99.9% quantile is 1085
print(cdf[-1]) # 100% quantile is 99882
print(np.sum(cdf>0)/len(cdf)) # 50% of records are misplaced

871
1085
99882
0.49394


In [136]:
# duplicates - record, time, (price, vol)
# a paper about filtering tick data
# erratic price on thin volume
# t2 from (t1, t2, t3) is out of wreck. use brownian bridge?

In [8]:
from mpi4py import MPI
import numpy as np
import json
import datetime
import os
from operator import itemgetter

def log(fh, rank, code, msg):
    # code: 0 - Info, 1 - Warning, 2 - Error
    codex = {0: "INFO", 1: "WARNING", 2: "ERROR"}
    msg = "[" + str(rank) + "][" + codex[code] + "]" + " (" + datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") \
          + "): " + msg
    fh.Write_shared(bytearray(msg.encode()))

if __name__ == "__main__":

    # start MPI and get basic info
    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    num_nodes = comm.Get_size()

    # reset log file
    if rank == 0:
        try:
            os.remove("log")
        except:
            pass

    flog = MPI.File.Open(comm, "log", MPI.MODE_WRONLY | MPI.MODE_CREATE)

    # read parameters
    try:
        with open("config", "r") as f:
            params = json.load(f)

        chunk_size = params["chunk_size"]
        chunk_buf = params["chunk_buf"]
        overlap_size = params["overlap_size"]

        log(flog, rank, 0, "Loaded in parameters")

    except:
        log(flog, rank, 2, "Fail to load in parameters. Set to defaults")

        chunk_size = 10000
        chunk_buf = 50
        overlap_size = 1000



    # read file in chunks
    fh = MPI.File.Open(comm, "data-small.txt", MPI.MODE_RDONLY)
    fsig = MPI.File.Open(comm, "signal.txt", MPI.MODE_WRONLY | MPI.MODE_CREATE)
    fnoi = MPI.File.Open(comm, "noise.txt", MPI.MODE_WRONLY | MPI.MODE_CREATE)

    file_size = fh.Get_size()
    block_size = file_size / num_nodes
    block_offset = rank * block_size

    num_chunks = int(block_size // chunk_size)
    overlap = []
    for iter in range(1):
        buf = bytearray(chunk_size + chunk_buf)
        fh.Read_at(block_offset + iter * chunk_size, buf)

        # discard everything before the first "\n"
        rows = buf[:chunk_size].decode().split("\n")[1:]

        # get remaining part from the remainder
        extra = buf[chunk_size:].decode().split("\n")[0]
        # if last byte is "\n", take one more record from the remainder
        if buf[chunk_size - 1] == 10:
            rows.append(extra)
        else:
            # append the remaining part of last row from the remainder
            rows[-1] += extra

        parsed = overlap + [(x[0], float(x[1]), int(x[2])) for row in rows for x in [row.split(",")]]
        # sort by time
        sorted_ix = np.argsort([x[0] for x in parsed])

        signal = []
        noise = []
        for i, j in zip(sorted_ix[:-overlap_size], sorted_ix[1:-(overlap_size-1)]):
            # filter by duplicates, price anomaly and negative volume
            if parsed[i] == parsed[j] or parsed[i][1] <= 0 or parsed[i][1] > 50000 or parsed[i][2] <= 0:
                noise.append(i)
            else:
                signal.append(i)

        overlap = [parsed[ix] for ix in sorted_ix[-overlap_size:]]
        
        # output result
        # fsig.Write_shared(bytearray("\n".join([parsed[ix] for ix in signal]).encode()))
        # fnoi.Write_shared(bytearray("\n".join([parsed[ix] for ix in noise]).encode()))

    # close files
    flog.Close()
    fh.Close()
    fsig.Close()
    fnoi.Close()

In [7]:
[parsed[ix] for ix in signal]

[('20140803:10:00:14.096783', 1592.13, 409123),
 ('20140803:10:00:16.385589', 1388.14, 170451),
 ('20140804:09:00:15.477069', 1383.22, 412705),
 ('20140804:10:00:13.002033', 1422.66, 587893),
 ('20140804:10:00:13.006122', 1223.81, 265954),
 ('20140804:10:00:13.010388', 1670.74, 492291),
 ('20140804:10:00:13.010719', 1212.62, 458423),
 ('20140804:10:00:13.015196', 1315.04, 249536),
 ('20140804:10:00:13.015647', 1561.74, 125340),
 ('20140804:10:00:13.020506', 1330.99, 73103),
 ('20140804:10:00:13.031423', 1384.71, 635757),
 ('20140804:10:00:13.032954', 1667.17, 151972),
 ('20140804:10:00:13.043173', 1460.62, 635069),
 ('20140804:10:00:13.052094', 1380.79, 31815),
 ('20140804:10:00:13.054458', 1463.7, 723309),
 ('20140804:10:00:13.064663', 1549.76, 154781),
 ('20140804:10:00:13.066897', 1847.74, 36752),
 ('20140804:10:00:13.067737', 1522.0, 365817),
 ('20140804:10:00:13.073592', 1021.86, 670339),
 ('20140804:10:00:13.092943', 1595.6, 657864),
 ('20140804:10:00:13.107291', 1937.1, 567363),