### Market data refresh 

### Input Description

Raw OHLC (open, high, low, close) price data, read from CSV files.

### Output

Clean OHLC data in an HDF5 store.

### Operations

This notebook takes raw financial market data files and runs them through a processing pipeline. The following operations are carried out:

- Convert the timestamps from the data source's time zone to UTC
- Merge with the existing raw data, keyed on datetime
- Save the resulting raw data to HDF5
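The first two operations can be sketched with plain pandas. This is an illustration only: `localize_to_utc` and `merge_on_datetime` are hypothetical helpers, not the `quantutils` implementation, and the rule that newer rows replace older ones on a timestamp collision is an assumption.

```python
import pandas as pd

def localize_to_utc(df, source_tz):
    """Treat a naive DatetimeIndex as wall-clock time in source_tz and convert it to UTC."""
    out = df.copy()
    out.index = out.index.tz_localize(source_tz).tz_convert("UTC")
    return out

def merge_on_datetime(existing, new):
    """Union two frames on their index; rows from `new` win where timestamps collide (assumed rule)."""
    combined = pd.concat([existing, new])
    combined = combined[~combined.index.duplicated(keep="last")]
    return combined.sort_index()

# Two bars stamped in New York wall-clock time (illustrative values)
raw = pd.DataFrame(
    {"Open": [100.0, 101.0], "Close": [101.0, 102.0]},
    index=pd.to_datetime(["2018-01-02 09:30", "2018-01-02 09:35"]),
)
utc = localize_to_utc(raw, "US/Eastern")

# A later file that overlaps the stored data by one bar
update = pd.DataFrame(
    {"Open": [101.5, 102.0], "Close": [102.5, 103.0]},
    index=pd.to_datetime(["2018-01-02 14:35", "2018-01-02 14:40"]).tz_localize("UTC"),
)
merged = merge_on_datetime(utc, update)
print(merged)
```

Storing everything in UTC means files from sources in different time zones can be merged on the same index without ambiguity.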

In [1]:
#!pip install --upgrade "../../quantutils"
import json, os, pandas
import quantutils.dataset.pipeline as ppl
from quantutils.api.datasource import MarketDataStore

In [2]:
def refreshMarketData(mds, root, datasource_file):

    # Loop over datasources...
    # TODO: In chronological order

    # Use a context manager so the config file handle is closed after reading
    with open(os.path.join(root, datasource_file)) as f:
        datasources = json.load(f)
    
    for datasource in datasources:

        DS_path = root + "/" + datasource["name"] + "/"
        SRC_path = DS_path + "raw/"

        for market in datasource["markets"]:

            for source in market["sources"]:

                # Loop over any source files...
                for infile in os.listdir(SRC_path):

                    if infile.lower().startswith(source["name"].lower()):

                        print("Adding " + infile + " to " + market["name"] + " table")

                        # Load RAW data (assume CSV)
                        newData = pandas.read_csv(SRC_path + infile,
                                                  index_col=datasource["index_col"],
                                                  parse_dates=datasource["parse_dates"],
                                                  header=None,
                                                  names=["Date", "Time", "Open", "High", "Low", "Close"],
                                                  usecols=range(0, 6),
                                                  skiprows=datasource["skiprows"],
                                                  dayfirst=datasource["dayfirst"]
                                                  )

                        if newData is not None:

                            newData = ppl.localize(newData, datasource["timezone"], "UTC")

                            mds.append(source["name"], newData, source["sample_unit"], update=True, debug=True)
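The function above reads its settings from a JSON file whose contents are not shown. The sketch below guesses at its shape from the keys the function accesses; every value (the `"finam"` name, the column settings, the `skiprows` count) is an illustrative assumption, not the real configuration. `matching_files` is a hypothetical helper that mirrors the case-insensitive prefix match used to pick source files.

```python
import json

# Hypothetical configuration: key names are taken from refreshMarketData,
# values are assumptions made for illustration only.
EXAMPLE_CONFIG = [
    {
        "name": "finam",
        "timezone": "US/Eastern",
        "index_col": "Date_Time",
        "parse_dates": [["Date", "Time"]],
        "skiprows": 1,
        "dayfirst": False,
        "markets": [
            {"name": "DOW", "sources": [{"name": "D&J-IND", "sample_unit": "5min"}]},
            {"name": "SPY", "sources": [{"name": "SANDP-500", "sample_unit": "5min"}]},
        ],
    }
]

def matching_files(filenames, source_name):
    """Return the files whose name starts with source_name, ignoring case."""
    prefix = source_name.lower()
    return [f for f in filenames if f.lower().startswith(prefix)]

files = ["D&J-IND_150101_170519.csv", "SANDP-500_161003_180319.csv", "readme.txt"]
for datasource in EXAMPLE_CONFIG:
    for market in datasource["markets"]:
        for source in market["sources"]:
            print(market["name"], matching_files(files, source["name"]))
```

Because files are selected by prefix alone, two sources whose names share a prefix would pick up each other's files, so source names need to be chosen with that in mind.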

In [None]:
mds = MarketDataStore(remote=True, location="http://pricestore.192.168.1.203.nip.io")
refreshMarketData(mds, "../datasources", "datasources_finam.json")

Adding D&J-IND_150101_170519.csv to DOW table
Converting from US/Eastern to UTC
http://pricestore.192.168.1.203.nip.io/prices/datasource/D&J-IND?unit=5min
<Response [200]>
{"rc":"success"}

{'rc': 'success'}
http://pricestore.192.168.1.203.nip.io/prices/datasource/D&J-IND?unit=5min
{'rc': 'success'}
Adding D&J-IND_161003_180319.csv to DOW table
Converting from US/Eastern to UTC
http://pricestore.192.168.1.203.nip.io/prices/datasource/D&J-IND?unit=5min
<Response [200]>
{"rc":"success"}

{'rc': 'success'}
http://pricestore.192.168.1.203.nip.io/prices/datasource/D&J-IND?unit=5min
{'rc': 'success'}
Adding D&J-IND_130101_141231.csv to DOW table
Converting from US/Eastern to UTC
http://pricestore.192.168.1.203.nip.io/prices/datasource/D&J-IND?unit=5min
<Response [200]>
{"rc":"success"}

{'rc': 'success'}
http://pricestore.192.168.1.203.nip.io/prices/datasource/D&J-IND?unit=5min
{'rc': 'success'}
Adding SANDP-500_161003_180319.csv to SPY table
Converting from US/Eastern to UTC
http://pricesto

In [15]:
mds.delete("D&J-IND")

{'rc': 'success'}

In [5]:
mds = MarketDataStore(location="../datasources")
refreshMarketData(mds, "../datasources", "datasources.json")

Adding WallSt-hourly-120217.txt to DOW table
Converting from Europe/London to UTC
Resampling to H periods
Appending data...
Adding WallSt-hourly-011116.txt to DOW table
Converting from Europe/London to UTC
Resampling to H periods
Re-writing table data for update...
Adding WallSt-hourly-210618.txt to DOW table
Converting from Europe/London to UTC
Resampling to H periods
Appending data...
Adding WallSt-hourly-230718.txt to DOW table
Converting from Europe/London to UTC
Resampling to H periods
Appending data...
Adding WallSt-hourly-050517.txt to DOW table
Converting from Europe/London to UTC
Resampling to H periods
Re-writing table data for update...
Adding WallSt-hourly-140518.txt to DOW table
Converting from Europe/London to UTC
Resampling to H periods
Re-writing table data for update...
Adding WallSt-hourly-040417.txt to DOW table
Converting from Europe/London to UTC
Resampling to H periods
Re-writing table data for update...
Adding WallSt-hourly-030818.txt to DOW table
Converting from Europe/London to UTC
Resampling to H periods
Re-writing table data for update...
Adding WallSt-hourly-160517.txt to DOW table
Converting from Europe/London to UTC
Resam

Converting from Europe/London to UTC
Resampling to H periods
Re-writing table data for update...
Adding SP500-hourly-210618.txt to SPY table
Converting from Europe/London to UTC
Resampling to H periods
Appending data...
Adding SP500-hourly-200318.txt to SPY table
Converting from Europe/London to UTC
Resampling to H periods
Re-writing table data for update...
Adding SP500-hourly-180418.txt to SPY table
Converting from Europe/London to UTC
Resampling to H periods
Re-writing table data for update...
Adding SP500-hourly-140518.txt to SPY table
Converting from Europe/London to UTC
Resampling to H periods
Re-writing table data for update...
Adding SP500-hourly-040618.txt to SPY table
Converting from Europe/London to UTC
Resampling to H periods
Re-writing table data for update...
Adding SP500-hourly-030818.txt to SPY table
Converting from Europe/London to UTC
Resampling to H periods
Appending data...
Adding SP500-hourly-230718.txt to SPY table
Converting from Europe/London to UTC
Resampling t

Converting from US/Eastern to UTC
Resampling to 5min periods
Re-writing table data for update...
Adding D&J-IND_130101_141231.csv to DOW table
Converting from US/Eastern to UTC
Resampling to 5min periods
Re-writing table data for update...
Adding SANDP-500_161003_180319.csv to SPY table
Converting from US/Eastern to UTC
Resampling to 5min periods
Appending data...
Adding SANDP-500_150101_170519.csv to SPY table
Converting from US/Eastern to UTC
Resampling to 5min periods
Re-writing table data for update...
Adding SANDP-500_130101_141231.csv to SPY table
Converting from US/Eastern to UTC
Resampling to 5min periods
Re-writing table data for update...
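The log lines above report a resampling step ("Resampling to H periods", "Resampling to 5min periods") that happens inside the store. How `MarketDataStore` implements it is not shown, so the following is only a sketch of the conventional way to resample OHLC bars in pandas; `resample_ohlc` and the sample data are hypothetical.

```python
import pandas as pd

# Conventional OHLC aggregation: first open, highest high, lowest low, last close
OHLC_RULES = {"Open": "first", "High": "max", "Low": "min", "Close": "last"}

def resample_ohlc(df, unit):
    """Aggregate OHLC bars into `unit`-sized periods, dropping periods with no data."""
    return df.resample(unit).agg(OHLC_RULES).dropna(how="all")

# Ten one-minute bars (illustrative values)
index = pd.date_range("2018-01-02 14:30", periods=10, freq="1min", tz="UTC")
opens = [float(i) for i in range(10)]
minute_bars = pd.DataFrame(
    {
        "Open": opens,
        "High": [o + 1.0 for o in opens],
        "Low": [o - 1.0 for o in opens],
        "Close": [o + 0.5 for o in opens],
    },
    index=index,
)

five_min = resample_ohlc(minute_bars, "5min")
print(five_min)
```

Dropping all-NaN periods keeps market closures from showing up as empty bars.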


In [4]:
mds = MarketDataStore(remote=True, location="http://pricestore.192.168.1.203.nip.io")

In [14]:
x = mds.get("D%26J-IND", debug=True)

http://pricestore.192.168.1.203.nip.io/prices/datasource/D%26J-IND
<Response [200]>
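The source name is passed here as `D%26J-IND`, the percent-encoded form of `D&J-IND`. Rather than encoding names by hand, the standard library can do it. The sketch below assumes the `/prices/datasource/<name>?unit=<unit>` URL layout seen in the debug output; `datasource_url` is a hypothetical helper, and whether `MarketDataStore` already encodes names internally is not known from this notebook.

```python
from urllib.parse import quote, unquote

def datasource_url(base, name, unit=None):
    """Build a price-store URL, percent-encoding the source name and the unit."""
    url = f"{base}/prices/datasource/{quote(name, safe='')}"
    if unit is not None:
        url += f"?unit={quote(unit, safe='')}"
    return url

url = datasource_url("http://localhost:8080", "D&J-IND", "H")
print(url)
print(unquote("D%26J-IND"))
```

Encoding should happen exactly once: passing an already encoded name such as `D%26J-IND` through `quote` again would turn it into `D%2526J-IND`.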



In [6]:
x

                         Open      High       Low     Close
2013-01-04 15:00:00  13407.46  13411.91  13377.22  13387.45
2013-01-04 16:00:00  13387.37  13420.86  13386.80  13408.33
2013-01-04 17:00:00  13408.20  13415.80  13403.52  13405.06


In [10]:
mds.append("D%26J-IND", x, "H", update=False, debug=True)

http://localhost:8080/prices/datasource/D%26J-IND?unit=H
<Response [500]>
http://localhost:8080/prices/datasource/D%26J-IND?unit=H
{'message': 'Internal Server Error'}


{'message': 'Internal Server Error'}

In [9]:
mds = MarketDataStore(remote=True, location="http://localhost:8080")

In [72]:
import requests
# Serialise the DataFrame in pandas' "split" layout with ISO 8601 timestamps
data = x.to_json(orient='split', date_format="iso")
# Hand-written equivalent of `data`, kept for comparison only (it is not sent)
data2 = {"columns":["Open","High","Low","Close"],"index":["2013-01-04T15:00:00.000Z","2013-01-04T16:00:00.000Z","2013-01-04T17:00:00.000Z"],"data":[[13407.46,13411.91,13377.22,13387.45],[13387.37,13420.86,13386.8,13408.33],[13408.2,13415.8,13403.52,13405.06]]}
headers = {'Content-Type': 'application/json'}
url="http://pricestore.192.168.1.203.nip.io/prices/datasource/D%26J-IND?unit=H"
# POST the JSON string as the request body
r = requests.post(url=url, headers=headers, data=data)

In [73]:
r.json()

{'msg': 'Error: Entry already exists for data starting at index 2013-01-04 15:00:00+00:00',
 'rc': 'fail'}
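The responses seen in this notebook come in three shapes: `{'rc': 'success'}`, `{'rc': 'fail', 'msg': ...}` and `{'message': 'Internal Server Error'}`. A small check, sketched below, turns the latter two into exceptions instead of leaving them to be noticed in printed output. `raise_on_failure` is a hypothetical helper, and the assumption that these three shapes cover every response is based only on the output shown here.

```python
def raise_on_failure(payload):
    """Return the payload if it reports success, otherwise raise with the server's message."""
    if payload.get("rc") == "success":
        return payload
    # 'msg' appears on application-level failures, 'message' on HTTP 500 responses
    detail = payload.get("msg") or payload.get("message") or "no detail supplied"
    raise RuntimeError(f"price store request failed: {detail}")

# Response shapes copied from the notebook output
ok = raise_on_failure({"rc": "success"})

for bad in (
    {"msg": "Error: Entry already exists for data starting at index 2013-01-04 15:00:00+00:00", "rc": "fail"},
    {"message": "Internal Server Error"},
):
    try:
        raise_on_failure(bad)
    except RuntimeError as err:
        print(err)
```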

In [63]:
json.dumps(json.loads(data))

'{"columns": ["Open", "High", "Low", "Close"], "index": ["2013-01-04T15:00:00.000Z", "2013-01-04T16:00:00.000Z", "2013-01-04T17:00:00.000Z"], "data": [[13407.46, 13411.91, 13377.22, 13387.45], [13387.37, 13420.86, 13386.8, 13408.33], [13408.2, 13415.8, 13403.52, 13405.06]]}'

In [65]:
data

'{"columns":["Open","High","Low","Close"],"index":["2013-01-04T15:00:00.000Z","2013-01-04T16:00:00.000Z","2013-01-04T17:00:00.000Z"],"data":[[13407.46,13411.91,13377.22,13387.45],[13387.37,13420.86,13386.8,13408.33],[13408.2,13415.8,13403.52,13405.06]]}'
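The two strings above differ only in whitespace: `to_json` emits compact JSON, while `json.dumps` adds a space after each separator by default. The sketch below rebuilds a frame with the same three rows and shows the `split` layout (`columns`, `index`, `data`). The UTC-aware index is an assumption, chosen so that the timestamps carry a `Z` suffix regardless of pandas version.

```python
import json
import pandas as pd

# Rebuild the three-row frame shown earlier; the UTC-aware index is an assumption
index = pd.to_datetime(
    ["2013-01-04 15:00", "2013-01-04 16:00", "2013-01-04 17:00"]
).tz_localize("UTC")
frame = pd.DataFrame(
    [
        [13407.46, 13411.91, 13377.22, 13387.45],
        [13387.37, 13420.86, 13386.8, 13408.33],
        [13408.2, 13415.8, 13403.52, 13405.06],
    ],
    index=index,
    columns=["Open", "High", "Low", "Close"],
)

payload = frame.to_json(orient="split", date_format="iso")
parsed = json.loads(payload)

# Re-serialising with default and compact separators yields the same data
spaced = json.dumps(parsed)
compact = json.dumps(parsed, separators=(",", ":"))
print(sorted(parsed.keys()))
print(json.loads(spaced) == json.loads(compact))
```

Since both forms parse to the same object, whitespace in the request body is unlikely to explain a server-side error; the payload contents or the target URL are better places to look.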