In [8]:
from cassandra.cluster import Cluster

cluster = Cluster(["00.00.000.000"], port=00000)
session = cluster.connect()

In [14]:
# Execution Profiles

from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import WhiteListRoundRobinPolicy, DowngradingConsistencyRetryPolicy
from cassandra.query import tuple_factory
from cassandra.query import ConsistencyLevel

profile = ExecutionProfile(
    load_balancing_policy=WhiteListRoundRobinPolicy(["00.00.000.000"]),
    retry_policy=DowngradingConsistencyRetryPolicy(),
    consistency_level=ConsistencyLevel.LOCAL_QUORUM,
    serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
    request_timeout=15,
    row_factory=tuple_factory
)

cluster = Cluster(["61.80.148.154"], port=30003, execution_profiles={EXEC_PROFILE_DEFAULT : profile})
session = cluster.connect()

In [19]:
# Prepared Statements
# Queries that are parsed by Cassandra and then saved for later use. When the driver uses a prepared statement,
# it only needs to send the values of parameters to bind. This lowers network traffic and CPU utilization
# within Cassandra because Cassandra does not have to re-parse the query each time.

user_lookup_stmt = session.prepare("SELECT * FROM users WHERE user_id=?")

users = []
for user_id in user_ids_to_query:
    user = session.execute(user_lookup_stmt, [user_id])
    users.append(user)

InvalidRequest: Error from server: code=2200 [Invalid query] message="No keyspace has been specified. USE a keyspace, or explicitly specify keyspace.tablename"

In [4]:
import pickle

with open("user_similarity.pkl", "rb") as f:
    data = pickle.load(f)

In [24]:
# Execution Profiles

from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import WhiteListRoundRobinPolicy, DowngradingConsistencyRetryPolicy
from cassandra.query import tuple_factory
from cassandra.query import ConsistencyLevel

profile = ExecutionProfile(
    load_balancing_policy=WhiteListRoundRobinPolicy(["00.00.000.000"]),
    retry_policy=DowngradingConsistencyRetryPolicy(),
    consistency_level=ConsistencyLevel.LOCAL_QUORUM,
    serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
    request_timeout=15,
    row_factory=tuple_factory
)

cluster = Cluster(["00.00.000.000"], port=00000, execution_profiles={EXEC_PROFILE_DEFAULT : profile})
session = cluster.connect()

In [14]:
# INSERT image similarity data into cassandra
for user in data.keys():
    user_similarity_stmt = session.prepare(
    f"INSERT INTO user_profile_test.similarity_test (user_id, similarity, target_id) VALUES ({int(user)}, ?, ?)"
    )
    for user_similarity, target_id in data[user]:
        session.execute(user_similarity_stmt, [user_similarity, int(target_id)])

In [28]:
# Create user_rating data 
import random
from collections import defaultdict

user_rating = defaultdict(list)

for i in range(600):
    user_id = str(random.randrange(0, 1001))
    while user_id in user_rating:
        user_id = str(random.randrange(0, 1001))

    for i in range(random.randrange(1, 20)):
        target_id = str(random.randrange(0, 1001))
        while target_id in user_rating[user_id]:
            target_id = str(random.randrange(0, 1001))
        
        random_rating = random.randrange(1, 11)
        user_rating[user_id].append([target_id, random_rating])

import pickle
with open("user_rating.pkl", "wb") as f:
    pickle.dump(user_rating, f)

In [40]:
# INSERT user_rating datat into cassandra
for user_id in user_rating.keys():
    user_rating_stmt = session.prepare(
        f"INSERT INTO user_profile_test.rating_test (user_id, rating, target_id) VALUES ({int(user_id)}, ?, ?)"
    )
    for target_id, rating in user_rating[user_id]:
        session.execute(user_rating_stmt, [-rating, int(target_id)])

In [46]:
# Load user_rating data from cassandra
rating_query = session.prepare("SELECT * FROM user_profile_test.rating_test WHERE user_id=?")

user_id = 16
user_favored_list = session.execute(rating_query, [int(user_id)])
for user_id, rating, target_id in user_favored_list:
    print(user_id, rating, target_id)

16 -10.0 37
16 -9.0 423
16 -8.0 100
16 -6.0 515
16 -5.0 578
16 -4.0 634
16 -3.0 757
16 -1.0 883


In [52]:
rating_query = session.prepare("SELECT * FROM user_profile_test.rating_test WHERE user_id=?")

user_id = 16
user_favored_list = [target_id for _, _, target_id in session.execute(rating_query, [int(user_id)])]

similarity_query = "SELECT * FROM user_profile_test.similarity_test WHERE user_id=%s"
similarity_query_futures = [session.execute_async(similarity_query, [target_id]) for target_id in user_favored_list[:min(len(user_favored_list), 5)]]

# wait for them to complete and use the results
for future in similarity_query_futures:
    rows = future.result()
    for row in rows:
        print(row)
    print("@@@@@@@@@@")

(37, 3.3306690738754696e-16, 37)
(37, 0.24209631979465485, 810)
(37, 0.25164565443992615, 715)
(37, 0.27493324875831604, 504)
(37, 0.2955370545387268, 894)
(37, 0.3080121576786041, 619)
(37, 0.3159542679786682, 32)
(37, 0.32923978567123413, 790)
(37, 0.3390137851238251, 241)
(37, 0.3401041626930237, 364)
(37, 0.3521246612071991, 157)
(37, 0.3607839047908783, 203)
(37, 0.3716958463191986, 830)
(37, 0.3772693872451782, 862)
(37, 0.3805544376373291, 785)
(37, 0.39461785554885864, 292)
(37, 0.39556601643562317, 794)
(37, 0.41371452808380127, 701)
(37, 0.4161909222602844, 23)
(37, 0.4184437096118927, 75)
(37, 0.42557260394096375, 712)
(37, 0.42821624875068665, 533)
(37, 0.42921119928359985, 975)
(37, 0.4299812316894531, 485)
(37, 0.4346989691257477, 463)
(37, 0.4360242187976837, 740)
(37, 0.4389529824256897, 778)
(37, 0.44023647904396057, 927)
(37, 0.4471352994441986, 626)
(37, 0.45344099402427673, 994)
(37, 0.4589538276195526, 223)
(37, 0.46092408895492554, 799)
(37, 0.4624967873096466, 12

In [None]:
# Non-prepared statements
session.execute(
    """
    INSERT INTO users (name, credits, user_id)
    VALUES (%s, %s, %s)
    """,
    ("John O'Reilly", 42, uuid.uuid1()),
)

session.execute(
    """
    INSERT INTO users (name, credits, user_id, username)
    VALUES (%(name)s, %(credits)s, %(user_id)s, %(name)s)
    """,
    {'name': "John O'Reilly", 'credits': 42, 'user_id': uuid.uuid1()}
)

In [None]:
# Execute_async()
# Instead of waiting for the query to complete and returning rows directly, this method almost immediately
# returns a ResponseFuture object. There are two ways of getting the final result from this object.

# The first, by calling result()
from cassandra import ReadTimeout

query = "SELECT * FROM users WHERE user_id=%s"
future = session.execute_async(query, [user_id])

# ... do some other work
try:
    rows = future.result()
    user = rows[0]
    print(user.name, user.age)
except ReadTimeout:
    log.exception("Query timed out")

In [None]:
# build a list of futures
futures = []
query = "SELECT * FROM users WHERE user_id=%s"
for user_id in ids_to_fetch:
    futures.append(session.execute_async(query, [user_id]))

# wait for them to complete and use the results
for future in futures:
    rows = future.result()
    print(rows[0].name)

In [None]:
# Instead of calling result(), you can attach callback and errback functions through the
# add_callback(), add_errback(), add_callbacks() methods.
def handle_success(rows):
    user = rows[0]
    try:
        process_user(user.name, user.age, user.id)
    except Exception:
        log.error("Failed to process user %s", user.id)
        # don't re-raise errors in the callback

def handle_error(exception):
    log.error("Failed to fetch user info: %s", exception)

future = session.execute_async(query)
future.add_callbacks(handle_success, handle_error)

In [20]:
# Setting a Consistency Level
# The consistency level used for a query determines how many of the replicas of the data you are
# interacting with need to respond for the query to be considered a success.
# You can specify a different default by setting the ExecutionProfile.consistency_level 
# for the execution profile with key EXEC_PROFILE_DEFAULT. 
# To specify a different consistency level per request, wrap queries in a SimpleStatement

from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement

query = SimpleStatement(
    "INSERT INTO users (name, age) VALUES (%s, %s)",
    consistency_level = ConsistencyLevel.QUORUM,
)
session.execute(query, ("John", 42))