# Python examples of DataPlane bindings

Mimic the `e2sar_reas_test` in the C++ unit test suite.

In [1]:
import time
import sys

## IMPORTANT: Update the path to your built Python module. Use the absolute path to make life easier.
sys.path.append(
    '/home/ubuntu/dev-e2sar/build/src/pybind')

import e2sar_py

In [2]:

# dir(e2sar_py.DataPlane)
# dir(e2sar_py.DataPlane.Reassembler)
# dir(e2sar_py.DataPlane.Segmenter)

In [3]:
# Loopback data-plane endpoint used by both the segmenter URI and the reassembler in DPReasTest1/2.
DP_IPV4_ADDR = "127.0.0.1"
DP_IPV4_PORT = 10000
data_id = 0x0505   # decimal value: 1285
eventSrc_id = 0x11223344   # decimal value: 287454020

## DPReasTest1

This is a test that uses local host to send/receive fragments. It does NOT use control plane.


1. First initialize "Segmenter" objects.

In [4]:
# Set up URI for segmenter.
# The data= endpoint is the local address/port that the reassembler below listens on.
# NOTE(review): with useCP=False the LB and sync addresses in the URI are presumably unused placeholders — confirm.
SEG_URI = f"ejfat://useless@192.168.100.1:9876/lb/1?sync=192.168.0.1:12345&data={DP_IPV4_ADDR}:{DP_IPV4_PORT}"
seg_uri = e2sar_py.EjfatURI(uri=SEG_URI, tt=e2sar_py.EjfatURI.TokenType.instance)

# Set up sflags
sflags = e2sar_py.DataPlane.SegmenterFlags()
sflags.useCP = False  # turn off CP. Default value is True
sflags.syncPeriodMs = 1000
sflags.syncPeriods = 5

# Read the attributes back to confirm the assignments reached the bound flags object.
assert(sflags.syncPeriodMs == 1000)
assert(sflags.useCP == False)
assert(sflags.syncPeriods == 5)

# mtu is not set above, so the value printed here is the binding's default.
print("Segmenter flags:")
print(f"  syncPeriodMs={sflags.syncPeriodMs}")
print(f"  useCP={sflags.useCP}")
print(f"  mtu={sflags.mtu}")
print(f"  syncPeriods={sflags.syncPeriods}")

Segmenter flags:
  syncPeriodMs=1000
  useCP=False
  mtu=1500
  syncPeriods=5


In [5]:
# Init segmenter object.
# It is started later in this notebook via seg.OpenAndStart().
seg = e2sar_py.DataPlane.Segmenter(seg_uri, data_id, eventSrc_id, sflags)

The message we need to send.

In [6]:
SEND_STR = "THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1 SECOND."

# Encode once; the send cells pass this bytes object together with len(send_context),
# i.e. the length in bytes, to addToSendQueue.
send_context = SEND_STR.encode('utf-8')
print(send_context)
print(len(send_context))

b'THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1 SECOND.'
65


2. Initialize the Reassembler.

In [7]:
# Set the reassembler URI.
# Unlike SEG_URI, data= carries no port here; the receive port is passed to the
# Reassembler constructor separately.
REAS_URI_ = f"ejfat://useless@192.168.100.1:9876/lb/1?sync=192.168.0.1:12345&data={DP_IPV4_ADDR}"
reas_uri = e2sar_py.EjfatURI(uri=REAS_URI_, tt=e2sar_py.EjfatURI.TokenType.instance)

# Make sure the token matches the one in the string
def get_inst_token(uri : e2sar_py.EjfatURI):
    """Print the instance token stored in ``uri``.

    Nothing is returned. If reading the token raises ``RuntimeError``, the
    error is printed instead of propagating, so the notebook keeps running.
    """
    try:
        inst_token = uri.get_instance_token().value()
    except RuntimeError as err:
        print("Instance Token - Error:", err)
    else:
        print("Instance Token:", inst_token)

# Expected to print the "useless" token embedded in REAS_URI_.
get_inst_token(reas_uri)

# Get data plane IPv4 address.
# Only element 0 of the returned value is printed; the other elements are not used here.
print("Data Plane Address (v4):", str(reas_uri.get_data_addr_v4().value()[0]))

# Configure the reassembler flags

rflags = e2sar_py.DataPlane.ReassemblerFlags()
rflags.useCP = False  # turn off CP. Default value is True
rflags.withLBHeader = True  # LB header will be attached since there is no LB

assert rflags.period_ms == 100  # default value of the C++ constructor
assert rflags.useCP == False

# Every flag other than useCP and withLBHeader is printed at its default value.
print("Reassembler flags:")
print(f"  period_ms={rflags.period_ms}")  # should be 100 according to the C++ constructor
print(f"  useCP={rflags.useCP}")
print(f"  validateCert = {rflags.validateCert}")
print(f"  epoch_ms = {rflags.epoch_ms}")
print(f"  setPoint = {rflags.setPoint}")
print(f"  [Kp, Ki, Kd] = [{rflags.Kp}, {rflags.Ki}, {rflags.Kd}]")
print(f"  cpV6 = {rflags.cpV6}")
print(f"  portRange = {rflags.portRange}")
print(f"  withLBHeader = {rflags.withLBHeader}")

Instance Token: useless
Data Plane Address (v4): 127.0.0.1
Reassembler flags:
  period_ms=100
  useCP=False
  validateCert = True
  epoch_ms = 1000
  setPoint = 0.0
  [Kp, Ki, Kd] = [0.0, 0.0, 0.0]
  cpV6 = False
  portRange = -1
  withLBHeader = True


In [8]:
# Init the reassembler object.
# Positional arguments as used in this notebook: URI, data-plane IP address,
# starting receive port, number of receive threads, flags.
reas = e2sar_py.DataPlane.Reassembler(
    reas_uri, e2sar_py.IPAddress.from_string(DP_IPV4_ADDR), DP_IPV4_PORT, 1, rflags)

In [9]:

# Start the reassembler before any data is sent; a result value of 0 is treated as success.
res = reas.OpenAndStart()  # the DP address must be available
assert res.value() == 0

In [10]:
# Snapshot of the reassembler statistics before anything has been sent.
res = reas.getStats()
print(res)

(0, 0, 0, 0, 0, <E2SARErrorc.NoError: 0>)


3. Send the contexts with the Segmenter object.

In [11]:
# Start segmenter threads; a result value of 0 is treated as success.
res = seg.OpenAndStart()
assert res.value() == 0

# getSendStats() returns an indexable result: this notebook reads res[0] as the number of
# frames sent, treats a nonzero res[1] as an error indicator and prints res[2] as the detail.
res = seg.getSendStats()
if (res[1] != 0):
    print(f"Error encountered after opening send socket: {res[2]}")
    # exit(-1)

In [12]:
# Send the data: queue the same payload 5 times, one event per second.
for i in range(5):
    print(f"Sending data...                Rd {i}")
    # Second argument is the payload length in bytes (send_context is a bytes object).
    res = seg.addToSendQueue(send_context, len(send_context))
    assert(res.value() == 0)
    # A nonzero res[1] is reported but does not stop the loop.
    res = seg.getSendStats()
    if (res[1] != 0):
        print(f"  SendStats: {res}")
        print(f"  Error encountered sending event frame: {i}, error: {res[2]}")
    time.sleep(1)

# Final stats after all events were queued; res[0] is printed as the frame count.
res = seg.getSendStats()
# assert(res[0] == 5)  # hold for the first run
print(f"Sent {res[0]} data frames")

if (res[1] != 0):
    print(f"SendStats: {res}")
    print(f"After sending data --  error: {res[2]}")
    # exit()

Sending data...                Rd 0
Sending data...                Rd 1
Sending data...                Rd 2
Sending data...                Rd 3
Sending data...                Rd 4
Sent 5 data frames


In [13]:

# Prepare Python list to hold the output data
recv_bytes_list = [None]  # Placeholder for the byte buffer

# Call getEvent on the instance.
# getEvent returns (status, event length, event number, data id); the event bytes are
# read back from recv_bytes_list[0] after the call.
for i in range(5):
    print(f"Receiving data...                Rd {i}")
    res, recv_event_len, recv_event_num, recv_data_id = reas.getEvent(recv_bytes_list)

    # A nonzero status skips this round without printing anything further.
    if (res != 0):  # -1 or -2 (has_error)
        continue
    recv = recv_bytes_list[0].decode('utf-8')
    print(f"  recv_buf:\t {recv}")
    # The received data id must match the one given to the Segmenter.
    assert(recv_data_id == data_id)
    print(f"  bufLen:\t {recv_event_len}")
    print(f"  eventNum:\t {recv_event_num}")
    print(f"  dataId:\t {recv_data_id}")


Receiving data...                Rd 0
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1 SECOND.
  bufLen:	 65
  eventNum:	 0
  dataId:	 1285
Receiving data...                Rd 1
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1 SECOND.
  bufLen:	 65
  eventNum:	 1
  dataId:	 1285
Receiving data...                Rd 2
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1 SECOND.
  bufLen:	 65
  eventNum:	 2
  dataId:	 1285
Receiving data...                Rd 3
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1 SECOND.
  bufLen:	 65
  eventNum:	 3
  dataId:	 1285
Receiving data...                Rd 4
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1 SECOND.
  bufLen:	 65
  eventNum:	 4
  dataId:	 1285


## DPReasTest2

Test segmentation and reassembly on local host with no control plane (basic segmentation) using small MTU.

In [14]:
# Set up sflags.
# Same settings as DPReasTest1 except for the small MTU, which makes each event go out
# as several frames (the recorded run reports 25 frames for 5 events).
sflags2 = e2sar_py.DataPlane.SegmenterFlags()
sflags2.useCP = False  # turn off CP. Default value is True
sflags2.syncPeriodMs = 1000
sflags2.syncPeriods = 5
sflags2.mtu = 80

# Read the attributes back to confirm the assignments reached the bound flags object.
assert(sflags2.syncPeriodMs == 1000)
assert(sflags2.useCP == False)
assert(sflags2.syncPeriods == 5)
assert(sflags2.mtu == 80)

print("Segmenter flags:")
print(f"  syncPeriodMs={sflags2.syncPeriodMs}")
print(f"  useCP={sflags2.useCP}")
print(f"  mtu={sflags2.mtu}")
print(f"  syncPeriods={sflags2.syncPeriods}")

Segmenter flags:
  syncPeriodMs=1000
  useCP=False
  mtu=80
  syncPeriods=5


In [15]:
# Same data endpoint as in DPReasTest1, so the reassembler `reas` started there
# is the receiver for this test as well.
SEG_URI2 = f"ejfat://useless@192.168.100.1:9876/lb/1?sync=192.168.0.1:12345&data={DP_IPV4_ADDR}:{DP_IPV4_PORT}"
seg_uri2 = e2sar_py.EjfatURI(uri=SEG_URI2, tt=e2sar_py.EjfatURI.TokenType.instance)

In [16]:
# Initialize a new Segmenter object that uses the small-MTU flags
seg2 = e2sar_py.DataPlane.Segmenter(seg_uri2, data_id, eventSrc_id, sflags2)

# A result value of 0 is treated as success.
res = seg2.OpenAndStart()
assert(res.value() == 0)

# A nonzero res[1] is treated as an error indicator; res[2] is printed as the detail.
res = seg2.getSendStats()
if (res[1] != 0):
    print(f"Error encountered after opening send socket: {res[2]}")
    # exit(-1)

In [17]:
# Send data with the small MTU: the same payload 5 times, one event per second.
for i in range(5):
    print(f"Sending data...                Rd {i}")
    # Second argument is the payload length in bytes (send_context is a bytes object).
    res = seg2.addToSendQueue(send_context, len(send_context))
    assert(res.value() == 0)
    res = seg2.getSendStats()
    # print(f"  getSendStats:\t {res}")
    # A nonzero res[1] is reported but does not stop the loop.
    if (res[1] != 0):
        print(f"  SendStats: {res}")
        print(f"  Error encountered sending event frame: {i}, error: {res[2]}")
    time.sleep(1)

# res[0] counts frames, not events: the recorded run shows 25 frames for the 5 events.
res = seg2.getSendStats()
# assert(res[0] == 25)  # hold for the 1st run
print(f"Sent {res[0]} data frames")

if (res[1] != 0):
    print(f"SendStats: {res}")
    print(f"After sending data --  error: {res[2]}")
    # exit()

Sending data...                Rd 0
Sending data...                Rd 1
Sending data...                Rd 2
Sending data...                Rd 3
Sending data...                Rd 4
Sent 25 data frames


In [18]:
# Prepare Python list to hold the output data
recv_bytes_list = [None]  # Placeholder for the byte buffer

# Call getEvent on the instance.
# This reuses the reassembler `reas` from DPReasTest1. The asserts below check that the
# reassembled event carries the data id given to the Segmenter.
for i in range(5):
    print(f"Receiving data...                Rd {i}")
    res, recv_event_len, recv_event_num, recv_data_id = reas.getEvent(recv_bytes_list)

    # A nonzero status skips this round without printing anything further.
    if (res != 0):  # -1 or -2 (has_error)
        continue
    recv = recv_bytes_list[0].decode('utf-8')
    print(f"  recv_buf:\t {recv}")
    assert(recv_data_id == data_id)
    print(f"  bufLen:\t {recv_event_len}")
    print(f"  eventNum:\t {recv_event_num}")
    print(f"  dataId:\t {recv_data_id}")

Receiving data...                Rd 0
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1 SECOND.
  bufLen:	 65
  eventNum:	 0
  dataId:	 1285
Receiving data...                Rd 1
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1 SECOND.
  bufLen:	 65
  eventNum:	 1
  dataId:	 1285
Receiving data...                Rd 2
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1 SECOND.
  bufLen:	 65
  eventNum:	 2
  dataId:	 1285
Receiving data...                Rd 3
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1 SECOND.
  bufLen:	 65
  eventNum:	 3
  dataId:	 1285
Receiving data...                Rd 4
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE WE WANT TO SEND EVERY 1 SECOND.
  bufLen:	 65
  eventNum:	 4
  dataId:	 1285


## DPReasTest3

Test creation of reassemblers with different parameters.

In [19]:
# Create reassembler with 1 recv thread.
# rflags_m keeps the default portRange, so the port range reported below is chosen by the
# library. The reassemblers in this section are only constructed and inspected, not started.
# Depends on `reas_uri` defined in the DPReasTest1 section.

rflags_m = e2sar_py.DataPlane.ReassemblerFlags()
reas_a = e2sar_py.DataPlane.Reassembler(
    reas_uri, e2sar_py.IPAddress.from_string(DP_IPV4_ADDR), 19522, 1, rflags_m)

print("This reassembler has")
print(f"   {reas_a.get_numRecvThreads()} threads;")
print(f"   listening on ports {reas_a.get_recvPorts()[0]}:{reas_a.get_recvPorts()[1]};")
print(f"   using portRange: {reas_a.get_portRange()}.")

# One thread: a single port (first == last) and portRange 0, i.e. 2**0 = 1 port.
assert reas_a.get_numRecvThreads() == 1
assert reas_a.get_recvPorts() == (19522, 19522)
assert reas_a.get_portRange() == 0

This reassembler has
   1 threads;
   listening on ports 19522:19522;
   using portRange: 0.


In [20]:
# Create reassembler with 4 recv threads.
# Relies on rflags_m still having its default portRange; later cells mutate rflags_m in
# place, so re-running this cell after them makes the asserts below fail.
reas_b = e2sar_py.DataPlane.Reassembler(
    reas_uri, e2sar_py.IPAddress.from_string(DP_IPV4_ADDR), 19522, 4, rflags_m)

print("This reassembler has")
print(f"   {reas_b.get_numRecvThreads()} threads;")
print(f"   listening on ports {reas_b.get_recvPorts()[0]}:{reas_b.get_recvPorts()[1]};")
print(f"   using portRange: {reas_b.get_portRange()}.")

# Four threads: portRange 2, i.e. 2**2 = 4 ports (19522..19525 inclusive).
assert reas_b.get_numRecvThreads() == 4
assert reas_b.get_recvPorts() == (19522, 19525)
assert reas_b.get_portRange() == 2

This reassembler has
   4 threads;
   listening on ports 19522:19525;
   using portRange: 2.


In [21]:
# Create reassembler with 7 recv threads.
# Relies on rflags_m still having its default portRange; later cells mutate rflags_m in
# place, so re-running this cell after them makes the asserts below fail.
reas_c = e2sar_py.DataPlane.Reassembler(
    reas_uri, e2sar_py.IPAddress.from_string(DP_IPV4_ADDR), 19522, 7, rflags_m)

print("This reassembler has")
print(f"   {reas_c.get_numRecvThreads()} threads;")
print(f"   listening on ports {reas_c.get_recvPorts()[0]}:{reas_c.get_recvPorts()[1]};")
print(f"   using portRange: {reas_c.get_portRange()}.")

# Seven threads: portRange 3, i.e. 2**3 = 8 ports (19522..19529 inclusive), more ports than threads.
assert reas_c.get_numRecvThreads() == 7
assert reas_c.get_recvPorts() == (19522, 19529)
assert reas_c.get_portRange() == 3

This reassembler has
   7 threads;
   listening on ports 19522:19529;
   using portRange: 3.


In [22]:
# 4 threads with portRange override.
# NOTE: this mutates the shared rflags_m in place; cells that expect the default portRange
# must not be re-run after this one without recreating rflags_m.
rflags_m.portRange = 10

reas_d = e2sar_py.DataPlane.Reassembler(
    reas_uri, e2sar_py.IPAddress.from_string(DP_IPV4_ADDR), 19522, 4, rflags_m)

print("This reassembler has")
print(f"   {reas_d.get_numRecvThreads()} threads;")
print(f"   listening on ports {reas_d.get_recvPorts()[0]}:{reas_d.get_recvPorts()[1]};")
print(f"   using portRange: {reas_d.get_portRange()}.")

# The explicit portRange is used as given: 2**10 = 1024 ports, 19522..20545 inclusive.
assert reas_d.get_numRecvThreads() == 4
assert reas_d.get_recvPorts() == (19522, 20545)
assert reas_d.get_portRange() == 10


This reassembler has
   4 threads;
   listening on ports 19522:20545;
   using portRange: 10.


In [23]:
# 4 threads with low portRange override.
# NOTE: this mutates the shared rflags_m in place (previously set to 10 above).
rflags_m.portRange = 1

reas_e = e2sar_py.DataPlane.Reassembler(
    reas_uri, e2sar_py.IPAddress.from_string(DP_IPV4_ADDR), 19522, 4, rflags_m)

print("This reassembler has")
print(f"   {reas_e.get_numRecvThreads()} threads;")
print(f"   listening on ports {reas_e.get_recvPorts()[0]}:{reas_e.get_recvPorts()[1]};")
print(f"   using portRange: {reas_e.get_portRange()}.")

# The override is honoured even though it yields fewer ports (2**1 = 2) than threads (4).
assert reas_e.get_numRecvThreads() == 4
assert reas_e.get_recvPorts() == (19522, 19523)
assert reas_e.get_portRange() == 1

This reassembler has
   4 threads;
   listening on ports 19522:19523;
   using portRange: 1.


## DPReasTest4

Test segmentation and reassembly on local host (no control plane) with multiple segmenter threads.


In [25]:
# Create 4 segmenter objects

# URI without a data port; create_seg_obj appends ":<port>" with consecutive ports
# starting at SEG_URI_PORT_BASE.
SEG_URI_BASE = f"ejfat://useless@192.168.100.1:9876/lb/1?sync=192.168.0.1:12345&data={DP_IPV4_ADDR}"
SEG_URI_PORT_BASE = 19522

# NOTE: rebinds `sflags` from the DPReasTest1 section to a fresh flags object.
sflags = e2sar_py.DataPlane.SegmenterFlags()
sflags.syncPeriodMs = 1000
sflags.syncPeriods = 5
sflags.useCP = False

def create_seg_obj(
        seg_idx: int,
        data_idx, evt_SrcId,
        seg_flags : e2sar_py.DataPlane.SegmenterFlags
        ) -> e2sar_py.DataPlane.Segmenter:
    """Build a Segmenter whose data endpoint is port ``SEG_URI_PORT_BASE + seg_idx``.

    Args:
        seg_idx: Zero-based segmenter index, used as the offset from the base port.
        data_idx: Data id passed through to the Segmenter constructor.
        evt_SrcId: Event source id passed through to the Segmenter constructor.
        seg_flags: Segmenter flags shared by all created segmenters.

    Returns:
        The newly constructed Segmenter. The URI it was built from is printed.
    """
    seg_uri = f"{SEG_URI_BASE}:{SEG_URI_PORT_BASE + seg_idx}"
    print(f"Creating Segmenter with uri:\n    {seg_uri}")
    ejfat_uri = e2sar_py.EjfatURI(
        uri=seg_uri, tt=e2sar_py.EjfatURI.TokenType.instance)
    return e2sar_py.DataPlane.Segmenter(ejfat_uri, data_idx, evt_SrcId, seg_flags)


# One segmenter per port 19522..19525; list index i corresponds to port SEG_URI_PORT_BASE + i.
seg_obj_list = []
for i in range(4):
    seg_obj_list.append(create_seg_obj(
        i, data_id, eventSrc_id, sflags))
assert len(seg_obj_list) == 4



Creating Segmenter with uri:
    ejfat://useless@192.168.100.1:9876/lb/1?sync=192.168.0.1:12345&data=127.0.0.1:19522
Creating Segmenter with uri:
    ejfat://useless@192.168.100.1:9876/lb/1?sync=192.168.0.1:12345&data=127.0.0.1:19523
Creating Segmenter with uri:
    ejfat://useless@192.168.100.1:9876/lb/1?sync=192.168.0.1:12345&data=127.0.0.1:19524
Creating Segmenter with uri:
    ejfat://useless@192.168.100.1:9876/lb/1?sync=192.168.0.1:12345&data=127.0.0.1:19525


In [26]:
# Create reassembler with no control plane. 1 recv thread listening on 4 ports.
# NOTE: rebinds `rflags` and `reas` from the DPReasTest1 section; the earlier reassembler
# object is no longer reachable under these names after this cell runs.

rflags = e2sar_py.DataPlane.ReassemblerFlags()
rflags.useCP = False  # turn off CP
rflags.withLBHeader = True   # LB header will be attached since there is no LB
rflags.portRange = 2   # use 2^portRange=4 ports

# Ports 19522..19525 match the data ports of the 4 segmenters created above.
reas = e2sar_py.DataPlane.Reassembler(
    reas_uri, e2sar_py.IPAddress.from_string(DP_IPV4_ADDR), 19522, 1, rflags)
print("This reassembler has")
print(f"   {reas.get_numRecvThreads()} threads;")
print(f"   listening on ports {reas.get_recvPorts()[0]}:{reas.get_recvPorts()[1]};")
print(f"   using portRange: {reas.get_portRange()}.")

This reassembler has
   1 threads;
   listening on ports 19522:19525;
   using portRange: 2.


In [27]:
# Start the 4 segmenters
for i in range(4):
    print(f"Seg{i + 1}.openAndStart()")
    res = seg_obj_list[i].OpenAndStart()
    # Print the failure reason first; the assert below then stops the cell.
    if (res.has_error()):
        print(f"    Error encountered opening sockets and starting segmenter{i + 1} threads: {res.error().message()}")
    assert res.value() == 0

Seg1.openAndStart()
Seg2.openAndStart()
Seg3.openAndStart()
Seg4.openAndStart()


In [28]:
# Start the reassembler
res = reas.OpenAndStart()
if (res.has_error()):
    # Print the failure reason before the assertion below stops the cell.
    print(f"    Error encountered opening sockets and starting reassembler threads: {res.error().message()}")
# Checked unconditionally, like the other OpenAndStart() calls in this notebook;
# previously the assertion was nested in the error branch and never ran on success.
assert res.value() == 0

In [29]:
# Get the sender stats.
# A nonzero res[1] is treated as an error indicator and res[2] is printed as the detail;
# errors are only reported here, not asserted.
for i in range(4):
    res = seg_obj_list[i].getSendStats()
    if (res[1] != 0):
        print(f"Error encountered after opening send socket in segmenter{i + 1}: {res[2]}")

In [30]:
def get_send_str(idx):
    """Return the event text for segmenter ``idx``, with the index embedded in the message."""
    parts = (
        "THIS IS A VERY LONG EVENT MESSAGE FROM SEGMENTER",
        str(idx),
        " WE WANT TO SEND EVERY 1 MILLISECOND.",
    )
    return "".join(parts)

def send_data_async(
        segmenter : e2sar_py.DataPlane.Segmenter,
        send_str : str,
        seg_id: int,
        msg_id: int) -> None:
    """Queue one event on ``segmenter`` and report any send error.

    Args:
        segmenter: Segmenter to queue the event on.
        send_str: Event text; it is UTF-8 encoded before being queued.
        seg_id: Zero-based segmenter index, used in the event number and in messages.
        msg_id: Zero-based message index, used in the event number.

    Raises:
        AssertionError: If ``addToSendQueue`` does not return a result value of 0.
    """
    payload = send_str.encode('utf-8')
    # Event numbers are (1 + seg_id) * 1000 + msg_id, e.g. 1000, 1001, ... for segmenter 0.
    event_num = (1 + seg_id) * 1000 + msg_id
    # Pass the encoded byte count: len(send_str) counts characters and would be
    # too small for any non-ASCII text.
    res = segmenter.addToSendQueue(payload, len(payload), event_num)
    assert res.value() == 0
    # A nonzero res[1] is treated as an error indicator; res[2] is printed as the detail.
    res = segmenter.getSendStats()
    if (res[1] != 0):
        print(f"  SendStats: {res}")
        print(f"  Error encountered sending event frames in Segmenter{seg_id + 1}: {res[2]}")

# 5 rounds; in each round every one of the 4 segmenters queues one event (20 events in
# total), with a 1 ms pause after each call.
for msg_id in range(5):
    for seg_id in range(4):
        send_data_async(seg_obj_list[seg_id], get_send_str(seg_id), seg_id, msg_id)
        time.sleep(0.001)

# Print the sent frames; a nonzero res[1] is treated as a send error and fails the cell.
for seg_id in range(4):
    res = seg_obj_list[seg_id].getSendStats()
    print(f"Segmenter {seg_id + 1} sent {res[0]} data frames")
    assert(res[1] == 0)


Segmenter 1 sent 5 data frames
Segmenter 2 sent 5 data frames
Segmenter 3 sent 5 data frames
Segmenter 4 sent 5 data frames


In [31]:
# Prepare Python list to hold the output data
recv_bytes_list = [None]  # Placeholder for the byte buffer

# Call getEvent on the instance: one attempt per event sent (4 segmenters x 5 messages).
# getEvent returns (status, event length, event number, data id); the event bytes are
# read back from recv_bytes_list[0] after the call.
for i in range(20):
    print(f"Receiving data...                Rd {i}")
    res, recv_event_len, recv_event_num, recv_data_id = reas.getEvent(recv_bytes_list)

    # A nonzero status skips this round without printing anything further.
    if (res != 0):  # -1 or -2 (has_error)
        continue
    recv = recv_bytes_list[0].decode('utf-8')
    print(f"  recv_buf:\t {recv}")
    print(f"  bufLen:\t {recv_event_len}")
    print(f"  eventNum:\t {recv_event_num}")
    print(f"  dataId:\t {recv_data_id}")

res = reas.getStats()
print(f"Reassembler stats: {res}")

assert(res[0] == 0)  # no losses
# assert(res[1] == 20)  # hold for the 1st try

Receiving data...                Rd 0
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE FROM SEGMENTER0 WE WANT TO SEND EVERY 1 MILLISECOND.
  bufLen:	 86
  eventNum:	 1000
  dataId:	 1285
Receiving data...                Rd 1
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE FROM SEGMENTER1 WE WANT TO SEND EVERY 1 MILLISECOND.
  bufLen:	 86
  eventNum:	 2000
  dataId:	 1285
Receiving data...                Rd 2
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE FROM SEGMENTER2 WE WANT TO SEND EVERY 1 MILLISECOND.
  bufLen:	 86
  eventNum:	 3000
  dataId:	 1285
Receiving data...                Rd 3
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE FROM SEGMENTER3 WE WANT TO SEND EVERY 1 MILLISECOND.
  bufLen:	 86
  eventNum:	 4000
  dataId:	 1285
Receiving data...                Rd 4
  recv_buf:	 THIS IS A VERY LONG EVENT MESSAGE FROM SEGMENTER0 WE WANT TO SEND EVERY 1 MILLISECOND.
  bufLen:	 86
  eventNum:	 1001
  dataId:	 1285
Receiving data...                Rd 5
  recv_buf:	 THIS IS A VERY LONG