-
Notifications
You must be signed in to change notification settings - Fork 34
WIP: Add multiple files support #127
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d0718c5
b08d8d5
a31de17
a106db5
edc0af7
8758eb7
442db66
f941959
bdd9608
5818621
b93225f
309fc95
28afa2f
134f0d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,6 @@ | ||
[flake8] | ||
per-file-ignores = | ||
lms/tests/*.py:S101 | ||
lms/lmstests/sandbox/flake8/defines.py:E501 | ||
lms/tests/test_exercise_unit_tests.py:Q001,S101 | ||
lms/tests/test_extractor.py:W293,S101 | ||
ignore=I100,I201,W503 | ||
lms/tests/test_exercise_unit_tests.py:Q001 | ||
lms/tests/test_extractor.py:W293 | ||
ignore=I100,S101,I201,W503 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,22 +1,34 @@ | ||
from abc import abstractmethod | ||
from dataclasses import dataclass | ||
import re | ||
from re import IGNORECASE | ||
import string | ||
from typing import ( | ||
Any, ClassVar, Iterator, Pattern, Sequence, Tuple, Union, cast, | ||
Any, ClassVar, Iterator, List, | ||
Pattern, Sequence, Tuple, Union, cast, | ||
) | ||
|
||
from loguru import logger | ||
from werkzeug.datastructures import FileStorage | ||
|
||
Text = Union[str, bytes] | ||
CodeFile = Union[Sequence[Text], str, bytes] | ||
|
||
|
||
@dataclass | ||
class File: | ||
path: str | ||
code: Text | ||
|
||
|
||
class Extractor: | ||
UPLOAD_TITLE: ClassVar[Pattern] = re.compile(r'Upload\s+(\d+)', IGNORECASE) | ||
|
||
def __init__(self, to_extract: Any): | ||
def __init__(self, to_extract: FileStorage): | ||
self.to_extract = to_extract | ||
cursor_position = to_extract.tell() | ||
self.file_content = to_extract.read() | ||
to_extract.seek(cursor_position) | ||
self.filename = to_extract.filename | ||
|
||
@staticmethod | ||
def _convert_to_text(code: CodeFile) -> str: | ||
|
@@ -29,7 +41,7 @@ def _convert_to_text(code: CodeFile) -> str: | |
if code and isinstance(code, bytes): | ||
return code.decode(errors='replace') | ||
|
||
assert isinstance(code, str) # noqa: S101 | ||
assert isinstance(code, str) | ||
return code | ||
|
||
@classmethod | ||
|
@@ -38,37 +50,38 @@ def _split_header(cls, code: CodeFile) -> Tuple[str, str]: | |
|
||
clean_text = code.strip('#' + string.whitespace) | ||
first_line_end = clean_text.find('\n') | ||
first_line = clean_text[:first_line_end].strip() | ||
if first_line_end == -1: | ||
first_line_end = len(clean_text) | ||
first_line = clean_text[:first_line_end].strip().replace('_', ' ') | ||
code_lines = clean_text[first_line_end:].strip() | ||
|
||
logger.debug(f'Upload title: {first_line}') | ||
return first_line, code_lines | ||
|
||
@classmethod | ||
def _clean(cls, code: Union[Sequence, str]) -> Tuple[str, str]: | ||
def _clean(cls, code: Union[Sequence, str]) -> Tuple[int, str]: | ||
first_line, code_text = cls._split_header(code) | ||
upload_title = cls.UPLOAD_TITLE.fullmatch(first_line) | ||
if upload_title: | ||
return upload_title.group(1), code_text | ||
exercise_id = int(upload_title.group(1)) | ||
return exercise_id, code_text | ||
|
||
logger.debug(f'Unmatched title: {first_line}') | ||
return '', '' | ||
return 0, '' | ||
|
||
@abstractmethod | ||
def can_extract(self) -> bool: | ||
pass | ||
raise NotImplementedError() | ||
|
||
@classmethod | ||
@abstractmethod | ||
def get_exercise(cls, to_extract: Any) -> Tuple[str, str]: | ||
pass | ||
def get_exercise(self, to_extract: Any) -> Tuple[int, List[File]]: | ||
yammesicka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
raise NotImplementedError() | ||
|
||
@abstractmethod | ||
def get_exercises(self): | ||
pass | ||
def get_exercises(self) -> Iterator[Tuple[int, List[File]]]: | ||
raise NotImplementedError() | ||
|
||
def __iter__(self) -> Iterator[Tuple[str, str]]: | ||
def __iter__(self) -> Iterator[Tuple[int, List[File]]]: | ||
for cls in self.__class__.__subclasses__(): | ||
logger.debug(f'Trying extractor: {cls.__name__}') | ||
extractor = cls(to_extract=self.to_extract) | ||
if extractor.can_extract(): | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
My suggestion is to do something like:
    for cls ...:
        if extractor.can_extract():
            for ...:
                yield ...
            return
    logger.warning(f"couldn't find extractor for {self.to_extract}") |
||
yield from extractor.get_exercises() | ||
for solution_id, files in extractor.get_exercises(): | ||
yield (solution_id, files) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,46 +1,65 @@ | ||
from itertools import chain | ||
import itertools | ||
import json | ||
from operator import itemgetter | ||
from typing import Any, Dict, Iterator, List, Tuple | ||
|
||
from lms.extractors.base import Extractor | ||
from lms.extractors.base import Extractor, File | ||
|
||
|
||
NotebookJson = Dict[str, Any] | ||
Cell = Dict[str, Any] | ||
|
||
|
||
class Notebook(Extractor): | ||
POSSIBLE_JSON_EXCEPTIONS = ( | ||
json.JSONDecodeError, KeyError, StopIteration, UnicodeDecodeError, | ||
) | ||
|
||
def __init__(self, **kwargs): | ||
super().__init__(**kwargs) | ||
try: | ||
cells = self._get_code_cells() | ||
self.cells = chain([next(cells)], cells) # Run the generator | ||
except (json.JSONDecodeError, KeyError): | ||
# Triggers StopIteration if `cells` is empty (see example below). | ||
first_cell = next(cells) | ||
self.cells = itertools.chain([first_cell], cells) | ||
except self.POSSIBLE_JSON_EXCEPTIONS: | ||
self.is_json = False | ||
else: | ||
self.is_json = True | ||
|
||
def can_extract(self) -> bool: | ||
return self.is_json | ||
|
||
def _get_code_cells(self) -> Iterator[Cell]: | ||
notebook = json.loads(self.to_extract) | ||
cells = notebook['cells'] | ||
yield from filter(self._is_code_cell, cells) | ||
|
||
@staticmethod | ||
def _is_code_cell(cell: Cell) -> bool: | ||
return ( | ||
cell.get('cell_type', '') == 'code' | ||
and bool(cell.get('source')) | ||
) | ||
|
||
@classmethod | ||
def get_exercise(cls, to_extract: Cell) -> Tuple[str, str]: | ||
def _get_code_cells(self) -> Iterator[Cell]: | ||
notebook = json.loads(self.file_content) | ||
cells = notebook['cells'] | ||
yield from filter(self._is_code_cell, cells) | ||
|
||
def get_exercise(self, to_extract: Cell) -> Tuple[int, List[File]]: | ||
code: List[str] = to_extract.get('source', []) | ||
return cls._clean(code) | ||
exercise_id, clean_code = self._clean(code) | ||
yammesicka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return (exercise_id, [File('/main.py', clean_code)]) | ||
|
||
def get_exercises(self) -> Iterator[Tuple[str, str]]: | ||
def get_exercises(self) -> Iterator[Tuple[int, List[File]]]: | ||
"""Yield exercise ID and code from notebook.""" | ||
yield from filter(itemgetter(0), map(self.get_exercise, self.cells)) | ||
for cell in self.cells: | ||
exercise_id, files = self.get_exercise(cell) | ||
yammesicka marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if exercise_id and files and files[0].code: | ||
yield (exercise_id, files) | ||
|
||
|
||
if __name__ == '__main__': | ||
# An example of how the itertools.chain + next() trick works | ||
cells = iter([1, 2, 3]) | ||
assert list(itertools.chain([next(cells)], cells)) == [1, 2, 3] | ||
try: | ||
list(itertools.chain([next(cells)], cells)) | ||
raise AssertionError() | ||
except StopIteration: | ||
pass |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
from typing import Iterator, List, Tuple | ||
from zipfile import BadZipFile, ZipFile | ||
|
||
from loguru import logger | ||
|
||
from lms.extractors.base import Extractor, File | ||
from lms.models.errors import BadUploadFile | ||
|
||
|
||
class Ziparchive(Extractor): | ||
def __init__(self, **kwargs): | ||
super().__init__(**kwargs) | ||
self.is_zipfile = ( | ||
self.filename is not None | ||
and self.filename.endswith('.zip') | ||
) | ||
if not self.is_zipfile: | ||
return | ||
|
||
try: | ||
self.archive = ZipFile(self.to_extract.stream._file) | ||
except BadZipFile: | ||
self.is_zipfile = False | ||
|
||
def can_extract(self) -> bool: | ||
return self.is_zipfile | ||
|
||
@staticmethod | ||
def _extract(archive: ZipFile, filename: str) -> File: | ||
with archive.open(filename) as current_file: | ||
logger.debug(f'Extracting from archive: {filename}') | ||
code = current_file.read() | ||
decoded = code.decode('utf-8', errors='ignore') | ||
return File(path=f'/{filename}', code=decoded) | ||
|
||
def get_exercise(self, file: ZipFile) -> Tuple[int, List[File]]: | ||
assert self.filename is not None | ||
exercise_id, _ = self._clean(self.filename.rpartition('.')[0]) | ||
if not exercise_id: | ||
raise BadUploadFile('Invalid zip name', self.filename) | ||
|
||
with file as archive: | ||
namelist = archive.namelist() | ||
files = [self._extract(archive, filename) for filename in namelist] | ||
return exercise_id, files | ||
|
||
def get_exercises(self) -> Iterator[Tuple[int, List[File]]]: | ||
exercise_id, files = self.get_exercise(self.archive) | ||
if exercise_id and files and any(file.code for file in files): | ||
yield (exercise_id, files) |
Uh oh!
There was an error while loading. Please reload this page.