In [13]:
import configparser

import boto3
import psycopg2

# Load notebook settings. ConfigParser.read() returns the list of files it actually
# parsed and silently skips missing ones, so fail fast here instead of getting a
# confusing NoSectionError from the first config.get() call.
CONFIG_PATH = "dwh.cfg"

config = configparser.ConfigParser()
if not config.read(CONFIG_PATH):
    raise FileNotFoundError(f"Could not read config file {CONFIG_PATH!r}")
CLUSTER_IDENTIFIER = config.get("CLUSTER", "CLUSTER_IDENTIFIER")

In [14]:
# Create the Redshift cluster. If a cluster with CLUSTER_IDENTIFIER already exists
# (e.g. on Restart & Run All), it is reused instead of the cell failing.
NODE_TYPE = "dc2.large"   # Redshift node type for every node in the cluster
NUMBER_OF_NODES = 4       # cluster size (ClusterType is multi-node)

redshift = boto3.client(
    "redshift",
    region_name=config.get("CLUSTER", "DB_REGION"),
    aws_access_key_id=config.get("AWS", "ACCESS_KEY"),
    aws_secret_access_key=config.get("AWS", "SECRET_KEY"),
)
try:
    response = redshift.create_cluster(
        ClusterIdentifier=CLUSTER_IDENTIFIER,
        MasterUsername=config.get("CLUSTER", "DB_USER"),
        MasterUserPassword=config.get("CLUSTER", "DB_PASSWORD"),
        NodeType=NODE_TYPE,
        ClusterType="multi-node",
        NumberOfNodes=NUMBER_OF_NODES,
        IamRoles=[config.get("IAM_ROLE", "ARN")],
        DBName=config.get("CLUSTER", "DB_NAME"),
    )
except redshift.exceptions.ClusterAlreadyExistsFault:
    # The existing cluster is left untouched; its settings may differ from the ones above.
    response = None
    print(f"Cluster {CLUSTER_IDENTIFIER!r} already exists; reusing it.")

In [29]:
# Wait until the cluster is 'available' so the ETL cells below never run against a
# cluster that is still being created (no manual re-running of this cell needed),
# then display the final status.
redshift.get_waiter("cluster_available").wait(ClusterIdentifier=CLUSTER_IDENTIFIER)
redshift.describe_clusters(ClusterIdentifier=CLUSTER_IDENTIFIER)["Clusters"][0]["ClusterStatus"]

'available'

In [30]:
# Execute ETL on cluster: run the table-creation script first, then the ETL script.
# NOTE(review): both scripts presumably read their connection settings from dwh.cfg — confirm.
# Order matters: etl.py is run only after create_tables.py has finished.
%run create_tables.py
%run etl.py

In [31]:
# Connect to the cluster's database.
# Host and port come from the cluster's own endpoint. The other fields are looked up
# by name rather than by their position in the [CLUSTER] section, which also holds
# unrelated keys such as CLUSTER_IDENTIFIER and DB_REGION. Passing keyword arguments
# instead of a formatted DSN string means no quoting is needed for special characters
# in the password.
endpoint = redshift.describe_clusters(ClusterIdentifier=CLUSTER_IDENTIFIER)["Clusters"][0]["Endpoint"]
conn = psycopg2.connect(
    host=endpoint["Address"],
    port=endpoint["Port"],
    dbname=config.get("CLUSTER", "DB_NAME"),
    user=config.get("CLUSTER", "DB_USER"),
    password=config.get("CLUSTER", "DB_PASSWORD"),
)
cur = conn.cursor()

In [32]:
# Sanity check: how many rows ended up in events_staging after the ETL run?
events_staging_count_sql = "select count(*) from events_staging"
cur.execute(events_staging_count_sql)
cur.fetchall()

[(8056,)]

In [33]:
# Sanity check: how many rows ended up in songs_staging after the ETL run?
songs_staging_count_sql = "select count(*) from songs_staging"
cur.execute(songs_staging_count_sql)
cur.fetchall()

[(14896,)]

In [34]:
# Tear down: release the database connection, then optionally delete the cluster.
# Deletion skips the final snapshot (SkipFinalClusterSnapshot=True), so it only runs
# when DELETE_CLUSTER is deliberately set to True.
DELETE_CLUSTER = False

cur.close()
conn.close()
if DELETE_CLUSTER:
    redshift.delete_cluster(ClusterIdentifier=CLUSTER_IDENTIFIER, SkipFinalClusterSnapshot=True)