# 3. Cassandra to Arrow

We reuse code from the Cassandra server to read the SSTable, but instead of serializing to and deserializing from CQL, we send the data as an [Arrow IPC stream](http://arrow.apache.org/). Arrow uses a columnar format, which is better suited to analytics.

Data transformations:

1. SSTable on disk
2. Deserialized into Java objects in the C* server
3. Client makes a request to the server (not to the C* DB)
4. Data serialized via Arrow IPC stream
5. Sent across network
6. Arrow IPC stream received by client
7. Transformed into Arrow Table / cuDF

**Pros:**
- doesn't make requests to the main Cassandra DB, which reduces its load and leaves it free to run other operations
- less de/serialization is involved when using the Arrow IPC stream

**Cons:**
- requires starting Cassandra and using the JVM, which we would prefer to avoid
- complex architecture

In [15]:
import pyarrow as pa
import pandas as pd
import socket

HOST = '127.0.0.1'
PORT = 9143

In [32]:
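def read_bytes(sock, n):
    """Read exactly n bytes from the socket, or raise EOFError if it closes early."""
    data = b''
    while len(data) < n:
        # recv may return fewer bytes than requested, so keep reading
        more = sock.recv(n - len(data))
        if not more:
            raise EOFError("Socket connection ended before reading specified number of bytes")
        data += more
    return data

def read_u8(sock):
    """Read an 8-byte big-endian unsigned integer from the socket."""
    data = read_bytes(sock, 8)
    return int.from_bytes(data, byteorder='big')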

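# read data from socket
def fetch_data():
    """Request the tables from the server and return their raw Arrow IPC bytes."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((HOST, PORT))
        sock.sendall(b'hello world\n')  # request message sent to the server
        num_tables = read_u8(sock)  # number of tables that follow
        table_buffers = []
        for i in range(num_tables):
            print('receiving table', i)
            table_size = read_u8(sock)  # size in bytes of this table's IPC stream
            buf = read_bytes(sock, table_size)
            table_buffers.append(buf)
    return table_buffers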
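
The client code above implies a simple length-prefixed framing: an 8-byte big-endian table count, then for each table an 8-byte big-endian size followed by that many bytes. As a way to exercise this framing without the Java server, here is a minimal sketch of a hypothetical encoder (`frame_tables`) and an in-memory decoder (`parse_frames`). Both names are illustrative and not part of the project, and the real server may differ in details.

```python
import io

def frame_tables(payloads):
    """Hypothetical encoder: 8-byte big-endian count, then (8-byte size, payload) per table."""
    out = bytearray(len(payloads).to_bytes(8, byteorder='big'))
    for payload in payloads:
        out += len(payload).to_bytes(8, byteorder='big')
        out += payload
    return bytes(out)

def parse_frames(message):
    """Decode bytes produced by frame_tables back into a list of payloads."""
    stream = io.BytesIO(message)

    def read_exact(n):
        # Mirror read_bytes: fail loudly if the message is shorter than announced.
        data = stream.read(n)
        if len(data) != n:
            raise EOFError("message ended before reading specified number of bytes")
        return data

    num_tables = int.from_bytes(read_exact(8), byteorder='big')
    payloads = []
    for _ in range(num_tables):
        size = int.from_bytes(read_exact(8), byteorder='big')
        payloads.append(read_exact(size))
    return payloads

# Placeholder payloads; in the notebook these would be Arrow IPC stream bytes.
framed = frame_tables([b'first table', b''])
decoded = parse_frames(framed)
```

An empty payload is included on purpose, since a zero-length frame is a case the size prefix has to handle.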

In [33]:
buffers = fetch_data()
for i, buf in enumerate(buffers):
    if i > 0:
        print()
    print('parsing table', i)
    reader = pa.ipc.open_stream(buf)
    arrow_table = reader.read_all()
    print(arrow_table.to_pandas()) # for visualization

receiving table 0
receiving table 1
parsing table 0
  partition key       liveness_info_tstamp  item_count   last_update_timestamp
0          uiop 2021-07-19 17:11:39.401960          32 2021-07-19 17:11:39.403
1          qwer 2021-07-19 17:11:05.241304         246 2021-07-19 17:11:05.243
2          abcd 2021-07-19 17:10:50.190249          12 2021-07-19 17:10:50.192
3          1234 2021-07-19 17:10:39.050207           5 2021-07-19 17:10:39.052
4          9876 2021-07-19 17:10:38.041705           2 2021-07-19 17:10:38.047
parsing table 1
Empty DataFrame
Columns: [partition key, liveness_info_tstamp]
Index: []


In [5]:
!ls ../cpp/build

CMakeCache.txt          deletion_time.h         sstable_statistics.cpp
CMakeFiles              results.json            sstable_statistics.h
CPackConfig.cmake       rules.ninja             sstable_summary.cpp
CPackSourceConfig.cmake sstable_data.cpp        sstable_summary.h
build.ninja             sstable_data.h          sstable_to_arrow
cmake_install.cmake     sstable_index.cpp       table.parquet
deletion_time.cpp       sstable_index.h


In [6]:
parquet_table = pd.read_parquet("../cpp/build/table.parquet")

In [7]:
parquet_table

| | _timestamp | partition key | clustering key | data | sensor_value | station_id |
|---|---|---|---|---|---|---|
| 0 | 1924-04-12 | {'org.apache.cassandra.db.marshal.UUIDType': b... | 1970-01-01 | vulputate. Vestibulum at imperdiet metus, et ... | 97.651955 | b'(\xdfc\xb7\xccWC\xcb\x97R\xfa\xe6\x9d\x16S\xda' |
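
The `station_id` value in the row above is 16 raw bytes, which is the length of a UUID. Assuming that column holds a UUID (an assumption, since the table schema is not shown here), the standard library can turn the bytes into a readable form. The helper name `bytes_to_uuid` is illustrative.

```python
import uuid

def bytes_to_uuid(raw):
    """Interpret 16 raw bytes as a UUID (assumes the value really is a UUID)."""
    if len(raw) != 16:
        raise ValueError(f"expected 16 bytes, got {len(raw)}")
    return uuid.UUID(bytes=raw)

# The station_id bytes from the row shown above.
station_id = bytes_to_uuid(b'(\xdfc\xb7\xccWC\xcb\x97R\xfa\xe6\x9d\x16S\xda')
print(station_id)
```

With pandas, the same helper could be applied to the whole column via `parquet_table['station_id'].map(bytes_to_uuid)`, again assuming every value is 16 bytes long.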
