Skip to content
Permalink
Browse files

Fast atomics (#1263)

Motivation:

The existing Atomic class holds an UnsafeEmbeddedAtomic which holds an OpaquePointer to raw memory for the C atomic. This results in multiple allocations on init. The new NIOAtomic class uses ManagedBufferPointer which tail allocates memory for the C atomic as part of the NIOAtomic class allocation.

Modifications:

Created NIOAtomic class that uses ManagedBufferPointer to allocate memory for the C atomic. Created version of catmc_atomic_* C functions that expect memory to be allocated/deallocated by caller. Created NIOAtomicPrimitive protocol for the new version of catmc_atomic_* functions.

Result:

Purely additive. NIOAtomic is available as a replacement for Atomic which results in fewer memory allocations.
  • Loading branch information
2bjake authored and Lukasa committed Nov 28, 2019
1 parent abe5219 commit 3c879ebbafbd22ae8b66fcb910c92017adc1fb7a

Large diffs are not rendered by default.

@@ -0,0 +1,75 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2019 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#include <stdlib.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <inttypes.h>
#include <stdio.h>

#include "../include/CNIOAtomics.h"
#include "cpp_magic.h"

#define MAKE(type) /*
*/ void catmc_nio_atomic_##type##_create_with_existing_storage(struct catmc_nio_atomic_##type *storage, type value) { /*
*/ atomic_init(&storage->value, value); /*
*/ } /*
*/ /*
*/ bool catmc_nio_atomic_##type##_compare_and_exchange(struct catmc_nio_atomic_##type *wrapper, type expected, type desired) { /*
*/ type expected_copy = expected; /*
*/ return atomic_compare_exchange_strong(&wrapper->value, &expected_copy, desired); /*
*/ } /*
*/ /*
*/ type catmc_nio_atomic_##type##_add(struct catmc_nio_atomic_##type *wrapper, type value) { /*
*/ return atomic_fetch_add_explicit(&wrapper->value, value, memory_order_relaxed); /*
*/ } /*
*/ /*
*/ type catmc_nio_atomic_##type##_sub(struct catmc_nio_atomic_##type *wrapper, type value) { /*
*/ return atomic_fetch_sub_explicit(&wrapper->value, value, memory_order_relaxed); /*
*/ } /*
*/ /*
*/ type catmc_nio_atomic_##type##_exchange(struct catmc_nio_atomic_##type *wrapper, type value) { /*
*/ return atomic_exchange_explicit(&wrapper->value, value, memory_order_relaxed); /*
*/ } /*
*/ /*
*/ type catmc_nio_atomic_##type##_load(struct catmc_nio_atomic_##type *wrapper) { /*
*/ return atomic_load_explicit(&wrapper->value, memory_order_relaxed); /*
*/ } /*
*/ /*
*/ void catmc_nio_atomic_##type##_store(struct catmc_nio_atomic_##type *wrapper, type value) { /*
*/ atomic_store_explicit(&wrapper->value, value, memory_order_relaxed); /*
*/ }

typedef signed char signed_char;
typedef signed short signed_short;
typedef signed int signed_int;
typedef signed long signed_long;
typedef signed long long signed_long_long;
typedef unsigned char unsigned_char;
typedef unsigned short unsigned_short;
typedef unsigned int unsigned_int;
typedef unsigned long unsigned_long;
typedef unsigned long long unsigned_long_long;
typedef long long long_long;

MAP(MAKE,EMPTY,
bool,
char, short, int, long, long_long,
signed_char, signed_short, signed_int, signed_long, signed_long_long,
unsigned_char, unsigned_short, unsigned_int, unsigned_long, unsigned_long_long,
int_least8_t, uint_least8_t,
int_least16_t, uint_least16_t,
int_least32_t, uint_least32_t,
int_least64_t, uint_least64_t
)
@@ -34,7 +34,7 @@ private struct SocketChannelLifecycleManager {
// MARK: properties
private let eventLoop: EventLoop
// this is queried from the Channel, ie. must be thread-safe
internal let isActiveAtomic: Atomic<Bool>
internal let isActiveAtomic: NIOAtomic<Bool>
// these are only to be accessed on the EventLoop
// have we seen the `.readEOF` notification
@@ -57,7 +57,7 @@ private struct SocketChannelLifecycleManager {

// MARK: API
// isActiveAtomic needs to be injected as it's accessed from arbitrary threads and `SocketChannelLifecycleManager` is usually held mutable
internal init(eventLoop: EventLoop, isActiveAtomic: Atomic<Bool>) {
internal init(eventLoop: EventLoop, isActiveAtomic: NIOAtomic<Bool>) {
self.eventLoop = eventLoop
self.isActiveAtomic = isActiveAtomic
}
@@ -213,7 +213,7 @@ class BaseSocketChannel<SocketType: BaseSocketProtocol>: SelectableChannel, Chan
internal let selectableEventLoop: SelectableEventLoop
private let addressesCached: AtomicBox<Box<(local:SocketAddress?, remote:SocketAddress?)>> = AtomicBox(value: Box((local: nil, remote: nil)))
private let bufferAllocatorCached: AtomicBox<Box<ByteBufferAllocator>>
private let isActiveAtomic: Atomic<Bool> = Atomic(value: false)
private let isActiveAtomic: NIOAtomic<Bool> = .makeAtomic(value: false)
private var _pipeline: ChannelPipeline! = nil // this is really a constant (set in .init) but needs `self` to be constructed and therefore a `var`. Do not change as this needs to accessed from arbitrary threads
// just a thread-safe way of having something to print about the socket from any thread
internal let socketDescription: String
@@ -1128,7 +1128,7 @@ extension EventLoopGroup {
}
}

private let nextEventLoopGroupID = Atomic(value: 0)
private let nextEventLoopGroupID = NIOAtomic.makeAtomic(value: 0)

/// Called per `NIOThread` that is created for an EventLoop to do custom initialization of the `NIOThread` before the actual `EventLoop` is run on it.
typealias ThreadInitializer = (NIOThread) -> Void
@@ -1156,7 +1156,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {

private static let threadSpecificEventLoop = ThreadSpecificVariable<SelectableEventLoop>()

private let index = Atomic<Int>(value: 0)
private let index = NIOAtomic<Int>.makeAtomic(value: 0)
private let eventLoops: [SelectableEventLoop]
private let shutdownLock: Lock = Lock()
private var runState: RunState = .running
@@ -360,7 +360,7 @@ final class PendingDatagramWritesManager: PendingWritesManager {
private var state = PendingDatagramWritesState()

internal var waterMark: ChannelOptions.Types.WriteBufferWaterMark = ChannelOptions.Types.WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024)
internal let channelWritabilityFlag: Atomic<Bool> = Atomic(value: true)
internal let channelWritabilityFlag: NIOAtomic<Bool> = .makeAtomic(value: true)
internal var writeSpinCount: UInt = 16
private(set) var isOpen = true

@@ -277,7 +277,7 @@ final class PendingStreamWritesManager: PendingWritesManager {
private var storageRefs: UnsafeMutableBufferPointer<Unmanaged<AnyObject>>

internal var waterMark: ChannelOptions.Types.WriteBufferWaterMark = ChannelOptions.Types.WriteBufferWaterMark(low: 32 * 1024, high: 64 * 1024)
internal let channelWritabilityFlag: Atomic<Bool> = Atomic(value: true)
internal let channelWritabilityFlag: NIOAtomic<Bool> = .makeAtomic(value: true)

internal var writeSpinCount: UInt = 16

@@ -451,7 +451,7 @@ internal protocol PendingWritesManager {
var isFlushPending: Bool { get }
var writeSpinCount: UInt { get }
var currentBestWriteMechanism: WriteMechanism { get }
var channelWritabilityFlag: Atomic<Bool> { get }
var channelWritabilityFlag: NIOAtomic<Bool> { get }
}

extension PendingWritesManager {

0 comments on commit 3c879eb

Please sign in to comment.
You can’t perform that action at this time.