# Refinitiv Data Platform
## *Swiss Portfolio periodic tax returns*

### Step 1) Preparation

In [1]:
import requests
import pandas as pd
import logging
import asyncio
from copy import deepcopy
from collections import defaultdict
import json
from datetime import datetime
import time
import configparser
import traceback
import os

import refinitiv.data as rd
from refinitiv.data.delivery import endpoint_request

import nest_asyncio
nest_asyncio.apply()

In [2]:
# ==============================================
def read_input_file(input_file_name):
# ==============================================
    """Read the portfolio CSV and group instrument identifiers by identifier type.

    The file has no header row: column 0 is the identifier type (e.g. ``Isin``)
    and column 1 is the identifier itself. Exact duplicate rows are dropped.

    Args:
        input_file_name: path of the CSV file, absolute or relative to the
            current working directory.

    Returns:
        defaultdict(list) mapping identifier type -> list of identifiers, in
        file order.
    """
    # A plain path resolves relative to the working directory on every OS
    # (a hard-coded ".\\" prefix only works on Windows). dtype=str keeps
    # identifiers verbatim, e.g. leading zeros of all-digit codes.
    port = pd.read_csv(input_file_name, header=None, dtype=str)
    port = port.rename(columns={0: "type", 1: "id"})
    port = port.drop_duplicates()

    # group identifiers by their identifier type
    id_dict = defaultdict(list)
    for id_type, identifier in zip(port["type"], port["id"]):
        id_dict[id_type].append(identifier)
    return id_dict

### Step 2) Convert Symbology

In [3]:
# =============================================
def get_payloads(id_dict_ori, limit):
# ==============================================
    """Split identifiers into Symbology API request batches.

    The Symbology lookup endpoint accepts a limited number of identifiers per
    call (maximum 1,500), so identifiers are packed, in input order, into
    batches whose total size never exceeds ``limit``. Identifiers of one type
    may be split across consecutive batches; every identifier appears exactly
    once.

    Args:
        id_dict_ori: mapping of identifier type (e.g. "Isin") to a list of
            identifiers. It is not modified.
        limit: maximum number of identifiers per batch; must be positive.

    Returns:
        list of batches; each batch is a list of
        ``{"identifierTypes": [type], "values": [...]}`` dicts whose ``values``
        are non-empty. An input without identifiers yields an empty list.

    Raises:
        ValueError: if ``limit`` is not positive.
    """
    if limit <= 0:
        raise ValueError(f"limit must be positive, got {limit}")

    payloads = []
    batch = []
    room = limit  # identifiers still allowed in the current batch
    for id_type, values in id_dict_ori.items():
        values = list(values)
        start = 0
        while start < len(values):
            taken = values[start:start + room]
            batch.append({"identifierTypes": [id_type], "values": taken})
            start += len(taken)
            room -= len(taken)
            if room == 0:
                # batch is full: close it and start a new one
                payloads.append(batch)
                batch = []
                room = limit
    if batch:
        payloads.append(batch)
    return payloads

# ==============================================
def get_symbols(payloads):
# ==============================================
    """Resolve instrument identifiers to PermIDs via the Symbology lookup API.

    Args:
        payloads: list of batches as produced by ``get_payloads``; each batch is
            a list of ``{"identifierTypes": [...], "values": [...]}`` dicts sent
            as the request's "from" clause.

    Returns:
        list of the ``data`` entries of every lookup response, concatenated in
        batch order.
    """
    symbols = []
    for payload in payloads:
        # Drop entries without identifiers and skip batches that end up empty,
        # so no request is sent with nothing to look up.
        sources = [entry for entry in payload if entry.get("values")]
        if not sources:
            continue
        request_definition = rd.delivery.endpoint_request.Definition(
            method=rd.delivery.endpoint_request.RequestMethod.POST,
            url='https://api.refinitiv.com/discovery/symbology/v1/lookup',
            body_parameters={
                "from": sources,
                "to": [{"identifierTypes": ["PermID"]}],
                "type": "auto",
            },
        )
        response = request_definition.get_data()
        symbols.extend(response.data.raw["data"])
    return symbols

### Step 3) Data Store GraphQL API - retrieve Swiss Tax Data

In [4]:
# Maximum number of PermIDs sent per Data Store GraphQL request; the API accepts
# at most 200 object ids per call, so instrument lists are chunked to this size.
gql_limit = 200
# ==============================================
def convert_symbols_to_dict(all_symbols):
# ==============================================
    """Flatten Symbology API results into a lookup dict and an input/output mapping.

    Each Symbology entry carries an ``input`` list (the looked-up identifier)
    and an ``output`` list (zero or more matches). Entries without matches are
    skipped; an entry with several matches contributes one row per match, each
    repeating the input.

    Args:
        all_symbols: list of Symbology ``data`` entries, as returned by
            ``get_symbols``.

    Returns:
        tuple ``(object_id_dict, io_mapping)``:
          * ``object_id_dict`` -- defaultdict(list) grouping output PermIDs by
            the output's third field (presumably the object type such as
            "EDInstrument"; this relies on the response key order -- verify).
          * ``io_mapping`` -- DataFrame with columns "Input ID type",
            "Input ID" and "PermID", one row per match. It is empty (with those
            columns) when nothing matched.
    """
    frames = []
    for entry in all_symbols:
        outputs = entry["output"]
        if not outputs:
            continue
        # one-row frame of the input, repeated once per match
        input_row = pd.DataFrame.from_dict(entry["input"][0], orient="index").T
        raw_input_df = pd.concat([input_row] * len(outputs), ignore_index=True, sort=False)
        raw_output_df = pd.concat(
            [pd.DataFrame.from_dict(match, orient="index").T for match in outputs],
            ignore_index=True,
            sort=False,
        )
        raw_input_df = raw_input_df.rename(columns={"value": "Input ID"})
        raw_output_df = raw_output_df.rename(columns={"value": "PermID"})
        frames.append(pd.concat([raw_input_df, raw_output_df], axis=1))

    if not frames:
        return defaultdict(list), pd.DataFrame(columns=["Input ID type", "Input ID", "PermID"])

    # Single concat instead of DataFrame.append in a loop (append was removed in pandas 2.0).
    response_df = pd.concat(frames, ignore_index=True, sort=False)

    # Positional access: row[0] is the index, row[1:3] the input fields and
    # row[3:] the output fields (row[3] = PermID). Columns are not unique
    # (both sides carry an identifier-type field), so names cannot be used here.
    object_id_dict = defaultdict(list)
    for row in response_df.itertuples():
        object_id_dict[row[5]].append(row[3])

    # rename the input's identifier-type column (position 1) without mutating the Index in place
    column_names = list(response_df.columns)
    column_names[1] = "Input ID type"
    response_df.columns = column_names

    io_mapping = response_df.loc[:, ["Input ID type", "Input ID", "PermID"]]
    return object_id_dict, io_mapping

def chunks(lst, n):
    """Yield successive slices of ``lst`` holding ``n`` items each; the final slice may be shorter."""
    yield from (lst[offset:offset + n] for offset in range(0, len(lst), n))


# ==============================================
def gen_qraphql_payload_get_data(object_id_dict):
# ==============================================
    """Fetch Swiss tax data from the Data Store GraphQL API for each supported object type.

    For every object type in the class mapping, the PermIDs are split into
    chunks of at most ``gql_limit`` ids. All chunks of one type are requested
    concurrently.

    Args:
        object_id_dict: mapping of symbology object type (e.g. "EDInstrument")
            to a list of PermIDs, as returned by ``convert_symbols_to_dict``.
            Missing types are treated as having no instruments.

    Returns:
        dict mapping each GraphQL class name to the list of
        ``(raw_response, class_name)`` tuples returned by
        ``get_graphql_data_rdp_async``, in chunk order (empty list when the type
        has no instruments).
    """
    gql_output = {}
    gql_class_mapping = {
        "FundShareClass": "DerivedContentLipperFundClass",
        "EDInstrument": "DerivedContentEDFInstrument",
        "GovCorpInstrument": "DerivedContentGovCorpBonds",
    }

    async def _fetch_class(class_name, instrument_ids, cnt):
        # Create the gather inside a coroutine so it runs on the active loop;
        # calling asyncio.gather() with bare coroutines outside a running loop
        # is deprecated since Python 3.10.
        return await asyncio.gather(
            *[
                get_graphql_data_rdp_async(class_name, json.dumps(chunk_instruments), cnt)
                for chunk_instruments in chunks(instrument_ids, gql_limit)
            ]
        )

    # When called from Jupyter the loop is already running; run_until_complete
    # on it relies on nest_asyncio having been applied.
    loop = asyncio.get_event_loop()
    for cnt, (query_type, class_name) in enumerate(gql_class_mapping.items()):
        instrument_ids = object_id_dict.get(query_type, [])
        gql_output[class_name] = loop.run_until_complete(
            _fetch_class(class_name, instrument_ids, cnt)
        )
    return gql_output

# ==============================================
async def get_graphql_data_rdp_async(class_name, list_instrument, cnt):
# ==============================================
    """Request Swiss stamp-duty tax data for one batch of instruments.

    The Data Store GraphQL API accepts at most 200 instruments per call, so
    callers send batches of at most that size.

    Args:
        class_name: GraphQL root field to query
            (e.g. "DerivedContentEDFInstrument").
        list_instrument: JSON-encoded list of PermIDs. It is inserted verbatim as
            the ``objectIds`` argument; a JSON string array is valid GraphQL list
            syntax.
        cnt: index of the class being queried; used only in the progress message.

    Returns:
        tuple ``(raw_json_response, class_name)``.
    """
    print(f"[RDP-ASYNC] calling graphql for the {class_name} data {cnt}")

    query = (
        "{"
        + class_name
        + "(objectIds: "
        + list_instrument
        + "){SwissStampDutyTax {AssetTypeDescription InstrumentId IssuerDomicileText SwissStampDutyFlag SwissStampTaxRate TaxationComment TaxationType}}}"
    )
    request_definition = rd.delivery.endpoint_request.Definition(
        method=rd.delivery.endpoint_request.RequestMethod.POST,
        url='https://api.refinitiv.com/data-store/v1/graphql',
        body_parameters={
            "query": query,
            # GraphQL-over-HTTP request member for query variables is "variables".
            "variables": {},
        },
    )
    response = await request_definition.get_data_async()
    return response.data.raw, class_name

# ==============================================
def format_graphql_response_thread(gql_output, io_mapping):
# ==============================================
    """Tabulate GraphQL tax data and join it to the Symbology input/output mapping.

    Args:
        gql_output: dict of class name -> list of ``(raw_response, class_name)``
            tuples, as returned by ``gen_qraphql_payload_get_data``. Each
            ``raw_response["data"][class_name]`` is a list of per-instrument
            records whose single value is a dict of tax fields, including
            ``InstrumentId``.
        io_mapping: DataFrame with at least a "PermID" column, as returned by
            ``convert_symbols_to_dict``.

    Returns:
        DataFrame with the ``io_mapping`` columns, the tax fields (without
        ``InstrumentId``) and a "class_name" column. Rows come from an inner join
        on PermID == InstrumentId, and exact duplicates are removed. When no
        tax records were returned, the result is an empty frame with the
        ``io_mapping`` columns plus "class_name".

    Raises:
        ValueError: if a GraphQL response carries no ``data`` (e.g. an error response).
    """
    response_list = []
    for gql_class in gql_output:
        for response, class_name in gql_output[gql_class]:
            raw_response = response.get("data")
            if raw_response is None:
                raise ValueError(
                    f"GraphQL request for {class_name} returned no data: {response.get('errors')}"
                )
            for raw in raw_response[class_name]:
                output = pd.DataFrame.from_dict(raw, orient="index")
                output["class_name"] = class_name
                response_list.append(output)

    if not response_list:
        return pd.DataFrame(columns=list(io_mapping.columns) + ["class_name"])

    # concatenate once, after all responses are collected
    raw_output_df = pd.concat(response_list, ignore_index=True, sort=False)
    raw_output_df = io_mapping.merge(raw_output_df, left_on="PermID", right_on="InstrumentId", how="inner")
    raw_output_df = raw_output_df.drop(columns=["InstrumentId"])
    raw_output_df = raw_output_df.drop_duplicates()

    return raw_output_df

# ==============================================
def save_as_csv(df, folder_name, file_name):
# ==============================================
    """Write ``df`` to ``folder_name/file_name`` as CSV, without the index column.

    The folder (and any missing parents) is created if needed. An empty
    ``folder_name`` writes into the current working directory.

    Args:
        df: DataFrame to save.
        folder_name: destination directory.
        file_name: name of the CSV file inside ``folder_name``.
    """
    print(f"Save output dataframe as CSV file named: {file_name}")
    if folder_name:
        # exist_ok avoids the check-then-create race of os.path.exists + makedirs
        os.makedirs(folder_name, exist_ok=True)
    df.to_csv(os.path.join(folder_name, file_name), index=False)

### Step 4) Run the defined functions

In [5]:
# using now() to get current time for output file name
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
input_file_name = 'Portfolio.csv'
swiss_tax_folder_name = "swiss_tax_result"
swiss_tax_file_name = f"swiss_tax_result_{current_time}.csv"

try:
    # setting up
    rd.open_session()
    start = time.time()
    try:
        # read input file
        id_dict = read_input_file(input_file_name)

        # request symbology
        payloads = get_payloads(id_dict, limit=1500)
        all_symbols = get_symbols(payloads)

        # call graphql endpoint
        object_id_dict, io_mapping = convert_symbols_to_dict(all_symbols)
        gql_output = gen_qraphql_payload_get_data(object_id_dict)
        swiss_tax_df = format_graphql_response_thread(gql_output, io_mapping)

        # show runtime
        end = time.time()
        print(f"Runtime of the app is {end - start}")
    finally:
        # close the session exactly once, whether or not data retrieval succeeded
        rd.close_session()

    # save to csv file
    save_as_csv(swiss_tax_df, swiss_tax_folder_name, swiss_tax_file_name)

    display(swiss_tax_df)

except Exception as e:
    logging.error(f'App failed with error message : {e}')
    # print_exc() writes the traceback itself and returns None, so it is not wrapped in print()
    traceback.print_exc()

[RDP-ASYNC] calling graphql for the DerivedContentLipperFundClass data 0
[RDP-ASYNC] calling graphql for the DerivedContentLipperFundClass data 0
[RDP-ASYNC] calling graphql for the DerivedContentEDFInstrument data 1
[RDP-ASYNC] calling graphql for the DerivedContentEDFInstrument data 1
[RDP-ASYNC] calling graphql for the DerivedContentEDFInstrument data 1
[RDP-ASYNC] calling graphql for the DerivedContentGovCorpBonds data 2
[RDP-ASYNC] calling graphql for the DerivedContentGovCorpBonds data 2
[RDP-ASYNC] calling graphql for the DerivedContentGovCorpBonds data 2
[RDP-ASYNC] calling graphql for the DerivedContentGovCorpBonds data 2
Runtime of the app is 51.181368827819824
Save output dataframe as CSV file named: swiss_tax_result_2022-08-08_17-28-59.csv


Unnamed: 0,Input ID type,Input ID,PermID,AssetTypeDescription,IssuerDomicileText,SwissStampDutyFlag,SwissStampTaxRate,TaxationComment,TaxationType,class_name
0,Isin,INE647O01011,21526523222,Equity,Non-Domestic,Y,,Taxable equity instrument,Equity,DerivedContentEDFInstrument
1,Isin,US115885AK10,15628360339,Debenture,Non-Domestic,Y,,"Taxable GSAC asset class, tenor >1y",Debenture,DerivedContentGovCorpBonds
2,Isin,US44881HCL69,15628338669,Debenture,Non-Domestic,Y,,"Taxable GSAC asset class, tenor >1y",Debenture,DerivedContentGovCorpBonds
3,Isin,US69512EBY05,15628335385,Debenture,Non-Domestic,Y,,"Taxable GSAC asset class, tenor >1y",Debenture,DerivedContentGovCorpBonds
4,Isin,US197677AJ62,15628366133,Debenture,Non-Domestic,Y,,"Taxable GSAC asset class, tenor >1y",Debenture,DerivedContentGovCorpBonds
...,...,...,...,...,...,...,...,...,...,...
1551,Isin,US912810EN47,15628340672,Debenture,Non-Domestic,Y,,"Taxable GSAC asset class, tenor >1y",Debenture,DerivedContentGovCorpBonds
1552,Isin,US90982PAA57,15628371156,Debenture,Non-Domestic,Y,,"Taxable GSAC asset class, tenor >1y",Debenture,DerivedContentGovCorpBonds
1553,Isin,US81756KAA97,15628317201,Debenture,Non-Domestic,Y,,"Taxable GSAC asset class, tenor >1y",Debenture,DerivedContentGovCorpBonds
1554,Isin,US911546US60,15628319100,Debenture,Non-Domestic,Y,,"Taxable GSAC asset class, tenor >1y",Debenture,DerivedContentGovCorpBonds


# Thank you :)