In [0]:
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql.functions import col
from pyspark.sql.functions import trim, when, lit
from pyspark.sql.functions import col, trim, when, date_format, to_timestamp
from loguru import logger

In [0]:
# Notebook parameter: target location handed to the final `.save(...)` call.
dbutils.widgets.text("process_path", "")
process_path = dbutils.widgets.get("process_path")

# Notebook parameter: save mode handed to the final `.write.mode(...)` call.
dbutils.widgets.text("Mode", "")
Mode = dbutils.widgets.get("Mode")

In [0]:
from pyspark.sql.functions import col, trim, when, to_timestamp, date_format

class fact_quality_material_movement:
    """Assemble the material-movement fact DataFrame from ``cur.mkpf`` and ``cur.mseg``.

    Uses the ``spark`` session available in the notebook's global scope.
    """

    def __init__(self):
        """Nothing to initialise; the class carries no instance state."""

    def transform_and_invoke(self):
        """Join, rename and clean the two source tables.

        Steps:
          1. Read ``cur.mkpf`` and ``cur.mseg`` and left-join them on
             ``MBLNR`` and ``MJAHR`` (every ``mkpf`` row is kept).
          2. Project the source columns under their output names.
          3. Trim ``MATERIAL_NO`` and ``CUSTOMER_ID``; reformat ``ENTRY_TIME``
             from the ``HHmmss`` pattern to ``HH:mm``; fill blank sales-order
             fields from ``MAT_KDAUF`` / ``MAT_KDPOS`` and drop those helpers.

        Returns:
            The transformed DataFrame. Nothing is written by this method.
        """
        # Alias both inputs so the shared key columns can be addressed unambiguously.
        mkpf_src = spark.sql("SELECT * FROM cur.mkpf").alias("mkpf")
        mseg_src = spark.sql("SELECT * FROM cur.mseg").alias("mseg")

        same_document = (
            (col("mkpf.MBLNR") == col("mseg.MBLNR"))
            & (col("mkpf.MJAHR") == col("mseg.MJAHR"))
        )
        joined = mkpf_src.join(mseg_src, on=same_document, how="left")

        # (qualified source column, output column name), in output order.
        output_names = [
            ("mkpf.MBLNR", "MATERIAL_DOC_NO"),
            ("mkpf.MJAHR", "MATERIAL_DOC_YEAR"),
            ("mseg.ZEILE", "MATERIAL_DOC_ITEM"),
            ("mseg.BWART", "MOVEMENT_TYPE"),
            ("mseg.MATNR", "MATERIAL_NO"),
            ("mseg.CHARG", "BATCH_ID"),
            ("mseg.MENGE", "QUANTITY"),
            ("mseg.ERFME", "UNIT_OF_ENTRY"),
            ("mseg.DMBTR", "AMOUNT_IN_LC"),
            ("mseg.LGORT", "SENDER_STORAGE_LOCATION"),
            ("mseg.WERKS", "SENDER_PLANT"),
            ("mseg.UMLGO", "RECEIVING_STORAGE_LOCATION"),
            ("mseg.UMWRK", "RECEIVING_PLANT"),
            ("mkpf.BUDAT", "POSTING_DATE"),
            ("mkpf.CPUDT", "ENTRY_DATE"),
            ("mkpf.CPUTM", "ENTRY_TIME"),
            ("mkpf.USNAM", "USERNAME"),
            ("mseg.KUNNR", "CUSTOMER_ID"),
            ("mseg.KDAUF", "SALES_ORDER_NO"),
            ("mseg.KDPOS", "SALES_ORDER_ITEM"),
        ]
        selected = [col(source).alias(target) for source, target in output_names]
        # Fallback columns keep their source names; they are dropped again below.
        selected.extend([col("mseg.MAT_KDAUF"), col("mseg.MAT_KDPOS")])
        projected = joined.select(*selected)

        order_no = col("SALES_ORDER_NO")
        order_item = col("SALES_ORDER_ITEM")

        # A sales-order number counts as blank when it is null or empty.
        order_no_blank = order_no.isNull() | (order_no == "")
        # A sales-order item additionally counts as blank when it is "000000".
        order_item_blank = (
            order_item.isNull() | (order_item == "") | (order_item == "000000")
        )
        entry_hhmm = date_format(to_timestamp(col("ENTRY_TIME"), "HHmmss"), "HH:mm")

        return (
            projected
            .withColumn("MATERIAL_NO", trim(col("MATERIAL_NO")))
            .withColumn("CUSTOMER_ID", trim(col("CUSTOMER_ID")))
            .withColumn("ENTRY_TIME", entry_hhmm)
            .withColumn(
                "SALES_ORDER_NO",
                when(order_no_blank, col("MAT_KDAUF")).otherwise(order_no),
            )
            .withColumn(
                "SALES_ORDER_ITEM",
                when(order_item_blank, col("MAT_KDPOS")).otherwise(order_item),
            )
            .drop("MAT_KDAUF", "MAT_KDPOS")
        )


In [0]:
if __name__ == "__main__":
    # Build the fact DataFrame. Nothing is written in this cell; the DataFrame
    # is saved by the `final_df.write...` statement in the last cell.
    try:
        transformer = fact_quality_material_movement()
        final_df = transformer.transform_and_invoke()
        logger.info("Data transformation completed successfully.")

    except Exception as e:
        logger.error(f"Error occurred during processing: {e}")
        # Re-raise so the run stops here with the real cause. Swallowing the
        # error would let the write cell run with `final_df` undefined
        # (NameError) or, in a reused session, left over from an earlier run.
        raise


In [0]:
# Save the transformed DataFrame as parquet at `process_path`, using the save
# mode supplied through the `Mode` widget.
(
    final_df.write
    .mode(Mode)
    .format("parquet")
    .save(process_path)
)