# Cassandra Study 5-ETL Pipeline

In [1]:
# Import Python packages 
import cassandra
import os
import glob
import csv
import prettytable as pt

#### glob.glob() returns a possibly empty list of path names

In [2]:
# Show the working directory, then gather the path of every CSV file found
# anywhere under ./hotel_data.
print(os.getcwd())
filepath = os.getcwd() + '/hotel_data'
file_path_list = []
for root, dirs, files in os.walk(filepath):
    # glob.glob() gives the (possibly empty) list of CSV files at this level.
    this_Hierarchy_list = glob.glob(os.path.join(root, '*.csv'))
    file_path_list.extend(this_Hierarchy_list)

/Users/Michael/Documents/BigDataPractice/Project2


#### `with open(...) as csvfile:` opens the CSV file and automatically calls close() when the block ends.
The argument newline='' (an empty string) lets the csv module handle line endings itself; without it, an extra line ending may be added on some platforms, which we don't want.

#### Dialects and Formatting Parameters
Specific formatting parameters are grouped together into dialects; use csv.register_dialect() to register a named dialect, which makes it easier to specify the format of the records.
quoting=csv.QUOTE_ALL: QUOTE_ALL is a csv module constant that instructs writer objects to quote all fields.
skipinitialspace=True: spaces immediately following the delimiter are ignored.

In [3]:
# Step 1: read every data row (header excluded) from all source CSV files.
full_data_rows_list = []
for f in file_path_list:
    # newline='' lets the csv module interpret line endings itself.
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        csvreader = csv.reader(csvfile)
        # Skip the header row; the None default keeps a completely empty file
        # from raising StopIteration.
        next(csvreader, None)
        full_data_rows_list.extend(csvreader)

# Step 2: write the merged rows to one CSV file in which every field is quoted.
csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('hotel_info_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['hotel_id', 'hotel_name', 'hotel_phone', 'country', 'postal_code',
                     'state_or_province', 'city', 'street', 'poi_name', 'poi_description'])
    for row in full_data_rows_list:
        # csv.reader returns [] for a blank line; skip those as well as rows
        # whose hotel_id (first column) is empty.
        if not row or row[0] == '':
            continue
        # Keep only the first ten columns, in their original order.
        writer.writerow((row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9]))


#### sum(1 for line in open('filename.txt')): get the line count of a large file cheaply in Python

In [4]:
# Print the number of lines in the merged CSV (the header line is counted too).
# The file is consumed lazily, one line at a time.
with open('hotel_info_new.csv', encoding='utf8') as f:
    print(sum(1 for _ in f))

5


In [5]:
from cassandra.cluster import Cluster

# Connect to the Cassandra node at 127.0.0.1. Any failure is only printed,
# in which case `cluster` and/or `session` are left unassigned by this cell.
try:
    cluster = Cluster(['127.0.0.1'])
    session = cluster.connect()
except Exception as err:
    print(err)

In [6]:
# Create the `hotel` keyspace (SimpleStrategy, replication factor 1) unless it
# already exists. Errors are printed rather than raised.
query = """
    create keyspace if not exists hotel
    with replication=
    {'class':'SimpleStrategy','replication_factor':1
    }"""
try:
    session.execute(query)
except Exception as err:
    print(err)

In [7]:
# Select `hotel` as the session's keyspace so later statements can use
# unqualified table names. Errors are printed rather than raised.
try:
    session.set_keyspace('hotel')
except Exception as err:
    print(err)

In [8]:
# Define the `address` user-defined type that the hotel tables embed.
query = (
    "CREATE TYPE IF NOT EXISTS address "
    "(street text, city text, state_or_province text, postal_code text, country text)"
)
try:
    session.execute(query)
except Exception as err:
    print(err)

In [9]:
# Create the `hotels` table: one row per hotel, keyed by its id, with the
# address stored as a frozen `address` value.
query = (
    "CREATE TABLE IF NOT EXISTS hotels "
    "(id text, name text, phone text, address frozen<address>, PRIMARY KEY (id)) "
)
try:
    session.execute(query)
except Exception as err:
    print(err)

In [10]:
# Load the merged CSV into the `hotels` table, one INSERT per CSV row.
file = 'hotel_info_new.csv'

# The statement text is the same for every row, so build it once up front.
query = "INSERT INTO hotels (id, name, phone, address) "
query = query + "VALUES (%s, %s, %s, {street:%s, city:%s, state_or_province:%s, \
                         postal_code:%s, country:%s})"

# newline='' is what the csv module expects for files handed to csv.reader;
# without it, line endings embedded in quoted fields can be misread.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None)  # skip header (no error if the file is empty)
    for line in csvreader:
        try:
            # CSV column order: 0 hotel_id, 1 hotel_name, 2 hotel_phone, 3 country,
            # 4 postal_code, 5 state_or_province, 6 city, 7 street.
            # The address fields are passed in the order the statement lists them.
            session.execute(query, (line[0], line[1], line[2], line[7], line[6], line[5], line[4], line[3]))
        except Exception as e:
            # Best effort: report the failing row's error and carry on.
            print(e)

In [11]:
# Verify the load: fetch one hotel by id and show it as a text table.
query = "SELECT id, name, phone, address FROM hotels WHERE id = 'A1111'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Render only when the query succeeded; otherwise `rows` would be
    # undefined or left over from an earlier cell.
    t = pt.PrettyTable(['id', 'name', 'phone', 'address'])
    for row in rows:
        t.add_row([row.id, row.name, row.phone, row.address])

    print(t)

+-------+---------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------+
|   id  |         name        |  phone   |                                                                address                                                                |
+-------+---------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------+
| A1111 | Crowne Plaza Tainan | 63911899 | address(street='289 Zhouping Road Anping District', city='Tainan', state_or_province='Taiwan', postal_code='70841', country='Taiwan') |
+-------+---------------------+----------+---------------------------------------------------------------------------------------------------------------------------------------+


In [12]:
# Create the two lookup tables: hotels partitioned by point of interest, and
# points of interest partitioned by hotel. Each statement is attempted
# independently, and errors are printed rather than raised.
for query in (
    (
        "CREATE TABLE IF NOT EXISTS hotels_by_poi "
        "(poi_name text, hotel_id text, name text, phone text, address frozen<address>, PRIMARY KEY (poi_name, hotel_id)) "
    ),
    (
        "CREATE TABLE IF NOT EXISTS pois_by_hotel"
        "(hotel_id text, poi_name text, description text, PRIMARY KEY (hotel_id, poi_name))"
    ),
):
    try:
        session.execute(query)
    except Exception as err:
        print(err)

In [13]:
# Load the merged CSV into `hotels_by_poi`, one INSERT per CSV row.
file = 'hotel_info_new.csv'

# The statement text is the same for every row, so build it once up front.
query = "INSERT INTO hotels_by_poi (poi_name, hotel_id, name, phone, address)"
query = query + " VALUES (%s, %s, %s, %s, {street:%s, city:%s, state_or_province:%s, \
                         postal_code:%s, country:%s})"

# newline='' is what the csv module expects for files handed to csv.reader;
# without it, line endings embedded in quoted fields can be misread.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None)  # skip header (no error if the file is empty)
    for line in csvreader:
        try:
            # CSV column order: 0 hotel_id, 1 hotel_name, 2 hotel_phone, 3 country,
            # 4 postal_code, 5 state_or_province, 6 city, 7 street, 8 poi_name.
            session.execute(query, (line[8], line[0], line[1], line[2], line[7], line[6], line[5], line[4], line[3]))
        except Exception as e:
            # Best effort: report the failing row's error and carry on.
            print(e)

In [14]:
# Verify the load: list the hotels near one point of interest as a text table.
query = "SELECT poi_name, hotel_id, name, phone, address FROM hotels_by_poi WHERE poi_name='Tainan Confucius Temple'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Render only when the query succeeded; otherwise `rows` would be
    # undefined or hold another query's result with different columns.
    t = pt.PrettyTable(['poi_name', 'hotel_id', 'name', 'phone', 'address'])
    for row in rows:
        t.add_row([row.poi_name, row.hotel_id, row.name, row.phone, row.address])

    print(t)

+-------------------------+----------+--------------------+----------+----------------------------------------------------------------------------------------------------------------------------------------+
|         poi_name        | hotel_id |        name        |  phone   |                                                                address                                                                 |
+-------------------------+----------+--------------------+----------+----------------------------------------------------------------------------------------------------------------------------------------+
| Tainan Confucius Temple |  A1112   | Silks Place Tainan | 62136290 | address(street='No. 1  Heyi Rd West Central District', city='Tainan', state_or_province='Taiwan', postal_code='700', country='Taiwan') |
+-------------------------+----------+--------------------+----------+--------------------------------------------------------------------------------------------------

In [15]:
# Load the merged CSV into `pois_by_hotel`, one INSERT per CSV row.
file = 'hotel_info_new.csv'

# The statement text is the same for every row, so build it once up front.
query = "INSERT INTO pois_by_hotel (hotel_id, poi_name, description)"
query = query + " VALUES (%s, %s, %s)"

# newline='' is what the csv module expects for files handed to csv.reader;
# without it, line endings embedded in quoted fields can be misread.
with open(file, encoding='utf8', newline='') as f:
    csvreader = csv.reader(f)
    next(csvreader, None)  # skip header (no error if the file is empty)
    for line in csvreader:
        try:
            # CSV columns used: 0 hotel_id, 8 poi_name, 9 poi_description.
            session.execute(query, (line[0], line[8], line[9]))
        except Exception as e:
            # Best effort: report the failing row's error and carry on.
            print(e)

In [16]:
# Verify the load: list the points of interest for one hotel as a text table.
query = "SELECT hotel_id, poi_name, description FROM pois_by_hotel WHERE hotel_id='A1112'"
try:
    rows = session.execute(query)
except Exception as e:
    print(e)
else:
    # Render only when the query succeeded; otherwise `rows` would be
    # undefined or hold another query's result with different columns.
    t = pt.PrettyTable(['hotel_id', 'poi_name', 'description'])
    for row in rows:
        t.add_row([row.hotel_id, row.poi_name, row.description])

    print(t)

+----------+------------------------------+-------------------------------------------------------------------------------------------------+
| hotel_id |           poi_name           |                                           description                                           |
+----------+------------------------------+-------------------------------------------------------------------------------------------------+
|  A1112   | Tainan Art Museum Building 2 | Striking space for contemporary Taiwanese art & sculpture exhibitions with a restaurant & cafe. |
|  A1112   |   Tainan Confucius Temple    |  Small 17th-century Confucian temple featuring traditional architecture & a tranquil courtyard. |
+----------+------------------------------+-------------------------------------------------------------------------------------------------+


In [17]:
# Clean up: drop the three tables created above. Each drop is attempted
# independently, so one failure (printed) does not stop the others.
for table_name in ('hotels', 'hotels_by_poi', 'pois_by_hotel'):
    query = "drop table " + table_name
    try:
        rows = session.execute(query)
    except Exception as err:
        print(err)

In [18]:
# Release the driver resources: close the session first, then the cluster
# object it was created from.
session.shutdown()
cluster.shutdown()