In [None]:
%%sql project.redshift
-- Reset step: remove any existing store_promotions table before it is re-created further down.
-- This table has foreign keys to store_dim and date_dim, so it is dropped before those tables.
DROP TABLE IF EXISTS store_promotions;

In [None]:
%%sql project.redshift
-- Reset step: remove any existing store_sales table before it is re-created further down.
-- This table has foreign keys to store_dim and date_dim, so it is dropped before those tables.
DROP TABLE IF EXISTS store_sales;

In [None]:
%%sql project.redshift
-- Reset step: remove any existing store_dim table.
-- Runs after the store_sales / store_promotions drops because both of those tables reference store_dim.
DROP TABLE IF EXISTS store_dim;

In [None]:
%%sql project.redshift
-- Reset step: remove any existing date_dim table.
-- Runs after the store_sales / store_promotions drops because both of those tables reference date_dim.
DROP TABLE IF EXISTS date_dim;

### Create Date Dimension table

In [None]:
%%sql project.redshift
-- Date dimension keyed by date_key; the calendar attributes are stored alongside the timestamp.
-- DISTSTYLE ALL is used for this dimension table.
CREATE TABLE date_dim (
    date_key   INTEGER   NOT NULL,
    date_time  TIMESTAMP,
    week       SMALLINT  NOT NULL,
    month      SMALLINT  NOT NULL,
    quarter    SMALLINT  NOT NULL,
    year       SMALLINT  NOT NULL,
    weekday    SMALLINT  NOT NULL,
    PRIMARY KEY (date_key)
)
DISTSTYLE ALL;



### Create Store Dimension table


In [None]:
%%sql project.redshift
-- Store dimension keyed by store_id; holds the descriptive attributes of each store.
-- DISTSTYLE ALL is used for this dimension table.
CREATE TABLE store_dim (
    store_id       INTEGER       NOT NULL,
    store_name     VARCHAR(50)   NOT NULL,
    store_address  VARCHAR(100)  NOT NULL,
    city           VARCHAR(50)   NOT NULL,
    state          VARCHAR(10)   NOT NULL,
    country        VARCHAR(50)   NOT NULL,
    PRIMARY KEY (store_id)
)
DISTSTYLE ALL;



### Create Store Sales table


In [None]:
%%sql project.redshift
-- Sales fact table: one total_sales amount per (store_id, date_key).
-- Both key columns reference their dimension tables; rows are distributed by store_id.
CREATE TABLE store_sales (
    store_id     INTEGER        NOT NULL,
    date_key     INTEGER        NOT NULL,
    -- Explicit precision/scale: a bare DECIMAL in Redshift means DECIMAL(18,0),
    -- which would drop the fractional part of a sales amount on load.
    total_sales  DECIMAL(18,2)  NOT NULL,
    PRIMARY KEY (store_id, date_key),
    FOREIGN KEY (store_id) REFERENCES store_dim (store_id),
    FOREIGN KEY (date_key) REFERENCES date_dim (date_key)
)
DISTKEY (store_id);



### Create Store Promotions table


In [None]:
%%sql project.redshift
-- Promotions fact table: promo and school_holiday values per (store_id, date_key).
-- Both key columns reference their dimension tables; rows are distributed by store_id.
CREATE TABLE store_promotions (
    store_id        INTEGER   NOT NULL,
    date_key        INTEGER   NOT NULL,
    promo           SMALLINT  NOT NULL,
    school_holiday  SMALLINT  NOT NULL,
    PRIMARY KEY (store_id, date_key),
    FOREIGN KEY (store_id) REFERENCES store_dim (store_id),
    FOREIGN KEY (date_key) REFERENCES date_dim (date_key)
)
DISTKEY (store_id);

In [None]:
%%pyspark project.spark
# Session settings for the project.spark connection: idle timeout, Glue version,
# worker type and worker count. NOTE(review): presumably these must run before the
# first Spark statement so they apply to the session that gets started - confirm.
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

### Import modules & Set the Spark Session


In [None]:
%%pyspark project.spark
import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, date_format
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession

  
# Reuse the running SparkContext if one exists, otherwise start one
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext; later cells use it to read the CSV files from S3
glueContext = GlueContext(sc)
# SparkSession exposed by the GlueContext
spark = glueContext.spark_session
# Glue Job handle; it is not referenced again in the cells visible in this notebook
job = Job(glueContext)

### Fetch the Default S3 Bucket and IAM Role

In [None]:
import sagemaker
from sagemaker import get_execution_role

session = sagemaker.Session()
s3_bucket = session.default_bucket()
s3_prefix = sagemaker.Session().default_bucket_prefix

print ('-----------------------------------------')
print ('default bucket: ' + s3_bucket)
print ('-----------------------------------------')

iam_role = get_execution_role()
print ('-----------------------------------------')
print ('iam role: ' + iam_role)
print ('-----------------------------------------')


print ('-----------------------------------------')
print ('s3_prefix: ' + s3_prefix)
print ('-----------------------------------------')

%send_to_remote --name project.spark  --local s3_bucket --remote s3_bucket	
%send_to_remote --name project.spark  --local iam_role --remote iam_role                                                                                              
%send_to_remote --name project.spark  --local s3_prefix --remote s3_prefix

In [None]:
import os
import boto3


s3_client = boto3.client('s3')

# Local folder whose files are copied into the bucket, keeping the sub-folder layout
local_folder_path = '../../input_data/'
bucket_name = s3_bucket
s3_target_folder = s3_prefix

# os.walk() yields nothing for a missing directory, which would make this cell
# "succeed" without uploading anything; fail here with a clear message instead.
if not os.path.isdir(local_folder_path):
    raise FileNotFoundError(
        f"Input folder not found: {os.path.abspath(local_folder_path)}"
    )

uploaded_count = 0
for root, _dirs, files in os.walk(local_folder_path):
    for file_name in files:
        local_path = os.path.join(root, file_name)  # Full local path
        relative_path = os.path.relpath(local_path, local_folder_path)  # Relative to the base folder

        # Prepend the target prefix only when one is set
        s3_path = os.path.join(s3_target_folder, relative_path) if s3_target_folder else relative_path

        # S3 keys use forward slashes regardless of the local path separator
        s3_path = s3_path.replace("\\", "/")

        print(f"Uploading {local_path} to s3://{bucket_name}/{s3_path}")
        s3_client.upload_file(local_path, bucket_name, s3_path)
        uploaded_count += 1

# An empty folder is not an error, but the later read cells need these files
if uploaded_count == 0:
    print(f"WARNING: no files found under {local_folder_path}; nothing was uploaded")


### Set up the variables

#### Special notes for setting up the following variables:
* s3_bucket => use the default bucket name printed in the above cell
* iam_role => use the iam role arn printed in the above cell
* redshift_url => Go to project > compute, select the Redshift data warehouse, and copy its JDBC URL. Because the connection authenticates with an IAM role, the URL must start with 'jdbc:redshift:iam://...' rather than 'jdbc:redshift://...'.


In [None]:
%%pyspark project.spark

# Go to project > compute > select the redshift datawarehouse and copy the jdbc url. 
# Since we are using IAM role, use 'jdbc:redshift:iam://...' instead of 'jdbc:redshift://...'
redshift_url = 'jdbc:redshift:iam://your_redshift_url:port/db'

# Set the prefix for the redshift temp directory
s3_redshift_prefix = s3_bucket + '/' + s3_prefix +'/redshift-temp/'

s3_input_path = 's3://' + s3_bucket + '/' + s3_prefix + '/'
date_dim_file = 'date_dim.csv'
store_promotions_file = 'store_promotions.csv'
store_sales_file = 'store_sales.csv'
redshiftTmpDir = 's3://' + s3_bucket + '/' + s3_prefix
date_dim_table = 'project.date_dim'
store_dim_table = 'project.store_dim'
store_sales_table = 'project.store_sales'
store_promotions_table = 'project.store_promotions'
glue_db = 'sales_db'
rs_database = "dev"
redshift_iam_role = iam_role

### Read Date Dimension file from S3

In [None]:
%%pyspark project.spark
date_dim_path = s3_input_path + date_dim_file
# Script generated for node Amazon S3
AmazonS3_date_dim_dyf = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": "\"", 
        "withHeader": True, 
        "separator": ","}, 
    connection_type="s3", 
    format="csv", 
    connection_options={
        "paths": [date_dim_path], 
        "recurse": True},
    transformation_ctx="AmazonS3_date_dim_dyf")

#AmazonS3_date_dim_dyf.toDF()

AmazonS3_date_dim_df = AmazonS3_date_dim_dyf.toDF()
# Show the first 10 rows
print("First 10 rows of the DataFrame:")
AmazonS3_date_dim_df.show(n=10, truncate=False)

### Write Date Dimension in Redshift

In [None]:
%%pyspark project.spark
redshift_url# Convert data types
AmazonS3_date_dim_df = AmazonS3_date_dim_df.withColumn("date_key", col("date_key").cast(IntegerType()))\
                                                .withColumn("date_time", to_timestamp(date_format("date_time", "yyyy-MM-dd HH:mm:ss")))\
                                                .withColumn("week", col("week").cast(IntegerType()))\
                                                .withColumn("month", col("month").cast(IntegerType()))\
                                                .withColumn("quarter", col("quarter").cast(IntegerType()))\
                                                .withColumn("year", col("year").cast(IntegerType()))\
                                                .withColumn("weekday", col("weekday").cast(IntegerType()))

# Write the date_dim in Redshift
(
    AmazonS3_date_dim_df.write.format("io.github.spark_redshift_community.spark.redshift")
    .option("url", redshift_url)
    .option("dbtable", date_dim_table)
    .option("tempdir", redshiftTmpDir)
    .option("aws_iam_role", redshift_iam_role)
    .mode("append")
    .save()
)

print("Data successfully uploaded to Redshift table: date_dim")

In [None]:
%%sql project.redshift
-- Preview up to 10 loaded rows of the date dimension
SELECT *
FROM date_dim
LIMIT 10

In [None]:
%%sql project.redshift
-- Row count of the date dimension after the load
SELECT COUNT(*)
FROM date_dim

### Read Store Sales file from S3


In [None]:
%%pyspark project.spark
store_sales_path = s3_input_path + store_sales_file

# Script generated for node Amazon S3
AmazonS3_store_sales_dyf = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": "\"", 
        "withHeader": True, 
        "separator": ","}, 
    connection_type="s3", 
    format="csv", 
    connection_options={
        "paths": [store_sales_path], 
        "recurse": True},
    transformation_ctx="AmazonS3_store_sales_dyf")

AmazonS3_store_sales_df = AmazonS3_store_sales_dyf.toDF()
# Show the first 10 rows
print("First 10 rows of the DataFrame:")
AmazonS3_store_sales_df.show(n=10, truncate=False)

### Extract Store Information


In [None]:
%%pyspark project.spark
# Create a new DataFrame with the specified fields
AmazonS3_store_dim_df = AmazonS3_store_sales_df.select(
    "store_id",
    "store_name",
    "store_address",
    "city",
    "state",
    "country"
).distinct()

# Show the first 10 rows
print("First 10 rows of the DataFrame:")
AmazonS3_store_dim_df.show(n=10, truncate=False)

### Write Store Dimension in Redshift


In [None]:
%%pyspark project.spark
# Convert data types
AmazonS3_store_dim_df = AmazonS3_store_dim_df.withColumn("store_id", col("store_id").cast(IntegerType()))


# Write the store_dim in Redshift
(
    AmazonS3_store_dim_df.write.format("io.github.spark_redshift_community.spark.redshift")
    .option("url", redshift_url)
    .option("dbtable", store_dim_table)
    .option("tempdir", redshiftTmpDir)
    .option("aws_iam_role", redshift_iam_role)
    .mode("append")
    .save()
)

print("Data successfully uploaded to Redshift table: store_dim")

In [None]:
%%sql project.redshift
-- Preview up to 10 loaded rows of the store dimension
SELECT *
FROM store_dim
LIMIT 10

In [None]:
%%sql project.redshift
-- Row count of the store dimension after the load
SELECT COUNT(*)
FROM store_dim

### Create Store Sales dataframe

In [None]:
%%pyspark project.spark
# Convert data types
AmazonS3_store_sales_df = AmazonS3_store_sales_df.withColumn("store_id", col("store_id").cast(IntegerType()))\
                                                .withColumn("date_time", to_timestamp(date_format("sales_date", "yyyy-MM-dd HH:mm:ss")))

# Join the DataFrames
AmazonS3_store_sales_df_joined = AmazonS3_store_sales_df.join(
    AmazonS3_date_dim_df,
    AmazonS3_store_sales_df.date_time == AmazonS3_date_dim_df.date_time,
    "left"
)

# Create the new DataFrame with the required columns
AmazonS3_store_sales_df_updated = AmazonS3_store_sales_df_joined.select(
    col("store_id"),
    col("date_key"),
    col("total_sales")
)

# Show the first few rows of the new DataFrame
AmazonS3_store_sales_df_updated.show(5)

### Write Store Sales in Redshift

In [None]:
%%pyspark project.spark
# Write the store_sales in Redshift
(
    AmazonS3_store_sales_df_updated.write.format("io.github.spark_redshift_community.spark.redshift")
    .option("url", redshift_url)
    .option("dbtable", store_sales_table)
    .option("tempdir", redshiftTmpDir)
    .option("aws_iam_role", redshift_iam_role)
    .mode("append")
    .save()
)

print("Data successfully uploaded to Redshift table: store_sales")

In [None]:
%%sql project.redshift
-- Preview up to 10 loaded rows of the sales table
SELECT *
FROM store_sales
LIMIT 10

In [None]:
%%sql project.redshift
-- Row count of the sales table after the load
SELECT COUNT(*)
FROM store_sales

### Read Store Promotions file from S3


In [None]:
%%pyspark project.spark
store_promotions_path = s3_input_path + store_promotions_file

# Script generated for node Amazon S3
AmazonS3_store_promotions_dyf = glueContext.create_dynamic_frame.from_options(
    format_options={
        "quoteChar": "\"", 
        "withHeader": True, 
        "separator": ","}, 
    connection_type="s3", 
    format="csv", 
    connection_options={
        "paths": [store_promotions_path], 
        "recurse": True},
    transformation_ctx="AmazonS3_store_promotions_dyf")

AmazonS3_store_promotions_df = AmazonS3_store_promotions_dyf.toDF()
# Show the first 10 rows
print("First 10 rows of the DataFrame:")
AmazonS3_store_promotions_df.show(n=10, truncate=False)

### Create Store_Promotions updated dataframe


In [None]:
%%pyspark project.spark
# Convert data types
AmazonS3_store_promotions_df = AmazonS3_store_promotions_df.withColumn("store_id", col("store_id").cast(IntegerType()))\
                                                .withColumn("date_time", to_timestamp(date_format("sales_date", "yyyy-MM-dd HH:mm:ss")))\
                                                .withColumn("promo", col("promo").cast(IntegerType()))\
                                                .withColumn("school_holiday", col("school_holiday").cast(IntegerType()))

# Join the DataFrames
AmazonS3_store_promotions_df_joined = AmazonS3_store_promotions_df.join(
    AmazonS3_date_dim_df,
    AmazonS3_store_promotions_df.date_time == AmazonS3_date_dim_df.date_time,
    "left"
)

# Create the new DataFrame with the required columns
AmazonS3_store_promotions_df_updated = AmazonS3_store_promotions_df_joined.select(
    col("store_id"),
    col("date_key"),
    col("promo"),
    col("school_holiday")
)

# Show the first few rows of the new DataFrame
AmazonS3_store_promotions_df_updated.show(5)

### Write Store Promotions in Redshift


In [None]:
%%pyspark project.spark
# Write the store_promotions in Redshift
(
    AmazonS3_store_promotions_df_updated.write.format("io.github.spark_redshift_community.spark.redshift")
    .option("url", redshift_url)
    .option("dbtable", store_promotions_table)
    .option("tempdir", redshiftTmpDir)
    .option("aws_iam_role", redshift_iam_role)
    .mode("append")
    .save()
)

print("Data successfully uploaded to Redshift table: store_promotions")

In [None]:
%%sql project.redshift
-- Preview up to 10 loaded rows of the promotions table
SELECT *
FROM store_promotions
LIMIT 10

In [None]:
%%sql project.redshift
-- Row count of the promotions table after the load
SELECT COUNT(*)
FROM store_promotions

In [None]:
%%sql project.redshift
-- Combine both fact tables with their dimensions: for each store and date that has
-- a sales row and a promotions row, show the sales amount next to the promo flags.
SELECT
    sm.store_id,
    sm.store_name,
    dm.date_time AS sales_date,
    ss.total_sales,
    sp.promo,
    sp.school_holiday
FROM store_sales ss
INNER JOIN store_dim sm
    ON sm.store_id = ss.store_id
INNER JOIN date_dim dm
    ON dm.date_key = ss.date_key
INNER JOIN store_promotions sp
    ON sp.store_id = sm.store_id
   AND sp.date_key = dm.date_key
ORDER BY sm.store_id
LIMIT 10