In [1]:
import logging
import time
import uuid
from datetime import datetime
from glob import glob
from itertools import islice
from multiprocessing import Pool
from os import cpu_count
from os import makedirs
from os.path import basename
from os.path import splitext

import pandas as pd


# Lower the root logger threshold to INFO so the start/completion messages
# logged by create_df() are actually emitted.
logging.getLogger().setLevel(logging.INFO)


def current_time(p: bool = True):
    """Return the current local time as a formatted string.

    Args:
        p: When true, use the human-readable layout
            ``YYYY-MM-DD HH:MM:SS``; otherwise use the compact,
            digits-only layout ``YYYYMMDDHHMMSS``.

    Returns:
        The formatted timestamp string.
    """
    pattern = "%Y-%m-%d %H:%M:%S" if p else "%Y%m%d%H%M%S"
    return datetime.now().strftime(pattern)


def splitDict(d, num_of_cpus):
    """Split a dict into ``num_of_cpus`` chunks of near-equal size.

    Every entry of ``d`` ends up in exactly one chunk and insertion order
    is preserved across the concatenated chunks. When ``len(d)`` is not a
    multiple of ``num_of_cpus`` the leading chunks receive one extra entry
    each, so no entry is dropped. When there are fewer entries than
    chunks, the trailing chunks are empty dicts.

    Args:
        d: The mapping to partition.
        num_of_cpus: Desired number of chunks. Values below 1 are treated
            as 1 so the call never divides by zero.

    Returns:
        A list of ``max(1, num_of_cpus)`` dicts.
    """
    num_of_cpus = max(1, num_of_cpus)
    base, extra = divmod(len(d), num_of_cpus)
    remaining = iter(d.items())

    # The first `extra` chunks absorb the remainder, one entry apiece.
    return [
        dict(islice(remaining, base + 1 if idx < extra else base))
        for idx in range(num_of_cpus)
    ]


def proc_df(users: dict):
    """Flatten a nested message mapping into a DataFrame.

    ``users`` is expected to have the shape
    ``{sender: {timestamp: {recipient: location}}}`` where ``timestamp``
    is a string formatted as ``%Y-%m-%d %H:%M:%S``. For every
    (sender, timestamp) pair the recipients are grouped by location, and
    one row is produced per distinct location.

    Args:
        users: Nested mapping described above.

    Returns:
        A DataFrame with the columns ``timestamp``, ``unix-timestamp``,
        ``hour``, ``from``, ``to`` (list of recipients for that row),
        ``to_num`` (length of ``to``), ``to_ext`` (1 if ``'external'`` is
        among that row's recipients, else 0) and ``location``. The frame
        is empty (columns only) when ``users`` is empty.

    Raises:
        ValueError: If a timestamp key does not match the expected format.
    """
    columns = ['timestamp', 'unix-timestamp', 'hour', 'from', 'to', 'to_num', 'to_ext', 'location']

    # Rows are collected in a plain list and turned into a DataFrame once;
    # enlarging a DataFrame one row at a time copies it on every insert.
    rows = []

    for uid, by_timestamp in users.items():
        for ts, recipients in by_timestamp.items():
            t = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
            # NOTE: mktime() interprets the naive timestamp in the local
            # time zone of the machine running this code.
            unix_time = int(time.mktime(t.timetuple()))

            # Invert {recipient: location} into {location: [recipients]}.
            by_location = {}
            for recipient, loc in recipients.items():
                by_location.setdefault(loc, []).append(recipient)

            for loc, to in by_location.items():
                # The flag is evaluated per row so that an 'external'
                # recipient at one location does not mark the rows of
                # other locations sharing the same timestamp.
                to_ext = 1 if 'external' in to else 0
                rows.append((ts, unix_time, t.hour, uid, to, len(to), to_ext, loc))

    return pd.DataFrame(rows, columns=columns)


def create_df(file: str):
    """Build the per-message DataFrame for one CSV file and export it.

    The CSV is read with its first column as the index (used as the
    message timestamp); the next three columns are treated, by position,
    as sender, recipient and location. Rows are grouped into a nested
    ``{sender: {timestamp: {recipient: location}}}`` mapping, which is
    split into chunks and converted to DataFrames by ``proc_df`` in a
    process pool. The combined result is sorted by ``unix-timestamp`` and
    written to ``./outputs`` both as CSV (same file name as the input)
    and as JSON (same stem, ``.json`` extension).

    Args:
        file: Path of the input CSV file.

    Returns:
        The combined, sorted DataFrame with a fresh 0..n-1 index.
    """
    logging.info(f"[{file}] started at: {current_time()}")

    columns = ['timestamp', 'unix-timestamp', 'hour', 'from', 'to', 'to_num', 'to_ext', 'location']
    fri = pd.read_csv(file, index_col=0)

    users = dict()
    for ts, row in fri.iterrows():
        # Use .iloc for positional access: plain row[0] on a label-indexed
        # Series relies on a deprecated positional fallback.
        msg_from = str(row.iloc[0])
        msg_to = row.iloc[1]
        loc = row.iloc[2]
        users.setdefault(msg_from, dict()).setdefault(ts, dict())[msg_to] = loc

    # Leave one core free, but never go below one worker; cpu_count() may
    # return None when the count cannot be determined.
    num_of_cpus = max(1, (cpu_count() or 1) - 1)
    users_list = splitDict(users, num_of_cpus)

    frames = []
    with Pool(processes=num_of_cpus) as pool:
        workers = [pool.apply_async(proc_df, (user_item, )) for user_item in users_list]

        for w in workers:
            try:
                frames.append(w.get())
            except Exception:
                # Best effort: keep the chunks that succeeded, but record
                # the full traceback of the one that failed.
                logging.exception("[%s] worker failed; its rows are missing from the result", file)

        pool.close()
        pool.join()

    # Concatenate once, skipping empty chunk results.
    frames = [frame for frame in frames if not frame.empty]
    if frames:
        messages = pd.concat(frames)
    else:
        messages = pd.DataFrame(columns=columns)

    messages = messages.sort_values(by=['unix-timestamp'])
    messages = messages.reset_index(drop=True)

    # Export the result as CSV and JSON. Only the extension is swapped for
    # the JSON name, so a "csv" substring elsewhere in the file name is
    # left intact and the two outputs never share a path.
    makedirs("./outputs", exist_ok=True)
    input_name = basename(file)
    output_name = splitext(input_name)[0] + ".json"
    messages.to_csv(f"./outputs/{input_name}")
    messages.to_json(f"./outputs/{output_name}")

    logging.info(f"[{file}] completed at: {current_time()}")

    return messages

In [None]:
# Build one message DataFrame per input file (the file names suggest the
# Friday, Saturday and Sunday communication logs). Each call also writes
# CSV and JSON copies of the result under ./outputs.
fri = create_df("./dataset/mc2_2015_data/comm-data-Fri.csv")
sat = create_df("./dataset/mc2_2015_data/comm-data-Sat.csv")
sun = create_df("./dataset/mc2_2015_data/comm-data-Sun.csv")

INFO:root:[./dataset/mc2_2015_data/comm-data-Fri.csv] started at: 2022-12-11 03:31:28
INFO:root:[./dataset/mc2_2015_data/comm-data-Fri.csv] completed at: 2022-12-11 03:32:27
INFO:root:[./dataset/mc2_2015_data/comm-data-Sat.csv] started at: 2022-12-11 03:32:27
INFO:root:[./dataset/mc2_2015_data/comm-data-Sat.csv] completed at: 2022-12-11 03:34:14
INFO:root:[./dataset/mc2_2015_data/comm-data-Sun.csv] started at: 2022-12-11 03:34:14


In [None]:
# Preview the first 20 rows of the DataFrame built from comm-data-Fri.csv.
fri.head(20)