In [None]:
import asyncio
import json
from typing import List, Dict
from httpx import AsyncClient, Response
from parsel import Selector
from loguru import logger as log

# Shared async httpx client used by scrape_profiles() below.
# NOTE(review): "br" is advertised in Accept-Encoding; httpx only decodes brotli
# responses when its optional brotli dependency is installed — confirm it is.
client = AsyncClient(
    # browser-like default headers, sent to lower the chance of being blocked
    headers={
        "Accept-Language": "en-US,en;q=0.9",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
    },
    # allow HTTP/2 connections
    http2=True,
)

def parse_profile(response: Response):
    """Parse profile data from the hidden rehydration script in the page HTML.

    Args:
        response: HTTP response holding the HTML of a profile page.

    Returns:
        The ``userInfo`` object stored under
        ``__DEFAULT_SCOPE__ -> webapp.user-detail`` in the embedded JSON.

    Raises:
        AssertionError: if the response status code is not 200.
        ValueError: if the ``__UNIVERSAL_DATA_FOR_REHYDRATION__`` script tag
            is not present in the HTML.
        KeyError: if the embedded JSON lacks one of the expected keys.
    """
    assert response.status_code == 200, "request is blocked, use the ScrapFly codetabs"
    selector = Selector(response.text)
    data = selector.xpath("//script[@id='__UNIVERSAL_DATA_FOR_REHYDRATION__']/text()").get()
    if data is None:
        # .get() yields None when nothing matches; report that explicitly
        # instead of letting json.loads(None) fail with an opaque TypeError
        raise ValueError(
            "script tag '__UNIVERSAL_DATA_FOR_REHYDRATION__' not found in the profile page"
        )
    profile_data = json.loads(data)["__DEFAULT_SCOPE__"]["webapp.user-detail"]["userInfo"]
    return profile_data
    


async def scrape_profiles(urls: List[str]) -> List[Dict]:
    """Fetch the given TikTok profile URLs concurrently and parse each page.

    The parsed profiles are collected in the order the requests complete,
    which is not necessarily the order of ``urls``.
    """
    pending_requests = [client.get(url) for url in urls]
    profiles = []
    # handle every response as soon as its request finishes
    for finished in asyncio.as_completed(pending_requests):
        page_response = await finished
        profiles.append(parse_profile(page_response))
    log.success(f"scraped {len(profiles)} profiles from profile pages")
    return profiles


In [None]:
async def run():
    profile_data = await scrape_profiles(
        urls=[
            "https://www.tiktok.com/@oddanimalspecimens"
        ]
    )
    # save the result to a JSON file
    with open("profile_data.json", "w", encoding="utf-8") as file:
        json.dump(profile_data, file, indent=2, ensure_ascii=False)

await run()

In [None]:
# Install the Scrapfly SDK that the following cells import from
%pip install scrapfly-sdk
 

In [None]:
import jmespath
import asyncio
import json
from typing import Dict, List
from loguru import logger as log
from scrapfly import ScrapeConfig, ScrapflyClient, ScrapeApiResponse

import os

# The Scrapfly API key is read from the SCRAPFLY_KEY environment variable so the
# secret is never stored in the notebook; a KeyError is raised when it is unset.
# NOTE: any key that was ever committed in this notebook should be rotated.
SCRAPFLY = ScrapflyClient(key=os.environ["SCRAPFLY_KEY"])

# JavaScript source handed to ScrapeConfig(js=...) in scrape_channel().
# scrollToEnd(i) scrolls the window to the bottom of the document and
# re-schedules itself every 3000 ms while i < 15; it returns early as soon as
# the viewport has already reached the bottom of the document.
js_scroll_function = """
function scrollToEnd(i) {
    // check if already at the bottom and stop if there aren't more scrolls
    if (window.innerHeight + window.scrollY >= document.body.scrollHeight) {
        console.log("Reached the bottom.");
        return;
    }

    // scroll down
    window.scrollTo(0, document.body.scrollHeight);

    // set a maximum of 15 iterations
    if (i < 15) {
        setTimeout(() => scrollToEnd(i + 1), 3000);
    } else {
        console.log("Reached the end of iterations.");
    }
}

scrollToEnd(0);
"""

def parse_channel(response: ScrapeApiResponse):
    """Extract the channel's video posts from the XHR calls captured by the browser.

    Only calls whose URL contains ``/api/post/item_list/`` are used. The
    ``itemList`` entries of their JSON bodies are concatenated, and every post
    is reduced to a fixed set of fields with a jmespath projection.

    Raises:
        Exception: if the body of a post call cannot be decoded or has no
            ``itemList`` key.
    """
    captured_calls = response.scrape_result["browser_data"]["xhr_call"]
    # keep only the calls made to the post listing endpoint
    item_list_calls = list(
        filter(lambda call: "/api/post/item_list/" in call["url"], captured_calls)
    )
    posts = []
    for call in item_list_calls:
        try:
            items = json.loads(call["response"]["body"])["itemList"]
        except Exception:
            raise Exception("Post data couldn't load")
        posts.extend(items)
    # jmespath projection applied to every post
    post_fields = """{
            createTime: createTime,
            desc: desc,
            id: id,
            stats: stats,
            contents: contents[].{desc: desc, textExtra: textExtra[].{hashtagName: hashtagName}},
            video: video
            }"""
    return [jmespath.search(post_fields, post) for post in posts]


async def scrape_channel(url: str) -> List[Dict]:
    """Scrape the video posts of a channel (a profile page that lists videos).

    The page is rendered through Scrapfly with the scroll script injected, and
    the captured XHR calls are then parsed by ``parse_channel``.
    """
    log.info(f"scraping channel page with the URL {url} for post data")
    # rendering options for the Scrapfly headless browser
    scrape_config = ScrapeConfig(
        url,
        asp=True,
        country="AU",
        render_js=True,
        rendering_wait=5000,
        js=js_scroll_function,
        wait_for_selector="//div[@id='main-content-video_detail']",
    )
    api_response = await SCRAPFLY.async_scrape(scrape_config)
    posts = parse_channel(api_response)
    log.success(f"scraped {len(posts)} posts data")
    return posts

In [None]:
async def run():
    channel_data = await scrape_channel(
        url="https://www.tiktok.com/@alice_weidel_afd"
    )
    # save the result to a JSON file
    with open("channel_data.json", "w", encoding="utf-8") as file:
        json.dump(channel_data, file, indent=2, ensure_ascii=False)

await run()

In [None]:
import json
import os
import re

import httpx
from scrapfly import ScrapeConfig, ScrapflyClient, ScrapeApiResponse

# Initialize Scrapfly with the API key from the SCRAPFLY_KEY environment
# variable, so the secret is never stored in the notebook
scrapfly = ScrapflyClient(key=os.environ["SCRAPFLY_KEY"])


# Define the TikTok video URL
tiktok_url = "https://www.tiktok.com/@alice_weidel_afd/video/7472776863199350038"

# Scrape the TikTok page
response = scrapfly.scrape(ScrapeConfig(url=tiktok_url, render_js=True))

# Extract the video URL (TikTok embeds video URLs in JSON).
# The capture group is the raw JSON string body, so escaped characters are allowed.
play_addr_match = re.search(r'"playAddr":"(https:(?:[^"\\]|\\.)+)"', response.content)
if play_addr_match is None:
    raise ValueError("Failed to find the video URL: no playAddr entry in the page.")
# json.loads decodes every JSON escape (\u0026, \u002F, ...), not only \u0026
video_url = json.loads(f'"{play_addr_match.group(1)}"')

print("Video URL:", video_url)

In [None]:
# Show the video URL assigned by the extraction cell above
video_url

In [None]:
import httpx

# Download the file behind `video_url`, which the extraction cell above assigns.
# Redirects are followed because httpx does not follow them by default.
video_response = httpx.get(video_url, follow_redirects=True)
# fail loudly instead of saving an error page as an .mp4 file
video_response.raise_for_status()

with open("tiktok_video_02.mp4", "wb") as f:
    f.write(video_response.content)

print("Video downloaded successfully!")


In [None]:
import httpx
import json
import re

# Replace with the TikTok video URL you want to scrape
tiktok_url = "https://www.tiktok.com/@alice_weidel_afd/video/7472776863199350038"

headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36"
}

response = httpx.get(tiktok_url, headers=headers)

# Extract the video URL using regex.
# The capture group is the raw JSON string body: it must start with "https:"
# but may contain escaped characters (e.g. \u002F for "/").
match = re.search(r'"playAddr":"(https:(?:[^"\\]|\\.)+)"', response.text)
if match:
    # json.loads decodes every JSON escape, not only \u0026
    video_url = json.loads(f'"{match.group(1)}"')
    print("Video URL:", video_url)
else:
    print("Failed to find the video URL.")


In [None]:
import json
import os
import re

from scrapfly import ScrapeConfig, ScrapflyClient, ScrapeApiResponse

# The API key comes from the SCRAPFLY_KEY environment variable so the secret
# is never stored in the notebook
scrapfly = ScrapflyClient(key=os.environ["SCRAPFLY_KEY"])

tiktok_url = "https://www.tiktok.com/@alice_weidel_afd/video/7472776863199350038"

# Enable JavaScript rendering to fully load the TikTok page
response = scrapfly.scrape(ScrapeConfig(url=tiktok_url, render_js=True))

# Extract the video URL.
# The capture group is the raw JSON string body: it must start with "https:"
# but may contain escaped characters (e.g. \u002F for "/").
match = re.search(r'"playAddr":"(https:(?:[^"\\]|\\.)+)"', response.content)
if match:
    # json.loads decodes every JSON escape, not only \u0026
    video_url = json.loads(f'"{match.group(1)}"')
    print("Video URL:", video_url)
else:
    print("Failed to find the video URL.")


In [None]:
import asyncio
from playwright.async_api import async_playwright
# ----------------------------------------------------------------
# Setup: start Playwright, launch a visible (non-headless) Chromium window,
# open the video page and wait for the DOMContentLoaded state.
# NOTE(review): neither `browser` nor `playwright` is closed/stopped in this
# cell — confirm whether that is intentional or a later cell cleans up.
# ----------------------------------------------------------------
playwright = await async_playwright().start()
browser = await playwright.chromium.launch(headless = False)
page = await browser.new_page()
search = 'https://www.tiktok.com/@alice_weidel_afd/video/7142743647753096454?lang=en'
await page.goto(search)
await page.wait_for_load_state("domcontentloaded")
# ----------------------------------------------------------------
# Wait for the needed element to load to ensure it exists.
# NOTE(review): `wait` is not used afterwards in this cell, and the selector
# relies on what look like generated CSS class names — verify it still matches.
# ----------------------------------------------------------------
wait = await page.wait_for_selector('.e11s2kul3 .css-xhj3u2-StyledVideoBlurBackground')

# Look up the player container and print the resulting element handle
element = await page.query_selector(".xgplayer-container")
print(element)

In [None]:
# Transcribe and translate

import os

# DeepL API key, read from the DEEPL_API_KEY environment variable so the secret
# is not pasted into the notebook. Falls back to "" when the variable is unset.
DEEPL_API_KEY = os.environ.get("DEEPL_API_KEY", "")

# Define DeepL Translation Function
def translate_text(text, source_lang="DE", target_lang="EN"):
    """Translate ``text`` with the DeepL API, falling back to the original text.

    Args:
        text: Text to translate.
        source_lang: DeepL source language code (default ``"DE"``).
        target_lang: DeepL target language code (default ``"EN"``).

    Returns:
        The translated text, or ``text`` unchanged when it is blank, when the
        request cannot be completed, or when DeepL answers with a non-200 status.
    """
    # Local import: this cell has no HTTP client import of its own, and httpx
    # is the client the rest of the notebook already uses.
    import httpx

    # nothing to translate, so skip the API call
    if not text or not text.strip():
        return text

    url = "https://api-free.deepl.com/v2/translate"  # Use "api.deepl.com" for Pro accounts
    # the key is sent in the Authorization header rather than in the form body
    headers = {"Authorization": f"DeepL-Auth-Key {DEEPL_API_KEY}"}
    params = {
        "text": text,
        "source_lang": source_lang,
        "target_lang": target_lang
    }
    try:
        response = httpx.post(url, headers=headers, data=params, timeout=30.0)
    except httpx.HTTPError as exc:
        # transport problems are treated like any other translation failure
        print("Translation Error:", exc)
        return text
    if response.status_code == 200:
        return response.json()["translations"][0]["text"]
    print("Translation Error:", response.text)
    return text  # Return original text in case of failure

import csv  # csv.writer is used below

# Load Whisper Model
# NOTE(review): WhisperModel is not imported anywhere in the visible notebook
# (presumably faster_whisper.WhisperModel — confirm), and `my_audio` is not
# defined in any visible cell; both must exist before this cell can run.
model = WhisperModel("large-v3")  # Use "small", "large-v3", etc., depending on need

# Transcribe the Audio
segments, _ = model.transcribe(my_audio, word_timestamps=True)

# Define Output CSV File
csv_filename = "/content/drive/MyDrive/Thesis/Code/Translated.csv"

# Open CSV File for Writing
with open(csv_filename, mode="w", newline="", encoding="utf-8") as file:
    writer = csv.writer(file)

    # Write Header
    writer.writerow(["Segment Start", "Segment End", "German Text", "English Translation"])

    # Process Each Transcribed Segment: one CSV row per segment, with the
    # DeepL translation next to the original text
    for segment in segments:
        english_translation = translate_text(segment.text)
        writer.writerow([segment.start, segment.end, segment.text, english_translation])

print(f"Transcription and translation saved to {csv_filename}")
