In [1]:
import pandas as pd
import os
from tqdm import tqdm
import concurrent.futures

dff = pd.read_csv('./twcs.csv')

In [2]:
dff['company'] = dff['author_id']
dff['created_at'] = pd.to_datetime(dff['created_at'])
dff.set_index('tweet_id', inplace=True)
dff['in_response_to_tweet_id'] = dff['in_response_to_tweet_id'].astype('Int64')
dff = dff.sort_index(ascending=True)

In [3]:
dff = dff.sort_index(ascending=True)

In [3]:
dff.shape

(2811774, 7)

In [25]:
dff.head(5)

Unnamed: 0_level_0,author_id,inbound,created_at,text,response_tweet_id,in_response_to_tweet_id,company
tweet_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1,sprintcare,False,2017-10-31 22:10:47+00:00,@115712 I understand. I would like to assist y...,2.0,3,sprintcare
2,115712,True,2017-10-31 22:11:45+00:00,@sprintcare and how do you propose we do that,,1,115712
3,115712,True,2017-10-31 22:08:27+00:00,@sprintcare I have sent several private messag...,1.0,4,115712
4,sprintcare,False,2017-10-31 21:54:49+00:00,@115712 Please send us a Private Message so th...,3.0,5,sprintcare
5,115712,True,2017-10-31 21:49:35+00:00,@sprintcare I did.,4.0,6,115712


In [12]:
# Assign Conversation Numbers - Original non-parallelised code - Too slow 12 it/s
df = dff.head(2000)
def find_conv(resp_id, idx):
    flag = True
    while flag:
        new_idx = resp_id[resp_id.isin(idx)].index
        new_idx = set(new_idx).union(idx)
        flag = len(new_idx) - len(idx)
        idx = list(new_idx)
    return idx 

conv_idx = df[pd.isna(df['in_response_to_tweet_id'])].index
df.loc[conv_idx, 'conversation'] = conv_idx.values

conv_dict = dict()
for conv in tqdm(conv_idx):
    idx = find_conv(df['in_response_to_tweet_id'], [conv])
    conv_dict[conv] = idx

for conv, idx in conv_dict.items():
    df.loc[idx, 'conversation'] = conv
df['conversation'] = df['conversation'].astype('int')
df.tail(10)

100%|███████████████████████████████████████████████████████████████████████████████| 518/518 [00:00<00:00, 2458.23it/s]


IntCastingNaNError: Cannot convert non-finite values (NA or inf) to integer

In [41]:
# Assign conversation numbers - parallelised - Chunk of conversations - ok speed (chunk_size 10k~1.3k it/s, 50k~1.5k it/s, 500~ max speed 2.5k it/s)
#df = dff

def find_conv(resp_id, idx):
    flag = True
    while flag:
        new_idx = resp_id[resp_id.isin(idx)].index
        new_idx = set(new_idx).union(idx)
        flag = len(new_idx) - len(idx)
        idx = list(new_idx)
    return idx

# Function to process each conversation
def process_conversation(conv, df):
    idx = find_conv(df['in_response_to_tweet_id'], [conv])
    return conv, idx

def get_conversations(dff, chunk_size):
    # # Parallelize the loop using concurrent.futures
    # with concurrent.futures.ThreadPoolExecutor(max_workers=num_cores) as executor:
    #     results = list(tqdm(executor.map(process_conversation, conv_idx), total=len(conv_idx)))

    conv_idx = dff[pd.isna(dff['in_response_to_tweet_id'])].index
    # Get the number of available CPU cores
    num_cores = min(len(conv_idx), os.cpu_count())
    # Print the number of cores used
    print("Number of cores used:", num_cores)

    # Parallelize the loop using concurrent.futures
    results = []
    start_idx = 0
    for i in range(len(conv_idx)//chunk_size):
        end_idx = conv_idx[min(len(conv_idx), (i+1)*chunk_size)]
        chunk_idx = conv_idx[i*chunk_size:min(len(conv_idx), (i+1)*chunk_size)]
        if len(chunk_idx) == chunk_size:
            df = dff.loc[dff.index.isin(list(range(start_idx,end_idx)))]
        if len(chunk_idx) < chunk_size:
            df = dff.loc[start_idx:]
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_cores) as executor:
            temp = list(tqdm(executor.map(process_conversation, chunk_idx, [df]*len(chunk_idx)), total=len(chunk_idx)))
        results += temp
        start_idx = end_idx
    return results

In [42]:
# Update the 'conversation' values in the DataFrame
chunk_size = 10000
results = get_conversations(dff, chunk_size)
for conv, idx in results:
    dff.loc[idx, 'conversation'] = conv

dff['conversation'] = dff['conversation'].astype('Int64')

Number of cores used: 10


100%|███████████████████████████████████████████████████████████████████████████| 10000/10000 [00:07<00:00, 1268.99it/s]
100%|███████████████████████████████████████████████████████████████████████████| 10000/10000 [00:07<00:00, 1252.09it/s]
100%|███████████████████████████████████████████████████████████████████████████| 10000/10000 [00:07<00:00, 1278.39it/s]
100%|███████████████████████████████████████████████████████████████████████████| 10000/10000 [00:07<00:00, 1355.36it/s]
100%|███████████████████████████████████████████████████████████████████████████| 10000/10000 [00:07<00:00, 1288.95it/s]
100%|███████████████████████████████████████████████████████████████████████████| 10000/10000 [00:08<00:00, 1159.53it/s]
100%|███████████████████████████████████████████████████████████████████████████| 10000/10000 [00:07<00:00, 1264.97it/s]
100%|███████████████████████████████████████████████████████████████████████████| 10000/10000 [00:08<00:00, 1243.05it/s]
100%|███████████████████████████

In [47]:
dff.tail(20)

Unnamed: 0_level_0,author_id,inbound,created_at,text,response_tweet_id,in_response_to_tweet_id,company,conversation
tweet_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2987930,823862,True,2017-11-29 19:24:29+00:00,@AskPayPal Könnt ihr mal bitte auf meine DM An...,2987929.0,,823862,
2987931,UPSHelp,False,2017-11-30 07:56:27+00:00,@823863 Sorry to see this. If anything is dama...,,2987932.0,UPSHelp,
2987932,823863,True,2017-11-30 01:41:28+00:00,@115817 seriously ?? https://t.co/i7JhZaQuGg,2987931.0,,823863,
2987933,UPSHelp,False,2017-11-30 07:54:51+00:00,"@823864 If you need assistance, please use the...",,2987934.0,UPSHelp,
2987934,823864,True,2017-11-30 01:40:49+00:00,"Second day (night) in a row my package is ""on ...",29879352987933.0,,823864,
2987936,UPSHelp,False,2017-11-30 07:48:01+00:00,"@823865 If you need assistance, please use the...",,2987937.0,UPSHelp,
2987937,823865,True,2017-11-30 01:39:12+00:00,@115817 @UPSHelp Why does the tracking record ...,2987936.0,,823865,
2987938,AmazonHelp,False,2017-11-22 07:49:52+00:00,@823866 当サイトからそのようなメールをお送りすることはございません。当サイトの名をか...,,2987939.0,AmazonHelp,
2987939,823866,True,2017-11-22 07:41:45+00:00,いきなり来たんだけど\nなんですかこれ！！？\n\n@120465 https://t.co...,2987938.0,,823866,
2987940,783956,True,2017-11-22 07:15:45+00:00,@Safaricom_Care It's almost clocking 24hrs sin...,,2811285.0,783956,


In [45]:
missed = dff[dff['conversation'].isna()]
missed_resp_id = missed['in_response_to_tweet_id'].values
idx = list(set(missed_resp_id).intersection(dff.index))
missed_resp = dff.loc[idx]
newdf = pd.concat([missed, missed_resp]).sort_index(ascending=True).drop_duplicates()

missed_conv_idx = newdf[pd.isna(newdf['in_response_to_tweet_id'])].index
results_missed = []
for conv in tqdm(missed_conv_idx):
    idx = find_conv(newdf['in_response_to_tweet_id'], [conv])
    results_missed.append((conv, idx))

for conv, idx in results_missed:
    dff.loc[idx, 'conversation'] = conv

dff['conversation'] = dff['conversation'].astype('Int64')

Number of cores used: 10


In [55]:
# There are missed data in the dataframe for some reason. Find all and assign conversation numbers again.
conv_idx = newdf[pd.isna(newdf['in_response_to_tweet_id'])].index
results2 = []
for conv in tqdm(conv_idx):
    idx = find_conv(newdf['in_response_to_tweet_id'], [conv])
    results2.append((conv, idx))

dff.to_csv('twcs_proccessed.csv')

100%|██████████████████████████████████████████████████████████████████████████████| 4413/4413 [00:14<00:00, 300.35it/s]


In [95]:
# Assign Company to conversations

def process_group(group_tuple):
    group_name, group_data = group_tuple
    grp_idx = group_data.index
    companies = [n.lower() for n in group_data['author_id'].unique() if not n.isnumeric()]
    # It is possible that multiple companies got involved into the same conversation, often both merchant and logistic company.
    return ', '.join(sorted(companies)), grp_idx
    # if len(authors) == 1:
    #     company = authors[0]
    #     return company, grp_idx
    # else:
    #     print('Conversation {} has more than one company: {}'.format(group_name, authors))
    #     return None

# Your existing code...
idxGroup = dict()
grouped = dff.groupby('conversation')


for group in tqdm(grouped):
    company, grp_idx = process_group(group)
    if company in idxGroup:
        idxGroup[company].extend(grp_idx)
    else:
        idxGroup[company] = list(grp_idx)    


# Assign the 'company' values to the DataFrame
for company, idx in idxGroup.items():
    dff.loc[list(set(idx)), 'company'] = company


100%|████████████████████████████████████████████████████████████████████████| 794335/794335 [00:28<00:00, 27468.53it/s]


In [96]:
dff.loc[dff['conversation']==2979681]

Unnamed: 0_level_0,author_id,inbound,created_at,text,response_tweet_id,in_response_to_tweet_id,company,conversation
tweet_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1
2979680,AmazonHelp,False,2017-11-30 04:08:00+00:00,@249801 Please reach out to us by phone or cha...,,2979681.0,"amazonhelp, upshelp",2979681
2979681,249801,True,2017-11-30 03:58:22+00:00,"Hey @115817, thanks for not delivering my pack...",29796802979682.0,,"amazonhelp, upshelp",2979681
2979682,UPSHelp,False,2017-11-30 04:25:28+00:00,@249801 I am sorry to hear that your package w...,2980530.0,2979681.0,"amazonhelp, upshelp",2979681
2980530,221581,True,2017-11-30 04:45:53+00:00,@UPSHelp @249801 Dont feel too bad. I have a N...,2980531.0,2979682.0,"amazonhelp, upshelp",2979681
2980531,249801,True,2017-11-30 04:49:13+00:00,@221581 @UPSHelp wow that sucks. Hope you get ...,2980532.0,2980530.0,"amazonhelp, upshelp",2979681
2980532,221581,True,2017-11-30 04:51:37+00:00,@249801 @UPSHelp I am lucky that Amazon does t...,,2980531.0,"amazonhelp, upshelp",2979681


In [97]:
dff.to_csv('twcs_proccessed.csv')

In [None]:
dff['company'].value_counts().plot(kind='bar')

<Axes: >

## No use

In [None]:
# Assign conversation numbers - parallelised - very slow ~100it/s
df = dff

def find_conv(resp_id, idx):
    flag = True
    while flag:
        new_idx = resp_id[resp_id.isin(idx)].index
        new_idx = set(new_idx).union(idx)
        flag = len(new_idx) - len(idx)
        idx = list(new_idx)
    return idx

conv_idx = df[pd.isna(df['in_response_to_tweet_id'])].index
df.loc[conv_idx, 'conversation'] = conv_idx.values

# Function to process each conversation
def process_conversation(conv):
    idx = find_conv(df['in_response_to_tweet_id'], [conv])
    return conv, idx

# Get the number of available CPU cores
num_cores = min(len(conv_idx), os.cpu_count())
# Print the number of cores used
print("Number of cores used:", num_cores)

# Parallelize the loop using concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=num_cores) as executor:
    results = list(tqdm(executor.map(process_conversation, conv_idx), total=len(conv_idx)))

# Update the 'conversation' values in the DataFrame
for conv, idx in results:
    df.loc[idx, 'conversation'] = conv

df['conversation'] = df['conversation'].astype('Int64')
