Permalink
Cannot retrieve contributors at this time
Name already in use
A tag already exists with the provided branch name. Many Git commands accept both tag and branch names, so creating this branch may cause unexpected behavior. Are you sure you want to create this branch?
libco/co_routine.cpp
Go to fileThis commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
1196 lines (1002 sloc)
23.9 KB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| * Tencent is pleased to support the open source community by making Libco available. | |
| * Copyright (C) 2014 THL A29 Limited, a Tencent company. All rights reserved. | |
| * | |
| * Licensed under the Apache License, Version 2.0 (the "License"); | |
| * you may not use this file except in compliance with the License. | |
| * You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, | |
| * software distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| #include "co_routine.h" | |
| #include "co_routine_inner.h" | |
| #include "co_epoll.h" | |
| #include <string.h> | |
| #include <stdlib.h> | |
| #include <stdio.h> | |
| #include <string> | |
| #include <map> | |
| #include <poll.h> | |
| #include <sys/time.h> | |
| #include <errno.h> | |
| #include <assert.h> | |
| #include <fcntl.h> | |
| #include <sys/socket.h> | |
| #include <netinet/in.h> | |
| #include <arpa/inet.h> | |
| #include <sys/syscall.h> | |
| #include <unistd.h> | |
| #include <limits.h> | |
| extern "C" | |
| { | |
| extern void coctx_swap( coctx_t *,coctx_t* ) asm("coctx_swap"); | |
| }; | |
| using namespace std; | |
| stCoRoutine_t *GetCurrCo( stCoRoutineEnv_t *env ); | |
| struct stCoEpoll_t; | |
| struct stCoRoutineEnv_t | |
| { | |
| stCoRoutine_t *pCallStack[ 128 ]; | |
| int iCallStackSize; | |
| stCoEpoll_t *pEpoll; | |
| //for copy stack log lastco and nextco | |
| stCoRoutine_t* pending_co; | |
| stCoRoutine_t* occupy_co; | |
| }; | |
| //int socket(int domain, int type, int protocol); | |
| void co_log_err( const char *fmt,... ) | |
| { | |
| } | |
| #if defined( __LIBCO_RDTSCP__) | |
| static unsigned long long counter(void) | |
| { | |
| register uint32_t lo, hi; | |
| register unsigned long long o; | |
| __asm__ __volatile__ ( | |
| "rdtscp" : "=a"(lo), "=d"(hi)::"%rcx" | |
| ); | |
| o = hi; | |
| o <<= 32; | |
| return (o | lo); | |
| } | |
| static unsigned long long getCpuKhz() | |
| { | |
| FILE *fp = fopen("/proc/cpuinfo","r"); | |
| if(!fp) return 1; | |
| char buf[4096] = {0}; | |
| fread(buf,1,sizeof(buf),fp); | |
| fclose(fp); | |
| char *lp = strstr(buf,"cpu MHz"); | |
| if(!lp) return 1; | |
| lp += strlen("cpu MHz"); | |
| while(*lp == ' ' || *lp == '\t' || *lp == ':') | |
| { | |
| ++lp; | |
| } | |
| double mhz = atof(lp); | |
| unsigned long long u = (unsigned long long)(mhz * 1000); | |
| return u; | |
| } | |
| #endif | |
| static unsigned long long GetTickMS() | |
| { | |
| #if defined( __LIBCO_RDTSCP__) | |
| static uint32_t khz = getCpuKhz(); | |
| return counter() / khz; | |
| #else | |
| struct timeval now = { 0 }; | |
| gettimeofday( &now,NULL ); | |
| unsigned long long u = now.tv_sec; | |
| u *= 1000; | |
| u += now.tv_usec / 1000; | |
| return u; | |
| #endif | |
| } | |
| /* no longer use | |
| static pid_t GetPid() | |
| { | |
| static __thread pid_t pid = 0; | |
| static __thread pid_t tid = 0; | |
| if( !pid || !tid || pid != getpid() ) | |
| { | |
| pid = getpid(); | |
| #if defined( __APPLE__ ) | |
| tid = syscall( SYS_gettid ); | |
| if( -1 == (long)tid ) | |
| { | |
| tid = pid; | |
| } | |
| #elif defined( __FreeBSD__ ) | |
| syscall(SYS_thr_self, &tid); | |
| if( tid < 0 ) | |
| { | |
| tid = pid; | |
| } | |
| #else | |
| tid = syscall( __NR_gettid ); | |
| #endif | |
| } | |
| return tid; | |
| } | |
| static pid_t GetPid() | |
| { | |
| char **p = (char**)pthread_self(); | |
| return p ? *(pid_t*)(p + 18) : getpid(); | |
| } | |
| */ | |
| template <class T,class TLink> | |
| void RemoveFromLink(T *ap) | |
| { | |
| TLink *lst = ap->pLink; | |
| if(!lst) return ; | |
| assert( lst->head && lst->tail ); | |
| if( ap == lst->head ) | |
| { | |
| lst->head = ap->pNext; | |
| if(lst->head) | |
| { | |
| lst->head->pPrev = NULL; | |
| } | |
| } | |
| else | |
| { | |
| if(ap->pPrev) | |
| { | |
| ap->pPrev->pNext = ap->pNext; | |
| } | |
| } | |
| if( ap == lst->tail ) | |
| { | |
| lst->tail = ap->pPrev; | |
| if(lst->tail) | |
| { | |
| lst->tail->pNext = NULL; | |
| } | |
| } | |
| else | |
| { | |
| ap->pNext->pPrev = ap->pPrev; | |
| } | |
| ap->pPrev = ap->pNext = NULL; | |
| ap->pLink = NULL; | |
| } | |
| template <class TNode,class TLink> | |
| void inline AddTail(TLink*apLink,TNode *ap) | |
| { | |
| if( ap->pLink ) | |
| { | |
| return ; | |
| } | |
| if(apLink->tail) | |
| { | |
| apLink->tail->pNext = (TNode*)ap; | |
| ap->pNext = NULL; | |
| ap->pPrev = apLink->tail; | |
| apLink->tail = ap; | |
| } | |
| else | |
| { | |
| apLink->head = apLink->tail = ap; | |
| ap->pNext = ap->pPrev = NULL; | |
| } | |
| ap->pLink = apLink; | |
| } | |
| template <class TNode,class TLink> | |
| void inline PopHead( TLink*apLink ) | |
| { | |
| if( !apLink->head ) | |
| { | |
| return ; | |
| } | |
| TNode *lp = apLink->head; | |
| if( apLink->head == apLink->tail ) | |
| { | |
| apLink->head = apLink->tail = NULL; | |
| } | |
| else | |
| { | |
| apLink->head = apLink->head->pNext; | |
| } | |
| lp->pPrev = lp->pNext = NULL; | |
| lp->pLink = NULL; | |
| if( apLink->head ) | |
| { | |
| apLink->head->pPrev = NULL; | |
| } | |
| } | |
| template <class TNode,class TLink> | |
| void inline Join( TLink*apLink,TLink *apOther ) | |
| { | |
| //printf("apOther %p\n",apOther); | |
| if( !apOther->head ) | |
| { | |
| return ; | |
| } | |
| TNode *lp = apOther->head; | |
| while( lp ) | |
| { | |
| lp->pLink = apLink; | |
| lp = lp->pNext; | |
| } | |
| lp = apOther->head; | |
| if(apLink->tail) | |
| { | |
| apLink->tail->pNext = (TNode*)lp; | |
| lp->pPrev = apLink->tail; | |
| apLink->tail = apOther->tail; | |
| } | |
| else | |
| { | |
| apLink->head = apOther->head; | |
| apLink->tail = apOther->tail; | |
| } | |
| apOther->head = apOther->tail = NULL; | |
| } | |
| /////////////////for copy stack ////////////////////////// | |
| stStackMem_t* co_alloc_stackmem(unsigned int stack_size) | |
| { | |
| stStackMem_t* stack_mem = (stStackMem_t*)malloc(sizeof(stStackMem_t)); | |
| stack_mem->occupy_co= NULL; | |
| stack_mem->stack_size = stack_size; | |
| stack_mem->stack_buffer = (char*)malloc(stack_size); | |
| stack_mem->stack_bp = stack_mem->stack_buffer + stack_size; | |
| return stack_mem; | |
| } | |
| stShareStack_t* co_alloc_sharestack(int count, int stack_size) | |
| { | |
| stShareStack_t* share_stack = (stShareStack_t*)malloc(sizeof(stShareStack_t)); | |
| share_stack->alloc_idx = 0; | |
| share_stack->stack_size = stack_size; | |
| //alloc stack array | |
| share_stack->count = count; | |
| stStackMem_t** stack_array = (stStackMem_t**)calloc(count, sizeof(stStackMem_t*)); | |
| for (int i = 0; i < count; i++) | |
| { | |
| stack_array[i] = co_alloc_stackmem(stack_size); | |
| } | |
| share_stack->stack_array = stack_array; | |
| return share_stack; | |
| } | |
| static stStackMem_t* co_get_stackmem(stShareStack_t* share_stack) | |
| { | |
| if (!share_stack) | |
| { | |
| return NULL; | |
| } | |
| int idx = share_stack->alloc_idx % share_stack->count; | |
| share_stack->alloc_idx++; | |
| return share_stack->stack_array[idx]; | |
| } | |
| // ---------------------------------------------------------------------------- | |
| struct stTimeoutItemLink_t; | |
| struct stTimeoutItem_t; | |
| struct stCoEpoll_t | |
| { | |
| int iEpollFd; | |
| static const int _EPOLL_SIZE = 1024 * 10; | |
| struct stTimeout_t *pTimeout; | |
| struct stTimeoutItemLink_t *pstTimeoutList; | |
| struct stTimeoutItemLink_t *pstActiveList; | |
| co_epoll_res *result; | |
| }; | |
| typedef void (*OnPreparePfn_t)( stTimeoutItem_t *,struct epoll_event &ev, stTimeoutItemLink_t *active ); | |
| typedef void (*OnProcessPfn_t)( stTimeoutItem_t *); | |
| struct stTimeoutItem_t | |
| { | |
| enum | |
| { | |
| eMaxTimeout = 40 * 1000 //40s | |
| }; | |
| stTimeoutItem_t *pPrev; | |
| stTimeoutItem_t *pNext; | |
| stTimeoutItemLink_t *pLink; | |
| unsigned long long ullExpireTime; | |
| OnPreparePfn_t pfnPrepare; | |
| OnProcessPfn_t pfnProcess; | |
| void *pArg; // routine | |
| bool bTimeout; | |
| }; | |
| struct stTimeoutItemLink_t | |
| { | |
| stTimeoutItem_t *head; | |
| stTimeoutItem_t *tail; | |
| }; | |
| struct stTimeout_t | |
| { | |
| stTimeoutItemLink_t *pItems; | |
| int iItemSize; | |
| unsigned long long ullStart; | |
| long long llStartIdx; | |
| }; | |
| stTimeout_t *AllocTimeout( int iSize ) | |
| { | |
| stTimeout_t *lp = (stTimeout_t*)calloc( 1,sizeof(stTimeout_t) ); | |
| lp->iItemSize = iSize; | |
| lp->pItems = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) * lp->iItemSize ); | |
| lp->ullStart = GetTickMS(); | |
| lp->llStartIdx = 0; | |
| return lp; | |
| } | |
| void FreeTimeout( stTimeout_t *apTimeout ) | |
| { | |
| free( apTimeout->pItems ); | |
| free ( apTimeout ); | |
| } | |
| int AddTimeout( stTimeout_t *apTimeout,stTimeoutItem_t *apItem ,unsigned long long allNow ) | |
| { | |
| if( apTimeout->ullStart == 0 ) | |
| { | |
| apTimeout->ullStart = allNow; | |
| apTimeout->llStartIdx = 0; | |
| } | |
| if( allNow < apTimeout->ullStart ) | |
| { | |
| co_log_err("CO_ERR: AddTimeout line %d allNow %llu apTimeout->ullStart %llu", | |
| __LINE__,allNow,apTimeout->ullStart); | |
| return __LINE__; | |
| } | |
| if( apItem->ullExpireTime < allNow ) | |
| { | |
| co_log_err("CO_ERR: AddTimeout line %d apItem->ullExpireTime %llu allNow %llu apTimeout->ullStart %llu", | |
| __LINE__,apItem->ullExpireTime,allNow,apTimeout->ullStart); | |
| return __LINE__; | |
| } | |
| unsigned long long diff = apItem->ullExpireTime - apTimeout->ullStart; | |
| if( diff >= (unsigned long long)apTimeout->iItemSize ) | |
| { | |
| diff = apTimeout->iItemSize - 1; | |
| co_log_err("CO_ERR: AddTimeout line %d diff %d", | |
| __LINE__,diff); | |
| //return __LINE__; | |
| } | |
| AddTail( apTimeout->pItems + ( apTimeout->llStartIdx + diff ) % apTimeout->iItemSize , apItem ); | |
| return 0; | |
| } | |
| inline void TakeAllTimeout( stTimeout_t *apTimeout,unsigned long long allNow,stTimeoutItemLink_t *apResult ) | |
| { | |
| if( apTimeout->ullStart == 0 ) | |
| { | |
| apTimeout->ullStart = allNow; | |
| apTimeout->llStartIdx = 0; | |
| } | |
| if( allNow < apTimeout->ullStart ) | |
| { | |
| return ; | |
| } | |
| int cnt = allNow - apTimeout->ullStart + 1; | |
| if( cnt > apTimeout->iItemSize ) | |
| { | |
| cnt = apTimeout->iItemSize; | |
| } | |
| if( cnt < 0 ) | |
| { | |
| return; | |
| } | |
| for( int i = 0;i<cnt;i++) | |
| { | |
| int idx = ( apTimeout->llStartIdx + i) % apTimeout->iItemSize; | |
| Join<stTimeoutItem_t,stTimeoutItemLink_t>( apResult,apTimeout->pItems + idx ); | |
| } | |
| apTimeout->ullStart = allNow; | |
| apTimeout->llStartIdx += cnt - 1; | |
| } | |
| static int CoRoutineFunc( stCoRoutine_t *co,void * ) | |
| { | |
| if( co->pfn ) | |
| { | |
| co->pfn( co->arg ); | |
| } | |
| co->cEnd = 1; | |
| stCoRoutineEnv_t *env = co->env; | |
| co_yield_env( env ); | |
| return 0; | |
| } | |
| struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr, | |
| pfn_co_routine_t pfn,void *arg ) | |
| { | |
| stCoRoutineAttr_t at; | |
| if( attr ) | |
| { | |
| memcpy( &at,attr,sizeof(at) ); | |
| } | |
| if( at.stack_size <= 0 ) | |
| { | |
| at.stack_size = 128 * 1024; | |
| } | |
| else if( at.stack_size > 1024 * 1024 * 8 ) | |
| { | |
| at.stack_size = 1024 * 1024 * 8; | |
| } | |
| if( at.stack_size & 0xFFF ) | |
| { | |
| at.stack_size &= ~0xFFF; | |
| at.stack_size += 0x1000; | |
| } | |
| stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) ); | |
| memset( lp,0,(long)(sizeof(stCoRoutine_t))); | |
| lp->env = env; | |
| lp->pfn = pfn; | |
| lp->arg = arg; | |
| stStackMem_t* stack_mem = NULL; | |
| if( at.share_stack ) | |
| { | |
| stack_mem = co_get_stackmem( at.share_stack); | |
| at.stack_size = at.share_stack->stack_size; | |
| } | |
| else | |
| { | |
| stack_mem = co_alloc_stackmem(at.stack_size); | |
| } | |
| lp->stack_mem = stack_mem; | |
| lp->ctx.ss_sp = stack_mem->stack_buffer; | |
| lp->ctx.ss_size = at.stack_size; | |
| lp->cStart = 0; | |
| lp->cEnd = 0; | |
| lp->cIsMain = 0; | |
| lp->cEnableSysHook = 0; | |
| lp->cIsShareStack = at.share_stack != NULL; | |
| lp->save_size = 0; | |
| lp->save_buffer = NULL; | |
| return lp; | |
| } | |
| int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg ) | |
| { | |
| if( !co_get_curr_thread_env() ) | |
| { | |
| co_init_curr_thread_env(); | |
| } | |
| stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg ); | |
| *ppco = co; | |
| return 0; | |
| } | |
| void co_free( stCoRoutine_t *co ) | |
| { | |
| if (!co->cIsShareStack) | |
| { | |
| free(co->stack_mem->stack_buffer); | |
| free(co->stack_mem); | |
| } | |
| //walkerdu fix at 2018-01-20 | |
| //存在内存泄漏 | |
| else | |
| { | |
| if(co->save_buffer) | |
| free(co->save_buffer); | |
| if(co->stack_mem->occupy_co == co) | |
| co->stack_mem->occupy_co = NULL; | |
| } | |
| free( co ); | |
| } | |
| void co_release( stCoRoutine_t *co ) | |
| { | |
| co_free( co ); | |
| } | |
| void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co); | |
| void co_resume( stCoRoutine_t *co ) | |
| { | |
| stCoRoutineEnv_t *env = co->env; | |
| stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ]; | |
| if( !co->cStart ) | |
| { | |
| coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 ); | |
| co->cStart = 1; | |
| } | |
| env->pCallStack[ env->iCallStackSize++ ] = co; | |
| co_swap( lpCurrRoutine, co ); | |
| } | |
| // walkerdu 2018-01-14 | |
| // 用于reset超时无法重复使用的协程 | |
| void co_reset(stCoRoutine_t * co) | |
| { | |
| if(!co->cStart || co->cIsMain) | |
| return; | |
| co->cStart = 0; | |
| co->cEnd = 0; | |
| // 如果当前协程有共享栈被切出的buff,要进行释放 | |
| if(co->save_buffer) | |
| { | |
| free(co->save_buffer); | |
| co->save_buffer = NULL; | |
| co->save_size = 0; | |
| } | |
| // 如果共享栈被当前协程占用,要释放占用标志,否则被切换,会执行save_stack_buffer() | |
| if(co->stack_mem->occupy_co == co) | |
| co->stack_mem->occupy_co = NULL; | |
| } | |
| void co_yield_env( stCoRoutineEnv_t *env ) | |
| { | |
| stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ]; | |
| stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ]; | |
| env->iCallStackSize--; | |
| co_swap( curr, last); | |
| } | |
| void co_yield_ct() | |
| { | |
| co_yield_env( co_get_curr_thread_env() ); | |
| } | |
| void co_yield( stCoRoutine_t *co ) | |
| { | |
| co_yield_env( co->env ); | |
| } | |
| void save_stack_buffer(stCoRoutine_t* occupy_co) | |
| { | |
| ///copy out | |
| stStackMem_t* stack_mem = occupy_co->stack_mem; | |
| int len = stack_mem->stack_bp - occupy_co->stack_sp; | |
| if (occupy_co->save_buffer) | |
| { | |
| free(occupy_co->save_buffer), occupy_co->save_buffer = NULL; | |
| } | |
| occupy_co->save_buffer = (char*)malloc(len); //malloc buf; | |
| occupy_co->save_size = len; | |
| memcpy(occupy_co->save_buffer, occupy_co->stack_sp, len); | |
| } | |
| void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co) | |
| { | |
| stCoRoutineEnv_t* env = co_get_curr_thread_env(); | |
| //get curr stack sp | |
| char c; | |
| curr->stack_sp= &c; | |
| if (!pending_co->cIsShareStack) | |
| { | |
| env->pending_co = NULL; | |
| env->occupy_co = NULL; | |
| } | |
| else | |
| { | |
| env->pending_co = pending_co; | |
| //get last occupy co on the same stack mem | |
| stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co; | |
| //set pending co to occupy thest stack mem; | |
| pending_co->stack_mem->occupy_co = pending_co; | |
| env->occupy_co = occupy_co; | |
| if (occupy_co && occupy_co != pending_co) | |
| { | |
| save_stack_buffer(occupy_co); | |
| } | |
| } | |
| //swap context | |
| coctx_swap(&(curr->ctx),&(pending_co->ctx) ); | |
| //stack buffer may be overwrite, so get again; | |
| stCoRoutineEnv_t* curr_env = co_get_curr_thread_env(); | |
| stCoRoutine_t* update_occupy_co = curr_env->occupy_co; | |
| stCoRoutine_t* update_pending_co = curr_env->pending_co; | |
| if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co) | |
| { | |
| //resume stack buffer | |
| if (update_pending_co->save_buffer && update_pending_co->save_size > 0) | |
| { | |
| memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size); | |
| } | |
| } | |
| } | |
| //int poll(struct pollfd fds[], nfds_t nfds, int timeout); | |
| // { fd,events,revents } | |
| struct stPollItem_t ; | |
| struct stPoll_t : public stTimeoutItem_t | |
| { | |
| struct pollfd *fds; | |
| nfds_t nfds; // typedef unsigned long int nfds_t; | |
| stPollItem_t *pPollItems; | |
| int iAllEventDetach; | |
| int iEpollFd; | |
| int iRaiseCnt; | |
| }; | |
| struct stPollItem_t : public stTimeoutItem_t | |
| { | |
| struct pollfd *pSelf; | |
| stPoll_t *pPoll; | |
| struct epoll_event stEvent; | |
| }; | |
| /* | |
| * EPOLLPRI POLLPRI // There is urgent data to read. | |
| * EPOLLMSG POLLMSG | |
| * | |
| * POLLREMOVE | |
| * POLLRDHUP | |
| * POLLNVAL | |
| * | |
| * */ | |
| static uint32_t PollEvent2Epoll( short events ) | |
| { | |
| uint32_t e = 0; | |
| if( events & POLLIN ) e |= EPOLLIN; | |
| if( events & POLLOUT ) e |= EPOLLOUT; | |
| if( events & POLLHUP ) e |= EPOLLHUP; | |
| if( events & POLLERR ) e |= EPOLLERR; | |
| if( events & POLLRDNORM ) e |= EPOLLRDNORM; | |
| if( events & POLLWRNORM ) e |= EPOLLWRNORM; | |
| return e; | |
| } | |
| static short EpollEvent2Poll( uint32_t events ) | |
| { | |
| short e = 0; | |
| if( events & EPOLLIN ) e |= POLLIN; | |
| if( events & EPOLLOUT ) e |= POLLOUT; | |
| if( events & EPOLLHUP ) e |= POLLHUP; | |
| if( events & EPOLLERR ) e |= POLLERR; | |
| if( events & EPOLLRDNORM ) e |= POLLRDNORM; | |
| if( events & EPOLLWRNORM ) e |= POLLWRNORM; | |
| return e; | |
| } | |
| static __thread stCoRoutineEnv_t* gCoEnvPerThread = NULL; | |
| void co_init_curr_thread_env() | |
| { | |
| gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) ); | |
| stCoRoutineEnv_t *env = gCoEnvPerThread; | |
| env->iCallStackSize = 0; | |
| struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL ); | |
| self->cIsMain = 1; | |
| env->pending_co = NULL; | |
| env->occupy_co = NULL; | |
| coctx_init( &self->ctx ); | |
| env->pCallStack[ env->iCallStackSize++ ] = self; | |
| stCoEpoll_t *ev = AllocEpoll(); | |
| SetEpoll( env,ev ); | |
| } | |
| stCoRoutineEnv_t *co_get_curr_thread_env() | |
| { | |
| return gCoEnvPerThread; | |
| } | |
| void OnPollProcessEvent( stTimeoutItem_t * ap ) | |
| { | |
| stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg; | |
| co_resume( co ); | |
| } | |
| void OnPollPreparePfn( stTimeoutItem_t * ap,struct epoll_event &e,stTimeoutItemLink_t *active ) | |
| { | |
| stPollItem_t *lp = (stPollItem_t *)ap; | |
| lp->pSelf->revents = EpollEvent2Poll( e.events ); | |
| stPoll_t *pPoll = lp->pPoll; | |
| pPoll->iRaiseCnt++; | |
| if( !pPoll->iAllEventDetach ) | |
| { | |
| pPoll->iAllEventDetach = 1; | |
| RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( pPoll ); | |
| AddTail( active,pPoll ); | |
| } | |
| } | |
| void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg ) | |
| { | |
| if( !ctx->result ) | |
| { | |
| ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE ); | |
| } | |
| co_epoll_res *result = ctx->result; | |
| for(;;) | |
| { | |
| int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 ); | |
| stTimeoutItemLink_t *active = (ctx->pstActiveList); | |
| stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList); | |
| memset( timeout,0,sizeof(stTimeoutItemLink_t) ); | |
| for(int i=0;i<ret;i++) | |
| { | |
| stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr; | |
| if( item->pfnPrepare ) | |
| { | |
| item->pfnPrepare( item,result->events[i],active ); | |
| } | |
| else | |
| { | |
| AddTail( active,item ); | |
| } | |
| } | |
| unsigned long long now = GetTickMS(); | |
| TakeAllTimeout( ctx->pTimeout,now,timeout ); | |
| stTimeoutItem_t *lp = timeout->head; | |
| while( lp ) | |
| { | |
| //printf("raise timeout %p\n",lp); | |
| lp->bTimeout = true; | |
| lp = lp->pNext; | |
| } | |
| Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout ); | |
| lp = active->head; | |
| while( lp ) | |
| { | |
| PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active ); | |
| if (lp->bTimeout && now < lp->ullExpireTime) | |
| { | |
| int ret = AddTimeout(ctx->pTimeout, lp, now); | |
| if (!ret) | |
| { | |
| lp->bTimeout = false; | |
| lp = active->head; | |
| continue; | |
| } | |
| } | |
| if( lp->pfnProcess ) | |
| { | |
| lp->pfnProcess( lp ); | |
| } | |
| lp = active->head; | |
| } | |
| if( pfn ) | |
| { | |
| if( -1 == pfn( arg ) ) | |
| { | |
| break; | |
| } | |
| } | |
| } | |
| } | |
| void OnCoroutineEvent( stTimeoutItem_t * ap ) | |
| { | |
| stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg; | |
| co_resume( co ); | |
| } | |
| stCoEpoll_t *AllocEpoll() | |
| { | |
| stCoEpoll_t *ctx = (stCoEpoll_t*)calloc( 1,sizeof(stCoEpoll_t) ); | |
| ctx->iEpollFd = co_epoll_create( stCoEpoll_t::_EPOLL_SIZE ); | |
| ctx->pTimeout = AllocTimeout( 60 * 1000 ); | |
| ctx->pstActiveList = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) ); | |
| ctx->pstTimeoutList = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) ); | |
| return ctx; | |
| } | |
| void FreeEpoll( stCoEpoll_t *ctx ) | |
| { | |
| if( ctx ) | |
| { | |
| free( ctx->pstActiveList ); | |
| free( ctx->pstTimeoutList ); | |
| FreeTimeout( ctx->pTimeout ); | |
| co_epoll_res_free( ctx->result ); | |
| } | |
| free( ctx ); | |
| } | |
| stCoRoutine_t *GetCurrCo( stCoRoutineEnv_t *env ) | |
| { | |
| return env->pCallStack[ env->iCallStackSize - 1 ]; | |
| } | |
| stCoRoutine_t *GetCurrThreadCo( ) | |
| { | |
| stCoRoutineEnv_t *env = co_get_curr_thread_env(); | |
| if( !env ) return 0; | |
| return GetCurrCo(env); | |
| } | |
| typedef int (*poll_pfn_t)(struct pollfd fds[], nfds_t nfds, int timeout); | |
| int co_poll_inner( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout, poll_pfn_t pollfunc) | |
| { | |
| if (timeout == 0) | |
| { | |
| return pollfunc(fds, nfds, timeout); | |
| } | |
| if (timeout < 0) | |
| { | |
| timeout = INT_MAX; | |
| } | |
| int epfd = ctx->iEpollFd; | |
| stCoRoutine_t* self = co_self(); | |
| //1.struct change | |
| stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t))); | |
| memset( &arg,0,sizeof(arg) ); | |
| arg.iEpollFd = epfd; | |
| arg.fds = (pollfd*)calloc(nfds, sizeof(pollfd)); | |
| arg.nfds = nfds; | |
| stPollItem_t arr[2]; | |
| if( nfds < sizeof(arr) / sizeof(arr[0]) && !self->cIsShareStack) | |
| { | |
| arg.pPollItems = arr; | |
| } | |
| else | |
| { | |
| arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) ); | |
| } | |
| memset( arg.pPollItems,0,nfds * sizeof(stPollItem_t) ); | |
| arg.pfnProcess = OnPollProcessEvent; | |
| arg.pArg = GetCurrCo( co_get_curr_thread_env() ); | |
| //2. add epoll | |
| for(nfds_t i=0;i<nfds;i++) | |
| { | |
| arg.pPollItems[i].pSelf = arg.fds + i; | |
| arg.pPollItems[i].pPoll = &arg; | |
| arg.pPollItems[i].pfnPrepare = OnPollPreparePfn; | |
| struct epoll_event &ev = arg.pPollItems[i].stEvent; | |
| if( fds[i].fd > -1 ) | |
| { | |
| ev.data.ptr = arg.pPollItems + i; | |
| ev.events = PollEvent2Epoll( fds[i].events ); | |
| int ret = co_epoll_ctl( epfd,EPOLL_CTL_ADD, fds[i].fd, &ev ); | |
| if (ret < 0 && errno == EPERM && nfds == 1 && pollfunc != NULL) | |
| { | |
| if( arg.pPollItems != arr ) | |
| { | |
| free( arg.pPollItems ); | |
| arg.pPollItems = NULL; | |
| } | |
| free(arg.fds); | |
| free(&arg); | |
| return pollfunc(fds, nfds, timeout); | |
| } | |
| } | |
| //if fail,the timeout would work | |
| } | |
| //3.add timeout | |
| unsigned long long now = GetTickMS(); | |
| arg.ullExpireTime = now + timeout; | |
| int ret = AddTimeout( ctx->pTimeout,&arg,now ); | |
| int iRaiseCnt = 0; | |
| if( ret != 0 ) | |
| { | |
| co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld", | |
| ret,now,timeout,arg.ullExpireTime); | |
| errno = EINVAL; | |
| iRaiseCnt = -1; | |
| } | |
| else | |
| { | |
| co_yield_env( co_get_curr_thread_env() ); | |
| iRaiseCnt = arg.iRaiseCnt; | |
| } | |
| { | |
| //clear epoll status and memory | |
| RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &arg ); | |
| for(nfds_t i = 0;i < nfds;i++) | |
| { | |
| int fd = fds[i].fd; | |
| if( fd > -1 ) | |
| { | |
| co_epoll_ctl( epfd,EPOLL_CTL_DEL,fd,&arg.pPollItems[i].stEvent ); | |
| } | |
| fds[i].revents = arg.fds[i].revents; | |
| } | |
| if( arg.pPollItems != arr ) | |
| { | |
| free( arg.pPollItems ); | |
| arg.pPollItems = NULL; | |
| } | |
| free(arg.fds); | |
| free(&arg); | |
| } | |
| return iRaiseCnt; | |
| } | |
| int co_poll( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout_ms ) | |
| { | |
| return co_poll_inner(ctx, fds, nfds, timeout_ms, NULL); | |
| } | |
| void SetEpoll( stCoRoutineEnv_t *env,stCoEpoll_t *ev ) | |
| { | |
| env->pEpoll = ev; | |
| } | |
| stCoEpoll_t *co_get_epoll_ct() | |
| { | |
| if( !co_get_curr_thread_env() ) | |
| { | |
| co_init_curr_thread_env(); | |
| } | |
| return co_get_curr_thread_env()->pEpoll; | |
| } | |
| struct stHookPThreadSpec_t | |
| { | |
| stCoRoutine_t *co; | |
| void *value; | |
| enum | |
| { | |
| size = 1024 | |
| }; | |
| }; | |
| void *co_getspecific(pthread_key_t key) | |
| { | |
| stCoRoutine_t *co = GetCurrThreadCo(); | |
| if( !co || co->cIsMain ) | |
| { | |
| return pthread_getspecific( key ); | |
| } | |
| return co->aSpec[ key ].value; | |
| } | |
| int co_setspecific(pthread_key_t key, const void *value) | |
| { | |
| stCoRoutine_t *co = GetCurrThreadCo(); | |
| if( !co || co->cIsMain ) | |
| { | |
| return pthread_setspecific( key,value ); | |
| } | |
| co->aSpec[ key ].value = (void*)value; | |
| return 0; | |
| } | |
| void co_disable_hook_sys() | |
| { | |
| stCoRoutine_t *co = GetCurrThreadCo(); | |
| if( co ) | |
| { | |
| co->cEnableSysHook = 0; | |
| } | |
| } | |
| bool co_is_enable_sys_hook() | |
| { | |
| stCoRoutine_t *co = GetCurrThreadCo(); | |
| return ( co && co->cEnableSysHook ); | |
| } | |
| stCoRoutine_t *co_self() | |
| { | |
| return GetCurrThreadCo(); | |
| } | |
| //co cond | |
| struct stCoCond_t; | |
| struct stCoCondItem_t | |
| { | |
| stCoCondItem_t *pPrev; | |
| stCoCondItem_t *pNext; | |
| stCoCond_t *pLink; | |
| stTimeoutItem_t timeout; | |
| }; | |
| struct stCoCond_t | |
| { | |
| stCoCondItem_t *head; | |
| stCoCondItem_t *tail; | |
| }; | |
| static void OnSignalProcessEvent( stTimeoutItem_t * ap ) | |
| { | |
| stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg; | |
| co_resume( co ); | |
| } | |
| stCoCondItem_t *co_cond_pop( stCoCond_t *link ); | |
| int co_cond_signal( stCoCond_t *si ) | |
| { | |
| stCoCondItem_t * sp = co_cond_pop( si ); | |
| if( !sp ) | |
| { | |
| return 0; | |
| } | |
| RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &sp->timeout ); | |
| AddTail( co_get_curr_thread_env()->pEpoll->pstActiveList,&sp->timeout ); | |
| return 0; | |
| } | |
| int co_cond_broadcast( stCoCond_t *si ) | |
| { | |
| for(;;) | |
| { | |
| stCoCondItem_t * sp = co_cond_pop( si ); | |
| if( !sp ) return 0; | |
| RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &sp->timeout ); | |
| AddTail( co_get_curr_thread_env()->pEpoll->pstActiveList,&sp->timeout ); | |
| } | |
| return 0; | |
| } | |
| int co_cond_timedwait( stCoCond_t *link,int ms ) | |
| { | |
| stCoCondItem_t* psi = (stCoCondItem_t*)calloc(1, sizeof(stCoCondItem_t)); | |
| psi->timeout.pArg = GetCurrThreadCo(); | |
| psi->timeout.pfnProcess = OnSignalProcessEvent; | |
| if( ms > 0 ) | |
| { | |
| unsigned long long now = GetTickMS(); | |
| psi->timeout.ullExpireTime = now + ms; | |
| int ret = AddTimeout( co_get_curr_thread_env()->pEpoll->pTimeout,&psi->timeout,now ); | |
| if( ret != 0 ) | |
| { | |
| free(psi); | |
| return ret; | |
| } | |
| } | |
| AddTail( link, psi); | |
| co_yield_ct(); | |
| RemoveFromLink<stCoCondItem_t,stCoCond_t>( psi ); | |
| free(psi); | |
| return 0; | |
| } | |
| stCoCond_t *co_cond_alloc() | |
| { | |
| return (stCoCond_t*)calloc( 1,sizeof(stCoCond_t) ); | |
| } | |
| int co_cond_free( stCoCond_t * cc ) | |
| { | |
| free( cc ); | |
| return 0; | |
| } | |
| stCoCondItem_t *co_cond_pop( stCoCond_t *link ) | |
| { | |
| stCoCondItem_t *p = link->head; | |
| if( p ) | |
| { | |
| PopHead<stCoCondItem_t,stCoCond_t>( link ); | |
| } | |
| return p; | |
| } |