In [0]:
from pyspark.sql import functions as F
from pyspark.sql import DataFrame

In [0]:
# Base directory holding both the source workbook and the pipeline output
# (presumably a Databricks Unity Catalog volume, judging by the /Volumes prefix).
path = '/Volumes/my_workspace/my_schema/my_volume'

# Workbook read by the pipeline, and the workbook the pipeline (over)writes.
in_xlsx, out_xlsx = (
    "/".join((path, "ERGO_test.xlsx")),
    "/".join((path, "processed.xlsx")),
)

In [0]:
# Preview the raw workbook before the pipeline runs: the first row supplies the
# column names and the reader infers column types from the cell values.
df = (
    spark.read
        .format("com.crealytics.spark.excel")
        .options(header="true", inferSchema="true")
        .load(in_xlsx)
)

display(df)

In [0]:
class PipelineStep:
    """Base class for a single stage of a ``Pipeline``.

    Subclasses override ``run``: it receives whatever the previous step
    returned (``None`` for the first step) and returns the value handed to
    the next step.
    """

    def run(self, data: DataFrame) -> DataFrame:
        message = "Add run method!"
        raise NotImplementedError(message)


class Pipeline:
    """An ordered sequence of steps executed one after another.

    Each step's ``run`` receives the value returned by the previous step, so
    the first step is normally a loader that ignores its input.
    """

    def __init__(self):
        # Steps in the order they were added; run() executes them in this order.
        self.steps = []

    def add_step(self, step: "PipelineStep"):
        """Append ``step`` to the end of the pipeline.

        Raises:
            TypeError: if ``step`` has no callable ``run`` method. Failing here
                surfaces a mis-configured pipeline when it is built rather than
                part-way through a (possibly expensive) run.
        """
        if not callable(getattr(step, "run", None)):
            raise TypeError(
                f"pipeline steps must define a callable run(); got {type(step).__name__}"
            )
        self.steps.append(step)

    def run(self, data=None):
        """Run every step in insertion order and return the final result.

        Args:
            data: Value handed to the first step. Defaults to ``None``, which
                suits a first step that produces its own input (e.g. a loader);
                pass an existing DataFrame to run the remaining steps on it.

        Returns:
            Whatever the last step returned, or ``data`` unchanged when the
            pipeline has no steps.
        """
        for step in self.steps:
            data = step.run(data)
        return data
    
    
class Load_xlsx(PipelineStep):
    """Source step: read the Excel workbook at ``input_path`` into a DataFrame.

    The first row is used as the header and column types are inferred. The
    incoming ``data`` argument is ignored, so this step belongs at the start
    of a pipeline.
    """

    def __init__(self, input_path):
        self.input_path = input_path

    def run(self, data=None):
        reader = spark.read.format("com.crealytics.spark.excel")
        reader = reader.options(header="true", inferSchema="true")
        return reader.load(self.input_path)
    

class Filter(PipelineStep):
    """Keep only the rows whose ``column`` value is strictly greater than ``threshold``.

    Args:
        column: Name of the column to compare. Defaults to ``'quantity'``.
        threshold: Exclusive lower bound. Defaults to ``10``.

    ``Filter()`` with no arguments keeps rows where ``quantity > 10``.
    """

    def __init__(self, column='quantity', threshold=10):
        self.column = column
        self.threshold = threshold

    def run(self, data: DataFrame):
        # A null in `column` makes the comparison null, so Spark drops that row.
        return data.filter(F.col(self.column) > self.threshold)
    

class Total_revenue(PipelineStep):
    """Sum per-row revenue (``quantity * price``) for every ``store_id``.

    The result has one row per store, with columns ``store_id`` and
    ``sum(total)`` (Spark's default name for the un-aliased aggregate).
    """

    def run(self, data: DataFrame):
        line_revenue = F.col('quantity') * F.col('price')
        with_totals = data.withColumn('total', line_revenue)
        per_store = with_totals.groupBy('store_id')
        return per_store.agg(F.sum('total'))
    

class Save_xlsx(PipelineStep):
    """Sink step: write the incoming DataFrame to ``output_path`` as Excel.

    Any existing file at ``output_path`` is overwritten, and a header row is
    written. The DataFrame is returned unchanged so the caller can keep using it.
    """

    def __init__(self, output_path):
        self.output_path = output_path

    def run(self, data: DataFrame):
        # NOTE(review): `data` is not cached, so using the returned DataFrame
        # afterwards (e.g. display) presumably re-executes the upstream plan.
        writer = data.write.format("com.crealytics.spark.excel")
        writer = writer.option("header", "true").mode("overwrite")
        writer.save(self.output_path)
        return data

In [0]:
# Stages, in execution order: load the workbook, keep rows with quantity > 10,
# total the revenue per store, then write the result to out_xlsx.
pipeline = Pipeline()
for step in (
    Load_xlsx(in_xlsx),
    Filter(),
    Total_revenue(),
    Save_xlsx(out_xlsx),
):
    pipeline.add_step(step)

In [0]:
# Execute every registered step. The result is whatever the last step returned:
# Save_xlsx passes through the per-store revenue DataFrame it just wrote to out_xlsx.
# NOTE(review): that DataFrame is not cached, so display() presumably re-runs the
# Spark plan (including re-reading the workbook); consider .cache() if this is slow.
process = pipeline.run()
display(process)