# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [33]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
import time

## (A) Preprocessing with csv module

#### Creating list of filepaths to process original event csv data files

In [34]:
# checking the current working directory
print(os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data/'

# check to remove assicated jupyter notebook folder if notebook run more than once
all_dirs = [dirs for root, dirs, files in os.walk(filepath)]
ipynb_file_path = os.getcwd() + '/event_data/.ipynb_checkpoints'
if ['.ipynb_checkpoints'] in all_dirs:
    shutil.rmtree(ipynb_file_path)

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    # join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))

/home/workspace


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [35]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# total number of rows 
print(f'Total number of rows in all files are  {len(full_data_rows_list)}')
# the list of event data rows will look like
#print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        # remove when artist column is NA
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


Total number of rows in all files are  8056


In [36]:
# check the number of rows in newly created 'event_datafile_new.csv' file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(f'Number of rows in newly created "event_datafile_new.csv" file: {sum(1 for line in f) -1}') # -1 for header line

Number of rows in newly created "event_datafile_new.csv" file: 6820


## (B) Preprocessing with pandas for analysis & testing purposes

In [37]:
def process_data(filepath):
    """iterate over all files in a path
    
    Parameters:
    ----------- 
    filepath: path of the directory containing .csv files
    
    Returns:
    --------
    all_files: list of all files matching the extension .csv 
               from the directory
    
    """
    
    # get all files matching extension from directory 
    all_files = []
    for root, dirs, files in os.walk(filepath):
        files = glob.glob(os.path.join(root,'*.csv'))
        for f in files :
            all_files.append(os.path.abspath(f))
    
    # get total number of files found
    num_files = len(all_files)
    print('{} files found in {}'.format(num_files, filepath))  
    return all_files

In [38]:
# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data/'

file_names = process_data(filepath)

df = pd.DataFrame()
for each in file_names:
    df = df.append(pd.read_csv(each, index_col=False), ignore_index=True, sort=False)


30 files found in /home/workspace/event_data/


In [39]:
#check file structure
df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userId
0,Barry Tuckwell/Academy of St Martin-in-the-Fie...,Logged In,Mohammad,M,0,Rodriguez,277.15873,paid,"Sacramento--Roseville--Arden-Arcade, CA",PUT,NextSong,1540510000000.0,961,Horn Concerto No. 4 in E flat K495: II. Romanc...,200,1543280000000.0,88.0
1,Jimi Hendrix,Logged In,Mohammad,M,1,Rodriguez,239.82975,paid,"Sacramento--Roseville--Arden-Arcade, CA",PUT,NextSong,1540510000000.0,961,Woodstock Inprovisation,200,1543280000000.0,88.0
2,Building 429,Logged In,Mohammad,M,2,Rodriguez,300.61669,paid,"Sacramento--Roseville--Arden-Arcade, CA",PUT,NextSong,1540510000000.0,961,Majesty (LP Version),200,1543280000000.0,88.0
3,The B-52's,Logged In,Gianna,F,0,Jones,321.54077,free,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540870000000.0,107,Love Shack,200,1543280000000.0,38.0
4,Die Mooskirchner,Logged In,Gianna,F,1,Jones,169.29914,free,"New York-Newark-Jersey City, NY-NJ-PA",PUT,NextSong,1540870000000.0,107,Frisch und g'sund,200,1543280000000.0,38.0


In [40]:
# drop columns having "NaN" in artist column
df_modified = df[df['artist'].notna()] 

# pick required columns
df_required = df_modified.iloc[:, [0, 2, 3, 4, 5, 6, 7, 8, 12, 13, 16]]

#show the required DataFrame to be saved into file
df_required.head()

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Barry Tuckwell/Academy of St Martin-in-the-Fie...,Mohammad,M,0,Rodriguez,277.15873,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Horn Concerto No. 4 in E flat K495: II. Romanc...,88.0
1,Jimi Hendrix,Mohammad,M,1,Rodriguez,239.82975,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Woodstock Inprovisation,88.0
2,Building 429,Mohammad,M,2,Rodriguez,300.61669,paid,"Sacramento--Roseville--Arden-Arcade, CA",961,Majesty (LP Version),88.0
3,The B-52's,Gianna,F,0,Jones,321.54077,free,"New York-Newark-Jersey City, NY-NJ-PA",107,Love Shack,38.0
4,Die Mooskirchner,Gianna,F,1,Jones,169.29914,free,"New York-Newark-Jersey City, NY-NJ-PA",107,Frisch und g'sund,38.0


### Now, Sanity check for pandas and CSV reader style

In [41]:
#---comparson between the .csv files with csv module and pandas 
#---for double check 

#save the file from csv module into "df_cross_check" Dataframe
df_cross_check  = pd.read_csv("event_datafile_new.csv")

# save the df_required DataFrame from pandas into .csv file
out_file_path = 'event_datafile_new_pandas.csv'
df_required.to_csv(out_file_path, index=False)

#test the lengths of each DataFrame
print(len(df_cross_check.index))
print(len(df_required.index))

6820
6820


-----------

# Part II. Apache Cassandra Usage 

### Now we have <font color=red>event_datafile_new.csv</font>  and corresponding DataFrame <font color=red> df_cross_check </font>  which contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of the denormalized data 

<img src="images/image_event_datafile_new.jpg">

### Lets assume, we need the following insights.  

#### 1. The artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


#### 2. Name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

#### 3. Every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'  

For this, we need to model our database tables in Apache cassandra database based on the queries. 

### 1. Creating  cluster (making a connection to a cassandra instance)

In [42]:
from cassandra.cluster import Cluster

# make a cluster object
cluster = Cluster() 

# esteblish a connection with local machine to create session for executing queries 
session = cluster.connect() 

### 2. Create Keyspace

important : always keep keyspaces names in lower. The cassandra system stores the keyspace names in lower case.

In [43]:
try:
    session.execute("""
    
    CREATE KEYSPACE IF NOT EXISTS project_nosql
    WITH REPLICATION = {'class': 'SimpleStrategy',
                        'replication_factor' : 1}
    """)
except Exception as e:
    print(e)
    

### 3. Set KEYSPACE to the keyspace specified above

In [44]:
try:
    session.set_keyspace('project_nosql')
except Exception as e:
    print(e)

 --------------------

## Query 1  
### The artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4   

#### Primary Key determination:  
#### Analysing our data for uniqueness for session_id and iteminsession

First we check the uniqueness of data for columns so to make it as a primary key or not. For this, we use pandas dataframe.

In [45]:
df_ = df_cross_check[['sessionId', 'itemInSession']] #

In [46]:
df__ = df_.drop_duplicates(subset=['sessionId', 'itemInSession'])
#df_.sort_values(['sessionId', 'itemInSession'])

In [47]:
print(len(df__.index))
print(len(df_cross_check.index))

6820
6820


#### Since the above combination of  {'sessionId', 'itemInSession'}  has no any duplicates. So we can take this combination as a composite key. We take session_id as partition key and itemInSession as clustering column


#### Create Table

In [48]:
query_1 =  """ CREATE TABLE IF NOT EXISTS tbl_music_info_4_session_item
             ( 
             session_id     int, 
             item_insession int, 
             artist         text, 
             title          text, 
             song_length    decimal,
             PRIMARY KEY    (session_id, item_insession))
         """

try:
    session.execute(query_1)
except Exception as e:
    print(e)

In [49]:
start = time.process_time()

file = 'event_datafile_new.csv'
with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO tbl_music_info_4_session_item (session_id, item_insession, artist, title, song_length)"
        query = query + "VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))
        
print(time.process_time() - start)

3.3635754529999993


#### comparing time with insertion with pandas 

In [23]:
start = time.process_time()

for index, row in df_cross_check.iterrows():
    query = "INSERT INTO tbl_music_info_4_session_item (session_id, item_insession, artist, title, song_length)"
    query = query + "VALUES (%s, %s, %s, %s, %s)"
    session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

print(time.process_time() - start)
# row[0] or df_cross_check.iloc[index, 0] or df_cross_check.loc[index, "artist"]

5.454064913000001


##### We see for pandas , single-line data inseretion is taking more time.

#### Verification of that the data have been inserted into table "tbl_music_info_4_session_item".

In [50]:
query = """ 
SELECT 
    count(*)
FROM 
    tbl_music_info_4_session_item;
"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
print(rows.current_rows)



[Row(count=6820)]


#### *We get warning since the above defined query is a full scan, not performant and should be avoided. But E have used it since right now, we are working with not a big data file.

#### Now the required query based on selection creteria session_id = 338 and itemInSession = 4 

In [51]:
query = """ 
SELECT 
    artist, 
    title, 
    song_length
FROM 
    tbl_music_info_4_session_item
WHERE
    session_id = 338 and item_insession = 4 
"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print(row. artist, row.title, row.song_length)

Faithless Music Matters (Mark Knight Dub) 495.3073


#### Testing

In [52]:
df_cross_check.loc[(df_cross_check['sessionId'] == 338) & (df_cross_check['itemInSession'] == 4), \
                   ['artist', 'song','length']]

Unnamed: 0,artist,song,length
444,Faithless,Music Matters (Mark Knight Dub),495.3073


 ------

## Query 2 
### The name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182  



#### Primary key determination :

#### 1. Uniqueness check : (user_id and session_id) as PK

In [53]:
df_ = df_cross_check[['userId', 'sessionId']]
print(len(df_.index))
print(len(df_.drop_duplicates().index))

6820
776


Therefore, `(user_id and session_id) as PK : fail`
#### 2. Uniqueness check : (user_id, session_id, itemInSession) as PK

In [54]:
df_ = df_cross_check[['userId', 'sessionId', 'itemInSession']]
print(len(df_.index))
print(len(df_.drop_duplicates().index))

6820
6820


Therefore,  `(user_id, session_id, itemInSession) as PK : Pass`  

But here order of data is also to be cosidered. Order of data in the table (within each partition) will be by default in ascending order of clustering column "item_insession"  during storage and retrieval as well.  
Therfore, `PK = ((user_id, session_id), itemInSession)` 

**If asked in descending order:**  
we add "WITH CLUSTERING ORDER BY (item_insession DESC)"  
https://docs.datastax.com/en/cql-oss/3.3/cql/cql_using/useCompoundPrimaryKey.html  

**But we can also fine-tune the display order using the ORDER BY clause.**  
https://docs.datastax.com/en/cql-oss/3.3/cql/cql_using/useQueryColumnsSort.html  
for that the partition key must be defined in the WHERE clause and the ORDER BY clause defines the clustering column to use for ordering. (which is not required if table created with clustering column as ascending order)

#### Create table

In [55]:
query_1 =  """ CREATE TABLE IF NOT EXISTS tbl_music_info_4_user_session
             (user_id        int,
              session_id     int,
              item_insession int,
              artist         text, 
              title          text, 
              first_name     text, 
              last_name      text, 
              PRIMARY KEY    ((user_id, session_id), item_insession)
              ); -- WITH CLUSTERING ORDER BY (item_insession DESC)
         """

try:
    session.execute(query_1)
except Exception as e:
    print(e)                   

#### Data load into table

In [56]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO tbl_music_info_4_user_session ( user_id, session_id, item_insession, artist, title, first_name, last_name)"
        query = query + "VALUES (%s, %s, %s, %s, %s,  %s, %s)"
        session.execute(query, (int(line[-1]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

In [64]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO tbl_music_info_4_user_session_diff ( user_id, session_id, item_insession, artist, title, first_name, last_name)"
        query = query + "VALUES (%s, %s, %s, %s, %s,  %s, %s)"
        session.execute(query, (int(line[-1]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

#### Check for successful insertion

In [69]:
query = """ 
SELECT 
    count(*)
FROM 
    tbl_music_info_4_user_session;
"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
print(rows.current_rows)



[Row(count=6820)]


#### *We get warning since the above defined query is a full scan, not performant and should be avoided. But we have used it since right now, we are working with not a big data file.

#### Now the required query for selection

In [58]:
query = """ 
SELECT 
    item_insession, artist, title, first_name, last_name
FROM 
    tbl_music_info_4_user_session
WHERE
     user_id = 10 and session_id = 182
ORDER BY 
     item_insession ASC; -- ORDER BY clause not necessary here
"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print( row.item_insession, row.artist, row.title, row.first_name, row.last_name)

0 Down To The Bone Keep On Keepin' On Sylvie Cruz
1 Three Drives Greece 2000 Sylvie Cruz
2 Sebastien Tellier Kilometer Sylvie Cruz
3 Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio Edit) Sylvie Cruz


### Testing

In [35]:
df_cross_check.loc[(df_cross_check['userId'] == 10) & (df_cross_check['sessionId'] == 182), \
                   ['artist', 'song','firstName','lastName','itemInSession']].sort_values('itemInSession', ascending=True)

Unnamed: 0,artist,song,firstName,lastName,itemInSession
4704,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz,0
4705,Three Drives,Greece 2000,Sylvie,Cruz,1
4706,Sebastien Tellier,Kilometer,Sylvie,Cruz,2
4707,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz,3


 -------

## Query 3 
### Every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'  

#### Primary key determination :

In [36]:
df_required.loc[df_required['song'] == 'All Hands Against His Own']
#or
#array = ['All Hands Against His Own']
#df_required.loc[df_required['song'].isin(array)]

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
3286,The Black Keys,Tegan,F,25,Levine,196.91057,paid,"Portland-South Portland, ME",611,All Hands Against His Own,80.0
6012,The Black Keys,Sara,F,31,Johnson,196.91057,paid,"Winston-Salem, NC",152,All Hands Against His Own,95.0
7391,The Black Keys,Jacqueline,F,50,Lynch,196.91057,paid,"Atlanta-Sandy Springs-Roswell, GA",559,All Hands Against His Own,29.0


Since title is in where clause, so it should be taken as primary key. but if we take it as a simple primary key then we will get only the last row (7391) in above output  since other rows will be overridden. So we need to take a composite key ( title, first_name, last_name) as primary key. 

In [37]:
df_ = df_cross_check[['song', 'userId']]
print(len(df_.index))
print(len(df_.drop_duplicates().index))

6820
6618


We see that, selecting only three columns (songs, first_name, and last_name) gives 202 rows as duplicate rows. But in our scenario (to find user_id for a perticular title), we will not have effect in our final result and also in our results, we only need one copy for any redundant rows. Apache Cassandra do not support duplicate rows. So these redundant rows will automatically not taken into account while insertion. Therfore, our choice ( title, user_id) as primary key is valid.

#### Table creation

In [38]:
query_3 =  """ CREATE TABLE IF NOT EXISTS tbl_music_info_4_title
             (title         text, 
              user_id       int, 
              first_name    text,
              last_name     text,
              PRIMARY KEY   (title, user_id))
         """

try:
    session.execute(query_3)
except Exception as e:
    print(e)
                    

#### Data insertion

In [39]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO tbl_music_info_4_title (title, user_id, first_name, last_name)"
        query = query + "VALUES (%s, %s, %s, %s)"
        session.execute(query, (line[9], int(line[-1]), line[1], line[4]))

#### Verification  and testing of data insertion

In [40]:
query = """
SELECT 
    count(*)
FROM
    tbl_music_info_4_title
"""

rows =  session.execute(query)

print(rows.current_rows)

[Row(count=6618)]


#### Required query (selection)

In [41]:
query = """
SELECT 
    first_name, last_name
FROM
    tbl_music_info_4_title
WHERE
    title = 'All Hands Against His Own'
"""

rows =  session.execute(query)

for row in rows:
    print(row.first_name, row.last_name)

Jacqueline Lynch
Tegan Levine
Sara Johnson


#### Testing with pandas

In [42]:
df_required.loc[df_required['song'] == 'All Hands Against His Own', ['firstName', 'lastName']]
#or
#array = ['All Hands Against His Own']
#df_required.loc[df_required['song'].isin(array)]

Unnamed: 0,firstName,lastName
3286,Tegan,Levine
6012,Sara,Johnson
7391,Jacqueline,Lynch


## Drop the tables before closing out the sessions and delete intermediate .csv files

In [43]:
tables = ['tbl_music_info_4_session_item', 'tbl_music_info_4_user_session', 'tbl_music_info_4_title']

for each_table in tables:
    query = 'DROP TABLE ' + each_table + ';'
    print(query)
    session.execute(query)
    
os.remove("event_datafile_new.csv")
os.remove("event_datafile_new_pandas.csv")

DROP TABLE tbl_music_info_4_session_item;
DROP TABLE tbl_music_info_4_user_session;
DROP TABLE tbl_music_info_4_title;


### Close the session and cluster connection¶

In [44]:
session.shutdown()
cluster.shutdown()

-------

## For Learning purpose: Difference between composite and compound keys:The output structure

In [67]:
query = """ 
SELECT *
FROM 
    tbl_music_info_4_user_session limit 50;
"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print(row.user_id, row.session_id, row.item_insession, row.artist, row.title, row.first_name, row.last_name)

58 768 0 System of a Down Sad Statue Emily Benson
58 768 1 Ghostland Observatory Stranger Lover Emily Benson
58 768 2 Evergreen Terrace Zero Emily Benson
85 776 2 Deftones Head Up (LP Version) Kinsley Young
85 776 3 The Notorious B.I.G. Playa Hater (Amended Version) Kinsley Young
85 776 4 Orchestral Manoeuvres In The Dark Stay (The Black Rose And The Universal Wheel) Kinsley Young
85 776 5 Vangelis Roxane's Veil Kinsley Young
85 776 6 Spandau Ballet TRUE Kinsley Young
85 776 8 The Pussycat Dolls I'm Done Kinsley Young
85 776 9 The Hollies I'm Down Kinsley Young
85 776 10 Lily Allen LDN Kinsley Young
85 776 11 Fall Out Boy She's My Winona Kinsley Young
85 776 12 Metallica The Unforgiven II Kinsley Young
85 776 13 O-Zone Dragostea din tin (ma-ya-hi) Kinsley Young
85 776 14 Murs Bad Man! Kinsley Young
85 776 15 Basshunter Boten Anna [Radio edit] Kinsley Young
85 776 16 Maelo Ruiz Te Va A Doler Kinsley Young
85 776 17 Dwight Yoakam You're The One Kinsley Young
85 776 18 Fragma Everytime Yo

In [63]:
query_1 =  """ CREATE TABLE IF NOT EXISTS tbl_music_info_4_user_session_diff
             (user_id        int,
              session_id     int,
              item_insession int,
              artist         text, 
              title          text, 
              first_name     text, 
              last_name      text, 
              PRIMARY KEY    (user_id, session_id, item_insession)
              ); -- WITH CLUSTERING ORDER BY (item_insession DESC)
         """

try:
    session.execute(query_1)
except Exception as e:
    print(e)                   

In [68]:
query = """ 
SELECT *
FROM 
    tbl_music_info_4_user_session_diff limit 50;
"""
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print(row.user_id, row.session_id, row.item_insession, row.artist, row.title, row.first_name, row.last_name)

23 177 2 Dwight Yoakam You're The One Morris Gilmore
23 351 0 Foals Blue Blood Morris Gilmore
23 351 1 'N Sync/Phil Collins Trashin' The Camp (Phil And 'N Sync Version) Morris Gilmore
23 841 0 Eminem Just Lose It Morris Gilmore
53 52 2 Mynt Playa Haters Celeste Williams
53 52 3 Taylor Swift You Belong With Me Celeste Williams
53 52 4 Amy Winehouse Valerie Celeste Williams
53 52 5 Jimmy Eat World Dizzy Celeste Williams
53 215 0 Dido I'm No Angel Celeste Williams
53 215 1 Lifehouse You And Me (Wedding Version) Celeste Williams
53 215 2 Gary U.S. Bonds Angelyne Celeste Williams
53 215 3 Throw Me The Statue Noises Celeste Williams
53 311 0 Shakira Estoy AquÃÂ­ Celeste Williams
53 438 1 Barry Tuckwell/Academy of St Martin-in-the-Fields/Sir Neville Marriner Horn Concerto No. 4 in E flat K495: II. Romance (Andante cantabile) Celeste Williams
53 438 2 Gary Allan Nothing On But The Radio Celeste Williams
53 438 3 Charttraxx Karaoke Fireflies Celeste Williams
53 485 0 Coldplay A Rush Of Blood T