In [1]:
%load_ext autoreload
%autoreload 2

import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

import pandas as pd

import collections
import re
import pprint as pp
import numpy as np
import collections

import multiprocessing as mp
from multiprocessing.pool import ThreadPool

import math
import gzip
import pickle as pkl
from datetime import datetime
import matplotlib.pyplot as plt

import fonctions
import itertools
from tqdm.notebook import tqdm

from os import listdir
from os.path import isfile, join
import time

import random
random.seed(0)

Using TensorFlow backend.


In [2]:
def trend_processing(x):
    """Split a tab-separated hashtag field into a list of hashtags.

    Parameters
    ----------
    x : object
        Raw cell value. Only values whose type is exactly ``str`` are split.

    Returns
    -------
    list
        The pieces of ``x`` between runs of tab characters, or an empty
        list when ``x`` is not a ``str`` (for instance a float NaN).
    """
    # Guard clause: anything that is not a plain string means "no hashtags".
    if type(x) is not str:
        return []
    # A run of consecutive tabs is treated as a single separator.
    return re.split(r'\t+', x)

def get_trends(directory,doc_name):
    """Load one batch file and keep the engagement labels, hashtags and user ids.

    The file is read as headerless text whose fields are separated by the
    ``\\x01`` character; its 24 columns are named after the 20 features
    followed by the 4 engagement labels listed below.

    Parameters
    ----------
    directory : str
        Directory containing the batch file (with or without a trailing
        path separator).
    doc_name : str
        File name of the batch inside ``directory``.

    Returns
    -------
    pandas.DataFrame
        Columns ``reply_timestamp``, ``retweet_timestamp``,
        ``retweet_with_comment_timestamp``, ``like_timestamp`` (each turned
        into 1 when a value is present and 0 when it is missing),
        ``hashtags`` (list produced by ``trend_processing``),
        ``engaging_user_id`` and ``engaged_with_user_id``.
    """
    all_features = ["text_tokens", "hashtags", "tweet_id",
                    "present_media", "present_links",
                    "present_domains", "tweet_type","language",
                    "tweet_timestamp", "engaged_with_user_id",
                    "engaged_with_user_follower_count", "engaged_with_user_following_count",
                    "engaged_with_user_is_verified", "engaged_with_user_account_creation",
                    "engaging_user_id", "engaging_user_follower_count",
                    "engaging_user_following_count", "engaging_user_is_verified",
                    "engaging_user_account_creation", "engagee_follows_engager"]

    labels = ['reply_timestamp','retweet_timestamp', 'retweet_with_comment_timestamp','like_timestamp']
    all_variables = all_features + labels

    # join() gives the same path as plain concatenation when `directory`
    # ends with a separator, and also works when it does not.
    df = pd.read_csv(join(directory, doc_name), encoding="utf-8", sep='\x01', header=None)
    df.columns = all_variables

    # Binarise every label in one vectorised pass: 1 if a timestamp is
    # present, 0 if the cell is missing.
    for label in labels:
        df[label] = df[label].notna().astype(int)

    # Missing hashtag cells become empty lists (see trend_processing).
    df['hashtags'] = [ trend_processing(x) for x in df['hashtags'] ]

    return df.filter(labels+['hashtags','engaging_user_id','engaged_with_user_id'])

def taste(series):
    """Count how often each item occurs across all the iterables in `series`.

    `series` is an iterable of iterables (e.g. one list of hashtags per
    row); the result is a single ``collections.Counter`` over all items.
    """
    # Flatten lazily and let Counter tally the items one by one.
    return collections.Counter(item for items in series for item in items)

def user_tastes_on_chunk(chunk, chunk_id,
                         directory='/home/maxime/Desktop/RecSys2020/data/batches/',
                         output_dir='/home/maxime/Desktop/RecSys2020/trends/'):
    """Accumulate per-user hashtag counters over a chunk of batch files and pickle them.

    For every batch file, the hashtags of the tweets a user retweeted,
    retweeted with comment, or replied to are counted per
    ``engaging_user_id`` and merged into a running dictionary
    ``{user_id: {'rt_hashtags': Counter, 'rtc_hashtags': Counter,
    'rpl_hashtags': Counter}}``. The dictionary is finally written,
    gzip-compressed and pickled, to
    ``<output_dir>/hashtag_tastes_<chunk_id>.pkl.gz``.

    Parameters
    ----------
    chunk : iterable of str
        File names of the batches to process.
    chunk_id : object
        Identifier formatted into the output file name.
    directory : str, optional
        Directory passed to ``get_trends`` together with each file name.
        Keep the trailing separator: it is handed over unchanged.
    output_dir : str, optional
        Directory in which the result file is written.

    Returns
    -------
    bool
        Always ``True`` once the file has been written.
    """
    # (output column, label column selecting the rows that feed it)
    engagement_columns = (
        ('rt_hashtags', 'retweet_timestamp'),
        ('rtc_hashtags', 'retweet_with_comment_timestamp'),
        ('rpl_hashtags', 'reply_timestamp'),
    )
    global_tastes = {}

    for iteration, batch_file in enumerate(chunk, start=1):

        # Progress trace: index of the batch within this chunk.
        print(iteration)

        df = get_trends(directory, batch_file)

        per_engagement = []
        for column, label in engagement_columns:
            engaged = df[df[label] == 1]
            counts = engaged.groupby(['engaging_user_id']).agg({'hashtags': [taste]})
            counts.columns = [column]
            per_engagement.append(counts)

        # Outer-align the three frames on user id; a user without a given
        # engagement type in this batch gets a missing cell there.
        combined = pd.concat(per_engagement, axis=1).to_dict('index')

        # Replace missing cells by a fresh empty Counter in plain Python
        # rather than through DataFrame.where with a list-returning callable,
        # whose handling of object values is not part of its documented
        # contract.
        batch_tastes = {
            user: {
                column: value if isinstance(value, collections.Counter) else collections.Counter()
                for column, value in row.items()
            }
            for user, row in combined.items()
        }

        for user, tastes in batch_tastes.items():
            if user in global_tastes:
                # Known user: add this batch's counters to the stored ones.
                update_taste(global_tastes, batch_tastes, user)
            else:
                global_tastes[user] = tastes

    print('saving...')

    output_path = join(output_dir, 'hashtag_tastes_{}.pkl.gz'.format(chunk_id))
    with gzip.open(output_path, 'wb') as f:
        pkl.dump(global_tastes, f)

    return True

def update_taste(global_tastes,concat,k):
    """Add the counters of ``concat[k]`` to the entry of ``k`` in ``global_tastes``.

    Parameters
    ----------
    global_tastes : dict
        Maps a key to a dict of ``collections.Counter`` objects, one per
        category (``'rt_hashtags'``, ``'rtc_hashtags'``, ``'rpl_hashtags'``).
        Modified in place: ``global_tastes[k]`` is replaced by the merged
        dict.
    concat : dict
        Same layout as ``global_tastes``; must contain ``k``.
    k : hashable
        Key whose entry is updated.

    Returns
    -------
    None

    Notes
    -----
    The merge runs over the three standard categories plus every category
    found in either entry, so a category absent from one side is kept
    instead of being dropped or raising ``KeyError``.
    """
    stored = global_tastes.get(k, {})
    incoming = concat[k]

    # Standard categories first, then any extra ones, without duplicates
    # and in a stable order.
    categories = dict.fromkeys(
        ['rt_hashtags', 'rtc_hashtags', 'rpl_hashtags', *stored, *incoming]
    )

    empty = collections.Counter()
    # Counter addition returns a new Counter, so neither input is mutated.
    global_tastes[k] = {
        category: stored.get(category, empty) + incoming.get(category, empty)
        for category in categories
    }
    

In [5]:
%%time

# Directory holding the batch files; every regular file in it is a batch.
batch_path='/home/maxime/Desktop/RecSys2020/data/batches'
# NOTE(review): listdir() returns entries in arbitrary order, so the
# file-to-chunk assignment is only stable while that order is. Confirm
# before mixing outputs produced by separate runs.
batch_list = [f for f in listdir(batch_path) if isfile(join(batch_path, f))]

# The batch list is split into N_CHUNKS parts; this cell only processes
# the parts whose indices are listed in CHUNK_IDS.
N_CHUNKS = 8
CHUNK_IDS = [4, 5, 6, 7]
chunks = fonctions.chunkIt(batch_list, N_CHUNKS)

if __name__ == '__main__':
    # One worker process per selected chunk; the chunk index doubles as
    # the identifier used in the worker's output file name.
    processes = [ mp.Process(target=user_tastes_on_chunk, args=(chunks[idx], idx) ) for idx in CHUNK_IDS ]

    # Run processes
    for p in processes:
        p.start()

    # Wait for every worker to finish (join blocks; it does not stop them)
    for p in processes:
        p.join()

    # A worker that died (exception, kill, out-of-memory) leaves no result
    # file; surface that instead of finishing silently.
    failed = [ idx for idx, p in zip(CHUNK_IDS, processes) if p.exitcode != 0 ]
    if failed:
        raise RuntimeError('worker(s) for chunk(s) {} exited abnormally'.format(failed))

1
1
1
1
2
2
2
2
3
3
3
3
4
4
4
4
5
5
5
5
6
6
6
6
7
7
7
7
8
8
8
8
9
9
9
9
10
10
10
10
11
11
11
11
12
12
12
12
13
13
13
13
14
14
14
14
15
15
15
15
16
16
16
16
17
17
18
17
17
18
19
18
18
19
19
20
19
20
20
21
20
21
21
21
22
22
22
22
23
23
23
23
24
24
24
24
25
25
25
25
26
26
26
26
27
27
27
27
28
28
28
28
29
29
29
29
30
30
30
30
31
31
31
31
32
32
32
32
33
33
33
34
33
34
34
34
35
35
35
35
36
36
36
36
37
37
37
38
37
38
38
38
39
39
39
39
40
40
40
40
41
41
41
41
42
42
42
42
43
43
43
43
44
44
44
44
45
45
45
45
46
46
46
46
47
47
47
47
48
48
48
48
49
49
49
49
50
50
50
50
51
51
51
51
52
52
52
52
53
53
53
54
53
54
54
54
55
55
55
55
56
56
56
57
57
56
57
58
58
57
58
59
59
58
59
60
60
59
61
60
61
60
61
62
62
61
63
63
62
62
63
64
64
63
64
65
65
64
66
65
66
67
65
66
67
68
66
68
67
69
67
68
69
68
70
69
70
69
71
71
70
70
72
72
71
71
73
73
72
72
74
74
73
73
75
74
75
74
76
75
76
75
77
77
76
76
78
78
77
77
79
79
78
78
79
80
80
80
81
79
81
82
80
81
82
83
82
81
83
84
83
82
84
85
83
84
85
86
84
85
86
87
85
86
87
8