-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
sqlite_datasource.py
201 lines (169 loc) · 6.78 KB
/
sqlite_datasource.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
from __future__ import annotations
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Dict,
List,
Literal,
Optional,
Type,
Union,
cast,
)
from great_expectations._docs_decorators import public_api
from great_expectations.compatibility import pydantic
from great_expectations.compatibility.typing_extensions import override
from great_expectations.datasource.fluent.config_str import ConfigStr
from great_expectations.datasource.fluent.sql_datasource import (
QueryAsset as SqlQueryAsset,
)
from great_expectations.datasource.fluent.sql_datasource import (
SQLDatasource,
SqlPartitioner,
_PartitionerOneColumnOneParam,
)
from great_expectations.datasource.fluent.sql_datasource import (
TableAsset as SqlTableAsset,
)
if TYPE_CHECKING:
# min version of typing_extension missing `Self`, so it can't be imported at runtime
from typing_extensions import Self
from great_expectations.datasource.fluent.interfaces import (
BatchMetadata,
BatchRequestOptions,
DataAsset,
SortersDefinition,
)
# This module serves as an example of how to extend _SQLAssets for specific backends. The steps are:
# 1. Create a plain class with the extensions necessary for the specific backend.
# 2. Make 2 classes XTableAsset and XQueryAsset by mixing in the class created in step 1 with
# sql_datasource.TableAsset and sql_datasource.QueryAsset.
#
# See SqliteDatasource, SqliteTableAsset, and SqliteQueryAsset below.
class PartitionerConvertedDateTime(_PartitionerOneColumnOneParam):
    """A partitioner that can be used for SQL engines that represent datetimes as strings.

    The SQL engine that this currently supports is SQLite since it stores its datetimes as
    strings.
    The DatetimePartitioner will also work for SQLite and may be more intuitive.
    """

    # date_format_string syntax is documented here:
    # https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes
    # It allows for arbitrary strings so can't be validated until conversion time.
    date_format_string: str
    column_name: str
    method_name: Literal[
        "partition_on_converted_datetime"
    ] = "partition_on_converted_datetime"

    @property
    @override
    def param_names(self) -> List[str]:
        # The datetime parameter will be a string representing a datetime in the format
        # given by self.date_format_string.
        return ["datetime"]

    @override
    def partitioner_method_kwargs(self) -> Dict[str, Any]:
        # Kwargs forwarded to the execution-engine partitioner method named by
        # self.method_name.
        return {
            "column_name": self.column_name,
            "date_format_string": self.date_format_string,
        }

    @override
    def batch_request_options_to_batch_spec_kwarg_identifiers(
        self, options: BatchRequestOptions
    ) -> Dict[str, Any]:
        """Map the required 'datetime' batch request option to a batch spec identifier.

        Args:
            options: Batch request options; must contain a 'datetime' key whose value
                is a string in the format given by self.date_format_string.

        Returns:
            A single-entry dict mapping the partitioned column name to the datetime value.

        Raises:
            ValueError: If 'datetime' is missing from ``options``.
        """
        if "datetime" not in options:
            raise ValueError(
                "'datetime' must be specified in the batch request options to create a batch identifier"
            )
        return {self.column_name: options["datetime"]}
class SqliteDsn(pydantic.AnyUrl):
    # DSN type for sqlite connection strings; restricts the URL scheme to the
    # SQLAlchemy dialect/driver combinations that target SQLite.
    allowed_schemes = {
        "sqlite",
        "sqlite+pysqlite",
        "sqlite+aiosqlite",
        "sqlite+pysqlcipher",
    }
    # SQLite connects to a local file (or an in-memory database), so the URL
    # does not need a host component.
    host_required = False
SqlitePartitioner = Union[SqlPartitioner, PartitionerConvertedDateTime]
class _SQLiteAssetMixin:
    # Mixin that adds the sqlite-only converted-datetime partitioner to a _SQLAsset.

    @public_api
    def add_partitioner_converted_datetime(
        self: Self, column_name: str, date_format_string: str
    ) -> Self:
        """Associates a converted datetime partitioner with this sqlite data asset.

        Args:
            column_name: The column name of the date column where year and month will be parsed out.
            date_format_string: Format for converting string representation of datetime to actual datetime object.

        Returns:
            This sql asset so we can use this method fluently.
        """
        partitioner = PartitionerConvertedDateTime(
            method_name="partition_on_converted_datetime",
            column_name=column_name,
            date_format_string=date_format_string,
        )
        # _add_partitioner is supplied by the _SQLAsset this mixin is combined with.
        return self._add_partitioner(partitioner)  # type: ignore[attr-defined] # This is a mixin for a _SQLAsset
class SqliteTableAsset(_SQLiteAssetMixin, SqlTableAsset):
    # Table asset that additionally exposes the sqlite converted-datetime partitioner.
    type: Literal["table"] = "table"
    # Widened relative to the superclass to also accept PartitionerConvertedDateTime.
    partitioner: Optional[SqlitePartitioner] = None  # type: ignore[assignment] # override superclass type
class SqliteQueryAsset(_SQLiteAssetMixin, SqlQueryAsset):
    # Query asset that additionally exposes the sqlite converted-datetime partitioner.
    type: Literal["query"] = "query"
    # Widened relative to the superclass to also accept PartitionerConvertedDateTime.
    partitioner: Optional[SqlitePartitioner] = None  # type: ignore[assignment] # override superclass type
@public_api
class SqliteDatasource(SQLDatasource):
    """Adds a sqlite datasource to the data context.

    Args:
        name: The name of this sqlite datasource.
        connection_string: The SQLAlchemy connection string used to connect to the sqlite database.
            For example: "sqlite:///path/to/file.db"
        create_temp_table: Whether to leverage temporary tables during metric computation.
        assets: An optional dictionary whose keys are TableAsset names and whose values
            are TableAsset objects.
    """

    # class var definitions
    asset_types: ClassVar[List[Type[DataAsset]]] = [SqliteTableAsset, SqliteQueryAsset]

    # Subclass instance var overrides
    # right side of the operator determines the type name
    # left side enforces the names on instance creation
    type: Literal["sqlite"] = "sqlite"  # type: ignore[assignment]
    connection_string: Union[ConfigStr, SqliteDsn]

    # Point the generic SQLDatasource machinery at the sqlite-specific asset classes.
    _TableAsset: Type[SqlTableAsset] = pydantic.PrivateAttr(SqliteTableAsset)
    _QueryAsset: Type[SqlQueryAsset] = pydantic.PrivateAttr(SqliteQueryAsset)

    @public_api
    @override
    def add_table_asset(  # noqa: PLR0913
        self,
        name: str,
        table_name: str = "",
        schema_name: Optional[str] = None,
        order_by: Optional[SortersDefinition] = None,
        batch_metadata: Optional[BatchMetadata] = None,
    ) -> SqliteTableAsset:
        # Delegate to the generic SQL implementation; _TableAsset above presumably
        # makes the created asset a SqliteTableAsset, so narrow the return type here.
        created_asset = super().add_table_asset(
            name=name,
            table_name=table_name,
            schema_name=schema_name,
            order_by=order_by,
            batch_metadata=batch_metadata,
        )
        return cast(SqliteTableAsset, created_asset)

    add_table_asset.__doc__ = SQLDatasource.add_table_asset.__doc__

    @public_api
    @override
    def add_query_asset(
        self,
        name: str,
        query: str,
        order_by: Optional[SortersDefinition] = None,
        batch_metadata: Optional[BatchMetadata] = None,
    ) -> SqliteQueryAsset:
        # Same delegation pattern as add_table_asset, for query-backed assets.
        created_asset = super().add_query_asset(
            name=name, query=query, order_by=order_by, batch_metadata=batch_metadata
        )
        return cast(SqliteQueryAsset, created_asset)

    add_query_asset.__doc__ = SQLDatasource.add_query_asset.__doc__