# Introduction to Client File Store (CFS) - Python

In [None]:
import requests, json, time, getopt, sys
import pandas as pd

### Set Valid Credentials 

Valid RDP credentials are required to proceed:
* USERNAME
* PASSWORD
* CLIENTID

To read valid credentials from a file (which can be shared by many code examples), leave the code below as is.

To provide credentials in place instead:
* replace the placeholder credentials with your valid assigned credentials
* comment out the `readCredsFromFile` call that reads them from the file

In [None]:
USERNAME = "VALIDUSER"
PASSWORD = "VALIDPASSWORD"
CLIENT_ID = "SELFGENERATEDCLIENTID"

def readCredsFromFile(filePathName):
    """Read RDP credentials from a text file into the module-level globals.

    The file holds one value per line, in this order:
        1. RDP machine ID (username)  -> USERNAME
        2. password                   -> PASSWORD
        3. generated client ID        -> CLIENT_ID

    Only the trailing newline is stripped from each line, so any other
    whitespace inside a value is kept. A missing line yields "".

    Args:
        filePathName: path of the credentials file.
    """
    global USERNAME, PASSWORD, CLIENT_ID
    # 'with' guarantees the file is closed even if a read fails
    with open(filePathName, "r") as credFile:
        USERNAME = credFile.readline().rstrip('\n')
        PASSWORD = credFile.readline().rstrip('\n')
        CLIENT_ID = credFile.readline().rstrip('\n')

# Build the path portably: the former "..\creds\credFileHuman.txt" literal relied on
# invalid escape sequences (a SyntaxWarning on newer Pythons) and only resolved on Windows.
import os
readCredsFromFile(os.path.join("..", "creds", "credFileHuman.txt"))

# Uncomment - to make sure that creds are either set in code or read in correctly
#print("USERNAME="+str(USERNAME))
#print("PASSWORD="+str(PASSWORD))
#print("CLIENT_ID="+str(CLIENT_ID))

### Set Application Constants

In [None]:
# Set Application Constants
# --- RDP service location and API versions ---
RDP_BASE_URL = "https://api.refinitiv.com"
RDP_AUTH_VERSION = "/v1"
RDP_CFS_VERSION = "/v1"

# --- OAuth2 token endpoint pieces and grant settings ---
CATEGORY_URL = "/auth/oauth2"
ENDPOINT_URL = "/token"
SCOPE = "trapi"
CLIENT_SECRET = ""   # sent (empty) alongside CLIENT_ID as the HTTP basic-auth pair

# --- Local cache file for the most recently obtained token ---
TOKEN_FILE = "token.txt"

### Define Token Handling and Obtain a Valid Token

A valid token is a prerequisite for requesting any RDP content; it is passed into the following steps.

In [None]:
TOKEN_ENDPOINT = RDP_BASE_URL + CATEGORY_URL + RDP_AUTH_VERSION + ENDPOINT_URL

def _requestNewToken(refreshToken):
    """Request a fresh token object from the RDP OAuth2 token endpoint.

    Args:
        refreshToken: a refresh token to exchange, or None to sign on with
            the password grant using USERNAME / PASSWORD.

    Returns:
        The decoded JSON token object returned by the endpoint.

    Raises:
        Exception: if the endpoint answers with anything other than HTTP 200.
    """
    if refreshToken is not None:
        tokenRequest = {
            "refresh_token": refreshToken,
            "grant_type": "refresh_token",
        }
    else:
        tokenRequest = {
            "username": USERNAME,
            "password": PASSWORD,
            "grant_type": "password",
            "scope": SCOPE,
            "takeExclusiveSignOnControl": "true",
        }

    # Client id/secret go as HTTP basic auth; the grant goes in the form body.
    clientAuth = (CLIENT_ID, CLIENT_SECRET)
    reply = requests.post(
        TOKEN_ENDPOINT,
        headers={"Accept": "application/json"},
        data=tokenRequest,
        auth=clientAuth,
    )

    if reply.status_code == 200:
        return json.loads(reply.text)
    raise Exception("Failed to get access token {0} - {1}".format(reply.status_code, reply.text))

def saveToken(tknObject):
    """Stamp the token with an absolute expiry time and persist it to TOKEN_FILE.

    Adds ``expiry_tm`` to *tknObject* in place: now + ``expires_in`` seconds
    minus 10 seconds, so getToken() treats the token as expired slightly early.

    Args:
        tknObject: token dict as returned by the token endpoint; must contain
            ``expires_in``.
    """
    print("Saving the new token")
    # Compute the expiry before opening the file, so a malformed token cannot
    # truncate the existing cache and then fail.
    tknObject["expiry_tm"] = time.time() + int(tknObject["expires_in"]) - 10
    # 'with' flushes and closes the file (the handle used to be left open)
    with open(TOKEN_FILE, "w") as tf:
        json.dump(tknObject, tf, indent=4)
    
def getToken():
    """Return a usable access token, reusing the cached one when possible.

    Attempts, in order:
      1. read TOKEN_FILE and return its access token if not yet expired;
      2. otherwise exchange its refresh token for a new token;
      3. if anything in 1-2 fails (missing/corrupt file, rejected refresh, ...),
         fall back to a password-grant sign-on.
    Any newly obtained token is persisted with saveToken().

    Returns:
        The access token string.

    Raises:
        Exception: if the password-grant fallback itself fails.
    """
    try:
        print("Reading the token from: " + TOKEN_FILE)
        # 'with' closes the file on every path, including the early return below
        with open(TOKEN_FILE, "r") as tf:
            tknObject = json.load(tf)

        if tknObject["expiry_tm"] > time.time():
            return tknObject["access_token"]

        print("Token expired, refreshing a new one...")
        tknObject = _requestNewToken(tknObject["refresh_token"])

    except Exception as exp:
        # Deliberately broad: any problem with the cached token falls back to a fresh sign-on
        print("Caught exception: " + str(exp))
        print("Getting a new token using Password Grant...")
        tknObject = _requestNewToken(None)

    # Persist this token for future queries
    saveToken(tknObject)
    print("Token is: " + tknObject["access_token"])
    return tknObject["access_token"]

# Obtain a token once (cached file, refresh grant, or password grant) for the cells below
accessToken = getToken()
print("Have token now")

### Select CFS Bucket to Test with

In [None]:
# RDP_CFS_BUCKET = "ESG"
# Strip stray whitespace from the pasted bucket name; it is concatenated into request URLs.
RDP_CFS_BUCKET = input().strip()
print('RDP_CFS_BUCKET=' + RDP_CFS_BUCKET)

### Request Available File Sets in the Bucket

In [None]:
FILESET_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/file-sets?bucket='+ RDP_CFS_BUCKET
FILESET_ID = ''

def requestFileSets(token, withNext, skipToken, bucket, attributes):
    """List the file sets of a CFS bucket (one page per call).

    Args:
        token: bearer access token.
        withNext: if true, append ``&skipToken=<skipToken>`` to fetch a later page.
        skipToken: paging token taken from a previous response's ``@nextLink``.
        bucket: CFS bucket name.
        attributes: extra query-string text appended verbatim; must start with
            '&' (e.g. ``'&attributes=ContentType:ESG Scores'``), or be ''.

    Returns:
        The decoded JSON response on HTTP 200, otherwise ''.

    Side effects:
        Sets the module-level FILESET_ENDPOINT to the URL requested and, when a
        401 forces a token refresh, updates the module-level accessToken.
    """
    global FILESET_ENDPOINT, accessToken
    print("Obtaining FileSets in "+bucket+" Bucket...")

    FILESET_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/file-sets?bucket='+ bucket
    if attributes:
        FILESET_ENDPOINT = FILESET_ENDPOINT + attributes
    if withNext:
        FILESET_ENDPOINT = FILESET_ENDPOINT + '&skipToken=' + skipToken

    headers = {
            'Content-Type': "application/json",
            'Authorization': "Bearer " + token,
            'cache-control': "no-cache"
    }

    response = requests.request("GET", FILESET_ENDPOINT, data="", headers=headers, params={})

    if response.status_code == 401:   # error when token expired
        # Refresh and keep it globally, so later calls start with the valid token
        accessToken = getToken()
        headers['Authorization'] = "Bearer " + accessToken
        response = requests.request("GET", FILESET_ENDPOINT, data="", headers=headers, params={})

    print('Raw response=')
    print(response)

    if response.status_code == 200:
        return json.loads(response.text)
    print('Error response body=' + response.text)
    return ''

jsonFullResp = requestFileSets(accessToken, False, '',RDP_CFS_BUCKET,'')

print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
# requestFileSets returns '' on failure; show an empty table instead of
# failing with a TypeError on ''['value'].
df = pd.json_normalize(jsonFullResp.get('value', []) if jsonFullResp else [])
df

### Pick File Set Id

In [None]:
# Optionally, copy a file set id from the tabular view printed by
# "Request Available File Sets in the Bucket".
# Stray whitespace is stripped because the id is concatenated into request URLs.
FILESET_ID = input().strip()
print('FILESET_ID selected is: ' + FILESET_ID)

### Paginate Through All the Available FileSets
(interrupt at any point)

In [None]:
from urllib.parse import urlparse

def _skipTokenFromNextLink(nextLink):
    """Return the raw (still URL-encoded) skipToken value from an @nextLink URL.

    Falls back to the last 62 characters (the fixed-length slice this cell
    used before) if the link carries no skipToken parameter.
    """
    for param in urlparse(nextLink).query.split('&'):
        name, _, value = param.partition('=')
        if name == 'skipToken':
            return value
    return nextLink[-62:]

i = 1
while "@nextLink" in jsonFullResp:
    nextLink = jsonFullResp['@nextLink']
    skipToken = _skipTokenFromNextLink(nextLink)
    print('<<< Iteraction: '+str(i)+' >>>  More exists: '+ nextLink + ', skipToken is: ' + skipToken+'\n')
    jsonFullResp = requestFileSets(accessToken, True, skipToken, RDP_CFS_BUCKET, '')
    print(json.dumps(jsonFullResp, indent=2))
    i += 1
print('Last response without next=')
print(json.dumps(jsonFullResp, indent=2))

### Request Available Packages

In [None]:
#https://api.refinitiv.com/file-store/v1/packages?packageName={packageName}
#https://api.refinitiv.com/file-store/v1/packages
PACKAGES_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/packages?packageType=bulk'
PACKAGE_ID = ''

def requestPackages(token, withNext, skipToken, attributes):
    """List the bulk packages available to this account (one page per call).

    Args:
        token: bearer access token.
        withNext: if true, append ``&skipToken=<skipToken>`` to fetch a later page.
        skipToken: paging token from a previous response's ``@nextLink``.
        attributes: extra query-string text appended verbatim; must start with
            '&' (e.g. ``'&packageName=...'``), or be ''.

    Returns:
        The decoded JSON response on HTTP 200, otherwise ''.

    Side effects:
        Sets the module-level PACKAGES_ENDPOINT to the URL requested and, when a
        401 forces a token refresh, updates the module-level accessToken.
    """
    global PACKAGES_ENDPOINT, accessToken
    print("Obtaining Packages of type bulk")

    # Keep the packageType=bulk filter that the message above promises. It also
    # supplies the '?', so the '&'-prefixed extras below form a valid query
    # string (previously a malformed '/packages&skipToken=...' was requested).
    PACKAGES_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/packages?packageType=bulk'
    if attributes:
        PACKAGES_ENDPOINT = PACKAGES_ENDPOINT + attributes
    if withNext:
        PACKAGES_ENDPOINT = PACKAGES_ENDPOINT + '&skipToken=' + skipToken

    headers = {
            'Content-Type': "application/json",
            'Authorization': "Bearer " + token,
            'cache-control': "no-cache"
    }

    response = requests.request("GET", PACKAGES_ENDPOINT, data="", headers=headers, params={})

    if response.status_code == 401:   # error when token expired
        # Refresh and keep it globally, so later calls start with the valid token
        accessToken = getToken()
        headers['Authorization'] = "Bearer " + accessToken
        response = requests.request("GET", PACKAGES_ENDPOINT, data="", headers=headers, params={})

    print('Raw response=')
    print(response)

    if response.status_code == 200:
        return json.loads(response.text)
    print('Error response body=' + response.text)
    return ''

jsonFullResp = requestPackages(accessToken, False, '', '')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
# requestPackages returns '' on failure; show an empty table instead of
# failing with a TypeError on ''['value'].
df = pd.json_normalize(jsonFullResp.get('value', []) if jsonFullResp else [])
df

### Select Package Id

In [None]:
# Optionally, copy a package id from the table printed by "Request Available Packages".
# Stray whitespace is stripped because the id is concatenated into request URLs.
PACKAGE_ID = input().strip()
print('PACKAGE_ID selected is: ' + PACKAGE_ID)

### Retrieve FileSets of Specific File Type (Filter By Attribute and By PackageId)
The file types may change over time. At the time of writing, we use the following file types as examples:

* ESG Scores
* Symbology Cusip

Furthermore, when also filtering by packageId, the selected package must contain files that match the filtering attributes for the listing request to succeed;
otherwise, the result will be empty.


In [None]:
jsonFullResp = requestFileSets(accessToken, False, '',RDP_CFS_BUCKET,'&attributes=ContentType:Symbology Cusip')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
# requestFileSets returns '' on failure; show an empty table instead of a TypeError.
df = pd.json_normalize(jsonFullResp.get('value', []) if jsonFullResp else [])
df

In [None]:
# Example PackageId: 4867-9a46-216e838a-9241-8fc3561b51ef
jsonFullResp = requestFileSets(accessToken, False, '',RDP_CFS_BUCKET,'&packageId='+PACKAGE_ID+'&attributes=ContentType:ESG Scores')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
# requestFileSets returns '' on failure; show an empty table instead of a TypeError.
df = pd.json_normalize(jsonFullResp.get('value', []) if jsonFullResp else [])
df

### Example - Retrieve Tick History File Sets for a Given Venue In View

In [None]:
RDP_TH_BUCKET = 'TICKHISTORY_VBD_UNLIMITED'
CFS_VBD_VENUE = 'NSQ'
CFS_VBD_VIEW = 'normalized'
# Attribute filters are comma-separated name:value pairs (as in the ContentType
# filters above), so the view must be 'view:<value>', not 'view=<value>'.
jsonFullResp = requestFileSets(accessToken, False, '',RDP_TH_BUCKET,'&attributes=venue:'+CFS_VBD_VENUE+',view:'+CFS_VBD_VIEW)
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
# requestFileSets returns '' on failure; show an empty table instead of a TypeError.
df = pd.json_normalize(jsonFullResp.get('value', []) if jsonFullResp else [])
df

### Example - Retrieve Tick History File Sets for a Given Venue Limit By Dates

In [None]:
# Attribute filters are comma-separated name:value pairs, hence 'view:<value>'.
jsonFullResp = requestFileSets(accessToken, False, '',RDP_TH_BUCKET,'&attributes=venue:'+CFS_VBD_VENUE+',view:'+CFS_VBD_VIEW+
                               '&contentFrom=2007-01-01T00:00:00Z&contentTo=2010-01-01T00:00:00Z')
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
print('Same response, tabular view')
# requestFileSets returns '' on failure; show an empty table instead of a TypeError.
df = pd.json_normalize(jsonFullResp.get('value', []) if jsonFullResp else [])
df

### Retrieve Complete File Details of FileSet ID

In a previous step we stored a FileSet ID, which we now use to demonstrate this feature.

In [None]:
FILES_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/files?filesetId='+ FILESET_ID

def requestFileDetails(token):
    """List the files belonging to the currently selected file set (FILESET_ID).

    Args:
        token: bearer access token.

    Returns:
        The decoded JSON response on HTTP 200, otherwise ''.

    Side effects:
        Rebuilds the module-level FILES_ENDPOINT from the current FILESET_ID on
        every call, so selecting a new FILESET_ID takes effect without re-running
        this cell. When a 401 forces a token refresh, also updates the
        module-level accessToken.
    """
    global FILES_ENDPOINT, accessToken
    FILES_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/files?filesetId='+ FILESET_ID
    print("Obtaining File details for FileSet= "+ FILESET_ID + " ...")
    print("(If result is Response=400, make sure that FILESET_ID is set with a valid value...)")

    headers = {
            'Content-Type': "application/json",
            'Authorization': "Bearer " + token,
            'cache-control': "no-cache"
    }

    response = requests.request("GET", FILES_ENDPOINT, data="", headers=headers, params={})

    if response.status_code == 401:   # error when token expired
        # Refresh and keep it globally, so later calls start with the valid token
        accessToken = getToken()
        headers['Authorization'] = "Bearer " + accessToken
        response = requests.request("GET", FILES_ENDPOINT, data="", headers=headers, params={})

    print('Raw response=')
    print(response)

    if response.status_code == 200:
        return json.loads(response.text)
    print('Error response body=' + response.text)
    return ''

jsonFullResp = requestFileDetails(accessToken)
print('Parsed json response=')
print(json.dumps(jsonFullResp, indent=2))
# requestFileDetails returns '' on failure; show an empty table instead of a TypeError.
df = pd.json_normalize(jsonFullResp.get('value', []) if jsonFullResp else [])
df

### Select File Id and File Name (To Be Requested)

In [None]:
# Optionally, copy from result of "Retrieve Complete File Details of FileSet ID".
# Stray whitespace is stripped: the id goes into request URLs, the name becomes a file name.
FILE_ID = input().strip()
print('FILE_ID selected is: ' + FILE_ID)
FILE_NAME = input().strip()
print('FILE_NAME selected is: ' + FILE_NAME)

### Stream File via File Id Using Redirect

In [None]:
import shutil

FILES_STREAM_ENDPOINT_START = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/files/'

# use valid values, obtained from the previous step
exampleFileId = '4edd-99af-da829f42-8ddd-07fabfcddca9'
exampleFileName = 'RFT-ESG-Sources-Full-Init-2021-01-17-part07.jsonl.gz'

def requestFileDownload(token, fileId, fileName):
    """Download one CFS file by id, following the redirect to its storage location.

    The body is copied from ``response.raw`` (which requests does not
    content-decode) into *fileName* in the current directory, so the bytes
    are saved as served.

    Args:
        token: bearer access token.
        fileId: CFS file id.
        fileName: local file name to write.

    Returns:
        None.

    Side effects:
        Writes *fileName* on HTTP 200. On 401, refreshes and stores the
        module-level accessToken.
    """
    global accessToken
    FILES_STREAM_ENDPOINT = FILES_STREAM_ENDPOINT_START + fileId+ '/stream'
    print("Obtaining File ... " + FILES_STREAM_ENDPOINT)

    headers = {
            'Authorization': 'Bearer ' + token,
            'cache-control': "no-cache",
            'Accept': '*/*'
    }

    response = requests.request("GET", FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)

    if response.status_code == 401:   # error when token expired
        response.close()   # release the streamed connection before retrying
        accessToken = getToken()
        headers['Authorization'] = "Bearer " + accessToken
        response = requests.request("GET",FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)

    # A streamed response keeps its connection open until closed; 'with' closes it on every path
    with response:
        print('Response code=' + str(response.status_code))

        if response.status_code == 200:
            print('Processing...')
            with open(fileName, 'wb') as fd:
                shutil.copyfileobj(response.raw, fd)
            print('Look for gzipped file named: '+ fileName + ' in current directory')

    return

# consider below an example only
requestFileDownload(accessToken, exampleFileId, exampleFileName)
#requestFileDownload(accessToken, FILE_ID, FILE_NAME);

### Stream All Files In FileSet
This may take a long time .<.>. .<.>. .<.>.

In [None]:
# jsonFullResp is expected to hold the file listing from requestFileDetails
# (the most recent cell assigning it, in notebook order).
print("List of files to be streamed by this step:")
fileEntries = jsonFullResp['value']
for entry in fileEntries:
    print('File name: ' + entry['filename'])

print("\n... Starting to stream now, this may take long  .<.>. .<.>. .<.>.")
for entry in fileEntries:
    entryName = entry['filename']
    print('Streaming File: ' + entryName)
    requestFileDownload(accessToken, entry['id'], entryName)

### Get File Location (Step 1 of 2)

In [None]:
import shutil

FILES_STREAM_ENDPOINT_START = RDP_BASE_URL+'/file-store'+RDP_CFS_VERSION + '/files/'
DIRECT_URL = ''

exampleFileId = '4edd-99af-da829f42-8ddd-07fabfcddca9'

def requestFileLocation(token, fileId):
    """Ask CFS for a file's direct download URL instead of being redirected to it.

    With ``doNotRedirect=true`` the stream endpoint answers with JSON whose
    ``url`` field is the direct location.

    Args:
        token: bearer access token.
        fileId: CFS file id.

    Returns:
        The direct URL on HTTP 200, otherwise ''.

    Side effects:
        On 401, refreshes and stores the module-level accessToken.
    """
    global accessToken
    # Initialised up front so a failed request returns '' (this name used to be
    # assigned only on success, raising UnboundLocalError on any other status).
    directUrl = ''

    FILES_STREAM_ENDPOINT = FILES_STREAM_ENDPOINT_START + fileId+ '/stream?doNotRedirect=true'
    print("Obtaining File ... " + FILES_STREAM_ENDPOINT)

    headers = {
            'Authorization': 'Bearer ' + token,
            'cache-control': "no-cache",
            'Accept': '*/*'
    }

    response = requests.request("GET", FILES_STREAM_ENDPOINT, headers=headers, stream=False, allow_redirects=False)

    if response.status_code == 401:   # error when token expired
        accessToken = getToken()
        headers['Authorization'] = "Bearer " + accessToken
        response = requests.request("GET",FILES_STREAM_ENDPOINT, headers=headers, stream=False, allow_redirects=False)

    print('Response code=' + str(response.status_code))

    if response.status_code == 200:
        jsonFullResp = json.loads(response.text)
        print('Parsed json response=')
        print(json.dumps(jsonFullResp, indent=2))
        directUrl = jsonFullResp['url']
        print('File Direct URL is: '  +str(directUrl)+ '|||')
    else:
        print('Error response body=' + response.text)

    return directUrl


DIRECT_URL = requestFileLocation(accessToken, FILE_ID)

### Download File From File Location (Step 2 of 2)

In [None]:
from urllib.parse import urlparse, parse_qs

exampleFileName = 'RFT-ESG-Sources-Full-Init-2021-01-17-part07.jsonl.gz'

def requestDirectFileDownload(token, fileURL, fileName):
    """Download a file from a direct URL (as returned by requestFileLocation).

    The file is saved as ``'another_' + fileName`` in the current directory.

    Args:
        token: unused; kept for call compatibility. The direct URL is fetched
            without an Authorization header.
        fileURL: direct download URL. If empty (e.g. the location lookup
            failed), nothing is requested.
        fileName: base name for the saved file.

    Returns:
        None.
    """
    if not fileURL:
        print('No file URL to download from; obtain one with requestFileLocation first')
        return

    print("Obtaining File from URL... " + fileURL + '... to file name=' + fileName)

    #Parse out URL parameters for submission into requests
    url_obj = urlparse(fileURL)
    parsed_params = parse_qs(url_obj.query)
    # extract the URL without query parameters
    parsed_url = url_obj._replace(query=None).geturl()

    # This URL is fetched without the bearer token, so a token refresh cannot
    # fix a failure here. The former 401 branch referenced undefined
    # 'headers'/'query' and could only raise NameError.
    response = requests.get(parsed_url, params=parsed_params, stream=True)

    # A streamed response keeps its connection open until closed; 'with' closes it on every path
    with response:
        print('Response code=' + str(response.status_code))

        filename = 'another_'+fileName

        if response.status_code == 200:
            print('Processing...')
            with open(filename, 'wb') as fd:
                shutil.copyfileobj(response.raw, fd)
            print('Look for gzipped file named: '+ filename + ' in current directory')
        else:
            print('Error response body=' + response.text)

    return


# Step 2: fetch the file from the direct URL found in step 1 (saved as 'another_' + FILE_NAME)
requestDirectFileDownload(accessToken, DIRECT_URL, FILE_NAME)