In [None]:
from app.config import config


config

In [2]:
from app.utils.s3_storage import S3FileDB


s3 = S3FileDB()

In [3]:
from io import BytesIO
import mimetypes
from pathlib import Path

def load_file_as_bytesio(file_path: str) -> tuple[BytesIO, str, str]:
    path = Path(file_path)

    # Leer el archivo en binario
    with open(path, "rb") as f:
        file_bytes = BytesIO(f.read())

    file_name = path.name

    # Detectar el content-type (image/png, application/pdf, etc.)
    content_type, _ = mimetypes.guess_type(file_name)
    content_type = content_type or "application/octet-stream"

    return file_bytes, file_name, content_type

file_bytes, file_name, content_type = load_file_as_bytesio("/Users/manuelalejandroquesada/PERSONAL/UMA MASTER/UMA-DOCENCIA/Desarrollo de Aplicaciones en la Nube/practica/backend/ChatGPT Image Dec 30, 2025, 01_28_09 AM.png")


In [4]:
await s3.upload_file(file_bytes, file_name, content_type)


'https://urbanspot-bucket-2025.s3.eu-central-1.amazonaws.com/20251230_002924_e880896d.png'

## MongoDB test

In [1]:
from app.utils.mongodb_storage import MongoDBDataDB


mongo =MongoDBDataDB()

In [2]:
# Conectar a MongoDB
await mongo.connect()
print("✅ Conexión a MongoDB exitosa!")

✅ Conexión a MongoDB exitosa!


In [3]:
# Prueba 1: Crear un documento de prueba
test_collection = "test_collection"
test_document = {
    "name": "Test Document",
    "description": "Este es un documento de prueba",
    "value": 42,
    "tags": ["test", "mongodb", "python"]
}

created_doc = await mongo.create(test_collection, test_document)
print("✅ Documento creado exitosamente!")
print(f"ID del documento: {created_doc['_id']}")
print(f"Documento completo: {created_doc}")


✅ Documento creado exitosamente!
ID del documento: 6951d4c5312661a79a53f674
Documento completo: {'_id': '6951d4c5312661a79a53f674', 'name': 'Test Document', 'description': 'Este es un documento de prueba', 'value': 42, 'tags': ['test', 'mongodb', 'python'], 'created_at': datetime.datetime(2025, 12, 29, 1, 9, 25, 164000), 'updated_at': datetime.datetime(2025, 12, 29, 1, 9, 25, 164000)}


In [4]:
# Prueba 2: Leer el documento creado (read_one)
document_id = created_doc['_id']
found_doc = await mongo.read_one(test_collection, {"_id": document_id})

if found_doc:
    print("✅ Documento encontrado!")
    print(f"Documento: {found_doc}")
else:
    print("❌ Documento no encontrado")


✅ Documento encontrado!
Documento: {'_id': '6951d4c5312661a79a53f674', 'name': 'Test Document', 'description': 'Este es un documento de prueba', 'value': 42, 'tags': ['test', 'mongodb', 'python'], 'created_at': datetime.datetime(2025, 12, 29, 1, 9, 25, 164000), 'updated_at': datetime.datetime(2025, 12, 29, 1, 9, 25, 164000)}


In [5]:
# Prueba 3: Crear más documentos para probar read_many
for i in range(3):
    doc = {
        "name": f"Test Document {i+1}",
        "number": i+1,
        "category": "test"
    }
    await mongo.create(test_collection, doc)
print("✅ Se crearon 3 documentos adicionales")


✅ Se crearon 3 documentos adicionales


In [14]:
# Prueba 4: Leer múltiples documentos (read_many)
all_docs = await mongo.read_many(
    test_collection,
    filter_dict={"category": "test"},
    limit=10
)
print(f"✅ Se encontraron {len(all_docs)} documentos con category='test'")
for doc in all_docs:
    print(f"  - {doc.get('name')} (ID: {doc.get('_id')})")


✅ Se encontraron 0 documentos con category='test'


In [7]:
# Prueba 5: Actualizar un documento (update_one)
update_result = await mongo.update_one(
    test_collection,
    {"_id": document_id},
    {"$set": {"name": "Documento Actualizado", "value": 100}}
)

if update_result:
    print("✅ Documento actualizado exitosamente!")
    # Verificar la actualización
    updated_doc = await mongo.read_one(test_collection, {"_id": document_id})
    print(f"Documento actualizado: {updated_doc}")
else:
    print("❌ No se pudo actualizar el documento")


✅ Documento actualizado exitosamente!
Documento actualizado: {'_id': '6951d4c5312661a79a53f674', 'name': 'Documento Actualizado', 'description': 'Este es un documento de prueba', 'value': 100, 'tags': ['test', 'mongodb', 'python'], 'created_at': datetime.datetime(2025, 12, 29, 1, 9, 25, 164000), 'updated_at': datetime.datetime(2025, 12, 29, 1, 9, 58, 818000)}


In [11]:
# Prueba 6: Eliminar un documento (delete_one)
delete_result = await mongo.delete_one(
    test_collection,
    {"_id": document_id}
)

if delete_result:
    print("✅ Documento eliminado exitosamente!")
    # Verificar que fue eliminado
    deleted_check = await mongo.read_one(test_collection, {"_id": document_id})
    if deleted_check is None:
        print("✅ Confirmado: El documento ya no existe en la base de datos")
    else:
        print("❌ Error: El documento aún existe")
else:
    print("❌ No se pudo eliminar el documento")


❌ No se pudo eliminar el documento


In [13]:
# Prueba 7: Limpiar documentos de prueba (opcional)
# Descomenta las siguientes líneas si quieres eliminar todos los documentos de prueba
all_test_docs = await mongo.read_many(test_collection, filter_dict={"category": "test"}, limit=100)
for doc in all_test_docs:
    await mongo.delete_one(test_collection, {"_id": doc["_id"]})
print("✅ Documentos de prueba eliminados")


✅ Documentos de prueba eliminados


In [12]:
# Prueba 8: Probar agregación (aggregate)
# Ejemplo: Contar documentos por categoría
pipeline = [
    {"$match": {"category": {"$exists": True}}},
    {"$group": {"_id": "$category", "count": {"$sum": 1}}}
]

aggregation_result = await mongo.aggregate(test_collection, pipeline)
print("✅ Resultado de agregación:")
for result in aggregation_result:
    print(f"  - {result['_id']}: {result['count']} documentos")


✅ Resultado de agregación:
  - test: 3 documentos


In [15]:
# Desconectar de MongoDB
await mongo.disconnect()
print("✅ Desconectado de MongoDB exitosamente")


✅ Desconectado de MongoDB exitosamente
