In [5]:
# import packages

import json
import os
import csv
import uuid
import sys
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

## CSV Preparation for Bulk Upload

In [12]:
# Load the raw network JSON for one city.
# NETWORKS_DIR / CITY_NAME environment variables override the defaults below, so the
# notebook can be re-run on another machine or for another city without editing code.

base_path = os.environ.get(
    "NETWORKS_DIR",
    "/Users/vjoseph/Desktop/eel_projects/global-rct/twitter_search/twitter_search/data/networks",
)
city_name = os.environ.get("CITY_NAME", "guatemala")
raw_network_path = os.path.join(base_path, city_name, f"{city_name}.json")
# JSON text is UTF-8 by spec; be explicit so non-ASCII names/locations decode correctly
# regardless of the platform's default locale encoding.
with open(raw_network_path, 'r', encoding='utf-8') as file:
    raw_network = json.load(file)

In [13]:
# Sanity-check the loaded payload: the processing loop below iterates it as a
# sequence of root-user records, so show its container type and record count.
print(type(raw_network), len(raw_network), sep="\n")

<class 'list'>
136


In [14]:
# Creating vertices and edges

## helper functions

def write_dict_to_csv(file_name, data, fieldnames=None):
    """Append a single dict as one CSV row, emitting a header for a new/empty file.

    Args:
        file_name: Path of the CSV file. It is created if it does not exist.
        data: Mapping of column name -> value for the row to append.
        fieldnames: Column order for the header and the row. When omitted (or empty),
            the keys of ``data`` are used in their iteration order.
    """
    columns = fieldnames if fieldnames else data.keys()
    # A header is needed only when nothing has been written to the file yet.
    needs_header = (not os.path.isfile(file_name)) or os.stat(file_name).st_size == 0

    with open(file_name, mode='a', newline='', encoding='utf-8') as handle:
        row_writer = csv.DictWriter(handle, fieldnames=columns)
        if needs_header:
            row_writer.writeheader()
        row_writer.writerow(data)

def handle_date_attribute(timestamp_str):
    """Normalise an ISO-8601 timestamp string to a UTC ISO-8601 string.

    - Naive timestamps (no UTC offset) are interpreted as ``America/Chicago`` local
      time (CST or CDT depending on the date) and converted to UTC.
    - Timezone-aware timestamps are converted to UTC using their own offset; they are
      never reinterpreted as Chicago time.

    Args:
        timestamp_str: A string accepted by ``datetime.fromisoformat``.

    Returns:
        The same instant in UTC, formatted with ``isoformat()`` (``+00:00`` suffix).

    Raises:
        ValueError: If ``timestamp_str`` is not a valid ISO-8601 timestamp.
    """
    parsed = datetime.fromisoformat(timestamp_str)
    if parsed.tzinfo is None:
        # NOTE(review): assumes upstream naive timestamps were recorded in US Central time.
        parsed = parsed.replace(tzinfo=ZoneInfo("America/Chicago"))
    # For aware values (UTC or any other offset) only the representation changes, not the
    # instant. The original code overwrote non-UTC offsets with Chicago time, which was wrong.
    return parsed.astimezone(timezone.utc).isoformat()

def _is_missing(value):
    """Return True when ``value`` should be exported as a missing value.

    ``None``, the literal string ``"null"`` and empty strings/containers count as missing.
    Falsy-but-meaningful scalars such as ``0`` and ``False`` do NOT.
    """
    if value is None or value == "null":
        return True
    if isinstance(value, (str, list, tuple, dict, set)):
        return len(value) == 0
    return False


def create_user_vertex(user_attributes):
    """Build one user-vertex row (a dict keyed by CSV column) from a raw user record.

    The ``~id`` / ``~label`` / ``name:Int`` column convention appears to follow the
    Amazon Neptune Gremlin bulk-load CSV format (NOTE(review): confirm target loader).

    Behaviour:
    - ``processing_status`` is copied into ``retweeter_status`` and ``follower_status``.
    - ``last_processed`` is copied into ``retweeter_last_processed`` and
      ``follower_last_processed``, or set to "null" when the status is "pending".
    - Count attributes become ``<name>:Int`` columns, with -99 when missing.
    - Date attributes are normalised to UTC via ``handle_date_attribute``, or "null".
    - Other missing values become "null". ``0`` and ``False`` are kept as-is.
    - ``last_tweeted_at`` is the latest ``created_at`` among ``tweets``, or "null".

    Args:
        user_attributes: Raw user dict; must contain ``user_id``. It is NOT mutated.

    Returns:
        dict mapping column names to values for ``csv.DictWriter``.

    Raises:
        KeyError: If ``user_id`` is absent, or a tweet lacks ``created_at``.
        ValueError: If a count or timestamp value cannot be parsed.
    """
    # Work on a shallow copy so the caller's record (e.g. an entry of raw_network) is not
    # mutated by the derived keys below.
    attrs = dict(user_attributes)

    user_vertex = {'~id': str(attrs['user_id']), '~label': "User"}

    # A single processing status fans out to separate retweeter/follower statuses.
    if 'processing_status' in attrs:
        attrs['retweeter_status'] = attrs['processing_status']
        attrs['follower_status'] = attrs['processing_status']

    if 'last_processed' in attrs:
        # .get(): a record may carry last_processed without processing_status.
        if attrs.get('processing_status') == "pending":
            attrs['retweeter_last_processed'] = "null"
            attrs['follower_last_processed'] = "null"
        else:
            attrs['retweeter_last_processed'] = attrs['last_processed']
            attrs['follower_last_processed'] = attrs['last_processed']

    int_attributes = {'followers_count', 'following_count', 'tweets_count'}
    exclude_attributes = {'user_id', 'tweets', 'followers', 'description', 'processing_status', 'last_processed'}
    date_attributes = {'extracted_at', 'retweeter_last_processed', 'follower_last_processed', 'last_updated'}

    for attr, value in attrs.items():
        if attr in exclude_attributes:
            continue
        if attr in int_attributes:
            # -99 marks a missing count; a genuine 0 must stay 0.
            user_vertex[f'{attr}:Int'] = -99 if _is_missing(value) else int(value)
        elif attr in date_attributes:
            user_vertex[attr] = "null" if _is_missing(value) else handle_date_attribute(value)
        else:
            # Keep falsy-but-real values such as False (e.g. 'verified').
            user_vertex[attr] = "null" if _is_missing(value) else value

    tweets = attrs.get('tweets')
    if tweets:
        # NOTE(review): created_at is not normalised via handle_date_attribute, and mixing
        # naive and aware timestamps would make max() raise TypeError — confirm upstream format.
        most_recent = max(datetime.fromisoformat(tweet['created_at']) for tweet in tweets)
        user_vertex['last_tweeted_at'] = most_recent.isoformat()
    else:
        user_vertex['last_tweeted_at'] = "null"

    return user_vertex

def create_city_edge(source, target):
    """Return a BELONGS_TO edge row linking ``source`` to ``target``.

    Args:
        source: Value written to ``~from`` (the user id in this notebook).
        target: Value written to ``~to`` (the city name in this notebook).

    Returns:
        dict with keys in CSV column order: ``~id`` (fresh UUID4), ``~from``, ``~to``,
        ``~label``. Key order matters because the header is derived from it.
    """
    return {
        '~id': str(uuid.uuid4()),
        '~from': source,
        '~to': target,
        '~label': "BELONGS_TO",
    }

def create_retweeter_edges(sources, target):
    """Build RETWEETED edge rows from each retweeter to ``target``.

    Args:
        sources: Mapping of retweeter id -> ``{'weight': count, 'tweet_ids': [ids...]}``.
        target: Id of the retweeted user, written to ``~to``.

    Returns:
        List of edge dicts, one per entry of ``sources``, in its iteration order. Keys are
        in CSV column order (the header is derived from it). ``tweet_ids`` is joined with ";".

    Raises:
        KeyError: If an entry lacks ``weight`` or ``tweet_ids``.
        ValueError: If ``weight`` cannot be converted to int.
    """
    edges = []
    for source, attributes in sources.items():
        edges.append({
            '~id': str(uuid.uuid4()),
            '~from': source,
            '~to': target,
            '~label': "RETWEETED",
            'weight:Int': int(attributes['weight']),
            # str() each id: tweet ids may arrive from JSON as ints, which str.join rejects.
            'tweet_ids': ";".join(str(tweet_id) for tweet_id in attributes['tweet_ids']),
        })
    return edges

def create_follower_edges(sources, target):
    """Build one FOLLOWS edge row per follower id, each pointing at ``target``.

    Args:
        sources: Iterable of follower ids, written to ``~from``.
        target: Id of the followed user, written to ``~to``.

    Returns:
        List of edge dicts in the same order as ``sources``. Each has a fresh UUID4 ``~id``
        and keys in CSV column order (``~id``, ``~from``, ``~to``, ``~label``).
    """
    return [
        {'~id': str(uuid.uuid4()), '~from': follower_id, '~to': target, '~label': "FOLLOWS"}
        for follower_id in sources
    ]

# --- Output files --------------------------------------------------------------
# One CSV per vertex/edge type, written next to the city's raw JSON as
# <base_path>/<city_name>/<city_name>_<kind>.csv
(
    user_vertices_file_path,
    city_edges_file_path,
    retweeter_edges_file_path,
    follower_edges_file_path,
) = (
    os.path.join(base_path, city_name, f"{city_name}_{kind}.csv")
    for kind in ("user_vertices", "city_edges", "retweeter_edges", "follower_edges")
)

# Fixed column order for the user-vertex CSV. Every user row is written in this order,
# and ``:Int`` suffixes mark integer-typed properties in the bulk-load header.
user_vertices_fieldnames = [
    "~id", "~label",
    "username", "city", "profile_location", "target_location",
    "followers_count:Int", "following_count:Int", "tweets_count:Int",
    "category", "treatment_arm",
    "verified", "created_at", "last_tweeted_at",
    "retweeter_status", "retweeter_last_processed",
    "follower_status", "follower_last_processed",
    "extracted_at", "last_updated",
]

# Master lists of ids already handled (kept as lists for any later cell that reads them).
# The parallel sets give O(1) membership checks; all_users can grow very large.
root_users = []
all_users = []
_seen_root_ids = set()
_seen_user_ids = set()

# Every writer below APPENDS. Clear this cell's outputs from any previous run first;
# otherwise each re-run (even Restart & Run All) duplicates every row with fresh edge UUIDs.
for _output_path in (user_vertices_file_path, city_edges_file_path,
                     retweeter_edges_file_path, follower_edges_file_path):
    if os.path.exists(_output_path):
        os.remove(_output_path)


def _write_user_vertex_once(user):
    """Write ``user`` as a vertex row unless a vertex with its user_id was already written."""
    user_id = user['user_id']
    if user_id in _seen_user_ids:
        return
    write_dict_to_csv(user_vertices_file_path, create_user_vertex(user), user_vertices_fieldnames)
    _seen_user_ids.add(user_id)
    all_users.append(user_id)


## MAIN ##
for root_user in raw_network:
    # .get(): a record without user_id must reach the validity check below, not raise here.
    root_id = root_user.get('user_id')
    print(f"Processing root user: {root_id}")

    # ensure user id exists
    if not root_id:
        print("!! Invalid user_id. Skipping... !!")
        continue
    # ensure root user hasn't already been processed
    if root_id in _seen_root_ids:
        print("!! Root user has already been processed. Skipping... !!")
        continue
    # ensure root user meets basic criteria: city is set and equals target_location
    if (not root_user.get('city')) or (root_user['city'] != root_user.get('target_location')):
        print("!! Root user does not meet location criteria. Skipping... !!")
        continue

    # user vertex + city edge
    _write_user_vertex_once(root_user)
    write_dict_to_csv(city_edges_file_path, create_city_edge(root_id, root_user['city']))

    # Retweeter vertices + edges. There is one edge per distinct retweeter; weight counts
    # that retweeter's entries across the root user's tweets, and tweet_ids lists the tweets.
    retweet_sources = {}
    for tweet in root_user.get('tweets') or []:
        for retweeter in tweet.get('retweeters') or []:
            _write_user_vertex_once(retweeter)
            source = retweet_sources.setdefault(retweeter['user_id'], {'weight': 0, 'tweet_ids': []})
            source['weight'] += 1
            source['tweet_ids'].append(tweet['tweet_id'])
    for edge in create_retweeter_edges(retweet_sources, root_id):
        write_dict_to_csv(retweeter_edges_file_path, edge)

    # Follower vertices + edges. The dict de-duplicates follower ids in first-seen order
    # with O(1) lookups, replacing the original O(n^2) list scan.
    follower_sources = {}
    for follower in root_user.get('followers') or []:
        _write_user_vertex_once(follower)
        follower_sources[follower['user_id']] = None
    for edge in create_follower_edges(list(follower_sources), root_id):
        write_dict_to_csv(follower_edges_file_path, edge)

    # record root user as processed
    _seen_root_ids.add(root_id)
    root_users.append(root_id)


Processing root user: 1869546683450765312
!! Root user does not meet location criteria. Skipping... !!
Processing root user: 1845837992637661190
Processing root user: 1842313354616168448
Processing root user: 1782609390639501313
!! Root user does not meet location criteria. Skipping... !!
Processing root user: 1752054121937387520
Processing root user: 1750902493024821248
Processing root user: 1703817165676158976
Processing root user: 1593487901349888001
Processing root user: 1572777218090098688
Processing root user: 1561130432938344453
Processing root user: 1560859776745103360
Processing root user: 1539976222007930880
Processing root user: 1504148226970202121
Processing root user: 1469338796990906378
Processing root user: 1445089457217150988
Processing root user: 1439523648755445763
Processing root user: 1433925962249510926
Processing root user: 1430913468115275783
Processing root user: 1429991394010148874
Processing root user: 1371304624339181571
Processing root user: 1369869726436954