In [2]:
from datetime import datetime
from typing import Dict, List, Optional, Union
from models.main import SourceMap, Format, ContentType, LinkStore

from secrets import token_urlsafe
import pandas as pd
from urllib.parse import urlparse



In [3]:
def create_map( name: str, home_link: str, formatter: str,links: List[Dict] = [], assumed_tags: str = '',
                     compulsory_tags: List[str] = [], is_rss: bool = False, is_collection: bool = True,
                     watermarks: List[str] = [], source_id: str = None, datetime_format: str="", is_third_party: bool = False
                      ):
                      return SourceMap.adapter().create(source_name=name, source_home_link=home_link,
                      source_id=token_urlsafe(16) + "_"+ name.lower() if source_id is None else source_id, 
                      formatter=formatter, assumed_tags=assumed_tags.strip(), links=links,
                      compulsory_tags=compulsory_tags, is_collection=is_collection,
                      is_rss=is_rss, watermarks=watermarks,
                      datetime_format=datetime_format, is_third_party=is_third_party
                      )

In [21]:
def tag_formatter(tag:str) -> Union[str, None]:
        return tag.replace("/",".").replace(" and ", " ")

In [22]:
def get_collection_selector(name, extras = [], defaults= {}):
    _defaults = {"sel": "xpath", "param": "text()", "type": "text", "parent": name}
    _defaults.update(defaults)
    data = {}
    for key in ['sel', 'param', 'parent', 'type'] + extras:
        value = input(f"Please enter {key} value for {name}: ")
        if value == "" or value == " " and key in _defaults:
            data[key] = _defaults[key]
        else:
            data[key] = value
    return data


In [23]:
def is_quit_param(txt):
    return txt.lower() == "q" or txt == "" or txt == " " or txt.lower() == "quit"

In [24]:
def _create_collection_format():
    data = {}
    # is_xml = input('Enter x for xml_collection_format or h for html_collection_format: ').lower() == 'x'
    itertag = input('Enter itertag for this selector: ')
    if itertag != '' or itertag != " ":
        data['itertag'] = itertag
    data['title'] = get_collection_selector('title')
    data['link'] = get_collection_selector('link')
    data['creator'] = get_collection_selector('creator')
    while True:
        sec_act = input("Enter name for new selector or type 'q' quit for exiting: ")
        if is_quit_param(sec_act):
            break
        else:
            data[sec_act] = get_collection_selector(sec_act)
    return data




In [25]:
def create_collection_format(name, format_):
    data = _create_collection_format()
    Format.adapter().update_one({"format_id": format_.format_id}, **{name:data})
    setattr(format_, name, data)
    return format_


In [26]:
def get_container_identity():
    data = {}
    data['param'] = input("Enter param for container identity: ")
    data['is_multiple'] = input("Is this identity for multiple items y/n: ").lower() == "y"
    data['content_type'] = input('Enter content-type for the identity a for article, i for image, v for video: ')
    cmap = {'a': 'article', 'i': 'image', 'v': 'video'}
    data['content_type'] = cmap[data['content_type']] if data['content_type'] != "" else cmap['a']
    data['is_bakeable'] = input('Is this identity contains multiple articles y/n: ').lower() == "y"
    title_selectors = []
    while True:
        action = input('Enter title selector or "q" for quit: ')
        if is_quit_param(action):
            break
        else:
            title_selectors.append(action)
    creator_selectors = []
    while True:
        action = input('Enter creator selector or "q" for quit: ')
        if is_quit_param(action):
            break
        else:
            creator_selectors.append(action)
    body_selectors = []
    while True:
        action = input('Enter body selector or "q" for quit: ')
        if is_quit_param(action):
            break
        else:
            body_selectors.append(action)
    return data, title_selectors, creator_selectors, body_selectors

In [27]:
def get_query_selector():
    data = {}
    for key in ['tag', 'id', 'class_list', 'exact_class']:
        value = input(f'Enter {key} for query: ')
        if key == "class_list" and len(value) > 0:
            data[key] = value.split(" ")
        elif len(value) > 0 and value != " ":
            data[key] = value
    return data


In [28]:
def _create_container_format():
    data = {}
    data['idens'] = []
    data['title_selectors'] = []
    data['creator_selectors'] = []
    data['body_selectors'] = []
    data['ignorables'] = []
    while True:
        action = input('Enter a for adding new identity, q for quit: ')
        if is_quit_param(action):
            break
        iden, title_selectors, creator_selectors, body_selectors = get_container_identity()
        data['title_selectors'] += title_selectors
        data['creator_selectors'] += creator_selectors
        data['body_selectors'] += body_selectors
        while True:
            action = input('Enter a for adding new ignorable, q for quit: ')
            if is_quit_param(action):
                break
            else:
                data['ignorables'].append(get_query_selector())
        data['idens'].append(iden)
    data['title_selectors'] = list(set(data['title_selectors']))
    data['creator_selectors'] = list(set(data['creator_selectors']))
    data['body_selectors'] = list(set(data['body_selectors']))
    return data
    



In [29]:
def create_html_container_format(format_):
    data = _create_container_format()
    Format.adapter().update_one({"format_id": format_.format_id}, **{'html_article_format':data})
    format_.html_article_format = data
    return format_



In [30]:
def create_extra_format(format_):
    extra_format = {}
    if hasattr(format_, "extra_formats") and format_.extra_formats is not None:
        extra_format = format_.extra_formats
    while True:
        action = input('Enter c for adding new collection format and a for adding article format, q for quit: ')
        if is_quit_param(action):
            break
        else:
            name = input("Enter name for the format: ")
            if is_quit_param(name):
                break
            if action.lower() == "c":
                data = _create_collection_format()
                extra_format[name] = data
            elif action.lower() == "a":
                data = _create_container_format()
                extra_format[name] = data
    Format.adapter().update_one({"format_id": format_.format_id}, **{"extra_formats": extra_format})
    format_.extra_formats = extra_format
    return format_

                


In [31]:
def create_xml_container_format(format_):
    data = {}
    ct = input('Enter content-type for format a for article, i for image, v for video: ')
    cmap = {"a": "article", "i": "image", "v": "video"}
    data['content_type'] = cmap[ct] if ct != "" or ct != " " else cmap['a']
    data['struct']= _create_container_format()
    Format.adapter().update_one({"format_id":format_.format_id}, **{"xml_article_format": data})
    format_.xml_article_format = data
    return format_


In [32]:
def create_format( name: str, format_id: str, source_home_link: str,
                            created_on= datetime.now()):
                            return Format.adapter().create(source_name=name, format_id=format_id,
                            source_home_link=source_home_link,
                            created_on=created_on
                            )

In [33]:
def print_all_formats(format_):
    keys = ['xml_collection_format', 'html_collection_format', 'html_article_format', 'xml_article_format']
    format_keys= [] 
    for key in keys:
        if hasattr(format_, key) and getattr(format_, key) is not None:
            format_keys.append(key)
    if hasattr(format_, "extra_formats") and format_.extra_formats is not None:
        format_keys += list(format_.extra_formats.keys())
    for index, key in enumerate(format_keys):
        print(index, ". ", key,"\n")
    print("\n"*2)
    return format_keys


In [34]:
def interactive_format_prompt(source_name, source_id, source_home_link, formatter = None):
    format_ = Format.adapter().find_one({"format_id": source_id}, silent=True)
    if format_ is None:
        format_ = create_format(source_name, source_id, source_home_link)
    while True:
        keys = print_all_formats(format_)
        kmap = {'xml_collection_format': 'xc', 'html_collection_format': 'hc', 'html_article_format': 'ha', 'xml_article_format': 'xa', 'extra_formats': 'e'}
        prompt = "Please press "
        for key in kmap.keys():
            if key not in keys:
                prompt += f"{kmap[key]} for {key}, "
        prompt +=", 'q' for quit: "
        action = input(prompt)
        if is_quit_param(action):
            break
        elif action.lower() == 'xc':
            format_ = create_collection_format('xml_collection_format', format_)
        elif action.lower() == 'hc':
            format_ = create_collection_format('html_collection_format', format_)
        elif action.lower() == 'ha':
            format_ = create_html_container_format(format_)
        elif action.lower() == 'xa':
            format_ = create_xml_container_format(format_)
        elif action.lower() == 'e':
            format_ = create_extra_format(format_)
    keys = print_all_formats(format_)
    action = input("Enter index number for selecting the formatter, or q for quit: ")
    ask_for_default_formatter = input('Do you want to set this formatter as default for source y/n: ').lower() == "y"
    if ( not is_quit_param(action)) and action.isdigit():
        formatter = keys[int(action)]        
    return formatter, ask_for_default_formatter

    
    
    
    
        

    
    


In [35]:
def _create_source_map(category, source_name, link, rss = 'FALSE'):
    is_rss = rss is not None and len(rss) > 0 and rss != "FALSE"
    url = rss if is_rss else link
    assumed_tags=  tag_formatter(category)
    compulsory_tags = input("Enter compulsory tags for \n" + url)
    if compulsory_tags == "q":
        return None
    compulsory_tags = compulsory_tags.split(" ")
    watermarks = input("Enter watermarks for \n" + url).split(" ")
    url_parse = urlparse(url)
    source_home_link = url_parse.scheme +"://" + url_parse.netloc
    source_map = SourceMap.adapter().find_one({"$and": [{"source_home_link":  source_home_link}, {"is_rss": is_rss}]}, silent=True)

    source_map_compulsory_tags = source_map.compulsory_tags if source_map is not None and           source_map.compulsory_tags is not None else []
    source_map_assumed_tags = source_map.assumed_tags if source_map is not None and source_map.assumed_tags is not None else ""
    source_map_assumed_tags += " " + assumed_tags
    source_map_watermarks = source_map.watermarks if source_map is not None and source_map.watermarks is not None else []
    source_map_watermarks += watermarks

    if len(compulsory_tags) > 0:
            compulsory_tags_action = input("Enter a for append, r for replace and l for leave: ")
            if compulsory_tags_action.lower() == "a":
                source_map_compulsory_tags += compulsory_tags
                source_map_compulsory_tags = list(set(source_map_compulsory_tags))
            elif compulsory_tags_action.lower() == "r":
                source_map_compulsory_tags = compulsory_tags
    if source_map is None:
            is_collection = input("Is the source a collection y/n: ").lower() == "y"
            source_map = create_map(name=source_name, source_id=token_urlsafe(16)+ "_" + source_name.lower(),
                                        formatter=None,assumed_tags=source_map_assumed_tags, compulsory_tags=source_map_compulsory_tags,
                                        home_link=source_home_link, watermarks=source_map_watermarks, is_rss=is_rss,is_collection=is_collection,            
            )
    formatter, is_update_default_formatter = interactive_format_prompt(source_map.source_name, source_map.source_id, source_map.source_home_link, formatter=source_map.formatter)
    if is_update_default_formatter:
        SourceMap.adapter().update_one({"source_id": source_map.source_id}, **{"formatter": formatter})
        source_map.formatter = formatter
    if source_map.links is None:
        source_map.links = []
    source_map.links.append(LinkStore(link=url, assumed_tags=assumed_tags,
                                     compulsory_tags=compulsory_tags if len(compulsory_tags) == 0 else None,
                                     formatter=formatter))
    SourceMap.adapter().update_one({"source_id": source_map.source_id}, **{"links": [link.to_dict() for link in source_map.links]})
    return source_map 

    
    
    

In [41]:
category = "hobbies__interests/arts__crafts/photography and technology__computing/consumer_electronics/camera"
source_name="iso_500px"
link = "https://iso.500px.com/"
# rss="FALSE"
rss="https://iso.500px.com/feed/"
    

In [43]:
_create_source_map(category=category, source_name=source_name, link=link, rss=rss)

0 .  xml_collection_format 




0 .  xml_collection_format 

1 .  html_article_format 




0 .  xml_collection_format 

1 .  html_article_format 






SourceMap(source_name='iso_500px', source_id='jCdPRiPUH4HsMlq6s1GL5A_iso_500px', source_home_link='https://iso.500px.com', assumed_tags='hobbies__interests.arts__crafts.photography technology__computing.consumer_electronics.camera', compulsory_tags=['photography'], is_rss=True, is_collection=True, links=[LinkStore(link='https://iso.500px.com/feed/', assumed_tags='hobbies__interests.arts__crafts.photography technology__computing.consumer_electronics.camera', formatter='xml_collection_format', compulsory_tags=None), LinkStore(link='https://iso.500px.com/feed/', assumed_tags='hobbies__interests.arts__crafts.photography technology__computing.consumer_electronics.camera', formatter='xml_collection_format', compulsory_tags=None)], formatter='xml_collection_format', watermarks=['', ''], is_structured_aggregator=True, datetime_format='', is_third_party=False)