In [1]:
# import random
import pandas as pd
import gc
from pathlib import Path
import os
from collections import defaultdict
import fastparquet
from fastparquet import ParquetFile
verbosity = 5

twibot_path = r"/dataset/twibot22"
twibot_user = r"/dataset/twibot22/user.json"
twibot_label = r"/dataset/twibot22/label.csv"
twibot_edges = r"/dataset/twibot22/edge.csv"

# Files in the path specified by twibot_path, that begin with %twibot_node_identifier_str%, will be assumed as node files and converted if needed.
twibot_node_identifier_str = "tweet_" 

generated_data_output = r"/dataset/twibot22/generated_data" # output is saved in this directory
ls_userdata_output = rf"{generated_data_output}/userdata.jsonl" # the desired filename of bot detail output

# Some tasks might be multithreadable. Set the max number of workers here.
concurrent_max_workers = 2

# Set this to false to disable the use of threads when processing tweet files.
threading = True 

def debug_print(m, level=5, r=None):
    if level <= verbosity:
        print(m)
        if r:
            raise r

def is_data(name, _dir=generated_data_output, ext=".parquet"):
    file_path = os.path.join(_dir, f"{name}{ext}")
    return os.path.exists(file_path)
    
def get_data(name, _dir=generated_data_output,pqargs={},**kwargs):
    if is_data(name, _dir):
        file_path = os.path.join(_dir, f"{name}.parquet")
        print(f"Loading existing data from {file_path}")
        pf = fastparquet.ParquetFile(file_path, **pqargs)
        return pf.to_pandas(**kwargs)
    return False
        
def save_data(name, df, _dir=generated_data_output, **kwargs):
    file_path = os.path.join(_dir, f"{name}.parquet")
    debug_print(f"Saving data to {file_path}", 3)
    os.makedirs(_dir, exist_ok=True)
    fastparquet.write(file_path, df, **kwargs)
    #df.to_parquet(file_path, **kwargs)
    return df        

# To quietly stop cell execution
class StopExecution(Exception):
    def _render_traceback_(self):
        return []
        

## Lockstep, the full recipe. Part A - Pre-processing

### Step 1: Convert user detail json into parquet for analysis.

In [2]:

# First, check if the data for users exists. If it does, skip this step.
FLOW_SKIPPING_USERS_GEN = False
if is_data("users"):
    FLOW_SKIPPING_USERS_GEN=True
    debug_print("User data parquet already present. Skipping generation.",5)
else:

    # The data does not exist, so it is time to create it from the raw source as specified in configuration.
    debug_print("Reading label data...",4)
    with open(twibot_label, 'r') as fi:
        udata_label = pd.read_csv(fi, header=0, names=["id", "label"], encoding="UTF8", index_col='id')
    debug_print(display(udata_label.head(5)),5)

    with open(twibot_user, 'r') as fi:
        udata_detail = pd.read_json(fi, orient='records')

    # Merge the data we have from the labels with the complete user records.
    debug_print("Merging label data...",4)
    udata_detail.set_index('id')
    udata_detail=udata_detail.join(udata_label, on=['id'])
    
    debug_print(f"collected details of {udata_detail.shape} users.",4)
    debug_print(display(udata_detail.head(5)),5)
    
    del udata_label
    gc.collect()
    
# Persists: udata_detail

User data parquet already present. Skipping generation.


### Step 2: Column pruning.

We don't need all of these and can remove them very early on. We can also do the work of flattening certain object columns early.

Columns to remove:
    verified,withheld,pinned_tweet_id,protected,profile_image_url
Columns to flatten:
    public_metrics,entities

In [3]:
import json

# If this flag has been set, then the users dataset has likely already been generated and has had the unwanted columns dropped.
if not FLOW_SKIPPING_USERS_GEN:
    trash_columns = ['verified','withheld','pinned_tweet_id','protected','profile_image_url']
    flatten_columns = ['public_metrics','entities']
    
    debug_print("Dropping columns: "+json.dumps(trash_columns),5)
    udata_detail.drop(columns=trash_columns, errors='ignore',inplace=True)
    
    debug_print("Flattening columns: "+json.dumps(flatten_columns),5)
    for col in flatten_columns:
        attributes_df = pd.json_normalize(udata_detail[col])
        # Remove the first level, join the second level, flatten anything else.
        # e.g. {entities: {url: {urls: [{item:key},{item:key}]}, {description: {urls: []}} -> {url.urls: "[{item:key},{item:key}]", description.urls: "[]"}

        # From the single level dict we made above, if the key does not exist as a column in udata_detail, then we make it. The default for these columns shall be "[]". 
        # Then, update the values for our columns with that we have in our normalized json df.
        udata_detail = pd.concat([udata_detail.drop(columns=[col]), attributes_df], axis=1)
        
    udata_detail['id'] = udata_detail['id'].apply(lambda x: int(str(x).strip('ut')) if str(x)[0] in "ut" else int(x)).astype('UInt64').fillna(0)
    debug_print(f"New Shape: {udata_detail.shape}",5)
    if verbosity >= 5:
        display(udata_detail.head(1))
    
    debug_print(f"Saving data as parquet at {generated_data_output}",1)
    save_data("users", udata_detail)

    # It's debatable how effective this is in Jupyter but I really do need every last byte.
    del udata_detail, trash_columns, flatten_columns
    gc.collect()

## Checkpoint 1, quick tests!

In [4]:
# Let's ensure we have these columns so far. Note the description.name features - this is intentional.

required_columns = ['created_at',
    'description',
    'id',
    'location',
    'name',
    'url',
    'username',
    'label',
    'followers_count',
    'following_count',
    'tweet_count',
    'listed_count',
    'url.urls',
    'description.urls',
    'description.mentions',
    'description.hashtags',
    'description.cashtags']

# Test the user data.
debug_print(f"Loading data from parquet at {generated_data_output}", 1)
udata_detail = get_data("users")

debug_print(f"Loaded parquet. Shape: {udata_detail.shape}", 5)
if verbosity >= 5:
    display(udata_detail.head(3))

# Check for required columns.
missing_columns = [col for col in required_columns if col not in udata_detail.columns]
if missing_columns:
    raise ValueError(f"The following required columns are missing in the dataset: {missing_columns}")
else:
    debug_print("All required columns are present.", 2)

# Print some statistics about the dataset.
try:
    stats = {
        "total_rows": len(udata_detail),
        "column_counts": udata_detail.count().to_dict(),
        "missing_values": udata_detail.isnull().sum().to_dict(),
    }
    debug_print("Dataset statistics:", 3)
    if verbosity >= 3:
        for key, value in stats.items():
            print(f"{key}: {value}")
except Exception as e:
    debug_print(f"Failed to compute statistics: {e}", 1)

# Cleanup
del udata_detail
gc.collect()

Loading data from parquet at /dataset/twibot22/generated_data
Loading existing data from /dataset/twibot22/generated_data/users.parquet
Loaded parquet. Shape: (1000000, 17)


Unnamed: 0,created_at,description,id,location,name,url,username,label,followers_count,following_count,tweet_count,listed_count,url.urls,description.urls,description.mentions,description.hashtags,description.cashtags
0,2020-01-16 02:02:55+00:00,Theoretical Computer Scientist. See also https...,1217628182611927040,"Cambridge, MA",Boaz Barak,https://t.co/BoMip9FF17,boazbaraktcs,human,7316,215,3098,69,"[{'start': 0, 'end': 23, 'url': 'https://t.co/...","[{'start': 41, 'end': 64, 'url': 'https://t.co...",,,
1,2014-07-02 17:56:46+00:00,creative _,2664730894,🎈,olawale 💨,,wale_io,human,123,1090,1823,0,,,,,
2,2020-05-30 12:10:45+00:00,👽,1266703520205549568,,panagiota_.b,,b_panagiota,human,3,62,66,0,,,,,


All required columns are present.
Dataset statistics:
total_rows: 1000000
column_counts: {'created_at': 1000000, 'description': 1000000, 'id': 1000000, 'location': 708458, 'name': 1000000, 'url': 1000000, 'username': 1000000, 'label': 1000000, 'followers_count': 1000000, 'following_count': 1000000, 'tweet_count': 1000000, 'listed_count': 1000000, 'url.urls': 516923, 'description.urls': 87083, 'description.mentions': 213448, 'description.hashtags': 167865, 'description.cashtags': 2874}
missing_values: {'created_at': 0, 'description': 0, 'id': 0, 'location': 291542, 'name': 0, 'url': 0, 'username': 0, 'label': 0, 'followers_count': 0, 'following_count': 0, 'tweet_count': 0, 'listed_count': 0, 'url.urls': 483077, 'description.urls': 912917, 'description.mentions': 786552, 'description.hashtags': 832135, 'description.cashtags': 997126}


0

### Step 3: Convert tweet JSON files to parquet.

Twibot-22 tweet data or 'nodes' are originally in JSON format. The original code for Lockstep would read and operate with this JSON, also. 
The problem is, this is not fast enough for the firehose that would be a social media platform's activity. JSON's performance starts to suffer in very large datasets, something that the parquet format boasts excellent performance in. Hopefully, future applications of this code need not convert such large source JSON files like this - it's very time consuming.



In [None]:
from pathlib import Path
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import traceback
import ijson
import pandas as pd
import gc

# Define default values for columns
default_values = {
    **{col: 0 for col in ['quote_count', 'like_count', 'retweet_count', 'reply_count']},
    **{col: 0 for col in ['id', 'conversation_id', 'author_id', 'in_reply_to_user_id']},
    **{col: '' for col in ["text", "source"]},
    **{col: pd.NA for col in ["urls", "annotations", "media", "user_mentions", 
                              "hashtags", "cashtags", "symbols", "geo", "location"]},
}

# Columns to drop
tweet_drop_cols = [
    'reply_settings',
    'context_annotations',
    'withheld',
    'possibly_sensitive',
    'attachments',
    'referenced_tweets',
]


def process_file(tweetFile):

    def app_clr_col(i, t):
        append_parq(i, t)
        i.clear()
                    
    try:
        ij_buffer_size_kb = 256
        ij_bsize = ij_buffer_size_kb * 1000 # kb
        chunk_size = 500000
        target = f"{twibot_path}/{tweetFile}"
        t_stem = Path(target).stem
        targetOutput = Path(rf"{generated_data_output}/{t_stem}.parquet")
        iteration_records = []
    
        if not is_data(t_stem):
            debug_print(f"Input: {target} Output: {targetOutput}", 5)
            with open(target, 'r') as fi:
                tlen = sum(1 for line in fi)
                fi.seek(0)
                parser = ijson.items(fi, 'item', use_float=True, buf_size=ij_bsize)               
                
                for i, record in enumerate(parser):                                           
                    iteration_records.append(record)
                    if (i + 1) % chunk_size == 0: 
                        app_clr_col(iteration_records, targetOutput)
    
                if iteration_records:
                    app_clr_col(iteration_records, targetOutput)
                    
                del tlen, parser
                gc.collect()
                    
            debug_print(f"Finished converting file: {targetOutput}", 5)
            
    except Exception as e: 
        debug_print(f"Error processing file {Path(target)}: {e}", 2) 
        print(traceback.format_exception(e))


def append_parq(df_conv, target, partition_columns=None):
    global default_values, tweet_drop_cols
    df_conv = pd.DataFrame(df_conv)

    # Ensure all default columns exist
    for col, default_value in default_values.items():
        if col not in df_conv.columns:
            df_conv[col] = default_value
    
    # Flatten 'public_metrics' and 'entities' columns
    flatten_columns = ['public_metrics', 'entities']

    for col in flatten_columns:
       if col in df_conv.columns:
            try:
                flattened = pd.json_normalize(
                    df_conv[col].dropna().apply(
                        lambda x: x if isinstance(x, dict) else {}
                    )
                )
                
                # Ensure all flattened columns have non-python object types
                for key in flattened.columns:
                    if key in df_conv.columns:
                        flattened[key] = flattened[key].apply(
                            lambda x: json.dumps(x) if isinstance(x, (list, dict)) else x
                        )
                        
                existing_keys = [key for key in flattened.columns if key in df_conv.columns]
                df_conv.loc[flattened.index, existing_keys] = flattened[existing_keys]
            
            except Exception as e:
                debug_print(f"Failed to normalize column {col}: {repr(e)}", 2)
            
    # Remove the original flattened columns and additionally, the drop columns specified at the beginning of this cell.
    df_conv.drop(columns=flatten_columns + tweet_drop_cols, inplace=True, errors='ignore')

    # Ensure types on integer columns
    int_cols = ['quote_count', 'like_count', 'retweet_count', 'reply_count']
    for col in int_cols:
        df_conv[col] = df_conv[col].astype('UInt32').fillna(0)
    
    # Ensure types on Int64 columns. Additionally, for columns containing string user IDs, strip it to the int64 digits so it can be stored more efficiently.
    int64_cols = ['id', 'conversation_id', 'author_id', 'in_reply_to_user_id']
    for col in int64_cols:
        df_conv[col] = df_conv[col].fillna(0).apply(
            lambda x: int(str(x).lstrip('ut')) if pd.notnull(x) and isinstance(x, str) and str(x)[0] in "ut" 
            else int(x) if pd.notnull(x) else 0
        ).astype('UInt64')
    
    # Ensure types on string columns
    str_cols = ["text", "source"]
    for col in str_cols:
        df_conv[col] = df_conv[col].astype('string')
    
    # Cast object columns into strings (just to make sure...?)
    obj_cols = ["urls", "annotations", "media", "user_mentions", "hashtags", 
                "cashtags", "symbols", "geo", "location"]
    for col in obj_cols:
        df_conv[col] = df_conv[col].astype(str)

    # Cast the created_at column into a pandas supported timestamp
    df_conv["created_at"] = pd.to_datetime(df_conv["created_at"]).dt.tz_convert(None)  
    
    try:
        if target.exists():
            df_conv.to_parquet(target, engine='fastparquet', append=True)
        else:
            debug_print(f"Created new parquet: {target} with dtypes {df_conv.dtypes}",5)
            debug_print(f"Sample: {df_conv.head(1)}",5)
            df_conv.to_parquet(target, engine='fastparquet', index=False, partition_on=partition_columns)
            
    except Exception as e:
        debug_print(f"Failed to create or modify: {target} with dtypes {df_conv.dtypes}. Exception: {e}",2)
        debug_print(f"Sample: {df_conv.head(1)}",4)
        debug_print(f"Last 2 rows:\n{df_conv.tail(2)}", 5)
        debug_print(f"Column null counts:\n{df_conv.isnull().sum()}", 4)
        raise e
        
    del df_conv
    gc.collect()


# BEGIN CELL CODE
tweetNodeFiles = list(filter(lambda fileName: twibot_node_identifier_str in fileName,
                             [child.name for child in Path(twibot_path).iterdir()]))
debug_print(f"Found nodes: {tweetNodeFiles}",3)

if threading:
    with ThreadPoolExecutor(max_workers=concurrent_max_workers) as executor:
        list(tqdm(executor.map(process_file, tweetNodeFiles), total=len(tweetNodeFiles), desc="Processing Files"))
else:
    for file in tweetNodeFiles:
        debug_print("Processing: "+file, 5)
        process_file(file)

gc.collect()

## Checkpoint 2: Test tweet dataset

In [None]:

import fastparquet
debug_print("Previewing tweet dataset nodes",5)
tweetNodeFiles = list(filter(lambda fileName: twibot_node_identifier_str in fileName,
                             [child.stem for child in Path(twibot_path).iterdir()]))
debug_print(f"Found nodes: {tweetNodeFiles}",3)

for node in tweetNodeFiles:    
    try_load = get_data(node) # Optimally, we have a parquet
    if type(try_load) is bool:
        print("Problem loading node file...")
        raise StopExecution
    display(try_load.head(1))
    debug_print(f"Loaded author list! Types:\n {try_load.dtypes} \n ...Releasing resources.",5)
    del try_load
    gc.collect()


Previewing tweet dataset nodes
Found nodes: ['tweet_4', 'tweet_8', 'tweet_7', 'tweet_3', 'tweet_0', 'tweet_6', 'tweet_1', 'tweet_2', 'tweet_5']
Loading existing data from /dataset/twibot22/generated_data/tweet_4.parquet


Unnamed: 0,author_id,conversation_id,created_at,geo,id,in_reply_to_user_id,lang,source,text,quote_count,...,retweet_count,reply_count,urls,annotations,media,user_mentions,hashtags,cashtags,symbols,location
0,1178736848744660994,1489704397554790400,2022-02-04 20:56:20,,1489704397554790400,1382478319883587584,fa,"<a href=""http://twitter.com/download/android"" ...",@mah_saros73 🙃کاش من همه بودم با همه دهان ها م...,0,...,0,0,[],,,"[{""screen_name"": ""mah_saros73"", ""name"": ""mahsa...",[],,[],


Loaded author list! Types:
 author_id                      UInt64
conversation_id                UInt64
created_at             datetime64[ns]
geo                            object
id                             UInt64
in_reply_to_user_id            UInt64
lang                           object
source                         object
text                           object
quote_count                    UInt32
like_count                     UInt32
retweet_count                  UInt32
reply_count                    UInt32
urls                           object
annotations                    object
media                          object
user_mentions                  object
hashtags                       object
cashtags                       object
symbols                        object
location                       object
dtype: object 
 ...Releasing resources.
Loading existing data from /dataset/twibot22/generated_data/tweet_8.parquet


Unnamed: 0,author_id,conversation_id,created_at,geo,id,in_reply_to_user_id,lang,source,text,quote_count,...,retweet_count,reply_count,urls,annotations,media,user_mentions,hashtags,cashtags,symbols,location
0,730877400662212609,1391411268519616518,2021-05-09 15:14:32,,1391411268519616518,0,en,"<a href=""https://mobile.twitter.com"" rel=""nofo...",Brand new R. Missing interview with Spain's @m...,0,...,1,0,"[{""url"": ""https://t.co/zKt3gNRTcp"", ""expanded_...",,,"[{""screen_name"": ""Muzikalia"", ""name"": ""Muzikal...",[],,[],


Loaded author list! Types:
 author_id                      UInt64
conversation_id                UInt64
created_at             datetime64[ns]
geo                            object
id                             UInt64
in_reply_to_user_id            UInt64
lang                           object
source                         object
text                           object
quote_count                    UInt32
like_count                     UInt32
retweet_count                  UInt32
reply_count                    UInt32
urls                           object
annotations                    object
media                          object
user_mentions                  object
hashtags                       object
cashtags                       object
symbols                        object
location                       object
dtype: object 
 ...Releasing resources.
Loading existing data from /dataset/twibot22/generated_data/tweet_7.parquet


### Step 4: Convert relationships 'edge' file into a parquet.

We can use the partitioning feature of parquet files to make queries to the converted dataset a lot faster, based on the limited number of categorical types that a relationship can be.

In [5]:
# Check for present

if is_data("edges"):
    debug_print("User relationship edges parquet already present. Skipping generation.",5)
else:
    targetOutput = Path(rf"{generated_data_output}/edges.parquet")    
    debug_print("Reading edges data...",4)
    with open(twibot_edges, 'r') as fi:
        udata_edges = pd.read_csv(fi, header=0, names=["id1", "relationship", "id2"], encoding="UTF8")
                                                       
    debug_print(display(udata_edges.head(5)),5)    
    debug_print(f"Found {udata_edges.shape[0]} edges in edge file. Converting to parks and recreation.",4)
    save_data('edges', df=udata_edges, partition_on=['relationship'])
    gc.collect()

User relationship edges parquet already present. Skipping generation.


### Step 5: Test edge file.

In [1]:
try_load = get_data("edges")
display(try_load.loc['u' in try_load.index.values].head(25))

NameError: name 'get_data' is not defined

# End Setup Process A

By now you should have a set of parquet files representing the much larger json files that all of the tweets were stored in. Additionally, you should have a users.parquet that contains a limited number of the original columns contained in the user information, merged with the label for the user.