# Generate Mock transaction data

In [1]:
import csv
import os
from random import randint, choice
from datetime import datetime

# Define customer and bank names
customer_names = ["John Doe", "Jane Smith", "Michael Brown", "Sarah Lee", "David Miller"]
bank_names = ["AIB", "Bank of Ireland", "Bank of America", "Wells Fargo", "PNC"]

# Define debit card types
card_types = ["Visa", "Mastercard"]

# Define number of transactions per day
transactions_per_day = 10

# Create a dictionary to store customer information
customer_info = {}

# Define the current date and date string for the CSV file
current_date = datetime.now().date()
date_str = current_date.strftime("%Y%m%d")

# Initialize a list to store transactions
transactions = []

# Generate transactions
for _ in range(transactions_per_day):
    customer_name = choice(customer_names)
    if customer_name not in customer_info:
        # Generate card number and bank for new customer
        card_number = f"{randint(1000, 9999)}-{randint(1000, 9999)}-{randint(1000, 9999)}-{randint(1000, 9999)}"
        bank_name = choice(bank_names)
        customer_info[customer_name] = {
            "customer_id": randint(1000000000, 9999999999),  # 10-digit customer ID
            "debit_card_number": card_number,
            "debit_card_type": choice(card_types),
            "bank_name": bank_name,
        }
    
    # Use retrieved information and add customer name
    transaction_data = customer_info[customer_name].copy()
    transaction_data["name"] = customer_name
    transaction_date = str(current_date)
    amount = round(randint(10, 100) + randint(0, 99) / 100, 2)  # Up to 2 decimal places
    transaction_data["transaction_date"] = transaction_date
    transaction_data["amount_spend"] = amount
    transactions.append(transaction_data)

# Define the directory and CSV file name
directory = 'transaction_data'
if not os.path.exists(directory):
    os.makedirs(directory)
filename = os.path.join(directory, f"transactions_{date_str}.csv")

# Write transactions to CSV file
with open(filename, "w", newline="") as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=["customer_id", "name", "debit_card_number", "debit_card_type",
                                                  "bank_name", "transaction_date", "amount_spend"])
    writer.writeheader()
    writer.writerows(transactions)

print(f"Generated mock transaction data {filename} and saved in CSV file")

Generated mock transaction data transaction_data\transactions_20240806.csv and saved in CSV file


In [2]:
!pip install boto3

Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 23.3.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
!pip install python-dotenv

Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 23.3.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [4]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Access the environment variables
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
aws_region = os.getenv('AWS_DEFAULT_REGION')


# Upload generated transaction data csv in S3

In [13]:
import boto3
from botocore.exceptions import ClientError
bucket_name = 'transactionsdata0597'
region_name = aws_region

s3_client = boto3.client('s3',aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key)

try:
    s3_client.head_bucket(Bucket=bucket_name)
    print(f'Bucket {bucket_name} already exists.')
except ClientError as e:
    if e.response['Error']['Code'] == '404':
        # Bucket does not exist, create it
        try:
            s3_client.create_bucket(Bucket=bucket_name)
            print(f'Bucket {bucket_name} created successfully.')
        except ClientError as create_error:
            print(f'Error creating bucket: {create_error}')
    else:
        print(f'Error checking bucket: {e}')

# Upload the CSV file to the S3 bucket
file_name = filename  # Use the file path from above
object_name = os.path.basename(file_name)  # The object name in S3 will be the same as the file name

# Upload the file
try:
    s3_client.upload_file(file_name, bucket_name, object_name)
    print(f'File {file_name} uploaded to bucket {bucket_name} as {object_name}.')
except ClientError as e:
    print(f'Error uploading file: {e}')

Bucket transactionsdata0597 already exists.
File transaction_data\transactions_20240806.csv uploaded to bucket transactionsdata0597 as transactions_20240806.csv.


In [16]:
import boto3
from botocore.exceptions import ClientError

# Replace with your AWS credentials profile or remove if using default credentials
# boto3.setup_default_session(profile_name='your_profile_name')

# Initialize the RDS client
client = boto3.client('rds', region_name='us-east-1',aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key)  # Replace with your preferred region

# Define parameters for the RDS instance
db_instance_identifier = 'customersdata0597'
db_name = 'customersdata0597'
db_master_username = 'rutuja'
db_master_password = 'rutuja051997'
db_instance_class = 'db.t3.micro'  # Replace with your preferred instance class
engine = 'mysql'
allocated_storage = 20  # Replace with your preferred storage size in GB

# Function to check if the RDS instance exists
def check_db_instance_exists(db_instance_identifier):
    try:
        response = client.describe_db_instances(DBInstanceIdentifier=db_instance_identifier)
        if response['DBInstances']:
            return True
    except ClientError as e:
        if 'DBInstanceNotFound' in str(e):
            return False
        else:
            print("Error describing DB instance:", e)
            raise e

# Function to create the RDS instance
def create_db_instance():
    try:
        response = client.create_db_instance(
            DBInstanceIdentifier=db_instance_identifier,
            DBName=db_name,
            AllocatedStorage=allocated_storage,
            DBInstanceClass=db_instance_class,
            Engine=engine,
            MasterUsername=db_master_username,
            MasterUserPassword=db_master_password,
            PubliclyAccessible=True,  # Adjust based on your security needs
            Tags=[
                {
                    'Key': 'Name',
                    'Value': 'YourRDSInstance'
                },
            ]
        )
        print(f"Creating RDS instance '{db_instance_identifier}'...")
        print("Response:", response)
    except Exception as e:
        print("Error creating RDS instance:", e)

# Check if the RDS instance already exists
if not check_db_instance_exists(db_instance_identifier):
    create_db_instance()
    print(f"RDS instance '{db_instance_identifier}' does not exist. Creating a new instance.")
else:
    print(f"RDS instance '{db_instance_identifier}' already exists.")

# Wait for the DB instance to be available
if not check_db_instance_exists(db_instance_identifier):
    waiter = client.get_waiter('db_instance_available')
    print(f"Waiting for RDS instance '{db_instance_identifier}' to be available...")
    waiter.wait(DBInstanceIdentifier=db_instance_identifier)
    print(f"RDS instance '{db_instance_identifier}' is now available.")

RDS instance 'customersdata0597' already exists.


In [15]:
!pip install mysql-connector-python

Defaulting to user installation because normal site-packages is not writeable



[notice] A new release of pip is available: 23.3.1 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [22]:
import mysql.connector
from mysql.connector import Error

# Replace with your RDS endpoint, database name, username, and password
endpoint = 'customersdata0597.crwyyk44cd2p.us-east-1.rds.amazonaws.com'
db_name = 'customersdata0597'
db_user = 'rutuja'
db_password = 'rutuja051997'

# Initialize connection and cursor to None
conn = None
cursor = None

# Establish connection
try:
    conn = mysql.connector.connect(
        host=endpoint,
        database=db_name,
        user=db_user,
        password=db_password
    )
    if conn.is_connected():
        print("Connected to MySQL database")
except Error as e:
    print(f"Error connecting to MySQL database: {e}")

# Create table for aggregated transaction data
if conn and conn.is_connected():
    try:
        cursor = conn.cursor()

        # Define the SQL query to create the table
        create_table_query = """
        CREATE TABLE IF NOT EXISTS aggregated_transactions (
            customer_id INT NOT NULL,
            debit_card_number VARCHAR(16) NOT NULL,
            bank_name VARCHAR(100),
            total_amount_spend DECIMAL(10, 2),
            PRIMARY KEY (customer_id)
     
        )
        """

        # Execute the query
        cursor.execute(create_table_query)
        conn.commit()

        print("Table 'aggregated_transactions' created successfully")
    except Error as e:
        print(f"Error creating table: {e}")
    finally:
        # Ensure the cursor and connection are closed properly
        if cursor:
            cursor.close()
        if conn:
            conn.close()
else:
    print("Connection to the database was not established.")

Connected to MySQL database
Table 'aggregated_transactions' created successfully


In [23]:
local_script_path = 'data_aggregation_script.py'  # Path to your local script
s3_bucket_name = 'transactionsdata0597'
s3_key = 'data_aggregation_script.py'  # Key or filename in S3

# Upload the file
try:
    s3_client.upload_file(local_script_path, s3_bucket_name, s3_key)
    print(f"Uploaded {local_script_path} to bucket {s3_bucket_name} as {s3_key}")
except Exception as e:
    print(f"Error uploading file: {str(e)}")

Uploaded data_aggregation_script.py to bucket transactionsdata0597 as data_aggregation_script.py


In [30]:
# Initialize Glue client
glue_client = boto3.client('glue',aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key)

# Specify your Glue script location in S3
s3_bucket = 'transactionsdata0597'
script_name = 'data_aggregation_script.py'
s3_script_path = f's3://{s3_bucket}/{script_name}'

# Define job parameters
job_name = 'DataAggregationJob1'
role_name = 'arn:aws:iam::025066257198:role/glue-role-for-etl'  # Update with your Glue service role ARN

# Create a Glue job
try:
    response = glue_client.create_job(
        Name=job_name,
        Role=role_name,
        Command={
            'Name': 'glueetl',
            'ScriptLocation': s3_script_path,
            'PythonVersion': '3'
        },
        DefaultArguments={
            '--job-bookmark-option': 'job-bookmark-enable'
        },
        ExecutionProperty={
            'MaxConcurrentRuns': 1
        },
        MaxRetries=0,
        Timeout=2880,
        GlueVersion='2.0',  # Replace with appropriate Glue version
        Tags={
            'Environment': 'Production'
        }
    )
    print(f"Glue job '{job_name}' created successfully.")
except Exception as e:
    print(f"Error creating Glue job: {str(e)}")

# Add a trigger if needed (e.g., schedule daily execution)
try:
    response = glue_client.create_trigger(
        Name='DailyTrigger1',
        Type='SCHEDULED',
        Schedule='cron(0 0 * * ? *)',  # Example: runs daily at midnight UTC
        Actions=[
            {
                'JobName': job_name
            }
        ]
    )
    print("Trigger 'DailyTrigger' created successfully.")
except Exception as e:
    print(f"Error creating trigger: {str(e)}")

# Start the Glue job (optional, if you want to run it immediately)
try:
    response = glue_client.start_job_run(
        JobName=job_name
    )
    print(f"Started job run for '{job_name}'")
except Exception as e:
    print(f"Error starting job run: {str(e)}")

Glue job 'DataAggregationJob1' created successfully.
Trigger 'DailyTrigger' created successfully.
Error starting job run: An error occurred (ConcurrentRunsExceededException) when calling the StartJobRun operation: Concurrent runs exceeded for DataAggregationJob1
