In [1]:
import cassandra
from cassandra.cluster import Cluster, ConnectionException

In [2]:
# Contact points handed to the driver: here a single local Cassandra node.
nodes = [
    '127.0.0.1',
]

In [3]:
# Driver entry point; the helper functions open sessions from it via cluster.connect().
cluster = Cluster(contact_points=nodes)

In [4]:
# Keyspace used by the tables and queries in this notebook.
keyspace = "employees"

In [5]:
def getClusterSession(cluster):
    """
    Open a new session on `cluster`.

    Returns the session on success. On any exception the error is printed
    and None is returned, so callers must check the result before using it.
    """
    try:
        new_session = cluster.connect()
        print('Connection established successfully.')
        return new_session
    except Exception as connect_error:
        # Best-effort: report the problem instead of raising.
        print("Error while connecting to Cassandra Cluster\n", connect_error)
        return None
        

def closeSession(session):
    """Shut down `session`; a falsy value (e.g. None from a failed connect) is ignored."""
    if not session:
        return
    session.shutdown()
    print("Cassandra connection is closed")

In [6]:
# test connectivity to cluster

# session = getClusterSession(cluster)
# closeSession(session)

In [7]:
def executeCommand(cluster, keyspace='', command='', data = ()):
    """
        A multipurpose helper that runs one CQL command in its own session.

        Opens a session with getClusterSession, optionally switches to
        `keyspace`, executes `command` (binding `data` when it is non-empty)
        and always shuts the session down afterwards.

        Parameters
        ----------
        cluster : the Cluster to open the session on.
        keyspace : keyspace to switch to first; '' (or None) skips the switch.
        command : the CQL statement, optionally with %s placeholders.
        data : values bound to the placeholders; empty means no binding.

        Returns
        -------
        Whatever session.execute returns, or None when the connection could
        not be established or the command failed (the error is printed).
    """
    session = getClusterSession(cluster)
    if session is None:
        # getClusterSession already printed the connection error; without this
        # guard the next line would fail with an AttributeError on None.
        return None

    # Pre-bind so a failing command returns None instead of raising
    # UnboundLocalError at the final `return`.
    result = None
    try:
        # Inside the try so the session is still closed if the keyspace is invalid.
        if keyspace:
            session.set_keyspace(keyspace)

        if data:
            result = session.execute(command, data)
        else:
            result = session.execute(command)

        print('Command executed successfully.' + command)

    except Exception as error:
        print("Error while executing command.." + command[:15], error)
    finally:
        closeSession(session)
    return result

In [8]:
# Create the keyspace if it is missing. replication_factor 1 fits the single
# local node listed in `nodes`; revisit it for a multi-node cluster.
command = (
    'CREATE KEYSPACE IF NOT EXISTS %s \n'
    "             WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor' : 1}\n"
    '            '
) % keyspace
res = executeCommand(cluster, command=command)
print(res)

Connection established successfully.
Command executed successfully.CREATE KEYSPACE IF NOT EXISTS employees 
             WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor' : 1}
            
Cassandra connection is closed
<cassandra.cluster.ResultSet object at 0x0000027B3D8803C8>


Perfect. Now suppose the business needs to access the data by the following two criteria:

* Fetch all employees who joined in a given year
* Fetch all employees who report to a given manager

In Apache Cassandra, tables are designed around the queries they serve, so we have to model each of these two queries explicitly.

In [9]:
# Table for query 1 ("employees by joining year"): YearJoined is the partition
# key and EmployeeID the clustering column, so one year's employees live in a
# single partition.
# TODO: query 2 ("employees by manager") needs its own table keyed on ManagerID;
# no such table is created in the cells shown here.
cmd = '''
        CREATE TABLE IF NOT EXISTS EmployeeJoinYear(
            EmployeeID INT,
            EmployeeName TEXT,
            ManagerID INT,
            YearJoined INT,
            PRIMARY KEY(YearJoined, EmployeeID)
            );'''

executeCommand(cluster, keyspace=keyspace, command=cmd)

Connection established successfully.
Command executed successfully.
        CREATE TABLE IF NOT EXISTS EmployeeJoinYear(
            EmployeeID INT,
            EmployeeName TEXT,
            ManagerID INT,
            YearJoined INT,
            PRIMARY KEY(YearJoined, EmployeeID)
            );
Cassandra connection is closed


<cassandra.cluster.ResultSet at 0x27b3d87d940>

In [10]:
# Insert the sample employees one statement at a time. Each executeCommand call
# opens and closes its own session.
cmd = ('INSERT INTO EmployeeJoinYear (EmployeeID, EmployeeName, YearJoined, ManagerID) '
       'VALUES (%s, %s, %s, %s)')

# Tuple order matches the column list in `cmd`: (EmployeeID, EmployeeName, YearJoined, ManagerID).
records = [
    (1, 'A', 2013, None),
    (2, 'B', 2013, None),
    (3, 'C', 2014, None),
    (4, 'D', 2014, None),
    (5, 'E', 2015, None),
]

divider = '-' * 35
for employee in records:
    executeCommand(cluster, keyspace, command=cmd, data=employee)
    print(divider)

Connection established successfully.
Command executed successfully.INSERT INTO EmployeeJoinYear (EmployeeID, EmployeeName, YearJoined, ManagerID) VALUES (%s, %s, %s, %s)
Cassandra connection is closed
-----------------------------------
Connection established successfully.
Command executed successfully.INSERT INTO EmployeeJoinYear (EmployeeID, EmployeeName, YearJoined, ManagerID) VALUES (%s, %s, %s, %s)
Cassandra connection is closed
-----------------------------------
Connection established successfully.
Command executed successfully.INSERT INTO EmployeeJoinYear (EmployeeID, EmployeeName, YearJoined, ManagerID) VALUES (%s, %s, %s, %s)
Cassandra connection is closed
-----------------------------------
Connection established successfully.
Command executed successfully.INSERT INTO EmployeeJoinYear (EmployeeID, EmployeeName, YearJoined, ManagerID) VALUES (%s, %s, %s, %s)
Cassandra connection is closed
-----------------------------------
Connection established successfully.
Command execute

In [None]:
# insertEmployee(cluster, 'employees', (1, 'Ali', 2013, None))
# insertEmployee(cluster, 'employees', (2, 'Khan', 2009, None))

In [12]:
# Query 1: all employees who joined in a given year (filter on the partition key).
cmd = 'SELECT * FROM EmployeeJoinYear WHERE YearJoined = %s;'

join_year = 2013
# Bound values must be a sequence, even for a single placeholder.
year = (join_year,)

res = executeCommand(cluster, keyspace, command=cmd, data=year)

for employee_row in res:
    print(employee_row)

Connection established successfully.
Command executed successfully.SELECT * FROM EmployeeJoinYear WHERE YearJoined = %s;
Cassandra connection is closed
Row(yearjoined=2013, employeeid=1, employeename='A', managerid=None)
Row(yearjoined=2013, employeeid=2, employeename='B', managerid=None)


In [13]:
# Clean up: drop the keyspace created above. Use the shared `keyspace` setting
# rather than a hard-coded name so both stay in sync. IF EXISTS makes
# re-running this cell harmless.
cmd = 'DROP KEYSPACE IF EXISTS ' + keyspace

executeCommand(cluster, command = cmd)

Connection established successfully.
Command executed successfully.DROP KEYSPACE employees
Cassandra connection is closed


<cassandra.cluster.ResultSet at 0x27b3e7def28>

In [14]:
# Shut down the Cluster object created at the top of the notebook; run this last.
cluster.shutdown()