# Refinitiv Data Platform Library for Python
## Content - Streaming Prices - How to use streaming events

This notebook demonstrates how to retrieve level 1 streaming data (such as trades and quotes) from the Refinitiv Data Platform. The example shows how to define a StreamingPrices object with registered event handlers so that your application is notified when new data arrives.

Using StreamingPrices this way lets your application update in real time as market data changes. In this event-driven mode, your application still benefits from the StreamingPrices data cache and can use the get_snapshot function to pull out real-time snapshots as Pandas DataFrames.

#### Learn more

To learn more about the Refinitiv Data Platform Data Libraries, join the Refinitiv Developer Community. By [registering](https://developers.refinitiv.com/iam/register) and [logging in](https://developers.refinitiv.com/iam/login) to the Refinitiv Developer Community portal, you get free access to learning materials such as [Quick Start guides](https://developers.refinitiv.com/refinitiv-data-platform/refinitiv-data-platform-libraries/quick-start), [Tutorials](https://developers.refinitiv.com/refinitiv-data-platform/refinitiv-data-platform-libraries/learning), [Documentation](https://developers.refinitiv.com/refinitiv-data-platform/refinitiv-data-platform-libraries/docs) and much more.

#### Getting Help and Support

If you have any questions regarding the API usage, please post them on the [Refinitiv Data Platform Q&A Forum](https://community.developers.thomsonreuters.com/spaces/231/index.html). The Refinitiv Developer Community will be very pleased to help you. 

## Introduction to streaming events

Using a StreamingPrices object with events requires you to define event handlers that are called by a background execution thread when new events are received for the instruments you requested. There are 4 different types of event handlers you can optionally define, depending on the types of events you are interested in.
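
Because handlers run on a background thread, one common pattern is to keep them short and hand each event over to the main thread through a thread-safe queue. The following is a minimal sketch of that pattern using only the standard library. It is not part of the Refinitiv library: the `feed` function is a hypothetical stand-in for the library's background thread, and `None` is passed where a real handler would receive the StreamingPrices object.

```python
import queue
import threading

# Thread-safe queue shared between the handler (background thread)
# and the consumer (main thread).
events = queue.Queue()

def on_update(streaming_prices, instrument_name, fields):
    # Keep the handler short: copy the fields, enqueue them and return.
    events.put((instrument_name, dict(fields)))

def feed():
    # Hypothetical stand-in for the library's background thread.
    on_update(None, "EUR=", {"BID": 1.1121, "ASK": 1.1123})
    on_update(None, "GBP=", {"BID": 1.3045, "ASK": 1.3046})

worker = threading.Thread(target=feed)
worker.start()
worker.join()

# Drain the queue on the main thread.
received = []
while not events.empty():
    received.append(events.get())

for instrument_name, fields in received:
    print(instrument_name, fields)
```

With this hand-off, slow processing happens outside the handler, so the thread that delivers events is not held up.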

### The 4 event types and their related event handlers:
 - **Refresh events:** Refresh events happen when all fields of one of the requested instruments are received. This complete list of fields is sometimes called the 'image' of the instrument. The image that comes with a Refresh message can later be updated by subsequent Update events. When several Refresh events are received for the same instrument, the fields transported by the latest Refresh are considered the new image. Fields received in previous Refresh or Update events must be discarded. StreamingPrices objects automatically manage this logic for their internal cache, meaning that when you call get_snapshot you always get the latest and relevant field values for the requested instruments.
 
 Refresh event handlers take 3 parameters: 
  - The StreamingPrices object that received the event
  - The name of the concerned instrument
  - The fields and values of the Image
 
 
 - **Update events:** Update events are received when fields of a requested instrument change. Update events only contain the fields and values that changed. When the application receives an Update it must update its internal representation of the instrument (if any) accordingly. StreamingPrices objects automatically manage this logic for their internal cache, meaning that when you call get_snapshot you always get the latest values of the requested instruments. 
 
 Update event handlers take 3 parameters:
  - The StreamingPrices object that received the event
  - The name of the concerned instrument
  - The updated fields with their new values
 
 
 - **Status events:** Status events are received when the status of one of the requested instruments changes.
 
 Status event handlers take 3 parameters:
  - The StreamingPrices object that received the event
  - The name of the concerned instrument
  - The new status of the instrument
 
 
 - **Complete events:** A Complete event is received once all the requested instruments have received either a Refresh or a Status event. The Complete event indicates that the StreamingPrices object is complete and that its internal cache contains the full data set (instruments and fields) that was requested.

 Complete event handlers take one parameter:
  - The StreamingPrices object that is complete
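
The Refresh and Update rules described above can be sketched with plain dictionaries. This is only an illustration of the semantics, not the library's actual cache implementation; the `ImageCache` class and its method names are hypothetical.

```python
class ImageCache:
    """Illustrative cache: one dict of field values per instrument."""

    def __init__(self):
        self._images = {}

    def on_refresh(self, instrument_name, fields):
        # A Refresh carries the full image: discard anything held before.
        self._images[instrument_name] = dict(fields)

    def on_update(self, instrument_name, fields):
        # An Update carries only changed fields: merge them into the image.
        self._images.setdefault(instrument_name, {}).update(fields)

    def snapshot(self, instrument_name):
        # Return a copy so callers cannot mutate the cache.
        return dict(self._images.get(instrument_name, {}))


cache = ImageCache()
cache.on_refresh("CAD=", {"BID": 1.3116, "ASK": 1.3120, "OPEN_PRC": 1.311})
cache.on_update("CAD=", {"BID": 1.3119, "ASK": 1.3123})
after_update = cache.snapshot("CAD=")

# A second Refresh replaces the whole image, including updated fields.
cache.on_refresh("CAD=", {"BID": 1.3100, "ASK": 1.3104})
after_second_refresh = cache.snapshot("CAD=")

print(after_update)
print(after_second_refresh)
```

Note that `OPEN_PRC` survives the Update (only `BID` and `ASK` changed) but disappears after the second Refresh, because that Refresh did not carry it.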
 
**Side note:** Because Refresh events and Update events use handlers with the same signature, the same handler can be used for both event types if you do not need to distinguish them.
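
As a minimal sketch of a shared handler, the function below accepts the three parameters common to Refresh and Update events. The name `display_fields` and the `handlers` dictionary are hypothetical; the calls at the end simulate events, with `None` standing in for the StreamingPrices object.

```python
import datetime

def display_fields(streaming_prices, instrument_name, fields):
    # Same three parameters for Refresh and Update events.
    current_time = datetime.datetime.now().time()
    line = f"{current_time} - Fields received for {instrument_name} : {fields}"
    print(line)
    return line

# The same function could be registered for both event types.
handlers = {"on_refresh": display_fields, "on_update": display_fields}

# Simulated events; None stands in for the StreamingPrices object.
refresh_line = handlers["on_refresh"](None, "GBP=", {"BID": 1.3044, "ASK": 1.3048})
update_line = handlers["on_update"](None, "GBP=", {"BID": 1.3045})
```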
 
### Typical events flow

As an example, if you use a StreamingPrices object with events for the following instruments and fields:
 - Instruments: 'CAD=','GBP=', 'JPY=', 'JUNK'
 - Fields: 'CF_BID','CF_ASK','OPEN_PRC', 'CF_HIGH','CF_LOW', 'CF_CLOSE'

You may receive a flow of events like this one:
 1. **Refresh** event for GBP=
 1. **Refresh** event for CAD=
 1. **Status** event for JUNK
 1. **Refresh** event for JPY=
 1. **Complete** event => Indicating that data (or status) has been received for all requested instruments
 1. **Update** event for JPY=
 1. **Update** event for CAD=
 1. **Refresh** event for CAD=
 1. **Update** event for GBP=
 1. **Update** event for CAD=
 1. **Update** event for JPY=
 1. ...
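
The point at which the Complete event fires in a flow like this can be reproduced with a small replay function. This is a sketch of the rule stated earlier (every requested instrument has received a Refresh or a Status), not the library's code; `replay` is a hypothetical helper.

```python
def replay(requested, events):
    """Return the index of the event after which Complete would fire."""
    pending = set(requested)
    for index, (kind, instrument_name) in enumerate(events):
        # Only a Refresh or a Status counts as an instrument's first answer.
        if kind in ("Refresh", "Status"):
            pending.discard(instrument_name)
        if not pending:
            return index
    return None  # some instruments have not answered yet

requested = ["CAD=", "GBP=", "JPY=", "JUNK"]
events = [
    ("Refresh", "GBP="),
    ("Refresh", "CAD="),
    ("Status", "JUNK"),
    ("Refresh", "JPY="),
    ("Update", "JPY="),
]

complete_after = replay(requested, events)
print(complete_after)
```

Here the index is zero-based, so the Complete event follows the fourth event of the flow, the Refresh for JPY=.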


## Import the library and load credentials

Credentials used by this notebook are stored in ./credentials.ipynb. Please edit ./credentials.ipynb to set your credentials, then run the next cell to continue with this notebook.

In [1]:
import refinitiv.dataplatform as rdp
import datetime

%run ./credentials.ipynb
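
The later cells expect the credentials notebook to define names such as `APP_KEY`, `RDP_LOGIN` and `RDP_PASSWORD`. If you prefer not to store secrets in a notebook file, one possible alternative is to read them from environment variables. The sketch below is an assumption rather than part of the library or of this example: the environment variable names and both helper functions are hypothetical.

```python
import os

def load_credentials(environ=os.environ):
    # Environment variable names are hypothetical; adapt them to your setup.
    return {
        "APP_KEY": environ.get("RDP_APP_KEY", ""),
        "RDP_LOGIN": environ.get("RDP_LOGIN", ""),
        "RDP_PASSWORD": environ.get("RDP_PASSWORD", ""),
    }

def missing_credentials(credentials):
    # Names of the entries that are still empty.
    return sorted(name for name, value in credentials.items() if not value)

# A plain dict is passed here for demonstration instead of os.environ.
credentials = load_credentials({"RDP_APP_KEY": "my-app-key", "RDP_LOGIN": "me"})
print(missing_credentials(credentials))
```

Checking for missing values before opening a session gives a clearer error than a failed connection attempt.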

## Open the session of your choice

Depending on the access point your application uses to connect to the Refinitiv Data Platform, it needs to call one of the following functions to create and open a session to the platform. This session will become the default session used by all other function calls.

#### Either

Create and open a Desktop session to connect to the Refinitiv Data Platform via Eikon 4 or the Refinitiv Workspace.

In [20]:
rdp.open_desktop_session(APP_KEY)

<refinitiv.dataplatform.core.session.desktop_session.DesktopSession at 0xb0927f0>

#### or

Create and open a Platform session to connect directly to the Refinitiv Data Platform. 

In [10]:
rdp.open_platform_session(
    APP_KEY, 
    rdp.GrantPassword(
        username = RDP_LOGIN, 
        password = RDP_PASSWORD
    )
)

<refinitiv.dataplatform.core.session.platform_session.PlatformSession at 0xb0652e8>

#### or

Create and open a Deployed Platform session to connect directly to a deployed enterprise platform (a.k.a. TREP).

In [24]:
rdp.open_deployed_platform_session(
    APP_KEY,
    DEPLOYED_PLATFORM_HOST,
    DEPLOYED_PLATFORM_USER_NAME
)

<refinitiv.dataplatform.core.session.deployed_platform_session.DeployedPlatformSession at 0xadfd748>

## Define callbacks to capture incoming events

The following function will be used to capture Refresh events. It displays the name of the refreshed instrument and its full image (complete list of requested fields).

In [3]:
def display_refreshed_fields(streaming_price, instrument_name, fields):
    current_time = datetime.datetime.now().time()
    print(current_time, "- Refresh received for", instrument_name, ":", fields)    

The following function will be used to capture Update events. It displays the name of the updated instrument and the updated fields.

In [4]:
def display_updated_fields(streaming_price, instrument_name, fields):
    current_time = datetime.datetime.now().time()
    print(current_time, "- Update received for", instrument_name, ":", fields)    

The following function will be used to capture Status events. It displays the name of the instrument and the received status.

In [5]:
def display_status(streaming_price, instrument_name, status):
    current_time = datetime.datetime.now().time()
    print(current_time, "- Status received for", instrument_name, ":", status)    

The following function will be used to capture Complete events. It calls get_snapshot to pull out the memory cache of the StreamingPrices object as a Pandas DataFrame and displays it. As a result, the latest values of all requested fields and instruments are displayed in a table.

In [6]:
def display_complete_snapshot(streaming_prices):
    current_time = datetime.datetime.now().time()
    print(current_time, "- StreamingPrice is complete. Full snapshot:")
    display(streaming_prices.get_snapshot())

## Create StreamingPrices and register the event callbacks

In [25]:
streaming_prices = rdp.StreamingPrices(
    universe = ['EUR=', 'GBP=', 'JPY=', 'CAD='], 
    fields   = ['BID', 'ASK', 'OPEN_PRC', 'HST_CLOSE'],
    on_refresh = lambda streaming_price, instrument_name, fields : 
        display_refreshed_fields(streaming_price, instrument_name, fields),
    on_update = lambda streaming_price, instrument_name, fields : 
        display_updated_fields(streaming_price, instrument_name, fields),
    on_status = lambda streaming_price, instrument_name, status : 
        display_status(streaming_price, instrument_name, status),
    on_complete = lambda streaming_price : 
        display_complete_snapshot(streaming_price)
)

## Open StreamingPrices and wait for events

In [26]:
streaming_prices.open()

14:33:48.271306 - Status received for GBP= : {'status': <StreamState.Open: 3>, 'code': 'Open', 'message': 'All is well'}
14:33:48.271306 - Refresh received for GBP= : {'OPEN_PRC': 1.3077, 'HST_CLOSE': 1.3076, 'BID': 1.3044, 'ASK': 1.3048}
14:33:48.271306 - Status received for EUR= : {'status': <StreamState.Open: 3>, 'code': 'Open', 'message': 'All is well'}
14:33:48.271306 - Refresh received for EUR= : {'OPEN_PRC': 1.1111, 'HST_CLOSE': 1.1111, 'BID': 1.1121, 'ASK': 1.1125}
14:33:48.271306 - Update received for GBP= : {'BID': 1.3044, 'ASK': 1.3048}
14:33:48.271306 - Status received for JPY= : {'status': <StreamState.Open: 3>, 'code': 'Open', 'message': 'All is well'}
14:33:48.271306 - Refresh received for JPY= : {'OPEN_PRC': 109.54, 'HST_CLOSE': 109.53, 'BID': 109.44, 'ASK': 109.47}
14:33:48.271306 - Status received for CAD= : {'status': <StreamState.Open: 3>, 'code': 'Open', 'message': 'All is well'}
14:33:48.272306 - Refresh received for CAD= : {'OPEN_PRC': 1.311, 'HST_CLOSE': 1.3113,

|   | Instrument | OPEN_PRC | HST_CLOSE | BID | ASK |
|---|---|---|---|---|---|
| 0 | EUR= | 1.1111 | 1.1111 | 1.1121 | 1.1125 |
| 1 | GBP= | 1.3077 | 1.3076 | 1.3044 | 1.3048 |
| 2 | JPY= | 109.54 | 109.53 | 109.44 | 109.47 |
| 3 | CAD= | 1.311 | 1.3113 | 1.3116 | 1.312 |


<StreamState.Open: 3>

14:33:48.602238 - Update received for GBP= : {'BID': 1.3045, 'ASK': 1.3046}
14:33:48.602238 - Update received for CAD= : {'BID': 1.3119, 'ASK': 1.3123}
14:33:49.353081 - Update received for EUR= : {'BID': 1.1121, 'ASK': 1.1123}
14:33:49.670018 - Update received for GBP= : {'BID': 1.3043, 'ASK': 1.3048}
14:33:49.671017 - Update received for GBP= : {'BID': 1.3043, 'ASK': 1.3048}
14:33:49.991953 - Update received for GBP= : {'BID': 1.3045, 'ASK': 1.3048}
14:33:50.306890 - Update received for CAD= : {'BID': 1.3119, 'ASK': 1.3124}
14:33:50.306890 - Update received for CAD= : {'BID': 1.3116, 'ASK': 1.312}
14:33:50.626826 - Update received for GBP= : {'BID': 1.3044, 'ASK': 1.3048}
14:33:50.626826 - Update received for GBP= : {'BID': 1.3045, 'ASK': 1.3046}
14:33:50.947762 - Update received for EUR= : {'BID': 1.1122, 'ASK': 1.1123}
14:33:51.266698 - Update received for CAD= : {'BID': 1.3116, 'ASK': 1.312}


## Close StreamingPrices when done

In [None]:
streaming_prices.close()
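
In a standalone script rather than a notebook, one way to make sure close() is always called, even if an exception is raised while streaming, is `contextlib.closing` from the standard library. The sketch below uses a hypothetical `FakeStream` class as a stand-in, assuming only the open() and close() methods shown in this notebook.

```python
import contextlib
import time

class FakeStream:
    # Hypothetical stand-in exposing the open()/close() methods used above.
    def __init__(self):
        self.state = "Closed"

    def open(self):
        self.state = "Open"

    def close(self):
        self.state = "Closed"

stream = FakeStream()
with contextlib.closing(stream):
    stream.open()
    state_while_streaming = stream.state
    time.sleep(0.1)  # in a script, wait here while handlers receive events

print(state_while_streaming, stream.state)
```

`contextlib.closing` calls the object's close() method when the `with` block exits, whether it ends normally or through an exception.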