Skip to content

Commit

Permalink
Improve readability
Browse files Browse the repository at this point in the history
  • Loading branch information
argenisleon committed Dec 22, 2020
1 parent 2ea0e83 commit 217ad16
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 16 deletions.
12 changes: 7 additions & 5 deletions optimus/engines/cudf/io/load.py
Expand Up @@ -3,10 +3,11 @@
import cudf

from optimus.engines.base.io.load import BaseLoad
from optimus.engines.base.meta import Meta
from optimus.engines.cudf.dataframe import CUDFDataFrame
from optimus.helpers.functions import prepare_path
from optimus.helpers.logger import logger
from optimus.engines.cudf.dataframe import CUDFDataFrame
from optimus.engines.base.meta import Meta


class Load(BaseLoad):

Expand Down Expand Up @@ -87,9 +88,10 @@ def csv(path, sep=',', header=True, infer_schema=True, encoding="utf-8", null_va
dtype = ["str"]

cdf = cudf.read_csv(path, sep=sep, header=header, encoding=encoding,
quoting=quoting, error_bad_lines=error_bad_lines,
keep_default_na=keep_default_na, na_values=null_value, nrows=n_rows, na_filter=na_filter,
dtype=dtype)
quoting=quoting, error_bad_lines=error_bad_lines,
keep_default_na=keep_default_na, na_values=null_value, nrows=n_rows,
na_filter=na_filter,
dtype=dtype)
df = CUDFDataFrame(cdf)
df.meta = Meta.set(df.meta, "file_name", path)
except IOError as error:
Expand Down
12 changes: 6 additions & 6 deletions optimus/engines/dask/io/load.py
Expand Up @@ -6,11 +6,11 @@

import optimus.helpers.functions_spark
from optimus.engines.base.io.load import BaseLoad
from optimus.engines.base.meta import Meta
from optimus.engines.dask.dataframe import DaskDataFrame
from optimus.helpers.core import val_to_list
from optimus.helpers.functions import prepare_path
from optimus.helpers.logger import logger
from optimus.engines.dask.dataframe import DaskDataFrame
from optimus.engines.base.meta import Meta


class Load(BaseLoad):
Expand Down Expand Up @@ -84,9 +84,9 @@ def csv(path, sep=',', header=True, infer_schema=True, na_values=None, encoding=
# Detect missing value markers (empty strings and the value of na_values). In data without any NAs,
# passing na_filter=False can improve the performance of reading a large file.
ddf = dd.read_csv(path, sep=sep, header=0 if header else None, encoding=encoding,
quoting=quoting, lineterminator=lineterminator, error_bad_lines=error_bad_lines,
keep_default_na=True, na_values=None, engine=engine, na_filter=na_filter, *args,
**kwargs)
quoting=quoting, lineterminator=lineterminator, error_bad_lines=error_bad_lines,
keep_default_na=True, na_values=None, engine=engine, na_filter=na_filter, *args,
**kwargs)

if n_rows > -1:
ddf = dd.from_pandas(ddf.head(n_rows), npartitions=1)
Expand Down Expand Up @@ -199,7 +199,7 @@ def avro(path, *args, **kwargs):
file, file_name = prepare_path(path, "avro")

try:
df = db.read_avro(path, *args, **kwargs).to_dataframe()
df = dd.read_avro(path, *args, **kwargs).to_dataframe()
df.meta = Meta.set(df.meta, "file_name", file_name)

except IOError as error:
Expand Down
1 change: 0 additions & 1 deletion optimus/engines/ibis/functions.py
Expand Up @@ -10,7 +10,6 @@


class IbisFunctions(Functions):

def _to_float(self, series, *args):
return series.cast("float64")

Expand Down
10 changes: 6 additions & 4 deletions optimus/engines/spark/columns.py
Expand Up @@ -168,7 +168,7 @@ def copy(self, input_cols, output_cols=None, columns=None):
dfd = df.data.withColumn(output_col, F.col(input_col))
meta.set(value=current_meta)
meta = meta.copy({input_col: output_col})

return self.root.new(dfd, meta=meta)

@staticmethod
Expand Down Expand Up @@ -403,7 +403,7 @@ def _cast_to(col_name, attr):
for input_col, output_col, data_type in zip(input_cols, output_cols, _dtype):
return_type, func, func_type = cast_factory(data_type)
dfd = self.apply(input_col, func, func_return_type=return_type, args=data_type, func_type=func_type,
output_cols=output_col, meta_action=Actions.CAST.value)
output_cols=output_col, meta_action=Actions.CAST.value)

return dfd

Expand Down Expand Up @@ -897,7 +897,8 @@ def _match(_input_cols, attr):
_search = attr[0]
return F.when(F.col(_input_cols).rlike(_search), True).otherwise(False)

return self.apply(input_cols, func=_match, args=(regex,), func_type="column_expr", func_return_type=str, output_cols=output_cols,
return self.apply(input_cols, func=_match, args=(regex,), func_type="column_expr", func_return_type=str,
output_cols=output_cols,
meta_action=Actions.REPLACE_REGEX.value)

def replace(self, input_cols, search=None, replace_by=None, search_by="chars", output_cols=None):
Expand Down Expand Up @@ -1412,7 +1413,8 @@ def _unnest(row):

else:
RaiseIt.type_error(input_col, ["string", "struct", "array", "vector"])
df.meta = Meta.set(df.meta, value=df.meta.preserve(None, Actions.UNNEST.value, [v for k, v in final_columns]).get())
df.meta = Meta.set(df.meta,
value=df.meta.preserve(None, Actions.UNNEST.value, [v for k, v in final_columns]).get())
return df

@staticmethod
Expand Down

0 comments on commit 217ad16

Please sign in to comment.