# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
import csv
import glob
import re
import os
import json
from datetime import datetime

import numpy as np
import pandas as pd
# import cassandra
from cassandra.cluster import Cluster


#### Merge all CSV files into one

In [2]:
# Consolidate all CSV files into one  
def merge_csvs():
    # Get current folder and subfolder event data
    filepath = os.getcwd() + '/event_data'
    print(f'{datetime.now()} Merging CSVs in {filepath}')

    # collect and join filepaths
    for root, dirs, files in os.walk(filepath):
        file_path_list = glob.glob(os.path.join(root,'*'))
    
    full_data_rows_list = [] 
    for f in file_path_list:
        with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
            csvreader = csv.reader(csvfile) 
            next(csvreader)
            for line in csvreader:
                full_data_rows_list.append(line) 

    csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

    with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
        writer = csv.writer(f, dialect='myDialect')
        writer.writerow(['artist','first_name','gender','item_in_session','last_name','length',\
                    'level','location','session_id','song','user_id'])
        for row in full_data_rows_list:
            if (row[0] == ''):
                continue
            writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))
    print(f'{datetime.now()} Finished merging CSVs. Merged csv is at {os.getcwd()}/event_datafile_new.csv')
    

# Part II. Complete the Apache Cassandra coding portion of your project. 

Now you are ready to work with the CSV file titled `event_datafile_new.csv`, located within the Workspace directory.  The `event_datafile_new.csv` contains the following columns: 

- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the `event_datafile_new.csv` after the code above is run:

<img src="images/image_event_datafile_new.jpg">

## Begin writing your Apache Cassandra code in the cells below

### Helpers

In [3]:
# Column name to number map
CSV_COLS = {"artist": 0, "first_name": 1, "gender":2, "item_in_session": 3,"last_name": 4,
            "length": 5,"level": 6,"location": 7,"session_id": 8,"song": 9,"user_id": 10}

In [4]:
def connect_to_cluster(host='127.0.0.1'):
    try:
        cluster = Cluster([host])
        print(f'{datetime.now()} Cluster created at {host} and connected to cluster.')
        return (cluster, cluster.connect())
    except Exception as e:
        print(e)

def create_keyspace(keyspace, repl_class='SimpleStrategy', repl_factor=1):
    replication = {'class' : repl_class, 'replication_factor' : repl_factor}
    try:
        session.execute(f"CREATE KEYSPACE IF NOT EXISTS {keyspace} WITH REPLICATION = {replication}")
        print(f'{datetime.now()} Created KEYSPACE {keyspace} with replication = {replication}')
    except Exception as e:
        print(e)
        
def set_keyspace(keyspace):
    try:
        session.set_keyspace(keyspace)
        print(f'{datetime.now()} Setting KEYSPACE {keyspace}')
    except Exception as e:
        print(e)
        
def execute_query(sql):
    try:
        return session.execute(sql)
    except Exception as e:
        print(e)
        
def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

def build_df(query):
    session.row_factory = pandas_factory
    result = session.execute(query, timeout=None)
    return result._current_rows

def build_insert_statment(table, columns):
    """Creates an insert statement for a given table and columns"""
    return f"""
    INSERT INTO {table} ({(', '.join(columns))})
    VALUES ({(', '.join(['%s'] * len(columns)))})
    """

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




# `DROP`/`CREATE` statements

In [5]:
session_and_item_drop = "DROP TABLE IF EXISTS song_details_by_session_and_item;"
session_and_item_create = """
CREATE TABLE song_details_by_session_and_item (
      artist text,
      song text,
      length float,
      session_id int,
      item_in_session int,
      PRIMARY KEY ((session_id, item_in_session), artist, song)
);
"""

by_user_session_drop = "DROP TABLE IF EXISTS song_details_by_user_session;"
by_user_session_create = """
CREATE TABLE song_details_by_user_session (
      artist text,
      song text,
      session_id int,
      item_in_session int,
      user_id int,
      first_name text,
      last_name text,
      PRIMARY KEY (user_id, session_id, item_in_session)
);
"""

user_details_by_song_drop = "DROP TABLE IF EXISTS user_details_by_song_name;"
user_details_by_song_create = """
CREATE TABLE user_details_by_song_name (
      song text,
      first_name text,
      last_name text,
      PRIMARY KEY (song, first_name, last_name)
);
"""

# `SELECT` statements

In [6]:
session_and_item_select = """
    SELECT artist, song, length
    FROM song_details_by_session_and_item  
    WHERE session_id = 338  
    AND item_in_session = 4;
    """

by_user_session_select = """
    SELECT artist, song, first_name, last_name
    FROM song_details_by_user_session
    WHERE user_id = 10 
    AND session_id = 182
    """

by_song_name_select = """
    SELECT first_name, last_name 
    FROM user_details_by_song_name
    WHERE song = 'All Hands Against His Own'
    """

# Create and load tables

In [7]:
def session_and_item_load():
    print(f'{datetime.now()} Loading table `session_and_item_load`')
    execute_query(session_and_item_drop)
    execute_query(session_and_item_create)
    file = 'event_datafile_new.csv'
    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader) # skip header
        for line in csvreader:
            query = build_insert_statment('song_details_by_session_and_item', ['artist', 'song', 'length', 'session_id', 'item_in_session'])
            session.execute(query, (line[CSV_COLS['artist']], 
                                    line[CSV_COLS['song']], 
                                    float(line[CSV_COLS['length']]), 
                                    int(line[CSV_COLS['session_id']]), 
                                    int(line[CSV_COLS['item_in_session']])))
    validate_load = execute_query("select count(*) from song_details_by_session_and_item;")
    print(f'{datetime.now()} {[r.count for r in validate_load][0]} rows loaded into `song_details_by_session_and_item`')


In [8]:
def by_user_session_load():
    print(f'{datetime.now()} Loading table `song_details_by_user_session`')
    execute_query(by_user_session_drop)
    execute_query(by_user_session_create)

    file = 'event_datafile_new.csv'
    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader) # skip header
        for line in csvreader:
            query = build_insert_statment('song_details_by_user_session', ['artist', 'song', 'session_id', 'item_in_session', 'user_id', 'first_name', 'last_name'])
            session.execute(query, (line[CSV_COLS['artist']], 
                                    line[CSV_COLS['song']], 
                                    int(line[CSV_COLS['session_id']]), 
                                    int(line[CSV_COLS['item_in_session']]), 
                                    int(line[CSV_COLS['user_id']]), 
                                    line[CSV_COLS['first_name']], 
                                    line[CSV_COLS['last_name']]))
    validate_load = execute_query("select count(*) from song_details_by_user_session;")
    print(f'{datetime.now()} {[r.count for r in validate_load][0]} rows loaded into `song_details_by_user_session`')

In [9]:
def by_song_name_load():
    print(f'{datetime.now()} Loading table `user_details_by_song_name`')
    execute_query(user_details_by_song_drop)
    execute_query(user_details_by_song_create)
    file = 'event_datafile_new.csv'
    with open(file, encoding = 'utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader) # skip header
        for line in csvreader:
            query = build_insert_statment('user_details_by_song_name', ['song', 'first_name', 'last_name'])
            session.execute(query, (line[CSV_COLS['song']], 
                                    line[CSV_COLS['first_name']], 
                                    line[CSV_COLS['last_name']]))

    validate_load = execute_query("select count(*) from user_details_by_song_name;")
    print(f'{datetime.now()} {[r.count for r in validate_load][0]} rows loaded into `user_details_by_song_name`')

#### Main function, which invokes all other functions

In [10]:
# Define cluster and session globally so we don't have to pass them into every function
cluster, session = connect_to_cluster()

def main():
    merge_csvs()
    create_keyspace('sparkify')
    set_keyspace('sparkify')
    session_and_item_load()
    by_user_session_load()
    by_song_name_load()

2019-08-31 21:30:07.539528 Cluster created at 127.0.0.1 and connected to cluster.


#### Call main function

In [11]:
if __name__ == '__main__':
    main()

2019-08-31 21:30:07.563911 Merging CSVs in /Users/fitscott/denano/sparkify-cassandra/event_data
2019-08-31 21:30:07.631190 Finished merging CSVs. Merged csv is at /Users/fitscott/denano/sparkify-cassandra/event_datafile_new.csv
2019-08-31 21:30:07.635463 Created KEYSPACE sparkify with replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
2019-08-31 21:30:07.636568 Setting KEYSPACE sparkify
2019-08-31 21:30:07.636641 Loading table `session_and_item_load`
2019-08-31 21:30:10.733541 6820 rows loaded into `song_details_by_session_and_item`
2019-08-31 21:30:10.733645 Loading table `song_details_by_user_session`
2019-08-31 21:30:13.531338 6820 rows loaded into `song_details_by_user_session`
2019-08-31 21:30:13.531462 Loading table `user_details_by_song_name`
2019-08-31 21:30:16.087833 6618 rows loaded into `user_details_by_song_name`


# Query #1: Session and item table
### Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

```
SELECT artist, song, length
FROM song_details_by_session_and_item  
WHERE session_id = 338  
AND item_in_session = 4;
```

#### Verify the data have been properly inserted into each table -- build a dataframe based on each query

In [12]:
session_and_item_df = build_df(session_and_item_select)
session_and_item_df.head()

Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


# Query #2: Song details by user session
### Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
```
SELECT artist, song, first_name, last_name 
FROM music_history 
WHERE user_id = 10 
AND session_id = 182
```

#### Verify the data have been properly inserted into each table -- build a dataframe based on each query

In [13]:
song_details_by_user_session_df = build_df(by_user_session_select)
song_details_by_user_session_df.head()

Unnamed: 0,artist,song,first_name,last_name
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


# Query #3: User details by song name
### Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
```
SELECT first_name, last_name 
FROM user_details_by_song_name 
WHERE song = 'All Hands Against His Own'

```

#### Verify the data have been properly inserted into each table -- build a dataframe based on each query

In [14]:
by_song_name_df = build_df(by_song_name_select)
by_song_name_df.head()

Unnamed: 0,first_name,last_name
0,Jacqueline,Lynch
1,Sara,Johnson
2,Tegan,Levine


### Drop the tables then close out the sessions

In [15]:
execute_query("DROP TABLE song_details_by_session_and_item;")
execute_query("DROP TABLE song_details_by_user_session;")
execute_query("DROP TABLE user_details_by_song_name;")

<cassandra.cluster.ResultSet at 0x112565290>

In [16]:
session.shutdown()
cluster.shutdown()