## Criação das tabelas no Redshift e Carregamento dos dados salvos no S3

In [None]:
%load_ext sql

In [None]:
import json
import logging
import warnings
from pathlib import Path
from urllib.parse import quote

import boto3
import pandas as pd
from botocore.exceptions import ClientError

warnings.filterwarnings('ignore')

### Carregar parâmetros do Data Warehouse (DWH) do arquivo `dwh.cfg`

In [None]:
import configparser
# Load the AWS credentials and cluster settings from the local `dwh.cfg` file.
config = configparser.ConfigParser()
# Use a context manager so the file handle is closed as soon as parsing ends
# (the bare `open(...)` call left it open until garbage collection).
with open('dwh.cfg') as config_file:
    config.read_file(config_file)

# AWS access key pair (section [AWS]).
KEY                    = config.get('AWS','KEY')
SECRET                 = config.get('AWS','SECRET')

# Cluster sizing (section [DWH]). configparser returns every value as a string.
DWH_CLUSTER_TYPE       = config.get("DWH","DWH_CLUSTER_TYPE")
DWH_NUM_NODES          = config.get("DWH","DWH_NUM_NODES")
DWH_NODE_TYPE          = config.get("DWH","DWH_NODE_TYPE")

# Cluster identity and database login details.
DWH_CLUSTER_IDENTIFIER = config.get("DWH","DWH_CLUSTER_IDENTIFIER")
DWH_DB                 = config.get("DWH","DWH_DB")
DWH_DB_USER            = config.get("DWH","DWH_DB_USER")
DWH_DB_PASSWORD        = config.get("DWH","DWH_DB_PASSWORD")
# Kept as a string here; converted with int() where a numeric port is required.
DWH_PORT               = config.get("DWH","DWH_PORT")

DWH_IAM_ROLE_NAME      = config.get("DWH", "DWH_IAM_ROLE_NAME")

### Criar clientes para IAM, EC2, S3 e Redshift

In [None]:
import boto3

# Every client/resource below targets the same region and authenticates with
# the key pair loaded from the configuration file, so share one kwargs dict.
aws_common_kwargs = {
    "region_name": "us-east-1",
    "aws_access_key_id": KEY,
    "aws_secret_access_key": SECRET,
}

# Resource-style (object oriented) handles for EC2 and S3.
ec2 = boto3.resource('ec2', **aws_common_kwargs)
s3 = boto3.resource('s3', **aws_common_kwargs)

# Low-level clients for IAM and Redshift.
iam = boto3.client('iam', **aws_common_kwargs)
redshift = boto3.client('redshift', **aws_common_kwargs)

In [None]:
# Print one summary line per object stored in the "roxproject" bucket to
# confirm which CSV files are available for loading.
roxProjectBucket = s3.Bucket("roxproject")
bucket_objects = roxProjectBucket.objects.all()
for object_summary in bucket_objects:
    print(object_summary)

s3.ObjectSummary(bucket_name='roxproject', key='data/Person.Person.csv')
s3.ObjectSummary(bucket_name='roxproject', key='data/Production.Product.csv')
s3.ObjectSummary(bucket_name='roxproject', key='data/Sales.Customer.csv')
s3.ObjectSummary(bucket_name='roxproject', key='data/Sales.SalesOrderDetail.csv')
s3.ObjectSummary(bucket_name='roxproject', key='data/Sales.SalesOrderHeader.csv')
s3.ObjectSummary(bucket_name='roxproject', key='data/Sales.SpecialOfferProduct.csv')
s3.ObjectSummary(bucket_name='roxproject', key='raw_data/Person.Person.csv')
s3.ObjectSummary(bucket_name='roxproject', key='raw_data/Production.Product.csv')
s3.ObjectSummary(bucket_name='roxproject', key='raw_data/Sales.Customer.csv')
s3.ObjectSummary(bucket_name='roxproject', key='raw_data/Sales.SalesOrderDetail.csv')
s3.ObjectSummary(bucket_name='roxproject', key='raw_data/Sales.SalesOrderHeader.csv')
s3.ObjectSummary(bucket_name='roxproject', key='raw_data/Sales.SpecialOfferProduct.csv')


### Status Cluster
- Rodar bloco até o status do cluster ficar `Available`

In [None]:
def prettyRedshiftProps(props):
    """Summarise selected Redshift cluster properties as a two-column table.

    Args:
        props: Mapping of cluster property names to values, e.g. one entry of
            the ``Clusters`` list returned by ``describe_clusters``.

    Returns:
        A ``pandas.DataFrame`` with columns ``Key`` and ``Value`` holding only
        the properties listed in ``keysToShow``, in the order they appear in
        ``props``. An empty mapping yields an empty frame.

    Side effects:
        Disables column-width truncation globally in pandas so long values
        (such as the endpoint) are displayed in full.
    """
    # `None` is the supported way to disable truncation. The old sentinel -1 is
    # deprecated since pandas 1.0 and raises ValueError on pandas 2.x; fall back
    # to it only for very old pandas releases that reject None.
    try:
        pd.set_option('display.max_colwidth', None)
    except ValueError:
        pd.set_option('display.max_colwidth', -1)
    keysToShow = ["ClusterIdentifier", "NodeType", "ClusterStatus", "MasterUsername", "DBName", "Endpoint", "NumberOfNodes", 'VpcId']
    rows = [(key, value) for key, value in props.items() if key in keysToShow]
    return pd.DataFrame(data=rows, columns=["Key", "Value"])

# Ask Redshift for the current description of the configured cluster and keep
# the first (only) entry; the summary table is the cell's displayed output.
describe_response = redshift.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)
myClusterProps = describe_response['Clusters'][0]
prettyRedshiftProps(myClusterProps)

Unnamed: 0,Key,Value
0,ClusterIdentifier,doxcluster
1,NodeType,dc2.large
2,ClusterStatus,available
3,MasterUsername,doxuser
4,DBName,dox
5,Endpoint,"{'Address': 'doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com', 'Port': 5439}"
6,VpcId,vpc-73bc5214
7,NumberOfNodes,2


### Guardar dados do cluster

- Endpoint e ARN da role

<font color='red'>NÃO RODE até que o status do cluster esteja "Available"</font>

In [None]:
# Extract the connection host and the ARN of the first IAM role attached to the
# cluster from the description fetched earlier, then echo both values.
DWH_ENDPOINT = myClusterProps['Endpoint']['Address']
attached_roles = myClusterProps['IamRoles']
DWH_ROLE_ARN = attached_roles[0]['IamRoleArn']

for label, value in (("DWH_ENDPOINT :: ", DWH_ENDPOINT), ("DWH_ROLE_ARN :: ", DWH_ROLE_ARN)):
    print(label, value)

DWH_ENDPOINT ::  doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com
DWH_ROLE_ARN ::  arn:aws:iam::029258854698:role/doxRole


### Abrir porta TCP para acesso ao endpoint do cluster

In [None]:
# Add an inbound TCP rule for the cluster port to the cluster's VPC security
# group so the endpoint can be reached from outside the VPC.
# NOTE(review): CidrIp='0.0.0.0/0' opens the port to every IPv4 address; narrow
# it to a known address range for anything beyond a short-lived demo.
try:
    vpcSecurityGroup = myClusterProps['VpcSecurityGroups'][0]['VpcSecurityGroupId']
    vpc = ec2.Vpc(id=myClusterProps['VpcId'])
    defaultSg = list(vpc.security_groups.filter(GroupIds=[vpcSecurityGroup]))[0]
    print(defaultSg)
    defaultSg.authorize_ingress(
        GroupName=defaultSg.group_name,
        CidrIp='0.0.0.0/0',
        IpProtocol='TCP',
        FromPort=int(DWH_PORT),
        ToPort=int(DWH_PORT)
    )
except Exception as e:
    # botocore errors carry the service error code in `e.response`; other
    # exceptions have no such dict and fall through to the generic branch.
    response = getattr(e, 'response', None)
    error_code = response.get('Error', {}).get('Code') if isinstance(response, dict) else None
    if error_code == 'InvalidPermission.Duplicate':
        # Re-running the cell is expected: the rule is already in place.
        print("Ingress rule for TCP port {} already exists; nothing to do.".format(DWH_PORT))
    else:
        # Best-effort cell: report the problem without aborting the notebook.
        print(e)

ec2.SecurityGroup(id='sg-847966fc')


### Teste de conexão com o cluster

In [None]:
# Build the connection URL handed to the %sql magic. User name and password are
# percent-encoded so characters such as '@', ':' or '/' cannot break the URL.
conn_string = "postgresql://{}:{}@{}:{}/{}".format(
    quote(DWH_DB_USER, safe=""),
    quote(DWH_DB_PASSWORD, safe=""),
    DWH_ENDPOINT,
    DWH_PORT,
    DWH_DB,
)
# Echo the target with the password masked; printing the raw URL would persist
# the secret in the saved notebook output.
print("postgresql://{}:***@{}:{}/{}".format(DWH_DB_USER, DWH_ENDPOINT, DWH_PORT, DWH_DB))
%sql $conn_string

postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox


'Connected: doxuser@dox'

In [None]:
%%sql

-- Remove any previous copies of the tables so the cell can be re-run from scratch.
-- Tables that reference others are dropped first, referenced tables last.
DROP TABLE IF EXISTS SalesOrderDetail;
DROP TABLE IF EXISTS SpecialOfferProduct;
DROP TABLE IF EXISTS SalesOrderHeader;
DROP TABLE IF EXISTS Customer;
DROP TABLE IF EXISTS Product;
DROP TABLE IF EXISTS Person;

-- Person table, keyed by BusinessEntityID.
-- The COPY that loads this table gives no column list, so the column order here
-- presumably mirrors the column order of the source CSV (verify against the file).
-- NOTE(review) Redshift documents PRIMARY KEY and FOREIGN KEY constraints as
-- informational only (not enforced) - confirm the loaded data is already unique.
CREATE TABLE IF NOT EXISTS Person
(
	BusinessEntityID INTEGER NOT NULL,
	PersonType VARCHAR(10),
	NameStyle INTEGER,
	Title VARCHAR(10),
	FirstName VARCHAR(50),
	MiddleName VARCHAR(50),
	LastName VARCHAR(50),
	Suffix VARCHAR(10),
	EmailPromotion INTEGER,
	AdditionalContactInfo VARCHAR(2000),
	Demographics VARCHAR(1000),
	rowguid VARCHAR(36),
	ModifiedDate TIMESTAMP,
	PRIMARY KEY(BusinessEntityID)
);

-- Product table, keyed by ProductID.
-- StandardCost, ListPrice and Weight are stored as FLOAT.
-- NOTE(review) FLOAT is an approximate type - consider DECIMAL for the cost and
-- price columns if exact amounts matter downstream.
CREATE TABLE IF NOT EXISTS Product
(
	ProductID INTEGER NOT NULL,
	Name VARCHAR(50),
	ProductNumber VARCHAR(10),
	MakeFlag INTEGER,
	FinishedGoodsFlag INTEGER,
	Color VARCHAR(20),
	SafetyStockLevel INTEGER,
	ReorderPoint INTEGER,
	StandardCost FLOAT,
	ListPrice FLOAT,
	Size VARCHAR(10),
	SizeUnitMeasureCode VARCHAR(10),
	WeightUnitMeasureCode VARCHAR(10),
	Weight FLOAT,
	DaysToManufacture INTEGER,
	ProductLine VARCHAR(10),
	Class VARCHAR(10),
	Style VARCHAR(10),
	ProductSubcategoryID INTEGER,
	ProductModelID INTEGER,
	SellStartDate TIMESTAMP,
	SellEndDate TIMESTAMP,
	DiscontinuedDate TIMESTAMP,
	rowguid VARCHAR(36),
	ModifiedDate TIMESTAMP,
	PRIMARY KEY (ProductID)
);

-- Customer table, keyed by CustomerID.
-- PersonID points at Person(BusinessEntityID), so Person must exist first.
CREATE TABLE IF NOT EXISTS Customer
(
	CustomerID INTEGER NOT NULL,
	PersonID INTEGER,
	StoreID INTEGER,
	TerritoryID INTEGER,
	AccountNumber VARCHAR(10),
	rowguid VARCHAR(36),
	ModifiedDate TIMESTAMP,
	PRIMARY KEY (CustomerID),
	FOREIGN KEY (PersonID) REFERENCES Person(BusinessEntityID)
);

-- SalesOrderHeader table, keyed by SalesOrderID.
-- CustomerID points at Customer(CustomerID), so Customer must exist first.
-- NOTE(review) Comment is limited to 10 characters - confirm the source data fits.
CREATE TABLE IF NOT EXISTS SalesOrderHeader
(
	SalesOrderID INTEGER NOT NULL,
	RevisionNumber INTEGER,
	OrderDate TIMESTAMP,
	DueDate TIMESTAMP,
	ShipDate TIMESTAMP,
	Status INTEGER,
	OnlineOrderFlag INTEGER,
	SalesOrderNumber VARCHAR(10),
	PurchaseOrderNumber VARCHAR(20),
	AccountNumber VARCHAR(20),
	CustomerID INTEGER,
	SalesPersonID INTEGER,
	TerritoryID INTEGER,
	BillToAddressID INTEGER,
	ShipToAddressID INTEGER,
	ShipMethodID INTEGER,
	CreditCardID INTEGER,
	CreditCardApprovalCode VARCHAR(20),
	CurrencyRateID INTEGER,
	SubTotal FLOAT,
	TaxAmt FLOAT,
	Freight FLOAT,
	TotalDue FLOAT,
	Comment VARCHAR(10),
	rowguid VARCHAR(36),
	ModifiedDate TIMESTAMP,
	PRIMARY KEY (SalesOrderID),
	FOREIGN KEY (CustomerID) REFERENCES Customer(CustomerID)
);

-- SpecialOfferProduct table with a composite key (SpecialOfferID, ProductID).
-- ProductID points at Product(ProductID), so Product must exist first.
CREATE TABLE IF NOT EXISTS SpecialOfferProduct
(
	SpecialOfferID INTEGER NOT NULL,
	ProductID INTEGER NOT NULL,
	rowguid VARCHAR(36),
	ModifiedDate TIMESTAMP,
	PRIMARY KEY (SpecialOfferID, ProductID),
	FOREIGN KEY (ProductID) REFERENCES Product(ProductID)
);

-- SalesOrderDetail table with a composite key (SalesOrderID, SalesOrderDetailID).
-- It references SalesOrderHeader, SpecialOfferProduct and Product, so it is
-- created last, after all three of those tables.
CREATE TABLE IF NOT EXISTS SalesOrderDetail
(
	SalesOrderID INTEGER NOT NULL,
	SalesOrderDetailID INTEGER NOT NULL,
	CarrierTrackingNumber VARCHAR(20),
	OrderQty INTEGER,
	ProductID INTEGER,
    SpecialOfferID INTEGER,
    UnitPrice FLOAT,
    UnitPriceDiscount FLOAT,
    LineTotal FLOAT,
    rowguid VARCHAR(36),
    ModifiedDate TIMESTAMP,
    PRIMARY KEY (SalesOrderID, SalesOrderDetailID),
    FOREIGN KEY (SalesOrderID) 
        REFERENCES SalesOrderHeader(SalesOrderID),
    FOREIGN KEY (SpecialOfferID, ProductID) 
        REFERENCES SpecialOfferProduct(SpecialOfferID, ProductID),
    FOREIGN KEY (ProductID) 
        REFERENCES Product(ProductID)
);

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.
Done.


[]

In [None]:
%%sql 

-- Sanity check before loading, the freshly created Person table should be empty
SELECT count(*) FROM Person

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
1 rows affected.


count
0


In [None]:
# COPY statement that bulk-loads the Person table from the CSV stored in S3.
# The cluster's IAM role ARN is interpolated for authorization, the first line
# of the file (header) is skipped and fields are separated by ';'.
qry = f"""
    COPY Person
    FROM 's3://roxproject/data/Person.Person.csv'
    IAM_ROLE '{DWH_ROLE_ARN}'
    IGNOREHEADER 1
    DELIMITER ';'
"""

%sql $qry

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
Done.


[]

In [None]:
%%sql

-- Row count of Person after the COPY, used to confirm that the load worked
SELECT COUNT(*) FROM person

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
1 rows affected.


count
19972


In [None]:
# COPY statement that bulk-loads the Product table from the CSV stored in S3.
# The cluster's IAM role ARN is interpolated for authorization, the first line
# of the file (header) is skipped and fields are separated by ';'.
qry = f"""
    COPY Product
    FROM 's3://roxproject/data/Production.Product.csv'
    IAM_ROLE '{DWH_ROLE_ARN}'
    IGNOREHEADER 1
    DELIMITER ';'
"""

%sql $qry

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
Done.


[]

In [None]:
%%sql

-- Row count of Product after the COPY, used to confirm that the load worked
SELECT COUNT(*) FROM Product

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
1 rows affected.


count
504


In [None]:
# COPY statement that bulk-loads the Customer table from the CSV stored in S3.
# The cluster's IAM role ARN is interpolated for authorization, the first line
# of the file (header) is skipped and fields are separated by ';'.
qry = f"""
    COPY Customer
    FROM 's3://roxproject/data/Sales.Customer.csv'
    IAM_ROLE '{DWH_ROLE_ARN}'
    IGNOREHEADER 1
    DELIMITER ';'
"""

%sql $qry

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
Done.


[]

In [None]:
%%sql

-- Row count of Customer after the COPY, used to confirm that the load worked
SELECT COUNT(*) FROM Customer

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
1 rows affected.


count
19820


In [None]:
# COPY statement that bulk-loads the SalesOrderHeader table from the CSV stored
# in S3. The cluster's IAM role ARN is interpolated for authorization, the first
# line of the file (header) is skipped and fields are separated by ';'.
qry = f"""
    COPY SalesOrderHeader
    FROM 's3://roxproject/data/Sales.SalesOrderHeader.csv'
    IAM_ROLE '{DWH_ROLE_ARN}'
    IGNOREHEADER 1
    DELIMITER ';'
"""

%sql $qry

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
Done.


[]

In [None]:
%%sql

-- Row count of SalesOrderHeader after the COPY, used to confirm that the load worked
SELECT COUNT(*) FROM SalesOrderHeader

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
1 rows affected.


count
31465


In [None]:
# COPY statement that bulk-loads the SpecialOfferProduct table from the CSV
# stored in S3. The cluster's IAM role ARN is interpolated for authorization,
# the first line of the file (header) is skipped and fields are separated by ';'.
qry = f"""
    COPY SpecialOfferProduct
    FROM 's3://roxproject/data/Sales.SpecialOfferProduct.csv'
    IAM_ROLE '{DWH_ROLE_ARN}'
    IGNOREHEADER 1
    DELIMITER ';'
"""

%sql $qry

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
Done.


[]

In [None]:
%%sql

-- Row count of SpecialOfferProduct after the COPY, used to confirm that the load worked
SELECT COUNT(*) FROM SpecialOfferProduct

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
1 rows affected.


count
538


In [None]:
# COPY statement that bulk-loads the SalesOrderDetail table from the CSV stored
# in S3. The cluster's IAM role ARN is interpolated for authorization, the first
# line of the file (header) is skipped and fields are separated by ';'.
qry = f"""
    COPY SalesOrderDetail
    FROM 's3://roxproject/data/Sales.SalesOrderDetail.csv'
    IAM_ROLE '{DWH_ROLE_ARN}'
    IGNOREHEADER 1
    DELIMITER ';'
"""

%sql $qry

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
Done.


[]

In [None]:
%%sql

-- Row count of SalesOrderDetail after the COPY, used to confirm that the load worked
SELECT COUNT(*) FROM SalesOrderDetail

 * postgresql://doxuser:***@doxcluster.craeubgpe2r5.us-east-1.redshift.amazonaws.com:5439/dox
1 rows affected.


count
121317
