# Introduction to ESG Bulk - Python

ESG stands for Environmental, Social and (Corporate) Governance data.

Refinitiv Data Platform (RDP) provides simple web based API access to a broad range of content, including ESG content and ESG content in bulk.

With growing popularity of socially conscious investing, Refinitiv offers one of the most comprehensive Environmental, Social and Governance (ESG) databases in the industry, covering over 80% of global market cap, across more than 450 different ESG metrics, with history going back to 2002. Customers looking to download our ESG content can do so through our bulk API service in Refinitiv Data Platform (RDP). RDP is a cloud based API that provides a single access point to all Refinitiv content.

ESG data is the first content made available in our bulk API service known as Client File Store (CFS). This capability allows our customers to download our entire history of ESG coverage. To more about how the ESG Bulk Service works in Refinitiv Data Platform, please visit:

https://developers.refinitiv.com/refinitiv-data-platform/refinitiv-data-platform-apis/docs

Within RDP family of service, ESG Bulk is part of Client File Store (CFS) - based section of service, find out more at:

https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-platform-apis

Let us now focus on the programmatic interaction with ESG Bulk RDP service.

In [None]:
import requests, json, time, getopt, sys
import pandas as pd

### Set Valid Credentials 

Valid RDP credentials are required to proceed:
* USERNAME
* PASSWORD
* CLIENTID

To read one's valid credentials from a file (that can be shared by many code examples), leave below code as is.

To provide credentials in place:
* replace the commented credentials with one's valid assigned credentials
* comment the read from file step readCredsFromFile

In [None]:
USERNAME = "VALIDUSER"
PASSWORD = "VALIDPASSWORD"
CLIENT_ID = "SELFGENERATEDCLIENTID"

def readCredsFromFile(filePathName):
### Read valid credentials from file
    global USERNAME, PASSWORD, CLIENT_ID
    credFile = open(filePathName,"r")    # one per line
                                                #--- RDP MACHINE ID---
                                                #--- LONG PASSWORD---
                                                #--- GENERATED CLIENT ID---

    USERNAME = credFile.readline().rstrip('\n')
    PASSWORD = credFile.readline().rstrip('\n')
    CLIENT_ID = credFile.readline().rstrip('\n')

    credFile.close()

readCredsFromFile("..\creds\credFileHuman.txt")

# Make sure that creds are read in correctly
#print("USERNAME="+str(USERNAME))
#print("PASSWORD="+str(PASSWORD))
#print("CLIENT_ID="+str(CLIENT_ID))

### Set Application Constants

In [None]:
# Set Application Constants
RDP_AUTH_VERSION = "/v1"
RDP_ESG_BULK_VERSION = "/v1"
RDP_BASE_URL = "https://api.refinitiv.com"
RDP_ESG_BUCKET = "ESG"
CATEGORY_URL = "/auth/oauth2"
ENDPOINT_URL = "/token"
CLIENT_SECRET = ""
TOKEN_FILE = "token.txt"
SCOPE = "trapi"

### Define Token Handling and Obtain a Valid Token

Having a valid token is a pre-requisite to requesting of any RDP content, and will be passed into the next steps.

In [None]:
TOKEN_ENDPOINT = RDP_BASE_URL + CATEGORY_URL + RDP_AUTH_VERSION + ENDPOINT_URL

def _requestNewToken(refreshToken):
    if refreshToken is None:
        tData = {
            "username": USERNAME,
            "password": PASSWORD,
            "grant_type": "password",
            "scope": SCOPE,
            "takeExclusiveSignOnControl": "true"
        };
    else:
        tData = {
            "refresh_token": refreshToken,
            "grant_type": "refresh_token",
        };

    # Make a REST call to get latest access token
    response = requests.post(
        TOKEN_ENDPOINT,
        headers = {
            "Accept": "application/json"
        },
        data = tData,
        auth = (
            CLIENT_ID,
            CLIENT_SECRET
        )
    )
    
    if response.status_code != 200:
        raise Exception("Failed to get access token {0} - {1}".format(response.status_code, response.text));

    # Return the new token
    return json.loads(response.text);

def saveToken(tknObject):
    tf = open(TOKEN_FILE, "w+");
    print("Saving the new token");
    # Append the expiry time to token
    tknObject["expiry_tm"] = time.time() + int(tknObject["expires_in"]) - 10;
    # Store it in the file
    json.dump(tknObject, tf, indent=4)
    
def getToken():
    try:
        print("Reading the token from: " + TOKEN_FILE);
        # Read the token from a file
        tf = open(TOKEN_FILE, "r+")
        tknObject = json.load(tf);

        # Is access token valid
        if tknObject["expiry_tm"] > time.time():
            # return access token
            return tknObject["access_token"];

        print("Token expired, refreshing a new one...");
        tf.close();
        # Get a new token from refresh token
        tknObject = _requestNewToken(tknObject["refresh_token"]);

    except Exception as exp:
        print("Caught exception: " + str(exp))
        print("Getting a new token using Password Grant...");
        tknObject = _requestNewToken(None);

    # Persist this token for future queries
    saveToken(tknObject)
    print("Token is: " + tknObject["access_token"])
    # Return access token
    return tknObject["access_token"];

accessToken = getToken();
print("Have token now");

## Request Available File Sets

The purpose of ESG bulk service is obtaining ESG content in bulk.  The content is available as:

•	A full JSON data file containing history for all measures and all organizations.
•	A delta JSON data file that contains only incremental changes to the universe since last week. 

A customer can examine the available File Sets and is expected to:

•	Build the initial ESG representation with the full files
•	Apply delta, changes, as they become available
•	Fill a gap if the retrival was not completed and the content missed remains available

This step serves to verify the type of the file, for example:
•	ESGRawFullScheme
•	ESGScoresFull
•	ESGScoresWealthFull

In [None]:
FILESET_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_ESG_BULK_VERSION + '/file-sets?bucket='+ RDP_ESG_BUCKET
FILESET_ID = ''
PACKAGE_ID = ''
WHICH_FILESET_ID_TO_STORE = 1   # 10 are returned per page
WHICH_PACKAGE_ID_TO_STORE = 2   

def requestFileSets(token, withNext, skipToken, attributes):   
    global FILESET_ENDPOINT
    global FILESET_ID
    global PACKAGE_ID
    global WHICH_FILESET_ID_TO_STORE   
    
    print("Obtaining FileSets in ESG Bucket...")
  
    FILESET_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_ESG_BULK_VERSION + '/file-sets?bucket='+ RDP_ESG_BUCKET
    
    querystring = {}
    payload = ""
    jsonfull = ""
    jsonpartial = ""
    
    headers = {
            'Content-Type': "application/json",
            'Authorization': "Bearer " + token,
            'cache-control': "no-cache"
    }

    if attributes:
        FILESET_ENDPOINT = FILESET_ENDPOINT + attributes
    if withNext:
        FILESET_ENDPOINT = FILESET_ENDPOINT + '&skipToken=' +skipToken
        
    response = requests.request("GET", FILESET_ENDPOINT, data=payload, headers=headers, params=querystring)
    
    if response.status_code != 200:
        if response.status_code == 401:   # error when token expired
                accessToken = getToken();     # token refresh on token expired
                headers['Authorization'] = "Bearer " + accessToken
                response = requests.request("GET", FILESET_ENDPOINT, data=payload, headers=headers, params=querystring)
         
    print('Raw response=');
    print(response);
    
    if response.status_code == 200:
        jsonFullResp = json.loads(response.text)
        print('Parsed json response=');
        print(json.dumps(jsonFullResp, indent=2));
        # We are going to store FileSet ID of the second File Set retrieved for future reference
        FILESET_ID = jsonFullResp['value'][WHICH_FILESET_ID_TO_STORE]['id']
        print('FILESET_ID stored is: '+str(FILESET_ID))
        PACKAGE_ID = jsonFullResp['value'][WHICH_PACKAGE_ID_TO_STORE]['packageId']
        print('PACKAGE_ID stored is: '+str(PACKAGE_ID))
        return jsonFullResp; 
    else:
        return '';

jsonFullResp = requestFileSets(accessToken, False, '','');

### Present Raw Results in Tabular View

In [None]:
import pandas as pd
df = pd.json_normalize(jsonFullResp['value'])
df

### Paginate Through the Available FileSets
(interrupt at any point)

In [None]:
i = 1
while "@nextLink" in jsonFullResp: 
    print('<<< Iteraction: '+str(i)+' >>>  More exists: '+ jsonFullResp['@nextLink'] + ', skipToken is: ' + jsonFullResp['@nextLink'][-62:]+'\n')
    jsonFullResp = requestFileSets(accessToken, True, jsonFullResp['@nextLink'][-62:],'');
    i+=1;
print('Last response without next=');
print(jsonFullResp);

### Retrieve FileSets of Specific File Type (Filter By Attribute and By PackageId)
The file types may change over time, at the time of this writing, the available FileSets are of types:

* ESG Raw Full A
* ESG Raw Full B
* ESG Raw Current A
* ESG Raw Current B
* ESG Sources
* ESG Raw Wealth Standard

* Symbology Cusip
* Symbology SEDOL
* Symbology Organization
* Symbology Instrument Quote

Further, the selected package, if also filtering by packageId has to contain the files per filtering arrtibutes, in order to request their listing succefully,
otherwise the result will be empty.


In [None]:
requestFileSets(accessToken, False, '','&attributes=ContentType:Symbology Cusip');

In [None]:
requestFileSets(accessToken, False, '','&packageId='+PACKAGE_ID+'&attributes=ContentType:ESG Sources');

### Retrieve Complete File Details of FileSet ID

In a previous step we have stored a FileSet ID that we are about to use for the demonstartion of this feature.

In [None]:
FILES_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_ESG_BULK_VERSION + '/files?filesetId='+ FILESET_ID
WHICH_FILE_ID_TO_STORE = 0
FILE_ID = ''
FILE_NAME = ''
 
def requestFileDetails(token):   
    global FILES_ENDPOINT
    global FILE_ID
    global FILE_NAME
    print("Obtaining File details..." )
  
    querystring = {}
    payload = ""
    jsonfull = ""
    jsonpartial = ""
    
    headers = {
            'Content-Type': "application/json",
            'Authorization': "Bearer " + token,
            'cache-control': "no-cache"
    }
        
    response = requests.request("GET", FILES_ENDPOINT, data=payload, headers=headers, params=querystring)
    
    if response.status_code != 200:
        if response.status_code == 401:   # error when token expired
                accessToken = getToken();     # token refresh on token expired
                headers['Authorization'] = "Bearer " + accessToken
                response = requests.request("GET", FILES_ENDPOINT, data=payload, headers=headers, params=querystring)
         
    print('Raw response=');
    print(response);
    
    if response.status_code == 200:
        jsonFullResp = json.loads(response.text)
        print('Parsed json response=');
        print(json.dumps(jsonFullResp, indent=2));
        FILE_ID = jsonFullResp['value'][WHICH_FILE_ID_TO_STORE]['id']
        print('FILE_ID stored is: '+str(FILE_ID))
        FILE_NAME = jsonFullResp['value'][WHICH_FILE_ID_TO_STORE]['filename']
        print('FILE_NAME stored is: '+str(FILE_NAME))
        return jsonFullResp; 
    else:
        return '';

jsonFullResp = requestFileDetails(accessToken);

###  Stream File via File Id using Redirect

In [None]:
import shutil

FILES_STREAM_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_ESG_BULK_VERSION + '/files/'+ FILE_ID+ '/stream'
 
def requestFileDownload(token):   
    global FILES_STREAM_ENDPOINT
    print("Obtaining File ... " + FILES_STREAM_ENDPOINT)
  
    filename = FILE_NAME
    chunk_size = 1000
    
    headers = {
            'Authorization': 'Bearer ' + token,
            'cache-control': "no-cache",
            'Accept': '*/*'
    }
        
    response = requests.request("GET", FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)
    
    if response.status_code != 200:
        if response.status_code == 401:   # error when token expired
                accessToken = getToken();     # token refresh on token expired
                headers['Authorization'] = "Bearer " + accessToken
                response = requests.request("GET",FILES_STREAM_ENDPOINT, headers=headers, stream=True, allow_redirects=True)

         
    print('Response code=' + str(response.status_code));
    
    if response.status_code == 200:
        print('Processing...')
        with open(filename, 'wb') as fd:
            shutil.copyfileobj(response.raw, fd) 
        print('Look for gzipped file named: '+ filename + ' in current directory')
        response.connection.close()
        
    return; 


requestFileDownload(accessToken);

### Get File Location (Step 1 of 2)

In [None]:
import shutil

FILES_STREAM_ENDPOINT = RDP_BASE_URL+'/file-store'+RDP_ESG_BULK_VERSION + '/files/'+ FILE_ID+ '/stream?doNotRedirect=true'
DIRECT_URL = ''
 
def requestFileLocation(token):   
    
    global FILES_STREAM_ENDPOINT
    global DIRECT_URL
    
    print("Obtaining File ... " + FILES_STREAM_ENDPOINT)
  
    filename = FILE_NAME
    chunk_size = 1000
    
    headers = {
            'Authorization': 'Bearer ' + token,
            'cache-control': "no-cache",
            'Accept': '*/*'
    }
        
    response = requests.request("GET", FILES_STREAM_ENDPOINT, headers=headers, stream=False, allow_redirects=False)
    
    if response.status_code != 200:
        if response.status_code == 401:   # error when token expired
                accessToken = getToken();     # token refresh on token expired
                headers['Authorization'] = "Bearer " + accessToken
                response = requests.request("GET",FILES_STREAM_ENDPOINT, headers=headers, stream=False, allow_redirects=False)

         
    print('Response code=' + str(response.status_code));
    
    if response.status_code == 200:
        jsonFullResp = json.loads(response.text)
        print('Parsed json response=');
        print(json.dumps(jsonFullResp, indent=2));
        DIRECT_URL = jsonFullResp['url'];
        print('File Direct URL is: '  +str(DIRECT_URL)+ '|||');
        
    return; 


requestFileLocation(accessToken);

### Download File From File Location (Step 2 of 2)

In [None]:
from urllib.parse import urlparse, parse_qs
def requestDirectFileDownload(token):   
    
    global DIRECT_URL
    print("Obtaining File from URL... " + DIRECT_URL)
    
    #Parse out URL parameters for submission into requests
    url_obj = urlparse(DIRECT_URL)
    parsed_params = parse_qs(url_obj.query)
    # extract the URL without query parameters
    parsed_url = url_obj._replace(query=None).geturl()

    response = requests.get(parsed_url, params=parsed_params,stream=True)
        
    if response.status_code != 200:
        if response.status_code == 401:   # error when token expired
                accessToken = getToken();     # token refresh on token expired
                headers['Authorization'] = "Bearer " + accessToken
                response = requests.get(parsed_url, params=query)

         
    print('Response code=' + str(response.status_code));        
  
    filename = 'another_'+FILE_NAME    
    
    if response.status_code == 200:
        print('Processing...')
        with open(filename, 'wb') as fd:
            shutil.copyfileobj(response.raw, fd) 

        print('Look for gzipped file named: '+ filename + ' in current directory')
        response.connection.close()
        
    return; 


requestDirectFileDownload(accessToken);