In [1]:
# !pip install git+https://github.com/deamonpog/gsdmm.git
# !pip install wordcloud
# !pip install --upgrade gensim
# !pip install --upgrade s3fs
# !pip install --upgrade boto3
# !pip install --upgrade numexpr
# !pip install --upgrade pandas

In [2]:
import json
import gzip
import glob
import csv
import pandas as pd

In [3]:
from time_keeper import TimeKeeper

In [4]:
# Gather the gzipped cache files and keep their paths in sorted order.
data_files = sorted(glob.glob("./cache2_data/*cache*.json.gzip"))
len(data_files)

29317

In [5]:
class Maybe:
    """Null-safe wrapper for walking nested JSON-like data.

    Each lookup returns another ``Maybe``, so chains such as
    ``Maybe(doc).field("a").index(0).field("b").value()`` yield ``None``
    instead of raising when any step along the way is missing.
    """

    def __init__(self, json_object):
        # The wrapped object; may be a dict, a list, a scalar or None.
        self.json_object = json_object

    def field(self, field):
        """Return ``Maybe(value)`` for key ``field`` of a wrapped dict.

        An empty ``Maybe(None)`` is returned when the wrapped object is not
        a dict or does not contain the key.
        """
        if isinstance(self.json_object, dict) and field in self.json_object:
            return Maybe(self.json_object[field])
        return Maybe(None)

    def index(self, index):
        """Return ``Maybe(item)`` for position ``index`` of a wrapped list.

        Negative indices count from the end, as with normal list indexing.
        An empty ``Maybe(None)`` is returned when the wrapped object is not
        a list or the index is out of range in either direction.
        """
        items = self.json_object
        # Check the lower bound too: a negative index beyond -len(items)
        # would otherwise raise IndexError instead of yielding Maybe(None).
        if isinstance(items, list) and -len(items) <= index < len(items):
            return Maybe(items[index])
        return Maybe(None)

    def array(self, func=lambda m: m, as_type=list):
        """Map ``func`` over a wrapped list and convert the result with ``as_type``.

        When the wrapped object is not a list, the result is ``as_type([])``
        so that the return type is the same whether or not data is present.
        """
        if isinstance(self.json_object, list):
            return as_type([func(obj) for obj in self.json_object])
        return as_type([])

    def value(self):
        """Return the wrapped object (``None`` when a lookup step was missing)."""
        return self.json_object


In [6]:
def read_local_text_data_raw(file_path):
    """Read a gzip-compressed, UTF-8 encoded JSON file and return the parsed object."""
    with gzip.open(file_path, 'r') as compressed:
        payload = compressed.read()
    return json.loads(payload.decode('utf-8'))


def read_local_text_data_filtered(file_path):
    """Read a gzip-compressed JSON file and pull out three sections of it.

    Returns a tuple ``(data, places, users)`` holding the values found at
    ``data``, ``includes.places`` and ``includes.users``; any section that
    is absent from the file comes back as ``None``.
    """
    with gzip.open(file_path, 'r') as compressed:
        payload = compressed.read()
    response = Maybe(json.loads(payload.decode('utf-8')))
    includes = response.field("includes")
    return (
        response.field("data").value(),
        includes.field("places").value(),
        includes.field("users").value(),
    )

In [7]:
def read_tweets_and_places(in_data_files, users_csv_path="all_users_df_v5.csv"):
    """Merge tweets, places and users from many cache files.

    Parameters
    ----------
    in_data_files : iterable of str
        Paths accepted by ``read_local_text_data_filtered``.
    users_csv_path : str, optional
        Where the de-duplicated users table is written as CSV.

    Returns
    -------
    tuple
        ``(all_tweets, place_dict)``: the combined tweet list and a dict
        mapping each place ``id`` to its ``full_name``.

    Side effects: prints a message for every file without tweets and writes
    the users CSV to ``users_csv_path``.
    """
    all_tweets = []
    all_places = []
    all_users = []
    for data_file in in_data_files:
        file_tweets, file_places, file_users = read_local_text_data_filtered(data_file)
        if file_tweets is not None:
            all_tweets.extend(file_tweets)
        else:
            print(f"Tweets is None for {data_file}")
        if file_places is not None:
            all_places.extend(file_places)
        if file_users is not None:
            all_users.extend(file_users)

    place_dict = {place["id"]: place["full_name"] for place in all_places}

    # Build the table from the accumulated list: using the per-file loop
    # variable here would export only the users of the last file read.
    users_df = pd.DataFrame(all_users, dtype=str).drop_duplicates()
    users_df.to_csv(users_csv_path, index=False, quoting=csv.QUOTE_ALL)
    return all_tweets, place_dict

In [8]:
%%time
# Load all cache files: `tweets` is the combined tweet list and `places`
# is the dict mapping place id -> full name returned by the loader.
tweets, places = read_tweets_and_places(data_files)

Tweets is None for ./cache2_data/querylist_cache_50_403.json.gzip
CPU times: user 5min 21s, sys: 19 s, total: 5min 40s
Wall time: 5min 39s


In [9]:
# Union of the keys seen on any tweet; it should equal the expected set below.
expected_all_fields = {
    'attachments',
    'author_id',
    'conversation_id',
    'created_at',
    'edit_history_tweet_ids',
    'entities',
    'geo',
    'id',
    'in_reply_to_user_id',
    'public_metrics',
    'referenced_tweets',
    'text',
    'withheld',
}
possible_fields = set()
for tweet in tweets:
    possible_fields.update(tweet.keys())
print(possible_fields == expected_all_fields)
possible_fields

True


{'attachments',
 'author_id',
 'conversation_id',
 'created_at',
 'edit_history_tweet_ids',
 'entities',
 'geo',
 'id',
 'in_reply_to_user_id',
 'public_metrics',
 'referenced_tweets',
 'text',
 'withheld'}

In [10]:
# Keys present on every single tweet; they should equal the expected set below.
expected_common_fields = {
    'author_id',
    'conversation_id',
    'created_at',
    'edit_history_tweet_ids',
    'id',
    'public_metrics',
    'text',
}
common_fields = set(possible_fields)
for tweet in tweets:
    common_fields.intersection_update(tweet.keys())
print(common_fields == expected_common_fields)
common_fields

True


{'author_id',
 'conversation_id',
 'created_at',
 'edit_history_tweet_ids',
 'id',
 'public_metrics',
 'text'}

In [11]:
# Column names for the tweet table. The order must match, position by
# position, the list of values returned by get_columns().
column_names = ['id',
                'conversation_id',
                'edit_history_tweet_ids',
                'author_id',
                'created_at',
                'text',
                'impression_count',
                'like_count',
                'quote_count',
                'reply_count',
                'retweet_count',
                'quoted',
                'replied_to',
                'retweeted',
                'in_reply_to_user_id',
                'geo',
                'mentions']
def get_columns(tweet_json, place_names=None):
    """Flatten one tweet dict into a row of values ordered like ``column_names``.

    Parameters
    ----------
    tweet_json : dict
        A tweet object. The keys ``id``, ``conversation_id``,
        ``edit_history_tweet_ids``, ``author_id``, ``created_at``, ``text``
        and ``public_metrics`` are required (``KeyError`` otherwise); the
        remaining fields are optional.
    place_names : dict, optional
        Mapping of place id to place name used to fill the ``geo`` column.
        Defaults to the notebook-level ``places`` dict.

    Returns
    -------
    list
        One value per entry of ``column_names``. The id-list columns
        (``quoted``, ``replied_to``, ``retweeted``, ``mentions``) are always
        the string form of a list, ``'[]'`` when nothing is present.
    """
    if place_names is None:
        # Backward-compatible default: fall back to the global lookup table.
        place_names = places

    tweet = Maybe(tweet_json)

    # Bucket referenced tweet ids by reference type; other types are ignored.
    referenced_ids = {"quoted": [], "replied_to": [], "retweeted": []}
    for ref_tweet in tweet.field("referenced_tweets").array():
        bucket = referenced_ids.get(ref_tweet["type"])
        if bucket is not None:
            bucket.append(ref_tweet["id"])

    metrics = tweet_json["public_metrics"]
    columns_values = [
        # tweet always has following keys
        tweet_json["id"],
        tweet_json["conversation_id"],
        tweet_json["edit_history_tweet_ids"],  # list of tweetIds
        tweet_json["author_id"],
        tweet_json["created_at"],
        tweet_json["text"],
        metrics["impression_count"],
        metrics["like_count"],
        metrics["quote_count"],
        metrics["reply_count"],
        metrics["retweet_count"],
        # optional tweet data fields
        str(referenced_ids["quoted"]),
        str(referenced_ids["replied_to"]),
        str(referenced_ids["retweeted"]),
        tweet.field("in_reply_to_user_id").value(),
        # Place name for geo.place_id, or None when the tweet has no place.
        Maybe(place_names).field(tweet.field("geo").field("place_id").value()).value(),
        # Stringify outside array() so a tweet without mentions yields '[]'
        # (a str) like the other id-list columns, rather than a bare list.
        str(tweet.field("entities").field("mentions").array(lambda m: m["id"])),
        # Maybe(tweet_json).field("attachments") # we dont take this field at the moment
    ]
    return columns_values


In [12]:
%%time
tdf = pd.DataFrame([get_columns(tw) for tw in tweets],  columns=column_names, dtype=str)
print(tdf.shape)
tdf

(14060535, 17)
CPU times: user 3min 21s, sys: 6.14 s, total: 3min 27s
Wall time: 3min 27s


Unnamed: 0,id,conversation_id,edit_history_tweet_ids,author_id,created_at,text,impression_count,like_count,quote_count,reply_count,retweet_count,quoted,replied_to,retweeted,in_reply_to_user_id,geo,mentions
0,1509757106202849291,1500971180395802633,['1509757106202849291'],1385729645740449804,2022-04-01T04:58:38.000Z,@Peroporqu_ :),0,0,0,0,0,[],['1500971180395802633'],[],1500536286591033351,,['1500536286591033351']
1,1509756641117319168,1509756641117319168,['1509756641117319168'],4501930040,2022-04-01T04:56:47.000Z,RT @purpleluvrain: @jlints58 love it dear swee...,0,0,0,0,1,[],[],['1509404114555334659'],,,"['781947948934369280', '4501930040']"
2,1509756540265414656,1509732884608860161,['1509756540265414656'],28729354,2022-04-01T04:56:23.000Z,@StephanieNTX @dfwhispanic @Raquel_AmparoTV @S...,0,0,0,0,0,[],['1509743982724595715'],[],3173688997,,"['3173688997', '761051', '402718868', '7663422..."
3,1509756323231002627,1507144019482083339,['1509756323231002627'],4501930040,2022-04-01T04:55:32.000Z,@SFDamnPodcast @alvatabitha I've been wonderin...,0,0,0,0,0,[],['1509402710470103044'],[],1248074962377699328,,"['1248074962377699328', '66141034']"
4,1509756190716096512,1509756190716096512,['1509756190716096512'],4501930040,2022-04-01T04:55:00.000Z,RT @SFDamnPodcast: @alvatabitha @jlints58 a gr...,0,0,0,0,1,[],[],['1509402710470103044'],,,"['1248074962377699328', '66141034', '4501930040']"
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
14060530,1465918181088710659,1465918181088710659,['1465918181088710659'],25444322,2021-12-01T05:38:24.000Z,#GetVaccinated https://t.co/MHvXKKv5Fv,0,0,0,0,0,['1465701709112434693'],[],[],,"Union City, CA",[]
14060531,1465918067775447045,1465918067775447045,['1465918067775447045'],25444322,2021-12-01T05:37:57.000Z,RT @UncvrngTheTruth: Jim Jordan said Biden thi...,0,0,0,0,358,[],[],['1465487354546638853'],,,[]
14060532,1465914960844705794,1465803774291677187,['1465914960844705794'],40091601,2021-12-01T05:25:36.000Z,@SoCassandra another family star,0,0,0,0,0,[],['1465803774291677187'],[],48928145,,['48928145']
14060533,1465914356332326914,1465914290435772417,['1465914356332326914'],40091601,2021-12-01T05:23:12.000Z,@dmschmeyer *saves*,0,0,0,0,0,[],['1465914290435772417'],[],732409522778742784,,['732409522778742784']


In [13]:
%%time
import csv
# Write the whole tweet table to CSV without the index column;
# csv.QUOTE_ALL makes the writer quote every field.
tdf.to_csv("all_data_raw_df_v5.csv", index=False, quoting=csv.QUOTE_ALL)

CPU times: user 2min 7s, sys: 3.43 s, total: 2min 10s
Wall time: 2min 10s


In [14]:
%%time
# Export only the "text" column: one quoted record per tweet, with no
# header row and no index column.
tdf["text"].to_csv("all_text_list_v5.csv" , header=False, index=False, quoting=csv.QUOTE_ALL)

CPU times: user 46.9 s, sys: 1.27 s, total: 48.1 s
Wall time: 48.3 s


In [15]:
# %%time
# tdf.drop_duplicates(subset="id", inplace=True)
# print(tdf.shape)
# tdf.set_index("id", inplace=True)
# tdf["created_at"] = tdf["created_at"].apply(lambda x: datetime.datetime.strptime(x, "%Y-%m-%dT%H:%M:%S.%f%z"))
# tdf.sort_values("created_at", inplace=True)
# tdf

In [16]:
# tdf.shape

In [17]:
# %%time
# tdf.to_csv("all_data_df_v5.csv")