In [1]:
import pandas as pd
import requests
import re
import time
import os
from bs4 import BeautifulSoup

In [None]:
class HuggingFaceModelDownloader:
    """Scrape Hugging Face model pages and estimate per-day download counts.

    The model ids are read from a CSV file (columns ``id`` and
    ``total_score``), ordered by descending ``total_score``.  For each model
    the public page is fetched, a task label, a download total and the first
    SVG ``<path>`` of the page's chart are extracted, and the curve is turned
    into ``NUM_DAYS`` integer buckets that sum to the download total.  One
    row per model is appended to ``output_file``.

    Args:
        csv_file: Path of the input CSV with ``id`` and ``total_score`` columns.
        output_file: Path of the CSV that receives one row per scraped model.
        max_retries: Number of HTTP attempts per model page (at least one
            attempt is always made).
        delay: Seconds to sleep between failed attempts.
    """

    # Number of daily buckets produced per model (columns "1Day".."30Day").
    NUM_DAYS = 30

    def __init__(self, csv_file, output_file, max_retries=5, delay=5):
        self.csv_file = csv_file
        self.output_file = output_file
        self.max_retries = max_retries
        self.delay = delay
        self.model_list = self._load_model_list()

    def _load_model_list(self):
        """Return the ``id`` column of ``csv_file`` sorted by descending ``total_score``."""
        df = pd.read_csv(self.csv_file)
        ranked = df.sort_values(by='total_score', ascending=False)
        return ranked['id'].tolist()

    def _download_model_page(self, model_name):
        """Fetch ``https://huggingface.co/<model_name>`` and return it parsed.

        Returns:
            A ``BeautifulSoup`` document built with ``html.parser``.

        Raises:
            requests.RequestException: If every attempt fails (connection
                error, timeout or non-2xx status); the last error is re-raised.
        """
        url = f'https://huggingface.co/{model_name}'
        # Always try at least once so callers never get an implicit None
        # (which happened when max_retries was 0).
        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            try:
                response = requests.get(url, timeout=10)
                response.raise_for_status()
                return BeautifulSoup(response.content, 'html.parser')
            # requests.Timeout is a RequestException subclass, so one clause
            # covers both.
            except requests.RequestException as e:
                print(f"Attempt {attempt + 1} failed: {e}")
                if attempt < attempts - 1:
                    time.sleep(self.delay)
                else:
                    raise

    def _extract_data(self, soup):
        """Pull the task label, download total and chart path out of a page.

        Returns:
            A tuple ``(task, total, d_attribute)``:

            * ``task`` - stripped text of the first header link whose href
              matches ``/models?pipeline_tag=...``, or ``None``.
            * ``total`` - text of the selected ``<dd>`` element with commas
              and surrounding whitespace removed (still a string), or
              ``None`` when the element is missing.
            * ``d_attribute`` - ``d`` attribute of the first ``<path>`` inside
              the selected chart container, or ``None`` when missing/empty.
        """
        # Raw string: the selector needs a literal backslash before ':' and a
        # plain '\:' is an invalid escape sequence in a normal string literal.
        header = soup.select_one(
            r'body > div > main > div.SVELTE_HYDRATER.contents > header > div > '
            r'div.mb-3.flex.flex-wrap.md\:mb-4'
        )
        task = None
        if header is not None:
            task_link = header.find('a', href=re.compile(r'/models\?pipeline_tag=.*'))
            if task_link is not None:
                task = task_link.text.strip()

        # NOTE(review): presumably the "downloads last month" figure - the
        # selector is purely positional, so confirm against the live page.
        downloads = soup.select_one(
            'body > div:nth-of-type(1) > main > div:nth-of-type(2) > '
            'section:nth-of-type(2) > div:nth-of-type(1) > dl > dd'
        )
        total = None if downloads is None else downloads.text.replace(',', '').strip()

        chart = soup.select_one(
            'html > body > div > main > div:nth-of-type(2) > '
            'section:nth-of-type(2) > div:nth-of-type(1) > div'
        )
        first_path = chart.find('path') if chart is not None else None
        d_attribute = first_path.get('d') if first_path is not None else None

        # An empty "d" attribute is reported as None, like a missing one.
        return task, total, d_attribute or None

    def _parse_path_data(self, path_data):
        """Extract consecutive ``(x, y)`` float pairs from an SVG path string.

        Every number in ``path_data`` is collected in order (an optional
        exponent such as ``1e-3`` is accepted) and the numbers are paired up.

        Raises:
            IndexError: If the string contains an odd number of numbers.
        """
        coordinates = re.findall(r'[-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?', path_data)
        return [(float(coordinates[i]), float(coordinates[i + 1]))
                for i in range(0, len(coordinates), 2)]

    def _calculate_graph(self, path_data, download_total):
        """Convert a chart path into ``NUM_DAYS`` integer daily download counts.

        The x axis of the path is rescaled so its maximum maps to
        ``NUM_DAYS``, heights are taken as ``100 - y``, per-day weights are
        accumulated from the line segments, and the weights are scaled and
        rounded so that they sum to ``download_total``.

        Args:
            path_data: SVG path ``d`` string, or ``None``.
            download_total: Total downloads as an integer-like value
                (typically a digit string), or ``None``.

        Returns:
            A list of ``NUM_DAYS`` ints.  All zeros are returned when the
            curve or the total cannot be used: missing/unparseable path, no
            points, non-positive x extent, missing/non-integer total, or a
            curve whose weights do not sum to a positive value (for example
            a flat line for a model without downloads).
        """
        n_days = self.NUM_DAYS
        zeros = [0] * n_days

        if path_data is None:
            return zeros
        try:
            points = self._parse_path_data(path_data)
        except IndexError:
            return zeros
        if not points:
            return zeros

        try:
            total = int(download_total)
        except (TypeError, ValueError):
            # No usable total to distribute (e.g. the element was missing).
            return zeros

        max_day = max(x for x, _ in points)
        if max_day <= 0:
            # Degenerate x extent: rescaling would divide by zero.
            return zeros

        scale_factor = n_days / max_day
        curve = [(x * scale_factor, 100 - y) for x, y in points]
        weights = [0] * n_days

        # NOTE(review): `fraction` is computed differently in the three
        # branches below while `slope` is a per-day slope, so this is not a
        # strict linear interpolation.  The arithmetic is kept unchanged so
        # existing outputs stay reproducible - confirm the intended formula.
        for (start_day, start_value), (end_day, end_value) in zip(curve, curve[1:]):
            first_bucket = int(start_day)
            last_bucket = int(end_day)

            if first_bucket == last_bucket:
                # Segment lies inside one day: use the mean of its endpoints.
                if first_bucket < n_days:
                    weights[first_bucket] += (start_value + end_value) / 2
                continue

            slope = (end_value - start_value) / (end_day - start_day)
            for day in range(first_bucket, min(last_bucket + 1, n_days)):
                if day == first_bucket:
                    fraction = 1 - (start_day - first_bucket)
                elif day == last_bucket:
                    fraction = end_day - last_bucket
                else:
                    fraction = (day - start_day) / (end_day - start_day)
                weights[day] += start_value + fraction * slope

        weight_sum = sum(weights)
        if weight_sum <= 0:
            # Flat/empty curve: nothing to scale, and dividing would fail.
            return zeros

        scaling_factor = total / weight_sum
        daily_downloads = [int(round(w * scaling_factor)) for w in weights]

        # Rounding can leave the sum a few units off; nudge days one unit at
        # a time (from the front when short, from the back when over).
        difference = total - sum(daily_downloads)
        step = 1 if difference > 0 else -1
        adjustment_indices = (list(range(n_days)) if difference > 0
                              else list(range(n_days - 1, -1, -1)))
        for i in range(abs(difference)):
            daily_downloads[adjustment_indices[i % n_days]] += step

        # Clamp negative days to zero and take the removed amount out of the
        # other days, starting from day 0, so the total is preserved.
        for i in range(n_days):
            if daily_downloads[i] < 0:
                surplus = -daily_downloads[i]
                daily_downloads[i] = 0
                for j in range(n_days):
                    if daily_downloads[j] > surplus:
                        daily_downloads[j] -= surplus
                        break
                    else:
                        surplus -= daily_downloads[j]
                        daily_downloads[j] = 0

        return daily_downloads

    def save_model_data(self, model_name):
        """Scrape one model and append its row to ``output_file``.

        The row holds ``Model_name``, ``Model_task`` (``None`` when the
        extracted label is "transformers", case-insensitively) and the
        columns ``1Day`` .. ``30Day``.

        Raises:
            requests.RequestException: If the page cannot be downloaded.
        """
        soup = self._download_model_page(model_name)
        model_task, download_total, path_data = self._extract_data(soup)
        daily_downloads = self._calculate_graph(path_data, download_total)
        if model_task and model_task.lower() == 'transformers':
            model_task = None

        data = {
            "Model_name": [model_name],
            "Model_task": [model_task],
            **{f"{i+1}Day": [daily_downloads[i]] for i in range(self.NUM_DAYS)},
        }
        df = pd.DataFrame(data)

        # Append instead of re-reading and rewriting the whole file for every
        # model; the header is written only when the file is new or empty.
        write_header = (not os.path.exists(self.output_file)
                        or os.path.getsize(self.output_file) == 0)
        df.to_csv(self.output_file, mode='a', header=write_header, index=False)

    def collect_data(self, start_index, end_index):
        """Scrape and save the models at positions ``[start_index, end_index)``.

        ``end_index`` is clamped to the number of loaded models, so asking
        for more models than exist no longer raises ``IndexError``.  Errors
        from ``save_model_data`` propagate and stop the run.
        """
        print(f"총 행 수: {len(self.model_list)} 행")
        stop_index = min(end_index, len(self.model_list))
        for i in range(start_index, stop_index):
            model_name = self.model_list[i]
            print(model_name)
            self.save_model_data(model_name)
            print(f"{i+1}번 모델 데이터 수집 중입니다.")
        print("Data collection completed")

In [None]:
# Usage: scrape the highest-ranked models from the scored CSV and write one
# row per model to the output CSV.
downloader = HuggingFaceModelDownloader(
    csv_file="./sorted_hugging_face_model_influence_with_scores_0615.csv",
    output_file="20240617_Daily_Download.csv",
)
# Cap the run at 10000 models, but never index past the end of the list
# (a shorter CSV previously ended the run with an IndexError).
downloader.collect_data(0, min(10000, len(downloader.model_list)))