In [1]:
%env MINIO_HOST=minio
%env MINIO_PORT=9000
%env AWS_SECRET_KEY=Z8lN07OWSzRqDjcqR8vTr5H9gsrFnSqEgV3z1gLD
%env AWS_ACCESS_KEY=CAsXP1ZVRAQXyftGOAim

env: MINIO_HOST=minio
env: MINIO_PORT=9000
env: AWS_SECRET_KEY=<redacted>
env: AWS_ACCESS_KEY=<redacted>


In [3]:
%pip install minio

Collecting minio
  Using cached minio-7.2.15-py3-none-any.whl.metadata (6.7 kB)
Collecting pycryptodome (from minio)
  Downloading pycryptodome-3.23.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Using cached minio-7.2.15-py3-none-any.whl (95 kB)
Downloading pycryptodome-3.23.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (2.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 MB[0m [31m16.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: pycryptodome, minio
Successfully installed minio-7.2.15 pycryptodome-3.23.0
Note: you may need to restart the kernel to use updated packages.


In [4]:
import os
from functools import reduce
from typing import List, Literal, Tuple

from minio import Minio
from minio.commonconfig import CopySource
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, current_timestamp, lit

In [5]:

def process_bluesky(spark: SparkSession, input_topic_path: str) -> DataFrame:
    """
    Map a Bluesky topic onto the common post layout.

    Two Delta tables are read from ``s3a://{input_topic_path}``:

    posts:
        uri: str
        text: str
        created_at: datetime
        author_did: str
        author_handle: str
        author_display_name: str
        like_count: int
        repost_count: int
        reply_count: int
        quote_count: int

    likes:
        actor_did: str
        actor_handle: str
        actor_display_name: str
        created_at: datetime
        post_uri: str

    ``like_count`` from the posts table is not used: ``likes`` is the number
    of rows of the likes table per ``post_uri``. The counts are attached with
    a left join, so a post without any row in the likes table ends up with a
    NULL ``likes`` value.

    Args:
        spark: Session used to read the Delta tables.
        input_topic_path: ``bucket/prefix`` of the topic, without the
            ``s3a://`` scheme.

    Returns:
        The posts with the columns uri, text, created_at, author_uri, reposts,
        replies, quotes, the NULL placeholder columns listed below, likes and
        source (always ``"bluesky"``).
    """
    raw_posts = spark.read.format("delta").load(f"s3a://{input_topic_path}/posts")
    raw_likes = spark.read.format("delta").load(f"s3a://{input_topic_path}/likes")

    # Keep only the post fields that exist in the common layout, under their
    # common names.
    posts = raw_posts.select(
        col("uri"),
        col("text"),
        col("created_at"),
        col("author_did").alias("author_uri"),
        col("repost_count").alias("reposts"),
        col("reply_count").alias("replies"),
        col("quote_count").alias("quotes"),
    )

    # Fields of the common layout that Bluesky does not provide: typed NULLs.
    absent_columns = [
        ("title", "string"),
        ("description", "string"),
        ("video_attachments", "array<string>"),
        ("image_attachments", "array<string>"),
        ("audio_attachments", "array<string>"),
        ("views", "int"),
        ("shares", "int"),
        ("dislikes", "int"),
    ]
    for column_name, spark_type in absent_columns:
        posts = posts.withColumn(column_name, lit(None).cast(spark_type))

    # One row per liked post, carrying how many like records point at it.
    likes_per_post = raw_likes.groupBy("post_uri").count().withColumnRenamed("count", "likes")

    # Attach the counts to the posts; the join key from the likes side is
    # redundant with `uri` and is removed again.
    joined = posts.join(likes_per_post, posts["uri"] == likes_per_post["post_uri"], "left")
    joined = joined.drop(likes_per_post["post_uri"])

    return joined.withColumn("source", lit("bluesky"))

In [6]:

def process_twitter(spark: SparkSession, input_topic_path: str) -> DataFrame:
    """
    Map a Twitter topic onto the common post layout.

    A single Delta table is read from ``s3a://{input_topic_path}/posts``:

    posts:
        uri: str
        text: str
        created_at: datetime
        like_count: int
        reply_count: int
        repost_count: int

    Args:
        spark: Session used to read the Delta table.
        input_topic_path: ``bucket/prefix`` of the topic, without the
            ``s3a://`` scheme.

    Returns:
        The posts with the columns uri, text, created_at, likes, replies,
        reposts, the NULL placeholder columns listed below and source
        (always ``"twitter"``).
    """
    tweets = (
        spark.read.format("delta")
        .load(f"s3a://{input_topic_path}/posts")
        # Keep only the fields of the common layout, under their common names.
        .select(
            col("uri"),
            col("text"),
            col("created_at"),
            col("like_count").alias("likes"),
            col("reply_count").alias("replies"),
            col("repost_count").alias("reposts"),
        )
    )

    # Fields of the common layout that the Twitter table does not provide:
    # typed NULLs.
    absent_columns = [
        ("author_uri", "string"),
        ("title", "string"),
        ("description", "string"),
        ("video_attachments", "array<string>"),
        ("image_attachments", "array<string>"),
        ("audio_attachments", "array<string>"),
        ("views", "int"),
        ("shares", "int"),
        ("dislikes", "int"),
        ("quotes", "int"),
    ]
    for column_name, spark_type in absent_columns:
        tweets = tweets.withColumn(column_name, lit(None).cast(spark_type))

    return tweets.withColumn("source", lit("twitter"))

In [7]:

def process_youtube(
    spark: SparkSession,
    input_topic_path: str,
    output_path: str,
    minio_host: str,
    minio_port: str,
    minio_access_key: str,
    minio_secret_key: str,
) -> DataFrame:
    """
    Map a YouTube topic onto the common post layout and copy its media files
    into the output location.

    Youtube topic contains two tables: Video Metadata, and Comments.
    And three BLOB stores: Videos, Thumbnails, and Audios.

    The schemas of the tables are as follows:

    Video Metadata:
        title: str
        description: str
        url: str
        videoId: str
        channel: str
        publishedAt: str
        tags: str
        thumbnail: str (The url)
        viewCount: Optional[int]
        likeCount: Optional[int]
        commentCount: Optional[int]
        duration: Optional[str]
        definition: Optional[str]
        captions: Optional[str]

    Comments: (ignored for now, as the YT API only returns a few top level comments)
        channelName: str
        channelId: Optional[str]
        comment: str
        likes: int
        replies: int
        publishedAt: datetime
        videoId: str
        threadId: str

    Only the ``video_metadata`` Delta table is read. Every object found under
    the topic's media folders is server-side copied to
    ``{output_path}/<store>/youtube/<basename>``, and the three
    ``*_attachments`` columns hold a one-element array with the
    ``bucket/key`` path built from each row's video id, pointing into those
    same destination folders.

    Args:
        spark: Session used to read the Delta table.
        input_topic_path: ``bucket/prefix`` of the topic, without the
            ``s3a://`` scheme.
        output_path: ``bucket/prefix`` the media files are copied under.
        minio_host: Host name of the MinIO server.
        minio_port: Port of the MinIO server.
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.

    Returns:
        The videos with the columns uri, title, description, author_uri,
        created_at, views, likes, replies, text, the three attachment
        columns, the NULL placeholder columns and source (always
        ``"youtube"``).

    Raises:
        ValueError: if ``input_topic_path`` or ``output_path`` contains no
            ``/`` separating the bucket from the prefix.
    """
    # Imported here because the notebook's import cell only brings in
    # col / current_timestamp / lit from pyspark.sql.functions.
    from pyspark.sql.functions import array, concat

    df = spark.read.format("delta").load(f"s3a://{input_topic_path}/video_metadata")
    # Select only the columns we need from the video metadata table
    df = df.select(
        col("videoId").alias("uri"),
        col("title"),
        col("description"),
        col("channel").alias("author_uri"),
        col("publishedAt").alias("created_at"),
        col("viewCount").alias("views"),
        col("likeCount").alias("likes"),
        col("commentCount").alias("replies"),
        col("captions").alias("text"),
    )

    minio = Minio(
        f"{minio_host}:{minio_port}",
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False,
    )
    src_bucket, src_folder = input_topic_path.split("/", 1)
    dst_bucket, dst_folder = output_path.split("/", 1)

    # (attachment column, source subfolder, destination subfolder, extension).
    # A single table drives both the copy and the attachment columns so the
    # paths stored in the DataFrame always match where the files were copied.
    # NOTE(review): thumbnails are read from "<topic>/images" although the
    # description above names the store "Thumbnails" - confirm the source
    # folder name against the ingestion job.
    blob_stores = [
        ("video_attachments", "videos", "videos", "mp4"),
        ("image_attachments", "images", "thumbnails", "jpg"),
        ("audio_attachments", "audios", "audios", "mp3"),
    ]

    for attachment_column, src_name, dst_name, extension in blob_stores:
        src_subfolder = f"{src_folder}/{src_name}"
        dst_subfolder = f"{dst_folder}/{dst_name}/youtube"

        print(
            f"Copying data from bucket={src_bucket},folder={src_subfolder} to bucket={dst_bucket},folder={dst_subfolder}"
        )
        # The trailing "/" keeps the prefix from also matching sibling keys
        # such as "<topic>/videos_old/...".
        src_files = minio.list_objects(src_bucket, recursive=True, prefix=f"{src_subfolder}/")
        for file in src_files:
            # Object keys always use "/" regardless of the local platform.
            src_basename = file.object_name.rsplit("/", 1)[-1]
            if not src_basename:
                # Key ends with "/" (folder marker): there is no file to copy.
                continue
            dst_file = f"{dst_subfolder}/{src_basename}"
            print(f"Server-copying file {src_bucket}/{file.object_name} to {dst_bucket}/{dst_file}")
            minio.copy_object(dst_bucket, dst_file, CopySource(src_bucket, file.object_name))

        # Build the path per row from the row's own video id. Formatting
        # col("uri") into a Python string would only embed the Column
        # object's repr as a constant.
        df = df.withColumn(
            attachment_column,
            array(
                concat(
                    lit(f"{output_path}/{dst_name}/youtube/"),
                    col("uri"),
                    lit(f".{extension}"),
                )
            ),
        )

    # Add the null columns that we don't have in the youtube data
    for column_name in ("shares", "dislikes", "reposts", "quotes"):
        df = df.withColumn(column_name, lit(None).cast("int"))

    # Add the source column
    df = df.withColumn("source", lit("youtube"))
    return df

In [8]:
# (source name, "bucket/prefix" of the topic) pairs to normalize in this run.
input_sources = [
    ("bluesky", "trusted/bluesky-a1cb100f"),
    ("youtube", "trusted/youtube-a1cb100f"),
    ("bluesky", "trusted/bluesky-3c77f261"),
    ("youtube", "trusted/youtube-3c77f261"),
]
# "bucket/prefix" that receives the normalized table and the copied media.
output_path = "exploitation/microsoft/edge_browser"

# MinIO connection settings come from the environment set up in the first cell.
minio_host, minio_port = os.environ["MINIO_HOST"], os.environ["MINIO_PORT"]
minio_access_key, minio_secret_key = os.environ["AWS_ACCESS_KEY"], os.environ["AWS_SECRET_KEY"]

In [9]:
# Spark session for reading/writing Delta tables through the s3a:// endpoint
# exposed by MinIO.
spark = (
    SparkSession.builder
    .appName("DataNormalizer")
    # JVM packages pulled in at start-up: hadoop-aws and delta-spark.
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,io.delta:delta-spark_2.12:3.2.0")
    # Delta Lake SQL extension and catalog.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Forward the credentials into the executors' environment.
    .config("spark.executorEnv.AWS_ACCESS_KEY", os.environ["AWS_ACCESS_KEY"])
    .config("spark.executorEnv.AWS_SECRET_KEY", os.environ["AWS_SECRET_KEY"])
    # Point s3a at the MinIO server, with path-style access enabled.
    .config("spark.hadoop.fs.s3a.endpoint", f"http://{os.environ['MINIO_HOST']}:{os.environ['MINIO_PORT']}")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .master("spark://spark-master:7077")
    .getOrCreate()
)

In [10]:
# One normalizer per supported source; each takes the topic path and returns
# a DataFrame in the common layout.
processors = {
    "bluesky": lambda topic_path: process_bluesky(spark, topic_path),
    "youtube": lambda topic_path: process_youtube(
        spark, topic_path, output_path, minio_host, minio_port, minio_access_key, minio_secret_key
    ),
    "twitter": lambda topic_path: process_twitter(spark, topic_path),
}

source_dataframes: List[DataFrame] = []
for source, input_topic_path in input_sources:
    normalize = processors.get(source)
    if normalize is None:
        raise ValueError(f"Unknown source: {source}")
    source_dataframes.append(normalize(input_topic_path))

# Stack every per-source frame into one, matching columns by name.
combined_df = reduce(lambda stacked, nxt: stacked.unionByName(nxt), source_dataframes)

# The same post can be captured by several queries: keep a single row per
# (uri, source) pair.
combined_df = combined_df.dropDuplicates(["uri", "source"])

# Tag the rows of this run with the extraction id column.
combined_df = combined_df.withColumn("extraction_id", current_timestamp())

Copying data from bucket=trusted,folder=youtube-a1cb100f/videos to bucket=trusted,folder=microsoft/edge_browser/videos/youtube
Server-copying file trusted/youtube-a1cb100f/videos/-3J9J6iopDk.mp4 to trusted/microsoft/edge_browser/videos/youtube/-3J9J6iopDk.mp4
Server-copying file trusted/youtube-a1cb100f/videos/1kAAMawGxOQ.mp4 to trusted/microsoft/edge_browser/videos/youtube/1kAAMawGxOQ.mp4
Server-copying file trusted/youtube-a1cb100f/videos/27tr-dNQrxE.mp4 to trusted/microsoft/edge_browser/videos/youtube/27tr-dNQrxE.mp4
Server-copying file trusted/youtube-a1cb100f/videos/3o7pgfelr7I.mp4 to trusted/microsoft/edge_browser/videos/youtube/3o7pgfelr7I.mp4
Server-copying file trusted/youtube-a1cb100f/videos/7n3BZlASNKc.mp4 to trusted/microsoft/edge_browser/videos/youtube/7n3BZlASNKc.mp4
Server-copying file trusted/youtube-a1cb100f/videos/AUjfqGGqUBc.mp4 to trusted/microsoft/edge_browser/videos/youtube/AUjfqGGqUBc.mp4
Server-copying file trusted/youtube-a1cb100f/videos/DnxLtlho11s.mp4 to trus

In [11]:
# Preview the combined, de-duplicated rows before they are written out.
combined_df.show()

+--------------------+--------------------+--------------------+--------------------+-------+-------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+------+--------+-----+-------+--------------------+
|                 uri|                text|          created_at|          author_uri|reposts|replies|quotes|               title|         description|   video_attachments|   image_attachments|   audio_attachments|views|shares|dislikes|likes| source|       extraction_id|
+--------------------+--------------------+--------------------+--------------------+-------+-------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+------+--------+-----+-------+--------------------+
|         1kAAMawGxOQ|                NULL| 2025-05-17 12:58:31|            TeddyKho|   NULL|    253|  NULL|HP MIDRANGE PAKET...|Motorola Edge 60 ...|[trusted/microsof...|[trusted/microso

In [12]:

# Append this run's rows to the Delta table stored under "<output_path>/data".
(
    combined_df.write
    .format("delta")
    .mode("append")
    .save(f"s3a://{output_path}/data")
)

In [13]:
# Read the table back from the location the write cell saves to
# ("<output_path>/data"); loading the bare output_path would not address
# that table.
df = spark.read.format("delta").load(f"s3a://{output_path}/data")

In [14]:
# Display the rows loaded back from the Delta table as a check of the write.
df.show()

+--------------------+--------------------+--------------------+--------------------+-------+-------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+------+--------+-----+-------+--------------------+
|                 uri|                text|          created_at|          author_uri|reposts|replies|quotes|               title|         description|   video_attachments|   image_attachments|   audio_attachments|views|shares|dislikes|likes| source|       extraction_id|
+--------------------+--------------------+--------------------+--------------------+-------+-------+------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+------+--------+-----+-------+--------------------+
|         1kAAMawGxOQ|                NULL| 2025-05-17 12:58:31|            TeddyKho|   NULL|    253|  NULL|HP MIDRANGE PAKET...|Motorola Edge 60 ...|[trusted/microsof...|[trusted/microso