### TODO: The whole process could quite likely be written as a single HQL query on Hive

In [None]:
import pandas as pd

# The tcparticipants.tsv is just a dump of the corresponding Hive table.
# header=2 combined with explicit `names`: pandas skips everything before the
# third (non-blank) line, treats that line as the header, and replaces it with
# `names` -- presumably to drop the dump's preamble; confirm against the file.
df = pd.read_csv("data/tcparticipants.tsv", sep="\t", header=2, skip_blank_lines=True,
                names=["message_id", "participant_id", "participant_address", "participant_username", "int_ext", "part_type"])
# Normalize part_type to stripped strings (missing values become "nan", as
# str(x) did before). The vectorized .str accessor replaces the builtin map(),
# which on Python 3 returns a lazy iterator rather than a list, so assigning
# it to a column does not work there.
df["part_type"] = df["part_type"].astype(str).str.strip()

In [None]:
# Peek at the first five parsed participant rows.
df.head(n=5)

In [None]:
# Restrict to internal senders: rows whose participant role is "S" (sender)
# and whose int_ext flag is "I" (internal).
is_internal_sender = (df["part_type"] == "S") & (df["int_ext"] == "I")
sender = df.loc[is_internal_sender].copy()
sender.head(5)

In [None]:
# Restrict to internal recipients: any recipient role (To, Cc, Bcc or unknown)
# whose int_ext flag is "I" (internal).
recipient_roles = ["R-CC", "R-TO", "R-BCC", "R-unknown"]
is_internal_recipient = df["part_type"].isin(recipient_roles) & (df["int_ext"] == "I")
recipient = df.loc[is_internal_recipient].copy()
recipient.head(5)

In [None]:
# Number of (internal) recipient rows per message, as a one-column frame
# "cnt" indexed by message_id.
recipients_per_message = recipient.groupby('message_id').size()
num_participants = recipients_per_message.to_frame("cnt")
num_participants.head(5)

In [None]:
# Replace sender/recipient addresses with anonymized, zero-padded sequential ids.
# `hashing` maps each original address to its id (no hash function is involved;
# the name is kept because it is used again for reverse lookups).
# Addresses are sorted before numbering so the id assignment is reproducible:
# iterating a set directly follows hash order, which for strings changes
# between interpreter runs when hash randomization is enabled (Python 3's
# default). key=str keeps the sort well-defined if a non-string (e.g. NaN)
# slips into the column.
nodes = set(sender["participant_address"]) | set(recipient["participant_address"])
hashing = {address: str(i).zfill(6) for i, address in enumerate(sorted(nodes, key=str))}

# Series.map with a callable works on Python 2 and 3 alike (the builtin map()
# is a lazy iterator on Python 3); an unmapped address still raises KeyError.
sender["participant_address"] = sender["participant_address"].map(lambda address: hashing[address])
recipient["participant_address"] = recipient["participant_address"].map(lambda address: hashing[address])

In [None]:
# Drop the columns no longer needed; only message_id and participant_address remain.
unused_columns = ["participant_id", "participant_username", "int_ext", "part_type"]
sender = sender.drop(unused_columns, axis=1)
recipient = recipient.drop(unused_columns, axis=1)

In [None]:
# Index sender by message_id so the merges below can join on the index.
# sort_index() replaces DataFrame.sortlevel(), which pandas deprecated and
# later removed: the first call sorts the rows by message_id, the second sorts
# the columns by name, matching sortlevel() / sortlevel(axis=1).
sender.set_index(["message_id"], inplace=True)
sender = sender.sort_index().sort_index(axis=1)

# recipient is deliberately left without an index: the merge below joins it
# on its "message_id" column via right_on.

In [None]:
# We now join three tables: sender, recipient, num_participants,
# in order to calculate the "normalized" number of messages from node A to B.
# A message sent to n recipients counts as 1/n of a message.
# NOTE(review): n is num_participants["cnt"], i.e. the number of *internal*
# recipient rows of the message (recipient was filtered to int_ext == "I"),
# and a recipient listed under two roles is counted twice. Confirm intended.

# Join sender with number_of_participants (inner join on message_id).
sender_count = pd.merge(sender, num_participants, left_index=True, right_index=True)

# Join with recipients. Both sides carry "participant_address", so pandas
# suffixes them: participant_address_x = sender, participant_address_y = recipient.
sender_recipient = pd.merge(sender_count, recipient, left_index=True, right_on=["message_id"])

# Group by (sender, recipient, number_of_recipients) and count rows:
# the entry (A, B, K) -> N means A sent B a total of N messages that each
# had exactly K recipients.
grouped_by = sender_recipient.groupby(['participant_address_x', 'participant_address_y', 'cnt']).size()

# Normalize the messages received by the number of recipients:
# if B received from A m1 messages with n1 recipients, m2 with n2, ...
# then total = m1/n1 + m2/n2 + m3/n3 + ...
# Series.items() is used because Series.iteritems() was removed in pandas 2.0.
counts_normalized = pd.DataFrame(
    [(snd, rcp, 1.0 * n_msgs / n_rcpt) for (snd, rcp, n_rcpt), n_msgs in grouped_by.items()],
    columns=["sender", "recipient", "total"])
normalized_message_count = pd.DataFrame(counts_normalized.groupby(["sender", "recipient"]).sum())


In [None]:
# Flatten normalized_message_count (index (sender, recipient) -> column "total")
# into a list of (sender, recipient, total) tuples.
# The column is selected by label: the former value[0] on each row Series
# relied on positional fallback, which recent pandas deprecates.
msg_counts = [(snd, rcp, total) for (snd, rcp), total in normalized_message_count["total"].items()]

In [None]:
# Tabulate the per-pair totals with the heaviest sender->recipient pairs first.
final_df = (
    pd.DataFrame(msg_counts, columns=["sender", "recipient", "total"])
    .sort_values(by=['total'], ascending=False)
)
final_df.head(5)

In [None]:
# Persist the table without the index. Note: despite the .csv extension,
# the output is tab-separated.
final_df.to_csv("data/msg_counts.csv", sep='\t', index=False)

In [None]:
# Reverse lookup: print the original address behind a few anonymized ids.
# The ids are hard-coded (presumably picked from an earlier final_df); they
# only identify the same people for the run whose `hashing` produced them,
# so refresh them after re-running the notebook.
# Written to run unchanged on Python 2 and 3 (no print statement, no iteritems).
ids_of_interest = {'033618', '021950', '037821', '019117'}
for address, anon_id in hashing.items():
    if anon_id in ids_of_interest:
        print("%s %s" % (address, anon_id))
    