In [None]:
import boto3
from botocore.exceptions import ClientError
from datetime import datetime
import re

In [None]:
# Migration settings: everything a reader might need to tune lives here.

# Bucket holding the listing files, and the region the client talks to.
BUCKET = 'prod-vlc-real-estate-analytics-listings'
REGION = 'eu-central-1'

# Files are read from the bucket root (empty prefix) and copied under the
# bronze-layer prefix.
SOURCE_PREFIX = ''  # Root level
DEST_PREFIX = 'bronze/idealista/'

# Listing file names look like sale_YYYYMMDD_HHMMSS_N.json or
# rent_YYYYMMDD_HHMMSS_N.json; the pattern is anchored at both ends.
FILE_PATTERN = r'^(sale|rent)_\d{8}_\d{6}_\d+\.json$'

# Single S3 client shared by every cell below.
s3_client = boto3.client('s3', region_name=REGION)

# Echo the effective settings, one per line.
print(
    f"‚úÖ Configuration loaded",
    f"   Bucket: {BUCKET}",
    f"   Source: root level",
    f"   Destination: {DEST_PREFIX}",
    sep="\n",
)

In [None]:
# List all objects at root level using pagination
def list_root_objects(bucket_name, pattern, prefix=None, client=None):
    """List the objects directly under ``prefix`` whose name matches ``pattern``.

    Args:
        bucket_name: Name of the S3 bucket to list.
        pattern: Regular expression (string or compiled pattern) applied with
            ``match`` to the part of each key that follows ``prefix``.
        prefix: Key prefix to list under. Defaults to the notebook-level
            ``SOURCE_PREFIX`` (the bucket root).
        client: S3 client to use. Defaults to the notebook-level
            ``s3_client``; pass a stub to exercise the function offline.

    Returns:
        A list of the ``Contents`` entries (dicts with at least a ``'Key'``)
        returned by ``list_objects_v2`` for the matching objects, in listing
        order. Empty when nothing matches.
    """
    if prefix is None:
        prefix = SOURCE_PREFIX
    if client is None:
        client = s3_client

    matcher = re.compile(pattern)
    paginator = client.get_paginator('list_objects_v2')

    # Delimiter='/' makes S3 roll nested keys up into CommonPrefixes, so only
    # keys directly under `prefix` come back in Contents. Without it every
    # object in the bucket (including everything already under the bronze
    # prefix) would be paged through just to be discarded.
    pages = paginator.paginate(Bucket=bucket_name, Prefix=prefix, Delimiter='/')

    objects = []
    for page in pages:
        # Pages with no matching keys carry no 'Contents' entry at all.
        for obj in page.get('Contents', []):
            name = obj['Key'][len(prefix):]
            # The '/' test is kept as a guard in case the listing is ever
            # served without the delimiter being honoured.
            if '/' not in name and matcher.match(name):
                objects.append(obj)

    return objects

# Discovery step: collect the root-level keys that match FILE_PATTERN.
# The result is stored in `root_objects`, which the preview, migration and
# verification cells below read (as does the commented-out deletion code).
print(f"Listing root-level listing files in {BUCKET}...")
root_objects = list_root_objects(BUCKET, FILE_PATTERN)
print(f"‚úÖ Found {len(root_objects)} listing files at root level")

In [None]:
# Preview what the migration is about to touch, before anything is copied.
if not root_objects:
    print("‚ÑπÔ∏è  No listing files found at root level. Migration may have already been completed.")
else:
    # First ten entries only, to keep the output short.
    print("Sample files to be migrated:")
    for obj in root_objects[:10]:
        print(f"  - {obj['Key']} ({obj['Size']} bytes, modified: {obj['LastModified']})")

    if len(root_objects) > 10:
        print(f"  ... and {len(root_objects) - 10} more files")

    # Break the total down by operation type, identified by the key prefix.
    sale_count = len([obj for obj in root_objects if obj['Key'].startswith('sale_')])
    rent_count = len([obj for obj in root_objects if obj['Key'].startswith('rent_')])

    print(f"\nSummary:")
    print(f"  - Sale listings: {sale_count} files")
    print(f"  - Rent listings: {rent_count} files")
    print(f"  - Total: {len(root_objects)} files")

In [None]:
# Copy objects from root to bronze/idealista/ folder
def migrate_to_bronze(bucket_name, objects, dest_prefix, client=None):
    """Copy each object to ``dest_prefix + key`` within the same bucket.

    An object whose destination key already exists is skipped, so the
    function can be re-run after a partial migration. Source objects are
    never modified or deleted here.

    Args:
        bucket_name: Bucket that holds both the source and destination keys.
        objects: Sequence of dicts with a ``'Key'`` entry (for example the
            ``Contents`` entries returned by ``list_objects_v2``).
        dest_prefix: String prepended to every source key to form the
            destination key.
        client: S3 client to use. Defaults to the notebook-level
            ``s3_client``; pass a stub to exercise the function offline.

    Returns:
        A ``(copied, skipped, failed)`` tuple of counts.
    """
    if client is None:
        client = s3_client

    copied = 0
    failed = 0
    skipped = 0

    for i, obj in enumerate(objects, 1):
        source_key = obj['Key']
        dest_key = f"{dest_prefix}{source_key}"

        try:
            # Check if destination already exists. head_object raises a
            # ClientError with code '404' when the key is absent; any other
            # error is re-raised and counted as a failure below.
            try:
                client.head_object(Bucket=bucket_name, Key=dest_key)
                already_exists = True
            except ClientError as e:
                if e.response['Error']['Code'] != '404':
                    raise
                already_exists = False

            if already_exists:
                print(f"‚è≠Ô∏è  Skipping {source_key} (already exists at destination)")
                skipped += 1
            else:
                client.copy_object(
                    CopySource={'Bucket': bucket_name, 'Key': source_key},
                    Bucket=bucket_name,
                    Key=dest_key
                )
                copied += 1

        except ClientError as e:
            print(f"‚ùå Error copying {source_key}: {e}")
            failed += 1

        # Progress update every 100 files. This runs for every processed
        # item, so the report is not lost when the 100th item happens to be
        # skipped or to fail.
        if i % 100 == 0:
            print(f"Progress: {i}/{len(objects)} files processed ({copied} copied, {skipped} skipped, {failed} failed)")

    return copied, skipped, failed

# Run the copy only when the discovery step found something to migrate.
if not root_objects:
    print("‚è≠Ô∏è  No files to migrate")
else:
    print(f"\nüöÄ Starting migration operation...")

    # Wall-clock timing around the whole copy loop.
    start_time = datetime.now()
    copied_count, skipped_count, failed_count = migrate_to_bronze(
        BUCKET,
        root_objects,
        DEST_PREFIX,
    )
    end_time = datetime.now()
    duration = (end_time - start_time).total_seconds()

    # Final tally, one line per outcome.
    print(
        f"\n‚úÖ Migration operation completed!",
        f"   - Successfully copied: {copied_count} files",
        f"   - Skipped (already exist): {skipped_count} files",
        f"   - Failed: {failed_count} files",
        f"   - Duration: {duration:.2f} seconds",
        sep="\n",
    )

In [None]:
# Verify: List objects in bronze/idealista/ folder
def list_bronze_objects(bucket_name, prefix, client=None):
    """List every object whose key starts with ``prefix``.

    Args:
        bucket_name: Name of the S3 bucket to list.
        prefix: Key prefix to list under (no delimiter is used, so nested
            keys are included).
        client: S3 client to use. Defaults to the notebook-level
            ``s3_client``; pass a stub to exercise the function offline.

    Returns:
        A list of the ``Contents`` entries returned by ``list_objects_v2``
        across all pages, in listing order. Empty when nothing is found.
    """
    if client is None:
        client = s3_client

    paginator = client.get_paginator('list_objects_v2')

    objects = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # Pages with no keys carry no 'Contents' entry at all.
        objects.extend(page.get('Contents', []))

    return objects

# Verification step: re-list the destination prefix and confirm that every
# file found at the root now has a counterpart there.
print(f"\nüîç Verifying files in {DEST_PREFIX}...")
bronze_objects = list_bronze_objects(BUCKET, DEST_PREFIX)
# Report the number of objects, not the list itself.
print(f"‚úÖ Found {len(bronze_objects)} objects in bronze/idealista/ folder")

if root_objects:
    # Comparing counts alone can report success while files are missing
    # (the prefix may hold objects that did not come from this migration),
    # so check each expected destination key individually.
    bronze_keys = {obj['Key'] for obj in bronze_objects}
    missing_keys = [
        dest_key
        for dest_key in (f"{DEST_PREFIX}{obj['Key']}" for obj in root_objects)
        if dest_key not in bronze_keys
    ]

    if not missing_keys:
        print("‚úÖ Success! All files are in bronze layer.")
    else:
        print(f"‚ö†Ô∏è  Warning: {len(missing_keys)} of {len(root_objects)} expected files are missing from {DEST_PREFIX}")
        for missing_key in missing_keys[:10]:
            print(f"  - {missing_key}")
        if len(missing_keys) > 10:
            print(f"  ... and {len(missing_keys) - 10} more files")

In [None]:
# Preview the ten most recently modified objects under the bronze prefix.
if bronze_objects:
    print("\nSample files in bronze/idealista/ folder:")

    # Sort a copy, newest first, leaving `bronze_objects` in listing order.
    sorted_bronze_objects = list(bronze_objects)
    sorted_bronze_objects.sort(key=lambda entry: entry['LastModified'], reverse=True)

    for obj in sorted_bronze_objects[:10]:
        print(f"  - {obj['Key']} (modified: {obj['LastModified']})")

    # Mention how many entries were left out of the preview.
    if not len(bronze_objects) <= 10:
        print(f"  ... and {len(bronze_objects) - 10} more files")

## Optional: Delete Original Root-Level Files

**⚠️ WARNING:** This will permanently delete the original files from the root level. Only run this after verifying the migration was successful!

**Before running:**
1. Verify that all files were copied successfully (check cell above)
2. Optionally download a backup of the bucket
3. Uncomment the code below and run

In [None]:
# UNCOMMENT TO DELETE ORIGINAL FILES
#
# The code below is intentionally commented out, so running this cell only
# prints the "Deletion skipped" message at the bottom.
#
# NOTE(review): as written, delete_root_files removes every key in the list
# it is given, and it is called with the full `root_objects` list. It does
# not check whether each file was copied successfully or is present in the
# destination prefix, so confirm the verification cell reported success (and
# that the migration reported 0 failures) before uncommenting.
#
# The function asks for the literal text 'DELETE' via input() before deleting
# anything, and returns a (deleted, failed) tuple of counts.
# 
# def delete_root_files(bucket_name, objects):
#     """Delete original files from root level."""
#     deleted = 0
#     failed = 0
#     
#     print(f"\n‚ö†Ô∏è  DELETING {len(objects)} files from root level...")
#     user_confirm = input(f"Type 'DELETE' to confirm deletion of {len(objects)} files: ")
#     
#     if user_confirm != 'DELETE':
#         print("‚ùå Deletion cancelled")
#         return deleted, failed
#     
#     for i, obj in enumerate(objects, 1):
#         key = obj['Key']
#         
#         try:
#             s3_client.delete_object(Bucket=bucket_name, Key=key)
#             deleted += 1
#             
#             if i % 100 == 0:
#                 print(f"Progress: {i}/{len(objects)} files deleted")
#                 
#         except ClientError as e:
#             print(f"‚ùå Error deleting {key}: {e}")
#             failed += 1
#     
#     return deleted, failed
# 
# if root_objects:
#     deleted_count, delete_failed = delete_root_files(BUCKET, root_objects)
#     print(f"\n‚úÖ Deletion completed!")
#     print(f"   - Successfully deleted: {deleted_count} files")
#     print(f"   - Failed: {delete_failed} files")

print("‚è≠Ô∏è  Deletion skipped (code is commented out for safety)")

## Summary

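The migration copies all listing files from the root level into the proper Medallion Architecture structure. The originals remain at the root until the optional deletion step is run, at which point the layout looks like this: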

```
Before:
├── sale_20230409_120044_1.json
├── rent_20230409_120044_1.json
└── ...

After:
└── bronze/
    └── idealista/
        ├── sale_20230409_120044_1.json
        ├── rent_20230409_120044_1.json
        └── ...
```

**Next steps:**
1. Verify all files are in `bronze/idealista/`
2. Test Lambda function to ensure new files go to correct location
3. Optionally delete original root-level files (after verification)
4. Update any downstream processes to read from bronze layer