# Notebook de récupération des données de Bluesky

Objectifs du notebook :

- Utilise la librairie atproto pour récuperer les données de Bluesky
- Inclure uniquement des post compris dans l'intervalle de temps des données de X
- Filtrer pour langue française uniquement
- Réunir les données avec une structure similaire au jeu de données de X

[Lien DrawDB](https://drawdb.vercel.app/editor?shareId=e6c18b8ae53063fa1dfa9cc8a849605f)

In [160]:
from time import sleep
from rich import print
from tqdm import tqdm
import pandas as pd
import atproto as at
import json
import os

### Création des DataFrames finaux vides, avec typage

In [184]:
bsky_post_df = pd.DataFrame(
    {
        "post_id": pd.Series(dtype="str"),
        "user_id": pd.Series(dtype="str"),
        "lang": pd.Series(dtype="str"),
        "text": pd.Series(dtype="str"),
        "date": pd.Series(dtype="datetime64[ns]"),
        "like_count": pd.Series(dtype="int"),
        "reply_count": pd.Series(dtype="int"),
        "retweet_count": pd.Series(dtype="int"),
        "quote_count": pd.Series(dtype="int"),
    }
)

bsky_user_df = pd.DataFrame(
    {
        "user_id": pd.Series(dtype="str"),
        "name": pd.Series(dtype="str"),
        "bio": pd.Series(dtype="str"),
        "followers_count": pd.Series(dtype="int"),
        "follows_count": pd.Series(dtype="int"),
    }
)

print(bsky_post_df.dtypes)
bsky_post_df.to_parquet("./data/bsky_post.parquet", index=None)

print(bsky_user_df.dtypes)
bsky_user_df.to_parquet("./data/bsky_user.parquet", index=None)

## Connection à l'API de Bluesky

La connection à l'API est libre et gratuite. Elle passe par le protocole décentralisée ATproto sur lequel repose le réseau Bluesky.

Si vous souhaitez lancer vous même le notebook, assurez vous de remplir le fichier `bsky_credentials.json` comme suit :
```json
{
    "login": "", // Votre login finissant par ".bsky.social"
    "password": "", // Votre mot de passe Bluesky
    "session_string": "" // Le code de session permet de ne pas surcharger de requête le réseau, il est automatiquement rempli après une première connexion via le notebook
}
```

In [162]:
if os.path.exists("bsky_credentials.json"):
    with open("bsky_credentials.json", "r") as f:
        credentials = json.load(f)
        BLUESKY_LOGIN = credentials["login"]
        BLUESKY_PASSWORD = credentials["password"]
        SESSION_STRING = credentials["session_string"]

client = at.Client()
client.login(login=BLUESKY_LOGIN, password=BLUESKY_PASSWORD, session_string=SESSION_STRING if SESSION_STRING else None)

if not SESSION_STRING:
    with open("bsky_credentials.json", "w") as f:
        credentials = {
            "login": BLUESKY_LOGIN,
            "password": BLUESKY_PASSWORD,
            "session_string": client.export_session_string(),
        }
        json.dump(credentials)

## A quoi ressemble une requête de recherche ?

Via bsky.feed.search_posts il est possible de demander au serveur des recherches avec un certain nombre de paramètres indiqué [ici](https://atproto.blue/en/latest/atproto/atproto_client.models.app.bsky.feed.search_posts.html)

In [None]:
request_parameters = {
    "q": "twitter",  
    "limit": 2,
    "lang": "fr",
    "cursor": None,
}

request = client.app.bsky.feed.search_posts(request_parameters)
print(dict(request))

D'après ce schéma de réponse, il faut donc :

- Formuler le dictionnaire de paramètres pour la requête que nous souhaitons faire
- Extraire du dictionnaire de réponses les données dont nous avons besoin

## Fonctions utilitaires de nettoyage des requêtes

In [174]:
def PostView_to_df(post):
    post = dict(post)
    for key, value in post.items():
        if value == "":
            raise ValueError(f"Value for {key} is empty")
    new_post = pd.DataFrame(
        {
            "post_id": [str(post['uri'])],
            "user_id": [str(post['author']['did'])],
            "lang": [str(post['record']['langs'][0])],
            "text": [str(post['record']['text'])],
            "date": [pd.to_datetime(post['indexed_at'])],
            "like_count": [int(post['like_count'])],
            "reply_count": [int(post['reply_count'])],
            "retweet_count": [int(post['repost_count'])],
            "quote_count": [int(post['quote_count'])],
        }
    )
    return new_post

In [164]:
def UserDID_to_df(did):
    try:
        request = dict(client.app.bsky.actor.get_profile({"actor":did}))
    except Exception as e:
        return Exception
    new_user = pd.DataFrame(
        {
            "user_id": [str(request['did'])],
            "name": [str(request['handle'])],
            "bio": [str(request['description'])],
            "followers_count": [int(request['followers_count'])],
            "follows_count": [int(request['follows_count'])],
        }
    )
    return new_user

In [None]:
test_request = "did:plc:jexvuwrwe6ya6k6rn6fspabe"
print(test_request)
test_request = UserDID_to_df(test_request)
test_request.head()

Unnamed: 0,user_id,name,bio,followers_count,follows_count
0,did:plc:jexvuwrwe6ya6k6rn6fspabe,zaizaidansleravin.bsky.social,Gay lesbiennes pd gouines…\n\nMon Twitter http...,17,57


## Loop principal de récupération des données

Peu d'infos sont disponibles sur les limitations de requêtes API pour la recherche de posts et d'utilisateurs. Le processus sera donc long pour éviter toute requête erronée. Sachant que chaque requête est potentiellement dédoublée pour obtenir à la fois le post et son auteur.

On définira d'abord la requête de manière proche de dataset disponible pour X afin d'avoir une comparaison plus pertinente, ces paramètres sont décrit à [l'annexe 2 de l'article de recherche](https://arxiv.org/html/2411.00376v1). Les termes de recherches ont été adaptés, certains traduits, afin d'obtenir plus de résultats en langue française.

Puis, la cellule suivante pourra être lancée incrémentalement pour ajouter des données au dataset. La limite de l'API supposée est de 3000/5min, soit 10/s.

In [171]:
since_date = pd.Timestamp("2024-05-01").strftime("%Y-%m-%dT%H:%M:%SZ")
until_date = pd.Timestamp("2024-08-01").strftime("%Y-%m-%dT%H:%M:%SZ")
terms = [
    "election 2024", "election US", "Biden", "Biden 2024", "Trump",
    "Donald Trump", "Joe Biden", "Kamala Harris", "Harris", "GOP",
    "Joseph Biden", "Nikki Haley", "RNC", "Ron DeSantis", "trump2024",
    "democrates", "Marianne Williamson", "Dean Phillips", "williamson2024",
    "phillips2024", "republicain", "Green party", "RFK Jr.",
    "Robert F. Kennedy Jr.", "Jill Stein", "Stein", "Cornel West",
    "ultramaga", "trumptrain", "voteblue2024", "vote blue",
    "bidenharris2024", "makeamericagreatagain", "Vivek Ramaswamy", "J.D. Vance", "Vance"
]
query = " || ".join(f'"{term}"' for term in terms)

request_parameters = {
    "q": query,
    "limit": 1,
    "lang": "fr",
    "since": since_date,
    "until": until_date,
    "sort": "top"
}

In [None]:
TO_ADD = 10000 # The number of posts to add to the dataset
bsky_post_df = pd.read_parquet("./data/bsky_post.parquet")
bsky_user_df = pd.read_parquet("./data/bsky_user.parquet")
wait_time = 0.1 # The time to wait between each request, can change if encountering errors
cursor = None
content = None

pbar = tqdm(range(TO_ADD), total=TO_ADD)
for i in pbar:
    request_parameters['cursor'] = cursor
    try:
        sleep(wait_time)
        raw_request = client.app.bsky.feed.search_posts(request_parameters)
        post_request = dict(raw_request)
        wait_time = 0.1
        cursor = post_request['cursor']
    except Exception as e:
        print(f"Error to fetch post: {e}")
        wait_time = wait_time * 2
        sleep(wait_time)
        i = i - 1
        continue
    
    for post in post_request['posts']:
        new_post_df = PostView_to_df(post)
        # Skip if post processing failed
        if new_post_df is None:
            i = i - 1
            continue
        if new_post_df["lang"].iloc[0] != "fr":
            i = i - 1
            continue

        post_id = new_post_df["post_id"].iloc[0]
        # Check if post already exists in the DataFrame
        is_new_post = not bsky_post_df["post_id"].isin([post_id]).any() if not bsky_post_df.empty else True
        
        if is_new_post:
            # Add new post
            bsky_post_df = pd.concat([bsky_post_df, new_post_df], ignore_index=True)
            
            # Try to fetch user data
            try:
                sleep(wait_time)
                wait_time = 0.1
                user_id = str(new_post_df["user_id"].iloc[0])
                new_user_df = UserDID_to_df(user_id)

            
                # Check if user already exists in the DataFrame
                is_new_user = not bsky_user_df["user_id"].isin([user_id]).any() if not bsky_user_df.empty else True
                
                if is_new_user:
                    bsky_user_df = pd.concat([bsky_user_df, new_user_df], ignore_index=True)
                
            except Exception as e:
                print(f"Error to fetch profile: {e}")
                wait_time = wait_time * 2
                sleep(wait_time)
                i = i - 1
                continue
    last_date = new_post_df["date"].iloc[0]
    pbar.set_postfix({"Latest post": last_date, "Latest cursor": cursor, "Current wait time": wait_time})
    bsky_post_df.to_parquet("./data/bsky_post.parquet", index=None)
    bsky_user_df.to_parquet("./data/bsky_user.parquet", index=None)

print(f"Latest cursor: {cursor}")
print(f"Total posts: {len(bsky_post_df)}")
print(f"Total users: {len(bsky_user_df)}")
print(bsky_user_df.head())
bsky_post_df.head()

  0%|          | 0/1000 [00:00<?, ?it/s]

  4%|▍         | 43/1000 [00:27<10:16,  1.55it/s, Latest post=2024-05-31 19:36:07.485000+00:00, Latest cursor=43, Current wait time=0.1]


KeyboardInterrupt: 