Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 0 additions & 123 deletions quasardb/numpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -597,129 +597,6 @@ def ensure_ma(
return _ensure_ma(xs, dtype)


def read_array(
    table: Optional[Table] = None, column: Optional[str] = None, ranges: Any = None
) -> Tuple[NDArrayTime, MaskedArrayAny]:
    """
    Read a single column as Numpy arrays.

    Parameters:
    -----------

    table: quasardb.Table
        QuasarDB table object to read from. Required.

    column: str
        Name of the column to read. Required.

    ranges: optional
        Time ranges to restrict the read to. When omitted, the native
        reader's default range behavior applies.

    Returns a (timestamps, values) tuple: a datetime64[ns] index array and a
    masked array with the column's data.
    """

    if table is None:
        raise RuntimeError("A table is required.")

    if column is None:
        raise RuntimeError("A column is required.")

    kwargs: Dict[str, Any] = {"column": column}

    if ranges is not None:
        kwargs["ranges"] = ranges

    # Dispatch table: one native reader per column type. Symbols are stored
    # as strings, so they share the string reader.
    read_with = {
        quasardb.ColumnType.Double: table.double_get_ranges,
        quasardb.ColumnType.Blob: table.blob_get_ranges,
        quasardb.ColumnType.String: table.string_get_ranges,
        quasardb.ColumnType.Symbol: table.string_get_ranges,
        quasardb.ColumnType.Int64: table.int64_get_ranges,
        quasardb.ColumnType.Timestamp: table.timestamp_get_ranges,
    }

    ctype = table.column_type_by_id(column)

    fn = read_with.get(ctype)
    if fn is None:
        # A plain dict lookup would surface an opaque KeyError here; raise
        # the same exception type as the other argument checks instead.
        raise RuntimeError("Unsupported column type: {}".format(ctype))

    return fn(**kwargs)


def write_array(
    data: Any = None,
    index: Optional[NDArrayTime] = None,
    table: Optional[Table] = None,
    column: Optional[str] = None,
    dtype: Optional[DType] = None,
    infer_types: bool = True,
) -> None:
    """
    Write a Numpy array to a single column.

    Parameters:
    -----------

    data: np.array
        Numpy array with a dtype that is compatible with the column's type.

    index: np.array
        Numpy array with a datetime64[ns] dtype that will be used as the
        $timestamp axis for the data to be stored.

    table: quasardb.Table
        QuasarDB table object that holds the column to write to. Required.

    column: str
        Name of the column to write to. Required.

    dtype: optional np.dtype
        If provided, ensures the data array is converted to this dtype before
        insertion.

    infer_types: optional bool
        If true, when necessary will attempt to convert the data and index array
        to the best type for the column. For example, if you provide float64 data
        while the column's type is int64, it will automatically convert the data.

        Defaults to True. For production use cases where you want to avoid implicit
        conversions, we recommend always setting this to False.

    """

    if table is None:
        raise RuntimeError("A table is required.")

    if column is None:
        raise RuntimeError("A column is required.")

    if data is None:
        raise RuntimeError("A data numpy array is required.")

    if index is None:
        raise RuntimeError("An index numpy timestamp array is required.")

    data = ensure_ma(data, dtype=dtype)
    ctype = table.column_type_by_id(column)

    # We try to reuse some of the other functions, which assume array-like
    # shapes for column info and data. It's a bit hackish, but actually works
    # well.
    #
    # We should probably generalize this block of code with the same found in
    # write_arrays().

    cinfos = [(column, ctype)]
    dtype_: List[Optional[DType]] = [dtype]

    dtype_ = _coerce_dtype(dtype_, cinfos)

    if infer_types is True:
        dtype_ = _add_desired_dtypes(dtype_, cinfos)

    # data_ = an array of [data]
    data_ = [data]
    data_ = _coerce_data(data_, dtype_)
    _validate_dtypes(data_, cinfos)

    # No functions that assume array-of-data anymore, let's put it back
    data = data_[0]

    # Dispatch to the correct function. Symbols are stored as strings, so
    # they share the string writer.
    write_with = {
        quasardb.ColumnType.Double: table.double_insert,
        quasardb.ColumnType.Blob: table.blob_insert,
        quasardb.ColumnType.String: table.string_insert,
        quasardb.ColumnType.Symbol: table.string_insert,
        quasardb.ColumnType.Int64: table.int64_insert,
        quasardb.ColumnType.Timestamp: table.timestamp_insert,
    }

    fn = write_with.get(ctype)
    if fn is None:
        # A plain dict lookup would surface an opaque KeyError here; raise
        # the same exception type as the other argument checks instead.
        raise RuntimeError("Unsupported column type: {}".format(ctype))

    logger.info(
        "Writing array (%d rows of dtype %s) to columns %s.%s (type %s)",
        len(data),
        data.dtype,
        table.get_name(),
        column,
        ctype,
    )
    fn(column, index, data)


def _concat_masked(xs: List[MaskedArrayAny]) -> MaskedArrayAny:
if len(xs) == 0:
return ma.masked_array(np.array([]))
Expand Down
99 changes: 0 additions & 99 deletions quasardb/pandas/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,105 +90,6 @@ class PandasRequired(ImportError):
TableLike = Union[str, Table]


def read_series(
    table: Table, col_name: str, ranges: Optional[RangeSet] = None
) -> pd.Series:
    """
    Read a Pandas Timeseries from a single column.

    Parameters:
    -----------

    table : quasardb.Timeseries
        QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

    col_name : str
        Name of the column to read.

    ranges : list
        A list of ranges to read, represented as tuples of Numpy datetime64[ns] objects.
    """
    # One native reader per column type; symbols are read as strings.
    readers = {
        quasardb.ColumnType.Double: table.double_get_ranges,
        quasardb.ColumnType.Blob: table.blob_get_ranges,
        quasardb.ColumnType.String: table.string_get_ranges,
        quasardb.ColumnType.Int64: table.int64_get_ranges,
        quasardb.ColumnType.Timestamp: table.timestamp_get_ranges,
        quasardb.ColumnType.Symbol: table.string_get_ranges,
    }

    call_args: Dict[str, Any] = {"column": col_name}

    if ranges is not None:
        call_args["ranges"] = ranges

    # The column's type decides which reader we invoke.
    t = table.column_type_by_id(col_name)

    logger.info(
        "reading Series from column %s.%s with type %s", table.get_name(), col_name, t
    )

    timestamps, values = readers[t](**call_args)

    # The native reader hands back (timestamps, values); the timestamps
    # become the Series index.
    return pd.Series(values, index=timestamps)


def write_series(
    series: pd.Series,
    table: Table,
    col_name: str,
    infer_types: bool = True,
    dtype: Optional[DType] = None,
) -> None:
    """
    Writes a Pandas Timeseries to a single column.

    Parameters:
    -----------

    series : pandas.Series
        Pandas Series, with a numpy.datetime64[ns] as index. Underlying data will be attempted
        to be transformed to appropriate QuasarDB type.

    table : quasardb.Timeseries
        QuasarDB Timeseries table object, e.g. qdb_cluster.table('my_table')

    col_name : str
        Column name to store data in.

    infer_types : optional bool
        If true (the default), attempt to convert the data and index to the
        best type for the column when necessary.

    dtype : optional np.dtype
        If provided, ensures the data array is converted to this dtype before
        insertion.
    """

    logger.debug(
        "write_series, table=%s, col_name=%s, infer_types=%s, dtype=%s",
        table.get_name(),
        col_name,
        infer_types,
        dtype,
    )

    # Mask NA entries (NaN/NaT/None) so they are written as nulls.
    data = ma.masked_array(series.to_numpy(copy=False), mask=series.isna())

    # Only normalize the index to datetime64[ns] when type inference is on;
    # otherwise the caller's index dtype is passed through untouched.
    if infer_types is True:
        index = series.index.to_numpy("datetime64[ns]", copy=False)
    else:
        index = series.index.to_numpy(copy=False)

    qdbnp.write_array(
        data=data,
        index=index,
        table=table,
        column=col_name,
        dtype=dtype,
        infer_types=infer_types,
    )


def query(
cluster: Cluster,
query: str,
Expand Down
45 changes: 0 additions & 45 deletions quasardb/quasardb/_table.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,6 @@ class IndexedColumnInfo:

class Table(Entry):
def __repr__(self) -> str: ...
def blob_get_ranges(
    self, column: str, ranges: Optional[RangeSet] = None
) -> tuple[NDArrayTime, MaskedArrayAny]:
    """Read a blob column, returning (timestamps, values) Numpy arrays."""
    ...
def blob_insert(
    self,
    column: str,
    timestamps: NDArrayTime,
    values: Union[MaskedArrayAny, NDArrayAny],
) -> None:
    """Insert blob *values* into *column*, one per entry of *timestamps*."""
    ...
def column_id_by_index(self, index: int) -> str:
    """Return the name of the column at positional *index*."""
    ...
def column_index_by_id(self, alias: str) -> int:
    """Return the positional index of the column named *alias*."""
    ...
def column_info_by_index(self, index: int) -> ColumnInfo:
    """Return the ColumnInfo of the column at positional *index*."""
    ...
Expand All @@ -88,29 +79,11 @@ class Table(Entry):
shard_size: datetime.timedelta = ...,
ttl: datetime.timedelta = ...,
) -> None: ...
def double_get_ranges(
    self, column: str, ranges: Optional[RangeSet] = None
) -> tuple[NDArrayTime, MaskedArrayAny]:
    """Read a double column, returning (timestamps, values) Numpy arrays."""
    ...
def double_insert(
    self,
    column: str,
    timestamps: NDArrayTime,
    values: Union[MaskedArrayAny, NDArrayAny],
) -> None:
    """Insert double *values* into *column*, one per entry of *timestamps*."""
    ...
def erase_ranges(self, column: str, ranges: RangeSet) -> int:
    """Erase the given time ranges from *column*; returns an int count
    (presumably the number of erased points -- TODO confirm)."""
    ...
def get_shard_size(self) -> datetime.timedelta:
    """Return the table's shard size."""
    ...
def get_ttl(self) -> datetime.timedelta:
    """Return the table's time-to-live."""
    ...
def has_ttl(self) -> bool:
    """Return True when the table has a time-to-live configured."""
    ...
def insert_columns(self, columns: list[ColumnInfo]) -> None:
    """Add the given columns to the table."""
    ...
def int64_get_ranges(
    self, column: str, ranges: Optional[RangeSet] = None
) -> tuple[NDArrayTime, MaskedArrayAny]:
    """Read an int64 column, returning (timestamps, values) Numpy arrays."""
    ...
def int64_insert(
    self,
    column: str,
    timestamps: NDArrayTime,
    values: Union[MaskedArrayAny, NDArrayAny],
) -> None:
    """Insert int64 *values* into *column*, one per entry of *timestamps*."""
    ...
def list_columns(self) -> list[ColumnInfo]:
    """Return the ColumnInfo of every column in the table."""
    ...
def reader(
self,
Expand All @@ -119,22 +92,4 @@ class Table(Entry):
ranges: RangeSet = [],
) -> Reader: ...
def retrieve_metadata(self) -> None:
    """Fetch the table's metadata (NOTE(review): presumably refreshes the
    local cache from the cluster -- confirm against the native binding)."""
    ...
def string_get_ranges(
    self, column: str, ranges: Optional[RangeSet] = None
) -> tuple[NDArrayTime, MaskedArrayAny]:
    """Read a string (or symbol) column, returning (timestamps, values)
    Numpy arrays."""
    ...
def string_insert(
    self,
    column: str,
    timestamps: NDArrayTime,
    values: Union[MaskedArrayAny, NDArrayAny],
) -> None:
    """Insert string *values* into *column*, one per entry of *timestamps*."""
    ...
def subscribe(self, conn: Any) -> Any:
    """Subscribe to the table via *conn* (NOTE(review): argument and return
    semantics are opaque here -- confirm against the native binding)."""
    ...
def timestamp_get_ranges(
    self, column: str, ranges: Optional[RangeSet] = None
) -> tuple[NDArrayTime, MaskedArrayAny]:
    """Read a timestamp column, returning (timestamps, values) Numpy arrays."""
    ...
def timestamp_insert(
    self,
    column: str,
    timestamps: NDArrayTime,
    values: Union[MaskedArrayAny, NDArrayAny],
) -> None:
    """Insert timestamp *values* into *column*, one per entry of *timestamps*."""
    ...
Loading
Loading