# User-Info Service API tests

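This notebook contains tests for the User-Info Service API endpoints defined in `user_info/service.py`. The tests send real HTTP requests, so the service must already be running on `localhost` at the port given by `USER_MICROSERVICE_PORT` in the project's `.env` file.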

In [1]:
import sys
import os
import uuid
import requests
import json
import time
from dotenv import load_dotenv
from pathlib import Path

# Locate the project root. This takes the directory two levels above the
# kernel's current working directory, so the notebook must be started from
# its own folder for the .env lookup to succeed.
project_root = Path(os.getcwd()).resolve().parents[1]
dotenv_path = project_root / ".env"  # Path to .env

# Load environment variables from .env file
load_dotenv(dotenv_path)

# Fail fast with an actionable message when the port is not configured.
# Concatenating None to a str would otherwise raise an opaque TypeError.
_user_service_port = os.getenv("USER_MICROSERVICE_PORT")
if not _user_service_port:
    raise RuntimeError(
        "USER_MICROSERVICE_PORT is not set; define it in the environment "
        f"or in {dotenv_path}"
    )

# Set the base URL for the API
BASE_URL = f"http://localhost:{_user_service_port}"

## Helper Functions

In [2]:
def print_response(response):
    """Print a response's status code followed by its body.

    The body is pretty-printed as indented JSON when it can be decoded as
    JSON; otherwise the raw response text is printed instead.

    Args:
        response: An object exposing ``status_code``, ``json()`` and ``text``
            (for example a ``requests.Response``).
    """
    print(f"Status Code: {response.status_code}")
    try:
        body = response.json()
    except ValueError:
        # JSON decode errors are ValueError subclasses. Catching only that,
        # rather than using a bare ``except``, lets KeyboardInterrupt,
        # SystemExit and genuine bugs propagate instead of being hidden.
        print(f"Response: {response.text}")
    else:
        print(f"Response: {json.dumps(body, indent=2)}")

In [3]:
from pydantic import BaseModel
from typing import Tuple, List

class UserMovieRatings(BaseModel):
    """Body of the ``GET /users/{user_id}/ratings`` response as parsed by these tests."""
    # One 4-tuple per stored rating. Judging by the recorded cell output the
    # layout is (user_id, movie_id, rating, timestamp) -- TODO confirm against
    # user_info/service.py.
    ratings: List[Tuple[str, str, float, int]]

class UserPreferences(BaseModel):
    """Body of the ``GET /users/{user_id}/preferences`` response as parsed by these tests."""
    # One 3-tuple per stored preference. Judging by the recorded cell output
    # the layout is (user_id, preference, timestamp) -- TODO confirm against
    # user_info/service.py.
    preferences: List[Tuple[str, str, int]]

## Test user creation and retrieval

In [4]:
def test_create_user():
    """Create the predefined test user via ``POST /users``.

    Asserts that the service answers with status 200 and that the reply
    carries a ``user_id`` field.

    Returns:
        The ``user_id`` value taken from the response body.
    """
    fixed_user_id = '7772d881-3a14-4741-a102-3d57398e7e9d'  # predefined test user ID
    print(f"Creating user with ID: {fixed_user_id}")

    new_user = {
        "user_id": fixed_user_id,
        "first_name": "Test",
        "last_name": "User",
        "email": f"test{fixed_user_id}@example.com"
    }
    reply = requests.post(f"{BASE_URL}/users", json=new_user)
    print_response(reply)

    assert reply.status_code == 200, "Failed to create user"
    body = reply.json()
    assert "user_id" in body, "Response doesn't contain user_id"

    return body["user_id"]

# Run the test; the returned ID is reused by the cells that follow.
user_id = test_create_user()
print(f"Created user with ID: {user_id}")

Creating user with ID: 7772d881-3a14-4741-a102-3d57398e7e9d
Status Code: 200
Response: {
  "user_id": "7772d881-3a14-4741-a102-3d57398e7e9d"
}
Created user with ID: 7772d881-3a14-4741-a102-3d57398e7e9d


In [5]:
def test_user_exists(user_id):
    """Check that the exists-endpoint reports ``True`` for the given user.

    Args:
        user_id: ID of a user that is expected to be present in the service.
    """
    exists_url = f"{BASE_URL}/users/{user_id}/exists"
    reply = requests.get(exists_url)
    print_response(reply)

    assert reply.status_code == 200, "Failed to check if user exists"
    body = reply.json()
    assert "exists" in body, "Response doesn't contain 'exists' field"
    assert body["exists"] is True, "User should exist but 'exists' is False"

# Run the check against the user created earlier in the notebook.
test_user_exists(user_id)

Status Code: 200
Response: {
  "exists": true
}


In [6]:
def test_non_existent_user_exists():
    """Check that the exists-endpoint reports ``False`` for an unknown user.

    A freshly generated random UUID stands in for an ID the service has
    never seen.
    """
    unknown_id = str(uuid.uuid4())
    exists_url = f"{BASE_URL}/users/{unknown_id}/exists"
    reply = requests.get(exists_url)
    print_response(reply)

    assert reply.status_code == 200, "Failed to check if user exists"
    body = reply.json()
    assert "exists" in body, "Response doesn't contain 'exists' field"
    assert body["exists"] is False, "User should not exist but 'exists' is True"

# Run the check.
test_non_existent_user_exists()

Status Code: 200
Response: {
  "exists": false
}


In [7]:
def test_get_user(user_id):
    """Fetch a user record and check its ID and required fields.

    Args:
        user_id: ID of the user to retrieve.

    Returns:
        The decoded JSON body of the response.
    """
    reply = requests.get(f"{BASE_URL}/users/{user_id}")
    print_response(reply)

    assert reply.status_code == 200, "Failed to retrieve user"
    profile = reply.json()
    assert profile["user_id"] == user_id, "User ID mismatch"

    # Every profile must expose these fields; their values are not checked here.
    for field in ("first_name", "last_name", "email"):
        assert field in profile, f"Missing {field} field"

    return profile

# Run the test and keep the returned record.
user_data = test_get_user(user_id)

Status Code: 200
Response: {
  "user_id": "7772d881-3a14-4741-a102-3d57398e7e9d",
  "first_name": "Test",
  "last_name": "User",
  "email": "testbb690173-51d0-4ec5-abbd-07b707582ef3@example.com",
  "timestamp": 1745449636
}


In [8]:
def test_get_nonexistent_user():
    """Check that requesting an unknown user yields HTTP 404.

    A freshly generated random UUID stands in for an ID the service has
    never seen.
    """
    unknown_id = str(uuid.uuid4())
    user_url = f"{BASE_URL}/users/{unknown_id}"
    reply = requests.get(user_url)
    print_response(reply)

    assert reply.status_code == 404, "Expected 404 error for nonexistent user"

# Run the check.
test_get_nonexistent_user()

Status Code: 404
Response: {
  "detail": "User not found"
}


In [9]:
def test_list_users(expected_user_id=None):
    """Fetch the list of all user IDs and check that one ID is present.

    Args:
        expected_user_id: ID that must appear in the returned ``user_ids``
            list. When omitted, the notebook-level ``user_id`` variable is
            used, which keeps existing zero-argument calls working.
    """
    if expected_user_id is None:
        # Backward-compatible fallback to the variable set by the
        # user-creation cell.
        expected_user_id = user_id

    response = requests.get(f"{BASE_URL}/users")
    print_response(response)

    assert response.status_code == 200, "Failed to retrieve users list"
    body = response.json()
    assert "user_ids" in body, "Response doesn't contain user_ids list"

    # Check if our new user is in the list
    assert expected_user_id in body["user_ids"], "Newly created user not in users list"

# Execute test, passing the ID explicitly like the other tests do
test_list_users(user_id)

Status Code: 200
Response: {
  "user_ids": [
    "455465e4-2e80-468d-abd4-e9a75678f81c",
    "7772d881-3a14-4741-a102-3d57398e7e9d",
    "f5743244-f522-457e-a3d4-91130785f3f1",
    "9a9316a3-5ffb-498e-b761-a7b93c3847b5",
    "d6b32a84-aff1-4f06-85b0-396d4c3432d1",
    "ff5382f6-6605-4f5e-aaa5-555a0153d63a"
  ]
}


## Test user-movie ratings storage and retrieval

In [10]:
def test_add_movie_rating(user_id):
    """Store a 4.5 rating for "Titanic" on behalf of the given user.

    Args:
        user_id: ID of the user the rating is stored for.
    """
    ratings_url = f"{BASE_URL}/users/{user_id}/ratings"
    new_rating = {
        "user_id": user_id,
        "movie_title": "Titanic",
        "rating": 4.5
    }

    reply = requests.post(ratings_url, json=new_rating)
    print_response(reply)

    assert reply.status_code == 200, "Failed to add movie rating"

# Run the test for the user created earlier in the notebook.
test_add_movie_rating(user_id)

Status Code: 200
Response: {
  "message": "Movie rating stored successfully",
  "movie_id": "597"
}


In [11]:
def test_get_movie_ratings(user_id):
    """Retrieve a user's movie ratings and check the 4.5 rating is among them.

    Args:
        user_id: ID of the user whose ratings are fetched.
    """
    response = requests.get(f"{BASE_URL}/users/{user_id}/ratings")
    print_response(response)

    # Validate the status and envelope before parsing. An error response then
    # fails with the assertion message below instead of a pydantic
    # ValidationError.
    assert response.status_code == 200, "Failed to retrieve movie ratings"
    body = response.json()
    assert "ratings" in body, "Response doesn't contain ratings list"

    parsed_response = UserMovieRatings(**body)
    print("\nParsed Response:")
    print(parsed_response)

    # Check if our rating is in the list (by user_id and rating value)
    ratings = body["ratings"]
    assert len(ratings) > 0, "No ratings found for user"

    # Each entry is indexed positionally: [0] is compared with the user ID
    # and [2] with the rating value (4.5 matches the add-rating test).
    found = any(entry[0] == user_id and entry[2] == 4.5 for entry in ratings)

    assert found, "Added rating not found in user's ratings"

# Execute test
test_get_movie_ratings(user_id)

Status Code: 200
Response: {
  "ratings": [
    [
      "7772d881-3a14-4741-a102-3d57398e7e9d",
      "597",
      4.5,
      1745490234
    ]
  ]
}

Parsed Response:
ratings=[('7772d881-3a14-4741-a102-3d57398e7e9d', '597', 4.5, 1745490234)]


## Test user preferences storage and retrieval

In [12]:
def test_add_user_preference(user_id):
    """Store the "Action" and "Comedy" preferences for the given user.

    Only the response to the first request is printed; both requests must
    return status 200.

    Args:
        user_id: ID of the user the preferences are stored for.
    """
    preferences_url = f"{BASE_URL}/users/{user_id}/preferences"

    first_reply = requests.post(
        preferences_url,
        json={"user_id": user_id, "preference": "Action"},
    )
    print_response(first_reply)
    assert first_reply.status_code == 200, "Failed to add user preference"

    # The second preference is stored without echoing the response.
    second_reply = requests.post(
        preferences_url,
        json={"user_id": user_id, "preference": "Comedy"},
    )
    assert second_reply.status_code == 200, "Failed to add second user preference"

# Run the test for the user created earlier in the notebook.
test_add_user_preference(user_id)

Status Code: 200
Response: {
  "message": "User preference stored successfully"
}


In [13]:
def test_get_user_preferences(user_id):
    """Retrieve a user's preferences and check "Action" and "Comedy" are present.

    Args:
        user_id: ID of the user whose preferences are fetched.
    """
    response = requests.get(f"{BASE_URL}/users/{user_id}/preferences")
    print_response(response)

    # Validate the status and envelope before parsing. An error response then
    # fails with the assertion message below instead of a pydantic
    # ValidationError.
    assert response.status_code == 200, "Failed to retrieve user preferences"
    body = response.json()
    assert "preferences" in body, "Response doesn't contain preferences list"

    parsed_response = UserPreferences(**body)
    print("\nParsed Response:")
    print(parsed_response)

    # Check if our preferences are in the list; index 1 of each entry is
    # compared with the preference name.
    preference_values = [entry[1] for entry in body["preferences"]]

    assert "Action" in preference_values, "Action preference not found"
    assert "Comedy" in preference_values, "Comedy preference not found"

# Execute test
test_get_user_preferences(user_id)

Status Code: 200
Response: {
  "preferences": [
    [
      "7772d881-3a14-4741-a102-3d57398e7e9d",
      "Action",
      1745490234
    ],
    [
      "7772d881-3a14-4741-a102-3d57398e7e9d",
      "Comedy",
      1745490234
    ]
  ]
}

Parsed Response:
preferences=[('7772d881-3a14-4741-a102-3d57398e7e9d', 'Action', 1745490234), ('7772d881-3a14-4741-a102-3d57398e7e9d', 'Comedy', 1745490234)]
