Minimal Fixes #742

Merged
4 commits merged on Nov 5, 2019

Changes from all commits
2 changes: 1 addition & 1 deletion docs/source/conf.py
@@ -60,7 +60,7 @@
# The short X.Y version.
version = '2.2'
# The full version, including alpha/beta/rc tags.
release = "2.2.25"
release = "2.2.26"

# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
19 changes: 12 additions & 7 deletions optimus/dataframe/columns.py
@@ -1117,7 +1117,7 @@ def impute(input_cols, data_type="continuous", strategy="mean", output_cols=None
Imputes missing data from specified columns using the mean or median.
:param input_cols: list of columns to be analyzed.
:param output_cols:
-:param data_type: continuous or categorical
+:param data_type: "continuous" or "categorical"
:param strategy: String that specifies the way of computing missing data. Can be "mean", "median" for continuous
or "mode" for categorical columns
:return: Dataframe object (DF with columns that have the imputed values).
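
This hunk only tightens the docstring, but it is a convenient place for an illustration. A minimal usage sketch of `impute` as documented above; the `op.create.df` setup and the "age" column are assumptions for illustration, not part of this PR:

```python
# Sketch only: assumes the Optimus 2.x op.create.df helper and a made-up
# nullable "age" column; the impute() call follows the docstring above.
from optimus import Optimus

op = Optimus()
df = op.create.df(
    [("age", "int", True)],        # (name, type, nullable)
    [(31,), (None,), (44,)]
)

# Replace nulls in "age" with the column mean ("continuous" + "mean").
df = df.cols.impute("age", data_type="continuous", strategy="mean")
df.table()
```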
@@ -1500,7 +1500,6 @@ def _final_columns(_index, _splits, _output_col):
_index = [i - 1 for i in _index]

actual_index = _index
print("actual_index",actual_index)

# Create final output columns
if is_tuple(_output_col):
@@ -1527,9 +1526,7 @@ def _final_columns(_index, _splits, _output_col):
splits = format_dict(df.agg(F.max(F.size(input_col))).to_dict())

expr = F.col(input_col)
-print(index, splits, output_col)
final_columns = _final_columns(index, splits, output_col)
print("final_columns", final_columns)
for i, col_name in final_columns:
df = df.withColumn(col_name, expr.getItem(i))

@@ -1545,7 +1542,6 @@ def _final_columns(_index, _splits, _output_col):
expr = F.split(F.col(input_col), separator)
final_columns = _final_columns(index, splits, output_col)
for i, col_name in final_columns:
-print(i, col_name)
df = df.withColumn(col_name, expr.getItem(i))

# Vector
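
Both branches above build the output columns the same way: construct a PySpark `Column` expression, then pull elements out with `getItem`. A standalone sketch of that pattern (the Spark session and the "full_name" column are assumptions for illustration):

```python
# Plain PySpark sketch of the split-then-getItem pattern in the hunks above;
# the session setup and column name are illustrative assumptions.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("Optimus Prime",), ("Mega Tron",)], ["full_name"])

expr = F.split(F.col("full_name"), " ")            # array<string> expression
df = (df.withColumn("full_name_0", expr.getItem(0))
        .withColumn("full_name_1", expr.getItem(1)))
df.show()
```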
@@ -2125,7 +2121,12 @@ def join_all(_dfs):

@add_attr(cols)
def string_to_index(input_cols, output_cols=None):
-
+    """
+    Encodes a string column of labels to a column of label indices
+    :param input_cols:
+    :param output_cols:
+    :return:
+    """
df = self

input_cols = parse_columns(df, input_cols)
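
The added docstring describes label-to-index encoding. In plain Spark ML terms that is what a `StringIndexer` does; the sketch below shows the equivalent operation directly in Spark ML (column names are assumptions, and this is not a claim about the Optimus internals):

```python
# Equivalent encoding expressed with Spark ML's StringIndexer; the "color"
# column is made up for illustration.
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="color", outputCol="color_index")
df_indexed = indexer.fit(df).transform(df)
# By default, labels get indices 0.0, 1.0, ... in descending frequency order.
```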
@@ -2140,12 +2141,16 @@ def bucketizer(input_cols, splits, output_cols=None):
"""
Bucketize multiple columns at the same time.
:param input_cols:
-:param splits: Dict of splits. You can use create_buckets() to make it
+:param splits: Dict of splits or ints. You can use create_buckets() to make it
:param output_cols:
:return:
"""
df = self

+if is_int(splits):
+    min_max = df.cols.range(input_cols)[input_cols]["range"]
+    splits = create_buckets(min_max["min"], min_max["max"], splits)
+
def _bucketizer(col_name, args):
"""
Create a column expression that creates buckets in a range of values
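The `is_int(splits)` branch added above gives `bucketizer` a shortcut: passing an integer now auto-builds that many buckets from the column's observed min/max via `create_buckets`. A hedged sketch of both call styles (the "price" column is made up, and the helper's import path is not shown in this diff):

```python
# Hypothetical calls using the change above; "price" is an assumed column.
# New shortcut: an int expands to buckets spanning the column's range.
df = df.cols.bucketizer("price", splits=5)

# Roughly what the new branch does internally, per the diff:
min_max = df.cols.range("price")["price"]["range"]
buckets = create_buckets(min_max["min"], min_max["max"], 5)
df = df.cols.bucketizer("price", splits=buckets)
```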
4 changes: 2 additions & 2 deletions optimus/dataframe/extension.py
@@ -452,15 +452,15 @@ def send(self, name=None, infer=True, mismatch=None, stats=True):
if name is not None:
df.set_name(name)

-result = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR,
+columns, output = Profiler.instance.dataset(df, columns="*", buckets=35, infer=infer, relative_error=RELATIVE_ERROR,
approx_count=True,
sample=10000,
stats=stats,
format="json",
mismatch=mismatch)

if Comm:
-Comm.instance.send(result)
+Comm.instance.send(output)
else:
raise Exception("Comm is not initialized. Please use comm=True param like Optimus(comm=True)")

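The `send()` fix above tracks an API change elsewhere in this PR: `Profiler.instance.dataset()` now returns a `(columns, output)` pair rather than a single result, and only the serialized `output` is forwarded over `Comm`. A reduced fragment of the new calling convention (not a runnable script; `df` and `Comm` come from an initialized `Optimus(comm=True)` session, per the code above):

```python
# Fragment mirroring the change above: unpack the pair, send only the output.
from optimus.profiler.profiler import Profiler  # module path per this diff

columns, output = Profiler.instance.dataset(df, columns="*", buckets=35,
                                            sample=10000, format="json")
Comm.instance.send(output)   # only the serialized profile is sent
```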
1 change: 1 addition & 0 deletions optimus/helpers/constants.py
@@ -29,6 +29,7 @@
"array": "array",
"null": "null"
}
+
PYTHON_TYPES = {"string": str, "int": int, "float": float, "boolean": bool}

PYSPARK_NUMERIC_TYPES = ["byte", "short", "big", "int", "double", "float"]
12 changes: 6 additions & 6 deletions optimus/profiler/profiler.py
@@ -217,10 +217,11 @@ def to_file(self, path=None, output="html"):
RaiseIt.type_error(output, ["html", "json"])

def to_json(self, df, columns="*", buckets=10, infer=False, relative_error=RELATIVE_ERROR, approx_count=True,
sample=10000, stats=True, format="json", mismatch=None):
return self.dataset(df, columns=columns, buckets=buckets, infer=infer, relative_error=relative_error,
approx_count=approx_count,
sample=sample, stats=stats, format=format, mismatch=mismatch)
sample=10000, stats=True, mismatch=None):
columns, output = self.dataset(df, columns=columns, buckets=buckets, infer=infer, relative_error=relative_error,
approx_count=approx_count,
sample=sample, stats=stats, format="json", mismatch=mismatch)
return output

def dataset(self, df, columns="*", buckets=10, infer=False, relative_error=RELATIVE_ERROR, approx_count=True,
sample=10000, stats=True, format="dict", mismatch=None):
@@ -383,9 +384,8 @@ def match_names(_col_names):
result = json.dumps(output_columns, ignore_nan=True, default=json_converter)
else:
result = output_columns
-# print(format)

-self.output_columns = result
+self.output_columns = output_columns
# print(result)
df = df.set_meta("transformations.actions", {})

2 changes: 1 addition & 1 deletion optimus/version.py
@@ -5,5 +5,5 @@ def _safe_int(string):
return string


-__version__ = '2.2.25'
+__version__ = '2.2.26'
VERSION = tuple(_safe_int(x) for x in __version__.split('.'))
2 changes: 1 addition & 1 deletion setup.py
@@ -59,7 +59,7 @@ def readme():
author='Favio Vazquez and Argenis Leon',
author_email='argenisleon@gmail.com',
url='https://github.com/ironmussa/Optimus/',
-download_url='https://github.com/ironmussa/Optimus/archive/2.2.25.tar.gz',
+download_url='https://github.com/ironmussa/Optimus/archive/2.2.26.tar.gz',
description=('Optimus is the missing framework for cleaning and pre-processing data in a distributed fashion with '
'pyspark.'),
long_description=readme(),