In [5]:
import os
import time
import sqlite3
import logging
from datetime import timedelta

import requests
from bs4 import BeautifulSoup
import datetime
import pandas as pd

In [1]:
# set the logging level
# logging.basicConfig(level=logging.DEBUG)


class PublicationScraper:
    """
    A simple scraper that connects to the publication's archive
    and collect all the articles published this month
    """

    def __init__(self, publication: str, rollback=3, **kwargs):
        self.publication_url = publication
        self.archive_url = os.path.join(self.publication_url, "archive")
        self.current_date = datetime.date.today()

        self.target_urls = []
        self.target_dates = []
        self.pages = []

        self.data = {}

        self.check_archive()
        if not self.archive_available:
            raise NotImplementedError("Scraping from the profile page (without archive) is not supported.")

        if rollback:
            for m in range(1, rollback + 2):
                date = self.current_date.replace(day=1) - timedelta(1 * 30 * m)
                #
                month = str(date.month)
                month = f"0{month}" if len(month) == 1 else str(month)
                #
                target_url = os.path.join(self.archive_url, str(date.year), month)

                self.target_dates.append(date)
                self.target_urls.append(target_url)

        for date, url in zip(self.target_dates, self.target_urls):
            logging.info(f"Constructed the target url: {url}")
            req = requests.get(url)
            logging.info(f"Sent the GET request, status code: {req.status_code}")

            if req.url == os.path.join(self.archive_url, str(date.year)):
                raise NotImplementedError(
                    "Scraping just from the year is not implemented yet. (URL: %s)" % (self.publication_url)
                )

            page = BeautifulSoup(req.content, "html.parser")
            logging.info("Initialized the beautifulsoup")
            self.pages.append(page)

    def check_archive(self):
        r = requests.get(self.archive_url)
        if "PAGE NOT FOUND" in r.text and "404" in r.text:
            self.archive_available = False
        else:
            self.archive_available = True

    def scrape(self):
        for page, date, url in zip(self.pages, self.target_dates, self.target_urls):
            print("current date:", date)
            print("current url:", url)
            posts = self.get_posts(page)
            data = self.get_data(posts, url)
            self.data[str(date)] = data
        return self.data

    def scrape_profile(self):
        """
        It should be noted that the Medium sends a POST request to the -author_medium_url-/_/batch with some authentication/bot analysis
        headers. So it's highly possible to reverse-engineer the API and imitate a real client to collect the data.
        """
        raise NotImplementedError("Not implemented yet.")

    def get_posts(self, page) -> list[str]:
        posts = page.find_all("div", {"streamItem--postPreview"})
        return posts

    def get_data(self, posts, url) -> list[dict]:
        samples = []
        for idx, post in enumerate(posts):
            try:
                author = post.find("div", {"class": "postMetaInline-authorLockup"}).find("a").text
                uicaption = post.find("div", {"class": "ui-caption"})
                date = uicaption.find("a").find("time")["datetime"]
                reading_time = uicaption.find("span", {"class": "readingTime"})["title"]

                article_content = post.find("div", {"class": "postArticle-content"})
                title = post.select("h3.graf--title")[0].text
                post_url = article_content.parent["href"]
                preview_image_url = article_content.find("figure").find("img")["src"]
                claps = post.find("div", {"class": "multirecommend"}).find_all("span")[-1].text

                sample = {
                    "author": author,
                    "date": date,
                    "reading_time": reading_time,
                    "post_url": post_url,
                    "title": title,
                    "preview_image_url": preview_image_url,
                    "claps": claps,
                }

                sample = self.post_process(sample)

                samples.append(sample)
            except Exception as e:
                random_text = str(time.time()).replace(".", "")
                debugfile_name = f"debug_{random_text}.html"
                with open(debugfile_name, "w") as f:
                    f.write(post.prettify())
                print("An error occurred for the URL: %s | post saved as %s" % (url, debugfile_name))

        return samples

    def post_process(self, sample: dict):
        sample["date"] = sample["date"].split("T")[0]
        sample["reading_time"] = " ".join(sample["reading_time"].split()[:-1])
        sample["post_url"] = sample["post_url"].split("?source=collection_archive")[0]
        return sample


scraper = PublicationScraper("https://towardsdatascience.com/", rollback=3)
data = scraper.scrape()

current date: 2023-03-02
current url: https://towardsdatascience.com/archive\2023\03
current date: 2023-01-31
current url: https://towardsdatascience.com/archive\2023\01
current date: 2023-01-01
current url: https://towardsdatascience.com/archive\2023\01
current date: 2022-12-02
current url: https://towardsdatascience.com/archive\2022\12


In [2]:
class MediumScraper:
    def __init__(self, publications, suppress=False, **publication_scraper_kwargs):
        self.publications = publications
        self.archives = list(map(lambda x: os.path.join(x, "archive"), publications))
        self.scrapers = []
        self.data = {}
        for publication in publications:
            if suppress:
                try:
                    scraper = PublicationScraper(publication, **publication_scraper_kwargs)
                except Exception as e:
                    print("An error occurred:", e)
                    print("Removing the publication: %s" % (publication))
                    self.publications.remove(publication)
            else:
                scraper = PublicationScraper(publication, **publication_scraper_kwargs)

            self.scrapers.append((publication, scraper))

        self.data = {p: [] for p, s in self.scrapers}

    def scrape(self):
        for pub, scraper in self.scrapers:
            data = scraper.scrape()
            self.data[pub] = data

In [8]:
publications = [
    "https://towardsdatascience.com/",
    "https://medium.com/swlh",
    "https://humanparts.medium.com/",
    "https://medium.com/geekculture",
    "https://levelup.gitconnected.com/",
    "https://python.plainenglish.io/",
    "https://entrepreneurshandbook.co/",
]
crawler = MediumScraper(publications, suppress=True)

In [9]:
con = sqlite3.connect("medium.db")
cur = con.cursor()

cur.execute(
    "create table articles(publication_url, author, date, reading_time, post_url, title, preview_image_url, claps)"
)

# sanity check
r = cur.execute("select name from sqlite_master")
r.fetchone()

OperationalError: table articles already exists

In [None]:
for k, v in crawler.data.items():
    print("Key: %s" % k)
    for vk, vv in v.items():
        print("Date: %s" % vk, end="")
        print("\t %d" % len(vv))

Key: https://towardsdatascience.com/
Date: 2022-12-02	 9
Date: 2022-11-02	 8
Date: 2022-10-03	 10
Date: 2022-09-03	 8
Key: https://medium.com/swlh
Date: 2022-12-02	 10
Date: 2022-11-02	 10
Date: 2022-10-03	 10
Date: 2022-09-03	 10
Key: https://humanparts.medium.com/
Date: 2022-12-02	 3
Date: 2022-11-02	 4
Date: 2022-10-03	 6
Date: 2022-09-03	 3
Key: https://medium.com/geekculture
Date: 2022-12-02	 10
Date: 2022-11-02	 10
Date: 2022-10-03	 10
Date: 2022-09-03	 10
Key: https://levelup.gitconnected.com/
Date: 2022-12-02	 10
Date: 2022-11-02	 10
Date: 2022-10-03	 10
Date: 2022-09-03	 10
Key: https://python.plainenglish.io/
Date: 2022-12-02	 10
Date: 2022-11-02	 10
Date: 2022-10-03	 10
Date: 2022-09-03	 10
Key: https://entrepreneurshandbook.co/
Date: 2022-12-02	 10
Date: 2022-11-02	 10
Date: 2022-10-03	 10
Date: 2022-09-03	 10


In [7]:
commit_data = []

for k, v in crawler.data.items():
    for kk, vv in v.items():
        for post in vv:
            entry = [k] + list(map(lambda x: x[1], post.items()))
            commit_data.append(entry)

AttributeError: 'list' object has no attribute 'items'

In [54]:
for entry in commit_data:
    text_entry = "'" + "','".join(entry) + "'"
    cur.execute(f"INSERT INTO articles VALUES ({text_entry})")

In [59]:
df = pd.read_sql_query("SELECT * from articles", con)
df.to_csv("dataset.csv", index=False)