# *Data Modelling with Apache Cassandra :*
## *ETL Pipeline for Pre-Processing the files*

#### *Import the required Python Packages*

In [152]:
import pandas as pd
import cassandra
import os
import glob
import re
import numpy as np
import csv

#### *Get the file_path of event_data:*

In [153]:
#get the current working directory with event_data
file_path = os.getcwd() + '/event_data'


print(file_path)

/home/dharani/Udacity/DataModelingwithApacheCassandra/event_data


In [154]:
#Loop through the file_path to collect a list of files and corresponding file path
for root, dirs, files in os.walk(file_path):
    
    file_path_list = glob.glob(os.path.join(root,'*'))

#### *Processing the event_data to create a new data file to use in Apache Cassandra*

In [155]:
# Initialising empty list of rows that will be generated from each file
new_event_data = []

for f in file_path_list:
    with open(f, 'r', encoding='utf8', newline ='') as csvfile: 
        csvreader = csv.reader(csvfile)
        next(csvreader)
        for line in csvreader:
            new_event_data.append(line)


In [156]:
print(len(new_event_data))

8056


In [199]:
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                     'level','location','sessionId','song','userId'])
    for row in new_event_data:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [158]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


## *Apache Cassandra Code :*

In [160]:
import cassandra

In [161]:
from cassandra.cluster import Cluster
try: 
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as e:
    print(e)

#### *Create KeySpace*

In [162]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity
    WITH REPLICATION =
    {'class':'SimpleStrategy','replication_factor':1}
    """)
except Exception as e:
    print(e)

#### *Set to KeySpace*

In [163]:
try:
    session.set_keyspace('udacity')
except Exception as e:
    print(e)

#### *Query 1:  Give me the artist, song title and song's length in the music app history* 
#### *that was heard during sessionId = 338, itemInSession = 4*

In [164]:
# Create table for query 1
query1 = ("""
            CREATE TABLE IF NOT EXISTS song_details
            (session_id       INT
            ,item_in_session  INT
            ,artist           TEXT
            ,song_title       TEXT
            ,song_length      FLOAT
            ,PRIMARY KEY(session_id, item_in_session))
        """)
try:
    session.execute(query1)
except Exception as e:
    print(e)


In [165]:
#Inserting data into song_details
insert_query1 = (""" 
                    INSERT INTO song_details
                        (session_id
                        ,item_in_session
                        ,artist
                        ,song_title
                        ,song_length)
                VALUES (%s, %s, %s, %s, %s) 
                """)

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as csvfile:
    csvreader = csv.reader(csvfile)
    next(csvreader)
    for line in csvreader:
        session.execute(insert_query1, ( int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

In [166]:
#SELECT statement for Query1
select_query1 = ("""
                    SELECT artist
                          ,song_title
                          ,song_length
                    FROM  song_details
                    WHERE session_id      = %s
                    AND   item_in_session = %s
                 """)

try:
    result_set = session.execute(select_query1, (338, 4))
    result_data = pd.DataFrame(list(result_set))
except Exception as e:
    print(e)

print(result_data)

      artist                       song_title  song_length
0  Faithless  Music Matters (Mark Knight Dub)   495.307312


#### *Query 2: Give me only the following: name of artist, song (sorted by itemInSession) and*
#### *user (first and last name) for userid = 10, sessionid = 182*

In [167]:
# Create table for query 2
query2 = ("""
            CREATE TABLE IF NOT EXISTS song_details_user
            (user_id          INT
            ,session_id       INT
            ,item_in_session  INT
            ,artist           TEXT
            ,song_title       TEXT
            ,user_first_name  TEXT
            ,user_last_name   TEXT
            ,PRIMARY KEY(user_id, session_id, item_in_session))
        """)
try:
    session.execute(query2)
except Exception as e:
    print(e)

In [168]:
#Inserting data into song_details
insert_query2 = (""" 
                    INSERT INTO song_details_user
                        (user_id
                        ,session_id
                        ,item_in_session
                        ,artist
                        ,song_title
                        ,user_first_name
                        ,user_last_name)
                VALUES (%s, %s, %s, %s, %s, %s, %s) 
                """)

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as csvfile:
    csvreader = csv.reader(csvfile)
    next(csvreader)
    for line in csvreader:
        session.execute(insert_query2,(int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

In [187]:
#SELECT statement for Query2
select_query2 = ("""
                    SELECT artist
                          ,song_title
                          ,user_first_name
                          ,user_last_name
                    FROM  song_details_user
                    WHERE user_id         = %s
                    AND   session_id      = %s
                 """)

try:
    result_set2  = session.execute(select_query2, (10, 182))
    df = pd.DataFrame(list(result_set2), columns = ['Artist','Song','UserFirstName', 'UserLastName'])
    print(df.to_string(index=False))
except Exception as e:
    print(e)


            Artist                                               Song UserFirstName UserLastName
  Down To The Bone                                 Keep On Keepin' On        Sylvie         Cruz
      Three Drives                                        Greece 2000        Sylvie         Cruz
 Sebastien Tellier                                          Kilometer        Sylvie         Cruz
     Lonnie Gordon  Catch You Baby (Steve Pitron & Max Sanna Radio...        Sylvie         Cruz


#### *Query3: Give me every user name (first and last) in my music app history who*
#### *listened to the song 'All Hands Against His Own'*


In [190]:
# Create table for query 3
query3 = ("""
            CREATE TABLE IF NOT EXISTS user_details
            (song_title       TEXT
            ,user_id          INT
            ,user_first_name  TEXT
            ,user_last_name   TEXT
            ,PRIMARY KEY(song_title, user_id))
        """)
try:
    session.execute(query3)
except Exception as e:
    print(e)

In [191]:
#Inserting data into user_details
insert_query3 = (""" 
                INSERT INTO user_details
                        (song_title
                        ,user_id
                        ,user_first_name
                        ,user_last_name)
                VALUES (%s, %s, %s, %s) 
                """)

file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as csvfile:
    csvreader = csv.reader(csvfile)
    next(csvreader)
    for line in csvreader:
        session.execute(insert_query3, (line[9], int(line[10]), line[1], line[4]))

In [198]:
#SELECT statement for Query3
select_query3 = ("""
                    SELECT user_first_name
                          ,user_last_name
                    FROM  user_details
                    WHERE song_title      = 'All Hands Against His Own'
                 """)

try:
    result_set3 = session.execute(select_query3)
    df = pd.DataFrame(list(result_set3), columns = ['FirstName', 'LastName'])
    print(df.to_string(index=False))
except Exception as e:
    print(e)


  FirstName LastName
 Jacqueline    Lynch
      Tegan   Levine
       Sara  Johnson


### *Drop the tables before closing out the sessions*

In [150]:
session.execute("DROP TABLE IF EXISTS song_details ")
session.execute("DROP TABLE IF EXISTS song_details_user")
session.execute("DROP TABLE IF EXISTS user_details")

<cassandra.cluster.ResultSet at 0x7f4ebe9ab890>

### *Close the connection*

In [151]:
session.shutdown()
cluster.shutdown()