<a href="https://colab.research.google.com/github/deepaksethionly/AWS-data-engineering-projects/blob/steelcase/steelcase_aws_data_pipeline_etl_redshift_jobscript.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Loading Libraries

In [None]:
import getpass
import json
import os
import time
from io import StringIO

import boto3
import pandas as pd
import psycopg2
import redshift_connector

# Setting up the AWS account configuration

In [None]:
# Credentials are taken from the standard AWS environment variables when they
# are set, so real keys never have to be pasted into (and committed with) the
# notebook; the placeholder strings remain as the fallback.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "your access key")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "your secret key")
AWS_REGION = "us-east-2"
# Athena database (schema) that holds the source tables.
SCHEMA_NAME = "steelcase"
S3_BUCKET_NAME = "steelcase-output"
S3_OUTPUT_DIRECTORY = 'output'
# Athena's query-result location. Derived from the bucket and prefix above
# because results are later downloaded from
# <S3_BUCKET_NAME>/<S3_OUTPUT_DIRECTORY>/<QueryExecutionId>.csv, so the two
# settings must always agree.
S3_STAGING_DIR = f"s3://{S3_BUCKET_NAME}/{S3_OUTPUT_DIRECTORY}/"

In [None]:
# Athena client authenticated with the configured keys, in the configured region.
athena_client = boto3.client(
    service_name="athena",
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
)

# Running Athena queries and loading their results into pandas DataFrames with the boto3 library

In [None]:
def download_and_load_query_results(
    client: boto3.client,
    query_response: dict,
    poll_interval: float = 1.0,
    temp_file_location: str = "athena_query_results.csv",
) -> pd.DataFrame:
    """Wait for an Athena query to finish, then download and parse its CSV result.

    Args:
        client: Athena client used to poll the query's execution state.
        query_response: Response returned by ``start_query_execution``; only
            its ``"QueryExecutionId"`` entry is used.
        poll_interval: Seconds to sleep between status checks.
        temp_file_location: Local path the result CSV is downloaded to; it is
            overwritten on every call.

    Returns:
        The query result loaded with ``pd.read_csv``.

    Raises:
        RuntimeError: If the query ends in the FAILED or CANCELLED state.
    """
    execution_id = query_response["QueryExecutionId"]

    # Poll the execution state at a modest interval instead of retrying
    # get_query_results in a tight loop: millisecond-level retries can trip
    # Athena's API rate limits, and a throttling error is not a
    # "not yet finished" error, so it would abort the load.
    while True:
        execution = client.get_query_execution(QueryExecutionId=execution_id)
        status = execution["QueryExecution"]["Status"]
        state = status["State"]
        if state == "SUCCEEDED":
            break
        if state in ("FAILED", "CANCELLED"):
            reason = status.get("StateChangeReason", "no reason given")
            raise RuntimeError(f"Athena query {execution_id} {state}: {reason}")
        time.sleep(poll_interval)

    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )

    # The result object is expected at <S3_OUTPUT_DIRECTORY>/<QueryExecutionId>.csv,
    # i.e. under the OutputLocation (S3_STAGING_DIR) the queries are started with.
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{execution_id}.csv",
        temp_file_location,
    )

    return pd.read_csv(temp_file_location)


In [None]:
# Kick off an Athena query for the whole customer master table; Athena writes
# the SSE-S3-encrypted result CSV under S3_STAGING_DIR.
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM master_customer",
    ResultConfiguration={
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        "OutputLocation": S3_STAGING_DIR,
    },
    QueryExecutionContext={"Database": SCHEMA_NAME},
)


In [None]:
# Wait for the customer query to finish and load its result into a DataFrame.
master_customer = download_and_load_query_results(athena_client, response)

In [None]:
# Preview the first rows of the customer master data.
master_customer.head()

In [None]:
# (rows, columns) of the customer master data.
master_customer.shape

In [None]:
# Start the Athena query for the product master table. The result is
# downloaded exactly once, into `master_product`, in the next cell (a second
# download here would fetch the same CSV again for nothing).
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM master_product",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)


In [None]:
# Wait for the product query to finish and load its result into a DataFrame.
master_product = download_and_load_query_results(athena_client, response)

In [None]:
# Preview the raw product data (before the header fix-up below).
master_product.head()

In [None]:
# The first data row is treated as the real header (presumably the source
# file's header line was read as data — confirm against the Athena table
# definition), so capture its values to use as column names.
master_product_cols_name=master_product.iloc[0,:].tolist()
master_product_cols_name

In [None]:
# Discard the first row, whose values were captured as column names.
master_product = master_product.iloc[1:]

In [None]:
# Relabel the columns with the values captured from the first row.
master_product.columns=master_product_cols_name

In [None]:
# Preview the product data after the header fix-up.
master_product.head()

In [None]:
# (rows, columns) of the cleaned product master data.
master_product.shape

In [None]:
# Start the Athena query for the store (order) data. The result is downloaded
# exactly once, into `store_data`, in the next cell (a second download here
# would fetch the same CSV again for nothing).
response = athena_client.start_query_execution(
    QueryString="SELECT * FROM store_data",
    QueryExecutionContext={"Database": SCHEMA_NAME},
    ResultConfiguration={
        "OutputLocation": S3_STAGING_DIR,
        "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
    },
)


In [None]:
# Wait for the store-data query to finish and load its result into a DataFrame.
store_data = download_and_load_query_results(athena_client, response)

In [None]:
# Preview the first rows of the store data.
store_data.head()

In [None]:
# (rows, columns) of the store data.
store_data.shape

# Dimensional modeling

In [None]:
# Fact table: the key columns and numeric measures of every store_data row,
# as an independent copy.
steelcase_fact_table = store_data.loc[
    :, ["order_id", "customer_id", "product_id", "sales", "discount"]
].copy()

In [None]:
# Preview the fact table.
steelcase_fact_table.head()

In [None]:
# (rows, columns) of the fact table.
steelcase_fact_table.shape

In [None]:
# Customer dimension: a deep copy of the customer master data, taken as-is.
customer_dim_table = master_customer.copy(deep=True)

In [None]:
# Preview the customer dimension.
customer_dim_table.head()

In [None]:
# Two inputs for the shopping dimension: the order/shipping attributes keyed by
# product_id, and an independent copy of the product master data.
shopping_dim_table_1 = store_data.loc[
    :, ["product_id", "order_date", "ship_date", "ship_mode"]
].copy()
shopping_dim_table_2 = master_product.copy(deep=True)

In [None]:
# Inspect the column labels of the order/shipping slice.
shopping_dim_table_1.columns

In [None]:
# Rename the product columns positionally so the key matches store_data's
# 'product_id'. NOTE(review): assumes master_product's columns are ordered
# (product id, category, sub-category, product name) — confirm against the
# header captured in master_product_cols_name.
shopping_dim_table_2.columns=['product_id','category', 'sub_category', 'product_name']

In [None]:
# Shopping dimension: attach product attributes to each order/shipping row;
# rows whose product_id is missing on either side are dropped (inner join).
shopping_dim_table = shopping_dim_table_1.merge(
    shopping_dim_table_2, how='inner', on='product_id'
)

In [None]:
# Preview the merged shopping dimension.
shopping_dim_table.head()

In [None]:
# (rows, columns) of the merged shopping dimension.
shopping_dim_table.shape

In [None]:
# Display the merged shopping dimension.
shopping_dim_table

# Write the contents of each DataFrame into an in-memory buffer using the to_csv method (suitable for small datasets that fit comfortably in memory)

In [None]:
# Serialize the fact table to an in-memory CSV, without the index column.
csv_buffer_steelcase = StringIO()
steelcase_fact_table.to_csv(path_or_buf=csv_buffer_steelcase, index=False)

In [None]:
# Serialize the customer dimension to an in-memory CSV, without the index column.
csv_buffer_customer = StringIO()
customer_dim_table.to_csv(path_or_buf=csv_buffer_customer, index=False)

In [None]:
# Serialize the shopping dimension to an in-memory CSV, without the index column.
csv_buffer_shopping = StringIO()
shopping_dim_table.to_csv(path_or_buf=csv_buffer_shopping, index=False)

In [None]:
# S3 resource used to upload the generated CSVs. The region is pinned to
# AWS_REGION, consistent with the other boto3 clients in this notebook,
# instead of depending on whatever default region the environment provides.
s3_resource = boto3.resource(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)

# Pairing each target file name with its CSV buffer for the upload step

In [None]:
# (S3 object name, CSV buffer) pairs to upload, in order.
csv_files = list(zip(
    ['steelcase_fact.csv', 'customer_dim.csv', 'shopping_dim.csv'],
    [csv_buffer_steelcase, csv_buffer_customer, csv_buffer_shopping],
))

# Uploading the CSVs to the csvs/ folder of the S3 bucket

In [None]:
# Upload each buffer's text under the csvs/ prefix, then close the buffer.
for object_name, buffer in csv_files:
    object_key = f"csvs/{object_name}"
    s3_resource.Object(S3_BUCKET_NAME, object_key).put(Body=buffer.getvalue())
    buffer.close()
print("successfully uploaded")

# Generating the table schema (DDL) from each DataFrame

In [None]:
# Preview CREATE TABLE DDL for the fact table. The DataFrame is passed as-is:
# reset_index() would add an "index" column that the uploaded CSV (written
# with index=False) does not contain. Without a connection, pandas renders
# SQLite-dialect types, so review them before reusing the DDL on Redshift.
steelcase_fact_sql = pd.io.sql.get_schema(steelcase_fact_table, "steelcase_fact_table")
print(steelcase_fact_sql)

In [None]:
# Preview CREATE TABLE DDL for the customer dimension. The DataFrame is passed
# as-is: reset_index() would add an "index" column that the uploaded CSV
# (written with index=False) does not contain. Without a connection, pandas
# renders SQLite-dialect types, so review them before reusing the DDL on Redshift.
customer_dim_sql = pd.io.sql.get_schema(customer_dim_table, "customer_dim_table")
print(customer_dim_sql)

In [None]:
# Preview CREATE TABLE DDL for the shopping dimension. The DataFrame is passed
# as-is: reset_index() would add an "index" column that the uploaded CSV
# (written with index=False) does not contain. Without a connection, pandas
# renders SQLite-dialect types, so review them before reusing the DDL on Redshift.
shopping_dim_sql = pd.io.sql.get_schema(shopping_dim_table, "shopping_dim_table")
print(shopping_dim_sql)

# Connecting to the Redshift cluster

In [None]:
# Open the Redshift connection. Host, database and user can be overridden
# through environment variables; the password is never stored in the notebook.
# It is read from REDSHIFT_PASSWORD, or prompted for interactively if unset.
# NOTE(review): the password that used to be hard-coded here has been
# published with the notebook and should be rotated.
conn = redshift_connector.connect(
    host=os.environ.get(
        "REDSHIFT_HOST",
        'redshift-cluster-1.cu3tnmdp6x7e.us-east-2.redshift.amazonaws.com',
    ),
    database=os.environ.get("REDSHIFT_DATABASE", 'dev'),
    user=os.environ.get("REDSHIFT_USER", 'awsuser'),
    password=(
        os.environ.get("REDSHIFT_PASSWORD")
        or getpass.getpass("Redshift password: ")
    ),
)

In [None]:
# Enable autocommit: the DDL and COPY statements below are executed without
# any explicit conn.commit() call.
conn.autocommit=True

In [None]:
# Cursor used for the DDL and COPY statements below. This is a type annotation,
# not a chained assignment: `cursor = redshift_connector.Cursor = ...` would
# overwrite the module's Cursor class with this cursor instance.
cursor: redshift_connector.Cursor = conn.cursor()

# Creating the tables on the Redshift cluster according to the schema

In [None]:
# Create the fact and dimension tables that the COPY step loads.
# The types are adapted from the pandas-generated DDL, which is SQLite dialect.
# SQLite's REAL (emitted by pandas for float columns) is an 8-byte float, but
# Redshift's REAL is a 4-byte float. The float measures (sales, discount)
# therefore use DOUBLE PRECISION so they don't silently lose precision.
# NOTE(review): Redshift stores TEXT columns as VARCHAR(256); confirm no value
# (e.g. product_name) exceeds 256 bytes, or COPY will reject the row.
cursor.execute("""
CREATE TABLE "steelcase_fact_table" (
  "order_id" TEXT,
  "customer_id" TEXT,
  "product_id" TEXT,
  "sales" DOUBLE PRECISION,
  "discount" DOUBLE PRECISION
)
""")

cursor.execute("""
CREATE TABLE "customer_dim_table" (
  "customer_id" TEXT,
  "customer_name" TEXT,
  "segment" TEXT,
  "country" TEXT,
  "city" TEXT,
  "state" TEXT,
  "postal_code" INTEGER,
  "region" TEXT,
  "age" INTEGER
)
""")

cursor.execute("""
CREATE TABLE "shopping_dim_table" (
  "product_id" TEXT,
  "order_date" TEXT,
  "ship_date" TEXT,
  "ship_mode" TEXT,
  "category" TEXT,
  "sub_category" TEXT,
  "product_name" TEXT
)
""")


# Loading the data from S3 into the Redshift tables with the COPY command

In [None]:
# Load each uploaded CSV from S3 into its Redshift table.
# FORMAT AS CSV is required because pandas' to_csv wraps any field containing
# a comma in double quotes (free-text columns such as product_name may contain
# commas). With only DELIMITER ',', Redshift does not honour those quotes and
# splits the field, so the COPY fails or shifts columns. IGNOREHEADER 1 skips
# the header row that to_csv writes.
for table_name, object_name in [
    ("steelcase_fact_table", "steelcase_fact.csv"),
    ("customer_dim_table", "customer_dim.csv"),
    ("shopping_dim_table", "shopping_dim.csv"),
]:
    cursor.execute(f"""
COPY {table_name} from 's3://{S3_BUCKET_NAME}/csvs/{object_name}'
credentials 'aws_iam_role=arn:aws:iam::835245511616:role/redshift-s3-access'
FORMAT AS CSV
region '{AWS_REGION}'
IGNOREHEADER 1;
""")

# The full project is available at https://github.com/deepaksethionly/AWS-data-engineering-projects/tree/steelcase-aws-de-project . Please upvote and star the repository if you like my work.