In [None]:
import sys
from collections import namedtuple

import pandas as pd
import numpy as np
from pympler import asizeof

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
from cassandra.protocol import NumpyProtocolHandler, LazyProtocolHandler
from cassandra.query import tuple_factory
from cassandra.util import Date
import cassandra.cqltypes

In [None]:
# Single Cluster object built with no arguments (driver defaults apply).
# The cells below open their sessions from it via cluster.connect(...).
cluster = Cluster()

In [None]:
def size_of_fmt(num, suffix='B'):
    """Format a byte count as a short human-readable string.

    The value is divided by 1024 until its magnitude drops below 1024 and is
    then rendered with one decimal place, followed by the unit prefix and
    ``suffix`` (for example ``1536`` -> ``'1.5KB'``).

    Args:
        num: Size to format (int or float; negative values keep their sign).
        suffix: Text appended after the unit prefix. Defaults to ``'B'``.

    Returns:
        The formatted string, e.g. ``'0.0B'``, ``'1.0KB'`` or ``'2.5MB'``.
    """
    for unit in ('', 'K', 'M', 'G', 'T', 'P', 'E', 'Z'):
        if abs(num) < 1024.0:
            return "%3.1f%s%s" % (num, unit, suffix)
        num /= 1024.0
    # Whatever survives eight divisions is in the yotta range. The prefix is
    # 'Y' (not 'Yi') so it matches the single-letter prefixes used above.
    return "%.1f%s%s" % (num, 'Y', suffix)

def size_of(obj):
    """Return the memory footprint of ``obj`` as a human-readable string.

    A DataFrame is measured by summing ``memory_usage(deep=True)``; any other
    object is measured with ``asizeof.asizeof``. The byte count is formatted
    by ``size_of_fmt``.
    """
    if not isinstance(obj, pd.DataFrame):
        return size_of_fmt(asizeof.asizeof(obj))
    usage_bytes = obj.memory_usage(deep=True)
    return size_of_fmt(usage_bytes.sum())

In [None]:
size_of('HELLO'), size_of('HELLO HELLO')

# Table schema

```cassandraql
CREATE TABLE fastsandra.time_series ( 
  event_date         date,       -- Partition key
  instrument_id      int,
  event_timestamp    timestamp,
  value              double,
  PRIMARY KEY (event_date, instrument_id, event_timestamp)
);

```

# Default Protocol Handler

In [None]:
# Baseline: query with the session's default settings and keep every row.
with cluster.connect('fastsandra') as session:
    results = session.execute(
        "SELECT * from fastsandra.time_series where event_date = '2019-10-01'"
    )
    # Drain the result set while the session is still open.
    rows = list(results)
len(rows)

In [None]:
row = rows[0]
row

In [None]:
row.event_date, row.instrument_id, row.event_timestamp, row.value

In [None]:
type(row)

In [None]:
isinstance(row, tuple)

In [None]:
size_of(rows)

In [None]:
size_of(row)

In [None]:
# Per-column breakdown of the first row: Python type, value and memory size.
first_row_fields = row._asdict()
column_summary = pd.DataFrame([
    dict(column=name, type=type(value), value=value, size=size_of(value))
    for name, value in first_row_fields.items()
])
column_summary.set_index('column')

In [None]:
df = pd.DataFrame(rows)
df.head()

In [None]:
size_of(df)

In [None]:
# Side-by-side view of each column's Cassandra type name and the dtype pandas chose.
pandas_dtypes = df.dtypes.rename('pandas').to_frame()
cassandra_types = (
    pd.Series(dict(zip(results.column_names, results.column_types)))
    .apply(lambda cql_type: cql_type.typename)
    .rename('cassandra')
)
pandas_dtypes.join(cassandra_types)[['cassandra', 'pandas']]

In [None]:
type(df['event_date'][0]), df['event_date'][0]

In [None]:
# Turn the driver's Date objects into a datetime64 column and narrow
# instrument_id to 32-bit integers; the other columns are kept as they are.
df_clean = df.assign(
    event_date=df['event_date'].apply(Date.date).astype('datetime64[ns]'),
    instrument_id=df['instrument_id'].astype('int32'),
)
df_clean.dtypes

In [None]:
size_of(df_clean)

In [None]:
# Back-of-the-envelope size: row count times a fixed number of bytes per row.
# The terms presumably follow the schema's column order -- 8 (event_date),
# 4 (instrument_id as int32), 8 (event_timestamp), 8 (value) -- TODO confirm.
size_of_fmt(len(df_clean) * (8 + 4 + 8 + 8))

# Using `NumpyProtocolHandler`


## Installation


```bash
# First make sure the required system libraries are installed:
sudo apt install python3.7-dev libev4 libev-dev build-essential
# Activate your virtual environment and first install Cython and numpy
pip install Cython==0.29.14
pip install numpy==1.17.2
# Then install the cassandra-driver (this should take a few minutes)
pip install cassandra-driver
# Check that it worked:
python -c 'from cassandra.protocol import NumpyProtocolHandler;print(NumpyProtocolHandler)'
# Should print:
# <class 'cassandra.protocol.cython_protocol_handler.<locals>.CythonProtocolHandler'>
# If it prints `None` instead, it didn't work (the Cython/NumPy extension was not built)
```
If it doesn't work:
```bash
pip -v --no-cache-dir install cassandra-driver
```

## Usage

In [None]:
with cluster.connect('fastsandra') as session:
    # Prepare the session for the NumPy handler: both settings are applied
    # before the query is executed.
    session.client_protocol_handler = NumpyProtocolHandler
    session.row_factory = tuple_factory
    results = session.execute(
        "SELECT * from fastsandra.time_series where event_date = '2019-10-01'"
    )
    # Each item yielded by the result set is kept as one batch.
    np_batches = list(results)
len(np_batches)

In [None]:
np_batch = np_batches[0]
type(np_batch), np_batch.keys()

In [None]:
np_batch['instrument_id']

In [None]:
np_batch['value'], np_batch['value'].dtype

In [None]:
np_batch['event_date'] 

In [None]:
np_batch['event_timestamp']

In [None]:
# For every column of the batch: the array dtype and the Python type of its
# first element.
batch_column_types = pd.DataFrame([
    {
        'column': column,
        'numpy type': values.dtype,
        'python type': type(values[0]),
    }
    for column, values in np_batch.items()
])
batch_column_types.set_index('column')

In [None]:
np_df = pd.concat([pd.DataFrame(b) for b in np_batches], ignore_index=True)
np_df.head()

In [None]:
np_df.dtypes

In [None]:
size_of(np_df)

In [None]:
# Convert the driver's Date objects into a pandas datetime column.
# 'datetime64[ns]' is requested explicitly (as for df_clean): older pandas
# stored a requested 'datetime64[D]' as nanoseconds anyway, while pandas 2.x
# rejects day resolution, so '[ns]' behaves the same on both.
np_df_clean = np_df.assign(
    event_date=lambda x: x['event_date'].apply(Date.date).astype('datetime64[ns]')
)

In [None]:
np_df_clean.dtypes

In [None]:
size_of(np_df_clean)

In [None]:
np_batch['event_date'].dtype, np_batch['event_timestamp'].dtype

# Tuning `NumpyProtocolHandler`

## Tuning Timestamps

In [None]:
import cassandra.numpy_parser as numpy_parser

In [None]:
numpy_parser._cqltype_to_numpy

In [None]:
# Snapshot of the parser's CQL-type -> dtype table, taken before it is patched
# below; test_with_numpy_handler() restores the table from this copy.
cqltype_to_numpy_copy = numpy_parser._cqltype_to_numpy.copy()

In [None]:
# Map both timestamp-like CQL types to numpy's millisecond datetime dtype.
# The parser's module-level table is modified in place, so queries run after
# this cell are decoded with the new mapping.
millis_dtype = np.dtype('datetime64[ms]')
numpy_parser._cqltype_to_numpy[cassandra.cqltypes.DateType] = millis_dtype
numpy_parser._cqltype_to_numpy[cassandra.cqltypes.TimestampType] = millis_dtype
numpy_parser._cqltype_to_numpy

In [None]:
# Same query again, now decoded with the patched timestamp dtypes.
with cluster.connect('fastsandra') as session:
    session.client_protocol_handler = NumpyProtocolHandler
    session.row_factory = tuple_factory
    results = session.execute(
        "SELECT * from fastsandra.time_series where event_date = '2019-10-01'"
    )
    fnp_batches = list(results)

In [None]:
fnp_batch = fnp_batches[0]
fnp_batch['event_timestamp'].dtype

In [None]:
np_batch['event_timestamp'].dtype

In [None]:
fnp_df = pd.concat((pd.DataFrame(b) for b in fnp_batches), ignore_index=True)

In [None]:
fnp_df.dtypes

In [None]:
size_of(fnp_df)

In [None]:
fnp_batch['event_date'].dtype, fnp_batch['event_date'][0]

## Tuning date

According to the cassandra driver documentation:
```python
# Values of the 'date' type are encoded as 32-bit unsigned integers
# representing a number of days with epoch (January 1st, 1970) at the center of the
# range (2^31).

```

In [None]:
# Read `date` columns as raw big-endian unsigned 32-bit integers instead of
# Python objects; the values are decoded into dates in a later step.
numpy_parser._cqltype_to_numpy[cassandra.cqltypes.SimpleDateType] = np.dtype('>u4')

In [None]:
# Same query once more, with `date` columns now arriving as raw integers.
with cluster.connect('fastsandra') as session:
    session.client_protocol_handler = NumpyProtocolHandler
    session.row_factory = tuple_factory
    results = session.execute(
        "SELECT * from fastsandra.time_series where event_date = '2019-10-01'"
    )
    fnp_batches2 = list(results)

In [None]:
fnp_batch2 = fnp_batches2[0]
fnp_batch2['event_date']

In [None]:
# Decode the raw day counts: remove the 2**31 offset, then reinterpret the
# result as days since the epoch. The values are widened to signed 64-bit
# first, because subtracting from the unsigned 32-bit array would wrap around
# for any date before 1970-01-01.
(
    fnp_batch2['event_date'].astype('int64')
    - cassandra.cqltypes.SimpleDateType.EPOCH_OFFSET_DAYS
).astype('datetime64[D]')

In [None]:
def result_set_to_df(results: cassandra.cluster.ResultSet) -> pd.DataFrame:
    """Build a single DataFrame from a result set of column batches.

    Each item yielded by ``results`` is passed to ``pd.DataFrame`` and the
    pieces are concatenated. Columns whose Cassandra type is
    ``SimpleDateType`` are expected to hold raw unsigned day counts offset by
    ``SimpleDateType.EPOCH_OFFSET_DAYS`` and are converted to datetimes.

    Args:
        results: Result set to consume. It is iterated once, after which its
            ``column_names`` and ``column_types`` are read.

    Returns:
        One DataFrame with all batches concatenated under a fresh integer
        index. An empty result set yields an empty frame that still carries
        the result set's column names.
    """
    frames = [pd.DataFrame(batch) for batch in results]
    if not frames:
        # pd.concat raises ValueError when there is nothing to concatenate.
        return pd.DataFrame(columns=results.column_names)
    df = pd.concat(frames, ignore_index=True)

    epoch_offset = cassandra.cqltypes.SimpleDateType.EPOCH_OFFSET_DAYS
    for name, dtype in zip(results.column_names, results.column_types):
        if dtype == cassandra.cqltypes.SimpleDateType:
            # Widen to signed 64-bit before subtracting: the raw values are
            # unsigned, so dates before the epoch would otherwise wrap around.
            days_since_epoch = df[name].astype('int64') - epoch_offset
            df[name] = pd.to_datetime(days_since_epoch, unit='D')
    return df

In [None]:
# End-to-end run: NumPy handler for decoding, result_set_to_df for assembly.
with cluster.connect('fastsandra') as session:
    # Result items become dicts of NumPy arrays with this handler ...
    session.client_protocol_handler = NumpyProtocolHandler
    # ... and the NumPy results require the tuple row factory.
    session.row_factory = tuple_factory

    results = session.execute(
        "SELECT * FROM fastsandra.time_series WHERE event_date = '2019-10-01'"
    )
    fnp_df2 = result_set_to_df(results)
size_of(fnp_df2)

In [None]:
fnp_df2.dtypes

# Benchmark

In [None]:
def test_default():
    """Benchmark case: default session settings plus pandas-side cleanup.

    Runs the query without changing the session's handler or row factory,
    builds a DataFrame from the returned rows, then converts ``event_date``
    to a datetime column and narrows ``instrument_id`` to ``int32``.

    Returns:
        The cleaned DataFrame.
    """
    with cluster.connect('fastsandra') as session:
        results = session.execute(
            "SELECT * from fastsandra.time_series where event_date = '2019-10-01'"
        )
        return (
            pd.DataFrame((r for r in results))
            .assign(
                # '[ns]' rather than '[D]': older pandas stored day resolution
                # as nanoseconds anyway, and pandas 2.x rejects '[D]'.
                event_date=lambda x: x['event_date'].apply(Date.date).astype('datetime64[ns]'),
                instrument_id=lambda x: x['instrument_id'].astype('int32'),
            )
        )
test_default().dtypes

In [None]:
def test_with_numpy_handler():
    """Benchmark case: unpatched ``NumpyProtocolHandler`` plus pandas-side date cleanup.

    The parser's dtype table is first replaced by a fresh copy of the
    ``cqltype_to_numpy_copy`` snapshot, so patches applied elsewhere in the
    notebook do not influence this case.

    Returns:
        DataFrame of all batches, with ``event_date`` converted to a
        datetime column.
    """
    numpy_parser._cqltype_to_numpy = cqltype_to_numpy_copy.copy()
    with cluster.connect('fastsandra') as session:
        session.row_factory = tuple_factory  # required for NumPy results
        session.client_protocol_handler = NumpyProtocolHandler  # result items are dicts of NumPy arrays

        results = session.execute(
            "SELECT * from fastsandra.time_series where event_date = '2019-10-01'"
        )
        return (
            pd.concat((pd.DataFrame(r) for r in results), ignore_index=True)
            # '[ns]' rather than '[D]': older pandas stored day resolution as
            # nanoseconds anyway, and pandas 2.x rejects '[D]'.
            .assign(event_date=lambda x: x['event_date'].apply(Date.date).astype('datetime64[ns]'))
        )
    
test_with_numpy_handler().dtypes

In [None]:
def test_with_patched_numpy_handler():
    """Benchmark case: ``NumpyProtocolHandler`` with the patched dtype table.

    Applies the timestamp and date dtype overrides to the parser's table
    (in place), runs the query and assembles the batches with
    ``result_set_to_df``.

    Returns:
        The DataFrame produced by ``result_set_to_df``.
    """
    dtype_overrides = {
        cassandra.cqltypes.DateType: np.dtype('datetime64[ms]'),
        cassandra.cqltypes.TimestampType: np.dtype('datetime64[ms]'),
        cassandra.cqltypes.SimpleDateType: np.dtype('>u4'),
    }
    numpy_parser._cqltype_to_numpy.update(dtype_overrides)

    with cluster.connect('fastsandra') as session:
        # Both settings are needed to receive dicts of NumPy arrays.
        session.row_factory = tuple_factory
        session.client_protocol_handler = NumpyProtocolHandler

        batches = session.execute(
            "SELECT * from fastsandra.time_series where event_date = '2019-10-01'"
        )
        return result_set_to_df(batches)
    
test_with_patched_numpy_handler().dtypes

In [None]:
%timeit test_default()

In [None]:
%timeit test_with_numpy_handler()

In [None]:
%timeit test_with_patched_numpy_handler()

In [None]:
%timeit df.assign(event_date=lambda x: x['event_date'].apply(Date.date).astype('datetime64[D]'))