Skip to content

Commit

Permalink
disk usage (#2)
Browse files Browse the repository at this point in the history
* more setup

* <bot> update setup.cfg

* <bot> update dependencies*.log files(s)

* <bot> update dependencies*.log files(s)

* <bot> update dependencies*.log files(s)

* <bot> update dependencies*.log files(s)

* try with new setup action

* <bot> update dependencies*.log files(s)

* <bot> update dependencies*.log files(s)

* <bot> update dependencies*.log files(s)

* add python server

---------

Co-authored-by: github-actions <github-actions@github.com>
  • Loading branch information
dsschult and github-actions committed Aug 31, 2023
1 parent fc44732 commit da7e46a
Show file tree
Hide file tree
Showing 17 changed files with 883 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.10
FROM python:3.11

RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y attr

Expand Down
40 changes: 40 additions & 0 deletions cephfs_disk_usage/__main__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import asyncio
import logging

from wipac_dev_tools import from_environment

from .server import Server

# handle logging
setlevel = {
'CRITICAL': logging.CRITICAL, # execution cannot continue
'FATAL': logging.CRITICAL,
'ERROR': logging.ERROR, # something is wrong, but try to continue
'WARNING': logging.WARNING, # non-ideal behavior, important event
'WARN': logging.WARNING,
'INFO': logging.INFO, # initial debug information
'DEBUG': logging.DEBUG # the things no one wants to see
}

default_config = {
'LOG_LEVEL': 'INFO',
}
config = from_environment(default_config)
if config['LOG_LEVEL'].upper() not in setlevel:
raise Exception('LOG_LEVEL is not a proper log level')
logformat = '%(asctime)s %(levelname)s %(name)s %(module)s:%(lineno)s - %(message)s'

logging.basicConfig(format=logformat, level=setlevel[config['LOG_LEVEL'].upper()])


# start server
async def main():
s = Server()
await s.start()
try:
await asyncio.Event().wait()
finally:
await s.stop()


asyncio.run(main())
234 changes: 234 additions & 0 deletions cephfs_disk_usage/server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
"""
Server
"""

import asyncio
from dataclasses import dataclass as dc
import json
import logging
import os
from pathlib import Path
import stat
from typing import Self

from tornado.web import RequestHandler, HTTPError
from rest_tools.server import RestServer
from wipac_dev_tools import from_environment

from . import __version__ as version

logger = logging.getLogger('server')


class Error(RequestHandler):
def prepare(self):
raise HTTPError(404, 'invalid route')


class BaseHandler(RequestHandler):
def initialize(self, filesystems):
self.filesystems = filesystems

def set_default_headers(self):
self._headers['Server'] = f'CephFS Disk Usage {version}'

def get_template_namespace(self):
ret = super().get_template_namespace()
ret['os'] = os
ret['json_encode'] = json.dumps
return ret


class Health(BaseHandler):
async def get(self):
ret = {}
for k in self.filesystems:
ret[k] = await self.filesystems[k].status()
self.write(ret)


class Main(BaseHandler):
async def get(self):
paths = {}
for k in self.filesystems:
paths[k] = await self.filesystems[k].dir_entry('/')
self.render('main.html', paths=paths)


class Details(BaseHandler):
async def get(self, path):
for fs in self.filesystems:
if path.startswith(fs):
data = await self.filesystems[fs].dir_entry(path[len(fs):])
break
else:
raise HTTPError(400, 'bad path')

self.render('details.html', path=path, data=data)


async def call(*args, shell=False):
if shell:
ret = await asyncio.create_subprocess_shell(' '.join(args), stdout=asyncio.subprocess.PIPE)
else:
ret = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
out,err = await ret.communicate()
if ret.returncode:
raise Exception(f'call failed: return code {ret.returncode}')
return out


@dc
class Entry:
name: str
path: str
size: int
is_dir: bool = False
is_link: bool = False
nfiles: int = 0
percent_size: float = 0.0


@dc
class DirEntry:
name: str
path: str
size: int
nfiles: int
children: list

@classmethod
def from_entry(cls, e: Entry) -> Self:
if not e.is_dir:
raise Exception('is not a directory!')
return cls(e.name, e.path, e.size, e.nfiles, [])


class POSIXFileSystem:
def __init__(self, base_path):
self.base_path = Path(base_path)

async def status(self):
try:
async with asyncio.timeout(5):
await call('/usr/bin/ls', str(self.base_path))
except Exception as e:
return f'FAIL: {e}'
return 'OK'

async def _get_meta(self, path: Path) -> Entry:
"""Get recursive size and nfiles for a path"""
if not path.is_relative_to(self.base_path):
raise Exception('not relative to base path')
p = str(path)
async with asyncio.timeout(30):
stats = await asyncio.to_thread(path.stat)
is_dir = stat.S_ISDIR(stats.st_mode)
is_link = stat.S_ISLNK(stats.st_mode)
if is_dir:
size, nfiles = await asyncio.gather(
call('du', '-s', '-b', p, shell=True),
call(f'find "{p}" -type f | wc -l', shell=True)
)
size = int(size.split()[0])
nfiles = int(nfiles.strip())
else:
size = stats.st_size
nfiles = 0
return Entry(path.name, p, size, is_dir, is_link, nfiles)

async def dir_entry(self, path: str) -> DirEntry:
"""Get directory contents"""
fullpath = self.base_path / path.lstrip('/')
if not fullpath.is_dir():
raise Exception('not a directory!')

tasks = []
async with asyncio.TaskGroup() as tg:
for child in fullpath.iterdir():
tasks.append(tg.create_task(self._get_meta(child)))

ret = DirEntry.from_entry(await self._get_meta(fullpath))
for task in tasks:
r = await task
r.percent_size = r.size*100.0/ret.size
ret.children.append(r)
ret.children.sort(key=lambda r: r.name)

return ret


class CephFileSystem(POSIXFileSystem):
async def _get_meta(self, path: Path) -> Entry:
"""Get recursive size and nfiles for a path"""
if not path.is_relative_to(self.base_path):
raise Exception('not relative to base path')
p = str(path)
async with asyncio.timeout(30):
stats = await asyncio.to_thread(path.stat)
is_dir = stat.S_ISDIR(stats.st_mode)
is_link = stat.S_ISLNK(stats.st_mode)
if is_dir:
size, nfiles = await asyncio.gather(
call('/usr/bin/getfattr', '-n', 'ceph.dir.rbytes', p),
call('/usr/bin/getfattr', '-n', 'ceph.dir.rfiles', p)
)
size = int(size.split('=')[-1].strip('" \n'))
nfiles = int(nfiles.split('=')[-1].strip('" \n'))
else:
size = stats.st_size
nfiles = 0
return Entry(path.name, p, size, is_dir, is_link, nfiles)


class Server:
def __init__(self, s3_override=None):
static_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
template_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'templates')

default_config = {
'HOST': 'localhost',
'PORT': 8080,
'DEBUG': False,
'MAX_BODY_SIZE': 10**9,
'CI_TESTING': '',
}
config = from_environment(default_config)

kwargs = {}
if config['CI_TESTING']:
cwd = os.path.abspath(config['CI_TESTING'])
kwargs['filesystems'] = {
cwd: POSIXFileSystem(cwd),
}
else:
kwargs['filesystems'] = {
'/data/ana': CephFileSystem('/data/ana'),
'/data/user': CephFileSystem('/data/user'),
}

server = RestServer(
static_path=static_path,
template_path=template_path,
debug=config['DEBUG'],
max_body_size=config['MAX_BODY_SIZE'],
)

server.add_route('/', Main, kwargs)
# handle moving up gracefully
if config['CI_TESTING']:
server.add_route(cwd, Main, kwargs)
else:
server.add_route('/data', Main, kwargs)
server.add_route('/healthz', Health, kwargs)
server.add_route(r'(.*)', Details, kwargs)

server.startup(address=config['HOST'], port=config['PORT'])

self.server = server

async def start(self):
pass

async def stop(self):
await self.server.stop()
Binary file added cephfs_disk_usage/static/apple-touch-icon.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit da7e46a

Please sign in to comment.