# 1. Upload, parse, and store the SQLite File from the Android Application.

Retrieve the SQLite database from your Android application. It is found at:
`/data/data/app.omnivore.omnivore.debug/databases/omnivore-database`

You may be able to run `adb root; adb pull /data/data/app.omnivore.omnivore.debug/databases/omnivore-database ./omnivore-database` for this.

Place this file at the root of this directory as `omnivore-database`.

In [None]:
import sqlite3
# Open the database file pulled from the device; it is looked up in the
# current working directory under the name "omnivore-database".
con = sqlite3.connect("omnivore-database")

In [None]:
import os

# GraphQL endpoint of the Omnivore API (for instance
# http://localhost:4000/graphql). Read from the environment when set and
# non-empty; otherwise the user is prompted for it.
OMNIVORE_API_URL = os.environ.get('OMNIVORE_API_URL') or input(
    'Enter your omnivore API URL (Should include /api/graphql)'
)

# API key, formatted like "00000000-0000-0000-0000-000000000000". Same
# environment-first, prompt-second lookup as the URL.
OMNIVORE_API_KEY = os.environ.get('OMNIVORE_API_KEY') or input(
    'Enter your omnivore API key (should have a format similar to 00000000-0000-0000-0000-000000000000)'
)

# Schema file pinned to a specific commit of the Omnivore repository.
SCHEMA_URL = "https://raw.githubusercontent.com/omnivore-app/omnivore/c9fcbe72ddc6f40dd06e7073b8ffe3c1e71bd650/packages/api/src/generated/schema.graphql"

# Number of seconds.
REQUESTS_SLEEP_TIME = 60

In [None]:
# Read every saved item together with a comma-separated list of its label
# names (NULL when the item has no labels), newest first.
# The separator is written as a single-quoted SQL string literal: SQLite only
# accepts double-quoted literals as a legacy quirk that can be disabled.
cur = con.cursor()
query = cur.execute('''SELECT si.pageUrlString, si.isArchived, si.savedAt, si.title, (SELECT  group_concat(sil.name, ',') from SavedItemAndSavedItemLabelCrossRef silcr
	INNER JOIN  SavedItemLabel sil on sil.savedItemLabelId = silcr.savedItemLabelId where si.savedItemId = silcr.savedItemId)
FROM SavedItem si
GROUP by si.savedItemId
ORDER by si.savedAt DESC;
''')

items = query.fetchall()

display(items)

Extract the articles and tags from the rows returned by the query above.

In [None]:
import datetime 

def process_list(item_tuples):
    """Convert raw query rows into article dictionaries.

    Args:
        item_tuples: Iterable of rows read positionally as
            ``(url, is_archived, saved_at, title, labels)``. ``saved_at`` is
            an ISO-8601 string and ``labels`` is either a comma-separated
            string of label names or ``None``.

    Returns:
        A list with one dict per row, holding the keys ``read``,
        ``time_added`` (a ``datetime``), ``href``, ``tags`` (list of label
        names, empty when ``labels`` is ``None``) and ``title``.
    """
    def parse_timestamp(raw):
        # ``datetime.fromisoformat`` only understands a trailing "Z" from
        # Python 3.11 on; spell the UTC offset out so older versions cope too.
        if isinstance(raw, str) and raw.endswith('Z'):
            raw = raw[:-1] + '+00:00'
        return datetime.datetime.fromisoformat(raw)

    return [
        {
            'read': is_archived,
            'time_added': parse_timestamp(saved_at),
            'href': url,
            'tags': label_csv.split(',') if label_csv is not None else [],
            'title': title,
        }
        for url, is_archived, saved_at, title, label_csv, *_ in item_tuples
    ]

# Normalise the raw rows, then gather the distinct label names used by any
# article.
articles = process_list(item_tuples=items)
labels = {tag for article in articles for tag in article['tags']}

# 2. Store the articles and tags in a SQLite Database

We want to be able to track our progress, as the Omnivore API is rate limited and uploading the articles takes a while. For this we will use a SQLite database.

We will store the articles in the format:
- *read*: Boolean on whether the article has been read
- *time_added*: The time the item was added
- *tags*: An array of strings
- *href*: The URL
- *id*: nullable field that stores the Omnivore id of the article once it has been saved.

and the tags with:
- *name*: Name of the tag
- *id*: nullable field that stores the Omnivore id of the tag once it has been stored.

In [None]:
import sqlite3

# Local database file used to keep track of what has been imported so far.
conn = sqlite3.connect('omnivore.db')

# Create a cursor object to execute SQL commands
cursor = conn.cursor()

# Create the articles table (only if it is not there yet). The URL is the
# primary key; `id` is left empty on insert and checked later to tell saved
# articles from unsaved ones.
cursor.execute('''CREATE TABLE IF NOT EXISTS articles (
                    id TEXT nullable,
                    read BOOLEAN,
                    time_added TEXT,
                    tags TEXT, 
                    href TEXT PRIMARY KEY 
                )''')

# Create the tags table (only if it is not there yet). The tag name is the
# primary key; `id` is filled in by a later UPDATE once the tag has an id.
cursor.execute('''CREATE TABLE IF NOT EXISTS tags (
                    id TEXT nullable,
                    name TEXT PRIMARY KEY
                )''')


In [None]:
import json

# Seed the tracking tables. INSERT OR IGNORE leaves rows that already exist
# (and any ids already stored on them) untouched.
insert_tag_sql = "INSERT OR IGNORE into tags (name) values (?)"
cursor.executemany(insert_tag_sql, [(label,) for label in labels])

# Tags are stored as a JSON array of names; timestamps as ISO-8601 text.
insert_article_sql = "INSERT OR IGNORE into articles (read, time_added, href, tags) values (?,?,?,?)"
article_values = [
    (article['read'], article['time_added'].isoformat(), article['href'], json.dumps(article['tags']))
    for article in articles
]
cursor.executemany(insert_article_sql, article_values)

# sqlite3 opens a transaction implicitly for INSERT statements; without an
# explicit commit the rows are discarded when the connection is closed.
cursor.connection.commit()

In [None]:
import requests

# Download the pinned GraphQL schema text; it is handed to the GraphQL client
# by create_client(). The timeout keeps the notebook from hanging forever on a
# stalled connection.
with requests.get(SCHEMA_URL, timeout=30) as r:
    r.raise_for_status()
    schema = r.text

# `r.text` is never None, so guard against an empty body instead.
if not schema:
    raise ValueError(f"Downloaded schema from {SCHEMA_URL} is empty")

print(schema[:100])

In [None]:
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport

def create_client():
    """Build a gql ``Client`` for the configured Omnivore endpoint.

    The API key is sent in the ``authorization`` header. The previously
    downloaded ``schema`` is passed in directly and fetching the schema from
    the transport is switched off. No execute timeout is applied.
    """
    auth_headers = {'authorization': OMNIVORE_API_KEY}
    http_transport = AIOHTTPTransport(url=OMNIVORE_API_URL, headers=auth_headers)
    return Client(
        transport=http_transport,
        schema=schema,
        fetch_schema_from_transport=False,
        execute_timeout=None,
    )

In [None]:
# Doing a "test query" to check if everything is correct

async with create_client() as session: 
    r = session.execute(gql("""
    query Viewer {
        me {
            id
            name
            profile {
                username
            }
        }
    }
    """))

    result = await r
    USERNAME = result['me']['profile']['username']

    print(f"Hello {result['me']['name']} ({USERNAME})!")

In [None]:
def row_to_tag(row):
    """Turn a ``tags`` row, read positionally as ``(id, name)``, into a dict."""
    tag_id = row[0]
    tag_name = row[1]
    return {"name": tag_name, "id": tag_id}

async def getExistingTags():
    """Fetch the labels that already exist on the Omnivore account.

    Returns:
        The ``labels`` list from the ``LabelsSuccess`` branch of the response;
        each entry carries the ``name`` and ``id`` fields requested below.
    """
    labels_query = gql("""
    query Labels {
        labels {
              ...on LabelsSuccess { 
                  labels { name, id }
              }
          }
    }
    """)

    async with create_client() as session:
        response = await session.execute(labels_query)
        return response['labels']['labels']

# Create a single label on the server and hand back its id.
async def saveTags(tagName):
    """Create a label named ``tagName`` on Omnivore and return its id.

    The name is sent as a GraphQL variable instead of being spliced into the
    query text, so names containing quotes or backslashes are transmitted
    intact and cannot alter the mutation.

    Args:
        tagName: Name of the label to create. The colour is fixed to "#000".

    Returns:
        The ``id`` of the newly created label.

    Raises:
        RuntimeError: if the response carries no ``label`` (the
            ``CreateLabelError`` branch); the reported error codes are
            included in the message.
    """
    mutation = gql("""
      mutation CreateLabel($name: String!, $color: String) {
        createLabel(input: {color: $color, name: $name}) {
          ... on CreateLabelSuccess {
            label {
              id
              name
              color
              description
              createdAt
            }
          }
          ... on CreateLabelError {
            errorCodes
          }
        }
      }
    """)

    async with create_client() as client:
        r = await client.execute(
            mutation,
            variable_values={'name': tagName, 'color': '#000'},
        )

    print(r)
    outcome = r['createLabel']
    if 'label' not in outcome:
        raise RuntimeError(
            f"Could not create label {tagName!r}: {outcome.get('errorCodes')}"
        )
    return outcome['label']['id']

server_tags = await getExistingTags()
all_tags = [row_to_tag(row) for row in cursor.execute('select * from tags').fetchall()]
unsaved_tags = list(filter(lambda row: row['id'] is None, all_tags))
presaved_tags = list(filter(lambda row: row['id'] is not None, all_tags))

tagIds = {f"{dictionary['name']}": dictionary["id"] for dictionary in presaved_tags + server_tags}
for tagValue in unsaved_tags: 
    if tagValue["name"] not in tagIds:
      try:
          tag = tagValue['name']
          id = await saveTags(tag)
          tagIds[tag] = id
      except Exception as e:
          print("An error occurred:", e)


query = "UPDATE tags SET id = ? where name = ?"
cursor.executemany(query, [(value, key) for key, value in tagIds.items()])

tagIds

In [None]:
import backoff
import asyncio

# Ask Omnivore to save the page at $url. The success branch returns the saving
# request, whose `id` is what saveArticle() reads from the response.
createArticle = gql("""
  mutation CreateArticleSavingRequest($url: String!) {
    createArticleSavingRequest(input: {url: $url}) {
      ... on CreateArticleSavingRequestSuccess {
        articleSavingRequest {
          id
          status
          slug
          createdAt
          updatedAt
          url
          errorCode
        }
      }
      ... on CreateArticleSavingRequestError {
        errorCodes
      }
    }
  }
""")

# Set the labels ($labelIds) on the page identified by $articleId. Only the
# success branch is selected.
setLabels = gql("""
mutation SetLabel($articleId: ID!, $labelIds: [ID!]!) { 
    setLabels(input: {pageId: $articleId, labelIds: $labelIds}) {
        ...on SetLabelsSuccess { 
            labels { 
                id
            }
        }
    }
}
""")

# Set both savedAt and publishedAt of a page to the same $date.
updatePageSavedDate =  gql("""
mutation UpdatePageDate($id: ID!, $date: Date!) {
    updatePage(input: {pageId: $id, savedAt: $date, publishedAt: $date}) {
        ... on UpdatePageSuccess {
            updatedPage {
                id
                savedAt
                publishedAt
                title
            }
        }
        ...on UpdatePageError {
            errorCodes
        }
    }
}
""")

# Mark the link $id as archived (`archived: true` is hard-coded).
archivePage = gql("""
mutation ArchivePage($id: ID!) {
    setLinkArchived (input: {linkId: $id, archived: true}) {
        ... on ArchiveLinkSuccess {
            linkId
            message
        }
        ... on ArchiveLinkError {
            message
            errorCodes
        }
    }
}
""")

# Create a label from the name, colour and description variables.
createTag = gql("""
mutation CreateLabel($nam: String!, $col: String, $desc: String) {
  createLabel(input: {name: $nam, color: $col, description: $desc}) {
    ... on CreateLabelSuccess {
      label {
        id
        name
        color
        description
        createdAt
      }
    }
    ... on CreateLabelError {
      errorCodes
    }
  }
}
""")
                
@backoff.on_predicate(
    backoff.runtime,
    predicate=lambda r: isinstance(r, AIOHTTPTransport),
    value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
    jitter=None,
)
async def archiveArticle(articleId):
    """Archive the article with id ``articleId`` on Omnivore.

    Returns:
        ``None`` on success, and also after any error that is not a rate
        limit (such errors are printed). When the error carries ``code ==
        429`` the transport of this request is returned instead; the
        ``backoff`` decorator recognises it and retries after the delay taken
        from its ``RateLimit-Reset`` response header.
    """
    async with create_client() as client:
        try:
            await client.execute(archivePage, variable_values={'id': articleId})
            return None
        except Exception as e:
            if hasattr(e, 'code') and e.code == 429:
                # Use the transport of *this* client: its response headers
                # belong to the request that was just rate limited.
                return client.transport
            print(e)

@backoff.on_predicate(
    backoff.runtime,
    predicate=lambda r: isinstance(r, AIOHTTPTransport),
    value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
    jitter=None,
)
async def saveLabels(articleId, labels):
    """Set the labels with ids ``labels`` on the article ``articleId``.

    Returns:
        The response of the ``setLabels`` mutation on success, or ``None``
        after an error that is not a rate limit (such errors are printed).
        When the error carries ``code == 429`` the transport of this request
        is returned instead; the ``backoff`` decorator recognises it and
        retries after the delay taken from its ``RateLimit-Reset`` response
        header.
    """
    async with create_client() as client:
        try:
            return await client.execute(
                setLabels,
                variable_values={'articleId': articleId, 'labelIds': labels},
            )
        except Exception as e:
            if hasattr(e, 'code') and e.code == 429:
                # Use the transport of *this* client: its response headers
                # belong to the request that was just rate limited.
                return client.transport
            print(e)

@backoff.on_predicate(
    backoff.runtime,
    predicate=lambda r: isinstance(r, AIOHTTPTransport),
    value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
    jitter=None,
)
async def saveArticle(article):
    """Save one article on Omnivore and attach its labels.

    Args:
        article: Dict with at least ``href`` (the URL to save) and ``tags``
            (list of label ids to set on the saved page).

    Returns:
        A copy of ``article`` with ``id`` set to the id of the saving request,
        or ``None`` after an error that is not a rate limit (such errors are
        printed). When the error carries ``code == 429`` the transport of this
        request is returned instead; the ``backoff`` decorator recognises it
        and retries after the delay taken from its ``RateLimit-Reset``
        response header.
    """
    async with create_client() as client:
        try:
            url = article['href']
            tags = article['tags']
            # First createArticleSavingRequest
            r = await client.execute(createArticle, variable_values={'url': url})
            rid = r['createArticleSavingRequest']['articleSavingRequest']['id']

            if len(tags) != 0:
                await saveLabels(rid, tags)

            # Return the article with the id of the saved document
            return {**article, 'id': rid}
        except Exception as e:
            if hasattr(e, 'code') and e.code == 429:
                # Use the transport of *this* client: its response headers
                # belong to the request that was just rate limited.
                return client.transport
            # Cause of the remaining failures is not understood yet; report
            # them and give up on this article.
            print(e)

@backoff.on_predicate(
    backoff.runtime,
    predicate=lambda r: isinstance(r, AIOHTTPTransport),
    value=lambda r: int(r.response_headers["RateLimit-Reset"]) + 1,
    jitter=None,
)
async def updateArticleTimeAfterProcessing(articleId, date = None):
    """Set the saved and published date of an article to ``date``.

    Args:
        articleId: Id of the page to update.
        date: Value sent as both ``savedAt`` and ``publishedAt``. When it is
            ``None`` no request is made.

    Returns:
        The response of the ``updatePage`` mutation on success; ``None`` when
        ``date`` is ``None`` or after an error that is not a rate limit (such
        errors are printed). When the error carries ``code == 429`` the
        transport of this request is returned instead; the ``backoff``
        decorator recognises it and retries after the delay taken from its
        ``RateLimit-Reset`` response header.
    """
    async with create_client() as client:
        try:
            if date is not None:
                return await client.execute(updatePageSavedDate, variable_values={
                    'id': articleId,
                    'date': date,
                })

        except Exception as e:
            if hasattr(e, 'code') and e.code == 429:
                # Use the transport of *this* client: its response headers
                # belong to the request that was just rate limited.
                return client.transport

            # Cause of the remaining failures is not understood yet; report
            # them and move on.
            print(e)

         

In [None]:
import asyncio 

# Number of articles per batch; acts as a stop point between batches.
# NOTE(review): the previous comment spoke of saving "after 100" while the
# value is 10 - confirm which one is intended.
BATCH_UPDATE_SIZE = 10
PARALLEL_API_CALL_SIZE = 1

def row_to_article(row):
    """Convert an ``articles`` row into an article dict.

    The row is read positionally as ``(id, read, time_added, tags, href)``.
    ``tags`` holds a JSON array of tag names; each non-empty name is replaced
    by its id from the global ``tagIds`` lookup.
    """
    article_id = row[0]
    was_read = bool(row[1])
    added_at = row[2]
    label_ids = [tagIds[name] for name in json.loads(row[3]) if name != '']
    url = row[4]
    return {
        "id": article_id,
        "read": was_read,
        "time_added": added_at,
        "tags": label_ids,
        "href": url,
    }

# Load every tracked article and split the list by whether an Omnivore id has
# already been recorded for it.
all_articles = [row_to_article(row) for row in cursor.execute('select * from articles').fetchall()]
unsaved_articles = [entry for entry in all_articles if entry['id'] is None]
saved_articles = [entry for entry in all_articles if entry['id'] is not None]

saved_articles

It is likely that your articles will take a long time to process. Because of this we first wait for a while, and then go through all the saved articles to update their saved dates and archive them where appropriate.

In [None]:
import time

# 30 min, might be longer, or shorter. Not 100% sure. Comment this bit out to execute immediately.
time.sleep(1800)

all_articles = [row_to_article(row) for row in cursor.execute('select * from articles').fetchall()]
saved_articles = list(filter(lambda article: article['id'] is not None, all_articles)) 

articleBatches = [saved_articles[i:i+BATCH_UPDATE_SIZE] for i in range(0, len(saved_articles), BATCH_UPDATE_SIZE)]
for archive_batch in articleBatches: 
   archive_futures = [] 
   for article in archive_batch:
      archive_futures.append(asyncio.ensure_future(updateArticleTimeAfterProcessing(article['id'], article['time_added'])))
      if (article['read'] == True): 
         archive_futures.append(asyncio.ensure_future(archiveArticle(article['id'])))
            
   results = await asyncio.gather(*archive_futures);