In [1]:
import os
import json
import pandas as pd
import requests
import threading
import time
import datetime
import logging
import gc

# Configure the root logger once: INFO and above, one timestamped line per record
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

def exponential_backoff(retries, base_delay=10, max_delay=300):
    """Return the wait before the next retry, doubling per attempt up to a ceiling.

    The delay is ``base_delay * 2 ** retries``; once that exceeds ``max_delay``
    the ceiling is returned instead.
    """
    delay = base_delay * (2 ** retries)
    if delay > max_delay:
        return max_delay
    return delay

def normalize_data(post):
    """Flatten a post into a single-level dict suitable for one CSV row.

    Nested dictionaries are flattened at every depth, joining the keys along
    the path with ``_`` (``{'statistics': {'actual': {'likeCount': 3}}}``
    becomes ``{'statistics_actual_likeCount': 3}``). Lists, at any depth, are
    serialized to a JSON string. All other values are copied unchanged.

    Args:
        post: Mapping of field name to value; values may be nested dicts/lists.

    Returns:
        A new flat dict; ``post`` itself is not modified.
    """
    flattened_post = {}

    def _flatten(prefix, value):
        # Recurse so that multi-level fields (e.g. statistics -> actual ->
        # likeCount) produce the fully qualified column names the CSV schema
        # uses, instead of leaving a dict as the cell value.
        if isinstance(value, dict):
            for subkey, subvalue in value.items():
                _flatten(f"{prefix}_{subkey}", subvalue)
        elif isinstance(value, list):
            flattened_post[prefix] = json.dumps(value)
        else:
            flattened_post[prefix] = value

    for key, value in post.items():
        _flatten(key, value)
    return flattened_post

# Serializes the "does the file need a header?" check and the append that
# follows it, so fetch threads writing the same day file cannot interleave.
_CSV_WRITE_LOCK = threading.Lock()


def write_posts_to_csv(posts, base_csv_dir, flush_size=100):
    """Append normalized posts to per-day CSV files under ``base_csv_dir``.

    Each post is flattened with ``normalize_data``, projected onto a fixed
    column list, and grouped by the calendar day of its ``date`` field. A
    day's rows are appended to ``<base_csv_dir>/<YYYY-MM>/<YYYY-MM-DD>.csv``;
    the header row is written only when that file is new or empty.

    Posts whose ``date`` is missing or cannot be parsed cannot be assigned to
    a day file; they are skipped and the number skipped is logged as a warning.

    Args:
        posts: Iterable of post dicts; an empty/None value is a no-op.
        base_csv_dir: Root directory for the month sub-directories.
        flush_size: A day group of at least this many rows is reported in the
            log after it is written. It does not affect what is written.
    """
    if not posts:
        return

    columns = [
        'platformId', 'platform', 'date', 'updated', 'type', 'message',
        'expandedLinks', 'link', 'postUrl', 'subscriberCount', 'score', 'media',
        'statistics_actual_likeCount', 'statistics_actual_shareCount',
        'statistics_actual_commentCount', 'statistics_actual_loveCount',
        'statistics_actual_wowCount', 'statistics_actual_hahaCount',
        'statistics_actual_sadCount', 'statistics_actual_angryCount',
        'statistics_actual_thankfulCount', 'statistics_actual_careCount',
        'statistics_expected_likeCount', 'statistics_expected_shareCount',
        'statistics_expected_commentCount', 'statistics_expected_loveCount',
        'statistics_expected_wowCount', 'statistics_expected_hahaCount',
        'statistics_expected_sadCount', 'statistics_expected_angryCount',
        'statistics_expected_thankfulCount', 'statistics_expected_careCount',
        'account_id', 'account_name', 'account_handle', 'account_profileImage',
        'account_subscriberCount', 'account_url', 'account_platform',
        'account_platformId', 'account_accountType',
        'account_pageAdminTopCountry', 'account_pageDescription',
        'account_pageCreatedDate', 'account_pageCategory', 'account_verified',
        'brandedContentSponsor_id', 'brandedContentSponsor_name',
        'brandedContentSponsor_handle', 'brandedContentSponsor_profileImage',
        'brandedContentSponsor_subscriberCount', 'brandedContentSponsor_url',
        'brandedContentSponsor_platform', 'brandedContentSponsor_platformId',
        'brandedContentSponsor_accountType',
        'brandedContentSponsor_pageDescription',
        'brandedContentSponsor_pageCreatedDate',
        'brandedContentSponsor_pageCategory', 'brandedContentSponsor_verified',
        'history', 'languageCode', 'legacyId', 'id'
    ]
    processed_posts = [normalize_data(post) for post in posts]
    df = pd.DataFrame(processed_posts, columns=columns)
    df['date'] = pd.to_datetime(df['date'], errors='coerce').dt.strftime('%Y-%m-%d %H:%M:%S')

    # groupby() discards rows whose key is NaN, so undated rows would vanish
    # without a trace; make the drop explicit and visible in the log.
    undated_count = int(df['date'].isna().sum())
    if undated_count:
        logging.warning(f"Skipping {undated_count} posts with a missing or unparseable date.")
        df = df[df['date'].notna()]
    if df.empty:
        return

    for day, group in df.groupby(df['date'].str[:10]):
        month_dir = os.path.join(base_csv_dir, day[:7])
        day_file_path = os.path.join(month_dir, f"{day}.csv")
        with _CSV_WRITE_LOCK:
            os.makedirs(month_dir, exist_ok=True)
            # An existing but empty file still needs its header row.
            write_header = (
                not os.path.exists(day_file_path)
                or os.path.getsize(day_file_path) == 0
            )
            # Append mode creates the file when it does not exist yet.
            group.to_csv(day_file_path, mode='a', header=write_header, index=False)
        if len(group) >= flush_size:
            logging.info(f"Flushed {len(group)} posts to disk for date: {day}")

def threaded_fetch(api_key, base_url, start_date_str, end_date_str, base_csv_dir, max_posts=100, max_retries=5, request_timeout=60, request_interval=10):
    """Page through the posts of one date range and append them to the CSVs.

    ``base_url`` is requested repeatedly with an increasing ``offset``; every
    non-empty page is handed to ``write_posts_to_csv``. Paging stops when a
    page is empty or holds fewer than ``max_posts`` posts.

    A failed attempt (HTTP 429, HTTP 504, any other request error, or a body
    that is not valid JSON) is retried after an exponential backoff. After
    ``max_retries`` consecutive failures the fetch is abandoned; a successful
    page resets the failure count.

    Args:
        api_key: API token sent as the ``token`` query parameter.
        base_url: Endpoint to query.
        start_date_str: Inclusive range start, formatted ``YYYY-MM-DD``.
        end_date_str: Range end, formatted ``YYYY-MM-DD``.
        base_csv_dir: Root output directory passed to ``write_posts_to_csv``.
        max_posts: Page size requested from the API.
        max_retries: Consecutive failed attempts tolerated before aborting.
        request_timeout: Seconds to wait for the server before the request is
            treated as failed (without it a stalled connection blocks forever).
        request_interval: Seconds to pause between successful page requests to
            stay within the API rate limit.

    Raises:
        ValueError: If either date string does not match ``YYYY-MM-DD``.
    """
    start_date = datetime.datetime.strptime(start_date_str, '%Y-%m-%d')
    end_date = datetime.datetime.strptime(end_date_str, '%Y-%m-%d')
    offset = 0
    retries = 0
    total_collected = 0

    params = {
        'token': api_key,
        'startDate': start_date.strftime('%Y-%m-%d'),
        'endDate': end_date.strftime('%Y-%m-%d'),
        'count': max_posts,
        'sortBy': 'date',
        'language': 'en',
        'includeHistory': True,
        'pageAdminTopCountry': 'US',
        'searchTerm': 'a'
    }

    while True:
        # Set to (log level, message, backoff overrides) when the attempt failed.
        failure = None
        posts = []
        try:
            response = requests.get(
                base_url,
                params={**params, 'offset': offset},
                timeout=request_timeout,
            )
            # Inspect the status before raise_for_status(): it raises for every
            # 4xx/5xx, which would otherwise make these branches unreachable.
            if response.status_code == 429:
                failure = (logging.WARNING, "Rate limit exceeded.", {})
            elif response.status_code == 504:
                failure = (
                    logging.WARNING,
                    "Server Timeout (504).",
                    {'base_delay': 30, 'max_delay': 600},
                )
            else:
                response.raise_for_status()
                posts = response.json().get('result', {}).get('posts', [])
        except json.JSONDecodeError:
            failure = (logging.ERROR, "Failed to parse JSON from response.", {})
        except requests.RequestException as e:
            error_text = str(e)
            # The token is part of the request URL, which requests includes in
            # its error text; mask it so it does not end up in the logs.
            if api_key:
                error_text = error_text.replace(str(api_key), '***')
            failure = (logging.ERROR, f"HTTP error: {error_text}.", {})

        if failure is not None:
            if retries >= max_retries:
                logging.error("Max retries reached, aborting operation.")
                break
            level, message, backoff_overrides = failure
            sleep_time = exponential_backoff(retries, **backoff_overrides)
            logging.log(level, f"{message} Retrying after {sleep_time} seconds...")
            time.sleep(sleep_time)
            retries += 1
            continue

        # Only consecutive failures count toward max_retries.
        retries = 0

        if not posts:
            logging.info("No more posts found.")
            break

        num_posts = len(posts)
        total_collected += num_posts
        logging.info(f"Retrieved {num_posts} posts, Total collected: {total_collected}")
        write_posts_to_csv(posts, base_csv_dir)
        offset += num_posts
        if num_posts < max_posts:
            logging.info("No more posts found or end of dataset reached.")
            break

        time.sleep(request_interval)  # Comply with rate limit

def main_concurrent_fetch(api_keys, base_url, start_date, end_date, base_csv_dir, segments):
    """Fetch a date range in parallel, one worker thread per sub-range.

    The span from ``start_date`` to ``end_date`` is cut into ``segments``
    sub-ranges. Each one is handed to ``threaded_fetch`` on its own thread,
    with API keys assigned round-robin, and the call returns once every
    thread has finished.
    """
    boundaries = pd.date_range(start=start_date, end=end_date, periods=segments+1)
    workers = []

    for index, (segment_start, segment_end) in enumerate(zip(boundaries[:-1], boundaries[1:])):
        # Cycle through the keys when there are more segments than keys
        key = api_keys[index % len(api_keys)]
        worker = threading.Thread(
            target=threaded_fetch,
            args=(
                key,
                base_url,
                segment_start.strftime('%Y-%m-%d'),
                segment_end.strftime('%Y-%m-%d'),
                base_csv_dir,
            ),
        )
        workers.append(worker)
        worker.start()

    for worker in workers:
        worker.join()

# Usage
# API keys are secrets and must not be stored in the notebook. Supply them as
# a comma-separated list in the CROWDTANGLE_API_KEYS environment variable.
# Any key that was previously committed here should be treated as compromised
# and rotated.
api_keys = [key.strip() for key in os.environ.get('CROWDTANGLE_API_KEYS', '').split(',') if key.strip()]
if not api_keys:
    raise RuntimeError("Set CROWDTANGLE_API_KEYS to a comma-separated list of API keys.")
base_url = 'https://api.crowdtangle.com/posts'
# NOTE(review): the original start/end date assignments were lost from this
# cell (only a stray "-01" fragment remained). The values below are inferred
# from the recorded output, where the two segments begin at 2022-01-15 and
# 2022-01-31 — confirm them before running.
start_date = '2022-01-01'
end_date = '2022-02-01'
base_csv_dir = 'crowdtangle_unfiltered_data'
segments = len(api_keys)  # Adjust based on API key management strategy

main_concurrent_fetch(api_keys, base_url, start_date, end_date, base_csv_dir, segments)


2024-06-04 15:24:10,423 - INFO - Retrieved 100 posts, Total collected: 100
2024-06-04 15:24:10,542 - INFO - Flushed 100 posts to disk for date: 2022-01-31
2024-06-04 15:24:12,243 - INFO - Retrieved 100 posts, Total collected: 100
2024-06-04 15:24:12,354 - INFO - Flushed 100 posts to disk for date: 2022-01-15
2024-06-04 15:24:35,463 - INFO - Retrieved 100 posts, Total collected: 200
2024-06-04 15:24:35,568 - INFO - Flushed 100 posts to disk for date: 2022-01-15
2024-06-04 15:24:37,834 - INFO - Retrieved 100 posts, Total collected: 200
2024-06-04 15:24:37,933 - INFO - Flushed 100 posts to disk for date: 2022-01-31
2024-06-04 15:25:01,059 - INFO - Retrieved 100 posts, Total collected: 300
2024-06-04 15:25:01,171 - INFO - Flushed 100 posts to disk for date: 2022-01-15
2024-06-04 15:25:02,914 - INFO - Retrieved 100 posts, Total collected: 300
2024-06-04 15:25:02,998 - INFO - Flushed 100 posts to disk for date: 2022-01-31
2024-06-04 15:25:29,870 - INFO - Retrieved 100 posts, Total collected: 400

In [None]:
# API keys are secrets and must not be stored in the notebook; read them from
# the comma-separated CROWDTANGLE_API_KEYS environment variable instead.
api_keys = [key.strip() for key in os.environ.get('CROWDTANGLE_API_KEYS', '').split(',') if key.strip()]
