In [1]:
import pandas as pd
import cassandra
import numpy as np
import re
import os
import glob
import json
import csv

In [2]:
import logging
logging.getLogger('cassandra').setLevel(logging.ERROR)

#### Creating list of filepaths to process original event csv data files

In [3]:
print(os.getcwd())

/home/omar/BigData/Data Modeling with apache cassandra/Data Modeling With Apache Cassandra


In [4]:
filepath = os.getcwd() + '/event_data'
for root, dirs, files in os.walk(filepath):
    file_path_list = glob.glob(os.path.join(root,'*'))
#     print(file_path_list)

#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [5]:
full_data_rows_list = [] 
for file in file_path_list:
    with open(file,'r',encoding='utf8',newline='') as csvfile:
        reader = csv.reader(csvfile)
        next(reader) #for skipping header
        for row in reader:
            full_data_rows_list.append(row)

In [82]:
# dd = pd.read_csv('event_datafile.csv')
# dd

In [65]:
df = pd.DataFrame(full_data_rows_list)
df.head()

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16
0,Kenny G,Logged In,Chloe,F,53,Cuevas,256.57424,paid,"San Francisco-Oakland-Hayward, CA",PUT,NextSong,1540940000000.0,648,Everlasting,200,1542410000000.0,49
1,Randy Crawford,Logged In,Chloe,F,54,Cuevas,251.402,paid,"San Francisco-Oakland-Hayward, CA",PUT,NextSong,1540940000000.0,648,Rio De Janeiro Blue (Album Version),200,1542410000000.0,49
2,Placebo,Logged In,Chloe,F,55,Cuevas,224.02567,paid,"San Francisco-Oakland-Hayward, CA",PUT,NextSong,1540940000000.0,648,Breathe Underwater,200,1542410000000.0,49
3,Poison The Well,Logged In,Chloe,F,56,Cuevas,184.60689,paid,"San Francisco-Oakland-Hayward, CA",PUT,NextSong,1540940000000.0,648,Riverside,200,1542410000000.0,49
4,Justin Bieber,Logged In,Chloe,F,57,Cuevas,196.88444,paid,"San Francisco-Oakland-Hayward, CA",PUT,NextSong,1540940000000.0,648,U Smile,200,1542410000000.0,49


In [7]:
csv.register_dialect('dialect',quoting=csv.QUOTE_ALL, skipinitialspace=True)
with open('event_datafile.csv', 'w', encoding='utf8', newline='') as file:
    writer = csv.writer(file, dialect='dialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if(row[0]==''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

In [8]:
# checking the number of rows in your csv file
with open('event_datafile.csv','r',encoding='utf8') as file:
    print(sum(1 for line in file))

6821


## Part 2 Apache Cassandra database

In [None]:
!pip install cassandra-driver

In [9]:
# This should make a connection to a Cassandra instance your local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])

# To establish connection and begin executing queries, need a session
session = cluster.connect()

In [10]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkify 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"""
)

except Exception as e:
    print(e)

In [11]:
try:
    session.set_keyspace('sparkify')
except Exception as e:
    print(e)

## First Query

In [12]:
query = """CREATE TABLE IF NOT EXISTS music_lib_by_session
           (session_id int,item_in_session int, artist text,song text, length float , PRIMARY KEY (session_id,item_in_session))"""
try:
    session.execute(query)
except Exception as e:
    print(e)

In [13]:
file = "event_datafile.csv"
with open (file, 'r', encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # for skipping header
    
    for line in csvreader:
        query = """INSERT INTO music_lib_by_session (session_id, item_in_session, artist, song, length)
        VALUES (%s, %s, %s, %s, %s)"""
        session.execute(query,(int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

### Doing a SELECT to verify that the data have been inserted into each table

In [73]:
query = "SELECT * FROM sparkify.music_lib_by_session WHERE session_id = 648 AND item_in_session = 55;"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [71]:
for row in rows:
    print('Artist: {0}\nsessionId: {1}\nItemInSession: {2}\nLength: {3}\nSong: {4}\n'\
          .format(row.artist,row.session_id,row.item_in_session,row.length,row.song))

Artist: Placebo
sessionId: 648
ItemInSession: 55
Length: 224.02566528320312
Song: Breathe Underwater



In [74]:
q1 = pd.DataFrame(data = rows,columns=['Artist','sessionId','ItemInSession','Length','Song'])
q1

Unnamed: 0,Artist,sessionId,ItemInSession,Length,Song
0,648,55,Placebo,224.025665,Breathe Underwater


### Second Query


In [52]:
try:
    session.execute("""
                    CREATE TABLE IF NOT EXISTS music_lib_by_user (
                    user_id INT,
                    session_id INT,
                    item_in_session INT,
                    artist TEXT,
                    song TEXT,
                    first_name TEXT,
                    last_name TEXT,
                    PRIMARY KEY ((user_id,session_id),item_in_session))
    """)
except Exception as e:
    print(e)

In [76]:
file = "event_datafile.csv"
with open(file, 'r',encoding='utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # for skipping header
    
    for line in csvreader:
        query = """
        INSERT INTO music_lib_by_user (user_id, session_id, item_in_session, artist, song, first_name, last_name)
        VALUES (%s,%s,%s,%s,%s,%s,%s)
        """
        session.execute(query,(int(line[10]),int(line[8]),int(line[3]),line[0],line[9],line[1],line[4]))

In [77]:
query = "SELECT artist,song,first_name,last_name FROM music_lib_by_user where user_id = 10 and session_id = 182"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)

In [63]:
for row in rows:
    print('Artist: {0}\nsong: {1}\nfirst_name: {2}\nlast_name:'\
          .format(row.artist,row.song,row.first_name,row.last_name))

Artist: Down To The Bone
song: Keep On Keepin' On
first_name: Sylvie
last_name:
Artist: Three Drives
song: Greece 2000
first_name: Sylvie
last_name:
Artist: Sebastien Tellier
song: Kilometer
first_name: Sylvie
last_name:
Artist: Lonnie Gordon
song: Catch You Baby (Steve Pitron & Max Sanna Radio Edit)
first_name: Sylvie
last_name:


In [78]:
q2 = pd.DataFrame(data = rows,columns=['Artist','Song','first_name','last_name'])
q2

Unnamed: 0,Artist,Song,first_name,last_name
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


## Third Query

In [83]:
try:
    session.execute("""
        CREATE TABLE IF NOT EXISTS user_info_by_song(
        song text,
        user_id int,
        first_name text,
        last_name text,
        PRIMARY KEY (song,user_id))
    """)
except Exception as e:
    print(e)

In [104]:
file = 'event_datafile.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    
    for line in csvreader:
        query = "INSERT INTO user_info_by_song (song, user_id, first_name, last_name)"
        query = query + " VALUES (%s, %s, %s, %s)"
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

In [108]:
rows = session.execute("SELECT song, first_name,last_name FROM user_info_by_song where song='All Hands Against His Own'")

In [106]:
for row in rows:
    print(row.first_name,row.last_name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


In [109]:
q3 = pd.DataFrame(data = rows,columns=['song','first_name','last_name'])
q3

Unnamed: 0,song,first_name,last_name
0,All Hands Against His Own,Jacqueline,Lynch
1,All Hands Against His Own,Tegan,Levine
2,All Hands Against His Own,Sara,Johnson
