## Tick History Request - Parameterize and Parallelize

 Based on "RTH Python Code Samples":
 <br>
 https://developers.refinitiv.com/en/api-catalog/refinitiv-tick-history/refinitiv-tick-history-rth-rest-api/download
 <br>
 
### Introduction to the Approach:
 * Authentication - token request
 * Parameterize request (TickHistoryTimeAndSalesExtractionRequest)
 * On-demand extraction request
 * Extraction status polling request
 * Extraction notes retrieval
 * Send on-demand extraction requests - parallelized
 * Data retrieval and save to disk (the data file is gzipped)
   <br>
   Includes AWS download capability
 * Data retrieval and save to disk - parallelized

### Import Required Modules and Define Required Parameters

In [None]:
import asyncio
import copy
import gzip
import json
import os
import shutil
import time

import requests
import urllib3

# Start of every DSS REST endpoint URL; each request appends ".refinitiv.com/RestApi/v1/...".
reqStart = "https://selectapi.datascope"

# Template for a Tick History Time and Sales raw extraction request.
# The instrument list and the query dates are placeholders, to be replaced
# for each concrete request.
requestTSBodyStarter = {
    "ExtractionRequest": {
        "@odata.type": "#DataScope.Select.Api.Extractions.ExtractionRequests.TickHistoryTimeAndSalesExtractionRequest",
        "ContentFieldNames": [
            "Trade - Exchange/Contributor ID",
            "Trade - Price",
            "Trade - Volume",
            "Trade - Qualifiers",
            "Trade - Sequence Number",
            "Trade - Exchange Time",
            "Trade - Open",
            "Trade - High",
            "Trade - Low",
            "Quote - Bid Price",
            "Quote - Bid Size",
            "Quote - Ask Price",
            "Quote - Ask Size",
        ],
        "IdentifierList": {
            "@odata.type": "#DataScope.Select.Api.Extractions.ExtractionRequests.InstrumentIdentifierList",
            "InstrumentIdentifiers": [
                {"Identifier": "CARR.PA", "IdentifierType": "Ric"},  # placeholder
            ],
            "ValidationOptions": {
                "AllowHistoricalInstruments": "true",
            },
            "UseUserPreferencesForValidationOptions": "false",
        },
        "Condition": {
            "MessageTimeStampIn": "GmtUtc",
            "ApplyCorrectionsAndCancellations": "true",
            "ReportDateRangeType": "Range",
            "QueryStartDate": "2021-08-04T00:00:00.000",  # placeholder
            "QueryEndDate": "2021-08-04T23:59:59.999",    # placeholder
            "DateRangeTimeZone": "Local Exchange Time Zone",
            "DisplaySourceRIC": "false",
        },
    }
}

# Location to save downloaded files. Built with os.path so the separator
# matches the platform (".\downloads\" on Windows, "./downloads/" elsewhere);
# the trailing separator lets callers append a file name directly.
filePath = os.path.join(".", "downloads", "")
fileNameRoot = "RTH."  # root of the name for the downloaded files
useAws = False         # True: request results with the X-Direct-Download header (AWS download)

### Read Credentials from File

In [None]:
# Placeholder credentials; readCredsFromFile overwrites them when it is called.
myUsername = "Valid"
myPassword = "ValidOnly"

# comment out the next lines, if the creds are hard-coded instead
def readCredsFromFile(filePathName):
    """Read DSS credentials from a text file into the module-level globals.

    The file holds the username on its first line and the password on its
    second line; the trailing newline of each is stripped.

    Args:
        filePathName: path of the credentials file.

    Returns:
        The ``(myUsername, myPassword)`` pair that was loaded. The globals are
        updated as well, so callers that ignore the return value still work.
    """
    global myUsername, myPassword
    # ``with`` closes the file even if reading raises.
    with open(filePathName, "r") as credFile:  # one value per line
        myUsername = credFile.readline().rstrip('\n')
        myPassword = credFile.readline().rstrip('\n')
    return myUsername, myPassword

# Credentials file: username on line 1, password on line 2.
# os.path.join avoids the invalid "\c" escape of a hand-written Windows path
# and keeps the path valid on every platform.
readCredsFromFile(os.path.join("..", "creds", "credsDSS.txt"))


### Authentication - Datascope Token Request

In [None]:
def requestToken(dssUsername, dssRassword):
    """Request a DSS authentication token for the given credentials.

    Args:
        dssUsername: DSS account user name.
        dssRassword: DSS account password (parameter name kept unchanged for
            callers that pass it by keyword).

    Returns:
        The token string from the JSON response's "value" field when the
        server answers HTTP 200; otherwise the string 'None'.
    """
    requestUrl = reqStart + ".refinitiv.com/RestApi/v1/Authentication/RequestToken"

    requestHeaders = {
        "Prefer": "respond-async",
        "Content-Type": "application/json",
    }

    # Use the supplied arguments (not the module globals) so the function
    # authenticates whichever account the caller passes in.
    requestBody = {
        "Credentials": {
            "Username": dssUsername,
            "Password": dssRassword,
        }
    }

    r1 = requests.post(requestUrl, json=requestBody, headers=requestHeaders)

    if r1.status_code != 200:
        print('Replace myUserName and myPassword with valid credentials, then repeat the request')
        return 'None'

    jsonResponse = json.loads(r1.text.encode('ascii', 'ignore'))
    return jsonResponse["value"]

# Authenticate once; the token is then sent in the Authorization header of
# every extraction and download request below.
tokenValid = requestToken(
    myUsername,
    myPassword,
)
print(
    "Authentication token (valid 24 hours): "
    + tokenValid
)

### Parameterize Request

In [None]:
# Instruments (RICs) and query date range for each extraction request.
# NOTE(review): "CARP.PA" differs from the template placeholder "CARR.PA";
# confirm which RIC is intended.
request1Parameters = dict(
    RICs=["CARP.PA", "IBM.N", "MSFT.O", "VOD.L"],
    QueryStartDate="2022-01-04T00:00:00.000",
    QueryEndDate="2022-01-04T03:59:59.999",
)

request2Parameters = dict(
    RICs=["JPY=", "EUR=", "U30YT=RR"],
    QueryStartDate="2021-01-04T00:00:00.000",
    QueryEndDate="2021-01-04T03:59:59.999",
)

def paramterizeRequest(requestBodyStarter, requestParameters):
    """Build a concrete extraction request body from a template.

    Args:
        requestBodyStarter: template request body; it is deep-copied and
            never modified.
        requestParameters: dict with "RICs" (list of RIC strings) plus
            "QueryStartDate" and "QueryEndDate" strings.

    Returns:
        A new request body whose InstrumentIdentifiers list holds exactly the
        given RICs, in order, and whose Condition carries the given dates.
    """
    requestBody = copy.deepcopy(requestBodyStarter)
    extractionRequest = requestBody["ExtractionRequest"]

    # Replace the whole identifier list rather than patching its first entry,
    # so the template's placeholder instrument can never leak into the
    # request (e.g. when "RICs" is empty).
    extractionRequest["IdentifierList"]["InstrumentIdentifiers"] = [
        {"Identifier": ric, "IdentifierType": "Ric"}
        for ric in requestParameters["RICs"]
    ]
    extractionRequest["Condition"]["QueryStartDate"] = requestParameters["QueryStartDate"]
    extractionRequest["Condition"]["QueryEndDate"] = requestParameters["QueryEndDate"]

    return requestBody


# Correctly spelled alias; the original name is kept for existing callers.
parameterizeRequest = paramterizeRequest
 
# Build one concrete request body per parameter set; the template is
# deep-copied each time, so the two bodies are independent.
request1Body = paramterizeRequest(requestTSBodyStarter, request1Parameters)
request2Body = paramterizeRequest(requestTSBodyStarter, request2Parameters)

# Show each body before it is submitted.
print("requestBody ready to submit: \n" + json.dumps(request1Body, indent=2))
print("requestBody ready to submit: \n" + json.dumps(request2Body, indent=2))

### Send On-demand Extraction Request - Define
Send an on-demand extraction request using the received token.

In [None]:
from functools import partial

async def submitRequest(token, requestBody, requestMarker):
    """Submit an on-demand raw extraction and wait until it completes.

    The POST and each status poll run in the default executor, and the wait
    between polls uses ``asyncio.sleep``. This lets several submissions run
    concurrently under ``asyncio.gather``.

    Args:
        token: DSS authentication token.
        requestBody: extraction request body to POST.
        requestMarker: label prefixed to this request's log lines.

    Returns:
        The JobId string when the extraction finishes with HTTP 200,
        otherwise -1.
    """
    requestUrl = reqStart + '.refinitiv.com/RestApi/v1/Extractions/ExtractRaw'

    # The same headers serve the initial POST and the status polls.
    requestHeaders = {
        "Prefer": "respond-async",
        "Content-Type": "application/json",
        "Authorization": "token " + token,
    }

    loop = asyncio.get_running_loop()

    print("posting request " + json.dumps(requestBody))
    response = await loop.run_in_executor(
        None, partial(requests.post, requestUrl, json=requestBody, headers=requestHeaders))

    # Initial response status (after approximately 30 seconds wait) is usually 202
    status_code = response.status_code
    print(requestMarker, "HTTP status of the response: " + str(status_code))

    # 202: extraction not finished yet; poll the location URL we received.
    if status_code == 202:
        requestUrl = response.headers["location"]
        print(requestMarker, 'Extraction is not complete, we shall poll the location URL:')
        print(str(requestUrl))

    while status_code == 202:
        print(requestMarker, 'As we received a 202, we wait 30 seconds, then poll again (until we receive a 200)')
        # asyncio.sleep yields to the event loop so the other gathered requests
        # keep progressing; time.sleep here would freeze all of them.
        await asyncio.sleep(30)
        response = await loop.run_in_executor(
            None, partial(requests.get, requestUrl, headers=requestHeaders))
        status_code = response.status_code
        print(requestMarker, 'HTTP status of the response: ' + str(status_code))

    # Any final status other than 200 is an error.
    if status_code != 200:
        print(requestMarker, 'An error occurred.\n')
        return -1

    # 200: extraction complete; show the jobId and the extraction notes
    # (it is recommended to analyse their content).
    responseJson = json.loads(response.text.encode('ascii', 'ignore'))
    jobId = responseJson["JobId"]
    print(requestMarker, '\njobId: ' + jobId + '\n')
    notes = responseJson.get("Notes") or []
    print(requestMarker, 'Extraction notes:\n' + '\n'.join(notes))

    return jobId

# Example (single request, inside a notebook cell):
# job1Id = await submitRequest(tokenValid, request1Body, 'request#1')

### Send On-demand Extraction Request - Parallelize

In [None]:
# Schedule both extraction requests concurrently; each pair is (body, log marker).
tasks = asyncio.gather(*(
    submitRequest(tokenValid, body, marker)
    for body, marker in ((request1Body, 'request#1'), (request2Body, 'request#2'))
))

In [None]:
#asyncio.get_event_loop().run_until_complete(tasks)
print(await tasks)
job1Id, job2Id = tasks.result()
print(job1Id,job2Id)

### Retrieve Results and Save to File - Define
* Get the extraction results, using the received jobId.
* We also save the compressed data to disk, as a GZIP.
* We only display a few lines of the data.

In [None]:
def _downloadResultFile(token, jobId, resultMarker, fileName):
    """Stream the gzipped raw extraction result of ``jobId`` into ``fileName`` (blocking).

    Returns True when the file was written, or False when the server answered
    with an error status (nothing is written in that case).
    """
    requestUrl = reqStart + ".refinitiv.com/RestApi/v1/Extractions/RawExtractionResults" + "('" + jobId + "')" + "/$value"

    requestHeaders = {
        "Prefer": "respond-async",
        "Content-Type": "text/plain",
        "Accept-Encoding": "gzip",
        "Authorization": "token " + token,
    }
    # AWS requires an additional header: X-Direct-Download
    if useAws:
        requestHeaders["X-Direct-Download"] = "true"

    # ``with`` releases the streamed connection even on early return or error.
    with requests.get(requestUrl, headers=requestHeaders, stream=True) as r5:
        if not r5.ok:
            # Do not write an error body to a .csv.gz file.
            print(resultMarker, 'Result retrieval failed, HTTP status: ' + str(r5.status_code) + '\n')
            return False

        # Ensure we do not automatically decompress the data on the fly:
        r5.raw.decode_content = False
        if useAws:
            # AWS does not set header Content-Encoding="gzip".
            print('Content response headers (AWS server): type: ' + r5.headers.get("Content-Type", "") + '\n')
        else:
            print('Content response headers (TRTH server): type: ' + r5.headers.get("Content-Type", "") + ' - encoding: ' + r5.headers.get("Content-Encoding", "") + '\n')

        print(resultMarker, 'Saving compressed data to file:' + fileName + ' ... please be patient')
        # Create the download directory on first use.
        os.makedirs(os.path.dirname(fileName) or ".", exist_ok=True)
        with open(fileName, 'wb') as fd:
            shutil.copyfileobj(r5.raw, fd, 64 * 1024)

    print('Finished saving compressed data to file:' + fileName + '\n')
    return True


def _printGzipPreview(fileName, maxLines=10):
    """Decompress and print at most ``maxLines`` lines of a gzip file (demo preview)."""
    print('Read data from file, and decompress at most ' + str(maxLines) + ' lines of it:')
    with gzip.open(fileName, 'rb') as fd:
        for count, line in enumerate(fd, start=1):
            # Do something with the data:
            print(line.decode("utf-8"))
            if count >= maxLines:
                break


def _retrieveResultsBlocking(token, jobId, resultMarker):
    """Download one result file and print a short preview of it (blocking)."""
    fileName = filePath + fileNameRoot + resultMarker + ".csv.gz"
    if _downloadResultFile(token, jobId, resultMarker, fileName):
        _printGzipPreview(fileName)


async def retrieveResults(token, jobId, resultMarker):
    """Download the gzipped result of ``jobId`` to disk and print its first lines.

    The file is saved as ``filePath + fileNameRoot + resultMarker + ".csv.gz"``.
    The blocking HTTP download and file I/O run in the default executor, so
    several retrievals gathered together proceed concurrently.

    Args:
        token: DSS authentication token.
        jobId: JobId string returned by submitRequest; a non-string value
            (e.g. its -1 failure result) is reported and skipped.
        resultMarker: label used in the file name and in log lines.
    """
    if not isinstance(jobId, str):
        print(resultMarker, 'No valid jobId (' + str(jobId) + '), skipping result retrieval.\n')
        return
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, partial(_retrieveResultsBlocking, token, jobId, resultMarker))

# Example (single result, inside a notebook cell):
# await retrieveResults(tokenValid, job1Id, 'result#1')


### Retrieve Results and Save to File - Parallelize

In [None]:
tasks = asyncio.gather(
    retrieveResults(tokenValid, job1Id, 'result#1'),
    retrieveResults(tokenValid, job2Id, 'result#2')
)
#asyncio.get_event_loop().run_until_complete(tasks)
print(await tasks)
print("<<<All done>>>")