<a href="https://colab.research.google.com/github/HenryBlackie/TikTok-Research-API/blob/main/TikTok_Research_API.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# TikTok Research API
- Add proper error handling
- Add code to detect and report an expired access token
- Add code to handle an exceeded quota limit

In [None]:
# @title TikTok Client Setup {run: "auto"}

import re
import requests
import time
import os
import pandas as pd
from datetime import datetime, timedelta

def gen_access_token():
    """Request a client-credentials access token from the TikTok OAuth endpoint.

    The client key and secret are taken from the two Colab form fields at the
    top of this function.

    Returns:
        The ``access_token`` value of the OAuth response.

    Raises:
        RuntimeError: If the OAuth response does not contain an access token.
    """
    client_key = "" # @param {type:"string"}
    client_secret = "" # @param {type:"string"}

    # Configure request data
    url = "https://open.tiktokapis.com/v2/oauth/token/"
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {"client_key": client_key,
            "client_secret": client_secret,
            "grant_type": "client_credentials"}

    # Get OAuth token from TikTok API
    oauth_token = requests.post(url=url, headers=headers, data=data).json()

    if "access_token" not in oauth_token:
        # Read the error fields defensively: a failure response is not
        # guaranteed to carry both of them.
        error = oauth_token.get("error", "unknown_error")
        description = oauth_token.get("error_description", "no description provided")
        print("Failed to generate access token")
        print(f"{error}: {description}")
        # Stop here with a clear message instead of failing on the missing
        # "access_token" key below.
        raise RuntimeError(f"Failed to generate access token ({error}: {description})")

    print("Access token successfully generated.")
    return oauth_token["access_token"]

def query_video_comments(video_id, client_access_token=""):
    """Fetch the comments of one video from the TikTok Research API.

    Requests the comment list 100 comments at a time and stops when the API
    reports that there are no more comments, the cursor reaches 1000, a
    non-retryable error is returned, or the retry budget is exhausted. The raw
    body of every response is passed to ``log_response``.

    Args:
        video_id: ID of the video whose comments are requested.
        client_access_token: Bearer token for the API. When empty, a new token
            is generated with ``gen_access_token``.

    Returns:
        A list of the comment objects returned by the API. The list may be
        empty or incomplete if requests failed.
    """
    # Generate access token if not provided
    if not client_access_token:
        client_access_token = gen_access_token()

    api_url = "https://open.tiktokapis.com/v2/research/video/comment/list/"
    fields = "id,video_id,text,like_count,reply_count,parent_comment_id,create_time"
    headers = {"Authorization": f"Bearer {client_access_token}",
               "Content-Type": "application/json"}
    data = {"video_id": video_id,
            "max_count": 100,
            "cursor": 0}

    max_retries = 5
    comment_objects = []
    retry_count = 0      # number of "timeout" errors reported by the API
    decode_failures = 0  # number of consecutive responses that were not JSON

    # Keep sending queries until TikTok indicates there are no more comments
    while data["cursor"] < 1000 and retry_count < max_retries:
        print(f"Requesting comments {data['cursor']} to {data['cursor'] + data['max_count']}.")
        raw_response = requests.post(f"{api_url}?fields={fields}", json=data, headers=headers)

        # Save response to log
        log_response(str(raw_response.text))

        # Convert content to dict. JSON decode errors are ValueError
        # subclasses; cap the retries so a permanently broken response cannot
        # keep this loop running forever.
        try:
            response = raw_response.json()
        except ValueError as e:
            print(e)
            decode_failures += 1
            if decode_failures >= max_retries:
                print("Giving up: the API repeatedly returned a response that is not valid JSON.")
                break
            time.sleep(60)
            continue
        decode_failures = 0

        # Stop (or retry on timeout) if an error is returned
        error = response.get("error") or {}
        error_code = error.get("code")
        if error_code != "ok":
            print(f"{error_code} : {error.get('message')}")

            if error_code == "timeout":
                print("Retrying in 30 seconds.")
                time.sleep(30)
                retry_count += 1
                continue
            break

        # Save comment objects
        page = response.get("data") or {}
        comment_objects.extend(page.get("comments") or [])

        # Stop if there are no more comments
        if not page.get("has_more"):
            break

        # Update cursor; without a new cursor the same page would be
        # requested again, so stop instead.
        next_cursor = page.get("cursor")
        if next_cursor is None:
            break
        data["cursor"] = next_cursor

        # Wait before sending next query
        time.sleep(1)

    print(f"Returning {len(comment_objects)} comments.")
    return comment_objects

def query_all_user_videos(username, search_start_date, search_end_date):
    """Download a user's videos in a date range, then the comments of each video.

    The date range is split into consecutive windows of at most 30 days and
    every window is paged through until the API reports no more results. The
    collected videos are written with ``save_data`` (file name "videos"), and
    the comments of every video are fetched with ``query_video_comments`` and
    written with ``save_data`` under the video's ID.

    Args:
        username: TikTok username whose videos are requested.
        search_start_date: First day of the search as a "YYYY-MM-DD" string.
        search_end_date: Last day of the search as a "YYYY-MM-DD" string.
    """
    # Generate client access token
    client_access_token = gen_access_token()

    # Convert search dates to date objects
    search_start_date = datetime.strptime(search_start_date, "%Y-%m-%d").date()
    search_end_date = datetime.strptime(search_end_date, "%Y-%m-%d").date()

    # Configure API parameters
    api_url = "https://open.tiktokapis.com/v2/research/video/query/"
    fields = "id,video_description,create_time, region_code,share_count,view_count,like_count,comment_count, music_id,hashtag_names,username,effect_ids,playlist_id,voice_to_text"
    headers = {"Authorization": f"Bearer {client_access_token}",
               "Content-Type": "application/json"}
    data = {"query": {"and": [{"field_name": "username",
                               "operation": "EQ",
                               "field_values": [username]}]},
            "max_count": 100,
            "cursor": 0,
            "start_date": None,
            "end_date": None}
    interval = timedelta(days=30)

    print(f"Username: {username}\nTime Frame: {search_start_date} to {search_end_date}")

    video_objects = []
    # Walk the range window by window. The loop is driven by the window start,
    # so a range shorter than one interval still gets its single window.
    window_start = search_start_date
    while window_start <= search_end_date:
        window_end = min(window_start + interval, search_end_date)
        print(f"Requesting videos between {window_start} and {window_end}.")

        # The request body carries the dates in YYYYMMDD format
        data["start_date"] = window_start.strftime("%Y%m%d")
        data["end_date"] = window_end.strftime("%Y%m%d")
        data["cursor"] = 0

        # Keep requesting pages of this window until has_more is False
        while True:
            try:
                response = requests.post(f"{api_url}?fields={fields}", json=data, headers=headers).json()
            except ValueError as e:
                # Response body was not JSON; give up on this window.
                print(e)
                break

            # An error response may not contain any data, so report it and
            # move on to the next window instead of indexing into it.
            error = response.get("error") or {}
            if error.get("code") != "ok":
                print(f"{error.get('code')} : {error.get('message')}")
                break

            # Save video objects
            page = response.get("data") or {}
            video_objects.extend(page.get("videos") or [])

            if not page.get("has_more"):
                break

            # Update cursor; without a new cursor the same page would be
            # requested again, so stop instead.
            next_cursor = page.get("cursor")
            if next_cursor is None:
                break
            data["cursor"] = next_cursor
            print(f"Continuing request from index {data['cursor']}")

        window_start += interval

    # Save videos data
    save_data(pd.DataFrame.from_dict(video_objects), username, "videos", "video_objects")

    # Iterate through videos and request comments for each
    for idx, video_data in enumerate(video_objects, start=1):
        print(f"Requesting data for video {idx} of {len(video_objects)}.")
        comment_objects = query_video_comments(video_data["id"], client_access_token)

        save_data(pd.DataFrame.from_dict(comment_objects), username, video_data["id"], "comment_objects")

def extract_url_data(url):
    """Pull the username and, when present, the numeric ID out of a TikTok URL.

    Args:
        url: The URL (or any string containing "@username") to inspect.

    Returns:
        A ``(username, video_id)`` tuple of strings. ``video_id`` is None when
        no digits were captured, and both values are None when the pattern
        does not match at all.
    """
    found = re.search(
        r"@([a-zA-Z0-9_\.]+).*?/?(?:video|photo)?/?(\d+)?|\@([a-zA-Z0-9_\.]+)$",
        url,
    )
    if found is None:
        return None, None

    # Groups 1 and 3 are the two alternative username captures, group 2 the ID.
    leading_name, digits, trailing_name = found.groups()
    return leading_name or trailing_name, digits or None

def save_data(df, username, filename, data_type):
    """Write a DataFrame to an .xlsx file inside the user's output folder.

    The base folder is the global ``g_drive_dir`` when it has been defined,
    otherwise "/content/". Video data goes to ``<base>/<username>/`` and
    comment data to ``<base>/<username>/comments/``. Any other ``data_type``
    is ignored.

    Args:
        df: Object providing ``to_excel(path)``, e.g. a pandas DataFrame.
        username: Name of the per-user folder.
        filename: File name without the ".xlsx" extension.
        data_type: Either "video_objects" or "comment_objects".
    """
    base_dir = g_drive_dir if "g_drive_dir" in globals() else "/content/"

    # Pick the target folder from the kind of data being saved
    if data_type == "video_objects":
        target_dir = os.path.join(base_dir, username)
    elif data_type == "comment_objects":
        target_dir = os.path.join(base_dir, username, "comments")
    else:
        return

    os.makedirs(target_dir, exist_ok=True)
    filepath = os.path.join(target_dir, f"{filename}.xlsx")
    df.to_excel(filepath)
    print(f"Saved data to {filepath}.")

def log_response(content):
    """Append one line of text to the ``responses.log`` file.

    The log lives in the global ``g_drive_dir`` when it has been defined,
    otherwise in "/content/".

    Args:
        content: Text to append; a newline is added after it.
    """
    save_dir = g_drive_dir if "g_drive_dir" in globals() else "/content/"

    # The output folder may not exist yet when the first response is logged.
    os.makedirs(save_dir, exist_ok=True)

    # Logged text may contain non-ASCII characters, so do not rely on the
    # platform's default encoding.
    with open(os.path.join(save_dir, "responses.log"), "a", encoding="utf-8") as f:
        f.write(f"{content}\n")

In [None]:
# @title Setup Google Drive
# @markdown Run this to setup a connection to Google Drive. Otherwise all output data will be lost upon disconnection.

import os
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

output_dir = "Colab Data/tiktok-research-api" # @param {type:"string"}
# g_drive_dir is a notebook-level global; save_data and log_response look it
# up by name to decide where to write.
g_drive_dir = os.path.join("/content/drive/MyDrive/", output_dir)
# Create the folder up front so writing into it does not fail when it does
# not exist yet.
os.makedirs(g_drive_dir, exist_ok=True)
print(f"Setup output directory as {g_drive_dir}.")

In [None]:
# @title Get Video Comments

import pandas as pd
import os

#video_id = 7222436435448646955 # @param {type:"integer"}
url = "" # @param {type:"string"}
username, video_id = extract_url_data(url)

if video_id is None:
    # Nothing to query: the URL is empty or does not contain a video ID.
    print("No video ID could be extracted from the provided URL.")
else:
    comment_objects = query_video_comments(video_id)

    if comment_objects:
        # Build comments into DataFrame
        df = pd.DataFrame.from_dict(comment_objects)
        # exist_ok lets this cell be run more than once per session.
        os.makedirs("/content/video_comments/", exist_ok=True)
        df.to_excel(f"/content/video_comments/{video_id}.xlsx")
        print(f"Saved {len(df)} comments to {video_id}.xlsx")
    else:
        print(f"No comments were saved for video {video_id}.")

In [None]:
# @title Get User Data

from datetime import datetime

from google.colab import output
output.no_vertical_scroll()

url = "" # @param {type:"string"}
username, video_id = extract_url_data(url)

search_start_date = None # @param {type:"date"}
search_end_date = None # @param {type:"date"}
# @markdown It is highly recommended to set search start and end dates. If left blank, the script will search from September 2016 to the current date. This will likely take a long time and is a wasteful use of your API quota.

# Set start/end dates to default values if they were not provided
# The default start date is 2016-09-20 - Douyin initial release
# The default end date is the current date
# NOTE(review): a cleared form field may yield an empty string rather than
# None, so any falsy value is treated as "not provided" - confirm in Colab.
if not search_start_date:
    search_start_date = "2016-09-20"
if not search_end_date:
    search_end_date = datetime.now().strftime("%Y-%m-%d")

if username is None:
    # Nothing to query: the URL is empty or does not contain "@username".
    print("No username could be extracted from the provided URL.")
else:
    query_all_user_videos(username, search_start_date, search_end_date)

# Debugging

In [None]:
# @title URL Extraction Test {run: "auto"}
# @markdown This codeblock can be used to verify the data being extracted from any provided URLs. This is only useful for debugging.

# Split the URL entered in the form field into its username and video ID parts.
url = "" # @param {type:"string"}
username, video_id = extract_url_data(url)

# Show the extracted values; extract_url_data returns None for a part it
# could not find.
print(f"Username: {username}")
print(f"Video ID: {video_id}")