[ENH] Pyjanitor for PySpark #504

Closed · zjpoh opened this issue Jul 27, 2019 · 22 comments
Labels
enhancement (New feature or request) · good advanced issue (Issues that would require Python trickery to get an elegant implementation) · question (Further information is requested)

Comments


zjpoh commented Jul 27, 2019

Brief Description

I would like to know if there is any interest in creating pyjanitor for pyspark. I'm using pyspark a lot, and I would really like to use custom method chaining to clean up my ETL code.

I'm not sure if it is doable or how easy it would be, but I would be open to exploring it.


ericmjl commented Jul 27, 2019

Thanks for pinging in, @zjpoh!

I'm more of a Dask user, and even then I don't use the Dask DataFrame API much, as my datasets are generally small (compared to the scale that parallelization engines are generally good for). So I don't have much experience with PySpark, and hence I'd have to defer to someone else on how to extend pyspark DataFrames with custom method-chainable functions.

If you're up for it, I'd love to see what's possible!


zjpoh commented Jul 27, 2019

Sure. I'll explore how pandas_flavor and pyspark DataFrame methods work.

@anzelpwj

I'd be interested in helping out with this, @zjpoh - we use Spark DataFrames all the time at work.

ericmjl added the enhancement, good advanced issue, and question labels Jul 28, 2019

zjpoh commented Jul 28, 2019

It works by copying code from pandas and pandas_flavor and making a single adjustment (importing pyspark.sql.DataFrame instead of pd.DataFrame).

Here is the code to create the accessor (we should be able to simplify this further).

import warnings
from functools import wraps

# From https://github.com/pandas-dev/pandas/blob/d1accd032b648c9affd6dce1f81feb9c99422483/pandas/core/accessor.py#L153-L198
class CachedAccessor:
    """
    Custom property-like object (descriptor) for caching accessors.
    Parameters
    ----------
    name : str
        The namespace this will be accessed under, e.g. ``df.foo``
    accessor : cls
        The class with the extension methods. The class' __init__ method
        should expect one of a ``Series``, ``DataFrame`` or ``Index`` as
        the single argument ``data``
    """

    def __init__(self, name, accessor):
        self._name = name
        self._accessor = accessor

    def __get__(self, obj, cls):
        if obj is None:
            # we're accessing the attribute of the class, i.e., Dataset.geo
            return self._accessor
        accessor_obj = self._accessor(obj)
        # Replace the property with the accessor object. Inspired by:
        # http://www.pydanny.com/cached-property.html
        # We need to use object.__setattr__ because we overwrite __setattr__ on
        # NDFrame
        object.__setattr__(obj, self._name, accessor_obj)
        return accessor_obj

def _register_accessor(name, cls):
    def decorator(accessor):
        if hasattr(cls, name):
            warnings.warn(
                "registration of accessor {!r} under name {!r} for type "
                "{!r} is overriding a preexisting attribute with the same "
                "name.".format(accessor, name, cls),
                UserWarning,
                stacklevel=2,
            )
        setattr(cls, name, CachedAccessor(name, accessor))
        #cls._accessors.add(name)
        return accessor

    return decorator

# From https://github.com/pandas-dev/pandas/blob/d1accd032b648c9affd6dce1f81feb9c99422483/pandas/core/accessor.py#L278-L281
def register_dataframe_accessor(name):
    # I changed the pd.DataFrame import to pyspark.sql.DataFrame
    from pyspark.sql import DataFrame

    return _register_accessor(name, DataFrame)

# From https://github.com/Zsailer/pandas_flavor/blob/8bd43b7fd62be43bb2374299c887009e07027190/pandas_flavor/register.py#L6-L35
def register_dataframe_method(method):
    """Register a function as a method attached to the Pandas DataFrame.
    Example
    -------
    .. code-block:: python
        @register_dataframe_method
        def print_column(df, col):
            '''Print the dataframe column given'''
            print(df[col])
    """
    def inner(*args, **kwargs):

        class AccessorMethod(object):
            def __init__(self, pandas_obj):
                self._obj = pandas_obj

            @wraps(method)
            def __call__(self, *args, **kwargs):
                return method(self._obj, *args, **kwargs)

        register_dataframe_accessor(method.__name__)(AccessorMethod)

        return method

    return inner()

Then running the following test code

import numpy as np
import string


rng = np.random.RandomState(0)
# `spark` is assumed to be an active SparkSession (e.g. the one provided by the pyspark shell)
df = spark.createDataFrame(rng.uniform(size=[10, 5]).tolist())

@register_dataframe_method
def clean_names(df):
    # default Spark column names are "_1", "_2", ...; rename them to "b", "c", ...
    exprs = [f"{col} AS {string.ascii_lowercase[int(col.replace('_', ''))]}" for col in df.columns]
    return df.selectExpr(*exprs)

df.clean_names().show()

gives

+-------------------+-------------------+-------------------+-------------------+--------------------+
|                  b|                  c|                  d|                  e|                   f|
+-------------------+-------------------+-------------------+-------------------+--------------------+
| 0.5488135039273248| 0.7151893663724195| 0.6027633760716439| 0.5448831829968969|  0.4236547993389047|
| 0.6458941130666561| 0.4375872112626925| 0.8917730007820798| 0.9636627605010293|  0.3834415188257777|
| 0.7917250380826646| 0.5288949197529045| 0.5680445610939323|  0.925596638292661| 0.07103605819788694|
|0.08712929970154071|0.02021839744032572|  0.832619845547938| 0.7781567509498505|  0.8700121482468192|
|  0.978618342232764| 0.7991585642167236|0.46147936225293185| 0.7805291762864555| 0.11827442586893322|
| 0.6399210213275238| 0.1433532874090464| 0.9446689170495839| 0.5218483217500717|  0.4146619399905236|
|0.26455561210462697| 0.7742336894342167|0.45615033221654855| 0.5684339488686485|0.018789800436355142|
| 0.6176354970758771| 0.6120957227224214| 0.6169339968747569| 0.9437480785146242|  0.6818202991034834|
|  0.359507900573786|0.43703195379934145| 0.6976311959272649|0.06022547162926983|  0.6667667154456677|
| 0.6706378696181594| 0.2103825610738409| 0.1289262976548533|0.31542835092418386|  0.3637107709426226|
+-------------------+-------------------+-------------------+-------------------+--------------------+


zjpoh commented Aug 1, 2019

@ericmjl Any thoughts on the best place to add this? I'm more inclined to add this as a separate module (janitor.pyspark) as it's not using pandas.


ericmjl commented Aug 1, 2019

@zjpoh definitely this would need some thought.

As the package is currently architected, we have everything depending on pandas as a backend by default.

If we were to structure this in a more logical fashion, there would be the functions, which wouldn’t be decorated, and then there would be individual backends in which the functions are decorated/wrapped and attached to individual dataframe implementations.

We can maybe get this kickstarted by setting up a janitor.backends.pyspark namespace, where the functions that you’re interested in are first wrapped and tested that they work. (I would import pyspark into the janitor namespace so that you can import janitor.pyspark, but keep a backends submodule nonetheless for other backends.) To keep the effort small, we can focus on only getting the functions you’re interested in first.

Admittedly, the organization of the package has to be rethought here in order to support multiple dataframe implementations.

What are your thoughts here?


anzelpwj commented Aug 4, 2019

I wonder about the backend approach, since I'll sometimes create Pandas DFs from Spark ones (and what if I want to call both Pandas janitor functions alongside Spark ones?). I'd personally vote for @zjpoh 's approach.


ericmjl commented Aug 4, 2019

Let's work backwards from the front-facing API, keeping two things in mind as we add support for pyspark:

  1. nothing should change for pandas users, who are currently the primary audience for pyjanitor, and
  2. the maintenance burden for the maintainers should be kept to a minimum.

Here are my reasons for proposing a janitor.backends namespace, with a few examples that also reflect how my thinking has evolved after mulling it over a bit more.

The first reason is preventing code duplication. Using clean_names as an example, as one of the project maintainers, I'd like to avoid maintaining two copies of the same function. (Keep in mind, though, that even if we can't find a solution to this, I don't mind having a pyspark-centric fork of the project, with all of the functions re-implemented to work with pyspark, and development potentially synced up!) Avoiding function duplication would most likely mean that the pyspark function must import the clean_names version that is currently implemented for pandas. This might be accomplished by organizing data cleaning functions and backends in the following fashion:

Functions stay where they are.

# functions.py

@pf.register_dataframe_method
def clean_names(...):
    ...

The pyspark backend imports the functions that are relevant.

# janitor/backends/pyspark.py

from janitor.functions import clean_names

def register_dataframe_method(...):
    ...

clean_names = register_dataframe_method(clean_names)

We provide a namespace in __init__.py.

# janitor/__init__.py
from .backends import pyspark as pyspark_backend

End users import the appropriate backend.

# in a jupyter notebook, for e.g.
import janitor.pyspark_backend

I'm not 100% sure whether this will work in a nice and clean fashion, so we probably have to test whether this works out.
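
A quick interactive check of the proposed layout might look something like the following (the janitor.pyspark_backend module name and the clean_names registration are assumptions from the sketch above, not existing code):

# in a pyspark shell / notebook with an active SparkSession named `spark`
import janitor.pyspark_backend  # hypothetical: wraps and registers the functions on pyspark DataFrames

df = spark.createDataFrame([(1, 2)], ["Bad Name", "Another Col"])
assert hasattr(df, "clean_names")  # the accessor should now be attached
df.clean_names().show()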

The second reason relates to semantics. Semantically, for the organization of the project, we currently have the submodules functions, biology, chemistry, finance, and engineering, which individuals can directly import. Adding a pyspark namespace at the same level might imply semantic equivalence between what really is a "backend" and the rest (which are "subject domains"). On the other hand, structuring it as backends/pyspark, and then importing the pyspark namespace into the top-level namespace as pyspark_backend is OK from my perspective, as it makes the semantics much clearer. (End users would write import janitor.pyspark_backend in my mind.) What are your thoughts here?

If both of you are willing to make the contribution work, I'm happy to add both of you as maintainers (so we can get some help + expertise using Spark on the team)! I am admittedly quite clueless about Spark, as I have been using Dask all this while, and I usually go to Dask for numerical data manipulation more than for the string manipulation stuff (which I do in pandas). Here I've been focused on the maintainability question - it's always exciting to contribute new things (and I love them, don't get me wrong!), but I also have to consider who on the team can maintain the shiny new thing going forward.

Let me know what your thoughts are, having read what hopefully was (to you) a coherent block of text above. (If things are unclear, please let me know, I'm happy to address your questions about my thoughts.)


zjpoh commented Aug 4, 2019

I agree that keeping pyspark as backend (janitor.backends.pyspark) makes more sense than having the backend functions (register_dataframe_method etc) at the same level as functions, biology, etc.

I'm more than happy to be one of the maintainers of the pyspark version. I'm starting to think that having pyspark as a separate module (something like janitor.pandas and janitor.pyspark) and working closely on developing the functionality might be easier, because I'm not sure how many of the current pandas-based functions can be reused, since the pandas and pyspark APIs are quite different. If, in the future, we would like to develop something similar for Dask, we can have janitor.dask. Or maybe another option is to have a separate fork for pyspark.

Do you know if R has different types of DataFrames, such as pandas, pyspark, and Dask equivalents? If so, do you know how janitor in R handles them?


anzelpwj commented Aug 4, 2019

Well, there are Spark dataframes in R via sparklyr.

If we do use backends, how do we put them in the codebase? Should each function have a big if statement to determine which code is run for Spark versus Dask versus whatever? I know that for some functions the Spark API differs from pandas. Similarly, would this require people to install Spark on their system to run tests (even when they are not testing Spark code)?


anzelpwj commented Aug 5, 2019

Also, are we thinking of using a context manager (with janitor.backend(...)) to determine which version gets called if it's not the primary backend?


zjpoh commented Aug 5, 2019

From my understanding, pyjanitor adds additional methods such as clean_names to the pandas DataFrame class. So the pyspark backend would simply add clean_names to the pyspark DataFrame class. We might need two different clean_names implementations: one with the register_dataframe_method decorator (for pandas) and the other with a register_pyspark_dataframe_method decorator.
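
As a rough sketch (the module paths, the register_pyspark_dataframe_method name, and the simplified renaming logic are all illustrative, not the final API), the two registrations might look like:

# janitor/functions.py -- existing pandas implementation (simplified)
import pandas_flavor as pf

@pf.register_dataframe_method
def clean_names(df):
    df = df.copy()
    df.columns = [c.strip().lower().replace(" ", "_") for c in df.columns]
    return df


# janitor/backends/pyspark.py -- hypothetical pyspark counterpart; the decorator here is
# the pyspark register_dataframe_method shown earlier in this thread
@register_pyspark_dataframe_method
def clean_names(df):
    new_names = [c.strip().lower().replace(" ", "_") for c in df.columns]
    return df.toDF(*new_names)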

In terms of testing, we can simply skip the pyspark tests when pyspark is not installed. Something like

import pytest

try:
    import pyspark
except ImportError:
    pyspark = None

@pytest.mark.skipif(pyspark is None, reason="requires pyspark")
def test_pyspark():
    ...
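
One possible conftest.py sketch that pairs with the skip above, spinning up a local SparkSession only when pyspark is available (the fixture name and Spark settings are just placeholders):

# conftest.py (sketch)
import pytest

try:
    from pyspark.sql import SparkSession
except ImportError:
    SparkSession = None


@pytest.fixture(scope="session")
def spark():
    if SparkSession is None:
        pytest.skip("requires pyspark")
    session = (
        SparkSession.builder.master("local[1]")
        .appName("pyjanitor-tests")
        .getOrCreate()
    )
    yield session
    session.stop()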


ericmjl commented Aug 5, 2019

Thanks both of you, @anzelpwj and @zjpoh, for chiming in!

Also, are we thinking of using a context manager (with janitor.backend(...)) to determine which version gets called if it's not the primary backend?

That could be a good idea! I haven't done much with context managers myself, so I'd be curious to see how this gets implemented.
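
For illustration only, a backend-switching context manager could be as simple as swapping a module-level flag; everything below (the janitor.backend name, the registry, and the dispatch) is hypothetical:

import contextlib

# hypothetical registry: backend name -> {function name: implementation}
_IMPLEMENTATIONS = {"pandas": {}, "pyspark": {}}
_ACTIVE_BACKEND = "pandas"


@contextlib.contextmanager
def backend(name):
    """Temporarily switch the active backend, e.g. `with janitor.backend("pyspark"): ...`"""
    global _ACTIVE_BACKEND
    previous, _ACTIVE_BACKEND = _ACTIVE_BACKEND, name
    try:
        yield
    finally:
        _ACTIVE_BACKEND = previous


def clean_names(df, *args, **kwargs):
    # dispatch to whichever implementation is registered for the active backend
    return _IMPLEMENTATIONS[_ACTIVE_BACKEND]["clean_names"](df, *args, **kwargs)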

I'm not sure how many of the current pandas-based functions can be reused, since the pandas and pyspark APIs are quite different

I took a second look, and indeed, it looks like the pyspark API is quite different from the pandas API. Looks like my preference for a single implementation is going to be a tough proposition.

I'm more than happy to be one of the maintainers of the pyspark version

This actually changes the calculus for maintainability! 😄 @anzelpwj, would you be on-board?

Thinking forward, if both of you come on board and develop pyspark-oriented implementations of the data cleaning functions, I have no issues accepting it into the codebase. We can think about future directions slowly - options available are keeping it part of pyjanitor, breaking it out into its own separate package altogether, and whatever creative ideas may exist in between. (If you guys decide to make a new package altogether, I'm happy to help you all set up the basic stuff, e.g. setup.py, getting a conda package going, setting up the right deprecation warnings in pyjanitor, etc.)

Before that happens, though, let's flesh things out by focusing on only clean_names, and architecting the user-facing portion in a nice way. @zjpoh, if you'd like to put in a PR that fleshes the details out for that function, I'd be happy to review it with you and document how to make things work. And I'd love input from you on the PR, @anzelpwj!


zjpoh commented Aug 5, 2019

I really like the idea of starting simple to flesh out the implementation details. I will spend some evenings next week and hopefully that will be enough to put in a PR.

ericmjl changed the title from "Pyjanitor for PySpark" to "[ENH] Pyjanitor for PySpark" Aug 5, 2019

ericmjl commented Aug 5, 2019

Thanks @zjpoh! Looking forward to reviewing your PR 😄


anzelpwj commented Aug 5, 2019

I can definitely help with maintenance and general architecting.


zjpoh commented Aug 8, 2019

I'm a little overwhelmed with work and won't have time to do this until next week. Just FYI.


ericmjl commented Aug 8, 2019

No worries, I totally understand. Same situation here 😄.

If you'd like to further narrow the scope of the PR, that's totally cool - in fact, it might be preferable, because it makes the review process much easier too!

@anzelpwj

What's the initial plan, @zjpoh? Do we want to create a submodule and just get one function up and running?


zjpoh commented Aug 20, 2019

@anzelpwj Sorry for the late response. I was on break for a little bit. 😉
I'm working on a submodule with the backend and the clean_names function. I will probably have the PR up by EOD tomorrow. I also want to test it on a cluster to make sure that all the executors are aware of the additional method.

zjpoh mentioned this issue Aug 21, 2019
zjpoh closed this as completed Aug 29, 2019
@anzelpwj

Sorry I haven't been able to be involved at all so far; my kid's been sick a lot lately (the usual "starting daycare means getting exposed to all the diseases"). Once things calm down, I can try to contribute more.


zjpoh commented Aug 30, 2019

No worries~~
