Skip to content

Commit

Permalink
Minor function refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Feb 25, 2023
1 parent 9424be7 commit f13b94f
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
5 changes: 3 additions & 2 deletions eskema/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@
import pandas as pd

from eskema.autopk import infer_pk
from eskema.ddlgen.sources import SourcePlus
from eskema.exception import UnknownContentType
from eskema.io import to_bytes
from eskema.model import Resource, SqlResult, SqlTarget
from eskema.settings import FRICTIONLESS_CONTENT_TYPES
from eskema.type import ContentType
from eskema.util import to_bytes

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -52,6 +51,7 @@ def to_sql_ddl(self) -> SqlResult:
"""
Infer field/column schema from input data and generate SQL DDL statement.
"""

logger.info(f"Selected backend: {self.backend}")
fallback = False
try:
Expand Down Expand Up @@ -165,6 +165,7 @@ def _ddl_frictionless(self) -> SqlResult:
def _ddl_ddlgen(self) -> SqlResult:

from eskema.ddlgen.ddlgenerator import TablePlus
from eskema.ddlgen.sources import SourcePlus

# Sanity checks.
if self.resource.type is None:
Expand Down
13 changes: 3 additions & 10 deletions eskema/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@
import typing as t
from collections import OrderedDict

import json_stream
import pandas as pd
from fsspec.implementations.local import LocalFileOpener
from fsspec.spec import AbstractBufferedFile
from json_stream.base import StreamingJSONList, StreamingJSONObject

from eskema.type import ContentType

Expand Down Expand Up @@ -104,6 +102,9 @@ def json_get_first_records(data: io.TextIOBase, nrecords: int = 5) -> t.List[t.O
- From a "list of objects" JSON document, get only the first N records.
- From a "single object" JSON document, get only the first record.
"""
import json_stream
from json_stream.base import StreamingJSONList, StreamingJSONObject

try:
stream = json_stream.load(data)
except StopIteration as ex:
Expand All @@ -129,14 +130,6 @@ def json_get_first_records(data: io.TextIOBase, nrecords: int = 5) -> t.List[t.O
return [] # pragma: no cover


def to_bytes(payload: t.Union[str, bytes], name: t.Optional[str] = None) -> io.BytesIO:
if isinstance(payload, str):
payload = payload.encode()
data = io.BytesIO(payload)
data.name = name or "UNKNOWN"
return data


def read_lineprotocol(data: t.IO[t.Any]):
"""
Read stream of InfluxDB line protocol and decode raw data.
Expand Down
8 changes: 8 additions & 0 deletions eskema/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,11 @@ def _enable_tracing(modules: t.List[str] = None):

def unwrap(value: str):
return textwrap.dedent(value).strip()


def to_bytes(payload: t.Union[str, bytes], name: t.Optional[str] = None) -> io.BytesIO:
if isinstance(payload, str):
payload = payload.encode()
data = io.BytesIO(payload)
data.name = name or "UNKNOWN"
return data

0 comments on commit f13b94f

Please sign in to comment.