# Part I. ETL Pipeline for Pre-Processing the Files

## PLEASE RUN THE FOLLOWING CODE FOR PRE-PROCESSING THE FILES

#### Import Python packages 

In [2]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

#### Creating list of filepaths to process original event csv data files

In [3]:
# checking your current working directory
print(os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    #print(file_path_list)

/data-engineering/01_data_modeling/Project


#### Processing the files to create the data file csv that will be used for Apache Casssandra tables

In [4]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# uncomment the code below if you would like to get total number of rows 
#print(len(full_data_rows_list))
# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


In [5]:
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding = 'utf8') as f:
    print(sum(1 for line in f))

6821


In [8]:
data = pd.read_csv('event_datafile_new.csv')

# Part II. Complete the Apache Cassandra coding portion of your project. 

## Now you are ready to work with the CSV file titled <font color=red>event_datafile_new.csv</font>, located within the Workspace directory.  The event_datafile_new.csv contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId

The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**event_datafile_new.csv**</font> after the code above is run:<br>

<img src="images/image_event_datafile_new.jpg">

##### class to facilitate the project
* initialise the connection
* create keyspace and table
* drop table
* set keyspace
* insert cassandra table from dataframe
* query to output result as dataframe
* close connection

In [51]:
class CassandraDataframe:
    def __init__(self, df: pd.DataFrame):
        self.df = df 
        self.columns = self.df.columns
        self.dtypes = self.df.dtypes
        self.cluster = None 
        self.session = None
        # self.__connect_cassandra()

    def connect_cassandra(self):
        from cassandra.cluster import Cluster
        try: 
            self.cluster = Cluster(['cassandra-seed'], port=9042) #If you have a locally installed Apache Cassandra instance
            self.session = self.cluster.connect()
        except Exception as e:
            print(e)

    def create_cassandra_keyspace(self, keyspace:str):
        # TO-DO: Create a Keyspace 
        try:
            query = f"CREATE KEYSPACE IF NOT EXISTS {keyspace} "
            query += """WITH REPLICATION = 
            { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }"""
            self.session.execute(query)

        except Exception as e:
            print(e)

        # print(query)
    
    def set_cassandra_keysapce(self, keyspace):
        # TO-DO: Set KEYSPACE to the keyspace specified above
        try:
            self.session.set_keyspace('events')
        except Exception as e:
            print(e)

    def drop_cassandra_table(self, table_name:str):
        query = f"DROP TABLE IF EXISTS {table_name}"
        try:
            self.session.execute(query)
        except Exception as e:
            print(e)

    def __get_df_column_name(self):
        return (', ').join(self.df.columns)

    def __set_column_type(self, partition_key:list, cluster_key:list):
        res = []
        for col, Dtype in zip(self.df.dtypes.index, self.df.dtypes.values):

            Dtype_mapping = {
                'object': 'text',
                'float64': 'float',
                'int64': 'int'
            }
            data_type = Dtype_mapping[str(Dtype)]
            res.append(f'{col} {data_type}')
        if partition_key:
            res.append(f"PRIMARY KEY (({(', ').join(partition_key)}), {(', ').join(cluster_key)})")
        return (', ').join(res)

    def create_cassandra_table(self, table_name:str, partition_key:list, cluster_key:list, drop_table_first:bool=False):
        if drop_table_first:
           self. drop_cassandra_table(table_name)
        query = f"CREATE TABLE IF NOT EXISTS {table_name} "
        query = query + f"({self.__set_column_type(partition_key, cluster_key)})"
        try:
            self.session.execute(query)
        except Exception as e:
            print(e)
        

    def insert_df_2_cassandra_table(self, table_name):
        columns = self.__get_df_column_name()
        for key, row in self.df.iterrows():
            query = f"insert into {table_name} ({columns})"
            insert_values_query = f" VALUES ({(', ').join(['%s']*len(self.df.columns))})"
            query = query + insert_values_query
            res =[]
            for key, value in enumerate(row):
                if data[data.columns[key]].dtype == np.dtype('O'):
                    res.append(value)
                elif data[data.columns[key]].dtype == np.dtype('float64'):
                    res.append(float(value))
                elif data[data.columns[key]].dtype == np.dtype('int64'):
                    res.append(int(value))
                else:
                    res.append(value)
            self.session.execute(query, res)
            
            
    
    def cassandra_query_2_df(self, query):
        from cassandra.query import dict_factory
        try:
            self.session.row_factory = dict_factory
            rows = self.session.execute(query)
        except Exception as e:
            print(e)

        res = {}
        if rows:
            for key in rows.one().keys():
                res[key] = []

        for row in rows:
            for key, value in row.items():
                res[key].append(value)

        return pd.DataFrame.from_dict(res)
        # return res

    def close_connection(self):
        self.session.shutdown()
        self.cluster.shutdown()

## Begin writing your Apache Cassandra code in the cells below

#### Initialise the class CassandraDataframe

In [52]:
cassandra_df = CassandraDataframe(data)

#### Creating a Cluster & Create Keyspace

In [53]:
cassandra_df.connect_cassandra()

#### Create Keyspace

In [54]:
cassandra_df.create_cassandra_keyspace('music')

#### Set Keyspace

In [55]:
cassandra_df.set_cassandra_keysapce('music')

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.

## Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'




In [56]:
# primary key is session id and itemInSession as the 1st query need sessionId and itemInSession as filters
cassandra_df.create_cassandra_table('song_details', partition_key=['sessionId', 'itemInSession'], cluster_key=['userId'], drop_table_first=True)
cassandra_df.insert_df_2_cassandra_table('song_details')

#### Do a SELECT to verify that the data have been inserted into each table

In [57]:
cassandra_df.cassandra_query_2_df(query="select * from song_details").head()

Unnamed: 0,sessionid,iteminsession,userid,artist,firstname,gender,lastname,length,level,location,song
0,313,1,50,Los Rodriguez,Ava,F,Robinson,205.217514,free,"New Haven-Milford, CT",Enganchate Conmigo
1,304,0,37,The Gerbils,Jordan,F,Hicks,27.01016,free,"Salinas, CA",(iii)
2,781,17,44,De-Phazz,Aleena,F,Kirby,205.426483,paid,"Waterloo-Cedar Falls, IA",Astrud Astronette
3,556,5,29,Tom Waits,Jacqueline,F,Lynch,197.850983,paid,"Atlanta-Sandy Springs-Roswell, GA",Whistlin' Past The Graveyard
4,648,20,49,Jimmy Eat World,Chloe,F,Cuevas,166.007706,paid,"San Francisco-Oakland-Hayward, CA",The Middle


In [58]:
cassandra_df.cassandra_query_2_df(query="select artist, song, length from song_details where sessionId=338 and itemInSession=4")

Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


### COPY AND REPEAT THE ABOVE THREE CELLS FOR EACH OF THE THREE QUESTIONS

##### first query

For the query 1, we use `partition key` (`sessionId` and `itemInSession`) as composite partition key to find a set of rows. `userId` is used as `clustering key`. These two keys together can be a primary key to identify each row.

In [60]:
# primary key is session id and itemInSession as the 1st query need sessionId and itemInSession as filters
cassandra_df.create_cassandra_table('song_details', partition_key=['sessionId', 'itemInSession'], cluster_key=['userId'], drop_table_first=True)
cassandra_df.insert_df_2_cassandra_table('song_details')
cassandra_df.cassandra_query_2_df(query="select artist, song, length from song_details where sessionId=338 and itemInSession=4")

Unnamed: 0,artist,song,length
0,Faithless,Music Matters (Mark Knight Dub),495.307312


##### second query

In [61]:
# primary key is session id and itemInSession as the 1st query need sessionId and userid as filters
cassandra_df.create_cassandra_table('artist_details', partition_key=['sessionId', 'userId'], cluster_key=['itemInSession'], drop_table_first=True)
cassandra_df.insert_df_2_cassandra_table('artist_details')
cassandra_df.cassandra_query_2_df(query="select artist, song, firstName, lastName from artist_details where sessionId=182 and userid=10")

Unnamed: 0,artist,song,firstname,lastname
0,Down To The Bone,Keep On Keepin' On,Sylvie,Cruz
1,Three Drives,Greece 2000,Sylvie,Cruz
2,Sebastien Tellier,Kilometer,Sylvie,Cruz
3,Lonnie Gordon,Catch You Baby (Steve Pitron & Max Sanna Radio...,Sylvie,Cruz


##### third query

In [62]:
# primary key is session id and itemInSession as the 1st query need sessionId and song as filters
cassandra_df.create_cassandra_table('user_details', partition_key=['song'], cluster_key=['userId'], drop_table_first=True)
cassandra_df.insert_df_2_cassandra_table('user_details')
cassandra_df.cassandra_query_2_df(query="select artist, song, firstName, lastName from user_details where song='All Hands Against His Own'")

Unnamed: 0,artist,song,firstname,lastname
0,The Black Keys,All Hands Against His Own,Jacqueline,Lynch
1,The Black Keys,All Hands Against His Own,Tegan,Levine
2,The Black Keys,All Hands Against His Own,Sara,Johnson


### Drop the tables before closing out the sessions

In [63]:
cassandra_df.drop_cassandra_table('music_app_history')

### Close the session and cluster connection¶

In [64]:
cassandra_df.close_connection()