# Part 1 
## Pre-processing the data file and get the data

In [1]:
import pandas as pd
import cassandra
import os 
import time 
import warnings
warnings.filterwarnings('ignore')

In [2]:
def get_csv_file_paths(folder_path):
    """
    Recursively retrieves the absolute paths of all csv files in the specified folder and its subfolders.
    Args:
        folder_path (str): Path to the root folder containing JSON files.
    Returns:
        list: A list of absolute file paths.
    """
    csv_file_paths = []
    for root, _, files in os.walk(folder_path):
        for filename in files:
            if filename.endswith(".csv"):
                csv_file_paths.append(os.path.join(root, filename))
    return csv_file_paths

In [3]:
# get all csv data pathes
csv_data = get_csv_file_paths('event_data/')

In [4]:
# get all coloumns we need 
columns = ['artist','firstName','gender','itemInSession','lastName','length','level','location','sessionId','song','userId']
event_data = pd.DataFrame(columns=columns) # create an empty data frame with same columns names
for i in range(len(csv_data)): # loop for each csv file
    df = pd.read_csv(csv_data[i],usecols=columns)
    df.dropna(subset = 'artist',inplace=True) # drop the row that has artist value Null
    event_data = pd.concat([event_data, df],ignore_index = True)
event_data.to_csv('event_datafile_new.csv',index=False)

In [5]:
len(event_data)

6831

#### Now we are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>
##### The **``event_datafile_new.csv``** contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/1.jpg">

# Part 2 
Now it's time to create our cassandra cluster and create sparkify keyspace
After we are interesting in to answer 3 questions **Remmber that we model the database table based on the queries**
- `Q1` Give the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4
- `Q2` Give artist name, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
- `Q3` Give every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

#### Let's create the cluster and keyspace

In [4]:
from cassandra.cluster import Cluster
try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e :
    print(e)

#### Create the keyspace Sparkify

In [5]:
try:
    session.execute("""
                       CREATE KEYSPACE IF NOT EXISTS sparkify
                       WITH REPLICATION = 
                       {'class': 'SimpleStrategy' , 'replication_factor' : 1}
    
                    """
                   )
except Exception as e :
    print(e)

In [6]:
# connect to our keyspace
try:
    session.set_keyspace('sparkify')
except Exception as e :
    print(e)

## Create the tables based on query

### Q1
`Q1` Give the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

- **SELECT** artist, song, length **FROM** session_item **WHERE** sessionId = `338` and itemInSession = `4` <br>
The primary key will be (sessionId , itemInSession) where
**sessionId** is the `partition key` and the **itemInSession** is the `clustering column`


In [7]:
query = "CREATE TABLE IF NOT EXISTS session_item"
query = query + "(sessionId int , itemInsession int ,artist text, song text, length float, PRIMARY KEY(sessionId, itemInsession))"
try:
    session.execute(query)
    print('session_item table created')
except Exception as e :
    print(e)

session_item table created


In [8]:
# read the data 
data = pd.read_csv('event_datafile_new.csv')
data.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Des'ree,Kaylee,F,1,Summers,246.30812,free,"Phoenix-Mesa-Scottsdale, AZ",139,You Gotta Be,8.0
1,Mr Oizo,Kaylee,F,3,Summers,144.03873,free,"Phoenix-Mesa-Scottsdale, AZ",139,Flat 55,8.0
2,Tamba Trio,Kaylee,F,4,Summers,177.18812,free,"Phoenix-Mesa-Scottsdale, AZ",139,Quem Quiser Encontrar O Amor,8.0
3,The Mars Volta,Kaylee,F,5,Summers,380.42077,free,"Phoenix-Mesa-Scottsdale, AZ",139,Eriatarka,8.0
4,Infected Mushroom,Kaylee,F,6,Summers,440.2673,free,"Phoenix-Mesa-Scottsdale, AZ",139,Becoming Insane,8.0


#### Insert the data `[sessionId, itemInSession, artist, song, length ]` into session_item table

In [9]:
# filter the data 
session_item = data[['sessionId', 'itemInSession', 'artist', 'song', 'length']]
query = "INSERT into session_item (sessionId, itemInsession, artist, song, length) VALUES(%s,%s,%s,%s,%s)"
start_time = time.time()
for row in session_item.iterrows():
    session.execute(query , (row[1][0],row[1][1],row[1][2],row[1][3],row[1][4]))
print("--- %s seconds ---" % (time.time() - start_time))   

--- 199.74666929244995 seconds ---


#### Let's check if the data inserted and check our query 
- `Q1` **SELECT** artist, song, length **FROM** session_item **WHERE** sessionId = `338` and itemInSession = `4` <br>

In [10]:
Q1 = 'SELECT artist, song, length FROM session_item WHERE sessionId = 338 and itemInSession = 4'
try:
    rows = session.execute(Q1)
except Exception as e :
    print(e)
for row in rows:
    print(row)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=495.30731201171875)


### Q2
`Q2` Give artist name, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

- **SELECT** artist, song, firstName, lastName **FROM** user_session **WHERE** userid = `10` and sessionid = `182` <br>
The primary key will be ((userid, sessionid),itemInSession)  where
**(userid, sessionid)** is the `partition key` and the **itemInSession** is the `clustering column`
- we are using the clause - **WITH CLUSTERING ORDER BY (itemInSession ASC)**, to sort our data based on itemInSession

In [11]:
# Create user_session table
query = "CREATE TABLE IF NOT EXISTS user_session"
query = query + "(userid int ,sessionid int, itemInsession int ,artist text, song text, firstName text, lastName text\
                    , PRIMARY KEY((userid, sessionid) , itemInsession)) WITH CLUSTERING ORDER BY (itemInsession ASC)"
try:
    session.execute(query)
    print('user_session table created')
except Exception as e :
    print(e)

user_session table created


#### Insert the data `[userid, sessionid, itemInsession, artist, song, firstName, lastName ]` into user_session table

In [12]:
# filter the data 
user_session = data[['userId', 'sessionId', 'itemInSession', 'artist', 'song', 'firstName', 'lastName']]
query = "INSERT into user_session (userid, sessionid, itemInsession, artist, song, firstName, lastName) VALUES(%s,%s,%s,%s,%s,%s,%s)"
start_time = time.time()
for row in user_session.iterrows():
    session.execute(query , (int(row[1][0]),row[1][1],row[1][2],row[1][3],row[1][4],row[1][5],row[1][6]))
print("--- %s seconds ---" % (time.time() - start_time))   

--- 206.63524079322815 seconds ---


In [13]:
Q2 = 'SELECT artist,song, firstName, lastName FROM user_session WHERE userid = 10 and sessionid = 182'
try:
    rows = session.execute(Q2)
except Exception as e :
    print(e)
for row in rows:
    print(row)

Row(artist='Down To The Bone', song="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz')
Row(artist='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')


### Q3
`Q3` Give every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

- **SELECT** firstName, lastName **FROM** user_song **WHERE** song = `'All Hands Against His Own'`<br>
The primary key will be (song, userId)  where
**song** is the `partition key` and the **userId** is the `clustering column`

In [14]:
# Create user_song table
query = "CREATE TABLE IF NOT EXISTS user_song"
query = query + "(song text ,userid int, firstName text, lastName text, PRIMARY KEY(song, userid))"
try:
    session.execute(query)
    print('user_song table created')
except Exception as e :
    print(e)

user_song table created


#### Insert the data `[song, userid, firstName, lastName ]` into user_song table

In [15]:
# filter the data 
user_song = data[['song', 'userId', 'firstName', 'lastName']]
query = "INSERT into user_song (song, userid, firstName, lastName) VALUES(%s,%s,%s,%s)"
start_time = time.time()
for row in user_song.iterrows():
    session.execute(query , (row[1][0],int(row[1][1]),row[1][2],row[1][3]))
print("--- %s seconds ---" % (time.time() - start_time))    

--- 207.61009764671326 seconds ---


In [16]:
Q3 = "SELECT song, firstName, lastName FROM user_song WHERE song = 'All Hands Against His Own'"
try:
    rows = session.execute(Q3)
except Exception as e :
    print(e)
for row in rows:
    print(row)

Row(song='All Hands Against His Own', firstname='Jacqueline', lastname='Lynch')
Row(song='All Hands Against His Own', firstname='Tegan', lastname='Levine')
Row(song='All Hands Against His Own', firstname='Sara', lastname='Johnson')


## Drop the tables before closing out the sessions

In [17]:
session.execute("DROP TABLE IF EXISTS sparkify.session_item")
session.execute("DROP TABLE IF EXISTS sparkify.user_session")
session.execute("DROP TABLE IF EXISTS sparkify.user_song")

<cassandra.cluster.ResultSet at 0x199f16af510>

## Close the session and cluster connection

In [18]:
session.shutdown()
cluster.shutdown()