In [73]:
"""
packages: pandas, cassandra driver

"""
import sys, timeit
import pandas as pd
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.policies import RetryPolicy, TokenAwarePolicy, DCAwareRoundRobinPolicy
import uuid

"""
Each time a new connection is created and the server requires authentication, 
a new instance of this class will be created by the corresponding AuthProvider
to handler that authentication. 
"""
    
auth_provider = PlainTextAuthProvider(username='cassandra', 
                                      password='R7vCBaqL84yB')

"""
The list of contact points to try connecting for cluster discovery.
Defaults to loopback interface.

Note: When using DCAwareLoadBalancingPolicy with no explicit local_dc set (as is the default),
the DC is chosen from an arbitrary host in contact_points. In this case, contact_points should
contain only nodes from a single, local DC.

Note: In the next major version, if you specify contact points, you will also be required to 
also explicitly specify a load-balancing policy. This change will help prevent cases where 
users had hard-to-debug issues surrounding unintuitive default load-balancing policy behavior.

port = 9042
The server-side port to open connections to. Defaults to 9042.

If you install a cassandra 1 node in your local computer, then:

contact_points = ['127.0.0.1'] 

without authentication

"""

#contact_points = ['35.227.48.63', '35.227.96.7', '35.196.169.200']
contact_points = ['35.227.48.63']

"""
A load balancing policy will determine which node to run an insert or query. 
Since a client can read or write to any node, sometimes that can be inefficient. 
If a node receives a read or write owned on another node, it will coordinate that request
for the client. We can use a load balancing policy to control that action. 

The TokenAwarePolicy ensures that the request will go to the node or replica responsible for
the data indicated by the primary key. It is wrapped around DCAwareRoundRobinPolicy to 
make sure the requests stay in the local datacenter.  

"""

cluster = Cluster(
    contact_points=contact_points , auth_provider=auth_provider,
    load_balancing_policy= TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc='us-east1')),
    default_retry_policy = RetryPolicy()
    )


# connect to the cluster
session = cluster.connect()

In [74]:
"""
Creat a demo keyspace with 'SimpleStrategy' to keep same replication_factor
for all datacenters.

Replication_factor = 2

We can use 'Network Topology'that can use different replication factors for
different data center

KEYSPACE is the container for tables in Cassandra.

DURABILITY

Writes in Cassandra are durable. All writes to a replica node are recorded 
both in memory and in a commit log on disk before they are acknowledged as a success.
If a crash or server failure occurs before the memtables are flushed to disk, 
the commit log is replayed on restart to recover any lost writes. 
In addition to the local durability (data immediately written to disk), 
the replication of data on other nodes strengthens durability.

You can manage the local durability to suit your needs for consistency using
the commitlog_sync option in the cassandra.yaml file. S
et the option to either periodic or batch.
"""

KEYSPACE = 'demo'
session.execute('DROP KEYSPACE IF EXISTS %s' % KEYSPACE)
session.execute("""
        CREATE KEYSPACE IF NOT EXISTS %s
        WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' }
        AND durable_writes = true;
        """ % KEYSPACE)

<cassandra.cluster.ResultSet at 0x1118619b0>

In [75]:
from cqlengine import columns as c
from cqlengine.models import Model
from cqlengine import LeveledCompactionStrategy

# Define a model which is a Python class that represents a CQL table

"""
Create movies table with movie_id as primary key, 
release_year is integer, and title as text formats.
It returns a movide_id uuid
""" 
class movies(Model):
    movie_id = c.UUID(primary_key=True)
    release_year = c.Integer()
    title = c.Text()
    def __repr__(self):
        return '%s' % self.movie_id
    
"""
Create movies_by_actor table with actor as primary key, 
release_year is interger as clustering key with descending order,
movie_id is uuid also as clustering key with asceding order,
genres as text, rating as float, and title as text formats.
"""       
class movies_by_actor(Model):
    actor = c.Text(primary_key=True)
    release_year = c.Integer(primary_key=True, clustering_order="DESC")
    movie_id = c.UUID(primary_key=True, clustering_order="ASC")
    genres = c.Set(c.Text())
    rating = c.Float()
    title = c.Text()
    
"""
Create a movies_rating table to store rating score of each user_id for each movie
and a movies_rating table to incrementally count for number of rating for each movie
"""

class movies_rating(Model):
    user_id = c.UUID(primary_key = True)
    movie_id = c.UUID(primary_key = True)
    title = c.Text(primary_key=True, clustering_order='ASC')
    release_year = c.Integer(primary_key=True, clustering_order="DESC")
    rating_time = c.TimeUUID(primary_key=True, clustering_order='DESC')
    rating = c.Float()
    
class movies_rating_count(Model):
    movie_id = c.UUID(primary_key = True)
    title = c.Text(primary_key=True, clustering_order='ASC')
    release_year = c.Integer(primary_key=True, clustering_order="DESC")
    #rating = c.Float()
    rating_count = c.Counter()

In [76]:
"""
Connect management for cqlengine
"""

from cqlengine import connection
connection.setup(contact_points, KEYSPACE, auth_provider=auth_provider, 
                load_balancing_policy= TokenAwarePolicy(DCAwareRoundRobinPolicy(local_dc='us-east1')),
                default_retry_policy = RetryPolicy())

In [77]:
from cqlengine.management import drop_table, sync_table

# Drop the tables if exist for demo (tutorial) purpose

drop_table(movies)
drop_table(movies_by_actor)
drop_table(movies_rating)

# Sync models with cql tables so we can insert records 

sync_table(movies)
sync_table(movies_by_actor)
sync_table(movies_rating)

In [78]:
"""
connect to KEYSPACE in the cluster to create materialized view 
"""
session = cluster.connect(KEYSPACE)
session.execute('USE %s' % KEYSPACE)

<cassandra.cluster.ResultSet at 0x11167a0b8>

In [79]:
"""
In Cassandra, queries are optimized by primary key definition. 
Standard practice is to create the table for the query, 
and create a new table if a different query is needed. 
These additional tables had to be updated manually in the client application.
A materialized view automatically receives the updates from its source table.
"""
# create a materialized view (mv) named 'movies_mv'
# python driver doesnot support create materialized view using cqlengine
# so we have to use session.execute to create the query for it

m_mv = "movies_mv"
session.execute(
    """
    CREATE MATERIALIZED VIEW IF NOT EXISTS %s AS
    SELECT title, release_year, movie_id, genres, actor FROM movies_by_actor
    WHERE title IS NOT NULL AND release_year IS NOT NULL AND movie_id IS NOT NULL AND actor IS NOT NULL
    PRIMARY KEY ((title, release_year), actor, movie_id);
    """ 
    % m_mv
)

<cassandra.cluster.ResultSet at 0x11167a160>

In [80]:
# we're using a simple movie database for tutorial
df = pd.read_csv('moviedb_demo.csv')
df.head(5)

Unnamed: 0,Title,Release Year,Genre 1,Genre 2,Actor,IMDB Rating
0,The Martian,2015,science fiction,drama,Matt Damon,8.0
1,Good Will Hunting,1997,drama,indie,Matt Damon,8.3
2,Mad Max: Fury Road,2015,fantasy,drama,Charlize Theron,8.1
3,Bridge of Spies,2015,thriller,drama,Tom Hanks,7.6
4,La La Land,2016,romance,drama,Emma Stone,8.1


In [81]:
"""
The below code imports each row in the file moviedb_demo into the cassandra cluster,
it will insert records to 2 table ''movies'' and ''movies_by_actor'',
whenever a record is inserted into the movies_by_actor table,
it will automatically updated in its materialized view which is ''movies_mv'' table. 

"""
def main():
    for i in range(0,len(df)):
        try:
            getmovieID = movies.create(movie_id = uuid.uuid4(), 
                                        release_year = df.iloc[i,1], 
                                        title = df.iloc[i,0]
                                    )

            mvid = (str(getmovieID))
            movies_by_actor.create(actor = df.iloc[i,4],
                                   release_year = df.iloc[i,1], 
                                   movie_id = uuid.UUID(mvid), 
                                   genres = {df.iloc[i,2], df.iloc[i,3]}, 
                                   rating = df.iloc[i,5],
                                   title = df.iloc[i,0]
                                   )

        except Exception:
            pass
main()