In [7]:
import sqlite3
import inspect

def wrapper_f(f):
    """
    Make ``f`` checkpointable so that a successful call is never repeated.

    A SQLite database ``checkpoints.db`` (resolved against the current working
    directory at wrap time) stores one row per successful call. The row key is
    ``f.__name__`` followed by the string form of every positional argument,
    joined with ``':'``.

    Args:
        f: A plain function or a coroutine function taking positional
            arguments only.

    Returns:
        A wrapper of the same kind as ``f`` (``async def`` when ``f`` is a
        coroutine function, plain ``def`` otherwise). The wrapper:

        * returns ``None`` without calling ``f`` when a checkpoint for the
          same key already exists;
        * otherwise calls ``f(*args)``, records a ``'SUCCESS'`` row and
          returns ``None`` (the return value of ``f`` is discarded).

    Raises:
        Exception: Anything raised by ``f`` (or by the checkpoint insert) is
            reported on stdout and re-raised; no checkpoint is stored, so
            the call is retried on the next invocation.
    """
    conn = sqlite3.connect('checkpoints.db')
    cursor = conn.cursor()
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS checkpoints (
            key TEXT PRIMARY KEY,
            status TEXT
        )
    ''')
    conn.commit()

    def _checkpoint_key(args):
        # str() every argument so non-string inputs (ints, dates, paths, ...)
        # can be keyed too; for all-string arguments the key is identical to
        # the previous format, so existing checkpoints stay valid.
        return f.__name__ + ':' + ':'.join(str(arg) for arg in args)

    def _has_checkpoint(key):
        cursor.execute('SELECT status FROM checkpoints WHERE key = ?', (key,))
        return cursor.fetchone() is not None

    def _store_checkpoint(key):
        cursor.execute('INSERT INTO checkpoints (key, status) VALUES (?, ?)', (key, 'SUCCESS'))
        conn.commit()

    if inspect.iscoroutinefunction(f):
        async def checkpointable_f(*args):
            key = _checkpoint_key(args)
            if _has_checkpoint(key):
                print(f"Skipping async call for inputs {args} as a checkpoint exists.")
                return

            print(f"Executing async function for inputs {args}...")
            try:
                await f(*args)
                _store_checkpoint(key)
                print(f"Successfully executed and checkpointed async function for inputs {args}.")
            except Exception as e:
                print(f"An error occurred in async function for inputs {args}: {e}")
                # Optional: store a 'FAILURE' status
                raise  # bare raise keeps the original traceback intact

        return checkpointable_f

    def checkpointable_f(*args):
        key = _checkpoint_key(args)
        if _has_checkpoint(key):
            print(f"Skipping sync call for inputs {args} as a checkpoint exists.")
            return

        print(f"Executing sync function for inputs {args}...")
        try:
            f(*args)
            _store_checkpoint(key)
            print(f"Successfully executed and checkpointed sync function for inputs {args}.")
        except Exception as e:
            print(f"An error occurred in sync function for inputs {args}: {e}")
            # Optional: store a 'FAILURE' status
            raise  # bare raise keeps the original traceback intact

    return checkpointable_f

In [8]:
from playwright.async_api import Page
async def crawl_and_save_html(page : Page, url : str, output_filename : str):
    """
    Load ``url`` in ``page`` and write the resulting HTML to ``output_filename``.

    After navigation the function waits until an element matching
    ``.observation-table`` is present, then dumps the page content.

    Args:
        page (Page): Page object used for the navigation.
        url (str): The URL of the website to crawl.
        output_filename (str): Path of the file the HTML is written to
            (UTF-8, overwritten if it exists).

    Raises:
        Exception: Any failure is printed to stdout and then re-raised.
    """
    try:
        print(f"Navigating to {url}...")
        await page.goto(url)
        await page.wait_for_selector(".observation-table")
        print("Navigation successful. Getting page content...")

        # Full HTML of the page as it currently stands.
        markup = await page.content()

        with open(output_filename, 'w', encoding='utf-8') as out_file:
            out_file.write(markup)

        print(f"HTML content saved to {output_filename}.")
    except Exception as exc:
        print(f"An error occurred: {exc}")
        raise exc

In [9]:
import pandas as pd
from bs4 import BeautifulSoup
def parse_row(data_row):
    """
    Convert one table row element into a ``{column_name: text}`` dict.

    Args:
        data_row: An already-parsed row element exposing ``find_all``; each of
            its ``td`` cells must carry a class starting with ``cdk-column-``.

    Returns:
        dict: One entry per ``td`` cell, in document order. The key is the
        cell's ``cdk-column-*`` class with that marker removed. The value is
        the stripped text of the nested ``span.wu-value-to`` when the cell has
        one, otherwise the stripped text of the whole cell.

    Raises:
        KeyError: A cell has no ``class`` attribute.
        StopIteration: A cell has no class starting with ``cdk-column-``.
    """
    prefix = 'cdk-column-'
    record = {}
    for td in data_row.find_all('td'):
        # First class carrying the column marker identifies the column.
        marker_class = next(cls for cls in td['class'] if cls.startswith(prefix))
        # Prefer the dedicated value span; fall back to the whole cell.
        nested_value = td.find('span', class_='wu-value-to')
        text_source = nested_value or td
        record[marker_class.replace(prefix, '')] = text_source.get_text(strip=True)
    return record

def parse_one_html(html_string):
    """
    Extract the single observation table of an HTML document as a DataFrame.

    Args:
        html_string: HTML markup of the page.

    Returns:
        pandas.DataFrame: One row per ``tr.mat-mdc-row`` inside the element
        with class ``observation-table``, each converted by ``parse_row``.

    Raises:
        Exception: The document does not contain exactly one element with
            class ``observation-table``.
    """
    document = BeautifulSoup(html_string, 'html.parser')
    matches = document.find_all(class_="observation-table")
    found = len(matches)
    if found != 1:
        raise Exception(f"table length is not 1, {found} found")

    (observation_table,) = matches
    records = []
    for table_row in observation_table.find_all("tr", class_="mat-mdc-row"):
        records.append(parse_row(table_row))
    return pd.DataFrame(records)



In [10]:
from datetime import date, timedelta
from datetime import date

def get_dates_in_year(year):
    """
    Return every calendar day of ``year`` in chronological order.

    Args:
        year (int): A year supported by ``datetime.date`` (1 through 9999).

    Returns:
        list[date]: ``date`` objects from January 1st through December 31st
        inclusive (365 entries, or 366 for a leap year).

    Raises:
        ValueError: ``year`` is outside the range ``datetime.date`` supports.
    """
    first_ordinal = date(year, 1, 1).toordinal()
    last_ordinal = date(year, 12, 31).toordinal()
    # Walking ordinals never builds a date beyond December 31st, so the last
    # supported year (9999) works instead of overflowing on "Dec 31 + 1 day".
    return [date.fromordinal(ordinal) for ordinal in range(first_ordinal, last_ordinal + 1)]

# Output: 2025-9-20
def format_date(my_date : date):
    """
    Render a date as ``year-month-day`` without zero padding.

    Args:
        my_date (date): The date to format.

    Returns:
        str: For example ``date(2025, 9, 20)`` becomes ``"2025-9-20"``.
    """
    components = (my_date.year, my_date.month, my_date.day)
    return "-".join(str(component) for component in components)

In [11]:
import os
from typing import *
from pathlib import Path
from playwright.async_api import async_playwright
import playwright
async def create_context(p):
    """
    Launch a persistent Chromium context with the unpacked extension loaded.

    Args:
        p: Object exposing ``chromium.launch_persistent_context`` (the value
            yielded by ``async_playwright()`` in this notebook).

    Returns:
        The launched persistent context. Before returning, the URL of the
        extension's first service worker is printed; if none is registered
        yet, the function waits for the ``"serviceworker"`` event.
    """
    # Unpacked extension folder, and the profile directory backing the
    # persistent context.
    extension_dir = Path("./uBlock-Origin-Lite-Chrome-Web-Store")
    profile_dir = "./tmp/test-user-data-dir"

    # Only the given extension is allowed, and it is loaded at start-up.
    launch_args = [
        f"--disable-extensions-except={extension_dir}",
        f"--load-extension={extension_dir}",
    ]
    context = await p.chromium.launch_persistent_context(
        profile_dir,
        headless=False,  # Set to True for headless mode if the extension supports it
        args=launch_args,
    )

    # Use the already-registered service worker when there is one; otherwise
    # wait for the first one to be created.
    registered = context.service_workers
    if registered:
        service_worker = registered[0]
    else:
        service_worker = await context.wait_for_event("serviceworker")

    print(f"Extension's service worker URL: {service_worker.url}")
    return context

async def download_and_crawl_list(dates : List[str], output_dir : str):
    """
    Download the daily-history page for every date and export its table as CSV.

    For each entry of ``dates`` the Weather Underground KLGA daily-history
    page is loaded in one shared browser page, saved as
    ``<output_dir>/<date>.html``, parsed with ``parse_one_html`` and written
    to ``<output_dir>/<date>.csv``. Every date goes through ``wrapper_f``, so
    dates that already completed successfully are skipped on re-runs.

    Args:
        dates (List[str]): Date strings inserted verbatim into the URL and the
            output file names.
        output_dir (str): Directory receiving the ``.html`` and ``.csv``
            files; it is created when missing.

    Raises:
        Exception: Whatever the crawl or parse step raises. Processing stops
            at the first failing date; the browser context is still closed.
    """
    import asyncio

    # os.makedirs('') would fail, while an empty output_dir simply means the
    # current directory for os.path.join below.
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    async with async_playwright() as p:
        context = await create_context(p)
        try:
            page = await context.new_page()

            # NOTE: keep this function named ``helper``. wrapper_f builds the
            # checkpoint key from ``f.__name__``, so renaming it would orphan
            # every checkpoint already stored.
            async def helper(date, output_dir):
                target_url = f"https://www.wunderground.com/history/daily/us/ny/new-york-city/KLGA/date/{date}"
                output_file = os.path.join(output_dir, f"{date}.html")
                await crawl_and_save_html(page, target_url, output_file)

                # The file was written as UTF-8; read it back the same way
                # instead of relying on the platform default encoding.
                with open(output_file, encoding='utf-8') as f:
                    df = parse_one_html(f.read())
                df.to_csv(os.path.join(output_dir, f"{date}.csv"), index=False)

                # Pause between requests without blocking the event loop
                # (time.sleep would stall every other task on the loop).
                await asyncio.sleep(1)

            checkpoint_f = wrapper_f(helper)
            for date in dates:
                await checkpoint_f(date, output_dir)
        finally:
            # Close the persistent context explicitly, even when a date fails.
            await context.close()

In [12]:
# Driver cell: make sure the output and browser-profile directories exist
# (shell `mkdir -p`, so an existing directory is not an error).
!mkdir weather -p
!mkdir "./tmp/test-user-data-dir" -p
# Every day of 2024, formatted with format_date.
dd = list(map(format_date, get_dates_in_year(2024)))
import random
# Visit the dates in random order.
# NOTE(review): no seed is set, so the order differs between runs; presumably
# this is meant to avoid requesting the days sequentially -- confirm.
random.shuffle(dd)
# Top-level await (notebook-only syntax): crawl each date into ./weather.
await download_and_crawl_list(dd, output_dir="weather")

Extension's service worker URL: chrome-extension://mdlecblcpnnlmbndbdggogadajhlpkkf/js/background.js
Executing async function for inputs ('2024-11-21', 'weather')...
Navigating to https://www.wunderground.com/history/daily/us/ny/new-york-city/KLGA/date/2024-11-21...
Navigation successful. Getting page content...
HTML content saved to weather/2024-11-21.html.
Successfully executed and checkpointed async function for inputs ('2024-11-21', 'weather').
Executing async function for inputs ('2024-9-17', 'weather')...
Navigating to https://www.wunderground.com/history/daily/us/ny/new-york-city/KLGA/date/2024-9-17...
Navigation successful. Getting page content...
HTML content saved to weather/2024-9-17.html.
Successfully executed and checkpointed async function for inputs ('2024-9-17', 'weather').
Executing async function for inputs ('2024-8-4', 'weather')...
Navigating to https://www.wunderground.com/history/daily/us/ny/new-york-city/KLGA/date/2024-8-4...
Navigation successful. Getting page c