Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

♻️ pass the Observable instance to the onFirstSubscribe callback #2539

Merged
merged 2 commits into from Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 1 addition & 3 deletions packages/core/src/browser/fetchObservable.ts
Expand Up @@ -38,7 +38,7 @@ export function initFetchObservable() {
}

function createFetchObservable() {
const observable = new Observable<FetchContext>(() => {
return new Observable<FetchContext>((observable) => {
if (!window.fetch) {
return
}
Expand Down Expand Up @@ -66,8 +66,6 @@ function createFetchObservable() {

return stop
})

return observable
}

function beforeSend(observable: Observable<FetchContext>, input: unknown, init?: RequestInit) {
Expand Down
4 changes: 1 addition & 3 deletions packages/core/src/browser/pageExitObservable.ts
Expand Up @@ -19,7 +19,7 @@ export interface PageExitEvent {
}

export function createPageExitObservable(configuration: Configuration): Observable<PageExitEvent> {
const observable = new Observable<PageExitEvent>(() => {
return new Observable<PageExitEvent>((observable) => {
const pagehideEnabled = isExperimentalFeatureEnabled(ExperimentalFeature.PAGEHIDE)
const { stop: stopListeners } = addEventListeners(
configuration,
Expand Down Expand Up @@ -60,8 +60,6 @@ export function createPageExitObservable(configuration: Configuration): Observab
stopBeforeUnloadListener()
}
})

return observable
}

export function isPageExitReason(reason: string | undefined): reason is PageExitReason {
Expand Down
3 changes: 1 addition & 2 deletions packages/core/src/browser/xhrObservable.ts
Expand Up @@ -39,7 +39,7 @@ export function initXhrObservable(configuration: Configuration) {
}

function createXhrObservable(configuration: Configuration) {
const observable = new Observable<XhrContext>(() => {
return new Observable<XhrContext>((observable) => {
const { stop: stopInstrumentingStart } = instrumentMethodAndCallOriginal(XMLHttpRequest.prototype, 'open', {
before: openXhr,
})
Expand All @@ -60,7 +60,6 @@ function createXhrObservable(configuration: Configuration) {
stopInstrumentingAbort()
}
})
return observable
}

function openXhr(this: XMLHttpRequest, method: string, url: string | URL | undefined | null) {
Expand Down
4 changes: 1 addition & 3 deletions packages/core/src/domain/console/consoleObservable.ts
Expand Up @@ -33,7 +33,7 @@ export function resetConsoleObservable() {
}

function createConsoleObservable(api: ConsoleApiName) {
const observable = new Observable<ConsoleLog>(() => {
return new Observable<ConsoleLog>((observable) => {
const originalConsoleApi = globalConsole[api]

globalConsole[api] = (...params: unknown[]) => {
Expand All @@ -49,8 +49,6 @@ function createConsoleObservable(api: ConsoleApiName) {
globalConsole[api] = originalConsoleApi
}
})

return observable
}

function buildConsoleLog(params: unknown[], api: ConsoleApiName, handlingStack: string): ConsoleLog {
Expand Down
7 changes: 2 additions & 5 deletions packages/core/src/domain/report/reportObservable.ts
Expand Up @@ -39,7 +39,7 @@ export function initReportObservable(configuration: Configuration, apis: RawRepo
}

function createReportObservable(reportTypes: ReportType[]) {
const observable = new Observable<RawReport>(() => {
return new Observable<RawReport>((observable) => {
if (!window.ReportingObserver) {
return
}
Expand All @@ -60,19 +60,16 @@ function createReportObservable(reportTypes: ReportType[]) {
observer.disconnect()
}
})

return observable
}

function createCspViolationReportObservable(configuration: Configuration) {
const observable = new Observable<RawReport>(() => {
return new Observable<RawReport>((observable) => {
const { stop } = addEventListener(configuration, document, DOM_EVENT.SECURITY_POLICY_VIOLATION, (event) => {
observable.notify(buildRawReportFromCspViolation(event))
})

return stop
})
return observable
}

function buildRawReportFromReport(report: DeprecationReport | InterventionReport): RawReport {
Expand Down
8 changes: 8 additions & 0 deletions packages/core/src/tools/observable.spec.ts
Expand Up @@ -53,6 +53,14 @@ describe('observable', () => {
expect(onFirstSubscribe).toHaveBeenCalledTimes(1)
})

it('should pass the observable instance to the onFirstSubscribe callback', () => {
const onFirstSubscribe = jasmine.createSpy('callback')
observable = new Observable(onFirstSubscribe)
observable.subscribe(subscriber)

expect(onFirstSubscribe).toHaveBeenCalledWith(observable)
})

it('should execute onLastUnsubscribe callback', () => {
const onLastUnsubscribe = jasmine.createSpy('callback')
const otherSubscriber = jasmine.createSpy('sub2')
Expand Down
8 changes: 3 additions & 5 deletions packages/core/src/tools/observable.ts
Expand Up @@ -6,11 +6,11 @@ export class Observable<T> {
private observers: Array<(data: T) => void> = []
private onLastUnsubscribe?: () => void

constructor(private onFirstSubscribe?: () => (() => void) | void) {}
constructor(private onFirstSubscribe?: (observable: Observable<T>) => (() => void) | void) {}

subscribe(f: (data: T) => void): Subscription {
if (!this.observers.length && this.onFirstSubscribe) {
this.onLastUnsubscribe = this.onFirstSubscribe() || undefined
this.onLastUnsubscribe = this.onFirstSubscribe(this) || undefined
}
this.observers.push(f)
return {
Expand All @@ -29,12 +29,10 @@ export class Observable<T> {
}

export function mergeObservables<T>(...observables: Array<Observable<T>>) {
const globalObservable = new Observable<T>(() => {
return new Observable<T>((globalObservable) => {
const subscriptions: Subscription[] = observables.map((observable) =>
observable.subscribe((data) => globalObservable.notify(data))
)
return () => subscriptions.forEach((subscription) => subscription.unsubscribe())
})

return globalObservable
}
4 changes: 1 addition & 3 deletions packages/rum-core/src/browser/domMutationObservable.ts
Expand Up @@ -3,7 +3,7 @@ import { monitor, noop, Observable, getZoneJsOriginalValue } from '@datadog/brow
export function createDOMMutationObservable() {
const MutationObserver = getMutationObserverConstructor()

const observable: Observable<void> = new Observable<void>(() => {
return new Observable<void>((observable) => {
if (!MutationObserver) {
return
}
Expand All @@ -16,8 +16,6 @@ export function createDOMMutationObservable() {
})
return () => observer.disconnect()
})

return observable
}

type MutationObserverConstructor = new (callback: MutationCallback) => MutationObserver
Expand Down
30 changes: 15 additions & 15 deletions packages/rum-core/src/browser/locationChangeObservable.ts
Expand Up @@ -14,28 +14,28 @@ export interface LocationChange {

export function createLocationChangeObservable(configuration: RumConfiguration, location: Location) {
let currentLocation = shallowClone(location)
const observable = new Observable<LocationChange>(() => {

return new Observable<LocationChange>((observable) => {
const { stop: stopHistoryTracking } = trackHistory(configuration, onLocationChange)
const { stop: stopHashTracking } = trackHash(configuration, onLocationChange)

function onLocationChange() {
if (currentLocation.href === location.href) {
return
}
const newLocation = shallowClone(location)
observable.notify({
newLocation,
oldLocation: currentLocation,
})
currentLocation = newLocation
}

return () => {
stopHistoryTracking()
stopHashTracking()
}
})

function onLocationChange() {
if (currentLocation.href === location.href) {
return
}
const newLocation = shallowClone(location)
observable.notify({
newLocation,
oldLocation: currentLocation,
})
currentLocation = newLocation
}

return observable
}

function trackHistory(configuration: RumConfiguration, onHistoryChange: () => void) {
Expand Down
4 changes: 1 addition & 3 deletions packages/rum-core/src/browser/viewportObservable.ts
Expand Up @@ -16,16 +16,14 @@ export function initViewportObservable(configuration: RumConfiguration) {
}

export function createViewportObservable(configuration: RumConfiguration) {
const observable = new Observable<ViewportDimension>(() => {
return new Observable<ViewportDimension>((observable) => {
const { throttled: updateDimension } = throttle(() => {
observable.notify(getViewportDimension())
}, 200)

return addEventListener(configuration, window, DOM_EVENT.RESIZE, updateDimension, { capture: true, passive: true })
.stop
})

return observable
}

// excludes the width and height of any rendered classic scrollbar that is fixed to the visual viewport
Expand Down
Expand Up @@ -89,7 +89,7 @@ export function createScrollValuesObservable(
configuration: RumConfiguration,
throttleDuration = THROTTLE_SCROLL_DURATION
): Observable<ScrollValues> {
const observable = new Observable<ScrollValues>(() => {
return new Observable<ScrollValues>((observable) => {
function notify() {
observable.notify(computeScrollValues())
}
Expand All @@ -114,6 +114,4 @@ export function createScrollValuesObservable(
}
}
})

return observable
}
4 changes: 1 addition & 3 deletions packages/rum-core/src/domain/waitPageActivityEnd.ts
Expand Up @@ -120,7 +120,7 @@ export function createPageActivityObservable(
domMutationObservable: Observable<void>,
configuration: RumConfiguration
): Observable<PageActivityEvent> {
const observable = new Observable<PageActivityEvent>(() => {
return new Observable<PageActivityEvent>((observable) => {
const subscriptions: Subscription[] = []
let firstRequestIndex: undefined | number
let pendingRequestsCount = 0
Expand Down Expand Up @@ -171,8 +171,6 @@ export function createPageActivityObservable(
observable.notify({ isBusy: pendingRequestsCount > 0 })
}
})

return observable
}

function isExcludedUrl(configuration: RumConfiguration, requestUrl: string): boolean {
Expand Down