In [1]:
# Import boto3 (the AWS SDK, used for "infrastructure as code") and configparser (for parsing the configuration file).
import configparser

import boto3
import psycopg2

In [21]:
# Load AWS credentials and data-warehouse settings from dwh.cfg.
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; fail fast here instead of surfacing a confusing
# NoSectionError further down when the file is missing.
if not config.read("dwh.cfg"):
    raise FileNotFoundError("dwh.cfg could not be read from the current working directory")

# AWS credentials (kept in the config file, never hard-coded in the notebook).
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

# Cluster sizing. All values are kept as the raw strings returned by configparser.
DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

# Cluster identity and database connection settings.
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
DWH_PORT               = config.get("DWH","DWH_PORT")

# NOTE(review): this is a one-element list, not a plain string, despite the
# singular name — kept as-is so existing consumers are unaffected; confirm intent.
DWH_IAM_ROLE_NAME      = [config.get("DWH", "DWH_IAM_ROLE_NAME")]

In [24]:
# Redshift client authenticated with the credentials loaded from dwh.cfg.
redshift = boto3.client(
    "redshift",
    region_name                     = "us-west-2",
    aws_access_key_id               = KEY,
    aws_secret_access_key           = SECRET
)

# Request the cluster, reusing the settings already loaded from dwh.cfg.
# NOTE(review): DWH_DB is read from dwh.cfg but the database name is a literal
# here; kept unchanged to preserve behaviour — confirm the two agree.
response = redshift.create_cluster(
    ClusterIdentifier               = DWH_CLUSTER_IDENTIFIER,
    MasterUsername                  = DWH_DB_USER,
    MasterUserPassword              = DWH_DB_PASSWORD,
    NodeType                        = DWH_NODE_TYPE,
    ClusterType                     = DWH_CLUSTER_TYPE,
    NumberOfNodes                   = int(DWH_NUM_NODES),
    IamRoles                        = [config.get("IAM_ROLE", "ARN")],
    DBName                          = "dwh_project3"
)

# create_cluster returns while the cluster is still being provisioned. Block
# until it is available so the following cells (table creation / ETL) can
# connect when the notebook is run top to bottom.
redshift.get_waiter("cluster_available").wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)

In [4]:
# Run the ETL against the Redshift cluster.
# Per the original author's note, create_tables.py creates the tables and etl.py
# transfers the data from staging into the cluster (neither script is shown in
# this notebook — verify against the scripts themselves).
# The %run magic executes each script in this kernel, in the order listed.
%run create_tables.py
%run etl.py

In [7]:
# Sanity check: print the row count of each staging table to confirm the load worked.
# NOTE(review): the positional format below relies on the [CUR] section of dwh.cfg
# listing host, dbname, user, password, port in exactly that order — confirm.
conn = psycopg2.connect("host={} dbname={} user={} password={} port={}".format(*config['CUR'].values()))
try:
    # The cursor context manager closes the cursor on exit.
    with conn.cursor() as cur:
        for count_query in (
            "select count(*) from events_staging",  # rows in the events staging table
            "select count(*) from songs_staging",   # rows in the songs staging table
        ):
            cur.execute(count_query)
            print(cur.fetchall())
finally:
    # Always release the connection, even if a query fails.
    conn.close()

[(8056,)]
[(14896,)]


In [10]:
# Tear down the Redshift cluster to avoid extra costs and release its resources.
# No final snapshot is requested.
redshift.delete_cluster(
    SkipFinalClusterSnapshot=True,
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
)

{'Cluster': {'ClusterIdentifier': 'dwhcluster21west',
  'NodeType': 'dc2.large',
  'ClusterStatus': 'deleting',
  'MasterUsername': 'dwhuser',
  'DBName': 'dwh_project3',
  'Endpoint': {'Address': 'dwhcluster21west.cvlw03muivfx.us-west-2.redshift.amazonaws.com',
   'Port': 5439},
  'ClusterCreateTime': datetime.datetime(2022, 10, 21, 20, 31, 49, 183000, tzinfo=tzlocal()),
  'AutomatedSnapshotRetentionPeriod': 1,
  'ClusterSecurityGroups': [],
  'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-04a6af88313441527',
    'Status': 'active'}],
  'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
    'ParameterApplyStatus': 'in-sync'}],
  'ClusterSubnetGroupName': 'default',
  'VpcId': 'vpc-026d79a0229140013',
  'AvailabilityZone': 'us-west-2c',
  'PreferredMaintenanceWindow': 'sun:11:30-sun:12:00',
  'PendingModifiedValues': {},
  'ClusterVersion': '1.0',
  'AllowVersionUpgrade': True,
  'NumberOfNodes': 4,
  'PubliclyAccessible': True,
  'Encrypted': False,
  'Tags': 