# Processing Refinitiv Marketdata 

## Imports

In [64]:
import s3fs
import bz2
import boto3
import pandas as pd
import io

In [39]:
SRC_BUCKET='epython'
SRC_PATH='mktdata/refinitiv/raw_bars/'
DST_BUCKET='epython-marketdata'

In [2]:
s3 = s3fs.S3FileSystem()

In [3]:
inst_type = 'IR'

## Getting files by instrument

In [41]:
def get_files_for_inst(inst_type):
    client = boto3.client('s3')
    paginator = client.get_paginator('list_objects')
    
    page_iterator = paginator.paginate(
        Bucket=SRC_BUCKET,
        Prefix=SRC_PATH)

    for page in page_iterator:
        for content in page['Contents']:
            key = content['Key']
            if key.endswith('bz2') and f'/{inst_type}=' in key:
                yield key
                

Let's see a few examples of **IR** files

In [89]:
### paths = get_files_for_inst('IR')
for _ in range(5):
    print(next(paths))

mktdata/refinitiv/raw_bars/20201014/IR=AUD=Swap_3M=20y(Ric_AUDQM3AB20Y=TWEB).csv.bz2
mktdata/refinitiv/raw_bars/20201014/IR=AUD=Swap_3M=25y(Ric_AUDQM3AB25Y=FMD).csv.bz2
mktdata/refinitiv/raw_bars/20201014/IR=AUD=Swap_3M=25y(Ric_AUDQM3AB25Y=TWEB).csv.bz2
mktdata/refinitiv/raw_bars/20201014/IR=AUD=Swap_3M=2y(Ric_AUDQM3AB2Y=BGCP).csv.bz2
mktdata/refinitiv/raw_bars/20201014/IR=AUD=Swap_3M=2y(Ric_AUDQM3AB2Y=FMD).csv.bz2


## Translate to destination path

To help DataLake partition with appropriate columns, we use *Key=Value* subdirectories. 

In [78]:
def get_dest_path(inst_type, src_path):
    parts = src_path.split('/')
    date, filename = parts[3], parts[4]
    if inst_type == 'IR':
        _, ccy, curve, tenor = filename.split('(', 1)[0].split('=')
        keys = [('date', date), ('ccy', ccy), ('curve', curve), ('tenor', tenor)]
    else:
        ## TODO:
        ## Perhaps many instruments has the same format as IR, but need to check
        raise NotImpelmentedError(f'{isnt_type} not yet implemented')
                                  
    subdir = '/'.join('='.join(x) for x in keys)
    return f'refinitiv/{subdir}/{filename}'

And here's an example

In [79]:
get_dest_path('IR', 'mktdata/refinitiv/raw_bars/20201014/IR=AUD=Swap_3M=12y(Ric_AUDQM3AB12Y=FMD).csv.bz2')

'refinitiv/date=20201014/ccy=AUD/curve=Swap_3M/tenor=12y/IR=AUD=Swap_3M=12y(Ric_AUDQM3AB12Y=FMD).csv.bz2'

## Process the file content

In [71]:
def process_file(path):
    with s3.open(path) as f:
        data = f.read()

    data = bz2.decompress(data).decode()
    # Nothing fancy for now. Just cut off the '#' in front of the header
    # as the Crawler thinks it's a comment
    return data[1:]


### Visulise the data

Not required, but just for our sanity, let's visualise the data

In [72]:
def visualise_data(data):
    buffer = io.StringIO(data)
    return pd.read_csv(buffer)

data = process_file('epython/mktdata/refinitiv/raw_bars/20201015/IR=AUD=Swap_6M=25y(Ric_AUDSM6AB25Y=TWEB).csv.bz2')
visualise_data(data)

Unnamed: 0,RIC,Domain,Date-Time,GMT Offset,Type,Price,Bid Price,Ask Price,UserDefinedIdentifier
0,AUDSM6AB25Y=TWEB,Market Price,2020-10-15T00:09:14.890626000Z,-4,Trade,1.164,,,IR:AUD:Swap_6M:25y
1,AUDSM6AB25Y=TWEB,Market Price,2020-10-15T00:09:14.890626000Z,-4,Quote,,1.164,1.189,IR:AUD:Swap_6M:25y
2,AUDSM6AB25Y=TWEB,Market Price,2020-10-15T00:09:15.975061000Z,-4,Trade,1.160,,,IR:AUD:Swap_6M:25y
3,AUDSM6AB25Y=TWEB,Market Price,2020-10-15T00:09:15.975061000Z,-4,Quote,,1.160,1.186,IR:AUD:Swap_6M:25y
4,AUDSM6AB25Y=TWEB,Market Price,2020-10-15T00:09:17.430824000Z,-4,Trade,1.160,,,IR:AUD:Swap_6M:25y
...,...,...,...,...,...,...,...,...,...
1645,AUDSM6AB25Y=TWEB,Market Price,2020-10-15T21:33:22.014640000Z,-4,Quote,,1.143,1.173,IR:AUD:Swap_6M:25y
1646,AUDSM6AB25Y=TWEB,Market Price,2020-10-15T21:33:23.134698000Z,-4,Trade,1.146,,,IR:AUD:Swap_6M:25y
1647,AUDSM6AB25Y=TWEB,Market Price,2020-10-15T21:33:23.134698000Z,-4,Quote,,1.146,1.172,IR:AUD:Swap_6M:25y
1648,AUDSM6AB25Y=TWEB,Market Price,2020-10-15T21:33:24.462774000Z,-4,Trade,1.147,,,IR:AUD:Swap_6M:25y


## End to End

### IR Insturments

In [85]:
paths = get_files_for_inst('IR')

inst_type = 'IR'
for i, path in enumerate(paths):
    src_path = SRC_BUCKET + '/' + path
    print(f'processing file {i}')
    print(f'reading from file: {src_path}')
    data = process_file(src_path)
    
    dst_path = DST_BUCKET + '/' + get_dest_path(inst_type, path)
    compressed = bz2.compress(data.encode())
    print(f'writing to file: {dst_path}')
    with s3.open(dst_path, 'wb') as f:
        f.write(compressed)

    # When we are happy with the results,
    # we take out this and it'll run for all
    if i > 5:
        break

processing file 0
reading from file: epython/mktdata/refinitiv/raw_bars/20201014/IR=AUD=Swap_3M=12y(Ric_AUDQM3AB12Y=FMD).csv.bz2
writing to file: epython-marketdata/refinitiv/date=20201014/ccy=AUD/curve=Swap_3M/tenor=12y/IR=AUD=Swap_3M=12y(Ric_AUDQM3AB12Y=FMD).csv.bz2
processing file 1
reading from file: epython/mktdata/refinitiv/raw_bars/20201014/IR=AUD=Swap_3M=12y(Ric_AUDQM3AB12Y=TWEB).csv.bz2
writing to file: epython-marketdata/refinitiv/date=20201014/ccy=AUD/curve=Swap_3M/tenor=12y/IR=AUD=Swap_3M=12y(Ric_AUDQM3AB12Y=TWEB).csv.bz2
processing file 2
reading from file: epython/mktdata/refinitiv/raw_bars/20201014/IR=AUD=Swap_3M=15y(Ric_AUDQM3AB15Y=FMD).csv.bz2
writing to file: epython-marketdata/refinitiv/date=20201014/ccy=AUD/curve=Swap_3M/tenor=15y/IR=AUD=Swap_3M=15y(Ric_AUDQM3AB15Y=FMD).csv.bz2
processing file 3
reading from file: epython/mktdata/refinitiv/raw_bars/20201014/IR=AUD=Swap_3M=15y(Ric_AUDQM3AB15Y=TWEB).csv.bz2
writing to file: epython-marketdata/refinitiv/date=20201014/c

## Checking the result

Let's read the results back just to make sure it is okay

In [87]:
def read_data(path):
    with s3.open(path, 'rb') as f:
        data = f.read()

    return visualise_data(bz2.decompress(data).decode())
    
path = 'epython-marketdata/refinitiv/date=20201014/ccy=AUD/curve=Swap_3M/tenor=20y/IR=AUD=Swap_3M=20y(Ric_AUDQM3AB20Y=FMD).csv.bz2'
read_data(path)

Unnamed: 0,RIC,Domain,Date-Time,GMT Offset,Type,Price,Bid Price,Ask Price,UserDefinedIdentifier
0,AUDQM3AB20Y=FMD,Market Price,2020-10-15T00:01:54.962672000Z,0,Quote,,0.2201,0.2501,IR:AUD:Swap_3M:20y
1,AUDQM3AB20Y=FMD,Market Price,2020-10-15T00:07:57.846693000Z,0,Quote,,0.2200,0.2500,IR:AUD:Swap_3M:20y
2,AUDQM3AB20Y=FMD,Market Price,2020-10-15T00:08:00.119248000Z,0,Quote,,0.2191,0.2491,IR:AUD:Swap_3M:20y
3,AUDQM3AB20Y=FMD,Market Price,2020-10-15T00:08:50.726670000Z,0,Quote,,0.2192,0.2492,IR:AUD:Swap_3M:20y
4,AUDQM3AB20Y=FMD,Market Price,2020-10-15T00:09:06.998694000Z,0,Quote,,0.2191,0.2491,IR:AUD:Swap_3M:20y
...,...,...,...,...,...,...,...,...,...
274,AUDQM3AB20Y=FMD,Market Price,2020-10-15T23:54:44.162701000Z,0,Quote,,0.2128,0.2428,IR:AUD:Swap_3M:20y
275,AUDQM3AB20Y=FMD,Market Price,2020-10-15T23:57:58.794724000Z,0,Quote,,0.2127,0.2427,IR:AUD:Swap_3M:20y
276,AUDQM3AB20Y=FMD,Market Price,2020-10-15T23:58:01.082740000Z,0,Quote,,0.2128,0.2428,IR:AUD:Swap_3M:20y
277,AUDQM3AB20Y=FMD,Market Price,2020-10-15T23:58:48.670694000Z,0,Quote,,0.2127,0.2427,IR:AUD:Swap_3M:20y


And everything looks good :D