In [1]:
import cassandra
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement, BatchStatement

from joblib import Parallel, delayed                                                                                                                                

from tqdm import tqdm

import warnings
warnings.filterwarnings('ignore')

In [2]:
index: bool = False  # set True to create the secondary index on JOB.name when the insert query is prepared

In [3]:
# Start a local single-node Cassandra for this notebook with:
#   sudo docker run --name cassandra -p 127.0.0.1:9042:9042 -e CASSANDRA_CLUSTER_NAME=GangaTest -e CASSANDRA_ENDPOINT_SNITCH=GossipingPropertyFileSnitch -e CASSANDRA_DC=datacenter1 -d cassandra
def cassandra_connection(hosts=("127.0.0.1",), port=9042, keyspace="fakehealthcareorg"):
    """
    Connection object for Cassandra.

    Connects to the cluster, creates ``keyspace`` (SimpleStrategy, replication
    factor 1) if it does not exist, and selects it on the session.

    :param hosts: contact points for the cluster (default: the local docker container)
    :param port: native-protocol port (default 9042)
    :param keyspace: keyspace to create if missing and set on the session;
        must be a plain identifier (ASCII letters, digits, underscore)
    :return: session, cluster -- call ``cluster.shutdown()`` when finished
    :raises ValueError: if ``keyspace`` is not a plain identifier
    """
    import re

    # The keyspace name is interpolated into CQL (identifiers cannot be bound
    # as query parameters), so only accept plain identifiers.
    if not re.fullmatch(r"[A-Za-z0-9_]+", keyspace):
        raise ValueError(f"invalid keyspace name: {keyspace!r}")

    cluster = Cluster(list(hosts), port=port)
    try:
        session = cluster.connect()
        session.execute(f"""
            CREATE KEYSPACE IF NOT EXISTS {keyspace}
            WITH REPLICATION =
            {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }}
            """)
        session.set_keyspace(keyspace)
    except Exception:
        # Shut the cluster down so a failed setup doesn't leave its connections open.
        cluster.shutdown()
        raise
    return session, cluster


In [4]:
# Shared session/cluster pair used by every cell below.
(session, cluster) = cassandra_connection()

# Creating the `dev` keyspace and resetting its tables

In [5]:
# Create the single-replica `dev` keyspace that the next cell switches to.
# (The previous version contained a pasted cqlsh "... " continuation prompt
# inside the CQL string, which is not valid CQL.)
session.execute("""create keyspace IF NOT EXISTS dev
with replication = {'class':'SimpleStrategy','replication_factor':1};""")

<cassandra.cluster.ResultSet at 0x7fb6a9ac2050>

In [6]:
# Switch to `dev` and drop tables left over from earlier runs.
for cql in ("use dev;", "DROP TABLE IF EXISTS JOB", "DROP TABLE IF EXISTS BLOB"):
    session.execute(cql)

<cassandra.cluster.ResultSet at 0x7fb6a8a41890>

In [7]:
# JOB table used by all insert/select cells below. Let a failure raise here:
# every later cell depends on this table, so printing and continuing only
# moves the error somewhere harder to diagnose.
session.execute("""CREATE TABLE IF NOT EXISTS JOB(
        id INT PRIMARY KEY, status varchar, name varchar, subjobs INT, application varchar,
        backend varchar, backend_actualCE varchar, comment varchar
    )""")

# Inserting the data
Benchmark sizes (number of rows):
1. 10K
2. 100K
3. 1M (1000K)

In [8]:
# Load the pre-generated job rows and their blobs (sliced to the benchmark size later).
# NOTE(review): pickle.load can execute arbitrary code -- only load trusted files.
import pickle
with open("rows.pkl", "rb") as fh:
    jobs = pickle.load(fh)
with open("blobs.pkl", "rb") as fh:
    blobs = pickle.load(fh)

In [11]:
# Inspect the field names of the first blob record.
blobs[0].keys()

dict_keys(['jid', 'inputsandbox', 'outputsandbox', 'info', 'comment', 'time', 'application', 'backend', 'inputfiles', 'outputfiles', 'non_copyable_outputfiles', 'id', 'status', 'name', 'inputdir', 'outputdir', 'inputdata', 'outputdata', 'splitter', 'subjobs', 'master', 'postprocessors', 'virtualization', 'merger', 'do_auto_resubmit', 'metadata', 'been_queued', 'parallel_submit'])

In [12]:
# Prepared once and re-bound per row; values must follow the column order below.
query = session.prepare("""
    INSERT INTO JOB(id, status, name, subjobs, application, backend, backend_actualCE, comment)
    VALUES          (?,      ?,    ?,       ?,           ?,       ?,                ?,       ?)
    """)

if index:
    # Secondary index on JOB.name. CQL3 has no `USING CONSISTENCY` clause;
    # consistency is a per-request driver setting, so it goes on the statement.
    session.execute(SimpleStatement(
        "CREATE INDEX IF NOT EXISTS name_ind ON JOB (name);",
        consistency_level=ConsistencyLevel.ALL,
    ))


In [85]:
%%time
size = 100000
_jobs = jobs[:size]
_blobs = blobs[:size]
with tqdm(total=size) as progress:
    for i, (row, blob) in enumerate(zip(_jobs, _blobs)):
        row[0] = i+1
        blob['jid'] = i+1
        try:
            session.execute(query.bind(row))        
        except Exception as e:
            print(e)
        progress.update(1)

100%|██████████| 100000/100000 [00:52<00:00, 1906.84it/s]

CPU times: user 27.3 s, sys: 5.48 s, total: 32.8 s
Wall time: 52.4 s





In [14]:
from joblib import Parallel, delayed
# Copies of the job rows with ids renumbered 1..N (`_jobs` itself is not mutated),
# consistent with blob["jid"] = i + 1 used for the blobs.
rows = [[i] + row[1:] for i, row in enumerate(_jobs, start=1)]
def insert(data):
    """
    Execute the prepared JOB insert for a single row.

    :param data: column values in the placeholder order of ``query``
    :return: whatever ``session.execute`` returns (a driver ResultSet)
    """
    return session.execute(query.bind(data))

NameError: name '_jobs' is not defined

In [15]:
# Reload the blobs (first `size` only) and number them 1..N via "jid",
# then take matching slices of jobs and blobs for the insert cells below.
# NOTE(review): another cell loads "blobs.pkl" from the working directory;
# confirm which location is intended. pickle.load runs arbitrary code -- trusted files only.
size = 100000
with open("../../data/blobs.pkl", "rb") as fh:
    blobs = pickle.load(fh)[:size]
for jid, blob in enumerate(blobs, start=1):
    blob["jid"] = jid

_jobs = jobs[:size]
_blobs = blobs[:size]

In [28]:
# Re-key rows so ids run 1..N, matching blob["jid"] = i + 1.
rows = [[i] + row[1:] for i, row in enumerate(_jobs, start=1)]
batch_size = 250
# Off by default: the per-row insert cell already loads the table.
# Set True to time batched inserts instead.
# NOTE(review): every row has its own partition key, so each batch spans many
# partitions; Cassandra documentation discourages multi-partition batches for
# throughput -- compare against the per-row timings before relying on this.
run_batch_insert = False
n_batches = (len(rows) + batch_size - 1) // batch_size  # ceil: include a partial last batch
with tqdm(total=n_batches) as progress:
    for start in range(0, len(rows), batch_size):
        end = min(start + batch_size, len(rows))
        if run_batch_insert:
            batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
            for row in rows[start:end]:
                batch.add(query, row)
            session.execute(batch)
        progress.set_description(f"{start}-{end}")
        progress.update(1)

99750-100000: 100%|██████████| 400/400 [00:00<00:00, 1798.11it/s]


In [22]:
# Number of complete `batch_size` batches in `rows`.
len(rows) // batch_size

400

In [180]:
%%time
future = session.execute_async("SELECT * FROM JOB")
rows = future.result()

CPU times: user 11.3 ms, sys: 471 µs, total: 11.7 ms
Wall time: 80.2 ms


In [181]:
%%time
len(list(rows))

CPU times: user 482 ms, sys: 22.7 ms, total: 504 ms
Wall time: 1.37 s


100000

In [182]:
%%time
rows = session.execute("SELECT * FROM JOB")

CPU times: user 7.22 ms, sys: 6.82 ms, total: 14 ms
Wall time: 68.4 ms


In [183]:
%%time
len(list(rows))

CPU times: user 228 ms, sys: 16.6 ms, total: 245 ms
Wall time: 1.17 s


100000