
Add Fugue implementation of report #208

Merged — 7 commits, Jun 1, 2023
Conversation

goodwanghan (Contributor) commented May 30, 2023

Closes #206

@goodwanghan goodwanghan changed the title Add report to Fugue implementation (WIP) Add Fugue implementation of report May 31, 2023
goodwanghan (Contributor, Author) commented:
@fdosani please take a look

fdosani (Member) commented May 31, 2023

@goodwanghan Thank you for the PR! Much appreciated. I was running through the example and I'm getting a strange error:

from io import StringIO
import pandas as pd
import datacompy

data1 = """acct_id,dollar_amt,name,float_fld,date_fld
10000001234,123.45,George Maharis,14530.1555,2017-01-01
10000001235,0.45,Michael Bluth,1,2017-01-01
10000001236,1345,George Bluth,,2017-01-01
10000001237,123456,Bob Loblaw,345.12,2017-01-01
10000001239,1.05,Lucille Bluth,,2017-01-01
"""

data2 = """acct_id,dollar_amt,name,float_fld
10000001234,123.4,George Michael Bluth,14530.155
10000001235,0.45,Michael Bluth,
10000001236,1345,George Bluth,1
10000001237,123456,Robert Loblaw,345.12
10000001238,1.05,Loose Seal Bluth,111
"""

df1 = pd.read_csv(StringIO(data1))
df2 = pd.read_csv(StringIO(data2))

datacompy.is_match(
    df1,
    df2,
    join_columns='acct_id',
    abs_tol=0,
    rel_tol=0,
    df1_name='Original',
    df2_name='New',
    parallelism=2
)

It yields the following error:

_1 _State.RUNNING -> _State.FAILED  'date_fld'
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-7-fe5bae9dab15> in ?()
----> 1 datacompy.is_match(
      2     df1,
      3     df2,
      4     join_columns='acct_id',

~/Documents/git/datacompy/datacompy/fuguecompare.py in ?(df1, df2, join_columns, abs_tol, rel_tol, df1_name, df2_name, ignore_spaces, ignore_case, cast_column_names_lower, parallelism, strict_schema)
    125             cast_column_names_lower=cast_column_names_lower,
    126             parallelism=parallelism,
    127             strict_schema=strict_schema,
    128         )
--> 129     except _StrictSchemaError:
    130         return False
    131 
    132     return all(matches)

~/Documents/git/datacompy/datacompy/fuguecompare.py in ?(df1, df2, join_columns, return_obj_func, abs_tol, rel_tol, df1_name, df2_name, ignore_spaces, ignore_case, cast_column_names_lower, parallelism, strict_schema)
    527         )
    528         return [[pickle.dumps(return_obj_func(comp))]]
    529 
    530     objs = fa.as_array(
--> 531         fa.transform(
    532             ser, _comp, schema="obj:binary", partition=dict(by="key", num=bucket)
    533         )
    534     )

~/miniconda3/envs/datacompy/lib/python3.10/site-packages/fugue/workflow/api.py in ?(df, using, schema, params, partition, callback, ignore_errors, persist, as_local, save_path, checkpoint, engine, engine_conf, as_fugue)
    170             tdf.yield_dataframe_as("result", as_local=as_local)
    171         else:
    172             tdf.save(save_path, fmt="parquet")
    173 
--> 174     dag.run(make_execution_engine(engine, conf=engine_conf, infer_by=[df]))
    175     if checkpoint:
    176         result = dag.yields["result"].result  # type:ignore
    177     else:

~/miniconda3/envs/datacompy/lib/python3.10/site-packages/fugue/workflow/workflow.py in ?(self, engine, conf, **kwargs)
   1600                         lambda x: any(x.lower().startswith(xx) for xx in pre),
   1601                     )
   1602                     if ctb is None:  # pragma: no cover
   1603                         raise
-> 1604                     raise ex.with_traceback(ctb)
   1605                 self._computed = True
   1606         return FugueWorkflowResult(self.yields)

~/Documents/git/datacompy/datacompy/fuguecompare.py in ?(df)
    511             .reset_index(drop=True)
    512         )
    513         df2 = (
    514             pd.concat([pickle.loads(r["data"]) for r in df if not r["left"]])
--> 515             .sort_values(all_cols)
    516             .reset_index(drop=True)
    517         )
    518         comp = Compare(

~/miniconda3/envs/datacompy/lib/python3.10/site-packages/pandas/core/generic.py in ?(self, key, axis)
   1846                 .get_level_values(key)  # type: ignore[assignment]
   1847                 ._values
   1848             )
   1849         else:
-> 1850             raise KeyError(key)
   1851 
   1852         # Check for duplicates
   1853         if values.ndim > 1:

KeyError: 'date_fld'

I think it has to do with date_fld being present in df1 but not in df2.
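The diagnosis can be reproduced in isolation: if both sides are sorted by the union of columns, pandas raises a KeyError as soon as the sort key list contains a column the DataFrame does not have. A minimal sketch (the variable names here are illustrative, not the actual datacompy internals):

```python
import pandas as pd

# Hypothetical union of df1/df2 columns, as the transform function
# would build it; df2 has no "date_fld" column.
all_cols = ["acct_id", "dollar_amt", "date_fld"]
df2 = pd.DataFrame({"acct_id": [10000001234, 10000001238],
                    "dollar_amt": [123.4, 1.05]})

try:
    df2.sort_values(all_cols)  # sorting by a missing column
    raised = None
except KeyError as exc:
    raised = exc  # KeyError: 'date_fld', matching the traceback above
```

This matches the bottom frame of the traceback, where pandas raises `KeyError(key)` while resolving the sort keys.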

fdosani (Member) commented May 31, 2023

So maybe the solution here is to change the .sort_values(all_cols) calls to .sort_values(tdf1.schema.names) and .sort_values(tdf2.schema.names) respectively.
Might be worth adding some tests too, covering both the case where all columns match and the case where there is a mismatch. Thoughts?
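The proposed fix amounts to sorting each side by its own column list instead of the shared union, so a column that exists in only one DataFrame never reaches the other side's sort. A minimal sketch with made-up data (not the actual patch):

```python
import pandas as pd

# df1 has date_fld; df2 has float_fld instead.
df1 = pd.DataFrame({"acct_id": [10000001235, 10000001234],
                    "date_fld": ["2017-01-01", "2017-01-01"]})
df2 = pd.DataFrame({"acct_id": [10000001235, 10000001234],
                    "float_fld": [14530.155, None]})

# Sort each DataFrame by its own columns only.
df1_sorted = df1.sort_values(list(df1.columns)).reset_index(drop=True)
df2_sorted = df2.sort_values(list(df2.columns)).reset_index(drop=True)
```

Both sorts now succeed, and each side ends up in a deterministic order for the comparison.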

goodwanghan (Contributor, Author) commented May 31, 2023

> So maybe the solution here is to change the lines .sort_values(all_cols) to .sort_values(tdf1.schema.names) and .sort_values(tdf2.schema.names) respectively. Might worth adding in some tests too for where all columns match and where there is a mismatch. Thoughts?

Yes, I think you are right. I will fix it and add unit tests; thanks for reporting!

One thing we must keep in mind: calling .sort_values(tdf2.schema.names) inside the transform function is not OK, because referencing tdf2 there would cause the whole DataFrame to be serialized into the function's closure, which could fail or take a lot of time. We must compute df2_cols = tdf2.schema.names outside the transform function and then call .sort_values(df2_cols) inside it, to avoid the unintended serialization.
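The closure-capture concern can be sketched generically. When a worker function is shipped to distributed executors, everything it closes over gets serialized with it; closing over a small column list instead of the DataFrame keeps the payload tiny. The factory names below are hypothetical, purely to illustrate the pattern:

```python
import pandas as pd

def make_worker_capturing_df(df):
    # Anti-pattern: the inner function closes over the whole DataFrame,
    # so shipping it to workers would serialize all of the data.
    def worker(part):
        return part.sort_values(list(df.columns))
    return worker

def make_worker_capturing_cols(cols):
    # Pattern from the comment above: extract the small column list
    # first, then close over only that.
    def worker(part):
        return part.sort_values(cols)
    return worker

df2 = pd.DataFrame({"acct_id": [2, 1], "float_fld": [1.0, 2.0]})

bad = make_worker_capturing_df(df2)                    # closure holds a DataFrame
good = make_worker_capturing_cols(list(df2.columns))   # closure holds a short list
```

Inspecting `__closure__` on each function shows what would be serialized along with it.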

goodwanghan (Contributor, Author) commented:
I have fixed both issues @fdosani

@fdosani fdosani self-requested a review June 1, 2023 14:36
fdosani (Member) left a comment:

LGTM. Thank you again @goodwanghan ❤️

@fdosani fdosani merged commit 3587311 into capitalone:develop Jun 1, 2023
15 checks passed
@fdosani fdosani mentioned this pull request Jun 1, 2023
Linked issue: Fugue Phase 2 functionality