In [None]:
import gzip
import io
import logging
import os
import shutil
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from dotenv import load_dotenv
 
# Load environment variables (from a .env file, if present) into os.environ.
load_dotenv()

# Bucket names, regions and credentials. SOURCE_BUCKET is hard-coded; every
# other value is read from the environment, and os.getenv yields None for any
# variable that is not set.
SOURCE_BUCKET = 'flatfiles'
DEST_BUCKET = os.getenv("DEST_BUCKET")
# NOTE(review): SOURCE_REGION is read but not used by any code visible here.
SOURCE_REGION = os.getenv("SOURCE_REGION")
DEST_REGION = os.getenv("DEST_REGION")
SOURCE_ACCESS_KEY = os.getenv("SOURCE_ACCESS_KEY")
SOURCE_SECRET_KEY = os.getenv("SOURCE_SECRET_KEY")
DEST_ACCESS_KEY = os.getenv("DEST_ACCESS_KEY")
DEST_SECRET_KEY = os.getenv("DEST_SECRET_KEY")

# Initialize S3 clients.
# Session carrying the source credentials; it is used only to build the
# source client below.
session = boto3.Session(
    aws_access_key_id=SOURCE_ACCESS_KEY,
    aws_secret_access_key=SOURCE_SECRET_KEY
)


# Source client: points at Polygon's S3-compatible flat-files endpoint and
# forces SigV4 signing. The source credentials are passed again here
# (redundant with the session above, but harmless).
source_s3_client = session.client(
    's3',
    aws_access_key_id=SOURCE_ACCESS_KEY,
    aws_secret_access_key=SOURCE_SECRET_KEY,
    endpoint_url='https://files.polygon.io',
    config=Config(signature_version='s3v4')
)

# Destination client: built from boto3's default session, using the
# destination credentials and DEST_REGION.
destination_s3_client = boto3.client(
    's3',
    aws_access_key_id=DEST_ACCESS_KEY,
    aws_secret_access_key=DEST_SECRET_KEY,
    region_name=DEST_REGION
)

# Logger setup: configure the root logger; the functions below log through
# the module-level logging.* helpers.
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')


def retry_with_backoff(func, retries=3, backoff_factor=2, **kwargs):
    """Call ``func(**kwargs)``, retrying on any exception with exponential backoff.

    Args:
        func: Callable invoked with the keyword arguments in ``kwargs``.
        retries: Total number of attempts. When this is 0 or negative,
            ``func`` is never called and ``None`` is returned.
        backoff_factor: Base of the delay; after failed attempt ``k`` the
            call sleeps ``backoff_factor ** k`` seconds before retrying.
        **kwargs: Forwarded unchanged to ``func`` on every attempt.

    Returns:
        Whatever ``func`` returns on the first successful attempt.

    Raises:
        The exception from the final attempt, once all attempts have failed.
    """
    logging.info("retry with backoff")
    attempt_numbers = range(1, retries + 1)
    for attempt in attempt_numbers:
        try:
            result = func(**kwargs)
        except Exception as exc:
            if attempt < retries:
                # Attempts remain: wait, then loop around for another try.
                delay = backoff_factor ** attempt
                logging.warning(f"Retrying in {delay}s due to: {exc}")
                time.sleep(delay)
                continue
            logging.error(f"Failed after {retries} attempts: {exc}")
            raise
        return result
    return None


def _destination_key(file_key):
    """Map a source key to the destination layout ``YYYY/MM/<name without .gz>``.

    Expects keys shaped like
    ``us_stocks_sip/trades_v1/2024/10/2024-10-01.csv.gz``: the third and
    fourth path segments are used as year and month, and only a trailing
    ``.gz`` suffix is removed from the file name.

    Raises:
        ValueError: If the key has fewer than five ``/``-separated segments.
    """
    parts = file_key.split('/')
    if len(parts) < 5:
        raise ValueError(f"Unexpected key layout: {file_key}")
    year, month = parts[2], parts[3]
    file_name = parts[-1]
    if file_name.endswith('.gz'):
        file_name = file_name[:-len('.gz')]
    return f"{year}/{month}/{file_name}"


def _upload_from_start(fileobj, bucket, key):
    """Rewind *fileobj* and upload it to ``bucket/key`` on the destination.

    Rewinding on every call is what makes this safe to retry: a failed
    attempt can leave the stream part-way (or at EOF), and re-uploading
    from there would store a truncated object.
    """
    fileobj.seek(0)
    destination_s3_client.upload_fileobj(Fileobj=fileobj, Bucket=bucket, Key=key)


def transfer_and_decompress(file_key, chunk_size=1024 * 1024,
                            spool_max_bytes=256 * 1024 * 1024):
    """Download one gzip object, decompress it, and upload the plain file.

    Reads ``SOURCE_BUCKET/file_key`` through ``source_s3_client`` and
    decompresses it in ``chunk_size`` pieces into a spooled temporary file.
    The file is held in memory up to ``spool_max_bytes`` and spilled to disk
    beyond that, so large daily files do not need to fit in RAM. The result
    is then uploaded, with retries, to ``DEST_BUCKET`` under
    ``YYYY/MM/<name>`` through ``destination_s3_client``.

    Args:
        file_key: Source key, e.g.
            ``us_stocks_sip/trades_v1/2024/10/2024-10-01.csv.gz``.
        chunk_size: Bytes copied per read from the decompressor.
        spool_max_bytes: In-memory threshold for the temporary buffer.

    Raises:
        Exception: Any key-parsing, download, decompression or upload error
            is logged and then re-raised, so callers can detect failures.
    """
    logging.info(f"Processing file: {file_key}")
    try:
        # Validate/derive the target key before spending time downloading.
        destination_key = _destination_key(file_key)

        response = source_s3_client.get_object(Bucket=SOURCE_BUCKET, Key=file_key)
        body = response['Body']
        with tempfile.SpooledTemporaryFile(max_size=spool_max_bytes) as spool:
            try:
                # GzipFile.close() does not close the wrapped fileobj, so the
                # response body is closed explicitly in the finally clause.
                with gzip.GzipFile(fileobj=body) as gz:
                    shutil.copyfileobj(gz, spool, chunk_size)
            finally:
                body.close()

            logging.info(f"about to trigger upload attempt, with bucket: {DEST_BUCKET}")
            retry_with_backoff(
                _upload_from_start,
                retries=3,
                backoff_factor=2,
                fileobj=spool,
                bucket=DEST_BUCKET,
                key=destination_key,
            )
        logging.info(f"Successfully processed and uploaded: {file_key}")
    except Exception as e:
        logging.error(f"Error processing {file_key}: {e}")
        raise


def process_files_concurrently(files, max_workers=1, worker=None):
    """Run *worker* on every entry of *files* in a thread pool.

    The default ``max_workers=1`` processes files one at a time. Raise it to
    overlap transfers, bearing in mind that each concurrent transfer holds
    its own decompression buffer.

    Args:
        files: Iterable of source keys to process.
        max_workers: Thread-pool size.
        worker: Callable taking one key. It should raise to signal failure.
            Defaults to ``transfer_and_decompress``.

    Returns:
        List of keys whose worker raised, in completion order (empty when
        everything succeeded).
    """
    if worker is None:
        worker = transfer_and_decompress
    failed = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(worker, file): file for file in files}
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logging.error(f"Failed processing file: {futures[future]}: {e}")
                failed.append(futures[future])
    return failed


def main(year, month, day=None):
    """Copy one day's file, or every file for a month, from source to destination.

    Args:
        year: Four-digit year, e.g. 2024.
        month: Month number; zero-padded to two digits in the key.
        day: Optional day of month. When truthy, only that day's file is
            processed. Otherwise every object under the month prefix is
            listed and processed.
    """
    prefix = f"us_stocks_sip/trades_v1/{year}/{month:02d}/"

    # Single-day mode: build the key directly and skip the listing call.
    if day:
        file_name = f"{prefix}{year}-{month:02d}-{day:02d}.csv.gz"
        logging.info(f"Processing file: {file_name}")
        try:
            transfer_and_decompress(file_name)
        except Exception as e:
            logging.error(f"Failed to process file: {file_name}, Error: {e}")
        else:
            logging.info("File processed successfully.")
        return

    # Whole-month mode: list every key under the prefix (paginated).
    logging.info(f"Listing files under prefix: {prefix}")
    files = []
    try:
        paginator = source_s3_client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=SOURCE_BUCKET, Prefix=prefix):
            files.extend(obj['Key'] for obj in page.get('Contents', []))
    except ClientError as e:
        logging.error(f"Failed to list objects: {e}")
        return

    if not files:
        logging.info("No files found for the specified period.")
        return

    logging.info(f"Found {len(files)} files to process.")
    failed = process_files_concurrently(files)
    # Only claim success when no failures were reported back.
    if failed:
        logging.error(f"{len(failed)} of {len(files)} files failed: {failed}")
    else:
        logging.info("All files processed successfully.")



if __name__ == "__main__":
    # Interactive entry point. Leaving "day" blank (or only whitespace)
    # processes the whole month.
    year = int(input("Enter year: "))
    month = int(input("Enter month: "))
    day = input("Enter day (optional): ").strip()
    day = int(day) if day else None
    main(year, month, day)


Enter year:  2024
Enter month:  10
Enter day (optional):  


2025-01-15 20:15:36,522 [INFO] Listing files under prefix: us_stocks_sip/trades_v1/2024/10/
2025-01-15 20:15:36,691 [INFO] Found 23 files to process.
2025-01-15 20:15:36,692 [INFO] Processing file: us_stocks_sip/trades_v1/2024/10/2024-10-01.csv.gz
2025-01-15 20:16:26,981 [INFO] wbout to trigger upload attempt, with bucket: ml-3bean-ts-raw-data
2025-01-15 20:16:26,982 [INFO] retry with backoff
2025-01-15 20:17:07,102 [INFO] Successfully processed and uploaded: us_stocks_sip/trades_v1/2024/10/2024-10-01.csv.gz
2025-01-15 20:17:07,414 [INFO] Processing file: us_stocks_sip/trades_v1/2024/10/2024-10-02.csv.gz
2025-01-15 20:17:51,577 [INFO] wbout to trigger upload attempt, with bucket: ml-3bean-ts-raw-data
2025-01-15 20:17:51,578 [INFO] retry with backoff
2025-01-15 20:18:26,375 [INFO] Successfully processed and uploaded: us_stocks_sip/trades_v1/2024/10/2024-10-02.csv.gz
2025-01-15 20:18:26,654 [INFO] Processing file: us_stocks_sip/trades_v1/2024/10/2024-10-03.csv.gz
2025-01-15 20:19:08,123 