In [1]:
import sqlite3
import random
from pyvis.network import Network
import networkx as nx
from functools import reduce

In [2]:
# Value assigned to a person's floatInfProbability by Person.IsVacced().
iVaccInfProb = 0
# Node colours used by NetworkMaterialization for infected / vaccinated people.
hexInfColor = '#fc0303'
hexVaccColor = '#7cff0a'
# pid of the first infected person (used as an index into listPeople).
iFirstInfPid = 0

# Candidate numbers of contacts drawn per person, and the matching weights
# passed to random.choices when picking one of them.
listMaxConn = [1, 3, 9, 50]
listWtOfChoiceMaxConn = [2, 11, 13, 5]
# Maps a person's iInfDays to the fraction of their contacts they infect on
# that day; days missing from the dict are looked up with a default of 0.
dictInfectivity = {1: 0, 2: 0, 3: 0, 4: 0.6, 5: 0.7, 6: 0.6, 7: 0.4, 8: 0.2, 9: 0.1, 10: 0}
# Number of day iterations run by each simulation loop.
iDays = 20 
# Number of Person objects created for the population.
iPopCnt = 500
# List of (pid, pid) relation tuples; it is rebuilt when relations are generated.
listPR = []

In [3]:
class Person:
    """One member of the simulated population.

    Attributes:
        pid: identifier given at construction.
        boolIfInf: True once the person has been marked infected.
        boolIfVacc: True once the person has been marked vaccinated.
        floatInfProbability: current infection probability value.
        iInfDays: counter of days spent infected.
    """

    def __init__(self, iPid, boolInfFlag=False, floatInfProbability=0, boolIfVacc=False, iInfDays=0):
        """Store the identifier and the initial infection/vaccination state."""
        self.pid = iPid
        self.boolIfInf, self.boolIfVacc = boolInfFlag, boolIfVacc
        self.floatInfProbability = floatInfProbability
        self.iInfDays = iInfDays

    def IsInfected(self):
        """Mark this person as infected."""
        self.boolIfInf = True

    def IsVacced(self):
        """Mark this person as vaccinated and not infected.

        The infection probability is set to the module-level ``iVaccInfProb``.
        """
        self.boolIfVacc = True
        self.boolIfInf = False
        self.floatInfProbability = iVaccInfProb

    def ResetInfStatus(self):
        """Return the person to the uninfected, unvaccinated starting state."""
        self.boolIfInf = self.boolIfVacc = False
        self.floatInfProbability = self.iInfDays = 0

In [4]:
def getRelatePidsWithThisMan(iPid, listAllPopRels):
    """Return the pids that share at least one relation with ``iPid``.

    Args:
        iPid: pid of the person whose contacts are wanted.
        listAllPopRels: iterable of relation tuples; a relation involves
            ``iPid`` when its first or second element equals ``iPid``.

    Returns:
        A list of the distinct pids appearing in those relations, with
        ``iPid`` itself removed. Empty when ``iPid`` has no relations.
    """
    setRelatedPids = set()
    for relation in listAllPopRels:
        # Use the iPid argument rather than whatever global `person` happens
        # to be bound to in the calling cell.
        if relation[0] == iPid or relation[1] == iPid:
            setRelatedPids.update(relation)
    # Remove the person themselves from the contact set.
    setRelatedPids.discard(iPid)
    return list(setRelatedPids)

def NetworkMaterialization(listAllPop, listAllPopRels, sNetworkName):
    """Build a pyvis network of the population and show it as an HTML file.

    Args:
        listAllPop: people to draw; each needs ``pid``, ``boolIfInf`` and
            ``boolIfVacc`` attributes.
        listAllPopRels: relations to draw as edges, read as
            ``relation[0]`` -> ``relation[1]``.
        sNetworkName: base name of the output; ``.html`` is appended.
    """
    netPRInf = Network(height='750px', width='100%', bgcolor='#222222', font_color='white')

    for person in listAllPop:
        dictNodeOpts = {'title': str(person.pid), 'value': 10}
        # Infected takes precedence over vaccinated; everyone else keeps the
        # library's default node colour.
        if person.boolIfInf == True:
            dictNodeOpts['color'] = hexInfColor
        elif person.boolIfVacc == True:
            dictNodeOpts['color'] = hexVaccColor
        netPRInf.add_node(person.pid, **dictNodeOpts)

    for relation in listAllPopRels:
        iPidFrom, iPidTo = relation[0], relation[1]
        netPRInf.add_edge(iPidFrom, iPidTo)

    netPRInf.barnes_hut()
    netPRInf.show(f'{sNetworkName}.html')

In [5]:
# Open the SQLite database file (path is relative to the working directory)
# and create the cursor used by the SQL statements in the following cells.
connPR = sqlite3.connect('./PeopleRelation.db')
cur = connPR.cursor()

In [6]:
# Drop all tables so the schema is rebuilt from scratch.
# Relation is dropped before People, the table its foreign keys reference.
cur.execute('drop table if exists Relation')
cur.execute('drop table if exists People')

<sqlite3.Cursor at 0x7f5109cde650>

In [7]:
# People: one row per person, keyed by pid.
cur.execute(
'''
    create table if not exists People( pid integer not null primary key )
''')

# Relation: one row per (pid, pidconn) pair; both columns reference People(pid)
# and the pair itself is the primary key, so a pair can be stored only once.
cur.execute(
'''
    create table if not exists Relation( pid integer not null,
    pidconn integer not null,
    primary key(pid,
    pidconn),
    foreign key (pid) references People(pid),
    foreign key (pidconn) references People(pid) )
''')

<sqlite3.Cursor at 0x7f5109cde650>

In [8]:
# Create the population: one Person per pid in 0 .. iPopCnt - 1.
listPeople = list(map(Person, range(iPopCnt)))

In [9]:
# Insert every person into table People.
# Uses a parameterized statement (same style as the Relation insert) instead
# of formatting the pid into the SQL text.
cur.executemany('insert into People values(?)', [(person.pid,) for person in listPeople])
connPR.commit()

In [10]:
# Simulate the contact relations between people.
listPR = []
for person in listPeople:
    # Number of contacts to draw for this person, picked by weight.
    iContactDraws = random.choices(listMaxConn, listWtOfChoiceMaxConn)[0]
    # Everyone except the person themselves.
    listCandidates = list(filter(lambda other: other.pid != person.pid, listPeople))
    # Draw that many contacts (with replacement, so repeats are possible).
    listDrawnContacts = random.choices(listCandidates, k=iContactDraws)

    # Store each pair with the smaller pid first so (a, b) and (b, a) coincide.
    listPR.extend(
        (min(person.pid, personDrawn.pid), max(person.pid, personDrawn.pid))
        for personDrawn in listDrawnContacts
    )
# Remove duplicate pairs.
listPR = list(set(listPR))

In [11]:
# Bulk-insert every relation tuple from listPR into table Relation using
# parameter placeholders, then commit the transaction.
cur.executemany('insert into Relation values(?, ?)', listPR)
connPR.commit()

In [12]:
# Draw the population before anyone has been infected.
NetworkMaterialization(listPeople, listPR, 'NetOrig')

In [13]:
# Scenario: population without any vaccine protection.
for person in listPeople:
    person.ResetInfStatus()

listPeople[iFirstInfPid].IsInfected()

for day in range(iDays):
    # People are updated in place, so someone infected earlier in this pass
    # is already seen as infected when their own turn comes the same day.
    for person in listPeople:
        # Relations that involve this person.
        listRelationsWithThisPerson = [
            relation for relation in listPR
            if relation[0] == person.pid or relation[1] == person.pid
        ]
        if not listRelationsWithThisPerson:
            continue
        # Distinct pids in those relations, minus the person themselves.
        setPidsInRelations = set(
            iPidInRel for relation in listRelationsWithThisPerson for iPidInRel in relation
        )
        listMightBeInfPids = [iPidInRel for iPidInRel in setPidsInRelations if iPidInRel != person.pid]

        # An infected person's day counter advances; it drives infectivity.
        if person.boolIfInf:
            person.iInfDays += 1
        # Look up today's infectivity from the number of infected days.
        person.floatInfProbability = dictInfectivity.get(person.iInfDays, 0)

        if person.floatInfProbability > 0:
            # Infect a random share of the contacts, sized by the infectivity.
            iInfectCnt = round(len(listMightBeInfPids) * person.floatInfProbability)
            for iInfPid in random.sample(listMightBeInfPids, k=iInfectCnt):
                listPeople[iInfPid].IsInfected()

NetworkMaterialization(listPeople, listPR, 'NetOrigInf')

In [14]:
# Number of people who are still uninfected.
sum(1 for person in listPeople if not person.boolIfInf)

0

In [15]:
# For every person, count how many distinct people they are in contact with.
listPidContactCnt = []
for person in listPeople:
    listMightBeInfPids = getRelatePidsWithThisMan(person.pid, listPR)
    listPidContactCnt.append((person.pid, len(listMightBeInfPids)))
# Exclude the initially infected person from the ranking; use the configured
# iFirstInfPid rather than a hard-coded 0 so the two cannot drift apart.
listPidContactCnt = [x for x in listPidContactCnt if x[0] != iFirstInfPid]
# Keep the 100 people with the most contacts (ties keep their pid order).
listTop100PidContactCnt = sorted(listPidContactCnt, key=lambda x: x[1], reverse=True)[0:100]

In [16]:
# Strategy: vaccinate the people who have the most contacts.

# Pids selected for vaccination.
listPidsForVacc = [x[0] for x in listTop100PidContactCnt]
# Set copy for constant-time membership tests inside the simulation loop.
setPidsForVacc = set(listPidsForVacc)

# Reset everyone in listPeople.
for person in listPeople:
    person.ResetInfStatus()

# Vaccinate the top-100 most connected people through the Person API so all
# vaccination-related fields are set consistently.
for PidForVac in listPidsForVacc:
    listPeople[PidForVac].IsVacced()

listPeople[iFirstInfPid].IsInfected()

for day in range(iDays):
    for person in listPeople:
        # Vaccinated people do not spread the infection.
        if person.boolIfVacc:
            continue
        listMightBeInfPids = getRelatePidsWithThisMan(person.pid, listPR)
        # Vaccinated contacts cannot be infected, so leave them out.
        listMightBeInfPids2 = [pid for pid in listMightBeInfPids if pid not in setPidsForVacc]
        # An infected person's day counter advances; it drives infectivity.
        if person.boolIfInf:
            person.iInfDays += 1
        # Look up today's infectivity from the number of infected days.
        person.floatInfProbability = dictInfectivity.get(person.iInfDays, 0)

        if person.floatInfProbability > 0:
            # Infect a random share of the contacts, sized by the infectivity.
            listBeInfPids = random.sample(listMightBeInfPids2, k=round(len(listMightBeInfPids2) * person.floatInfProbability))
            for iInfPid in listBeInfPids:
                listPeople[iInfPid].IsInfected()

NetworkMaterialization(listPeople, listPR, 'NetVaccByContactCnt')

In [17]:
# Close the database connection opened at the start of the notebook.
connPR.close()