Skip to content

Commit

Permalink
Merge pull request #131 from hplt-project/server-patch
Browse files Browse the repository at this point in the history
  • Loading branch information
jelmervdl committed Nov 14, 2023
2 parents 2da188c + eb2895c commit 3ee797f
Showing 1 changed file with 7 additions and 18 deletions.
25 changes: 7 additions & 18 deletions opuscleaner/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
from opuscleaner.config import DATA_PATH, FILTER_PATH, COL_PY, SAMPLE_PY, SAMPLE_SIZE
from opuscleaner.datasets import list_datasets, Path
from opuscleaner.download import app as download_app
from opuscleaner.filters import filter_format_command, get_global_filter, get_global_filters, set_global_filters, list_filters, FilterType, FilterStep, FilterPipeline
from opuscleaner.filters import filter_format_command, format_shell, get_global_filter, get_global_filters, set_global_filters, list_filters, FilterType, FilterStep, FilterPipeline
from opuscleaner.sample import sample


Expand Down Expand Up @@ -128,7 +128,7 @@ class ParsedFilterOutput(BaseModel):
returncode: int
stdout: List[Dict[str,str]]
stderr: str

def __init__(self, output:FilterOutput):
lines = []

Expand Down Expand Up @@ -173,33 +173,22 @@ async def get_dataset_sample(name:str, columns:List[Tuple[str,Path]]) -> FilterO
return FilterOutput([lang for lang, _ in columns], 0, stdout, bytes())


def format_shell(val: Any) -> str:
if isinstance(val, bool):
return '1' if val else ''
elif isinstance(val, tuple):
raise NotImplementedError()
elif isinstance(val, list):
raise NotImplementedError()
else:
return str(val)


async def exec_filter_step(filter_step: FilterStep, langs: List[str], input: bytes) -> Tuple[bytes,bytes]:
filter_definition = get_global_filter(filter_step.filter)

command = filter_format_command(filter_definition, filter_step, langs)

# Make sure the path to the python binary (and the installed utils)
# is in the PATH variable. If you load a virtualenv this happens by
# default, but if you call it with the virtualenv's python binary
# default, but if you call it with the virtualenv's python binary
# directly it wont.
pyenv_bin_path = os.path.dirname(sys.executable)
os_env_bin_paths = os.environ.get('PATH', '').split(os.pathsep)
filter_env = {
**os.environ,
'PATH': os.pathsep.join([pyenv_bin_path] + os_env_bin_paths)
} if pyenv_bin_path not in os_env_bin_paths else None

p_filter = await asyncio.create_subprocess_shell(command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
Expand Down Expand Up @@ -273,11 +262,11 @@ async def get_sample(name:str, filters:List[FilterStep]) -> AsyncIterator[Filter
))

assert len(sample_cache[name]) == i + 1

# Again shield from cancellation. If we don't need this filter's output
# in the next `get_sample()`, `cancel_cached_tasks()` will cancel it.
sample = await asyncio.shield(sample_cache[name][i].future)

# Return the (partially) filtered sample
yield sample

Expand Down Expand Up @@ -396,7 +385,7 @@ def api_get_dataset_filters_as_openfilter(name:str) -> Response:
}

input_files = pipeline.files

preprocess_steps = []

filter_steps: List[Dict[str,Any]] = []
Expand Down

0 comments on commit 3ee797f

Please sign in to comment.