@@ -0,0 +1,386 @@
/* @flow */

/*
* n.b. Although this spec is designed with DynamoDB in mind,
* it is an abstract spec that can be used with any document store
* that can emulate an index
*
* e.g. An implementation can be created using redis, using sorted sets for indices
* An implementation can be done in MongoDB, you know, if you're a "webscale" asshole
*/

import type { DocumentClient } from 'aws-sdk'
import type { $Id, $Weight } from '../Types'

import { DynamoDB } from 'aws-sdk'
import https from 'https'

import { assign, invariant } from '../utils'
import { Id } from '../Types'

/**
* G
*
* A graph is a persistent object G = (V, E, σ)
* where V is the set of vertices
* E is the set of edges
* σ is some object containing meta information
* such as the size of the graph, the last unique id, etc.
*/

type Table = "vertex"
| "edge"
| "system"

export const TABLE_VERTEX : Table = "vertex"
export const TABLE_EDGE : Table = "edge"
export const TABLE_SYSTEM : Table = "system"

/**
* In order to perform the desired graph operations with reasonable performance,
* we also assume the existence of the following indices:
*/

type Index
= "hk-weight-index" // where hk is of the form [id]:[edge_label]
| "label-key-index" // used for quick retrieval of significant vertices or mappings
| "label-updatedAt-index" // used for scanning the entire table for maintenance tasks

export const INDEX_ADJACENCY : Index = "hk-weight-index"
export const INDEX_VERTEX_KEY : Index = "label-key-index"
export const INDEX_VERTEX_ALL : Index = "label-updatedAt-index"

/*
* We define the graph type to be some closure containing a partial representation σ',
* as well as operations which read and write to the respective tables.
*/

export type Graph =
{ name : string // graph name
, region : Region // region
, env : Env // environment (non-production environments will log results)

, id : () => Promise<$Id> // generate a fresh id
, weight : () => Promise<$Weight> // generate a fresh weight

/*
* The graph exposes only batch read, query, and mutation operations,
* leaving the V, E, and Adj modules responsible for processing the data
*/

, batchGet:
( table: Table
, keys: [any]
) => Promise<[any]>

, batchPut:
( table: Table
, items: [any]
) => Promise<void>

, batchDel:
( table: Table
, keys: [any]
) => Promise<void>

, query:
( table: Table
, index: Index
, params: any
, limit: ?number
) => Promise<QueryResult<any>>

}

export type QueryResult<a> =
{ items: [a]
, count: number
, total?: number
}

/**
* This module exposes a graph constructor:
*
* G.define :: string -> GraphConfigs -> Graph
*
*/

type GraphConfigs =
{ env? : Env
, region? : Region
}

type Env
= "production"
| "beta"
| "development"

export const ENV_PRODUCTION : Env = "production"
export const ENV_BETA : Env = "beta"
export const ENV_DEVELOPMENT : Env = "development"

type Region
= "us-east-1"
| "us-west-1"
| "us-west-2"
| "ap-south-1"
| "ap-northeast-1"
| "ap-northeast-2"
| "ap-southeast-1"
| "ap-southeast-2"
| "eu-central-1"
| "eu-west-1"
| "sa-east-1"
| "local"

/*
* As well as a method
*
* G.generate :: Graph -> Promise<void>
*
* which ensures that all AWS resources are created
*
* // example usage:
*
* import { G } from 'dynamo-graph'
*
* const g = G.define
* ( 'my-graph-name'
* , { env: G.ENV_PRODUCTION
* , region:
* }
* )
*
* task = async () => {
* await G.generate(g)
* const id = await g.incrId()
* console.log('Generated id:', id)
* }
*/

// IMPLEMENTATIONS:

const graphs :
{ [key: string]:
{ graph : Graph
, env : Env
, region : Region
}
} = {}

export function define
( name: string
, { env = "development"
, region = "us-east-1"
} : GraphConfigs = {}
): Graph {

invariant
( validateName(name)
, 'Invalid character in graph name'
)

// if the name is already occupied, return a reference to the same graph
if (graphs[name]) {
const g = graphs[name]
invariant
( g.env === env && g.region === region
, `There already exists a distcint graph named "${name}"`
)
return g.graph
}

const client = dynamoClient(region)

const TABLES =
{ [TABLE_VERTEX]: `${name}-vertex`
, [TABLE_EDGE]: `${name}-edge`
, [TABLE_SYSTEM]: `${name}-system`
}

const graph =
{ name
, env
, region

, async id() {
const { Attributes } = await dynamo
( client
, 'update'
, { TableName: TABLES[TABLE_SYSTEM]
, Key: { key: 'id' }
, ...incrField('value')
}
)
return Id.fromNat(Attributes.value)
}

, async weight() {
const { Attributes } = await dynamo
( client
, 'update'
, { TableName: TABLES[TABLE_SYSTEM]
, Key: { key: 'weight' }
, ...incrField('value')
}
)
return Attributes.value
}

, async batchGet(table, keys) {

const TableName = TABLES[table]
// execute gets in parallel
const reads : Promise<any>[] =
chunk(keys, 100).map(async chunk => {
const { Responses } = await dynamo
( client
, 'batchGet'
, { RequestItems: { [TableName]: { Keys: chunk } }
}
)
return Responses[TableName]
})

const results : any[] = flatten(await Promise.all(reads))
const resultMap : { [key: string]: any } = {}

// Dynamo does not guarantee order in the response,
// so we must reproduce the array oureslves:

switch (table) {

case TABLE_VERTEX:
results.forEach(v => assign(resultMap, v.id, v))
return keys.map(({ id }) => resultMap[id])

case TABLE_EDGE:
results.forEach(e => assign(resultMap, `${e.hk}$${e.to}`, e))
return keys.map(({ hk, to }) => resultMap[`${hk}$${to}`])

case TABLE_SYSTEM:
results.forEach(o => assign(resultMap, o.key, o))
return keys.map(({ key }) => resultMap[key])

default:
return []
}
}

, async batchPut(table, items) {
const TableName : string = TABLES[table]
const chunks = chunk(items, 100)
// execute each mutation chunk in serial
for (let chunk of chunks) {
const requests = chunk.map(Item => ({ PutRequest: { Item } }))
await dynamo
( client
, 'batchWrite'
, { RequestItems: { [TableName]: requests } }
)
}
}

, async batchDel(table, keys) {
const TableName = TABLES[table]
const chunks = chunk(keys, 100)
// execute each mutation batch in serial
for (let chunk of chunks) {
const requests = chunk.map(Key => ({ DeleteRequest: { Key } }))
await dynamo
( client
, 'batchWrite'
, { RequestItems: { [TableName]: requests } }
)
}
}

// TODO: it would be nice to decouple the query params from dynamodb
// but for now we'll omit the possibility of multiple adapters
, async query(table, IndexName, params, limit) {
const TableName = TABLES[table]
const { Items: items, Count: count } =
await dynamo
( client
, 'query'
, { TableName, IndexName, Limit: limit, ...params }
)
if (!limit) return { items, count }

const { Count: total } =
await dynamo
( client
, 'query'
, { TableName, IndexName, Select: 'COUNT', ...params }
)
return { items, count, total }
}
}

graphs[name] = { graph, env, region }

return graph

}

export { default as generate } from './generate'

// HELPERS:

const INVALID_CHAR = /[^a-zA-Z0-9-_]/

const validateName = (name: string): mixed =>
name && !name.match(INVALID_CHAR)

function chunk<a>(arr: [a], n: number): [[a]] {
const chunks = []
for (let i = 0, j = arr.length; i < j; i += n)
chunks.push(arr.slice(i, i+n))
return chunks
}

function flatten<a>(arr: [[a]]): [a] {
return [].concat(...arr)
}

const httpOptions =
{ agent: new https.Agent
( { rejectUnauthorized: true
, secureProtocol: 'TLSv1_method'
, ciphers: 'ALL'
}
)
}

const dynamoClient = (region: Region): DocumentClient =>
new DynamoDB.DocumentClient
( { region // TODO: handle the local case
, httpOptions
}
)

const dynamo =
( client: DocumentClient
, job: string
, params: mixed
): Promise<any> =>
new Promise(
(resolve, reject) =>
client[job]
( params
, (err, data) =>
err ? reject(err)
: resolve(data)
)
)

const incrField =
( name: string
, amount: number = 1
) =>
( { UpdateExpression: 'SET #a = #a + :amount'
, ExpressionAttributeNames: { '#a': name }
, ExpressionAttributeValues: { ':amount': amount }
, ReturnValues: 'ALL_NEW'
}
)
@@ -0,0 +1,59 @@
/* @flow */

import type { $Id } from './index'

import { assign, invariant } from '../utils'

const ALPHABET : Array<string> =
'-0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz'
.split('')
.sort()

const CHAR_TO_DEC : { [key: string]: number } =
ALPHABET.reduce(assign, {})

const INVALID = /[^0-9a-zA-Z_-]/
const BASE : number = 64
const BASE_BITS : number = 6

const isNat = (n: number): boolean =>
Math.max(Math.floor(n), 1) == n

export function toNat(id: $Id): number {

invariant
( id
, `Id.toNat may not be called on the empty id`
)

invariant
( !id.match(INVALID)
, `Id.toNat called on some non-natural id "${id}", possibly a foreign key`
)

return id
.split('')
.reduce((n, c) => n * BASE + CHAR_TO_DEC[c], 0)

}

export function fromNat(n: number): $Id {

invariant
( isNat(n)
, `Id.fromNat called on n "${n}" ∉ ℤ+`
)

const length = Math.max(0, Math.floor(Math.log2(n) / BASE_BITS)) + 1
const chars = new Array(length)

let i = length - 1
while (n > 0) {
chars[i--] = ALPHABET[n % BASE]
n = Math.floor(n / BASE)
}
while (i >= 0)
chars[i--] = ALPHABET[0]

return chars.join('')
}
Empty file.
@@ -0,0 +1,83 @@
/* @flow */

/**
* Id
*
* We define an Id to be some string which uniquely identifies a vertex
*/

export type $Id = string

// The Id type has the following partial methods:
//
// Id.fromNat :: (id: $Id) -> number where `id` is a radix64 string
// Id.toNat :: (n: number) -> $Id where `n` is a positive integer
//
// both of which raise an error on invalid input

export * as Id from './Id'

/**
* Label
*
* A type representation that uniquely identifies the type of some element
*
* i.e. ∀x : x ∈ x.label
*/

export type $Label<a> = string

/**
* Key
*
* A unique string identifier for some typed element
*/

export type $Key<a> = string

/**
* Weight
*
* dynamo-graph describes a weighted graph, so the weight type is
* any ordered value which can be used to sort the index
*/

export type $Weight = number

/**
* Schema
*
* dynamo-graph will perform data validations before persisting an element
* We define a Schema type to describe the shape of data
*/

export type $Schema = { [key: string]: $Schema } | "NoAttrs"

/**
* Cursor and Page
*
* dynamo-graph follows the relay specification for pagination
*/

export type $Cursor = ForwardCursor
| BackwardCursor

type ForwardCursor =
{ direction: "forward"
, first?: number
, after?: number
}

type BackwardCursor =
{ direction: "backward"
, last?: number
, before?: number
}

export type $Page<a> =
{ items: [a]
, pageInfo:
{ hasNextPage: boolean
, hasPreviousPage: boolean
}
}
@@ -0,0 +1,204 @@
/* @flow */

import type { $Id, $Label, $Key } from '../Types'
import type { Graph, QueryResult } from '../G'

import { TABLE_SYSTEM
, TABLE_VERTEX
, INDEX_VERTEX_KEY
, INDEX_VERTEX_ALL
} from '../G'

import { invariant } from '../utils'

/**
* V
*
* A vertex is a tuple of an id, type representation, and attribute map:
*
* v = (id, label, λ: k -> v)
*
*/

type Vertex<a> =
{ id : $Id
, label : $Label<Vertex<a>>
, attrs : a

/*
* Such that the id uniquely identifies the vertex.
*
* i.e. ∀ v1,v2 ∈ V : v1.id = v2.id ⇒ v1 = v2
*
* For convenience, we augment this definition with two additional properties:
*
* key - unique string identifier such that the tuple (label, key) uniquely identifies a vertex:
* i.e. ∀ v1,v2 ∈ V : v1.key = v2.key ∧ v1.label = v2.label ⇒ v1 = v2
*
* updatedAt - timestamp of last mutation, used to track the state of the graph
* n.b. ideally mutations should be avoided in favour of duplicating and re-linking vertices
* in which case, updatedAt describes the time the vertex was created
*/

, key : ?$Key<Vertex<a>>
, updatedAt : number
}

/**
*
* Notice that vertices are paramterized over their type.
* We define a VertexDef object which describes the type of a vertex,
* such that the string representation `label` uniquey identifies the definition
*
*/

type VertexDef<a> =
{ label: $Label<Vertex<a>>
// TODO: schema/validation object
}

/**
* usage:
*
* import { G, V } from 'dynamo-graph'
*
* const g = G.define('my-graph')
* const PERSON = V.define('Person')
*
*/

const defs: { [key: string]: VertexDef<mixed> } = {}

export function define<a>(label: $Label<Vertex<a>>): VertexDef<a> {

invariant
( label
, 'Label must be non-empty'
)

if (defs[label])
return defs[label]

const def =
{ label
}

return defs[label] = def

}

/**
* Using this vertex definition, we have enough information to put a vertex to the graph
*/

// V.create :: Graph -> VertexDef a -> a -> Vertex a
export async function create<a>(g: Graph, def: VertexDef<a>, attrs: a): Promise<Vertex<a>> {
const id: $Id = await g.id()
return putVertex(g, def, id, attrs)
}
// V.put :: Graph -> VertexDef a -> Id -> a -> Vertex a
export async function put<a>(g: Graph, def: VertexDef<a>, id: $Id, attrs: a): Promise<Vertex<a>> {
return putVertex(g, def, id, attrs)
}
// V.putByKey :: Graph -> VertexDef a -> Key a -> a -> Vertex a
export async function putByKey<a>(g: Graph, def: VertexDef<a>, key: $Key<a>, attrs: a): Promise<Vertex<a>> {
const v: ?Vertex<a> = await findByKey(g, def, key)
const id: $Id = v ? v.id
: await g.id()
return putVertex(g, def, id, attrs, key)
}

async function putVertex<a>(g: Graph, { label }: VertexDef<a>, id: $Id, attrs: a, key: ?$Key<a>): Promise<Vertex<a>> {
const v: Vertex<a> =
{ id, label, attrs // copy attributes
, updatedAt: +Date.now() // ensure updatedAt field is changed with each mutation
, ...(key ? { key } : {}) // avoid undefined fields
}
await g.batchPut(TABLE_VERTEX, [v])
return v
}
/**
*
* Since the id uniquely identifies the vertex, there exists a mapping
*
* V.find :: Graph -> Id -> Vertex (∃ a) ?
* V.findMany :: Graph -> [ Id ] -> [ Vertex (∃ a) ? ]
*
*/

export async function find(g: Graph, id: $Id): Promise<?Vertex<mixed>> {
const [ v ]: [?Vertex<mixed>] = await findMany(g, [id])
return v
}

export async function findMany(g: Graph, ids: [$Id]): Promise<[?Vertex<mixed>]> {
const keys: { id: $Id }[] = ids.map(id => ({ id }))
const vertices: [?Vertex<mixed>] = await g.batchGet(TABLE_VERTEX, keys)
return vertices
}

/**
* And since the label-key pair uniquely identifies a vertex,
* there exists a similar mapping:
*
* V.findByKey :: Graph -> VertexDef a -> Key a -> Vertex a ?
*
* note however, that due to performance characteristics,
* this method should only be used for root fields, not for traversals
*/

export async function findByKey<a>
( g: Graph
, { label }: VertexDef<a>
, key: $Key<a>
): Promise<?Vertex<a>> {

const { items }: QueryResult<?Vertex<a>> =
await g.query
( TABLE_VERTEX
, INDEX_VERTEX_KEY
, { KeyConditions:
{ label: { ComparisonOperator: 'EQ', AttributeValueList: [ label ] }
, key: { ComparisonOperator: 'EQ', AttributeValueList: [ key ] }
}
, Limit: 1 // we do not want pagination data
}
)

/*
const test: QueryResult<?Vertex<a>> =
await g.query
( TABLE_VERTEX
, INDEX_VERTEX_ALL
, { KeyConditions:
{ label: { ComparisonOperator: 'EQ'
, AttributeValueList: [ label ]
}
, updatedAt: { ComparisonOperator: 'GT'
, AttributeValueList: [ 1469837733790 ]
}
}
}
, 10
)
console.log(test)
*/

return items[0]

}
/**
* Also recall that there exists a per-type index of all vertices,
* we expose this through a paginated method:
*
* V.all :: Graph -> VertexDef a -> Cursor -> Page (Vertex a)
*
*/

export async function all<a>(g: Graph, { label }: VertexDef<a>, pageInfo: any): Promise<void> {
}

This file was deleted.

@@ -1,5 +1,6 @@
/* @flow */

export * as G from './graph'
export * as G from './G'
export * as V from './V'

export const foo : string = 'foo'
export * as Types from './Types'

This file was deleted.

@@ -1,5 +1,10 @@
/* @flow */

export function assign<K, V>(obj: { [key: K]: V }, key: K, val: V): { [key: K]: V } {
obj[key] = val
return obj
}

export function invariant(condition: mixed, message: string) : void {
if (!condition)
throw new Error(message)

This file was deleted.

@@ -2,14 +2,10 @@ import test from 'ava'

import { G } from 'dynamo-graph'

const g = G.define
( 'dynamo-graph-test'
, { env: G.ENV_DEVELOPMENT
}
)
import g from '../helpers/g'

test
( 'graph generates'
( 'should produce a sensible output'
, t => {
t.is(g.name, 'dynamo-graph-test')
t.is(g.env, G.ENV_DEVELOPMENT)
@@ -35,15 +31,3 @@ test
t.is(G.define('dynamo-graph-test'), g)
}
)

// id increments

test
( '.incrId'
, async t => {
const ids = await Promise.all(
Array(10).fill(0).map(g.incrId)
)
t.pass()
}
)
@@ -0,0 +1,12 @@
import test from 'ava'

import { G } from 'dynamo-graph'
import g from '../helpers/g'

test
( 'should create the appropriate resources'
, async t => {
await G.generate(g)
// TODO: write this test
}
)
@@ -0,0 +1,24 @@
import test from 'ava'

import { G } from 'dynamo-graph'
import * as Gen from '../helpers/gen'
import g from '../helpers/g'

test
( 'generates fresh ids'
, async t => {

// generate
const ids = await Promise.all(
Gen.array(10).map(g.id)
)

// type check
ids.forEach(id => t.true(typeof id === 'string'))

// pairwise equality
for (let i = 0; i < 9; i++)
for (let j = i + 1; j < 10; j++)
t.not(ids[i], ids[j])
}
)
@@ -0,0 +1,23 @@
import test from 'ava'

import { G } from 'dynamo-graph'
import * as Gen from '../helpers/gen'
import g from '../helpers/g'

test
( 'generates fresh weights'
, async t => {

const weights = await Promise.all(
Gen.array(10).map(g.weight)
)

// type check
weights.forEach(w => t.true(typeof w === 'number'))

// pairwise quality
for (let i = 0; i < 9; i++)
for (let j = i + 1; j < 10; j++)
t.not(weights[i], weights[j])
}
)
@@ -0,0 +1,73 @@
import test from 'ava'

import { Types } from 'dynamo-graph'
import { Gen } from '../helpers'

const { Id } = Types

const genId = () => Gen.id(24)

// toNat

test
( 'Id.toNat rejects invalid strings'
, t => {
t.throws(() => Id.toNat(''))
t.throws(() => Id.toNat('article:10'))
t.is(Id.toNat('A'), 11)
}
)

test
( 'Id.toNat is complete over valid strings'
, t => {
const nats = Gen.array(100).map(genId).map(Id.toNat)
nats.forEach(nat => nat === 0 || t.truthy(nat))
}
)

// fromNat

test
( 'Id.fromNat fails on n ∉ ℤ+'
, t => {
t.throws(() => Id.fromNat(-1))
t.throws(() => Id.fromNat(0))
t.throws(() => Id.fromNat(11.5))
t.throws(() => Id.fromNat(-123.142))
t.is(Id.fromNat(11.0), 'A')
}
)

test
( 'Id.fromNat is complete over ℤ+'
, t => {
const ids = Gen.array(100).map(Gen.nat).map(Id.fromNat)
ids.forEach(id => t.truthy(id))
}
)

// correctness

test
( 'Id.toNat is the inverse of Id.fromNat'
, t => {
const ints = Gen.array(100).map(Gen.nat)
const ids = ints.map(Id.fromNat)
ints.forEach(
(i, idx) => t.is(i, Id.toNat(ids[idx]))
)
}
)

test
( 'Id.fromNat is a surjective and injective map'
, t => {
const ints = Gen.array(50).map(Gen.nat)
const ids = ints.map(Id.fromNat)
for (let i = 0; i < 50; i++)
for (let j = i; j < 50; j++)
if (ints[i] != ints[j] && ids[i] === ids[j])
t.fail(`Expected either ${ints[i]} === ${ints[j]} or ${ids[i]} != ${ids[j]}`)
}
)
@@ -0,0 +1,50 @@
import test from 'ava'

import { V } from 'dynamo-graph'
import { g } from '../helpers'

const CREATE = V.define
( 'Create'
, { foo: "string!"
, bar: "number"
}
)

test
( 'Creates a vertex'
, async t => {
const attrs =
{ foo: 'Foo'
, bar: 9
}
const v = await V.create(g, CREATE, attrs)
t.deepEqual(attrs, v.attrs)
t.is(v.label, 'Create')
const v$ = await V.find(g, v.id)
t.deepEqual(v, v$)
}
)

test
( 'Creates distinct vertices'
, async t => {
const attrs =
{ foo: 'Bar'
, bar: 7
}
const [ v1, v2 ] = await Promise.all(
[ V.create(g, CREATE, attrs)
, V.create(g, CREATE, attrs)
]
)
t.deepEqual(v1.attrs, v2.attrs)
t.is(v1.label, v2.label)
t.not(v1.id, v2.id)

const [ v1$, v2$ ] = await V.findMany(g, [ v1.id, v2.id ])

t.deepEqual(v1, v1$)
t.deepEqual(v2, v2$)
t.deepEqual(v1$.attrs, v2$.attrs)
}
)
@@ -0,0 +1,19 @@
import test from 'ava'

import { V } from 'dynamo-graph'

const DEFINED = V.define('Defined')

test
( 'Creates a vertex definition'
, t => {
t.is(DEFINED.label, 'Defined')
}
)

test
( 'Ensures there are no naming conflicts'
, t => {
t.is(DEFINED, V.define('Defined'))
}
)
@@ -0,0 +1,7 @@
import { G } from '../../src'

export default G.define
( 'dynamo-graph-test'
, { env: G.ENV_DEVELOPMENT
}
)
@@ -0,0 +1,16 @@
const NAT_CAP = 9223372036854775807
const ALPHABET = '-0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz'

const rand = Math.random

const toInt = n => Math.floor(n)

export const array = n => Array(Math.max(0, toInt(n))).fill(0)

export const int = n => toInt(rand() * n)

export const letter = () => ALPHABET[int(64)]

export const id = n => array(int(n - 1) + 1).map(letter).join('')

export const nat = () => int(NAT_CAP - 1) + 1
@@ -0,0 +1,2 @@
export { default as g } from './g'
export * as Gen from './gen'

This file was deleted.