In [1]:
import os
import glob
import csv
from cassandra.cluster import Cluster

In [2]:
# Directory holding the raw event CSV files, resolved against the current working directory.
events_file_path = os.path.join(os.getcwd(), 'event_data')

# glob returns matches in arbitrary order; sorting keeps the row order of the
# generated CSV identical from one run to the next.
events_files = sorted(glob.glob(os.path.join(events_file_path, '*')))

In [3]:
def read_events_files(files):
    """Gather the data rows of every CSV file in ``files`` into a single list.

    The first line of each file is treated as a header and discarded. Rows
    are returned as lists of strings, in the order the files are given.
    """
    collected = []
    for path in files:
        with open(path, "r", encoding="utf8", newline="") as handle:
            reader = csv.reader(handle)
            next(reader)  # drop the header line
            collected.extend(reader)
    return collected


def events_preprocessing(all_rows, output_path="event_datafile_new.csv"):
    """Write a trimmed, denormalized copy of the event rows to a new CSV file.

    Only eleven of the raw columns are kept, so the output should be smaller
    than the raw data. Every field is quoted in the output.

    Args:
        all_rows: iterable of raw event rows (sequences of strings). Fields
            are picked by position: 0, 2-8, 12, 13 and 16.
        output_path: where the CSV is written. Defaults to the file name the
            rest of the notebook reads from.

    Raises:
        IndexError: if a non-empty row with an artist has fewer than 17 fields.
    """
    column_names = [
        "artist",
        "firstName",
        "gender",
        "itemInSession",
        "lastName",
        "length",
        "level",
        "location",
        "sessionId",
        "song",
        "userId",
    ]
    # Positions in the raw row that are written under the headers above, in order.
    kept_positions = (0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16)

    csv.register_dialect(
        "myDialect", quoting=csv.QUOTE_ALL, skipinitialspace=True)

    with open(output_path, "w", encoding="utf8", newline="") as f:
        writer = csv.writer(f, dialect="myDialect")
        writer.writerow(column_names)
        for row in all_rows:
            # csv.reader yields [] for a blank line; skip those as well as
            # rows whose first field (written as "artist") is empty.
            if not row or row[0] == "":
                continue
            writer.writerow([row[position] for position in kept_positions])


# Read every raw event file into memory, then write the trimmed CSV
# ("event_datafile_new.csv") that the insert cells below load from.
all_rows = read_events_files(events_files)
events_preprocessing(all_rows)

### If you're running Cassandra locally, make sure the Cassandra service is up before continuing by running:

#### sudo service cassandra status
#### sudo service cassandra start



In [4]:
def create_keyspace(hosts=("127.0.0.1",), keyspace="sparkify"):
    """Connect to Cassandra and return a session bound to ``keyspace``.

    The keyspace is created with SimpleStrategy and a replication factor of 1
    if it does not exist yet.

    Args:
        hosts: contact points handed to ``Cluster``. Defaults to the local node.
        keyspace: name of the keyspace to create and switch to.

    Returns:
        The connected session. Failures while creating or selecting the
        keyspace are printed rather than raised, so the session is returned
        even if it ended up without a keyspace set.
    """
    cluster = Cluster(list(hosts))
    session = cluster.connect()

    # Doubled braces keep the replication map literal through str.format.
    create_statement = (
        "CREATE KEYSPACE IF NOT EXISTS {} "
        "WITH REPLICATION = "
        "{{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }}"
    ).format(keyspace)

    try:
        session.execute(create_statement)
    except Exception as e:
        print(e)

    try:
        session.set_keyspace(keyspace)
    except Exception as e:
        print(e)

    return session


# Notebook-wide session shared by all later cells; create_table() picks it up
# from the notebook globals.
session = create_keyspace()

In [5]:
def create_table(table_name: str, query: str, session=None):
    """Create ``table_name`` if it does not already exist.

    Args:
        table_name: name of the table to create.
        query: the part of the statement that follows the table name, i.e.
            the parenthesised column/primary-key definition plus any
            ``WITH ...`` options.
        session: session to run the statement on. When omitted, the
            notebook-level ``session`` global is used, as before.

    Any failure is printed rather than raised.
    """
    statement = "CREATE TABLE IF NOT EXISTS {} {};".format(table_name, query)
    try:
        # The global is looked up inside the try so that a missing session is
        # reported the same way as any other failure.
        active_session = session if session is not None else globals()["session"]
        active_session.execute(statement)
    except Exception as e:
        print(e)


def _sessions_row_values(row):
    """Default row mapping: (sessionId, itemInSession, artist, song, length)."""
    return (int(row[8]), int(row[3]), row[0], row[9], float(row[5]))


def insert_file_into_table(file_path, session, table_name, columns, row_to_values=None):
    """Insert every data row of a CSV file into ``table_name``.

    Args:
        file_path: CSV file to read; its first line is skipped as a header.
        session: object whose ``execute(query, values)`` runs the insert.
        table_name: target table.
        columns: comma-separated column list placed in the INSERT statement.
            One ``%s`` placeholder is generated per listed column.
        row_to_values: optional callable turning a CSV row (list of strings)
            into the tuple of values to bind. Defaults to the mapping used
            for the sessions table: int(row[8]), int(row[3]), row[0], row[9],
            float(row[5]).
    """
    if row_to_values is None:
        row_to_values = _sessions_row_values

    # The statement does not depend on the row, so build it once.
    placeholders = ", ".join(["%s"] * len(columns.split(",")))
    query = "INSERT INTO {}({}) VALUES ({})".format(table_name, columns, placeholders)

    # newline="" lets the csv module handle line endings inside quoted fields.
    with open(file_path, encoding="utf8", newline="") as f:
        csvreader = csv.reader(f)
        next(csvreader, None)  # skip the header; tolerate an empty file
        for row in csvreader:
            session.execute(query, row_to_values(row))

### Create sessions table and insert data 

In [6]:
# Definition of the "sessions" table, keyed by (sessionId, itemInSession).
table_name = "sessions"
create_query = (
    "(sessionId int, itemInSession int, artist_name text, song text, song_length float, "
    "PRIMARY KEY (sessionId, itemInSession))"
)

# Source CSV and the comma-separated column list matching that table.
file_path = "event_datafile_new.csv"
session_insert_query = "sessionId, itemInSession, artist_name, song, song_length"

create_table(table_name, create_query)

In [7]:
# Parameterised INSERT for the sessions table, assembled once and reused for every row.
session_insert_query = " ".join(
    [
        "INSERT INTO sessions (sessionId, itemInSession, artist_name, song, song_length)",
        "VALUES (%s, %s, %s, %s, %s)",
    ]
)

with open(file_path, encoding="utf8") as events_csv:
    event_rows = csv.reader(events_csv)
    next(event_rows)  # skip the header line
    for event in event_rows:
        # CSV positions: 8 = sessionId, 3 = itemInSession, 0 = artist,
        # 9 = song, 5 = length.
        values = (int(event[8]), int(event[3]), event[0], event[9], float(event[5]))
        session.execute(session_insert_query, values)

In [8]:
# test insertion into sessions
sessions_test_query = "select artist_name, song, song_length from sessions WHERE sessionId = 338 and itemInSession = 4"

try:
    rows = session.execute(sessions_test_query)
except Exception as e:
    print(e)
else:
    # Print only when the query succeeded; otherwise `rows` would be
    # undefined or left over from an earlier cell.
    for row in rows:
        print(row)

Row(artist_name='Faithless', song='Music Matters (Mark Knight Dub)', song_length=495.30731201171875)


### Create song_playlist table and insert data 

In [9]:

# "song_playlist" is partitioned by (userid, sessionid) and clustered by
# iteminsession in descending order.
song_playlist_table = "song_playlist"
song_playlist_create_query = (
    "(userid int, sessionid int, iteminsession int, firstname text, lastname text,  artist_name text, song text,"
    "PRIMARY KEY((userid, sessionid), iteminsession)) WITH CLUSTERING ORDER BY (iteminsession DESC)"
)

create_table(song_playlist_table, song_playlist_create_query)

In [10]:
# Parameterised INSERT for song_playlist, assembled once and reused for every row.
song_playlist_insert_query = (
    "INSERT INTO song_playlist (userid, sessionid, iteminsession, firstname, lastname,  artist_name, song)"
    " VALUES (%s, %s, %s, %s, %s,%s,%s)"
)

with open(file_path, encoding="utf8") as events_csv:
    event_rows = csv.reader(events_csv)
    next(event_rows)  # skip the header line
    for event in event_rows:
        # CSV positions: 10 = userId, 8 = sessionId, 3 = itemInSession,
        # 1 = firstName, 4 = lastName, 0 = artist, 9 = song.
        values = (
            int(event[10]),
            int(event[8]),
            int(event[3]),
            event[1],
            event[4],
            event[0],
            event[9],
        )
        session.execute(song_playlist_insert_query, values)

In [11]:
# test insertion into song_playlist
song_playlist_test_query = "select artist_name, song, firstname,lastname, iteminsession from song_playlist where userid=10 and sessionid=182 "

try:
    rows = session.execute(song_playlist_test_query)
except Exception as e:
    print(e)
else:
    # Print only when the query succeeded; otherwise `rows` would still hold
    # the result of an earlier cell (or be undefined).
    for row in rows:
        print(row)

Row(artist_name='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz', iteminsession=3)
Row(artist_name='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz', iteminsession=2)
Row(artist_name='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz', iteminsession=1)
Row(artist_name='Down To The Bone', song="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz', iteminsession=0)


### Create users_playlist table and insert data 

In [12]:
# "users_playlist" is partitioned by song and clustered by userid in descending order.
users_playlist_table = "users_playlist"
users_playlist_create_query = (
    "(song text,  userid int, firstname text, lastname text,  "
    "PRIMARY KEY (song, userid)) WITH CLUSTERING ORDER BY (userid DESC)"
)
# Comma-separated column list for the table above.
users_playlist_insert_query = "song, userid, firstname, lastname"

create_table(users_playlist_table, users_playlist_create_query)


In [13]:
# Parameterised INSERT for users_playlist, assembled once and reused for every row.
user_playlist_insert_query = (
    "INSERT INTO users_playlist (song, userid, firstname, lastname)"
    " VALUES (%s, %s, %s, %s)"
)

with open(file_path, encoding="utf8") as events_csv:
    event_rows = csv.reader(events_csv)
    next(event_rows)  # skip the header line
    for event in event_rows:
        # CSV positions: 9 = song, 10 = userId, 1 = firstName, 4 = lastName.
        values = (event[9], int(event[10]), event[1], event[4])
        session.execute(user_playlist_insert_query, values)

In [14]:
# test insertion into users_playlist
users_playlist_test_query = "select userid, firstname, lastname from users_playlist where song='All Hands Against His Own'"

try:
    rows = session.execute(users_playlist_test_query)
except Exception as e:
    print(e)
else:
    # Print only when the query succeeded; otherwise `rows` would still hold
    # the result of an earlier cell (or be undefined).
    for row in rows:
        print(row)

Row(userid=95, firstname='Sara', lastname='Johnson')
Row(userid=80, firstname='Tegan', lastname='Levine')
Row(userid=29, firstname='Jacqueline', lastname='Lynch')


### Drop Tables

In [15]:
def drop_table(table_name, session):
    """Drop ``table_name`` using ``session``; a missing table is not an error.

    ``IF EXISTS`` makes the call safe to repeat, mirroring the
    ``IF NOT EXISTS`` used when the tables are created. Any failure is
    printed rather than raised.
    """
    query = "DROP TABLE IF EXISTS {}".format(table_name)
    try:
        session.execute(query)
    except Exception as e:
        print(e)


# Remove every table this notebook created.
tables_to_drop = ["sessions", "song_playlist", "users_playlist"]
for doomed_table in tables_to_drop:
    drop_table(doomed_table, session)

In [16]:
# Closing the session alone leaves the Cluster built inside create_keyspace()
# open, and no other reference to it is kept. Shut it down through the session.
session.shutdown()
session.cluster.shutdown()