In [None]:
import boto3 
import pandas as pd
import numpy as np
from io import StringIO
import time

In [None]:
import configparser 
config = configparser.ConfigParser()
# Raw string so the Windows backslashes are not interpreted as escape sequences.
# NOTE(review): absolute local path — consider making this configurable.
CONFIG_PATH = r'C:\Users\Test\Desktop\Knowledge\2.DVD_rental\cluster.config'
# `with` closes the file handle once the config has been parsed.
with open(CONFIG_PATH) as config_file:
    config.read_file(config_file)

In [None]:
# AWS credentials and default region used by the boto3 clients.
KEY                       = config.get('AWS', 'KEY')
SECRET                    = config.get('AWS', 'SECRET')
AWS_REGION                = config.get('AWS', 'AWS_REGION')

# Athena / Glue catalog settings and S3 locations for query results and source data.
SCHEMA_NAME               = config.get('GLUE', 'SCHEMA_NAME')
S3_STAGING_DIR            = config.get('GLUE', 'S3_STAGING_DIR')
S3_OUTPUT_BUCKET_NAME     = config.get('GLUE', 'S3_OUTPUT_BUCKET_NAME')
S3_OUTPUT_DIRECTORY       = config.get('GLUE', 'S3_OUTPUT_DIRECTORY')
S3_BUCKET_NAME            = config.get('GLUE', 'S3_BUCKET_NAME')

# Redshift cluster settings (all values are read as strings).
DWH_CLUSTER_TYPE          = config.get('DWH', 'DWH_CLUSTER_TYPE')
DWH_NUM_NODES             = config.get('DWH', 'DWH_NUM_NODES')
DWH_NODE_TYPE             = config.get('DWH', 'DWH_NODE_TYPE')
DWH_CLUSTER_IDENTIFIER    = config.get('DWH', 'DWH_CLUSTER_IDENTIFIER')
DWH_DB                    = config.get('DWH', 'DWH_DB')
DWH_DB_USER               = config.get('DWH', 'DWH_DB_USER')
DWH_DB_PASSWORD           = config.get('DWH', 'DWH_DB_PASSWORD')
DWH_PORT                  = config.get('DWH', 'DWH_PORT')
DWH_IAM_ROLE_NAME         = config.get('DWH', 'DWH_IAM_ROLE_NAME')

In [None]:
# S3 resource (bucket listing and uploads) and Athena client, both authenticated
# with the keys from cluster.config. The resource call must be a single expression:
# splitting `boto3.resource` and its argument tuple across lines does not call it.
s3 = boto3.resource(
    's3',
    region_name=AWS_REGION,
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
)

athena_client = boto3.client(
    "athena",
    aws_access_key_id=KEY,
    aws_secret_access_key=SECRET,
    region_name=AWS_REGION,
)

In [None]:
# Collect the key of every object in the source bucket.
# An empty Prefix filter matches all keys.
bucket = s3.Bucket(S3_BUCKET_NAME)
all_objects = bucket.objects.filter(Prefix='')
log_data_files = [obj.key for obj in all_objects]

In [None]:
def _wait_for_athena_query(client, execution_id: str, poll_interval: float) -> None:
    """Block until Athena query `execution_id` reaches a terminal state.

    Raises:
        RuntimeError: if the query ends FAILED or CANCELLED (message includes the reason).
    """
    while True:
        status = client.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            return
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "no reason given")
            raise RuntimeError(f"Athena query {execution_id} {state}: {reason}")
        # Still QUEUED/RUNNING: back off between polls; a ~1 ms busy-loop quickly
        # triggers API throttling errors.
        time.sleep(poll_interval)


def download_and_load_query_results(
    client: "boto3.client",
    query_response: dict,
    s3_client=None,
    poll_interval: float = 0.5,
    output_bucket: str = None,
    output_directory: str = None,
) -> pd.DataFrame:
    """Wait for an Athena query to finish and load its CSV result into a DataFrame.

    Args:
        client: Athena client that started the query.
        query_response: response of `start_query_execution` (needs "QueryExecutionId").
        s3_client: optional S3 client; one is built from KEY/SECRET/AWS_REGION when omitted.
        poll_interval: seconds to sleep between status polls.
        output_bucket: bucket holding query results (defaults to S3_OUTPUT_BUCKET_NAME).
        output_directory: key prefix of query results (defaults to S3_OUTPUT_DIRECTORY).

    Returns:
        The full query result parsed with `pd.read_csv`.

    Raises:
        RuntimeError: if the query fails or is cancelled.
    """
    from io import BytesIO

    execution_id = query_response["QueryExecutionId"]
    _wait_for_athena_query(client, execution_id, poll_interval)

    if s3_client is None:
        s3_client = boto3.client(
            "s3",
            aws_access_key_id=KEY,
            aws_secret_access_key=SECRET,
            region_name=AWS_REGION,
        )
    bucket = S3_OUTPUT_BUCKET_NAME if output_bucket is None else output_bucket
    directory = S3_OUTPUT_DIRECTORY if output_directory is None else output_directory

    # Athena writes the complete result as <execution id>.csv; read it in memory
    # instead of leaving a temp file behind in the working directory.
    result_object = s3_client.get_object(Bucket=bucket, Key=f"{directory}/{execution_id}.csv")
    return pd.read_csv(BytesIO(result_object["Body"].read()))

In [None]:
# Load every raw dvd_rental table from Athena into its own DataFrame.
# The query settings are identical for every table, so they live in one helper
# instead of being repeated for each of the 15 tables.
def run_athena_query(query_string: str) -> pd.DataFrame:
    """Run `query_string` against the Glue schema and return its result as a DataFrame."""
    query_response = athena_client.start_query_execution(
        QueryString=query_string,
        QueryExecutionContext={"Database": SCHEMA_NAME},
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        },
    )
    return download_and_load_query_results(athena_client, query_response)


# Athena identifiers are case-insensitive; use one lowercase spelling throughout.
DVD_RENTAL_TABLES = [
    "actor", "address", "category", "city", "country",
    "customer", "film", "film_actor", "film_category", "inventory",
    "language", "payment", "rental", "staff", "store",
]
raw_tables = {
    table: run_athena_query(f"SELECT * FROM dvd_rental_{table}")
    for table in DVD_RENTAL_TABLES
}

actor = raw_tables["actor"]
address = raw_tables["address"]
category = raw_tables["category"]
city = raw_tables["city"]
country = raw_tables["country"]
customer = raw_tables["customer"]
film = raw_tables["film"]
film_actor = raw_tables["film_actor"]
film_category = raw_tables["film_category"]
inventory = raw_tables["inventory"]
language = raw_tables["language"]
payment = raw_tables["payment"]
rental = raw_tables["rental"]
staff = raw_tables["staff"]
store = raw_tables["store"]

In [None]:
# Build the movie dimension: one row per film with its language name and category.
# The category tables were joined but never selected; expose their name so the
# joins contribute something.
DIM_MOVIE_QUERY = """
SELECT F.film_id AS movie_key,
       F.title,
       F.release_year,
       F.rating,
       F.length,
       F.rental_duration,
       F.rental_rate,
       F.replacement_cost,
       L.name AS original_language,
       CA.name AS category,
       F.description
FROM dvd_rental_film F
JOIN dvd_rental_language L ON F.language_id = L.language_id
JOIN dvd_rental_film_category FC ON F.film_id = FC.film_id
JOIN dvd_rental_category CA ON CA.category_id = FC.category_id
ORDER BY F.film_id
"""
response = athena_client.start_query_execution(
    QueryString=DIM_MOVIE_QUERY,
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)
DIM_MOVIE = download_and_load_query_results(athena_client, response)

In [None]:
# Date dimension: one row per distinct payment date, keyed by the date string
# with its dashes removed, plus calendar attributes.
PAYMENT = payment[['payment_date']]
date_key = np.sort(PAYMENT['payment_date'].unique())
DIM_DATE = (
    pd.DataFrame({'date_key': date_key})
    .assign(
        date=lambda frame: pd.to_datetime(frame['date_key'], format='%Y-%m-%d'),
        year=lambda frame: frame['date'].dt.year,
        quarter=lambda frame: frame['date'].dt.quarter,
        month=lambda frame: frame['date'].dt.month,
        day_name=lambda frame: frame['date'].dt.day_name(),
    )
    # Overwrite the key last so `date` above is parsed from the dashed form.
    .assign(date_key=lambda frame: frame['date_key'].str.replace('-',''))
)

In [None]:
# Customer dimension: each customer enriched with address, city and country.
# The join keys are dropped once the lookups are done.
CUSTOMER = customer[['customer_id', 'address_id', 'first_name', 'last_name', 'email']]
ADDRESS = address[['address_id', 'city_id', 'address', 'district']]
CITY = city[['city_id', 'country_id', 'city']]
COUNTRY = country[['country_id', 'country']]
DIM_CUSTOMER = (
    CUSTOMER
    .merge(ADDRESS, on='address_id')
    .merge(CITY, on='city_id')
    .merge(COUNTRY, on='country_id')
    .drop(columns=['address_id', 'city_id', 'country_id'])
    .rename(columns={'customer_id': 'customer_key'})
)

In [None]:
# Store dimension: each store with its address, city and country, plus the
# manager's name and email (staff joined on staff_id). Join keys are dropped afterwards.
STORE = store[['store_id', 'staff_id', 'address_id']]
ADDRESS = address[['address_id', 'city_id', 'address', 'district']]
CITY = city[['city_id', 'country_id', 'city']]
COUNTRY = country[['country_id', 'country']]
STAFF = staff[['staff_id', 'first_name', 'last_name', 'email']]
DIM_STORE = (
    STORE
    .merge(ADDRESS, on='address_id')
    .merge(CITY, on='city_id')
    .merge(COUNTRY, on='country_id')
    .merge(STAFF, on='staff_id')
    .drop(columns=['staff_id', 'address_id', 'city_id', 'country_id'])
    .rename(columns={
        'store_id': 'store_key',
        'first_name': 'manager_first_name',
        'last_name': 'manager_last_name',
    })
)

In [None]:
# Fact table: one row per payment, linked to its rental (customer) and to the
# rented inventory item (film and store).
# Only the needed columns are merged, so there are no overlapping names. The
# previous version depended on pandas' implicit `customer_id_y` suffix to pick
# the rental's customer.
DIM_FACT = (
    payment[['rental_id', 'payment_date', 'amount']]
    .merge(rental[['rental_id', 'inventory_id', 'customer_id']], on='rental_id')
    .merge(inventory[['inventory_id', 'film_id', 'store_id']], on='inventory_id')
    .loc[:, ['payment_date', 'customer_id', 'film_id', 'store_id', 'amount']]
)
# Same key format as DIM_DATE.date_key: the date string with its dashes removed.
DIM_FACT['payment_date'] = DIM_FACT['payment_date'].str.replace('-','')
DIM_FACT = DIM_FACT.rename(
    columns={'payment_date': 'date_key', 'customer_id': 'customer_key', 'film_id': 'movie_key', 'store_id': 'store_key'})

In [None]:
# Serialize each star-schema table to an in-memory text buffer as CSV (index included)
# and upload it to output/<TABLE>.csv in the source bucket.
for table_name, frame in (
    ('DIM_FACT', DIM_FACT),
    ('DIM_MOVIE', DIM_MOVIE),
    ('DIM_DATE', DIM_DATE),
    ('DIM_CUSTOMER', DIM_CUSTOMER),
    ('DIM_STORE', DIM_STORE),
):
    csv_buffer = StringIO()
    frame.to_csv(csv_buffer)
    s3.Object(S3_BUCKET_NAME, f'output/{table_name}.csv').put(Body=csv_buffer.getvalue())

In [None]:
# Generate a CREATE TABLE statement for each star-schema table from its DataFrame.
# reset_index() adds the index as a column, matching the index column that
# DataFrame.to_csv writes by default.
(
    DIM_DATE_schema,
    DIM_FACT_schema,
    DIM_MOVIE_schema,
    DIM_CUSTOMER_schema,
    DIM_STORE_schema,
) = [
    pd.io.sql.get_schema(frame.reset_index(), table_name)
    for frame, table_name in (
        (DIM_DATE, 'DIM_DATE'),
        (DIM_FACT, 'DIM_FACT'),
        (DIM_MOVIE, 'DIM_MOVIE'),
        (DIM_CUSTOMER, 'DIM_CUSTOMER'),
        (DIM_STORE, 'DIM_STORE'),
    )
]
print(DIM_FACT_schema)

In [None]:
import redshift_connector

In [None]:
# Clients for creating the Redshift cluster and looking up its IAM role.
# NOTE(review): region is hardcoded to eu-west-3 rather than AWS_REGION from config.
aws_credentials = {'aws_access_key_id': KEY, 'aws_secret_access_key': SECRET}
redshift = boto3.client('redshift', region_name='eu-west-3', **aws_credentials)
iam = boto3.client('iam', region_name='eu-west-3', **aws_credentials)

In [None]:
# Resolve the ARN of the IAM role named in cluster.config (DWH_IAM_ROLE_NAME).
iam_role = iam.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']
roleArn = iam_role['Arn']

In [None]:
# Create the Redshift cluster (reusing it if it already exists), then block until
# it is available so the connection cell does not race provisioning.
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/redshift.html#Redshift.Client.create_cluster
cluster_kwargs = dict(
    ClusterType=DWH_CLUSTER_TYPE,
    NodeType=DWH_NODE_TYPE,
    # Identifiers & credentials
    DBName=DWH_DB,
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    MasterUsername=DWH_DB_USER,
    MasterUserPassword=DWH_DB_PASSWORD,
    # Role attached to the cluster so COPY can read from S3
    IamRoles=[roleArn],
)
# NumberOfNodes is required for multi-node clusters; it was read from config but never passed.
if DWH_CLUSTER_TYPE == 'multi-node':
    cluster_kwargs['NumberOfNodes'] = int(DWH_NUM_NODES)

try:
    response = redshift.create_cluster(**cluster_kwargs)
except redshift.exceptions.ClusterAlreadyExistsFault as e:
    # Re-running the notebook reuses the existing cluster; any other error is raised.
    print(e)

redshift.get_waiter('cluster_available').wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)

In [None]:
# Connect to the cluster's own endpoint instead of a hardcoded hostname, so the
# connection follows DWH_CLUSTER_IDENTIFIER (and the cluster's actual port).
cluster_description = redshift.describe_clusters(
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER
)['Clusters'][0]
if 'Endpoint' not in cluster_description:
    raise RuntimeError(
        f"Cluster {DWH_CLUSTER_IDENTIFIER} has no endpoint yet "
        f"(status: {cluster_description.get('ClusterStatus')}); wait until it is available."
    )
cluster_endpoint = cluster_description['Endpoint']

conn = redshift_connector.connect(
    host      = cluster_endpoint['Address'],
    port      = cluster_endpoint['Port'],
    database  = DWH_DB,
    user      = DWH_DB_USER,
    password  = DWH_DB_PASSWORD
)

In [None]:
# Enable autocommit on the Redshift connection so the CREATE TABLE and COPY statements
# run through this connection do not need an explicit conn.commit().
conn.autocommit = True

In [None]:
# Cursor used to run the DDL and COPY statements.
# Written as a type annotation: the previous chained assignment
# (`cursor = redshift_connector.Cursor = ...`) overwrote the library's Cursor class
# with this instance.
cursor: redshift_connector.Cursor = conn.cursor()

In [None]:
# Create the five star-schema tables in Redshift from the generated DDL.
for ddl in (
    DIM_DATE_schema,
    DIM_FACT_schema,
    DIM_MOVIE_schema,
    DIM_CUSTOMER_schema,
    DIM_STORE_schema,
):
    cursor.execute(ddl)

In [None]:
# Load each uploaded CSV (s3://<S3_BUCKET_NAME>/output/<TABLE>.csv) into its Redshift table.
# - FORMAT AS CSV: pandas quotes any field containing a comma; plain DELIMITER parsing
#   would split those fields.
# - IAM_ROLE uses roleArn, the role attached to the cluster at creation, instead of a
#   hardcoded account ARN.
# - IGNOREHEADER 1 skips the header row that DataFrame.to_csv writes.
for table_name in ('DIM_DATE', 'DIM_FACT', 'DIM_MOVIE', 'DIM_CUSTOMER', 'DIM_STORE'):
    cursor.execute(f"""
copy {table_name} from 's3://{S3_BUCKET_NAME}/output/{table_name}.csv'
iam_role '{roleArn}'
format as csv
delimiter ','
region 'eu-west-3'
IGNOREHEADER 1
""")

In [None]:
# at this point we can add commands to AWS Glue