In [None]:
# Import necessary libraries
import hashlib
import os

from pyspark.sql import SparkSession
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, when, lit, split, concat, regexp_extract



In [None]:
# Create the Spark session for this notebook, or reuse the active one.
spark = (
    SparkSession.builder
    .appName("InspectRawData")
    .getOrCreate()
)


In [None]:
# Azure Data Lake paths (replace placeholders with actual values)
storage_account_name = "datalakestoragetask"  # Replace with your storage account name
raw_container = "raw"
processed_container = "processed"

# Never paste the account key into the notebook: read it from the environment.
# An empty string means "no key supplied".
storage_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY", "")

# Configure Spark to access Azure Data Lake with the account key, but only when
# one was supplied: setting an empty key cannot authenticate and could mask a
# credential configured elsewhere (e.g. on the cluster).
if storage_key:
    spark.conf.set(f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net", storage_key)
else:
    print("AZURE_STORAGE_ACCOUNT_KEY is not set; relying on credentials configured elsewhere.")

# Define paths for each dataset
paths = {
    "order_raw": f"abfss://{raw_container}@{storage_account_name}.dfs.core.windows.net/order",
    "order_processed": f"abfss://{processed_container}@{storage_account_name}.dfs.core.windows.net/order/",
}


In [None]:
# Function to inspect a dataset
def inspect_dataset(name, path, format_type, options=None, sample_rows=30):
    """Load a dataset with Spark, print its schema and a sample, and return it.

    Args:
        name: Human-readable label used only in the log output.
        path: Location passed to ``spark.read...load``.
        format_type: Spark data source format (e.g. ``"text"``).
        options: Optional reader options. ``None`` (the default) means no
            options; a ``None`` default avoids one mutable dict being shared
            across every call.
        sample_rows: Number of rows to display (default 30).

    Returns:
        The loaded DataFrame, or ``None`` when loading or inspecting fails.
        On failure the error is printed rather than raised.
    """
    print(f"\n=== Inspecting {name} Dataset ===")
    try:
        # Load dataset based on format
        df = spark.read.format(format_type).options(**(options or {})).load(path)

        # Show schema and a sample of the data
        df.printSchema()
        df.limit(sample_rows).show(truncate=False)

        # Return DataFrame for further analysis if needed
        return df
    except Exception as e:
        # Best-effort inspection: report the problem and let the caller handle None.
        print(f"Error reading {name} data: {e}")
        return None


# Inspect datasets one by one
print("\n--- Starting Inspection ---\n")




--- Starting Inspection ---



In [None]:
# Step 1: Log the files being processed
print("\n--- Processing Order Files ---")
order_files = dbutils.fs.ls(paths["order_raw"])  # one entry per item in the raw order folder

print("Files in the Order folder:")
for file_info in order_files:
    details = f"size: {file_info.size} bytes, modification time: {file_info.modificationTime}"
    print(f"- {file_info.name} ({details})")

# Step 2: Read every Order file in the folder in a single load. The files are
# EDI, so they are read as plain text (one row per line in a `value` column).
order_df = inspect_dataset("Order", paths["order_raw"], "text")



--- Processing Order Files ---
Files in the Order folder:
- example-order-1.edi (size: 252 bytes, modification time: 1733905923000)
- example-order-2.edi (size: 261 bytes, modification time: 1733905923000)
- example-order-3.edi (size: 293 bytes, modification time: 1733905923000)
- example-order-4.edi (size: 249 bytes, modification time: 1733905923000)

=== Inspecting Order Dataset ===
root
 |-- value: string (nullable = true)

+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                     |
+----------------------------------------------------------------------------------------------------------------------------------------------------------+
|HDR;ORD;2.0;NOID;tequip;KDNR000001;Test.User;999999;;;2024-09-06T12:05;NORML;;;;;;;;test.user@ne

In [None]:
print("\n--- Processing Order Files ---")
# List the raw order folder. Entries whose name ends with "/" are
# sub-directories, which dbutils.fs.head cannot read, so they are skipped.
order_files = [
    file_info
    for file_info in dbutils.fs.ls(paths["order_raw"])
    if not file_info.name.endswith("/")
]

# Step 1: Check for completely duplicate files
print("\n--- Checking for Completely Duplicate Files ---")
file_hashes = {}       # content digest -> name of the first file with that content
duplicate_files = []   # names of files whose content repeats an earlier file
unique_files = []      # paths of the first file for each distinct content

for file_info in order_files:
    file_path = file_info.path
    # dbutils.fs.head returns at most 65536 bytes unless told otherwise, which
    # would compare only a prefix; request the whole file (at least 1 byte).
    file_content = dbutils.fs.head(file_path, max(file_info.size, 1))
    # A SHA-256 digest identifies content far more reliably than the built-in
    # 64-bit hash(), whose collisions would silently drop distinct files.
    file_hash = hashlib.sha256(file_content.encode("utf-8")).hexdigest()

    if file_hash in file_hashes:
        duplicate_files.append(file_info.name)
    else:
        file_hashes[file_hash] = file_info.name
        unique_files.append(file_path)

# Log duplicate files (if any)
if duplicate_files:
    print(f"Duplicate files found: {duplicate_files}")
else:
    print("No duplicate files found.")

# Step 2: unique_files already holds the paths of the files to keep
# (the first occurrence of each distinct content).
print("\n--- Unique Files to Be Processed ---")
unique_paths = set(unique_files)
for file_info in order_files:
    if file_info.path in unique_paths:
        print(f"- {file_info.name} (size: {file_info.size} bytes)")
 


--- Processing Order Files ---

--- Checking for Completely Duplicate Files ---
No duplicate files found.

--- Unique Files to Be Processed ---
- example-order-1.edi (size: 252 bytes)
- example-order-2.edi (size: 261 bytes)
- example-order-3.edi (size: 293 bytes)
- example-order-4.edi (size: 249 bytes)


In [None]:
print("=== Inspecting order Dataset ===")
order_df.printSchema()
# Pull a 7-line sample to the driver and render it as a table.
raw_order_sample = order_df.limit(7).toPandas()
display(raw_order_sample)

=== Inspecting order Dataset ===
root
 |-- value: string (nullable = true)



value
HDR;ORD;2.0;NOID;tequip;KDNR000001;Test.User;999999;;;2024-09-06T12:05;NORML;;;;;;;;test.user@nexmart.com;;false;test.user@nexmart.com;765412345;
ADR;SND;;;;;;;;
POS;0;5;;DEMO;90001;;2.0;;;;;;
POS;0;5;;9999911111;9999911111;;50.0;;;;;;
QNT;SETU;PCE;
PRI;PCE;0.01;EUR;
PRI;SUM;0.52;EUR;


In [None]:
# Parse the deduplicated EDI order files into one row per order position.
# Each file is treated as one order; every line is a ';'-separated record
# whose first field is the record type (HDR, ADR, POS, QNT, PRI, ...).
from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, first, lit, last, when
from pyspark.sql.window import Window

if not unique_files:
    raise ValueError("No order files to process: unique_files is empty.")

# Tag every line with a file-wide order_id so records of the same file can be joined.
# NOTE(review): the id depends on the order of unique_files; the HDR document_number
# might be a better business key — confirm.
dataframes = [
    spark.read.text(file_path).withColumn("order_id", lit(f"order_{idx + 1}"))
    for idx, file_path in enumerate(unique_files)
]

# Combine all files into a single DataFrame
data_with_order_id = reduce(lambda left, right: left.union(right), dataframes)

# Split each line once and expose the record type explicitly; comparing the
# whole first field avoids accidental prefix matches (e.g. "HDRX").
records = (
    data_with_order_id
    .withColumn("fields", split(col("value"), ";"))
    .withColumn("record_type", col("fields").getItem(0))
)


def _field(index):
    """Return a Column holding the ``index``-th ';'-separated field of a record."""
    return col("fields").getItem(index)


# Step 1: Parse HDR Records (Header details)
hdr_df = records.filter(col("record_type") == "HDR").select(
    col("order_id"),
    _field(1).alias("transaction_type"),
    _field(2).alias("version"),
    _field(3).alias("identifier"),
    _field(4).alias("supplier"),
    _field(5).alias("customer_id"),
    _field(6).alias("customer_name"),
    _field(7).alias("document_number"),
    _field(10).alias("order_date"),
    # In the sample HDR lines the e-mail address is field 19 (field 17 is empty).
    _field(19).alias("email"),
)

# Step 2: Parse POS Records (Product details)
pos_df = records.filter(col("record_type") == "POS").select(
    col("order_id"),
    _field(1).alias("position_id"),
    _field(2).alias("line_number"),
    _field(4).alias("product_id"),
    _field(5).alias("alt_product_id"),
    # Field 7 is the ordered quantity: times the PCE price it gives the SUM
    # price in every sample order (e.g. (50 + 2) x 0.01 = 0.52). Field 6 is
    # empty in the samples and is not mapped; the unit comes from QNT below.
    _field(7).alias("quantity"),
)

# Step 3: Parse QNT Records (Quantity details)
qnt_df = records.filter(col("record_type") == "QNT").select(
    col("order_id"),
    _field(1).alias("quantity_type"),
    _field(2).alias("unit_of_measure"),
)

# Step 4: Parse PRI Records (Pricing details). Field 1 tells whether the
# amount is a per-piece (PCE) or total (SUM) price.
price_kind = _field(1)
pri_df = records.filter(col("record_type") == "PRI").select(
    col("order_id"),
    when(price_kind == "PCE", _field(2)).alias("value_PCE"),
    when(price_kind == "PCE", _field(3)).alias("currency_PCE"),
    when(price_kind == "SUM", _field(2)).alias("value_SUM"),
    when(price_kind == "SUM", _field(3)).alias("currency_SUM"),
)

# Verify if SUM lines are correctly parsed
pri_df.show(truncate=False)

# Consolidate PRI data: one row per order with the non-null PCE and SUM values
pri_consolidated_df = pri_df.groupBy("order_id").agg(
    first("value_PCE", ignorenulls=True).alias("value_PCE"),
    first("currency_PCE", ignorenulls=True).alias("currency_PCE"),
    first("value_SUM", ignorenulls=True).alias("value_SUM"),
    first("currency_SUM", ignorenulls=True).alias("currency_SUM"),
)

# Combine HDR, POS, QNT, and PRI data into a unified dataset.
# NOTE(review): QNT is joined on order_id only, so an order with several QNT
# records would multiply its POS rows — confirm QNT occurs once per order.
order_transformed = (
    hdr_df
    .join(pos_df, "order_id", "left")
    .join(qnt_df, "order_id", "left")
    .join(pri_consolidated_df, "order_id", "left")
)

# Duplicate column names break the Parquet write and the Arrow-based toPandas().
duplicate_columns = sorted(
    {name for name in order_transformed.columns if order_transformed.columns.count(name) > 1}
)
assert not duplicate_columns, f"Duplicate columns in order_transformed: {duplicate_columns}"

# Show the schema and data to verify the correctness
order_transformed.printSchema()
print("\n--- Order Processing Completed Successfully ---")

+--------+---------+------------+---------+------------+
|order_id|value_PCE|currency_PCE|value_SUM|currency_SUM|
+--------+---------+------------+---------+------------+
|order_1 |1.11     |EUR         |NULL     |NULL        |
|order_1 |NULL     |NULL        |11.10    |EUR         |
|order_2 |1.11     |EUR         |NULL     |NULL        |
|order_2 |NULL     |NULL        |15.54    |EUR         |
|order_3 |0.01     |EUR         |NULL     |NULL        |
|order_3 |NULL     |NULL        |0.52     |EUR         |
|order_4 |0.01     |EUR         |NULL     |NULL        |
|order_4 |NULL     |NULL        |0.14     |EUR         |
+--------+---------+------------+---------+------------+

root
 |-- order_id: string (nullable = false)
 |-- transaction_type: string (nullable = true)
 |-- version: string (nullable = true)
 |-- identifier: string (nullable = true)
 |-- supplier: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- documen

In [None]:
# Check that the PRI records collapsed to a single row per order.
pri_consolidated_df.printSchema()
pri_preview = pri_consolidated_df.limit(10)
pri_preview.show(truncate=False)

root
 |-- order_id: string (nullable = false)
 |-- value_PCE: string (nullable = true)
 |-- currency_PCE: string (nullable = true)
 |-- value_SUM: string (nullable = true)
 |-- currency_SUM: string (nullable = true)

+--------+---------+------------+---------+------------+
|order_id|value_PCE|currency_PCE|value_SUM|currency_SUM|
+--------+---------+------------+---------+------------+
|order_1 |1.11     |EUR         |11.10    |EUR         |
|order_2 |1.11     |EUR         |15.54    |EUR         |
|order_3 |0.01     |EUR         |0.52     |EUR         |
|order_4 |0.01     |EUR         |0.14     |EUR         |
+--------+---------+------------+---------+------------+



In [None]:
print("=== Inspecting order Dataset ===")
order_transformed.printSchema()
# Bring a 10-row sample to the driver as pandas for a table-style preview.
transformed_sample = order_transformed.limit(10).toPandas()
display(transformed_sample)

=== Inspecting order Dataset ===
root
 |-- order_id: string (nullable = false)
 |-- transaction_type: string (nullable = true)
 |-- version: string (nullable = true)
 |-- identifier: string (nullable = true)
 |-- supplier: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- document_number: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- email: string (nullable = true)
 |-- position_id: string (nullable = true)
 |-- line_number: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- alt_product_id: string (nullable = true)
 |-- quantity: string (nullable = true)
 |-- unit_of_measure: string (nullable = true)
 |-- quantity_type: string (nullable = true)
 |-- unit_of_measure: string (nullable = true)
 |-- value_PCE: string (nullable = true)
 |-- currency_PCE: string (nullable = true)
 |-- value_SUM: string (nullable = true)
 |-- currency_SUM: string (nullable = true)



  Duplicate column names found: ['order_id', 'transaction_type', 'version', 'identifier', 'supplier', 'customer_id', 'customer_name', 'document_number', 'order_date', 'email', 'position_id', 'line_number', 'product_id', 'alt_product_id', 'quantity', 'unit_of_measure', 'quantity_type', 'unit_of_measure', 'value_PCE', 'currency_PCE', 'value_SUM', 'currency_SUM']
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


order_id,transaction_type,version,identifier,supplier,customer_id,customer_name,document_number,order_date,email,position_id,line_number,product_id,alt_product_id,quantity,unit_of_measure,quantity_type,unit_of_measure.1,value_PCE,currency_PCE,value_SUM,currency_SUM
order_1,ORD,2.0,NOID,tequip,KDNR000001,Test.User,999999,2024-09-06T12:05,,0,5,90001,90001,,10.0,SETU,PCE,1.11,EUR,11.1,EUR
order_2,ORD,2.0,NOID,tequip,KDNR000001,Data.Example,999999,2024-09-06T12:05,,0,5,90001,90001,,14.0,SETU,PCE,1.11,EUR,15.54,EUR
order_3,ORD,2.0,NOID,tequip,KDNR000001,Test.User,999999,2024-09-06T12:05,,0,5,9999911111,9999911111,,50.0,SETU,PCE,0.01,EUR,0.52,EUR
order_3,ORD,2.0,NOID,tequip,KDNR000001,Test.User,999999,2024-09-06T12:05,,0,5,DEMO,90001,,2.0,SETU,PCE,0.01,EUR,0.52,EUR
order_4,ORD,2.0,NOID,tequip,KDNR000001,Test.User,999999,2024-09-06T12:05,,0,5,DEMO,DEMO,,14.0,SETU,PCE,0.01,EUR,0.14,EUR


## 4. Write Transformed Data to the Processed Folder
Save the transformed order dataset to the processed container as Parquet, overwriting any existing output.

In [None]:
# Persist the transformed orders as Parquet, replacing any previous output.
(
    order_transformed.write
    .mode("overwrite")
    .parquet(paths["order_processed"])
)
