# üß≠ Workshop: Build Your Own AWS AppSync API

In this notebook you will:

1. Set up the environment and AWS resources
2. Create an AppSync GraphQL API from the schema file we generated previously
3. Learn what **data sources** and **resolvers** are  
4. Specify **data sources** and attach JavaScript **unit** resolvers and **pipeline** resolvers to map between fields and data sources
5. Run test queries and mutations and see real time update notifications.

In [1]:
# --- Code: Imports & basic config ---
import os, json, requests
from helpers.helpers_env import get_region, whoami, build_api_name, validate_identifiers
from helpers.helpers_ddb_pricing import (
    ensure_catalog_table, ensure_cost_table, seed_catalog, seed_cost
)
from helpers.helpers_sns_sqs import (
    ensure_sns_topic, ensure_sqs_queue, setup_sns_sqs_subscription
)
from helpers.helpers_appsync import (
    appsync_client, ensure_api, upload_schema, ensure_api_key,
    ensure_ddb_ds, ensure_none_ds, upsert_js_resolver, create_none_resolver_code,
    get_ddb_query_resolver_js_code, get_ddb_mutation_resolver_js_code, get_sns_mutation_resolver_js_code
)
from helpers.helpers_pipeline_resolvers import (
    ensure_sns_ds, create_pipeline_resolver, create_function
)


REGION = get_region()
ACCOUNT, ARN = whoami(REGION)
print("**NOTE**: Please check that the account and region matches the one on the sheet assigned to you!!")
print("Region:", REGION)
print("Account:", ACCOUNT)


**NOTE**: Please check that the account and region matches the one on the sheet assigned to you!!
Region: us-east-2
Account: 509319545497


## ü™™ Naming, schema path, and IAM role

We‚Äôll construct a unique API name from a display name and short suffix.  

**IMPORTANT TO-DO: Update the next identifiers**

In [2]:
FIRST_4_LETTERS_OF_FIRST_NAME = "PIAS" # Add first 4 characters of first name
BIRTH_MMDD = "0908" # Add month and date 

validate_identifiers(FIRST_4_LETTERS_OF_FIRST_NAME, BIRTH_MMDD)

API_NAME, API_BASENAME = build_api_name(FIRST_4_LETTERS_OF_FIRST_NAME, BIRTH_MMDD)
print("API_NAME:", API_NAME, "| base:", API_BASENAME)

SCHEMA_FILE = "schema/fixed_schema_asin_pricing.graphql"  # your existing file

APPSYNC_ROLE_ARN = "arn:aws:iam::" + ACCOUNT + ":role/AppSyncDDBRole"
print("APPSYNC_ROLE_ARN:", APPSYNC_ROLE_ARN)

API_NAME: ghc25-pias-0908 | base: pias
APPSYNC_ROLE_ARN: arn:aws:iam::509319545497:role/AppSyncDDBRole


# AWS Resources Setup

## 1. üóÉÔ∏è Create DynamoDB tables & seed data

We‚Äôll create two on-demand tables:

- **Catalog table** (PK = `asin`)
- **Cost table** (PK = `asinVendor`), where `asinVendor = "{asin}#{vendorId}"`

Then seed one ASIN with:
- Catalog row
- Cost rows for vendors `AcmeSupply` and `GlobalParts`


In [3]:
# --- Code: Ensure DDB tables and seed ---
CATALOG_TABLE = f"{API_NAME}-catalog"
COST_TABLE    = f"{API_NAME}-cost"

cat = ensure_catalog_table(CATALOG_TABLE, REGION)
cost = ensure_cost_table(COST_TABLE, REGION)

ASIN = "TEST-ASIN-1"  # sample key for the workshop

seed_catalog(CATALOG_TABLE, REGION, ASIN)
seed_cost(COST_TABLE, REGION, ASIN)

print("‚úì Tables ready:")
print("  -", cat["TableArn"])
print("  -", cost["TableArn"])


‚úì Tables ready:
  - arn:aws:dynamodb:us-east-2:509319545497:table/ghc25-pias-0908-catalog
  - arn:aws:dynamodb:us-east-2:509319545497:table/ghc25-pias-0908-cost


## 2. üóÉÔ∏è Create SNS for sending cost data update notifications and SQS for listening to events on cost update SNS

In [4]:
# Create SNS topic and SQS queue using same naming convention as DDB tables
TOPIC_ARN = ensure_sns_topic(API_NAME, REGION)
QUEUE_URL, QUEUE_ARN = ensure_sqs_queue(API_NAME, REGION)

# Set up the SNS -> SQS subscription
SUBSCRIPTION_ARN = setup_sns_sqs_subscription(TOPIC_ARN, QUEUE_ARN, QUEUE_URL, REGION)

print("‚úì Event infrastructure ready:")
print(f"  - SNS Topic: {TOPIC_ARN}")
print(f"  - SQS Queue: {QUEUE_ARN}")
print(f"  - Subscription: {SUBSCRIPTION_ARN}")# Create SNS topic and SQS queue using same naming convention as DDB tables


‚úì Event infrastructure ready:
  - SNS Topic: arn:aws:sns:us-east-2:509319545497:ghc25-pias-0908-price-changes
  - SQS Queue: arn:aws:sqs:us-east-2:509319545497:ghc25-pias-0908-price-processing
  - Subscription: arn:aws:sns:us-east-2:509319545497:ghc25-pias-0908-price-changes:a4c51b23-77aa-491e-b9a8-6287a801f19b


## 3. üß¨ Create AppSync API from schema

In [5]:
# --- Code: Create AppSync API, upload schema, ensure API key ---
appsync = appsync_client(REGION)
api = ensure_api(appsync, API_NAME, cloudwatch_logs_role_arn=APPSYNC_ROLE_ARN, enable_xray=False)
API_ID = api["apiId"]
print("‚úì API:", API_ID, api["name"])

upload_schema(appsync, API_ID, SCHEMA_FILE)
print("‚úì Schema uploaded")

API_KEY, EXPIRES = ensure_api_key(appsync, API_ID, days=1)
print("‚úì API Key created:", API_KEY, "| expires (epoch):", EXPIRES)

api_full = appsync.get_graphql_api(apiId=API_ID)["graphqlApi"]
APPSYNC_URL = api_full["uris"]["GRAPHQL"]
print("GraphQL URL:", APPSYNC_URL)


‚úì API: kneubqbftfbyzeuzzoispstne4 ghc25-pias-0908
‚úì Schema uploaded
‚úì API Key created: da2-yklpp27i5jde3fvhjfp3j2v33e | expires (epoch): 1762488000
GraphQL URL: https://dogmqlrkcrcc3mg5a4e73afkta.appsync-api.us-east-2.amazonaws.com/graphql


## üîç AppSync Data Sources

A data source tells AppSync where to fetch or write data for a given resolver.

AppSync supports several types:

- DynamoDB ‚Äî connect directly to a DynamoDB table.
- Lambda ‚Äî call a Lambda function for custom logic.
- HTTP / OpenSearch / RDS / None ‚Äî for APIs, search clusters, or no backend.

We will add three data sources:

- `CatalogDDB` ‚Üí DynamoDB table for product catalog (Primary Key: `asin`)
- `CostDDB` ‚Üí DynamoDB table for vendor costs (Primary Key: `asinVendor`)
- `NoneDS` ‚Üí **NONE** data source (no backend) used for synthetic fields

In [6]:
# --- Code: Data sources ---
cat_ds  = ensure_ddb_ds(appsync, API_ID, "CatalogDDB", cat["TableArn"],  APPSYNC_ROLE_ARN)
cost_ds = ensure_ddb_ds(appsync, API_ID, "CostDDB",    cost["TableArn"], APPSYNC_ROLE_ARN)
none_ds = ensure_none_ds(appsync, API_ID, "NoneDS")
sns_ds = ensure_sns_ds(appsync, API_ID, "CostSNS", TOPIC_ARN, APPSYNC_ROLE_ARN)

print("‚úì Data sources ready:", [d["name"] for d in [cat_ds, cost_ds, none_ds, sns_ds]])

upsert_js_resolver(appsync, API_ID, "Query", "pricingDocument", "NoneDS", create_none_resolver_code())


‚úì Data sources ready: ['CatalogDDB', 'CostDDB', 'NoneDS', 'CostSNS']


## ‚öôÔ∏è What is a Resolver?

A **resolver** is attached to a **field** in your schema. When a query or mutation hits that field.

**Resolver Types:**
- **Unit Resolver**: One function, one data source (DynamoDB GetItem)
- **Pipeline Resolver**: Multiple functions in sequence (DDB ‚Üí SNS)
- **Direct Lambda Resolver**: Calls Lambda function directly
- **Local Resolver**: Pure computation, no data source

**Key Context Objects:**
- `ctx.args` ‚Üí field arguments from the GraphQL query
- `ctx.source` ‚Üí parent value for nested fields (enables chaining)
- `ctx.result` ‚Üí raw backend result from data source
- `util.*` helpers ‚Üí encode/decode DynamoDB values, time, errors, etc.

**Example Context Flow:**
```
query {
 pricingDocument(asin: "B123") {    # ctx.args = { asin: "B123" }
   catalogSection {
     catalogData {                   # ctx.source = { asin: "B123" }
       title
     }
   }
 }
}
```

## üìñ Query Resolver 1: CatalogSection.catalogData (DynamoDB)

**Goal:** Get product info from DynamoDB

**Data Source:** CatalogDDB

Now we'll read from a database!


In [7]:
# Database resolver - reads from DynamoDB
JS_catalog_get = """
export function request(ctx) {
  const asin = ctx.source?.asin;  // Get ASIN from parent (pricingDocument)
  
  if (!asin) {
    util.error("Missing ASIN from parent", "BadRequest");
  }
  
  // Ask DynamoDB to get item by ASIN
  return {
    operation: "GetItem",
    key: util.dynamodb.toMapValues({ asin: asin })
  };
}

export function response(ctx) {
  if (ctx.error) {
    util.error(ctx.error.message, ctx.error.type);
  }
  
  return ctx.result || null;  // Return data from DynamoDB, or null if not found
}
"""

upsert_js_resolver(appsync, API_ID, "CatalogSection", "catalogData", "CatalogDDB", JS_catalog_get)
print("‚úì Resolver 1: CatalogSection.catalogData ‚Üí CatalogDDB (database read!)")

‚úì Resolver 1: CatalogSection.catalogData ‚Üí CatalogDDB (database read!)


## üí∞ Query Resolver 2: CostDataSection.vendorCostData (DynamoDB with arguments)

**Goal:** Get vendor cost data using both parent ASIN and vendorId argument

**Data Source:** CostDDB

This one combines parent data + GraphQL arguments!

In [8]:
# Database resolver with arguments
JS_cost_get = get_ddb_query_resolver_js_code()
upsert_js_resolver(appsync, API_ID, "CostDataSection", "vendorCostData", "CostDDB", JS_cost_get)
print("‚úì Resolver 2: CostDataSection.vendorCostData ‚Üí CostDDB (with arguments!)")

‚úì Resolver 2: CostDataSection.vendorCostData ‚Üí CostDDB (with arguments!)


## ‚úÖ Test our read resolvers!

In [9]:
# --- Code: Run a test query ---
q = """
query($asin:String!, $vendorId:ID!){
  pricingDocument(asin:$asin){
    asin
    catalogSection {
      catalogData {
        title brand category description defaultCurrency updatedAt
      }
    }
    costDataSection {
      vendorCostData(vendorId:$vendorId){
        vendorId cost currency updatedAt
      }
    }
  }
}
"""
resp = requests.post(
    APPSYNC_URL,
    headers={"x-api-key": API_KEY, "content-type": "application/json"},
    json={"query": q, "variables": {"asin": ASIN, "vendorId": "AcmeSupply"}}
)
print(resp.status_code)
print(resp.text[:1200])


200
{"data":{"pricingDocument":{"asin":"TEST-ASIN-1","catalogSection":{"catalogData":{"title":"Laptop 13‚Äù","brand":"Acme","category":"Computers","description":"Thin-and-light 13-inch laptop","defaultCurrency":"USD","updatedAt":"2025-11-06T03:59:33Z"}},"costDataSection":{"vendorCostData":{"vendorId":"AcmeSupply","cost":865.0,"currency":"USD","updatedAt":"2025-11-06T03:59:33Z"}}}}}


## üßæ Mutation Resolver 1: `Mutation.putCatalogData(asin, input)` (DynamoDB PutItem)

**Goal:** Write catalog properties for an ASIN.

- **Data Source:** `CatalogDDB`
- **Operation:** `PutItem`
- **Key:** `asin`
- **Return:** `CatalogData` (echo of input + ensured `updatedAt`)


In [10]:
# Write resolver - saves to DynamoDB
JS_put_catalog = """
export function request(ctx) {
  const { asin, catalogData } = ctx.args;  // Get both arguments
  
  if (!asin || !catalogData) {
    util.error("asin and catalogData are required", "BadRequest");
  }

  // Add timestamp if not provided
  const updatedAt = catalogData.updatedAt || util.time.nowISO8601();
  const item = { asin, ...catalogData, updatedAt };

  return {
    operation: "PutItem",  // Write operation
    key: util.dynamodb.toMapValues({ asin }),
    attributeValues: util.dynamodb.toMapValues(item)
  };
}

export function response(ctx) {
  if (ctx.error) {
    util.error(ctx.error.message, ctx.error.type);
  }
  
  // Return what we wrote (without the asin since schema doesn't include it)
  const written = { ...ctx.args.catalogData };
  written.updatedAt = written.updatedAt || util.time.nowISO8601();
  return written;
}
"""

upsert_js_resolver(appsync, API_ID, "Mutation", "putCatalogData", "CatalogDDB", JS_put_catalog)
print("‚úì Resolver 1: Mutation.putCatalogData ‚Üí CatalogDDB (write data!)")

‚úì Resolver 1: Mutation.putCatalogData ‚Üí CatalogDDB (write data!)


## ‚úÖ Test catalog mutation!

In [15]:
# Test catalog mutation
mutation = """
mutation PutCatalog($asin: ID!, $data: CatalogDataInput) {
  putCatalogData(asin: $asin, catalogData: $data) {
    title
    brand
    category
    updatedAt
  }
}
"""

variables = {
    "asin": "TEST-ASIN-2",
    "data": {
        "title": "Echo Dot (4th Gen)",
        "brand": "Amazon",
        "category": "Electronics",
        "defaultCurrency": "USD"
    }
}

resp = requests.post(
    APPSYNC_URL,
    headers={"x-api-key": API_KEY, "content-type": "application/json"},
    json={"query": mutation, "variables": variables}
)

print("Catalog mutation result:", resp.json())

Catalog mutation result: {'data': {'putCatalogData': {'title': 'Echo Dot (4th Gen)', 'brand': 'Amazon', 'category': 'Electronics', 'updatedAt': '2025-11-06T04:03:23.612Z'}}}


## üí∞ Mutation Resolver 2: Mutation.putVendorCost (Advanced - Pipeline!)

**Goal:** Write vendor cost AND send notification

This uses a **pipeline resolver** - multiple steps in sequence:
1. Write to DynamoDB
2. Send SNS notification
3. Return result

**Advanced concept:** Pipeline = multiple functions chained together


In [12]:
JS_put_cost_ddb = get_ddb_mutation_resolver_js_code()
JS_put_cost_sns = get_sns_mutation_resolver_js_code(TOPIC_ARN)

# Create functions
fn1 = create_function(appsync, API_ID, "PutCostDDB", "CostDDB", JS_put_cost_ddb)
fn2 = create_function(appsync, API_ID, "PutCostSNS", "CostSNS", JS_put_cost_sns)

# Recreate pipeline resolver
create_pipeline_resolver(appsync, API_ID, "Mutation", "putVendorCost", [fn1["functionId"], fn2["functionId"]])
print("‚úì Pipeline Resolver recreated with both functions")


‚úì Pipeline Resolver recreated with both functions


## ‚úçÔ∏è Verification: Cost Mutation


In [16]:
# --- Code: Mutation: putVendorCost ---

MUTATION_ASIN = "TEST-ASIN-2"
m2 = """
mutation($asin:ID!, $costData:VendorCostInput!){
  putVendorCost(asin:$asin, costData:$costData){
    vendorId cost currency updatedAt
  }
}
"""
vars2 = {
  "asin": MUTATION_ASIN,
  "costData": { "vendorId": "GlobalParts", "cost": 855.50, "currency": "USD" }
}
r2 = requests.post(APPSYNC_URL, headers={"x-api-key": API_KEY}, json={"query": m2, "variables": vars2})
print(r2.status_code)
print(r2.text[:800])


200
{"data":{"putVendorCost":{"vendorId":"GlobalParts","cost":855.5,"currency":"USD","updatedAt":"2025-11-06T04:03:39.505Z"}}}


In [14]:
# --- Code: Read again to verify ---
r3 = requests.post(
    APPSYNC_URL,
    headers={"x-api-key": API_KEY},
    json={"query": q, "variables": {"asin": MUTATION_ASIN, "vendorId": "GlobalParts"}}
)
print(r3.status_code)
print(r3.text[:1200])


200
{"data":{"pricingDocument":{"asin":"TEST-ASIN-2","catalogSection":{"catalogData":null},"costDataSection":{"vendorCostData":null}}}}


## üéâ You‚Äôre done!

You now have:
- A working AppSync API backed by DynamoDB
- Clear understanding of **data sources** and **unit JS resolvers**
- End-to-end read/write flow:
  - Synthetic document ‚Üí nested catalog read ‚Üí nested vendor cost read
  - Mutations to upsert catalog and cost
