In [4]:
import requests

# Base URLs of the three locally running services (ports 8080-8082).
# Request paths are appended to these strings by the helpers below.
userServiceURL = "http://localhost:8080"
marketplaceServiceURL = "http://localhost:8081"
walletServiceURL = "http://localhost:8082"

def main():
    """Drive the user/wallet scenario end to end and print each step.

    Sequence: wipe users, create one user, initialise and read that user's
    wallet, then credit it. After every request the matching ``test_*``
    checker is run and a "Passed" line is printed when it returns True.
    """
    delete_users()

    # --- step 1: create the user -------------------------------------
    full_name, mail_address, requested_id = "John Doe", "johndoe@mail.com", 101
    print(f"=> create_user({full_name}, {mail_address})")
    user_resp = create_user(requested_id, full_name, mail_address)
    created = user_resp.json()
    print(f"<= create_user() response: {created}")
    if test_create_user(full_name, mail_address, user_resp):
        print("create_user() Passed\n")

    # The id echoed by the service (not the requested one) drives the rest.
    uid = created['id']

    # --- step 2: read the wallet -------------------------------------
    print(f"=> get_wallet({uid})")
    create_wallet(uid)
    wallet_resp = get_wallet(uid)
    wallet = wallet_resp.json()
    print(f"<= get_wallet() response: {wallet}")
    if test_get_wallet(uid, wallet_resp):
        print("get_wallet() Passed\n")

    # --- step 3: credit the wallet -----------------------------------
    starting_balance = wallet['balance']
    action, amount = "credit", 100
    print(f"=> update_wallet({uid}, {action}, {amount})")
    update_resp = update_wallet(uid, action, amount)
    print(f"<= update_wallet() response: {update_resp.json()}")
    if test_update_wallet(uid, action, amount, starting_balance, update_resp):
        print("update_wallet() Passed.")

    
def create_user(userid,name, email):
    """POST a user record (id, name, email) to the user service.

    Returns the raw ``requests`` response without inspecting it.
    """
    payload = {"id": userid, "name": name, "email": email}
    return requests.post(f"{userServiceURL}/users", json=payload)

def test_create_user(name, email, response):
    new_user = response.json()

    # checking if 'id' field exists in the response
    if ('id' not in new_user):
        print("create_user() Failed: 'id' field not present in response.")
        return False
    # checking the type of 'id' as int
    else:
        if (not isinstance(new_user['id'], int)):
            print("create_user() Failed: 'id' field not an integer value.")
            return False

    # checking if 'name' field exists in the response
    if ('name' not in new_user):
        print("create_user() Failed: 'name' field not present in response.")
        return False


    # checking if 'email' field exists in the response
    if ('email' not in new_user):
        print("create_user() Failed: 'email' field not present in response.")
        return False

    # checking the number of field returned in the response
    if (len(new_user) != 4):
        print("create_user() Failed: Incorrect response format.")
        return False

    # checking the status code
    if (response.status_code != 201):
        print(f"create_user() Failed: HTTP 201 expected, got {response.status_code}")
        return False

    return True

def delete_users():
    """Send DELETE to the user service's /users collection; the response is discarded."""
    endpoint = userServiceURL + "/users"
    requests.delete(endpoint)

def create_wallet(user_id):
    """Send a zero-amount credit to the wallet endpoint of ``user_id``.

    NOTE(review): presumably the wallet service creates the wallet on the
    first update, hence the name -- confirm against the service.
    The response is discarded.
    """
    zero_credit = {"action": "credit", "amount": 0}
    requests.put(f"{walletServiceURL}/wallets/{user_id}", json=zero_credit)

def get_wallet(user_id):
    """GET the wallet of ``user_id`` from the wallet service and return the raw response."""
    return requests.get(f"{walletServiceURL}/wallets/{user_id}")

def test_get_wallet(user_id, response):
    """Validate the response returned for a get-wallet request.

    Args:
        user_id: Id of the user whose wallet was requested.
        response: Response object exposing ``json()`` and ``status_code``.

    Returns:
        True when every check passes. Otherwise prints the reason for the
        first failing check and returns False.
    """
    # Look the user up so a wallet 404 can be told apart from a missing user.
    user_res = requests.get(userServiceURL + f"/users/{user_id}")

    # a wallet 404 is a failure when the user service did not return 404 for the user
    if user_res.status_code != 404 and response.status_code == 404:
        print("get_wallet() Failed: HTTP 404 returned for the wallet of an existing user.")
        return False

    payload = response.json()

    # 'user_id' must be present, be an integer, and match the request
    if 'user_id' not in payload:
        print("get_wallet() Failed: 'user_id' field not present in response.")
        return False

    if not isinstance(payload['user_id'], int):
        print("get_wallet() Failed: 'user_id' field not an integer value.")
        return False

    if payload['user_id'] != user_id:
        print(f"get_wallet() Failed: 'user_id' field mismatch, expected {user_id}, got {payload['user_id']}.")
        return False

    # 'balance' must be present and be an integer
    if 'balance' not in payload:
        print("get_wallet() Failed: 'balance' field not present in response.")
        return False

    if not isinstance(payload['balance'], int):
        print("get_wallet() Failed: 'balance' field not an integer value.")
        return False

    # the response is expected to carry exactly two fields
    if len(payload) != 2:
        print("get_wallet() Failed: Incorrect response format.")
        return False

    if response.status_code != 200:
        print(f"get_wallet() Failed: HTTP 200 expected, got {response.status_code}")
        return False

    return True

    
def update_wallet(user_id, action, amount):
    """PUT an ``action``/``amount`` update to the wallet of ``user_id`` and return the raw response."""
    change = {"action": action, "amount": amount}
    return requests.put(f"{walletServiceURL}/wallets/{user_id}", json=change)

def test_update_wallet(user_id, action, amount, old_balance, response):
    # checking if negative balance is allowed or not
    if (action == 'debit' and old_balance < amount and response.status_code != 400):
        print(f"get_wallet() Failed: insufficient balance, expected HTTP 400, got {response.status_code}.")
        return False

    payload = response.json()

    if ('user_id' not in payload):
        print("update_wallet() Failed: 'user_id' field not present in response.")
        return False

    if (not isinstance(payload['user_id'], int)):
        print("update_wallet() Failed: 'user_id' field not an integer value.")
        return False
        
    if ('balance' not in payload):
        print("get_wallet() Failed: 'balance' field not present in response.")
        return False
    
    if (not isinstance(payload['balance'], int)):
        print("get_wallet() Failed: 'balance' field not an integer value.")
        return False

    if (len(payload) != 2):
        print("update_wallet() Failed: Incorrect response format.")
        return False
        
    if (action == 'credit' and payload['balance'] != old_balance + amount):
        print(f"get_wallet() Failed: 'balance' field incorrectly updated, expected {old_balance + amount}, got {payload['balance']}.")
        return False

    if (action == 'debit' and payload['balance'] != old_balance - amount):
        print(f"get_wallet() Failed: 'balance' field incorrectly updated, expected {old_balance - amount}, got {payload['balance']}.")
        return False
        
    if (response.status_code != 200):
        print(f"update_wallet() Failed: HTTP 200 expected, got {response.status_code}")
        return False

    return True

# Run the scenario only when this code is executed as the main module.
if __name__ == "__main__":
    main()

=> create_user(John Doe, johndoe@mail.com)
<= create_user() response: {'id': 101, 'name': 'John Doe', 'email': 'johndoe@mail.com', 'discount_availed': False}
create_user() Passed

=> get_wallet(101)
<= get_wallet() response: {'user_id': 101, 'balance': 0}
get_wallet() Passed

=> update_wallet(101, credit, 100)
<= update_wallet() response: {'user_id': 101, 'balance': 100}
update_wallet() Passed.


In [11]:
import requests
import sys
import traceback

# Base URLs of the three locally running services (ports 8080-8082).
# Request paths are appended to these strings by the helpers below.
userServiceURL = "http://localhost:8080"
marketplaceServiceURL = "http://localhost:8081"
walletServiceURL = "http://localhost:8082"

def main():
    """Run the order scenario with fixed test data (user 101, product 104, 10000 credit)."""
    user = (101, "John Doe", "johndoe@mail.com")
    product_id, top_up = 104, 10000
    add_money_and_check_detail(*user, product_id, top_up)


def create_user(userId, name, email):
    """POST a user record to the user service.

    Prints an error line when the status is not 201; the response is
    returned to the caller either way.
    """
    response = requests.post(
        f"{userServiceURL}/users",
        json={"id": userId, "name": name, "email": email},
    )
    created = response.status_code == 201
    if not created:
        print(f"[ERROR] Failed to create user: {response.status_code}, {response.text}")
    return response

def get_wallet(user_id):
    """GET the wallet of ``user_id``.

    Prints an error line when the status is not 200; the response is
    returned to the caller either way.
    """
    wallet_url = f"{walletServiceURL}/wallets/{user_id}"
    response = requests.get(wallet_url)
    fetched = response.status_code == 200
    if not fetched:
        print(f"[ERROR] Failed to fetch wallet: {response.status_code}, {response.text}")
    return response

def update_wallet(user_id, action, amount):
    """PUT an ``action``/``amount`` update to the wallet of ``user_id``.

    Prints an error line when the status is not 200; the response is
    returned to the caller either way.
    """
    change = {"action": action, "amount": amount}
    response = requests.put(f"{walletServiceURL}/wallets/{user_id}", json=change)
    updated = response.status_code == 200
    if not updated:
        print(f"[ERROR] Failed to update wallet: {response.status_code}, {response.text}")
    return response

def get_product_details(product_id):
    """GET the details of ``product_id`` from the marketplace service.

    Returns:
        The raw response when the status is 200.

    Raises:
        SystemExit: with exit status 1 when the status is not 200, after
            printing an error line. A non-zero status is used so that the
            aborted run is not reported as a success to the caller's shell.
    """
    response = requests.get(marketplaceServiceURL + f"/products/{product_id}")

    if response.status_code != 200:
        print(f"[ERROR] Failed to fetch product details (ID: {product_id}): {response.status_code}, {response.text}")
        sys.exit(1)

    return response

def delete_users():
    """Send DELETE to the user service's /users collection; warn when the status is not 200."""
    response = requests.delete(f"{userServiceURL}/users")
    if response.status_code == 200:
        return
    print(f"[WARNING] Failed to delete users: {response.status_code}, {response.text}")

def add_money_and_check_detail(userId, name, email, productId, walletAmount):
    """Credit a fresh user's wallet, place an order, and verify wallet and stock.

    Steps: delete all users, create the user, credit ``walletAmount``,
    record the product details and wallet balance, POST an order for the
    product, then compare the wallet deduction and the stock reduction with
    the expected values. Progress and the verdict are printed.

    Args:
        userId: Id to request for the new user.
        name: Name of the new user.
        email: Email of the new user.
        productId: Product to order.
        walletAmount: Amount credited to the wallet before ordering.

    Returns:
        None. Returns early (after printing an error) when a required value
        is missing or the order is not accepted with HTTP 201.
    """
    order_quantity = 2
    # The check expects 90% of the list price to be charged.
    # NOTE(review): presumably a discount applied by the marketplace to this
    # order -- confirm against the service specification.
    discount_factor = 0.9
    # The expected deduction is a float product; compare with a tolerance
    # rather than exact equality to avoid spurious rounding failures.
    tolerance = 1e-6

    try:
        print("[INFO] Deleting existing users...")
        delete_users()

        print("[INFO] Creating a new user...")
        new_user = create_user(userId, name, email)
        new_user_json = new_user.json()

        if 'id' not in new_user_json:
            print("[ERROR] User creation response does not contain an ID.")
            return

        new_userid = new_user_json['id']
        print(f"[INFO] New user created with ID: {new_userid}")

        print("[INFO] Adding money to wallet...")
        update_wallet(new_userid, "credit", walletAmount)

        print("[INFO] Fetching product details before ordering...")
        product_details_before_ordering = get_product_details(productId)
        old_wallet_balance = get_wallet(new_userid).json().get('balance', None)

        if old_wallet_balance is None:
            print("[ERROR] Could not retrieve initial wallet balance.")
            return

        print(f"[INFO] Wallet balance before order: {old_wallet_balance}")

        new_order = {
            "user_id": new_userid,
            "items": [{"product_id": productId, "quantity": order_quantity}],
        }

        print("[INFO] Placing order...")
        order_response = requests.post(marketplaceServiceURL + "/orders", json=new_order)

        if order_response.status_code != 201:
            print(f"[ERROR] Order placement failed: {order_response.status_code}, {order_response.text}")
            return

        print("[INFO] Fetching product details after ordering...")
        product_details_after_ordering = get_product_details(productId)
        new_wallet_balance = get_wallet(new_userid).json().get('balance', None)

        if new_wallet_balance is None:
            print("[ERROR] Could not retrieve updated wallet balance.")
            return

        print(f"[INFO] Wallet balance after order: {new_wallet_balance}")

        print("[INFO] Product details after ordering: ", product_details_after_ordering.json())

        price_per_unit = product_details_after_ordering.json().get('price', None)

        if price_per_unit is None:
            print("[ERROR] Could not retrieve product price.")
            return

        total_expected_deduction = (price_per_unit * order_quantity) * discount_factor
        print(f"[INFO] Got response: {product_details_after_ordering.json()}")
        stock_difference = product_details_before_ordering.json().get('stock_quantity', 0) - product_details_after_ordering.json().get('stock_quantity', 0)

        actual_deduction = old_wallet_balance - new_wallet_balance
        deduction_ok = abs(actual_deduction - total_expected_deduction) < tolerance

        if deduction_ok and (stock_difference == order_quantity):
            print("[SUCCESS] Test Passed ✅")
        else:
            print(f"[ERROR] Test Failed ❌")
            print(f"    Expected wallet deduction: {total_expected_deduction}, Actual: {actual_deduction}")
            print(f"    Expected stock reduction: {order_quantity}, Actual: {stock_difference}")

    except Exception:
        # Top-level boundary of the scenario: report the traceback and mark
        # the test as failed instead of propagating.
        print("[FATAL] An unexpected error occurred:")
        traceback.print_exc()
        print("[ERROR] Test Failed ❌")

# Run the scenario only when this code is executed as the main module.
if __name__ == "__main__":
    main()


[INFO] Deleting existing users...
[INFO] Creating a new user...
[INFO] New user created with ID: 101
[INFO] Adding money to wallet...
[INFO] Fetching product details before ordering...
[INFO] Wallet balance before order: 10000
[INFO] Placing order...
[INFO] Fetching product details after ordering...
[INFO] Wallet balance after order: 8740
[INFO] Product details after ordering:  {'id': 104, 'name': 'Wireless Mouse', 'description': '2.4 GHz wireless mouse', 'price': 700, 'stock_quantity': 18}
[INFO] Got response: {'id': 104, 'name': 'Wireless Mouse', 'description': '2.4 GHz wireless mouse', 'price': 700, 'stock_quantity': 18}
[SUCCESS] Test Passed ✅


In [6]:
import requests
import sys

# Base URLs of the three locally running services (ports 8080-8082).
# Request paths are appended to these strings by the helpers below.
userServiceURL = "http://localhost:8080"
marketplaceServiceURL = "http://localhost:8081"
walletServiceURL = "http://localhost:8082"

def main():
    """Run the order-rollback scenario with fixed test data (user 101, product 104)."""
    user = (101, "John Doe", "johndoe@mail.com")
    product_id = 104
    add_money_and_place_order(*user, product_id)


def create_user(userId, name, email):
    """POST a user record to the user service, log status and body, and return the response."""
    body = {"id": userId, "name": name, "email": email}
    response = requests.post(f"{userServiceURL}/users", json=body)
    status, text = response.status_code, response.text
    print(f"[DEBUG] create_user response: {status}, {text}")
    return response

def create_wallet(user_id):
    """Send a zero-amount credit to the wallet endpoint of ``user_id`` and log the outcome.

    NOTE(review): presumably the wallet service creates the wallet on the
    first update, hence the name -- confirm against the service.
    Nothing is returned.
    """
    zero_credit = {"action": "credit", "amount": 0}
    response = requests.put(f"{walletServiceURL}/wallets/{user_id}", json=zero_credit)
    status, text = response.status_code, response.text
    print(f"[DEBUG] create_wallet response: {status}, {text}")

def get_wallet(user_id):
    """GET the wallet of ``user_id``, log status and body, and return the response."""
    wallet_url = f"{walletServiceURL}/wallets/{user_id}"
    response = requests.get(wallet_url)
    status, text = response.status_code, response.text
    print(f"[DEBUG] get_wallet response: {status}, {text}")
    return response

def update_wallet(user_id, action, amount):
    """PUT an ``action``/``amount`` update to the wallet of ``user_id``, log it, and return the response."""
    change = {"action": action, "amount": amount}
    response = requests.put(f"{walletServiceURL}/wallets/{user_id}", json=change)
    status, text = response.status_code, response.text
    print(f"[DEBUG] update_wallet response: {status}, {text}")
    return response

def get_product_details(product_id):
    """GET the details of ``product_id`` from the marketplace, log them, and return the response."""
    product_url = f"{marketplaceServiceURL}/products/{product_id}"
    response = requests.get(product_url)
    status, text = response.status_code, response.text
    print(f"[DEBUG] get_product_details response: {status}, {text}")
    return response

def delete_order(user_id):
    """Send DELETE to the marketplace's ``/marketplace/users/{user_id}`` endpoint.

    Returns:
        The raw response when the status is 200.

    Raises:
        SystemExit: with exit status 1 when the status is not 200, after
            printing an error line. A non-zero status is used so that the
            aborted run is not reported as a success to the caller's shell.
    """
    response = requests.delete(marketplaceServiceURL + f"/marketplace/users/{user_id}")
    print(f"[DEBUG] delete_order response: {response.status_code}, {response.text}")
    if response.status_code != 200:
        print("[ERROR] Wrong status code returned during delete order")
        sys.exit(1)
    return response

def delete_users():
    """Send DELETE to the user service's /users collection and log status and body."""
    response = requests.delete(f"{userServiceURL}/users")
    status, text = response.status_code, response.text
    print(f"[DEBUG] delete_users response: {status}, {text}")

def add_money_and_place_order(userId, name, email, productId):
    """Place an order, delete it, and verify stock and wallet are restored.

    Steps: delete all users, create the user, credit 10000 to the wallet,
    record product stock and wallet balance, POST an order for two units,
    call ``delete_order`` for the user, then check that stock quantity and
    wallet balance equal their pre-order values.

    Args:
        userId: Id to request for the new user.
        name: Name of the new user.
        email: Email of the new user.
        productId: Product to order.

    Returns:
        None. Returns early (after printing an error) when a prerequisite
        step fails, so that a broken setup is never reported as a pass.
    """
    try:
        print("[INFO] Starting test case execution")
        delete_users()

        new_user = create_user(userId, name, email)
        if new_user.status_code != 201:
            print("[ERROR] Failed to create user")
            return

        new_userid = new_user.json().get('id')
        # Compare against None so that a legitimate id of 0 is not rejected.
        if new_userid is None:
            print("[ERROR] User ID not returned from create_user")
            return

        update_wallet(new_userid, "credit", 10000)

        product_details_before_ordering = get_product_details(productId)
        if product_details_before_ordering.status_code != 200:
            print("[ERROR] Failed to fetch product details")
            return

        # No default here: a missing balance must not silently compare equal
        # to another missing balance later on.
        old_wallet_balance = get_wallet(new_userid).json().get('balance')
        if old_wallet_balance is None:
            print("[ERROR] Could not retrieve initial wallet balance")
            return
        print(f"[DEBUG] Old wallet balance: {old_wallet_balance}")

        new_order = {"user_id": new_userid, "items": [{"product_id": productId, "quantity": 2}]}
        order_response = requests.post(marketplaceServiceURL + "/orders", json=new_order)
        print(f"[DEBUG] place_order response: {order_response.status_code}, {order_response.text}")
        # Without a placed order the rollback check below would pass trivially.
        if order_response.status_code != 201:
            print("[ERROR] Failed to place order")
            return

        delete_order(new_userid)

        product_details_after_ordering = get_product_details(productId)
        new_wallet_balance = get_wallet(new_userid).json().get('balance')
        if new_wallet_balance is None:
            print("[ERROR] Could not retrieve updated wallet balance")
            return

        stock_before = product_details_before_ordering.json().get('stock_quantity')
        stock_after = product_details_after_ordering.json().get('stock_quantity')

        if stock_after == stock_before and old_wallet_balance == new_wallet_balance:
            print("[INFO] Test passed")
        else:
            print("[ERROR] Test failed: Stock quantity or wallet balance mismatch")
            print(f"[DEBUG] Stock before: {stock_before}, after: {stock_after}")
            print(f"[DEBUG] Wallet balance before: {old_wallet_balance}, after: {new_wallet_balance}")
    except Exception as e:
        # Top-level boundary of the scenario: report instead of propagating.
        print(f"[ERROR] Exception occurred: {e}")

# Run the scenario only when this code is executed as the main module.
if __name__ == "__main__":
    main()


[INFO] Starting test case execution
[DEBUG] delete_users response: 200, 
[DEBUG] create_user response: 201, {"id":101,"name":"John Doe","email":"johndoe@mail.com","discount_availed":false}
[DEBUG] update_wallet response: 200, {"user_id":101,"balance":36220}
[DEBUG] get_product_details response: 200, {"id":104,"name":"Wireless Mouse","description":"2.4 GHz wireless mouse","price":700,"stock_quantity":20}
[DEBUG] get_wallet response: 200, {"user_id":101,"balance":36220}
[DEBUG] Old wallet balance: 36220
[DEBUG] place_order response: 201, {"order_id":1,"user_id":101,"total_price":1260,"status":"PLACED","items":[{"id":1,"product_id":104,"quantity":2}]}
[DEBUG] delete_order response: 200, 
[DEBUG] get_product_details response: 200, {"id":104,"name":"Wireless Mouse","description":"2.4 GHz wireless mouse","price":700,"stock_quantity":20}
[DEBUG] get_wallet response: 200, {"user_id":101,"balance":36220}
[INFO] Test passed
