# 02. Using a custom stream
This tutorial shows how to use segyio with your own I/O.

In [None]:
## The azure-storage-blob and smart_open libraries are needed only if you wish to run the Azure examples.
%pip install python-dotenv pytest smart_open[azure] azure-storage-blob

In [None]:
import os
import shutil
import pytest

import segyio

## Using simple stream
segyio can work with any file-like object as its I/O source.

In [None]:
stream = open("viking_small.segy", "rb")
with segyio.open_with(stream=stream, ignore_geometry=True) as f:
    print(f'trace count: {f.tracecount}')

# >>> trace count: 480

This I/O freedom comes at a cost:
- First, it is always slower than working with local files through `segyio.open()`.
- Second, custom sources might be incompatible with segyio in unpredictable ways (the dev team can provide only very limited assistance with those).

The next part of the tutorial shows some problems users might encounter.



## Example of cloud I/O

This part of the tutorial uses the Azure cloud and the `smart_open` library, but its logic can be adapted to other I/O backends.

To run the code below you need to:
 1) have `python/test-data/tiny.sgy` available in your Azure Blob Storage
 2) define the `AZURE_ACCOUNT_NAME` (e.g. myaccount), `AZURE_BLOB_PATH` (e.g. container/tiny.sgy) and `AZURE_SAS_TOKEN` (with read/write permissions to tiny.sgy) environment variables in a `.env` file.

💰 **Warning**: be aware of your cloud pricing model, as segyio sends as many requests to your I/O as it would to a local disk.
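
To get a feel for how many requests a stream would receive before pointing segyio at a paid backend, one option is to wrap the stream and count the calls made on it. The sketch below is an illustration only: `CountingStream` is a hypothetical helper, not part of segyio, and it is demonstrated on an in-memory `io.BytesIO` rather than on a real SEG-Y file.

```python
import io


class CountingStream:
    """Wrap a file-like object and count the read/seek calls made on it."""

    def __init__(self, stream):
        self.stream = stream
        self.reads = 0
        self.seeks = 0
        self.bytes_read = 0

    def __getattr__(self, name):
        # Delegate everything we do not override to the wrapped stream.
        return getattr(self.stream, name)

    def read(self, size=-1):
        data = self.stream.read(size)
        self.reads += 1
        self.bytes_read += len(data)
        return data

    def seek(self, offset, whence=io.SEEK_SET):
        self.seeks += 1
        return self.stream.seek(offset, whence)


# Demo on an in-memory stream; no segyio or cloud account involved.
counted = CountingStream(io.BytesIO(b"x" * 3600))
counted.seek(3200)
header = counted.read(400)
print(counted.reads, counted.seeks, counted.bytes_read)
# >>> 1 1 400
```

A wrapper like this could be passed as `stream=` to `segyio.open_with` in place of the raw stream. How many calls segyio actually makes depends on the file and on the operations performed.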

In [None]:
import smart_open
from azure.storage.blob import BlobServiceClient
from dotenv import load_dotenv

load_dotenv()

def open_azure(uri: str, mode: str):
    if not mode.endswith('b'):
        mode += 'b'
    account_name = os.environ['AZURE_ACCOUNT_NAME']
    sas_token = os.environ['AZURE_SAS_TOKEN']
    account_url = f"https://{account_name}.blob.core.windows.net/?{sas_token}"
    transport_params = {
        'client': BlobServiceClient(account_url=account_url),
    }
    return smart_open.open(uri, mode, transport_params=transport_params)


blob = os.environ['AZURE_BLOB_PATH']
uri = f'azure://{blob}'
with segyio.open_with(stream=open_azure(uri, "rb")) as f:
    print(f'trace count: {f.tracecount}')

# >>> trace count: 6

## Testing
 
You can optionally run some of the existing segyio tests on your own I/O to discover what works and what definitely does not. For this you need to:
1) have the segyio tests source code available.
2) have the `python/test-data/tiny.sgy` file present in your I/O system.

⚠️ **Note**: this information is presented only for convenience and is not an official part of segyio. There is no guarantee that the testing framework won't change in the future without notice.


In [None]:
# To find out which operations are supported and how well they perform, run the
# stream.py tests with the "--custom_stream" option and a plugin that knows how
# to make your stream.
def run_pytest(plugin):
    pytest.main(["--custom_stream", "--durations=5", "../test/stream.py"], plugins=[plugin])

### Running tests on local file
Note that running `segyio.open_with` on a local file obtained through Python's own `open(filename)` is slower than running `segyio.open(filename)`.

In [None]:
# Define your plugin class, which implements the 'make_stream' fixture.
# The testing framework will call 'make_stream(path, mode)'.
#
# Note that some tests call it several times with the same path. It is up to
# the user to ensure that calls with the same path return the same data (think:
# creating or updating a file and then opening it again for reading).

class PyFilePlugin:
    @pytest.fixture(scope="function")
    def make_stream(self, tmp_path_factory):
        filepaths = {}

        def _make_stream(path, mode):
            filename = os.path.basename(path)
            if filename in filepaths:
                return open(filepaths[filename], mode)

            path_r = "../../test-data/" + filename
            path_w = tmp_path_factory.getbasetemp() / filename
            if mode == "rb":
                path = path_r
            elif mode == "w+b":
                path = path_w
            elif mode == "r+b":
                shutil.copy(path_r, path_w)
                path = path_w
            else:
                raise ValueError(f"Unsupported mode {mode}")

            filepaths[filename] = path
            return open(path, mode)
        yield _make_stream
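
The "same path returns the same data" requirement can also be illustrated without touching the disk. The following is a minimal sketch under stated assumptions: `PersistentBytesIO` and `make_memory_stream_factory` are hypothetical names, the data is kept in a plain dict, and the factory is shown on its own rather than as a pytest fixture. It has not been run against the segyio tests.

```python
import io


class PersistentBytesIO(io.BytesIO):
    """BytesIO that saves its content into a shared dict when closed."""

    def __init__(self, store, key, initial=b""):
        super().__init__(initial)
        self._store = store
        self._key = key

    def close(self):
        if not self.closed:
            # Persist the content so that the next open of this path sees it.
            self._store[self._key] = self.getvalue()
        super().close()


def make_memory_stream_factory(seed):
    store = dict(seed)  # path -> bytes, shared by every stream from this factory

    def _make_stream(path, mode):
        if mode == "w+b":
            initial = b""
        elif mode in ("rb", "r+b"):
            initial = store[path]
        else:
            raise ValueError(f"Unsupported mode {mode}")
        return PersistentBytesIO(store, path, initial)

    return _make_stream


make_stream = make_memory_stream_factory({"tiny.sgy": b"original"})
with make_stream("tiny.sgy", "r+b") as s:
    s.write(b"UPDATED!")
with make_stream("tiny.sgy", "rb") as s:
    reread = s.read()
print(reread)
# >>> b'UPDATED!'
```

The second call with the same path observes the update made through the first stream, which is the behaviour the tests rely on.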

In [None]:
run_pytest(PyFilePlugin())
# >>> [all expected tests passed]
# >>> ============================= slowest 5 durations ==============================
# >>> 0.01s call     test/stream.py::test_update
# >>> 0.01s call     test/stream.py::test_create
# >>> 0.01s teardown test/stream.py::test_reopen_closed
# >>> 0.01s call     test/stream.py::test_read[True]
# >>> 
# >>> (1 durations < 0.005s hidden.  Use -vv to show these durations.)
# >>> ========================= 9 passed, 3 xfailed in 0.32s =========================

### Running tests on cloud

Be aware that running the tests in the cloud can be slow, as many requests are sent over the network.

In [None]:
class AzureDatasourcePlugin:
    def __init__(self, open_stream=open_azure):
        self.open_stream = open_stream

    @pytest.fixture(scope="function")
    def make_stream(self):
        uris = {}

        def _make_stream(path, mode):
            if path in uris:
                return self.open_stream(uris[path], mode)
            
            blob = os.environ['AZURE_BLOB_PATH']
            
            # note that the library used doesn't actually support r+ and w+ modes,
            # but the uri must still be valid
            if mode == "rb":
                uri = f'azure://{blob}'
            elif mode == "w+b":
                # hardcoded for simplicity
                uri = 'azure://test/new.sgy'
            elif mode == "r+b":
                # data could be copied to a different blob to avoid overwriting,
                # but we overwrite here for simplicity
                uri = f'azure://{blob}'
            else:
                raise ValueError(f"Unsupported mode {mode}")

            uris.update({path: uri})
            return self.open_stream(uri, mode)
        yield _make_stream

In [None]:
run_pytest(AzureDatasourcePlugin())
# >>> ============================================================================================= slowest 5 durations =============================================================================================
# >>> 9.71s call     test/stream.py::test_read[False]
# >>> 9.26s call     test/stream.py::test_read[True]
# >>> 1.97s call     test/stream.py::test_reopen_closed
# >>> 1.20s call     test/stream.py::test_read_cube
# >>> 1.10s call     test/stream.py::test_resource_deallocation[2]
# >>> =========================================================================================== short test summary info ===========================================================================================
# >>> FAILED ../test/stream.py::test_create - NotImplementedError: Azure Blob Storage support for mode 'wb+' not implemented
# >>> FAILED ../test/stream.py::test_update - NotImplementedError: Azure Blob Storage support for mode 'rb+' not implemented

We can see that `test_create` and `test_update` failed because this I/O does not support the required functionality. This tells us that this I/O is compatible only with segyio's read functionality.

We also see that reading is very slow compared to a local file, which is to be expected.


## Custom file-like objects

Sometimes it is desirable to implement your own file-like object.

⚠️ **Note**: from segyio's perspective, implementing the `read`/`write`/`seek`/`tell`/`flush`/`close`/`writable` methods with the correct [signatures](https://github.com/python/cpython/blob/main/Lib/_pyio.py) should be a good starting point, but we do not guarantee that the set of required functions won't change.
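
As a rough illustration of the method set listed above, here is a hypothetical in-memory object that implements exactly those methods over a `bytearray`. `MemoryFile` is an invented name, and whether segyio accepts such an object as-is is an assumption; running the tests from the previous section against it would be the way to find out.

```python
import io


class MemoryFile:
    """Hypothetical minimal file-like object backed by a bytearray."""

    def __init__(self, data=b""):
        self._buf = bytearray(data)
        self._pos = 0
        self.closed = False

    def read(self, size=-1):
        if size is None or size < 0:
            size = len(self._buf) - self._pos
        data = bytes(self._buf[self._pos:self._pos + size])
        self._pos += len(data)
        return data

    def write(self, data):
        if self._pos > len(self._buf):
            # Writing past the end pads the gap with zero bytes.
            self._buf.extend(b"\x00" * (self._pos - len(self._buf)))
        end = self._pos + len(data)
        self._buf[self._pos:end] = data
        self._pos = end
        return len(data)

    def seek(self, offset, whence=io.SEEK_SET):
        base = {io.SEEK_SET: 0,
                io.SEEK_CUR: self._pos,
                io.SEEK_END: len(self._buf)}[whence]
        new_pos = base + offset
        if new_pos < 0:
            raise ValueError("negative seek position")
        self._pos = new_pos
        return self._pos

    def tell(self):
        return self._pos

    def flush(self):
        pass  # nothing is buffered, so there is nothing to flush

    def writable(self):
        return True

    def close(self):
        self.closed = True


f = MemoryFile(b"hello world")
f.seek(6)
f.write(b"segyio")
f.seek(0)
print(f.read())
# >>> b'hello segyio'
```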

Instead of implementing a brand-new file-like object, let's just tweak the `read` method of our Azure object to speed things up a bit.

In [None]:
# naive and untested speedup, for demo purposes only:
# cache the last chunk of data read and reuse it if the next request hits it
class AzureSpeedupIO:
    def __init__(self, azure_smart_open):
        self.azure = azure_smart_open
        self.cache = b''
        self.cache_tell = 0

    def __getattr__(self, name):
        # delegate everything except read() to the wrapped stream
        return getattr(self.azure, name)

    def read(self, size):
        current_tell = self.azure.tell()
        offset = current_tell - self.cache_tell

        if offset < 0 or len(self.cache) - offset < size:
            # cache miss: read more than requested so that nearby reads hit the cache
            greedy_read_size = 1024
            requested = max(size, greedy_read_size)
            self.cache = self.azure.read(requested)
            self.cache_tell = current_tell
            offset = 0

        data = self.cache[offset:offset + size]
        # read must advance tell by exactly the number of bytes returned, even
        # though the greedy read above may have moved the stream further
        self.azure.seek(current_tell + len(data))
        return data


def open_stream_speedup(filename, mode):
    return AzureSpeedupIO(open_azure(filename, mode))

In [None]:
run_pytest(AzureDatasourcePlugin(open_stream=open_stream_speedup))
# >>> ============================================================================================= slowest 5 durations =============================================================================================
# >>> 3.29s call     test/stream.py::test_read[False]
# >>> 3.25s call     test/stream.py::test_read[True]
# >>> 0.90s call     test/stream.py::test_reopen_closed
# >>> 0.52s call     test/stream.py::test_read_cube
# >>> 0.47s call     test/stream.py::test_not_closing_does_not_cause_segfault

💡 **Note**: the tests became significantly faster, which shows how much the I/O implementation affects segyio performance.
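
A different technique from the hand-written cache above is the standard library's own read-ahead buffering: `io.BufferedReader` wraps a raw stream and serves small reads from an internal buffer. The sketch below only demonstrates the effect on a hypothetical in-memory raw stream (`CountingRaw` is an invented name). Whether a given cloud stream can be wrapped this way, and whether segyio benefits from it, is an assumption that would need to be tested.

```python
import io


class CountingRaw(io.RawIOBase):
    """Raw in-memory stream that counts how often it is actually read."""

    def __init__(self, data):
        super().__init__()
        self._data = data
        self._pos = 0
        self.raw_reads = 0

    def readable(self):
        return True

    def seekable(self):
        return True

    def tell(self):
        return self._pos

    def seek(self, offset, whence=io.SEEK_SET):
        if whence == io.SEEK_SET:
            self._pos = offset
        elif whence == io.SEEK_CUR:
            self._pos += offset
        elif whence == io.SEEK_END:
            self._pos = len(self._data) + offset
        else:
            raise ValueError(f"Unsupported whence {whence}")
        return self._pos

    def readinto(self, buffer):
        # Each call here stands in for one request to the slow backend.
        self.raw_reads += 1
        chunk = self._data[self._pos:self._pos + len(buffer)]
        buffer[:len(chunk)] = chunk
        self._pos += len(chunk)
        return len(chunk)


raw = CountingRaw(bytes(range(256)) * 16)  # 4096 bytes of sample data
buffered = io.BufferedReader(raw, buffer_size=1024)
pieces = [buffered.read(16) for _ in range(32)]  # 512 bytes in small reads
print(f"small reads: {len(pieces)}, raw reads: {raw.raw_reads}")
```

The number of raw reads stays well below the number of small reads because most requests are answered from the buffer.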