Skip to content

Commit

Permalink
autotest: Add a VSIFile wrapper class (#8222)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbaston committed May 30, 2024
1 parent ebc103e commit 5394ddf
Show file tree
Hide file tree
Showing 5 changed files with 402 additions and 103 deletions.
231 changes: 230 additions & 1 deletion autotest/gcore/vsifile.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import pytest
from lxml import etree

from osgeo import gdal
from osgeo import gdal, ogr


###############################################################################
Expand Down Expand Up @@ -1460,3 +1460,232 @@ def test_vsifile_CopyFileRestartable(tmp_vsimem):
assert retcode == 0
assert output_payload is None
assert gdal.VSIStatL(dstfilename).size == 3


###############################################################################
# Test VSIFile helper class


def test_vsifile_class_write_ascii(tmp_path):

fname = tmp_path / "test.txt"

lines = ["permission is hereby granted", "free of charge", "to any person"]

with gdaltest.vsi_open(fname, "w") as f:
assert f.tell() == 0

for line in lines:
f.write(line)
f.write("\n")

with open(fname, "r") as f:
assert [line.strip() for line in f.readlines()] == lines


def test_vsifile_class_read_ascii(tmp_path):

fname = str(tmp_path / "test.txt")

lines = ["permission is hereby granted", "free of charge", "to any person"]

with open(fname, "w", newline="\n") as f:
for line in lines:
f.write(line)
f.write("\n")

with pytest.raises(Exception):
f.write(b"some bytes")

# read entire file
with gdaltest.vsi_open(fname, "r") as f:
contents = f.read()

assert type(contents) is str

lines_in = [line.strip() for line in contents.strip().split("\n")]
assert lines_in == lines

# read some characters
f = gdaltest.vsi_open(fname)
assert f.read(10) == "permission"

# skip a character
f.seek(1, os.SEEK_CUR)
assert f.read(9) == "is hereby"

# seek backwards
f.seek(-2, os.SEEK_CUR)
assert f.read(2) == "by"

# jump to beginning
f.seek(0, os.SEEK_SET)
assert f.read(10) == "permission"

# can't jump before the beginning
pos = f.tell()
with pytest.raises(OSError, match="negative offset"):
f.seek(-2, os.SEEK_SET) == -1
assert pos == f.tell()

# jump to end
f.seek(0, os.SEEK_END)
assert f.read(10) == ""

f.seek(-7, os.SEEK_END)
assert f.read() == "person\n"

f.close()
f.close() # no harm in closing an already-closed file


def test_vsifile_class_read_binary(tmp_path):

fname = tmp_path / "test.wkb"

g = ogr.CreateGeometryFromWkt("POINT (15 17)")
wkb = g.ExportToWkb()

with open(fname, "wb") as f:
f.write(wkb)

# read entire file
with gdaltest.vsi_open(fname, "rb") as f:
contents = f.read()

assert type(contents) is bytes

assert contents == wkb

# read some bytes
f = gdaltest.vsi_open(fname, "rb")
assert f.read(5) == wkb[:5]

f.seek(10, os.SEEK_SET)
assert f.read(5) == wkb[10:15]


def test_vsifile_class_write_binary(tmp_path):

fname = tmp_path / "test.wkb"

g = ogr.CreateGeometryFromWkt("POINT (15 17)")
wkb = g.ExportToWkb()

with gdaltest.vsi_open(fname, "wb") as f:
f.write(wkb[:8])
f.write(wkb[8:])

with open(fname, "rb") as f:
assert f.read() == wkb

with gdaltest.vsi_open(fname, "rb") as f:
with pytest.raises(OSError, match="Expected to write"):
f.write(wkb)


def random_lines():
import random
import string

lines = []
for i in range(50):
lines.append(
"".join([random.choice(string.ascii_letters) for j in range(20 + 3 * i)])
)
lines.append(" ")
lines.append("")
lines.append("theend")
lines.append("")

return lines


@pytest.mark.parametrize("terminating_newline", (True, False))
def test_vsifile_class_line_iteration(tmp_path, terminating_newline):

fname = str(tmp_path / "test.txt")

lines_out = random_lines()

with open(fname, "w") as f:
for line in lines_out:
f.write(line)
f.write("\n")

if not terminating_newline:
f.write("lastline")
lines_out.append("lastline")

with gdaltest.vsi_open(fname) as f:
lines_in = [line for line in f]

assert lines_in == lines_out


def test_vsifile_class_binary_line_iteration(tmp_path):

fname = str(tmp_path / "test.txt")

lines_out = [x.encode() for x in random_lines()]

with open(fname, "wb") as f:
for line in lines_out:
f.write(line)
f.write(b"\n")

with gdaltest.vsi_open(fname, "rb") as f:
lines_in = [line for line in f]

assert lines_in == lines_out


def test_vsifile_class_zipped_csv_reader(tmp_path):

test_csv = str(tmp_path / "input.csv")
test_zip = str(tmp_path / "input.zip")

import csv
import shutil
import zipfile

shutil.copy("../ogr/data/prime_meridian.csv", test_csv)

with zipfile.ZipFile(test_zip, "w") as zf:
zf.write(test_csv, arcname="input.csv")

with gdaltest.vsi_open(f"/vsizip/{test_zip}/input.csv") as f:
records = [x for x in csv.DictReader(f)]

assert len(records) == 4
assert (
records[2]["INFORMATION_SOURCE"]
== "Institut Geographique National (IGN), Paris"
)


def test_vsifile_class_file_does_not_exist(tmp_path):

with pytest.raises(OSError, match="No such file or directory"):
gdaltest.vsi_open(tmp_path / "does_not_exist.txt")


def test_vsifile_class_read_from_closed_file(tmp_path):

with gdaltest.vsi_open(tmp_path / "out.txt", "w") as f:
f.write("abc")

with pytest.raises(ValueError, match="closed file"):
f.seek(0)


def test_vsifile_class_append(tmp_vsimem):

fname = tmp_vsimem / "out.txt"

with gdaltest.vsi_open(fname, "w") as f:
f.write("abc")
with gdaltest.vsi_open(fname, "a") as f:
f.write("def")
with gdaltest.vsi_open(fname) as f:
assert f.read() == "abcdef"
Loading

0 comments on commit 5394ddf

Please sign in to comment.