# Data Modeling with Cassandra

#### Data Engineering NanoDegree Project 2

A startup called Sparkify has been collecting data, and its analysis team wants to understand what songs users are listening to. The project ingests the data from all the CSV files into a NoSQL database using Apache Cassandra. The engineer's task is to build a complete ETL pipeline in Python. Each table is modeled on a specific question, so that the queries run consistently and the process is repeatable.

## Environment

The project is organized and run in a [Jupyter Notebook](https://jupyter.org/) environment.

## Libraries

```python
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
```

## Package Contents

This package contains two subfolders, event_data and images. The event_data folder holds the event logs, which are processed into a final data file that is used to populate the Apache Cassandra tables. The Project_1B_Project_Template.ipynb notebook is the test notebook for the project.

The apache_cassandra_etl.ipynb notebook contains the steps that structure and run the ETL process.

## Instructions

1. Import Python Packages
2. Create a list of file paths to the original event CSV data files
3. Process the files to create the CSV data file that will be used for the Apache Cassandra tables
4. Create a Cluster
5. Create Keyspace
6. Set Keyspace
7. Create Tables for Each Query
8. Insert Data into Tables
9. QA Queries for Each Table
10. Drop Tables
11. Close Connection

# Part I. ETL Pipeline for Pre-Processing the Files

In [None]:
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

In [None]:
filepath = os.getcwd() + '/event_data'
file_path_list = []
# walk every directory under event_data and collect its CSV files
for root, dirs, files in os.walk(filepath):
    file_path_list.extend(glob.glob(os.path.join(root, '*.csv')))
#print(file_path_list)

In [None]:
full_data_rows_list = [] 
 
for f in file_path_list: 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        csvreader = csv.reader(csvfile) 
        next(csvreader)     
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# uncomment the code below if you would like to get total number of rows 
#print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        # skip rows that have no artist value
        if row[0] == '':
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))

In [None]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

# Part II. Complete the Apache Cassandra coding portion of your project 

#### Now that the event_datafile_new is created, we can create table structures based on specific queries.

Below is the <font color=red>**event_datafile_new.csv**</font> structure for reference.
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId
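
The insert cells later in the notebook index each row by position (for example `line[8]` for sessionId). As a hedged alternative, the sketch below reads the same layout by column name with `csv.DictReader`. The sample row and the `read_events` helper are hypothetical and only illustrate the idea; the header matches the one written to event_datafile_new.csv.

```python
import csv
import io

# Hypothetical sample in the layout of event_datafile_new.csv (values are made up).
sample = io.StringIO(
    '"artist","firstName","gender","itemInSession","lastName","length",'
    '"level","location","sessionId","song","userId"\n'
    '"Example Artist","Test","F","4","User","210.5",'
    '"free","Example City","338","Example Song","7"\n'
)

def read_events(handle):
    """Yield one dict per CSV row, with the numeric columns converted."""
    for row in csv.DictReader(handle):
        yield {
            'session_id': int(row['sessionId']),
            'item_in_session': int(row['itemInSession']),
            'user_id': int(row['userId']),
            'artist': row['artist'],
            'song': row['song'],
            'length': float(row['length']),
            'first_name': row['firstName'],
            'last_name': row['lastName'],
        }

events = list(read_events(sample))
print(events[0]['session_id'], events[0]['item_in_session'])
```

Reading by name keeps the insert code readable and makes it less sensitive to a change in column order.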

The image below is a screenshot of how the denormalized data should look in the <font color=red>**event_datafile_new.csv**</font> file after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

## Create Cluster and Keyspace for Apache Cassandra

#### Create Cluster

In [None]:
from cassandra.cluster import Cluster
try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
    print('Cluster Created: ' + str(cluster))
    print('Session Created: ' + str(session))
except Exception as e:
    print(e)

#### Create Keyspace

In [None]:
try: 
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS udacity
    WITH REPLICATION = 
    {'class' : 'SimpleStrategy', 'replication_factor': 1}""")
    print('Keyspace Created: udacity')
except Exception as e:
    print(e)

#### Set Keyspace

In [None]:
try:
    session.set_keyspace('udacity')
    print('Keyspace Set: udacity')
except Exception as e:
    print(e)

## Create queries to ask the following three questions of the data

#### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


#### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

#### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

### Query 1

##### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

Create Table

The query returns the artist name, song title and song's length for a specific session id and item in session. The primary key is (session_id, item_in_session): session_id is the partition key and item_in_session is the clustering column, so together they uniquely identify each row and match the query's WHERE clause.

In [None]:
query = "CREATE TABLE IF NOT EXISTS song_by_session"
query = query + "(session_id int, item_in_session int, artist text, song text, length decimal, PRIMARY KEY(session_id, item_in_session))"
try:
    session.execute(query)
    print('Table Created: song_by_session')
except Exception as e:
    print(e)

Insert Data Into Table

In [None]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO song_by_session(session_id, item_in_session, artist, song, length)"
        query = query + "VALUES(%s, %s, %s, %s, %s)"
        try:
            session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))
        except Exception as e:
            print(e)
    print('Values Inserted Into Table: song_by_session')
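
The loop above sends the full INSERT text with every row. One possible refinement, sketched here under assumptions, is to prepare the statement once with the driver's `session.prepare()` and bind values per row; prepared statements use `?` placeholders instead of `%s`. The `insert_song_by_session` helper, the `RecordingSession` stand-in, and the sample row are hypothetical, and the stand-in exists only so the sketch runs without a cluster.

```python
from decimal import Decimal

INSERT_SONG_BY_SESSION = (
    "INSERT INTO song_by_session (session_id, item_in_session, artist, song, length) "
    "VALUES (?, ?, ?, ?, ?)"
)

def insert_song_by_session(session, lines):
    """Prepare the INSERT once, then bind one CSV row per execute call."""
    prepared = session.prepare(INSERT_SONG_BY_SESSION)
    count = 0
    for line in lines:
        # same column positions as the notebook cell; length is bound as a Decimal
        params = (int(line[8]), int(line[3]), line[0], line[9], Decimal(line[5]))
        session.execute(prepared, params)
        count += 1
    return count

class RecordingSession:
    """Hypothetical stand-in for a driver session; it only records calls."""
    def __init__(self):
        self.calls = []

    def prepare(self, query):
        return query

    def execute(self, statement, params=None):
        self.calls.append((statement, params))

# Made-up row in the column order of event_datafile_new.csv.
sample_line = ['Example Artist', 'Test', 'F', '4', 'User', '210.5',
               'free', 'Example City', '338', 'Example Song', '7']

demo = RecordingSession()
inserted = insert_song_by_session(demo, [sample_line])
print(inserted, demo.calls[0][1])
```

With a live cluster, the `session` object created earlier in the notebook would be passed in place of the stand-in.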

Validate Query & View Results in DataFrame

In [None]:
session_details_list = []
session_details_columns = ['artist', 'song', 'length']

query = """SELECT artist, song, length
           FROM song_by_session
           WHERE session_id = 338
           AND item_in_session = 4
        """
try:
    rows = session.execute(query)
    # collect rows only when the query succeeded
    for row in rows:
        session_details_list.append((row.artist, row.song, row.length))
except Exception as e:
    print(e)

session_df = pd.DataFrame(session_details_list, columns=session_details_columns)
session_df.head()

### Query 2

##### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

Create Table

The query returns the artist name, song title, and the user's first and last name, sorted by the item in the session. The composite partition key is (user_id, session_id), which matches the query's WHERE clause, and item_in_session is the clustering column, so rows within each partition are stored and returned in item_in_session order.

In [None]:
query = "CREATE TABLE IF NOT EXISTS song_details_by_user "
query = query + "(user_id int, session_id int, artist text, item_in_session int, song text, first_name text, last_name text, PRIMARY KEY((user_id, session_id), item_in_session))"
try:
    session.execute(query)
    print('Table Created: song_details_by_user')
except Exception as e:
    print(e)
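
To illustrate how the key in this table shapes the result, here is a minimal pure-Python model, not Cassandra itself: rows are grouped by the partition key (user_id, session_id) and read back ordered by the clustering column item_in_session. The events and the `select_songs` helper are hypothetical.

```python
from collections import defaultdict

# Hypothetical play events: (user_id, session_id, item_in_session, song)
events = [
    (10, 182, 2, 'Song C'),
    (10, 182, 0, 'Song A'),
    (10, 182, 1, 'Song B'),
    (11, 182, 0, 'Song D'),
]

# One "partition" per (user_id, session_id); rows inside are keyed by item_in_session.
partitions = defaultdict(dict)
for user_id, session_id, item_in_session, song in events:
    partitions[(user_id, session_id)][item_in_session] = song

def select_songs(user_id, session_id):
    """Return the songs of one partition in clustering (item_in_session) order."""
    rows = partitions.get((user_id, session_id), {})
    return [rows[item] for item in sorted(rows)]

print(select_songs(10, 182))
```

The insertion order of the events does not matter; the ordering comes from the clustering column, which is why the SELECT for this table needs no ORDER BY clause.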

Insert Data Into Table

In [None]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO song_details_by_user(user_id, session_id, item_in_session, artist, song, first_name, last_name)"
        query = query + "VALUES(%s, %s, %s, %s, %s, %s, %s)"
        try:
            session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))
        except Exception as e:
            print(e)
    print('Values Inserted Into Table: song_details_by_user')

Validate Query & View Results in DataFrame

In [None]:
song_details_list = []
song_details_columns = ['artist', 'song', 'first_name', 'last_name']

query = """SELECT artist, song, first_name, last_name
           FROM song_details_by_user
           WHERE user_id = 10
           AND session_id = 182
        """
try:
    rows = session.execute(query)
    # collect rows only when the query succeeded
    for row in rows:
        song_details_list.append((row.artist, row.song, row.first_name, row.last_name))
except Exception as e:
    print(e)

song_df = pd.DataFrame(song_details_list, columns=song_details_columns)
song_df.head()

### Query 3

##### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

Create Table

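The query returns the first and last name of every user who listened to a specific song. The primary key is (song, user_id): song is the partition key that the query filters on, and user_id is the clustering column that keeps one row per user for each song, so a user who played the song several times appears once in the result.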

In [None]:
query = "CREATE TABLE IF NOT EXISTS user_by_song "
query = query + "(song text, user_id int, first_name text, last_name text, PRIMARY KEY(song, user_id))"
try:
    session.execute(query)
    print('Table Created: user_by_song')
except Exception as e:
    print(e)
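
Because Cassandra treats an INSERT on an existing primary key as an overwrite, the (song, user_id) key also removes duplicate listens. The sketch below models that behavior with a plain dictionary; it is an illustration under made-up data, not the driver's API, and `users_for` is a hypothetical helper.

```python
# Hypothetical listens: (song, user_id, first_name, last_name)
listens = [
    ('Example Song', 9, 'Second', 'User'),
    ('Example Song', 7, 'First', 'User'),
    ('Example Song', 7, 'First', 'User'),   # the same user plays the song again
    ('Other Song', 7, 'First', 'User'),
]

# Writing to an existing (song, user_id) key replaces the row instead of adding one.
table = {}
for song, user_id, first_name, last_name in listens:
    table[(song, user_id)] = (first_name, last_name)

def users_for(song):
    """Return (first_name, last_name) for one song, ordered by user_id."""
    matches = sorted((key[1], names) for key, names in table.items() if key[0] == song)
    return [names for _, names in matches]

print(users_for('Example Song'))
```

Four listens produce three rows, and the song 'Example Song' returns each of its two listeners once.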

Insert Data Into Table

In [None]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader)
    for line in csvreader:
        query = "INSERT INTO user_by_song(song, user_id, first_name, last_name)"
        query = query + "VALUES(%s, %s, %s, %s)"
        try:
            session.execute(query, (line[9], int(line[10]), line[1], line[4]))
        except Exception as e:
            print(e)
    print('Values Inserted Into Table: user_by_song')

Validate Query & View Results in DataFrame

In [None]:
user_list = []
user_columns = ['first_name', 'last_name']

query = """SELECT first_name, last_name
           FROM user_by_song
           WHERE song = 'All Hands Against His Own'
        """
try:
    rows = session.execute(query)
    # collect rows only when the query succeeded
    for row in rows:
        user_list.append((row.first_name, row.last_name))
except Exception as e:
    print(e)

user_df = pd.DataFrame(user_list, columns=user_columns)
user_df.head()

### Drop Tables

In [None]:
def drop_tables(tables):
    for table in tables:
        query = "DROP TABLE " + table
        try:
            session.execute(query)
            print('Dropped Table: ' + table)
        except Exception as e:
            print(e)

In [None]:
tables = ['song_by_session','song_details_by_user','user_by_song']
drop_tables(tables)
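
Table names cannot be passed as bound parameters, so the DROP statement is built by string concatenation. A cautious variant, sketched below, checks each name against the pattern for unquoted CQL identifiers and adds IF EXISTS so the cell can be re-run safely. The `build_drop_statement` helper is hypothetical and only builds the query text; executing it is left to `session.execute`.

```python
import re

# Unquoted CQL identifiers: a letter followed by letters, digits or underscores.
_IDENTIFIER = re.compile(r'[A-Za-z][A-Za-z0-9_]*')

def build_drop_statement(table):
    """Return a DROP TABLE statement, rejecting names that are not plain identifiers."""
    if not _IDENTIFIER.fullmatch(table):
        raise ValueError('unexpected table name: ' + repr(table))
    return 'DROP TABLE IF EXISTS ' + table

statements = [build_drop_statement(t) for t in
              ['song_by_session', 'song_details_by_user', 'user_by_song']]
print(statements)
```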

### Close the session and cluster connection

In [None]:
session.shutdown()
cluster.shutdown()

## License

[Udacity](https://www.udacity.com/)