# 1. Installation procedure

### turn on autoreloading

In [1]:
%load_ext autoreload
%autoreload 2
%reload_ext autoreload 

### install ray library
Uncomment this line to run it once on your machine, then comment it out again.

In [2]:
#!pip install -U "ray[default]"

Collecting ray[default]
  Downloading ray-1.6.0-cp38-cp38-win_amd64.whl (15.6 MB)
Collecting grpcio>=1.28.1
  Using cached grpcio-1.39.0-cp38-cp38-win_amd64.whl (3.2 MB)
Collecting redis>=3.5.0
  Using cached redis-3.5.3-py2.py3-none-any.whl (72 kB)
Collecting aiohttp-cors
  Using cached aiohttp_cors-0.7.0-py3-none-any.whl (27 kB)
Collecting aioredis<2
  Using cached aioredis-1.3.1-py3-none-any.whl (65 kB)
Collecting opencensus
  Using cached opencensus-0.7.13-py2.py3-none-any.whl (127 kB)
Collecting py-spy>=0.2.0
  Using cached py_spy-0.3.8-py2.py3-none-win_amd64.whl (1.4 MB)
Collecting aiohttp
  Using cached aiohttp-3.7.4.post0-cp38-cp38-win_amd64.whl (635 kB)
Collecting gpustat
  Using cached gpustat-0.6.0-py3-none-any.whl
Collecting colorful
  Using cached colorful-0.5.4-py2.py3-none-any.whl (201 kB)
Collecting hiredis
  Using cached hiredis-2.0.0-cp38-cp38-win_amd64.whl (18 kB)
Collecting async-timeout
  Using cached async_timeout-3.0.1-py3-none-any.whl (8.2 kB)
Collecting blessin
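The install only needs to happen once, so you can first check from Python whether Ray is already present. Below is a minimal sketch that uses the standard-library `importlib.metadata` (Python 3.8+). The helper name `installed_version` is hypothetical. The log above shows Ray 1.6.0 being installed, which is useful to compare against if later commands behave differently.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package):
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


ray_version = installed_version("ray")
if ray_version is None:
    print("ray is not installed yet: uncomment the pip line above and run it once")
else:
    print(f"ray {ray_version} is already installed")
```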

### start ray server

You only need to run this once per Jupyter session.

In [None]:
# MAC USERS:
# before running this notebook, it is recommended to run this cell to stop any running Ray processes
!ray stop  # shell command

In [1]:
# ALL USERS:
# start the Ray head node; the output reports the local node IP address
!ray start --head --port=6379 --redis-password="cbgt2"

2021-09-28 03:21:09,466	INFO scripts.py:588 -- Local node IP: 192.168.1.208
2021-09-28 03:21:18,407	SUCC scripts.py:627 -- --------------------
2021-09-28 03:21:18,407	SUCC scripts.py:628 -- Ray runtime started.
2021-09-28 03:21:18,407	SUCC scripts.py:629 -- --------------------
2021-09-28 03:21:18,407	INFO scripts.py:631 -- Next steps
2021-09-28 03:21:18,407	INFO scripts.py:632 -- To connect to this Ray runtime from another node, run
2021-09-28 03:21:18,407	INFO scripts.py:636 --   ray start --address='192.168.1.208:6379' --redis-password='cbgt2'
2021-09-28 03:21:18,407	INFO scripts.py:641 -- Alternatively, use the following Python code:
2021-09-28 03:21:18,408	INFO scripts.py:644 -- import ray
2021-09-28 03:21:18,408	INFO scripts.py:645 -- ray.init(address='auto', _redis_password='cbgt2')
2021-09-28 03:21:18,408	INFO scripts.py:653 -- If connection fails, check your firewall settings and network configuration.
2021-09-28 03:21:18,408	INFO scripts.py:658 -- To terminate the Ray runtim

2021-09-28 03:21:16,186	INFO services.py:1245 -- View the Ray dashboard at http://127.0.0.1:8265
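The next cell asks for the local node IP reported by the head node. Rather than copying it by hand, you can pull it out of the output text. The sketch below assumes the output contains a `Local node IP: ...` line, as in the log above. `parse_local_node_ip` and `ray_join_command` are hypothetical helpers, and the port and password defaults simply repeat the values used in this notebook.

```python
import re


def parse_local_node_ip(ray_start_output):
    """Return the address from the 'Local node IP: ...' line of `ray start --head` output, or None."""
    match = re.search(r"Local node IP:\s*(\d{1,3}(?:\.\d{1,3}){3})", ray_start_output)
    return match.group(1) if match else None


def ray_join_command(node_ip, port=6379, redis_password="cbgt2"):
    """Build the `ray start --address=...` command that the head node suggests."""
    return f"ray start --address='{node_ip}:{port}' --redis-password='{redis_password}'"


# one line copied from the head-node output above
sample = "2021-09-28 03:21:09,466\tINFO scripts.py:588 -- Local node IP: 192.168.1.208"
node_ip = parse_local_node_ip(sample)
print(ray_join_command(node_ip))
# ray start --address='192.168.1.208:6379' --redis-password='cbgt2'
```

The printed command matches the one the head node itself suggests in its "Next steps" output.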


In [None]:
# then connect using the local node IP obtained above (replace <local-node-ip>):
!ray start --address='<local-node-ip>:6379' --redis-password='cbgt2'  # shell command

In [None]:
# if this doesn't work, stop the Ray server and start it again
# if some Ray processes are still running, force them to stop:
!ray stop --force  # shell command

### install autopep8

In [None]:
!pip install --upgrade autopep8

### how to run autopep8

Uncomment and run this on any code files you create before pushing them to GitHub.

In [None]:
# Example: reformat a file in place to prevent indentation problems:
#!autopep8 --in-place --aggressive --aggressive <filename>.py
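You can also apply the same command to every Python file in a project folder instead of one `<filename>.py` at a time. The sketch below uses `subprocess` and assumes the `autopep8` executable is on your PATH. `format_python_files` is a hypothetical helper. Note that `rglob` also descends into subfolders such as virtual environments, so point it at your own source folder.

```python
import subprocess
from pathlib import Path


def autopep8_command(path):
    """Return the same autopep8 invocation as the cell above, for a single file."""
    return ["autopep8", "--in-place", "--aggressive", "--aggressive", str(path)]


def format_python_files(folder="."):
    """Run autopep8 in place on every .py file under `folder` (autopep8 must be installed)."""
    for path in sorted(Path(folder).rglob("*.py")):
        subprocess.run(autopep8_command(path), check=True)
```

For example, `format_python_files("src")` would reformat every `.py` file under a `src` folder.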

# 2. How to create pipelines 

### import the core cbgt code and any of your files

In [4]:
import cbgt
# add import statements here for any files you create, for example:
# from tracetype import *
# import init_params as par

2021-08-25 09:09:04,045	INFO worker.py:800 -- Connecting to existing Ray cluster at address: 192.168.1.201:6379


### create an empty pipeline

In [5]:
pl = cbgt.Pipeline()

### basic variable assignment

Use `pipeline.variablename` to write to a specific variable.

Currently, two kinds of basic assignment are permitted:

- assigning a "constant": this can be a whole Python expression, but it is constant in the sense that its value is calculated immediately, when the line runs, and not during pipeline execution
- copying a value from another pipeline variable

More kinds of assignment may be added in the future.
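To make the two kinds of assignment concrete, here is a toy model of a pipeline that *records* assignments instead of performing them. `ToyPipeline` and `Placeholder` are hypothetical names used for illustration only. They are not cbgt's classes, although the cbgt outputs in the cells that follow show a similar idea (`VariablePlaceholder`, `BasicAssignmentModule`, `BasicCopyModule`).

```python
class Placeholder:
    """Stands for a pipeline variable by name; it carries no value of its own."""

    def __init__(self, name):
        self.name = name


class ToyPipeline:
    """Records each assignment as a (kind, target, source) step instead of storing a value."""

    def __init__(self):
        # bypass our own __setattr__ so this attribute is not recorded as a step
        object.__setattr__(self, "modulelist", [])

    def __getattr__(self, name):
        # only reached for names that are not real attributes, i.e. pipeline variables
        return Placeholder(name)

    def __setattr__(self, name, value):
        if isinstance(value, Placeholder):
            self.modulelist.append(("copy", name, value.name))
        else:
            # Python already evaluated the right-hand side, so this is a "constant"
            self.modulelist.append(("assign", name, value))

    def run(self, environment):
        """Replay the recorded steps on a copy of `environment`."""
        env = dict(environment)
        for kind, target, source in self.modulelist:
            env[target] = env[source] if kind == "copy" else source
        return env


pl = ToyPipeline()
pl.hello = "Hello " + "World!"   # expression evaluated now, stored as a constant
pl.world = pl.hello              # copy, resolved only when the pipeline runs
print(pl.modulelist)
# [('assign', 'hello', 'Hello World!'), ('copy', 'world', 'hello')]
print(pl.run({}))
# {'hello': 'Hello World!', 'world': 'Hello World!'}
```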

In [6]:
pl.hello = "Hello World!"
pl.world = pl.hello

In [7]:
# pipeline variables are special placeholder objects, not ordinary attribute values
pl.hello

<backend.VariablePlaceholder at 0x1e9593e3670>

In [8]:
# check which kinds of assignment we have introduced into the pipeline:
pl.modulelist

[<backend.BasicAssignmentModule at 0x1e9593e3700>,
 <backend.BasicCopyModule at 0x1e9593e39a0>]

### "code block" modules

A "code block" can be created as a function that takes `self` as its only argument. (You can technically use any parameter name.)

Unlike some other languages, Python doesn't support true code blocks.

Use `pipeline.add(yourfunction)` to add a code block.

In [9]:
def helper1(number):
    # this function is defined in the normal way, outside the context of the pipeline.
    # it can be basically anything; it just demonstrates that outside functions
    # can be used in code block modules
    return (number+1, number+2)

In [10]:
def codeblock(self):
    # you can do pretty much anything you want in here
    self.y, self.z = helper1(self.x)
    self.x += 1234567
    

In [11]:
#add codeblock to the pipeline:
pl.add(codeblock)

<backend.Pipeline at 0x1e9593dd700>

In [13]:
# check that the code block has been added to the pipeline:
pl.modulelist

[<backend.BasicAssignmentModule at 0x1e9593e3700>,
 <backend.BasicCopyModule at 0x1e9593e39a0>,
 <backend.CodeTaskFunctionModule at 0x1ea9d97e8b0>]
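You can check what a code block does to the pipeline variables outside cbgt by running it against a plain attribute container. This sketch assumes the variables are exposed as attributes of the `self` argument, which is how `codeblock` uses them. `run_codeblock` is a hypothetical helper, and `types.SimpleNamespace` stands in for whatever object cbgt actually passes.

```python
from types import SimpleNamespace


def helper1(number):
    return (number + 1, number + 2)


def codeblock(self):
    self.y, self.z = helper1(self.x)
    self.x += 1234567


def run_codeblock(block, environment):
    """Expose `environment` as attributes, run `block` on it, and return the variables as a new dict."""
    state = SimpleNamespace(**environment)
    block(state)
    return dict(vars(state))


print(run_codeblock(codeblock, {"x": 100}))
# {'x': 1234667, 'y': 101, 'z': 102}
```

These values match `x`, `y` and `z` in the pipeline results in section 3, because `y` and `z` are computed from `x` before it is incremented.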

### function modules

Functions can be transformed into modules by specifying their inputs and outputs, using a special syntax:

`pipeline.output = pipeline[function](arguments=whatever)`

If a function returns multiple outputs (a tuple), use the `.shape()` method to split the output into the needed shape. It takes a length (or a list of lengths for nested tuples) as input.
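Conceptually, `pipeline[function](...)` stores the call for later, and `.shape(n)` declares how many outputs to expect so that each one can be bound to its own variable. Below is a toy sketch of that idea. `DeferredCall` and `OutputSlot` are hypothetical names, not cbgt's API, and nested shapes are left out.

```python
class DeferredCall:
    """Stores a function and its arguments; the call happens only in evaluate()."""

    def __init__(self, func, *args, **kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs
        self.length = None
        self._cache = None

    def shape(self, length):
        """Declare a tuple result of `length` items and return one slot per item."""
        self.length = length
        return tuple(OutputSlot(self, i) for i in range(length))

    def evaluate(self):
        if self._cache is None:
            result = self.func(*self.args, **self.kwargs)
            if self.length is not None and len(result) != self.length:
                raise ValueError(f"expected {self.length} outputs, got {len(result)}")
            self._cache = (result,)  # wrapped so that a None result is also cached
        return self._cache[0]


class OutputSlot:
    """Refers to item `index` of a deferred tuple result."""

    def __init__(self, call, index):
        self.call, self.index = call, index

    def value(self):
        return self.call.evaluate()[self.index]


def helper1(number):
    return (number + 1, number + 2)


def helper2(first, second):
    return "".join([first, second])


a, b = DeferredCall(helper1, 10000).shape(2)  # compare: pl.a, pl.b = pl[helper1](10000).shape(2)
joined = DeferredCall(helper2, "Hello World!", second="!!!!")
print(a.value(), b.value(), joined.evaluate())
# 10001 10002 Hello World!!!!!
```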

In [16]:
def helper2(first,second):
    # function with 1 output and 2 inputs
    return "".join([first,second])

In [17]:
# Without a pipeline, you would call the function like this:
# joined = helper2(hello, second="!!!!")

In [18]:
#Using a pipeline:
pl.joined = pl[helper2](pl.hello, second="!!!!")
#arguments can be passed either by position or name, just like when you call the function normally

In [19]:
# check that the function has been added to the pipeline:
pl.modulelist

[<backend.BasicAssignmentModule at 0x1e9593e3700>,
 <backend.BasicCopyModule at 0x1e9593e39a0>,
 <backend.CodeTaskFunctionModule at 0x1ea9d97e8b0>,
 <backend.FunctionModule at 0x1ea9d97ed60>]

In [20]:
# you can pass in external values:
pl.a,pl.b = pl[helper1](10000).shape(2)

In [21]:
# check that a second function has been added to the pipeline:
pl.modulelist

[<backend.BasicAssignmentModule at 0x1e9593e3700>,
 <backend.BasicCopyModule at 0x1e9593e39a0>,
 <backend.CodeTaskFunctionModule at 0x1ea9d97e8b0>,
 <backend.FunctionModule at 0x1ea9d97ed60>,
 <backend.FunctionModule at 0x1ea9d97ec10>]

### pipeline composition

You can build up a pipeline and then `.add(...)` it to another pipeline.

You can chain multiple `.add()` calls in one line.

In [22]:
#Defining a new function:

def countingblock(self):
    try:
        self.counter += 1
    except Exception:  # counter does not exist yet on the first call
        self.counter = 1

In [23]:
#Creating another pipeline, containing the new function: 
anotherpipeline = cbgt.Pipeline().add(countingblock)

In [24]:
# you can also chain several .add() calls, even with the same function, to build a pipeline in one line:
anotherpipeline = cbgt.Pipeline().add(countingblock).add(countingblock).add(countingblock)

In [25]:
# check that the new pipeline was created correctly:
anotherpipeline.modulelist

[<backend.CodeTaskFunctionModule at 0x1ea9d97ebe0>,
 <backend.CodeTaskFunctionModule at 0x1ea9d97ea30>,
 <backend.CodeTaskFunctionModule at 0x1ea9d97ebb0>]

In [26]:
#Adding the new pipeline ('anotherpipeline') to the previous one ('pl'):
pl.add(anotherpipeline)

<backend.Pipeline at 0x1e9593dd700>

In [28]:
# check that 'anotherpipeline' was added to 'pl':
pl.modulelist

[<backend.BasicAssignmentModule at 0x1e9593e3700>,
 <backend.BasicCopyModule at 0x1e9593e39a0>,
 <backend.CodeTaskFunctionModule at 0x1ea9d97e8b0>,
 <backend.FunctionModule at 0x1ea9d97ed60>,
 <backend.FunctionModule at 0x1ea9d97ec10>,
 <backend.PipelineModule at 0x1ea9d97e7c0>]
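The chaining and nesting shown above can be modelled with a small sketch. `ToyPipeline` is a hypothetical stand-in, not cbgt's `Pipeline`. It assumes that a nested pipeline's steps run in order on the same shared variables, which is consistent with `counter` ending at 3 in the results in section 3.

```python
from types import SimpleNamespace


class ToyPipeline:
    """An ordered list of steps; a step is either a code-block function or another ToyPipeline."""

    def __init__(self):
        self.steps = []

    def add(self, step):
        self.steps.append(step)
        return self  # returning the pipeline is what makes .add(...).add(...) chaining possible

    def run_on(self, state):
        for step in self.steps:
            if isinstance(step, ToyPipeline):
                step.run_on(state)  # a nested pipeline runs on the same shared variables
            else:
                step(state)
        return state


def countingblock(self):
    try:
        self.counter += 1
    except AttributeError:  # first call: counter does not exist yet
        self.counter = 1


inner = ToyPipeline().add(countingblock).add(countingblock).add(countingblock)
outer = ToyPipeline().add(inner)
print(outer.run_on(SimpleNamespace()).counter)
# 3
```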

# 3. How to run a pipeline

### running a pipeline

1. Optionally, create a dictionary of variable values to serve as the initial state of the pipeline.

2. Then call `ExecutionManager(...).run(pipeline, environment)`, which returns a new dictionary of the results.

In [29]:
#Create a dictionary:
environment = {
    'x': 100,    #fill in with whatever you need as the starting variables
}

In [30]:
#Execute the pipeline: 
results = cbgt.ExecutionManager(cores=7).run(pl,environment)
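The cell above hard-codes `cores=7`, which may be more than another machine has. The notebook does not define `cores`, so this sketch assumes it is the number of parallel workers to use and derives that number from the machine's CPU count. `default_cores` is a hypothetical helper.

```python
import os


def default_cores(reserve=1):
    """Number of CPU cores to use, keeping `reserve` cores free for the notebook and OS."""
    available = os.cpu_count() or 1  # os.cpu_count() may return None
    return max(1, available - reserve)


print(default_cores())
```

You would then write `cbgt.ExecutionManager(cores=default_cores()).run(pl, environment)`.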

In [31]:
#Print the obtained results: 
results

{'x': 1234667,
 'hello': 'Hello World!',
 'world': 'Hello World!',
 'y': 101,
 'z': 102,
 'joined': 'Hello World!!!!!',
 'a': 10001,
 'b': 10002,
 'counter': 3}

In [32]:
locals().update(results)  # turn each result into a notebook variable

In [33]:
counter

3
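`locals().update(results)` works at the top level of a notebook cell, where `locals()` is the same dictionary as `globals()`. However, the Python documentation warns that modifying the `locals()` dictionary may not affect local variables, so the trick does not carry over into functions. Here are two alternatives that keep the names explicit, shown on a subset of the results above:

```python
from types import SimpleNamespace

# a subset of the results dictionary printed above
results = {"x": 1234667, "joined": "Hello World!!!!!", "counter": 3}

counter = results["counter"]    # option 1: pull out just the names you need
r = SimpleNamespace(**results)  # option 2: attribute access without creating new globals
print(counter, r.counter, r.joined)
# 3 3 Hello World!!!!!
```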

