# Machine Readable News Example with LSEG Data Library for Python

- Last update: April 2025

This example shows how to use the Delivery layer of the [LSEG Data Library for Python](https://developers.lseg.com/en/api-catalog/lseg-data-platform/lseg-data-library-for-python) to subscribe to Machine Readable News (MRN) from the LSEG Real-Time Distribution System (RTDS) and LSEG Real-Time Optimized (RTO). The example connects to the LSEG Real-Time platform over a WebSocket connection, subscribes to MRN data, and prints the news to the console.

#### Machine Readable News Overview

Machine Readable News (MRN) is an advanced service for automating the consumption and systematic analysis of news. It delivers deep historical news archives, ultra-low latency structured news and news analytics directly to your applications. This enables algorithms to exploit the power of news to seize opportunities, capitalize on market inefficiencies, and manage event risk.

#### MRN Data model

MRN is published over the Real-Time platform using an Open Message Model (OMM) envelope in News Text Analytics domain messages. The Real-time News content set is made available over the MRN_STORY RIC. The content data is carried in a FRAGMENT field that is compressed and, when necessary, fragmented across multiple messages to reduce bandwidth and message size.

The data type of the FRAGMENT field depends on the connection type:

- RSSL connection (RTSDK [C++](https://developers.lseg.com/en/api-catalog/refinitiv-real-time-opnsrc/rt-sdk-cc)/[Java](https://developers.lseg.com/en/api-catalog/refinitiv-real-time-opnsrc/rt-sdk-java)/[C#](https://developers.lseg.com/en/api-catalog/refinitiv-real-time-opnsrc/rt-sdk-cc)): BUFFER type
- WebSocket connection: Base64 ASCII string

The data goes through the following series of transformations:

1. The core content data is a UTF-8 JSON string
2. This JSON string is compressed using gzip
3. The compressed JSON is split into several fragments (BUFFER or Base64 ASCII string) which each fit into a single update message
4. The data fragments are added to an update message as the FRAGMENT field value in a FieldList envelope

<img src="../images/mrn_process.png"/>

Therefore, to parse the core content data, the application needs to reverse this process. A WebSocket application must also convert the Base64 string received in the FRAGMENT field to bytes before processing it further. This application uses the Python [base64](https://docs.python.org/3/library/base64.html) and [zlib](https://docs.python.org/3/library/zlib.html) modules to decode the Base64 string and decompress the JSON string.
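
The round trip described above can be sketched without any connection to the platform. The snippet below is a minimal illustration, not the feed's actual publishing code: the `story` payload and the `chunk_size` value are made up for the example, and `tot_size` only plays the role of the TOT_SIZE field.

```python
import base64
import gzip
import json
import zlib

# Hypothetical story payload; real MRN stories carry many more attributes.
story = {"headline": "Example headline", "body": "Example body text " * 50, "language": "en"}

# Publisher side (simulated): UTF-8 JSON -> gzip -> fragments -> Base64 ASCII strings
compressed = gzip.compress(json.dumps(story).encode("utf-8"))
chunk_size = 16  # illustrative only, chosen small to force several fragments
fragments = [
    base64.b64encode(compressed[i:i + chunk_size]).decode("ascii")
    for i in range(0, len(compressed), chunk_size)
]
tot_size = len(compressed)  # stands in for TOT_SIZE: total size of the compressed data

# Consumer side: Base64 -> bytes -> concatenate -> decompress -> JSON
buffer = b"".join(base64.b64decode(f) for f in fragments)
assert len(buffer) == tot_size  # all fragments have arrived
decoded = json.loads(zlib.decompress(buffer, zlib.MAX_WBITS | 32))

print(f"{len(fragments)} fragments, round trip ok: {decoded == story}")
```

The consumer half mirrors what the update handler later in this notebook does: it only decompresses once the accumulated byte length matches the expected total size.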

If you are not familiar with the MRN concept, the following resources give a full explanation of the MRN data model and implementation logic:

- [Webinar Recording: Introduction to Machine Readable News](https://developers.lseg.com/news#news-accordion-nid-12045)
- [Introduction to Machine Readable News (MRN) with Enterprise Message API (EMA)](https://developers.lseg.com/en/article-catalog/article/introduction-machine-readable-news-mrn-elektron-message-api-ema)
- [MRN Data Models and the Real-Time SDK Implementation Guide](https://developers.lseg.com/en/api-catalog/refinitiv-real-time-opnsrc/rt-sdk-java/documentation#mrn-data-models-implementation-guide)
- [Introduction to Machine Readable News with WebSocket API](https://developers.lseg.com/en/article-catalog/article/introduction-machine-readable-news-elektron-websocket-api-refinitiv).

In [1]:
# Import all required libraries

import sys
import datetime
import time
import json
import base64
import zlib
import binascii
import lseg.data as ld
from lseg.data.delivery import omm_stream

In [2]:
# Variables related to the news subscription
_news_envelopes = []
RIC_CODE = 'MRN_STORY'
DOMAIN = 'NewsTextAnalytics'
SERVICE = 'ELEKTRON_DD'

#### Method to display Refresh and Status messages

In [None]:
def display_event(eventType, event):
    """Retrieve data: Callback function to display data or status events. """
    current_time = datetime.datetime.now().time()
    print('----------------------------------------------------------')
    print(f'>>> {eventType} event received at {current_time}')
    print(json.dumps(event, indent=2))
    return

#### Method to assemble and display MRN news data from the Update message

In [None]:
def process_mrn_update(event):  
        """Process MRN Update messages."""
        message_json = event
        fields_data = message_json['Fields']

        # declare variables
        tot_size = 0
        guid = None

        try:
            # Get data for all required fields
            fragment = base64.b64decode(fields_data['FRAGMENT'])
            frag_num = int(fields_data['FRAG_NUM'])
            guid = fields_data['GUID']
            mrn_src = fields_data['MRN_SRC']

            #print("GUID  = %s" % guid)
            #print("FRAG_NUM = %d" % frag_num)
            #print("MRN_SRC = %s" % mrn_src)

            if frag_num > 1:  # A later fragment of a multi-part envelope: retrieve the stored details
                guid_index = next((index for (index, d) in enumerate(_news_envelopes) if d['GUID'] == guid), None)
                # Guard against an unknown GUID: indexing the list with None would raise a TypeError
                envelop = _news_envelopes[guid_index] if guid_index is not None else None
                if envelop and envelop['data']['MRN_SRC'] == mrn_src and frag_num == envelop['data']['FRAG_NUM'] + 1:
                    print(f'process multiple fragments for guid {envelop["GUID"]}')

                    #print(f'fragment before merge = {len(envelop["data"]["FRAGMENT"])}')
                    # Merge incoming data to existing news envelop and getting FRAGMENT and TOT_SIZE data to local variables
                    fragment = envelop['data']['FRAGMENT'] = envelop['data']['FRAGMENT'] + fragment
                    envelop['data']['FRAG_NUM'] = frag_num
                    tot_size = envelop['data']['tot_size']
                    print(f'TOT_SIZE = {tot_size}')
                    print(f'Current FRAGMENT length = {len(fragment)}')

                    # The multi-fragment news item is not complete yet; wait for more fragments.
                    if tot_size != len(fragment):
                        return None
                    # The multi-fragment news item is complete; delete the associated GUID envelope.
                    del _news_envelopes[guid_index]
                else:
                    print(f'Error: Cannot find fragment for GUID {guid} with matching FRAG_NUM or MRN_SRC {mrn_src}')
                    return None
            else:  # FRAG_NUM = 1 The first fragment
                tot_size = int(fields_data['TOT_SIZE'])
                print(f'FRAGMENT length = {len(fragment)}')
                # The fragment news is not completed, waiting and add this news data to envelop object.
                if tot_size != len(fragment):
                    print(f'Add new fragments to news envelop for guid {guid}')
                    _news_envelopes.append({  # the envelop object is a Python dictionary with GUID as a key and other fields are data
                        'GUID': guid,
                        'data': {
                            'FRAGMENT': fragment,
                            'MRN_SRC': mrn_src,
                            'FRAG_NUM': frag_num,
                            "tot_size": tot_size
                        }
                    })
                    return None

            # News Fragment(s) completed, decompress and print data as JSON to console
            if tot_size == len(fragment):
                print(f'decompress News FRAGMENT(s) for GUID {guid}')
                decompressed_data = zlib.decompress(fragment, zlib.MAX_WBITS | 32)
                print(f'News = {json.loads(decompressed_data)}')

        except KeyError as keyerror:
            print('KeyError exception: ', keyerror)
        except IndexError as indexerror:
            print('IndexError exception: ', indexerror)
        except binascii.Error as b64error:
            print('base64 decoding exception:', b64error)
        except zlib.error as error:
            print('zlib decompressing exception: ', error)
        # Some consoles (for example on Windows) cannot display certain Unicode characters, which is an OS limitation
        except UnicodeEncodeError as encodeerror:
            print(f'UnicodeEncodeError exception. Cannot display unicode character for {guid} in this environment: ', encodeerror)
        except Exception as specific_error:
            print(f'exception: {specific_error}', sys.exc_info()[0])
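
The `zlib.decompress(fragment, zlib.MAX_WBITS | 32)` call above relies on a feature of the standard `zlib` module: adding 32 to the window size enables automatic detection of a gzip or zlib header. The short sketch below illustrates this with a made-up `payload`; it is independent of the MRN feed.

```python
import gzip
import zlib

# Made-up payload, used only to illustrate the decompression options.
payload = b'{"headline": "example"}'
gzip_stream = gzip.compress(payload)   # gzip container, as used for MRN fragments
zlib_stream = zlib.compress(payload)   # zlib container, for comparison

# MAX_WBITS | 32 tells zlib to detect the header type automatically.
auto = zlib.MAX_WBITS | 32
assert zlib.decompress(gzip_stream, auto) == payload
assert zlib.decompress(zlib_stream, auto) == payload

# With the default wbits, zlib expects a zlib header and rejects gzip data.
try:
    zlib.decompress(gzip_stream)
    default_accepts_gzip = True
except zlib.error:
    default_accepts_gzip = False

print(f"default wbits accepts gzip data: {default_accepts_gzip}")
```

This is why the handler passes an explicit `wbits` argument rather than calling `zlib.decompress(fragment)` directly.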

#### Init and Open LD Session

In [None]:
try:
    # Open the data session
    ld.open_session()
    #ld.open_session(config_name='./lseg-data.devrel.config.json')
except Exception as ex:
    print("Error in open_session: " + str(ex))
    sys.exit(1) 

#### Create an OMM stream and register event callbacks

In [None]:
stream = omm_stream.Definition(
        name = RIC_CODE, 
        domain = DOMAIN,
        service = SERVICE).get_stream()

# Define the event callbacks
# Refresh - the first full image we get back from the server
stream.on_refresh(lambda event, item_stream  : display_event('Refresh', event))

# Update - as and when field values change, we receive updates from the server and process the MRN data
stream.on_update(lambda event, item_stream : process_mrn_update(event))

# Status - if data goes stale or item closes, we get a status message
stream.on_status(lambda event, item_stream : display_event('Status', event))

# Other errors
stream.on_error(lambda event, item_stream : display_event('Error', event))

<lseg.data.delivery.omm_stream.OMMStream object at 0x1f5f5ce6ad0>

#### Open the stream

In [7]:
# Send request to server and open stream
stream.open()
# We should receive the initial Refresh for the current field values
# followed by updates for the fields as and when they occur

----------------------------------------------------------
>>> Refresh event received at 14:25:14.019221
{
  "ID": 5,
  "Type": "Refresh",
  "Domain": "NewsTextAnalytics",
  "Key": {
    "Service": "ELEKTRON_DD",
    "Name": "MRN_STORY"
  },
  "State": {
    "Stream": "Open",
    "Data": "Ok"
  },
  "Qos": {
    "Timeliness": "Realtime",
    "Rate": "JitConflated"
  },
  "PermData": "AwEBEAAc",
  "SeqNumber": 62830,
  "Fields": {
    "PROD_PERM": 10001,
    "ACTIV_DATE": "2025-01-04",
    "RECORDTYPE": 30,
    "RDN_EXCHD2": "MRN",
    "TIMACT_MS": 65276147,
    "GUID": null,
    "CONTEXT_ID": 3752,
    "DDS_DSO_ID": 4232,
    "SPS_SP_RIC": ".[SPSML2L1",
    "MRN_V_MAJ": "2",
    "MRN_TYPE": "STORY",
    "MDU_V_MIN": null,
    "MDU_DATE": null,
    "MRN_V_MIN": "10",
    "MRN_SRC": "HK1_PRD_A",
    "MDUTM_NS": null,
    "FRAG_NUM": 1,
    "TOT_SIZE": 0,
    "FRAGMENT": null
  }
}


<OpenState.Opened: 'Opened'>

FRAGMENT length = 604
decompress News FRAGMENT(s) for GUID BSE7csqvP_2501172gHWMBzeSFWNvZrZzPMMCk4fOVyYtqUZ796SIp
News = {'altId': 'nBSE7csqvP', 'audiences': ['NP:BSEA', 'NP:BSEN', 'NP:CNR', 'NP:CNRA'], 'body': '\nPls refer enclosed file\n\nhttps://newsfile.refinitiv.com/getnewsfile/v1/story?guid=urn:newsml:reuters.com:20250117:nBSE8mWH9K\n\n', 'firstCreated': '2025-01-17T07:25:12.965Z', 'headline': 'SWARAJ ENGINES LTD. - 500407 - Board Meeting Outcome for Outcome Of The Board Meeting', 'id': 'BSE7csqvP_2501172gHWMBzeSFWNvZrZzPMMCk4fOVyYtqUZ796SIp', 'instancesOf': [], 'language': 'en', 'messageType': 2, 'mimeType': 'text/plain', 'provider': 'NS:BSE', 'pubStatus': 'stat:usable', 'subjects': ['B:1301', 'B:255', 'B:69', 'B:71', 'B:73', 'G:1', 'G:5B', 'G:CH', 'G:K', 'G:S', 'M:1QD', 'M:1WJ', 'M:1WK', 'M:2CM', 'M:2CQ', 'M:2CU', 'M:2CX', 'M:2DZ', 'M:32', 'M:3H', 'M:NU', 'M:Z', 'R:SWAR.NS', 'N2:ASIA', 'N2:ASXPAC', 'N2:AUTO', 'N2:AUTPT', 'N2:AUTPWR', 'N2:CMPNY', 'N2:CONPR', 'N2:CONPS', 'N2:CYCS

#### Close the stream and session to stop the subscription

In [None]:
try:
    stream.close()
    # Close the session
    ld.close_session() 
except Exception as ex:
    print(f'Error in close session: {str(ex)}')
 