In [1]:
import base64
import os.path
import re
from collections import defaultdict
from pathlib import Path
from urllib.parse import unquote, urlparse

import feedparser
import pypandoc
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

# Request headers carrying a browser-like User-Agent; passed to every
# `requests.get` call made by the helpers below.
ua = UserAgent()
headers = {'User-Agent': ua.Chrome}

# Chrome options handed to `webdriver.Chrome` when the browser is started.
options = Options()
options.headless = True

# Output directory layout. Everything is written underneath `export_path`.
export_path = './export/pandoc'
combined_path = os.path.join(export_path, 'combined')
images_path = os.path.join(combined_path, 'images')
html_path = os.path.join(export_path, 'html')
markdown_path = os.path.join(export_path, 'markdown')
report_path = os.path.join(export_path, 'report')


# Create the output directories up front. `combined_path` is created
# implicitly as the parent of `images_path` (parents=True).
for p in [images_path, html_path, markdown_path, report_path]:
    Path(p).mkdir(parents=True, exist_ok=True)

# Host names and knowledge-base path prefixes used when rewriting links.
uniprotOrgURL = 'www.uniprot.org'
uniprotBetaURL = 'beta.uniprot.org'
uniprotOrgKBPath = '/uniprot'
uniprotBetaKBPath = '/uniprotkb'

# Substring counted in the HTML and in the generated Markdown to detect tables.
table_string = '<table'

In [2]:
def get_joined_categories(entry):
    """Return the ``'term'`` of every tag in ``entry['tags']`` joined by commas."""
    terms = (tag['term'] for tag in entry['tags'])
    return ','.join(terms)


def strip_outer_divs(html):
    """Remove ``<div>`` ... ``</div>`` pairs that wrap the whole fragment.

    ``str.lstrip``/``str.rstrip`` treat their argument as a *set of characters*,
    so using them here also ate leading/trailing content made of the letters
    ``d``, ``i``, ``v`` or of ``<``, ``>``, ``/`` (e.g. ``<div>video</div>``
    became ``eo``). This version removes only literal wrapper tags.

    Args:
        html: HTML fragment as a string.

    Returns:
        The fragment with surrounding whitespace removed and every enclosing
        plain ``<div>`` wrapper peeled off. A leading ``<div>`` that is closed
        before the end of the fragment (sibling divs) is left untouched.
    """
    open_tag, close_tag = '<div>', '</div>'
    text = html.strip()
    while text.startswith(open_tag) and text.endswith(close_tag):
        inner = text[len(open_tag):-len(close_tag)]
        # The first <div> wraps the fragment only if the div tags inside are
        # balanced and never close more than they open.
        depth = 0
        for tag in re.finditer(r'<div\b[^>]*>|</div>', inner):
            depth += -1 if tag.group().startswith('</') else 1
            if depth < 0:
                break
        if depth != 0:
            break
        text = inner.strip()
    return text


def remove_newlines(html):
    """Return ``html`` with every newline character replaced by a single space."""
    lines = html.split('\n')
    return ' '.join(lines)


def replace_see_also_with_ul(parent):
    """Turn a ``<br>``-separated "See also:" element into a bulleted list.

    Nothing happens unless the first child of ``parent`` has the text
    ``'See also:'`` and ``parent`` contains at least one ``<br>``. Otherwise
    every following child that is not a ``<br>`` becomes an ``<li>`` and
    ``parent`` is replaced in its tree by the newly parsed paragraph.
    """
    children = list(parent.children)
    starts_with_label = children[0].text == 'See also:'
    if not (starts_with_label and parent.find_all('br')):
        return

    items = []
    for child in children[1:]:
        if child.name != 'br':
            items.append(f'<li>{child}</li>')
    lis = ''.join(items)

    replacement = BeautifulSoup(f'<p>See also:<ul>{lis}</ul></p>')
    parent.replace_with(replacement)

    
def has_color_style(soup):
    """Return True when any element has a ``style`` attribute containing ``'color:'``."""
    def mentions_color(value):
        # `value` is falsy for elements without a style attribute.
        if not value:
            return False
        return 'color:' in value

    matches = soup.find_all(None, style=mentions_color)
    return bool(matches)


def convert_html_to_gfm(html, strict=True):
    """Convert an HTML string to GitHub-flavoured Markdown with pandoc.

    Args:
        html: HTML source text.
        strict: When true, convert in two passes: HTML (read with the
            ``native_divs`` and ``native_spans`` extensions disabled) to pandoc
            Markdown, then that Markdown to ``gfm``. When false, convert
            HTML to ``gfm`` directly in one pass.

    Returns:
        The converted Markdown text. Line wrapping is disabled in every pass.
    """
    def run_pandoc(text, target, source):
        return pypandoc.convert_text(text, target, format=source, extra_args=['--wrap=none'])

    if not strict:
        return run_pandoc(html, 'gfm', 'html')

    pandoc_md = run_pandoc(html, 'markdown', 'html-native_divs-native_spans')
    return run_pandoc(pandoc_md, 'gfm', 'markdown')
    

def is_image_encoded(src):
    """Return True when ``src`` is an inline ``data:`` URI rather than a link."""
    data_uri_prefix = 'data:'
    return src.startswith(data_uri_prefix)


def save_image_from_url(src):
    """Download the image at ``src`` into ``images_path``.

    Args:
        src: Absolute URL of the image.

    Returns:
        Path of the file that was written.

    Raises:
        requests.HTTPError: The server answered with a status other than 200.
            (Previously this case fell through to the ``return`` and failed
            with an ``UnboundLocalError``.)
        requests.RequestException: The request itself failed.
    """
    response = requests.get(src, headers=headers)
    if response.status_code != 200:
        raise requests.HTTPError(
            f'Could not download image {src}: HTTP {response.status_code}',
            response=response,
        )

    # Name the file after the URL *path* so a query string or fragment does
    # not end up in the file name.
    image_file_name = os.path.basename(urlparse(src).path)
    image_file_path = os.path.join(images_path, image_file_name)
    with open(image_file_path, 'wb') as f:
        f.write(response.content)
    return image_file_path


def add_padding_to_encoded_string(image_data):
    """Append ``'='`` characters until the length of ``image_data`` is a multiple of 4."""
    remainder = len(image_data) % 4
    if remainder == 0:
        return image_data
    return image_data + '=' * (4 - remainder)


def save_image_from_encoded_string(src, image_file_basename):
    """Decode a base64 ``data:image/...`` URI and write it into ``images_path``.

    Args:
        src: The data URI, e.g. ``data:image/png;base64,iVBOR...``.
        image_file_basename: File name without extension; the extension is
            taken from the image subtype in the URI.

    Returns:
        Path of the file that was written.

    Raises:
        ValueError: ``src`` is not a base64 image data URI. (An explicit
            exception replaces the former ``assert``, which disappears when
            Python runs with ``-O``.)
    """
    # The subtype may contain '+', '.' or '-' (e.g. "svg+xml"); DOTALL keeps a
    # payload that was wrapped over several lines from being cut at the first
    # newline.
    p = re.compile(r'data:image\/(?P<image_type>[\w.+-]+);base64,(?P<image_data>.*)', re.DOTALL)
    m = p.match(src)
    if m is None:
        raise ValueError(f'Not a base64-encoded image data URI: {src[:60]!r}')

    # "svg+xml" -> "svg"; simple subtypes such as "png" are unchanged.
    image_type = m.group('image_type').split('+')[0]
    # Whitespace is not part of the payload and would throw off the padding.
    image_data = re.sub(r'\s+', '', m.group('image_data'))

    image_file_name = f'{image_file_basename}.{image_type}'
    image_file_path = os.path.join(images_path, image_file_name)
    with open(image_file_path, 'wb') as f:
        f.write(base64.b64decode(add_padding_to_encoded_string(image_data)))
    return image_file_path
    
    
def find_and_save_images(soup, entry_id):
    """Save every image referenced in ``soup`` to disk and repoint its ``src``.

    Inline ``data:`` images are written as ``<entry_id>-<n>.<type>`` with ``n``
    counting from 1; all other images are downloaded from their URL. Each
    ``<img>`` then has its ``src`` replaced by the path of the saved file.

    Args:
        soup: Parsed document; modified in place.
        entry_id: Identifier used to name files for inline images.

    Raises:
        Exception: Whatever the save helpers raise; the offending ``src`` is
            printed first and the original exception is re-raised unchanged.
    """
    encoded_img_counter = 1
    for el in soup.find_all('img'):
        src = el.attrs.get('src')
        if not src:
            # An <img> without a source has nothing to save; previously this
            # raised KeyError.
            continue
        try:
            if is_image_encoded(src):
                image_file_basename = f'{entry_id}-{encoded_img_counter}'
                encoded_img_counter += 1
                image_file_path = save_image_from_encoded_string(src, image_file_basename)
            else:
                image_file_path = save_image_from_url(src)
        except Exception:
            print(src)
            raise
        el.attrs['src'] = image_file_path
        

def does_page_exist(url):
    """Return whether a GET request for ``url`` succeeds.

    Args:
        url: Address to request.

    Returns:
        ``response.ok`` of the reply, or False when the request could not be
        made at all (connection failure, malformed or unsupported URL, ...).
    """
    try:
        response = requests.get(url, headers=headers)
    except (requests.RequestException, ValueError):
        # Only request failures count as "page does not exist". A bare
        # `except:` also swallowed KeyboardInterrupt/SystemExit, so the run
        # could not be interrupted while a request was in flight.
        return False
    return response.ok

        
def is_anchor_in_page(anchor):
    """Return True when the page loaded in ``driver`` has an element whose id is ``anchor``."""
    matching_elements = driver.find_elements_by_id(anchor)
    return len(matching_elements) > 0

        
def is_uniprot_beta_link_ok(parsed):
    """Load a beta.uniprot.org URL in ``driver`` and inspect the rendered page.

    Args:
        parsed: Parsed URL (as returned by ``urlparse``) whose text must
            contain ``uniprotBetaURL``.

    Returns:
        A pair ``(ok, anchor_found)``. ``ok`` is False when the page contains
        an element with one of the known "not found" class names. ``anchor_found``
        is None when the page is not ok or the URL has no fragment; otherwise
        it reports whether an element with that fragment as id exists.
    """
    url = parsed.geturl()
    assert uniprotBetaURL in url
    print(url)
    driver.get(url)

    failure_markers = ('message--failure', 'error-page-container__art-work')
    if any(driver.find_elements_by_class_name(marker) for marker in failure_markers):
        return False, None

    anchor_found = is_anchor_in_page(parsed.fragment) if parsed.fragment else None
    return True, anchor_found


def check_and_standardize_link(url, el):
    """Check one link and, for UniProtKB links, rewrite it to the beta site.

    Args:
        url: The link target as written in the document.
        el: The ``<a>`` element carrying the link; its ``href`` is replaced
            when the link is rewritten.

    Returns:
        A pair ``(ok, anchor_found)``:
        * ``(None, None)`` for ``ftp`` links, which are not checked;
        * for ``/uniprot/<x>`` links on uniprot.org (or without a host), the
          result of ``is_uniprot_beta_link_ok`` for the rewritten URL;
        * otherwise ``(does_page_exist(url), None)``.
    """
    parsed = urlparse(url)
    if parsed.scheme == 'ftp':
        return None, None

    head, tail = os.path.split(parsed.path)
    # Only rewrite links that actually point at uniprot.org (or are relative).
    # The path alone is not enough: other sites may also serve "/uniprot/...".
    host = parsed.netloc.lower()
    uniprot_hosts = ('', uniprotOrgURL, uniprotOrgURL.replace('www.', '', 1))
    if host in uniprot_hosts and head == uniprotOrgKBPath:
        beta_parsed = parsed._replace(
            # A relative link has no scheme; the browser needs an absolute URL.
            scheme=parsed.scheme or 'https',
            netloc=uniprotBetaURL,
            # URL paths always use '/', so do not build them with os.path.join.
            path=f'{uniprotBetaKBPath}/{tail}',
        )
        # Check that the corresponding beta page exists
        ok, anchor_found = is_uniprot_beta_link_ok(beta_parsed)
        el.attrs['href'] = beta_parsed.geturl()
        return ok, anchor_found

    # Well this isn't ideal but not sure how else to handle this
    # as if the resource is a SPA I won't know what to look for
    # the resource not being able to be accessed
    return does_page_exist(url), None
        
def check_and_standardize_all_links(soup):
    """Check every link in ``soup`` and collect the broken ones.

    Args:
        soup: Parsed document; links may be rewritten in place by
            ``check_and_standardize_link``.

    Returns:
        ``(dead_links, dead_anchors)``: the original URLs whose page check did
        not come back truthy, and the original URLs whose page loaded but whose
        ``#fragment`` was not found on it.
    """
    dead_links = []
    dead_anchors = []
    # href=True skips <a> elements without an href (e.g. named anchors), which
    # previously raised KeyError.
    for el in soup.find_all('a', href=True):
        url = el.attrs['href']
        ok, anchor_found = check_and_standardize_link(url, el)
        # NOTE(review): unchecked links (ok is None, i.e. ftp) are still
        # reported here, as before - confirm that is intended.
        if not ok:
            dead_links.append(url)
        # anchor_found is None when no anchor was looked up; only an explicit
        # False means the anchor is missing. The old test was inverted and
        # reported the anchors that *were* found.
        if anchor_found is False:
            dead_anchors.append(url)
        print(url, ok, anchor_found)
    return dead_links, dead_anchors

In [8]:
# Read the combined help RSS export and parse it into `feed`.
with open('input/help.combined.rss') as rss_file:
    rss = rss_file.read()
feed = feedparser.parse(rss)

In [52]:
# Entries before this index are skipped (hard-coded resume point).
first_entry_index = 49

# One browser session serves every entry; the link-checking helpers read the
# module-level name `driver`. Previously a new Chrome instance was started on
# every iteration (even for skipped entries) and never closed.
driver = webdriver.Chrome(options=options, executable_path='/Users/dlrice/bin/chromedriver')
try:
    for i, entry in enumerate(feed['entries']):
        if i < first_entry_index:
            continue
        print(i, entry.id)
        html = entry['content'][0]['value']
        soup = BeautifulSoup(html)

        # Keep a prettified copy of the untouched source HTML
        with open(os.path.join(html_path, f'{entry.id}.html'), 'w') as f:
            f.write(soup.prettify())

        # If there is a See also: with <br/> separated items replace with <ul>
        for el in soup.find_all(text='See also:'):
            replace_see_also_with_ul(el.parent)

        # Save all images to disk (encoded base64 & urls) with correct img paths
        find_and_save_images(soup, entry.id)

        # Check all links before conversion and if broken, report
        dead_links, dead_anchors = check_and_standardize_all_links(soup)

        html = soup.prettify()

        # Check if page has color style applied and if so don't use strict conversion which removes html tags
        strict = not has_color_style(soup)

        md = convert_html_to_gfm(html, strict=strict)

        # Count tables: those still present as HTML in the Markdown, and by
        # difference those that were converted to Markdown tables
        n_tables_in_html = html.count(table_string)
        n_html_tables_in_md = md.count(table_string)
        n_md_tables_in_md = n_tables_in_html - n_html_tables_in_md
        other_html_tags_left = not strict

        # Save report (only when there is something to report)
        if dead_links or dead_anchors or n_md_tables_in_md or n_html_tables_in_md or other_html_tags_left:
            with open(os.path.join(report_path, f'{entry.id}.txt'), 'w') as f:
                print(entry.id, file=f)
                print('---', file=f)
                if dead_links:
                    print('Dead links',  file=f)
                    print('\n'.join(dead_links),  end='\n\n', file=f)
                if dead_anchors:
                    print('Dead anchor tags',  file=f)
                    print('\n'.join(dead_anchors), end='\n\n', file=f)
                if n_md_tables_in_md:
                    print('Number tables converted to Markdown',  file=f)
                    print(n_md_tables_in_md, end='\n\n', file=f)
                if n_html_tables_in_md:
                    print('Number tables left as HTML',  file=f)
                    print(n_html_tables_in_md, end='\n\n', file=f)
                if other_html_tags_left:
                    print('All extra HTML tags left (ie <span style="color: grey;">)?', file=f)
                    print(other_html_tags_left, file=f)

        # Write Markdown to file, preceded by a front-matter block
        with open(os.path.join(markdown_path, f'{entry.id}.md'), 'w') as f:
            print('---', file=f)
            print(f'title: {entry["title"]}', file=f)
            print(f'categories: {get_joined_categories(entry)}', file=f)
            print('---', end='\n\n', file=f)
            f.write(md)
finally:
    # Always shut the browser down, including on error or interrupt.
    driver.quit()

KeyboardInterrupt: 

In [11]:
response.text

'\r\n\r\n\r\n<!DOCTYPE html>\r\n<html lang="en" class="no-js">\r\n\r\n<head>\r\n\r\n        <title>UniRef clusters: a comprehensive and scalable alternative for improving sequence similarity searches | Bioinformatics | Oxford Academic</title>\r\n\r\n        <script src="https://ajax.googleapis.com/ajax/libs/jquery/2.2.4/jquery.min.js" type="text/javascript"></script>\r\n<script>window.jQuery || document.write(\'<script src="//oup.silverchair-cdn.com/Themes/Silver/app/js/jquery.2.2.4.min.js" type="text/javascript">\\x3C/script>\')</script>\r\n<script src="//oup.silverchair-cdn.com/Themes/Silver/app/vendor/v-637769993881668327/jquery-migrate-1.4.1.min.js" type="text/javascript"></script>\r\n\r\n\r\n\r\n    \r\n    <meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1" />\r\n\r\n    \r\n    <meta charset="utf-8" />\r\n    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />\r\n    <meta http-equiv="X-UA-Compatible" content="IE=Edge" />\r\n   

In [2]:
# Read the column-header RSS export and parse it into `feed`.
with open('../input/colheaders.rss') as rss_file:
    rss = rss_file.read()
feed = feedparser.parse(rss)

In [47]:
import pandas as pd

# Patterns that capture the part of an old field name after its prefix.
re_sequence_annotation = re.compile(r'sequence_annotation_(.*)')
re_general_annotation = re.compile(r'general_annotation_(.*)')


def get_dict(x):
    """Map each value of the ``Old`` column of ``x`` to the ``New`` value on the same row."""
    return dict(zip(x.Old, x.New))


# sheet_name=None loads every sheet; `df` is indexed by sheet name below.
df = pd.read_excel('/Users/dlrice/Downloads/return-fields-old-to-new.xlsx', sheet_name=None)

# Lower-case the old names so lookups can be done on lower-cased input.
for sheet in df.values():
    sheet['Old'] = sheet['Old'].str.lower()

fields_uniprotkb = get_dict(df['uniprotkb'])
fields_uniref = get_dict(df['uniref'])
fields_uniparc = get_dict(df['uniparc'])
fields_proteomes = get_dict(df['proteomes'])

# One record per namespace, in the order in which they are searched.
fields = [
    {'namespace': namespace, 'fields': mapping}
    for namespace, mapping in (
        ('uniprotkb', fields_uniprotkb),
        ('uniref', fields_uniref),
        ('uniparc', fields_uniparc),
        ('proteomes', fields_proteomes),
    )
]

def check_fields(x):
    """Look ``x`` up in every namespace of ``fields``.

    Returns:
        A dict from namespace name to the new field name, containing only the
        namespaces whose mapping has ``x`` as a key (empty when none does).
    """
    return {
        record['namespace']: record['fields'][x]
        for record in fields
        if x in record['fields']
    }

            
def convert_return_field(old):
    """Find the new field name(s) for an old return-field identifier.

    The lower-cased identifier is tried under several spellings, in this
    order, and the first spelling known to ``check_fields`` wins:

    1. as is;
    2. with underscores replaced by spaces;
    3. ``feature(<rest>)`` when it starts with ``sequence_annotation_``;
    4. ``feature(<spaced name>)``;
    5. ``comment(<rest>)`` when it starts with ``general_annotation_``;
    6. ``comment(<spaced name>)``.

    Returns:
        The non-empty dict returned by ``check_fields`` for the first spelling
        that is found, or None when no spelling is known.
    """
    lowered = old.lower()
    spaced = lowered.replace('_', ' ')

    candidates = [lowered, spaced]

    sequence_match = re_sequence_annotation.match(lowered)
    if sequence_match:
        rest = sequence_match.group(1).replace('_', ' ')
        candidates.append(f'feature({rest})')
    candidates.append(f'feature({spaced})')

    general_match = re_general_annotation.match(lowered)
    if general_match:
        rest = general_match.group(1).replace('_', ' ')
        candidates.append(f'comment({rest})')
    candidates.append(f'comment({spaced})')

    for candidate in candidates:
        found = check_fields(candidate)
        if found:
            return found
    return None

In [None]:
general_annotation_ABSORPTION
comment(ABSORPTION)
absorption

In [29]:
m = re_general_annotation.match('general_annotation_ABSORPTION')

In [48]:
convert_return_field('general_annotation_ABSORPTION')

{'uniprotkb': 'absorption'}

In [30]:
m

<re.Match object; span=(0, 29), match='general_annotation_ABSORPTION'>

In [31]:
g = m.groups()[0].replace('_', ' ')
comment = f'comment({g})'
found = check_fields(comment)

In [33]:
comment

'comment(ABSORPTION)'

In [32]:
found

{}

In [22]:
check_fields('id')

{'uniprotkb': 'accession',
 'uniref': 'id',
 'uniparc': 'upi',
 'proteomes': 'upid'}

In [28]:
# Hand-written old -> new field names, consulted by the report loop for
# identifiers that `convert_return_field` cannot resolve.
manual = {
    'virus_host': 'virus_hosts',
    'busco_score': 'busco',
    'general_annotation_ALLERGEN': 'cc_allergen'
}

In [24]:
# Old field identifiers that the report loop skips without attempting a
# conversion.
ignore = {
    'active',
    'general_annotation_ENZYME_REGULATION',
    'blastAlignment',
    'alignments',
    'unirule_annnotation_types_added',
    
}

In [51]:
# Report the feed entries whose old field name cannot be mapped cleanly:
#   "0:"            no mapping found (and none listed in `manual`)
#   ">1 distinct:"  several namespaces map it to different new names
# A trailing `convert_html_to_gfm(html, strict=strict)` call was removed: it
# re-converted the leftover `html` of an unrelated cell on every iteration and
# its result was never used.
for entry in feed['entries']:
    parsed = urlparse(entry['id'])
    # The old field name is the URL path without its leading '/'.
    old = parsed.path[1:]
    if old in ignore:
        continue
    converted = convert_return_field(old)
    if not converted:
        if old in manual:
            # Covered by the hand-written mapping; nothing to report.
            continue
        print('0:', old)
    elif len(set(converted.values())) > 1:
        print('>1 distinct:', old, converted)

0: sequence_annotation_CALCIUM_BIND
0: general_annotation_CATALYTIC_ACTIVITY
0: citation_mapping_id
0: citation_scope
0: uniref_cluster_id
0: members_uniref
0: uniref_cluster_name
0: general_annotation_COFACTOR
0: common_taxon_id
0: cpd_score
0: crossref_data
0: database
0: creation_date_uniref
0: creation_date
0: modification_annotation_date
0: modification_sequence_date
0: involvement_in_disease
0: disease_id
0: domain_count
0: domains
0: e-value
0: ec_numbers
0: entry_uniref
0: entry
0: accession_numbers
0: general_annotation_ERRONEOUS_INITIATION
0: general_annotation_ERRONEOUS_TERMINATION
0: general_annotation_ERRONEOUS_TRANSLATION
0: feature_subsection
0: first_seen
0: sequence_status
0: general_annotation_FRAMESHIFT
0: encoded_on
0: gene_name
0: gene_names
0: gene_ontology
0: go_id
0: general_annotation
0: genome_assembly_representation
0: genome_accession
0: hit_length
0: identifier
0: identity_uniref
0: identity_blast
0: binary_interactions
0: interpro
0: keyword_id
0: proteome

In [39]:
converted

In [40]:
old

'general_annotation_ABSORPTION'

In [100]:
not converted

True

In [11]:
convert_return_field('subcellular_location')

comment(subcellular location)


In [13]:
found = check_fields('comment(subcellular location)')

In [15]:
df['uniprotkb']

Unnamed: 0,Old,New,df
0,3d,structure_3d,3d
1,annotation score,annotation_score,annotation score
2,chebi,ft_ca_bind,chebi
3,chebi(Catalytic activity),cc_catalytic_activity,chebi(catalytic activity)
4,chebi(Cofactor),cc_cofactor,chebi(cofactor)
...,...,...,...
112,tools,tools,tools
113,uniparcid,uniparc_id,uniparcid
114,version(entry),version,version(entry)
115,version(sequence),sequence_version,version(sequence)


In [14]:
found

{}

In [27]:
busco = [entry for entry in feed['entries'] if entry['id'] == 'http://www.uniprot.org/busco']
busco

[]

In [26]:
busco = [entry for entry in feed['entries'] if entry['id'] == 'http://www.uniprot.org/busco_score']
busco

[{'id': 'http://www.uniprot.org/busco_score',
  'guidislink': True,
  'link': 'http://www.uniprot.org/busco_score',
  'title': 'BUSCO',
  'title_detail': {'type': 'text/plain',
   'language': 'en-US',
   'base': 'http://www.uniprot.org',
   'value': 'BUSCO'},
  'authors': [{'name': 'uniprot'}],
  'author_detail': {'name': 'uniprot'},
  'author': 'uniprot',
  'published': '2019-11-21T10:33:25Z',
  'published_parsed': time.struct_time(tm_year=2019, tm_mon=11, tm_mday=21, tm_hour=10, tm_min=33, tm_sec=25, tm_wday=3, tm_yday=325, tm_isdst=0),
  'updated': '2021-10-07T10:52:35Z',
  'updated_parsed': time.struct_time(tm_year=2021, tm_mon=10, tm_mday=7, tm_hour=10, tm_min=52, tm_sec=35, tm_wday=3, tm_yday=280, tm_isdst=0),
  'content': [{'type': 'application/xhtml+xml',
    'language': 'en-US',
    'base': 'http://www.uniprot.org',
    'value': '<p>The Benchmarking Universal Single-Copy Ortholog (BUSCO) assessment tool is used, for eukaryotic and bacterial proteomes, to provide quantitative m

In [78]:
import itertools

# For every pair of sheets, print the old names they share but map to
# different new names.
for a, b in itertools.combinations(df, 2):
    print(a,b)
    merged = df[a].merge(df[b], on='Old', how='inner')
    differs = merged['New_x'] != merged['New_y']
    print(merged[differs])

uniprotkb uniref
        Old      New_x  New_y
0   context        NaN    NaN
1        id  accession     id
4  reviewed   reviewed  types
uniprotkb uniparc
       Old       New_x New_y
0  context         NaN   NaN
1    genes  gene_names  gene
2       id   accession   upi
uniprotkb proteomes
  Old      New_x New_y
0  id  accession  upid
uniref uniparc
       Old New_x New_y
0       id    id   upi
4  context   NaN   NaN
uniref proteomes
    Old New_x     New_y
0    id    id      upid
1  name  name  organism
uniparc proteomes
  Old New_x New_y
0  id   upi  upid


In [52]:
c

NameError: name 'c' is not defined

In [55]:
c = list(parent.children)
if c[0].text != 'See also:' or not parent.find_all('br'):
    return
lis = ''.join([f'<li>{el}</li>' for el in c[1:] if el.name != 'br'])
paragraph = BeautifulSoup(f'<p>See also:<ul>{lis}</ul></p>')
parent.replace_with(paragraph)

'\n<p>\n Positions of the experimentally determined \'Turn\' region(s) (\n <a href="http://www.uniprot.org/manual/turn">\n  more...\n </a>\n )\n</p>\n'

In [4]:
def get_link(accession):
    """Return the opening ``<Link>`` tag that routes to the help entry ``accession``."""
    opening = '<Link to={generatePath(LocationToPath[Location.HelpEntry], {accession: "'
    closing = '"})}>'
    return ''.join([opening, accession, closing])

def replace_anchor_with_link(html):
    """Rewrite an HTML snippet as JSX with a fixed help-entry link.

    Every ``<a href=...>`` opening tag becomes the ``<Link>`` tag for the
    ``biophysicochemical_properties`` entry, ``</a>`` becomes ``</Link>``, and
    ``<p>``/``</p>`` become the fragment tags ``<>``/``</>``.

    A later cell defines a function of the same name that derives the
    accession from the link instead.
    """
    link = get_link('biophysicochemical_properties')
    converted = re.sub(r'<a href=(.*)>', link, html)
    for old_tag, new_tag in (('</a>', '</Link>'), ('<p>', '<>'), ('</p>', '</>')):
        converted = converted.replace(old_tag, new_tag)
    return converted

In [49]:
def get_help_link(accession):
    """Return the opening ``<Link>`` tag that routes to the help entry ``accession``."""
    opening = '<Link to={generatePath(LocationToPath[Location.HelpEntry], {accession: "'
    closing = '"})}>'
    return ''.join([opening, accession, closing])


def get_help_accession(url):
    """Extract the help-entry accession from a www.uniprot.org manual link.

    Args:
        url: ``http(s)://www.uniprot.org/manual/<accession>``, optionally still
            wrapped in the double quotes of the ``href`` attribute it was
            taken from.

    Returns:
        The accession with percent-escapes decoded (``%5F`` -> ``_``).

    Raises:
        ValueError: ``url`` is not a manual link. (Previously this surfaced as
            an ``AttributeError`` on ``None``.)
    """
    # [^"]* stops at the closing quote instead of running to the last quote
    # in the string.
    m = re.match(r'"?https?://www\.uniprot\.org/manual/([^"]*)"?', url)
    if m is None:
        raise ValueError(f'Not a UniProt manual link: {url!r}')
    return unquote(m.group(1))


def replace_anchor_with_link(html):
    """Rewrite an HTML snippet with help links as JSX.

    Each ``<a href="...">`` opening tag becomes the ``<Link>`` tag for the
    help entry its own URL points to, ``</a>`` becomes ``</Link>``, and
    ``<p>``/``</p>`` become the fragment tags ``<>``/``</>``.

    Args:
        html: HTML snippet.

    Returns:
        The converted snippet, or ``html`` unchanged when it contains no
        ``<a href="...">`` tag.

    Raises:
        Whatever ``get_help_accession`` raises for a link it does not accept.
    """
    # Capture only the quoted URL and stop at the first '>'. The former
    # greedy `(.*)` ran to the last '>' on the line, swallowing the link text
    # and closing tag when the snippet was not one tag per line.
    re_anchor = re.compile(r'<a\s+href=("[^"]*")[^>]*>')
    if not re_anchor.search(html):
        return html

    def to_link(match):
        # Resolve every anchor from its own URL; previously the accession of
        # the first anchor was reused for all of them.
        return get_help_link(get_help_accession(match.group(1)))

    html = re_anchor.sub(to_link, html)
    html = html.replace('</a>', '</Link>')
    html = html.replace('<p>', '<>')
    html = html.replace('</p>', '</>')
    return html

In [79]:
h = '''
<p>
 Indicates the wavelength at which photoreactive protein shows maximal light absorption (
 <a href="http://www.uniprot.org/manual/biophysicochemical%5Fproperties">
  more...
 </a>
 )
</p>
'''

In [42]:
re_anchor = re.compile(r'<a href=(.*)>')
m = re_anchor.search(h)

In [45]:
m.groups()[0]

'"http://www.uniprot.org/manual/biophysicochemical%5Fproperties"'

In [80]:
print(replace_anchor_with_link(h))


<>
 Indicates the wavelength at which photoreactive protein shows maximal light absorption (
 <Link to={generatePath(LocationToPath[Location.HelpEntry], {accession: "biophysicochemical_properties"})}>
  more...
 </Link>
 )
</>



In [83]:
print(h)


<p>
 Indicates the wavelength at which photoreactive protein shows maximal light absorption 
 <a href="http://www.uniprot.org/manual/biophysicochemical%5Fproperties">
  more...
 </a>
</p>



In [96]:
m = re_anchor.search(h)

In [97]:
m

<re.Match object; span=(69, 114), match='<a href="http://www.uniprot.org/manual/turn">'>

In [98]:
url = m.groups()[0]

In [99]:
m.start()

69

In [100]:
m.end()

114

In [31]:
def get_help_accession(url):
    """Extract the help-entry accession from a www.uniprot.org manual link.

    Args:
        url: ``http(s)://www.uniprot.org/manual/<accession>``. Surrounding
            double quotes (as captured from an ``href`` attribute by
            ``replace_anchor_with_link``) are accepted too, so this definition
            also works for that caller.

    Returns:
        The accession with percent-escapes decoded (``%5F`` -> ``_``).

    Raises:
        ValueError: ``url`` is not a manual link. (Previously this surfaced as
            an ``AttributeError`` on ``None``.)
    """
    m = re.match(r'"?https?://www\.uniprot\.org/manual/([^"]*)"?', url)
    if m is None:
        raise ValueError(f'Not a UniProt manual link: {url!r}')
    return unquote(m.group(1))

In [17]:
u = 'http://www.uniprot.org/manual/biophysicochemical%5Fproperties'

In [24]:
from urllib.parse import unquote

'biophysicochemical_properties'

In [10]:
assert parsed.netloc == 'www.uniprot.org' and parsed.path.startswith('/manual/')

In [13]:
accession = os.path.split(parsed.path)[-1]

'biophysicochemical%5Fproperties'

In [104]:
print(h)


<p>
 Positions of the experimentally determined 'Turn' region(s) (
 <Link to={generatePath(LocationToPath[Location.HelpEntry], {accession: "biophysicochemical_properties"})}>
  more...
 </a>
 )
</p>



In [106]:
print(h)


<>
 Positions of the experimentally determined 'Turn' region(s) (
 <Link to={generatePath(LocationToPath[Location.HelpEntry], {accession: "biophysicochemical_properties"})}>
  more...
 </Link>
 )
</>



In [54]:
d = c.prettify()

NameError: name 'c' is not defined

In [38]:
d.replace('<a href="http://www.uniprot.org/manual/turn">', 'foo')

"<html>\n <body>\n  <p>\n   Positions of the experimentally determined 'Turn' region(s) (\n   foo\n    more...\n   </a>\n   )\n  </p>\n </body>\n</html>"

In [53]:
d

NameError: name 'd' is not defined

In [40]:
print(d)

<html>
 <body>
  <p>
   Positions of the experimentally determined 'Turn' region(s) (
   <a href="http://www.uniprot.org/manual/turn">
    more...
   </a>
   )
  </p>
 </body>
</html>


In [37]:
print(a.prettify())

<a href="http://www.uniprot.org/manual/turn">
 more...
</a>



In [35]:
print(d.replace(a.prettify(), 'foo'))

<html>
 <body>
  <p>
   Positions of the experimentally determined 'Turn' region(s) (
   <a href="http://www.uniprot.org/manual/turn">
    more...
   </a>
   )
  </p>
 </body>
</html>


In [28]:
for a in c.find_all('a'):
    break

In [29]:
a

<a href="http://www.uniprot.org/manual/turn">
  more...
 </a>

In [17]:
a.replace_with(t)

<a href="http://www.uniprot.org/manual/turn">
  more...
 </a>

In [18]:
a

<a href="http://www.uniprot.org/manual/turn">
  more...
 </a>

In [20]:
print(c.prettify())

<?xml version="1.0" encoding="utf-8"?>
<p>
 Positions of the experimentally determined 'Turn' region(s) (
 <Link>
  {generatePath(LocationToPath[Location.HelpEntry], {
  accession: 'biophysicochemical_properties',
})}
&gt;
more...
 </Link>
 )
</p>
