diff --git a/.github/workflows/cs102.yml b/.github/workflows/cs102.yml index 4509ac4..cb286a1 100644 --- a/.github/workflows/cs102.yml +++ b/.github/workflows/cs102.yml @@ -10,7 +10,7 @@ jobs: - name: Set up Python 3.8.6 uses: actions/setup-python@v2 with: - python-version: '3.8.6' + python-version: '3.10' - name: Install dependencies run: | python -m pip install --upgrade pip diff --git a/homework02/sudoku.py b/homework02/sudoku.py index df78ab1..df02ea2 100644 --- a/homework02/sudoku.py +++ b/homework02/sudoku.py @@ -1,11 +1,12 @@ import pathlib +import random import typing as tp T = tp.TypeVar("T") def read_sudoku(path: tp.Union[str, pathlib.Path]) -> tp.List[tp.List[str]]: - """ Прочитать Судоку из указанного файла """ + """Прочитать Судоку из указанного файла""" path = pathlib.Path(path) with path.open() as f: puzzle = f.read() @@ -19,7 +20,7 @@ def create_grid(puzzle: str) -> tp.List[tp.List[str]]: def display(grid: tp.List[tp.List[str]]) -> None: - """Вывод Судоку """ + """Вывод Судоку""" width = 2 line = "+".join(["-" * (width * 3)] * 3) for row in range(9): @@ -38,11 +39,18 @@ def group(values: tp.List[T], n: int) -> tp.List[tp.List[T]]: Сгруппировать значения values в список, состоящий из списков по n элементов >>> group([1,2,3,4], 2) - [[1, 2], [3, 4]] + [[1, 2], [3, 4]]) >>> group([1,2,3,4,5,6,7,8,9], 3) [[1, 2, 3], [4, 5, 6], [7, 8, 9]] """ - pass + i = 0 + j = n + s = [] + while j <= len(values): + s.append(values[i:j]) + i = j + j += n + return s def get_row(grid: tp.List[tp.List[str]], pos: tp.Tuple[int, int]) -> tp.List[str]: @@ -55,7 +63,7 @@ def get_row(grid: tp.List[tp.List[str]], pos: tp.Tuple[int, int]) -> tp.List[str >>> get_row([['1', '2', '3'], ['4', '5', '6'], ['.', '8', '9']], (2, 0)) ['.', '8', '9'] """ - pass + return grid[pos[0]] def get_col(grid: tp.List[tp.List[str]], pos: tp.Tuple[int, int]) -> tp.List[str]: @@ -68,7 +76,10 @@ def get_col(grid: tp.List[tp.List[str]], pos: tp.Tuple[int, int]) -> tp.List[str >>> get_col([['1', '2', '3'], ['4', '5', '6'], ['.', '8', '9']], (0, 2)) ['3', '6', '9'] """ - pass + post = [] + for i in range(len(grid)): + post.append(grid[i][pos[1]]) + return post def get_block(grid: tp.List[tp.List[str]], pos: tp.Tuple[int, int]) -> tp.List[str]: @@ -82,10 +93,14 @@ def get_block(grid: tp.List[tp.List[str]], pos: tp.Tuple[int, int]) -> tp.List[s >>> get_block(grid, (8, 8)) ['2', '8', '.', '.', '.', '5', '.', '7', '9'] """ - pass + square = [] + for i in range(pos[0] // 3 * 3, pos[0] // 3 * 3 + 3): + for j in range(pos[1] // 3 * 3, pos[1] // 3 * 3 + 3): + square.append(grid[i][j]) + return square -def find_empty_positions(grid: tp.List[tp.List[str]]) -> tp.Optional[tp.Tuple[int, int]]: +def find_empty_positions(grid: tp.List[tp.List[str]]): """Найти первую свободную позицию в пазле >>> find_empty_positions([['1', '2', '.'], ['4', '5', '6'], ['7', '8', '9']]) @@ -95,7 +110,14 @@ def find_empty_positions(grid: tp.List[tp.List[str]]) -> tp.Optional[tp.Tuple[in >>> find_empty_positions([['1', '2', '3'], ['4', '5', '6'], ['.', '8', '9']]) (2, 0) """ - pass + x = -1 + for i in range(len(grid)): + x += 1 + y = 0 + for j in range(len(grid[i])): + if grid[i][j] == ".": + return x, y + y += 1 def find_possible_values(grid: tp.List[tp.List[str]], pos: tp.Tuple[int, int]) -> tp.Set[str]: @@ -109,11 +131,13 @@ def find_possible_values(grid: tp.List[tp.List[str]], pos: tp.Tuple[int, int]) - >>> values == {'2', '5', '9'} True """ - pass + full = set(get_row(grid, pos) + get_col(grid, pos) + get_block(grid, pos)) + possible_values = set("123456789") - full + return possible_values -def solve(grid: tp.List[tp.List[str]]) -> tp.Optional[tp.List[tp.List[str]]]: - """ Решение пазла, заданного в grid """ +def solve(grid: tp.List[tp.List[str]]): + """Решение пазла, заданного в grid""" """ Как решать Судоку? 1. Найти свободную позицию 2. Найти все возможные значения, которые могут находиться на этой позиции @@ -125,13 +149,33 @@ def solve(grid: tp.List[tp.List[str]]) -> tp.Optional[tp.List[tp.List[str]]]: >>> solve(grid) [['5', '3', '4', '6', '7', '8', '9', '1', '2'], ['6', '7', '2', '1', '9', '5', '3', '4', '8'], ['1', '9', '8', '3', '4', '2', '5', '6', '7'], ['8', '5', '9', '7', '6', '1', '4', '2', '3'], ['4', '2', '6', '8', '5', '3', '7', '9', '1'], ['7', '1', '3', '9', '2', '4', '8', '5', '6'], ['9', '6', '1', '5', '3', '7', '2', '8', '4'], ['2', '8', '7', '4', '1', '9', '6', '3', '5'], ['3', '4', '5', '2', '8', '6', '1', '7', '9']] """ - pass + empty_positions = find_empty_positions(grid) + if empty_positions is None: + return grid + else: + possible_values = find_possible_values(grid, empty_positions) + for i in possible_values: + grid[empty_positions[0]][empty_positions[1]] = i + next = solve(grid) + if next: + return next + grid[empty_positions[0]][empty_positions[1]] = "." def check_solution(solution: tp.List[tp.List[str]]) -> bool: - """ Если решение solution верно, то вернуть True, в противном случае False """ + """Если решение solution верно, то вернуть True, в противном случае False""" # TODO: Add doctests with bad puzzles - pass + all = ["1", "2", "3", "4", "5", "6", "7", "8", "9"] + for i in range(0, len(solution)): + for j in range(0, len(solution)): + pos = i, j + if ( + not sorted(get_row(solution, pos)) == all + or not sorted(get_col(solution, pos)) == all + or not sorted(get_block(solution, pos)) == all + ): + return False + return True def generate_sudoku(N: int) -> tp.List[tp.List[str]]: @@ -156,7 +200,24 @@ def generate_sudoku(N: int) -> tp.List[tp.List[str]]: >>> check_solution(solution) True """ - pass + grid = [["." for i in range(9)] for i in range(9)] + print(N) + if N > 81: + N = 81 + print(grid) + grid = solve(grid) + print(grid) + k = 0 + while k != 81 - N: + i = random.randint(0, 8) + j = random.randint(0, 8) + if grid[i][j] == ".": + k = k + else: + grid[i][j] = "." + k += 1 + print(grid) + return grid if __name__ == "__main__": @@ -168,3 +229,8 @@ def generate_sudoku(N: int) -> tp.List[tp.List[str]]: print(f"Puzzle {fname} can't be solved") else: display(solution) + if check_solution(solution): + print("Solution is correct") + else: + print("LoL") + display(generate_sudoku(1)) diff --git a/homework03/life.py b/homework03/life.py index 7aef0b6..a95dcf8 100644 --- a/homework03/life.py +++ b/homework03/life.py @@ -30,45 +30,127 @@ def __init__( def create_grid(self, randomize: bool = False) -> Grid: # Copy from previous assignment - pass + if randomize: + return [[random.randint(0, 1) for i in range(self.rows)] for i in range(self.cols)] + else: + return [[0 for i in range(self.rows)] for i in range(self.cols)] def get_neighbours(self, cell: Cell) -> Cells: # Copy from previous assignment - pass + height, width = cell + cells = [] + if height == 0 and width == 0: + cells.append(self.curr_generation[0][1]) + cells.append(self.curr_generation[1][0]) + cells.append(self.curr_generation[1][1]) + elif height == 0 and width == self.cols - 1: + cells.append(self.curr_generation[0][width - 1]) + cells.append(self.curr_generation[1][width]) + cells.append(self.curr_generation[1][width - 1]) + elif height == self.rows - 1 and width == 0: + cells.append(self.curr_generation[height - 1][0]) + cells.append(self.curr_generation[height - 1][1]) + cells.append(self.curr_generation[height][1]) + elif height == self.rows - 1 and width == self.cols - 1: + cells.append(self.curr_generation[height - 1][width - 1]) + cells.append(self.curr_generation[height][width - 1]) + cells.append(self.curr_generation[height - 1][width]) + elif height == 0: + cells.append(self.curr_generation[0][width - 1]) + cells.append(self.curr_generation[0][width + 1]) + cells.append(self.curr_generation[1][width]) + cells.append(self.curr_generation[1][width - 1]) + cells.append(self.curr_generation[1][width + 1]) + elif width == 0: + cells.append(self.curr_generation[height - 1][0]) + cells.append(self.curr_generation[height + 1][0]) + cells.append(self.curr_generation[height][1]) + cells.append(self.curr_generation[height - 1][1]) + cells.append(self.curr_generation[height + 1][1]) + elif height == self.rows - 1: + cells.append(self.curr_generation[height][width - 1]) + cells.append(self.curr_generation[height][width + 1]) + cells.append(self.curr_generation[height - 1][width]) + cells.append(self.curr_generation[height - 1][width - 1]) + cells.append(self.curr_generation[height - 1][width + 1]) + elif width == self.cols - 1: + cells.append(self.curr_generation[height - 1][width]) + cells.append(self.curr_generation[height + 1][width]) + cells.append(self.curr_generation[height][width - 1]) + cells.append(self.curr_generation[height - 1][width - 1]) + cells.append(self.curr_generation[height + 1][width - 1]) + else: + cells.append(self.curr_generation[height][width - 1]) + cells.append(self.curr_generation[height][width + 1]) + cells.append(self.curr_generation[height - 1][width]) + cells.append(self.curr_generation[height + 1][width]) + cells.append(self.curr_generation[height - 1][width - 1]) + cells.append(self.curr_generation[height - 1][width + 1]) + cells.append(self.curr_generation[height + 1][width + 1]) + cells.append(self.curr_generation[height + 1][width - 1]) + return cells def get_next_generation(self) -> Grid: # Copy from previous assignment - pass + next_grid = [[0 for i in range(self.cols)] for i in range(self.rows)] + for i in range(self.rows): + for j in range(self.cols): + if self.curr_generation[i][j] == 1 and ( + sum(self.get_neighbours((i, j))) == 2 or sum(self.get_neighbours((i, j))) == 3 + ): + next_grid[i][j] = 1 + elif self.curr_generation[i][j] == 0 and sum(self.get_neighbours((i, j))) == 3: + next_grid[i][j] = 1 + else: + next_grid[i][j] = 0 + return next_grid def step(self) -> None: """ Выполнить один шаг игры. """ - pass + self.prev_generation = self.curr_generation + self.curr_generation = self.get_next_generation() + self.generations += 1 @property def is_max_generations_exceeded(self) -> bool: """ Не превысило ли текущее число поколений максимально допустимое. """ - pass + if self.max_generations == self.generations: + return True + else: + return False @property def is_changing(self) -> bool: """ Изменилось ли состояние клеток с предыдущего шага. """ - pass + if self.curr_generation == self.prev_generation: + return False + else: + return True @staticmethod def from_file(filename: pathlib.Path) -> "GameOfLife": """ Прочитать состояние клеток из указанного файла. """ - pass + f = open(filename, "r").readlines() + grid = [] + for i in range(len(f)): + r = list(map(int, f[i].split())) + grid.append(r) + life = GameOfLife(size=(len(grid), len(grid[0])), randomize=False) + life.curr_generation = grid + return life def save(self, filename: pathlib.Path) -> None: """ Сохранить текущее состояние клеток в указанный файл. """ - pass + with open(filename, "w") as f: + for i in self.curr_generation: + f.write("".join([str(j) for j in i]) + "\n") diff --git a/homework03/life_console.py b/homework03/life_console.py index ddeb9ef..e5a90b8 100644 --- a/homework03/life_console.py +++ b/homework03/life_console.py @@ -9,14 +9,32 @@ def __init__(self, life: GameOfLife) -> None: super().__init__(life) def draw_borders(self, screen) -> None: - """ Отобразить рамку. """ - pass + """Отобразить рамку.""" + screen.border("|", "|", "-", "-", "+", "+", "+", "+") def draw_grid(self, screen) -> None: - """ Отобразить состояние клеток. """ - pass + """Отобразить состояние клеток.""" + for i in range(self.life.rows): + for j in range(self.life.cols): + if self.life.curr_generation[i][j] == 0: + screen.addch(i + 1, j + 1, " ") + else: + screen.addch(i + 1, j + 1, "#") def run(self) -> None: screen = curses.initscr() # PUT YOUR CODE HERE - curses.endwin() + while True: + self.draw_borders(screen) + self.draw_grid(screen) + screen.refresh() + self.life.step() + if screen.getch() == 32: # space + curses.endwin() + break + + +if __name__ == "__main__": + life = GameOfLife((100, 100), max_generations=50) + ui = Console(life) + ui.run() diff --git a/homework03/life_gui.py b/homework03/life_gui.py index 1126b29..19b8bf5 100644 --- a/homework03/life_gui.py +++ b/homework03/life_gui.py @@ -7,15 +7,89 @@ class GUI(UI): def __init__(self, life: GameOfLife, cell_size: int = 10, speed: int = 10) -> None: super().__init__(life) + self.cell_size = cell_size + self.width = life.rows * cell_size + self.height = life.cols * cell_size + self.screen_size = self.width, self.height + self.screen = pygame.display.set_mode(self.screen_size) + self.cell_width = self.width // self.cell_size + self.cell_height = self.height // self.cell_size + self.speed = speed def draw_lines(self) -> None: - # Copy from previous assignment - pass + for x in range(0, self.width, self.cell_size): + pygame.draw.line(self.screen, pygame.Color("black"), (x, 0), (x, self.height)) + for y in range(0, self.height, self.cell_size): + pygame.draw.line(self.screen, pygame.Color("black"), (0, y), (self.width, y)) def draw_grid(self) -> None: - # Copy from previous assignment - pass + for i in range(self.cell_height): + for j in range(self.cell_width): + if self.life.curr_generation[i][j] == 1: + pygame.draw.rect( + self.screen, + pygame.Color("green"), + ( + j * self.cell_size + 1, + i * self.cell_size + 1, + self.cell_size - 1, + self.cell_size - 1, + ), + ) + else: + pygame.draw.rect( + self.screen, + pygame.Color("white"), + ( + j * self.cell_size + 1, + i * self.cell_size + 1, + self.cell_size - 1, + self.cell_size - 1, + ), + ) + + def myself(self, height, width) -> None: + col = height // self.cell_size + row = width // self.cell_size + if self.life.curr_generation[row][col] == 0: + self.life.curr_generation[row][col] = 1 + else: + self.life.curr_generation[row][col] = 0 def run(self) -> None: - # Copy from previous assignment - pass + pygame.init() + clock = pygame.time.Clock() + pygame.display.set_caption("Game of Life") + self.screen.fill(pygame.Color("white")) + running = True + pause = False + while running: + for event in pygame.event.get(): + if event.type == pygame.KEYDOWN: + if event.key == pygame.K_SPACE: + pause = not pause + elif event.type == pygame.MOUSEBUTTONDOWN: + height, width = event.pos + self.myself(height, width) + self.draw_grid() + self.draw_lines() + pygame.display.flip() + elif event.type == QUIT: + running = False + if not self.life.is_changing: + running = False + elif self.life.is_max_generations_exceeded: + running = False + if not pause: + self.life.step() + self.draw_lines() + self.draw_grid() + pygame.display.flip() + clock.tick(self.speed) + pygame.quit() + + +if __name__ == "__main__": + game = GameOfLife(size=(100, 100), randomize=True) + gui = GUI(life=game, cell_size=10, speed=10) + gui.run() diff --git a/homework03/life_proto.py b/homework03/life_proto.py index c6d6010..a816204 100644 --- a/homework03/life_proto.py +++ b/homework03/life_proto.py @@ -30,33 +30,32 @@ def __init__( self.speed = speed def draw_lines(self) -> None: - """ Отрисовать сетку """ + """Отрисовать сетку""" for x in range(0, self.width, self.cell_size): pygame.draw.line(self.screen, pygame.Color("black"), (x, 0), (x, self.height)) for y in range(0, self.height, self.cell_size): pygame.draw.line(self.screen, pygame.Color("black"), (0, y), (self.width, y)) def run(self) -> None: - """ Запустить игру """ + """Запустить игру""" pygame.init() clock = pygame.time.Clock() pygame.display.set_caption("Game of Life") self.screen.fill(pygame.Color("white")) - # Создание списка клеток - # PUT YOUR CODE HERE - + # Добавил поле + self.grid = self.create_grid(randomize=True) + self.draw_grid(self.grid) running = True while running: for event in pygame.event.get(): if event.type == QUIT: running = False self.draw_lines() - # Отрисовка списка клеток # Выполнение одного шага игры (обновление состояния ячеек) - # PUT YOUR CODE HERE - + self.grid = self.get_next_generation() + self.draw_grid(self.grid) pygame.display.flip() clock.tick(self.speed) pygame.quit() @@ -79,13 +78,42 @@ def create_grid(self, randomize: bool = False) -> Grid: out : Grid Матрица клеток размером `cell_height` х `cell_width`. """ - pass - - def draw_grid(self) -> None: + if randomize: + return [ + [random.randint(0, 1) for i in range(self.cell_width)] + for i in range(self.cell_height) + ] + else: + return [[0 for i in range(self.cell_width)] for i in range(self.cell_height)] + + def draw_grid(self, grid: list[list[int]]) -> None: """ Отрисовка списка клеток с закрашиванием их в соответствующе цвета. """ - pass + for i in range(self.cell_height): + for j in range(self.cell_width): + if grid[i][j] == 1: + pygame.draw.rect( + self.screen, + pygame.Color("green"), + ( + j * self.cell_size + 1, + i * self.cell_size + 1, + self.cell_size - 1, + self.cell_size - 1, + ), + ) + else: + pygame.draw.rect( + self.screen, + pygame.Color("white"), + ( + j * self.cell_size + 1, + i * self.cell_size + 1, + self.cell_size - 1, + self.cell_size - 1, + ), + ) def get_neighbours(self, cell: Cell) -> Cells: """ @@ -105,7 +133,58 @@ def get_neighbours(self, cell: Cell) -> Cells: out : Cells Список соседних клеток. """ - pass + height, width = cell + cells = [] + if height == 0 and width == 0: + cells.append(self.grid[0][1]) + cells.append(self.grid[1][0]) + cells.append(self.grid[1][1]) + elif height == 0 and width == self.cell_width - 1: + cells.append(self.grid[0][width - 1]) + cells.append(self.grid[1][width]) + cells.append(self.grid[1][width - 1]) + elif height == self.cell_height - 1 and width == 0: + cells.append(self.grid[height - 1][0]) + cells.append(self.grid[height - 1][1]) + cells.append(self.grid[height][1]) + elif height == self.cell_height - 1 and width == self.cell_width - 1: + cells.append(self.grid[height - 1][width - 1]) + cells.append(self.grid[height][width - 1]) + cells.append(self.grid[height - 1][width]) + elif height == 0: + cells.append(self.grid[0][width - 1]) + cells.append(self.grid[0][width + 1]) + cells.append(self.grid[1][width]) + cells.append(self.grid[1][width - 1]) + cells.append(self.grid[1][width + 1]) + elif width == 0: + cells.append(self.grid[height - 1][0]) + cells.append(self.grid[height + 1][0]) + cells.append(self.grid[height][1]) + cells.append(self.grid[height - 1][1]) + cells.append(self.grid[height + 1][1]) + elif height == self.cell_height - 1: + cells.append(self.grid[height][width - 1]) + cells.append(self.grid[height][width + 1]) + cells.append(self.grid[height - 1][width]) + cells.append(self.grid[height - 1][width - 1]) + cells.append(self.grid[height - 1][width + 1]) + elif width == self.cell_width - 1: + cells.append(self.grid[height - 1][width]) + cells.append(self.grid[height + 1][width]) + cells.append(self.grid[height][width - 1]) + cells.append(self.grid[height - 1][width - 1]) + cells.append(self.grid[height + 1][width - 1]) + else: + cells.append(self.grid[height][width - 1]) + cells.append(self.grid[height][width + 1]) + cells.append(self.grid[height - 1][width]) + cells.append(self.grid[height + 1][width]) + cells.append(self.grid[height - 1][width - 1]) + cells.append(self.grid[height - 1][width + 1]) + cells.append(self.grid[height + 1][width + 1]) + cells.append(self.grid[height + 1][width - 1]) + return cells def get_next_generation(self) -> Grid: """ @@ -116,4 +195,20 @@ def get_next_generation(self) -> Grid: out : Grid Новое поколение клеток. """ - pass + next_grid = [[0 for i in range(self.cell_width)] for i in range(self.cell_height)] + for i in range(self.cell_height): + for j in range(self.cell_width): + if self.grid[i][j] == 1 and ( + sum(self.get_neighbours((i, j))) == 2 or sum(self.get_neighbours((i, j))) == 3 + ): + next_grid[i][j] = 1 + elif self.grid[i][j] == 0 and sum(self.get_neighbours((i, j))) == 3: + next_grid[i][j] = 1 + else: + next_grid[i][j] = 0 + return next_grid + + +if __name__ == "__main__": + game = GameOfLife(width=640, height=480, cell_size=10, speed=10) + game.run() diff --git a/homework04/pyvcs/index.py b/homework04/pyvcs/index.py index 85a5e91..0ae8a3b 100644 --- a/homework04/pyvcs/index.py +++ b/homework04/pyvcs/index.py @@ -25,30 +25,105 @@ class GitIndexEntry(tp.NamedTuple): name: str def pack(self) -> bytes: - # PUT YOUR CODE HERE - ... + return struct.pack( + f">10i20sh{len(self.name)}s3x", + *[ + self.ctime_s, + self.ctime_n, + self.mtime_s, + self.mtime_n, + self.dev, + self.ino, + self.mode, + self.uid, + self.gid, + self.size, + self.sha1, + self.flags, + self.name.encode(), + ], + ) @staticmethod def unpack(data: bytes) -> "GitIndexEntry": - # PUT YOUR CODE HERE - ... + ctime_s, ctime_n, mtime_s, mtime_n, dev, ino, mode, uid, gid, size = struct.unpack( + ">4L6i", data[:40] + ) + sha1 = data[40:60] + flags = struct.unpack(">H", data[60:62])[0] + name = data[62 : data[62:].index(b"\x00\x00\x00") + 62].decode("ascii") + + return GitIndexEntry( + ctime_s, ctime_n, mtime_s, mtime_n, dev, ino, mode, uid, gid, size, sha1, flags, name + ) def read_index(gitdir: pathlib.Path) -> tp.List[GitIndexEntry]: - # PUT YOUR CODE HERE - ... + if not pathlib.Path(gitdir / "index").is_file(): + return [] + with open(gitdir / "index", "rb") as f: + arr = f.read() + l = struct.unpack(">I", arr[8:12])[0] + data = arr[12:] + res = [] + for _ in range(l): + res.append(GitIndexEntry.unpack(data)) + index = data.index(res[-1].name.encode()) + len(res[-1].name) + 3 + data = data[index:] + return res def write_index(gitdir: pathlib.Path, entries: tp.List[GitIndexEntry]) -> None: - # PUT YOUR CODE HERE - ... + with open(gitdir / "index", "wb") as f: + all = b"" + all += b"DIRC" + all += struct.pack(">I", 2) + all += struct.pack(">I", len(entries)) + for i in entries: + all += i.pack() + all += hashlib.sha1(all).digest() + f.write(all) def ls_files(gitdir: pathlib.Path, details: bool = False) -> None: - # PUT YOUR CODE HERE - ... + res = read_index(gitdir) + for i in res: + if details: + print("100644 " + i.sha1.hex() + " 0\t" + i.name, flush=True) + else: + print(i.name, flush=True) def update_index(gitdir: pathlib.Path, paths: tp.List[pathlib.Path], write: bool = True) -> None: - # PUT YOUR CODE HERE - ... + objects = [] + for i in paths: + path = pathlib.Path(i) + stats = os.stat(path) + + with open(path, "rb") as f: + hash = hash_object(f.read(), "blob", True) + + e = GitIndexEntry( + ctime_s=int(stats.st_ctime), + ctime_n=int(stats.st_ctime), + mtime_s=int(stats.st_mtime), + mtime_n=int(stats.st_mtime), + dev=stats.st_dev, + ino=stats.st_ino, + mode=stats.st_mode, + uid=stats.st_uid, + gid=stats.st_uid, + size=stats.st_size, + sha1=bytes.fromhex(hash), + flags=0, + name=str(path).replace("\\", "/"), + ) + + objects.append(e) + objects.sort(key=lambda x: x.name) + if not (gitdir / "index").exists(): + write_index(gitdir, objects) + else: + index = read_index(gitdir) + index += objects + write_index(gitdir, index) diff --git a/homework04/pyvcs/objects.py b/homework04/pyvcs/objects.py index 013cc8d..1190134 100644 --- a/homework04/pyvcs/objects.py +++ b/homework04/pyvcs/objects.py @@ -11,40 +11,109 @@ def hash_object(data: bytes, fmt: str, write: bool = False) -> str: - # PUT YOUR CODE HERE - ... + root = ".git" + header = f"{fmt} {len(data)}\0" + store = header.encode() + data + hash = hashlib.sha1(store).hexdigest() -def resolve_object(obj_name: str, gitdir: pathlib.Path) -> tp.List[str]: - # PUT YOUR CODE HERE - ... + if not write: + return hash + + path = root + "/objects/" + hash[:2] + if not os.path.exists(path): + os.makedirs(path) + + with open(path + os.sep + hash[2:], "wb") as f: + f.write(zlib.compress(store)) + return hash -def find_object(obj_name: str, gitdir: pathlib.Path) -> str: - # PUT YOUR CODE HERE - ... + +def resolve_object(obj_name: str, gitdir: pathlib.Path) -> tp.List[str]: + if not 4 <= len(obj_name) <= 40: + raise Exception(f"Not a valid object name {obj_name}") + objs_path = gitdir / "objects" / obj_name[:2] + objects = [] + for obj_path in objs_path.iterdir(): + if not obj_path.name.find(obj_name[2:]): + objects.append(obj_name[:2] + obj_path.name) + if len(objects): + return objects + else: + raise Exception(f"Not a valid object name {obj_name}") + + +def find_object(obj_name: str, gitdir: pathlib.Path) -> tp.Optional[str]: + if obj_name[2:] in gitdir.parts[-1]: + return f"{gitdir.parts[-2]}{gitdir.parts[-1]}" + else: + return None def read_object(sha: str, gitdir: pathlib.Path) -> tp.Tuple[str, bytes]: - # PUT YOUR CODE HERE - ... + path = gitdir / "objects" / sha[:2] / sha[2:] + with open(path, mode="rb") as f: + obj_data = zlib.decompress(f.read()) + h = obj_data.find(b"\x00") + header = obj_data[:h] + b = obj_data.find(b" ") + fmt = header[:b].decode("ascii") + content_len = int(header[b:h].decode("ascii")) + content = obj_data[h + 1 :] + assert content_len == len(content) + return (fmt, content) def read_tree(data: bytes) -> tp.List[tp.Tuple[int, str, str]]: - # PUT YOUR CODE HERE - ... + result = [] + while len(data) != 0: + b = data.find(b" ") + fmt = int(data[:b].decode()) + data = data[b + 1 :] + ln = data.find(b"\x00") + length = data[:ln].decode() + data = data[ln + 1 :] + sha = bytes.hex(data[:20]) + data = data[20:] + result.append((fmt, length, sha)) + return result def cat_file(obj_name: str, pretty: bool = True) -> None: - # PUT YOUR CODE HERE - ... + gitdir = repo_find() + + for obj in resolve_object(obj_name, gitdir): + header, content = read_object(obj, gitdir) + if header == "tree": + result = "" + tree_files = read_tree(content) + for f in tree_files: + object_type = read_object(f[2], pathlib.Path(".git"))[0] + result += str(f[0]).zfill(6) + " " + result += object_type + " " + result += f[2] + "\t" + result += f[1] + "\n" + print(result) + else: + print(content.decode()) def find_tree_files(tree_sha: str, gitdir: pathlib.Path) -> tp.List[tp.Tuple[str, str]]: - # PUT YOUR CODE HERE - ... + result = [] + header, data = read_object(tree_sha, gitdir) + for f in read_tree(data): + if read_object(f[2], gitdir)[0] == "tree": + tree = find_tree_files(f[2], gitdir) + for blob in tree: + name = f[1] + "/" + blob[0] + result.append((name, blob[1])) + else: + result.append((f[1], f[2])) + return result def commit_parse(raw: bytes, start: int = 0, dct=None): - # PUT YOUR CODE HERE - ... + data = zlib.decompress(raw) + i = data.find(b"tree") + return data[i + 5 : i + 45] diff --git a/homework04/pyvcs/porcelain.py b/homework04/pyvcs/porcelain.py index 6f2cde2..3a0ae78 100644 --- a/homework04/pyvcs/porcelain.py +++ b/homework04/pyvcs/porcelain.py @@ -1,5 +1,6 @@ import os import pathlib +import shutil import typing as tp from pyvcs.index import read_index, update_index @@ -9,15 +10,37 @@ def add(gitdir: pathlib.Path, paths: tp.List[pathlib.Path]) -> None: - # PUT YOUR CODE HERE - ... + update_index(gitdir, paths) def commit(gitdir: pathlib.Path, message: str, author: tp.Optional[str] = None) -> str: - # PUT YOUR CODE HERE - ... + index = read_index(gitdir) + return commit_tree( + gitdir=gitdir, tree=write_tree(gitdir, index), message=message, author=author + ) def checkout(gitdir: pathlib.Path, obj_name: str) -> None: - # PUT YOUR CODE HERE - ... + head = gitdir / "refs" / "heads" / obj_name + if head.exists(): + with head.open("r") as f: + obj_name = f.read() + index = read_index(gitdir) + for i in index: + if pathlib.Path(i.name).is_file(): + if "/" in i.name: + shutil.rmtree(i.name[: i.name.find("/")]) + else: + os.chmod(i.name, 0o777) + os.remove(i.name) + obj_path = gitdir / "objects" / obj_name[:2] / obj_name[2:] + with obj_path.open("rb") as f: + com = f.read() + for j in find_tree_files(commit_parse(com).decode(), gitdir): + if "/" in j[0]: + dir_name = j[0][: j[0].find("/")] + pathlib.Path(dir_name).absolute().mkdir() + + with open(j[0], "w") as file: + header, content = read_object(j[1], gitdir) + file.write(content.decode()) diff --git a/homework04/pyvcs/refs.py b/homework04/pyvcs/refs.py index 1e45b90..f4d974a 100644 --- a/homework04/pyvcs/refs.py +++ b/homework04/pyvcs/refs.py @@ -3,30 +3,41 @@ def update_ref(gitdir: pathlib.Path, ref: tp.Union[str, pathlib.Path], new_value: str) -> None: - # PUT YOUR CODE HERE - ... + refpath = gitdir / pathlib.Path(ref) + with refpath.open("w") as f: + f.write(new_value) def symbolic_ref(gitdir: pathlib.Path, name: str, ref: str) -> None: - # PUT YOUR CODE HERE - ... + if ref_resolve(gitdir, ref) is None: + return None + with (gitdir / name).open("w") as f: + f.write(f"ref: {ref}") -def ref_resolve(gitdir: pathlib.Path, refname: str) -> str: - # PUT YOUR CODE HERE - ... +def ref_resolve(gitdir: pathlib.Path, refname: str) -> tp.Optional[str]: + if refname == "HEAD": + refname = get_ref(gitdir) + if not (gitdir / refname).exists(): + return None + with open((gitdir / refname), "r") as f: + out = f.read() + return out def resolve_head(gitdir: pathlib.Path) -> tp.Optional[str]: - # PUT YOUR CODE HERE - ... + return ref_resolve(gitdir, "HEAD") def is_detached(gitdir: pathlib.Path) -> bool: - # PUT YOUR CODE HERE - ... + with (gitdir / "HEAD").open("r") as f: + chref = str(f.read()) + if chref.find("ref") == -1: + return True + return False def get_ref(gitdir: pathlib.Path) -> str: - # PUT YOUR CODE HERE - ... + with open((gitdir / "HEAD"), "r") as f: + gref = f.read() + return gref[gref.find(" ") + 1 :].strip() diff --git a/homework04/pyvcs/repo.py b/homework04/pyvcs/repo.py index cef16a6..0d13a3e 100644 --- a/homework04/pyvcs/repo.py +++ b/homework04/pyvcs/repo.py @@ -4,10 +4,41 @@ def repo_find(workdir: tp.Union[str, pathlib.Path] = ".") -> pathlib.Path: - # PUT YOUR CODE HERE - ... + workdir = pathlib.Path(workdir) + if os.environ.get("GIT_DIR"): + git_dir = os.environ["GIT_DIR"] + else: + git_dir = ".git" + path = workdir / git_dir + if path.exists(): + return path + else: + for i in path.parents: + if i.name == git_dir: + return i + raise Exception("Not a git repository") def repo_create(workdir: tp.Union[str, pathlib.Path]) -> pathlib.Path: - # PUT YOUR CODE HERE - ... + workdir = pathlib.Path(workdir) + if not workdir.is_dir(): + raise Exception(f"{workdir.name} is not a directory") + if os.environ.get("GIT_DIR"): + git_dir = os.environ["GIT_DIR"] + else: + git_dir = ".git" + path = workdir / git_dir + path.mkdir() + (path / "refs").mkdir() + (path / "refs/heads").mkdir() + (path / "refs/tags").mkdir() + (path / "objects").mkdir() + with open(path / "HEAD", "w") as head: + head.write("ref: refs/heads/master\n") + with open(path / "config", "w") as config: + config.write( + "[core]\n\trepositoryformatversion = 0\n\tfilemode = true\n\tbare = false\n\tlogallrefupdates = false\n" + ) + with open(path / "description", "w") as description: + description.write("Unnamed pyvcs repository.\n") + return path diff --git a/homework04/pyvcs/tree.py b/homework04/pyvcs/tree.py index f79b026..821f87b 100644 --- a/homework04/pyvcs/tree.py +++ b/homework04/pyvcs/tree.py @@ -10,8 +10,27 @@ def write_tree(gitdir: pathlib.Path, index: tp.List[GitIndexEntry], dirname: str = "") -> str: - # PUT YOUR CODE HERE - ... + data = b"" + + for file in index: + if "/" in file.name: + s = file.name.find("/") + directory = file.name[:s].encode() + mode = oct(file.mode)[2:].encode() + child = ( + b"" + + mode + + b" " + + file.name[file.name.find("/") + 1 :].encode() + + b"\0" + + file.sha1 + ) + hash = bytes.fromhex(hash_object(child, "tree", True)) + data += b"40000 " + directory + b"\0" + hash + else: + data += oct(file.mode)[2:].encode() + b" " + file.name.encode() + b"\0" + file.sha1 + + return hash_object(data, fmt="tree", write=True) def commit_tree( @@ -21,5 +40,28 @@ def commit_tree( parent: tp.Optional[str] = None, author: tp.Optional[str] = None, ) -> str: - # PUT YOUR CODE HERE - ... + zone = time.timezone + offset = "+" if zone < 0 else "-" + zone = abs(zone) + zone = zone // 60 + offset += f"{zone // 60:02}" + offset += f"{zone % 60:02}" + + local = time.localtime() + sec = time.mktime(local) + sec = int(sec) + + if not author: + author = f"{os.getenv('GIT_AUTHOR_NAME')} <{os.getenv('GIT_AUTHOR_EMAIL')}>" + + dt = [f"tree {tree}"] + + if parent: + dt.append(f"parent {parent}") + + dt.extend( + [f"author {author} {sec} {offset}", f"committer {author} {sec} {offset}", f"\n{message}\n"] + ) + string = "\n".join(dt) + + return hash_object(string.encode(), "commit", True) diff --git a/homework04/setup.py b/homework04/setup.py index 51dc915..56f124a 100644 --- a/homework04/setup.py +++ b/homework04/setup.py @@ -1,6 +1,5 @@ -from setuptools import setup - import pyvcs +from setuptools import setup AUTHOR = "Dmitrii Sorokin" AUTHOR_EMAIL = "dementiy@yandex.ru" diff --git a/homework04/tests/test_index.py b/homework04/tests/test_index.py index eaf5576..0905b61 100644 --- a/homework04/tests/test_index.py +++ b/homework04/tests/test_index.py @@ -3,9 +3,8 @@ import unittest from unittest.mock import patch -from pyfakefs.fake_filesystem_unittest import TestCase - import pyvcs +from pyfakefs.fake_filesystem_unittest import TestCase from pyvcs.index import GitIndexEntry, ls_files, read_index, update_index, write_index from pyvcs.repo import repo_create diff --git a/homework04/tests/test_objects.py b/homework04/tests/test_objects.py index 5cbad4f..aadf5e1 100644 --- a/homework04/tests/test_objects.py +++ b/homework04/tests/test_objects.py @@ -5,9 +5,8 @@ import zlib from unittest.mock import patch -from pyfakefs.fake_filesystem_unittest import TestCase - import pyvcs +from pyfakefs.fake_filesystem_unittest import TestCase from pyvcs import index, objects, porcelain, repo, tree diff --git a/homework04/tests/test_porcelain.py b/homework04/tests/test_porcelain.py index a65eb51..758740f 100644 --- a/homework04/tests/test_porcelain.py +++ b/homework04/tests/test_porcelain.py @@ -3,9 +3,8 @@ import unittest from unittest.mock import patch -from pyfakefs.fake_filesystem_unittest import TestCase - import pyvcs +from pyfakefs.fake_filesystem_unittest import TestCase from pyvcs.porcelain import add, checkout, commit from pyvcs.repo import repo_create diff --git a/homework04/tests/test_refs.py b/homework04/tests/test_refs.py index 8012f57..0dc4337 100644 --- a/homework04/tests/test_refs.py +++ b/homework04/tests/test_refs.py @@ -1,8 +1,7 @@ import unittest -from pyfakefs.fake_filesystem_unittest import TestCase - import pyvcs +from pyfakefs.fake_filesystem_unittest import TestCase from pyvcs.refs import get_ref, is_detached, ref_resolve, resolve_head, update_ref from pyvcs.repo import repo_create diff --git a/homework04/tests/test_repo.py b/homework04/tests/test_repo.py index 4107078..e534eb1 100644 --- a/homework04/tests/test_repo.py +++ b/homework04/tests/test_repo.py @@ -2,7 +2,6 @@ import pathlib from pyfakefs.fake_filesystem_unittest import TestCase - from pyvcs import repo diff --git a/homework04/tests/test_tree.py b/homework04/tests/test_tree.py index 1c32a62..8ac69ae 100644 --- a/homework04/tests/test_tree.py +++ b/homework04/tests/test_tree.py @@ -4,9 +4,8 @@ import unittest from unittest.mock import patch -from pyfakefs.fake_filesystem_unittest import TestCase - import pyvcs +from pyfakefs.fake_filesystem_unittest import TestCase from pyvcs.index import read_index, update_index from pyvcs.repo import repo_create from pyvcs.tree import commit_tree, write_tree diff --git a/homework05/research/age.py b/homework05/research/age.py index 492ae28..7b1949d 100644 --- a/homework05/research/age.py +++ b/homework05/research/age.py @@ -8,10 +8,22 @@ def age_predict(user_id: int) -> tp.Optional[float]: """ Наивный прогноз возраста пользователя по возрасту его друзей. - Возраст считается как медиана среди возраста всех друзей пользователя - :param user_id: Идентификатор пользователя. :return: Медианный возраст пользователя. """ - pass + data = get_friends(user_id, fields=["bdate"]).items + data = tp.cast(tp.List[tp.Dict[str, tp.Any]], data) + ages = [] + for friend in data: + try: + day, month, year = [int(i) for i in friend["bdate"].split(".")] + bdate = dt.date(year, month, day) + age = dt.date.today().year - bdate.year + ages.append(age) + except: + pass + if ages: + return statistics.median(ages) + else: + return None diff --git a/homework05/research/network.py b/homework05/research/network.py index 6b6db7c..2793d8e 100644 --- a/homework05/research/network.py +++ b/homework05/research/network.py @@ -5,8 +5,7 @@ import matplotlib.pyplot as plt import networkx as nx import pandas as pd - -from vkapi.friends import get_friends, get_mutual +from vkapi.friends import MutualFriends, get_friends, get_mutual def ego_network( @@ -14,11 +13,21 @@ def ego_network( ) -> tp.List[tp.Tuple[int, int]]: """ Построить эгоцентричный граф друзей. - :param user_id: Идентификатор пользователя, для которого строится граф друзей. :param friends: Идентификаторы друзей, между которыми устанавливаются связи. """ - pass + graph = [] + if friends is None: + friends_response = get_friends(user_id, fields=["nickname"]).items + friends_response = tp.cast(tp.List[tp.Dict[str, tp.Any]], friends_response) + friends = [user["id"] for user in friends_response if not user.get("deactivated")] + mutual_users = get_mutual(source_uid=user_id, target_uids=friends) + mutual_users = tp.cast(tp.List[MutualFriends], mutual_users) + for user in mutual_users: + for common_friend in user["common_friends"]: + link = (user["id"], common_friend) + graph.append(link) + return graph def plot_ego_network(net: tp.List[tp.Tuple[int, int]]) -> None: diff --git a/homework05/vkapi/config.py b/homework05/vkapi/config.py index 8459497..4968f5e 100644 --- a/homework05/vkapi/config.py +++ b/homework05/vkapi/config.py @@ -2,6 +2,6 @@ VK_CONFIG = { "domain": "https://api.vk.com/method", - "access_token": "", + "access_token": "vk1.a.UFTN6YfTQC6xL6suMVEjBZKzWnFlu5PoGa-JJ_C4zi51ns17LyVgsqyOKlDQDFGNZ-PeYeZwxmDSG0mk18bm_QuaApT8i5H9kg40Xd0ujQpnCw0PLykno3l_ohCOLvvMtSjQ_PCR6agAAkvr0hsLIcvsEDb5dt-1GJ3LvkomoDvJ10jfiDQ1VrHSofAl6nMf&expires_in=86400&user_id=337563534", "version": "5.126", } diff --git a/homework05/vkapi/friends.py b/homework05/vkapi/friends.py index dad6a5c..161b312 100644 --- a/homework05/vkapi/friends.py +++ b/homework05/vkapi/friends.py @@ -3,6 +3,7 @@ import time import typing as tp +from tqdm import tqdm from vkapi import config, session from vkapi.exceptions import APIError @@ -16,19 +17,36 @@ class FriendsResponse: def get_friends( - user_id: int, count: int = 5000, offset: int = 0, fields: tp.Optional[tp.List[str]] = None + user_id: tp.Optional[int] = None, + count: int = 5000, + offset: int = 0, + fields: tp.Optional[tp.List[str]] = None, ) -> FriendsResponse: """ Получить список идентификаторов друзей пользователя или расширенную информацию о друзьях пользователя (при использовании параметра fields). - :param user_id: Идентификатор пользователя, список друзей для которого нужно получить. :param count: Количество друзей, которое нужно вернуть. :param offset: Смещение, необходимое для выборки определенного подмножества друзей. :param fields: Список полей, которые нужно получить для каждого пользователя. :return: Список идентификаторов друзей пользователя или список пользователей. """ - pass + params = { + "access_token": config.VK_CONFIG["access_token"], + "user_id": user_id if user_id is not None else "", + "fields": ",".join(fields) if fields is not None else "", + "v": config.VK_CONFIG["version"], + "count": count, + "offset": offset, + } + method = "friends.get" + response = session.get(method, params=params) + response_json = response.json() + if "error" in response_json or not response.ok: + raise APIError(response_json["error"]["error_msg"]) + response_json = response_json["response"] + friends = FriendsResponse(count=response_json["count"], items=response_json["items"]) + return friends class MutualFriends(tp.TypedDict): @@ -48,7 +66,6 @@ def get_mutual( ) -> tp.Union[tp.List[int], tp.List[MutualFriends]]: """ Получить список идентификаторов общих друзей между парой пользователей. - :param source_uid: Идентификатор пользователя, чьи друзья пересекаются с друзьями пользователя с идентификатором target_uid. :param target_uid: Идентификатор пользователя, с которым необходимо искать общих друзей. :param target_uids: Cписок идентификаторов пользователей, с которыми необходимо искать общих друзей. @@ -57,4 +74,49 @@ def get_mutual( :param offset: Смещение, необходимое для выборки определенного подмножества общих друзей. :param progress: Callback для отображения прогресса. """ - pass + + progress = tqdm + method = "friends.getMutual" + if target_uid is not None: + params = { + "access_token": config.VK_CONFIG["access_token"], + "source_uid": source_uid, + "target_uid": target_uid, + "order": order, + "count": count, + "offset": offset, + "v": config.VK_CONFIG["version"], + } + response = session.get(method, params=params) + response_json = response.json() + if "error" in response_json or not response.ok: + raise APIError(response_json["error"]["error_msg"]) + response_json = response_json["response"] + return response_json + else: + target_uids_list = tp.cast(tp.List, target_uids) + requests_qty = math.ceil(len(target_uids_list) / 100) + list_of_mutual_friends = [] + for request_num in progress(range(requests_qty)): + if request_num % 3 == 0 and request_num != 0: + time.sleep(1) + params = { + "access_token": config.VK_CONFIG["access_token"], + "target_uids": ",".join([str(i) for i in target_uids_list]), + "order": order, + "count": count if count is not None else "", + "offset": offset + request_num * 100, + "v": config.VK_CONFIG["version"], + } + response = session.get(method, params=params) + response_json = response.json() + if "error" in response_json or not response.ok: + raise APIError(response_json["error"]["error_msg"]) + for friend in response_json["response"]: + mutual_friends = MutualFriends( + id=friend["id"], + common_friends=friend["common_friends"], + common_count=friend["common_count"], + ) + list_of_mutual_friends.append(mutual_friends) + return list_of_mutual_friends diff --git a/homework05/vkapi/session.py b/homework05/vkapi/session.py index 0643389..e7fb2a3 100644 --- a/homework05/vkapi/session.py +++ b/homework05/vkapi/session.py @@ -5,16 +5,33 @@ from requests.packages.urllib3.util.retry import Retry -class Session: +class TimeoutHTTPAdapter(HTTPAdapter): + def __init__(self, *args, **kwargs): + if "timeout" in kwargs: + self.timeout = kwargs["timeout"] + del kwargs["timeout"] + super().__init__(*args, **kwargs) + + def send(self, request, **kwargs): + timeout = kwargs.get("timeout") + if timeout is None: + kwargs["timeout"] = self.timeout + return super().send(request, **kwargs) + + +class Session(requests.Session): """ Сессия. - :param base_url: Базовый адрес, на который будут выполняться запросы. :param timeout: Максимальное время ожидания ответа от сервера. :param max_retries: Максимальное число повторных запросов. :param backoff_factor: Коэффициент экспоненциального нарастания задержки. """ + timeout: float + base_url: str + backoff_factor: float + def __init__( self, base_url: str, @@ -22,10 +39,25 @@ def __init__( max_retries: int = 3, backoff_factor: float = 0.3, ) -> None: - pass + super().__init__() + self.base_url = base_url + self.timeout = timeout + self.backoff_factor = backoff_factor + self.max_retries = max_retries + + retry_strategy = Retry( + total=self.max_retries, + backoff_factor=self.backoff_factor, + status_forcelist=[429, 500, 502, 503, 504], + method_whitelist=["HEAD", "GET", "OPTIONS"], + ) + adapter = TimeoutHTTPAdapter(timeout=self.timeout, max_retries=retry_strategy) + self.mount("https://", adapter) - def get(self, url: str, *args: tp.Any, **kwargs: tp.Any) -> requests.Response: - pass + def get(self, url: str, *args: tp.Any, **kwargs: tp.Any) -> requests.Response: # type: ignore + query = f"{self.base_url}/{url}" + return super().get(query, *args, **kwargs) - def post(self, url: str, *args: tp.Any, **kwargs: tp.Any) -> requests.Response: - pass + def post(self, url: str, *args: tp.Any, **kwargs: tp.Any) -> requests.Response: # type: ignore + query = f"{self.base_url}/{url}" + return super().post(query, *args, **kwargs) diff --git a/homework05/vkapi/wall.py b/homework05/vkapi/wall.py index a045c6c..61437b9 100644 --- a/homework05/vkapi/wall.py +++ b/homework05/vkapi/wall.py @@ -1,3 +1,4 @@ +import math import textwrap import time import typing as tp @@ -5,7 +6,6 @@ import pandas as pd from pandas import json_normalize - from vkapi import config, session from vkapi.exceptions import APIError @@ -20,7 +20,47 @@ def get_posts_2500( extended: int = 0, fields: tp.Optional[tp.List[str]] = None, ) -> tp.Dict[str, tp.Any]: - pass + + script = f""" + var i = 0; + var result = []; + while (i < {max_count}){{ + if ({offset}+i+100 > {count}){{ + result.push(API.wall.get({{ + "owner_id": "{owner_id}", + "domain": "{domain}", + "offset": "{offset} +i", + "count": "{count}-(i+{offset})", + "filter": "{filter}", + "extended": "{extended}", + "fields": "{fields}" + }})); + }} + result.push(API.wall.get({{ + "owner_id": "{owner_id}", + "domain": "{domain}", + "offset": "{offset} +i", + "count": "{count}", + "filter": "{filter}", + "extended": "{extended}", + "fields": "{fields}" + }})); + i = i + {max_count}; + }} + return result; + """ + + params = { + "code": script, + "access_token": config.VK_CONFIG["access_token"], + "v": config.VK_CONFIG["version"], + } + + response = session.post("execute", data=params) + response_json = response.json() + if "error" in response_json or not response.ok: + raise APIError(response_json["error"]["error_msg"]) + return response_json["response"]["items"] def get_wall_execute( @@ -36,9 +76,7 @@ def get_wall_execute( ) -> pd.DataFrame: """ Возвращает список записей со стены пользователя или сообщества. - @see: https://vk.com/dev/wall.get - :param owner_id: Идентификатор пользователя или сообщества, со стены которого необходимо получить записи. :param domain: Короткий адрес пользователя или сообщества. :param offset: Смещение, необходимое для выборки определенного подмножества записей. @@ -49,4 +87,26 @@ def get_wall_execute( :param fields: Список дополнительных полей для профилей и сообществ, которые необходимо вернуть. :param progress: Callback для отображения прогресса. """ - pass + if progress is None: + progress = lambda x: x + response = pd.DataFrame() + for _ in progress(range(math.ceil(count / 2500))): + response = pd.concat( + [ + response, + json_normalize( + get_posts_2500( + owner_id, + domain, + offset, + count, + max_count, + filter, + extended, + fields, + ) + ), + ] + ) + time.sleep(1) + return response