Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 59 additions & 28 deletions services/github/pod-github/src/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { setMetadata } from '@hcengineering/platform'
import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server-storage'
import serverToken, { generateToken } from '@hcengineering/server-token'
import tracker from '@hcengineering/tracker'
import { Installation } from '@octokit/webhooks-types'
import { Installation, type InstallationCreatedEvent, type InstallationUnsuspendEvent } from '@octokit/webhooks-types'
import { Collection } from 'mongodb'
import { App, Octokit } from 'octokit'

Expand All @@ -38,12 +38,15 @@ import { registerLoaders } from './loaders'
import { createNotification } from './notifications'
import { errorToObj } from './sync/utils'
import { GithubIntegrationRecord, GithubUserRecord } from './types'
import { UserManager } from './users'
import { GithubWorker, syncUser } from './worker'

export interface InstallationRecord {
installationName: string
login: string
loginNodeId: string

repositories?: InstallationCreatedEvent['repositories'] | InstallationUnsuspendEvent['repositories']
type: 'Bot' | 'User' | 'Organization'
octokit: Octokit
}
Expand All @@ -60,12 +63,14 @@ export class PlatformWorker {
mongoRef!: MongoClientReference

integrationCollection!: Collection<GithubIntegrationRecord>
usersCollection!: Collection<GithubUserRecord>

periodicTimer: any
periodicSyncPromise: Promise<void> | undefined

canceled = false

userManager!: UserManager

private constructor (
readonly ctx: MeasureContext,
readonly app: App,
Expand All @@ -82,7 +87,8 @@ export class PlatformWorker {

const db = mongoClient.db(config.ConfigurationDB)
this.integrationCollection = db.collection<GithubIntegrationRecord>('installations')
this.usersCollection = db.collection<GithubUserRecord>('users')

this.userManager = new UserManager(db.collection<GithubUserRecord>('users'))

const storageConfig = storageConfigFromEnv()
this.storageAdapter = buildStorageFromConfig(storageConfig, config.MongoURL)
Expand Down Expand Up @@ -165,7 +171,7 @@ export class PlatformWorker {
}

private async findUsersWorkspaces (): Promise<Map<string, GithubUserRecord[]>> {
const i = this.usersCollection.find({})
const i = this.userManager.getAllUsers()
const workspaces = new Map<string, GithubUserRecord[]>()
while (await i.hasNext()) {
const userInfo = await i.next()
Expand All @@ -178,19 +184,16 @@ export class PlatformWorker {
}
}
}
await i.close()
return workspaces
}

public async getUsers (workspace: string): Promise<GithubUserRecord[]> {
return await this.usersCollection
.find<GithubUserRecord>({
[`accounts.${workspace}`]: { $exists: true }
})
.toArray()
return await this.userManager.getUsers(workspace)
}

public async getUser (login: string): Promise<GithubUserRecord | undefined> {
return (await this.usersCollection.find<GithubUserRecord>({ _id: login }).toArray()).shift()
return await this.userManager.getAccount(login)
}

async mapInstallation (
Expand Down Expand Up @@ -263,8 +266,8 @@ export class PlatformWorker {
private async removeInstallationFromWorkspace (client: Client, installationId: number): Promise<void> {
const wsIntegerations = await client.findAll(github.class.GithubIntegration, { installationId })

const ops = new TxOperations(client, core.account.System)
for (const intValue of wsIntegerations) {
const ops = new TxOperations(client, core.account.System)
await ops.remove<GithubIntegration>(intValue)
}
}
Expand Down Expand Up @@ -347,12 +350,13 @@ export class PlatformWorker {
scope: resultJson.scope,
accounts: { [payload.workspace]: payload.accountId }
}
const [existingUser] = await this.usersCollection.find({ _id: user.data.login }).toArray()
if (existingUser === undefined) {
await this.usersCollection.insertOne(dta)
await this.userManager.updateUser(dta)
const existingUser = await this.userManager.getAccount(user.data.login)
if (existingUser == null) {
await this.userManager.insertUser(dta)
} else {
dta.accounts = { ...existingUser.accounts, [payload.workspace]: payload.accountId }
await this.usersCollection.updateOne({ _id: dta._id }, { $set: dta } as any)
await this.userManager.updateUser(dta)
}

// Update workspace client login info.
Expand Down Expand Up @@ -520,17 +524,17 @@ export class PlatformWorker {
auth.refreshTokenExpiresIn = dta.refreshTokenExpiresIn
auth.scope = dta.scope

await this.usersCollection.updateOne({ _id: dta._id }, { $set: dta } as any)
await this.userManager.updateUser(dta)
}
}
}

async getAccount (login: string): Promise<GithubUserRecord | undefined> {
return (await this.usersCollection.findOne({ _id: login })) ?? undefined
return await this.userManager.getAccount(login)
}

async getAccountByRef (workspace: string, ref: Ref<Account>): Promise<GithubUserRecord | undefined> {
return (await this.usersCollection.findOne({ [`accounts.${workspace}`]: ref })) ?? undefined
return await this.userManager.getAccountByRef(workspace, ref)
}

private async updateInstallation (installationId: number): Promise<void> {
Expand All @@ -544,6 +548,24 @@ export class PlatformWorker {
type: tinst.account?.type ?? 'User',
installationName: `${tinst.account?.html_url ?? ''}`
}
this.updateInstallationRecord(installationId, val)
}
}

private updateInstallationRecord (installationId: number, val: InstallationRecord): void {
const current = this.installations.get(installationId)
if (current !== undefined) {
if (val.octokit !== undefined) {
current.octokit = val.octokit
}
current.login = val.login
current.loginNodeId = val.loginNodeId
current.type = val.type
current.installationName = val.installationName
if (val.repositories !== undefined) {
current.repositories = val.repositories
}
} else {
this.installations.set(installationId, val)
}
}
Expand All @@ -558,24 +580,30 @@ export class PlatformWorker {
type: tinst.account?.type ?? 'User',
installationName: `${tinst.account?.html_url ?? ''}`
}
this.installations.set(install.installation.id, val)
this.updateInstallationRecord(install.installation.id, val)
ctx.info('Found installation', {
installationId: install.installation.id,
url: install.installation.account?.html_url ?? ''
})
}
}

async handleInstallationEvent (install: Installation, enabled: boolean): Promise<void> {
async handleInstallationEvent (
install: Installation,
repositories: InstallationCreatedEvent['repositories'] | InstallationUnsuspendEvent['repositories'],
enabled: boolean
): Promise<void> {
this.ctx.info('handle integration add', { installId: install.id, name: install.html_url })
const okit = await this.app.getInstallationOctokit(install.id)
const iName = `${install.account.html_url ?? ''}`
this.installations.set(install.id, {

this.updateInstallationRecord(install.id, {
octokit: okit,
login: install.account.login,
type: install.account?.type ?? 'User',
loginNodeId: install.account.node_id,
installationName: iName
installationName: iName,
repositories
})

const worker = this.getWorker(install.id)
Expand Down Expand Up @@ -612,6 +640,9 @@ export class PlatformWorker {
if (integeration !== undefined) {
integeration.enabled = false
integeration.synchronized = new Set()

await this.removeInstallationFromWorkspace(worker._client, installId)

await worker._client.remove(integeration.integration)
}
worker.integrations.delete(installId)
Expand Down Expand Up @@ -797,11 +828,11 @@ export class PlatformWorker {
if (event.payload.action === 'revoked') {
const sender = event.payload.sender

const records = await this.usersCollection.find({ _id: sender.login }).toArray()
for (const r of records) {
await this.revokeUserAuth(r)
const record = await this.getAccount(sender.login)
if (record !== undefined) {
await this.revokeUserAuth(record)
await this.userManager.removeUser(sender.login)
}
await this.usersCollection.deleteOne({ _id: sender.login })
}
})

Expand Down Expand Up @@ -902,7 +933,7 @@ export class PlatformWorker {
case 'created':
case 'unsuspend': {
catchEventError(
this.handleInstallationEvent(payload.installation, true),
this.handleInstallationEvent(payload.installation, payload.repositories, true),
payload.action,
name,
id,
Expand All @@ -912,7 +943,7 @@ export class PlatformWorker {
}
case 'suspend': {
catchEventError(
this.handleInstallationEvent(payload.installation, false),
this.handleInstallationEvent(payload.installation, payload.repositories, false),
payload.action,
name,
id,
Expand Down
71 changes: 68 additions & 3 deletions services/github/pod-github/src/sync/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
//

import core, { Doc, DocData, DocumentUpdate, MeasureContext, TxOperations, generateId } from '@hcengineering/core'
import { Endpoints } from '@octokit/types'
import { Repository, RepositoryEvent } from '@octokit/webhooks-types'
import github, { DocSyncInfo, GithubIntegrationRepository, GithubProject } from '@hcengineering/github'
import { Endpoints } from '@octokit/types'
import {
Repository,
RepositoryEvent,
type InstallationCreatedEvent,
type InstallationUnsuspendEvent
} from '@octokit/webhooks-types'
import { App } from 'octokit'
import { DocSyncManager, ExternalSyncField, IntegrationContainer, IntegrationManager } from '../types'
import { collectUpdate } from './utils'
Expand Down Expand Up @@ -34,8 +39,67 @@ export class RepositorySyncMapper implements DocSyncManager {
return {}
}

async reloadRepositories (integration: IntegrationContainer): Promise<void> {
async reloadRepositories (
integration: IntegrationContainer,
repositories?: InstallationCreatedEvent['repositories'] | InstallationUnsuspendEvent['repositories']
): Promise<void> {
integration.synchronized.delete(syncReposKey)

if (repositories !== undefined) {
// We have a list of repositories, so we could create them if they are missing.
// Need to find all repositories, not only active, so passed repositories are not work.
const allRepositories = (
await this.provider.liveQuery.queryFind(github.class.GithubIntegrationRepository, {})
).filter((it) => it.attachedTo === integration.integration._id)

const allRepos: GithubIntegrationRepository[] = [...allRepositories]
for (const repository of repositories) {
const integrationRepo: GithubIntegrationRepository | undefined = allRepos.find(
(it) => it.repositoryId === repository.id
)

if (integrationRepo === undefined) {
// No integration repository found, we need to push one.
await this.client.addCollection(
github.class.GithubIntegrationRepository,
integration.integration.space,
integration.integration._id,
integration.integration._class,
'repositories',
{
nodeId: repository.node_id,
name: repository.name,
url: integration.installationName + '/' + repository.name,
repositoryId: repository.id,
enabled: true,
deleted: false,
archived: false,
fork: false,
forks: 0,
hasDiscussions: false,
hasDownloads: false,
hasIssues: false,
hasPages: false,
hasProjects: false,
hasWiki: false,
openIssues: 0,
private: repository.private,
size: 0,
stargazers: 0,
watchers: 0,
visibility: repository.private ? 'private' : 'public'
},
undefined, // id
Date.now(),
integration.integration.createdBy
)
this.ctx.info('Creating repository info document...', {
url: repository.full_name,
workspace: this.provider.getWorkspaceId().name
})
}
}
}
}

async handleEvent<T>(integration: IntegrationContainer, derivedClient: TxOperations, evt: T): Promise<void> {
Expand Down Expand Up @@ -191,6 +255,7 @@ export class RepositorySyncMapper implements DocSyncManager {
installationId: integration.installationId,
workspace: this.provider.getWorkspaceId().name
})

const iterable = this.app.eachRepository.iterator({ installationId: integration.installationId })

// Need to find all repositories, not only active, so passed repositories are not work.
Expand Down
69 changes: 69 additions & 0 deletions services/github/pod-github/src/users.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import type { Account, Ref } from '@hcengineering/core'
import type { Collection, FindCursor } from 'mongodb'
import type { GithubUserRecord } from './types'

export class UserManager {
userCache = new Map<string, GithubUserRecord>()
refUserCache = new Map<string, GithubUserRecord>()

constructor (readonly usersCollection: Collection<GithubUserRecord>) {}

public async getUsers (workspace: string): Promise<GithubUserRecord[]> {
return await this.usersCollection
.find<GithubUserRecord>({
[`accounts.${workspace}`]: { $exists: true }
})
.toArray()
}

async getAccount (login: string): Promise<GithubUserRecord | undefined> {
let res = this.userCache.get(login)
if (res !== undefined) {
return res
}
res = (await this.usersCollection.findOne({ _id: login })) ?? undefined
if (res !== undefined) {
if (this.userCache.size > 1000) {
this.userCache.clear()
}
this.userCache.set(login, res)
}
return res
}

async getAccountByRef (workspace: string, ref: Ref<Account>): Promise<GithubUserRecord | undefined> {
const key = `${workspace}.${ref}`
let rec = this.refUserCache.get(key)
if (rec !== undefined) {
return rec
}
rec = (await this.usersCollection.findOne({ [`accounts.${workspace}`]: ref })) ?? undefined
if (rec !== undefined) {
if (this.refUserCache.size > 1000) {
this.refUserCache.clear()
}
this.refUserCache.set(key, rec)
}
return rec
}

async updateUser (dta: GithubUserRecord): Promise<void> {
this.userCache.clear()
this.refUserCache.clear()
await this.usersCollection.updateOne({ _id: dta._id }, { $set: dta } as any)
}

async insertUser (dta: GithubUserRecord): Promise<void> {
await this.usersCollection.insertOne(dta)
}

async removeUser (login: string): Promise<void> {
this.userCache.clear()
this.refUserCache.clear()
await this.usersCollection.deleteOne({ _id: login })
}

getAllUsers (): FindCursor<GithubUserRecord> {
return this.usersCollection.find({})
}
}
Loading