----
<img src="../../../files/refinitiv.png" width="20%" style="vertical-align: top;">

# Data Library for Python

----

## Content layer - Chain stream - How to use streaming events
The following example demonstrates how to use a chain stream to be notified in real time of any change to a chain's constituents. It shows how to create a Chain stream object with registered callbacks so that your application is notified whenever the list of chain constituents changes.

Using a Chain stream this way prevents your application from sending too many requests to RDP. This is particularly useful if your application needs to retrieve chain constituents at regular, short intervals.

#### Learn more

To learn more about the Refinitiv Data Library for Python please join the Refinitiv Developer Community. By [registering](https://developers.refinitiv.com/iam/register) and [logging in](https://developers.refinitiv.com/content/devportal/en_us/initCookie.html) to the Refinitiv Developer Community portal you will get free access to a number of learning materials like 
 [Quick Start guides](https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-library-for-python/quick-start), 
 [Tutorials](https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-library-for-python/learning), 
 [Documentation](https://developers.refinitiv.com/en/api-catalog/refinitiv-data-platform/refinitiv-data-library-for-python/docs)
 and much more.

#### Getting Help and Support

If you have any questions regarding the API usage, please post them on 
the [Refinitiv Data Q&A Forum](https://community.developers.refinitiv.com/spaces/321/index.html). 
The Refinitiv Developer Community will be happy to help.

## About events


To use a Chain stream object with events, you need to define callback functions that are called by a background execution thread when new events are received for the chain you requested. There are 5 different types of callback functions you can use, depending on the type of events you are interested in. These callback functions are optional, meaning that you do not have to use all of them.

### The 5 event types and their related callback functions
 - **add events:** Add events happen when a new constituent is added to a chain. When several Add events are received for the same index, only the last received constituent is kept in memory. Other constituents received for the same index are discarded. Chain stream objects automatically manage this logic for their internal cache, meaning that when you call get_constituents you always get the latest list of constituents.
 
 Callback functions used to receive Add events take 3 parameters: 
  - The Chain stream that received the event
  - The index of the constituent added to the chain
  - The name of the constituent
 
 - **remove events:** Remove events happen when a constituent is removed from a chain. Chain stream objects automatically manage this logic for their internal cache, meaning that when you call get_constituents you always get the latest list of constituents.
 
 Callback functions used to receive Remove events take 3 parameters: 
  - The Chain stream that received the event
  - The index of the constituent removed from the chain
  - The name of the removed constituent
 
 
 - **update events:** Update events are received when the name of a chain constituent is updated. When several Update events are received for the same index, only the last received constituent is kept in memory. Other constituents received for the same index are discarded. Chain stream objects automatically manage this logic for their internal cache, meaning that when you call get_constituents you always get the latest list of constituents.
 
 Callback functions used to receive Update events take 4 parameters: 
  - The Chain stream that received the event
  - The index of the updated constituent
  - The name of the constituent before the update
  - The name of the constituent after the update
 
 - **complete events:** A Complete event is received once all constituents of the chain have been received. 

 Callback functions used to receive Complete events take 2 parameters: 
  - The Chain stream that received the event
  - The constituents of the chain

 - **error events:** An Error event is received if the Chain stream failed to decode the chain completely.

 Callback functions used to receive Error events take 3 parameters: 
  - The Chain stream that received the event
  - The name of the chain record concerned
  - The received error
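
The caching rules described above can be illustrated with a small, self-contained sketch. `ConstituentCache` is a hypothetical toy class written for this explanation; it is not the library's implementation, and it assumes only what the text states: one constituent is kept per index, and the most recent event for an index wins.

```python
class ConstituentCache:
    """Toy stand-in for a chain stream's internal cache (hypothetical, not library code)."""

    def __init__(self):
        self._by_index = {}

    def on_add(self, index, constituent):
        # A later Add for the same index overwrites the earlier constituent.
        self._by_index[index] = constituent

    def on_update(self, index, new_constituent):
        # An Update replaces the constituent stored at that index.
        self._by_index[index] = new_constituent

    def on_remove(self, index):
        # A Remove drops the index; unknown indexes are ignored.
        self._by_index.pop(index, None)

    def get_constituents(self):
        # Return the surviving constituents ordered by index.
        return [self._by_index[i] for i in sorted(self._by_index)]


cache = ConstituentCache()
cache.on_add(0, "TQQQ.O")
cache.on_add(1, "SNDL.O")
cache.on_add(1, "AAPL.O")    # second Add for index 1 replaces SNDL.O
cache.on_add(2, "AMD.O")
cache.on_update(0, "QQQ.O")  # index 0 now holds QQQ.O
cache.on_remove(1)           # index 1 disappears from the list
print(cache.get_constituents())
```

With a real Chain stream you do not write this logic yourself; the sketch only shows why a call made after a burst of events reflects the latest state rather than the full event history.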


### Typical events flow

As an example, if you use a Chain stream with events for '0#.DJI', you may receive a flow of events like this one:
 1. **Add** event for 0#.DJI : 0 - TNXP.O
 1. **Add** event for 0#.DJI : 1 - TQQQ.O
 1. **Add** event for 0#.DJI : 2 - QQQ.O
 1. **Add** event for 0#.DJI : 3 - AAPL.O
 1. **Add** event for ...
 1. **Add** event for ...
 1. **Add** event for ...
 1. **Complete** event 0#.DJI => Indicating that data (or status) has been received for all requested chain records
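
Because callbacks run on a background thread, a common pattern is to have the main thread wait until the Complete event arrives. The sketch below shows this with `threading.Event`. `FakeChainStream` is a hypothetical stand-in that replays the flow above on a worker thread so the example runs without a data session; its callback argument order mirrors the callbacks defined later in this notebook and is an assumption, not a statement about the library's API.

```python
import threading


class FakeChainStream:
    """Hypothetical stand-in: replays Add events, then Complete, on a worker thread."""

    def __init__(self, name, constituents):
        self.name = name
        self._constituents = list(constituents)
        self._on_add = None
        self._on_complete = None

    def on_add(self, callback):
        self._on_add = callback

    def on_complete(self, callback):
        self._on_complete = callback

    def open(self):
        # Start the replay in the background and return the thread so callers can join it.
        thread = threading.Thread(target=self._replay)
        thread.start()
        return thread

    def _replay(self):
        for index, constituent in enumerate(self._constituents):
            if self._on_add:
                self._on_add(constituent, index, self)
        if self._on_complete:
            self._on_complete(list(self._constituents), self)


received = []
chain_complete = threading.Event()


def handle_add(constituent, index, chain_stream):
    received.append((index, constituent))


def handle_complete(constituents, chain_stream):
    # Signal the waiting main thread that the whole chain has been received.
    chain_complete.set()


stream = FakeChainStream("0#.DJI", ["TNXP.O", "TQQQ.O", "QQQ.O", "AAPL.O"])
stream.on_add(handle_add)
stream.on_complete(handle_complete)
worker = stream.open()

# Block until Complete is signalled, or give up after 5 seconds.
completed = chain_complete.wait(timeout=5)
worker.join()
print(completed, received)
```

Using a timeout on `wait` keeps the application from blocking forever if the Complete event never arrives, for example after an Error event.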

  



## Set the configuration file location
For ease of use, you have the option to set initialization parameters of the Refinitiv Data Library in the _refinitiv-data.config.json_ configuration file. This file must be located beside your notebook, in your user folder, or in a folder defined by the _RD_LIB_CONFIG_PATH_ environment variable. The _RD_LIB_CONFIG_PATH_ environment variable is the option used by this series of examples. The following code sets this environment variable.

In [13]:
import os
os.environ["RD_LIB_CONFIG_PATH"] = "../../../Configuration"
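
Before opening a session, it can help to confirm that the folder named by the environment variable actually contains the configuration file. The following is a minimal sketch using only the standard library; `find_config_file` is a hypothetical helper, and it checks only the _RD_LIB_CONFIG_PATH_ location, not the notebook folder or the user folder.

```python
import os
from pathlib import Path


def find_config_file(filename="refinitiv-data.config.json"):
    """Return the config file path under RD_LIB_CONFIG_PATH, or None if it is absent.

    Hypothetical helper for illustration; it is not part of the library.
    """
    config_dir = os.environ.get("RD_LIB_CONFIG_PATH")
    if not config_dir:
        return None
    candidate = Path(config_dir) / filename
    return candidate if candidate.is_file() else None


config_path = find_config_file()
if config_path:
    print(f"Using configuration file: {config_path}")
else:
    print("No configuration file found under RD_LIB_CONFIG_PATH")
```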

## Some Imports to start with

In [1]:
import refinitiv.data as rd
from refinitiv.data.content import pricing
import datetime

## Open the data session

The open_session() function creates and opens a session based on the information contained in the refinitiv-data.config.json configuration file. Please edit this file to set the session type and other parameters required for the session you want to open.

In [2]:
rd.open_session()

<refinitiv.data.session.Definition object at 0x7fe9142182b0 {name='workspace'}>

## Retrieve data

### Define callbacks to capture incoming events

The following function will be used to capture Add events. It displays the name of the added constituent in the chain record.

In [3]:
def display_add(constituent, index, chain_stream):
    time_now = datetime.datetime.now().time()
    print(f"{time_now}- Add event received for {chain_stream.name}. {index}: {constituent}")

The following function will be used to capture Remove events. It displays the name of the removed constituent from the chain record.

In [4]:
def display_remove(constituent, index, chain_stream):
    time_now = datetime.datetime.now().time()
    print(f"{time_now}- Remove event received for {chain_stream.name}. {index}: {constituent}")

The following function will be used to capture Update events. It displays the name of the updated constituent on the chain record.

In [5]:
def display_update(new_constituent, old_constituent, index, chain_stream):
    time_now = datetime.datetime.now().time()
    print(
        f"{time_now}- Update event received for {chain_stream.name}. "
        f"{index}: {old_constituent} => {new_constituent}"
    )

The following function will be used to capture Complete events. It receives the decoded constituents of the chain as a list and displays them. As a result, the latest constituents of this chain record are displayed in a list.

In [6]:
def display_complete(constituents, chain_stream):
    time_now = datetime.datetime.now().time()
    print(
        f"{time_now}- Chain {chain_stream.name}"
        f" is completely decoded.\nconstituents:\n{constituents}"
    )

The following function will be used to capture Error events. It displays the name of the chain and the received error.

In [7]:
def display_error(error, chain_record, chain_stream):
    time_now = datetime.datetime.now().time()
    print(f"{time_now}- Error received for Chain {chain_stream.name}. Chain Record: {chain_record}. Error: {error}")   
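
Printing from callbacks is fine for a demonstration, but since callbacks are invoked by a background thread, an application may prefer to hand events over to its main thread. The sketch below does this with a thread-safe `queue.Queue`. The names `queue_add`, `queue_remove` and `drain` are hypothetical, the callback signatures copy the display functions above, and the callbacks are invoked by hand here so the example runs without a data session.

```python
import datetime
import queue

events = queue.Queue()


def queue_add(constituent, index, chain_stream):
    # Store the event instead of printing it; Queue.put is thread-safe.
    events.put(("add", datetime.datetime.now().time(), index, constituent))


def queue_remove(constituent, index, chain_stream):
    events.put(("remove", datetime.datetime.now().time(), index, constituent))


def drain(event_queue):
    """Return all queued events without blocking."""
    drained = []
    while True:
        try:
            drained.append(event_queue.get_nowait())
        except queue.Empty:
            return drained


# Simulate two events; a real stream would call these from its own thread.
queue_add("TQQQ.O", 0, None)
queue_remove("TQQQ.O", 0, None)

pending = drain(events)
for kind, timestamp, index, constituent in pending:
    print(f"{timestamp}- {kind} {index}: {constituent}")
```

These functions could be registered in place of the display functions, with the main thread calling `drain(events)` whenever it is ready to process updates.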

### Create and open a Chain stream

A Chain stream object is created for a specific chain record. Note that skip_summary_links and skip_empty are optional arguments that default to True. override_summary_links is also an optional argument; it overrides the number of summary links for the given chain record when skip_summary_links is True.

In [8]:
nyse_active_volume_leaders = pricing.chain.Definition(".AV.O").get_stream()

nyse_active_volume_leaders.on_add(display_add)
nyse_active_volume_leaders.on_remove(display_remove)
nyse_active_volume_leaders.on_update(display_update)
nyse_active_volume_leaders.on_complete(display_complete)
nyse_active_volume_leaders.on_error(display_error)

<refinitiv.data.content.pricing.chain.Stream object at 0x7fe914255a30 {name='.AV.O'}>

The open method tells the Chain object to subscribe to the streams of the related chain elements.

### Open the chain stream and wait for events

In [9]:
nyse_active_volume_leaders.open()

18:20:53.792847- Add event received for .AV.O. TQQQ.O: 0
18:20:53.793032- Add event received for .AV.O. SNDL.O: 1
18:20:53.793104- Add event received for .AV.O. CLVS.O: 2
18:20:53.793166- Add event received for .AV.O. MULN.O: 3
18:20:53.793225- Add event received for .AV.O. CEAD.O: 4
18:20:53.793283- Add event received for .AV.O. AMD.O: 5
18:20:53.793372- Add event received for .AV.O. XELA.O: 6
18:20:53.793432- Add event received for .AV.O. SQQQ.O: 7
18:20:53.793490- Add event received for .AV.O. AAPL.O: 8
18:20:53.793548- Add event received for .AV.O. AMZN.O: 9
18:20:53.793608- Add event received for .AV.O. ENDP.O: 10
18:20:53.793670- Add event received for .AV.O. GRAB.O: 11
18:20:53.793727- Add event received for .AV.O. NVDA.O: 12
18:20:53.793795- Add event received for .AV.O. QQQ.O: 13
18:20:53.864809- Add event received for .AV.O. TSLA.O: 14
18:20:53.864938- Add event received for .AV.O. MREO.O: 15
18:20:53.864993- Add event received for .AV.O. ADN.O: 16
18:20:53.865049- Add event 

<OpenState.Opened: 'Opened'>

As soon as the open method returns, the stream object is ready to be used. Its internal cache is constantly kept up to date with the latest streaming information received from the platform. All this happens behind the scenes, waiting for your application to pull out data from the cache.

### Close the stream

In [10]:
nyse_active_volume_leaders.close()

<OpenState.Closed: 'Closed'>

## Close the session

In [11]:
rd.close_session()