In [1]:
%load_ext cql



In [158]:
%%cql
DROP KEYSPACE twitter 

'No results.'

In [159]:
%%cql
CREATE KEYSPACE twitter 
WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};

'No results.'

In [160]:
%keyspace twitter

Using keyspace twitter


In [161]:
%%cql
CREATE TABLE users (
    username text PRIMARY KEY,
    password text
)

'No results.'

In [162]:
%%cql
CREATE TABLE friends (
    username text,
    friend text,
    since timestamp,
    PRIMARY KEY (username, friend)
)

'No results.'

In [163]:
%%cql
CREATE TABLE followers (
    username text,
    follower text,
    since timestamp,
    PRIMARY KEY (username, follower)
)

'No results.'

In [164]:
%%cql
CREATE TABLE tweets (
    tweet_id uuid PRIMARY KEY,
    username text,
    body text
)

'No results.'

In [165]:
%%cql
CREATE TABLE userline (
    username text,
    time timeuuid,
    tweet_id uuid,
    PRIMARY KEY (username, time)
) WITH CLUSTERING ORDER BY (time DESC)

'No results.'

In [166]:
%%cql
CREATE TABLE timeline (
    username text,
    time timeuuid,
    tweet_id uuid,
    PRIMARY KEY (username, time)
) WITH CLUSTERING ORDER BY (time DESC)

'No results.'

In [167]:
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])
session = cluster.connect('twitter')

In [168]:
import datetime
import loremipsum
import string
import time
import uuid
import random
from uuid import uuid1, UUID

In [169]:
random.seed( 103 )

In [170]:
# Oldest account is 10 years
origin = int(
    time.time() +
    datetime.timedelta(days=365.25 * 10).total_seconds() * 1e6)
now = int(time.time() * 1e6)


In [171]:
print origin, now

315577444709734 1444709734044478


In [172]:
num_users = 10
max_tweets = 100


In [173]:
# Generate number of tweets based on a Zipfian distribution
sample = [random.paretovariate(15) - 1 for x in range(max_tweets)]
normalizer = 1 / float(max(sample)) * max_tweets
num_tweets = [int(x * normalizer) for x in sample]

In [174]:
print num_tweets

[66, 9, 19, 20, 3, 1, 14, 1, 7, 35, 33, 1, 1, 3, 28, 0, 3, 1, 2, 11, 3, 8, 20, 20, 8, 24, 13, 28, 4, 7, 13, 48, 15, 9, 4, 5, 3, 75, 35, 36, 11, 19, 5, 10, 2, 8, 4, 1, 41, 1, 26, 18, 1, 10, 100, 75, 12, 23, 19, 5, 12, 14, 7, 27, 9, 21, 1, 9, 0, 16, 33, 12, 12, 15, 3, 7, 13, 16, 7, 10, 6, 12, 17, 6, 18, 30, 11, 67, 5, 41, 3, 6, 27, 0, 14, 33, 47, 17, 6, 6]


In [175]:
# Prepared statements, reuse as much as possible by binding new values
add_user_query = None
userline_query = None
timeline_query = None
get_followers_query = None
get_tweets_query = None
tweets_query = None

In [176]:
# NOTE: Having a single userline key to store all of the public tweets is not
#       scalable.  This result in all public tweets being stored in a single
#       partition, which means they must all fit on a single node.
#
#       One fix for this is to partition the timeline by time, so we could use
#       a key like !PUBLIC!2010-04-01 to partition it per day.  We could drill
#       down even further into hourly keys, etc.  Since this is a demonstration
#       and that would add quite a bit of extra code, this excercise is left to
#       the reader.
PUBLIC_USERLINE_KEY = '!PUBLIC!'

In [177]:
def get_random_string():
    return ''.join(random.sample(string.letters, 10))

In [178]:
def save_user(username, password):
    """
    Saves the user record.
    """
    global add_user_query
    if add_user_query is None:
        add_user_query = session.prepare("""
            INSERT INTO users (username, password)
            VALUES (?, ?)
            """)

    session.execute(add_user_query, (username, password))

In [179]:
def get_follower_usernames(username, count=5000):
    """
    Given a username, gets the usernames of the people following that user.
    """
    global get_followers_query
    if get_followers_query is None:
        get_followers_query = session.prepare("""
            SELECT follower FROM followers WHERE username=? LIMIT ?
            """)

    rows = session.execute(get_followers_query, (username, count))
    return [row.follower for row in rows]

In [180]:
def get_tweet(tweet_id):
    """
    Given a tweet id, this gets the entire tweet record.
    """
    global get_tweets_query
    if get_tweets_query is None:
        get_tweets_query = session.prepare("""
            SELECT * FROM tweets WHERE tweet_id=?
            """)

    results = session.execute(get_tweets_query, (tweet_id, ))
    if not results:
        raise NotFound('Tweet %s not found' % (tweet_id,))
    else:
        return results[0]

In [181]:
def _timestamp_to_uuid(time_arg):
    microseconds = int(time_arg * 1e6)
    timestamp = int(microseconds * 10) + 0x01b21dd213814000L

    time_low = timestamp & 0xffffffffL
    time_mid = (timestamp >> 32L) & 0xffffL
    time_hi_version = (timestamp >> 48L) & 0x0fffL

    rand_bits = random.getrandbits(8 + 8 + 48)
    clock_seq_low = rand_bits & 0xffL
    clock_seq_hi_variant = 0b10000000 | (0b00111111 & ((rand_bits & 0xff00L) >> 8))
    node = (rand_bits & 0xffffffffffff0000L) >> 16
    return UUID(
        fields=(time_low, time_mid, time_hi_version, clock_seq_hi_variant, clock_seq_low, node),
        version=1)

In [182]:
def save_tweet(tweet_id, username, tweet, timestamp=None):
    """
    Saves the tweet record.
    """

    global tweets_query
    global userline_query
    global timeline_query

    # Prepare the statements required for adding the tweet into the various timelines
    # Initialise only once, and then re-use by binding new values
    if tweets_query is None:
        tweets_query = session.prepare("""
            INSERT INTO tweets (tweet_id, username, body)
            VALUES (?, ?, ?)
            """)

    if userline_query is None:
        userline_query = session.prepare("""
            INSERT INTO userline (username, time, tweet_id)
            VALUES (?, ?, ?)
            """)

    if timeline_query is None:
        timeline_query = session.prepare("""
            INSERT INTO timeline (username, time, tweet_id)
            VALUES (?, ?, ?)
            """)

    if timestamp is None:
        now = uuid1()
    else:
        now = _timestamp_to_uuid(timestamp)

    # Insert the tweet
    session.execute(tweets_query, (tweet_id, username, tweet,))
    # Insert tweet into the user's timeline
    session.execute(userline_query, (username, now, tweet_id,))
    # Insert tweet into the public timeline
    session.execute(userline_query, (PUBLIC_USERLINE_KEY, now, tweet_id,))

    # Get the user's followers, and insert the tweet into all of their streams
    futures = []
    follower_usernames = [username] + get_follower_usernames(username)
    for follower_username in follower_usernames:
        futures.append(session.execute_async(
            timeline_query, (follower_username, now, tweet_id,)))

    for future in futures:
        future.result()


In [183]:
def get_tweet_sample():
    return loremipsum.get_sentence()

In [184]:
for i in range(num_users):
    username = get_random_string()
    save_user(username, get_random_string())
    creation_date = random.randint(origin, now)

    for _ in range(num_tweets[i % max_tweets]):
        save_tweet(uuid.uuid1(), username, get_tweet_sample(), timestamp=random.randint(creation_date, now))

    print "created user"

created user
created user
created user
created user
created user
created user
created user
created user
created user
created user


In [185]:
%cql SELECT * FROM users

username,password
qCbDvVEnkj,LMGEmqlUCf
JWOGjevrqw,btFhApJSPc
CaOlFcWRur,JYQVKkPdeO
xwMInYckLg,cZEftUvCTo
xvDQguJmVi,UuHISgkOdh
aKhLsRvobE,wiSlQxkmsa
uWkCmTdgFc,VzRqATDXHW
bFixEMlTIO,mOxPbNMsHF
gDBreHTNmM,lzPMZSbOiu
sSfgFLUDQw,iyBOoAClva


In [71]:
#https://github.com/twissandra/twissandra/blob/master/cass.py

SyntaxError: invalid syntax (<ipython-input-71-04e20bc5fcee>, line 1)

In [187]:
%cql SELECT * FROM tweets LIMIT 10

tweet_id,body,username
11e2b65a-7161-11e5-bca0-080027e4c06a,Risus dolor ullamcorper maecenas blandit.,gDBreHTNmM
11f4c6ce-7161-11e5-bca0-080027e4c06a,Neque justo aenean fringilla metus imperdiet cursus.,gDBreHTNmM
11dcfbac-7161-11e5-bca0-080027e4c06a,Class fames ultrices imperdiet at ullamcorper ut dictum bibendum ullamcorper accumsan quam feugiat.,gDBreHTNmM
11cdc01a-7161-11e5-bca0-080027e4c06a,Neque magna vel cras augue mattis nisl tempus senectus porttitor neque nullam.,gDBreHTNmM
125b8b0c-7161-11e5-bca0-080027e4c06a,Lacus fames accumsan metus morbi vivamus.,bFixEMlTIO
1267b21a-7161-11e5-bca0-080027e4c06a,Risus vitae morbi sit hymenaeos.,CaOlFcWRur
127cfb20-7161-11e5-bca0-080027e4c06a,Donec magna pretium feugiat varius cursus.,CaOlFcWRur
11e00d60-7161-11e5-bca0-080027e4c06a,Purus porta taciti proin parturient eleifend nisi ligula elementum torquent.,gDBreHTNmM
1218d726-7161-11e5-bca0-080027e4c06a,Metus vitae arcu tincidunt hac augue convallis posuere tortor felis.,uWkCmTdgFc
1233151e-7161-11e5-bca0-080027e4c06a,Lacus fusce.,xvDQguJmVi


In [188]:
%cql SELECT * FROM userline LIMIT 10

username,time,tweet_id
qCbDvVEnkj,b9794000-0d52-1eb0-97ab-743961671b6d,1243c594-7161-11e5-bca0-080027e4c06a
!PUBLIC!,2b1f4000-89a9-1fe5-abe5-b0c2861840a6,11b34294-7161-11e5-bca0-080027e4c06a
!PUBLIC!,c9e54000-77d7-1fc6-ba71-f6a73e37faeb,11f90018-7161-11e5-bca0-080027e4c06a
!PUBLIC!,ef3d4000-b230-1fbf-85da-37e8948890d6,120683fa-7161-11e5-bca0-080027e4c06a
!PUBLIC!,9f514000-d146-1fb7-9d7e-2a7a7e305ab3,120a4bac-7161-11e5-bca0-080027e4c06a
!PUBLIC!,00594000-3efc-1fab-9932-c6dfb0ae9213,1227c722-7161-11e5-bca0-080027e4c06a
!PUBLIC!,f4394000-fe55-1fa7-b8c9-eb2bacdad06d,126fcd38-7161-11e5-bca0-080027e4c06a
!PUBLIC!,2d214000-2bf3-1fa2-a80c-4eae98224a31,11e4427c-7161-11e5-bca0-080027e4c06a
!PUBLIC!,68454000-deca-1f87-b373-941668eb8fc9,127f5c58-7161-11e5-bca0-080027e4c06a
!PUBLIC!,22314000-2c20-1f87-a60e-7bdaacfe10b1,11fadd48-7161-11e5-bca0-080027e4c06a


In [189]:
%cql SELECT * FROM timeline LIMIT 10

username,time,tweet_id
qCbDvVEnkj,b9794000-0d52-1eb0-97ab-743961671b6d,1243c594-7161-11e5-bca0-080027e4c06a
JWOGjevrqw,ff794000-e9ad-1f61-bd12-ec5c841cefc0,12409bbc-7161-11e5-bca0-080027e4c06a
JWOGjevrqw,c3294000-d4e8-1ad7-b24f-147c8c9210ac,123ef33e-7161-11e5-bca0-080027e4c06a
JWOGjevrqw,4b694000-2e36-146b-b2f3-5de971c0ef1c,123dcfd6-7161-11e5-bca0-080027e4c06a
CaOlFcWRur,f4394000-fe55-1fa7-b8c9-eb2bacdad06d,126fcd38-7161-11e5-bca0-080027e4c06a
CaOlFcWRur,68454000-deca-1f87-b373-941668eb8fc9,127f5c58-7161-11e5-bca0-080027e4c06a
CaOlFcWRur,5a894000-46e9-1ec0-8db0-87cf9f8a9bbc,12778b86-7161-11e5-bca0-080027e4c06a
CaOlFcWRur,b8094000-1c1a-1eaa-8c31-524adddcc21c,12791f3c-7161-11e5-bca0-080027e4c06a
CaOlFcWRur,3fc94000-b671-1e7b-9ab7-c39a7611155b,12610dfc-7161-11e5-bca0-080027e4c06a
CaOlFcWRur,4ca94000-d69a-1d39-9cce-80cc847b0a60,1265beb0-7161-11e5-bca0-080027e4c06a
