Skip to content

Commit

Permalink
Merge 73a44bf into 212db20
Browse files Browse the repository at this point in the history
  • Loading branch information
roll committed Sep 29, 2020
2 parents 212db20 + 73a44bf commit 0e50203
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 37 deletions.
17 changes: 13 additions & 4 deletions frictionless/extract/table.py
Expand Up @@ -12,11 +12,12 @@ def extract_table(
encoding=None,
compression=None,
compression_path=None,
# Control/Dialect/Query/Header
control=None,
# Table
dialect=None,
query=None,
headers=None,
# Schema
schema=None,
sync_schema=False,
patch_schema=False,
Expand All @@ -25,6 +26,8 @@ def extract_table(
infer_volume=config.DEFAULT_INFER_VOLUME,
infer_confidence=config.DEFAULT_INFER_CONFIDENCE,
infer_missing_values=config.DEFAULT_MISSING_VALUES,
# Integrity
on_error="ignore",
lookup=None,
# Extraction
process=None,
Expand Down Expand Up @@ -112,8 +115,11 @@ def extract_table(
lookup? (dict): The lookup is a special object providing relational information.
For more information, please check "Extracting Data" guide.
process? (func): a row processor function
stream? (bool): return a row streams instead of loading into memory
process? (func): A row processor function.
It should be in a form of `row -> result`
stream? (bool): Return a row stream instead of loading into memory.
It can be useful for big data files.
Returns:
Row[]: an array/stream of rows
Expand All @@ -123,17 +129,18 @@ def extract_table(
# Create table
table = Table(
source,
headers=headers,
# File
scheme=scheme,
format=format,
hashing=hashing,
encoding=encoding,
compression=compression,
compression_path=compression_path,
# Control/Dialect/Query/Header
control=control,
dialect=dialect,
query=query,
headers=headers,
# Schema
schema=schema,
sync_schema=sync_schema,
Expand All @@ -142,6 +149,8 @@ def extract_table(
infer_names=infer_names,
infer_volume=infer_volume,
infer_confidence=infer_confidence,
# Integrity
on_error=on_error,
lookup=lookup,
)

Expand Down
21 changes: 21 additions & 0 deletions frictionless/package.py
Expand Up @@ -36,6 +36,7 @@ class Package(Metadata):
profile? (str): profile name like 'data-package'
basepath? (str): a basepath of the package
trusted? (bool): don't raise on unsafe paths
on_error? (ignore|warn|raise): behaviour if there is an error
Raises:
FrictionlessException: raise any error that occurs during the process
Expand All @@ -53,6 +54,7 @@ def __init__(
profile=None,
basepath=None,
trusted=None,
on_error="ignore",
):

# Handle zip
Expand All @@ -67,8 +69,15 @@ def __init__(
self.setinitial("profile", profile)
self.__basepath = basepath or helpers.detect_basepath(descriptor)
self.__trusted = trusted
self.__on_error = on_error
super().__init__(descriptor)

def __setattr__(self, name, value):
    """Intercept assignment to ``on_error``; delegate everything else

    ``on_error`` is written to the private error-mode attribute instead of
    going through the parent implementation. Every other attribute name is
    passed to the parent ``__setattr__`` unchanged.

    Parameters:
        name (str): name of the attribute being assigned
        value (any): value to assign
    """
    if name != "on_error":
        super().__setattr__(name, value)
        return
    # No validation happens here; the value is stored as given
    self.__on_error = value

@Metadata.property
def name(self):
"""
Expand Down Expand Up @@ -109,6 +118,15 @@ def profile(self):
"""
return self.get("profile", config.DEFAULT_PACKAGE_PROFILE)

@property
def on_error(self):
    """Error-handling mode currently stored on the package

    The stored value is asserted to be one of the three supported modes
    before it is returned.

    Returns:
        ignore|warn|raise: on error behaviour
    """
    mode = self.__on_error
    assert mode in ["ignore", "warn", "raise"]
    return mode

# Resources

@Metadata.property
Expand Down Expand Up @@ -422,9 +440,12 @@ def metadata_process(self):
resource,
basepath=self.__basepath,
trusted=self.__trusted,
on_error=self.__on_error,
package=self,
)
list.__setitem__(resources, index, resource)
# NOTE: should we sync basepath/trusted also here?
resource.on_error = self.__on_error
if not isinstance(resources, helpers.ControlledList):
resources = helpers.ControlledList(resources)
resources.__onchange__(self.metadata_process)
Expand Down
49 changes: 34 additions & 15 deletions frictionless/resource.py
Expand Up @@ -42,6 +42,7 @@ class Resource(Metadata):
profile? (str): resource profile
basepath? (str): resource basepath
trusted? (bool): don't raise on unsafe paths
on_error? (ignore|warn|raise): behaviour if there is an error
package? (Package): resource package
Raises:
Expand Down Expand Up @@ -69,6 +70,7 @@ def __init__(
profile=None,
basepath=None,
trusted=False,
on_error="ignore",
package=None,
):

Expand All @@ -93,6 +95,7 @@ def __init__(
self.setinitial("profile", profile)
self.__basepath = basepath or helpers.detect_basepath(descriptor)
self.__trusted = trusted
self.__on_error = on_error
self.__package = package
super().__init__(descriptor)

Expand All @@ -101,6 +104,12 @@ def __init__(
if hashing != config.DEFAULT_HASHING:
self["hashing"] = hashing

def __setattr__(self, name, value):
    """Intercept assignment to ``on_error``; delegate everything else

    ``on_error`` is written to the private error-mode attribute instead of
    going through the parent implementation. Every other attribute name is
    passed to the parent ``__setattr__`` unchanged.

    Parameters:
        name (str): name of the attribute being assigned
        value (any): value to assign
    """
    if name != "on_error":
        super().__setattr__(name, value)
        return
    # No validation happens here; the value is stored as given
    self.__on_error = value

@Metadata.property
def name(self):
"""
Expand Down Expand Up @@ -286,21 +295,6 @@ def compression_path(self):
"""
return self.get("compressionPath")

@Metadata.property
def stats(self):
    """Collect the "hash", "bytes" and "rows" values set on the resource

    Keys whose stored value is ``None`` are left out of the result.

    Returns:
        dict?: resource stats
    """
    collected = {}
    for key in ("hash", "bytes", "rows"):
        raw = self.get(key)
        if raw is None:
            continue
        # The hash goes through the hash parser; only its second item is kept
        collected[key] = helpers.parse_resource_hash(raw)[1] if key == "hash" else raw
    return collected

@Metadata.property
def dialect(self):
"""
Expand Down Expand Up @@ -341,6 +335,30 @@ def profile(self):
"""
return self.get("profile", config.DEFAULT_RESOURCE_PROFILE)

@property
def on_error(self):
    """Error-handling mode currently stored on the resource

    The stored value is asserted to be one of the three supported modes
    before it is returned.

    Returns:
        ignore|warn|raise: on error behaviour
    """
    mode = self.__on_error
    assert mode in ["ignore", "warn", "raise"]
    return mode

@Metadata.property
def stats(self):
    """Collect the "hash", "bytes" and "rows" values set on the resource

    Keys whose stored value is ``None`` are left out of the result.

    Returns:
        dict?: resource stats
    """
    collected = {}
    for key in ("hash", "bytes", "rows"):
        raw = self.get(key)
        if raw is None:
            continue
        # The hash goes through the hash parser; only its second item is kept
        collected[key] = helpers.parse_resource_hash(raw)[1] if key == "hash" else raw
    return collected

# Expand

def expand(self):
Expand Down Expand Up @@ -724,6 +742,7 @@ def to_table(self, **options):
options.setdefault("compression_path", self.compression_path)
options.setdefault("dialect", self.dialect)
options.setdefault("schema", self.schema)
options.setdefault("on_error", self.__on_error)
if "lookup" not in options:
options["lookup"] = self.read_lookup()
return Table(**options)
Expand Down
57 changes: 55 additions & 2 deletions frictionless/table.py
@@ -1,4 +1,5 @@
import typing
import warnings
from pathlib import Path
from copy import deepcopy
from itertools import chain
Expand Down Expand Up @@ -105,6 +106,10 @@ class Table:
For more information, please check "Describing Data" guide.
It defaults to `['']`
on_error? (ignore|warn|raise): Define behaviour if there is an error in the
header or rows during the reading rows process.
It defaults to `ignore`.
lookup? (dict): The lookup is a special object providing relational information.
For more information, please check "Extracting Data" guide.
Expand All @@ -123,11 +128,12 @@ def __init__(
encoding=None,
compression=None,
compression_path=None,
# Control/Dialect/Query/Header
control=None,
# Table
dialect=None,
query=None,
headers=None,
# Schema
schema=None,
sync_schema=False,
patch_schema=False,
Expand All @@ -136,6 +142,8 @@ def __init__(
infer_volume=config.DEFAULT_INFER_VOLUME,
infer_confidence=config.DEFAULT_INFER_CONFIDENCE,
infer_missing_values=config.DEFAULT_MISSING_VALUES,
# Integrity
on_error="ignore",
lookup=None,
):

Expand Down Expand Up @@ -177,8 +185,12 @@ def __init__(
self.__infer_volume = infer_volume
self.__infer_confidence = infer_confidence
self.__infer_missing_values = infer_missing_values
self.__on_error = on_error
self.__lookup = lookup

# Set error handler
self.on_error = on_error

# Create file
self.__file = File(
source=source,
Expand All @@ -193,6 +205,12 @@ def __init__(
query=query,
)

def __setattr__(self, name, value):
    """Intercept assignment to ``on_error``; delegate everything else

    ``on_error`` is written to the private error-mode attribute; every other
    attribute name is passed to the parent ``__setattr__`` unchanged.

    Parameters:
        name (str): name of the attribute being assigned
        value (any): value to assign
    """
    # NOTE(review): this file also appears to declare an `on_error` property
    # setter; with this override, plain assignment would not reach it --
    # confirm which of the two mechanisms is meant to stay.
    if name != "on_error":
        super().__setattr__(name, value)
        return
    # No validation happens here; the value is stored as given
    self.__on_error = value

def __enter__(self):
if self.closed:
self.open()
Expand Down Expand Up @@ -301,6 +319,24 @@ def schema(self):
"""
return self.__schema

@property
def on_error(self):
    """Error-handling mode currently stored on the table

    The stored value is asserted to be one of the three supported modes
    before it is returned.

    Returns:
        ignore|warn|raise: on error behaviour
    """
    mode = self.__on_error
    assert mode in ["ignore", "warn", "raise"]
    return mode


# TODO: use property wrapper to make it shorter
@on_error.setter
def on_error(self, value):
    """Store a new error-handling mode

    The value is stored as given; it is only checked when read back.

    Parameters:
        value (ignore|warn|raise): on error behaviour
    """
    self.__on_error = value

@property
def header(self):
"""
Expand Down Expand Up @@ -403,7 +439,8 @@ def closed(self):
"""
return self.__parser is None

# Read
# Read
return self.__on_error

def read_data(self):
"""Read data stream into memory
Expand Down Expand Up @@ -662,6 +699,14 @@ def __read_row_stream(self):
def __read_row_stream_create(self):
schema = self.schema

# Handle header errors
if not self.header.valid:
error = self.header.errors[0]
if self.__on_error == "warn":
warnings.warn(error.message, UserWarning)
elif self.__on_error == "raise":
raise exceptions.FrictionlessException(error)

# Create state
memory_unique = {}
memory_primary = {}
Expand Down Expand Up @@ -732,6 +777,14 @@ def __read_row_stream_create(self):
error = errors.ForeignKeyError.from_row(row, note=note)
row.errors.append(error)

# Handle row errors
if not row.valid:
error = row.errors[0]
if self.__on_error == "warn":
warnings.warn(error.message, UserWarning)
elif self.__on_error == "raise":
raise exceptions.FrictionlessException(error)

# Stream row
yield row

Expand Down

0 comments on commit 0e50203

Please sign in to comment.