# Local execution
You will often want to check how your pipeline behaves locally, without connecting to a remote netUnicorn instance.

Here is the manual way to create a pipeline and execute it.

First, install `netunicorn-executor` (required for the local executor) and the packages used in this lab.

## Preparation

In [1]:
%pip install -U netunicorn-executor nest-asyncio requests dnspython

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


If you run this inside an already running asyncio event loop (e.g., in Jupyter), you also need to install `nest_asyncio`, import it, and apply it. This is not required when you run the code as a plain Python script.

In [2]:
import requests
import socket
import nest_asyncio
nest_asyncio.apply()
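
To see why the patch matters, the sketch below reproduces the underlying problem with the standard library alone. It assumes (the text above does not say so explicitly) that the local executor starts its own event loop in a way similar to `asyncio.run`; the coroutine names are invented for illustration. Run it as a plain Python script, since a notebook already has a loop running.

```python
import asyncio


async def inner():
    return 42


async def outer():
    coro = inner()
    try:
        # Starting a second loop from inside a running one is refused by default.
        return asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return f"RuntimeError: {exc}"


message = asyncio.run(outer())
print(message)
```

`nest_asyncio.apply()` patches asyncio so that nested calls like this one are allowed, which is what makes such code usable from a notebook cell.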

Create your pipeline. For now we'll use a trivial one that does nothing except succeed, just to check that everything works.

In [3]:
from netunicorn.base import Pipeline, Task, Success, Failure

class DummyTask(Task):
    def run(self):
        return 0

pipeline = Pipeline().then(DummyTask()).then(DummyTask())

To get the executor that will run on your computer, you can call the `get_local_executor` function from the `netunicorn.executor` module. It will do all the required steps for you and return the executor.

In [4]:
from netunicorn.executor import get_local_executor
executor = get_local_executor(pipeline)

INFO:executor_local:Parsed configuration: Gateway located on fake
INFO:executor_local:Current directory: /home/satyandra/Downloads/netunicorn_examples


Then call the executor to run the pipeline.

In [5]:
executor()

INFO:executor_local:Execution is finished, start reporting results.
INFO:executor_local:Skipping reporting results due to execution graph setting.


Now you can explore the locally created log file and `executor.execution_graph_results`.

In [7]:
print(executor.execution_graph_results)

<Success: defaultdict(<class 'list'>, {'e488bbbd-a097-48f0-9f7b-78c8abb03d19': [<Success: 0>], '34f2f567-206a-4f77-b6bd-aac7bc65e8bb': [<Success: 0>]})>


## Tasks and Pipelines

As a reminder, the basic API of netunicorn consists of the following things:

- Task: the minimal, basic building block of netUnicorn
- Pipeline: an ordered collection of Tasks
- Node: an infrastructure abstraction
- Deployment: a mapping of a Pipeline to a Node
- Experiment: a set of Deployments


Here we will practice writing different tasks and pipelines to design experiments. Let's start with simple ones.

### Task
Tasks let users express the intent of an experiment in modular pieces. For example, "run the speedtest" is a Task. Tasks can be simple (start tcpdump) or very complex (watch YouTube for X seconds); users decide what level of modularity each Task should implement.

Tasks are later combined into Pipelines (for example, start tcpdump -> watch YouTube -> stop tcpdump).

Tasks consist of three parts:

- Requirements: bash commands that are executed while the Docker image for this task is built. Most often used to install required programs or dependencies.
- `__init__` method: client-side initialization of the task. Lets users pass information to be used later (for example, the URL of the YouTube video to watch).
- `run` method: executor-side code that is executed on the corresponding node. Implements the actual task intent and can use the information provided at initialization.
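
The three parts can be put together in a small sketch. It assumes that requirements are declared as a class-level list named `requirements` (check the netunicorn documentation for your version). `ToolCheckTask` and the `apt-get` commands are illustrative, and the fallback `Task` class is only a stand-in so the snippet also runs without netunicorn installed.

```python
try:
    from netunicorn.base import Task
except ImportError:
    # Stand-in for illustration only; NOT the real netunicorn class.
    class Task:
        requirements = []

        def __init__(self, name=None):
            self.name = name


class ToolCheckTask(Task):
    # Requirements (assumed attribute name): shell commands run at image build time.
    requirements = ["apt-get update", "apt-get install -y tcpdump"]

    def __init__(self, tool: str, *args, **kwargs):
        # Client side: remember which tool to look for.
        self.tool = tool
        super().__init__(*args, **kwargs)

    def run(self):
        # Executor side: report whether the tool is available on this node.
        import shutil
        return shutil.which(self.tool) is not None


task = ToolCheckTask("tcpdump")
print(task.tool, task.run())
```

Calling `run()` locally like this only checks your own machine; on a real node the result depends on whether the requirements were installed into the image.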


In [26]:
# This is the simplest and minimal example of Task implementation
from netunicorn.base import Task

class MyFirstTask(Task):
    def run(self):
        return True

In [27]:
# You can call the task().run() to execute it locally
MyFirstTask().run()

True

In [28]:
# This is a more complex task implementation that uses the initialization
class MySecondTask(Task):
    def __init__(self, word: str, *args, **kwargs):
        self.word = word
        super().__init__(*args, **kwargs)  # this is important, don't forget it
    
    def run(self):
        return f"I know the next word: {self.word}"

MySecondTask("Meow").run()

'I know the next word: Meow'

In [29]:
# Finish the Task: the task should accept N (int) seconds during the initialization, sleep for N seconds during the execution and return True

class SleepTask(Task):
    def __init__(self, N: int, *args, **kwargs):
        ### YOUR IMPLEMENTATION STARTS HERE

        ### YOUR IMPLEMENTATION ENDS HERE
        super().__init__(*args, **kwargs)

    def run(self):
        import time
        ### YOUR IMPLEMENTATION STARTS HERE
        # use time.sleep

        ### YOUR IMPLEMENTATION ENDS HERE
        return True

SleepTask(3).run()

True

### Pipeline

A Pipeline is the main building block of data collection. It is an ordered collection of tasks, separated into stages. Each stage contains one or more tasks. All tasks in a stage start in parallel, and the stage finishes when all of its tasks have finished. Only then does the next stage start.
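
The stage semantics can be illustrated with a minimal standard-library sketch. This is not how netunicorn implements pipelines; the thread pool, `run_stages`, and `sleeper` are assumptions made up for the illustration. Each stage is a list of callables that run in parallel, and the next stage starts only after every callable in the current one has returned.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_stages(stages):
    """Run the callables of each stage in parallel, one stage after another."""
    results = []
    for stage in stages:
        with ThreadPoolExecutor(max_workers=len(stage)) as pool:
            futures = [pool.submit(fn) for fn in stage]
            # Collecting every result blocks until the whole stage is finished.
            results.append([future.result() for future in futures])
    return results


def sleeper(seconds, label):
    """Build a callable that sleeps and then returns its label."""
    def _run():
        time.sleep(seconds)
        return label
    return _run


start = time.monotonic()
stage_results = run_stages([
    [sleeper(0.2, "a"), sleeper(0.2, "b")],  # stage 1: two tasks in parallel
    [sleeper(0.1, "c")],                     # stage 2: starts after stage 1 is done
])
elapsed = time.monotonic() - start
print(stage_results, f"{elapsed:.2f}s")
```

Because the two tasks of the first stage overlap, the total time is close to the sum of the longest task per stage rather than the sum of all tasks.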

The Pipeline is deployed to worker nodes (servers, PCs, VMs, anything that can run code) for execution. After execution, the results of the pipeline are returned to the user.

Let's look at the Pipeline examples.

In [30]:
from netunicorn.base import Pipeline

# Three tasks after each other: 3 stages total
pipeline = (
    Pipeline()
    .then(MyFirstTask())
    .then(MySecondTask("meow"))
    .then(SleepTask(5))
)

print(pipeline)

Pipeline(37c3611c-f954-4f9b-9ee8-1b306b3bc6ee): {'root': [<__main__.MyFirstTask with name b69e5e5e-8e93-4140-bf03-ccb08119725f>], 1: [<__main__.MySecondTask with name c2187a1e-ee6b-4a73-99e4-fc47f31562fd>], 2: [<__main__.SleepTask with name eb39e871-efed-4b43-9788-54c2fb7f32d6>]}


In [31]:
# All Tasks in parallel: 1 stage total
pipeline = (
    Pipeline()
    .then([
        MyFirstTask(),
        MySecondTask("meow"),
        SleepTask(5)
    ])
)

print(pipeline)

Pipeline(784208bb-af3b-4305-acc4-cafda3cf20e3): {'root': [<__main__.MyFirstTask with name 0bcf5d4e-54f5-49f6-9c5a-7c1e1ff04299>, <__main__.MySecondTask with name 860e8c93-224c-4a16-8c47-bda528b4f9ae>, <__main__.SleepTask with name a69566ce-a18e-4d32-853c-8fc911b94804>]}


In [32]:
# Two Tasks in parallel and the third after they are finished: 2 stages in total
pipeline = (
    Pipeline()
    .then([
        MyFirstTask(),
        MySecondTask("meow"),
    ])
    .then(SleepTask(5))
)

print(pipeline)

Pipeline(32d43ae1-4dd3-4212-bb4a-441ced03a461): {'root': [<__main__.MyFirstTask with name 92fb19a8-f4a1-4534-b4f7-45e744b3a6cc>, <__main__.MySecondTask with name e7323a22-5fa4-444d-b413-613a2b511845>], 1: [<__main__.SleepTask with name 53a038df-875a-4ed1-98e4-c40afe19144c>]}


In [33]:
# Finish the Pipeline:
# stage 1: MyFirstTask()
# stage 2: MySecondTask("meow") and SleepTask(5) in parallel

### YOUR IMPLEMENTATION STARTS HERE

### YOUR IMPLEMENTATION ENDS HERE

## Implementing something useful

Let's design a simple pipeline that, given the name of a website, gets its IP address, geolocation, and the content of its main page.

This will consist of multiple tasks and a pipeline that gathers them together.

In [34]:
class GetContentTask(Task):
    def __init__(self, url: str, *args, **kwargs):
        self.url = url
        super().__init__(*args, **kwargs)

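    def run(self):
        import requests  # imported inside run(), the part that is executed on the node

        url = self.url
        # accept bare hostnames as well as full URLs
        if not url.startswith(("http://", "https://")):
            url = "https://" + url

        # a timeout keeps the task from hanging on an unresponsive host
        response = requests.get(url, timeout=30)
        return response.text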

GetContentTask("google.com").run()

'<!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content="Search the world\'s information, including webpages, images, videos and more. Google has many special features to help you find exactly what you\'re looking for." name="description"><meta content="noodp, " name="robots"><meta content="text/html; charset=UTF-8" http-equiv="Content-Type"><meta content="/images/branding/googleg/1x/googleg_standard_color_128dp.png" itemprop="image"><title>Google</title><script nonce="SJw0PhgWeKaQKA5zanY7PQ">(function(){var _g={kEI:\'WiEbZ7uoK8L6kPIP3PCtqAs\',kEXPI:\'0,3700270,679,435,541533,2891,89155,18161,162437,23024,6699,124314,2006,8155,8860,14491,22435,9779,62657,36747,3801,2412,33249,15816,1804,7734,8334,9764,9437,960,10853,1635,29276,27083,5203197,12,9466,999,22,5992667,2839845,1088,198,44,3,13,44,34,120,1,7,2,4,1,37,7,8,26797342,1198753,43886,3,1603,3,2124363,23029351,7954,1,208,4636,16436,2728,81317,16825,5797,885,14280,8182,5933,8453,35043,9929

In [35]:
class GetIPAddress(Task):
    def __init__(self, url: str, *args, **kwargs):
        self.url = url
        super().__init__(*args, **kwargs)

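    def run(self):
        import dns.resolver
        try:
            url = self.url
            # the resolver expects a bare hostname, so strip the scheme
            if url.startswith("https://"):
                url = url[8:]
            result = dns.resolver.resolve(url, 'A')
            # return the first A record as a string
            return [ip.to_text() for ip in result][0]
        except Exception as e:
            # report resolution errors as a Failure instead of raising
            return Failure(str(e))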

GetIPAddress("https://www.google.com").run()

'142.250.72.164'
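
The slice `url[8:]` in the task above only handles inputs that start with `https://`; a URL with a path or an `http://` prefix would be passed to the resolver unchanged or only partly cleaned. A more general way to extract the hostname is sketched below with the standard library. `hostname_of` is a hypothetical helper, not part of netunicorn.

```python
from typing import Optional
from urllib.parse import urlparse


def hostname_of(url: str) -> Optional[str]:
    """Return the hostname of a URL, accepting bare hostnames as well."""
    # urlparse only fills in the host when a scheme is present, so add one if missing.
    if "://" not in url:
        url = "https://" + url
    return urlparse(url).hostname


print(hostname_of("https://www.google.com"))
print(hostname_of("google.com"))
print(hostname_of("http://example.org/path?q=1"))
```

The returned value could then be passed to `dns.resolver.resolve` in place of the sliced string.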

In [36]:
class GetLocationFromIPFromPreviousTasks(Task):
    """
    This task is more complicated.
    Tasks can read the results of previous tasks, which are stored in self.previous_steps.
    self.previous_steps is a dictionary that maps each task name to a list of execution results.
    Each result is either a Success or a Failure wrapping the raw data, which must be unwrapped with .unwrap().
    Here we take the IP address for geolocation from a previous step.
    """
    def __init__(self, task_name: str, *args, **kwargs):
        self.task_name = task_name  # providing a task name from which to get an IP address
        super().__init__(*args, **kwargs)

    def run(self):
        import requests  # imported inside run(), as in the other tasks, instead of relying on the notebook-level import

        # previous results -> our task name -> first execution result -> unwrap it to get the raw IP
        ip_address = self.previous_steps[self.task_name][0].unwrap()
        url = f"https://ipinfo.io/{ip_address}/json"
        response = requests.get(url)
        if response.status_code == 200:
            return response.json()  # Returns IP information as a dictionary
        else:
            return Failure(response.text)


task = GetLocationFromIPFromPreviousTasks("test")
task.previous_steps = {"test": [Success("142.250.72.164")]}  # the executor sets this automatically; we set it by hand here only to test the task
task.run()
    

{'ip': '142.250.72.164',
 'hostname': 'lax17s50-in-f4.1e100.net',
 'city': 'Los Angeles',
 'region': 'California',
 'country': 'US',
 'loc': '34.0522,-118.2437',
 'org': 'AS15169 Google LLC',
 'postal': '90009',
 'timezone': 'America/Los_Angeles',
 'readme': 'https://ipinfo.io/missingauth'}
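
Calling `.unwrap()` directly assumes that the previous task exists and succeeded. A more defensive lookup is sketched below. `first_value` is a hypothetical helper, and the fallback `Success` and `Failure` classes are minimal stand-ins so the snippet runs without netunicorn installed; they only mimic the `.unwrap()` behaviour described in the docstring above.

```python
try:
    from netunicorn.base import Success, Failure
except ImportError:
    # Minimal stand-ins for illustration only; NOT the real result types.
    class Success:
        def __init__(self, value):
            self._value = value

        def unwrap(self):
            return self._value

    class Failure(Success):
        def unwrap(self):
            raise ValueError(f"cannot unwrap a failure: {self._value!r}")


def first_value(previous_steps, task_name, default=None):
    """Return the unwrapped first result of a task, or `default` if unavailable."""
    results = previous_steps.get(task_name, [])
    if not results:
        return default  # the task is unknown or has not produced a result
    try:
        return results[0].unwrap()
    except Exception:
        return default  # the first execution ended in a Failure


steps = {
    "iptask": [Success("142.250.72.164")],
    "broken": [Failure("timeout")],
}
print(first_value(steps, "iptask"))
print(first_value(steps, "broken"))
print(first_value(steps, "missing", default="n/a"))
```

Inside a task, the same helper could be called with `self.previous_steps`, returning a `Failure` from `run` when no IP address is available.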

Now let's combine the tasks into a pipeline and run it with our local executor.

In [37]:
website = "https://google.com"  # Put any other website here

pipeline = (
    Pipeline()
    .then(GetIPAddress(website, name="iptask"))  # name might be anything unique, just use the same in the next task
    .then(GetLocationFromIPFromPreviousTasks("iptask", name="location-task"))
    .then(GetContentTask(website, name="content-task"))
)

# running the pipeline locally
executor = get_local_executor(pipeline)
executor()

INFO:executor_local:Parsed configuration: Gateway located on fake
INFO:executor_local:Current directory: /home/satyandra/Downloads/netunicorn_examples
INFO:executor_local:Execution is finished, start reporting results.
INFO:executor_local:Skipping reporting results due to execution graph setting.


Now it's time to explore the results:

In [38]:
results = executor.execution_graph_results.unwrap()  # unwrap once and reuse
for taskname, task_results in results.items():
    print(f"Name of the task: {taskname}")
    print(f"List of results of executions: {task_results}")
    print("-" * 80 + "\n\n")  # let's visually separate the results

Name of the task: iptask
List of results of executions: [<Success: 142.251.40.46>]
--------------------------------------------------------------------------------


Name of the task: location-task
List of results of executions: [<Success: {'ip': '142.251.40.46', 'hostname': 'lax17s55-in-f14.1e100.net', 'city': 'Los Angeles', 'region': 'California', 'country': 'US', 'loc': '34.0522,-118.2437', 'org': 'AS15169 Google LLC', 'postal': '90009', 'timezone': 'America/Los_Angeles', 'readme': 'https://ipinfo.io/missingauth'}>]
--------------------------------------------------------------------------------


Name of the task: content-task
List of results of executions: [<Success: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content="Search the world's information, including webpages, images, videos and more. Google has many special features to help you find exactly what you're looking for." name="description"><meta content="noodp, " name="robots"