Skip to content

Commit

Permalink
ddex: multi tenant (#8401)
Browse files Browse the repository at this point in the history
  • Loading branch information
stereosteve committed May 9, 2024
1 parent c7e9b46 commit a84cf8f
Show file tree
Hide file tree
Showing 17 changed files with 499 additions and 348 deletions.
1 change: 1 addition & 0 deletions packages/ddex/processor/.gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
s3stuff
*.db*
sources.json
6 changes: 6 additions & 0 deletions packages/ddex/processor/.vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"editor.codeActionsOnSave": {
"source.fixAll": "always",
"source.organizeImports": "always"
}
}
41 changes: 14 additions & 27 deletions packages/ddex/processor/cli.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
import 'dotenv/config'

import { program } from 'commander'
import { parseDelivery, reParsePastXml } from './src/parseDelivery'
import { cleanupFiles } from './src/cleanupFiles'
import { parseDelivery, reParsePastXml } from './src/parseDelivery'
import { publishValidPendingReleases } from './src/publishRelease'
import { pollS3 } from './src/s3poller'
import {
deleteRelease,
publishValidPendingReleases,
} from './src/publishRelease'
import { sync } from './src/s3sync'
import { startServer } from './src/server'
import { sources } from './src/sources'
import { sleep } from './src/util'
import { releaseRepo } from './src/db'
import { createSdkService } from './src/sdk'

sources.load()

program
.name('ddexer')
Expand All @@ -23,30 +21,19 @@ program
program
.command('parse')
.description('Parse DDEX xml and print results')
.argument('<source>', 'source name to use')
.argument('<path>', 'path to ddex xml file')
.action(async (p) => {
const releases = await parseDelivery(p)
.action(async (source, p) => {
const releases = await parseDelivery(source, p)
console.log(JSON.stringify(releases, undefined, 2))
})

program
.command('publish')
.description('Publish any valid deliveries')
.option('--republish', 'update already published releases')
.action(async (opts) => {
.action(async () => {
await reParsePastXml()
await publishValidPendingReleases(opts)
})

program
.command('delete')
.description('Take down a published release')
.argument('<id>', 'release id')
.action(async (id) => {
// find release and delete it
const release = releaseRepo.get(id)
const sdk = (await createSdkService()).getSdk()
await deleteRelease(sdk, release!)
await publishValidPendingReleases()
})

program
Expand All @@ -68,31 +55,31 @@ program
program
.command('server')
.description('start server without background processes, useful for dev')
.action(async (opts) => {
.action(async () => {
startServer()
})

program
.command('worker')
.description('start background processes, useful for dev')
.action(async (opts) => {
.action(async () => {
startWorker()
})

program
.command('start')
.description('Start both server + background processes')
.action(async (opts) => {
.action(async () => {
startServer()
startWorker()
})

program.command('cleanup').description('remove temp files').action(cleanupFiles)

program.parse()
const globalOptions = program.opts()

async function startWorker() {
// eslint-disable-next-line no-constant-condition
while (true) {
await sleep(3_000)
console.log('polling...')
Expand Down
14 changes: 14 additions & 0 deletions packages/ddex/processor/sources.example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"sources": [
{
"env": "staging",
"name": "",
"ddexKey": "",
"ddexSecret": "",
"awsKey": "",
"awsSecret": "",
"awsRegion": "",
"awsBucket": ""
}
]
}
12 changes: 12 additions & 0 deletions packages/ddex/processor/sources.test.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"sources": [
{
"name": "crudTest",
"ddexKey": "crudTestKey"
},
{
"name": "dealTest",
"ddexKey": "dealTestKey"
}
]
}
64 changes: 55 additions & 9 deletions packages/ddex/processor/src/db.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import sql, { Database } from '@radically-straightforward/sqlite'
import { DDEXRelease, DDEXReleaseIds } from './parseDelivery'
import { Statement } from 'better-sqlite3'
import { DDEXRelease, DDEXReleaseIds } from './parseDelivery'

const dbLocation = process.env.SQLITE_URL || 'data/dev.db'
const db = new Database(dbLocation)
Expand All @@ -11,20 +11,24 @@ db.migrate(
sql`
create table if not exists xmls (
source text not null,
xmlUrl text primary key,
xmlText text not null,
messageTimestamp text not null,
createdAt DATETIME DEFAULT CURRENT_TIMESTAMP
);
create table if not exists users (
id text primary key,
apiKey text, -- app that user authorized
id text,
handle text not null,
name text not null,
createdAt DATETIME DEFAULT CURRENT_TIMESTAMP
createdAt DATETIME DEFAULT CURRENT_TIMESTAMP,
primary key (apiKey, id)
);
create table if not exists releases (
source text not null,
key text primary key,
ref text,
xmlUrl text,
Expand Down Expand Up @@ -55,13 +59,15 @@ create table if not exists s3markers (
)

export type XmlRow = {
source: string
xmlText: string
xmlUrl: string
messageTimestamp: string
createdAt: string
}

export type UserRow = {
apiKey: string
id: string
handle: string
name: string
Expand All @@ -78,11 +84,13 @@ export enum ReleaseProcessingStatus {
}

export type ReleaseRow = {
source: string
key: string
xmlUrl: string
messageTimestamp: string
json: string
status: ReleaseProcessingStatus
createdAt: string

entityType?: 'track' | 'album'
entityId?: string
Expand Down Expand Up @@ -126,13 +134,23 @@ export const userRepo = {
return db.all<UserRow>(sql`select * from users`)
},

find(example: Partial<UserRow>) {
return dbSelect('users', example) as UserRow[]
},

findOne(example: Partial<UserRow>) {
return dbSelectOne('users', example) as UserRow | undefined
},

upsert(user: Partial<UserRow>) {
dbUpsert('users', user)
},

match(artistNames: string[]) {
match(apiKey: string, artistNames: string[]) {
const artistSet = new Set(artistNames.map(lowerAscii))
const users = db.all<UserRow>(sql`select * from users`)
const users = db.all<UserRow>(
sql`select * from users where apiKey = ${apiKey}`
)
for (const u of users) {
if (
artistSet.has(lowerAscii(u.name)) ||
Expand Down Expand Up @@ -210,7 +228,7 @@ export const releaseRepo = {
$${ifdef(params.status, sql` and status = ${params.status} `)}
order by xmlUrl, ref
order by createdAt
`)

for (const row of rows) {
Expand All @@ -233,7 +251,12 @@ export const releaseRepo = {
},

upsert: db.transaction(
(xmlUrl: string, messageTimestamp: string, release: DDEXRelease) => {
(
source: string,
xmlUrl: string,
messageTimestamp: string,
release: DDEXRelease
) => {
const key = releaseRepo.chooseReleaseId(release.releaseIds)
const prior = releaseRepo.get(key)
const json = JSON.stringify(release)
Expand All @@ -256,6 +279,7 @@ export const releaseRepo = {
: ReleaseProcessingStatus.PublishPending

dbUpsert('releases', {
source,
key,
status,
ref: release.ref,
Expand All @@ -268,7 +292,12 @@ export const releaseRepo = {
),

markForDelete: db.transaction(
(xmlUrl: string, messageTimestamp: string, releaseIds: DDEXReleaseIds) => {
(
source: string,
xmlUrl: string,
messageTimestamp: string,
releaseIds: DDEXReleaseIds
) => {
// here we do PK lookup using the "best" id
// but we may need to try to find by all the different releaseIds
// if it's not consistent
Expand All @@ -288,6 +317,7 @@ export const releaseRepo = {
releaseRepo.update({
key,
status: ReleaseProcessingStatus.DeletePending,
source,
xmlUrl,
messageTimestamp,
})
Expand Down Expand Up @@ -320,6 +350,22 @@ function toStmt(rawSql: string) {
return stmtCache[rawSql]
}

export function dbSelect(table: string, data: Record<string, any>) {
const wheres = Object.keys(data)
.map((k) => ` ${k} = ? `)
.join(' AND ')
const rawSql = `select * from ${table} where ${wheres}`
return toStmt(rawSql).all(...Object.values(data))
}

export function dbSelectOne(table: string, data: Record<string, any>) {
const wheres = Object.keys(data)
.map((k) => ` ${k} = ? `)
.join(' AND ')
const rawSql = `select * from ${table} where ${wheres}`
return toStmt(rawSql).get(...Object.values(data))
}

export function dbUpdate(
table: string,
pkField: string,
Expand Down Expand Up @@ -354,5 +400,5 @@ function dbUpsert(table: string, data: Record<string, any>) {
}

function ifdef(obj: any, snippet: any) {
return Boolean(obj) ? snippet : sql``
return obj ? snippet : sql``
}
9 changes: 7 additions & 2 deletions packages/ddex/processor/src/parseDelivery.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { expect, test } from 'vitest'
import { beforeAll, expect, test } from 'vitest'
import {
DDEXRelease,
DealEthGated,
Expand All @@ -7,10 +7,15 @@ import {
DealSolGated,
parseDdexXmlFile,
} from './parseDelivery'
import { readFileSync } from 'node:fs'
import { sources } from './sources'

beforeAll(async () => {
sources.load('./sources.test.json')
})

test('deal types', async () => {
const releases = (await parseDdexXmlFile(
'dealTest',
'fixtures/dealTypes.xml'
)) as DDEXRelease[]

Expand Down

0 comments on commit a84cf8f

Please sign in to comment.