<a href="https://colab.research.google.com/github/Apekshaa2908/Enhancing-E-Commerce-Agility-With-Advanced-ETL-Pipeline/blob/main/GlueJob.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pyspark.sql.functions import col
import sys
import boto3
import json
import time
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

# Create (or reuse) the SparkContext and wrap it in a GlueContext.
# getOrCreate() lets this cell be re-run in a notebook / Glue interactive
# session: calling SparkContext() while a context already exists raises
# "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
# SparkSession bound to the Glue context; used below to build DataFrames.
spark = glueContext.spark_session

# Redshift cluster and database information -- replace the placeholders.
# The JDBC URL is assembled from its parts so the database name in the URL
# cannot drift out of sync with `redshift_db`.
redshift_host = "enter-your-cluster-url-here"
redshift_port = 5439  # Redshift's default port
redshift_db = "enter-your-db-name-here"
redshift_table = "enter-your-target-redshift-table-here"
jdbc_url = f"jdbc:redshift://{redshift_host}:{redshift_port}/{redshift_db}"

# Fetch credentials from AWS Secrets Manager
def get_redshift_credentials(secret_arn, *, client=None):
    """Return the (username, password) pair stored in a Secrets Manager secret.

    The secret is read from ``SecretString`` and parsed as JSON; it must
    contain ``username`` and ``password`` keys.

    Args:
        secret_arn: Identifier of the secret, passed as ``SecretId``.
        client: Optional pre-configured Secrets Manager client (e.g. bound to
            a specific region or session). When omitted, a default
            ``boto3.client('secretsmanager')`` is created.

    Returns:
        A ``(username, password)`` tuple.

    Raises:
        Exception: If the secret cannot be fetched, is not valid JSON, or
            lacks either key. The underlying error is chained as
            ``__cause__``.
    """
    if client is None:
        client = boto3.client('secretsmanager')

    try:
        response = client.get_secret_value(SecretId=secret_arn)
        secret = json.loads(response['SecretString'])
        return secret['username'], secret['password']
    except Exception as e:
        # Chain the cause so the traceback names the real failure
        # (boto3 error, JSON decode error, missing key).
        raise Exception(f"Error retrieving secret: {e}") from e

# Trigger the Glue Crawler
def trigger_crawler(crawler_name, *, poll_interval=30, timeout=None, client=None):
    """Start a Glue crawler and block until its run has finished.

    Args:
        crawler_name: Name of the Glue crawler to start.
        poll_interval: Seconds to sleep between state checks (default 30).
        timeout: Optional maximum number of seconds to wait for the crawler
            to return to ``READY``. ``None`` (default) waits indefinitely.
        client: Optional pre-configured Glue client. When omitted, a default
            ``boto3.client('glue')`` is created.

    Raises:
        Exception: If the crawler cannot be started, its state cannot be
            read, the wait exceeds ``timeout``, or the finished crawl reports
            a ``FAILED`` or ``CANCELLED`` status. Underlying errors are
            chained as ``__cause__``.
    """
    glue_client = client if client is not None else boto3.client('glue')

    # Start the crawler
    try:
        glue_client.start_crawler(Name=crawler_name)
    except Exception as e:
        raise Exception(f"Error starting crawler: {e}") from e
    print(f"Crawler {crawler_name} started successfully.")

    # Poll until the crawler is back in the READY (idle) state.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        try:
            crawler = glue_client.get_crawler(Name=crawler_name)['Crawler']
        except Exception as e:
            raise Exception(f"Error polling crawler {crawler_name}: {e}") from e

        if crawler['State'] == 'READY':
            break
        if deadline is not None and time.monotonic() >= deadline:
            raise Exception(
                f"Crawler {crawler_name} did not finish within {timeout} seconds."
            )
        print(f"Crawler {crawler_name} is running...")
        time.sleep(poll_interval)

    # READY only means the crawler is idle; the outcome of the run is in
    # LastCrawl.Status. Without this check a failed or cancelled crawl would
    # let the job continue against a stale catalog.
    last_crawl = crawler.get('LastCrawl') or {}
    last_status = last_crawl.get('Status')
    if last_status in ('FAILED', 'CANCELLED'):
        message = f"Crawler {crawler_name} finished with status {last_status}"
        detail = last_crawl.get('ErrorMessage')
        if detail:
            message += f": {detail}"
        raise Exception(message)

    print(f"Crawler {crawler_name} completed.")

# Define the secret ARN where Redshift credentials are stored
secret_arn = "enter-your-secret-arn-here"  # Enter your AWS Secrets Manager secret ARN

# Glue crawler to run before the catalog tables are read
crawler_name = "enter-your-crawler-name-here"  # Enter your Glue crawler name

# Fetch the Redshift credentials first: it is a quick call, so a wrong ARN or
# missing permission fails the job immediately rather than after waiting for
# the crawler run to finish.
redshift_user, redshift_password = get_redshift_credentials(secret_arn)

# Run the crawler and block until it finishes, so the catalog tables read
# below reflect the latest crawl.
trigger_crawler(crawler_name)

# JDBC connection properties for the Redshift write
db_properties = {
    "user": redshift_user,
    "password": redshift_password,
    "driver": "com.amazon.redshift.jdbc42.Driver",
}

# Retrieve the orders data from Glue catalog
orders_df = glueContext.create_dynamic_frame.from_catalog(
    database="enter-your-database-name-here",
    table_name="enter-your-order-table-name-here",
).toDF()

# Retrieve the returns data from Glue catalog
returns_df = glueContext.create_dynamic_frame.from_catalog(
    database="enter-your-database-name-here",
    table_name="enter-your-return-table-name-here",
).toDF()


def _drop_first_row(df):
    """Return *df* without its first row, keeping the original schema.

    Used to discard a header line that is read as data. Rebuilding the
    DataFrame with ``df.schema`` (instead of ``rdd.toDF(columns)``) keeps the
    column types unchanged instead of re-inferring them from the remaining
    rows, and also works when no rows remain.

    NOTE(review): zipWithIndex numbers rows by partition index and then by
    position within the partition, so only the very first row overall is
    removed. If the table is backed by several CSV files that each carry a
    header line, the other header lines are kept -- confirm the source layout.
    """
    rows_without_first = (
        df.rdd.zipWithIndex()
        .filter(lambda row_and_index: row_and_index[1] > 0)
        .map(lambda row_and_index: row_and_index[0])
    )
    return spark.createDataFrame(rows_without_first, schema=df.schema)


# Filter out the first row (header) of each source
orders_df = _drop_first_row(orders_df)
returns_df = _drop_first_row(returns_df)

# Inner-join each order with its returns on order_id. Only the returns-side
# copy of the key is dropped, so the result carries a single order_id column
# (the one from orders).
joined_df = (
    orders_df
    .join(
        returns_df,
        on=orders_df["order_id"] == returns_df["order_id"],
        how="inner",
    )
    .drop(returns_df["order_id"])
)

# Write the joined rows to Redshift over JDBC. "overwrite" replaces the
# existing table; Spark's JDBC writer drops and re-creates it unless the
# `truncate` option is set.
(
    joined_df.write
    .mode("overwrite")
    .jdbc(jdbc_url, redshift_table, properties=db_properties)
)

print("Data successfully written to Redshift")

