# Eikon Data API - How to use StreamingPrices with events

The following example demonstrates how to retrieve level 1 streaming data (such as trades and quotes) from Eikon or the Refinitiv Workspace. The example will show how to define a StreamingPrices object with registered event handlers so that your application is notified when new data is coming in.

Using StreamingPrices that way allows your application to be updated in real-time when data changes on the market. With this event-driven mode, your application can still benefit from the StreamingPrices data cache and use the get_snapshot function to pull out real-time snapshots as Pandas DataFrames.

## About events


Using a StreamingPrices object with events requires you to define event handlers that are called by a background execution thread when new events are received for the instruments you requested. There are 4 different types of event handlers you can optionnaly define depending on the type to events your are interrested in.  

### The 4 event types and their related event handlers:
 - **Refresh events:** Refresh events happen when all fields of one the requested instruments are received. This complete list of fields is sometimes called the 'image' of the instrument. This image that comes with Refresh messages can be later updated by subsequent Update events. When several Refresh events are received for the same instrument, the fields transported by the latest Refresh are considered as the new image. Fields received in previous Refresh events or Update events must be discarded. StreamingPrices objects automatically manage this logic for their internal cache, meaning that when you call get_snapshot you always get the latest and relevant field values for the requested instruments. 
 
 Refresh event handlers take 3 parameters: 
  - The StreamingPrices object that received the event
  - The name of the concerned instrument
  - The fields and values of the Image
 
 
 - **Update events:** Update events are received when fields of a requested instrument change. Update events only contain the fields and values that changed. When the application receives an Update it must update its internal representation of the instrument (if any) accordingly. StreamingPrices objects automatically manage this logic for their internal cache, meaning that when you call get_snapshot you always get the latest values of the requested instruments. 
 
 Update event handlers take 3 parameters 
  - The StreamingPrices object that received the event
  - The name of the concerned instrument
  - The updated fields with their new values
 
 
 - **Status events:** Status events are received when the status of one of the requested instruments changes.
 
 Status event handlers take 3 parameters 
  - The StreamingPrices object that received the event
  - The name of the concerned instrument
  - The new status of the instrument
 
 
 - **Complete events:** A Complete event is received once all the requested instruments received either a Refresh or a Status event. The Complete event indicates that the StreamingPrices object is complete and that it's internal cache contains the full data set (instruments and fields) that were requested.     

 Complete event handlers take one parameter
  - The StreamingPrices object that is complete
 
**Side note:** As Refresh events and Update events use handlers with the same signature, the same handler can be used for these 2 event types if you do not need to distinguish them 
 
### Typical events flow

As an example, if you use a StreamingPrices object with event for the following instruments and fields: 
 - Instruments: 'CAD=','GBP=', 'JPY=', 'JUNK'
 - Fields: 'CF_BID','CF_ASK','OPEN_PRC', 'CF_HIGH','CF_LOW', 'CF_CLOSE'

You may receive a flow of events like this one:
 1. **Refresh** event for GBP=
 1. **Refresh** event for CAD=
 1. **Status** event for JUNK
 1. **Refresh** event for JPY=
 1. **Complete** event => Indicating that data (or status) has been received for all requested instruments
 1. **Update** event for JPY=
 1. **Update** event for CAD=
 1. **Refresh** event for CAD=
 1. **Update** event for GBP=
 1. **Update** event for CAD=
 1. **Update** event for JPY=
 1. ...

  


## Imports

Imports the 'eikon' library and other usefull libraries needed by this notebook

In [1]:
import eikon as ek
import datetime

## Set App Key
Set the AppKey of this application and connect to Eikon.

In [2]:
ek.set_app_key('YOUR_APP_KEY')

## Define callbacks to capture incoming events

The following function will be used to capture Refresh events. It displays the name of the refreshed instrument and its full image (complete list of requested fields).

In [3]:
def display_refreshed_fields(streaming_price, instrument_name, fields):
    current_time = datetime.datetime.now().time()
    print(current_time, "- Refresh received for", instrument_name, ":", fields)    

The following function will be used to capture Update events. It displays the name of the updated instrument and the updated fields.

In [4]:
def display_updated_fields(streaming_price, instrument_name, fields):
    current_time = datetime.datetime.now().time()
    print(current_time, "- Update received for", instrument_name, ":", fields)    

The following function will be used to capture Status events. It displays the name of the instrument and the received status.

In [5]:
def display_status(streaming_price, instrument_name, status):
    current_time = datetime.datetime.now().time()
    print(current_time, "- Status received for", instrument_name, ":", status)    

The following function will be used to capture Complete events. It calls get_snapshot to pull out the memory cache of the StreamingPrice as a Pandas DataFrame and displays it. As a result, the latest values of all requested fields and instruments are displayed in a table. 

In [6]:
def display_complete_snapshot(streaming_prices):
    current_time = datetime.datetime.now().time()
    print(current_time, "- StreamingPrice is complete. Full snapshot:")
    display(streaming_prices.get_snapshot())

## Create StreamingPrices and register event callbacks

In [7]:
streaming_prices = ek.StreamingPrices(
    instruments = ['EUR=','GBP=','JPY=', 'CAD='], 
    fields   = ['SALTIM', 'CF_BID','CF_ASK','OPEN_PRC', 'CF_HIGH','CF_LOW', 'CF_CLOSE'],
    on_refresh = lambda streaming_price, instrument_name, fields : 
        display_refreshed_fields(streaming_price, instrument_name, fields),
    on_update = lambda streaming_price, instrument_name, fields : 
        display_updated_fields(streaming_price, instrument_name, fields),
    on_status = lambda streaming_price, instrument_name, status : 
        display_status(streaming_price, instrument_name, status),
    on_complete = lambda streaming_price : 
        display_complete_snapshot(streaming_price)
)

## Open StreamingPrices and wait for events

In [8]:
streaming_prices.open()

12:52:59.198710 - Status received for GBP= : {'status': <StreamState.Open: 3>, 'code': 'Open', 'message': 'All is well'}
12:52:59.199709 - Refresh received for GBP= : {'CF_BID': 1.2903, 'CF_ASK': 1.2907, 'OPEN_PRC': 1.2922, 'CF_HIGH': 1.2931, 'CF_LOW': 1.2889, 'CF_CLOSE': 1.2922}
12:52:59.199709 - Status received for CAD= : {'status': <StreamState.Open: 3>, 'code': 'Open', 'message': 'All is well'}
12:52:59.199709 - Refresh received for CAD= : {'CF_BID': 1.3303, 'CF_ASK': 1.3307, 'OPEN_PRC': 1.3266, 'CF_HIGH': 1.3315, 'CF_LOW': 1.3262, 'CF_CLOSE': 1.3266}
12:52:59.199709 - Status received for JPY= : {'status': <StreamState.Open: 3>, 'code': 'Open', 'message': 'All is well'}
12:52:59.199709 - Refresh received for JPY= : {'CF_BID': 108.42, 'CF_ASK': 108.44, 'OPEN_PRC': 108.53, 'CF_HIGH': 108.57, 'CF_LOW': 108.36, 'CF_CLOSE': 108.53}
12:52:59.199709 - Status received for EUR= : {'status': <StreamState.Open: 3>, 'code': 'Open', 'message': 'All is well'}
12:52:59.199709 - Refresh received f

Unnamed: 0,Instrument,CF_BID,CF_ASK,OPEN_PRC,CF_HIGH,CF_LOW,CF_CLOSE
0,EUR=,1.1059,1.106,1.1077,1.1081,1.1057,1.1078
1,GBP=,1.2903,1.2907,1.2922,1.2931,1.2889,1.2922
2,JPY=,108.42,108.44,108.53,108.57,108.36,108.53
3,CAD=,1.3303,1.3307,1.3266,1.3315,1.3262,1.3266


<StreamState.Open: 3>

12:52:59.791650 - Update received for GBP= : {'CF_BID': 1.2903, 'CF_ASK': 1.2907}
12:53:01.108519 - Update received for JPY= : {'CF_BID': 108.42, 'CF_ASK': 108.45}
12:53:01.428487 - Update received for CAD= : {'CF_BID': 1.3303, 'CF_ASK': 1.3307}
12:53:01.794450 - Update received for EUR= : {'CF_BID': 1.1058, 'CF_ASK': 1.106}
12:53:02.751354 - Update received for GBP= : {'CF_BID': 1.2903, 'CF_ASK': 1.2907}
12:53:03.643265 - Status received for EUR= : {'status': <StreamState.Open: 3>, 'code': 'Open', 'message': None}
12:53:04.094220 - Update received for JPY= : {'CF_BID': 108.43, 'CF_ASK': 108.46}


## Close StreamingPrices when done

In [9]:
streaming_prices.close()

<StreamState.Closed: 1>