In [13]:
# STANDARD
from pprint import pprint

# PyPI
from bs4 import BeautifulSoup as BS
import requests
from tqdm import tqdm

# LOCAL
from interface_db import db_interface_sqlite as data

In [14]:
with data.database() as db:
    db.create_tables()
    websites   = db.execute('SELECT name FROM sqlite_master WHERE type=\'table\' AND name=\'websites\'')
    categories = db.execute('SELECT name FROM sqlite_master WHERE type=\'table\' AND name=\'categories\'')
    blogs      = db.execute('SELECT name FROM sqlite_master WHERE type=\'table\' AND name=\'blogs\'')
    posts      = db.execute('SELECT name FROM sqlite_master WHERE type=\'table\' AND name=\'posts\'')
try:
    assert websites[0][0] == 'websites'
except AssertionError:
    print('ASSERTION ERROR: Error querying \'websites\'')
try:
    assert categories[0][0] == 'categories'
except AssertionError:
    print('ASSERTION ERROR: Error querying \'categories\'')
try:
    assert blogs[0][0] == 'blogs'
except AssertionError:
    print('ASSERTION ERROR: Error querying \'blogs\'')
try:
    assert posts[0][0] == 'posts'
except AssertionError:
    print('ASSERTION ERROR: Error querying \'posts\'')

In [15]:
class insert_results:
    def __init__(self, inserted=0, not_inserted=0, exceptions=0):
        self.inserted = inserted
        self.not_inserted = not_inserted
        self.exceptions = exceptions

In [16]:
def parse_html(url: str) -> str:
    """Process and return the parsed html of any webpage.

    Arguments:
        url (str): webpage address to be processed

    Return
        str: [potentially large] string of parsed html

    """
    response = requests.get(url)
    return BS(response.content, 'html.parser')


def check_url_new(table: str, url: str) -> bool:
    """Check a URL against a database table to see if it already exists.
    
    Arguments:
        url (str): webpage page address to check against database table
        table (str): database table to check against

    Return
        bool: True if url exists in given table; False if it doesn't exist.
    
    """
    with data.database() as db:
        existing_url = db.execute(f'SELECT url FROM {table} WHERE url = \'{url}\'')
        if len(existing_url) == 0:
            return True
        else: 
            return False

In [21]:
def insert_website(website_name: str, website_url: str) -> data.website:
    website = data.website(name = website_name, url = website_url)
    results = insert_results()
    with data.database() as db:
        print(f'Inserting {website_name} - {website_url}')
        result = db.insert_website(website)
        if type(result) == str:
            print(result)
        else:
            print(f'Insert successful. Name: {result.name}, URL: {result.url}')
        #new_website = db.query_websites(website_name)
        #print(f'Inserted or exists: {result.name} - {result.url}')
 #       if check_url_new('categories', category.url) == True:
 #           db.insert_category(category)
 #           results.inserted += 1
 #       else:
 #           # <placeholder for logging>
 #           results.not_inserted += 1
 #   return results



results = insert_website('Patheos Blogs', 'https://www.patheos.com/blogs2')
#print(results.inserted, 
#      results.not_inserted, 
#      results.exceptions)

Inserting Patheos Blogs - https://www.patheos.com/blogs2
Insert successful. Name: Patheos Blogs, URL: https://www.patheos.com/blogs2


In [9]:
with data.database() as db:
    result = db.execute("SELECT * FROM websites")
pprint(result)

[(1,
  'Patheos Blogs',
  'https://www.patheos.com/blogs',
  '2020-04-18',
  'user',
  '2020-04-18',
  'user'),
 (2,
  'Patheos Blogs',
  'https://www.patheos.com/blogs',
  '2020-04-18',
  'user',
  '2020-04-18',
  'user'),
 (3,
  'Patheos Blogs',
  'https://www.patheos.com/blogs',
  '2020-04-18',
  'user',
  '2020-04-18',
  'user'),
 (4,
  'Patheos Blogs',
  'https://www.patheos.com/blogs',
  '2020-04-18',
  'user',
  '2020-04-18',
  'user'),
 (5,
  'Patheos Blogs',
  'https://www.patheos.com/blogs',
  '2020-04-18',
  'user',
  '2020-04-18',
  'user'),
 (6,
  'Patheos Blogs',
  'https://www.patheos.com/blogs',
  '2020-04-18',
  'user',
  '2020-04-18',
  'user'),
 (7,
  'Patheos Blogs2',
  'https://www.patheos.com/blogs2',
  '2020-04-18',
  'user',
  '2020-04-18',
  'user'),
 (8,
  '2',
  'https://www.patheos.com/blogs2',
  '2020-04-18',
  'user',
  '2020-04-18',
  'user')]


In [20]:
def fetch_and_insert_categories(website_name: str) -> insert_results:
    category = data.category()
    results = insert_results()
    with data.database() as db:
        website = db.query_websites(website_name)
        print(f'Pulling categories for {website.name}')
        parsed_html = parse_html(website.url)
        for item in parsed_html.find_all('div', attrs={"class":"related-content clearfix related-content-sm decorated channel-list"}):
            category.url = item.find('a')['href']
            category.name = category.url.rsplit("/")[3]
            category.website_id = website.id
            if check_url_new('categories', category.url) == True:
                db.insert_category(category)
                results.inserted += 1
            else:
                # <placeholder for logging>
                results.not_inserted += 1
    return results



results = fetch_and_insert_categories('Patheos Blogs')
print(results.inserted, 
      results.not_inserted, 
      results.exceptions)



AttributeError: 'NoneType' object has no attribute 'name'

In [6]:
def fetch_and_insert_blogs(category_name: str) -> insert_results:
    blog = data.blog()
    results = insert_results()
    with data.database() as db:
        category = db.query_categories(category_name)
        print(category.url)
        print(f'Pulling blogs for {category.name}')
        parsed_html = parse_html(category.url)
        #pprint(parsed_html)
        for item in parsed_html.find_all('div', attrs={"class":"author-info"}):
            for title_html in item.find_all('div', attrs={"class":"title"}):
                title_html_a = title_html.find('a')
                blog.name = title_html_a.get_text()
                blog.url = title_html_a['href']
            for by_line_html in item.find_all('div', attrs={"class":"by-line"}):
                blog.author = by_line_html.find('a').get_text()
            blog.category_id = category.id
            if check_url_new('blogs', blog.url) == True:
                db.insert_blog(blog)
                results.inserted += 1
            else:
                # <placeholder for logging>
                results.not_inserted += 1
    return results



# results = fetch_and_insert_blogs('evangelical-blogs')
# print(results.inserted, 
#        results.not_inserted, 
#        results.exceptions)

In [None]:
fetch_and_insert_categories()

In [7]:
# OLD SCRAPING FUNCTION NEW FUNCTIONS ARE REFACTORED FROM

def fetch_blogs():
    """[ ] Document for refactoring
    
    """
    url = 'http://www.patheos.com/blogs'
    response = requests.get(url)
    soup = BS(response.content, 'html.parser')

    blog_list = []

    for blog in soup.find_all('div', attrs={"class":"related-content clearfix related-content-sm decorated channel-list"}):
        blog_url1 = blog.find('a')
        blog_url_test = blog.find('a')['href']
        blog_url2 = blog_url1['href']
        blog_list.append(blog_url2) 
    
    print('blog_url2')
    pprint(blog_url_test)
    pprint(blog_url1)
    pprint(blog_url2)
    pprint(blog_list)

    blog_lists = []
    
    for i in blog_list:
        split_url = i.rsplit("/")
        blog_name = split_url[3]
        blog_lists.append(blog_name)
    
    blog_dict = {i:[] for i in blog_lists}

    blog_cat_prefix = "https://www.patheos.com/"

    i = 0    
    for blog_urls in tqdm(blog_dict, desc='Fetch blog urls for each category'):
        if blog_urls:
            query_url = blog_cat_prefix + blog_urls
            #sleep(random.uniform(1, 3))
            subsub_blog = requests.get(query_url)
            soup2 = BS(subsub_blog.content, 'html.parser')
            for blog in soup2.find_all('div', attrs={"class":"author-info"}):
                for blog_url0 in blog.find_all('div', attrs={"class":"title"}):
                    blog_url1 = blog_url0.find('a')
                    blog_url2 = blog_url1['href']
                    blog_dict[blog_urls].append(blog_url2)

            i = i + 1
        else:
            continue
    return blog_dict

In [None]:
results = fetch_blogs()