# Web scraper for tasks

* Extract information from web pages

In [None]:

async def _extract_result (element, site):
    """Build the result dict for one matched element.

    Uses the optional sub-selectors ``select_title``, ``select_link`` and
    ``select_time`` from ``site``. When a sub-selector is configured but
    matches nothing inside ``element``, the corresponding field is stored
    as ``None`` instead of raising.

    Args:
        element: Playwright element handle for one result.
        site: Site configuration dict (read only here).

    Returns:
        dict with any of the keys ``title``, ``link``, ``link_full``,
        ``link_hash`` and ``time``.
    """
    import hashlib

    found = {}

    # title: inner text of the first matching child
    select_title = site.get('select_title')
    if select_title:
        title = await element.query_selector(select_title)
        # query_selector yields None when nothing matches
        found['title'] = None
        if title is not None:
            found['title'] = await title.inner_text()

    # link: raw href, plus absolute URL and hash for relative links
    select_link = site.get('select_link')
    if select_link:
        link = await element.query_selector(select_link)
        link_href = None
        if link is not None:
            # get_attribute yields None when the attribute is absent
            link_href = await link.get_attribute('href')
        found['link'] = link_href

        base_url = site.get('base_url')
        if link_href and link_href.startswith('/') and base_url:
            # drop the query string so the same article always maps to
            # the same URL (and therefore the same hash)
            link_href = f"{base_url}{link_href.split('?')[0]}"
            found['link_full'] = link_href
            # md5 is used only as a de-duplication key, not for security
            found['link_hash'] = hashlib.md5(link_href.encode()).hexdigest()

    # time: value of the `datetime` attribute
    select_time = site.get('select_time')
    if select_time:
        time_element = await element.query_selector(select_time)
        found['time'] = None
        if time_element is not None:
            found['time'] = await time_element.get_attribute('datetime')

    return found


async def scrap_page (url, index, site):
    """Scrape one page with Playwright and collect results into ``site``.

    Opens ``url`` in headless Chromium, waits for network idle, extracts
    one result per element matching ``site['selector']`` (if set), appends
    them to ``site['results']`` and saves a full-page screenshot to
    ``{output}/screenshot-{name}-{index}.png``. Errors while scraping are
    printed, not raised (best effort); the browser is always shut down.

    Args:
        url: Page URL; nothing happens when it is empty or None.
        index: Page number, used only in the screenshot file name.
        site: Site configuration dict; mutated in place (``results``).
            Optional keys read here: ``selector``, ``select_title``,
            ``select_link``, ``select_time``, ``base_url``, ``name``,
            ``output``, ``show_image``.
    """
    # if url is not set, return
    if not url:
        return

    # Computed before the try block so the optional display step below can
    # always refer to it, even when scraping fails early.
    name = site.get('name', 'unknown')
    output = site.get('output', '.')
    target_file = f"{output}/screenshot-{name}-{index}.png"

    playwright = None
    browser = None
    try:
        from playwright.async_api import async_playwright
        playwright = await async_playwright().start()
        browser = await playwright.chromium.launch()
        page = await browser.new_page()

        print(f"Scraping {url} ...")
        await page.goto(url)

        # wait for page to load
        await page.wait_for_load_state('networkidle')

        if 'selector' in site:
            elements = await page.query_selector_all(site['selector'])
            print(f"Found {len(elements)} results")

            for element in elements:
                found = await _extract_result(element, site)
                print(found)
                site.setdefault('results', []).append(found)

        # save screenshot
        await page.screenshot(path=target_file, full_page=True)
    except Exception as e:
        # best effort: report the problem and carry on with the next page
        print(e)
    finally:
        # release the browser and the Playwright driver even after a failure
        if browser is not None:
            try:
                await browser.close()
            except Exception as e:
                print(e)
        if playwright is not None:
            try:
                await playwright.stop()
            except Exception as e:
                print(e)

    print(site)

    # optional: display screenshot
    if site.get('show_image', False):
        import os.path
        # only display when the screenshot file exists on disk
        if os.path.isfile(target_file):
            from IPython.display import Image, display
            display(Image(filename=target_file))
    

In [None]:
def create_db_scraps (site):
    """Make sure the SQLite database and its ``news`` table exist.

    The database path is ``site['sqlite']`` or, by default,
    ``../my-data/db-scraps.sqlite``.
    WARNING: a relative path is resolved against the current directory.

    The schema statements use ``IF NOT EXISTS`` and are run on every call,
    so an existing file that lacks the table (for example an empty file)
    is repaired instead of being silently accepted.

    Args:
        site: Site configuration dict; only the optional ``sqlite`` key
            is read.
    """
    import os
    import sqlite3

    db_path = site.get('sqlite', '../my-data/db-scraps.sqlite')

    if os.path.isfile(db_path):
        print(f"The DB file exists: {db_path}")
    else:
        # sqlite3.connect cannot create missing parent directories
        parent_dir = os.path.dirname(db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

    conn = sqlite3.connect(db_path)
    try:
        # create the news table (id, md5, tag, title, url, date) if it does not exist
        conn.execute('CREATE TABLE IF NOT EXISTS news (id INTEGER PRIMARY KEY, md5 TEXT, tag TEXT, title TEXT, url TEXT, date TEXT)')
        # md5 is the de-duplication key, so it must be unique
        conn.execute('CREATE UNIQUE INDEX IF NOT EXISTS idx_md5 ON news (md5)')
        conn.commit()
    finally:
        # close the connection even when a statement fails
        conn.close()

In [None]:
def update_db_scraps (site):
    """Insert the scraped results of ``site`` into the ``news`` table.

    Results whose ``link_hash`` is already stored are skipped and counted
    as duplicates. Results without a ``link_hash`` are always inserted
    (with a NULL md5). Missing ``title`` / ``time`` fields are stored as
    NULL; the URL falls back to the raw ``link`` when ``link_full`` is
    absent.

    Args:
        site: Site configuration dict. Reads ``results`` (list of result
            dicts, default empty), ``sqlite`` (database path) and
            ``name`` (stored in the ``tag`` column, default ``unknown``).
    """
    import sqlite3

    create_db_scraps(site)

    results = site.get('results', [])
    db_path = site.get('sqlite', '../my-data/db-scraps.sqlite')
    tag = site.get('name', 'unknown')

    nb_dup = 0
    nb_new = 0
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        for result in results:
            # skip results whose md5 hash is already in the database
            md5 = result.get('link_hash')
            if md5:
                c.execute('SELECT * FROM news WHERE md5=?', (md5,))
                row = c.fetchone()
                if row:
                    print(f"Already in database: {row}")
                    nb_dup += 1
                    continue

            # the scraper sets these keys only conditionally, so read them
            # defensively instead of indexing
            url = result.get('link_full') or result.get('link')
            c.execute(
                'INSERT INTO news (md5, tag, title, url, date) VALUES (?, ?, ?, ?, ?)',
                (md5, tag, result.get('title'), url, result.get('time')),
            )
            nb_new += 1

        # commit only after the whole batch went through
        conn.commit()
    finally:
        # close the connection even when an insert fails
        conn.close()

    print(f"Inserted {nb_new} new results in the database")
    print(f"Skipped {nb_dup} duplicates")


In [None]:
async def scrap_site (site):
    """Scrape every configured page of a site, then persist the results.

    Pages ``page_start`` .. ``page_end`` (both inclusive, both defaulting
    to 1) are scraped one after another; the page number is appended to
    ``site['url']`` to form each page URL. Afterwards the whole ``site``
    dict is dumped to a timestamped JSON file in the output folder and
    the results are written to the database.

    Args:
        site: Site configuration dict; nothing happens without a ``url``.
    """
    base_url = site.get('url')
    if not base_url:
        # nothing to scrape without a url
        return

    first_page = site.get('page_start', 1)
    last_page = site.get('page_end', 1)

    for page_index in range(first_page, last_page + 1):
        print(f"Page {page_index}")
        page_url = f"{base_url}{page_index}"
        await scrap_page(page_url, page_index, site)
        # maybe stop if max results is reached

    # save results as {output}/results-YYYYMMDD-HHMMSS.json
    import datetime
    stamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
    out_dir = site.get('output', '.')
    path_results = f"{out_dir}/results-{stamp}.json"

    import json
    with open(path_results, 'w') as outfile:
        json.dump(site, outfile, indent=4)

    update_db_scraps (site)



In [None]:
# load json config my-config.json
config = {}
import json
with open('my-config.json') as json_file:
    config = json.load(json_file)
    # print(config)
    scraps = config['scraps'] if 'scraps' in config else []
    # print(scraps)
    # select random scrap
    import random
    site = random.choice(scraps)
    print(site)
    await scrap_site(site)