In [7]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### After import necessary libraries, we reach to create list of filepaths to access into original even_data csv data file

In [8]:
#Check current working directory
print(os.getcwd())

#Get the  current directory and sub-directory event data (get the path to `event_data` folder)
filepath=os.getcwd() + '/event_data'

#Create a list of files and collect each filepath
for root,dirs, files in os.walk(filepath):
    #using glob to Join the filepath and roots with sub-directories
    file_path_list = glob.glob(os.path.join(root,'*'))
    print(file_path_list)

/Users/alexdinh
['/Users/alexdinh/event_data/2018-11-15-events.csv', '/Users/alexdinh/event_data/2018-11-22-events.csv', '/Users/alexdinh/event_data/2018-11-09-events.csv', '/Users/alexdinh/event_data/2018-11-18-events.csv', '/Users/alexdinh/event_data/2018-11-04-events.csv', '/Users/alexdinh/event_data/2018-11-01-events.csv', '/Users/alexdinh/event_data/2018-11-27-events.csv', '/Users/alexdinh/event_data/2018-11-10-events.csv', '/Users/alexdinh/event_data/2018-11-20-events.csv', '/Users/alexdinh/event_data/2018-11-17-events.csv', '/Users/alexdinh/event_data/2018-11-06-events.csv', '/Users/alexdinh/event_data/2018-11-03-events.csv', '/Users/alexdinh/event_data/2018-11-28-events.csv', '/Users/alexdinh/event_data/2018-11-12-events.csv', '/Users/alexdinh/event_data/2018-11-25-events.csv', '/Users/alexdinh/event_data/2018-11-26-events.csv', '/Users/alexdinh/event_data/2018-11-11-events.csv', '/Users/alexdinh/event_data/2018-11-14-events.csv', '/Users/alexdinh/event_data/2018-11-23-events.c

#### Processing files to create the data CSV that will be used for Apache Cassandra tables



In [9]:
#To generate each file, we create an empty list of rows
full_data_rows_list=[]

#access to each by each filepath in file_path_list created above
for f in file_path_list:
    
    #reading CSV files
    with open(f,'r',encoding ='utf8',newline='') as csvfile:
    
        #we create a csv reader object 
        csvreader=csv.reader(csvfile)
        next(csvreader)
        
        #Extract each data row one by one and append it
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line)

#show total number of rows
print(len(full_data_rows_list))

#show the list: here is the first row
print(full_data_rows_list[0])
    

8056
['Harmonia', 'Logged In', 'Ryan', 'M', '0', 'Smith', '655.77751', 'free', 'San Jose-Sunnyvale-Santa Clara, CA', 'PUT', 'NextSong', '1.54102E+12', '583', 'Sehr kosmisch', '200', '1.54224E+12', '26']


### Notice: As we can see from above, the output of full_data_rows_list is in String in default. We need to change values to be more equivalent with exact utilisation. e.g with numeric values like length to float, userId to integer etc
Datatype in recommendation:
- STRING: Artist, last name, first name, gender, song,level,Location
- INT: itemInSession, SessionId, UserId
- FLOAT: length

#### Now, create a small CSV files including several determined rows of data to insert into Apache Cassandra table

In [13]:
csv.register_dialect('myDialect',quoting=csv.QUOTE_ALL,skipinitialspace=True)

with open('event_datafile_new.csv','w',encoding='utf8',newline='') as f:
    writer =csv.writer(f,dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))



In [14]:
#check the number of the rows in our csv file
with open('event_datafile_new.csv','r',encoding='utf8') as f:
    print(sum(1 for line in f))

6821


## Complete the Apache Cassandra coding portion of project

### After we create a CSV file named Event_datafile_new.csv, with columns:

- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

### Create a cluster

In [15]:
from cassandra.cluster import Cluster
try:
    cluster=Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print(e)

### Create and connect as keyspace


In [16]:
try:
    session.execute("""
    create keyspace if not exists proj2
    with replication=
    {'class':'SimpleStrategy','replication_factor':1}
    """)
except Exception as e:
    print(e)
    
#set keyspance
try:
    session.set_keyspace('proj2')
except Exception as e:
    print(e)

## Create queries to answer questions:

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




### Create` artist_record_session` table with
- artist varchar
- song varchar
- length float
- sessionId int
- itemInSession int
- PRIMARY KEY(sessionId,itemInSession)

In [17]:
query="""
CREATE TABLE IF NOT EXISTS artist_record_session(
    artist varchar,
    song varchar,
    length float,
    sessionId int,
    itemInSession int,
    PRIMARY KEY(sessionId,itemInSession)
    )
"""
try:
    session.execute(query)
except Exception as e:
    print(e)
    

In [18]:
#Insert values from csv file to cassandra table
file='event_datafile_new.csv'

with open(file,encoding='utf8') as f:
    csvreader =csv.reader(f)
    next(csvreader) #skip header 
    
    #Assign the insert statements into `query`
    for line in csvreader:
        query="INSERT INTO artist_record_session(artist ,song ,length,sessionId,itemInSession)"
        query=query+"VALUES(%s,%s,%s,%s,%s)"
        
        #Assign which column element should be assigned for each column in insert statement
        
        session.execute(query,(line[0],line[9],float(line[5]),int(line[8]),int(line[3])))
        

### Create  `song_playlist_session` table with:
- artist varchar
- song varchar
- lastName varchar
- firstName varchar
- userId int
- sessionId int
- itemInSession int
-  PRIMARY KEY: sessionId,userId ,itemInSession with composition key `sessionId,userId`
    

#### For the creation of `song_playlist_session` for the question 2, we have notice at Sorted By `itemInSession` this could be a Composition keys here with Partition keys of `sessionId` and `userId`, and `itemInSession` is clustering column which can be used for order sorting

In [19]:
query="""
CREATE TABLE IF NOT EXISTS song_playlist_session(
    artist varchar,
    song varchar,
    lastName varchar,
    firstName varchar,
    userId int,
    sessionId int,
    itemInSession int,
    PRIMARY KEY((sessionId,userId),itemInSession)
    )
"""
try:
    session.execute(query)
except Exception as e:
    print(e)

#Insert values from csv file to cassandra table
file='event_datafile_new.csv'

with open(file,encoding='utf8') as f:
    csvreader =csv.reader(f)
    next(csvreader) #skip header 
    
    #Assign the insert statements into `query`
    for line in csvreader:
        query="INSERT INTO song_playlist_session(artist ,song ,lastName,firstName,userId,sessionId,itemInSession)"
        query=query+"VALUES(%s,%s,%s,%s,%s,%s,%s)"
        
        
        session.execute(query,(line[0],line[9],line[1],line[4],int(line[10]),int(line[8]),int(line[3])))

### Creat `user_record_session` table with:
- song varchar
- lastName varchar
- firstName varchar
- userId int
- PRIMARY KEY: song(Partition key),userId

#### We initiate the Primary key with Partition key `song` and clustering column `userId` due to prevending of overwriting in values. 

In [20]:
query="""
CREATE TABLE IF NOT EXISTS user_record_session(
    song varchar,
    lastName varchar,
    firstName varchar,
    userId int,
    PRIMARY KEY((song),userId)
    )
"""
try:
    session.execute(query)
except Exception as e:
    print(e)

#Insert values from csv file to cassandra table
file='event_datafile_new.csv'

with open(file,encoding='utf8') as f:
    csvreader =csv.reader(f)
    next(csvreader) #skip header 
    
    #Assign the insert statements into `query`
    for line in csvreader:
        query="INSERT INTO user_record_session(song ,lastName,firstName,userId)"
        query=query+"VALUES(%s,%s,%s,%s)"
        
        
        session.execute(query,(line[9],line[1],line[4],int(line[10])))

### Now step by step reaching out all the required questions from the project:
#### We will use `SELECT required columns FROM a_table WHERE condition1 AND condition2` to conduct our target

In [21]:
#q1 Give me the artist, song title and song's length in the music app history that was heard during 
#sessionId = 338, and itemInSession = 4

query="""
    SELECT artist,song,length FROM artist_record_session 
    WHERE sessionId = 338
    AND itemInSession =4
    """
try:
    rows=session.execute(query)
except Exception as e:
    print(e)

for row in rows:
    print(row.artist,row.song,row.length)

Faithless Music Matters (Mark Knight Dub) 495.30731201171875


In [22]:
#q2  Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name)\
# for userid = 10, sessionid = 182
query="""
    SELECT artist,song,firstName,lastName FROM song_playlist_session 
    WHERE userId=10
    AND sessionId=182
    """
try:
    rows=session.execute(query)
except Exception as e:
    print(e)

for row in rows:
    print(row.artist,row.song,row.firstname,row.lastname)

Down To The Bone Keep On Keepin' On Cruz Sylvie
Three Drives Greece 2000 Cruz Sylvie
Sebastien Tellier Kilometer Cruz Sylvie
Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Cruz Sylvie


In [23]:
#q3 Give me every user name (first and last) in my music app history 
#who listened to the song 'All Hands Against His Own
query="""
    SELECT firstName,lastName FROM user_record_session
    WHERE song='All Hands Against His Own'
    """
try:
    rows=session.execute(query)
except Exception as e:
    print(e)

for row in rows:
    print(row.firstname,row.lastname)

Lynch Jacqueline
Levine Tegan
Johnson Sara


In [25]:
#drop tables and shutdown cluster and session
query="""
drop TABLE artist_record_session
"""
try:
    session.execute(query)
except Exception as e:
    print(e)

query="""
drop TABLE song_playlist_session
"""
try:
    session.execute(query)
except Exception as e:
    print(e)

query="""
drop TABLE user_record_session
"""
try:
    session.execute(query)
except Exception as e:
    print(e)

Error from server: code=2200 [Invalid query] message="unconfigured table artist_record_session"
Error from server: code=2200 [Invalid query] message="unconfigured table song_playlist_session"


In [17]:
cluster.shutdown()
session.shutdown()