# Using the Platform's Streaming API

The platform's Streaming API lets you work with data in the platform as streams.
For more information, see the platform's [streams](https://www.iguazio.com/docs/latest-release/data-layer/stream/) documentation.

## Initialize

``` python
import v3io.dataplane
```

Create a `dataplane` client:

``` python
v3io_client = v3io.dataplane.Client()
```

> **Note**: You can explicitly pass the `endpoint` and `access_key` parameters to the client.
> The following code is equivalent to using the default values:
>
> ``` python
> from os import getenv
> v3io_client = v3io.dataplane.Client(endpoint='http://v3io-webapi:8081',
>                                     access_key=getenv('V3IO_ACCESS_KEY'))
> ```
>
> When running Python code on a local machine that connects to a remote Iguazio platform, you can obtain your cluster's URL by copying the API URL of the web-APIs service (`webapi`) from the **Services** dashboard page. You can select one of two types of URLs:
>
> - **HTTPS Direct** (recommended) &mdash; a URL of the format `https://<tenant IP>:<web-APIs port>`; for example, `https://default-tenant.app.mycluster.iguazio.com:8443`.
> - **HTTPS** &mdash; a URL of the format `https://webapi.<tenant IP>`; for example, `https://webapi.default-tenant.app.mycluster.iguazio.com`.
>
> You can get the access key from the platform dashboard: select the user-profile picture or icon from the top right corner of any page, and select **Access Keys** from the menu. In the **Access Keys** window, either copy an existing access key or create a new key and copy it. Alternatively, you can get the access key by checking the value of the `V3IO_ACCESS_KEY` environment variable in a web-shell or Jupyter Notebook service.
>
> For more information see the platform's [Data-Service Web-API General Structure](https://www.iguazio.com/docs/latest-release/data-layer/reference/web-apis/data-service-web-api-gen-struct/) documentation.
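
If you switch between running inside the platform and running remotely, it can help to build the client arguments in one place. The following is a minimal sketch; `client_kwargs` and `DEFAULT_ENDPOINT` are hypothetical names introduced here, and the fallback values simply mirror the defaults described in the note above.

```python
import os

# In-cluster default endpoint, as described in the note above
DEFAULT_ENDPOINT = 'http://v3io-webapi:8081'

def client_kwargs(endpoint=None, access_key=None, max_connections=None):
    """Build keyword arguments for v3io.dataplane.Client (hypothetical helper).

    Explicit arguments win; otherwise fall back to the in-cluster endpoint
    and to the V3IO_ACCESS_KEY environment variable.
    """
    kwargs = {
        'endpoint': endpoint or DEFAULT_ENDPOINT,
        'access_key': access_key or os.getenv('V3IO_ACCESS_KEY'),
    }
    if kwargs['access_key'] is None:
        raise ValueError('No access key: pass access_key or set V3IO_ACCESS_KEY')
    # Only pass max_connections when set, so the SDK default (8) applies otherwise
    if max_connections is not None:
        kwargs['max_connections'] = max_connections
    return kwargs

# Usage (requires the v3io package and a reachable platform):
# v3io_client = v3io.dataplane.Client(**client_kwargs(
#     endpoint='https://default-tenant.app.mycluster.iguazio.com:8443'))
```

Failing early on a missing access key gives a clearer error than a rejected request later on.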

> **Maximum number of parallel connections**: Another noteworthy client parameter is `max_connections`, which sets the maximum number of parallel connections used for batch operations.
> If left unspecified, the default is 8 connections.
> For more information, see the [Put Multiple Records](#Put-Multiple-Records) section in this tutorial.

### Set the Data Path

All data in the platform is stored in user-defined data containers.
This tutorial uses the predefined "users" container.
For more information refer to the platform's [data-containers](https://www.iguazio.com/docs/latest-release/data-layer/containers/) documentation.

``` python
CONTAINER = 'users'
```

Set the data path for storing the stream:

> **Note**: The following code uses the `V3IO_USERNAME` environment variable to store the data in the current user's folder. When running Python code on a local machine that connects to a remote Iguazio platform, set this variable to the user name that you use to log in to the platform. You can find this user name by checking the value of the `V3IO_USERNAME` environment variable in a web-shell or Jupyter Notebook service on the platform.

``` python
from os import getenv, path

V3IO_USERNAME = getenv('V3IO_USERNAME')
STREAM_PATH = path.join(V3IO_USERNAME, 'data', 'v3io', 'stream')
```
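
If `V3IO_USERNAME` is not set, `getenv` returns `None` and `path.join` fails with a somewhat cryptic `TypeError`. The following sketch, a hypothetical helper named `stream_path_for_user`, fails with a clearer message instead. It also uses `posixpath.join` so that the path uses forward slashes even when the code runs on a Windows machine; that the platform expects `/`-separated data paths is an assumption here.

```python
import os
import posixpath

def stream_path_for_user(*parts, env_var='V3IO_USERNAME'):
    """Join a per-user data path, failing clearly if the user name is unset."""
    username = os.getenv(env_var)
    if not username:
        raise RuntimeError(f'{env_var} is not set; set it to your platform user name')
    # posixpath always joins with '/', regardless of the local OS
    return posixpath.join(username, *parts)

# Usage:
# STREAM_PATH = stream_path_for_user('data', 'v3io', 'stream')
```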

## Create a Stream

Use the `create` method to create and configure a new stream. The new stream is available immediately upon its creation.

You must configure the stream's shard count (the number of shards in the stream). You can increase the shard count at any time, but you cannot reduce it.

You can also set the stream's retention period (default is 24 hours). After this period elapses, when new records are added to the stream, the earliest ingested records are deleted.
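
The retention rule can be pictured with a small model. This sketch only decides which records are *eligible* for deletion; as described above, the platform actually removes them when new records are added. Later in this tutorial, the configured value is read back as `retention_period_hours`. The `expired_records` helper and the `(ingestion_time, data)` tuple layout are assumptions made for illustration only.

```python
from datetime import datetime, timedelta

def expired_records(records, now, retention_hours=24):
    """Return the records that are older than the retention period.

    Toy model: each record is an (ingestion_time, data) tuple.
    """
    cutoff = now - timedelta(hours=retention_hours)
    return [record for record in records if record[0] < cutoff]

now = datetime(2020, 1, 2, 12, 0)
records = [
    (datetime(2020, 1, 1, 11, 0), 'ingested 25 hours ago'),
    (datetime(2020, 1, 2, 11, 0), 'ingested 1 hour ago'),
]
print(expired_records(records, now))  # only the 25-hour-old record
```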

``` python
response = v3io_client.stream.create(container=CONTAINER,
                                     stream_path=STREAM_PATH,
                                     shard_count=8)
print(response.status_code)
```

```
204
```


## Put Records

Use the `put_records` method to add records to a stream.

The following example defines a function that splits text into lowercase words that contain only the letters a-z:

``` python
import re

# Matches any character that is not a lowercase ASCII letter
regex_non_lower_case = re.compile('[^a-z]')

def text_to_words(text):
    # Split into words
    words = text.split()
    # Convert to lowercase
    words = [w.lower() for w in words]
    # Keep only letters
    words = [regex_non_lower_case.sub('', w) for w in words]
    return words

text1 = "WOLF, meeting with a Lamb astray from the fold, resolved not to lay violent hands on him, but to find some plea to justify to the Lamb the Wolf’s right to eat him. He thus addressed him: “Sirrah, last year you grossly insulted me.” “Indeed,” bleated the Lamb in a mournful tone of voice, “I was not then born.” Then said the Wolf, “You feed in my pasture.” “No, good sir,” replied the Lamb, “I have not yet tasted grass.” Again said the Wolf, “You drink of my well.” “No,” exclaimed the Lamb, “I never yet drank water, for as yet my mother’s milk is both food and drink to me.” Upon which the Wolf seized him and ate him up, saying, “Well! I won’t remain supperless, even though you refute every one of my imputations.” The tyrant will always find a pretext for his tyranny."
words1 = text_to_words(text1)
len(words1)
```

```
146
```
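
A quick check on a short sample shows two side effects of `text_to_words` that are worth knowing about: apostrophes and other non-letters inside a word are dropped (so "Wolf’s" becomes "wolfs"), and a token made only of punctuation becomes an empty string, which would then be written to the stream as an empty record. The function below repeats the one defined above so that the snippet is self-contained; the sample sentence is made up for this illustration.

```python
import re

# Same logic as text_to_words above
regex_non_lower_case = re.compile('[^a-z]')

def text_to_words(text):
    words = [w.lower() for w in text.split()]
    return [regex_non_lower_case.sub('', w) for w in words]

sample = 'WOLF, meeting the Wolf’s -- right.'
print(text_to_words(sample))
# If empty strings are unwanted, filter them out:
# words = [w for w in text_to_words(sample) if w]
```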

The following code converts the list of words into a list of records.
Each record is a dictionary whose `data` key contains the record data.
The sample code displays the first 5 records:

``` python
records = [{'data': word} for word in words1]
print(records[:5])
```

```
[{'data': 'wolf'}, {'data': 'meeting'}, {'data': 'with'}, {'data': 'a'}, {'data': 'lamb'}]
```
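
Besides `data`, this tutorial later adds a `shard_id` key (to pin a record to a shard) or a `partition_key` key (to group related records) to each record dictionary. A small hypothetical helper, `make_records`, can build all of these forms. It uses only the keys that appear in this tutorial and is a sketch, not part of the SDK.

```python
def make_records(items, shard_id=None, partition_key=None):
    """Wrap each item in a stream-record dictionary (hypothetical helper).

    partition_key may be a fixed value or a function of the item.
    """
    records = []
    for item in items:
        record = {'data': item}
        if shard_id is not None:
            record['shard_id'] = shard_id
        if partition_key is not None:
            record['partition_key'] = partition_key(item) if callable(partition_key) else partition_key
        records.append(record)
    return records

# Usage:
# records = make_records(words1)                  # same as the list above
# records = make_records(words1, shard_id=10)     # pin to shard 10
```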


Write the records to the stream:

``` python
response = v3io_client.stream.put_records(container=CONTAINER,
                                          stream_path=STREAM_PATH,
                                          records=records)
print(response.status_code)
```

```
200
```


The following code writes another set of records to the stream:

``` python
text2 = "A BAT who fell upon the ground and was caught by a Weasel pleaded to be spared his life. The Weasel refused, saying that he was by nature the enemy of all birds. The Bat assured him that he was not a bird, but a mouse, and thus was set free. Shortly afterwards the Bat again fell to the ground and was caught by another Weasel, whom he likewise entreated not to eat him. The Weasel said that he had a special hostility to mice. The Bat assured him that he was not a mouse, but a bat, and thus a second time escaped. It is wise to turn circumstances to good account."
response = v3io_client.stream.put_records(container=CONTAINER,
                                          stream_path=STREAM_PATH,
                                          records=[{'data': word} for word in text_to_words(text2)])
```

## Get Records

Multiple consumer instances can consume data from the same stream.
A consumer retrieves records from a specific shard.
It's recommended that you distribute the data consumption among several consumer instances ("workers"), assigning each instance one or more shards.

For each shard, the consumer should determine the location within the shard from which to begin consuming records.
This can be the earliest ingested record, the end of the shard, the first ingested record starting at a specific time, or a specific record identified by its sequence number (a unique record identifier that is assigned by the platform based on the record’s ingestion time).
The consumer first uses a seek operation to obtain the desired consumption location, and then provides this location as the starting point for its record consumption.
The consumption operation returns the location of the next record to consume within the shard, and this location should be used as the location for a subsequent consumption operation, allowing for sequential record consumption.
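
To make the four seek options concrete, here is a toy in-memory model of a single shard. It is not the platform's implementation: a "location" is just a list index, and the seek-type names `'TIME'` and `'SEQUENCE'` are chosen for this model (only `'EARLIEST'` and `'LATEST'` appear in this tutorial's API calls). `InMemoryShard` is a hypothetical class.

```python
import bisect

class InMemoryShard:
    """Toy model of one shard: an append-only list of records.

    A location is a list index here; on the platform it is an opaque value.
    """

    def __init__(self):
        self.records = []  # (sequence_number, ingestion_time, data)

    def append(self, ingestion_time, data):
        # Sequence numbers grow with ingestion order
        seq = len(self.records) + 1
        self.records.append((seq, ingestion_time, data))
        return seq

    def seek(self, seek_type, timestamp=None, sequence_number=None):
        if seek_type == 'EARLIEST':
            return 0                      # first ingested record
        if seek_type == 'LATEST':
            return len(self.records)      # end of shard: only new records
        if seek_type == 'TIME':
            # First record ingested at or after the timestamp
            times = [t for _, t, _ in self.records]
            return bisect.bisect_left(times, timestamp)
        if seek_type == 'SEQUENCE':
            for index, (seq, _, _) in enumerate(self.records):
                if seq == sequence_number:
                    return index
            raise KeyError(sequence_number)
        raise ValueError(f'unknown seek type: {seek_type}')

shard = InMemoryShard()
for t, word in enumerate(['a', 'b', 'c']):
    shard.append(ingestion_time=t * 10, data=word)
```

The consumer would then start reading at the returned location, as the real `seek` and `get_records` calls below do.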

``` python
shard_id = 0
seek_response = v3io_client.stream.seek(container=CONTAINER,
                                        stream_path=STREAM_PATH,
                                        shard_id=shard_id,
                                        seek_type='EARLIEST')
location = seek_response.output.location
```

Use the `get_records` method to retrieve records from a specific shard of the stream.
By default, `get_records` returns up to 1,000 records.
For the sake of this demonstration, the sample code limits the number of returned records to 10 by setting the `limit` parameter.

> **Note**: Some words are missing from the output. This is because the records were distributed across different shards, and this example reads only from shard 0.

``` python
get_response = v3io_client.stream.get_records(container=CONTAINER,
                                              stream_path=STREAM_PATH,
                                              shard_id=shard_id,
                                              location=location,
                                              limit=10)
words_shard0 = [record.data.decode('utf-8') for record in get_response.output.records]
print(words_shard0)
```

```
['wolf', 'fold', 'him', 'to', 'him', 'you', 'in', 'not', 'feed', 'the']
```


Continue reading from the `next_location` returned by the previous call:

``` python
location = get_response.output.next_location

get_response = v3io_client.stream.get_records(container=CONTAINER,
                                              stream_path=STREAM_PATH,
                                              shard_id=shard_id,
                                              location=location,
                                              limit=10)
words_shard0 = [record.data.decode('utf-8') for record in get_response.output.records]
print(words_shard0)
```

```
['again', 'well', 'drank', 'is', 'which', 'up', 'though', 'the', 'his', 'ground']
```
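
Feeding each `next_location` back into the next call is how a consumer walks through a shard. The following sketch shows that loop against a plain Python list standing in for a shard. The `get_records` function here is a toy stand-in that returns `(records, next_location)`, not the SDK method. It stops at the first empty response; a consumer of a live stream would more likely sleep and retry, because an empty response can simply mean that no new records have arrived yet.

```python
def get_records(shard, location, limit=1000):
    """Toy stand-in: return up to `limit` records and the next location."""
    batch = shard[location:location + limit]
    return batch, location + len(batch)

def read_all(shard, location, limit):
    """Consume a shard sequentially, passing next_location back in each time."""
    consumed = []
    while True:
        records, location = get_records(shard, location, limit)
        if not records:
            break  # caught up with the end of the shard
        consumed.extend(records)
    return consumed, location

shard = list('abcdefg')
words, next_location = read_all(shard, 0, limit=3)
print(words, next_location)
```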


## Describe Stream

Use the `describe` method to retrieve a stream's configuration, including the shard count and retention period.

``` python
response = v3io_client.stream.describe(container=CONTAINER,
                                       stream_path=STREAM_PATH)
print(response.status_code)
```

```
200
```


``` python
print(f'Shards: {response.output.shard_count}\nRetention period: {response.output.retention_period_hours} hours')
```

```
Shards: 8
Retention period: 24 hours
```


## Update Stream

Use the `update` method to update a stream's configuration by increasing its shard count.
The change is applied immediately.

> **Note**: You can increase the shard count at any time, but you cannot reduce it.
> From the platform's perspective, there's virtually no cost to creating even thousands of shards.
> However, if you increase a stream's shard count after its creation, new records with a previously used partition key will be assigned either to the same shard that was previously used for this partition key or to a new shard.
> For more information see the platform's [stream sharding and partitioning](https://www.iguazio.com/docs/latest-release/data-layer/stream/#stream-sharding-and-partitioning) documentation.
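
The note above says that a previously used partition key may land on a different shard after a shard-count increase. A common way to build a key-to-shard mapping (hash the key, then take the result modulo the shard count) shows why this happens. The sketch below uses `zlib.crc32` purely as a stand-in hash; it is not the platform's partitioning algorithm, and `shard_for_key` is a hypothetical helper.

```python
import zlib

def shard_for_key(partition_key, shard_count):
    """Stand-in mapping: CRC-32 of the key modulo the shard count.

    NOT the platform's algorithm; it only illustrates why a key's shard
    can change when the shard count changes.
    """
    return zlib.crc32(partition_key.encode('utf-8')) % shard_count

keys = [f'user_{i}' for i in range(1000)]
moved = [k for k in keys if shard_for_key(k, 8) != shard_for_key(k, 10)]
print(f'{len(moved)} of {len(keys)} keys map to a different shard after 8 -> 10 shards')
```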

> **Note**: To use the streaming API to consume records from new shards after a shard-count increase, you must first restart the consumer application.
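
Because the shard count can only grow, a client-side guard before calling `update` can catch mistakes early and report which shard IDs the increase adds (for example, going from 10 to 12 shards adds shards 10 and 11, as in the next section). This is a hypothetical helper, not part of the SDK; the current count would come from the `describe` output (`response.output.shard_count`).

```python
def new_shard_ids(current, requested):
    """Check a shard-count update and return the IDs of the shards it adds.

    Mirrors the rule above: the shard count can grow but never shrink.
    Shard IDs are assumed to be consecutive and zero-based.
    """
    if requested < current:
        raise ValueError(f'cannot reduce shard count from {current} to {requested}')
    return list(range(current, requested))

# Usage (hypothetical):
# current = v3io_client.stream.describe(container=CONTAINER, stream_path=STREAM_PATH).output.shard_count
# added = new_shard_ids(current, 12)
```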

``` python
response = v3io_client.stream.update(container=CONTAINER,
                                     stream_path=STREAM_PATH,
                                     shard_count=10)
print(response.status_code)
```

```
204
```


Describe the stream again to see the updated shard count:

``` python
response = v3io_client.stream.describe(container=CONTAINER,
                                       stream_path=STREAM_PATH)
print(response.output.shard_count)
```

```
10
```


## Assigning Shard IDs

In the [Get Records](#Get-Records) section, the consumer didn't get all the words.
This is because, by default, the platform assigns records to shards using a round-robin algorithm, and the sample consumer code reads from a single shard.
To ensure that a consumer gets all the words, you can optionally assign a record to a specific stream shard by setting its `shard_id` value, or associate the record with a specific partition key (by setting its `partition_key` value) so that records with the same key are assigned to the same shard.
For more information see the platform's [stream sharding and partitioning](https://www.iguazio.com/docs/latest-release/data-layer/stream/#stream-sharding-and-partitioning) documentation.
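
The shard-0 output in the Get Records section ('wolf', 'fold', 'him', 'to', ...) appears consistent with a simple round-robin model over 8 shards: those are words 0, 8, 16, and 24 of the first text. The sketch below models that assignment. The starting shard and the exact ordering that the platform uses are assumptions, and `round_robin` is a hypothetical helper.

```python
def round_robin(records, shard_count, start=0):
    """Assign records to shards in turn: record i goes to shard (start + i) % shard_count."""
    shards = {shard: [] for shard in range(shard_count)}
    for index, record in enumerate(records):
        shards[(start + index) % shard_count].append(record)
    return shards

shards = round_robin(list(range(20)), shard_count=8)
print(shards[0])  # positions 0, 8, and 16
```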

To demonstrate, leave the existing shards (0 through 9) unused and increase the shard count to 12, which adds the empty shards 10 and 11:

``` python
response = v3io_client.stream.update(container=CONTAINER,
                                     stream_path=STREAM_PATH,
                                     shard_count=12)
```

Write the first text to shard 10:

``` python
response = v3io_client.stream.put_records(container=CONTAINER,
                                          stream_path=STREAM_PATH,
                                          records=[{'data': word, 'shard_id': 10} for word in words1])
```

And write the second text to shard 11:

``` python
response = v3io_client.stream.put_records(container=CONTAINER,
                                          stream_path=STREAM_PATH,
                                          records=[{'data': word, 'shard_id': 11} for word in text_to_words(text2)])
```

Now, read from shard 10:

``` python
shard_id = 10
seek_response = v3io_client.stream.seek(container=CONTAINER,
                                        stream_path=STREAM_PATH,
                                        shard_id=shard_id,
                                        seek_type='EARLIEST')
location = seek_response.output.location

get_response = v3io_client.stream.get_records(container=CONTAINER,
                                              stream_path=STREAM_PATH,
                                              shard_id=shard_id,
                                              location=location)
words_shard10 = [record.data.decode('utf-8') for record in get_response.output.records]
print(f'{words_shard10[:10]}...')

# All words of the first text were written to shard 10, so they should come back in order
if words_shard10 == words1[:len(words_shard10)]:
    print("All words appear as expected")
else:
    print("Error, something is wrong")
```

```
['wolf', 'meeting', 'with', 'a', 'lamb', 'astray', 'from', 'the', 'fold', 'resolved']...
All words appear as expected
```


## Put Multiple Records

One way to improve performance is to send many requests to the data layer and then wait for all the responses to arrive, rather than sending each request and waiting for its response.
The SDK supports this through batching.
Any API call can be made through the client's built-in `batch` object.
The call accepts the same arguments that it would normally receive (except for `raise_for_status`), and it doesn't block until the response arrives.
To wait for all pending responses, call the `wait` method of the `batch` object.
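
The submit-now, wait-later pattern can be illustrated with the standard library. The following `ToyBatch` class is a hypothetical analogue built on `concurrent.futures.ThreadPoolExecutor`, where `max_workers` plays the role of `max_connections`. It is not how the SDK implements batching; it only shows why the loop doesn't block and what `wait` does.

```python
from concurrent.futures import ThreadPoolExecutor

class ToyBatch:
    """Toy analogue of a batch object: submit calls now, collect results later."""

    def __init__(self, max_connections=8):
        # At most max_connections calls run at the same time
        self._executor = ThreadPoolExecutor(max_workers=max_connections)
        self._futures = []

    def call(self, fn, *args, **kwargs):
        # Returns immediately; the call runs on a pooled worker thread
        self._futures.append(self._executor.submit(fn, *args, **kwargs))

    def wait(self):
        # Block until every pending call finishes. Results come back in
        # submission order; the first failed call (in submission order)
        # re-raises its exception.
        futures, self._futures = self._futures, []
        return [future.result() for future in futures]
```

A usage sketch with a fake request function, showing that no more than 4 calls run concurrently:

```python
import threading
import time

state = {'active': 0, 'peak': 0}
lock = threading.Lock()

def fake_request(i):
    with lock:
        state['active'] += 1
        state['peak'] = max(state['peak'], state['active'])
    time.sleep(0.01)  # simulate network latency
    with lock:
        state['active'] -= 1
    return i

batch = ToyBatch(max_connections=4)
for i in range(20):
    batch.call(fake_request, i)
results = batch.wait()
```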

> **Note**: The number of parallel connections is determined by the `max_connections` parameter that you set when creating the client. For example, to use 16 parallel connections, create the client at the beginning of the notebook with `v3io_client = v3io.dataplane.Client(max_connections=16)`. The default is 8 connections.

> **Note**: The SDK also supports an asynchronous API, which can also be useful for putting multiple objects. This capability is not demonstrated here, but you can read about it in the [v3io-py readme](https://github.com/v3io/v3io-py/blob/development/README.md#support-for-asyncio-experimental).

`put_records` accepts a maximum of 1,000 records per call; if this limit is exceeded, the request fails.
Therefore, to write more records, you need to call `put_records` multiple times.
The following example sends a large number of events.
It defines a generator that yields the events in lists (batches) of up to `batch_size` events.
Each event is a dictionary with the following fields:
``` python
'user': <user_id>
'time': <event_time>
'url': <url>
```
Each user is selected at random.
The URL is also a random string, and the event time (`time`) never decreases from one event to the next.
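
If your records are already in one large list rather than produced by a generator, you can split them into chunks that respect the 1,000-record limit. The following is a minimal sketch; `chunked` and `MAX_RECORDS_PER_PUT` are hypothetical names, and the commented usage mirrors the batch calls used in this section.

```python
from itertools import islice

MAX_RECORDS_PER_PUT = 1000  # put_records limit stated above

def chunked(records, size=MAX_RECORDS_PER_PUT):
    """Yield successive lists of at most `size` records."""
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk

# Usage (hypothetical):
# for chunk in chunked(all_records):
#     v3io_client.batch.stream.put_records(container=CONTAINER,
#                                          stream_path=STREAM_PATH,
#                                          records=chunk)
# responses = v3io_client.batch.wait()
```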

``` python
import random
import string
from datetime import timedelta

def generate_events(users, start_time, num_events, batch_size):
    curr_time = start_time

    # Letters to use for the URL generation
    url_letters = string.ascii_lowercase + string.digits

    # Iterate through the number of events in batch_size increments
    for i in range(0, num_events, batch_size):
        events = []
        # The last batch is smaller if num_events isn't a multiple of batch_size
        for _ in range(min(batch_size, num_events - i)):
            # Choose a length (20 to 29 characters) for the URL suffix
            length = random.choice(range(20, 30))

            # Choose a user and generate the URL
            event = {
                'user': random.choice(users),
                'time': curr_time,
                'url': f"https://example.com/{''.join(random.choice(url_letters) for _ in range(length))}"
            }

            # Advance the time for the next event by a random 0 to 9,999 microseconds
            curr_time += timedelta(microseconds=random.choice(range(0, 10000)))
            events.append(event)

        # Yield one batch (list) of events; the caller's loop receives it
        yield events
```

Test the generator by printing a few events:

``` python
from datetime import datetime

random.seed(42)  # make the sample output reproducible

for events in generate_events(users=[f'user_{index}' for index in range(0, 100)],
                              start_time=datetime(2020, 10, 7, 5, 53, 0, 69161),
                              num_events=4,
                              batch_size=2):
    print(events)
```

```
[{'user': 'user_3', 'time': datetime.datetime(2020, 10, 7, 5, 53, 0, 69161), 'url': 'https://example.com/rpoig8f1cbfno6b9m80o2'}, {'user': 'user_0', 'time': datetime.datetime(2020, 10, 7, 5, 53, 0, 78815), 'url': 'https://example.com/k1vrjnvgfygwwqc38hyf9sxm'}]
[{'user': 'user_84', 'time': datetime.datetime(2020, 10, 7, 5, 53, 0, 79954), 'url': 'https://example.com/osfogyr3xkxwnrek8pk3'}, {'user': 'user_81', 'time': datetime.datetime(2020, 10, 7, 5, 53, 0, 86170), 'url': 'https://example.com/9oudocuzrenun5z3jqip98q1'}]
```


Seek to the latest position (the end) of shard 0, so that you can later retrieve only the newly written data:

``` python
shard_id = 0
seek_response = v3io_client.stream.seek(container=CONTAINER,
                                        stream_path=STREAM_PATH,
                                        shard_id=shard_id,
                                        seek_type='LATEST')
location = seek_response.output.location
```

``` python
import json

num_users = 10000
num_events = 100000
batch_size = 1000

for events in generate_events(users=[f'user_{index}' for index in range(num_users)],
                              start_time=datetime.now(),
                              num_events=num_events,
                              batch_size=batch_size):
    # Serialize each event as JSON (datetime values become strings) and use
    # the user as the partition key, so that each user's events share a shard
    records = [{'data': json.dumps(event, default=str), 'partition_key': event.get('user')} for event in events]

    # Non-blocking: the request is queued on the client's batch object
    v3io_client.batch.stream.put_records(container=CONTAINER,
                                         stream_path=STREAM_PATH,
                                         records=records)

# Wait for all writes to complete
responses = v3io_client.batch.wait()
```

The `put_records` calls in the previous loop are all sent to the data layer in parallel, without waiting for each response.
When `wait` is called, it blocks until either all responses arrive, in which case it returns a `Responses` object that contains the responses of each call, or an error occurs, in which case an exception is raised.
You can also pass `raise_for_status` to `wait`; it behaves as it does for regular (non-batched) API calls.

> **Note:** The `batch` object is stateful, so you can run only one batch at a time through it.
> However, you can create multiple parallel batches yourself through the client's `create_batch` interface.

Read the records from shard 0, starting at the location obtained before the batch write, to check the newly written data:

``` python
get_response = v3io_client.stream.get_records(container=CONTAINER,
                                              stream_path=STREAM_PATH,
                                              shard_id=shard_id,
                                              location=location)
records = get_response.output.records

print(f'Received {len(records)} records')
print(f'First record: {json.loads(records[0].data)}')
```

```
Received 1000 records
First record: {'user': 'user_7491', 'time': '2022-09-15 17:53:54.679908', 'url': 'https://example.com/uea3ge8n6qiwepxsk28t7a9t'}
```
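
Because the events were serialized with `json.dumps(event, default=str)`, the `time` field comes back as a string such as `'2022-09-15 17:53:54.679908'`. If a consumer needs a real `datetime`, it can parse that string with `datetime.fromisoformat`, which accepts the space-separated format that `str(datetime)` produces. `decode_event` is a hypothetical helper for this sketch.

```python
import json
from datetime import datetime

def decode_event(raw):
    """Parse a record written with json.dumps(event, default=str).

    raw may be bytes (as in record.data) or str.
    """
    event = json.loads(raw)
    # default=str stored the datetime as str(datetime); parse it back
    event['time'] = datetime.fromisoformat(event['time'])
    return event

original = {'user': 'user_7', 'time': datetime(2022, 9, 15, 17, 53, 54, 679908),
            'url': 'https://example.com/abc'}
raw = json.dumps(original, default=str).encode('utf-8')
print(decode_event(raw))
```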


## Delete Stream

Use the `delete` method to delete a stream along with all of its shards.

``` python
response = v3io_client.stream.delete(container=CONTAINER, stream_path=STREAM_PATH)
print(response.status_code)
```

```
204
```


Alternatively, instead of calling `delete`, when running on the Iguazio platform (rather than remotely), you can delete the stream directory through the `/v3io` mount by using the following code:

``` python
import shutil
V3IO_STREAM_PATH = path.join(path.sep, 'v3io', CONTAINER, STREAM_PATH)
shutil.rmtree(V3IO_STREAM_PATH)
```

Or use this shell command from a Jupyter notebook cell:

```
!rm -r $V3IO_STREAM_PATH
```