In [2]:
!pip install -qU requests --progress-bar off


[notice] A new release of pip is available: 24.0 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
import requests
import json

# AppSync API endpoint URL and API key
api_url = "https://6b6b55pkcd.execute-api.us-east-1.amazonaws.com"  # Replace with your A API GateWay URL

### A create user example.



In [2]:
user = {"user_name": "user_2", "role": "user", "password": "password"}

# GraphQL query or mutation
graphql_stmt = """
mutation CreateUser($user_name: ID!, $role: String!, $password: String!) {
    createUser(user_name: $user_name, role: $role, password: $password) {
        user_name
        role
    }
}
"""

variables = user

# Request payload
payload = {
    "query": graphql_stmt,
    "variables": variables
}

# Request headers
headers = {"Content-Type": "application/json"} 

# Make the HTTP request
try:
    response = requests.post(api_url + '/create_user', data=json.dumps(payload), headers=headers)
    response.raise_for_status()
    json_response = response.json()
    
    match json_response:
        case {'data': {'createUser': {'user_name': user_name, 'role': role}}}:
            print(f"User created successfully: {user_name=}, {role=}")
        case {'errors': errors}:
            print(f"Errors: {errors}")
        case _:
            print("Unexpected response format")

except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
except json.JSONDecodeError as e:
    print(f"Failed to decode JSON response: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

Request failed: 404 Client Error: Not Found for url: https://6b6b55pkcd.execute-api.us-east-1.amazonaws.com/create_user


### Login a user

In [14]:
user = {"user_name": "user_2", "password": "password"}

# GraphQL query or mutation
graphql_stmt = """
query UserLogin($user_name: ID!, $password: String!) {
    login(user_name: $user_name, password: $password) {
        user_name
        token
    }
}
"""

variables = user

# Request payload
payload = {
    "query": graphql_stmt,
    "variables": variables
}

# Request headers
headers = {"Content-Type": "application/json"} 

user_headers = None

# Make the HTTP request
try:
    response = requests.post(api_url + '/login', data=json.dumps(payload), headers=headers)
    response.raise_for_status()
    json_response = response.json()
    
    match json_response:
        case {'data': {'login': {'user_name': user_name, 'token': token}}}:
            print(f"User logged in successfully: {user_name=}, {token=}")
            user_headers = {"Authorization": f"{token}", "Content-Type": "application/json"}
        case {'errors': errors}:
            print(f"Errors: {errors}")
        case _:
            print("Unexpected response format")

except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
except json.JSONDecodeError as e:
    print(f"Failed to decode JSON response: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

print(f"User headers: {user_headers}")


User logged in successfully: user_name='user_2', token='eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyXzIiLCJyb2xlIjoidXNlciIsImV4cCI6MTc0NzM2NzYwNn0.lg3w3wlZG88nJW_oJ5lhvGPJ0Bq7Ot9f_IEHlfIOnMM'
User headers: {'Authorization': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ1c2VyXzIiLCJyb2xlIjoidXNlciIsImV4cCI6MTc0NzM2NzYwNn0.lg3w3wlZG88nJW_oJ5lhvGPJ0Bq7Ot9f_IEHlfIOnMM', 'Content-Type': 'application/json'}


### A get user example.

In [18]:
# GraphQL query or mutation
graphql_stmt = """
query GetUser($user_name: ID!) {
    getUser(user_name: $user_name) {
        user_name
        role
    }
}
"""

variables = {"user_name": "user_1"}

# Request payload
payload = {
    "query": graphql_stmt,
    "variables": variables
}

# Request headers
headers = user_headers #Created in cell above.

# Make the HTTP request
try:
    response = requests.post(api_url + '/get_user', data=json.dumps(payload), headers=headers)
    response.raise_for_status()
    json_response = response.json()
    
    match json_response:
        case {'data': {'getUser': {'user_name': user_name, 'role': role}}}:
            print(f"Retrieved user: {user_name=}, {role=}")
        case {'errors': errors}:
            print(f"Errors: {errors}")
        case _:
            print("Unexpected response format")

except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
except json.JSONDecodeError as e:
    print(f"Failed to decode JSON response: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")


Retrieved user: user_name='user_1', role='user'


### Update User Location

In [26]:
user = {"user_name": "user_2", "latitude": "7.0", "longitude": "8.0"}

# GraphQL query or mutation
graphql_stmt = """
mutation UpdateUserLocation($user_name: ID!, $latitude: String!, $longitude: String!) {
    updateUserLocation(user_name: $user_name, latitude: $latitude, longitude: $longitude) {
        user_name
        latitude 
        longitude
    }
}
"""

variables = user

# Request payload
payload = {
    "query": graphql_stmt,
    "variables": variables
}

# Request headers
#headers = {"Content-Type": "application/json"} 
# Request headers
headers = user_headers #Created in login cell above.

# Make the HTTP request
try:
    response = requests.post(api_url + '/update_user_location', data=json.dumps(payload), headers=headers)
    response.raise_for_status()
    json_response = response.json()
    
    match json_response:
        case {'data': {'updateUserLocation': {'user_name': user_name, 'latitude': latitude, 'longitude': longitude}}}:
            print(f"User location update successfully: {user_name=}, {latitude=}, {longitude=}")
        case {'errors': errors}:
            print(f"Errors: {errors}")
        case _:
            print("Unexpected response format")

except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
except json.JSONDecodeError as e:
    print(f"Failed to decode JSON response: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

User location update successfully: user_name='user_2', latitude='7.0', longitude='8.0'


In [19]:

# GraphQL query or mutation
graphql_stmt = """
query GetUserLocations {
    getUserLocations {
        user_name
        latitude 
        longitude
    }
}
"""

# Request payload
payload = {
    "query": graphql_stmt,
    #"variables": variables
}

headers = user_headers #Created in login cell above.

# Make the HTTP request
try:
    response = requests.post(api_url + '/get_user_locations', data=json.dumps(payload), headers=headers)
    #response.raise_for_status()
    json_response = response.json()
    
    match json_response:
        case {'data': {'getUserLocations': locations}}:
            print(f"Retrieved user locations: {locations}")
        case {'errors': errors}:
            print(f"Errors: {errors}")
        case _:
            print(f"Unexpected response format: {json_response}")

except requests.exceptions.RequestException as e:
    print(f"Request failed: {e}")
except json.JSONDecodeError as e:
    print(f"Failed to decode JSON response: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

Retrieved user locations: [{'user_name': 'user_2', 'latitude': '7.0', 'longitude': '8.0'}, {'user_name': 'user_1', 'latitude': '5.0', 'longitude': '5.0'}]


### Subscription Example.

For an example of subscribing to the AppSync realtime update service, see the `subscription_client.py` script.

The code from that script came from the AWS example [here](https://aws.amazon.com/blogs/mobile/appsync-websockets-python/).