# Tutorial - Remote Arrow

This notebook serves as an interactive tutorial (and documentation) for **Remote Arrow**.
More broadly, it introduces you to **Apache Arrow** and **Apache Arrow Flight**.

**Apache Arrow** is a language-independent columnar memory format for flat and hierarchical data. <br>
It has become increasingly popular for big data applications because of its high performance and efficiency in handling tabular data, <br>
as well as its rich ecosystem of third-party libraries. <br>

**Apache Arrow Flight** is a framework for high-speed transmission of Apache Arrow Tables.  <br>
It achieves this by eliminating the (de)serialization overhead for the data and by allowing data to be streamed in parallel. <br>
Arrow Flight defines a set of protocols and interfaces that enable a language- and platform-agnostic approach to sharing data. <br>

**Remote Arrow** is a **custom** framework built on top of Apache Arrow Flight. It aims to solve the problem of redundancy <br>
and performance bottlenecks when common or shared datasets have to be accessed by multiple clients. <br>
It does so by outsourcing data ownership and the computational burden to a centralized server and allowing clients to access the data through remote queries. <br>
Each query result requested by a client is subsequently made available to all other clients. The same holds for new datasets uploaded by a client. <br>

This eliminates the need for data transfers between clients, redundant computation of complex queries, and unnecessary storage of potentially large data. <br>


You can learn more by reading through the *README* and sources listed below:
#### Sources:

1. [Apache Arrow](https://arrow.apache.org/)
2. [Introducing Apache Arrow Flight](https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/)
3. [Apache Arrow Flight API](https://arrow.apache.org/docs/format/Flight.html)
4. [Benchmarking Apache Arrow Flight (Tanveer Ahmad, Zaid Al Ars and H. Peter Hofstee)](https://arxiv.org/pdf/2204.03032.pdf)
5. [Apache Arrow Zero Memory Copy](https://towardsdatascience.com/apache-arrow-read-dataframe-with-zero-memory-69634092b1a)


In [None]:
# Run this cell only if you're not using the Docker container: hoangln1/remote-arrow:tutorial
!pip3 install pandas
!pip3 install pyarrow==9.0.0

# Download the csv file from Google Drive (passing the full URL avoids the deprecated --id flag)
!pip3 install gdown
!gdown "https://drive.google.com/uc?id=1Ko0pE194OMvFH0Vp_AJIBXvezF6B9NKY"


## Arrow Table
Let's take a look at **the Arrow Table** format by loading a CSV file:

In [None]:

import pyarrow as pa
import pyarrow.csv as csv

table = csv.read_csv('hawaii_covid.csv')
print(table.schema)

Arrow Tables provide a variety of common methods for handling tabular data, such as **join**, **sort_by**, **group_by**, **drop**, **select**, or **take**:

In [None]:
table.select([5]) # Get column at index 5

In [None]:
table.num_rows

If you're familiar with **pandas** and/or **SQL**, you may be relieved to find that **Arrow Tables** provide similar methods and APIs. <br>
When in doubt, you can always convert Arrow Tables into pandas DataFrames.

In [None]:
table.take([0, 1, 2]).to_pandas() # Get row 0, 1, and 2 and convert to pandas df

### **So why all the trouble with Arrow?**
First, Arrow stores data in a **columnar** format, which makes many computational routines substantially faster,<br>
especially when iterating over large chunks of data. This is because each column is stored in contiguous memory, <br>
which is much easier to read in chunks (and to process with vectorized SIMD instructions) than row-based memory layouts, as illustrated here [4]: <br>

![image](https://arrow.apache.org/img/simd.png)

Second, it provides a language-agnostic way to pass tabular data from one application to another. <br>
Last but not least, it allows for a technique called zero-copy reads: <br>
instead of copying the data and passing it from one application to another, <br>
we can simply pass a pointer to its location in memory [5]. <br>


So far, we have only loaded and queried our dataset locally. <br> 
Let's see how we can make it accessible to other clients.

## Starting the server
Remote Arrow uses one server to manage datasets and exposes a fixed set of commands (upload, download, query) to all clients in the network. <br>
This set of commands can, however, be extended and adapted depending on your specific needs. <br>

As mentioned before, Remote Arrow uses the **Apache Arrow Flight API**, which specifies a set of RPC methods that a Flight service can implement (e.g. **doGet**, **doPut**, **doAction**). <br>
This is reminiscent of the **REST** architectural style and its HTTP methods (**PUT**, **POST**, **DELETE**, **GET**). <br> 
The bottom line is that both simply define a uniform interface. We are, however, free to implement our own custom **actions and features** and to decide how our data is handled internally.

In contrast to **REST**, which typically runs over plain **HTTP**, **Arrow Flight** is built on top of gRPC (which itself runs over HTTP/2). <br>
Just like **Arrow** itself, **Arrow Flight** is also language-agnostic, <br>
so you can develop clients in any language, as long as the client implements the **Arrow Flight API** [4]. <br>

In this tutorial, we are going to use Python to demonstrate client and server interactions. <br>
First, let's start the server.

In [None]:
import subprocess
import time

SERVER_IP = "localhost"
SERVER_PORT = "5005"

server_process = subprocess.Popen(["python3","server.py","--host", SERVER_IP,"--port", SERVER_PORT])
pid = server_process.pid
time.sleep(2) # Wait for server to boot before running other cells

The server now runs in the background for as long as the notebook's kernel is running. <br>
Of course, you can also run it in a regular terminal if you want to read the console output, for example:

`python3 server.py --host <HOST> --port <PORT>`
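The fixed `time.sleep(2)` above is a guess that may be too short on a slow machine. A sturdier alternative is to poll the port until it accepts TCP connections. The helper below is a hypothetical addition that uses only the standard library; note that it only checks that something is listening on the port, not that the Flight service is ready to handle requests.

```python
import socket
import time


def wait_for_port(host, port, timeout=10.0, interval=0.1):
    """Hypothetical helper: poll until a TCP connection to (host, port) succeeds.

    Returns True as soon as a connection is accepted, or False once `timeout`
    seconds have passed without success.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # Connection refused or timed out raises OSError; success means a listener exists
            with socket.create_connection((host, int(port)), timeout=interval):
                return True
        except OSError:
            time.sleep(interval)
    return False
```

In the server cell, `time.sleep(2)` could then be replaced by `wait_for_port(SERVER_IP, SERVER_PORT)`, which returns `False` if the server did not come up within the timeout.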

For now, let's connect a client to the server and see how we can upload our local CSV file:

In [None]:
from RemoteArrow import RemoteDataset
client_A = RemoteDataset("hawaii_covid.csv", hostname=SERVER_IP+":"+SERVER_PORT)

Here, we have instantiated a **RemoteDataset** object with a path to the local CSV file and the server's address. <br>
Under the hood, our object (**client_A**) converted that file into an **Arrow Table** (just as above) and sent it to the server via **doPut**.

It does so without having to convert the data into another format: the server simply receives it as a stream of record batches and reconstructs the **Arrow Table** from them.

The dataset now resides on the server and is available to all clients as a **Flight** with **ID 0**. <br>

You will also find that the server has stored our file in the ***/datasets*** subdirectory in **.parquet** format. <br>
The **.parquet** format is a columnar file format with efficient compression, which keeps files small on disk and makes writing and reading Arrow Tables fast. <br>

In [None]:
client_A.list_flights()

A **Flight** is nothing but an access point that corresponds to one dataset or data chunk. <br>
A client may ask the server for all available Flights that it can access, i.e. download or query against. <br>

Technically, a data chunk is also a "dataset", but for the sake of this tutorial we refer to original data (e.g. a CSV file) as **datasets** <br>
and to any temporarily generated data (e.g. the result of a query) as **data chunks**. <br>
In any event, both are internally handled as **Arrow Tables** and exposed as Flights.<br>

Furthermore, instantiating **client_A** as a **RemoteDataset** also overrides the object's **Arrow Table methods** with corresponding RPC calls to our server. <br>
This means that calling methods like **take** or **select**, as we did in the previous section, <br>
will not compute the query locally but instead instruct the server to run the exact same query for us (on the dataset we just uploaded) <br>
and return the result to us through the RPC channel. As mentioned before, the server will also make the result available as a **Flight**.<br>

In [None]:
query_result = client_A.take([0,1,2]) # Will be executed on the server, and the result saved in our local variable "query_result"
query_result.to_pandas() 

In [None]:
# Query results are made available as new Flights
client_A.list_flights()

So far, it may not be entirely clear why this is useful, since we could just as well have computed the same query locally <br>
without sending it to the server first. But keep in mind that **client_A** may be much more restricted <br>
in computational power or memory than our server. <br>

Also, we can now delete the dataset on **client_A** (both on disk and in memory), since the server holds its own copy. <br>


Now, let's pretend to be another client (**client_B**) and see how we can interact with one of the Flights. <br>
We can first **inspect()** a Flight to get insight into its metadata (e.g. *schema*, *number of rows*, ...).

In [None]:
client_B = RemoteDataset() # Instantiate client with default host (localhost:5005) and no file

In [None]:
client_B.inspect(0) # Inspect Flight 0
client_B.connect(0) # Connect to Flight 0

Connecting to a Flight means that all of our local calls are once again overridden with **RPC calls** <br>
that instruct the server to run the queries against the dataset associated with that Flight.

In [None]:
client_B.take([0,1,2,3,4,5]).to_pandas() # Call take() on Flight 0 and convert result to pandas

Note that **client_B** can now query the original dataset (**hawaii_covid**) without ever downloading it. <br>

This becomes incredibly efficient when you want to query a dataset that is extremely large, <br>
or when you simply don't want to download or store it in its entirety. <br>

One key aspect to understand, however, is that **only the first method call** that follows the RemoteDataset object (**client_B**) <br>
invokes a remote procedure call. In the case above, this means that only **client_B.take([0,1,2,3,4,5])** is sent to and processed by the server.<br>
The result is returned as an Arrow Table object and **locally** converted to a pandas DataFrame. <br>


If we want to download the entire dataset (or data chunk) of the Flight that our client is connected to, we can use **fetch()**.

In [None]:
ds = client_B.fetch() # Download the entire dataset of the connected Flight and store the result in "ds"

# Convert it to pandas dataframe and continue to process it locally
df = ds.to_pandas()
print("Original num of rows: ", len(df))

dropped = df.dropna()
print("Num of rows without nulls: ", len(dropped))

dropped["case_month"]

As you can see, there is a lot of flexibility in switching between accessing local and shared datasets.<br>
Making intermediate query results available to all clients also creates synergy between them <br>
when they work in the context of a larger application or system. <br>

One client may upload a huge dataset, another can compute a bunch of queries against it, and a third client has access to all of the results <br>
and can cherry-pick only the ones relevant to its task. In any case, we can eliminate redundancy and potentially boost performance and efficiency on all ends.


## Actions

**Apache Arrow Flight** defines another interface to interact with Flights, so-called **Actions**. <br>
Actions are application-specific commands. Our server provides Actions for **saving, deleting, and clearing Flights** as well as **shutting down the server**. <br>

Any client can send an action by setting its **type** and **body** (payload) according to the format defined by our server. <br>

In [None]:
client_C = RemoteDataset()
client_C.list_actions()

In [None]:
client_C.action(type="save", body="1 my_new_dataset") # Request server to save Flight 1 as "my_new_dataset.parquet"

In [None]:
client_C.action("clear") # Clear all flights (not affecting .parquet files)

In [None]:
client_C.list_flights()

All Flights have now been cleared and the list of available Flights is empty. <br>
However, the files that have been saved should still be stored on our server. <br>

Let's see what happens when our server shuts down and restarts.

In [None]:
client_C.action("shutdown")
time.sleep(5) # Wait for server to properly shutdown before running other cells

In [None]:
# Restart the server
server_process = subprocess.Popen(["python3","server.py","--host", SERVER_IP,"--port", SERVER_PORT])
pid = server_process.pid

time.sleep(2) # Wait for server to boot
client_D = RemoteDataset()

You should see that the server recovered the saved datasets. <br>
As a final step in this tutorial, we can delete them via the "delete" action. The corresponding .parquet files will also be removed from the server.

In [None]:
client_D.action("delete", "0")
client_D.action("delete", "1")

client_D.list_flights()

Note that the IDs did not disappear and were not rearranged, in order to maintain consistency throughout the runtime of the server. <br>
This is necessary when deleting specific Flights one by one, whereas clearing all Flights removes the need to maintain that order. <br>
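The ID behaviour described above can be sketched as a small registry that hands out increasing integer IDs and never renumbers them on deletion. This is a hypothetical illustration, not Remote Arrow's implementation; in particular, resetting the counter in `clear()` is our assumption, based on the remark that clearing removes the need to maintain the order.

```python
import itertools


class FlightRegistry:
    """Hypothetical sketch: stable integer IDs that are never reused or renumbered."""

    def __init__(self):
        self._items = {}
        self._ids = itertools.count()  # 0, 1, 2, ...

    def add(self, item):
        fid = next(self._ids)
        self._items[fid] = item
        return fid

    def delete(self, fid):
        del self._items[fid]  # other IDs keep their values

    def clear(self):
        self._items.clear()
        self._ids = itertools.count()  # assumption: nothing left to stay consistent with

    def ids(self):
        return sorted(self._items)


reg = FlightRegistry()
for name in ["a", "b", "c"]:
    reg.add(name)
reg.delete(0)
after_delete = reg.ids()
new_id = reg.add("d")
reg.clear()
after_clear_id = reg.add("e")
print(after_delete)    # [1, 2]
print(new_id)          # 3 (deleted IDs are not reused)
print(after_clear_id)  # 0
```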

Of course, you can implement many more actions, such as *deleting all files*, *renaming a file*, *rebooting the server*, *checking server health*, etc. <br>
The list is virtually endless, which underlines the versatility and extensibility of **Arrow** and **Arrow Flight**.

## Conclusion
In this tutorial, we have examined how **Remote Arrow** can help to outsource the burden of computation, storage, and shared access of data to a single server. <br>
Though still in its prototype phase, it can offer tremendous flexibility for managing shared tabular data.<br>
This effect becomes more evident when datasets are used in a network of heterogeneous applications written in different languages, <br>
and even more so when the shared data is extremely large and/or the clients are low on computational power.

Another aspect we have not yet discussed in depth is the remarkable boost in transmission speed that has been benchmarked by *Ahmad et al.* [4]. <br>
Their publication reports that *"Flight is able to achieve up to **6000 MB/s and 4800 MB/s throughput**"* <br>
and that companies like **Dremio**, which use Arrow Flight, **perform 20x and 30x better** compared to traditional connection methods. <br>
This makes Arrow Flight an even more attractive choice for applications that require long-haul communication over the internet. <br>

### Limitations
One major limitation is that the Remote Arrow server is a single point of failure. <br>
It is also synchronous and can only serve one request at a time, making it <br>
suboptimal for high-frequency applications or systems that require massive scaling. <br>
Some of these problems may be ameliorated by instantiating multiple servers and setting up a load-balancer between clients and servers. <br>
This, however, comes at the cost of increased complexity for managing the shared data and handling asynchronous access and race conditions. <br>

**Arrow Flight** is also still under active development and thus not as stable or prevalent as more established frameworks for data transfers. <br>
Moreover, the API and documentation are rather opaque and can be somewhat restrictive; much of the development of **Remote Arrow** consisted of <br>
workarounds and trial and error. However, we are confident that **Arrow** and **Arrow Flight** will become industry standards as time progresses.


### Future work
If you're interested in contributing or developing your own application, please refer to the sources above and check out the Git repositories. <br>
**Remote Arrow** could be extended by developing new actions, scrutinizing and benchmarking current features, as well as supporting different file formats. <br>
Writing **Remote Arrow** clients in different languages would also be helpful, especially when testing it in an in-vivo multi-client system. <br>

Regardless of the use case, we hope you have learned that **Arrow**, especially in combination with **Arrow Flight**, provides a highly performant framework for working with big data.