In [1]:
!pip install requests feedparser pandas pyarrow tqdm urllib3 backoff fake_useragent http_request_randomizer

Collecting feedparser
  Downloading feedparser-6.0.11-py3-none-any.whl.metadata (2.4 kB)
Collecting backoff
  Downloading backoff-2.2.1-py3-none-any.whl.metadata (14 kB)
Collecting fake_useragent
  Downloading fake_useragent-2.0.3-py3-none-any.whl.metadata (17 kB)
Collecting http_request_randomizer
  Downloading http_request_randomizer-1.3.2.tar.gz (10.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.9/10.9 MB[0m [31m21.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting sgmllib3k (from feedparser)
  Downloading sgmllib3k-1.0.0.tar.gz (5.8 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting httmock>=1.3.0 (from http_request_randomizer)
  Downloading httmock-1.4.0-py3-none-any.whl.metadata (2.4 kB)
Downloading feedparser-6.0.11-py3-none-any.whl (81 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m81.3/81.3 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading backoff-2.

In [26]:
!pip install datasets kagglehub huggingface_hub tqdm



In [30]:
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

TOKEN = os.getenv("DEVTO_API_KEY")
MY_DATASET_NAME = "Alaamer/devto_articles"
PARQUET_PATH = "devto_articles.parquet"
OUT_DIR = "devto_data"

In [27]:
from huggingface_hub import login, whoami
login(token=TOKEN, add_to_git_credential=True)
whoami()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


{'type': 'user',
 'id': '675047b890ba48ec35e04e36',
 'name': 'Alaamer',
 'fullname': 'The First',
 'email': 'ahmedmuhmmed239@gmail.com',
 'emailVerified': True,
 'canPay': False,
 'periodEnd': None,
 'isPro': False,
 'avatarUrl': 'https://cdn-avatars.huggingface.co/v1/production/uploads/no-auth/gelK-ZhS7T9nWeSNQvgI5.png',
 'orgs': [],
 'auth': {'type': 'access_token',
  'accessToken': {'displayName': 'Mediumdataset',
   'role': 'write',
   'createdAt': '2025-03-02T13:15:32.245Z'}}}

In [29]:
def get_file_size_mb(file_path):
  """Gets the size of a file in megabytes (MB).

  Args:
    file_path: The path to the file.

  Returns:
    The size of the file in megabytes, or -1 if the file does not exist.
  """
  try:
    size_bytes = os.path.getsize(file_path)
    size_mb = size_bytes / (1024 * 1024)  # Convert bytes to MB
    return size_mb
  except FileNotFoundError:
    return -1

file_path = '/content/large_dataset.parquet'
file_size_mb = get_file_size_mb(file_path)

if file_size_mb != -1:
  print(f"The size of {file_path} is {file_size_mb:.2f} MB.")  # Format to 2 decimal places
else:
  print(f"File not found: {file_path}")

File not found: /content/large_dataset.parquet


In [17]:
"""
DevToScraper: A robust Dev.to API scraper

This script scrapes articles from the Dev.to API with full support for all query parameters,
efficient data storage, rate limiting, proxy rotation, checkpointing, and error handling.
"""

import os
import json
import time
import random
import logging
import requests
import datetime
import orjson
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
from typing import Dict, List, Optional, Union, Any
from requests.exceptions import RequestException, HTTPError, ConnectionError, Timeout
from fake_useragent import UserAgent
from tqdm.auto import tqdm

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("devto_scraper.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger("DevToScraper")

class DevToScraper:
    """
    A robust and efficient scraper for the Dev.to API that handles rate limiting,
    checkpointing, and supports all available query parameters.
    """

    # Base API URL
    BASE_URL = "https://dev.to/api/articles"

    # Config parameters
    def __init__(
        self,
        api_key: Optional[str] = None,
        output_dir: str = "devto_data",
        checkpoint_file: str = "checkpoint.json",
        max_retries: int = 5,
        retry_delay: int = 5,
        max_backoff_delay: int = 60,
        per_page: int = 30,
        rate_limit_pause: float = 1.0,
        use_proxies: bool = False,
        proxy_list: Optional[List[str]] = None,
        rotate_user_agent: bool = True
    ):
        """
        Initialize the Dev.to scraper with configuration parameters.

        Args:
            api_key: Optional API key for authenticated requests
            output_dir: Directory to store output files
            checkpoint_file: File to store checkpoint data
            max_retries: Maximum number of retries for failed requests
            retry_delay: Initial delay between retries (seconds)
            max_backoff_delay: Maximum backoff delay (seconds)
            per_page: Number of articles per page (1-1000)
            rate_limit_pause: Time to pause between requests (seconds)
            use_proxies: Whether to use proxy rotation
            proxy_list: List of proxy URLs to use
            rotate_user_agent: Whether to rotate user agents
        """
        self.api_key = api_key or os.environ.get("DEVTO_API_KEY")
        self.output_dir = Path(output_dir)
        self.checkpoint_file = self.output_dir / checkpoint_file
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.max_backoff_delay = max_backoff_delay
        self.per_page = min(max(1, per_page), 1000)  # Ensure within valid range
        self.rate_limit_pause = rate_limit_pause
        self.use_proxies = use_proxies
        self.proxy_list = proxy_list or []
        self.rotate_user_agent = rotate_user_agent

        # Create user agent rotator if enabled
        self.user_agent = UserAgent() if rotate_user_agent else None

        # Create output directory if it doesn't exist
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Initialize checkpoint data
        self.checkpoint_data = self._load_checkpoint()

        # Initialize results container
        self.results = []

        logger.info(f"DevToScraper initialized with output dir: {self.output_dir}")

    def _load_checkpoint(self) -> Dict[str, Any]:
        """Load checkpoint data from file if it exists."""
        if self.checkpoint_file.exists():
            try:
                with open(self.checkpoint_file, 'r') as f:
                    checkpoint = json.load(f)
                    logger.info(f"Loaded checkpoint: {checkpoint}")
                    return checkpoint
            except (json.JSONDecodeError, IOError) as e:
                logger.error(f"Error loading checkpoint: {e}")

        # Default checkpoint data
        return {
            "last_completed_page": 0,
            "query_params": {},
            "last_article_id": None,
            "total_articles": 0,
            "last_run": None
        }

    def _save_checkpoint(self, page: int, query_params: Dict[str, Any], last_article_id: Optional[int] = None) -> None:
        """Save current state to checkpoint file."""
        self.checkpoint_data.update({
            "last_completed_page": page,
            "query_params": query_params,
            "last_article_id": last_article_id,
            "total_articles": len(self.results),
            "last_run": datetime.datetime.now().isoformat()
        })

        try:
            with open(self.checkpoint_file, 'w') as f:
                json.dump(self.checkpoint_data, f, indent=2)
            logger.debug(f"Saved checkpoint: page {page}, articles: {len(self.results)}")
        except IOError as e:
            logger.error(f"Error saving checkpoint: {e}")

    def _get_headers(self) -> Dict[str, str]:
        """Generate request headers, including API key if available."""
        headers = {
            "Accept": "application/json",
            "User-Agent": self.user_agent.random if self.rotate_user_agent else "DevToScraper/1.0"
        }

        if self.api_key:
            headers["api-key"] = self.api_key

        return headers

    def _get_proxy(self) -> Optional[Dict[str, str]]:
        """Get random proxy from the proxy list if enabled."""
        if not self.use_proxies or not self.proxy_list:
            return None

        proxy = random.choice(self.proxy_list)
        return {"http": proxy, "https": proxy}

    def _make_request(self, url: str, params: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
        Make request to Dev.to API with retry logic, backoff, and error handling.

        Args:
            url: API endpoint URL
            params: Query parameters for the request

        Returns:
            List of article data dictionaries
        """
        retry_count = 0
        backoff_time = self.retry_delay

        while retry_count <= self.max_retries:
            try:
                # Apply rate limiting
                time.sleep(self.rate_limit_pause)

                # Get headers and proxy
                headers = self._get_headers()
                proxies = self._get_proxy()

                logger.debug(f"Making request to {url} with params: {params}")
                response = requests.get(
                    url,
                    params=params,
                    headers=headers,
                    proxies=proxies,
                    timeout=30
                )

                # Check if rate limited (429)
                if response.status_code == 429:
                    retry_after = int(response.headers.get('Retry-After', backoff_time))
                    logger.warning(f"Rate limited. Waiting for {retry_after} seconds.")
                    time.sleep(retry_after)
                    retry_count += 1
                    continue

                # Raise exception for other HTTP errors
                response.raise_for_status()

                # Parse response data
                data = response.json()

                # If successful, return the data
                return data

            except (RequestException, HTTPError, ConnectionError, Timeout) as e:
                retry_count += 1
                logger.warning(f"Request failed (attempt {retry_count}/{self.max_retries}): {str(e)}")

                if retry_count <= self.max_retries:
                    # Implement exponential backoff
                    sleep_time = min(backoff_time, self.max_backoff_delay)
                    logger.info(f"Retrying in {sleep_time} seconds...")
                    time.sleep(sleep_time)
                    backoff_time *= 2  # Exponential backoff
                else:
                    logger.error(f"Max retries reached. Last error: {str(e)}")
                    raise
            except json.JSONDecodeError:
                logger.error(f"Invalid JSON response. Status code: {response.status_code}")
                raise

        return []

    def _file_hash_exists(self, data_hash: str) -> bool:
      """
      Check if a file with the given content hash already exists.

      Args:
          data_hash: Hash of the data content

      Returns:
          True if a file with this hash exists, False otherwise
      """
      hash_file = self.output_dir / "file_hashes.json"

      # Create hash file if it doesn't exist
      if not hash_file.exists():
          with open(hash_file, 'w') as f:
              json.dump({"hashes": []}, f)
          return False

      try:
          with open(hash_file, 'r') as f:
              hash_data = json.load(f)
              return data_hash in hash_data.get("hashes", [])
      except (json.JSONDecodeError, IOError):
          return False

    def _add_file_hash(self, data_hash: str) -> None:
        """
        Add a file hash to the tracking file.

        Args:
            data_hash: Hash of the data content
        """
        hash_file = self.output_dir / "file_hashes.json"

        try:
            # Read existing hashes
            if hash_file.exists():
                with open(hash_file, 'r') as f:
                    hash_data = json.load(f)
            else:
                hash_data = {"hashes": []}

            # Add new hash
            if data_hash not in hash_data["hashes"]:
                hash_data["hashes"].append(data_hash)

            # Write updated hashes
            with open(hash_file, 'w') as f:
                json.dump(hash_data, f)
        except (json.JSONDecodeError, IOError) as e:
            logger.error(f"Error updating file hash tracking: {e}")

    def _compute_data_hash(self, data: List[Dict[str, Any]]) -> str:
        """
        Compute a hash of the dataset content.
        Uses the first 10 article IDs and the total count as a simplified hash.

        Args:
            data: List of article data

        Returns:
            String hash representing the data content
        """
        if not data:
            return "empty_dataset"

        # Get total count and first 10 article IDs (sorted for consistency)
        article_ids = sorted([article.get('id') for article in data if article.get('id')])
        sample_ids = article_ids[:10] if len(article_ids) > 10 else article_ids

        # Create a hash string from sample IDs and total count
        hash_components = [str(id) for id in sample_ids] + [str(len(data))]
        hash_string = "_".join(hash_components)

        return hash_string

    def _should_save_dataset(self, data: List[Dict[str, Any]]) -> bool:
        """
        Determine if the current dataset should be saved by comparing it to the dataset registry.

        Args:
            data: List of article data

        Returns:
            True if the dataset should be saved, False if it's a duplicate
        """
        if not data:
            return False

        # Create a fingerprint of the dataset
        article_ids = sorted([article.get('id') for article in data if article.get('id')])
        dataset_fingerprint = {
            "article_count": len(data),
            "first_10_ids": article_ids[:10] if len(article_ids) >= 10 else article_ids,
            "last_10_ids": article_ids[-10:] if len(article_ids) >= 10 else article_ids,
            "timestamp": datetime.datetime.now().isoformat()
        }

        # Path to the dataset registry file
        registry_path = self.output_dir / "dataset_registry.json"

        # Load existing registry
        if registry_path.exists():
            try:
                with open(registry_path, 'rb') as f:
                    registry = orjson.loads(f.read())
            except (orjson.JSONDecodeError, IOError) as e:
                logger.error(f"Error loading dataset registry: {e}")
                registry = {"datasets": []}
        else:
            registry = {"datasets": []}

        # Check if this dataset is already in the registry
        for existing_dataset in registry["datasets"]:
            if (
                existing_dataset.get("article_count") == dataset_fingerprint["article_count"] and
                existing_dataset.get("first_10_ids") == dataset_fingerprint["first_10_ids"] and
                existing_dataset.get("last_10_ids") == dataset_fingerprint["last_10_ids"]
            ):
                logger.info(f"Duplicate dataset detected. Original saved at: {existing_dataset.get('timestamp')}")
                return False

        # If we get here, this is a new dataset - add it to the registry
        registry["datasets"].append(dataset_fingerprint)

        # Save updated registry
        try:
            with open(registry_path, 'wb') as f:
                f.write(orjson.dumps(registry, option=orjson.OPT_INDENT_2))
        except IOError as e:
            logger.error(f"Error saving dataset registry: {e}")

        return True


    def _save_to_csv(self, filename: str) -> bool:
        """
        Save results to CSV file if content is new.

        Returns:
            True if file was saved, False if skipped (duplicate)
        """
        if not self.results:
            logger.warning("No results to save to CSV")
            return False

        # Check if we already have a file with this content
        if not self._should_save_dataset(self.results):
            logger.info(f"Skipping CSV save: Dataset with same content already exists")
            return False

        try:
            df = pd.DataFrame(self.results)
            csv_path = self.output_dir / filename
            df.to_csv(csv_path, index=False)
            logger.info(f"Saved {len(df)} articles to CSV: {csv_path}")
            return True
        except Exception as e:
            logger.error(f"Error saving to CSV: {e}")
            return False

    def _save_to_parquet(self, filename: str) -> bool:
        """
        Save results to Parquet file if content is new.

        Returns:
            True if file was saved, False if skipped (duplicate)
        """
        if not self.results:
            logger.warning("No results to save to Parquet")
            return False

        try:
            df = pd.DataFrame(self.results)
            table = pa.Table.from_pandas(df)
            parquet_path = self.output_dir / filename
            pq.write_table(table, parquet_path)
            logger.info(f"Saved {len(df)} articles to Parquet: {parquet_path}")
            return True
        except Exception as e:
            logger.error(f"Error saving to Parquet: {e}")
            return False

    def scrape(self,
              page: Optional[int] = None,
              per_page: Optional[int] = None,
              tag: Optional[str] = None,
              tags: Optional[str] = None,
              tags_exclude: Optional[str] = None,
              username: Optional[str] = None,
              state: Optional[str] = None,
              top: Optional[int] = None,
              collection_id: Optional[int] = None,
              max_pages: Optional[int] = None,
              resume_from_checkpoint: bool = True) -> List[Dict[str, Any]]:
          """
          Scrape articles from Dev.to API using the provided parameters.

          Args:
              page: Page number to start from
              per_page: Number of results per page (1-1000)
              tag: Filter by tag name
              tags: Filter by multiple tags (comma separated)
              tags_exclude: Exclude specific tags (comma separated)
              username: Filter by username
              state: Filter by state (fresh/rising/all)
              top: Filter by top articles from last N days (integer)
              collection_id: Filter by collection ID
              max_pages: Maximum number of pages to scrape (None for all available)
              resume_from_checkpoint: Whether to resume from last checkpoint

          Returns:
              List of article data
          """
          # Build query parameters
          query_params = {}
          if per_page is not None:
              query_params["per_page"] = min(max(1, per_page), 1000)
          else:
              query_params["per_page"] = self.per_page

          if tag:
              query_params["tag"] = tag
          if tags:
              query_params["tags"] = tags
          if tags_exclude:
              query_params["tags_exclude"] = tags_exclude
          if username:
              query_params["username"] = username
          if state:
              query_params["state"] = state
          if top:
              query_params["top"] = top
          if collection_id:
              query_params["collection_id"] = collection_id

          # Determine starting page
          current_page = 1
          if resume_from_checkpoint and self.checkpoint_data["last_completed_page"] > 0:
              # Check if we're continuing with the same query
              if self.checkpoint_data["query_params"] == query_params:
                  current_page = self.checkpoint_data["last_completed_page"] + 1
                  logger.info(f"Resuming from checkpoint at page {current_page}")
              else:
                  logger.info("Query parameters changed, starting from page 1")

          if page is not None:
              current_page = max(1, page)

          logger.info(f"Starting scrape from page {current_page} with params: {query_params}")

          # Clear results if not resuming or if query changed
          if not resume_from_checkpoint or self.checkpoint_data["query_params"] != query_params:
              self.results = []

          # Load existing articles if resuming with same query
          existing_article_ids = set()
          if resume_from_checkpoint and self.results:
              existing_article_ids = {article['id'] for article in self.results if 'id' in article}

          # Track if any new data was added during this scrape operation
          new_articles_count = 0

          try:
              keep_scraping = True
              while keep_scraping:
                  # Update page parameter
                  page_params = {**query_params, "page": current_page}

                  # Make API request
                  logger.info(f"Fetching page {current_page}...")
                  articles = self._make_request(self.BASE_URL, page_params)

                  # Check if we received valid data
                  if not articles:
                      logger.info(f"No more articles found at page {current_page}")
                      break

                  # Process articles
                  page_new_articles = 0
                  for article in articles:
                      article_id = article.get('id')

                      # Skip duplicates
                      if article_id in existing_article_ids:
                          continue

                      self.results.append(article)
                      existing_article_ids.add(article_id)
                      page_new_articles += 1
                      new_articles_count += 1

                  logger.info(f"Page {current_page}: Added {page_new_articles} new articles. Total: {len(self.results)}")

                  # Save checkpoint after each page
                  last_article_id = articles[-1].get('id') if articles else None
                  self._save_checkpoint(current_page, query_params, last_article_id)

                  # Stop conditions
                  if len(articles) < query_params.get("per_page", self.per_page):
                      logger.info(f"Received fewer articles than requested. Reached end.")
                      keep_scraping = False

                  if max_pages is not None and current_page >= max_pages:
                      logger.info(f"Reached maximum page limit: {max_pages}")
                      keep_scraping = False

                  # Move to next page
                  current_page += 1

          except Exception as e:
              logger.error(f"Error during scraping: {e}")
              # Save what we have so far only if we have new data
              if new_articles_count > 0:
                  self._save_to_csv("devto_articles_partial.csv")
                  self._save_to_parquet("devto_articles_partial.parquet")
              raise

          # Save final results
          timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

          # Only save if the dataset is new
          csv_saved = self._save_to_csv(f"devto_articles_{timestamp}.csv")

          # Only save parquet if CSV was saved (meaning the content was new)
          if csv_saved:
              self._save_to_parquet(f"devto_articles_{timestamp}.parquet")

          logger.info(f"Scraping completed. Total articles: {len(self.results)}, New articles: {new_articles_count}")
          return self.results

    def search_by_tags(self, tags_list: List[str], **kwargs) -> List[Dict[str, Any]]:
        """
        Search articles by multiple tags, processing each tag individually.

        Args:
            tags_list: List of tags to search for
            **kwargs: Additional parameters to pass to the scrape method

        Returns:
            Combined list of articles from all tags
        """
        all_results = []
        existing_article_ids = set()

        for tag in tqdm(tags_list, desc="Processing tags", unit="tag"):
            logger.info(f"Searching for tag: {tag}")

            # Store the current results
            temp_results = self.results
            self.results = []

            # Scrape for this specific tag
            tag_results = self.scrape(tag=tag, resume_from_checkpoint=False, **kwargs)

            # Filter out duplicates and add to all_results
            for article in tag_results:
                article_id = article.get('id')
                if article_id not in existing_article_ids:
                    all_results.append(article)
                    existing_article_ids.add(article_id)

            # Restore original results
            self.results = temp_results

            logger.info(f"Found {len(tag_results)} articles for tag '{tag}'. "
                       f"Total unique articles: {len(all_results)}")

        return all_results

    def get_popular_tags(self, limit: int = 20) -> List[str]:
        """
        Get the most popular tags from Dev.to.

        Args:
            limit: Maximum number of tags to return

        Returns:
            List of popular tag names
        """
        url = "https://dev.to/api/tags"
        params = {"per_page": limit}

        try:
            tags_data = self._make_request(url, params)
            return [tag.get('name') for tag in tags_data if tag.get('name')]
        except Exception as e:
            logger.error(f"Error fetching popular tags: {e}")
            return []

    def scrape_comprehensive_dataset(self,
                                     max_tags: int = 50,
                                     articles_per_tag: int = 100,
                                     **kwargs) -> List[Dict[str, Any]]:
        """
        Build a comprehensive dataset by scraping multiple popular tags.

        Args:
            max_tags: Maximum number of tags to scrape
            articles_per_tag: Maximum articles to scrape per tag
            **kwargs: Additional parameters to pass to the scrape method

        Returns:
            Combined list of articles from all sources
        """
        # Get popular tags
        popular_tags = self.get_popular_tags(limit=max_tags)
        logger.info(f"Found {len(popular_tags)} popular tags to scrape")

        # Search by each tag
        per_page = kwargs.get('per_page', self.per_page)
        max_pages = (articles_per_tag + per_page - 1) // per_page  # Ceiling division

        all_articles = self.search_by_tags(
            popular_tags,
            max_pages=max_pages,
            **kwargs
        )

        # Save comprehensive dataset
        self.results = all_articles
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        self._save_to_csv(f"devto_comprehensive_{timestamp}.csv")
        self._save_to_parquet(f"devto_comprehensive_{timestamp}.parquet")

        logger.info(f"Comprehensive dataset created with {len(all_articles)} unique articles")
        return all_articles


def main():
    """Main function to demonstrate the DevToScraper functionality."""
    TOKEN = "PJ4Cz4okTiWh8QBeqppMrrrK"
    os.environ["DEVTO_API_KEY"] = TOKEN
    # Configuration
    config = {
        # API Authentication (optional)
        "api_key": os.environ.get("DEVTO_API_KEY", ""),  # Set your API key in env var

        # Output settings
        "output_dir": OUT_DIR,
        "checkpoint_file": "checkpoint.json",

        # Performance settings
        "max_retries": 5,
        "retry_delay": 3,
        "max_backoff_delay": 60,
        "per_page": 1000,  # Articles per page (max 1000)
        "rate_limit_pause": 1.0,  # Seconds between requests

        # Proxy & User-Agent settings
        "use_proxies": False,  # Set to True to enable proxies
        "proxy_list": [
            # Add your proxies here, e.g.:
            # "http://user:pass@proxy1.example.com:8080",
            # "http://user:pass@proxy2.example.com:8080",
        ],
        "rotate_user_agent": True,  # Rotate user agents to avoid detection
    }

    # Example proxy configuration (uncomment and modify as needed)
    """
    config["use_proxies"] = True
    config["proxy_list"] = [
        "http://user:pass@proxy1.example.com:8080",
        "http://user:pass@proxy2.example.com:8080",
    ]
    """

    try:
        # Initialize scraper with config
        scraper = DevToScraper(**config)

        # --- USAGE EXAMPLES ---

        # Example 1: Simple scraping with pagination
        # scraper.scrape(
        #     per_page=50,
        #     max_pages=10,
        #     state="rising"
        # )

        # Example 2: Scrape articles by specific tag
        # scraper.scrape(
        #     tag="python",
        #     per_page=50,
        #     max_pages=5
        # )

        # Example 3: Scrape articles by specific username
        # scraper.scrape(
        #     username="ben",
        #     per_page=50
        # )

        # Example 4: Build comprehensive dataset across popular tags
        scraper.scrape_comprehensive_dataset(
            max_tags=5000,
            articles_per_tag=1000,
            per_page=5000
        )

        logger.info(f"Scraping completed successfully. "
                   f"Data saved to {config['output_dir']} directory.")

    except Exception as e:
        logger.error(f"Error in main execution: {e}")
        raise


if __name__ == "__main__":
    main()



In [19]:
import os
import pandas as pd
from tqdm.auto import tqdm

def concatenate_csv_files(directory: str, unique_only_values: bool = False) -> pd.DataFrame:
    """
    Concatenates all .csv files in the specified directory.

    Parameters:
        directory (str): The path to the directory containing .csv files.
        unique_only_values (bool): If True, removes duplicate rows based on all columns.

    Returns:
        pd.DataFrame: A DataFrame containing concatenated data from all .csv files.
    """
    all_dfs = []

    for file in tqdm(os.listdir(directory), desc="Processing csv files", unit="file"):
        if file.endswith(".csv"):
            file_path = os.path.join(directory, file)
            df = pd.read_csv(file_path)
            all_dfs.append(df)

    if not all_dfs:
        raise ValueError("No CSV files found in the directory.")

    concatenated_df = pd.concat(all_dfs, ignore_index=True)

    if unique_only_values:
        concatenated_df = concatenated_df.drop_duplicates()

    return concatenated_df


In [20]:
path = OUT_DIR
df = concatenate_csv_files(path)
print("Shape: ",df.shape)
udf = df.drop_duplicates()
print("Unique Shape: ",udf.shape)

Processing csv files:   0%|          | 0/1832 [00:00<?, ?file/s]

Shape:  (747617, 29)


In [34]:
udf.columns

Index(['type_of', 'id', 'title', 'description', 'readable_publish_date',
       'slug', 'path', 'url', 'comments_count', 'public_reactions_count',
       'collection_id', 'published_timestamp', 'language', 'subforem_id',
       'positive_reactions_count', 'cover_image', 'social_image',
       'canonical_url', 'created_at', 'edited_at', 'crossposted_at',
       'published_at', 'last_comment_at', 'reading_time_minutes', 'tag_list',
       'tags', 'user', 'organization', 'flare_tag'],
      dtype='object')

In [31]:
udf.to_parquet(PARQUET_PATH)

In [32]:
size = get_file_size_mb(PARQUET_PATH)
print(f"File size: {size:.2f} MB")

File size: 143.21 MB


In [33]:
# Upload using the `datasets` library
from datasets import load_dataset

dataset = load_dataset("parquet", data_files=PARQUET_PATH)
dataset.push_to_hub(MY_DATASET_NAME)

Generating train split: 0 examples [00:00, ? examples/s]

Uploading the dataset shards:   0%|          | 0/2 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/154 [00:00<?, ?ba/s]

Creating parquet from Arrow format:   0%|          | 0/154 [00:00<?, ?ba/s]

README.md:   0%|          | 0.00/24.0 [00:00<?, ?B/s]

CommitInfo(commit_url='https://huggingface.co/datasets/Alaamer/devto_articles/commit/b5687884faa4d515cd10d5f5c4f0ab3c458f4495', commit_message='Upload dataset', commit_description='', oid='b5687884faa4d515cd10d5f5c4f0ab3c458f4495', pr_url=None, repo_url=RepoUrl('https://huggingface.co/datasets/Alaamer/devto_articles', endpoint='https://huggingface.co', repo_type='dataset', repo_id='Alaamer/devto_articles'), pr_revision=None, pr_num=None)