# ETL pipeline -- Data modeling with Apache Cassandra

In [1]:
import os,glob
import numpy as np
import pandas as pd
import json
import datetime,time

In [15]:
from cassandra.cluster import Cluster

## Reading data from the CSV file

In [14]:
# Location of the event CSV that feeds every table below.
data_path = 'E:/Web/Dataset/Event data/event_datafile_new.csv'

# Load the whole file into a DataFrame; the later cells select columns from it by name.
# NOTE(review): the preview shows an 'Unnamed: 0' column, which looks like a saved
# index -- it is never selected by the inserts below, so it is harmless here.
dataset = pd.read_csv(data_path)
# Show two random rows as a quick sanity check of the column layout.
dataset.sample(2)

Unnamed: 0,artist,firstName,gender,itemInSession,lastName,length,level,location,sessionId,song,userId
5534,Hans Zimmer_ James Newton Howard,Kinsley,F,8,Young,265.50812,paid,"Red Bluff, CA",759,Nycteris,85.0
243,Hardline,Matthew,M,4,Jones,234.73587,paid,"Janesville-Beloit, WI",439,Everything,36.0


## Data modeling

### create connection

In [18]:
# No contact points are passed, so the driver's defaults apply
# (presumably a node on the local machine -- confirm against the driver docs).
cluster = Cluster()
# Open a session; every CQL statement in this notebook is executed through it.
session = cluster.connect()

### create keyspace

In [24]:
# Create the 'sparkify' keyspace if it is not there yet.
# SimpleStrategy with replication_factor 1 keeps a single replica of each row.
query = """
            create keyspace if not exists sparkify
            with replication =
            { 'class':'SimpleStrategy' ,'replication_factor':1}
        """
session.execute(query)

<cassandra.cluster.ResultSet at 0x19305cada20>

### set keyspace

In [25]:
# Make 'sparkify' the session's keyspace so the table names below need no keyspace prefix.
session.set_keyspace('sparkify')

### Now we need to create tables to run the following queries. Remember, with Apache Cassandra you model the database tables on the queries you want to run.
### Create queries to ask the following three questions of the data
### 1. Give me the artist, song title and song's length in the music app history that was heard during sessionId = 338, and itemInSession = 4
### 2. Give me only the following: name of artist, song (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
### 3. Give me every user name (first and last) in my music app history who listened to the song 'All Hands Against His Own'

In [90]:
# One table per question, each keyed on the columns its query filters by.
# Unquoted CQL identifiers are case-insensitive, so 'Session_id' in a key
# definition refers to the 'session_id' column.

# Query 1 -- lookup by (session_id, itemInSession):
# session_id is the partition key, itemInSession the clustering column.
session_table_ddl = """
            create table if not exists session(
                artist text,
                song text,
                length decimal,
                session_id int,
                itemInSession int,
                primary key (Session_id,itemInSession)
            );
        """

# Query 2 -- lookup by (user_id, session_id), ordered by itemInSession:
# composite partition key (user_id, session_id), clustering column itemInSession.
albums_table_ddl = """
            create table if not exists albums(
                artist text,
                song text,
                first_name text,
                last_name text,
                user_id float,
                session_id int,
                itemInSession int,
                primary key ((user_id,session_id),itemInSession)
            );
        """

# Query 3 -- lookup by song:
# song is the partition key; user_id as clustering column keeps one row per listener.
users_table_ddl = """
            create table if not exists users(
                first_name text,
                last_name text,
                user_id float,
                song text,
                primary key (song,user_id)
            );
        """

# Execute in the same order as the definitions above; the last call stays a bare
# expression so the notebook cell still echoes its ResultSet.
session.execute(session_table_ddl)
session.execute(albums_table_ddl)
session.execute(users_table_ddl)

<cassandra.cluster.ResultSet at 0x19305bdfc88>

## insert into tables

### ------------- insert into session table ----------------

In [46]:
# Columns are listed in the same order as the placeholders in the insert statement.
session_columns = ['artist','song','length','sessionId','itemInSession']
query = "insert into session (artist,song,length,session_id,itemInSession) values (%s,%s,%s,%s,%s)"

# One list of values per CSV row, then one insert per row.
session_data = dataset[session_columns].values.tolist()
for record in session_data:
    session.execute(query, record)

### validate query #1

In [51]:
# Query 1: artist, song and length for sessionId = 338, itemInSession = 4.
query = "select artist,song,length from session where Session_id = 338 and itemInSession = 4"
for match in session.execute(query):
    print(match)

Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=Decimal('495.3073'))


### ---------------- insert into albums table -----------------

In [91]:
# Columns are listed in the same order as the placeholders in the insert statement.
albums_columns = ['artist','song','firstName','lastName','userId','sessionId','itemInSession']
query = "insert into albums (artist,song,first_name,last_name,user_id,session_id,itemInSession) values (%s,%s,%s,%s,%s,%s,%s)"

# One list of values per CSV row, then one insert per row.
albums_data = dataset[albums_columns].values.tolist()
for record in albums_data:
    session.execute(query, record)

### validate query #2

In [116]:
# Query 2: artist, song and user name for userid = 10, sessionid = 182.
# itemInSession is the table's clustering column, which determines the row order.
query = "select artist,song,first_name,last_name from albums where user_id = 10 and session_id = 182"
for match in session.execute(query):
    print(match)

Row(artist='Down To The Bone', song="Keep On Keepin' On", first_name='Sylvie', last_name='Cruz')
Row(artist='Three Drives', song='Greece 2000', first_name='Sylvie', last_name='Cruz')
Row(artist='Sebastien Tellier', song='Kilometer', first_name='Sylvie', last_name='Cruz')
Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', first_name='Sylvie', last_name='Cruz')


### ----------------- insert into users table ----------------------

In [98]:
# Columns are listed in the same order as the placeholders in the insert statement.
users_columns = ['firstName','lastName','userId','song']
query = "insert into users (first_name,last_name,user_id,song) values (%s,%s,%s,%s)"

# One list of values per CSV row, then one insert per row.
users_data = dataset[users_columns].values.tolist()
for record in users_data:
    session.execute(query, record)

### validate query #3

In [111]:
# Query 3: every user who listened to 'All Hands Against His Own'.
query = "select user_id,first_name,last_name,song from users where song='All Hands Against His Own'"
listeners = session.execute(query)
for listener in listeners:
    print(listener.user_id, listener.first_name, listener.last_name, listener.song)

29.0 Jacqueline Lynch All Hands Against His Own
80.0 Tegan Levine All Hands Against His Own
95.0 Sara Johnson All Hands Against His Own


## end connection

In [117]:
# Close the session first, then the cluster object it was created from.
session.shutdown()
cluster.shutdown()