In [1]:
import requests
from typing import List, Tuple, Union, Optional

In [2]:
from scrapy_monit_web.monitor.scrapyd_api import api_daemon_status

In [7]:
r = api_daemon_status("http://127.0.0.1:6800/")

requesting to http://127.0.0.1:6800/daemonstatus.json
GOT:<Response [200]>


In [None]:
def subdict(_dict: dict, keys: Union[list, tuple]):
    return {k: _dict[k] for k in keys}


def api_daemon_status():
    url = "http://185.250.148.161/daemonstatus.json"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        fields = ["status","running", "pending", "finished", "node_name"]
        return subdict(data, fields)
    else:
        return None
    

def api_list_projects():
    url = "http://185.250.148.161/listprojects.json"
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        return data['projects']
    else:
        return None
    

def api_listversions(project: str='default'):
    """  """
    url = "http://185.250.148.161/listversions.json"
    # check if project in projects
    assert project in api_list_projects()
    params = {'project': project}
    response = requests.get(url, params=params)
    if response.status_code == 200:
        data = response.json()
        return data['versions']
    else:
        return None


def api_listspiders(project: str='default', _version: str=None):
    """  """
    url = "http://185.250.148.161/listspiders.json"
    # check if project in projects
    assert project in api_list_projects()
    if not _version is None:
        assert _version in api_listversions(project)
    params = {'project': project, '_version': _version}
    response = requests.get(url, params=params)
    if response.status_code == 200:
        data = response.json()
        return data['spiders']
    else:
        return None
    

def api_listjobs(project: str='default'):
    """ 
        {
            "status": "ok",
            "pending": [
                {
                    "project": "myproject", "spider": "spider1",
                    "id": "78391cc0fcaf11e1b0090800272a6d06"
                }
            ],
            "running": [
                {
                    "id": "422e608f9f28cef127b3d5ef93fe9399",
                    "project": "myproject", "spider": "spider2",
                    "start_time": "2012-09-12 10:14:03.594664"
                }
            ],
            "finished": [
                {
                    "id": "2f16646cfcaf11e1b0090800272a6d06",
                    "project": "myproject", "spider": "spider3",
                    "start_time": "2012-09-12 10:14:03.594664",
                    "end_time": "2012-09-12 10:24:03.594664",
                    "log_url": "/logs/myproject/spider3/2f16646cfcaf11e1b0090800272a6d06.log",
                    "items_url": "/items/myproject/spider3/2f16646cfcaf11e1b0090800272a6d06.jl"
                }
            ]
        }
    """
    url = "http://185.250.148.161/listjobs.json"
    # check if project in projects
    assert project in api_list_projects()

    params = {'project': project}
    response = requests.get(url, params=params)
    if response.status_code == 200:
        data = response.json()
        return data
    else:
        return None


def api_delversion(_version: str, project: str='default'):
    """  """
    url = "http://185.250.148.161/delversion.json"
    # check if project in projects
    assert project in api_list_projects()
    assert _version in api_listversions(project)

    params = {'project': project, '_version': _version}
    response = requests.post(url, params=params)
    if response.status_code == 200:
        data = response.json()
        return data['status']
    else:
        return None
    

def api_delproject(project: str):
    """  """
    url = "http://185.250.148.161/delproject.json"
    # check if project in projects
    assert project in api_list_projects()
    params = {'project': project}
    response = requests.post(url, params=params)
    if response.status_code == 200:
        data = response.json()
        return data['status']
    else:
        return None
    

def get_scrapyd_logs(project_name: str, spider_name: str, job_id: str):
    """  """
    # http://185.250.148.161; default; wt_list; f205c3d4f64311ed8ad9ed526d4a5438
    api_url = 'http://185.250.148.161'
    url = f"{api_url}/logs/{project_name}/{spider_name}/{job_id}.log"
    response = requests.get(url)
    if response.status_code == 200:
        return response.text
    else:
        raise Exception(f"Exception: status code got: {response.status_code}")
    



def run_scrapy_spider(spider_name: str, project_name: str='default'):
    """  """
    assert spider_name in api_listspiders(project=project_name)

    api_url = 'http://185.250.148.161'
    schedule_url = f'{api_url}/schedule.json'
    response = requests.post(schedule_url, data={'project': project_name, 'spider': spider_name})
    if response.status_code == 200:
        print('Spider scheduled successfully.')
    else:
        print('Spider scheduling failed.')


run_scrapy_spider('bestel_list')