Skip to content

Commit fa78ca9

Browse files
committed
working datapath draft changes
1 parent 39628b3 commit fa78ca9

22 files changed

+2623
-1231
lines changed

src/core/worker.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,10 @@ QuicWorkerInitialize(
9595
Worker->ExecutionContext.NextTimeUs = UINT64_MAX;
9696
Worker->ExecutionContext.Ready = TRUE;
9797

98-
#ifndef _KERNEL_MODE // Not supported on kernel mode
9998
if (ExecProfile != QUIC_EXECUTION_PROFILE_TYPE_MAX_THROUGHPUT) {
10099
Worker->IsExternal = TRUE;
101100
CxPlatAddExecutionContext(&Worker->ExecutionContext, PartitionIndex);
102-
} else
103-
#endif // _KERNEL_MODE
104-
{
101+
} else {
105102
const uint16_t ThreadFlags =
106103
ExecProfile == QUIC_EXECUTION_PROFILE_TYPE_REAL_TIME ?
107104
CXPLAT_THREAD_FLAG_SET_AFFINITIZE : CXPLAT_THREAD_FLAG_NONE;

src/inc/quic_platform.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -463,10 +463,6 @@ typedef struct CXPLAT_EXECUTION_CONTEXT {
463463

464464
} CXPLAT_EXECUTION_CONTEXT;
465465

466-
#ifdef _KERNEL_MODE // Not supported on kernel mode
467-
#define CxPlatAddExecutionContext(Context, IdealProcessor) CXPLAT_FRE_ASSERT(FALSE)
468-
#define CxPlatWakeExecutionContext(Context) CXPLAT_FRE_ASSERT(FALSE)
469-
#else
470466
void
471467
CxPlatAddExecutionContext(
472468
_Inout_ CXPLAT_EXECUTION_CONTEXT* Context,
@@ -477,7 +473,6 @@ void
477473
CxPlatWakeExecutionContext(
478474
_In_ CXPLAT_EXECUTION_CONTEXT* Context
479475
);
480-
#endif
481476

482477
//
483478
// The "type" of the completion queue event is stored as the first uint32_t of

src/inc/quic_platform_winkernel.h

Lines changed: 110 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -461,8 +461,12 @@ _CxPlatEventWaitWithTimeout(
461461
)
462462
{
463463
LARGE_INTEGER Timeout100Ns;
464-
Timeout100Ns.QuadPart = Int32x32To64(TimeoutMs, -10000);
465-
return KeWaitForSingleObject(Event, Executive, KernelMode, FALSE, &Timeout100Ns);
464+
if (TimeoutMs == 0xffffffff) {
465+
return KeWaitForSingleObject(Event, Executive, KernelMode, FALSE, NULL);
466+
} else {
467+
Timeout100Ns.QuadPart = Int32x32To64(TimeoutMs, -10000);
468+
return KeWaitForSingleObject(Event, Executive, KernelMode, FALSE, &Timeout100Ns);
469+
}
466470
}
467471
#define CxPlatEventWaitWithTimeout(Event, TimeoutMs) \
468472
(STATUS_SUCCESS == _CxPlatEventWaitWithTimeout(&Event, TimeoutMs))
@@ -471,16 +475,35 @@ _CxPlatEventWaitWithTimeout(
471475
// Event Queue Interfaces
472476
//
473477

474-
typedef KEVENT CXPLAT_EVENTQ; // Event queue
475-
typedef void* CXPLAT_CQE;
478+
typedef struct CXPLAT_EVENTQ {
479+
CXPLAT_DISPATCH_LOCK Lock;
480+
LIST_ENTRY Events;
481+
CXPLAT_EVENT EventsAvailable;
482+
} CXPLAT_EVENTQ;
483+
484+
typedef struct CXPLAT_CQE {
485+
void* UserData;
486+
} CXPLAT_CQE;
487+
488+
#define CXPLAT_SQE CXPLAT_SQE
489+
#define CXPLAT_SQE_DEFAULT {0}
490+
typedef struct CXPLAT_SQE {
491+
void* UserData;
492+
int Overlapped; // Used as the completion context to platform IO routines.
493+
LIST_ENTRY Link;
494+
BOOLEAN IsQueued; // Prevent double queueing.
495+
} CXPLAT_SQE;
476496

477497
inline
478498
BOOLEAN
479499
CxPlatEventQInitialize(
480500
_Out_ CXPLAT_EVENTQ* queue
481501
)
482502
{
483-
KeInitializeEvent(queue, SynchronizationEvent, FALSE);
503+
CxPlatZeroMemory(queue, sizeof(*queue));
504+
CxPlatDispatchLockInitialize(&queue->Lock);
505+
InitializeListHead(&queue->Events);
506+
CxPlatEventInitialize(&queue->EventsAvailable, TRUE, FALSE);
484507
return TRUE;
485508
}
486509

@@ -489,24 +512,55 @@ void
489512
CxPlatEventQCleanup(
490513
_In_ CXPLAT_EVENTQ* queue
491514
)
515+
{
516+
CxPlatEventUninitialize(queue->EventsAvailable);
517+
CXPLAT_DBG_ASSERT(IsListEmpty(&queue->Events));
518+
CxPlatDispatchLockUninitialize(&queue->Lock);
519+
}
520+
521+
inline
522+
BOOLEAN
523+
CxPlatEventQAssociateHandle(
524+
_In_ CXPLAT_EVENTQ* queue,
525+
_In_ HANDLE fileHandle
526+
)
492527
{
493528
UNREFERENCED_PARAMETER(queue);
529+
UNREFERENCED_PARAMETER(fileHandle);
530+
return FALSE;
494531
}
495532

496533
inline
497534
BOOLEAN
498-
_CxPlatEventQEnqueue(
535+
CxPlatEventQEnqueue(
499536
_In_ CXPLAT_EVENTQ* queue,
537+
_In_ CXPLAT_SQE* sqe,
500538
_In_opt_ void* user_data
501539
)
502540
{
503-
UNREFERENCED_PARAMETER(user_data);
504-
KeSetEvent(queue, IO_NO_INCREMENT, FALSE);
541+
BOOLEAN SignalEvent;
542+
543+
CxPlatDispatchLockAcquire(&queue->Lock);
544+
545+
if (sqe->IsQueued) {
546+
CxPlatDispatchLockRelease(&queue->Lock);
547+
return TRUE;
548+
}
549+
550+
sqe->IsQueued = TRUE;
551+
sqe->UserData = user_data;
552+
SignalEvent = IsListEmpty(&queue->Events);
553+
InsertTailList(&queue->Events, &sqe->Link);
554+
555+
CxPlatDispatchLockRelease(&queue->Lock);
556+
557+
if (SignalEvent) {
558+
CxPlatEventSet(queue->EventsAvailable);
559+
}
560+
505561
return TRUE;
506562
}
507563

508-
#define CxPlatEventQEnqueue(queue, sqe, user_data) _CxPlatEventQEnqueue(queue, user_data)
509-
510564
inline
511565
uint32_t
512566
CxPlatEventQDequeue(
@@ -516,9 +570,40 @@ CxPlatEventQDequeue(
516570
_In_ uint32_t wait_time // milliseconds
517571
)
518572
{
519-
UNREFERENCED_PARAMETER(count);
520-
*events = NULL;
521-
return STATUS_SUCCESS == _CxPlatEventWaitWithTimeout(queue, wait_time) ? 1 : 0;
573+
LIST_ENTRY* Entry;
574+
uint32_t EventsDequeued = 0;
575+
576+
// TODO: this fn signature needs better SAL
577+
RtlZeroMemory(events, sizeof(*events));
578+
579+
CxPlatEventReset(queue->EventsAvailable);
580+
581+
CxPlatDispatchLockAcquire(&queue->Lock);
582+
583+
if (IsListEmpty(&queue->Events)) {
584+
CxPlatDispatchLockRelease(&queue->Lock);
585+
CxPlatEventWaitWithTimeout(queue->EventsAvailable, wait_time);
586+
CxPlatDispatchLockAcquire(&queue->Lock);
587+
}
588+
589+
while (EventsDequeued < count && !IsListEmpty(&queue->Events)) {
590+
Entry = RemoveHeadList(&queue->Events);
591+
CXPLAT_SQE* Sqe = CXPLAT_CONTAINING_RECORD(Entry, CXPLAT_SQE, Link);
592+
// TODO: this fn signature needs better SAL
593+
#pragma warning(push)
594+
#pragma warning(disable:6386)
595+
events[EventsDequeued].UserData = Sqe->UserData;
596+
#pragma warning(pop)
597+
598+
CXPLAT_DBG_ASSERT(Sqe->IsQueued);
599+
Sqe->IsQueued = FALSE;
600+
601+
EventsDequeued++;
602+
}
603+
604+
CxPlatDispatchLockRelease(&queue->Lock);
605+
606+
return EventsDequeued;
522607
}
523608

524609
inline
@@ -538,9 +623,17 @@ CxPlatCqeUserData(
538623
_In_ const CXPLAT_CQE* cqe
539624
)
540625
{
541-
return *cqe;
626+
return cqe->UserData;
542627
}
543628

629+
typedef struct DATAPATH_SQE DATAPATH_SQE;
630+
631+
void
632+
CxPlatDatapathSqeInitialize(
633+
_Out_ DATAPATH_SQE* DatapathSqe,
634+
_In_ uint32_t CqeType
635+
);
636+
544637
//
545638
// Time Measurement Interfaces
546639
//
@@ -711,7 +804,9 @@ CxPlatSleep(
711804
KeWaitForSingleObject(&SleepTimer, Executive, KernelMode, FALSE, NULL);
712805
}
713806

714-
#define CxPlatSchedulerYield() // no-op
807+
#define CxPlatSchedulerYield() \
808+
LARGE_INTEGER Timeout = {0}; \
809+
KeDelayExecutionThread(KernelMode, FALSE, &Timeout)
715810

716811
//
717812
// Create Thread Interfaces

src/platform/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ if("${CX_PLATFORM}" STREQUAL "windows")
2020
${SYSTEM_PROCESSOR} STREQUAL "arm64ec")
2121
set(SOURCES ${SOURCES} datapath_raw_dummy.c)
2222
else()
23-
set(SOURCES ${SOURCES} datapath_raw_win.c datapath_raw_socket.c datapath_raw_socket_win.c datapath_raw_xdp_win.c)
23+
set(SOURCES ${SOURCES} datapath_raw_win.c datapath_raw_socket.c datapath_raw_socket_win.c datapath_raw_xdp_win.c datapath_raw_xdp_winuser.c)
2424
endif()
2525
else()
2626
set(SOURCES ${SOURCES} inline.c platform_posix.c storage_posix.c cgroup.c)

src/platform/datapath_raw.h

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,9 @@ typedef struct CXPLAT_SOCKET_RAW {
244244
CXPLAT_HASHTABLE_ENTRY Entry;
245245
CXPLAT_RUNDOWN_REF Rundown;
246246
CXPLAT_DATAPATH_RAW* RawDatapath;
247+
#ifndef _KERNEL_MODE
247248
SOCKET AuxSocket;
249+
#endif
248250
BOOLEAN Wildcard; // Using a wildcard local address. Optimization
249251
// to avoid always reading LocalAddress.
250252
uint8_t CibirIdLength; // CIBIR ID length. Value of 0 indicates CIBIR isn't used
@@ -374,14 +376,14 @@ CxPlatFramingWriteHeaders(
374376
#pragma pack(push)
375377
#pragma pack(1)
376378

377-
typedef struct ETHERNET_HEADER {
379+
typedef struct CXPLAT_ETHERNET_HEADER {
378380
uint8_t Destination[6];
379381
uint8_t Source[6];
380382
uint16_t Type;
381383
uint8_t Data[0];
382-
} ETHERNET_HEADER;
384+
} CXPLAT_ETHERNET_HEADER;
383385

384-
typedef struct IPV4_HEADER {
386+
typedef struct CXPLAT_IPV4_HEADER {
385387
uint8_t VersionAndHeaderLength;
386388
union {
387389
uint8_t TypeOfServiceAndEcnField;
@@ -396,20 +398,20 @@ typedef struct IPV4_HEADER {
396398
uint8_t TimeToLive;
397399
uint8_t Protocol;
398400
uint16_t HeaderChecksum;
399-
uint8_t Source[4];
400-
uint8_t Destination[4];
401+
uint8_t SourceAddress[4];
402+
uint8_t DestinationAddress[4];
401403
uint8_t Data[0];
402-
} IPV4_HEADER;
404+
} CXPLAT_IPV4_HEADER;
403405

404-
typedef struct IPV6_HEADER {
406+
typedef struct CXPLAT_IPV6_HEADER {
405407
uint32_t VersionClassEcnFlow;
406408
uint16_t PayloadLength;
407409
uint8_t NextHeader;
408410
uint8_t HopLimit;
409-
uint8_t Source[16];
410-
uint8_t Destination[16];
411+
uint8_t SourceAddress[16];
412+
uint8_t DestinationAddress[16];
411413
uint8_t Data[0];
412-
} IPV6_HEADER;
414+
} CXPLAT_IPV6_HEADER;
413415

414416
typedef struct IPV6_EXTENSION {
415417
uint8_t NextHeader;
@@ -456,11 +458,16 @@ typedef struct TCP_HEADER {
456458
#define TH_CWR 0x80
457459

458460
#define IPV4_VERSION 4
461+
#ifndef _KERNEL_MODE
459462
#define IPV6_VERSION 6
463+
#endif
460464
#define IPV4_VERSION_BYTE (IPV4_VERSION << 4)
461-
#define IPV4_DEFAULT_VERHLEN ((IPV4_VERSION_BYTE) | (sizeof(IPV4_HEADER) / sizeof(uint32_t)))
465+
#ifndef _KERNEL_MODE
466+
#define IPV4_DEFAULT_VERHLEN ((IPV4_VERSION_BYTE) | (sizeof(CXPLAT_IPV4_HEADER) / sizeof(uint32_t)))
467+
#endif
462468

463469
#define IP_DEFAULT_HOP_LIMIT 128
464470

465-
#define ETHERNET_TYPE_IPV4 0x0008
466-
#define ETHERNET_TYPE_IPV6 0xdd86
471+
472+
#define CXPLAT_ETHERNET_TYPE_IPV4 0x0008
473+
#define CXPLAT_ETHERNET_TYPE_IPV6 0xdd86

0 commit comments

Comments
 (0)