# Lesson 6: Make the Pipeline Adaptable

In this lesson, you'll use dynamic task mapping so that the book information is extracted from each text file in parallel.

### 6.1. Link to Airflow UI

Run the following cell to get the link to the Airflow UI. If asked for a username and password, type `airflow` for both.

In [None]:
import os
airflow_ui = os.environ.get('DLAI_LOCAL_URL').format(port=8080)
airflow_ui  # username: airflow, password: airflow

<div style="background-color:#fff6e4; padding:15px; border-width:3px; border-color:#f5ecda; border-style:solid; border-radius:6px"> ⏳ <b>Note <code>Airflow UI</code>:</b> 

<p>Changes to dags may take up to 30 seconds to show up in the Airflow UI in this environment! </p>
<p>In the Airflow UI, the error "504 Gateway Timeout" can appear after 2 hours of use or after 25 minutes of inactivity (if there's no activity for 20 minutes, the Jupyter kernel stops, and if there's no kernel for 5 minutes, the Jupyter notebook stops and the resources are released). In this case, refresh the notebook, run the cell that outputs the link to the Airflow UI, and then use the link to open the Airflow UI. </p>
</div>

### 6.2. What is the issue with the current `fetch_data` dag?

**Note:** The video starts with showing you the code from the previous lesson to explain the current issue with the dag: `fetch_data`

In the task `transform_book_description_files`, you iterate through all of the text files in a single task and repeat the same process for each one. If the task fails because of a formatting error in one text file, you have to rerun the processing for all the files. A better approach is to process the text files in parallel, i.e., to turn the task `transform_book_description_files` into parallel tasks, where each task processes one text file. You'll learn how to do that using Dynamic Task Mapping.

```python
    @task
    def transform_book_description_files(book_description_files: list) -> list:
        import json
        import os

        list_of_book_data = []

        for book_description_file in book_description_files:
            with open(
                os.path.join(BOOK_DESCRIPTION_FOLDER, book_description_file), "r"
            ) as f:
                book_descriptions = f.readlines()

            # the rest of the code
            # .....

            list_of_book_data.append(book_descriptions)

        return list_of_book_data
```
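
The cost of the single-task approach can be sketched in plain Python, without Airflow. This is only an illustration under assumed names: `parse_file`, `process_all`, `process_each`, and the sample file contents are hypothetical and not part of the course dag. It contrasts one unit of work for all files with one unit of work per file.

```python
def parse_file(lines):
    """Extract the title from each line; raises IndexError on a malformed line."""
    return [line.split(":::")[1].strip() for line in lines]


# Hypothetical file contents: the middle file has a formatting error.
files = {
    "good_1.txt": ["0 ::: Book A ::: Author A ::: Text"],
    "bad.txt": ["malformed line without separators"],
    "good_2.txt": ["0 ::: Book B ::: Author B ::: Text"],
}


def process_all(files):
    # One unit of work for everything: a single bad file fails the whole batch.
    return [parse_file(lines) for lines in files.values()]


def process_each(files):
    # One unit of work per file: a failure stays isolated to that file.
    results, failed = {}, []
    for name, lines in files.items():
        try:
            results[name] = parse_file(lines)
        except IndexError:
            failed.append(name)
    return results, failed


results, failed = process_each(files)
print(failed)  # ['bad.txt']
```

In this sketch, `process_all(files)` raises an `IndexError` and returns nothing, while `process_each(files)` keeps the results of the two well-formed files and reports only the bad one. Dynamically mapped tasks give you the per-file behavior, with the added benefit that the instances can run in parallel.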

### 6.3. Simple dynamic task mapping example

Here's a dag showing a simple dynamic task mapping example. Make sure to run the cell and trigger the dag in the Airflow UI.

In [None]:
%%writefile ../../dags/simple_dynamic_task_mapping.py 

from airflow.sdk import dag, task 


@dag
def simple_mapping():

    @task
    def get_numbers():
        import random
        
        return list(range(random.randint(0, 3)))  # between 0 and 3 numbers, e.g. [0, 1, 2]

    _get_numbers = get_numbers()


    @task
    def mapped_task_one(my_constant_arg: int, my_changing_arg: int):
        return my_constant_arg + my_changing_arg

    _mapped_task_one = mapped_task_one.partial(
        my_constant_arg=10
    ).expand(my_changing_arg=_get_numbers)
  

simple_mapping()
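
To see how the arguments are wired, it can help to compare `.partial()` and `.expand()` with their rough plain-Python counterparts. The sketch below is an analogy only and does not use Airflow: the fixed `numbers` list stands in for the output of `get_numbers`, and `with_constant` is a hypothetical helper name. In Airflow, each element becomes a separate task instance that is scheduled on its own, whereas this version simply runs sequentially.

```python
from functools import partial


def mapped_task_one(my_constant_arg: int, my_changing_arg: int) -> int:
    return my_constant_arg + my_changing_arg


numbers = [0, 1, 2]  # stands in for the list returned by get_numbers

# Like .partial(): fix the argument shared by every mapped instance.
with_constant = partial(mapped_task_one, my_constant_arg=10)

# Like .expand(): one call per element of the list.
results = [with_constant(my_changing_arg=n) for n in numbers]
print(results)  # [10, 11, 12]
```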

Next, add another task to the same simple example. Make sure to run the cell and trigger the dag in the Airflow UI.

In [None]:
%%writefile ../../dags/simple_dynamic_task_mapping.py 

from airflow.sdk import dag, task 


@dag
def simple_mapping():

    @task
    def get_numbers():
        import random
        
        return list(range(random.randint(0, 3)))  # between 0 and 3 numbers, e.g. [0, 1, 2]

    _get_numbers = get_numbers()


    @task
    def mapped_task_one(my_constant_arg: int, my_changing_arg: int):
        return my_constant_arg + my_changing_arg

    _mapped_task_one = mapped_task_one.partial(
        my_constant_arg=10
    ).expand(my_changing_arg=_get_numbers)


    @task
    def mapped_task_two(my_cookie_number: int):
        print(f"There are {my_cookie_number} cookies in the jar!")

    mapped_task_two.expand(my_cookie_number=_mapped_task_one)    

simple_mapping()
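
Because `mapped_task_two` expands over the output of `mapped_task_one`, the number of instances of both tasks is decided at runtime by the list that `get_numbers` returns. The following plain-Python sketch (an analogy, not Airflow code; `run_pipeline` is a hypothetical name) mirrors that chain.

```python
def run_pipeline(numbers):
    # mapped_task_one: one instance per number, with my_constant_arg fixed at 10
    sums = [10 + n for n in numbers]
    # mapped_task_two: one instance per result of mapped_task_one
    return [f"There are {s} cookies in the jar!" for s in sums]


messages = run_pipeline([0, 1, 2])
print(len(messages))  # 3
print(messages[0])    # There are 10 cookies in the jar!

# An empty input produces no instances at all.
print(run_pipeline([]))  # []
```

Since `random.randint(0, 3)` can return 0, some runs of the dag will produce an empty list. As far as the Airflow documentation describes it, a task mapped over an empty list creates no instances and is marked as skipped, so an occasional skipped run of this dag is expected.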

### 6.4. Applying dynamic task mapping to `fetch_data`

Dynamically map the `transform_book_description_files` task over `list_book_description_files`.

Here are the changes made to `transform_book_description_files`:
- the input was changed from list to string
- the outer for-loop was removed (that looped through the text files)
- the descriptions of the books in one file `book_descriptions` are returned
- the `.expand` was used to change the task into a dynamic task

```python
    _transform_book_description_files = transform_book_description_files.expand(
        book_description_file=_list_book_description_files
    )
```


Here are the changes made to `create_vector_embeddings`:
- the input was changed to `book_data`
- the outer for-loop was removed
- the description embeddings are directly returned
- the `.expand` was used to change the task into a dynamic task

```python
    _create_vector_embeddings = create_vector_embeddings.expand(
        book_data=_transform_book_description_files
    )
```

Run the following cell, wait about 30 seconds, then open the Airflow UI and trigger the dag. 

**Note:** Depending on when you're starting this lesson, if the Airflow UI does not have the dags from the previous lesson and this is the first time you write the dag to the folder `dags`, you can just unpause the dag. It will run automatically since it's scheduled to run every hour. If you trigger the dag, you will see two runs: the automatic one and the triggered one. 

In [None]:
%%writefile ../../dags/fetch_data.py 

from airflow.sdk import chain, dag, task, Asset
from pendulum import datetime

COLLECTION_NAME = "Books"
BOOK_DESCRIPTION_FOLDER = "/home/jovyan/include/data"
EMBEDDING_MODEL_NAME = "BAAI/bge-small-en-v1.5"


@dag(
    start_date=datetime(2025, 4, 1),
    schedule="@hourly"
)
def fetch_data():

    @task
    def create_collection_if_not_exists() -> None:
        from airflow.providers.weaviate.hooks.weaviate import WeaviateHook

        hook = WeaviateHook("my_weaviate_conn")
        client = hook.get_conn()

        existing_collections = client.collections.list_all()
        existing_collection_names = existing_collections.keys()

        if COLLECTION_NAME not in existing_collection_names:
            print(f"Collection {COLLECTION_NAME} does not exist yet. Creating it...")
            collection = client.collections.create(name=COLLECTION_NAME)
            print(f"Collection {COLLECTION_NAME} created successfully.")
            print(f"Collection details: {collection}")

    _create_collection_if_not_exists = create_collection_if_not_exists()

    @task
    def list_book_description_files() -> list:
        import os

        book_description_files = [
            f for f in os.listdir(BOOK_DESCRIPTION_FOLDER) if f.endswith(".txt")
        ]
        return book_description_files

    _list_book_description_files = list_book_description_files()

    @task
    def transform_book_description_files(book_description_file: str) -> list:
        import os

        with open(
            os.path.join(BOOK_DESCRIPTION_FOLDER, book_description_file), "r"
        ) as f:
            book_descriptions = f.readlines()

        titles = [
            book_description.split(":::")[1].strip()
            for book_description in book_descriptions
        ]
        authors = [
            book_description.split(":::")[2].strip()
            for book_description in book_descriptions
        ]
        book_description_text = [
            book_description.split(":::")[3].strip()
            for book_description in book_descriptions
        ]

        book_descriptions = [
            {
                "title": title,
                "author": author,
                "description": description,
            }
            for title, author, description in zip(
                titles, authors, book_description_text
            )
        ]

        return book_descriptions

    _transform_book_description_files = transform_book_description_files.expand(
        book_description_file=_list_book_description_files
    )

    @task
    def create_vector_embeddings(book_data: list) -> list:
        from fastembed import TextEmbedding

        embedding_model = TextEmbedding(EMBEDDING_MODEL_NAME)

        book_descriptions = [book["description"] for book in book_data]
        description_embeddings = [
            list(map(float, next(embedding_model.embed([desc]))))
            for desc in book_descriptions
        ]

        return description_embeddings

    _create_vector_embeddings = create_vector_embeddings.expand(
        book_data=_transform_book_description_files
    )

    @task(
        outlets=[Asset("my_book_vector_data")]
    )
    def load_embeddings_to_vector_db(
        list_of_book_data: list, list_of_description_embeddings: list
    ) -> None:
        from airflow.providers.weaviate.hooks.weaviate import WeaviateHook
        from weaviate.classes.data import DataObject

        hook = WeaviateHook("my_weaviate_conn")
        client = hook.get_conn()
        collection = client.collections.get(COLLECTION_NAME)

        for book_data_list, emb_list in zip(
            list_of_book_data, list_of_description_embeddings
        ):
            items = []

            for book_data, emb in zip(book_data_list, emb_list):
                item = DataObject(
                    properties={
                        "title": book_data["title"],
                        "author": book_data["author"],
                        "description": book_data["description"],
                    },
                    vector=emb,
                )
                items.append(item)

            collection.data.insert_many(items)

    _load_embeddings_to_vector_db = load_embeddings_to_vector_db(
        list_of_book_data=_transform_book_description_files,
        list_of_description_embeddings=_create_vector_embeddings,
    )

    chain(_create_collection_if_not_exists, _load_embeddings_to_vector_db)


fetch_data()
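
Note that `load_embeddings_to_vector_db` is not mapped, so it receives the collected results of all mapped instances: one list of books and one list of embeddings per file. That is why the task uses two nested `zip` loops. The sketch below reproduces only this pairing logic in plain Python. The sample values are hypothetical, and the real task builds `DataObject` items and inserts them into Weaviate instead of collecting tuples.

```python
# Hypothetical outputs of the mapped tasks: one inner list per text file.
list_of_book_data = [
    [{"title": "A"}, {"title": "B"}],  # books from file 1
    [{"title": "C"}],                  # books from file 2
]
list_of_description_embeddings = [
    [[0.1, 0.2], [0.3, 0.4]],  # embeddings for file 1
    [[0.5, 0.6]],              # embeddings for file 2
]

rows = []
# Outer zip: pair each file's books with the same file's embeddings.
for book_data_list, emb_list in zip(list_of_book_data, list_of_description_embeddings):
    # Inner zip: pair each book with its own embedding.
    for book_data, emb in zip(book_data_list, emb_list):
        rows.append((book_data["title"], emb))

print(rows)  # [('A', [0.1, 0.2]), ('B', [0.3, 0.4]), ('C', [0.5, 0.6])]
```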

**Optional Part**: Add your own book files

Feel free to add your own book description file and then trigger the `fetch_data` dag.

In [None]:
# Add your own book description file
# Format 
# [Integer Index] ::: [Book Title] ([Release year]) ::: [Author] ::: [Description]

my_book_description = """0 ::: The Idea of the World (2019) ::: Bernardo Kastrup ::: An ontological thesis arguing for the primacy of mind over matter.
1 ::: Exploring the World of Lucid Dreaming (1990) ::: Stephen LaBerge ::: A practical guide to learning and enjoying lucid dreams.
"""

my_book_description_file_name = "my_descs_1.txt"

# Write to file
with open(f"../../include/data/{my_book_description_file_name}", 'w') as f:
    f.write(my_book_description)
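
Before triggering the dag, you may want to check that every line of your file follows the expected format, since a malformed line makes the corresponding mapped task instance fail. The helper below is a minimal sketch and not part of the course dag: `parse_book_line` is a hypothetical name, and it assumes exactly four fields separated by `:::`, as in the format comment above.

```python
def parse_book_line(line: str) -> dict:
    """Split one line of the form 'index ::: title ::: author ::: description'."""
    parts = [part.strip() for part in line.split(":::")]
    if len(parts) != 4:
        raise ValueError(
            f"Expected 4 fields separated by ':::', got {len(parts)}: {line!r}"
        )
    _, title, author, description = parts
    return {"title": title, "author": author, "description": description}


sample = (
    "0 ::: The Idea of the World (2019) ::: Bernardo Kastrup ::: "
    "An ontological thesis arguing for the primacy of mind over matter."
)
book = parse_book_line(sample)
print(book["title"])   # The Idea of the World (2019)
print(book["author"])  # Bernardo Kastrup
```

To check a whole file, you could call `parse_book_line` on each non-empty line of `my_book_description.splitlines()` and fix any line that raises a `ValueError`.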

In [None]:
# ## Remove a book description file
 
# import os

# my_book_description_file_name = "my_descs_1.txt"

# file_path = f"../../include/data/{my_book_description_file_name}"

# # Remove the file
# if os.path.exists(file_path):
#     os.remove(file_path)
# else:
#     print(f"File not found: {file_path}")

### 6.5. Resources

- [Create dynamic Airflow tasks](https://www.astronomer.io/docs/learn/dynamic-tasks/): Learn all about dynamic task mapping in Airflow.
- Tip: you can limit the number of concurrently running mapped task instances using the task-level parameters `max_active_tis_per_dag` and `max_active_tis_per_dagrun`.
- [Airflow configuration reference - AIRFLOW__CORE__MAX_MAP_LENGTH](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-map-length): By default you can have up to 1024 dynamically mapped instances per task. Use this configuration environment variable to modify that limit.

<div style="background-color:#fff6ff; padding:13px; border-width:3px; border-color:#efe6ef; border-style:solid; border-radius:6px">

<p> ⬇ &nbsp; <b>Download Notebooks:</b> 1) click on the <em>"File"</em> option on the top menu of the notebook and then 2) click on <em>"Download as"</em> and select <em>"Notebook (.ipynb)"</em>.</p>

<p> 📒 &nbsp; For more help, please see the <em>"Appendix – Tips, Help, and Download"</em> Lesson.</p>

</div>