In [140]:
from commit import CommitWrapperEventModel, PostRecordModel, FollowRecordModel, LikeRecordModel, RepostRecordModel
from pydantic import BaseModel, Field
import json
import copy
import datetime
from collections import defaultdict
from itertools import combinations
from tqdm.notebook import tqdm
import time
from typing import List, Any

In [141]:
def transform_events(events: List[CommitWrapperEventModel]) -> List[dict]:
    """
    Flatten commit events into one dictionary per event, following the
    CREATE STREAM schema.

    Only the four record types handled below produce a row; an event with
    any other record type is skipped. Key insertion order is part of the
    contract for callers that serialise ``row.values()`` positionally:
    ``event_type`` first, then the common fields, then the record-specific
    fields.

    Args:
        events: Commit events to flatten. They are only read, never mutated.

    Returns:
        One flat dict per supported event, in the same relative order as
        ``events``.

    Raises:
        AssertionError: If a record's ``record_type`` string does not match
            the class of the record object.
    """
    output_list = []
    for event in events:
        record = event.commit.record
        record_type = record.record_type

        # 1. Record-specific fields. This is decided first so that events
        #    with an unsupported record type are skipped before any other
        #    attribute is read.
        if record_type == "app.bsky.feed.post":
            # Ensure the type checker knows this is a PostRecordModel
            assert isinstance(record, PostRecordModel)
            event_type = "CreatePost"
            record_specific_dict = {
                "langs": record.langs,
                # Every exported post text gets a trailing '.'.
                "text": record.text + '.',
            }
        elif record_type == "app.bsky.graph.follow":
            assert isinstance(record, FollowRecordModel)
            event_type = "CreateFollow"
            record_specific_dict = {
                "subject": record.subject,
            }
        elif record_type == "app.bsky.feed.like":
            assert isinstance(record, LikeRecordModel)
            event_type = "CreateLike"
            record_specific_dict = {
                "subject_cid": record.subject.cid,
                "subject_uri": record.subject.uri,
            }
        elif record_type == "app.bsky.feed.repost":
            assert isinstance(record, RepostRecordModel)
            event_type = "CreateRepost"
            record_specific_dict = {
                "subject_cid": record.subject.cid,
                "subject_uri": record.subject.uri,
            }
        else:
            # Skip any unknown record types
            continue

        # 2. Fields common to all event types. Both time columns come from
        #    the record's createdAt (the primary time); the event's time_us
        #    is not part of the schema and is therefore not read.
        created_at = record.createdAt
        base_dict = {
            "did": event.did,
            "kind": event.kind,
            # Nanoseconds since the epoch.
            # NOTE(review): computed through a float, so the value is not
            # guaranteed to be exact down to the nanosecond.
            "time": int(created_at.timestamp() * 1e9),
            "time_str": str(created_at),
            "cid": event.commit.cid,
            "operation": event.commit.operation,
            "record_type": record_type,
        }

        # 3. Combine; the order of the three parts fixes the column order.
        output_list.append(
            {"event_type": event_type, **base_dict, **record_specific_dict}
        )

    return output_list

In [142]:
# Load the cached events. JSON is decoded as UTF-8 explicitly so the result
# does not depend on the platform's locale encoding, and the file is closed
# as soon as parsing is done.
with open("bluesky_cached_models.json", encoding="utf-8") as f:
    data = json.load(f)

models = []
for raw_event in data:
    raw_record = raw_event["commit"]["record"]
    # The cache stores the record discriminator under "record_type"; it is
    # moved to "$type" before validation (presumably the alias the models
    # in commit.py parse - TODO confirm).
    raw_record["$type"] = raw_record.pop("record_type")

    model = CommitWrapperEventModel.model_validate(raw_event)

    post = model.commit.record
    if post.record_type == "app.bsky.feed.post":
        # Flatten langs into one space-separated string; a missing langs
        # value becomes an empty string instead of raising.
        post.langs = ' '.join(post.langs or [])
        # Commas and newlines are removed from the text, then surrounding
        # whitespace is stripped.
        post.text = post.text.replace(',', '').replace('\n', '').strip()

    # To debug a single account, filter on model.did here before appending.
    models.append(model)

In [143]:
# Two orderings of the same events: by record creation time and by the time
# the event was received. list.sort is stable, so ties keep their order
# from `models`.
ordered_primary_time_models = list(models)
ordered_primary_time_models.sort(key=lambda event: event.commit.record.createdAt)

ordered_received_models = list(models)
ordered_received_models.sort(key=lambda event: event.received_time)

In [144]:
transformed_models = transform_events(ordered_primary_time_models)

# Write the creation-time-ordered stream: the fixed first line, then one
# comma-joined row per event (values in dict insertion order). No inter-event
# delay lines are written to this file, so no arrival gaps are computed, and
# the name `time` is left alone because it is the imported stdlib module.
# UTF-8 is set explicitly so non-ASCII post text is written identically on
# every platform.
with open('bluesky_ordered.corecsv', 'w', newline='', encoding='utf-8') as f:
    # NOTE(review): 'asdasd' looks like a placeholder header line - confirm
    # what the consumer of this file expects here.
    f.write('asdasd\n')
    for row in transformed_models:
        f.write(','.join(str(val) for val in row.values()) + '\n')

In [145]:
# Pair every exported row with the arrival time of the event it was built
# from. transform_events drops unsupported record types, so rows cannot be
# matched to events by position; flattening one event at a time keeps the
# pairing exact and yields the same rows as one bulk call.
received_rows = [
    (event.received_time, row)
    for event in ordered_received_models
    for row in transform_events([event])
]
transformed_models = [row for _, row in received_rows]

# Write the arrival-ordered stream: the fixed first line, then for each
# event a delay line (nanoseconds since the previous event arrived) followed
# by the comma-joined row. UTF-8 is set explicitly so non-ASCII post text is
# written identically on every platform.
with open('bluesky_unordered.corecsv', 'w', newline='', encoding='utf-8') as f:
    # NOTE(review): 'asdasd' looks like a placeholder header line - confirm
    # what the consumer of this file expects here.
    f.write('asdasd\n')
    previous_received = received_rows[0][0] if received_rows else None
    for received, row in received_rows:
        # Clamp to zero so a delay is never negative.
        gap = max(received - previous_received, datetime.timedelta(0))
        # Whole gap in microseconds; timedelta.microseconds alone would
        # discard the seconds and days of a longer gap.
        gap_microseconds = gap // datetime.timedelta(microseconds=1)
        f.write(f"{gap_microseconds * 1000}\n")
        f.write(','.join(str(val) for val in row.values()) + '\n')
        previous_received = received

# Query 1

/*Detect Bot Accounts (3 posts in 10 seconds)*/

SELECT *

FROM Bluesky

WHERE CreatePost ; CreatePost ; CreatePost

PARTITION BY [did]

WITHIN 10 SECONDS

In [240]:
# Reference evaluation of Query 1: for every created post, emit one match
# per pair of earlier posts by the same account that are still inside the
# 10-second window.
messages_by_did_in_window = defaultdict(list)
outputs = []
max_span = datetime.timedelta(seconds=10)

# Precondition: the input is already in creation-time order. On failure the
# message lists the timestamps of the list that was actually checked.
assert sorted(ordered_primary_time_models, key=lambda model: model.commit.record.createdAt) == ordered_primary_time_models, [model.commit.record.createdAt.strftime("%Y-%m-%d %H:%M:%S") for model in ordered_primary_time_models]

for model in tqdm(ordered_primary_time_models):
    if model.commit.operation != "create":
        continue
    if model.commit.record.record_type != "app.bsky.feed.post":
        continue
    primary_time = model.commit.record.createdAt
    did = model.did
    window = messages_by_did_in_window[did]

    # Evict posts created more than 10 seconds before the current one.
    while window and (primary_time - window[0].commit.record.createdAt) > max_span:
        window.pop(0)

    window.append(model)

    # Invariant: each per-account window stays in creation-time order.
    assert sorted(window, key=lambda model: model.commit.record.createdAt) == window, [model.commit.record.createdAt.strftime("%Y-%m-%d %H:%M:%S") for model in window]

    # output
    if len(window) >= 3:
        # Earlier posts, newest first; each pair therefore comes out as
        # (newer, older) and the match is [newer, older, current].
        valid = window[:-1][::-1]
        for comb in combinations(valid, 2):
            output = [*comb, model]
            # Measure the span from the oldest event of the match, which is
            # not the first element.
            span = primary_time - min(m.commit.record.createdAt for m in output)
            assert span <= max_span, f"{span}"
            outputs.append(output)

  0%|          | 0/20000 [00:00<?, ?it/s]

In [241]:
# Number of matches collected in `outputs`.
len(outputs)

3963

In [242]:
# Sanity-check every match: all three events belong to one account and lie
# within 10 seconds of each other. The span is measured on createdAt (the
# timestamp the window is defined on) across the whole triple, so it does
# not depend on the order of the events inside a match.
for output in outputs:
    m1, m2, m3 = output
    created_times = [m.commit.record.createdAt for m in output]
    assert max(created_times) - min(created_times) <= datetime.timedelta(seconds=10)
    assert m1.did == m2.did == m3.did

# Quarantine

In [243]:
# Collect the event numbers the log reports as dropped / not dropped. The
# number is taken from the last whitespace-separated token of each matching
# line. The two phrases differ in the case of the 'd', so a line is only
# counted under the phrase it literally contains.
drop_ids = []
not_drop_ids = []
with open('../../../../logs/logfile_20251029_152154.log', 'r') as log_file:
    log_lines = log_file.read().splitlines()

not_drop_ids.extend(
    int(line.split()[-1]) for line in log_lines if "Not dropping event" in line
)
drop_ids.extend(
    int(line.split()[-1]) for line in log_lines if "Dropping event" in line
)

In [244]:
# How many event numbers the log marked as dropped vs. not dropped.
print(len(drop_ids), len(not_drop_ids))

436 18564


In [245]:
# An event number must not be reported both as dropped and as kept.
assert len(set(drop_ids) & set(not_drop_ids)) == 0

# Event numbers 0-999 are counted as dropped as well (presumably the log has
# no decision line for them - TODO confirm). Only numbers not already present
# are added, so re-running this cell does not grow the list a second time.
already_dropped = set(drop_ids)
drop_ids.extend(
    event_id for event_id in range(0, 1000) if event_id not in already_dropped
)

# Every received event must now be accounted for exactly once.
assert len(drop_ids) + len(not_drop_ids) == len(ordered_received_models)

In [246]:
# Keep only the events whose position in arrival order was reported as not
# dropped. The ids are put in a set first so each membership test is O(1)
# instead of a scan over the whole list.
kept_positions = set(not_drop_ids)
ordered_received_models_with_quarantine = [model for c, model in enumerate(ordered_received_models) if c in kept_positions]

In [247]:
# Re-order the surviving events by creation time, breaking ties by the time
# they were received.
ordered_primary_time_models_with_quarantine = list(ordered_received_models_with_quarantine)
ordered_primary_time_models_with_quarantine.sort(
    key=lambda event: (event.commit.record.createdAt, event.received_time)
)

# Quarantined Query 1 (10 second bounded wait)

/*Detect Bot Accounts (3 posts in 10 seconds)*/

SELECT *

FROM Bluesky

WHERE CreatePost ; CreatePost ; CreatePost

PARTITION BY [did]

WITHIN 10 SECONDS

In [248]:
# Reference evaluation of Query 1 on the quarantined stream: for every
# created post, emit one match per pair of earlier posts by the same account
# that are still inside the 10-second window.
messages_by_did_in_window = defaultdict(list)
outputs = []
max_span = datetime.timedelta(seconds=10)

# Precondition: the input is already in creation-time order.
assert sorted(ordered_primary_time_models_with_quarantine, key=lambda model: model.commit.record.createdAt) == ordered_primary_time_models_with_quarantine, [model.commit.record.createdAt.strftime("%Y-%m-%d %H:%M:%S") for model in ordered_primary_time_models_with_quarantine]

for model in tqdm(ordered_primary_time_models_with_quarantine):
    if model.commit.operation != "create":
        continue
    if model.commit.record.record_type != "app.bsky.feed.post":
        continue
    primary_time = model.commit.record.createdAt
    did = model.did
    window = messages_by_did_in_window[did]

    # Evict posts created more than 10 seconds before the current one.
    while window and (primary_time - window[0].commit.record.createdAt) > max_span:
        window.pop(0)

    window.append(model)

    # Invariant: each per-account window stays in creation-time order.
    assert sorted(window, key=lambda model: model.commit.record.createdAt) == window, [model.commit.record.createdAt.strftime("%Y-%m-%d %H:%M:%S") for model in window]

    # output
    if len(window) >= 3:
        # Earlier posts, newest first; each pair therefore comes out as
        # (newer, older) and the match is [newer, older, current].
        valid = window[:-1][::-1]
        for comb in combinations(valid, 2):
            output = [*comb, model]
            # Measure the span from the oldest event of the match, which is
            # not the first element.
            span = primary_time - min(m.commit.record.createdAt for m in output)
            assert span <= max_span, f"{span}"
            outputs.append(output)

  0%|          | 0/18564 [00:00<?, ?it/s]

In [249]:
# Number of matches collected in `outputs`.
len(outputs)

168

In [250]:
# Sanity-check every match: all three events belong to one account and were
# created within 10 seconds of each other. The span is taken across the
# whole triple, so it does not depend on the order of events inside a match.
for output in outputs:
    m1, m2, m3 = output
    created_times = [m.commit.record.createdAt for m in output]
    assert max(created_times) - min(created_times) <= datetime.timedelta(seconds=10)
    assert m1.did == m2.did == m3.did

In [251]:
# Read out.txt as a list of lines (line endings removed).
other_outs = []
with open('../../../../out.txt', 'r') as out_file:
    other_outs.extend(out_file.read().splitlines())

In [252]:
# Discard the first 7 lines (presumably a preamble before the result lines -
# TODO confirm) and show how many lines remain. Running this cell again
# without re-reading the file removes 7 more lines.
del other_outs[:7]
len(other_outs)

168

# Quarantined Query 2 (10 second bounded wait)

/*Detect Bot Accounts (3 consecutive posts in 10 seconds)*/

SELECT *

FROM Bluesky

WHERE CreatePost : CreatePost : CreatePost

PARTITION BY [did]

WITHIN 10 SECONDS

In [222]:
# Reference evaluation of Query 2 on the quarantined stream: like Query 1,
# but a match is kept only when its three posts occupy adjacent positions
# in the account's window.
messages_by_did_in_window = defaultdict(list)
outputs = []
max_span = datetime.timedelta(seconds=10)

# Precondition: the input is already in creation-time order.
assert sorted(ordered_primary_time_models_with_quarantine, key=lambda model: model.commit.record.createdAt) == ordered_primary_time_models_with_quarantine, [model.commit.record.createdAt.strftime("%Y-%m-%d %H:%M:%S") for model in ordered_primary_time_models_with_quarantine]

for model in tqdm(ordered_primary_time_models_with_quarantine):
    if model.commit.operation != "create":
        continue
    if model.commit.record.record_type != "app.bsky.feed.post":
        continue
    primary_time = model.commit.record.createdAt
    did = model.did
    window = messages_by_did_in_window[did]

    # Evict posts created more than 10 seconds before the current one.
    while window and (primary_time - window[0].commit.record.createdAt) > max_span:
        window.pop(0)

    window.append(model)

    # Invariant: each per-account window stays in creation-time order.
    assert sorted(window, key=lambda model: model.commit.record.createdAt) == window, [model.commit.record.createdAt.strftime("%Y-%m-%d %H:%M:%S") for model in window]

    # output
    if len(window) >= 3:
        # Earlier posts, newest first; each pair therefore comes out as
        # (newer, older) and the match is [newer, older, current].
        valid = window[:-1][::-1]
        for comb in combinations(valid, 2):
            output = [*comb, model]
            # Measure the span from the oldest event of the match, which is
            # not the first element.
            span = primary_time - min(m.commit.record.createdAt for m in output)
            assert span <= max_span, f"{span}"
            # Contiguity: the three window positions must form an unbroken run.
            idxs = sorted(window.index(message) for message in output)
            if idxs != list(range(idxs[0], idxs[-1] + 1)):
                continue
            outputs.append(output)

  0%|          | 0/18564 [00:00<?, ?it/s]

In [223]:
# Number of matches collected in `outputs`.
len(outputs)

37

In [224]:
# Sanity-check every match: all three events belong to one account and were
# created within 10 seconds of each other. The span is taken across the
# whole triple, so it does not depend on the order of events inside a match.
for output in outputs:
    m1, m2, m3 = output
    created_times = [m.commit.record.createdAt for m in output]
    assert max(created_times) - min(created_times) <= datetime.timedelta(seconds=10)
    assert m1.did == m2.did == m3.did

In [230]:
# Read out.txt as a list of lines (line endings removed).
other_outs = []
with open('../../../../out.txt', 'r') as out_file:
    other_outs.extend(out_file.read().splitlines())

In [229]:
# Discard the first 7 lines (presumably a preamble before the result lines -
# TODO confirm) and show how many lines remain. Running this cell again
# without re-reading the file removes 7 more lines.
del other_outs[:7]
len(other_outs)

37

# Quarantined Query 3 (10 second bounded wait)

SELECT *

FROM Bluesky

WHERE CreatePost ; CreatePost ; (CreatePost OR CreateLike)

PARTITION BY [did]

WITHIN 10 SECONDS

In [271]:
# Reference evaluation of Query 3 on the quarantined stream: windows hold
# created posts and likes per account; a match is two earlier posts followed
# by the current post or like, all inside the 10-second window.
messages_by_did_in_window = defaultdict(list)
outputs = []
max_span = datetime.timedelta(seconds=10)

# Precondition: the input is already in creation-time order.
assert sorted(ordered_primary_time_models_with_quarantine, key=lambda model: model.commit.record.createdAt) == ordered_primary_time_models_with_quarantine, [model.commit.record.createdAt.strftime("%Y-%m-%d %H:%M:%S") for model in ordered_primary_time_models_with_quarantine]

for model in tqdm(ordered_primary_time_models_with_quarantine):
    if model.commit.operation != "create":
        continue
    if model.commit.record.record_type not in ["app.bsky.feed.post", "app.bsky.feed.like"]:
        continue
    primary_time = model.commit.record.createdAt
    did = model.did
    window = messages_by_did_in_window[did]

    # Evict events created more than 10 seconds before the current one.
    while window and (primary_time - window[0].commit.record.createdAt) > max_span:
        window.pop(0)

    window.append(model)

    # Invariant: each per-account window stays in creation-time order.
    assert sorted(window, key=lambda model: model.commit.record.createdAt) == window, [model.commit.record.createdAt.strftime("%Y-%m-%d %H:%M:%S") for model in window]

    # output
    if len(window) >= 3:
        # Earlier events, newest first; each pair therefore comes out as
        # (newer, older) and the match is [newer, older, current].
        valid = window[:-1][::-1]
        for comb in combinations(valid, 2):
            output = [*comb, model]
            # Measure the span from the oldest event of the match, which is
            # not the first element.
            span = primary_time - min(m.commit.record.createdAt for m in output)
            assert span <= max_span, f"{span}"
            # The two earlier events must both be posts; only the current
            # event may be a like.
            if any(m.commit.record.record_type != "app.bsky.feed.post" for m in comb):
                continue
            outputs.append(output)

  0%|          | 0/18564 [00:00<?, ?it/s]

In [272]:
# Number of matches collected in `outputs`.
len(outputs)

171

In [273]:
# Sanity-check every match: all three events belong to one account and were
# created within 10 seconds of each other. The span is taken across the
# whole triple, so it does not depend on the order of events inside a match.
for output in outputs:
    m1, m2, m3 = output
    created_times = [m.commit.record.createdAt for m in output]
    assert max(created_times) - min(created_times) <= datetime.timedelta(seconds=10)
    assert m1.did == m2.did == m3.did

In [274]:
# Read out.txt as a list of lines (line endings removed).
other_outs = []
with open('../../../../out.txt', 'r') as out_file:
    other_outs.extend(out_file.read().splitlines())

In [275]:
# Discard the first 7 lines (presumably a preamble before the result lines -
# TODO confirm) and show how many lines remain. Running this cell again
# without re-reading the file removes 7 more lines.
del other_outs[:7]
len(other_outs)

171

# Quarantined Query 4 (10 second bounded wait)

SELECT *

FROM Bluesky

WHERE CreatePost : CreatePost : (CreatePost OR CreateLike)

PARTITION BY [did]

WITHIN 10 SECONDS

In [260]:
# Reference evaluation of Query 4 on the quarantined stream: windows hold
# created posts and likes per account; a match is two posts followed by the
# current post or like, where the three events occupy adjacent positions in
# the account's window.
messages_by_did_in_window = defaultdict(list)
outputs = []
max_span = datetime.timedelta(seconds=10)

# Precondition: the input is already in creation-time order.
assert sorted(ordered_primary_time_models_with_quarantine, key=lambda model: model.commit.record.createdAt) == ordered_primary_time_models_with_quarantine, [model.commit.record.createdAt.strftime("%Y-%m-%d %H:%M:%S") for model in ordered_primary_time_models_with_quarantine]

for model in tqdm(ordered_primary_time_models_with_quarantine):
    if model.commit.operation != "create":
        continue
    if model.commit.record.record_type not in ["app.bsky.feed.post", "app.bsky.feed.like"]:
        continue
    primary_time = model.commit.record.createdAt
    did = model.did
    window = messages_by_did_in_window[did]

    # Evict events created more than 10 seconds before the current one.
    while window and (primary_time - window[0].commit.record.createdAt) > max_span:
        window.pop(0)

    window.append(model)

    # Invariant: each per-account window stays in creation-time order.
    assert sorted(window, key=lambda model: model.commit.record.createdAt) == window, [model.commit.record.createdAt.strftime("%Y-%m-%d %H:%M:%S") for model in window]

    # output
    if len(window) >= 3:
        # Earlier events, newest first; each pair therefore comes out as
        # (newer, older) and the match is [newer, older, current].
        valid = window[:-1][::-1]
        for comb in combinations(valid, 2):
            output = [*comb, model]
            # Measure the span from the oldest event of the match, which is
            # not the first element.
            span = primary_time - min(m.commit.record.createdAt for m in output)
            assert span <= max_span, f"{span}"
            # Contiguity: the three window positions must form an unbroken run.
            idxs = sorted(window.index(message) for message in output)
            if idxs != list(range(idxs[0], idxs[-1] + 1)):
                continue
            # The two earlier events must both be posts; only the current
            # event may be a like.
            if any(m.commit.record.record_type != "app.bsky.feed.post" for m in comb):
                continue
            outputs.append(output)

  0%|          | 0/18564 [00:00<?, ?it/s]

In [261]:
# Number of matches collected in `outputs`.
len(outputs)

38

In [262]:
# Sanity-check every match: all three events belong to one account and were
# created within 10 seconds of each other. The span is taken across the
# whole triple, so it does not depend on the order of events inside a match.
for output in outputs:
    m1, m2, m3 = output
    created_times = [m.commit.record.createdAt for m in output]
    assert max(created_times) - min(created_times) <= datetime.timedelta(seconds=10)
    assert m1.did == m2.did == m3.did

In [263]:
# Read out.txt as a list of lines (line endings removed).
other_outs = []
with open('../../../../out.txt', 'r') as out_file:
    other_outs.extend(out_file.read().splitlines())

In [264]:
# Discard the first 7 lines (presumably a preamble before the result lines -
# TODO confirm) and show how many lines remain. Running this cell again
# without re-reading the file removes 7 more lines.
del other_outs[:7]
len(other_outs)

38

# DEBUG

In [276]:
# Accounts that appear in at least one match, read from the first event of
# each match.
dids = {output[0].did for output in outputs}

In [277]:
# Group the lines of `other_outs` by account: a line belongs to a DID when
# the DID string occurs anywhere in it. Accounts without any matching line
# get no key.
other_filtered = defaultdict(list)
for did in dids:
    lines_for_did = [line for line in other_outs if did in line]
    if lines_for_did:
        other_filtered[did].extend(lines_for_did)

In [278]:
# Accounts for which at least one line of `other_outs` was found.
other_filtered.keys()

dict_keys(['did:plc:hrerjtl6cy24eellqmrdbl7h', 'did:plc:qk3fxv4s4ifs5fbgh4odv3b7', 'did:plc:p467xcx65s6aedpkzdocj52o', 'did:plc:4k3g3hazhzvdx5v35pg5x2ve', 'did:plc:duaatzbzy7qm4ppl2hluilpg', 'did:plc:o6ggjvnj4ze3mnrpnv5oravg', 'did:plc:layutr5agle2nkpntyykph7z', 'did:plc:paj6ipalc3udmxlestdq7qfd', 'did:plc:vgpa562ktrs7c3qnwlise3c7', 'did:plc:cni7pf6kpddowzq6ndlohm64', 'did:plc:yvl3tawvgtw7v5zv42zu6r65'])

In [279]:
# Group the reference matches by account, keyed on the DID of the first
# event of each match.
filtered = defaultdict(list)
for did in dids:
    matches_for_did = [out for out in outputs if out[0].did == did]
    if matches_for_did:
        filtered[did].extend(matches_for_did)

In [280]:
# Accounts that have at least one reference match.
filtered.keys()

dict_keys(['did:plc:hrerjtl6cy24eellqmrdbl7h', 'did:plc:qk3fxv4s4ifs5fbgh4odv3b7', 'did:plc:p467xcx65s6aedpkzdocj52o', 'did:plc:4k3g3hazhzvdx5v35pg5x2ve', 'did:plc:duaatzbzy7qm4ppl2hluilpg', 'did:plc:o6ggjvnj4ze3mnrpnv5oravg', 'did:plc:layutr5agle2nkpntyykph7z', 'did:plc:paj6ipalc3udmxlestdq7qfd', 'did:plc:vgpa562ktrs7c3qnwlise3c7', 'did:plc:cni7pf6kpddowzq6ndlohm64', 'did:plc:yvl3tawvgtw7v5zv42zu6r65'])

In [281]:
# Per account, print the number of lines from out.txt next to the number of
# reference matches so a disagreement is easy to spot.
for did in dids:
    print(f"did: {did}")
    print(f"Len in other_filtered: {len(other_filtered[did])}")
    print(f"Len in filtered: {len(filtered[did])}")

did: did:plc:hrerjtl6cy24eellqmrdbl7h
Len in other_filtered: 84
Len in filtered: 84
did: did:plc:qk3fxv4s4ifs5fbgh4odv3b7
Len in other_filtered: 3
Len in filtered: 3
did: did:plc:p467xcx65s6aedpkzdocj52o
Len in other_filtered: 3
Len in filtered: 3
did: did:plc:4k3g3hazhzvdx5v35pg5x2ve
Len in other_filtered: 6
Len in filtered: 6
did: did:plc:duaatzbzy7qm4ppl2hluilpg
Len in other_filtered: 4
Len in filtered: 4
did: did:plc:o6ggjvnj4ze3mnrpnv5oravg
Len in other_filtered: 8
Len in filtered: 8
did: did:plc:layutr5agle2nkpntyykph7z
Len in other_filtered: 56
Len in filtered: 56
did: did:plc:paj6ipalc3udmxlestdq7qfd
Len in other_filtered: 1
Len in filtered: 1
did: did:plc:vgpa562ktrs7c3qnwlise3c7
Len in other_filtered: 1
Len in filtered: 1
did: did:plc:cni7pf6kpddowzq6ndlohm64
Len in other_filtered: 4
Len in filtered: 4
did: did:plc:yvl3tawvgtw7v5zv42zu6r65
Len in other_filtered: 1
Len in filtered: 1
