#### Import Python packages 

In [1]:
# Import Python packages 
import pandas as pd
import cassandra

In [2]:
from etl import preprocess

In [3]:
# Import Queries
from sql_queries import session_item_create, user_session_create, song_user_create
from sql_queries import session_item_insert, user_session_insert, song_user_insert
from sql_queries import session_item_select, user_session_select, song_user_select
from sql_queries import drop_table_queries

In [4]:
from utils import execute_query, insert_from_df, result_as_df

### Run ETL Pipeline for Pre-Processing the Files

In [5]:
df = preprocess()
df.head()

/Users/keneudeh/Documents/Projects/udacity-data-engineer-nanodegree/0-Data-Modeling/Projects/sparkify-data-etl-cassandra
Num lines: 6821


Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
0,Harmonia,Ryan,M,0,Smith,655.77751,free,"San Jose-Sunnyvale-Santa Clara, CA",583,Sehr kosmisch,26
1,The Prodigy,Ryan,M,1,Smith,260.07465,free,"San Jose-Sunnyvale-Santa Clara, CA",583,The Big Gundown,26
2,Train,Ryan,M,2,Smith,205.45261,free,"San Jose-Sunnyvale-Santa Clara, CA",583,Marry Me,26
3,Sony Wonder,Samuel,M,0,Gonzalez,218.06975,free,"Houston-The Woodlands-Sugar Land, TX",597,Blackbird,61
4,Van Halen,Tegan,F,2,Levine,289.38404,paid,"Portland-South Portland, ME",602,Best Of Both Worlds (Remastered Album Version),80


### Set up DB

#### Create a Cluster

In [6]:
# This should make a connection to a Cassandra instance your local machine 
from cassandra.cluster import Cluster

try:
    
    cluster = Cluster(['127.0.0.1'])
    # To establish connection and begin executing queries, need a session
    session = cluster.connect()
    
except Exception as e:
    print(e)

#### Create Keyspace

In [7]:
# Create Keyspace
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS sparkifydb
    WITH REPLICATION = 
    { 'class': 'SimpleStrategy', 'replication_factor': 1 }
    """)
except Exception as e:
    print(e)

#### Set Keyspace

In [8]:
# Set Keyspace
try:
    session.set_keyspace('sparkifydb')
except Exception as e:
    print(e)

### Create Collections to answer queries

#### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

In [9]:
# CREATE collection
execute_query(session, session_item_create)

<cassandra.cluster.ResultSet at 0x119c21950>

In [10]:
# INSERT values into collection
insert_from_df(session, df, ['sessionId', 'itemInSession', 'artist', 'song', 'length'], session_item_insert)

In [11]:
# SELECT to verify that the data have been inserted into table
res = execute_query(session, session_item_select)
result_as_df(res, columns=['session_id', 'session_item_id', 'artist_name', 'song_title', 'song_length'])

Unnamed: 0,session_id,session_item_id,artist_name,song_title,song_length
0,338,4,Faithless,495.3073,Music Matters (Mark Knight Dub)


#### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

In [12]:
# CREATE collection
execute_query(session, user_session_create)

<cassandra.cluster.ResultSet at 0x119c21550>

In [13]:
# INSERT values into collection
insert_from_df(session, df, ['userId', 'sessionId', 'itemInSession', 'artist', 'firstName', 'lastName', 'song'], user_session_insert)

Do a SELECT to verify that the data have been inserted into each table

In [14]:
# SELECT to verify that the data have been inserted into table
res = execute_query(session, user_session_select)
result_as_df(res, columns=['user_id', 'session_id', 'session_item_id', 'artist_name', 'first_name', 
                           'last_name', 'song_title'])


Unnamed: 0,user_id,session_id,session_item_id,artist_name,first_name,last_name,song_title
0,10,182,0,Down To The Bone,Sylvie,Cruz,Keep On Keepin' On
1,10,182,1,Three Drives,Sylvie,Cruz,Greece 2000
2,10,182,2,Sebastien Tellier,Sylvie,Cruz,Kilometer
3,10,182,3,Lonnie Gordon,Sylvie,Cruz,Catch You Baby (Steve Pitron & Max Sanna Radio...


#### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [15]:
# CREATE collection
execute_query(session, song_user_create)            

<cassandra.cluster.ResultSet at 0x10fdd5c90>

In [16]:
# INSERT values into collection
insert_from_df(session, df, ['song', 'firstName', 'lastName'], song_user_insert)

Do a SELECT to verify that the data have been inserted into each table

In [17]:
# SELECT to verify that the data have been inserted into table
res = execute_query(session, song_user_select)
result_as_df(res, columns=['song_title', 'first_name', 'last_name'])

Unnamed: 0,song_title,first_name,last_name
0,All Hands Against His Own,Jacqueline,Lynch


### Drop the tables before closing out the sessions

In [18]:
for query in drop_table_queries:
    execute_query(session, query)

### Close the session and cluster connection¶

In [19]:
session.shutdown()
cluster.shutdown()