In [None]:
pip install boto3

In [None]:
pip install atproto

In [None]:
# aws stuff
import boto3
from botocore.exceptions import ClientError
# json necessary to parse secret string, and write/read s3 objects
import json

# From the transformers package, import ViTImageProcessor and ViTForImageClassification
from transformers import ViTImageProcessor, ViTForImageClassification

# From the PIL package, import Image and Markdown
from PIL import Image

# import requests
import requests

# import torch
import torch

# import matplotlib
import matplotlib.pyplot as plt

# url getter for mpl
import urllib

import numpy as np

# import bluesky api
from atproto import Client

# import colab secrets to store login credentials
from google.colab import userdata

# datetime is necessary for caturday check and logging
import datetime
import zoneinfo

# these imports are to use github apis to do logging, base64 is to parse the json
# import requests # already imported for something else
import base64

# adding a hugginface login so that we can authenticate
from huggingface_hub import login

In [None]:
# input variables - x is target follows y is num of posts we want to look through, whichever end criteria we reach first
EMBEDDED_PIC = 'app.bsky.embed.images#view'
EMBEDDED_VID = 'app.bsky.embed.video#view'
FEED_CATURDAY = 'at://did:plc:tenurhgjptubkk5zf5qhi3og/app.bsky.feed.generator/#caturday'
FEED_SIAMESE = 'at://did:plc:jv3qdc5vxujp6taaa7nte35i/app.bsky.feed.generator/aaac6wmikqyhq'
# FEED_CATPICS = 'at://did:plc:q6gjnaw2blty4crticxkmujt/app.bsky.feed.generator/cv:cat'
# FEED_CATS = 'at://did:plc:jfhpnnst6flqway4eaeqzj2a/app.bsky.feed.generator/cats'
# FEED_TUXEDOCATS = 'at://did:plc:eubjsqnf5edgvcc6zuoyixhw/app.bsky.feed.generator/tuxedo-cats'
# FEED_CATURDAY = 'at://did:plc:pmyqirafcp3jqdhrl7crpq7t/app.bsky.feed.generator/aaad4sb7tyvjw' # this one is old idk why it disappeared but it was still working?
FEED_NAME = {FEED_CATURDAY: "'Caturday'", FEED_SIAMESE: "'Siamese Cats'", FEED_CATPICS: "'Cat Pics'", FEED_CATS: "'Cats!'", FEED_TUXEDOCATS: "'Tuxedo Cats'"}
URL_BEGIN = 'https://bsky.app/profile/'
URL_POST = '/post/'
# my did to check against
MY_DID = 'did:plc:ktkc7jfakxzjpooj52ffc6ra'

# query creation constants
SCHEMA = '"bsky"'
TABLE_FOLLOWS = '"follows"'
COL_USERID = 'user_id'
COL_USERHANDLE = 'user_handle'
COL_FOLLOWURI = 'follow_uri'
COL_FOLLOWDATE = 'follow_date'
COL_FOLLOWSYOU = 'follows_you'
DATETIME_NOW = 'NOW()'
END_LINE = ','
END_QUERY = ';'
TABLE_POSTS = '"posts"'
COL_POSTCID = 'post_cid'
COL_SEENTIME = 'seen_time'

CATURDAY_DOW = 'Saturday'
USER_TIMEZONE = "US/Eastern" # you should fill this in with your own timezone here

LINE_BREAK = '\n'
END_LOGGING = '\n____________________\n'

FILE_PATH = "LOGGING_ADD_02.txt"  # Replace with the file path in your repo
BRANCH = "main"  # Replace with your branch name

AWS_KEY = userdata.get('aws_access_key')
AWS_SECRET_KEY = userdata.get('aws_secret_access_key')
REGION = userdata.get('aws_region')
SECRETS_ID = userdata.get('aws_secretsmanager_id')

DDB = 'dynamodb'
S3 = 's3'
DDB_TABLE = 'rickybot-ddb'
S3_BUCKET = 'rickybot-s3'
DDB_CACHE_KEY = 'CACHE'
DDB_CACHE_ATTRIBUTE = 'CIDS'

PRIMARY_KEY = 'DOW' # the dynamodb table's primary key. there is no sort key
DOW_KEYS = {
    'Sunday': 'SUN',
    'Monday': 'MON',
    'Tuesday': 'TUE',
    'Wednesday': 'WED',
    'Thursday': 'THU',
    'Friday': 'FRI+SAT',
    'Saturday': 'FRI+SAT'
}

# run settings
POSTS_CATURDAY = 1000
FOLLOWS_CATURDAY = 1000 #350 when automated to 1 run per 1 hour
POSTS_OTHERCAT = 1000
FOLLOWS_OTHERCAT = 1000 # 400 when automated to 1 run per 2 hours
# running these very frequently so we don't need to do too many:
# by my math cap for day is 9250, so 1 run per hr caps at 385, 2 hours is 770, but we need to save some of that room for deletions, especially on Fridays. So when we automate I want to do 1000-350, 1000-400

In [None]:
# get the day of the week so we know what dynamodb key to pull from and which bucket to aggregate to
# doing this first because we do not run this on saturday and can bail out early if we get into this code for some reason
# also we are running this at about 1am, the following day after all runs have concluded for the previous. so we're aggregating the previous day's results
cur_timestamp = datetime.datetime.now(zoneinfo.ZoneInfo(USER_TIMEZONE))
dow = cur_timestamp.strftime("%A")
str_timestamp = str(cur_timestamp) # we'll need this to use as the attribute for ddb

# use the day of the week to pull up the corresponding key for our dynamodb entries and our s3 bucket
ddb_key = DOW_KEYS[dow]
print(ddb_key)

# also this is where we can initialize our RUNNING LOG string.
running_logging_text = str_timestamp + LINE_BREAK

In [None]:
# connect to aws
try:
  aws_session = boto3.Session(
          aws_access_key_id = AWS_KEY,
          aws_secret_access_key = AWS_SECRET_KEY,
          region_name = REGION
      )
except:
  print('failed to begin AWS session')
  # return with error
  # this is the only error that we can't log to github, because we never got the credentials

In [None]:
# then connect to secrets manager
try:
  secrets_client = aws_session.client('secretsmanager')
  secret_value = secrets_client.get_secret_value(SecretId=SECRETS_ID)
  secret_string = secret_value['SecretString']
  secret_map = json.loads(secret_string)
except:
  print('failed to reach aws secrets manager')
  # return with error

In [None]:
# create constants from the values in the secrets manager
BSKY_USERNAME = secret_map['bsky_username']
BSKY_PASS = secret_map['bsky_password']
GITHUB_TOKEN = secret_map['github_token']
GITHUB_REPO = secret_map['github_user/repo']
HUGGING_TOKEN = secret_map['hugging_token']

In [None]:
# now that we have our github token set up we should set up our logging function to use whenever we encounter any errors
def logging_add(logging_text)
  # LOGGING ALL THE CHANGES TO OUR LOGGING FILE IN GITHUB
  commit_message = "Logging for follower additions on " + str_timestamp

  # Step 1: Get the file's current content and SHA
  url = f"https://api.github.com/repos/{GITHUB_REPO}/contents/{FILE_PATH}"
  headers = {"Authorization": f"token {GITHUB_TOKEN}"}
  response = requests.get(url, headers=headers)
  response_json = response.json()

  # Decode the content of the file
  file_sha = response_json["sha"]
  content = base64.b64decode(response_json["content"]).decode("utf-8")

  # Step 2: Modify the file content
  new_content = content + LINE_BREAK + logging_text + END_LOGGING
  encoded_content = base64.b64encode(new_content.encode("utf-8")).decode("utf-8")

  # Step 3: Push the updated content
  data = {
      "message": commit_message,
      "content": encoded_content,
      "sha": file_sha,
      "branch": BRANCH,
  }
  update_response = requests.put(url, headers=headers, json=data)

  if update_response.status_code == 200:
      print("Logging file updated successfully! Here's what was added to the logs:\n")
      print(logging_text + END_LOGGING)
  else:
      print(f"Error: {update_response.json()}")

In [None]:
# log in to huggingface to authenticate
try:
  login(HUGGINGFACE_TOKEN)
except:
  warning = 'WARNING - failed to authenticate huggingface'
  print(warning)
  running_logging_text += warning + LINE_BREAK
  # this should NOT return early due to this error, as the huggingface authentication is not necessary to use the public model we're using

In [None]:
# initialize the ViT model
try:
  # Load the feature extractor for the vision transformer
  feature_extractor = ViTImageProcessor.from_pretrained('google/vit-base-patch16-224')
  # Load the pre-trained weights from vision transformer
  model = ViTForImageClassification.from_pretrained('google/vit-base-patch16-224')
except:
  print('ERROR - failed to initialize ViT model')
  running_logging_text += 'ERROR - failed to initialize ViT model'
  logging_add(running_logging_text)
  # this is a critical failure, so return early here

In [None]:
# next we need to retrieve our cached posts from the dynamodb, so log into dynamodb here
try:
  dynamodb = aws_session.resource(DDB)
  table = dynamodb.Table(DDB_TABLE)
except:
  err = 'ERROR - failed to get dynamo db table'
  print(err)
  running_logging_text += err
  logging_add(running_logging_text)
  # return with error, we cant add the results to the db and we can't retrieve the cache (which isn't as important, but still)

In [None]:
# pull the cache of post CIDs seen in the previous run from dynamodb - if we fail any step here just leave a warning that we couldn't check the cache
cached_posts = set() # in case the ddb fails to retrieve, initialize an empty set
ddb_response = {} # same reason
seen_posts = set() # initialize the seen posts set, we'll be using it to replace our cache at the end of the run
try:
  ddb_response = table.get_item(
      Key={'DOW': DDB_CACHE_KEY},
  )
except ClientError as e:
  warning = f"WARNING - failed to check post cache key's existence: {e}"
  print(warning)
  running_logging_text += warning + LINE_BREAK

# print('ddb response:', ddb_response)
# this if else checks to see if there is anything
if 'Item' not in ddb_response:
  # check the status code to skip a redundant warning, if we errored out before there will be no key
  if 'HTTPStatusCode' in ddb_response and ddb_response['HTTPStatusCode'] == 200:
    warning = 'WARNING - successful response from dynamodb but there were no items in the post cache key.'
    print(warning)
    running_logging_text += warning + LINE_BREAK
else:
  if DDB_CACHE_ATTRIBUTE in ddb_response['Item']:
    cached_posts = ddb_response['Item'][DDB_CACHE_ATTRIBUTE]
    print(f'imported {len(cached_posts)} prior seen posts from the dynamodb table')
  else:
    warning = 'WARNING - somehow there were items in the dynamodb cache key, but the attribute for cached posts was not present'
    print(warning)
    running_logging_text += warning + LINE_BREAK

In [None]:
# and now we can log into the bluesky client
try:
  client = Client()
  client.login(BSKY_USERNAME, BSKY_PASSWORD)
except:
  err = 'ERROR - failed to log in to the bluesky client'
  print(err)
  running_logging_text += err
  logging_add(running_logging_text)
  # return here, cannot proceed without bluesky

In [None]:
# this is our code to identify post images as catposts

# 281: 'tabby, tabby cat'
# 282: 'tiger cat', 283: 'Persian cat', 284: 'Siamese cat, Siamese', 285: 'Egyptian cat', 286: 'cougar, puma, catamount, mountain lion, painter, panther, Felis concolor', 287: 'lynx, catamount', 288: 'leopard, Panthera pardus', 289: 'snow leopard, ounce, Panthera uncia', 290: 'jaguar, panther, Panthera onca, Felis onca', 291: 'lion, king of beasts, Panthera leo', 292: 'tiger, Panthera tigris', 293: 'cheetah, chetah, Acinonyx jubatus',
# 281 to 293
cat_labels = set()
for i in range(281, 294):
  cat_labels.add(i)

# these labels are to remove drawings, memes/reposts, and images with a lot of text respectively
bad_labels = {
917 : 'comic book', 916 : 'web site, website, internet site, site', 921 : 'book jacket, dust cover, dust jacket, dust wrapper'}

def test_bsky_image(url):
  f = urllib.request.urlopen(url)
  image = plt.imread(f, format='jpeg')
  # plt.imshow(image)
  inputs = feature_extractor(images=image, return_tensor="pt")
  pixel_values = inputs["pixel_values"]
  pixel_values = np.array(pixel_values)
  pixel_values = torch.tensor(pixel_values)
  outputs = model(pixel_values)
  logits = outputs.logits
  predicted_class_idx = logits.argmax(-1).item()
  sorted_preds = torch.argsort(logits, descending=True)[0]
  top_predictions = [sorted_preds[i].item() for i in range(50)] # 50 is semi-arbitrary based on our findings from testing pics # could see tuning this down to 40 but can't tell if it would pick up more or less cats
  top_values = [logits[0][pred].item() for pred in top_predictions]
  # print('label predictions', top_predictions)
  # print('values of predictions', top_values)
  found_cat_label = -1
  found_bad_label = -1
  bad_labels_found = []
  cat_score = 0
  for i, pred in enumerate(top_predictions):
    predicted_class = model.config.id2label[pred]
    # print(predicted_class)
    if pred in cat_labels:
      if found_cat_label == -1:
        found_cat_label = i
      cat_score += top_values[i]
    if pred in bad_labels:
      if found_bad_label == -1:
        found_bad_label = i
      bad_labels_found.append(pred)
      bad_labels_found.append(bad_labels[pred])
      cat_score -= top_values[i]
  # print(' ')
  print('    found cat label:', found_cat_label)
  print('    found bad label:', found_bad_label, bad_labels_found)
  would_pass = found_cat_label >= 0 and found_bad_label < 0
  # print('AI cat score: ', cat_score)
  # print('    passed cat test:', would_pass)
  return would_pass

In [None]:
# just getting a previous count of our followers and following for the logs
try:
  following = client.get_profile(actor=BSKY_USERNAME).follows_count
  followers = client.get_profile(actor=BSKY_USERNAME).followers_count
  prev_stats = f'prior followers: {str(followers)} | previously following: {str(following)}'
  print(prev_stats)
  running_logging_text += prev_stats + LINE_BREAK
except:
  warning = 'WARNING - failed to get previous following and followers count'
  print(warning)
  running_logging_text += warning + LINE_BREAK

In [None]:
# after successfully identifying a cat post, this function likes the post and follows the user, returning the follow uri string
def like_post_and_add_user(post):
  user_did = post.author.did
  post_cid = post.cid
  post_uri = post.uri
  followed_user = ''
  try:
    followed_user = client.follow(user_did).uri
    liked_post = client.like(uri=post_uri, cid=post_cid).uri
    print(f'      ✓✓✓ ✅ Successfully liked post and followed user: {post.author.handle}')
  except:
    print(f'      ✓✓✗ ❌ failed at either liking post or following user: {post.author.handle}')
  return followed_user

In [None]:
def get_post_follow_likers(post_uri, like_count, users_followed, max_new_followers):
  # need to try opening post, get the list of likers, iterate through following them, add each to the users added
  new_follows_count = 0
  likes_remaining = like_count
  try:
    while likes_remaining > 0:
      if (likes_remaining > 100):
        print(f'        Starting new page of likes. {likes_remaining} likes remaining to check on this post.')
      limit = min(likes_remaining, 100)
      likes_remaining -= limit
      next_page = ''
      response = client.get_likes(uri = post_uri, limit= limit, cursor= next_page)
      likes = response.likes
      next_page = response.cursor

      for like in likes:
        you_follow_them = like.actor.viewer.following
        you_are_followed_by = like.actor.viewer.followed_by
        user_did = like.actor.did
        user_handle = like.actor.handle
        if you_follow_them or you_are_followed_by or user_did == MY_DID or user_did in users_followed:
          print(f'        Already seen user. handle: {user_handle}')
          continue
        else:
          # like the user
          follow_uri = client.follow(user_did).uri
          date_added = DATETIME_NOW
          follows_you = 'FALSE'
          users_followed.append(user_did) # now we only need to save the user_did in the set instead of the whole string, and so we don't need a whole second already added dids set
          print(f'        Followed post-liker. handle: {user_handle}')
          new_follows_count += 1
          if new_follows_count >= max_new_followers:
            break
      # print(f'from {len(likes)} likes on this post you followed {new_followers_count}, saw {follow_them_count} users you already follow, and saw {followed_by_count} users that already follow you')
  except Exception as e:
    print(f'ERROR: THERE WAS AN ISSUE CHECKING THIS POST FOR LIKES. \n{e}')
  return new_follows_count

In [None]:
def createPostUrl(feed_post):
  url_handle = feed_post.post.author.handle
  url_ending_index = feed_post.post.uri.find('.feed.post/') + 11
  url_ending = feed_post.post.uri[url_ending_index : ]
  return URL_BEGIN + url_handle + URL_POST + url_ending

def follow_more_users(post_count, follows_count, feed):
  if post_count == 0 or follows_count == 0:
    return []
  posts_to_check = post_count
  successful_cat_post_like_count = 3
  max_errors_allowed = 5
  next_page = ''
  new_follow_count_from_posts = 0
  new_follow_count_from_likes = 0
  page_count = 0
  users_followed = set()

  logging_posts = 0
  logging_pics = 0
  logging_errors_count = 0
  logging_errors_description = []
  logging_notcat = 0
  logging_cat = 0
  logging_vid = 0
  logging_nomedia = 0
  logging_alreadyfollowed = 0
  logging_mutuals = 0
  logging_myposts = 0
  logging_seenpost = 0
  global running_logging_text
  running_logging_text += f'Feed {FEED_NAME[feed]}:' + LINE_BREAK

  def log_results():
    global running_logging_text
    sum_new_follows = new_follow_count_from_posts + new_follow_count_from_likes
    sum_skipped_posts = logging_seenpost + logging_alreadyfollowed + logging_myposts
    sum_unprocessed = logging_nomedia + logging_vid
    print(f'followed {sum_new_follows} new users. {new_follow_count_from_posts} posters and {new_follow_count_from_likes} likers.')
    running_logging_text += f'  Followed {sum_new_follows} new user{"s" if sum_new_follows != 1 else ""}{"!" if sum_new_follows > 0 else "."}' + LINE_BREAK
    running_logging_text += f'    Of those follows, {new_follow_count_from_posts} were posters and {new_follow_count_from_likes} were from likes.' + LINE_BREAK
    running_logging_text += f'  {logging_posts} posts in total were viewed during this run.' + LINE_BREAK
    running_logging_text += f'  Skipped Posts: ({sum_skipped_posts}) - {logging_seenpost} posts were previously seen, {logging_alreadyfollowed} were from users already followed, {logging_myposts} were your posts.' + LINE_BREAK
    running_logging_text += f'  Mutuals: {logging_mutuals} posts were from users that follow you, and these posts were liked.' + LINE_BREAK
    running_logging_text += f'  Unprocessed: ({sum_unprocessed}) - {logging_nomedia} posts had no media attached, and {logging_vid} posts had videos attached.' + LINE_BREAK
    running_logging_text += f'  Processed: {logging_pics} posts had pics attached: {logging_cat} were identified as cat pics and {logging_notcat} were not cats.' + LINE_BREAK
    running_logging_text += f'  {"No errors were encountered while processing pics." if logging_errors_count == 0 else str(logging_errors_count) + " ERROR(S) ENCOUNTERED PROCESSING PICS FROM THIS FEED"} ' + LINE_BREAK
    if logging_errors_count > 0:
      running_logging_text += ';; \n'.join(logging_errors_description) + LINE_BREAK

  while posts_to_check > 0:
    print(f'[checking page {page_count} of feed {FEED_NAME[feed]}, {posts_to_check} posts left to check, and have found {new_follow_count_from_posts + new_follow_count_from_likes} new users to follow]')
    page_count += 1
    limit = min(posts_to_check, 100)
    posts_to_check -= limit
    try:
      # print('next page', next_page)
      data = client.app.bsky.feed.get_feed({
          'feed': 'at://did:plc:jfhpnnst6flqway4eaeqzj2a/app.bsky.feed.generator/cats',
          'limit': limit,
          'cursor': next_page
      }, headers={})
      next_page = data.cursor
      # print(data)

      for i, f in enumerate(data.feed):
        you_follow_them = f.post.author.viewer.following
        you_are_followed_by = f.post.author.viewer.followed_by
        did = f.post.author.did
        post_cid = f.post.cid
        seen_posts.add(post_cid) # now that we're using a dynamodb cache we want to add the cid to the seen posts set regardless of what happens to it so that it's saved for next run

        logging_posts += 1
        if did == MY_DID:
          print(f'{i} - 😎 skipped. This was your own post.')
          logging_myposts += 1
          continue
        elif post_cid in cached_posts or post_cid in seen_posts:
          print(f'{i} - 👀 skipped. Post with cid {post_cid} has already been viewed. Found in {"db cache" if post_cid in cached_posts else "current set"}.')
          logging_seenpost += 1
          continue
        else:
          # TODO: the way I have it if you are following them you never check the photo to see if it's a good one to get the likes from.
          if you_follow_them and you_are_followed_by:
            print(f'{i} 💕 user: {f.post.author.handle} is a mutual follower. Liking this post. {createPostUrl(f)}')
            # this can break if you get rate limited. So far hasn't broken when posts are deleted but should have been wrapped in one just in case
            try:
              liked_post = client.like(uri=f.post.uri, cid=f.post.cid).uri
            except Exception as e:
              logging_errors_count += 1
              print(f'    ✓✗ ‼️ liking post {i} caused an error. {logging_errors_count} errors seen this run.\n{e}')
              logging_errors_description.append(f'{i}. {e}')
              # got rate limited and know that if you hit like 100 errors or so you'll eventually get a timeout and the entire nootebook will be borked.
              if logging_errors_count >= max_errors_allowed:
                print(f'seen more errors ({logging_errors_count}) than the acceptable number of errors ({max_errors_allowed}). terminating run.')
                log_results()
                return users_followed
            logging_mutuals += 1
          elif did in already_added_dids:
            print(f'{i} ✗ 👀 user: {f.post.author.handle} was already followed in this batch.')
            logging_alreadyfollowed += 1
          elif you_follow_them or you_are_followed_by:
            print(f'{i} ✗ 👀 user: {f.post.author.handle} {"already follows you." if you_are_followed_by else ""}{"is already being followed." if you_follow_them else ""}')
            logging_alreadyfollowed += 1
          elif not f.post.embed or f.post.embed.py_type != EMBEDDED_PIC:
            if f.post.embed.py_type == EMBEDDED_VID:
              print(f'{i} ✗ 🎥 video post: {createPostUrl(f)}')
              logging_vid += 1
            else:
              print(f'{i} ✗ 🔲 no pic for post {i}')
              logging_nomedia += 1
          else:
            print(i, '✓', '📷', f.post.embed.images[0].fullsize)
            print(f'    post: {createPostUrl(f)}')
            logging_pics += 1
            try:
              handle = f.post.author.handle
              print(f'    user: {handle}')
              is_cat = test_bsky_image(f.post.embed.images[0].fullsize)
              if is_cat:
                print(f'    ✓✓ 😺 successfully found cat pic at post {i}. It has {f.post.like_count} likes.')
                new_follow_count_from_posts += 1
                logging_cat += 1
                follow_uri = like_post_and_add_user(f.post)
                date_added = DATETIME_NOW
                follows_you = 'FALSE'
                users_followed.add(did)
                # so we have a cat post. If it is a solid or particularly good cat post it should probably have a lot of likes, and we can go in and follow all those likers
                if f.post.like_count >= successful_cat_post_like_count:
                  print(f'      👍🏻 This cat post got {f.post.like_count}, and I would call it successful, so following its likers.')
                  likers_added = get_post_follow_likers(f.post.uri, f.post.like_count, users_followed, follows_count - (new_follow_count_from_posts + new_follow_count_from_likes))
                  print(f'      {"✅" if likers_added > 0 else "0️⃣"} Added {likers_added} users that liked that post.')
                  new_follow_count_from_likes += likers_added
                if new_follow_count_from_posts + new_follow_count_from_likes >= follows_count:
                  print(f'Successfully followed the desired number of new users! breaking out of loop.')
                  break
              else:
                print(f'    ✓✗ ❌ post {i} was not a cat pic')
                logging_notcat += 1
            except Exception as e:
              logging_errors_count += 1
              print(f'    ✓✗ ❓ post {i} image caused an error. {logging_errors_count} errors seen this run.\n{e}')
              print(f'errors ({logging_errors_count}), acceptable number of errors ({max_errors_allowed}).')
              logging_errors_description.append(f'{i}. {e}')
              # got rate limited and know that if you hit like 100 errors or so you'll eventually get a timeout and the entire notebook will be borked.
              if logging_errors_count >= max_errors_allowed:
                print(f'seen more errors ({logging_errors_count}) than the acceptable number of errors ({max_errors_allowed}). terminating run.')
                log_results()
                return users_followed
    except Exception as e:
      print(f'error encountered from trying to get feed. terminating run.\n{repr(e)}: {e}')
      logging_errors_count += 1
      logging_errors_description.append('CRITICAL ERROR ENCOUNTERED WHILE GETTING FEED:')
      logging_errors_description.append(f'{repr(e)}: {e}')
      log_results()
      return users_followed
  log_results()
  return users_followed

    # print(data.feed[0].post.embed.images[0].fullsize)

In [None]:
# FINALLY THE ACTUAL RUN! determine whether you're checking the caturday feed or the regular cat feed
is_caturday = dow == CATURDAY_DOW
followed_users = set()
if is_caturday:
  print("IT'S CATURDAY! Checking the Caturday feed for new followers.")
  followed_users = follow_more_users(POSTS_CATURDAY, FOLLOWS_CATURDAY, FEED_CATURDAY)
else:
  print("Just a regular day, but we're still following more cats. :3")
  followed_users = follow_more_users(POSTS_OTHERCAT, FOLLOWS_OTHERCAT, FEED_SIAMESE)

# just in case we said we followed ourselves somehow, we'll discard that value
followed_users.discard(MY_DID)
# and add the results to our dynamodb
if len(followed_users) > 0:
  try:
    table.update_item(
        Key={'DOW': ddb_key},
        UpdateExpression='SET #attr = :val',
        ExpressionAttributeNames={
            '#attr': str_timestamp
        },
        ExpressionAttributeValues={
            ':val': followed_users
        }
    )
  except ClientError as e:
    err = f'ERROR - failed to store followed users in dynamodb.\n{e}'
    print(err)
    running_logging_text += err

In [None]:
# now we also need to update the cached posts with what we saw this run
try:
  table.update_item(
      Key={'DOW': DDB_CACHE_KEY},
      UpdateExpression='SET #attr = :val',
      ExpressionAttributeNames={
          '#attr': DDB_CACHE_ATTRIBUTE
      },
      ExpressionAttributeValues={
          ':val': seen_posts
      }
  )
except ClientError as e:
  warning = f'WARNING - failed to store followed users in dynamodb.\n{e}'
  print(warning)
  running_logging_text += warning + LINE_BREAK

In [None]:
end_timestamp = datetime.datetime.now(zoneinfo.ZoneInfo(USER_TIMEZONE))
time_diff = end_timestamp - cur_timestamp
running_logging_text += f'time diff: {str(time_diff)} | completed run at: {str(end_timestamp)}' + LINE_BREAK
print(f'end timestamp: {end_timestamp}\ntime diff (runtime): {time_diff}')

In [None]:
# now get an updated count of our followers and following for the logs
try:
  following = client.get_profile(actor=BSKY_USERNAME).follows_count
  followers = client.get_profile(actor=BSKY_USERNAME).followers_count
  cur_stats = f'cur followers: {str(followers)} | now following: {str(following)}'
  print(cur_stats)
  running_logging_text += cur_stats + LINE_BREAK
except:
  warning = 'WARNING - failed to get updated following and followers count'
  print(warning)
  running_logging_text += warning + LINE_BREAK

In [None]:
# a successful run! add our logging string
logging_add(running_logging_text)

In [None]:
# and for debugging purposes, I want to scan the dynamodb table to see how it's looking now.
# THIS SHOULD NOT BE IN THE FINAL CODE
print(table.scan())