/
mbtiles.py
125 lines (99 loc) · 4.81 KB
/
mbtiles.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# http://mbtiles.org/
import mimetypes
import sqlite3
from tilecloud import BoundingPyramid, Bounds, Tile, TileCoord, TileStore
from tilecloud.lib.sqlite3_ import SQLiteDict, query
class Metadata(SQLiteDict):
    """Dict-style view of the ``metadata`` table, mapping ``name`` to ``value``.

    The class only supplies SQL statements; all behaviour is inherited from
    ``SQLiteDict``.
    """

    # Schema: one text value per unique name.
    CREATE_TABLE_SQL = "CREATE TABLE IF NOT EXISTS metadata (name text, value text, PRIMARY KEY (name))"

    # Statements addressing a single entry by name.
    CONTAINS_SQL = "SELECT COUNT(*) FROM metadata WHERE name = ?"
    GETITEM_SQL = "SELECT value FROM metadata WHERE name = ?"
    SETITEM_SQL = "INSERT OR REPLACE INTO metadata (name, value) VALUES (?, ?)"
    DELITEM_SQL = "DELETE FROM metadata WHERE name = ?"

    # Statements covering the whole table.
    LEN_SQL = "SELECT COUNT(*) FROM metadata"
    ITER_SQL = "SELECT name FROM metadata"
    ITERVALUES_SQL = "SELECT value FROM metadata"
    ITERITEMS_SQL = "SELECT name, value FROM metadata"
class Tiles(SQLiteDict):
    """Dict-style view of the ``tiles`` table, keyed by tile coordinate.

    Keys are objects exposing ``z``, ``x`` and ``y``; values are the tile
    payloads stored in the ``tile_data`` blob column.  Unless
    ``tilecoord_in_topleft`` is true, the row written to ``tile_row`` is the
    vertically flipped index ``(1 << z) - y - 1``, and rows read back are
    flipped again to recover ``y``.
    """

    # Schema: one blob per (zoom_level, tile_column, tile_row).
    CREATE_TABLE_SQL = (
        "CREATE TABLE IF NOT EXISTS tiles (zoom_level integer, tile_column integer, "
        "tile_row integer, tile_data blob, PRIMARY KEY (zoom_level, tile_column, tile_row))"
    )

    # Statements addressing a single tile.
    CONTAINS_SQL = "SELECT COUNT(*) FROM tiles WHERE zoom_level = ? AND tile_column = ? AND tile_row = ?"
    GETITEM_SQL = "SELECT tile_data FROM tiles WHERE zoom_level = ? AND tile_column = ? AND tile_row = ?"
    SETITEM_SQL = (
        "INSERT OR REPLACE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?, ?, ?, ?)"
    )
    DELITEM_SQL = "DELETE FROM tiles WHERE zoom_level = ? AND tile_column = ? AND tile_row = ?"

    # Statements covering the whole table.
    LEN_SQL = "SELECT COUNT(*) FROM tiles"
    ITER_SQL = "SELECT zoom_level, tile_column, tile_row FROM tiles"
    ITERVALUES_SQL = "SELECT tile_data FROM tiles"
    ITERITEMS_SQL = "SELECT zoom_level, tile_column, tile_row, tile_data FROM tiles"

    def __init__(self, tilecoord_in_topleft, *args, **kwargs):
        """Remember the row convention, then defer to ``SQLiteDict.__init__``.

        :param tilecoord_in_topleft: when true, ``y`` is stored in
            ``tile_row`` unchanged; otherwise it is flipped.
        :param args: positional arguments forwarded to ``SQLiteDict``.
        :param kwargs: keyword arguments forwarded to ``SQLiteDict``.
        """
        # Set before the base initialiser runs, as in the original ordering.
        self.tilecoord_in_topleft = tilecoord_in_topleft
        SQLiteDict.__init__(self, *args, **kwargs)

    def _flip_y(self, z, y):
        """Convert between a coordinate ``y`` and a stored row at zoom ``z``.

        The flip is its own inverse, so the same helper serves both packing
        and unpacking.
        """
        if self.tilecoord_in_topleft:
            return y
        return (1 << z) - y - 1

    def _packitem(self, key, value):
        """Build the ``(zoom, column, row, blob)`` parameters for an insert."""
        row = self._flip_y(key.z, key.y)
        # A missing payload is stored as NULL rather than wrapped.
        blob = None if value is None else sqlite3.Binary(value)
        return (key.z, key.x, row, blob)

    def _packkey(self, key):
        """Build the ``(zoom, column, row)`` parameters for a lookup."""
        row = self._flip_y(key.z, key.y)
        return (key.z, key.x, row)

    def _unpackitem(self, row):
        """Turn a ``(zoom, column, row, blob)`` row into ``(TileCoord, data)``."""
        zoom, column, stored_row, data = row
        return (TileCoord(zoom, column, self._flip_y(zoom, stored_row)), data)

    def _unpackkey(self, row):
        """Turn a ``(zoom, column, row)`` row into a ``TileCoord``."""
        zoom, column, stored_row = row
        return TileCoord(zoom, column, self._flip_y(zoom, stored_row))
class MBTilesTileStore(TileStore):
    """A tile store backed by an MBTiles database.

    Tiles are read from and written to the ``tiles`` table through a
    ``Tiles`` facade, and the ``metadata`` table is exposed as
    ``self.metadata``.
    """

    # Bounding-pyramid query for the default layout, in which ``tile_row``
    # holds the flipped row ``(1 << zoom_level) - y - 1``.
    BOUNDING_PYRAMID_SQL = (
        "SELECT zoom_level, MIN(tile_column), MAX(tile_column) + 1, "
        "MIN((1 << zoom_level) - tile_row - 1), MAX((1 << zoom_level) - tile_row - 1) + 1 "
        "FROM tiles GROUP BY zoom_level ORDER BY zoom_level"
    )
    # Same query for stores opened with ``tilecoord_in_topleft=True``, in which
    # ``tile_row`` already holds ``y`` and must not be flipped again.
    BOUNDING_PYRAMID_TOPLEFT_SQL = (
        "SELECT zoom_level, MIN(tile_column), MAX(tile_column) + 1, "
        "MIN(tile_row), MAX(tile_row) + 1 "
        "FROM tiles GROUP BY zoom_level ORDER BY zoom_level"
    )
    SET_METADATA_ZOOMS_SQL = "SELECT MIN(zoom_level), MAX(zoom_level) FROM tiles"

    def __init__(self, connection, commit=True, tilecoord_in_topleft=False, **kwargs):
        """Wrap an open database connection.

        :param connection: database connection shared by both table facades
            and by the raw queries issued from this class.
        :param commit: forwarded unchanged to ``Metadata`` and ``Tiles``.
        :param tilecoord_in_topleft: when true, ``y`` is stored in
            ``tile_row`` unchanged instead of being flipped.
        :param kwargs: forwarded to ``TileStore.__init__``.  If
            ``content_type`` is absent and the metadata table has a
            ``format`` entry, ``content_type`` is derived from it.
        """
        self.connection = connection
        self.metadata = Metadata(self.connection, commit)
        self.tiles = Tiles(tilecoord_in_topleft, self.connection, commit)
        if "content_type" not in kwargs and "format" in self.metadata:
            # Treat the stored format as a file extension, e.g. "png" -> ".png";
            # unknown extensions yield None.
            kwargs["content_type"] = mimetypes.types_map.get("." + self.metadata["format"])
        TileStore.__init__(self, **kwargs)

    def __contains__(self, tile):
        """Return True if ``tile`` is truthy and its coordinate is stored."""
        # bool() keeps a direct call from leaking a falsy tile object.
        return bool(tile) and tile.tilecoord in self.tiles

    def __len__(self):
        """Return the number of entries in the tiles table."""
        return len(self.tiles)

    def delete_one(self, tile):
        """Delete the entry for ``tile.tilecoord`` and return ``tile``."""
        del self.tiles[tile.tilecoord]
        return tile

    def get_all(self):
        """Yield a ``Tile`` with data for every entry in the tiles table.

        Each tile's ``content_type`` is set from the store's when that is
        not None.
        """
        for tilecoord, data in self.tiles.iteritems():
            tile = Tile(tilecoord, data=data)
            if self.content_type is not None:
                tile.content_type = self.content_type
            yield tile

    def get_cheap_bounding_pyramid(self):
        """Return a ``BoundingPyramid`` computed with one aggregate query.

        For each zoom level the x and y bounds are half-open ranges
        (minimum, maximum + 1).  The query is chosen to match the row
        convention of ``self.tiles`` so y-bounds are in coordinate space for
        both layouts.
        """
        if self.tiles.tilecoord_in_topleft:
            sql = self.BOUNDING_PYRAMID_TOPLEFT_SQL
        else:
            sql = self.BOUNDING_PYRAMID_SQL
        bounds = {}
        for z, xstart, xstop, ystart, ystop in query(self.connection, sql):
            bounds[z] = (Bounds(xstart, xstop), Bounds(ystart, ystop))
        return BoundingPyramid(bounds)

    def get_one(self, tile):
        """Fill ``tile.data`` from the store.

        :returns: ``tile`` with its data (and ``content_type``, when the
            store has one) set, or None if the lookup raises ``KeyError``.
        """
        try:
            tile.data = self.tiles[tile.tilecoord]
        except KeyError:
            return None
        if self.content_type is not None:
            tile.content_type = self.content_type
        return tile

    def list(self):
        """Return a generator of data-less ``Tile`` objects, one per stored key."""
        return (Tile(tilecoord) for tilecoord in self.tiles)

    def put_one(self, tile):
        """Store ``tile.data`` (None if the attribute is missing) and return ``tile``."""
        self.tiles[tile.tilecoord] = getattr(tile, "data", None)
        return tile

    def set_metadata_zooms(self):
        """Write the lowest and highest stored zoom levels to the metadata.

        NOTE(review): on an empty tiles table the aggregate query is expected
        to yield a single row of NULLs, so both entries would be written as
        None -- confirm whether that is intended.
        """
        for minzoom, maxzoom in query(self.connection, self.SET_METADATA_ZOOMS_SQL):
            self.metadata["minzoom"] = minzoom
            self.metadata["maxzoom"] = maxzoom