In [1]:
import pandas as pd
import pymongo
from datetime import datetime, timedelta
from pymongo import MongoClient

In [2]:
TIME_STAMP_FORMAT = "%Y-%m-%d %H:%M:%S.%f"

In [3]:
def mongo_connect():
    """Open a MongoDB client and return handles to the voting-data collections.

    Returns:
        tuple: ``(conn, db, conversations, raw, features)`` where ``conn`` is
        the ``MongoClient``, ``db`` is the ``SA_Voting_Data`` database and the
        remaining items are its ``conversations_collection``,
        ``raw_collection`` and ``features_collection``.

    Raises:
        pymongo.errors.ConnectionFailure: re-raised after being reported, so
        callers never continue with an unbound client.

    The caller owns ``conn`` and is responsible for calling ``conn.close()``.
    """
    try:
        conn = MongoClient('localhost', 27018)
    except pymongo.errors.ConnectionFailure as e:
        # The previous `except(ConnectionFailure, e):` form evaluated a tuple
        # containing the undefined name `e` and raised NameError instead of
        # reporting the failure.
        print("Could not connect to MongoDB: {}".format(e))
        raise
    # NOTE(review): recent PyMongo versions connect lazily, so a server that is
    # down may only surface on the first real operation - confirm for the
    # installed version.

    name = 'SA_Voting_Data'
    db = conn[name]
    conversations = db.conversations_collection
    raw = db.raw_collection
    features = db.features_collection

    return conn, db, conversations, raw, features

In [4]:
def get_addresses():
    """Return the distinct ``poi`` values of the conversations collection as a list.

    A client is opened through ``mongo_connect`` and closed again before the
    function returns, whether or not the query succeeds.
    """
    client, _db, conversation_coll, _raw, _features = mongo_connect()
    try:
        distinct_pois = conversation_coll.distinct('poi')
    finally:
        client.close()

    return list(distinct_pois)

def get_conversation(poi):
    """Load every conversation record for one ``poi`` into a DataFrame.

    Args:
        poi: value matched against the ``poi`` field of the conversations
            collection.

    Returns:
        pandas.DataFrame indexed by ``timestamp`` (first occurrence kept when a
        timestamp repeats) with the columns listed in ``header``.

    Raises:
        KeyError: if the fetched records carry no ``timestamp`` field (for
            example when nothing matches ``poi``).
    """
    conn, db, conversations, raw, features = mongo_connect()
    try:
        # find() only returns a lazy cursor; drain it while the client is
        # still open instead of after conn.close().
        records = list(conversations.find({'poi': poi}))
    finally:
        conn.close()
    df = pd.DataFrame(records)
    header = ['poi','content','from_addr','to_addr','transport_type','transport_name','session_event','_id']
    df = df.set_index('timestamp')

    # we drop duplicates of timestamp which is the index
    df = df[~df.index.duplicated(keep='first')]
    return df[header]

In [5]:
addresses = get_addresses()

In [6]:
# Load one sample conversation (the address at index 10) to try the feature functions on.
example_df = get_conversation(addresses[10])

# feature: time to respond to each push and reminder message #4
# feature: average time to answer a question #3

In [7]:
def alpha(conversation_df):
    """Responses per closed session, plus the number of closed sessions.

    Rows whose ``session_event`` is ``'resume'`` are counted as responses and
    rows whose ``session_event`` is ``'close'`` are counted as sessions.

    Returns:
        ``(responses / sessions, sessions)`` as floats, or ``(0, 0)`` when no
        ``'close'`` row exists.
    """
    events = conversation_df['session_event']
    response_count = int((events == 'resume').sum())
    session_count = int((events == 'close').sum())

    # Guard first: nothing to average over without a closed session.
    if session_count == 0:
        return 0, 0

    sessions_as_float = float(session_count)
    return float(response_count) / sessions_as_float, sessions_as_float

In [8]:
average_response, total_number_of_sessions = alpha(example_df)
(average_response, total_number_of_sessions)

(1.3333333333333333, 6.0)

In [9]:
def get_total_time_in_system(df):
    """Whole seconds between the first ``'new'`` event and the last row of ``df``.

    Args:
        df: conversation frame whose index holds timestamp strings in
            ``TIME_STAMP_FORMAT`` and which has a ``session_event`` column.

    Returns:
        int: elapsed whole seconds, including full days, or ``0`` when there is
        no ``'new'`` row or a timestamp cannot be parsed.
    """
    try:
        # start is the first instance we observe a new_connection
        start = datetime.strptime(df[df['session_event'] == 'new'].index[0], TIME_STAMP_FORMAT)
        # finish is the very last report of the system
        finish = datetime.strptime(df.index[-1], TIME_STAMP_FORMAT)
    except (IndexError, KeyError, TypeError, ValueError):
        # Best effort: a missing 'new' event or an unparsable timestamp means
        # the span cannot be measured.
        return 0

    delta = finish - start
    # timedelta.seconds alone drops the `days` component, so spans longer than
    # 24 hours used to wrap around; add the days back in.
    return delta.days * 86400 + delta.seconds

def beta(conversation_df):
    """Return total time interacting and total time in system, in seconds.

    Interaction time is accumulated per session: a session starts at a row
    whose ``session_event`` is ``'new'`` and ends at the next ``'close'`` row.
    A ``'close'`` without a preceding open ``'new'`` is ignored, and a second
    ``'new'`` before a ``'close'`` restarts the session clock.

    Args:
        conversation_df: frame indexed by timestamp strings in
            ``TIME_STAMP_FORMAT`` with a ``session_event`` column.

    Returns:
        ``(total_time_interacting, total_time_in_system)`` where the first is a
        float number of whole seconds and the second comes from
        ``get_total_time_in_system``.
    """
    total_time_in_system = get_total_time_in_system(conversation_df)
    total_time_interacting = 0.0
    session_start = None  # None means no session is currently open

    for timestamp, event in conversation_df['session_event'].items():
        if event == 'new':
            session_start = datetime.strptime(timestamp, TIME_STAMP_FORMAT)
        elif event == 'close' and session_start is not None:
            delta = datetime.strptime(timestamp, TIME_STAMP_FORMAT) - session_start
            # Include whole days; timedelta.seconds alone wraps at 24 hours.
            total_time_interacting += delta.days * 86400 + delta.seconds
            session_start = None

    return total_time_interacting, total_time_in_system

In [10]:
(total_time_interacting, total_time_in_system) = beta(example_df)
(total_time_interacting, total_time_in_system)

(518.0, 85076)

# surfing : need to make sure it ignores the endline #2

In [11]:
def channels(conversation_df):
    """Flag which of three short codes were dialled, and whether the user surfed.

    A channel flag is True when at least one row has ``to_addr`` equal to that
    channel's short code and a null ``content``.

    Returns:
        ``(channel_1, channel_2, channel_3, is_surfing)`` - four booleans;
        ``is_surfing`` is True when two or more channel flags are True.
    """
    def _dialled_with_null_content(short_code):
        # Rows addressed to this short code, narrowed to those with no content.
        on_channel = conversation_df[conversation_df['to_addr'] == short_code]
        return not on_channel[on_channel['content'].isnull()].empty

    channel_1 = _dialled_with_null_content('*120*7692*2#')
    channel_2 = _dialled_with_null_content('*120*7692*3#')
    channel_3 = _dialled_with_null_content('*120*4729#')

    # True counts as 1, so the sum is the number of channels used.
    is_surfing = sum((channel_1, channel_2, channel_3)) >= 2

    return channel_1, channel_2, channel_3, is_surfing

In [12]:
(channel_1, channel_2, channel_3, is_surfing) = channels(example_df)
(channel_1, channel_2, channel_3, is_surfing)

(False, False, True, False)

Get all channels the person dialed into (i.e. who did they talk to?)

In [13]:
set(example_df['to_addr'].values)

{None, '*120*4729#', '+27766763040', '*120*7692#', '*120*4729*1#', 'None'}

Get all transport types

In [14]:
set(example_df['transport_name'].values)

{'truteq_7692_transport',
 'ambient_go_smpp_transport',
 None,
 'truteq_4729_transport'}

In [15]:
set(example_df['transport_type'].values)

{'ussd', None, 'sms'}

#### CHANNELS

\*120\*7692\*  residual short code

\*120\*4729\*1# E-day Monitoring

\*120\*4729\*3# Endline

\*120\*7692\*1# VIP live magazine

\*120\*7692\*2# Main Channel Control

\*120\*7692\*3# Main Channel Lottery

\*120\*4729# Main Channel Subsidiary

\*120\*4279# OR have your voice heard on vip

# RESPONSE TIME

In [16]:
def get_average_response_time(conversation_df):
    """Average delay, in seconds, between a question row and the row after it.

    A row counts as a question when its ``from_addr`` differs from its ``poi``.
    Within a run of consecutive question rows only the first one is used, and a
    question sitting in the very last row is skipped because nothing follows it.

    NOTE(review): the row right after the chosen question is treated as the
    answer even when it is itself a question - confirm this is intended.

    Args:
        conversation_df: frame indexed by timestamp strings in
            ``TIME_STAMP_FORMAT`` with ``from_addr``, ``poi`` and ``content``
            columns.

    Returns:
        ``(mean_seconds, df)`` where ``df`` has the columns ``times``,
        ``question`` and ``answer`` in conversation order, or ``(0, empty_df)``
        when no question/answer pair exists.
    """
    # Work on positions rather than index labels so repeated labels cannot
    # break the "+1" arithmetic.
    is_question = (conversation_df['from_addr'] != conversation_df['poi']).tolist()
    stamps = conversation_df.index
    contents = conversation_df['content']
    last_pos = len(conversation_df) - 1

    times = []
    for pos, asked in enumerate(is_question):
        if not asked:
            continue
        if pos > 0 and is_question[pos - 1]:
            # Directly follows another question: not the start of a run.
            continue
        if pos == last_pos:
            # No following row to pair with.
            continue
        asked_at = datetime.strptime(stamps[pos], TIME_STAMP_FORMAT)
        answered_at = datetime.strptime(stamps[pos + 1], TIME_STAMP_FORMAT)
        times.append([(answered_at - asked_at).total_seconds(),
                      contents.iloc[pos], contents.iloc[pos + 1]])

    df = pd.DataFrame(times)
    if len(times) == 0:
        return 0, df

    df.columns = ['times', 'question', 'answer']
    return df.times.mean(), df

In [17]:
t,x = get_average_response_time(example_df)

In [18]:
t

105650.55264869564

# feature: set of boolean variables for what you did when you were nudged #5

In [19]:
def nudged_answer(conversation_df):
    """Report which menu options were chosen right after a welcome message.

    Rows whose ``content`` starts with one of the three welcome greetings are
    located, and the ``content`` of the row immediately following each of them
    is taken as the answer. Greetings in all three languages contribute, and a
    greeting in the very last row is skipped because it has no following row.

    Args:
        conversation_df: frame with a string-valued ``content`` column.

    Returns:
        ``(event_report, answer_win, vip, whats_up)`` - booleans telling
        whether the answers include ``'3'``, ``'1'``, ``'2'`` and ``'5'``
        respectively.
    """
    welcome_patterns = (
        r'Welcome to VIP!',
        r'Welkom by VIP!',
        r'Siyakwamukelaohlelweni i-VIP!',
    )
    content = conversation_df['content']
    row_count = len(conversation_df)

    answer_positions = set()
    for pattern in welcome_patterns:
        # `== True` turns the NaN that str.match yields for non-string rows
        # into False.
        hits = (content.str.match(pattern) == True).tolist()
        for pos, hit in enumerate(hits):
            # Bounds check: one trailing greeting must not invalidate the
            # other answers.
            if hit and pos + 1 < row_count:
                answer_positions.add(pos + 1)

    answers = content.iloc[sorted(answer_positions)].tolist()

    event_report = '3' in answers
    answer_win = '1' in answers
    vip = '2' in answers
    whats_up = '5' in answers
    return event_report, answer_win, vip, whats_up

In [20]:
(event_report, answer_win, vip, whats_up) = nudged_answer(example_df)
(event_report, answer_win, vip, whats_up)

(False, True, False, False)

In [21]:
def make_features(address):
    """Compute every feature for one address and store them as one document.

    The conversation for ``address`` is loaded, passed through ``alpha``,
    ``beta``, ``channels``, ``get_average_response_time`` and
    ``nudged_answer``, and the combined result is inserted into the features
    collection.

    Returns:
        The ``inserted_id`` of the newly written document.
    """
    frame = get_conversation(address)

    responses_per_session, session_count = alpha(frame)
    seconds_interacting, seconds_in_system = beta(frame)
    used_ch1, used_ch2, used_ch3, surfing = channels(frame)
    mean_response_time, response_frame = get_average_response_time(frame)
    chose_event_report, chose_answer_win, chose_vip, chose_whats_up = nudged_answer(frame)

    # Key names and their order define the stored document; keep them as-is.
    payload = {
        'poi': address,
        'average_response_count': responses_per_session,
        'total_number_of_sessions': session_count,
        'total_time_interacting': seconds_interacting,
        'total_time_in_system': seconds_in_system,
        'channel_1': used_ch1,
        'channel_2': used_ch2,
        'channel_3': used_ch3,
        'is_surfing': surfing,
        'event_report': chose_event_report,
        'answer_win': chose_answer_win,
        'vip': chose_vip,
        'whats_up': chose_whats_up,
        'average_response_time': mean_response_time,
        'reponse_data': response_frame.to_dict(orient='records'),
    }

    client, _db, _conversations, _raw, feature_coll = mongo_connect()
    try:
        return feature_coll.insert_one(payload).inserted_id
    finally:
        # Runs after the id has been read, whether or not the insert succeeded.
        client.close()

In [22]:
addresses[10]

'+27766763040'

In [23]:
make_features(addresses[10])

ObjectId('5990d9dfe0938274fc11ed76')

In [24]:
# Delete the feature documents stored for the sample address so the timing
# runs that follow do not leave an extra copy behind for it.
conn, db, conversations, raw, features = mongo_connect()
    
r = features.delete_many({'poi': '+27766763040'})

In [25]:
conn.close()

In [33]:
%%time
from concurrent.futures import ProcessPoolExecutor

addresses = get_addresses()

# Benchmark: build features for the first 100 addresses with a process pool.
# The context manager shuts the pool down once the results are collected, so
# no worker processes are left behind.
with ProcessPoolExecutor() as e:
    results = list(e.map(make_features, addresses[0:100]))

CPU times: user 636 ms, sys: 116 ms, total: 752 ms
Wall time: 2min 26s


In [34]:
%%time
from concurrent.futures import ThreadPoolExecutor

# Benchmark: first 100 addresses on a pool of 36 threads. The context manager
# shuts the pool down afterwards so its worker threads do not linger.
with ThreadPoolExecutor(36) as e:
    futures = [e.submit(make_features, address) for address in addresses[0:100]]
    r = [f.result() for f in futures]

CPU times: user 9.59 s, sys: 1.41 s, total: 11 s
Wall time: 1min 34s


In [36]:
%%time
from concurrent.futures import ThreadPoolExecutor

# Benchmark: first 100 addresses on a pool of 16 threads. The context manager
# shuts the pool down afterwards so its worker threads do not linger.
with ThreadPoolExecutor(16) as e:
    futures = [e.submit(make_features, address) for address in addresses[0:100]]
    r = [f.result() for f in futures]

CPU times: user 8.52 s, sys: 992 ms, total: 9.51 s
Wall time: 1min 45s


In [37]:
%%time
from concurrent.futures import ThreadPoolExecutor

# Benchmark: first 100 addresses on a pool of 8 threads. The context manager
# shuts the pool down afterwards so its worker threads do not linger.
with ThreadPoolExecutor(8) as e:
    futures = [e.submit(make_features, address) for address in addresses[0:100]]
    r = [f.result() for f in futures]

CPU times: user 7.33 s, sys: 748 ms, total: 8.08 s
Wall time: 2min 12s


In [38]:
%%time
from concurrent.futures import ThreadPoolExecutor

# Benchmark: first 100 addresses on a pool of 64 threads. The context manager
# shuts the pool down afterwards so its worker threads do not linger.
with ThreadPoolExecutor(64) as e:
    futures = [e.submit(make_features, address) for address in addresses[0:100]]
    r = [f.result() for f in futures]

CPU times: user 7.63 s, sys: 1.04 s, total: 8.68 s
Wall time: 1min 33s


In [35]:
len(r)

100

### Final

In [39]:
len(addresses)

262013

In [40]:
# Full run: submit make_features for every address to a thread pool that uses
# the library's default worker count, then wait for the results in submission
# order. The recorded output of this attempt is an AutoReconnect error.
from concurrent.futures import ThreadPoolExecutor
e = ThreadPoolExecutor()

futures = []
for address in addresses:
    futures.append(e.submit(make_features, address))

# f.result() re-raises the first failure of a task, which aborts the list build.
r = [f.result() for f in futures]

AutoReconnect: connection closed

In [None]:
# Full run again, this time with the pool capped at 9 worker threads.
# NOTE(review): presumably meant to limit concurrent MongoDB connections, since
# each make_features call opens its own clients - confirm.
from concurrent.futures import ThreadPoolExecutor
e = ThreadPoolExecutor(9)

futures = []
for address in addresses:
    futures.append(e.submit(make_features, address))

# f.result() re-raises the first failure of a task, which aborts the list build.
r = [f.result() for f in futures]

In [None]:
len(results)