## Clean and process raw chunks of data from source repo

In [3]:
import os
import glob
import gzip
import json
import math
import duckdb
import re
import ast
import pandas as pd
from datetime import datetime, timezone


def replace_python_datetimes_with_strings(s: str) -> str:
    """
    Finds expressions like:
        datetime.datetime(2016, 4, 14, 15, 38, 50, tzinfo=datetime.timezone.utc)
    and replaces them with an ISO8601 string, e.g. '2016-04-14T15:38:50+00:00'.
    
    If it can't parse properly, it leaves the text as-is.
    """

    # Regex that captures `datetime.datetime(`, grabs everything up to the matching `)`
    # This is a simplistic approach and may need to be made more robust
    pattern = r"datetime\.datetime\(\s*([^)]+)\s*\)"

    def _parse_one_datetime(match):
        """
        Parse the inside of datetime.datetime(...) into a Python datetime,
        return the ISO8601 string.
        """
        inner_str = match.group(1)  # e.g. "2016, 4, 14, 15, 38, 50, tzinfo=datetime.timezone.utc"
        
        # If there's a tzinfo in there, let's remove it or parse it properly
        # This is a simple approach: if "tzinfo=datetime.timezone.utc" is present, we handle it.
        inner_str = inner_str.replace("tzinfo=datetime.timezone.utc", "")
        
        # Now we have something like: "2016, 4, 14, 15, 38, 50," or "2016, 4, 14, 15, 38, 50"
        # Let's split on commas
        parts = [p.strip() for p in inner_str.split(',') if p.strip()]
        # Expecting at least 6 parts: year, month, day, hour, minute, second
        if len(parts) < 6:
            # Fallback: if we can't parse, just return the original text
            return match.group(0)
        
        # Convert parts to integers
        try:
            year   = int(parts[0])
            month  = int(parts[1])
            day    = int(parts[2])
            hour   = int(parts[3])
            minute = int(parts[4])
            second = int(parts[5])
            # Build the datetime object with UTC by default
            dt = datetime(year, month, day, hour, minute, second, tzinfo=timezone.utc)
            # Return as an ISO format string in quotes
            return f"\"{dt.isoformat()}\""
        except Exception:
            # If anything fails, fall back to returning original text
            return match.group(0)

    # Apply the regex substitution to the entire string
    return re.sub(pattern, _parse_one_datetime, s)

# ------------------------------------------------------------------------------
# Configuration
# ------------------------------------------------------------------------------
TWEETS_ROOT = "C:/Users/Saeed/repos/usc-x-24-us-election"  # Where part_x directories live
DUCKDB_PATH = "tweets2.duckdb"                    # Where to store the new DuckDB
TABLE_NAME  = "tweets"

# ------------------------------------------------------------------------------
# Create a new DuckDB connection
# ------------------------------------------------------------------------------
con = duckdb.connect(database=DUCKDB_PATH, read_only=False)

# ------------------------------------------------------------------------------
# Create the tweets table if it does not exist
# ------------------------------------------------------------------------------
create_table_query = f"""
CREATE TABLE IF NOT EXISTS {TABLE_NAME} (
    id VARCHAR,
    text VARCHAR,
    url VARCHAR,
    timestamp TIMESTAMP,
    media VARCHAR,
    retweetedTweet BOOLEAN,
    retweetedTweetID VARCHAR,
    retweetedUserID VARCHAR,
    lang VARCHAR,
    replyCount BIGINT,
    retweetCount BIGINT,
    likeCount BIGINT,
    quoteCount BIGINT,
    conversationId VARCHAR,
    conversationIdStr VARCHAR,
    hashtags VARCHAR,
    mentionedUsers VARCHAR,
    links VARCHAR,
    viewCount BIGINT,
    quotedTweet BOOLEAN,
    location VARCHAR,
    cash_app_handle VARCHAR,
    username VARCHAR,
    bio VARCHAR,
    followersCount BIGINT,
    verified BOOLEAN
);
"""
con.execute(create_table_query)

# ------------------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------------------
def safe_json_loads(s):
    """
    Attempt to parse a string as JSON or Python literal dict.
    If parsing fails, return None.
    Now includes a step to transform 'datetime.datetime(...)' into a plain string.
    """
    if not s or not isinstance(s, str):
        return None

    s = s.strip()

    # 1) First pass: fix Python datetime objects
    s = replace_python_datetimes_with_strings(s)

    # 2) Attempt to parse as JSON
    try:
        return json.loads(s)
    except:
        # 3) Attempt Python literal_eval
        try:
            return ast.literal_eval(s)
        except:
            return None

def parse_view_count(value):
    """
    The viewCount column sometimes contains an object like:
      {'count': '14', 'state': 'EnabledWithCount'}
    We only want the integer from 'count'.
    """
    data = safe_json_loads(value)
    if data and isinstance(data, dict) and 'count' in data:
        try:
            return int(data['count'])
        except:
            pass
    return None

def parse_epoch_to_timestamp(epoch_val):
    """
    Convert the epoch float (e.g. 1720107523.0) to a UTC timestamp/datetime.
    If it's empty or invalid, return None.
    """
    if not epoch_val:
        return None
    try:
        epoch_float = float(epoch_val)
        if math.isinf(epoch_float) or math.isnan(epoch_float) or epoch_float < 0:
            return None
        return datetime.utcfromtimestamp(epoch_float)
    except:
        return None

def parse_bool(val):
    """
    Convert 'True'/'False' or similar strings to boolean.
    """
    if isinstance(val, bool):
        return val
    if isinstance(val, str):
        lower_val = val.strip().lower()
        if lower_val == 'true':
            return True
        if lower_val == 'false':
            return False
    return False

def parse_user_info(user_str):
    """
    The user column is a JSON-like string containing location, username, rawDescription, 
    followersCount, verified, etc.

    Return a dict with relevant fields:
      location, username, bio, followersCount, verified
    """
    #print(user_str)
    user_data = safe_json_loads(user_str)
    #print(user_data)
    if not user_data or not isinstance(user_data, dict):
        return {
            "location": None,
            "username": None,
            "bio": None,
            "followersCount": None,
            "verified": None
        }
    
    location = user_data.get('location', None)
    username = user_data.get('username', None)
    bio = user_data.get('rawDescription', None)
    followersCount = user_data.get('followersCount', None)
    verified = user_data.get('verified', False)
    
    return {
        "location": location,
        "username": username,
        "bio": bio,
        "followersCount": followersCount,
        "verified": verified
    }

def parse_row(row, columns):
    """
    Given a row (dictionary from csv) and a list of columns found in that CSV,
    extract the relevant fields with the correct transformations. 
    """
    _id                = row.get('id', None)
    rawContent         = row.get('rawContent', None) or row.get('text', None)
    url                = row.get('url', None)
    epoch              = row.get('epoch', None)
    media              = row.get('media', None)
    retweetedTweet     = row.get('retweetedTweet', None)
    retweetedTweetID   = row.get('retweetedTweetID', None)
    retweetedUserID    = row.get('retweetedUserID', None)
    lang               = row.get('lang', None)
    replyCount         = row.get('replyCount', None)
    retweetCount       = row.get('retweetCount', None)
    likeCount          = row.get('likeCount', None)
    quoteCount         = row.get('quoteCount', None)
    conversationId     = row.get('conversationId', None)
    conversationIdStr  = row.get('conversationIdStr', None)
    hashtags           = row.get('hashtags', None)
    mentionedUsers     = row.get('mentionedUsers', None)
    links              = row.get('links', None)
    viewCount          = row.get('viewCount', None)
    quotedTweet        = row.get('quotedTweet', None)
    cash_app_handle    = row.get('cash_app_handle', None)
    user_str           = row.get('user', None)
    
    timestamp       = parse_epoch_to_timestamp(epoch)
    retweetedTweet  = parse_bool(retweetedTweet)
    quotedTweet     = parse_bool(quotedTweet)

    def to_int(x):
        try:
            return int(float(x))
        except:
            return 0
    
    replyCount   = to_int(replyCount)
    retweetCount = to_int(retweetCount)
    likeCount    = to_int(likeCount)
    quoteCount   = to_int(quoteCount)

    vc_parsed    = parse_view_count(viewCount)
    if vc_parsed is None:
        try:
            vc_parsed = int(float(viewCount))
        except:
            vc_parsed = 0

    user_info    = parse_user_info(user_str)
    location     = user_info["location"]
    username     = user_info["username"]
    bio          = user_info["bio"]
    followers    = user_info["followersCount"]
    verified     = user_info["verified"]

    media_str      = media if media else ""
    hashtags_str   = str(hashtags) if hashtags else ""
    mentioned_str  = str(mentionedUsers) if mentionedUsers else ""
    links_str      = str(links) if links else ""

    return {
        "id": _id,
        "text": rawContent,
        "url": url,
        "timestamp": timestamp,
        "media": media_str,
        "retweetedTweet": retweetedTweet,
        "retweetedTweetID": str(retweetedTweetID) if retweetedTweetID else None,
        "retweetedUserID": str(retweetedUserID) if retweetedUserID else None,
        "lang": lang,
        "replyCount": replyCount,
        "retweetCount": retweetCount,
        "likeCount": likeCount,
        "quoteCount": quoteCount,
        "conversationId": str(conversationId) if conversationId else None,
        "conversationIdStr": str(conversationIdStr) if conversationIdStr else None,
        "hashtags": hashtags_str,
        "mentionedUsers": mentioned_str,
        "links": links_str,
        "viewCount": vc_parsed,
        "quotedTweet": quotedTweet,
        "location": location,
        "cash_app_handle": cash_app_handle,
        "username": username,
        "bio": bio,
        "followersCount": followers if followers else 0,
        "verified": verified
    }

# ------------------------------------------------------------------------------
# Main ingest logic with retry and chunk support
# ------------------------------------------------------------------------------
def process_csv_gz(file_path, attempt=1, max_retries=3, chunk_size=50000):
    """
    Read a gzipped CSV file in chunks, parse it, and insert cleaned rows into DuckDB.
    Retry logic is added if errors occur (up to max_retries).
    """
    try:
        print(f"[Attempt {attempt}] Processing file: {file_path}")

        with gzip.open(file_path, 'rt', encoding='utf-8') as gz:
            # Read CSV in chunks to reduce memory usage
            for df_chunk in pd.read_csv(gz, engine='python', dtype=str, chunksize=chunk_size):
                records = []
                for _, row in df_chunk.iterrows():
                    cleaned = parse_row(row.to_dict(), df_chunk.columns)
                    records.append(cleaned)

                if records:
                    insert_df = pd.DataFrame(records)
                    con.execute("BEGIN;")
                    con.append(TABLE_NAME, insert_df)
                    con.execute("COMMIT;")

        print(f"Finished processing file: {file_path}")

    except Exception as e:
        print(f"Error processing file: {file_path}, Error: {str(e)}")
        if attempt < max_retries:
            print(f"Retrying file: {file_path} (attempt {attempt+1}/{max_retries})")
            process_csv_gz(file_path, attempt=attempt+1, max_retries=max_retries, chunk_size=chunk_size)
        else:
            print(f"Max retries reached for file: {file_path}. Skipping...")

def main():
    part_dirs = sorted(glob.glob(os.path.join(TWEETS_ROOT, "part_*")))
    for part_dir in part_dirs:
        csv_gz_files = glob.glob(os.path.join(part_dir, "*.csv.gz"))
        for file_path in csv_gz_files:
            process_csv_gz(file_path, max_retries=3, chunk_size=50000)

    print("Done processing all CSV.GZ files!")

if __name__ == "__main__":
    main()
    # Close the DuckDB connection at the end
    con.close()


[Attempt 1] Processing file: C:/Users/Saeed/repos/usc-x-24-us-election\part_1\may_july_chunk_1.csv.gz
Finished processing file: C:/Users/Saeed/repos/usc-x-24-us-election\part_1\may_july_chunk_1.csv.gz
[Attempt 1] Processing file: C:/Users/Saeed/repos/usc-x-24-us-election\part_1\may_july_chunk_10.csv.gz
Finished processing file: C:/Users/Saeed/repos/usc-x-24-us-election\part_1\may_july_chunk_10.csv.gz
[Attempt 1] Processing file: C:/Users/Saeed/repos/usc-x-24-us-election\part_1\may_july_chunk_11.csv.gz
Finished processing file: C:/Users/Saeed/repos/usc-x-24-us-election\part_1\may_july_chunk_11.csv.gz
[Attempt 1] Processing file: C:/Users/Saeed/repos/usc-x-24-us-election\part_1\may_july_chunk_12.csv.gz
Finished processing file: C:/Users/Saeed/repos/usc-x-24-us-election\part_1\may_july_chunk_12.csv.gz
[Attempt 1] Processing file: C:/Users/Saeed/repos/usc-x-24-us-election\part_1\may_july_chunk_13.csv.gz
Finished processing file: C:/Users/Saeed/repos/usc-x-24-us-election\part_1\may_july_chu

In [4]:
import duckdb
import pandas as pd

def analyze_tweets_database(db_path="tweets2.duckdb"):
    """
    Comprehensive analysis of the tweets database based on our schema
    """
    con = duckdb.connect(database=db_path, read_only=True)
    
    try:
        # 1. Basic Statistics
        print("=== Basic Statistics ===")
        basic_stats = con.execute("""
            SELECT 
                COUNT(*) as total_tweets,
                COUNT(DISTINCT username) as unique_users,
                COUNT(*) FILTER (WHERE verified = true) as verified_users,
                AVG(followersCount) as avg_followers,
                AVG(likeCount) as avg_likes,
                AVG(retweetCount) as avg_retweets,
                COUNT(DISTINCT lang) as language_count
            FROM tweets
        """).fetchdf()
        
        stats = basic_stats.iloc[0]
        print(f"Total Tweets: {stats['total_tweets']:,}")
        print(f"Unique Users: {stats['unique_users']:,}")
        print(f"Verified Users: {stats['verified_users']:,}")
        print(f"Average Followers: {stats['avg_followers']:,.2f}")
        print(f"Average Likes: {stats['avg_likes']:,.2f}")
        print(f"Average Retweets: {stats['avg_retweets']:,.2f}")
        print(f"Number of Languages: {stats['language_count']:,}")

        # 2. Engagement Statistics
        print("\n=== Engagement Statistics ===")
        engagement_stats = con.execute("""
            SELECT 
                'Engagement' as metric,
                AVG(replyCount) as avg_replies,
                MAX(replyCount) as max_replies,
                AVG(retweetCount) as avg_retweets,
                MAX(retweetCount) as max_retweets,
                AVG(likeCount) as avg_likes,
                MAX(likeCount) as max_likes,
                AVG(quoteCount) as avg_quotes,
                MAX(quoteCount) as max_quotes,
                AVG(viewCount) as avg_views,
                MAX(viewCount) as max_views
            FROM tweets
            WHERE replyCount IS NOT NULL 
              AND retweetCount IS NOT NULL 
              AND likeCount IS NOT NULL
        """).fetchdf()
        
        e = engagement_stats.iloc[0]
        print(f"Replies:  Avg: {e['avg_replies']:,.1f}, Max: {e['max_replies']:,}")
        print(f"Retweets: Avg: {e['avg_retweets']:,.1f}, Max: {e['max_retweets']:,}")
        print(f"Likes:    Avg: {e['avg_likes']:,.1f}, Max: {e['max_likes']:,}")
        print(f"Quotes:   Avg: {e['avg_quotes']:,.1f}, Max: {e['max_quotes']:,}")
        print(f"Views:    Avg: {e['avg_views']:,.1f}, Max: {e['max_views']:,}")

        # 3. Language Distribution
        print("\n=== Language Distribution (Top 10) ===")
        lang_dist = con.execute("""
            SELECT 
                lang,
                COUNT(*) as count,
                COUNT(*) * 100.0 / SUM(COUNT(*)) OVER () as percentage
            FROM tweets
            WHERE lang IS NOT NULL
            GROUP BY lang
            ORDER BY count DESC
            LIMIT 10
        """).fetchdf()
        print(lang_dist)

        # 4. Top Users by Engagement
        print("\n=== Top Users by Engagement (Top 10) ===")
        top_users = con.execute("""
            SELECT 
                username,
                bio,
                followersCount,
                verified,
                COUNT(*) as tweet_count,
                AVG(likeCount) as avg_likes,
                AVG(retweetCount) as avg_retweets
            FROM tweets
            WHERE username IS NOT NULL
            GROUP BY username, bio, followersCount, verified
            HAVING COUNT(*) > 3
            ORDER BY (AVG(likeCount) + AVG(retweetCount)) DESC
            LIMIT 10
        """).fetchdf()
        print(top_users)

        # 5. Retweet Analysis
        print("\n=== Retweet Statistics ===")
        retweet_stats = con.execute("""
            SELECT 
                COUNT(*) FILTER (WHERE retweetedTweet = true) as retweet_count,
                COUNT(*) FILTER (WHERE retweetedTweet = true) * 100.0 / COUNT(*) as retweet_percentage
            FROM tweets
        """).fetchdf()
        r = retweet_stats.iloc[0]
        print(f"Total Retweets: {r['retweet_count']:,} ({r['retweet_percentage']:.1f}%)")

        # 6. Location Analysis
        print("\n=== Top Locations (Top 10) ===")
        locations = con.execute("""
            SELECT 
                location,
                COUNT(*) as count
            FROM tweets
            WHERE location IS NOT NULL AND location != ''
            GROUP BY location
            ORDER BY count DESC
            LIMIT 10
        """).fetchdf()
        print(locations)

        # 7. Sample Tweets
        print("\n=== Random Tweet Samples ===")
        samples = con.execute("""
            SELECT 
                text,
                username,
                timestamp,
                likeCount,
                retweetCount,
                replyCount,
                viewCount
            FROM tweets 
            WHERE text IS NOT NULL
            ORDER BY random() 
            LIMIT 3
        """).fetchdf()
        
        for i, row in samples.iterrows():
            print(f"\n--- Tweet {i+1} ---")
            print(f"Time: {row['timestamp']}")
            print(f"User: {row['username']}")
            print(f"Text: {str(row['text'])[:200]}...")
            print(f"Engagement: {row['replyCount']} replies, {row['retweetCount']} retweets, "
                  f"{row['likeCount']} likes, {row['viewCount']} views")

    finally:
        con.close()

if __name__ == "__main__":
    analyze_tweets_database()

=== Basic Statistics ===
Total Tweets: 34,761,518.0
Unique Users: 3,408,018.0
Verified Users: 0.0
Average Followers: 41,013.60
Average Likes: 78,900,920,591.58
Average Retweets: 285,554,591,546.28
Number of Languages: 101.0

=== Engagement Statistics ===
Replies:  Avg: 208,500,324,391.0, Max: 1,824,183,955,798,540,800
Retweets: Avg: 285,554,591,546.3, Max: 1,816,540,335,826,436,352
Likes:    Avg: 78,900,920,591.6, Max: 1,814,654,783,417,925,888
Quotes:   Avg: 1.3, Max: 174,726
Views:    Avg: 52,202,984,693.5, Max: 1,814,654,783,417,925,632

=== Language Distribution (Top 10) ===
  lang     count  percentage
0   en  29050277   87.336466
1  qme   1200324    3.608642
2   es    603902    1.815565
3  und    266688    0.801768
4   fr    224270    0.674243
5   et    186674    0.561215
6   pt    181690    0.546231
7   ja    149042    0.448078
8  qht    126633    0.380708
9   de    124316    0.373742

=== Top Users by Engagement (Top 10) ===
          username                                   