In [1]:
# Import Python packages 
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv

# Part I. ETL Pipeline for Pre-Processing the Files

In [2]:
# checking your current working directory
print('My working directory :')
print(os.getcwd())

# Get your current folder and subfolder event data
filepath = os.getcwd() + '/event_data'
print('My path :')
print(filepath)

# Create a for loop to create a list of files and collect each filepath
for root, dirs, files in os.walk(filepath):
    
# join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root,'*'))
    #print(file_path_list)

My working directory :
/home/anthelix/Documents/projetGit/20200123_project2_cassandra
My path :
/home/anthelix/Documents/projetGit/20200123_project2_cassandra/event_data


In [3]:
# initiating an empty list of rows that will be generated from each file
full_data_rows_list = [] 
    
# for every filepath in the file path list 
for f in file_path_list:

# reading csv file 
    with open(f, 'r', encoding = 'utf8', newline='') as csvfile: 
        # creating a csv reader object 
        csvreader = csv.reader(csvfile) 
        next(csvreader)
        
 # extracting each data row one by one and append it        
        for line in csvreader:
            #print(line)
            full_data_rows_list.append(line) 
            
# uncomment the code below if you would like to get total number of rows 
print(len(full_data_rows_list))

8056


In [4]:

# uncomment the code below if you would like to check to see what the list of event data rows will look like
#print(full_data_rows_list)

# creating a smaller event data csv file called event_datafile_full csv that will be used to insert data into the \
# Apache Cassandra tables
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile.csv', 'w', encoding = 'utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist','firstName','gender','itemInSession','lastName','length',\
                'level','location','sessionId','song','userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[12], row[13], row[16]))


# Part II. Complete the Apache Cassandra coding portion of your project. 

The **event_datafile.csv** contains the following columns: 
- artist 
- firstName of user
- gender of user
- item number in session
- last name of user
- length of the song
- level (paid or free song)
- location of the user
- sessionId
- song title
- userId


<img src="image/image_event_datafile_new.jpg">

#### Creating a Cluster

In [5]:
# This should make a connection to a Cassandra instance your local machine 
# (127.0.0.1)

from cassandra.cluster import Cluster
cluster = Cluster()

# To establish connection and begin executing queries, need a session
session = cluster.connect()

#### Create Keyspacem

In [6]:
session.execute("""
CREATE KEYSPACE IF NOT EXISTS uda
WITH REPLICATION =
{'class' : 'SimpleStrategy', 'replication_factor' : 1}
""")


<cassandra.cluster.ResultSet at 0x7fe22893fc18>

#### Set KEYSPACE to the keyspace

In [7]:
session.set_keyspace('uda')

Create queries to ask the following three questions of the data

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182

### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
    

---

# Partition key

key: minimum set of attributes that uniquely identify an entity

The partition key is responsible for distributing data among nodes. A partition key is the same as the primary key when the primary key consists of a single column.  
Partition keys belong to a node. Cassandra is organized into a cluster of nodes, with each node having an equal part of the partition key hashes.  
The Partition Key is useful for locating the data in the node in a cluster  
Selecting a proper partition key helps avoid overloading of any one node in a Cassandra cluster.


 while the primary key represents a unique gym record/row, all gyms within a country reside on the same partition

partion key are fields used to create row key in order to 
* determine how the data is partitioned
* determine what is phisically stored in a single row
* referred as row key or partition key


# Clustering key

Clustering keys are responsible for sorting data within a partition. Each primary key column after the partition key is considered a clustering key  
The clustering key specifies the sorted order of the data within the selected partition

clustering ket are column family fields to cluster a row key in order to  
* create logical sets inside a single row
* allow more flexible search schemes such as range range
* referred as column key or cluster key


### Chebotko Mapping rules 

Based  on  the  above  data  modeling  principles,  we  definefive mapping rules that guide a query-driven transition from aconceptual data model to a logical data model.  
* MR1 (Entities and Relationships).**Entity  and  relationshiptypes map to tables**, while entities and relationships map to ta-ble rows. Attribute types that describe entities and relationships at the conceptual level must be preserved as table columns atthe logical level. Violation of this rule may lead to data loss
* MR2 (Equality Search Attributes).**Equality   search   at-tributes**, which are used in a query predicate, map to the prefixcolumns of a table primary key. Such columns must include allpartition key columns and, optionally, one or more clusteringkey  columns.  Violation  of  this  rule  may  result  in  inability  tosupport query requirements.
* MR3 (Inequality Search Attributes).**An inequality  search attribute, which is used in a query predicate, maps to a tableclustering key column**. In the primary key definition, a column that participates in inequality search must follow columns that participate in equality search. Violation of this rule may resultin inability to support query requirements.
* MR4 (Ordering Attributes).**Ordering attributes, which arespecified  in  a  query,  map  to  clustering  key  columns**  withascending or descending clustering order as prescribed by thequery. Violation of this rule may result in inability to supportquery requirements.
* MR5 (Key Attributes).**Key attribute types map to primarykey  columns**.  A  table  that  stores  entities  or  relationships  asrows  must  include  key  attributes  that  uniquely  identify  theseentities  or  relationships  as  part  of  the  table  primary  key  touniquely  identify  table  rows.  Violation  of  this  rule  may  leadto data loss.


   * MR1 (Entités et relations) : les types d'entités et de relations correspondent aux tables, tandis que les entités et les relations correspondent aux lignes des tables. Les types d'attributs qui décrivent les entités et les relations au niveau conceptuel doivent être conservés sous forme de colonnes de table au niveau logique. Le non-respect de cette règle peut entraîner la perte de données
   * MR2 (Equality Search Attributes) : les attributs de recherche sur l'égalité, utilisés dans un prédicat de requête, correspondent aux préfixes de la clé primaire d'une table. Ces colonnes doivent comprendre toutes les colonnes de clés de partition et, éventuellement, une ou plusieurs colonnes de clés de regroupement. Le non-respect de cette règle peut entraîner l'impossibilité de prendre en charge les exigences de la requête.
   * MR3 (Inequality Search Attributes) : un attribut de recherche d'inégalité, utilisé dans un prédicat de requête, correspond à une colonne de clé de regroupement de tables. Dans la définition de la clé primaire, une colonne qui participe à la recherche d'inégalités doit suivre les colonnes qui participent à la recherche d'égalité. Le non-respect de cette règle peut entraîner l'impossibilité de répondre aux exigences de la requête.
   * MR4 (Ordering Attributes) : les attributs d'ordre, qui sont spécifiés dans une requête, correspondent à des colonnes clés de regroupement avec un ordre de regroupement ascendant ou descendant tel que prescrit par la requête. Le non-respect de cette règle peut entraîner l'impossibilité de répondre aux exigences de la requête.
  * MR5 (Key Attributes) : les types d'attributs clés correspondent à des colonnes de clés primaires. Une table qui stocke des entités ou des relations sous forme de rangées doit inclure des attributs clés qui identifient de manière unique ces entités ou relations dans le cadre de la clé primaire de la table qui identifie les rangées de la table. Le non-respect de cette règle peut entraîner la perte de données.


![ERD project2](./image/erd_project2.png)

![Mapping Rules](./image/mappingRules.png)

1. les types d'entités et de relations correspondent aux tableaux
2. les attributs clés correspondent aux principales colonnes clés
3. Les attributs de recherche de l'égalité permettent de partitionner les colonnes clés
4. Les attributs de recherche sur les inégalités permettent de regrouper les colonnes
5. Ordonner les attributs de la carte pour regrouper les colonnes

1. name each table, identify the primary entity type for which we are quering. if querying by attributs of other related entities, we append those separated with `_by_`
2. identify the primary key
    * en ajoutant des colonnes de cles de partition basees sur les attributs de requete
    * en regroupant les colonnes pour garantir l'unicite et l'ordre de tri souhaite
3. on complete la table en ajoutant tout attribut suplementaire identifie par la requete

1. mapping entity and relationship in a box with his title and by order of the query
2. mapping search equality attributes (as K)
3. mapping serch inequality attributes (as C)
4. mapping ordering attributes (asc or desc)
5. mapping key attributes (desired attributes)

### 1. Give me the artist, song title and song's length in the music app history that was heard during  sessionId = 338, and itemInSession  = 4

![quer1](./image/quer1.png)

##### Table song_by_session  
As we search in `sessionId` and `itemInSession`,  they become the partition key. `artist`, `song` and `length` are what we looking for.  
![Table](./image/query1_table.png)

In [13]:
# create table for the first query
query = "CREATE TABLE IF NOT EXISTS song_by_session"
query = query + "(sessionId INT, itemInSession INT, artist VARCHAR, song VARCHAR, length DECIMAL, PRIMARY KEY(sessionId, itemInSession))"

try:
    session.execute(query)
except Exception as e:
    print(e)

In [14]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
        query = "INSERT INTO song_by_session(sessionId, itemInSession, artist, song, length)"
        query = query + "VALUES (%s, %s, %s, %s, %s)"
        session.execute(query, (int(line[8]), int(line[3]), line[0], line[9], float(line[5])))

In [62]:
query = "SELECT * FROM song_by_session WHERE sessionId = 338 AND itemInSession = 4"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row.artist,"---", row.song,"---" ,row.length)

Faithless --- Music Matters (Mark Knight Dub) --- 495.3073


### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
![query2](./image/query2.png)

##### Table song_by_user  
As we search in `userId` and `sessionId`, they becomes the partition key and `itemInSession` the clustering key for sorted the `song`. `artist`, `song`, `fisrtName` and `lastName` are what we looking for. 
![Table](./image/query2_table.png)

In [55]:
# create table for the second query
query = "CREATE TABLE IF NOT EXISTS song_by_user"
query = query +"(userId INT, sessionId INT, itemInSession INT, artist VARCHAR, song VARCHAR, firstName VARCHAR, lastName VARCHAR, PRIMARY KEY((userId, sessionId), itemInSession))"

try:
    session.execute(query)
except Exception as e:
    print(e)

In [86]:
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
## TO-DO: Assign the INSERT statements into the `query` variable
        query = "INSERT INTO song_by_user(userId, sessionId, itemInSession, artist, song, firstName, lastName)"
        query = query + "VALUES (%s, %s, %s, %s, %s, %s, %s)"
        ## TO-DO: Assign which column element should be assigned for each column in the INSERT statement.
        ## For e.g., to INSERT artist_name and user first_name, you would change the code below to `line[0], line[1]`
        session.execute(query, (int(line[10]), int(line[8]), int(line[3]), line[0], line[9], line[1], line[4]))

NoHostAvailable: ('Unable to complete the operation against any hosts', {<Host: 127.0.0.1:9042 datacenter1>: ConnectionException('Pool is shutdown',)})

In [79]:
query = "SELECT * FROM song_by_user WHERE userId = 10 AND sessionId = 182"
try:
    rows2 = session.execute(query)
except Exception as e:
    print(e)
    


In [76]:
# to check the order
for row in rows2:
    print(row[:4])

(10, 182, 0, 'Down To The Bone')
(10, 182, 1, 'Three Drives')
(10, 182, 2, 'Sebastien Tellier')
(10, 182, 3, 'Lonnie Gordon')


In [80]:
# the answer to the query2
for row in rows2:
    print(row.artist,"---" ,row.song, "---", row.firstname, row.lastname)
# name of artist, song (sorted by itemInSession) and user (first and last name) 

Down To The Bone --- Keep On Keepin' On --- Sylvie Cruz
Three Drives --- Greece 2000 --- Sylvie Cruz
Sebastien Tellier --- Kilometer --- Sylvie Cruz
Lonnie Gordon --- Catch You Baby (Steve Pitron & Max Sanna Radio Edit) --- Sylvie Cruz


### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'
![query3](./image/query3.png)

##### Table user_by_song  
As we search in `song` it's become the primary key and `userId` the clustering to find `firstName` and `lastName`. 
![Table](./image/query3_table.png)

In [25]:
# create table for the third query
query = "CREATE TABLE IF NOT EXISTS user_by_song"
query = query + "(song VARCHAR, userId INT, firstName VARCHAR, lastNAME VARCHAR, PRIMARY KEY(song, userId))"

try:
    session.execute(query)
except Exception as e:
    print(e)

In [31]:
# We have provided part of the code to set up the CSV file. Please complete the Apache Cassandra code below#
file = 'event_datafile_new.csv'

with open(file, encoding = 'utf8') as f:
    csvreader = csv.reader(f)
    next(csvreader) # skip header
    for line in csvreader:
## TO-DO: Assign the INSERT statements into the `query` variable
        query = "INSERT INTO user_by_song(song, userId, firstName, lastName)"
        query = query + "VALUES (%s, %s, %s, %s)"
        ## TO-DO: Assign which column element should be assigned for each column in the INSERT statement.
        ## For e.g., to INSERT artist_name and user first_name, you would change the code below to `line[0], line[1]`
        session.execute(query, (line[9], int(line[10]), line[1], line[4]))

In [83]:
query = "SELECT * FROM user_by_song WHERE song='All Hands Against His Own'"
try:
    rows2 = session.execute(query)
except Exception as e:
    print(e)

for row in rows2:
    print("{} {}".format(row[-2], row[-1]))
# Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own

Jacqueline Lynch
Tegan Levine
Sara Johnson


# drop the tables

In [84]:
query1 = "DROP TABLE song_by_session"
query2 = "DROP TABLE song_by_user"
query3 = "DROP TABLE user_by_song"

try:
    rows1 = session.execute(query1)
    rows2 = session.execute(query2)
    rows3 = session.execute(query3)
except Exception as e:
    print(e)

In [85]:
session.shutdown()
cluster.shutdown()