[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/daviddavo/pocket2omnivore/HEAD?labpath=pocket2omnivore.ipynb)

> This notebook is part of the [How to export your Pocket data and migrate to Omnivore](https://blog.ddavo.me/posts/tutorials/pocket-to-omnivore) tutorial

# 1. Upload, parse, and store the Pocket File. 

First, let's upload the `ril_export.html` file generated at https://getpocket.com/export

The HTML has the following structure:

- `<h1>` Unread
- `<ul>` with list items of `<a>`. The href is the link to the article, and the anchor text is the title. Each `<a>` also has `tags` and `time_added` attributes.
- `<h1>` Read
- Another `<ul>` like the one above

We will transform this into a list of dicts, one per article, each with: 
- *read*: Boolean indicating whether the article has been read
- *time_added*: The time the item was added
- *tags*: An array of strings
- *href*: The url
- *title*: The title of the article

In [None]:
from ipywidgets import FileUpload
from IPython.display import display
from datetime import datetime
from pathlib import Path

# File picker widget restricted to a single .html file (the Pocket export).
upload = FileUpload(accept='.html', multiple=False)

def save_file(change=None):
    """Write every uploaded file into the current working directory.

    This function is registered as an observer of ``upload``, and observers
    are called with a ``change`` mapping whose ``'new'`` entry is the new
    widget value. It may also be called by hand without arguments, in which
    case the current ``upload.value`` is used.

    Args:
        change: Change notification supplied by ``observe``; ``None`` when
            the function is invoked manually.
    """
    uploaded = upload.value if change is None else change['new']
    for v in uploaded:
        with open(v['name'], 'wb') as f:
            # bytes() accepts any bytes-like content (e.g. a memoryview).
            f.write(bytes(v['content']))

# Run save_file whenever the widget's `value` trait changes.
upload.observe(save_file, names='value')

# Render the upload widget in the notebook output.
display(upload)

In [None]:
# Stop here unless the export file is present in the working directory.
assert Path("ril_export.html").exists(), "Upload the file before continue running"

In [None]:
from bs4 import BeautifulSoup
import os

# Remote endpoints used by the notebook.
OMNIVORE_API_URL = "https://api-prod.omnivore.app/api/graphql"
SCHEMA_URL = "https://raw.githubusercontent.com/omnivore-app/omnivore/c9fcbe72ddc6f40dd06e7073b8ffe3c1e71bd650/packages/api/src/generated/schema.graphql"

REQUESTS_SLEEP_TIME = 60  # Number of seconds

# The API key will have the following format "00000000-0000-0000-0000-000000000000".
# It is read from the environment first; if it is missing or empty the user is prompted.
OMNIVORE_API_KEY = (
    os.environ.get('OMNIVORE_API_KEY')
    or input('Enter your omnivore API key (should have a format similar to 00000000-0000-0000-0000-000000000000)')
)

In [None]:
# Open the export with an explicit encoding so parsing does not depend on the
# platform's default text encoding.
# NOTE(review): the export is assumed to be UTF-8 - confirm against a real file.
with open('ril_export.html', 'r', encoding='utf-8') as f:
    soup = BeautifulSoup(f, 'html.parser')

# Display the document title as a quick sanity check of the parse.
soup.title

Extract the articles and tags from the HTML doc.

In [None]:
def process_list(h1):
    """Collect the articles listed under one section heading of the export.

    Args:
        h1: The ``<h1>`` tag introducing a section; its text is the section
            name.

    Returns:
        A list of dicts with the keys ``read``, ``time_added``, ``href``,
        ``tags`` and ``title``, one per ``<a href=...>`` inside the ``<ul>``
        that follows the heading. The list is empty when no such ``<ul>``
        exists. ``tags`` is the comma-split ``tags`` attribute, so an article
        without tags yields ``['']``.
    """
    # Every section other than "Unread" is treated as already read.
    read = h1.text != 'Unread'

    ul = h1.find_next_sibling('ul')
    if ul is None:
        print(0, h1.text, 'articles')
        return []

    items = [
        {
            'read': read,
            'time_added': datetime.fromtimestamp(int(a['time_added'])),
            'href': a['href'],
            'tags': a['tags'].split(','),
            'title': a.text,
        }
        for a in ul.find_all('a', href=True)
    ]

    # Report the number of parsed links; len(ul) would count every child node
    # of the list (including the whitespace between items), not the articles.
    print(len(items), h1.text, 'articles')
    return items

# Flatten the per-section article lists into a single list.
articles = [item for h1 in soup.findAll('h1') for item in process_list(h1)]

# Distinct tag names, skipping articles whose first tag is the empty string
# (i.e. articles exported without tags).
labels = {
    tag
    for article in articles
    if article['tags'][0] != ''
    for tag in article['tags']
}

# 2. Store the articles and tags in a SQLite Database

We want to be able to track our progress, as the Omnivore API is rate limited and uploading the articles takes a while. For this we will use a SQLite database.

We will store the Articles in the format: 
- *read*: Boolean indicating whether the article has been read
- *time_added*: The time the item was added
- *tags*: An array of strings
- *href*: The url
- *id*: nullable field that stores the ID Omnivore returned for the article once it has been uploaded.

and the tags with:
- *name*: Name of the tag 
- *id*: nullable field that stores the Omnivore label ID once the tag has been created.

In [None]:
import sqlite3

# Local database used to remember what has already been uploaded.
conn = sqlite3.connect('omnivore.db')
cursor = conn.cursor()

# One row per article, keyed by URL; `id` stays NULL until the article is uploaded.
ARTICLES_DDL = '''CREATE TABLE IF NOT EXISTS articles (
                    id TEXT nullable,
                    read BOOLEAN,
                    time_added TEXT,
                    tags TEXT, 
                    href TEXT PRIMARY KEY 
                )'''

# One row per tag, keyed by name; `id` stays NULL until the label is created.
TAGS_DDL = '''CREATE TABLE IF NOT EXISTS tags (
                    id TEXT nullable,
                    name TEXT PRIMARY KEY
                )'''

cursor.execute(ARTICLES_DDL)
cursor.execute(TAGS_DDL)

In [None]:
import json

# Register every tag name; names already present are left untouched so their
# stored ids survive a re-run.
insert_tag_sql = "INSERT OR IGNORE into tags (name) values (?)"
cursor.executemany(insert_tag_sql, [(label,) for label in labels])

# Register every article keyed by URL. The timestamp is stored as ISO-8601 text
# and the tag list as a JSON array; the title is not stored.
insert_article_sql = "INSERT OR IGNORE into articles (read, time_added, href, tags) values (?,?,?,?)"
article_values = [
    (article['read'], article['time_added'].isoformat(), article['href'], json.dumps(article['tags']))
    for article in articles
]
cursor.executemany(insert_article_sql, article_values)

# INSERTs run inside an implicitly opened transaction; without a commit the
# rows are lost when the connection is closed or the kernel restarts.
conn.commit()


In [None]:
import requests

# Download the GraphQL schema text. The timeout keeps the cell from hanging
# indefinitely if the host does not answer.
with requests.get(SCHEMA_URL, timeout=30) as r:
    r.raise_for_status()
    schema = r.text

# Show the beginning of the schema as a quick sanity check.
print(schema[:100])

In [None]:
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

def create_client():
    """Build a gql ``Client`` that talks to ``OMNIVORE_API_URL``.

    The API key is sent in the ``authorization`` header. The client is given
    the previously downloaded ``schema`` and is told not to fetch it from the
    transport; ``execute_timeout`` is set to ``None``.
    """
    auth_headers = {'authorization': OMNIVORE_API_KEY}
    http_transport = AIOHTTPTransport(url=OMNIVORE_API_URL, headers=auth_headers)
    return Client(
        transport=http_transport,
        schema=schema,
        fetch_schema_from_transport=False,
        execute_timeout=None,
    )

In [None]:
# Doing a "test query" to check if everything is correct

async with create_client() as session: 
    r = session.execute(gql("""
    query Viewer {
        me {
            id
            name
            profile {
                username
            }
        }
    }
    """))

    result = await r
    USERNAME = result['me']['profile']['username']

    print(f"Hello {result['me']['name']} ({USERNAME})!")

In [None]:
def row_to_tag(row):
    """Turn a ``tags`` table row, read as ``(id, name)``, into a dict."""
    tag_id, tag_name = row[0], row[1]
    return {"name": tag_name, "id": tag_id}

# Load every stored tag, then split them by whether an id has been recorded yet.
all_tags = list(map(row_to_tag, cursor.execute('select * from tags').fetchall()))
unsaved_tags = [tag for tag in all_tags if tag['id'] is None]
presaved_tags = [tag for tag in all_tags if tag['id'] is not None]

#Then remove all the tags from the ones we created before
async def saveTags(tagName):
    """Create an Omnivore label named ``tagName`` and return its id.

    The name is sent as a GraphQL variable instead of being pasted into the
    query text, so names containing quotes or backslashes cannot break or
    alter the mutation.

    Args:
        tagName: Name of the label to create.

    Returns:
        The ``id`` of the created label.

    Raises:
        RuntimeError: If the API answers with ``CreateLabelError`` instead of
            a created label.
    """
    mutation = gql("""
      mutation CreateLabel($name: String!) {
        createLabel(input: {color: "#000", name: $name}) {
          ... on CreateLabelSuccess {
            label {
              id
              name
              color
              description
              createdAt
            }
          }
          ... on CreateLabelError {
            errorCodes
          }
        }
      }
      """)
    async with create_client() as client:
        r = await client.execute(mutation, variable_values={'name': tagName})
        print(r)
        outcome = r['createLabel']
        if 'label' not in outcome:
            # The error branch of the union carries only `errorCodes`.
            raise RuntimeError(
                f"Could not create label {tagName!r}: {outcome.get('errorCodes')}"
            )
        return outcome['label']['id']

tagIds = {}
for tagValue in unsaved_tags: 
    try:
        tag = tagValue['name']
        id = await saveTags(tag)
        tagIds[tag] = id
    except Exception as e:
        print("An error occurred:", e)


query = "UPDATE tags SET id = ? where name = ?"
cursor.executemany(query, [(value, key) for key, value in tagIds.items()])

# Add all the ones already saved.
tagIds.update({f"{dictionary['name']}": dictionary["id"] for dictionary in presaved_tags})
tagIds

In [None]:
import backoff
import asyncio

# Mutation that requests saving the page at $url. On success the selection
# returns the saving request's id, status, slug, timestamps, url and errorCode;
# on failure it returns errorCodes.
createArticle = gql("""
  mutation CreateArticleSavingRequest($url: String!) {
    createArticleSavingRequest(input: {url: $url}) {
      ... on CreateArticleSavingRequestSuccess {
        articleSavingRequest {
          id
          status
          slug
          createdAt
          updatedAt
          url
          errorCode
        }
      }
      ... on CreateArticleSavingRequestError {
        errorCodes
      }
    }
  }
""")

# Mutation that calls setLabels for page $articleId with the label ids in
# $labelIds. Only the success branch is selected (the ids of the labels).
setLabels = gql("""
mutation SetLabel($articleId: ID!, $labelIds: [ID!]!) { 
    setLabels(input: {pageId: $articleId, labelIds: $labelIds}) {
        ...on SetLabelsSuccess { 
            labels { 
                id
            }
        }
    }
}
""")

# Mutation that sets both savedAt and publishedAt of page $id to $date.
updatePageSavedDate =  gql("""
mutation UpdatePageDate($id: ID!, $date: Date!) {
    updatePage(input: {pageId: $id, savedAt: $date, publishedAt: $date}) {
        ... on UpdatePageSuccess {
            updatedPage {
                id
                savedAt
                publishedAt
                title
            }
        }
        ...on UpdatePageError {
            errorCodes
        }
    }
}
""")

# Mutation that marks link $id as archived (archived: true).
archivePage = gql("""
mutation ArchivePage($id: ID!) {
    setLinkArchived (input: {linkId: $id, archived: true}) {
        ... on ArchiveLinkSuccess {
            linkId
            message
        }
        ... on ArchiveLinkError {
            message
            errorCodes
        }
    }
}
""")

# Mutation that creates a label from a required name ($nam) and an optional
# color ($col) and description ($desc).
createTag = gql("""
mutation CreateLabel($nam: String!, $col: String, $desc: String) {
  createLabel(input: {name: $nam, color: $col, description: $desc}) {
    ... on CreateLabelSuccess {
      label {
        id
        name
        color
        description
        createdAt
      }
    }
    ... on CreateLabelError {
      errorCodes
    }
  }
}
""")
                
@backoff.on_predicate(
    backoff.runtime,
    predicate=lambda r: isinstance(r, AIOHTTPTransport),
    value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
    jitter=None,
)
async def archiveArticle(articleId):
    """Archive the Omnivore link ``articleId``.

    Args:
        articleId: Id of the link to archive.

    Returns:
        The mutation result, or ``None`` when the request failed for a reason
        other than rate limiting (the error is printed). When the error has
        ``code == 429`` the transport is returned instead; the decorator's
        predicate recognises it and derives the retry delay from the
        transport's ``RateLimit-Reset`` response header.
    """
    async with create_client() as client:
        try:
            return await client.execute(archivePage, variable_values={'id': articleId})
        except Exception as e:
            if getattr(e, 'code', None) == 429:
                # Return the transport of *this* request so the delay is read
                # from the headers of the response that was rate limited.
                return client.transport
            print(e)

@backoff.on_predicate(
    backoff.runtime,
    predicate=lambda r: isinstance(r, AIOHTTPTransport),
    value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
    jitter=None,
)
async def saveLabels(articleId, labels):
    """Attach the labels with ids ``labels`` to the page ``articleId``.

    Args:
        articleId: Id of the page to label.
        labels: List of label ids.

    Returns:
        The mutation result, or ``None`` when the request failed for a reason
        other than rate limiting (the error is printed). When the error has
        ``code == 429`` the transport is returned instead so the decorator
        can schedule a retry.
    """
    async with create_client() as client:
        try:
            # The call must be awaited: an un-awaited coroutine never runs,
            # so no request would be sent at all.
            return await client.execute(
                setLabels,
                variable_values={'articleId': articleId, 'labelIds': labels},
            )
        except Exception as e:
            if getattr(e, 'code', None) == 429:
                # Return the transport of *this* request so the delay is read
                # from the headers of the response that was rate limited.
                return client.transport
            print(e)

@backoff.on_predicate(
    backoff.runtime,
    predicate=lambda r: isinstance(r, AIOHTTPTransport),
    value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
    jitter=None,
)
async def saveArticle(article):
    """Save one article to Omnivore and attach its labels.

    Args:
        article: Article dict with at least ``href`` (the URL) and ``tags``
            (a list of tag names).

    Returns:
        A copy of ``article`` whose ``id`` is the id of the created saving
        request, or ``None`` when the request failed for a reason other than
        rate limiting (the error is printed). When the error has
        ``code == 429`` the transport is returned instead so the decorator
        can schedule a retry.
    """
    async with create_client() as client:
        try:
            url = article['href']
            # First createArticleSavingRequest
            r = await client.execute(createArticle, variable_values={'url': url})
            print(r)
            rid = r['createArticleSavingRequest']['articleSavingRequest']['id']

            # The setLabels mutation takes label ids, while the article
            # carries tag names: translate them through the name -> id map.
            # Names without a known id (including the empty name of an
            # untagged article) are skipped.
            label_ids = [tagIds[name] for name in article['tags'] if name in tagIds]
            if label_ids:
                await saveLabels(rid, label_ids)

            # Return the article with the id of the saved document
            return {**article, 'id': rid}
        except Exception as e:
            if getattr(e, 'code', None) == 429:
                # Return the transport of *this* request so the delay is read
                # from the headers of the response that was rate limited.
                return client.transport
            # Best effort: report the failure and let the caller carry on.
            print(e)
@backoff.on_predicate(
    backoff.runtime,
    predicate=lambda r: isinstance(r, AIOHTTPTransport),
    value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
    jitter=None,
)
async def updateArticleTimeAfterProcessing(articleId, date = None):
    """Set the saved and published dates of page ``articleId`` to ``date``.

    Args:
        articleId: Id of the page to update.
        date: Value sent for both ``savedAt`` and ``publishedAt``. When it is
            ``None`` nothing is sent.

    Returns:
        The mutation result; ``None`` when ``date`` is ``None`` or when the
        request failed for a reason other than rate limiting (the error is
        printed). When the error has ``code == 429`` the transport is
        returned instead so the decorator can schedule a retry.
    """
    if date is None:
        # Nothing to update, so do not open a connection at all.
        return None

    async with create_client() as client:
        try:
            return await client.execute(
                updatePageSavedDate,
                variable_values={'id': articleId, 'date': date},
            )
        except Exception as e:
            if getattr(e, 'code', None) == 429:
                # Return the transport of *this* request so the delay is read
                # from the headers of the response that was rate limited.
                return client.transport
            # Best effort: report the failure and let the caller carry on.
            print(e)

         

In [None]:
import asyncio 

# Bad name, we save to the DB after 100 so that we have a stop point.
BATCH_UPDATE_SIZE = 100
# How many API calls are started together before waiting for their results.
PARALLEL_API_CALL_SIZE = 10

def row_to_article(row):
    """Turn an ``articles`` table row into an article dict.

    The row is read positionally as ``(id, read, time_added, tags, href)``;
    ``read`` is coerced to ``bool`` and ``tags`` is decoded from JSON.
    """
    return dict(
        id=row[0],
        read=bool(row[1]),
        time_added=row[2],
        tags=json.loads(row[3]),
        href=row[4],
    )

# Load every stored article, then split them by whether an id has been recorded yet.
all_articles = list(map(row_to_article, cursor.execute('select * from articles').fetchall()))
unsaved_articles = [article for article in all_articles if article['id'] is None]
saved_articles = [article for article in all_articles if article['id'] is not None]

# Show what has already been uploaded.
saved_articles

In [None]:
articleBatches = [unsaved_articles[i:i+BATCH_UPDATE_SIZE] for i in range(0, len(unsaved_articles), BATCH_UPDATE_SIZE)]
savedArticles = [] + saved_articles
for batch in articleBatches: 
   parrallel_requests = [batch[i:i+PARALLEL_API_CALL_SIZE] for i in range(0, len(batch), PARALLEL_API_CALL_SIZE)]
   batchSaved = []
   iteration = 0;
   # From the 10, save them. 
   for parallel_call_batch in parrallel_requests: 
      futures = []
      # Concurrently execute everything in the batch 
      for article in parallel_call_batch: 
         futures.append(asyncio.ensure_future(saveArticle(article)))

      print(futures)
      iteration = iteration + 1
      print(iteration)

      completedFutures = [] 
      completedFutures = await asyncio.gather(*futures);
         
      # Then wait for the respsonses before completing the next batch
      for savedArticle in completedFutures:
         batchSaved.append(savedArticle)
         savedArticles.append(savedArticle)

    # Archive if it is needed. 
   archive_futures = []
   batched_saved_articles = [batchSaved[i:i+PARALLEL_API_CALL_SIZE] for i in range(0, len(batchSaved), PARALLEL_API_CALL_SIZE)]
   for archive_batch in batched_saved_articles: 
      for article in archive_batch:
         archive_futures.append(asyncio.ensure_future(updateArticleTimeAfterProcessing(article['id'], article['time_added'])))
         if (article['read'] == True): 
            archive_futures.append(asyncio.ensure_future(archiveArticle(article['id'])))
            
      results = await asyncio.gather(*archive_futures);
      print(len(results))
   # for article in batchSaved: 
   #    archive_futures.append(asyncio.ensure_future(updateArticleTimeAfterProcessing(article['id'], article['time_added'])))
   #    await asyncio.gather(*archive_futures);

   
   query = "UPDATE articles SET id = ? where href = ?"
   cursor.executemany(query, [(saved_article['id'], saved_article['href']) for saved_article in batchSaved])
   