In [0]:
from loguru import logger
from pyspark.sql.functions import col, trim, to_timestamp, date_format, when

In [0]:

# Register the notebook's two text widgets, each with an empty-string default:
#   "Mode"         - later passed to DataFrameWriter.mode()
#   "process_path" - later passed to DataFrameWriter.save()
dbutils.widgets.text(name="Mode", defaultValue="")
dbutils.widgets.text(name="process_path", defaultValue="")

In [0]:

# Read the current widget values in declaration order: "Mode" first, then
# "process_path". Both come back exactly as entered (empty string by default).
mode, process_path = (
    dbutils.widgets.get(widget_name) for widget_name in ("Mode", "process_path")
)

In [0]:

class fact_quality_material_movement:
    """Builds the material-movement fact DataFrame from ``cur.mkpf`` and ``cur.mseg``.

    The class holds no state; all work happens in :meth:`transform_invoke`.
    It relies on the notebook-provided ``spark`` session.
    """

    def __init__(self):
        pass

    def transform_invoke(self):
        """Join ``cur.mkpf`` to ``cur.mseg`` and return the renamed, cleaned columns.

        Steps:
          1. Left-join ``cur.mkpf`` to ``cur.mseg`` on ``MBLNR`` and ``MJAHR``
             (every ``mkpf`` row is kept; ``mseg`` columns are null when there
             is no match).
          2. Select the source columns under their business-facing names.
          3. Trim ``MATERIAL_NO`` and ``CUSTOMER_ID``; re-format ``ENTRY_TIME``
             from the ``HHmmss`` pattern to ``HH:mm``.
          4. Apply the sales-order fallback and drop the helper columns.

        Returns:
            pyspark.sql.DataFrame: one row per joined record with the aliased
            columns from the select below (``MATERIAL_DOC_NO`` through
            ``FUNDS_CENTER``).

        Raises:
            Exception: any error raised while building the DataFrame is logged
            with its traceback and then re-raised unchanged.
        """
        try:
            logger.info("Starting material document transformation process...")

            # Alias both inputs explicitly so the "mkpf."/"mseg." qualified
            # column names used below always resolve, instead of depending on
            # the table qualifier surviving from the SQL query plan.
            df_mseg = spark.sql("SELECT * FROM cur.mseg").alias("mseg")
            df_mkpf = spark.sql("SELECT * FROM cur.mkpf").alias("mkpf")

            joined_df = df_mkpf.join(
                df_mseg,
                (col("mkpf.MBLNR") == col("mseg.MBLNR")) &
                (col("mkpf.MJAHR") == col("mseg.MJAHR")),
                how="left",
            )

            # NOTE(review): SALES_ORDER_NO / SALES_ORDER_ITEM are aliases of
            # mseg.MAT_KDAUF / mseg.MAT_KDPOS, and the fallback columns selected
            # at the end are those same two source columns. The when() fallback
            # further down therefore cannot change any value as written.
            # Presumably the primary source was meant to be a different column
            # (possibly KDAUF / KDPOS) - TODO confirm against the cur.mseg
            # schema before changing; behaviour is intentionally left as-is.
            renamed_df = joined_df.select(
                col("mkpf.MBLNR").alias("MATERIAL_DOC_NO"),
                col("mkpf.MJAHR").alias("MATERIAL_DOC_YEAR"),
                col("mseg.ZEILE").alias("MATERIAL_DOC_ITEM"),
                col("mseg.BWART").alias("MOVEMENT_TYPE"),
                col("mseg.MATNR").alias("MATERIAL_NO"),
                col("mseg.CHARG").alias("BATCH_ID"),
                col("mseg.MENGE").alias("QUANTITY"),
                col("mseg.ERFME").alias("UNIT_OF_ENTRY"),
                col("mseg.DMBTR").alias("AMOUNT_IN_LC"),
                col("mseg.LGORT").alias("SENDER_STORAGE_LOCATION"),
                col("mseg.WERKS").alias("SENDER_PLANT"),
                col("mseg.UMLGO").alias("RECEIVING_STORAGE_LOCATION"),
                col("mseg.UMWRK").alias("RECEIVING_PLANT"),
                col("mkpf.BUDAT").alias("POSTING_DATE"),
                col("mkpf.CPUDT").alias("ENTRY_DATE"),
                col("mkpf.CPUTM").alias("ENTRY_TIME"),
                col("mkpf.USNAM").alias("USERNAME"),
                col("mseg.KUNNR").alias("CUSTOMER_ID"),
                col("mseg.MAT_KDAUF").alias("SALES_ORDER_NO"),
                col("mseg.MAT_KDPOS").alias("SALES_ORDER_ITEM"),
                col("mseg.FISTL").alias("FUNDS_CENTER"),
                # Helper columns for the fallback below; dropped before returning.
                col("mseg.MAT_KDAUF"),
                col("mseg.MAT_KDPOS")
            )

            transformed_df = renamed_df.withColumn(
                "MATERIAL_NO", trim(col("MATERIAL_NO"))
            ).withColumn(
                "CUSTOMER_ID", trim(col("CUSTOMER_ID"))
            ).withColumn(
                # Parse the value with pattern "HHmmss" and keep hours:minutes only.
                "ENTRY_TIME", date_format(
                    to_timestamp(col("ENTRY_TIME"), "HHmmss"), "HH:mm"
                )
            ).withColumn(
                # Null or empty order number -> fall back to MAT_KDAUF.
                "SALES_ORDER_NO",
                when(
                    (col("SALES_ORDER_NO").isNull()) | (col("SALES_ORDER_NO") == ""),
                    col("MAT_KDAUF")
                ).otherwise(col("SALES_ORDER_NO"))
            ).withColumn(
                # Null, empty or all-zero item number -> fall back to MAT_KDPOS.
                "SALES_ORDER_ITEM",
                when(
                    (col("SALES_ORDER_ITEM").isNull()) |
                    (col("SALES_ORDER_ITEM") == "") |
                    (col("SALES_ORDER_ITEM") == "000000"),
                    col("MAT_KDPOS")
                ).otherwise(col("SALES_ORDER_ITEM"))
            ).drop("MAT_KDAUF", "MAT_KDPOS")

            logger.success("Material document transformation completed successfully.")
            return transformed_df

        except Exception:
            logger.exception("Error occurred during material document transformation.")
            # Bare raise re-raises the active exception with its original traceback.
            raise

In [0]:
# Entry cell: when this code runs as the main program, build the transformer and
# produce the fact DataFrame. `final_df` is left in the notebook's global scope;
# the write cell at the end of the notebook reads it.
if __name__ == "__main__":
    transformer = fact_quality_material_movement()
    # No Spark action is called here; this only builds the DataFrame.
    final_df = transformer.transform_invoke()
     
    
    



In [0]:
# Persist the transformed DataFrame as Parquet at the widget-supplied location,
# using the widget-supplied save mode.
(
    final_df.write
    .mode(mode)
    .format("parquet")
    .save(process_path)
)