In [1]:
from IPython.display import Image 

In [2]:
import time

from ocean_lib.ocean.ocean import Ocean
from ocean_lib.config import Config

config = Config('config.ini')
ocean = Ocean(config)

print(f"config.network_url = '{config.network_url}'")
print(f"config.block_confirmations = {config.block_confirmations.value}")
print(f"config.metadata_cache_uri = '{config.metadata_cache_uri}'")
print(f"config.provider_url = '{config.provider_url}'")

config.network_url = 'https://rinkeby.infura.io/v3/d163c48816434b0bbb3ac3925d6c6c80'
config.block_confirmations = 0
config.metadata_cache_uri = 'https://aquarius.oceanprotocol.com'
config.provider_url = 'https://provider.rinkeby.oceanprotocol.com'


In [3]:
import os
from ocean_lib.web3_internal.wallet import Wallet

wallet = Wallet(ocean.web3, os.getenv('MY_TEST_KEY'), transaction_timeout=20, block_confirmations=config.block_confirmations)

print(f"public address = '{wallet.address}'")

public address = '0x2338e4e94AEe1817701F65f2c751f7c844b0e43b'


In [4]:
DATA_ddo = ocean.assets.resolve("did:op:1F7eEDD29299F6aa33d4711b8e6e122466f199e1")
data_token = ocean.get_data_token(DATA_ddo.data_token_address)
token_address = data_token.address

print(f"Data token info = '{DATA_ddo.values['dataTokenInfo']}'")
print(f"Dataset name = '{DATA_ddo.metadata['main']['name']}'")
print(f"Data token address = '{data_token.address}'")

Data token info = '{'address': '0x1F7eEDD29299F6aa33d4711b8e6e122466f199e1', 'name': 'Astonishing Cormorant Token', 'symbol': 'ASTCOR-81', 'decimals': 18, 'cap': 1000.0}'
Dataset name = 'sample-data-testing-EEG-zip'
Data token address = '0x1F7eEDD29299F6aa33d4711b8e6e122466f199e1'


## Acquire datatokens for data and algorithm

For compute-to-data, we need at least one data token and one algorithm token. Let's check if we have any of the required data tokens in our wallet.

In [5]:
token_address = data_token.address

In [6]:
from ocean_lib.web3_internal.currency import pretty_ether_and_wei
print(f"Data Scientist has {pretty_ether_and_wei(data_token.balanceOf(wallet.address), data_token.symbol())} data tokens.")

Data Scientist has 0 ASTCOR-81 (0 wei) data tokens.


You won't have any the first time you run this code (or after you run a compute job). We can either purchase some data tokens using the Ocean marketplace app or using the Python API.

There are 2 options for publishing datasets on the Ocean marketplace. You can publish with fixed price or dynamic pricing. For simplicity, we have published the BCI dataset with fixed price. 
The code below is taken from the ocean.py tutorial for buying data tokens with [fixed price](https://github.com/oceanprotocol/ocean.py/blob/8087ca8d7bfcd489fead45b59cdf5021d21e2d9d/READMEs/fixed-rate-exchange-flow.md). 

In [7]:
#Search for exchange_id from a specific block retrieved at 3rd step
#for a certain data token address (e.g. token_address).
logs = ocean.exchange.search_exchange_by_data_token(token_address)
#E.g. First exchange is the wanted one.
exchange_id = logs[0].args.exchangeId

    Searched blocks 10310451-10311450. TokenRegistered event not yet found.
    Searched blocks 10309451-10310450. TokenRegistered event not yet found.
    Searched blocks 10308451-10309450. TokenRegistered event not yet found.
    Searched blocks 10307451-10308450. TokenRegistered event not yet found.
    Searched blocks 10306451-10307450. TokenRegistered event not yet found.
    Searched blocks 10305451-10306450. TokenRegistered event not yet found.
    Searched blocks 10304451-10305450. TokenRegistered event not yet found.
    Searched blocks 10305607-10306606. 1 ExchangeCreated events detected so far.
    Searched blocks 10306607-10307606. 1 ExchangeCreated events detected so far.
    Searched blocks 10307607-10308606. 1 ExchangeCreated events detected so far.
    Searched blocks 10308607-10309606. 1 ExchangeCreated events detected so far.
    Searched blocks 10309607-10310606. 1 ExchangeCreated events detected so far.
    Searched blocks 10310607-10311606. 1 ExchangeCreated events

In [8]:
from ocean_lib.web3_internal.currency import to_wei
tx_result = ocean.exchange.buy_at_fixed_rate(to_wei(1), wallet, to_wei(5), exchange_id, token_address)
assert tx_result, "failed buying data tokens"

In [9]:
print(f"Data Scientist has {pretty_ether_and_wei(data_token.balanceOf(wallet.address), data_token.symbol())} data tokens.")

Data Scientist has 1 ASTCOR-81 (1000000000000000000 wei) data tokens.


Let's purchase some algorithm tokens. 

In [10]:
ALG_ddo = ocean.assets.resolve("did:op:617e7d2c21A99DB19A0435B1C704d4494c6115de")
alg_token = ocean.get_data_token(ALG_ddo.data_token_address)

print(f"Alg token info = '{ALG_ddo.values['dataTokenInfo']}'")
print(f"Alg name = '{ALG_ddo.metadata['main']['name']}'")

Alg token info = '{'address': '0x617e7d2c21A99DB19A0435B1C704d4494c6115de', 'name': 'BCITEST0', 'symbol': 'BCITEST1', 'decimals': 18, 'cap': 1000.0}'
Alg name = 'BCI Algorithm'


In [11]:
print(f"Data Scientist has {pretty_ether_and_wei(alg_token.balanceOf(wallet.address), alg_token.symbol())} algorithm tokens.")

Data Scientist has 57.1 BCITEST1 (57137857142857142900 wei) algorithm tokens.


## Start compute job

Only inputs needed: DATA_did, ALG_did. Everything else can get computed as needed.

In [12]:
DATA_did = DATA_ddo.did

compute_service = DATA_ddo.get_service('compute')

from ocean_lib.web3_internal.constants import ZERO_ADDRESS
from ocean_lib.models.compute_input import ComputeInput

In [13]:
# order & pay for dataset
dataset_order_requirements = ocean.assets.order(
    DATA_did, wallet.address, service_type=compute_service.type
)
DATA_order_tx_id = ocean.assets.pay_for_service(
        ocean.web3,
        dataset_order_requirements.amount,
        dataset_order_requirements.data_token_address,
        DATA_did,
        compute_service.index,
        ZERO_ADDRESS,
        wallet,
        dataset_order_requirements.computeAddress,
    )

If a cell shows an error, try to run it again.

In [14]:
ALG_did = ALG_ddo.did

algo_service = ALG_ddo.get_service('access')

# order & pay for algo
algo_order_requirements = ocean.assets.order(
    ALG_did, wallet.address, service_type=algo_service.type
)
ALG_order_tx_id = ocean.assets.pay_for_service(
        ocean.web3,
        algo_order_requirements.amount,
        algo_order_requirements.data_token_address,
        ALG_did,
        algo_service.index,
        ZERO_ADDRESS,
        wallet,
        algo_order_requirements.computeAddress,
)

In [15]:
compute_inputs = [ComputeInput(DATA_did, DATA_order_tx_id, compute_service.index)]

In [16]:
job_id = ocean.compute.start(
    compute_inputs,
    wallet,
    algorithm_did=ALG_did,
    algorithm_tx_id=ALG_order_tx_id,
    algorithm_data_token=alg_token.address
)
print(f"Started compute job with id: {job_id}")

Started compute job with id: 253e62e8b77c4e57980e56fc67f111b5


## Monitor logs / algorithm output

You can check the job status as many times as needed:

In [17]:
status_dict = ocean.compute.status(DATA_did, job_id, wallet)
while status_dict['statusText'] != 'Job finished':
    status_dict = ocean.compute.status(DATA_did, job_id, wallet)
    print(status_dict)
    time.sleep(1)

{'ok': True, 'status': 20, 'statusText': 'Configuring volumes'}
{'ok': True, 'status': 20, 'statusText': 'Configuring volumes'}
{'ok': True, 'status': 20, 'statusText': 'Configuring volumes'}
{'ok': True, 'status': 20, 'statusText': 'Configuring volumes'}
{'ok': True, 'status': 20, 'statusText': 'Configuring volumes'}
{'ok': True, 'status': 20, 'statusText': 'Configuring volumes'}
{'ok': True, 'status': 20, 'statusText': 'Configuring volumes'}
{'ok': True, 'status': 20, 'statusText': 'Configuring volumes'}
{'ok': True, 'status': 20, 'statusText': 'Configuring volumes'}
{'ok': True, 'status': 30, 'statusText': 'Provisioning success'}
{'ok': True, 'status': 30, 'statusText': 'Provisioning success'}
{'ok': True, 'status': 40, 'statusText': 'Running algorithm '}
{'ok': True, 'status': 40, 'statusText': 'Running algorithm '}
{'ok': True, 'status': 60, 'statusText': 'Publishing results'}
{'ok': True, 'status': 60, 'statusText': 'Publishing results'}
{'ok': True, 'status': 60, 'statusText': '

This will output the status of the current job.
Here is a list of possible results: [Operator Service Status description](https://github.com/oceanprotocol/operator-service/blob/main/API.md#status-description).

Once you get `{'ok': True, 'status': 70, 'statusText': 'Job finished'}`, Bob can check the result of the job.

In [18]:
result = ocean.compute.result_file(DATA_did, job_id, 0, wallet)  # 0 index, means we retrieve the results from the first dataset index

Sometimes the result is empty. When this happens, I just start the compute job again.

In [19]:
str(result).split('\\n')

["b'Getting input...",
 'Reading from C2D container...',
 'DID: 1F7eEDD29299F6aa33d4711b8e6e122466f199e1',
 'Asset file /data/inputs/1F7eEDD29299F6aa33d4711b8e6e122466f199e1/0 exists: True',
 'Extracting data...',
 'Listing files...',
 '------ extracted',
 '--------- sample_data_single_column',
 '------------ sample_col_mathematic.feather',
 '------------ sample_col_music.feather',
 '------------ sample_col_eyesclosed.feather',
 '------------ sample_col_memory.feather',
 '------------ sample_col_eyesopen.feather',
 'Reading files...',
 'Data shape: (5, 61, 1)',
 "'"]