# Part I. ETL Pipeline for Pre-Processing the Files

#### Import Python packages 

In [1]:
# Import Python packages 
import os
import pandas as pd
import math
import warnings
warnings.filterwarnings('ignore')

import cassandra
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

#### Creating list of filepaths to process original event csv data files

In [2]:
# Write your code here
dirName = 'youtube_event_data'
listOfFile = os.listdir(dirName)
allFiles = list()
# Iterate over all the entries
for entry in listOfFile:
    # Create full path
    fullPath = os.path.join(dirName, entry)
    # If entry is a directory then get the list of files in this directory 
    if os.path.isdir(fullPath):
        allFiles = allFiles + getListOfFiles(fullPath)
    else:
        allFiles.append(fullPath)

#### Processing the CSV files to create the new `youtube_events_data_new.csv` file

In [3]:
# Write your code here
df = pd.DataFrame()
for file in allFiles:
    df1 = pd.read_csv (file)
    df = df.append(df1)
df.to_csv('youtube_events_data_new.csv') 

In [4]:
# check the number of rows in your new csv file
df = pd.read_csv('youtube_events_data_new.csv')
len(df)

8056

# Part II. Complete the Apache Cassandra coding portion of your project. 

**TODO: You have to complete the cell below with your `youtube_events_data_new.csv`columns and a screenshot of CSV file data**

## The new CSV file titled `youtube_events_data_new.csv`, The event_datafile_new.csv contains the following columns: 

- Column 1
- Column 2
- Column 3
- Column 4
.
.
.


The image below is a screenshot of what the denormalized data should appear like in the <font color=red>**youtube_events_data_new.csv**</font> after the code above is run:<br>

<img src="images/i.jpg">

#### Creating a Cluster

In [5]:
# Create a Cassandra Cluster
try: 
    cluster = Cluster(['127.0.0.1'], port=9042) 
    session = cluster.connect()
except Exception as e:
    print(e)

#### Create Keyspace `youtubedb`

In [6]:
try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS youtubedb 
    WITH REPLICATION = 
    { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }""")
    
    rows = session.execute("""SELECT * FROM system_schema.keyspaces""")
except Exception as e:
    print(e)
    
for row in rows:
    print (row)

Row(keyspace_name='system_auth', durable_writes=True, replication=OrderedMapSerializedKey([('class', 'org.apache.cassandra.locator.SimpleStrategy'), ('replication_factor', '1')]))
Row(keyspace_name='system_schema', durable_writes=True, replication=OrderedMapSerializedKey([('class', 'org.apache.cassandra.locator.LocalStrategy')]))
Row(keyspace_name='youtubedb', durable_writes=True, replication=OrderedMapSerializedKey([('class', 'org.apache.cassandra.locator.SimpleStrategy'), ('replication_factor', '1')]))
Row(keyspace_name='system_distributed', durable_writes=True, replication=OrderedMapSerializedKey([('class', 'org.apache.cassandra.locator.SimpleStrategy'), ('replication_factor', '3')]))
Row(keyspace_name='system', durable_writes=True, replication=OrderedMapSerializedKey([('class', 'org.apache.cassandra.locator.LocalStrategy')]))
Row(keyspace_name='system_traces', durable_writes=True, replication=OrderedMapSerializedKey([('class', 'org.apache.cassandra.locator.SimpleStrategy'), ('repli

#### Set Keyspace `youtubedb`

In [7]:
try:
    session.set_keyspace('youtubedb')
except Exception as e:
    print(e)

#### Create Table `youtube`

In [8]:
query = """CREATE TABLE IF NOT EXISTS youtube (youtuber text, auth text, firstName text, gender text, itemInSession text, lastName text, length text, level text, location text, method text, page text, registration text, sessionId text, video text, status text,ts text,userId text, 
PRIMARY KEY (userId,sessionId,itemInSession));"""

try:
    session.execute("drop table if exists youtube")
    session.execute(query)
except Exception as e:
    print(e)

#### Insert Data in `youtube`Tabel

In [9]:
query = "INSERT INTO youtube (youtuber,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,video,status,ts,userId)"
query = query + " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,?)"

df = pd.read_csv('youtube_events_data_new.csv')
data_all = df.values.astype('str')

prepared = session.prepare(query)

#dividing data into chunks
chunks = 100
chunk_size = math.ceil(len(data_all)/chunks)

# loop over the chunks and insert it in keyspace
for i in range(chunks): 
    if i==chunks-1:
        data = data_all[i*chunk_size:-1]
    else:
        data = data_all[i*chunk_size:(i+1)*chunk_size]    
    try:
        batch = BatchStatement()
        for i in range(len(data)):
            batch.add(prepared, (data[i][1], data[i][2], data[i][3], data[i][4], data[i][5], data[i][6], data[i][7], data[i][8], data[i][9], data[i][10], data[i][11], data[i][12], data[i][13], data[i][14], data[i][15], data[i][16], data[i][17]))

        session.execute(batch)
    except Exception as e:
        print(e)

## Create queries to ask the following three questions of the data

1. Give me the youtuber, video title and video's length in the YouTube app history that was heard during  sessionId = 338, and itemInSession  = 4


2. Give me only the following: name of youtuber, video (sorted by itemInSession) and user (first and last name) for userid = 10, sessionid = 182
    

3. Give me every user name (first and last) in my YouTube app history who watched the video 'All Hands Against His Own'

####  `Query 1`

In [10]:
query = "select youtuber,video,length from youtube where sessionId = '338' and itemInSession='4' allow filtering"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print(row)

Row(youtuber='Faithless', video='Music Matters (Mark Knight Dub)', length='495.3073')


####  `Query 2`

In [11]:
query = "select youtuber, video, firstname, lastname from youtube WHERE userId ='10.0' and sessionId ='182' allow filtering"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print (row)

Row(youtuber='Down To The Bone', video="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz')
Row(youtuber='Three Drives', video='Greece 2000', firstname='Sylvie', lastname='Cruz')
Row(youtuber='Sebastien Tellier', video='Kilometer', firstname='Sylvie', lastname='Cruz')
Row(youtuber='Lonnie Gordon', video='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')


####  `Query 3`

In [12]:
query = "select firstname, lastname from youtube WHERE video ='All Hands Against His Own' ALLOW FILTERING"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
    
for row in rows:
    print(row)

Row(firstname='Sara', lastname='Johnson')
Row(firstname='Tegan', lastname='Levine')
Row(firstname='Jacqueline', lastname='Lynch')


### Drop the tables before closing out the sessions

In [13]:
# droping table
query = "drop table youtube"
try:
    session.execute(query)
except Exception as e:
    print(e)

In [14]:
# droping keyspace
try: 
    session.execute("""DROP KEYSPACE youtubedb""")
    rows = session.execute("""DESCRIBE keyspaces""")
except Exception as e:
    print(e)

for row in rows:
    print (row)

Row(keyspace_name='system', type='keyspace', name='system')
Row(keyspace_name='system_auth', type='keyspace', name='system_auth')
Row(keyspace_name='system_distributed', type='keyspace', name='system_distributed')
Row(keyspace_name='system_schema', type='keyspace', name='system_schema')
Row(keyspace_name='system_traces', type='keyspace', name='system_traces')
Row(keyspace_name='system_views', type='keyspace', name='system_views')
Row(keyspace_name='system_virtual_schema', type='keyspace', name='system_virtual_schema')
Row(keyspace_name='university', type='keyspace', name='university')


### Close the session and cluster connection¶

In [15]:
# clossing connection
session.shutdown()
cluster.shutdown()