In [1]:
import os

# Work from the repository root: when the current directory does not end in
# 'mob2crime', step up one level. The relative 'logs/' and 'stats/' paths used
# by later cells depend on this.
at_repo_root = os.getcwd().endswith('mob2crime')
if not at_repo_root:
    os.chdir('..')
os.getcwd()

'/home/Jiahui/mob2crime'

In [5]:
import gzip, json, glob
import logging
import argparse

import pandas as pd
from collections import defaultdict
import json

from src.creds import mex_root, mex_tower_fn

from collections import deque


In [157]:
# Arguments come from a fixed string instead of sys.argv, so the notebook can
# be run without a real command line.
parser = argparse.ArgumentParser(
    description='options for aggregating mexico tower daily hourly unique user in call-out or call-in data')
parser.add_argument('--debugging', action='store_true')
parser.add_argument('--call-in-or-out', required=True, choices=['in', 'out', 'out+in'])

args_list = '--call-in-or-out out+in --debugging'.split()
args = parser.parse_args(args_list)
print(args)

Namespace(call_in_or_out='out+in', debugging=True)


In [8]:
# Log to a file; DEBUG-level records are written only for debugging runs.
level = logging.DEBUG if args.debugging else logging.INFO
# basicConfig(filename=...) raises FileNotFoundError when the directory is
# missing, so create it first (the stats output dir gets the same treatment).
os.makedirs('logs', exist_ok=True)
# NOTE: basicConfig does nothing if the root logger already has handlers, so
# re-running this cell in the same kernel will not change level/format.
logging.basicConfig(filename="logs/StatMexTwHrUniqCnt.log", level=level,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

In [10]:
# Output folder for the per-date CSVs, one per call-direction option; debugging
# runs write to a 'debug/' subfolder so they stay apart from full results.
debug_suffix = 'debug/' if args.debugging else ''
stat_dir = f'stats/MexTwHrUniqCnt-{args.call_in_or_out}/{debug_suffix}'
print('statistics in ', stat_dir)
os.makedirs(stat_dir, exist_ok=True)

stats/MexTwHrUniqCnt-out/debug/


In [11]:
# Mark the start of a run in the shared log file, together with its options.
logging.info('===============================')
logging.info('MEX tower hourly unique user counting starts. debugging=%s, call_in_or_out=%s',
             args.debugging, args.call_in_or_out)


In [13]:
# Tower metadata: '|'-separated, no header row. Columns used here: 0 = tower
# id, 2 and 3 = coordinates (lat, lon per the 'latlon' name below).
# Ids are read as strings so they can be '-'.joined and compared with the
# string keys of the JSON aggregates; numeric-looking ids would otherwise be
# parsed as numbers (losing e.g. leading zeros).
tower_info_path = mex_root + mex_tower_fn
towers = pd.read_csv(tower_info_path, header=None, sep='|', dtype={0: str})

# Coordinates formatted to 6 decimals; towers with the same string are grouped.
towers['latlon'] = (towers[2].map(lambda v: '%.6f' % v) + ','
                    + towers[3].map(lambda v: '%.6f' % v))
towers_gp = towers.groupby('latlon')[0].apply(list).to_frame()
# Group id ('gtid') = the ids of all towers at that location joined by '-'.
towers_gp['gtid'] = towers_gp[0].apply('-'.join)

# gt2loc: group id -> [lat, lon] as strings; t2gt: tower id -> group id.
gt2loc = {gtid: loc.split(',') for loc, gtid in towers_gp['gtid'].items()}
t2gt = {tid: gtid
        for tids, gtid in zip(towers_gp[0], towers_gp['gtid'])
        for tid in tids}


In [140]:
class DataQueue:
    """Small FIFO cache of per-date aggregation files.

    Files are read from ``f'{directory}{date}.json.gz'``; ``directory`` is
    concatenated as-is, so it should end with a path separator. At most
    ``maxlen`` decoded files are kept; loading a new file into a full cache
    evicts the file that was loaded earliest.
    """

    def __init__(self, directory, maxlen=6):
        self.directory = directory
        self.maxlen = maxlen
        # Load order of the cached dates. Deliberately unbounded: eviction is
        # done explicitly in load_agg so ``dates`` and ``aggs`` always drop
        # the same entry (a bounded deque would discard entries silently).
        self.dates = deque()
        self.aggs = {}  # date -> decoded JSON content of that date's file

    def get_agg(self, date):
        """Return the decoded file for ``date``, loading it on a cache miss.

        Returns None when ``{directory}{date}.json.gz`` does not exist.
        """
        if date not in self.aggs and not self.load_agg(date):
            return None
        return self.aggs[date]

    def load_agg(self, date):
        """Read ``{directory}{date}.json.gz`` into the cache.

        Returns True if the file was loaded, False if it does not exist.
        When the cache is full, the earliest-loaded file is evicted first.
        """
        # Re-sync the load order with the cache in case entries were removed
        # from ``aggs`` elsewhere; keep it a deque so popleft() keeps working.
        if set(self.dates) != set(self.aggs):
            self.dates = deque(d for d in self.dates if d in self.aggs)
        fn = f'{self.directory}{date}.json.gz'
        if not os.path.exists(fn):
            return False
        # Decode before touching the cache so a corrupt file leaves it intact;
        # the context manager closes the gzip handle.
        with gzip.open(fn) as fh:
            agg = json.load(fh)
        while self.dates and len(self.dates) >= self.maxlen:
            self.aggs.pop(self.dates.popleft(), None)
        self.dates.append(date)
        self.aggs[date] = agg
        return True

In [154]:
def update_stat_by_agg(stat, stat_no_tinfo, agg, tower_map=None):
    """Merge one date's aggregate into the running per-tower, per-hour user sets.

    Parameters
    ----------
    stat : defaultdict(lambda: defaultdict(set))
        Updated in place: ``stat[gtid][hour]`` collects the users of towers
        found in ``tower_map``, keyed by their group id.
    stat_no_tinfo : defaultdict(lambda: defaultdict(set))
        Updated in place: ``stat_no_tinfo[tid][hour]`` collects the users of
        towers that have no entry in ``tower_map``.
    agg : dict
        ``{tower_id: {hour: iterable of users}}``. Hour keys are converted
        with ``int()`` (JSON object keys arrive as strings).
    tower_map : dict, optional
        Tower id -> group id. Defaults to the notebook-level ``t2gt``.
    """
    if tower_map is None:
        tower_map = t2gt
    for raw_tid, hr_uniq_users in agg.items():
        # NOTE(review): str.replace drops '33F430' anywhere in the id, not only
        # as a leading prefix; presumably it only occurs as a prefix — confirm.
        tid = raw_tid.replace('33F430', '')
        if tid in tower_map:
            target, key = stat, tower_map[tid]
        else:
            target, key = stat_no_tinfo, tid
        # Indexing target[key] only inside the loop keeps towers with no hours
        # from creating empty entries in the defaultdicts.
        for hr, uniq_users in hr_uniq_users.items():
            target[key][int(hr)].update(uniq_users)


In [159]:
# 'out+in' -> ['out', 'in']: every call direction whose aggregates get merged.
call_directions = [direction for direction in args.call_in_or_out.split('+')]
call_directions

['out', 'in']

In [160]:
# Per call direction: load the index mapping each date to the aggregation
# files that contain it, and create a file cache over that direction's folder.
agg_dir = {'in': 'stats/AggMexTwDyHrUnqUsrVOZENTRANTE/', 'out': 'stats/AggMexTwDyHrUnqUsrVOZ/'}
dates_in_file = {}
data_queues = {}
dates = set()  # union of dates over all requested directions
for call_d in call_directions:
    adir = agg_dir[call_d]
    # `with` closes the index file instead of leaking the handle.
    with open(f'{adir}dates_in_file.json') as index_file:
        dates_in_file[call_d] = json.load(index_file)
    dates.update(dates_in_file[call_d].keys())
    data_queues[call_d] = DataQueue(directory=adir, maxlen=6)

# From here on `dates` is a chronologically sorted list (ISO date strings).
dates = sorted(dates)

In [176]:
def _user_counts(tower_hour_users):
    """Convert {tower: {hour: set(users)}} into a tower x hour frame of user counts.

    Hours a tower never reported come out of the DataFrame constructor as NaN
    and are counted as 0.
    """
    return pd.DataFrame(tower_hour_users).T.applymap(
        lambda users: len(users) if not pd.isnull(users) else 0)


# For each date, count distinct users per tower (group) and hour, merging all
# requested call directions, and write one pair of CSVs per date.
for date in dates:
    print(f'working on date: {date}')
    logging.info(f'working on date: {date}')

    # stat[gtid][hour] -> users at towers with location info;
    # stat_no_tinfo[tid][hour] -> users at towers missing from the tower file.
    stat = defaultdict(lambda: defaultdict(set))
    stat_no_tinfo = defaultdict(lambda: defaultdict(set))

    for call_d in call_directions:
        if date not in dates_in_file[call_d]:
            print(f'{date} not in call_direction: {call_d}')
            logging.info(f'{date} not in call_direction: {call_d}')
            continue
        # Aggregation files (named by their own date) holding entries for `date`.
        agg_dates = dates_in_file[call_d][date]
        if args.debugging:
            print(call_d, agg_dates)
        for adate in agg_dates:
            agg_file = data_queues[call_d].get_agg(adate)
            # get_agg returns None for a missing file; fail with a clear message
            # rather than an opaque "'NoneType' object is not subscriptable".
            if agg_file is None:
                raise FileNotFoundError(
                    f'{call_d}: aggregation file {adate}.json.gz listed for {date} '
                    f'not found in {data_queues[call_d].directory}')
            agg = agg_file[date]
            update_stat_by_agg(stat, stat_no_tinfo, agg)

    # store stat
    df_stat = _user_counts(stat)
    df_stat.to_csv(f"{stat_dir}{date}-located.csv")
    df_stat_no_tinfo = _user_counts(stat_no_tinfo)
    df_stat_no_tinfo.to_csv(f"{stat_dir}{date}-no-info.csv")
    logging.info('%d towers located, %d towers no info', len(df_stat), len(df_stat_no_tinfo))
    # Debugging runs stop after the first date.
    if args.debugging:
        break

working on date: 2009-10-01
out ['2009-10-01']
2009-10-01 not in call_direction: in


In [None]:
# Close this run's section of the log with a message and a separator line.
for closing_line in ('finish counting', '*' * 20):
    logging.info(closing_line)