In [0]:
class DataSink:
    """
    Abstract class for loading a DataFrame to a sink.
    """

    def __init__(self, df, path, method, params):
        self.df = df
        self.path = path
        self.method = method
        self.params = params

    def load_dataframe(self):
        """
        Abstract method to be implemented by subclasses.
        """
        raise NotImplementedError("load_dataframe() is not implemented")


class LoadToDBFS(DataSink):
    def load_dataframe(self):
        self.df.write.mode(self.method).save(self.path)


class LoadToDBFSWithPartition(DataSink):
    def load_dataframe(self):
        partitionByColumns = self.params.get("partitionByColumns")
        self.df.write.mode(self.method).partitionBy(*partitionByColumns).save(self.path)


def get_sink_source(sink_type, df, path, method, params):
    if sink_type == "dbfs":
        return LoadToDBFS(df, path, method, params)
    elif sink_type == "dbfs_with_partition":
        return LoadToDBFSWithPartition(df, path, method, params)
    else:
        raise ValueError(f"Not implemented for sink type: {sink_type}")
