## Introduction to Asynchronous Programming

### Understanding Asynchronous Programming
Asynchronous programming is a paradigm built on non-blocking operations. In traditional synchronous programming, tasks complete one after another. Asynchronous programming instead lets a program start a task and move on to other work while it waits, which pays off in I/O-bound and high-latency operations. It is especially useful in web services and API interactions, where blocking on each response wastes time.

### Advantages
- **Non-blocking Operations**: Allows other parts of your program to run without waiting for a task to complete.
- **Improved Performance**: Better utilization of system resources, especially in network- or I/O-bound tasks.
- **Enhanced Responsiveness**: Applications remain responsive, especially in UI or server-side operations.

### Real-world Applications
- **Web Servers**: Handling multiple incoming HTTP requests.
- **Data Fetching**: Simultaneous API requests or database queries.
- **Real-Time Data Processing**: Streaming data processing in chatbots or live feeds.



## Synchronous vs. Asynchronous Data Fetching

### Synchronous Data Fetching

In [1]:
import time

def fetch_data_sync(api_name, delay):
    print(f"Fetching data from {api_name}... (delay: {delay} seconds)")
    time.sleep(delay)
    print(f"Received data from {api_name}!")

def run_sync_fetches():
    print("Synchronous Data Fetching Start")
    fetch_data_sync("API 1", 3)
    fetch_data_sync("API 2", 2)
    print("Synchronous Data Fetching End")

run_sync_fetches()

Synchronous Data Fetching Start
Fetching data from API 1... (delay: 3 seconds)
Received data from API 1!
Fetching data from API 2... (delay: 2 seconds)
Received data from API 2!
Synchronous Data Fetching End


In the above code, the `fetch_data_sync` function simulates fetching data synchronously. Notice how the program waits for each API call to complete before moving to the next one. This leads to a total delay of 5 seconds, the sum of both API call delays.

### Asynchronous Data Fetching

In [5]:
import asyncio

async def fetch_data_async(api_name, delay):
    print(f"Fetching data from {api_name} asynchronously... (delay: {delay} seconds)")
    await asyncio.sleep(delay)
    print(f"Received data from {api_name} asynchronously!")

async def run_async_fetches():
    print("Asynchronous Data Fetching Start")
    await asyncio.gather(
        fetch_data_async("API 1", 3),
        fetch_data_async("API 2", 2),
    )
    print("Asynchronous Data Fetching End")

# Jupyter already runs an event loop, so calling asyncio.run() here would fail
# with a RuntimeError. Await the coroutine directly instead; in a standalone
# script, use asyncio.run(run_async_fetches()).
await run_async_fetches()


Asynchronous Data Fetching Start
Fetching data from API 1 asynchronously... (delay: 3 seconds)
Fetching data from API 2 asynchronously... (delay: 2 seconds)
Received data from API 2 asynchronously!
Received data from API 1 asynchronously!
Asynchronous Data Fetching End


In [3]:
# Jupyter already runs an event loop in the background. This can cause conflicts when trying to start a new event loop using asyncio.run()
# def run_async_in_notebook(coroutine):
#     loop = asyncio.get_event_loop()
#     if loop.is_running():
#         task = loop.create_task(coroutine)
#         return task
#     else:
#         return loop.run_until_complete(coroutine)


# run_async_in_notebook(run_async_fetches())

<Task pending name='Task-5' coro=<run_async_fetches() running at /tmp/ipykernel_143328/2577022757.py:9>>

Asynchronous Data Fetching Start
Fetching data from API 1 asynchronously... (delay: 3 seconds)
Fetching data from API 2 asynchronously... (delay: 2 seconds)
Received data from API 2 asynchronously!
Received data from API 1 asynchronously!
Asynchronous Data Fetching End


In this asynchronous version, `fetch_data_async` is defined with `async def` and uses `await` so that waiting does not block the event loop. `asyncio.gather` runs both coroutines concurrently, which is why API 2 (2-second delay) finishes before API 1 (3-second delay) in the output. The total wait is about 3 seconds, the longest single delay, rather than the 5 seconds of the synchronous version.
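To make the timing claim concrete, here is a minimal sketch that measures the elapsed time of two concurrent fetches. The delays are scaled down to 0.6 and 0.4 seconds so it runs quickly, and `timed_fetches` is a hypothetical helper name, not part of the original notebook. It also has each coroutine return a value, to show that `asyncio.gather` hands results back in the order the coroutines were passed in.

```python
import asyncio
import time


async def fetch_data_async(api_name, delay):
    # asyncio.sleep stands in for a real network call
    await asyncio.sleep(delay)
    return f"data from {api_name}"


async def timed_fetches():
    start = time.perf_counter()
    # gather returns results in argument order, not completion order
    results = await asyncio.gather(
        fetch_data_async("API 1", 0.6),
        fetch_data_async("API 2", 0.4),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed


# In a notebook cell, write `results, elapsed = await timed_fetches()` instead.
results, elapsed = asyncio.run(timed_fetches())
print(results)
print(f"elapsed: {elapsed:.2f}s (sum of delays would be 1.00s)")
```

The elapsed time should land close to the longest delay rather than the sum of both, which is the same effect as the 3-second versus 5-second comparison above.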

## Introduction to OpenAI API

### Overview of OpenAI API
The OpenAI API provides access to large language models such as GPT-3.5 (the examples below use `gpt-3.5-turbo`), offering capabilities in natural language processing. These models are adept at understanding and generating human-like text, making them valuable for applications like chatbots, translation, and content creation. One key feature of the OpenAI API is its ability to stream a response piece by piece as it is generated, which enables real-time applications.


### Basic Interaction with OpenAI API

In [43]:
import os
from pprint import pprint
import openai
VARIABLE_KEY = os.environ.get("OPENAI_API_KEY")
client = openai.OpenAI(
  api_key=VARIABLE_KEY
)


In [64]:
def query_openai(query, data):
    completion = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=[
                    {"role": "system", "content": f"You are a Redash visualization assistant.\nThe following data describes a company; answer the user's questions about it in a professional manner: {data}"},
                    {"role": "user", "content": query }
                ]
            )
    answer = completion.choices[0].message.content
    return answer

data = """
10 Academy is a not-for-profit community-owned initiative that has developed scalable, financially sustainable, and effective training programs to place young Africans into careers for the 4th Industrial Revolution.

10 Academy focuses on creating pathways for young Africans to secure high-paying and relevant tech jobs. With a 37.5% representation of women among our graduates, we have already helped 176 individuals across Africa find improved employment. Our commitment remains strong for the next cohort of learners.

10 Academy’s vision is that youth in Africa will have a rich set of pathways to secure decent, impactful, and highly multiplicative employment with social, economic, and environmental benefits.
"""
# Example query
# query_result = query_openai("Translate the following English text to French: 'Hello, how are you?'", data)
query_result = query_openai("What does 10 Academy do, in one sentence?", data)
pprint(query_result)

('10 Academy is a not-for-profit initiative that provides effective training '
 'programs to place young Africans into high-paying tech jobs for the 4th '
 'Industrial Revolution.')


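This code sends a basic query to the OpenAI API: the system message supplies the company description as context, and the user message asks a question about it. Note the use of `client.chat.completions.create()` to interact with the API; the reply text is read from `completion.choices[0].message.content`.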

## Introduction to Redash

### What is Redash?
Redash is an open-source tool for querying, visualizing, and sharing data. It supports various data sources and is designed to democratize data access across organizations. Redash's primary function is to facilitate data exploration through SQL queries, visualization of results, and dashboard creation.

### Asynchronous Tasks in Redash
Asynchronous tasks in Redash are important for handling large datasets or complex queries. This can improve user experience by allowing the UI to remain responsive while data operations are processed in the background.



### Sample Asynchronous Data Query in Redash

In [37]:
async def async_redash_query(query_id):
    # In practice, make an async HTTP request to the Redash API
    # Here, we simulate a delay for a long-running query
    print("Fetching data from Redash asynchronously... (delay: 2 seconds)")
    await asyncio.sleep(2)  # Simulating a long-running query
    print("Data fetched")
    return f"Result of query {query_id}"


async def main():
    query_result = await async_redash_query(12345)
    print("Process Ended")
    print(query_result)


# run_async_in_notebook(main())
await main()

Fetching data from Redash asynchronously... (delay: 2 seconds)
Data fetched
Process Ended
Result of query 12345


In this example, we simulate an asynchronous query to Redash, with `asyncio.sleep` standing in for a long-running operation. Because the coroutine awaits instead of blocking, the event loop is free to run other tasks while the query is pending.
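The single-query example does not show other work proceeding in the meantime, so here is a small sketch that runs several simulated queries at once and puts a time limit on a slow one. `simulated_redash_query` and `run_queries` are hypothetical names, and the sleep is still only a stand-in for a real HTTP request to Redash.

```python
import asyncio


async def simulated_redash_query(query_id, delay):
    # Hypothetical stand-in for an async HTTP call to the Redash API
    await asyncio.sleep(delay)
    return f"Result of query {query_id}"


async def run_queries():
    # Both queries are pending at the same time; the loop switches between them
    fast, slow = await asyncio.gather(
        simulated_redash_query(101, 0.1),
        simulated_redash_query(102, 0.2),
    )
    # wait_for cancels the query and raises TimeoutError if it runs too long
    try:
        limited = await asyncio.wait_for(
            simulated_redash_query(103, 1.0), timeout=0.2
        )
    except asyncio.TimeoutError:
        limited = None
    return fast, slow, limited


# In a notebook cell, write `query_results = await run_queries()` instead.
query_results = asyncio.run(run_queries())
print(query_results)
```

A timeout like this is worth considering for any long-running query, since otherwise a stalled request would keep its coroutine waiting indefinitely.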


## Introduction to Celery

### Understanding Celery
Celery is a distributed task queue that excels in handling asynchronous tasks and scheduling in web applications. It is particularly useful for long-running operations, enabling these tasks to be processed in the background, independent of the main application flow. This separation enhances performance and responsiveness.

### Key Features
- **Distributed Nature**: Manage tasks across multiple worker nodes.
- **Support for Multiple Brokers**: Compatible with message brokers such as RabbitMQ and Redis.
- **Task Scheduling**: Ability to schedule and execute tasks at a later time.



### Basic Setup with Celery

In [38]:
import asyncio
from celery.result import AsyncResult
from celery_app import add, multiply

# Submit tasks to Celery
result_add = add.delay(4, 4)
result_multiply = multiply.delay(6, 2)


async def get_result_async(celery_result: AsyncResult, task_name: str):
    while not celery_result.ready():
        await asyncio.sleep(1)  # Wait for 1 second before checking again
    result = celery_result.get()
    print(f"{task_name} Result: {result}")


async def main():
    print("Tasks submitted. Waiting for results...")
    # Run both get_result_async concurrently
    await asyncio.gather(
        get_result_async(result_add, "Addition"),
        get_result_async(result_multiply, "Multiplication"),
    )


# run_async_in_notebook(main())
await main()

Tasks submitted. Waiting for results...
Multiplication Result: 12
Addition Result: 8


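In this example, we demonstrate a basic task queue using Celery. The `add` and `multiply` functions, defined as Celery tasks in `celery_app`, are submitted for background execution with `.delay()`, which returns an `AsyncResult` immediately. `get_result_async` then polls `celery_result.ready()` once per second, yielding to the event loop between checks, and calls `celery_result.get()` once the task has finished.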
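The polling loop above waits forever if a task never finishes. A possible variant adds a deadline, sketched below. So that it runs without a Celery worker, it uses `FakeResult`, a hypothetical stand-in that exposes only the `ready()` and `get()` methods the loop relies on; `wait_for_result` is likewise an illustrative name.

```python
import asyncio
import time


class FakeResult:
    """Hypothetical stand-in with the ready()/get() methods used by the loop."""

    def __init__(self, value, ready_after):
        self._value = value
        self._ready_at = time.monotonic() + ready_after

    def ready(self):
        return time.monotonic() >= self._ready_at

    def get(self):
        return self._value


async def wait_for_result(result, poll_interval=0.05, timeout=1.0):
    # Poll until the result is ready, giving up once the deadline passes
    deadline = time.monotonic() + timeout
    while not result.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError("task did not finish in time")
        await asyncio.sleep(poll_interval)  # yield to the event loop
    return result.get()


async def demo():
    total = await wait_for_result(FakeResult(8, ready_after=0.1))
    try:
        await wait_for_result(FakeResult(12, ready_after=5.0), timeout=0.2)
        timed_out = False
    except TimeoutError:
        timed_out = True
    return total, timed_out


# In a notebook cell, write `total, timed_out = await demo()` instead.
total, timed_out = asyncio.run(demo())
print(total, timed_out)
```

Since the helper only calls `ready()` and `get()`, the same function should accept the `AsyncResult` objects returned by `add.delay()` and `multiply.delay()`.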


### Redis as a Message Broker and Result Backend
- **Redis**: An in-memory data structure store, often used with Celery as a message broker and result backend.
- **Role in Celery**: Facilitates the queuing of tasks and storage of their results.


### Note: async/await vs. Celery
Using async/await directly is more suitable for writing asynchronous code within a single application or coroutine context. It allows you to write non-blocking code and leverage the benefits of asynchronous programming when interacting with I/O-bound operations like network requests or file I/O.

While async/await is powerful for writing asynchronous code, it doesn't provide the same level of distributed task execution, task management, and scaling capabilities as Celery. If your application requires distributed task processing, task queues, or scalability across multiple workers, Celery is a more suitable choice.

In some cases, you might even combine Celery and async/await together. For example, you can use Celery to manage and distribute tasks across workers and leverage async/await within the task implementation to write asynchronous code when interacting with I/O-bound operations.

Ultimately, the choice between using Celery or async/await directly depends on your specific application requirements and the nature of the tasks you need to execute.
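As a rough illustration of combining the two, the sketch below shows a plain synchronous function of the kind a worker would execute, which internally uses async/await to fetch from several sources concurrently. All names here are hypothetical, and the Celery task decorator is deliberately left off so the sketch runs without Celery installed.

```python
import asyncio


async def fetch_one(source, delay):
    # asyncio.sleep stands in for an I/O-bound call to an external service
    await asyncio.sleep(delay)
    return f"{source}: ok"


async def fetch_all(sources):
    # Fetch every source concurrently inside a single task invocation
    return await asyncio.gather(*(fetch_one(s, 0.1) for s in sources))


def fetch_sources_task(sources):
    # In a real project this function would be registered as a Celery task.
    # The worker calls it synchronously, so it starts its own event loop.
    return asyncio.run(fetch_all(sources))


statuses = fetch_sources_task(["youtube", "slack"])
print(statuses)
```

The queue decides which worker runs the task and when, while async/await keeps that one task from waiting on each source in turn.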

## Integrating OpenAI API Asynchronously

### Efficient Handling of OpenAI API Requests
Asynchronous integration of the OpenAI API is critical for handling multiple requests or streaming data efficiently. This approach can significantly improve the scalability and responsiveness of applications using the OpenAI API for natural language processing tasks.



### Streaming Responses with OpenAI API

In [62]:
def get_response_stream(prompt):
    # Reuses the `client` (openai.OpenAI) created in the basic-interaction cell
    response_stream = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        max_tokens=729,
        top_p=1,
        stream=True,
    )
    return response_stream

In the `get_response_stream` function, we call the Chat Completions API with streaming enabled (`stream=True`). Instead of a single completed response, the call returns an iterator of chunks that can be processed as they arrive.


### Processing Streamed Responses

In [66]:
import textwrap
from IPython.display import display, clear_output, HTML


async def process_streamed_responses(response_stream):
    response_text = ""
    for chunk in response_stream:
        chunk_message = chunk.choices[0].delta.content
        if chunk_message is not None:  # Check if chunk_message is not None
            response_text += chunk_message
        is_complete = chunk.choices[0].finish_reason is not None
        wrapped_text = textwrap.fill(response_text, width=80)  # Wrap text at 80 characters
        clear_output(wait=True)
        display(HTML(f"<div style='text-align: left;'><pre>{wrapped_text}</pre></div>"))
        if is_complete:
            break


async def main():
    stream = get_response_stream("Tell me a story about a space adventure in five sentences.")
    await process_streamed_responses(stream)


# run_async_in_notebook(main())
await main()

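The function `process_streamed_responses` iterates over the OpenAI response stream, appending each chunk as it arrives and redrawing the accumulated text. Although it is declared `async`, the stream comes from the synchronous client, so the plain `for` loop blocks the event loop while it waits for each chunk. For fully non-blocking streaming, create the client with `openai.AsyncOpenAI` and iterate with `async for`. Either way, processing chunks as they arrive is particularly useful for applications requiring real-time output, like chatbots or live content generation.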
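The loop above uses a plain `for`, which is the synchronous iteration protocol. To show what asynchronous consumption of a stream looks like, here is a minimal sketch built on an async generator. `fake_stream` is a hypothetical stand-in for a streamed API response, and no OpenAI call is made.

```python
import asyncio


async def fake_stream(text, delay=0.01):
    # Hypothetical stand-in for a streamed response: yields one word at a time
    for word in text.split():
        await asyncio.sleep(delay)  # simulates waiting for the next chunk
        yield word + " "


async def collect_stream(stream):
    response_text = ""
    # `async for` suspends between chunks, so other tasks can run meanwhile
    async for piece in stream:
        response_text += piece
    return response_text.strip()


async def consume_two_streams():
    # Two streams are consumed concurrently on the same event loop
    return await asyncio.gather(
        collect_stream(fake_stream("The ship left orbit at dawn")),
        collect_stream(fake_stream("Mission control lost contact")),
    )


# In a notebook cell, write `stories = await consume_two_streams()` instead.
stories = asyncio.run(consume_two_streams())
print(stories)
```

Because each `async for` step yields control while it waits, both streams make progress together, which a blocking `for` loop would not allow.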

## Enhancing Data Queries with Celery in Redash

### Understanding the Collaboration
Redash, known for its data visualization and analytics capabilities, has used Celery to manage and execute data queries (in releases up to version 8; later releases switched to RQ, a Redis-based queue that follows the same pattern). This integration plays a crucial role in keeping Redash efficient and responsive, especially when dealing with complex and time-consuming data queries.

### How Redash Uses Celery
- **Background Query Execution**: Redash leverages Celery to run data queries in the background. This means that when a user executes a query, instead of waiting for the query to complete, the task is handed over to Celery.
- **Task Queue Management**: Celery manages a queue of data query tasks. When a query is initiated, it is added to this queue and processed by available Celery workers.
- **Asynchronous Processing**: By delegating query execution to Celery, Redash ensures that the main application thread remains unblocked, enhancing the user experience by preventing delays in the UI.

### Advantages of this Integration
- **Improved Performance**: Offloading query execution to Celery allows Redash to handle multiple queries more efficiently, enhancing overall performance.
- **Scalability**: This setup enables Redash to scale more effectively, as additional worker nodes can be added to handle increased query load.
- **Reliability**: By managing queries as background tasks, Celery provides a more robust and fault-tolerant system, as query execution is isolated from the main application logic.


In code, the pattern is the same one shown earlier for Celery: a query task (here called `run_query`) is submitted with `.delay()`, which places it on Celery's task queue for a worker to execute, and the result is later retrieved with `task_result.get(timeout=20)`.
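The submit-then-retrieve flow can be tried without a broker by using a standard-library analogue. The sketch below uses `concurrent.futures.ThreadPoolExecutor` in place of Celery, so it shows the shape of the pattern only and is not Redash's actual implementation; `run_query` here is a hypothetical function that sleeps instead of querying a database.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def run_query(sql):
    # Hypothetical stand-in for a slow database query
    time.sleep(0.2)
    return f"rows for: {sql}"


with ThreadPoolExecutor(max_workers=2) as pool:
    start = time.perf_counter()
    # submit() plays the role of run_query.delay(...): it returns at once
    future = pool.submit(run_query, "SELECT 1")
    submit_time = time.perf_counter() - start
    # result(timeout=...) plays the role of task_result.get(timeout=20)
    rows = future.result(timeout=20)

print(f"submitted in {submit_time:.3f}s")
print(rows)
```

The main difference is that Celery workers are separate processes, possibly on other machines, fed through a message broker, whereas this pool lives inside a single process.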

## Conclusion

### Recap of Key Concepts
- **Asynchronous Programming**: We delved into how asynchronous programming facilitates efficient, non-blocking operations, essential in handling I/O-bound tasks and improving the responsiveness of data-driven applications.
- **OpenAI API**: Our exploration included basic interactions with the OpenAI API, emphasizing its potential in processing streaming responses asynchronously for real-time data analysis and query generation.
- **Redash**: The role of Redash in data visualization and analytics was highlighted, with a particular focus on its ability to benefit from asynchronous data processing, enhancing its functionality in handling complex queries and large datasets.
- **Celery**: We introduced Celery as a distributed task queue, essential for managing background tasks and processing heavy data operations, using Redis for efficient task queuing and result storage.

### Applying These Concepts in the Week 3 Challenge
- **Natural Language Driven Data Exploration**: The integration of the OpenAI API with Redash, as outlined in your project, allows for the translation of natural language queries into SQL, empowering users to interact with data intuitively. This application demonstrates the practical use of AI in enhancing user experience and data accessibility.
- **Responsive Redash Add-Ons**: By applying asynchronous programming techniques, your Redash add-on can efficiently handle data queries and updates, ensuring a smooth and responsive user interface, even when processing complex data operations from YouTube, Slack, or Gmeet insights.
- **Efficient Backend Systems**: Utilizing Celery for task management in your backend systems ensures that heavy data processing tasks, such as generating new SQL queries or updating dashboards, do not hinder the overall application performance.
- **End-to-End Insight Extraction**: The challenge's goal of autonomous knowledge discovery is well-served by combining these technologies. Large Language Models (LLMs) can analyze and interpret user queries, Celery can manage the data processing workload, and Redash can present the insights in a user-friendly manner.

### Encouragement for Integration and Experimentation
I encourage you to leverage these technologies in your project for the Week 3 challenge. The synergy between asynchronous programming, AI-driven natural language processing, and efficient data management through Celery and Redash presents a robust framework. This integration not only enhances the user experience by providing real-time, interactive data exploration but also paves the way for innovative approaches in business intelligence and data analytics.

Experiment with these components to see how they can be integrated into a cohesive system, enhancing your solution's capability to transform natural language inquiries into insightful data visualizations and actionable intelligence.