# Put Your Cassandra Python Driver On Steroids With Apache Arrow


# Topic of the day

```
+----------------------------+  IO   +---------------+
| Batch of records as binary | ----> | Analytics Job |
+----------------------------+       +---------------+
```

In [None]:
# Move up one directory, presumably to the repository root so that relative
# paths used later (e.g. tests/select/...) resolve — verify.
%cd ..

In [None]:
import os

import cassandra.cluster
import cassandra.protocol
import humanize
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc
from pympler.asizeof import asizeof

import cassarrow
import cassarrow.impl

# Python Cassandra Driver: Tuples

In [None]:
# CQL for the demo table: event_date is the partition key, and rows within a
# partition are clustered by instrument_id, then event_timestamp.
# NOTE(review): this statement is not executed in the cells shown here.
MIGRATION = """
CREATE TABLE IF NOT EXISTS cassarrow.time_series
(
    event_date      date,
    instrument_id   int,
    event_timestamp timestamp,
    value           double,
    PRIMARY KEY (event_date, instrument_id, event_timestamp)
);
"""

# Read a single partition: every row for one event_date.
QUERY = "SELECT * FROM cassarrow.time_series WHERE event_date = '2019-10-02'"

```
| event_date   |   instrument_id | event_timestamp     |      value |
|:-------------|----------------:|:--------------------|-----------:|
| 2019-10-02   |               1 | 2019-10-02 08:00:00 | 0.127755   |
| 2019-10-02   |               1 | 2019-10-02 08:15:00 | 0.256553   |
| 2019-10-02   |               1 | 2019-10-02 08:30:00 | 0.820371   |
| 2019-10-02   |               1 | 2019-10-02 08:45:00 | 0.711016   |
| 2019-10-02   |               1 | 2019-10-02 09:00:00 | 0.00108124 |
```

In [None]:
# Connect with the driver's default Cluster() settings and make `cassarrow`
# the session's keyspace.
cluster = cassandra.cluster.Cluster()
session = cluster.connect("cassarrow")

In [None]:
# Materialize every row returned by the stock driver into a Python list.
results = list(session.execute(QUERY))

In [None]:
# Number of rows returned for event_date 2019-10-02.
len(results)

In [None]:
# Inspect the first row.
results[0]

In [None]:
# Check whether the rows are plain tuple instances.
isinstance(results[0], tuple)

In [None]:
# Mean of the `value` column, computed in pure Python one row at a time.
# Raises ZeroDivisionError if the query returned no rows.
sum(row.value for row in results) / len(results)

In [None]:
# Time the same pure-Python aggregation.
%timeit sum(row.value for row in results) / len(results)

In [None]:
# Memory footprint of the row list as measured by pympler's asizeof.
actual_size = asizeof(results)
humanize.naturalsize(actual_size)

```
| column          | type      |   bits |   bytes |
|:----------------|:----------|-------:|--------:|
| event_date      | date      |     32 |       4 |
| instrument_id   | int       |     32 |       4 |
| event_timestamp | timestamp |     64 |       8 |
| value           | double    |     64 |       8 |
| total           | -         |    192 |      24 |
```

In [None]:
# Ideal packed size: per the column table above, each row needs
# 4 (date) + 4 (int) + 8 (timestamp) + 8 (double) = 24 bytes.
expected_size = len(results) * (4 + 4 + 8 + 8)
# Use humanize's default (decimal) units, the same as the actual_size cell,
# so that the two printed sizes can be compared directly.
humanize.naturalsize(expected_size)

In [None]:
# How many times larger the Python row objects are than the packed estimate.
actual_size / expected_size

In [None]:
# The Python type the driver uses for `date` column values.
type(results[0].event_date)

In [None]:
# Expected to fail: the driver's date value is not a datetime.date, so
# strftime is presumably missing. The AttributeError is printed, not raised.
try:
    results[0].event_date.strftime("%Y-%m-%d")
except AttributeError as e:
    print(repr(e))

## In Summary:

* ❌ Not a great API for analytics
* ❌ Slow
* ❌ Uses a lot of memory
* ❌ Unusual types

# The solution: `cassarrow`, convert raw data directly to Apache Arrow

```
+-------------------+  Network   +----------+  Python Driver   +--------+
| Cassandra Cluster | ---------> | Raw Data | ---------------> | Tuples |
+-------------------+            +----------+                  +--------+


+-------------------+  Network   +----------+  C++ Code   +-------------+
| Cassandra Cluster | ---------> | Raw Data | ----------> | Arrow Table |
+-------------------+            +----------+             +-------------+


```

![Row to Column](https://arrow.apache.org/img/simd.png)

# Step 1: Convert the cassandra metadata to an Arrow `Schema`


## Native types


| Cassandra   | pyarrow              | Note         |
|:------------|:---------------------|:-------------|
| ascii       | `pa.string()`        |              |
| bigint      | `pa.int64()`         |              |
| blob        | `pa.binary()`        |              |
| boolean     | `pa.bool_()`         |              |
| date        | `pa.date32()`        |              |
| decimal     |                      | Incompatible |
| double      | `pa.float64()`       |              |
| duration    | `pa.duration("ns")`  |              |
| float       | `pa.float32()`       |              |
| int         | `pa.int32()`         |              |
| smallint    | `pa.int16()`         |              |
| text        | `pa.string()`        |              |
| time        | `pa.time64("ns")`    |              |
| timestamp   | `pa.timestamp("ms")` |              |
| timeuuid    | `pa.binary(16)`      |              |
| tinyint     | `pa.int8()`          |              |
| uuid        | `pa.binary(16)`      |              |
| varchar     | `pa.string()`        |              |
| varint      |                      | Incompatible |


## Collections / UDT

| Cassandra   | pyarrow     | Note   |
|:------------|:------------|:-------|
| list        | `pa.list_`  |        |
| map         | `pa.map_`   |        |
| set         | `pa.list_`  |        |
| udt         | `pa.struct` |        |


# Step 2: Converting the data

```
+------------------+     +----------------+           +--------------+     +-------+
|  Date32Builder   | --> |  Date32Array   | ------+-> | Record Batch | --> | Table |
+------------------+     +----------------+       |   +--------------+     +-------+
                                                  |
                                                  |
                                                  |
+------------------+     +----------------+       |
| TimestampBuilder | --> | TimestampArray | ------+
+------------------+     +----------------+       |
                                                  |
                                                  |
                                                  |
+------------------+     +----------------+       |
|  DoubleBuilder   | --> |  DoubleArray   | ------+
+------------------+     +----------------+       |
                                                  |
                                                  |
                                                  |
+------------------+     +----------------+       |
|   Int32Builder   | --> |   Int32Array   | ------+
+------------------+     +----------------+
```

In [None]:
# Temporarily install cassarrow on the session and convert the query result
# into a pyarrow Table. The conversion runs inside the `with` block, while
# the cassarrow handler is still installed.
with cassarrow.install_cassarrow(session) as cassarrow_session:
    table = cassarrow.result_set_to_table(cassarrow_session.execute(QUERY))

In [None]:
# First five rows of the Arrow table.
table[:5]

In [None]:
# Vectorized mean of the `value` column.
pc.mean(table["value"])

In [None]:
# Time the Arrow aggregation, including converting the scalar result to a
# Python object.
%timeit pc.mean(table['value']).as_py()

In [None]:
# Size of the buffers backing the Arrow table.
humanize.naturalsize(table.nbytes)

In [None]:
# Arrow schema of the converted table.
table.schema

In [None]:
# Hand the Arrow data to pandas and show the first rows.
table.to_pandas().head()

## Arrow summary

* ✅ Great API for analytics
* ✅ Fast data manipulation
* ✅ Memory efficient
* ✅ Fast conversion
* ✅ Schema available
* ✅ No special/proprietary types

# Benchmark

In [None]:
def execute_default(session, query):
    """Run *query* with the stock driver and return all rows as a list.

    Iterating the driver's result in full makes the benchmark include the
    cost of building every row object.
    """
    return list(session.execute(query))

In [None]:
def execute_cassarrow(session, query):
    """Run *query* through cassarrow and return the result as a pyarrow Table.

    The result set is converted while the cassarrow protocol handler is
    still installed, as in the interactive ``install_cassarrow`` cell.
    If ``result_set_to_table`` fetches further pages lazily (presumably the
    case for large, paged results — confirm against cassarrow), converting
    after leaving the context could decode those pages with the default
    handler instead of the Arrow one.
    """
    with cassarrow.install_cassarrow(session) as cassarrow_session:
        results = cassarrow_session.execute(query)
        return cassarrow.result_set_to_table(results)

In [None]:
# End-to-end timing (query round-trip plus row materialization), stock driver.
%timeit execute_default(session, QUERY)

In [None]:
# End-to-end timing (query round-trip plus Arrow conversion), cassarrow.
%timeit execute_cassarrow(session, QUERY)

In [None]:
def get_binary(name: str) -> bytes:
    """Return the raw contents of ``tests/select/<name>``.

    The path is resolved relative to the current working directory.
    """
    path = os.path.join("tests", "select", name)
    with open(path, "rb") as handle:
        payload = handle.read()
    return payload


# A captured raw response body for the time_series query (presumably a single
# RESULT frame body — verify). It is reused by the decode-only benchmarks below.
data = get_binary("time_series/0011.bin")

In [None]:
def parse_default(data: bytes) -> list[tuple]:
    """Decode *data* with the stock driver's protocol handler and return its rows.

    Only decoding is exercised here; no session or network round-trip is used.
    """
    # NOTE(review): the positional arguments below are assumed to follow
    # cassandra-driver's decode_message(protocol_version, user_type_map,
    # stream_id, flags, opcode, body, decompressor, result_metadata).
    # Verify this against the installed driver version.
    handler = cassandra.protocol._ProtocolHandler
    message = handler.decode_message(
        5,  # protocol version
        {},  # user type map
        3,  # stream id
        0,  # flags
        8,  # opcode (0x08, presumably RESULT)
        data,  # body
        None,  # decompressor
        [],  # result metadata
    )
    return message.parsed_rows

In [None]:
def parse_cassarrow(data: bytes) -> pa.RecordBatch:
    """Decode *data* with cassarrow's Arrow protocol handler and return its rows.

    This passes the same decode_message arguments as ``parse_default``, so
    the two benchmarks differ only in the protocol handler used. No session
    or network round-trip is involved.
    """
    # NOTE(review): the positional arguments below are assumed to follow
    # cassandra-driver's decode_message(protocol_version, user_type_map,
    # stream_id, flags, opcode, body, decompressor, result_metadata).
    # Verify this against the installed driver version.
    msg_arrow = cassarrow.impl.ArrowProtocolHandler.decode_message(
        5,  # protocol version
        {},  # user type map
        3,  # stream id
        0,  # flags
        8,  # opcode (0x08, presumably RESULT)
        data,  # body
        None,  # decompressor
        [],  # result metadata
    )
    return msg_arrow.parsed_rows

In [None]:
# Decode-only timing with the stock protocol handler (data comes from a file).
%timeit parse_default(data)

In [None]:
# Decode-only timing with cassarrow's Arrow protocol handler (same data).
%timeit parse_cassarrow(data)

# Conclusion

* Check the code on https://github.com/0x26res/cassarrow
* Install it: `pip install cassarrow`
* Apply the same method to a similar problem!
