Skip to content

Commit

Permalink
Updated line 196 to assure storage_options is passed to read_parquet …
Browse files Browse the repository at this point in the history
…if not using Pandas, since Dask requires storage_options. Also changed line mlrun#226 from inplace=True to inplace=False.  Dask does not support dropping columns inplace.  Alternatively, we could test if the df is hasattr 'dask' and only apply if inplace, but this is a more general behavior.
  • Loading branch information
Greg committed Jul 23, 2021
1 parent d4f680e commit 0dba58c
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions mlrun/datastore/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from os import getenv, path, remove
from tempfile import mktemp

import dask.dataframe as dd
import fsspec
import pandas as pd
import pyarrow.parquet as pq
Expand Down Expand Up @@ -144,14 +145,15 @@ def as_df(
kwargs["columns"] = columns

def reader(*args, **kwargs):
import pdb;pdb.set_trace()
if start_time or end_time:
if sys.version_info < (3, 7):
raise ValueError(
f"feature not supported for python version {sys.version_info}"
)

from storey.utils import find_filters

import pdb;pdb.set_trace()
dataset = pq.ParquetDataset(args[0], filesystem=fs)
if dataset.partitions:
partitions = dataset.partitions.partition_names
Expand Down Expand Up @@ -190,14 +192,15 @@ def reader(*args, **kwargs):

fs = self.get_filesystem()
if fs:
if fs.isdir(url):
if fs.isdir(url) or df_module != pd:
# Dask requires the storage_options parameter
storage_options = self.get_storage_options()
if storage_options:
kwargs["storage_options"] = storage_options
return reader(url, **kwargs)
else:
# If not dir, use fs.open() to avoid regression when pandas < 1.2 and does not
# support the storage_options parameter.
# support the storage_options parameter.
return reader(fs.open(url), **kwargs)

tmp = mktemp()
Expand All @@ -220,6 +223,9 @@ def _drop_reserved_columns(df):
for col in df.columns:
if col.startswith("igzpart_"):
cols_to_drop.append(col)
if hasattr(df, 'dask'):
# Dask does not support dropping columns inplace
df = df.drop(labels=)
df.drop(labels=cols_to_drop, axis=1, inplace=True, errors="ignore")


Expand Down

0 comments on commit 0dba58c

Please sign in to comment.