# Project 3: Sparkify Data Warehouse on AWS — Test DWH ETL

In [1]:
from etl import *

### Connect to Sparkify DWH Database.  

In [2]:
# Open the warehouse connection described by the cluster config file.
DWH_CONFIG_FILE = 'dwh-sp.cfg'
cur, conn = connect_DWH_db(DWH_CONFIG_FILE)

Connect Redshift CLuster sparkify-dwhCluster
host=sparkify-dwhcluster.cdgmk37ewlxc.us-west-2.redshift.amazonaws.com dbname=sparkify_dwh user=dwhuser password=XXXXX port=5439


### Drop all tables first then create tables.

In [3]:
# Start from a clean schema: remove any existing tables, then recreate them.
for schema_step in (drop_tables, create_tables):
    schema_step(cur, conn)

### Raw Data Quality Check before loading

In [4]:
# Run each raw-data quality check and show what it returns. Executing a query
# without fetching silently discards its result, so the check told us nothing.
# NOTE(review): the staging tables were just recreated and are not loaded until
# a later cell - confirm these checks are really meant to run before the COPY.
for check_name, check_query in (
    ("staging_events_data_check", staging_events_data_check),
    ("staging_songs_data_check", staging_songs_data_check),
):
    cur.execute(check_query)
    # DB-API cursors leave `description` as None when the statement produced
    # no result set; fetching in that case would raise.
    if cur.description is not None:
        print(check_name, cur.fetchall())

### Load all tables: first the staging tables, then the dimension tables, and finally the fact table.
#### The following insert functions are defined in etl.py

In [5]:
%%time
# Bulk-load both staging tables. The COPY statements that ran are echoed in the
# cell output; the recorded run took about 4 minutes of wall time.
load_staging_tables(cur, conn)

        
    COPY     staging_events   
    FROM     's3://udacity-dend/log_data'               
    IAM_ROLE 'arn:aws:iam::640555552535:role/dwhRole'               
    JSON     's3://udacity-dend/log_json_path.json'               

    
     COPY     staging_songs  
     FROM     's3://udacity-dend/song_data'             
     IAM_ROLE 'arn:aws:iam::640555552535:role/dwhRole'             
     JSON     'auto'

CPU times: user 1.19 ms, sys: 4.22 ms, total: 5.41 ms
Wall time: 3min 58s


In [6]:
%%time

# Populate the dimension tables. The recorded run printed the artists, users,
# time and songs insert steps and took about 30 minutes of wall time.
insert_dimension_tables (cur, conn)

insert_artists_table
insert_users_table
insert_time_table
insert_songs_table
CPU times: user 2.45 s, sys: 748 ms, total: 3.2 s
Wall time: 29min 57s


In [7]:
%%time
# Populate the songplay fact table (about 75 seconds of wall time in the
# recorded run).
insert_songplay_table (cur, conn)

CPU times: user 288 ms, sys: 26.9 ms, total: 314 ms
Wall time: 1min 15s


## Run sample queries as test

In [8]:
%%time
# Peek at a few raw song records from the staging table.
cur.execute("SELECT * FROM staging_songs LIMIT 5")
for song_row in cur.fetchall():
    print(song_row)

(1, 'AR0MWD61187B9B2B12', None, None, '', 'International Noise Conspiracy', 'SOHOZBI12A8C132E3C', 'Smash It Up', 195.39546, 2000)
(1, 'ARV1JVD1187B9AD195', 35.91463, -79.05661, 'Chapel Hill, NC', 'Loudon Wainwright III', 'SOINBCU12A6D4F94C0', 'Human Cannonball', 190.48444, 1995)
(1, 'ARNLO5S1187B9B80CC', None, None, 'Pasadena, CA', 'Van Halen', 'SOLRYQR12A670215BF', 'Panama (Remastered Album Version)', 209.29261, 0)
(1, 'ARBVX4Y1187FB42E5E', None, None, 'Hull, England', 'The Housemartins', 'SOLPPOD12A6701D227', 'Me And The Farmer', 176.79628, 1987)
(1, 'AR0L04E1187B9AE90C', None, None, 'Wigan, Lancashire, England', 'The Verve', 'SOXQYSC12A6310E908', 'Bitter Sweet Symphony', 360.25424, 1997)
CPU times: user 1.74 ms, sys: 0 ns, total: 1.74 ms
Wall time: 62.8 ms


In [9]:
%%time
# Total number of rows in staging_songs.
songs_count_query = "SELECT COUNT(*) FROM staging_songs"
cur.execute(songs_count_query)
for count_row in cur.fetchall():
    print(count_row)

(14896,)
CPU times: user 1.47 ms, sys: 366 µs, total: 1.83 ms
Wall time: 63.8 ms


In [10]:
%%time
# Peek at a few raw event-log records from the staging table.
cur.execute("SELECT * FROM staging_events LIMIT 5")
event_rows = cur.fetchall()
for event_row in event_rows:
    print(event_row)

('Miami Horror', 'Logged In', 'Kate', 'F', 88, 'Harrell', 250.8273, 'paid', 'Lansing-East Lansing, MI', 'PUT', 'NextSong', 1540472624796.0, 293, 'Sometimes', 200, Decimal('1541548876796'), '"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"', 97)
('The White Stripes', 'Logged In', 'Kate', 'F', 89, 'Harrell', 241.8673, 'paid', 'Lansing-East Lansing, MI', 'PUT', 'NextSong', 1540472624796.0, 293, 'My Doorbell (Album Version)', 200, Decimal('1541549126796'), '"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"', 97)
('Juan Carmona', 'Logged In', 'Kate', 'F', 90, 'Harrell', 331.44118, 'paid', 'Lansing-East Lansing, MI', 'PUT', 'NextSong', 1540472624796.0, 293, 'Panales de Algodon', 200, Decimal('1541549367796'), '"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"', 97)
('Alison Krauss / Union Station', 'Logged In', 'Kate', 'F'

In [11]:
%%time
# Total number of rows in staging_events.
events_count_query = "SELECT COUNT(*) FROM staging_events"
cur.execute(events_count_query)
for count_row in cur.fetchall():
    print(count_row)

(8056,)
CPU times: user 1.93 ms, sys: 0 ns, total: 1.93 ms
Wall time: 61.7 ms


In [12]:
%%time
# Show a handful of rows from the users dimension table.
cur.execute("SELECT * FROM users LIMIT 5")
for user_row in cur.fetchall():
    print(user_row)

(97, 'Kate', 'Harrell', 'F', 'paid')
(63, 'Ayla', 'Johnson', 'F', 'free')
(55, 'Martin', 'Johnson', 'M', 'free')
(8, 'Kaylee', 'Summers', 'F', 'free')
(52, 'Theodore', 'Smith', 'M', 'free')
CPU times: user 2.4 ms, sys: 0 ns, total: 2.4 ms
Wall time: 60.2 ms


In [13]:
%%time
# Count every row in the users table. The stray "LIMIT 5" copied over from the
# preview query was dropped: COUNT(*) yields a single row, so the limit had no
# effect and only obscured the intent.
cur.execute("SELECT COUNT(*) FROM users")
rows = cur.fetchall()
for r in rows:
    print(r)

(105,)
CPU times: user 1.55 ms, sys: 0 ns, total: 1.55 ms
Wall time: 62.1 ms


In [14]:
%%time
# Show a handful of rows from the time dimension table.
time_preview_query = "SELECT * FROM time LIMIT 5"
cur.execute(time_preview_query)
for time_row in cur.fetchall():
    print(time_row)

('2018-11-07T00:01:16.796000', 0, 7, 45, 11, 2018, 2)
('2018-11-07T00:05:26.796000', 0, 7, 45, 11, 2018, 2)
('2018-11-07T00:09:27.796000', 0, 7, 45, 11, 2018, 2)
('2018-11-07T00:14:58.796000', 0, 7, 45, 11, 2018, 2)
('2018-11-07T00:17:49.796000', 0, 7, 45, 11, 2018, 2)
CPU times: user 2.15 ms, sys: 265 µs, total: 2.41 ms
Wall time: 62.7 ms


In [15]:
%%time
# Count every row in the time table. The stray "LIMIT 5" copied over from the
# preview query was dropped: COUNT(*) yields a single row, so the limit had no
# effect and only obscured the intent.
cur.execute("SELECT COUNT(*) FROM time")
rows = cur.fetchall()
for r in rows:
    print(r)

(8023,)
CPU times: user 0 ns, sys: 1.7 ms, total: 1.7 ms
Wall time: 62.3 ms


In [16]:
%%time
# Show a handful of rows from the songplay fact table.
cur.execute("SELECT * FROM songplay LIMIT 5")
songplay_rows = cur.fetchall()
for songplay_row in songplay_rows:
    print(songplay_row)

(0, '2018-11-25T16:14:24.796000', 49, 'paid', 'SOBONKR12A58A7A7E0', 'AR5E44Z1187B9A1D74', 923, 'San Francisco-Oakland-Hayward, CA', 'Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20100101 Firefox/31.0')
(8, '2018-11-30T16:51:42.796000', 16, 'paid', 'SOBONKR12A58A7A7E0', 'AR5E44Z1187B9A1D74', 1076, 'Birmingham-Hoover, AL', '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.77.4 (KHTML, like Gecko) Version/7.0.5 Safari/537.77.4"')
(16, '2018-11-30T13:52:37.796000', 49, 'paid', 'SOBONKR12A58A7A7E0', 'AR5E44Z1187B9A1D74', 1096, 'San Francisco-Oakland-Hayward, CA', 'Mozilla/5.0 (Windows NT 5.1; rv:31.0) Gecko/20100101 Firefox/31.0')
(24, '2018-11-24T05:04:53.796000', 88, 'paid', 'SOBONKR12A58A7A7E0', 'AR5E44Z1187B9A1D74', 888, 'Sacramento--Roseville--Arden-Arcade, CA', '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"')
(32, '2018-11-28T22:56:08.796000', 73, 'paid', 'SOBONKR12A58A7A7E0', 'AR5E44Z1187B9A1D7

In [17]:
%%time
# Total number of rows in the songplay fact table.
songplay_count_query = "SELECT COUNT(*) FROM songplay"
cur.execute(songplay_count_query)
for count_row in cur.fetchall():
    print(count_row)

(328,)
CPU times: user 0 ns, sys: 1.75 ms, total: 1.75 ms
Wall time: 63.6 ms


## Take down the cluster at the end of tests

In [None]:
# Teardown step 1: drop the tables over the open connection, before the
# cluster itself is removed in the cells below.
drop_tables(cur, conn)

In [None]:
import configparser

# Read the cluster settings. The context manager closes the file handle, which
# a bare `config.read_file(open(...))` would leave open.
config = configparser.ConfigParser()
with open('dwh-sp.cfg') as cfg_file:
    config.read_file(cfg_file)

# AWS credentials used to build the boto3 clients.
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

# Cluster sizing.
DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

# Cluster identity and database connection settings.
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

# Summarise the settings for the reader. The password is masked so the secret
# is not rendered into (and saved with) the notebook output.
pd.DataFrame({"Param":
                  ["DWH_CLUSTER_TYPE", "DWH_NUM_NODES", "DWH_NODE_TYPE", "DWH_CLUSTER_IDENTIFIER", "DWH_DB", "DWH_DB_USER", "DWH_DB_PASSWORD", "DWH_PORT", "DWH_IAM_ROLE_NAME"],
              "Value":
                  [DWH_CLUSTER_TYPE, DWH_NUM_NODES, DWH_NODE_TYPE, DWH_CLUSTER_IDENTIFIER, DWH_DB, DWH_DB_USER, "********", DWH_PORT, DWH_IAM_ROLE_NAME]
             })

In [None]:
# Both clients use the same region and credentials, so build them from one
# shared set of keyword arguments.
aws_client_kwargs = dict(
    region_name='us-west-2',
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

iam = boto3.client('iam', **aws_client_kwargs)
redshift = boto3.client('redshift', **aws_client_kwargs)

In [None]:
def prettyRedshiftProps(props):
    """Return the headline cluster properties as a two-column DataFrame.

    Args:
        props: mapping of property name to value (in this notebook, one entry
            of ``describe_clusters()['Clusters']``).

    Returns:
        pandas.DataFrame with columns ``Key`` and ``Value`` holding only the
        entries whose key is listed in ``keysToShow``, in the order they
        appear in ``props``.

    Side effects:
        Sets the global pandas option ``display.max_colwidth`` so long values
        are not truncated when displayed.
    """
    # None means "never truncate". The old -1 sentinel was deprecated in
    # pandas 1.0 and is rejected by pandas 2.x; pandas < 1.0 only accepts
    # integers, hence the fallback.
    try:
        pd.set_option('display.max_colwidth', None)
    except ValueError:
        pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    x = [(k, v) for k, v in props.items() if k in keysToShow]
    return pd.DataFrame(data=x, columns=["Key", "Value"])

# Look up this cluster by identifier and show its headline properties.
cluster_lookup = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = cluster_lookup['Clusters'][0]
prettyRedshiftProps(myClusterProps)

In [None]:
# Pull the connection endpoint and the first attached IAM role out of the
# cluster description, then echo both.
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
first_attached_role = myClusterProps['IamRoles'][0]
DWH_ROLE_ARN = first_attached_role['IamRoleArn']
for label, value in (("DWH_ENDPOINT :: ", DWH_ENDPOINT), ("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)):
    print(label, value)

In [None]:
#### CAREFUL!!
#-- The call below is live: running this cell deletes the Redshift cluster,
#-- and no final snapshot is taken (SkipFinalClusterSnapshot=True).
#-- Comment it out if the cluster should be kept.
redshift.delete_cluster( ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,  SkipFinalClusterSnapshot=True)
#### CAREFUL!!

In [None]:
# Check on the cluster after requesting deletion. While it is still being
# deleted the properties are shown; once it is gone describe_clusters raises
# ClusterNotFoundFault, which is reported instead of failing the cell.
try:
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
except redshift.exceptions.ClusterNotFoundFault:
    clusterSummary = None
    print("Cluster", DWH_CLUSTER_IDENTIFIER, "no longer exists - deletion complete.")
else:
    clusterSummary = prettyRedshiftProps(myClusterProps)
clusterSummary

In [None]:
# Re-check the cluster status. describe_clusters raises ClusterNotFoundFault
# once the deletion has finished; report that rather than erroring out.
try:
    myClusterProps = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
except redshift.exceptions.ClusterNotFoundFault:
    clusterSummary = None
    print("Cluster", DWH_CLUSTER_IDENTIFIER, "no longer exists - deletion complete.")
else:
    clusterSummary = prettyRedshiftProps(myClusterProps)
clusterSummary