<a href="https://colab.research.google.com/github/ankit-rathi/AR-Talks/blob/master/tmp_notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install boto3 library
!pip install boto3



In [2]:
# Mount google drive
from google.colab import drive
drive.mount('/content/drive')
import os

import pandas as pd

project_path = '/content/drive/My Drive/Personal'
os.chdir(project_path)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import boto3

# S3 and Kafka configurations
# Load AWS credentials from CSV
aws_keys_df = pd.read_csv('aws-rootkey.csv')

S3_BUCKET_NAME = 'my-bucket-ar'
AWS_ACCESS_KEY_ID = aws_keys_df['Access_key_ID'][0]
AWS_SECRET_ACCESS_KEY = aws_keys_df['Secret_access_key'][0]
REGION_NAME = aws_keys_df['Region'][0]

# Initialize S3 client
s3_client = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)


# Step 1: Create S3 Bucket
def create_s3_bucket(bucket_name):
    try:
        response = s3_client.create_bucket(
            Bucket=bucket_name,
            CreateBucketConfiguration={'LocationConstraint': REGION_NAME}
        )
        print(f"Bucket '{bucket_name}' created successfully.")
    except Exception as e:
        print(f"Error creating bucket: {str(e)}")

create_s3_bucket(S3_BUCKET_NAME)

Error creating bucket: An error occurred (BucketAlreadyOwnedByYou) when calling the CreateBucket operation: Your previous request to create the named bucket succeeded and you already own it.


In [4]:
import json
import os
import zipfile


# 1. Read request body JSON for zip file names
def parse_request_body(request_body):
    try:
        request_data = json.loads(request_body)
        zip_files = request_data.get("zip_files", [])
        request_id = request_data.get("request_id", "default_request")
        return zip_files, request_id
    except json.JSONDecodeError as e:
        raise ValueError(f"Invalid JSON format: {e}")

# 2. Copy zip files from S3 bucket
def download_zip_files_from_s3(zip_files):
    local_zip_files = []
    for zip_file in zip_files:
        local_file_path = os.path.join("/tmp", zip_file)
        s3_client.download_file(S3_BUCKET_NAME, zip_file, local_file_path)
        local_zip_files.append(local_file_path)
        print(f"Downloaded {zip_file} to {local_file_path}")
    return local_zip_files

# 3. Unzip the files and collect binary content
def unzip_files(zip_files, request_id):
    unzipped_files = []
    for zip_file_path in zip_files:
        with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
            extract_path = os.path.dirname(zip_file_path)
            zip_ref.extractall(extract_path)
            for file_name in zip_ref.namelist():
                # Read file in binary mode
                with open(os.path.join(extract_path, file_name), 'rb') as f:
                    binary_file_content = f.read()
                unzipped_files.append((file_name, zip_file_path, request_id, binary_file_content))
        print(f"Unzipped {zip_file_path}")
    return unzipped_files

# 4. Rename unzipped files and collect details
def rename_unzipped_files(unzipped_files):
    renamed_files = []
    for file_name, zip_file_path, request_id, binary_file_content in unzipped_files:
        original_file_path = os.path.join(os.path.dirname(zip_file_path), file_name)
        new_file_name = f"{request_id}_{os.path.basename(zip_file_path)}_{file_name}".replace(" ", "_").replace(".zip", "")
        new_file_type = new_file_name.split('.')[-1]
        new_file_path = os.path.join(os.path.dirname(zip_file_path), new_file_name)
        zip_file_name = zip_file_path.split('/')[-1]
        os.rename(original_file_path, new_file_path)
        file_message_dict = {"old_file_name":file_name, "parent_zipfilename": zip_file_name}
        renamed_files.append((new_file_name, new_file_type, file_message_dict, binary_file_content))
        print(f"Renamed {original_file_path} to {new_file_name}")
    return renamed_files

# 5. Populate dataframe with file details and binary content
def create_output_dataframe(renamed_files):
    # Create DataFrame
    output_df = pd.DataFrame(renamed_files, columns=["file_name", "file_type", "file_message_dict", "binary_file_content"])
    return output_df


# 6. Delete zip files from S3
def delete_zip_files_from_s3(zip_files):
    try:
        for zip_file in zip_files:
            s3_client.delete_object(Bucket=S3_BUCKET_NAME, Key=zip_file)
            print(f"Deleted {zip_file} from S3 bucket {S3_BUCKET_NAME}")
    except Exception as e:
        print(f"Error deleting files from S3: {e}")

# Main workflow
def process_zip_files_workflow(request_body):
    try:
        # Step 1: Parse request body
        zip_files, request_id = parse_request_body(request_body)
        zip_files = zip_files.split(',')

        # Step 2: Download zip files from S3
        local_zip_files = download_zip_files_from_s3(zip_files)

        # Step 3: Unzip the files
        unzipped_files = unzip_files(local_zip_files, request_id)

        # Step 4: Rename the unzipped files
        renamed_files = rename_unzipped_files(unzipped_files)

        # Step 5: Create output dataframe with binary content and file details
        output_df = create_output_dataframe(renamed_files)
        print(output_df)

        # Step 6: Send file names to Kafka
        # send_file_names_to_kafka(renamed_files)

        # Step 7: Delete zip files from S3
        #delete_zip_files_from_s3(zip_files)
        new_file_names = ','.join(output_df['file_name'].tolist())
        print(new_file_names)

        print("Process completed successfully!")

        return output_df

    except Exception as e:
        print(f"Error occurred during process: {e}")



In [5]:
request_body = '{"request_id":"request_123","zip_files": "file1.zip,file2.zip"}'
output_df = process_zip_files_workflow(request_body)
output_df

Downloaded file1.zip to /tmp/file1.zip
Downloaded file2.zip to /tmp/file2.zip
Unzipped /tmp/file1.zip
Unzipped /tmp/file2.zip
Renamed /tmp/pqr.txt to request_123_file1_pqr.txt
Renamed /tmp/abc.wav to request_123_file1_abc.wav
Renamed /tmp/xyz.mp3 to request_123_file2_xyz.mp3
                   file_name file_type  \
0  request_123_file1_pqr.txt       txt   
1  request_123_file1_abc.wav       wav   
2  request_123_file2_xyz.mp3       mp3   

                                   file_message_dict binary_file_content  
0  {'old_file_name': 'pqr.txt', 'parent_zipfilena...                 b''  
1  {'old_file_name': 'abc.wav', 'parent_zipfilena...                 b''  
2  {'old_file_name': 'xyz.mp3', 'parent_zipfilena...                 b''  
request_123_file1_pqr.txt,request_123_file1_abc.wav,request_123_file2_xyz.mp3
Process completed successfully!


Unnamed: 0,file_name,file_type,file_message_dict,binary_file_content
0,request_123_file1_pqr.txt,txt,"{'old_file_name': 'pqr.txt', 'parent_zipfilena...",b''
1,request_123_file1_abc.wav,wav,"{'old_file_name': 'abc.wav', 'parent_zipfilena...",b''
2,request_123_file2_xyz.mp3,mp3,"{'old_file_name': 'xyz.mp3', 'parent_zipfilena...",b''
