# How to Design a DynamoDB Data Model for Production

- http://docs.getmoto.org/en/latest/docs/services/dynamodb.html

## Overview

DynamoDB is a serverless, fully managed, schemaless key-value NoSQL database that has proven to be a great fit for many business-critical use cases. There is usually more than one way to design a data model for a new use case. In this example, I will share the practice I follow to design a data model for any use case.

**Highlights of this practice**

- Fast start: no infrastructure to set up, because DynamoDB is mocked locally with moto.
- Fast iteration: it is easy to try out different data model ideas.
- Code included: by the end, you have a working data model and a data ingestion layer that can be deployed to AWS Lambda, AWS ECS, or EC2.
- Business logic implemented: every user interaction with your application is implemented as a method.
- Query patterns verified: every required business query is ready to use as a method.

## Case Study - Design YouTube Data Model

Let's walk through this practice with a real business use case: designing a DynamoDB data model for a YouTube application.

### Business Requirement

Entities:

- User: a user can upload videos and watch other users' videos.
- Video: a video is created (uploaded) by a user.

User interactions:

- People can sign up as new users.
- A user can upload videos.
- A user can view another user's profile.
- A user can view the list of another user's videos, ordered by creation time.
- A user can watch a video, which increments the video's view count by one.
- A user can subscribe to other users.
- The system can push new-video notifications to a user based on their subscriptions.

Query patterns:

- Given a user id or a video id, get the details of that user or video.
- Given a user id, get all the videos the user uploaded, ordered by creation time with the latest video first.
- Given a user id, get the list of users they subscribe to, ordered by subscription time.
- Given a user id, get the number of users who subscribe to them.
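Before writing any code, it can help to pair each query pattern with the key condition that will serve it. The sketch below is a plain-Python checklist, assuming the key layout this notebook adopts later (a `pk`/`sk` table, the `_root` sort key, and a GSI named `user-s-video-index`); the `AccessPattern` class is a hypothetical helper, not part of any library.

```python
from dataclasses import dataclass
from typing import List, Optional

ROOT = "_root"  # sort key used by items that only need a partition key


@dataclass(frozen=True)
class AccessPattern:
    """One business query and the DynamoDB key condition that answers it (hypothetical helper)."""
    description: str
    index: Optional[str]  # None means the base table
    key_condition: str  # equality on the partition key, plus an optional sort key condition
    newest_first: bool = False


ACCESS_PATTERNS: List[AccessPattern] = [
    AccessPattern("Get a user or a video by id", None,
                  f"pk = <user-id | video-id> AND sk = {ROOT}"),
    AccessPattern("List a user's videos, latest first", "user-s-video-index",
                  "video_creator_id = <user-id> (sorted by created_at)", newest_first=True),
    AccessPattern("List the users someone subscribes to", None,
                  "pk = subscriber-<user-id> (sorted by sk, the publisher id)"),
    AccessPattern("Count a user's subscribers", None,
                  f"pk = <user-id> AND sk = {ROOT}, read the subscribers attribute"),
]

for pattern in ACCESS_PATTERNS:
    where = pattern.index or "base table"
    order = "descending" if pattern.newest_first else "ascending"
    print(f"{pattern.description:40} | {where:18} | {order:10} | {pattern.key_condition}")
```

Writing the third row down this way exposes a gap: with the publisher id as the sort key, results come back ordered by publisher id rather than by subscription time, so meeting that requirement exactly would need a time-based sort key.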

### Import SDK

```python
import typing as T
import enum
import dataclasses
from datetime import datetime

import pynamodb_mate as pm
from moto import mock_dynamodb  # moto < 5.0; moto >= 5.0 replaced it with `mock_aws`

from rich import print as rprint
```

### Define Application Data Model

We need to define two different types of data model:

1. Application data model: the data model used by your application code, REST API, or web app.
2. DynamoDB data model: the data model for the DynamoDB table, optimized for read/write efficiency.
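Keeping the two models separate means something has to translate between them. A minimal sketch, assuming items have already been deserialized into plain Python dicts; `user_to_item` and `user_from_item` are hypothetical helpers, and the `User` here is a trimmed-down copy of the application model, not the full one.

```python
import dataclasses
from datetime import datetime, timezone

ROOT = "_root"  # sort key for items that only need a partition key


@dataclasses.dataclass
class User:
    user_id: str
    user_name: str
    created_at: datetime


def user_to_item(user: User) -> dict:
    """Application model -> flat single-table item (the DynamoDB-side shape)."""
    return {
        "pk": user.user_id,
        "sk": ROOT,
        "entity_type": "USER",
        "user_name": user.user_name,
        "created_at": user.created_at,
    }


def user_from_item(item: dict) -> User:
    """Flat single-table item -> application model; rejects items of other entity types."""
    if item.get("entity_type") != "USER" or item.get("sk") != ROOT:
        raise ValueError(f"not a user item: {item!r}")
    return User(
        user_id=item["pk"],
        user_name=item["user_name"],
        created_at=item["created_at"],
    )


alice = User("user-1", "alice", datetime(2020, 1, 1, tzinfo=timezone.utc))
assert user_from_item(user_to_item(alice)) == alice
```

The application side never sees `pk`, `sk`, or `entity_type`, so the table layout can change without touching API or web app code.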

Let's start with your application data model.


```python
class EntityTypeEnum(str, enum.Enum):
    USER = "USER"
    VIDEO = "VIDEO"
    SUBSCRIPTION = "SUBSCRIPTION"


@dataclasses.dataclass
class User:
    user_id: str = dataclasses.field()  # e.g. "user-1", same format as the DynamoDB key
    user_name: str = dataclasses.field()
    created_at: datetime = dataclasses.field()

    videos: T.List["Video"] = dataclasses.field(default_factory=list)
    subscriptions: T.List["Subscription"] = dataclasses.field(default_factory=list)


@dataclasses.dataclass
class Video:
    video_id: str = dataclasses.field()  # e.g. "video-1"
    video_title: str = dataclasses.field()
    created_at: datetime = dataclasses.field()
    creator_id: str = dataclasses.field()  # the creator's user id

    creator: T.Optional[User] = dataclasses.field(default=None)


@dataclasses.dataclass
class Subscription:
    subscriber_user_id: str = dataclasses.field()  # the user who subscribes
    publisher_user_id: str = dataclasses.field()  # the user being subscribed to
```

### Define DynamoDB Data Model

```python
# Type hint helpers: an attribute is a PynamoDB descriptor on the class
# and a plain Python value on an instance.
REQUIRED_STR = T.Union[str, pm.UnicodeAttribute]
OPTIONAL_STR = T.Optional[REQUIRED_STR]
REQUIRED_INT = T.Union[int, pm.NumberAttribute]
OPTIONAL_INT = T.Optional[REQUIRED_INT]
REQUIRED_DATETIME = T.Union[datetime, pm.UTCDateTimeAttribute]
OPTIONAL_DATETIME = T.Optional[REQUIRED_DATETIME]

# Sort key value for items that logically have only a partition key
# (users and videos); the table schema still requires a range key.
ROOT = "_root"


class UsersVideoIndex(pm.GlobalSecondaryIndex):
    """
    GSI for "list a user's videos, newest first". Only video items have
    ``video_creator_id``, so only videos appear in this index.
    """

    class Meta:
        index_name = "user-s-video-index"
        # table keys (pk, sk) and index keys are always projected
        projection = pm.IncludeProjection(["video_title"])

    video_creator_id: REQUIRED_STR = pm.UnicodeAttribute(hash_key=True)
    created_at: REQUIRED_DATETIME = pm.UTCDateTimeAttribute(range_key=True)


class Model(pm.Model):
    class Meta:
        table_name = "entities"
        region = "us-east-1"
        billing_mode = pm.PAY_PER_REQUEST_BILLING_MODE

    # --- hash key, range key, and entity type
    pk: REQUIRED_STR = pm.UnicodeAttribute(hash_key=True)
    sk: REQUIRED_STR = pm.UnicodeAttribute(range_key=True)
    entity_type: OPTIONAL_STR = pm.UnicodeAttribute(default=None, null=True)

    # --- user related
    user_name: OPTIONAL_STR = pm.UnicodeAttribute(default=None, null=True)
    subscribers: OPTIONAL_INT = pm.NumberAttribute(default=None, null=True)

    # --- video related
    video_title: OPTIONAL_STR = pm.UnicodeAttribute(default=None, null=True)
    video_creator_id: OPTIONAL_STR = pm.UnicodeAttribute(default=None, null=True)
    views: OPTIONAL_INT = pm.NumberAttribute(default=None, null=True)

    # --- subscription related
    publisher_id: OPTIONAL_STR = pm.UnicodeAttribute(default=None, null=True)

    # --- common
    created_at: OPTIONAL_DATETIME = pm.UTCDateTimeAttribute(default=None, null=True)

    user_s_video_index = UsersVideoIndex()

    # In-memory id counters, good enough for this demo only
    _USER_ID_STARTED = 0
    _VIDEO_ID_STARTED = 0


# Start moto's in-memory DynamoDB mock, so no AWS account is needed
mock = mock_dynamodb()
mock.start()

Model.create_table(wait=True)
Model.delete_all()  # start from an empty table when the cell is re-run
```
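The `UsersVideoIndex` GSI is *sparse*: DynamoDB only adds an item to a GSI when the item has the index's key attributes, and PynamoDB does not write attributes whose value is `None`. User and subscription items have `created_at` but no `video_creator_id`, so they never appear in the index. The pure-Python simulation below illustrates that behavior on a few sample items; it models the semantics only, not how DynamoDB is implemented, and `build_sparse_index` / `query_index` are hypothetical names.

```python
from datetime import datetime
from typing import Dict, Iterable, List, Optional

ROOT = "_root"

# Sample rows of the single "entities" table (None-valued attributes are simply absent)
ITEMS: List[Dict] = [
    {"pk": "user-1", "sk": ROOT, "entity_type": "USER", "user_name": "alice",
     "subscribers": 0, "created_at": datetime(2020, 1, 1)},
    {"pk": "video-1", "sk": ROOT, "entity_type": "VIDEO", "video_creator_id": "user-1",
     "video_title": "video 1 title", "views": 0, "created_at": datetime(2020, 2, 1)},
    {"pk": "video-2", "sk": ROOT, "entity_type": "VIDEO", "video_creator_id": "user-1",
     "video_title": "video 2 title", "views": 0, "created_at": datetime(2020, 2, 2)},
    {"pk": "subscriber-user-2", "sk": "user-1", "entity_type": "SUBSCRIPTION",
     "created_at": datetime(2020, 4, 1)},
]


def build_sparse_index(items: Iterable[Dict], hash_attr: str, range_attr: str,
                       projected: Iterable[str]) -> List[Dict]:
    """Copy only items that carry both index key attributes, keeping keys plus projected attributes."""
    projected = list(projected)
    index = []
    for item in items:
        if hash_attr in item and range_attr in item:
            entry = {k: item[k] for k in ("pk", "sk", hash_attr, range_attr)}
            entry.update({a: item[a] for a in projected if a in item})
            index.append(entry)
    return index


def query_index(index: List[Dict], hash_attr: str, hash_value: str, range_attr: str,
                newest_first: bool = False, limit: Optional[int] = None) -> List[Dict]:
    """Equality on the index hash key, ordered by the index range key."""
    matches = [e for e in index if e[hash_attr] == hash_value]
    matches.sort(key=lambda e: e[range_attr], reverse=newest_first)
    return matches if limit is None else matches[:limit]


video_index = build_sparse_index(ITEMS, "video_creator_id", "created_at", ["video_title"])
print([e["pk"] for e in video_index])  # ['video-1', 'video-2']
latest = query_index(video_index, "video_creator_id", "user-1", "created_at", newest_first=True)
print([e["pk"] for e in latest])  # ['video-2', 'video-1']
```

Note that the index entries carry no `views`, mirroring the `IncludeProjection`: a query on the index returns only keys and `video_title`.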

```python
class Model(Model):
    _USER_ID_STARTED = 0

    @classmethod
    def signup_user(
        cls,
        user_name: str,
        created_at: datetime,
    ) -> "Model":
        """Create a user item: pk = "user-<n>", sk = ROOT."""
        cls._USER_ID_STARTED += 1
        user_model = cls(
            pk=f"user-{cls._USER_ID_STARTED}",
            sk=ROOT,
            entity_type=EntityTypeEnum.USER.value,
            user_name=user_name,
            subscribers=0,
            created_at=created_at,
        )
        user_model.save()
        return user_model

    @classmethod
    def get_user(cls, user_id: str) -> "Model":
        """Point read of a user item by its id, e.g. "user-1"."""
        return cls.get(hash_key=user_id, range_key=ROOT)


print("Create some sample users")
user1 = Model.signup_user(user_name="alice", created_at=datetime(2020, 1, 1))
user2 = Model.signup_user(user_name="bob", created_at=datetime(2020, 1, 2))
user3 = Model.signup_user(user_name="cathy", created_at=datetime(2020, 1, 3))
rprint(user1.to_dict())

print("Get user by user id")
rprint(Model.get_user("user-3").to_dict())
```
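`_USER_ID_STARTED` is an in-memory counter, so ids restart at 1 when the process restarts and can collide when several Lambda or ECS instances write at once. One common alternative is to keep the counter in DynamoDB itself and increment it with an atomic `ADD` update (random ids such as `uuid.uuid4()` are another option when ids need not be sequential). The sketch below assumes a boto3 DynamoDB client (`boto3.client("dynamodb")`) and a hypothetical counter item with `pk = "counter-<name>"`, `sk = "_root"` and a numeric `value` attribute; `FakeDynamoDBClient` is a stand-in so the snippet runs without AWS or moto.

```python
from typing import Any, Dict, Tuple

ROOT = "_root"


def allocate_id(client: Any, table_name: str, counter_name: str) -> int:
    """Atomically add 1 to a counter item and return the new value.

    `client` is assumed to behave like a boto3 DynamoDB client. ADD treats a
    missing attribute as 0, and UpdateItem creates the item if it is missing.
    """
    response = client.update_item(
        TableName=table_name,
        Key={"pk": {"S": f"counter-{counter_name}"}, "sk": {"S": ROOT}},
        UpdateExpression="ADD #v :one",
        ExpressionAttributeNames={"#v": "value"},  # placeholder avoids reserved-word clashes
        ExpressionAttributeValues={":one": {"N": "1"}},
        ReturnValues="UPDATED_NEW",
    )
    return int(response["Attributes"]["value"]["N"])


class FakeDynamoDBClient:
    """In-memory stand-in that understands only the update_item call above."""

    def __init__(self) -> None:
        self.counters: Dict[Tuple[str, str, str], int] = {}

    def update_item(self, **kwargs: Any) -> Dict[str, Any]:
        key = (kwargs["TableName"], kwargs["Key"]["pk"]["S"], kwargs["Key"]["sk"]["S"])
        step = int(kwargs["ExpressionAttributeValues"][":one"]["N"])
        self.counters[key] = self.counters.get(key, 0) + step
        return {"Attributes": {"value": {"N": str(self.counters[key])}}}


client = FakeDynamoDBClient()
user_ids = [f"user-{allocate_id(client, 'entities', 'user')}" for _ in range(3)]
print(user_ids)  # ['user-1', 'user-2', 'user-3']
```

Because the increment happens inside DynamoDB, concurrent callers each receive a distinct value, at the cost of one extra write per new id.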


```python
class Model(Model):
    _VIDEO_ID_STARTED = 0

    @classmethod
    def upload_video(
        cls,
        user_id: str,
        video_title: str,
        created_at: datetime,
    ) -> "Model":
        """Create a video item: pk = "video-<n>", sk = ROOT, linked to its creator."""
        cls._VIDEO_ID_STARTED += 1

        video_model = cls(
            pk=f"video-{cls._VIDEO_ID_STARTED}",
            sk=ROOT,
            entity_type=EntityTypeEnum.VIDEO.value,
            video_title=video_title,
            video_creator_id=user_id,
            views=0,
            created_at=created_at,
        )
        video_model.save()
        return video_model

    @classmethod
    def get_video(cls, video_id: str) -> "Model":
        """Point read of a video item by its id, e.g. "video-1"."""
        return cls.get(hash_key=video_id, range_key=ROOT)

    @classmethod
    def get_users_videos(cls, user_id: str, limit: int = 5):
        """Query the GSI for a user's videos, newest first (scan_index_forward=False)."""
        # Only keys and video_title are projected, so `views` is not returned here
        return cls.iter_query_index(
            index=cls.user_s_video_index,
            hash_key=user_id,
            scan_index_forward=False,
            limit=limit,
        )


video1 = Model.upload_video(user_id="user-1", video_title="video 1 title", created_at=datetime(2020, 2, 1))
video2 = Model.upload_video(user_id="user-2", video_title="video 2 title", created_at=datetime(2020, 2, 2))
video3 = Model.upload_video(user_id="user-2", video_title="video 3 title", created_at=datetime(2020, 2, 3))
video4 = Model.upload_video(user_id="user-3", video_title="video 4 title", created_at=datetime(2020, 2, 4))
video5 = Model.upload_video(user_id="user-3", video_title="video 5 title", created_at=datetime(2020, 2, 5))
video6 = Model.upload_video(user_id="user-3", video_title="video 6 title", created_at=datetime(2020, 2, 6))

print("Create some sample videos")
rprint(video1.to_dict())

print("Get video by video id")
rprint(Model.get_video("video-2").to_dict())

print("Get user's videos by user id")
rprint([i.to_dict() for i in Model.get_users_videos("user-3").all()])
```
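`get_users_videos` relies on the GSI range key `created_at` for its newest-first order. DynamoDB orders string sort keys by their UTF-8 bytes, so a timestamp stored as a string only sorts chronologically if it is zero-padded, fixed-width, and in a single timezone. As far as I know, PynamoDB's `UTCDateTimeAttribute` stores UTC strings in the `%Y-%m-%dT%H:%M:%S.%f%z` format; the sketch below assumes that format (the helper names are hypothetical) and contrasts it with an unpadded format that sorts incorrectly.

```python
from datetime import datetime, timezone

SORTABLE_FMT = "%Y-%m-%dT%H:%M:%S.%f%z"  # assumed to match UTCDateTimeAttribute


def sortable_key(dt: datetime) -> str:
    """Fixed-width UTC string: lexicographic order equals chronological order."""
    return dt.astimezone(timezone.utc).strftime(SORTABLE_FMT)


def unpadded_key(dt: datetime) -> str:
    """Counter-example: no zero padding, so '2020-10-1' sorts before '2020-9-30'."""
    return f"{dt.year}-{dt.month}-{dt.day}"


created = [
    datetime(2020, 9, 30, tzinfo=timezone.utc),
    datetime(2020, 10, 1, tzinfo=timezone.utc),
    datetime(2020, 2, 5, tzinfo=timezone.utc),
]

# Newest first, like scan_index_forward=False on the GSI
print(sorted((sortable_key(d) for d in created), reverse=True))
# ['2020-10-01T00:00:00.000000+0000', '2020-09-30T00:00:00.000000+0000', '2020-02-05T00:00:00.000000+0000']
print(sorted((unpadded_key(d) for d in created), reverse=True))
# ['2020-9-30', '2020-2-5', '2020-10-1']  (wrong order)
```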


```python
class Model(Model):
    @classmethod
    def subscribe(
        cls,
        subscriber_id: str,
        publisher_id: str,
        created_at: datetime,
    ) -> "Model":
        """
        Store the subscription as pk = "subscriber-<subscriber id>", sk = publisher id,
        then increment the publisher's ``subscribers`` counter.

        Note: the two writes are not atomic, and subscribing twice
        increments the counter twice.
        """
        subscription_model = cls(
            pk=f"subscriber-{subscriber_id}",
            sk=publisher_id,
            entity_type=EntityTypeEnum.SUBSCRIPTION.value,
            created_at=created_at,
        )
        subscription_model.save()

        cls(pk=publisher_id, sk=ROOT).update(actions=[
            cls.subscribers.set(cls.subscribers + 1)
        ])

        return subscription_model

    @classmethod
    def get_user_subscriptions(
        cls,
        user_id: str,
    ):
        """All subscriptions of a user, sorted by sk (the publisher id), not by subscribe time."""
        return cls.iter_query(hash_key=f"subscriber-{user_id}")


print("Create some sample subscriptions")
Model.subscribe(subscriber_id="user-1", publisher_id="user-2", created_at=datetime(2020, 4, 1))
Model.subscribe(subscriber_id="user-1", publisher_id="user-3", created_at=datetime(2020, 4, 2))

print("Get subscription list by user id")
rprint([i.to_dict() for i in Model.get_user_subscriptions("user-1").all()])

print("Get number of subscribers by user id")
user = Model.get_user("user-3")
rprint(f"User {user.pk} has {user.subscribers} subscribers")
```
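`subscribe` performs two independent writes, so a failure between them leaves the subscriber count out of sync, and calling it twice counts the same subscriber twice. One way to address both is a single DynamoDB transaction: a conditional `Put` that fails if the subscription already exists, plus an `ADD` on the publisher's counter; if any condition fails, the whole transaction is cancelled. The sketch below only builds the `TransactItems` payload in boto3's low-level format, assuming it would be passed to `client.transact_write_items(TransactItems=...)`; `build_subscribe_transaction` is a hypothetical helper, and the timestamp format is assumed to match what `UTCDateTimeAttribute` writes.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List

ROOT = "_root"
DATETIME_FMT = "%Y-%m-%dT%H:%M:%S.%f%z"  # assumed UTCDateTimeAttribute string format


def build_subscribe_transaction(table_name: str, subscriber_id: str, publisher_id: str,
                                created_at: datetime) -> List[Dict[str, Any]]:
    """Build TransactItems that create a subscription at most once and bump the counter with it."""
    subscription_put = {
        "Put": {
            "TableName": table_name,
            "Item": {
                "pk": {"S": f"subscriber-{subscriber_id}"},
                "sk": {"S": publisher_id},
                "entity_type": {"S": "SUBSCRIPTION"},
                "created_at": {"S": created_at.astimezone(timezone.utc).strftime(DATETIME_FMT)},
            },
            # Fails, cancelling the whole transaction, if this subscription already exists
            "ConditionExpression": "attribute_not_exists(pk)",
        }
    }
    counter_update = {
        "Update": {
            "TableName": table_name,
            "Key": {"pk": {"S": publisher_id}, "sk": {"S": ROOT}},
            "UpdateExpression": "ADD #s :one",
            # Only count subscriptions to users that exist
            "ConditionExpression": "attribute_exists(pk)",
            "ExpressionAttributeNames": {"#s": "subscribers"},
            "ExpressionAttributeValues": {":one": {"N": "1"}},
        }
    }
    return [subscription_put, counter_update]


items = build_subscribe_transaction(
    "entities", "user-1", "user-2", datetime(2020, 4, 1, tzinfo=timezone.utc)
)
# client.transact_write_items(TransactItems=items)  # with a real boto3 DynamoDB client
```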


```python
class Model(Model):
    @classmethod
    def watch_video(
        cls,
        user_id: str,
        video_id: str,
    ) -> "Model":
        """Increment the video's view count and record that the user has viewed it."""
        cls(pk=video_id, sk=ROOT).update(actions=[
            cls.views.set(cls.views + 1)
        ])
        # The view record lives in the viewer's partition: pk = user id, sk = video id
        view_activity = cls(pk=user_id, sk=video_id)
        view_activity.save()  # save() does not return the model instance
        return view_activity

    @classmethod
    def is_user_has_viewed(
        cls,
        user_id: str,
        video_id: str,
    ) -> bool:
        """Single point read on (user id, video id)."""
        return cls.get_one_or_none(hash_key=user_id, range_key=video_id) is not None


print("Mock some view activities")
Model.watch_video(user_id="user-2", video_id="video-1")
Model.watch_video(user_id="user-3", video_id="video-1")

print("Get number of views of a video")
video = Model.get_video("video-1")
rprint(f"Video {video.pk} has {video.views} views")

print("Check if a user has viewed a video")
rprint("has user-2 viewed video-1: ", Model.is_user_has_viewed("user-2", "video-1"))
rprint("has user-2 viewed video-2: ", Model.is_user_has_viewed("user-2", "video-2"))
```
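Every watch updates the same video item, so a very popular video concentrates its writes on a single item and partition. A standard DynamoDB remedy is write sharding: spread the counter across N items whose keys carry a shard suffix, and sum the shards on read. The in-memory sketch below shows only the key scheme and the read-time sum; the `<video-id>#views#<n>` key format, `ShardedCounter`, and the shard count of 10 are hypothetical choices, and the extra reads are only worth it if a single item actually becomes a write hotspot.

```python
import random
from typing import Dict


class ShardedCounter:
    """In-memory model of a view counter split across n_shards DynamoDB items."""

    def __init__(self, n_shards: int = 10, seed: int = 0) -> None:
        self.n_shards = n_shards
        self.rng = random.Random(seed)
        self.items: Dict[str, int] = {}  # one entry stands in for one DynamoDB item

    def shard_key(self, video_id: str, shard: int) -> str:
        return f"{video_id}#views#{shard}"

    def increment(self, video_id: str) -> None:
        """Write path: pick a random shard, so writes spread across items."""
        key = self.shard_key(video_id, self.rng.randrange(self.n_shards))
        self.items[key] = self.items.get(key, 0) + 1

    def total(self, video_id: str) -> int:
        """Read path: read every shard and add them up."""
        return sum(self.items.get(self.shard_key(video_id, s), 0) for s in range(self.n_shards))


counter = ShardedCounter(n_shards=10)
for _ in range(1000):
    counter.increment("video-1")
print(counter.total("video-1"))  # 1000
print(len(counter.items))  # shard items actually written (at most 10)
```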


```python
class Model(Model):
    @classmethod
    def push_new_videos(
        cls,
        user_id: str,
        limit_per_publisher: int = 3,
        max_push: int = 10,
    ):
        """
        Collect videos from the user's subscriptions that the user has not viewed yet.

        :param user_id: the subscriber's user id
        :param limit_per_publisher: for each publisher, max number of latest videos to check
        :param max_push: max total number of videos to push
        """
        publishers = cls.get_user_subscriptions(user_id).all()
        new_videos = list()
        for publisher in publishers:
            # the subscription item's sk is the publisher's user id
            videos = cls.get_users_videos(publisher.sk, limit=limit_per_publisher).all()
            for video in videos:
                if not cls.is_user_has_viewed(user_id, video.pk):
                    new_videos.append(video)
                    if len(new_videos) >= max_push:
                        return new_videos
        return new_videos


print("User 2 subscribes to User 3")
Model.subscribe(subscriber_id="user-2", publisher_id="user-3", created_at=datetime(2020, 5, 1))
print("User 3 has three videos: video 4, 5, 6")
print("User 2 has already watched video 4")
Model.watch_video(user_id="user-2", video_id="video-4")
print("System should push video 5 and 6 to user 2")
rprint([i.to_dict() for i in Model.push_new_videos("user-2")])
```
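`push_new_videos` walks publishers one at a time, so the pushed list is grouped by publisher rather than ordered by time across all subscriptions. Because each `get_users_videos` result is already sorted newest first, a k-way merge can produce a single newest-first feed without re-sorting everything. A minimal sketch with plain dicts, using `heapq.merge` from the standard library; `merge_newest_first` is a hypothetical helper, and the sample data mirrors the videos created above for a subscriber of both user-2 and user-3 (as user-1 is).

```python
import heapq
from datetime import datetime
from itertools import islice
from typing import Dict, Iterable, List, Set


def merge_newest_first(per_publisher: Iterable[List[Dict]], seen: Set[str],
                       max_push: int) -> List[str]:
    """Merge per-publisher lists (each already newest first) into one newest-first feed."""
    merged = heapq.merge(*per_publisher, key=lambda v: v["created_at"], reverse=True)
    unseen = (v["video_id"] for v in merged if v["video_id"] not in seen)
    return list(islice(unseen, max_push))  # stops pulling from the merge once max_push is hit


user_2_videos = [
    {"video_id": "video-3", "created_at": datetime(2020, 2, 3)},
    {"video_id": "video-2", "created_at": datetime(2020, 2, 2)},
]
user_3_videos = [
    {"video_id": "video-6", "created_at": datetime(2020, 2, 6)},
    {"video_id": "video-5", "created_at": datetime(2020, 2, 5)},
    {"video_id": "video-4", "created_at": datetime(2020, 2, 4)},
]

print(merge_newest_first([user_2_videos, user_3_videos], seen={"video-4"}, max_push=3))
# ['video-6', 'video-5', 'video-3']
```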
