In [1]:
# Imports
import pandas as pd
import json
import re
import boto3
import os
import logging
from io import StringIO
from datetime import datetime
from dotenv import load_dotenv
from neo4j import GraphDatabase

# Set up logging
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


# Load environment variables
load_dotenv()

# AWS credentials
AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
AWS_REGION = os.getenv('AWS_REGION', 'us-east-1')
BUCKET_NAME = "census-warpcast-account-metadata"

# Neo4j credentials
NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USERNAME = os.getenv("NEO4J_USERNAME")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
NEO4J_DATABASE = os.getenv("NEO4J_DATABASE", None)


# Set up S3 clients
s3_client = boto3.client(
    's3',
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

s3_resource = boto3.resource(
    's3',
    region_name=AWS_REGION,
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY
)

def sanitize_text(text):
    """
    Sanitize text to make it suitable for Neo4j queries by removing special characters
    and escaping quotes.
    """
    if not text or not isinstance(text, str):
        return ""
        
    # Replace quotes and escape characters
    sanitized = text.replace("'", "\\'").replace('\\', '\\\\')
    
    # Remove control characters
    sanitized = re.sub(r'[\x00-\x1F\x7F]', '', sanitized)
    
    return sanitized


# Function to convert string boolean values to actual booleans
def str2bool(value):
    """
    Convert string representations of boolean values to actual booleans.
    """
    if isinstance(value, bool):
        return value
    if not value or not isinstance(value, str):
        return False
    return value.lower() in ('true', 't', 'yes', 'y', '1')



In [2]:
# Setup Neo4j driver
neo4j_driver = GraphDatabase.driver(
    NEO4J_URI, 
    auth=(NEO4J_USERNAME, NEO4J_PASSWORD)
)

def split_dataframe(df, chunk_size=10000):
    """Split a DataFrame into chunks"""
    chunks = []
    num_chunks = len(df) // chunk_size + (1 if len(df) % chunk_size else 0)
    for i in range(num_chunks):
        chunks.append(df[i * chunk_size:(i + 1) * chunk_size])
    return chunks

def save_df_as_csv(df, file_name, acl='public-read', max_lines=10000, max_size=10000000):
    """Save DataFrame to CSV in S3 with chunking if needed"""
    chunks = [df]
    
    # Check if dataframe needs to be split
    if df.memory_usage(index=False).sum() > max_size or len(df) > max_lines:
        chunks = split_dataframe(df, chunk_size=max_lines)
    
    logger.info(f"Uploading DataFrame in {len(chunks)} chunks...")
    urls = []
    
    for chunk_id, chunk in enumerate(chunks):
        chunk_name = f"{file_name}--{chunk_id}.csv"
        
        # Save to S3 directly using pandas
        chunk.to_csv(f"s3://{BUCKET_NAME}/{chunk_name}", index=False, escapechar='\\')
        
        # Set ACL
        s3_resource.ObjectAcl(BUCKET_NAME, chunk_name).put(ACL=acl)
        
        # Get URL
        location = s3_client.get_bucket_location(Bucket=BUCKET_NAME)["LocationConstraint"]
        if location is None:
            location = "us-east-1"
        
        url = f"https://s3-{location}.amazonaws.com/{BUCKET_NAME}/{chunk_name}"
        urls.append(url)
        logger.info(f"Saved chunk {chunk_id+1}/{len(chunks)} to {url}")
    
    return urls

def get_latest_file(prefix):
    """Get latest file from S3 bucket with given prefix"""
    response = s3_client.list_objects_v2(
        Bucket=BUCKET_NAME,
        Prefix=prefix
    )
    
    if 'Contents' not in response:
        logger.error(f"No files found with prefix {prefix}")
        return None
        
    all_files = sorted(response['Contents'], key=lambda x: x['LastModified'], reverse=True)
    
    if not all_files:
        logger.error(f"No files found with prefix {prefix}")
        return None
        
    latest_file = all_files[0]['Key']
    logger.info(f"Found latest file: {latest_file}")
    
    return latest_file

def get_file_content(key):
    """Get content of file from S3"""
    try:
        response = s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
        content = response['Body'].read().decode('utf-8')
        return content
    except Exception as e:
        logger.error(f"Error getting S3 file content: {str(e)}")
        return None

def execute_cypher(query):
    """Execute a Cypher query in Neo4j"""
    with neo4j_driver.session(database=NEO4J_DATABASE) as session:
        result = session.run(query)
        return list(result)

def create_farcaster_accounts(csv_url):
    """Create Farcaster accounts in Neo4j"""
    query = f"""
    LOAD CSV WITH HEADERS FROM '{csv_url}' AS row
    MERGE (account:Account:Warpcast {{fid: row.fid}})
    ON CREATE SET
        account.uuid = randomUUID(),
        account.username = row.username,
        account.displayName = row.display_name,
        account.bio = row.bio,
        account.following_count = toInteger(row.following_count),
        account.follower_count = toInteger(row.follower_count),
        account.verified = toBoolean(row.verified),
        account.pfpUrl = row.pfp_url,
        account.profile_image = row.profile_image,
        account.power_badge = row.power_badge,
        account.mentioned_profiles = row.mentioned_profiles,
        account.city = row.city,
        account.state = row.state,
        account.state_code = row.state_code,
        account.country = row.country,
        account.country_code = row.country_code,
        account.createdDt = tostring(datetime()),
        account.lastUpdatedDt = tostring(datetime()),
        account.text_content = COALESCE(row.display_name, '') + ' ' + 
                               COALESCE(row.username, '') + ' ' + 
                               COALESCE(row.bio, '') + ' ' + 
                               COALESCE(row.city, '') + ' ' + 
                               COALESCE(row.state, '') + ' ' + 
                               COALESCE(row.country, '')
    ON MATCH SET
        account.username = row.username,
        account.displayName = row.display_name,
        account.bio = row.bio,
        account.following_count = toInteger(row.following_count),
        account.follower_count = toInteger(row.follower_count),
        account.verified = toBoolean(row.verified),
        account.pfp_url = row.pfp_url,
        account.profile_image = row.profile_image,
        account.power_badge = toBoolean(row.power_badge),
        account.mentioned_profiles = row.mentioned_profiles,
        account.city = row.city,
        account.state = row.state,
        account.state_code = row.state_code,
        account.country = row.country,
        account.country_code = row.country_code,
        account.lastUpdatedDt = tostring(datetime()),
        account.text_content = COALESCE(row.display_name, '') + ' ' + 
                               COALESCE(row.username, '') + ' ' + 
                               COALESCE(row.bio, '') + ' ' + 
                               COALESCE(row.city, '') + ' ' + 
                               COALESCE(row.state, '') + ' ' + 
                               COALESCE(row.country, '')
    RETURN count(*)
    """
    result = execute_cypher(query)
    count = result[0][0] if result else 0
    logger.info(f"Created/updated {count} Farcaster accounts in Neo4j")
    return count

In [3]:
# Main execution script - no classes
def main():
    timestamp = int(datetime.now().timestamp())
    test_mode = False  # Set to True for testing with a small subset
    test_limit = 100   # Only used if test_mode is True
    
    # Step 1: Load data from S3
    logger.info("Loading Warpcast account data from S3")
    latest_file = get_latest_file('warpcast/users/')
    if not latest_file:
        logger.error("No files found in the S3 bucket")
        return False
        
    content = get_file_content(latest_file)
    if not content:
        logger.error("Failed to read file content")
        return False
    
    # Step 2: Parse JSON
    try:
        raw_data = json.loads(content)
        logger.info(f"Successfully loaded data with {len(raw_data)} batch responses")
    except json.JSONDecodeError as e:
        logger.error(f"Error parsing JSON: {str(e)}")
        return False
    
    # Step 3: Extract and flatten user data
    logger.info("Processing Warpcast account data")
    all_users = []
    processed_count = 0
    
    for batch in raw_data:
        if not batch.get('success', False):
            continue
            
        batch_data = batch.get('data', {})
        users = batch_data.get('users', [])
        
        for user in users:
            if user:  # Skip empty entries
                # Flatten user object
                flat_user = {
                    'fid': user.get('fid'),
                    'username': user.get('username'),
                    'display_name': user.get('displayName'),
                    'custody_address': user.get('custodyAddress'),
                    'pfp_url': user.get('pfp_url'),
                    'bio': user.get('profile', {}).get('bio', {}).get('text'),
                    'followingCount': user.get(('following_count')),
                    'followerCount': int(user.get('follower_count')),
                    'verified': user.get('verified', False),
                    'power_badge': user.get('powerBadge', False)
                }
                
                # Extract mentioned profiles from bio
                if flat_user['bio'] and isinstance(flat_user['bio'], str):
                    mentions = re.findall(r'@(\w+)', flat_user['bio'])
                    if mentions:
                        flat_user['mentioned_profiles'] = json.dumps(mentions)
                
                # Add location data if available
                if 'profile' in user and 'location' in user['profile'] and user['profile']['location']:
                    location = user['profile']['location']
                    
                    if 'address' in location and location['address']:
                        address = location['address']
                        flat_user['city'] = address.get('city')
                        flat_user['state'] = address.get('state')
                        flat_user['state_code'] = address.get('stateCode')
                        flat_user['country'] = address.get('country')
                        flat_user['country_code'] = address.get('countryCode')
                
                # Handle verifications - they're strings, not objects
                if 'verifications' in user and user['verifications'] and len(user['verifications']) > 0:
                    flat_user['verification_address'] = user['verifications'][0]
                    flat_user['verification_type'] = 'ethereum'
                
                # Clean null values
                for key, value in flat_user.items():
                    if value is None:
                        flat_user[key] = ""
                
                all_users.append(flat_user)
                processed_count += 1
                
                if processed_count % 10000 == 0:
                    logger.info(f"Processed {processed_count} users so far")
    
    # If test mode, limit the number of records
    if test_mode:
        logger.info(f"Test mode: limiting to {test_limit} records")
        all_users = all_users[:test_limit]
    
    # Step 4: Convert to DataFrame
    df = pd.DataFrame(all_users)
    logger.info(f"Created DataFrame with {len(df)} users")
    
    # Step 5: Save to S3 and process with Neo4j
    # 5.1 Create accounts
    accounts_urls = save_df_as_csv(df, f"warpcast_accounts_{timestamp}")
    
    for i, url in enumerate(accounts_urls):
        logger.info(f"Processing account batch {i+1}/{len(accounts_urls)}")
        count = create_farcaster_accounts(url)
    
    
    logger.info("Completed Warpcast account ingestion process")
    return True

# Run the main function if this script is executed directly
if __name__ == "__main__":
    main()

2025-03-28 15:24:36,110 - __main__ - INFO - Loading Warpcast account data from S3
2025-03-28 15:24:37,203 - __main__ - INFO - Found latest file: warpcast/users/20250328_073938_responses.json
2025-03-28 15:25:18,414 - __main__ - INFO - Successfully loaded data with 8671 batch responses
2025-03-28 15:25:18,415 - __main__ - INFO - Processing Warpcast account data
2025-03-28 15:25:18,439 - __main__ - INFO - Processed 10000 users so far
2025-03-28 15:25:18,469 - __main__ - INFO - Processed 20000 users so far
2025-03-28 15:25:18,484 - __main__ - INFO - Processed 30000 users so far
2025-03-28 15:25:18,504 - __main__ - INFO - Processed 40000 users so far
2025-03-28 15:25:18,519 - __main__ - INFO - Processed 50000 users so far
2025-03-28 15:25:18,531 - __main__ - INFO - Processed 60000 users so far
2025-03-28 15:25:18,543 - __main__ - INFO - Processed 70000 users so far
2025-03-28 15:25:18,559 - __main__ - INFO - Processed 80000 users so far
2025-03-28 15:25:18,571 - __main__ - INFO - Processed

CypherSyntaxError: {code: Neo.ClientError.Statement.SyntaxError} {message: Invalid input 'WHERE': expected 'FOREACH', 'ORDER BY', 'CALL', 'CREATE', 'LOAD CSV', 'DELETE', 'DETACH', 'FIELDTERMINATOR', 'FINISH', 'INSERT', 'LIMIT', 'MATCH', 'MERGE', 'NODETACH', 'OFFSET', 'OPTIONAL', 'REMOVE', 'RETURN', 'SET', 'SKIP', 'UNION', 'UNWIND', 'USE', 'WITH' or <EOF> (line 3, column 13 (offset: 162))
"            WHERE row.verification_address IS NOT NULL AND trim(row.verification_address) <> ''"
             ^}

In [4]:
# %% cell 4 code - Fetch first 100 accounts from Neynar API and create nodes

import os
import time
import requests
import json
import logging
import pandas as pd
from dotenv import load_dotenv

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv(override=True)

# Neynar API configuration
NEYNAR_API_KEY = os.getenv('NEYNAR_API_KEY')
REQUESTS_PER_MINUTE = 30
REQUEST_DELAY = (60.0 / REQUESTS_PER_MINUTE) + 0.2

# Neo4j connection function from above
def execute_cypher(query, params=None):
    from neo4j import GraphDatabase
    
    uri = os.getenv('NEO4J_URI')
    username = os.getenv('NEO4J_USERNAME')
    password = os.getenv('NEO4J_PASSWORD')
    
    driver = GraphDatabase.driver(uri, auth=(username, password))
    with driver.session() as session:
        result = session.run(query, params)
        records = list(result)
        driver.close()
        return records

def query_neynar_api_for_users(fids):
    """Query Neynar API for user data by FIDs"""
    base_url = "https://api.neynar.com/v2/farcaster/user/bulk"
    headers = {
        "accept": "application/json",
        "api_key": NEYNAR_API_KEY
    }
    params = {
        "fids": ",".join(map(str, fids))
    }
    
    try:
        logger.info(f"Querying Neynar API for FIDs: {fids[:5]}... (total: {len(fids)})")
        response = requests.get(base_url, headers=headers, params=params)
        
        if 'X-RateLimit-Remaining' in response.headers:
            logger.info(f"Rate limit remaining: {response.headers['X-RateLimit-Remaining']}")
        
        response.raise_for_status()
        data = response.json()
        
        return data
    except Exception as e:
        logger.error(f"Error querying Neynar API: {e}")
        return {"error": str(e)}

def process_user_data(user_data):
    """Process user data and create nodes in Neo4j"""
    if not user_data or 'users' not in user_data:
        logger.error("No valid user data found")
        return 0
    
    users = user_data['users']
    logger.info(f"Processing {len(users)} users")
    
    # Convert to DataFrame for easier processing
    users_df = pd.json_normalize(users)
    
    # Create nodes one by one
    created_count = 0
    for _, row in users_df.iterrows():
        try:
            # Extract user data
            username = row.get('username', '')
            display_name = row.get('displayName', '')
            bio = row.get('profile.bio.text', '')
            
            # Create aggregated text field
            agg_text = f"{username} {display_name} {bio}".strip()
            
            user_data = {
                'fid': str(row.get('fid', '')),
                'username': username,
                'display_name': display_name,
                'pfp_url': row.get('pfp.url', ''),
                'follower_count': str(row.get('followerCount', 0)),
                'following_count': str(row.get('followingCount', 0)),
                'bio': bio,
                'agg_text': agg_text
            }
            
            # Create Cypher query for this user
            query = """
            MERGE (a:Account:Warpcast {fid: $fid})
            ON CREATE SET
                a.username = $username,
                a.displayName = $display_name,
                a.pfpUrl = $pfp_url,
                a.followerCount = $follower_count,
                a.followingCount = $following_count,
                a.bio = $bio,
                a.aggText = $agg_text,
                a.createdDt = datetime(),
                a.lastUpdatedDt = datetime()
            ON MATCH SET
                a.username = $username,
                a.displayName = $display_name,
                a.pfpUrl = $pfp_url,
                a.followerCount = $follower_count,
                a.followingCount = $following_count,
                a.bio = $bio,
                a.aggText = $agg_text,
                a.lastUpdatedDt = datetime()
            RETURN a
            """
            
            # Execute query
            result = execute_cypher(query, user_data)
            created_count += 1
                
        except Exception as e:
            logger.error(f"Error processing user {row.get('fid')}: {str(e)}")
    
    return created_count

# Main function to fetch and process first 100 accounts
def fetch_first_100_accounts():
    logger.info("Starting to fetch first 100 Warpcast accounts")
    
    # Define FID range (1-100)
    start_fid = 1
    end_fid = 100
    batch_size = 100
    
    # Create batch of FIDs
    fids = list(range(start_fid, end_fid + 1))
    
    # Query API
    user_data = query_neynar_api_for_users(fids)
    
    # Process and create nodes
    if user_data and not user_data.get('error'):
        created_count = process_user_data(user_data)
        logger.info(f"Successfully created/updated {created_count} Warpcast account nodes")
    else:
        logger.error(f"Failed to fetch user data: {user_data.get('error')}")
    
    logger.info("Completed Warpcast account fetch and node creation")

# Run the function
fetch_first_100_accounts()




2025-03-28 15:36:18,642 - __main__ - INFO - Starting to fetch first 100 Warpcast accounts
2025-03-28 15:36:18,643 - __main__ - INFO - Querying Neynar API for FIDs: [1, 2, 3, 4, 5]... (total: 100)
2025-03-28 15:36:19,170 - __main__ - INFO - Rate limit remaining: 299
2025-03-28 15:36:19,173 - __main__ - INFO - Processing 100 users
2025-03-28 15:37:32,170 - __main__ - INFO - Successfully created/updated 100 Warpcast account nodes
2025-03-28 15:37:32,172 - __main__ - INFO - Completed Warpcast account fetch and node creation


In [10]:
# Function to get the latest file from S3 with a given prefix
def get_latest_file(prefix):
    """
    Get the latest file from S3 bucket with the given prefix
    """
    try:
        response = s3_client.list_objects_v2(
            Bucket=BUCKET_NAME,
            Prefix=prefix
        )
        
        if 'Contents' not in response:
            logger.error(f"No files found with prefix {prefix}")
            return None
            
        all_files = sorted(response['Contents'], key=lambda x: x['LastModified'], reverse=True)
        
        if not all_files:
            logger.error(f"No files found with prefix {prefix}")
            return None
            
        latest_file = all_files[0]['Key']
        logger.info(f"Found latest file: {latest_file}")
        
        return latest_file
        
    except Exception as e:
        logger.error(f"Error accessing S3: {str(e)}")
        return None
    

# Function to get content of a file from S3
def get_file_content(key):
    """
    Get content of a file from S3
    """
    try:
        response = s3_client.get_object(Bucket=BUCKET_NAME, Key=key)
        content = response['Body'].read().decode('utf-8')
        
        return content
        
    except Exception as e:
        logger.error(f"Error getting S3 file content: {str(e)}")
        return None
    

# Function to save a DataFrame to S3 as CSV
def save_csv_to_s3(df, prefix):
    """
    Save a DataFrame to S3 as CSV
    Returns the S3 URL of the saved file
    """
    if df.empty:
        logger.warning("Attempted to save empty DataFrame")
        return None
        
    logger.info(f"Saving DataFrame with {len(df)} rows to S3 with prefix {prefix}")
    
    # Convert to CSV
    csv_buffer = StringIO()
    df.to_csv(csv_buffer, index=False)
    csv_data = csv_buffer.getvalue()
    
    # Generate filename with timestamp
    timestamp = int(datetime.now().timestamp())
    chunk_name = f"{prefix}_{timestamp}.csv"
    
    try:
        # Upload to S3 with public-read ACL
        s3_client.put_object(
            Bucket=BUCKET_NAME,
            Key=chunk_name,
            Body=csv_data,
            ContentType='text/csv',
            ACL='public-read'  # Make it publicly accessible
        )
        
        # Get the S3 URL
        s3_url = f"https://{BUCKET_NAME}.s3.amazonaws.com/{chunk_name}"
        logger.info(f"Successfully saved to {s3_url}")
        
        return s3_url
    except Exception as e:
        logger.error(f"Error saving to S3: {str(e)}")
        return None
    



In [11]:
# Function to flatten the nested user object
def flatten_user(user):
    """
    Flatten the nested user object into a flat dictionary
    """
    flat_user = {
        'fid': user.get('fid'),
        'username': user.get('username'),
        'display_name': user.get('displayName'),
        'custody_address': user.get('custodyAddress'),
        'pfp_url': user.get('pfp', {}).get('url'),
        'bio': user.get('profile', {}).get('bio', {}).get('text'),
        'following_count': user.get('followingCount'),
        'follower_count': user.get('followerCount'),
        'verified': user.get('verified', False),
        'power_badge': user.get('powerBadge', False)
    }
    
    # Extract mentioned profiles from bio if available
    if flat_user['bio'] and isinstance(flat_user['bio'], str):
        # Find all @username mentions in the bio
        mentions = re.findall(r'@(\w+)', flat_user['bio'])
        if mentions:
            flat_user['mentioned_profiles'] = json.dumps(mentions)
        else:
            flat_user['mentioned_profiles'] = ''
    else:
        flat_user['mentioned_profiles'] = ''
    
    # Add additional fields from profile if available
    if 'profile' in user:
        profile = user['profile']
        
        # Add location data if available
        if 'location' in profile and profile['location']:
            location = profile['location']
            
            if 'address' in location and location['address']:
                address = location['address']
                flat_user['city'] = address.get('city', '')
                flat_user['state'] = address.get('state', '')
                flat_user['state_code'] = address.get('stateCode', '')
                flat_user['country'] = address.get('country', '')
                flat_user['country_code'] = address.get('countryCode', '')
            else:
                flat_user['city'] = ''
                flat_user['state'] = ''
                flat_user['state_code'] = ''
                flat_user['country'] = ''
                flat_user['country_code'] = ''
        else:
            flat_user['city'] = ''
            flat_user['state'] = ''
            flat_user['state_code'] = ''
            flat_user['country'] = ''
            flat_user['country_code'] = ''
    
    # Handle verifications
    if 'verifications' in user and user['verifications'] and len(user['verifications']) > 0:
        # Just take the first verification address as a string
        flat_user['verification_address'] = user['verifications'][0]
        flat_user['verification_type'] = 'ethereum'  # Default to ethereum
    else:
        flat_user['verification_address'] = ''
        flat_user['verification_type'] = ''
    
    # Clean any null values
    for key, value in flat_user.items():
        if value is None:
            flat_user[key] = ""
            
    return flat_user


# Function to process raw Warpcast data
def process_warpcast_data(raw_data):
    """
    Process raw Warpcast data into a DataFrame
    """
    logger.info("Processing Warpcast account data")
    
    # Create a list to collect all user data
    all_users = []
    
    # Iterate through each batch in the response
    for batch_idx, batch in enumerate(raw_data):
        if not batch.get('success', False):
            logger.warning(f"Skipping unsuccessful batch {batch_idx}")
            continue
            
        # Extract users data from the batch
        batch_data = batch.get('data', {})
        users = batch_data.get('users', [])
        
        # Add each user to our collection
        for user in users:
            if user:  # Skip empty entries
                # Flatten the user object for easier CSV handling
                flat_user = flatten_user(user)
                all_users.append(flat_user)
    
    # Convert to DataFrame
    users_df = pd.DataFrame(all_users)
    
    # Apply sanitization to text fields
    text_columns = ['username', 'display_name', 'bio']
    for col in text_columns:
        if col in users_df.columns:
            users_df[col] = users_df[col].apply(sanitize_text)
    
    # Convert boolean fields
    bool_columns = ['verified', 'power_badge']
    for col in bool_columns:
        if col in users_df.columns:
            users_df[col] = users_df[col].apply(str2bool)
    
    # Convert count fields to integers
    count_columns = ['following_count', 'follower_count']
    for col in count_columns:
        if col in users_df.columns:
            users_df[col] = pd.to_numeric(users_df[col], errors='coerce').fillna(0).astype(int)
    
    logger.info(f"Processed {len(users_df)} users")
    return users_df

# Function to process a CSV file from S3
def process_s3_file(url):
    """
    Read a CSV file from S3, sanitize text fields, and convert data types.
    
    Args:
        url: S3 URL for the CSV file
        
    Returns:
        pd.DataFrame: Processed DataFrame
    """
    try:
        # Read CSV from S3 URL
        df = pd.read_csv(url)
        
        # Sanitize text fields
        text_columns = ['text', 'displayName', 'username', 'display_name', 'bio']
        for col in text_columns:
            if col in df.columns:
                df[col] = df[col].apply(sanitize_text)
        
        # Convert boolean fields
        bool_columns = ['verified', 'power_badge']
        for col in bool_columns:
            if col in df.columns:
                df[col] = df[col].apply(str2bool)
        
        # Convert count fields to integers
        count_columns = ['following_count', 'follower_count', 'followers_count']
        for col in count_columns:
            if col in df.columns:
                df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0).astype(int)
        
        return df
    except Exception as e:
        logger.error(f"Error processing file {url}: {str(e)}")
        return None


In [12]:
# Function to get a Neo4j driver
def get_neo4j_driver():
    """Get a Neo4j driver with connection details from environment variables"""
    if not NEO4J_URI or not NEO4J_USERNAME or not NEO4J_PASSWORD:
        logger.error("Neo4j connection details not found in environment variables")
        return None
        
    try:
        driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USERNAME, NEO4J_PASSWORD))
        return driver
    except Exception as e:
        logger.error(f"Error connecting to Neo4j: {str(e)}")
        return None

# Function to run a Neo4j query
def run_neo4j_query(query, params=None):
    """Execute a Cypher query and return the results"""
    driver = get_neo4j_driver()
    if not driver:
        return None
        
    session = None
    response = None
    try:
        session = driver.session(database=NEO4J_DATABASE) if NEO4J_DATABASE else driver.session()
        response = list(session.run(query, params))
    except Exception as e:
        logger.error(f"Query failed: {e}")
        return None
    finally:
        if session:
            session.close()
        if driver:
            driver.close()
    return response






In [13]:
# Function to create or update Farcaster accounts in Neo4j
def create_farcaster_accounts(url):
    """Create or update Farcaster accounts in Neo4j from CSV URL"""
    logger.info(f"Creating/updating Farcaster accounts from {url}")
    
    query = f"""
    LOAD CSV WITH HEADERS FROM '{url}' AS row
    
    MERGE (account:Account:Farcaster {{fid: row.fid}})
    ON CREATE SET 
        account.username = row.username,
        account.displayName = row.display_name,
        account.bioText = row.bio,
        account.verified = row.verified,
        account.powerBadge = row.power_badge,
        account.followerCount = row.follower_count,
        account.followingCount = row.following_count,
        account.firstSeenAt = timestamp(),
        account.lastUpdatedAt = timestamp()
    ON MATCH SET 
        account.username = row.username,
        account.displayName = row.display_name,
        account.bioText = row.bio,
        account.verified = row.verified,
        account.powerBadge = row.power_badge,
        account.followerCount = row.follower_count,
        account.followingCount = row.following_count,
        account.lastUpdatedAt = timestamp()
    
    """
    
    result = run_neo4j_query(query)
    
    if result and result[0]:
        count = result[0].get('accountCount', 0)
        logger.info(f"Created/updated {count} Farcaster accounts in Neo4j")
        return count
    return 0

In [None]:
# Get latest Warpcast data file
latest_file = get_latest_file('warpcast/users/')
if not latest_file:
    print("No files found in S3 bucket")
else:
    # Get file content
    content = get_file_content(latest_file)
    if not content:
        print("Failed to read file content")
    else:
        # Parse JSON content
        raw_data = json.loads(content)
        print(f"Loaded data with {len(raw_data)} batch responses")
        
        # Process data into DataFrame
        users_df = process_warpcast_data(raw_data)
        print(f"Processed {len(users_df)} users")
        
        # Save to S3
        timestamp = int(datetime.now().timestamp())
        accounts_url = save_csv_to_s3(users_df, f"warpcast_accounts_{timestamp}")
        
        if accounts_url:
            # Create accounts in Neo4j
            count = create_farcaster_accounts(accounts_url)
            print(f"Created/updated {count} Farcaster accounts in Neo4j")

2025-03-28 15:10:27,824 - __main__ - INFO - Found latest file: warpcast/users/20250328_073938_responses.json
2025-03-28 15:10:58,004 - __main__ - INFO - Processing Warpcast account data


Loaded data with 8671 batch responses


2025-03-28 15:11:05,152 - __main__ - INFO - Processed 866901 users
2025-03-28 15:11:05,657 - __main__ - INFO - Saving DataFrame with 866901 rows to S3 with prefix warpcast_accounts_1743199865


Processed 866901 users


2025-03-28 15:11:43,931 - __main__ - INFO - Successfully saved to https://census-warpcast-account-metadata.s3.amazonaws.com/warpcast_accounts_1743199865_1743199868.csv
2025-03-28 15:11:43,965 - __main__ - INFO - Creating/updating Farcaster accounts from https://census-warpcast-account-metadata.s3.amazonaws.com/warpcast_accounts_1743199865_1743199868.csv
