-
Notifications
You must be signed in to change notification settings - Fork 68
/
table.py
663 lines (542 loc) · 27.3 KB
/
table.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
import asyncio
from dataclasses import dataclass
from enum import Enum
import os
from typing import Any, Dict, List, Optional, Union
from synapseclient import (
Synapse,
Schema as Synapse_Schema,
Table as Synapse_Table,
Column as Synapse_Column,
)
from synapseclient.table import (
CsvFileTable as Synapse_CsvFileTable,
TableQueryResult as Synaspe_TableQueryResult,
delete_rows,
)
from synapseclient.models import Annotations, AnnotationsValue
from opentelemetry import trace, context
# Module-level OpenTelemetry tracer; the methods in this module open their spans from it.
tracer = trace.get_tracer("synapseclient")
# TODO: Have a plug-and-play interface to plugin different dataframes,
# or perhaps stream a CSV back when querying for data and uploading data
class FacetType(str, Enum):
    """Facet behaviours that can be requested for a column.

    Set a column's facet type to one of these members to indicate that the column
    should be treated as a facet.
    """

    ENUMERATION = "enumeration"
    """Returns the most frequently seen values together with their frequency counts.
    Selecting any of the returned values filters the table results so that only rows
    holding the selected values are returned."""

    RANGE = "range"
    """Lets the column be filtered by a chosen lower and upper bound. Both bounds are
    inclusive."""
class ColumnType(str, Enum):
    """The kind of data a column is able to hold.

    Changing a column from one type to another (using a transaction with
    TableUpdateTransactionRequest in the "changes" list) is generally allowed. The
    exception is switching to a "_LIST" suffixed type: in that case a new column
    must be created and the data copied over manually.
    """

    STRING = "STRING"
    """A small text string of between 1 and 1,000 characters. Every STRING column
    declares a maximum size between 1 and 1,000 characters (50 characters is the
    default when maximumSize = null). The maximum size is applied to the budget of
    the maximum table width, so it is best to pick the smallest maximum size that
    fits the data. For strings larger than 250 characters, consider the LARGETEXT
    column type for improved performance. Each STRING column counts as maxSize*4
    (4 bytes per character) towards the total width of a table."""

    DOUBLE = "DOUBLE"
    """A double-precision 64-bit IEEE 754 floating point value. Its range of values
    is approximately +/-1.79769313486231570E+308 (15 significant decimal digits).
    Each DOUBLE column counts as 23 bytes towards the total width of a table."""

    INTEGER = "INTEGER"
    """A 64-bit two's complement integer. The signed integer has a minimum value of
    -2^63 and a maximum value of 2^63-1. Each INTEGER column counts as 20 bytes
    towards the total width of a table."""

    BOOLEAN = "BOOLEAN"
    """A value with only two possible states: 'true' and 'false'. Each BOOLEAN
    column counts as 5 bytes towards the total width of a table."""

    DATE = "DATE"
    """A point in time expressed as the number of milliseconds since the standard
    base time known as 'the epoch', namely January 1, 1970, 00:00:00 GMT. Each DATE
    column counts as 20 bytes towards the total width of a table."""

    FILEHANDLEID = "FILEHANDLEID"
    """A file stored within a table. To store a file in a table, first use the
    'File Services' to upload the file and generate a new FileHandle, then use the
    fileHandle.id as the value for this column. Note: this column type works best
    for binary (non-text) files or for text files that are 1 MB or larger. For text
    files smaller than 1 MB consider the LARGETEXT column type to improve download
    performance. Each FILEHANDLEID column counts as 20 bytes towards the total width
    of a table."""

    ENTITYID = "ENTITYID"
    """A reference to a Synapse Entity. Values include the 'syn' prefix, such as
    'syn123'. Each ENTITYID column counts as 44 bytes towards the total width of a
    table."""

    SUBMISSIONID = "SUBMISSIONID"
    """A reference to an evaluation submission. The value should be the ID of the
    referenced submission. Each SUBMISSIONID column counts as 20 bytes towards the
    total width of a table."""

    EVALUATIONID = "EVALUATIONID"
    """A reference to an evaluation. The value should be the ID of the referenced
    evaluation. Each EVALUATIONID column counts as 20 bytes towards the total width
    of a table."""

    LINK = "LINK"
    """Any URL with 1,000 characters or less. Each LINK column counts as maxSize*4
    (4 bytes per character) towards the total width of a table."""

    MEDIUMTEXT = "MEDIUMTEXT"
    """A string of between 1 and 2,000 characters that does not need a declared
    maximum size. For smaller strings where the maximum size is known consider the
    STRING column type. For larger strings, consider the LARGETEXT or FILEHANDLEID
    column types. Each MEDIUMTEXT column counts as 421 bytes towards the total width
    of a table."""

    LARGETEXT = "LARGETEXT"
    """A string that is greater than 250 characters but less than 524,288
    characters (2 MB of UTF-8 4 byte chars). For smaller strings consider the STRING
    or MEDIUMTEXT column types. For larger strings, consider the FILEHANDLEID column
    type. Each LARGETEXT column counts as 2133 bytes towards the total width of a
    table."""

    USERID = "USERID"
    """A reference to a Synapse User. The value should be the ID of the referenced
    User. Each USERID column counts as 20 bytes towards the total width of a
    table."""

    STRING_LIST = "STRING_LIST"
    """Multiple values of STRING."""

    INTEGER_LIST = "INTEGER_LIST"
    """Multiple values of INTEGER."""

    BOOLEAN_LIST = "BOOLEAN_LIST"
    """Multiple values of BOOLEAN."""

    DATE_LIST = "DATE_LIST"
    """Multiple values of DATE."""

    ENTITYID_LIST = "ENTITYID_LIST"
    """Multiple values of ENTITYID."""

    USERID_LIST = "USERID_LIST"
    """Multiple values of USERID."""

    JSON = "JSON"
    """A flexible type for storing JSON data. Each JSON column counts as 2133 bytes
    towards the total width of a table. A JSON value string should be less than
    524,288 characters (2 MB of UTF-8 4 byte chars)."""
@dataclass
class CsvResultFormat:
    """Options describing a table query whose results come back as CSV."""

    quote_character: Optional[str] = '"'
    """Character used for quoting; a double quote by default."""

    escape_character: Optional[str] = "\\"
    """Character used for escaping; a backslash by default."""

    line_end: Optional[str] = os.linesep
    """Line terminator; os.linesep by default."""

    separator: Optional[str] = ","
    """Field separator; a comma by default."""

    header: Optional[bool] = True
    """Header flag sent with the query; True by default."""

    include_row_id_and_row_version: Optional[bool] = True
    """Row id / row version flag sent with the query; True by default."""

    download_location: Optional[str] = None
    """Directory path to download the CSV file to."""

    def convert_into_api_parameters(self) -> Dict[str, Any]:
        """Translate these options into keyword arguments for the synapseclient.

        Returns:
            A dictionary keyed by the synapseclient parameter names, with
            "resultsAs" fixed to "csv".
        """
        return dict(
            resultsAs="csv",
            quoteCharacter=self.quote_character,
            escapeCharacter=self.escape_character,
            lineEnd=self.line_end,
            separator=self.separator,
            header=self.header,
            includeRowIdAndRowVersion=self.include_row_id_and_row_version,
            downloadLocation=self.download_location,
        )
@dataclass
class RowsetResultFormat:
    """Options describing a table query whose results come back as a rowset."""

    limit: Optional[int] = None
    """Maximum number of rows to be returned; defaults to None."""

    offset: Optional[int] = None
    """Value forwarded as the query "offset"; defaults to None."""

    def convert_into_api_parameters(self) -> Dict[str, Any]:
        """Translate these options into keyword arguments for the synapseclient.

        Returns:
            A dictionary keyed by the synapseclient parameter names, with
            "resultsAs" fixed to "rowset".
        """
        return dict(resultsAs="rowset", limit=self.limit, offset=self.offset)
@dataclass
class JsonSubColumn:
    """Definition of one sub-column of a JSON column.

    A column of type JSON can represent the combination of multiple sub-columns;
    each of those sub-columns is described by one instance of this class.
    """

    name: str
    """The display name of the column."""

    column_type: ColumnType
    """The type of data that can be stored in the column. Switching between types
    (using a transaction with TableUpdateTransactionRequest in the "changes" list)
    is generally allowed except for switching to "_LIST" suffixed types. In such
    cases, a new column must be created and data must be copied over manually."""

    json_path: str
    """The JSON path of the sub column. The '$' char represents the root of the
    JSON object, so a sub column whose JSON key is 'a' has the jsonPath '$.a'."""

    facet_type: Optional[FacetType] = None
    """Set to one of the enumerated values to indicate a column should be treated
    as a facet."""
@dataclass
class Row:
    """A single row of a table: its cell values plus its id and version."""

    # TODO: We will want to restrict the typing here
    values: Optional[List[Any]] = None
    """The values for each column of this row."""

    row_id: Optional[int] = None
    """The immutable ID issued to a new row."""

    version_number: Optional[int] = None
    """The version number of this row. Each row version is immutable, so a new
    version is created whenever a row is updated."""
@dataclass()
class Column:
    """A column model contains the metadata of a single column of a table or view."""

    id: str
    """The immutable ID issued to new columns"""

    name: str
    """The display name of the column"""

    column_type: ColumnType
    """The column type determines the type of data that can be stored in a column.
    Switching between types (using a transaction with TableUpdateTransactionRequest
    in the "changes" list) is generally allowed except for switching to "_LIST"
    suffixed types. In such cases, a new column must be created and data must be
    copied over manually"""

    facet_type: Optional[FacetType] = None
    """Set to one of the enumerated values to indicate a column should be
    treated as a facet"""

    default_value: Optional[str] = None
    """The default value for this column. Columns of type ENTITYID, FILEHANDLEID,
    USERID, and LARGETEXT are not allowed to have default values."""

    maximum_size: Optional[int] = None
    """A parameter for columnTypes with a maximum size. For example, ColumnType.STRINGs
    have a default maximum size of 50 characters, but can be set to a maximumSize
    of 1 to 1000 characters. For columnType of STRING_LIST, this limits the size
    of individual string elements in the list"""

    maximum_list_length: Optional[int] = None
    """Required if using a columnType with a "_LIST" suffix. Describes the maximum number
    of values that will appear in that list. Value range 1-100 inclusive. Default 100"""

    enum_values: Optional[List[str]] = None
    """Columns of type STRING can be constrained to an enumeration values set on this
    list. The maximum number of entries for an enum is 100"""

    json_sub_columns: Optional[List[JsonSubColumn]] = None
    """For column of type JSON that represents the combination of multiple sub-columns,
    this property is used to define each sub-column."""

    def convert_from_api_parameters(self, synapse_column: Synapse_Column) -> "Column":
        """Copy the fields of a synapseclient column onto this dataclass.

        Values are copied as returned by the client without conversion, so
        `column_type`, `facet_type` and `json_sub_columns` hold whatever
        representation the client supplied.

        Args:
            synapse_column: The column object returned by the synapseclient.

        Returns:
            This instance, updated in place.
        """
        self.id = synapse_column.id
        self.name = synapse_column.name
        self.column_type = synapse_column.columnType
        self.facet_type = synapse_column.get("facetType", None)
        self.default_value = synapse_column.get("defaultValue", None)
        self.maximum_size = synapse_column.get("maximumSize", None)
        self.maximum_list_length = synapse_column.get("maximumListLength", None)
        self.enum_values = synapse_column.get("enumValues", None)
        self.json_sub_columns = synapse_column.get("jsonSubColumns", None)
        return self

    async def store(self, synapse_client: Optional[Synapse] = None) -> "Column":
        """Persist the column to Synapse.

        The blocking client call runs in the event loop's default executor. The
        name, type, maximum size, default value and enum values set on this
        instance are sent; the instance is then refreshed from the response.

        Args:
            synapse_client: If not passed in or None this will use the last client
                from the `.login()` method.

        Returns:
            This instance, updated with the values returned by Synapse.
        """
        with tracer.start_as_current_span(f"Column_Store: {self.name}"):
            # TODO - We need to add in some validation before the store to verify we have enough
            # information to store the data
            client = Synapse.get_client(synapse_client=synapse_client)
            # Inside a coroutine the running loop is the one to schedule on.
            loop = asyncio.get_running_loop()
            # Captured here so the worker thread can continue the current trace.
            current_context = context.get_current()
            entity = await loop.run_in_executor(
                None,
                lambda: client.createColumn(
                    name=self.name,
                    columnType=self.column_type,
                    # Forward the optional settings too; previously they were
                    # dropped and then overwritten by the service response.
                    maximumSize=self.maximum_size,
                    defaultValue=self.default_value,
                    enumValues=self.enum_values,
                    opentelemetry_context=current_context,
                ),
            )
            client.logger.debug(f"Column returned by Synapse: {entity}")
            self.convert_from_api_parameters(entity)
            client.logger.info(f"Stored column {self.name}, id: {self.id}")
            return self
@dataclass()
class Table:
    """A Table represents the metadata of a table.

    Attributes:
        id: The unique immutable ID for this table. A new ID will be generated for new
            Tables. Once issued, this ID is guaranteed to never change or be re-issued
        name: The name of this table. Must be 256 characters or less. Names may only
            contain: letters, numbers, spaces, underscores, hyphens, periods, plus signs,
            apostrophes, and parentheses
        parent_id: The ID of the Entity that is the parent of this table.
        columns: The columns of this table.
        etag: Synapse employs an Optimistic Concurrency Control (OCC) scheme to handle
            concurrent updates. Since the E-Tag changes every time an entity is updated it
            is used to detect when a client's current representation of an entity is out-of-date.
        created_on: The date this table was created.
        created_by: The ID of the user that created this table.
        modified_on: The date this table was last modified. In YYYY-MM-DD-Thh:mm:ss.sssZ format
        modified_by: The ID of the user that last modified this table.
        version_number: The version number issued to this version on the object.
        version_label: The version label for this table
        version_comment: The version comment for this table
        is_latest_version: If this is the latest version of the object.
        is_search_enabled: When creating or updating a table or view specifies if full text search
            should be enabled. Note that enabling full text search might slow down the
            indexing of the table or view.
        annotations: Additional metadata associated with the table. The key is the name of your
            desired annotations. The value is an object containing a list of values
            (use empty list to represent no values for key) and the value type associated with
            all values in the list.
    """

    id: Optional[str] = None
    """The unique immutable ID for this table. A new ID will be generated for new
    Tables. Once issued, this ID is guaranteed to never change or be re-issued"""

    name: Optional[str] = None
    """The name of this table. Must be 256 characters or less. Names may only
    contain: letters, numbers, spaces, underscores, hyphens, periods, plus signs,
    apostrophes, and parentheses"""

    parent_id: Optional[str] = None
    """The ID of the Entity that is the parent of this table."""

    columns: Optional[List[Column]] = None
    """The columns of this table."""

    # TODO: Description doesn't seem to be returned from the API. Look into why.
    # description: Optional[str] = None
    # """The description of this entity. Must be 1000 characters or less."""

    etag: Optional[str] = None
    """Synapse employs an Optimistic Concurrency Control (OCC) scheme to handle
    concurrent updates. Since the E-Tag changes every time an entity is updated it
    is used to detect when a client's current representation of an entity is out-of-date."""

    created_on: Optional[str] = None
    """The date this table was created."""

    created_by: Optional[str] = None
    """The ID of the user that created this table."""

    modified_on: Optional[str] = None
    """The date this table was last modified. In YYYY-MM-DD-Thh:mm:ss.sssZ format"""

    modified_by: Optional[str] = None
    """The ID of the user that last modified this table."""

    version_number: Optional[int] = None
    """The version number issued to this version on the object."""

    version_label: Optional[str] = None
    """The version label for this table"""

    version_comment: Optional[str] = None
    """The version comment for this table"""

    is_latest_version: Optional[bool] = None
    """If this is the latest version of the object."""

    is_search_enabled: Optional[bool] = None
    """When creating or updating a table or view specifies if full text search
    should be enabled. Note that enabling full text search might slow down the
    indexing of the table or view."""

    annotations: Optional[Dict[str, AnnotationsValue]] = None
    """Additional metadata associated with the table. The key is the name of your
    desired annotations. The value is an object containing a list of values
    (use empty list to represent no values for key) and the value type associated with
    all values in the list."""

    @staticmethod
    def _log_unexpected_result(logger: Any, action: str, result: Any) -> None:
        """Log one `asyncio.gather` result that is not the expected model.

        Args:
            logger: The logger to report to.
            action: Short description of what was being attempted.
            result: The value gathered in place of the expected model.
        """
        if isinstance(result, BaseException):
            # Attach the real exception so its traceback is not lost.
            logger.error(f"Failed to {action}", exc_info=result)
        else:
            logger.error(f"Unknown type while trying to {action}: {type(result)}")

    def convert_from_api_parameters(
        self, synapse_table: Synapse_Table, set_annotations: bool = True
    ) -> "Table":
        """Converts the data coming from the Synapse API into this datamodel.

        `columns` is rebuilt from the returned "columnIds", so each entry carries
        only an id (name and column type are None).

        Args:
            synapse_table: The data coming from the Synapse API
            set_annotations: When True, `annotations` is replaced with the converted
                "annotations" value of `synapse_table`; when False it is left as is.

        Returns:
            This instance, updated in place.
        """
        self.id = synapse_table.get("id", None)
        self.name = synapse_table.get("name", None)
        self.parent_id = synapse_table.get("parentId", None)
        # TODO: Description doesn't seem to be returned from the API. Look into why.
        # self.description = synapse_table.description
        self.etag = synapse_table.get("etag", None)
        self.created_on = synapse_table.get("createdOn", None)
        self.created_by = synapse_table.get("createdBy", None)
        self.modified_on = synapse_table.get("modifiedOn", None)
        self.modified_by = synapse_table.get("modifiedBy", None)
        self.version_number = synapse_table.get("versionNumber", None)
        self.version_label = synapse_table.get("versionLabel", None)
        self.version_comment = synapse_table.get("versionComment", None)
        self.is_latest_version = synapse_table.get("isLatestVersion", None)
        self.is_search_enabled = synapse_table.get("isSearchEnabled", False)
        self.columns = [
            Column(id=columnId, name=None, column_type=None)
            for columnId in synapse_table.get("columnIds", [])
        ]

        if set_annotations:
            self.annotations = Annotations.convert_from_api_parameters(
                synapse_table.get("annotations", None)
            )
        return self

    async def store_rows_from_csv(
        self, csv_path: str, synapse_client: Optional[Synapse] = None
    ) -> str:
        """Takes in a path to a CSV and stores the rows to Synapse.

        Args:
            csv_path: The path to the CSV to store.
            synapse_client: If not passed in or None this will use the last client
                from the `.login()` method.

        Returns:
            The path to the CSV that was stored.
        """
        with tracer.start_as_current_span(f"Store_rows_by_csv: {csv_path}"):
            synapse_table = Synapse_Table(schema=self.id, values=csv_path)
            client = Synapse.get_client(synapse_client=synapse_client)
            loop = asyncio.get_running_loop()
            # Captured here so the worker thread can continue the current trace.
            current_context = context.get_current()
            entity = await loop.run_in_executor(
                None,
                lambda: client.store(
                    obj=synapse_table, opentelemetry_context=current_context
                ),
            )
            client.logger.debug(f"Stored rows from CSV: {entity}")
            # TODO: What should this return?
            return csv_path

    async def delete_rows(
        self, rows: List[Row], synapse_client: Optional[Synapse] = None
    ) -> None:
        """Delete rows from a table.

        Args:
            rows: The rows to delete. Each row is identified by its `row_id` and
                `version_number`. Nothing is sent when this is empty.
            synapse_client: If not passed in or None this will use the last client
                from the `.login()` method.

        Returns:
            None
        """
        if not rows:
            # Nothing to delete, so skip the round trip entirely.
            return

        with tracer.start_as_current_span(f"Delete_rows: {self.name}"):
            rows_to_delete = [[row.row_id, row.version_number] for row in rows]
            client = Synapse.get_client(synapse_client=synapse_client)
            loop = asyncio.get_running_loop()
            current_context = context.get_current()
            await loop.run_in_executor(
                None,
                lambda: delete_rows(
                    syn=client,
                    table_id=self.id,
                    row_id_vers_list=rows_to_delete,
                    opentelemetry_context=current_context,
                ),
            )

    async def store_schema(self, synapse_client: Optional[Synapse] = None) -> "Table":
        """Store non-row information about a table including the columns and annotations.

        Columns are stored first (concurrently), then the schema itself, then the
        annotations. A failure while storing a column or the annotations is logged
        and does not abort the remaining steps.

        Args:
            synapse_client: If not passed in or None this will use the last client
                from the `.login()` method.

        Returns:
            The Table instance stored in synapse.
        """
        with tracer.start_as_current_span(f"Table_Schema_Store: {self.name}"):
            client = Synapse.get_client(synapse_client=synapse_client)
            logger = client.logger

            if self.columns:
                # TODO: Proper exception handling
                column_results = await asyncio.gather(
                    *(
                        column.store(synapse_client=synapse_client)
                        for column in self.columns
                    ),
                    return_exceptions=True,
                )
                for result in column_results:
                    if isinstance(result, Column):
                        logger.info(f"Stored {result.name}")
                    else:
                        self._log_unexpected_result(logger, "store column", result)

            synapse_schema = Synapse_Schema(
                name=self.name,
                columns=self.columns,
                parent=self.parent_id,
            )
            loop = asyncio.get_running_loop()
            # Captured here so the worker thread can continue the current trace.
            current_context = context.get_current()
            entity = await loop.run_in_executor(
                None,
                lambda: client.store(
                    obj=synapse_schema, opentelemetry_context=current_context
                ),
            )

            # Keep the caller's annotations: they are stored separately below using
            # the id and etag just returned for the schema.
            self.convert_from_api_parameters(
                synapse_table=entity, set_annotations=False
            )

            if self.annotations:
                # TODO: Proper exception handling
                annotation_results = await asyncio.gather(
                    Annotations(
                        id=self.id, etag=self.etag, annotations=self.annotations
                    ).store(synapse_client=synapse_client),
                    return_exceptions=True,
                )
                for result in annotation_results:
                    if isinstance(result, Annotations):
                        self.annotations = result.annotations
                        logger.info(
                            f"Stored annotations id: {result.id}, etag: {result.etag}"
                        )
                    else:
                        self._log_unexpected_result(
                            logger, "store annotations", result
                        )

            return self

    async def get(self, synapse_client: Optional[Synapse] = None) -> "Table":
        """Get the metadata about the table from synapse.

        The table is looked up by `self.id`.

        Args:
            synapse_client: If not passed in or None this will use the last client
                from the `.login()` method.

        Returns:
            The Table instance stored in synapse.
        """
        # TODO: How do we want to support retrieving the table? Do we want to support by name, and parent?
        with tracer.start_as_current_span(f"Table_Get: {self.name}"):
            client = Synapse.get_client(synapse_client=synapse_client)
            loop = asyncio.get_running_loop()
            current_context = context.get_current()
            entity = await loop.run_in_executor(
                None,
                lambda: client.get(
                    entity=self.id, opentelemetry_context=current_context
                ),
            )
            self.convert_from_api_parameters(synapse_table=entity, set_annotations=True)
            return self

    # TODO: Synapse allows immediate deletion of entities, but the Synapse Client does not
    # TODO: Should we support immediate deletion?
    async def delete(self, synapse_client: Optional[Synapse] = None) -> None:
        """Delete the table from synapse.

        The table is identified by `self.id`.

        Args:
            synapse_client: If not passed in or None this will use the last client
                from the `.login()` method.

        Returns:
            None
        """
        with tracer.start_as_current_span(f"Table_Delete: {self.name}"):
            client = Synapse.get_client(synapse_client=synapse_client)
            loop = asyncio.get_running_loop()
            current_context = context.get_current()
            await loop.run_in_executor(
                None,
                lambda: client.delete(
                    obj=self.id, opentelemetry_context=current_context
                ),
            )

    @classmethod
    async def query(
        cls,
        query: str,
        result_format: Optional[Union[CsvResultFormat, RowsetResultFormat]] = None,
        synapse_client: Optional[Synapse] = None,
    ) -> Union[Synapse_CsvFileTable, Synaspe_TableQueryResult]:
        """Query for data on a table stored in Synapse.

        Args:
            query: The query to run.
            result_format: The format of the results. When omitted or None a fresh
                CsvResultFormat() is used.
            synapse_client: If not passed in or None this will use the last client
                from the `.login()` method.

        Returns:
            The results of the query.
        """
        if result_format is None:
            # Built per call so no options instance is shared between calls.
            result_format = CsvResultFormat()

        with tracer.start_as_current_span("Table_query"):
            client = Synapse.get_client(synapse_client=synapse_client)
            loop = asyncio.get_running_loop()
            current_context = context.get_current()

            # TODO: Future Idea - We stream back a CSV, and let those reading this to handle the CSV however they want
            results = await loop.run_in_executor(
                None,
                lambda: client.tableQuery(
                    query=query,
                    **result_format.convert_into_api_parameters(),
                    opentelemetry_context=current_context,
                ),
            )
            client.logger.debug(f"Table query results: {results}")
            return results