# Loading Data

In [1]:
# Cell 1: Imports and Configuration

# Ensure the atproto library is up-to-date
!pip install --upgrade atproto

import os
from datetime import datetime, timezone, timedelta
from atproto import Client, models
# Import FirehoseError for specific Firehose errors
# Import RequestException for general ATProto API errors
from atproto.exceptions import FirehoseError, RequestException # Corrected import

# --- Bluesky Credentials ---
# It's recommended to use Colab Secrets for credentials instead of hardcoding.
# If you comment out the Colab Secrets part in Cell 2, uncomment these:
# BLUESKY_HANDLE = os.environ.get("BLUESKY_HANDLE", "yourname.bsky.social") # REPLACE or set env var
# BLUESKY_APP_PASSWORD = os.environ.get("BLUESKY_APP_PASSWORD", "xxxx-xxxx-xxxx-xxxx") # REPLACE or set env var

# --- Analysis Parameters ---
# Number of in-timeframe posts the sampler should try to collect.
# Can be overridden with the TARGET_POST_COUNT environment variable.
TARGET_POST_COUNT = int(os.getenv("TARGET_POST_COUNT", 50))

# Sampling window, expressed as ISO 8601 strings (UTC recommended: YYYY-MM-DDTHH:MM:SSZ).
# Unless overridden through the environment, the window covers the 30 days ending "now".
# (An earlier revision used 7 days; it was widened to get more data while testing.)
default_end_time = datetime.now(tz=timezone.utc)
default_start_time = default_end_time - timedelta(days=30)

# isoformat() renders UTC as "+00:00"; swap in the shorter "Z" suffix for the defaults.
TIMEFRAME_START_ISO = os.getenv(
    "TIMEFRAME_START_ISO",
    default_start_time.isoformat().replace('+00:00', 'Z'),
)
TIMEFRAME_END_ISO = os.getenv(
    "TIMEFRAME_END_ISO",
    default_end_time.isoformat().replace('+00:00', 'Z'),
)

# AT URI of the feed generator to sample from. The default is "What's Hot Classic",
# a general-purpose feed; other feeds are listed on sites such as
# https://skyfeed.me/feeds or can be found via their feed generator DIDs.
FEED_URI_TO_QUERY = os.getenv(
    "FEED_URI_TO_QUERY",
    "at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.generator/hot-classic",
)

# --- Safety Parameters ---
# Upper bound on the number of feed pages requested, so a sparse timeframe cannot
# keep the fetch loop running indefinitely (raised from 20 to 50 for testing).
MAX_PAGES_TO_FETCH = 50

# Page size requested from the API on each call; may be lowered.
POSTS_PER_PAGE = 50

# Echo the effective configuration so the run is self-describing.
print(
    "Configuration:",
    f"  Target post count: {TARGET_POST_COUNT}",
    f"  Timeframe Start: {TIMEFRAME_START_ISO}",
    f"  Timeframe End: {TIMEFRAME_END_ISO}",
    f"  Feed URI: {FEED_URI_TO_QUERY}",
    f"  Max pages to fetch: {MAX_PAGES_TO_FETCH}",
    f"  Posts per page (limit): {POSTS_PER_PAGE}",
    "-" * 30,
    sep="\n",
)

# Helper to parse ISO string to timezone-aware datetime object
def parse_iso_datetime(iso_str: str) -> datetime | None:
    if not iso_str:
        return None
    try:
        # Python 3.11+ handles 'Z' directly. For older, replace 'Z'.
        if iso_str.endswith('Z'):
            iso_str = iso_str[:-1] + '+00:00'
        dt = datetime.fromisoformat(iso_str)
        # Ensure it's UTC if no timezone info was present but we expect UTC
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)
        return dt
    except ValueError as e:
        print(f"Warning: Could not parse date '{iso_str}': {e}")
        return None

# Turn the configured ISO strings into timezone-aware datetimes and fail fast
# if either one is unusable or the window is empty/inverted.
TIMEFRAME_START_DT = parse_iso_datetime(TIMEFRAME_START_ISO)
TIMEFRAME_END_DT = parse_iso_datetime(TIMEFRAME_END_ISO)

# parse_iso_datetime() signals failure by returning None.
if TIMEFRAME_START_DT is None or TIMEFRAME_END_DT is None:
    raise ValueError("Invalid start or end datetime. Please check TIMEFRAME_START_ISO and TIMEFRAME_END_ISO.")
# The start must be strictly earlier than the end.
if not (TIMEFRAME_START_DT < TIMEFRAME_END_DT):
    raise ValueError("TIMEFRAME_START_DT must be before TIMEFRAME_END_DT.")

Configuration:
  Target post count: 50
  Timeframe Start: 2025-05-24T09:40:22.485798Z
  Timeframe End: 2025-06-23T09:40:22.485798Z
  Feed URI: at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.generator/hot-classic
  Max pages to fetch: 50
  Posts per page (limit): 50
------------------------------


In [2]:
# Cell 2: Login to Bluesky
from google.colab import userdata
from atproto.exceptions import RequestException # Changed AtprotoError to RequestException

# Read the Bluesky handle and app password from Colab Secrets, then log in.
# Whatever goes wrong, `client` is left as None so later cells can detect it.
try:
    BLUESKY_HANDLE = userdata.get('BLUESKY_HANDLE')
    BLUESKY_APP_PASSWORD = userdata.get('BLUESKY_APP_PASSWORD')

    # Debug output; the password itself is never printed, only a mask of equal length.
    print(f"Retrieved HANDLE: {BLUESKY_HANDLE}")
    print(f"Retrieved APP_PASSWORD: {'*' * len(BLUESKY_APP_PASSWORD) if BLUESKY_APP_PASSWORD else 'None'}")

    # Both secrets must be present and non-empty before a login is attempted.
    if not (BLUESKY_HANDLE and BLUESKY_APP_PASSWORD):
        raise ValueError("Bluesky credentials not found or empty in Colab Secrets.")

    client = Client()
    print(f"Attempting to log in as {BLUESKY_HANDLE} using Colab Secrets...")
    # login() returns the profile of the authenticated account.
    profile = client.login(BLUESKY_HANDLE, BLUESKY_APP_PASSWORD)
    print(f"Successfully logged in as {profile.handle} (DID: {profile.did})")

except RequestException as api_err:
    # Errors reported by the ATProto API layer.
    # NOTE(review): depending on the atproto version, some login failures may be
    # raised as other exception classes and land in the generic handler below — confirm.
    print(f"Bluesky API Login Failed: {api_err}")
    client = None

except Exception as err:
    # Everything else: missing/empty secrets, Colab access problems, etc.
    print(
        f"General Login failed: {err}",
        "Ensure 'BLUESKY_HANDLE' and 'BLUESKY_APP_PASSWORD' are set in Colab Secrets.",
        "Or comment out the login block if targeting a public feed anonymously.",
        sep="\n",
    )
    client = None

# Show the final client state (a Client instance on success, None on failure).
print(f"Client object after login attempt: {client}")

Retrieved HANDLE: dp2project.bsky.social
Retrieved APP_PASSWORD: *******************
Attempting to log in as dp2project.bsky.social using Colab Secrets...
Successfully logged in as dp2project.bsky.social (DID: did:plc:in6rgxlucspbkpnw22634urd)
Client object after login attempt: <atproto_client.client.client.Client object at 0x799b0670d050>


In [3]:
# Cell 3: Helper function to parse post data
def parse_post_view_data(post_view: models.AppBskyFeedDefs.PostView):
    """Flatten a PostView into a plain dict of the fields used by later cells.

    Args:
        post_view: The post view taken from a feed item.

    Returns:
        A dict with the keys ``uri``, ``cid``, ``author_did``, ``author_handle``,
        ``content``, ``created_at_iso`` (original timestamp string),
        ``created_at_dt`` (parsed datetime) and ``reply_count`` /
        ``repost_count`` / ``like_count`` (missing counts become 0).
        ``None`` is returned when the record is neither a post record nor a
        dict, when its timestamp cannot be parsed, or when any error occurs
        while extracting the fields; a message is printed in each case.
    """
    # Author fields are read outside the guarded section, so a view without a
    # usable `author` raises here rather than returning None.
    author = post_view.author
    author_did = author.did
    author_handle = author.handle

    def _or_zero(value):
        # Counts may be absent (None); report those as 0.
        return 0 if value is None else value

    try:
        record = post_view.record
        if isinstance(record, models.AppBskyFeedPost.Record):
            content = record.text
            created_at_str = record.created_at
        elif isinstance(record, dict):
            # Raw dict records use the wire-format key "createdAt".
            content = record.get('text', '')
            created_at_str = record.get('createdAt', '')
        else:
            # Anything else is not a standard post record: skip it.
            print(f"Warning: Encountered unexpected post record type for URI {post_view.uri}: {type(record)}")
            return None

        parsed_date = parse_iso_datetime(created_at_str)
        if parsed_date is None:
            # Without a usable timestamp the post cannot be filtered by timeframe.
            print(f"Warning: Skipping post {post_view.uri} due to unparseable date: '{created_at_str}'")
            return None

        return {
            "uri": post_view.uri,
            "cid": post_view.cid,
            "author_did": author_did,
            "author_handle": author_handle,
            "content": content,
            "created_at_iso": created_at_str,
            "created_at_dt": parsed_date,
            "reply_count": _or_zero(post_view.reply_count),
            "repost_count": _or_zero(post_view.repost_count),
            "like_count": _or_zero(post_view.like_count),
        }
    except Exception as e:
        # Best effort: report the failure and let the caller skip this post.
        print(f"Error parsing post {getattr(post_view, 'uri', 'N/A')}: {e}")
        return None

# %%

In [6]:
# Cell 4: Fetching and Filtering Posts

def get_posts_from_feed_in_timeframe(
    client: Client,
    feed_uri: str,
    start_dt: datetime,
    end_dt: datetime,
    target_count: int,
    posts_per_page: int = 50, # How many posts to request per API call
    max_pages: int = 50       # Safety limit for number of pages
):
    """
    Fetches posts from a given feed URI that fall within a specified timeframe.

    Pages through the feed until ``target_count`` posts inside
    ``[start_dt, end_dt]`` (inclusive) have been collected, the feed runs out,
    ``max_pages`` pages have been requested, or a request fails. A post that
    appears more than once in the feed (for example as a repost entry or on
    overlapping pages of a ranked feed) is collected only once, keyed by URI.

    Args:
        client: Authenticated atproto.Client instance.
        feed_uri: The AT URI of the feed to fetch (e.g., "What's Hot Classic").
        start_dt: The start of the desired timeframe (timezone-aware datetime).
        end_dt: The end of the desired timeframe (timezone-aware datetime).
        target_count: The desired number of posts to collect.
        posts_per_page: Number of posts to fetch in each API request.
        max_pages: Maximum number of pages to fetch to prevent infinite loops.

    Returns:
        A list of post data dictionaries (as produced by
        ``parse_post_view_data``), in feed order, without duplicate URIs.
        Request errors are printed and end the fetch; whatever was collected up
        to that point is returned.
    """
    collected_posts = []
    seen_uris = set()      # URIs already collected, used to drop duplicates
    cursor = None
    pages_fetched = 0
    fetch_failed = False   # set when a request error ended the loop early

    print(f"Fetching posts from feed: {feed_uri}")
    print(f"Looking for posts between {start_dt.isoformat()} and {end_dt.isoformat()}")

    while len(collected_posts) < target_count and pages_fetched < max_pages:
        try:
            print(f"Fetching page {pages_fetched + 1} (cursor: {cursor})...")
            # Construct the params object
            params = models.AppBskyFeedGetFeed.Params(
                feed=feed_uri,
                limit=posts_per_page,
                cursor=cursor
            )
            # Pass the params object to the get_feed method
            response = client.app.bsky.feed.get_feed(params)

        except RequestException as e:
            print(f"API Error fetching feed page {pages_fetched + 1}: {e}")
            fetch_failed = True
            break # Break on API errors

        except Exception as e:
            print(f"Unexpected error fetching feed page {pages_fetched + 1}: {e}")
            fetch_failed = True
            break # Break on other unexpected errors

        if not response or not response.feed:
            print("No more posts in the feed or empty response for this page.")
            if pages_fetched == 0:
                print("Consider widening the timeframe or checking the feed URI.")
            break

        posts_on_page_in_timeframe = 0
        for feed_view_post in response.feed:
            # Skip items that aren't standard feed view posts with a 'post' field
            if not isinstance(feed_view_post, models.AppBskyFeedDefs.FeedViewPost) or not hasattr(feed_view_post, 'post'):
                continue

            post_data = parse_post_view_data(feed_view_post.post)
            if not post_data:
                continue # Parsing failed; the helper already reported why

            post_uri = post_data["uri"]
            if post_uri in seen_uris:
                continue # Already collected; don't let duplicates count toward the target

            # created_at_dt is already timezone-aware from parse_iso_datetime.
            # No early exit on older posts: popularity-ranked feeds such as
            # "What's Hot" are not chronological, so later items may still match.
            if start_dt <= post_data["created_at_dt"] <= end_dt:
                seen_uris.add(post_uri)
                collected_posts.append(post_data)
                posts_on_page_in_timeframe += 1
                if len(collected_posts) >= target_count:
                    break # Reached target

        print(f"  Found {posts_on_page_in_timeframe} posts in timeframe on this page.")
        print(f"  Total collected so far: {len(collected_posts)} / {target_count}")

        if len(collected_posts) >= target_count:
            print("Target number of posts collected.")
            break

        cursor = response.cursor
        if not cursor:
            print("No more pages (cursor is null).")
            break
        pages_fetched += 1

    # Summarize why fetching stopped. An error exit is reported as such instead
    # of being described as having exhausted the available pages.
    if fetch_failed:
        print(f"Stopped early because of a fetch error; collected {len(collected_posts)}/{target_count} posts in the specified timeframe.")
    elif pages_fetched >= max_pages:
        print(f"Reached maximum page limit ({max_pages}) before collecting {target_count} posts.")
    elif len(collected_posts) < target_count:
        print(f"Finished fetching all available pages within the limit ({max_pages}), but only collected {len(collected_posts)}/{target_count} posts in the specified timeframe.")

    return collected_posts

# --- Execute the fetching ---
# Run only when the login cell left behind an authenticated, non-None client.
if 'client' in locals() and client is not None and client.me:
    try:
        sampled_posts = get_posts_from_feed_in_timeframe(
            client,
            feed_uri=FEED_URI_TO_QUERY,
            start_dt=TIMEFRAME_START_DT,
            end_dt=TIMEFRAME_END_DT,
            target_count=TARGET_POST_COUNT,
            posts_per_page=POSTS_PER_PAGE,
            max_pages=MAX_PAGES_TO_FETCH # Using the potentially increased value
        )
        print(f"\n--- Collected {len(sampled_posts)} Posts ---")
        if sampled_posts:
            # Human-readable listing of every collected post.
            for i, post in enumerate(sampled_posts):
                date_str = post['created_at_dt'].strftime('%Y-%m-%d %H:%M:%S %Z') if post['created_at_dt'] else "N/A"
                print(f"{i+1}. Author: {post['author_handle']} ({post['author_did']})")
                print(f"   Date: {date_str}")
                print(f"   Content: '{post['content'][:150].replace(chr(10), ' ')}...'") # Truncate and remove newlines for display
                print(f"   URI: {post['uri']}")
                print(f"   Likes: {post['like_count']}, Reposts: {post['repost_count']}, Replies: {post['reply_count']}")
                print("-" * 20)

            # Convert collected posts to a PySpark RDD.
            # This needs the SparkSession from Cell 5, which sits *below* this
            # cell: run Cell 5 first if you intend to use Spark. The globals()
            # check avoids a NameError when it has not been run yet.
            if 'spark' in globals():
                posts_rdd = spark.sparkContext.parallelize(sampled_posts)
                print("\n--- RDD created from collected posts ---")
                print(f"Number of partitions in RDD: {posts_rdd.getNumPartitions()}")

                # You can now perform RDD operations on posts_rdd

                # Example: Print a few elements from the RDD
                print("\nSample RDD elements:")
                for i, post in enumerate(posts_rdd.take(5)): # take the first 5 elements
                    print(f"  {i+1}. {post}")
            else:
                print("SparkSession not initialized. Skipping RDD creation.")

        else:
            print("No posts collected that fit the criteria.")

    except Exception as e:
        # Best effort: report the problem instead of aborting the notebook run.
        print(f"An error occurred during the post fetching process: {e}")

else:
    print("Client not authenticated. Skipping post fetching.")

Fetching posts from feed: at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.generator/hot-classic
Looking for posts between 2025-05-24T09:40:22.485798+00:00 and 2025-06-23T09:40:22.485798+00:00
Fetching page 1 (cursor: None)...
  Found 50 posts in timeframe on this page.
  Total collected so far: 50 / 50
Target number of posts collected.

--- Collected 50 Posts ---
1. Author: aphclarkson.bsky.social (did:plc:xv2as7miy7ghje2htfynaqex)
   Date: 2025-06-23 09:38:19 UTC
   Content: 'If the Khamenei regime does ever fall you end up with the question of who gets there hands on 400 kilograms of 60% enriched uranium whose location in ...'
   URI: at://did:plc:xv2as7miy7ghje2htfynaqex/app.bsky.feed.post/3lsbbwheycs2h
   Likes: 17, Reposts: 2, Replies: 4
--------------------
2. Author: austinwalker.bsky.social (did:plc:sykjce6e3ll5g6ev6evkcy3f)
   Date: 2025-06-23 09:37:47 UTC
   Content: 'Had a great time at @narrascope.org this weekend, and as someone recovering from painful dentist stuff, w

In [7]:
# Cell 5: Initialize Spark Session
from pyspark.sql import SparkSession

# Obtain the SparkSession for this notebook: getOrCreate() hands back an
# existing session when there is one, otherwise it starts a new one.
spark = (
    SparkSession.builder
    .appName("BlueskyPostAnalysis")
    .getOrCreate()
)

print("SparkSession created.")

SparkSession created.
