# Pipelines 

In Azure ML, a pipeline is a collection of components that depend on each other's outputs and run sequentially.

A good practice when combining components is to start with a `test-first` approach and create an end-to-end pipeline on the local machine. Only after the tests pass do we run the pipeline on the cloud.

## Creating a local pipeline 

Let us define three simple components:

1. The first one accepts a string and cleans it by removing all the punctuation marks and converting it to lower case.
2. The second one accepts a string and returns a list of words.
3. The third one accepts a list of words and returns a dictionary with the word count.
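
Before splitting the logic into files, the three steps can be sketched as plain in-memory functions. This is only an illustration of the intended behaviour: the function names and the sample sentence are assumptions, and the actual components below work on text files rather than on strings.

```python
import re
from collections import Counter
from typing import Dict, List


def clean(text: str) -> str:
    """Remove punctuation, lowercase the text and strip surrounding whitespace."""
    return re.sub(r"[^\w\s]", "", text).lower().strip()


def tokenize(text: str) -> List[str]:
    """Split the text into words on whitespace."""
    return text.split()


def count(tokens: List[str]) -> Dict[str, int]:
    """Return a dictionary mapping each word to its number of occurrences."""
    return dict(Counter(tokens))


# Hypothetical sample input, chained through the three steps
sentence = "Hello, World! Hello again."
word_counts = count(tokenize(clean(sentence)))
print(word_counts)  # {'hello': 2, 'world': 1, 'again': 1}
```

Each function takes exactly what the previous one returns, which is the same dependency structure the pipeline will have.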

Let us start by creating a folder called `components` and adding the following files and folders:

```bash
components
├── clean
│   ├── __init__.py
│   ├── component.py
│   └── run.yaml
├── count
│   ├── __init__.py
│   ├── component.py
│   └── run.yaml
└── tokenize
    ├── __init__.py
    ├── component.py
    └── run.yaml
```

**clean component:**

```python
# Importing regex
import re

# Argument parsing 
import argparse

def main(txt_file: str, txt_file_out: str): 
    """
    Function to clean the incoming string 
    
    The cleaning removes punctuations, lowercases the string and strips it 
    """
    # Define the list to store the strings
    list_of_strings = []

    # Open the file and read it line by line
    with open(txt_file, 'r') as file:
        for line in file:
            list_of_strings.append(line)

    # Clean the strings, collecting the results in a new list
    # (reassigning the loop variable alone would not change the list)
    cleaned_strings = []
    for x in list_of_strings:
        # Remove punctuation
        x = re.sub(r'[^\w\s]', '', x)
        # Lowercase the string
        x = x.lower()
        # Strip surrounding whitespace, including the newline
        x = x.strip()
        cleaned_strings.append(x)

    # Writing the cleaned strings to a file, one per line
    with open(txt_file_out, 'w') as file:
        for x in cleaned_strings:
            file.write(x + '\n')

    return 

if __name__ == '__main__':
    # Parsing the arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--txt_file", help="Input txt file")
    parser.add_argument("--txt_file_out", help="Output txt file")
    args = parser.parse_args()

    # Calling the main function
    main(txt_file=args.txt_file, txt_file_out=args.txt_file_out)
```

**tokenization component:**

```python
# Argument parsing 
import argparse

def main(txt_file: str, txt_file_out: str): 
    """
    Function to tokenize the incoming string 
    
    The tokenization splits the string into words
    """
    # Define the list to store the strings
    list_of_strings = []

    # Open the file and read it line by line
    with open(txt_file, 'r') as file:
        for line in file:
            list_of_strings.append(line)

    # Tokenize the strings, collecting all the words in one flat list
    tokens = []
    for x in list_of_strings:
        # Split the string into words on whitespace
        tokens.extend(x.split())

    # Writing the tokens to a file, one token per line
    with open(txt_file_out, 'w') as file:
        for token in tokens:
            file.write(token + '\n')

    return

if __name__ == '__main__':
    # Parsing the arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--txt_file", help="Input txt file")
    parser.add_argument("--txt_file_out", help="Output txt file")
    args = parser.parse_args()

    # Calling the main function
    main(txt_file=args.txt_file, txt_file_out=args.txt_file_out)
```

**count component:**

```python
# Argument parsing
import argparse

# Counting function 
from collections import Counter

def main(txt_file: str, txt_file_out: str):
    """
    Function to count the incoming tokens
    """
    # Define the list to store the tokens
    list_of_strings = []

    # Open the file and read it line by line
    with open(txt_file, 'r') as file:
        for line in file:
            # Strip the newline character and skip empty lines
            token = line.strip()
            if token:
                list_of_strings.append(token)

    # Creating a counter
    counter = Counter(list_of_strings)

    # Sorting by most common
    token_counts = counter.most_common()

    # Writing the token counts to a file
    with open(txt_file_out, 'w') as file:
        for x in token_counts:
            file.write(f"{x[0]}: {x[1]}\n")

    return

if __name__ == '__main__':
    # Parsing the arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--txt_file", help="Input txt file")
    parser.add_argument("--txt_file_out", help="Output txt file")
    args = parser.parse_args()

    # Calling the main function
    main(txt_file=args.txt_file, txt_file_out=args.txt_file_out)
```

In [1]:
# Now let us define a list of strings and save them into a txt file 
# called "list_of_strings.txt"
list_of_strings = ["Hypercube is awesome", "Flexitricity is amazing too"]

# Now let us save the list of strings into a txt file
with open("list_of_strings.txt", "w") as f:
    for string in list_of_strings:
        f.write(string + "\n")

# Now let us import the components 
from components.clean.component import main as clean
from components.tokenize.component import main as tokenize
from components.count.component import main as count

# Wrapping the components into a pipeline
def main(
        input_file: str, 
        output_file_cleaned: str,
        output_file_tokenized: str,
        output_file_counted: str
):
    clean(txt_file=input_file, txt_file_out=output_file_cleaned)
    tokenize(txt_file=output_file_cleaned, txt_file_out=output_file_tokenized)
    count(txt_file=output_file_tokenized, txt_file_out=output_file_counted)

Now let us test the pipeline locally using the triple A pattern:

1. **Arrange**: Create the inputs for the pipeline
2. **Act**: Run the pipeline
3. **Assert**: Check the results
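
The three steps above can be sketched as a small self-contained test. The `count_words` function here is a hypothetical stand-in for the pipeline, and the file names are assumptions; a temporary directory keeps the test from leaving files behind.

```python
import tempfile
from collections import Counter
from pathlib import Path


def count_words(input_path: Path, output_path: Path) -> None:
    """Hypothetical stand-in for the pipeline: count whitespace-separated words."""
    words = input_path.read_text().lower().split()
    lines = [f"{word}: {n}" for word, n in Counter(words).most_common()]
    output_path.write_text("\n".join(lines) + "\n")


def test_count_words() -> None:
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Arrange: create the input file and decide where the output goes
        input_path = Path(tmp_dir) / "input.txt"
        output_path = Path(tmp_dir) / "output.txt"
        input_path.write_text("a b a\n")

        # Act: run the code under test
        count_words(input_path, output_path)

        # Assert: check the produced file against the expected lines
        assert output_path.read_text().splitlines() == ["a: 2", "b: 1"]


test_count_words()
```

Keeping the three phases visually separate makes it easy to see which inputs produced a failing assertion.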


In [2]:
# Let's print the input file
with open("list_of_strings.txt", "r") as f:
    print(f.read())

Hypercube is awesome
Flexitricity is amazing too



In [3]:
# Arranging
input_file = 'list_of_strings.txt'
output_file_cleaned = 'list_of_strings_cleaned.txt'
output_file_tokenized = 'list_of_strings_tokenized.txt'
output_file_counted = 'list_of_strings_counted.txt'

# Acting
main(
    input_file=input_file, 
    output_file_cleaned=output_file_cleaned,
    output_file_tokenized=output_file_tokenized,
    output_file_counted=output_file_counted
)

# Asserting: the most common token should be "is", which appears twice
with open(output_file_counted, "r") as f:
    output_text = f.read().split("\n")
assert output_text[0] == 'is: 2'

Number of strings: 2
Number of cleaned strings: 2
Output file: list_of_strings_cleaned.txt
Reading file: list_of_strings_cleaned.txt
Number of strings: 2
Number of tokens: 7
Output file: list_of_strings_tokenized.txt



Now that we have the pipeline working locally, we can start creating the pipeline on the cloud. Keep in mind that the only thing that changes is that the local file paths become file paths in Azure.

## Pipeline on the cloud

The first step is to upload the component code and the metadata to the datastore.

Then we can start connecting the components in the Azure ML `designer` tab.

The typical metadata of our components is: 

```yaml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command

name: <component name>
display_name: <component name in display>

environment: azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1

inputs:
  txt_file:
    type: uri_file  

outputs:
  txt_file_out:
    type: uri_file

command: >-
  python component.py
  --txt_file ${{inputs.txt_file}} 
  --txt_file_out ${{outputs.txt_file_out}}
```
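
Since the three `run.yaml` files differ only in the two name fields, one way to keep them in sync is to render them from a single template. The sketch below uses only the standard library; the display names are hypothetical placeholders, and writing the rendered text into each component folder is left out.

```python
from string import Template

# "$$" is the Template escape for a literal "$", so "$${{...}}" renders as "${{...}}"
RUN_YAML_TEMPLATE = Template("""\
$$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
type: command

name: $name
display_name: $display_name

environment: azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1

inputs:
  txt_file:
    type: uri_file

outputs:
  txt_file_out:
    type: uri_file

command: >-
  python component.py
  --txt_file $${{inputs.txt_file}}
  --txt_file_out $${{outputs.txt_file_out}}
""")


def render_run_yaml(name: str, display_name: str) -> str:
    """Fill the two placeholders of the shared metadata template."""
    return RUN_YAML_TEMPLATE.substitute(name=name, display_name=display_name)


# Hypothetical display names, keyed by the component folder names used earlier
components = {
    "clean": "Clean text",
    "tokenize": "Tokenize text",
    "count": "Count tokens",
}
rendered = {name: render_run_yaml(name, label) for name, label in components.items()}
print(rendered["clean"])
```

All three components share the same `txt_file` input and `txt_file_out` output, which is what lets the output of one be wired directly into the input of the next.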

The pipeline schema is: 

![pipeline schema](./images/simple_pipeline.png)

Notice that the only thing we are missing is the very first input to the `Data Ingestor` component. Let's rectify that in the next section.

## Creating a data asset for the pipeline 

The easiest way to create a data asset is to upload a .txt file to Azure Blob Storage and then register that file as a data asset from the Azure ML studio.

In [None]:
# Creating a list of strings
list_of_strings = ["Hypercube is awesome", "Flexitricity is amazing too", "Machine Learning is the future"]

# Saving the list of strings into a txt file
with open("list_of_strings_azure.txt", "w") as f:
    for string in list_of_strings:
        f.write(string + "\n")

The file can be uploaded in several ways, but the easiest is to use the Azure Storage Explorer.

![uploading a file](./images/data_asset.png)

Now we can navigate to the Azure ML studio's `Data` tab and register the file as a data asset.

![registering a data asset](./images/data_aset_created.png)

Now we can return to the designer tab and add this data asset as an input to our pipeline. 

## Full pipeline with the data asset input

Having our data asset registered, we can now run the pipeline on the cloud. 

The completed pipeline run looks as follows:

![full pipeline](images/full_pipeline_green.png)

The pipeline output is saved in Azure at:

https://electircityml3408517355.blob.core.windows.net/azureml-blobstore-9c2c557d-1e90-48ba-bbdf-b4c6c27e1ff1/azureml/a3d23810-9a96-43ea-83af-060400d4c806/txt_file_out 

The path is convoluted, but the contents of the file are the following: 

```txt
is: 3
hypercube: 1
awesome: 1
flexitricity: 1
amazing: 1
too: 1
machine: 1
learning: 1
the: 1
future: 1
```
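
As a quick sanity check, the output can be parsed back into a dictionary and compared with the input. The `parse_counts` helper below is a hypothetical sketch that assumes the `token: count` line format shown above, with the file contents pasted in as a string rather than downloaded from the blob store.

```python
from typing import Dict


def parse_counts(text: str) -> Dict[str, int]:
    """Parse lines of the form 'token: count' into a dictionary."""
    counts: Dict[str, int] = {}
    for line in text.splitlines():
        if not line.strip():
            continue
        # Split on the last ': ' so the count is always the final field
        token, _, value = line.rpartition(": ")
        counts[token] = int(value)
    return counts


# Contents of the output file shown above
output_text = """\
is: 3
hypercube: 1
awesome: 1
flexitricity: 1
amazing: 1
too: 1
machine: 1
learning: 1
the: 1
future: 1
"""

counts = parse_counts(output_text)

# The three input sentences contain 3 + 4 + 5 = 12 words in total
assert sum(counts.values()) == 12
assert counts["is"] == 3
```

The total of 12 matches the number of words in the three sentences registered as the data asset, so no token was dropped or counted twice.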