As a brief background:

I want to gather scores for a subset of TikTok data to compare the quality of our models - Appen managed services - Our moderator team

On meta, this is an easy task, as there is infrastructure built in our resolvers to do this.

This does not exist for TikTok.

I created a protocol that could possibly facilitate the scoring of arbitrary TikTok content.

The problem?

*There are no nodes that do frame extraction/thumbnails for TikTok videos*
This is a notebook to bridge that gap.

Input: TikTok content with Metadata/videos downloaded
Output: TikTok content with frames extracted and "content_type" set to "VIDEO"


In [2]:
import itertools
from qa_frame.models.avro.schemas.video import Content, Video, ContentType, Derived
from qa_frame.interfaces.api.clients.frs import FRSClient
from qa_frame.consts.api import frs
from qa_frame.interfaces.api.clients.seldon import SeldonClient
from qa_frame.interfaces.external.s3 import S3Client
from qa_frame.consts.providers.s3 import SamplePostsCSV
from requests import RequestException
import snowflake.connector
from qa_frame.interfaces.db.snowflake import SnowflakeDriver
from qa_frame.models.environment import EnvConfig

from base import BaseScript

CONFIG = EnvConfig.get()
RUN_ID = 1675808881

In [None]:
class InferenceCompareReader(SnowflakeDriver):

    def read_content_from_run_id(self, run_id: int | str) -> list[Content]:
        """
        Reads content from the ``QA_FRAME_INFERENCE_COMPARE`` table for a certain run_id
        Parameters
        ----------
        run_id : int | str
            ID of the inference compare run to read content for

        Returns
        -------
        contents : list[Content]
            Content objects read
        """
        return [Content(**row["OUTPUT_DATA"]) for row in self._read(
            """
            SELECT OUTPUT_DATA
            FROM STAGING.QA_FRAME_INFERENCE_COMPARE
            WHERE RUN_ID = '%s'
            """, run_id
        )]

reader = InferenceCompareReader(snowflake.connector.connect(**CONFIG.interface.db.snowflake.__dict__), CONFIG.interface.db.snowflake.environment)

In [None]:
print([c.content_id for c in reader.read_content_from_run_id(RUN_ID)])

In [None]:
"""
Gathers TikTok video IDs that need to be scored
"""
s3 = S3Client(CONFIG.external.s3_client)
tiktok_video_ids = s3.read_csv(
    url=str(SamplePostsCSV.AMS_HPQ_TT_DATA.value),
    filter_=lambda csv_: (
        Video(**{"content_id": row["content_id"][1:-1]})
        for row in csv_
    )
)  # Formatted as video.Video objects


In [None]:
"""
Gathers metadata for our video IDs

Must port-forward the TikTok metadata service
"""

class GatherTiktokMetadataForVideos(BaseScript):

    name = "gather_tiktok_metadata"

    def __init__(self):
        super().__init__()
        self.client = SeldonClient[Video, Video](base_url="http://localhost:8080/api/v1.0")

    def run(self):
        batch_size = 50
        scored_content_count = 0

        while batch := list(itertools.islice(tiktok_video_ids, batch_size)):
            predictions, retry = [], True
            while retry:
                try:
                    predictions = [v for v in self.client.predictions(batch) if v.metadata_fetch_status == 200]
                    scored_content_count += len(predictions)
                    retry = False
                except RequestException:
                    continue

            self._write_json_files("tiktok_videos_with_metadata", *predictions)
            self.logger.info("Predictions received", total_count=scored_content_count)

GatherTiktokMetadataForVideos().run()

In [None]:
"""
DownloadTikTokVideos
"""

class DownloadTikTokVideos(BaseScript):
    name = "download_tiktok_videos"

    def __init__(self):
        super().__init__()
        self.client = SeldonClient[Video, Video](base_url="http://localhost:8080/api/v1.0")

    def run(self):
        batch_size = 50
        scored_content_count = 0

        videos_with_metadata = self._read_json_files("tiktok_videos_with_metadata_1675844107", Video)
        while batch := list(itertools.islice(videos_with_metadata, batch_size)):
            predictions, retry = [], True
            while retry:
                try:
                    predictions = self.client.predictions(batch)
                    scored_content_count += len(predictions)
                    retry = False
                except RequestException:
                    continue

            self._write_json_files("tiktok_videos_downloaded", *predictions)
            self.logger.info("Predictions received", total_count=scored_content_count)

DownloadTikTokVideos().run()

In [None]:
"""
Merge together metadata and download url
"""

class MergeTiktokMetadataAndDownloadedVideo(BaseScript):  # Also will add a content_type
    name = "merge_tiktok_videos"

    def run(self):
        for video_metadata in self._read_json_files("tiktok_videos_with_metadata_1675844107", Content):
            corresponding_downloaded_video = next(
                v
                for v in self._read_json_files("tiktok_videos_downloaded_1675910561", Video)
                if v.content_id == video_metadata.content_id
            )
            merged_video = Content(**video_metadata.as_dict() | {"content_type": ContentType.VIDEO, "video_url": corresponding_downloaded_video.video_url})
            self._write_json_files("videos", merged_video)

MergeTiktokMetadataAndDownloadedVideo().run()

In [3]:
"""
Extract frames from merged videos
"""
from threading import Thread
from concurrent.futures import ThreadPoolExecutor, wait
from queue import Queue
from requests import ConnectionError, Session
from requests.adapters import HTTPAdapter
from time import sleep


max_video_seconds = 60
num_threads = 32


class ExtractTiktokVideoFrames(BaseScript):
    name = "extract_tiktok_video_frames"

    def __init__(self):
        super().__init__()
        self.session = Session()
        self.session.mount("http://", HTTPAdapter(pool_connections=num_threads, pool_maxsize=num_threads))
        self.frs = FRSClient("http://localhost:8080", session=self.session)
        self.contents = self._read_json_files("videos_1675913568", Content)
        self.contents_to_write = Queue[Content]()

    def write_to_file(self):
        while True:
            if not self.contents_to_write.empty():
                self._write_json_files("videos_with_frames", self.contents_to_write.get())

    def get_frames(self, content: Content):
        retry = True
        retry_count = 0

        while retry:
            try:
                response = self.frs.store_video_frames(
                    video_uri=content.video_url,
                    platform=frs.Platform.TIKTOK,
                    content_id=content.content_id,
                    max_seconds=max_video_seconds
                )
                retry = False
                self.contents_to_write.put(Content(**content.as_dict() | {"derived": Derived(**response["derived"]), "thumbnail": None}))
            except ConnectionError:
                print("connection error, retrying in 10")
                sleep(10)
            except RequestException:
                if retry_count == 2:
                    return
                retry_count += 1
                print("request exception, waiting 10 and retrying")
                sleep(10)

    def run(self):
        content_writer = Thread(target=self.write_to_file)
        content_writer.daemon = True
        content_writer.start()

        with ThreadPoolExecutor(max_workers=num_threads) as thread_pool:
            futures = thread_pool.map(self.get_frames, self._read_json_files("videos_1675913568", Content))
            wait(futures)
        content_writer.join()

ExtractTiktokVideoFrames().run()

{[37m[39;49;00m
[37m  [39;49;00m[94m"asctime"[39;49;00m:[37m [39;49;00m[33m"2023-02-08 16:10:53"[39;49;00m,[37m[39;49;00m
[37m  [39;49;00m[94m"levelname"[39;49;00m:[37m [39;49;00m[33m"INFO"[39;49;00m,[37m[39;49;00m
[37m  [39;49;00m[94m"lineno"[39;49;00m:[37m [39;49;00m[34m54[39;49;00m,[37m[39;49;00m
[37m  [39;49;00m[94m"msg"[39;49;00m:[37m [39;49;00m[33m"Initialized output directory"[39;49;00m,[37m[39;49;00m
[37m  [39;49;00m[94m"output_path"[39;49;00m:[37m [39;49;00m[33m"/Users/ryan.demarigny/PycharmProjects/qa-frame-notebooks/notebooks/output/extract_tiktok_video_frames"[39;49;00m[37m[39;49;00m
}[37m[39;49;00m

{[37m[39;49;00m
[37m  [39;49;00m[94m"asctime"[39;49;00m:[37m [39;49;00m[33m"2023-02-08 16:10:53"[39;49;00m,[37m[39;49;00m
[37m  [39;49;00m[94m"levelname"[39;49;00m:[37m [39;49;00m[33m"INFO"[39;49;00m,[37m[39;49;00m
[37m  [39;49;00m[94m"lineno"[39;49;00m:[37m [39;49;00m[34m152[39;49;00m,[37m[39

AttributeError: 'NoneType' object has no attribute '_condition'