This Python notebook produces the Daily Activity Heat-Maps based on the learned STEM-MCI model </br></br>      

To run this notebook, ensure you have the following Python libraries installed:</br></br>
pandas: 2.0.3</br>
numpy: 1.24.3</br>
tqdm: 4.66.1</br>
sklearn: 1.3.0</br>
json: 2.0.9</br>
csv: 1.0</br>
matplotlib: 3.7.5</br>
seaborn: 0.13.2</br></br>

Save the network produced by STEM-MCI Model before running the python notebook

In [None]:
from fusionART_vs3 import *
from ARTxtralib import *

import pickle
import pandas as pd
f3Schema = [{'name':'events','compl':True,'attrib':['y1']},{'name':'group','compl':True,'attrib':['g1','g2']}]

f2Schema = [{'name':'features','compl':True,'attrib':['bedroom','kitchen','living_room','bed','in_out']},
            {'name':'timestamp','compl':True,'attrib':['t']}]

f3 = FusionART(schema=f3Schema,beta=[1.0,1.0],alpha=[0.1,0.1],gamma=[1.0,1.0],rho=[1.0,1.0],numarray=True)
f2 = FusionART(schema=f2Schema,beta=[1.0,1.0],alpha=[0.1,0.1],gamma=[1.0,1.0],rho=[1.0,1.0],numarray=True)

#Load saved STEM-MCI network
loadFusionARTNetwork(f2,name='f2stemart_all_folds_22jan_vs2_0.net')
loadFusionARTNetwork(f3,name='f3stemart_all_folds_22jan_vs2_0.net')

f2.stackTopFusionART(f3)
#the channel representing the sequence of events ('events') is linked with the category field (F2) 
f2.linkF2TopF1BySchema(['events'])


In [None]:
#function to predict a single episode in STEM-MCI with counting prediction and correct prediction (for testing data)
def predictSingleEpisode(stemart=None, epidata=None, eplen=0, foldidx=None):
    ccnt = 0
    ecnt = 0
    arttop = stemart.TopFusionART
    arttop.clearActivityF1()
    duplEvent = 0
    normalized_time = []
    for event in epidata['events']:
        normtime = 0
        if ecnt > 0:
            normtime = ecnt/eplen
            normalized_time.append(normtime)
        stemart.updateF1bySchema([{'name':'features', 'val':event},
                                 {'name':'timestamp', 'val':[normtime]}])
        J = stemart.SchemaBasedSequentialResSearch(duprep=False) #do resonance search for F2 to select an event
       
        ecnt += 1
    f2_activation = arttop.activityF1[0]
    JJ = arttop.resSearch()
    #arttop.add_usage(JJ) #-------------
    arttop.add_predict(JJ)
    print(f'code selected {JJ}, uncommitted:{arttop.uncommitted(JJ)}')
    #pred = arttop.add_predict(JJ)
    pred = []
    y = []
    arttop.doReadout(JJ,1)
    arttop.TopDownF1()
    routgroup = []
    gtruthgroup = []
   
    gtruthgroup.append(epidata['label'])
    y = []
    if (arttop.F1Fields[1]['val'] == epidata['label']):
         ccnt += 1
         y = arttop.add_correct(JJ)
         print(f'correct! no. correct:{arttop.codes[JJ]["correct_cnt"]}')
   
    print(f'{JJ}: sel_cnt {arttop.codes[JJ]["sel_cnt"]}, predict_cnt {arttop.codes[JJ]["prediction_cnt"]}, correct_cnt {arttop.codes[JJ]["correct_cnt"]}')

    return routgroup, gtruthgroup, (routgroup == gtruthgroup), f2_activation, [normalized_time], y, pred

In [None]:
def midvalc(v=None, vc=None):
    vcc = 1 - vc
    mv = v + ((vcc-v)/2)
    if v > 0 or vc > 0:
        return mv
    return 0


def RetCode(fart=None, code=None, fld=0, compl=True):
    if fart.schema[fld]['compl']:
        if compl:
            return fart.codes[code]['weights'][fld][int(len(fart.codes[code]['weights'][fld])/2):len(fart.codes[code]['weights'][fld])]
        else:
            return fart.codes[code]['weights'][fld][0:int(len(fart.codes[code]['weights'][fld])/2)]
    else:
        return fart.codes[code]['weights'][fld]
    
def seqIdxEventRanges(fart=None, code=None, fld=0):
    ilist = []
    tlistc = RetCode(fart=fart, code=code, fld=fld)
    tlist = RetCode(fart=fart, code=code, fld=fld, compl=False)
    mlist = [midvalc(v=tlistc[i], vc=tlist[i]) for i in range(len(tlistc))]
    while any(mlist) > 0:
        midx = mlist.index(max(mlist))
        ilist.append((midx, tlist[midx], tlistc[midx], mlist[midx], 96-round(math.log(tlist[midx],0.9)), 96-round(math.log((1-tlistc[midx]),0.9))))
        mlist[midx] = 0
    return ilist

def seqIdxEvent(fart=None, code=None, fld=0, compl=True):
    ilist = []
    thelist = RetCode(fart=fart, code=code, fld=fld, compl=compl)
    while any(thelist) > 0:
        ilist.append(thelist.index(max(thelist)))
        thelist[thelist.index(max(thelist))] = 0
    return ilist


def seqEpisodeFeatureMidRange(stemmci=None, code=None):
    flist = []
    atop = stemmci.TopFusionART
    ilist = seqIdxEventRanges(fart=atop, code=code)
    group = RetCode(fart=atop, code=code, fld=1)
    for ep in ilist:
        idx = ep[0]
        stemmci.doReadoutAllFields(idx)
        #flist.append(stemmci.TopDownF1())
        flist.append(([stemmci.activityF1[0],stemmci.activityF1[1]],ep[4],ep[5]))
    return flist, group

def transintervalmidvals(depisode=None):
    flist = []
    for ep in depisode:
        spf = ep[0][0]
        tmp = ep[0][1]
        inof = ep[1]
        inot = ep[2]
        midspf = [midvalc(v=spf[0:int(len(spf)/2)][i], vc=spf[int(len(spf)/2):len(spf)][i]) for i in range(int(len(spf)/2))]
        #print(f'spf {spf}, midspf {midspf}')
        tmp[0] = round(tmp[0]*96)
        tmp[1] = round((1-tmp[1])*96)
        flist.append((midspf,tmp,inof,inot))
    return flist, inot

def sortintervalmidvals(tepisode=None):
    flist = []
    frst = True
    nfro = 0
    for ep in tepisode:
        spf = ep[0]
        ifro = ep[1][0]
        ito = ep[1][1]
        efro = ep[2]
        eto = ep[3]
        if not frst:
            ifro = nfro
        else:
            frst = False
        ito = eto
        nfro = ito + 1
        flist.append((spf,ifro,ito,ep[1]))
    return flist

def genadlvisual(stepisode=None, ninterval=96):
    levents = []
    for ev in stepisode:
        levents += [ev[0]] * ((ev[2]-ev[1]) + 1)
    return levents

def seqEpisodeFeature(stemmci=None, code=None, compl=True):
    flist = []
    atop = stemmci.TopFusionART
    ilist = seqIdxEvent(fart=atop, code=code, fld=0, compl=compl)
    group = RetCode(fart=atop, code=code, fld=1, compl=compl)
    for idx in ilist:
        stemmci.doReadoutAllFields(idx)
        #flist.append(stemmci.TopDownF1())
        flist.append([stemmci.activityF1[0],stemmci.activityF1[1]])
    return flist, group

def filterGroup(stemmci=None, grp=None):
    flistg = []
    atop = stemmci.TopFusionART
    for j in range(len(atop.codes)):
        sep = seqEpisodeFeature(stemmci=stemmci, code=j)
        if sep[1] == grp:
            spl = []
            for sp in sep[0]:
                spl.append([sp[0],list(np.array(sp[1])*96)])
            flistg.append(spl)
            #flistg.append([sep[0][0][0], list(np.array(sep[0][0][1])*96)])
    return flistg, grp    

def filtersumidxgroup(stemmci=None, lcodes=None, grp=None):
    flistg = {}
    atop = stemmci.TopFusionART
    for code in lcodes:
        #sep = seqEpisodeFeature(stemmci=stemmci, code=code[0])
        grc = RetCode(fart=atop, code=code[0], fld=1)
        if grc == grp:
            if code[0] in flistg:
                flistg[code[0]] += 1
            else:
                flistg[code[0]] = 1
    return flistg, grp

def getdayepisode(stemmci=None, code=None):
    repf = seqEpisodeFeatureMidRange(stemmci=stemmci, code=code)
    tep, lep = transintervalmidvals(depisode=repf[0])
    #print(f'group:{ri2[1]}, #interval:{lit2}')
    tepin = sortintervalmidvals(tepisode=tep)
    gtepin = genadlvisual(stepisode=tepin)
    return gtepin, lep, repf[1]

def filtblankgrp(stemmci=None, grp=None, pblank=[0,0,0,0,0]):
    flt = []
    atop = stemmci.TopFusionART
    for j in range(len(atop.codes)):
        print(f'j {j}')
        if not atop.uncommitted(j):
            gin, gr = getdayepisode(stemmci=stemmci, code=j)
            if gr == grp:
                if all(gin[i] == pblank for i in range(len(gin))):
                    flt.append(j)
    return flt    

def filtavalatgrp(stemmci=None, grp=None, ival=None):
    flt = []
    atop = stemmci.TopFusionART
    for j in range(len(atop.codes)):
        print(f'j {j}')
        if not atop.uncommitted(j):
            gin, lin, gr = getdayepisode(stemmci=stemmci, code=j)
            if gr == grp:
                if ival < 5:
                    if any(gin[i][ival] > 0 for i in range(len(gin))):
                        flt.append(j)
    return flt

def filtavalallgrp(stemmci=None, grp=None):
    flt = []
    atop = stemmci.TopFusionART
    for j in range(len(atop.codes)):
        print(f'j {j}')
        if not atop.uncommitted(j):
            gin, lin, gr = getdayepisode(stemmci=stemmci, code=j)
            if gr == grp:
                cntf = [0,0,0,0,0]
                for i in range(len(gin)):
                    for k in range(len(cntf)):
                        if (gin[i][k] > 0):
                            cntf[k] += 1
                if all(cntf[k] > 0 for k in range(len(cntf))):
                    flt.append(j)
    return flt

In [None]:
first = ConfidenceIdxbySchema(f3, field='group', outVal=[0,1], theta=0.75)
list2 = [list_of_values["code"] for list_of_values in first]

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import matplotlib as mpl
import json
from datetime import datetime,timedelta
import numpy as np
import seaborn as sns

#learned activity code 
n = 1161

#get episode data
gtin2, lin2, mcig2 = getdayepisode(stemmci=f2, code=n)#1161

data = gtin2
list1 = []
for i in data:
    list1.append(i)


df_temp_1_3 = pd.DataFrame()
bed = [item[0] for item in list1]
bed.insert(len(bed),0)
bedroom = [item[1] for item in list1]
bedroom.insert(len(bedroom),0)
kitchen = [item[2] for item in list1]
kitchen.insert(len(kitchen),0)
living_room = [item[3] for item in list1]
living_room.insert(len(living_room),0)
indooroutdoor = [item[4] for item in list1]
indooroutdoor.insert(len(indooroutdoor),0)
df_temp_1_3['bed'] = bed
df_temp_1_3['bedroom'] = bedroom
df_temp_1_3['kitchen'] = kitchen
df_temp_1_3['living_room'] = living_room
df_temp_1_3['indoor/outdoor'] = indooroutdoor
unique_values = df_temp_1_3['bed'].unique().tolist()

#df_temp_1_3['bed'].loc[df_temp_1_3['bed'] > 0, 'bed'] = 1
unique_values = df_temp_1_3['bed'].tolist()
df_temp_1_3.loc[df_temp_1_3['bed'] > 0.8, 'bed'] = 1
df_temp_1_3['bedroom']=(df_temp_1_3['bedroom']-df_temp_1_3['bedroom'].min())/(df_temp_1_3['bedroom'].max()-df_temp_1_3['bedroom'].min())
df_temp_1_3.loc[df_temp_1_3['bedroom'] > 0.8, 'bedroom'] = 1
df_temp_1_3['kitchen']=(df_temp_1_3['kitchen']-df_temp_1_3['kitchen'].min())/(df_temp_1_3['kitchen'].max()-df_temp_1_3['kitchen'].min())
df_temp_1_3.loc[df_temp_1_3['kitchen'] > 0.8, 'bedroom'] = 1
df_temp_1_3['living_room']=(df_temp_1_3['living_room']-df_temp_1_3['living_room'].min())/(df_temp_1_3['living_room'].max()-df_temp_1_3['living_room'].min())
df_temp_1_3.loc[df_temp_1_3['living_room'] > 0.8, 'living_room'] = 1
df_temp_1_3['indoor/outdoor']=(df_temp_1_3['indoor/outdoor']-df_temp_1_3['indoor/outdoor'].min())/(df_temp_1_3['indoor/outdoor'].max()-df_temp_1_3['indoor/outdoor'].min())
df_temp_1_3.loc[df_temp_1_3['indoor/outdoor'] > 0.8, 'indoor/outdoor'] = 1




spacing = 15
time = [datetime.strptime(str(i*timedelta(minutes=spacing)),
    '%H:%M:%S').strftime('%I:%M %p') for i in range(24*60//spacing)]
time = time[:len(time)-1]
df_temp_1_3.drop(df_temp_1_3.tail(2).index,inplace=True)              
if(len(df_temp_1_3) == 95):

    df_temp_1_3['intervals'] = time
    #print(df_temp_1_3)

    df_bar = df_temp_1_3 

    df_bar['intervals']= pd.to_datetime(df_bar['intervals'])
    timeseries = df_bar['intervals'].astype(str).tolist()
    timeseries_1 = timeseries

    df_bar['intervals'] = df_bar['intervals'].dt.strftime('%H:%M')
    timeseries = df_bar['intervals'].astype(str).tolist()

    timeseries_1 = timeseries

    df=pd.DataFrame({'hour':timeseries_1,
                    'in bed':df_bar['bed'].tolist(),
                    'bedroom':df_bar['bedroom'].tolist(),
                    'kitchen':df_bar['kitchen'].tolist(),
                    'living_room':df_bar['living_room'].tolist(),
                    'Outdoor':df_bar['indoor/outdoor'].tolist()})
    df.set_index(['hour'],inplace=True)

    sns.set (rc = {'figure.figsize':(5, 5)})

    cmap = mpl.colormaps["rainbow"]

    newcolors = cmap(np.linspace(0, 1, 100))

    newcolors[0:5] = mpl.colors.to_rgb('#155FC8') + (1,)
    newcolors[5:15] = mpl.colors.to_rgb('#2A7EF7') + (1,)
    newcolors[97:] = mpl.colors.to_rgb('#C11F0C') + (1,)


    newcmap = mpl.colors.ListedColormap(newcolors)
    
    ax = sns.heatmap(df, cmap=newcmap)
 
    ax.set_ylim(0,96)
 

    plt.savefig('mcicodestest_' + n + '.png')
    plt.clf()
    print(df_temp_1_3)
    print(unique_values)