In [154]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, count, when, isnull
from pyspark.sql.types import DoubleType, LongType,IntegerType, DateType, DecimalType



In [155]:
# Obtain (or reuse) the Spark session that every later cell relies on.
spark = (
    SparkSession.builder
    .appName("ArticleTransaction")
    .getOrCreate()
)

In [156]:
# Load both CSV files using their header row for column names.
# Schema inference is switched off, so no column types are guessed at load time;
# explicit type conversions happen in later cells.
csv_read_options = {"header": True, "inferSchema": False}
articles_df = spark.read.csv("../article.csv", **csv_read_options)
transactions_df = spark.read.csv("../transactions.csv", **csv_read_options)

## Article DF initial exploration

In [157]:
# Preview the raw article data and profile its key column.
article_key = "ARTICLE_ID0MATERIAL"
article_key_col = col(article_key)

print("Articles DataFrame:")
articles_df.show(5)
# Optional diagnostic (disabled): articles_df.printSchema()
print("Count of Articles DataFrame:", articles_df.count())

distinct_article_ids = articles_df.select(article_key).distinct()
print("Unique Article IDs in Articles DataFrame:", distinct_article_ids.count())

# Key values that occur on more than one row
dup_articles = (
    articles_df
    .groupBy(article_key)
    .count()
    .filter(col("count") > 1)
)
print("Duplicate ARTICLE_ID0MATERIAL rows:", dup_articles.count())

# A missing key can appear as a real NULL, an empty string, or the literal text "null"
article_key_checks = [
    ("Is Article ID having null values?", article_key_col.isNull()),
    ("Is Article ID having empty values?", article_key_col == ""),
    ("Is Article ID having empty null strings?", article_key_col == "null"),
]
for check_label, check_condition in article_key_checks:
    print(check_label, articles_df.filter(check_condition).count() > 0)
# Optional diagnostic (disabled): articles_df.describe().show()

Articles DataFrame:
+-------------------+-------------------------+---------------------------+----------+--------------------+----------------------+--------------------+--------------------------------+-------------------+-------------------+
|ARTICLE_ID0MATERIAL|ARTICLE_COLOR_ID0RT_COLOR|ARTICLE_GROUP_ID0RT_CONFMAT|EAN0EANUPC|   DESCRIPTION0TXTMD|BRAND_NAME/SOL/MDPROD1|PICTURE_PATH0EXT_URL|INITIAL_SEASON_NAME/SOL/FMSSEASO|CURRENT_SEASON_NAME|MATL_TYPE0MATL_TYPE|
+-------------------+-------------------------+---------------------------+----------+--------------------+----------------------+--------------------+--------------------------------+-------------------+-------------------+
| 000000000001282247|                     null|                       null|      null|d3dd68a11f543d039...|                    75|                null|                          202008|             202008|               ZMO3|
| 000000000001282474|                     null|                       null|     

                                                                                

Unique Article IDs in Articles DataFrame: 223893


                                                                                

Duplicate ARTICLE_ID0MATERIAL rows: 0
Is Article ID having null values? False
Is Article ID having empty values? False
Is Article ID having empty null strings? False


### I can observe that ARTICLE_ID0MATERIAL is long. Many column values contain the string "null", which can be converted to actual null values. There are no duplicate rows in the article dataframe. Many columns would benefit from a data type change.

In [158]:
# Turn the literal "null" strings into real NULL values
cleaned_articles_df = articles_df.replace("null", None)

# Target type for each column that is converted.
# ARTICLE_ID0MATERIAL and ARTICLE_GROUP_ID0RT_CONFMAT are intentionally not cast
# and stay in their loaded form.
article_column_types = [
    ("EAN0EANUPC", LongType()),
    ("BRAND_NAME/SOL/MDPROD1", IntegerType()),
    ("INITIAL_SEASON_NAME/SOL/FMSSEASO", IntegerType()),
    ("CURRENT_SEASON_NAME", IntegerType()),
]
for article_column, article_type in article_column_types:
    cleaned_articles_df = cleaned_articles_df.withColumn(
        article_column, col(article_column).cast(article_type)
    )

cleaned_articles_df.show(5)

+-------------------+-------------------------+---------------------------+----------+--------------------+----------------------+--------------------+--------------------------------+-------------------+-------------------+
|ARTICLE_ID0MATERIAL|ARTICLE_COLOR_ID0RT_COLOR|ARTICLE_GROUP_ID0RT_CONFMAT|EAN0EANUPC|   DESCRIPTION0TXTMD|BRAND_NAME/SOL/MDPROD1|PICTURE_PATH0EXT_URL|INITIAL_SEASON_NAME/SOL/FMSSEASO|CURRENT_SEASON_NAME|MATL_TYPE0MATL_TYPE|
+-------------------+-------------------------+---------------------------+----------+--------------------+----------------------+--------------------+--------------------------------+-------------------+-------------------+
| 000000000001282247|                     NULL|                       NULL|      NULL|d3dd68a11f543d039...|                    75|                NULL|                          202008|             202008|               ZMO3|
| 000000000001282474|                     NULL|                       NULL|      NULL|d6045a9b489ed9

### Nulls in article df before cleaning

In [159]:
from pyspark.sql import functions as F

# Count the NULLs in every column of the raw article data with one aggregation:
# each expression adds 1 for a NULL cell and 0 otherwise.
null_count_exprs = [
    F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in articles_df.columns
]
row = articles_df.select(*null_count_exprs).first()
null_counts = row.asDict()
# The loop variable is not named `count`, because that name is imported from
# pyspark.sql.functions at the top of the notebook and must not be overwritten.
for column, null_count in null_counts.items():
    print(f"Column '{column}' NULL value count {null_count}")

Column 'ARTICLE_ID0MATERIAL' NULL value count 0
Column 'ARTICLE_COLOR_ID0RT_COLOR' NULL value count 0
Column 'ARTICLE_GROUP_ID0RT_CONFMAT' NULL value count 0
Column 'EAN0EANUPC' NULL value count 0
Column 'DESCRIPTION0TXTMD' NULL value count 0
Column 'BRAND_NAME/SOL/MDPROD1' NULL value count 0
Column 'PICTURE_PATH0EXT_URL' NULL value count 0
Column 'INITIAL_SEASON_NAME/SOL/FMSSEASO' NULL value count 0
Column 'CURRENT_SEASON_NAME' NULL value count 0
Column 'MATL_TYPE0MATL_TYPE' NULL value count 0


### Nulls in cleaned article df

In [160]:

# Count the NULLs in every column of the cleaned article data with one aggregation:
# each expression adds 1 for a NULL cell and 0 otherwise.
null_count_exprs = [
    F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in cleaned_articles_df.columns
]
row = cleaned_articles_df.select(*null_count_exprs).first()
null_counts = row.asDict()
# The loop variable is not named `count`, because that name is imported from
# pyspark.sql.functions at the top of the notebook and must not be overwritten.
for column, null_count in null_counts.items():
    print(f"Column '{column}' NULL value count {null_count}")

Column 'ARTICLE_ID0MATERIAL' NULL value count 0
Column 'ARTICLE_COLOR_ID0RT_COLOR' NULL value count 18004
Column 'ARTICLE_GROUP_ID0RT_CONFMAT' NULL value count 18004
Column 'EAN0EANUPC' NULL value count 18004
Column 'DESCRIPTION0TXTMD' NULL value count 0
Column 'BRAND_NAME/SOL/MDPROD1' NULL value count 0
Column 'PICTURE_PATH0EXT_URL' NULL value count 18004
Column 'INITIAL_SEASON_NAME/SOL/FMSSEASO' NULL value count 0
Column 'CURRENT_SEASON_NAME' NULL value count 0
Column 'MATL_TYPE0MATL_TYPE' NULL value count 0


### I tried converting ARTICLE_ID0MATERIAL and ARTICLE_GROUP_ID0RT_CONFMAT to Long and Int, but the null count increased. Later I found that they contain non-numeric characters.

## Transaction DF initial exploration

In [161]:
# Preview the raw transaction data and profile its candidate key columns.
transaction_key = "TRANSACTION_ID/SOL/BONKEY"
article_key = "ARTICLE_ID0MATERIAL"
composite_key = [transaction_key, article_key]

print("Transaction DataFrame:")
transactions_df.show(5, truncate=False)
# Optional diagnostic (disabled): transactions_df.printSchema()
print("Count of Transaction DataFrame:", transactions_df.count())
print(
    "Unique TRANSACTION_ID in Transaction DataFrame:",
    transactions_df.select(transaction_key).distinct().count(),
)

# Transaction IDs that appear on more than one row
dup_transaction = transactions_df.groupBy(transaction_key).count().filter(col("count") > 1)
print("Duplicate TRANSACTION_ID/SOL/BONKEY rows:", dup_transaction.count())

# (transaction ID, article ID) pairs that appear on more than one row
dup_transaction = transactions_df.groupBy(composite_key).count().filter(col("count") > 1)
print("Duplicate TRANSACTION_ID/SOL/BONKEY, ARTICLE_ID0MATERIAL rows:", dup_transaction.count())
print(
    "Unique TRANSACTION_ID, Article ID in Transaction DataFrame:",
    transactions_df.select(composite_key).distinct().count(),
)

# A missing key can appear as a real NULL, an empty string, or the literal text "null"
for id_label, key_name in (("Article ID", article_key), ("Transaction ID", transaction_key)):
    key_col = col(key_name)
    print(f"Is {id_label} having null values?", transactions_df.filter(key_col.isNull()).count() > 0)
    print(f"Is {id_label} having empty values?", transactions_df.filter(key_col == "").count() > 0)
    print(f"Is {id_label} having empty null strings?", transactions_df.filter(key_col == "null").count() > 0)

Transaction DataFrame:
+-------------------------+-------------------------+-----------------+-----------------------+---------------------+-------------------+-----------------+------------------------+---------------------+--------------------------------+--------------------------------+-----------+
|TRANSACTION_ID/SOL/BONKEY|TRANSACTION_TIME0RPA_ETS2|SALESORG0SALESORG|TRANSACTION_DATE0CALDAY|DISTR_CHAN0DISTR_CHAN|ARTICLE_ID0MATERIAL|LOCATION_ID0PLANT|TRANSACTION_TYPE0RPA_TTC|ARTICLE_COUNT0RPA_RLQ|SALES_PRICE_AT_CASH_DESK0RPA_SAT|SALES_PRICE_PLANNED/SOL/LOC0086C|VAT0RPA_TAM|
+-------------------------+-------------------------+-----------------+-----------------------+---------------------+-------------------+-----------------+------------------------+---------------------+--------------------------------+--------------------------------+-----------+
|0002021040620240000000061|121315                   |1099             |20210406               |13                   |00000000206204202

### After analysis I found that the transaction data does not have any duplicate rows. It has a composite key (TRANSACTION_ID/SOL/BONKEY and ARTICLE_ID0MATERIAL). The data types of the columns need to be defined explicitly.

In [162]:
# Turn the literal "null" strings into real NULL values
cleaned_transactions_df = transactions_df.replace("null", None)

# Columns converted to integers.
# TRANSACTION_ID/SOL/BONKEY is left as a string (casting it to long did not work),
# ARTICLE_ID0MATERIAL is left uncast, and the quantity / price / VAT columns are
# not converted in this cell.
integer_columns = [
    "SALESORG0SALESORG",
    "DISTR_CHAN0DISTR_CHAN",
    "LOCATION_ID0PLANT",
    "TRANSACTION_TYPE0RPA_TTC",
]
for integer_column in integer_columns:
    cleaned_transactions_df = cleaned_transactions_df.withColumn(
        integer_column, col(integer_column).cast(IntegerType())
    )

# The calendar day is parsed with the yyyyMMdd pattern
cleaned_transactions_df = cleaned_transactions_df.withColumn(
    "TRANSACTION_DATE0CALDAY",
    to_date(col("TRANSACTION_DATE0CALDAY"), "yyyyMMdd"),
)

cleaned_transactions_df.show(5, truncate=False)

cleaned_transactions_df.printSchema()

+-------------------------+-------------------------+-----------------+-----------------------+---------------------+-------------------+-----------------+------------------------+---------------------+--------------------------------+--------------------------------+-----------+
|TRANSACTION_ID/SOL/BONKEY|TRANSACTION_TIME0RPA_ETS2|SALESORG0SALESORG|TRANSACTION_DATE0CALDAY|DISTR_CHAN0DISTR_CHAN|ARTICLE_ID0MATERIAL|LOCATION_ID0PLANT|TRANSACTION_TYPE0RPA_TTC|ARTICLE_COUNT0RPA_RLQ|SALES_PRICE_AT_CASH_DESK0RPA_SAT|SALES_PRICE_PLANNED/SOL/LOC0086C|VAT0RPA_TAM|
+-------------------------+-------------------------+-----------------+-----------------------+---------------------+-------------------+-----------------+------------------------+---------------------+--------------------------------+--------------------------------+-----------+
|0002021040620240000000061|121315                   |1099             |2021-04-06             |13                   |000000002062042026 |2024             |10

### Nulls in Transaction dataframe before cleaning

In [163]:

# Count the NULLs in every column of the raw transaction data with one aggregation:
# each expression adds 1 for a NULL cell and 0 otherwise.
null_count_exprs = [
    F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in transactions_df.columns
]
row = transactions_df.select(*null_count_exprs).first()
null_counts = row.asDict()
# The loop variable is not named `count`, because that name is imported from
# pyspark.sql.functions at the top of the notebook and must not be overwritten.
for column, null_count in null_counts.items():
    print(f"Column '{column}' NULL value count {null_count}")

Column 'TRANSACTION_ID/SOL/BONKEY' NULL value count 0
Column 'TRANSACTION_TIME0RPA_ETS2' NULL value count 0
Column 'SALESORG0SALESORG' NULL value count 0
Column 'TRANSACTION_DATE0CALDAY' NULL value count 0
Column 'DISTR_CHAN0DISTR_CHAN' NULL value count 0
Column 'ARTICLE_ID0MATERIAL' NULL value count 0
Column 'LOCATION_ID0PLANT' NULL value count 0
Column 'TRANSACTION_TYPE0RPA_TTC' NULL value count 0
Column 'ARTICLE_COUNT0RPA_RLQ' NULL value count 0
Column 'SALES_PRICE_AT_CASH_DESK0RPA_SAT' NULL value count 0
Column 'SALES_PRICE_PLANNED/SOL/LOC0086C' NULL value count 0
Column 'VAT0RPA_TAM' NULL value count 0


### Nulls in transaction df after cleaning

In [164]:

# Count the NULLs in every column of the cleaned transaction data with one aggregation:
# each expression adds 1 for a NULL cell and 0 otherwise.
null_count_exprs = [
    F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in cleaned_transactions_df.columns
]
row = cleaned_transactions_df.select(*null_count_exprs).first()
null_counts = row.asDict()
# The loop variable is not named `count`, because that name is imported from
# pyspark.sql.functions at the top of the notebook and must not be overwritten.
for column, null_count in null_counts.items():
    print(f"Column '{column}' NULL value count {null_count}")

Column 'TRANSACTION_ID/SOL/BONKEY' NULL value count 0
Column 'TRANSACTION_TIME0RPA_ETS2' NULL value count 0
Column 'SALESORG0SALESORG' NULL value count 0
Column 'TRANSACTION_DATE0CALDAY' NULL value count 0
Column 'DISTR_CHAN0DISTR_CHAN' NULL value count 0
Column 'ARTICLE_ID0MATERIAL' NULL value count 0
Column 'LOCATION_ID0PLANT' NULL value count 0
Column 'TRANSACTION_TYPE0RPA_TTC' NULL value count 0
Column 'ARTICLE_COUNT0RPA_RLQ' NULL value count 0
Column 'SALES_PRICE_AT_CASH_DESK0RPA_SAT' NULL value count 0
Column 'SALES_PRICE_PLANNED/SOL/LOC0086C' NULL value count 0
Column 'VAT0RPA_TAM' NULL value count 0


### When I convert the last 4 numeric columns, some of the rows get converted to null. After inspection I found that there are values like 1.000-, 31.99-, 39.99-, 5.11-. I undid the type conversion. Let's fix the issue.

In [165]:
def _signed_decimal(column_name, decimal_type):
    """Build a column expression that parses ``column_name`` as ``decimal_type``.

    Values that end with "-" have that trailing sign removed and a leading "-"
    prepended before the cast; every other value is cast directly.

    Args:
        column_name: Name of the column to convert.
        decimal_type: Spark type string used for the cast, e.g. "decimal(10, 2)".

    Returns:
        A Column expression producing the converted values.
    """
    raw_value = col(column_name)
    # strip trailing '-' and prepend it to the number
    leading_minus = F.concat(F.lit("-"), F.regexp_replace(raw_value, r"-$", ""))
    return (
        when(raw_value.endswith("-"), leading_minus.cast(decimal_type))
        .otherwise(raw_value.cast(decimal_type))
    )


# Target decimal type for each column that can carry a trailing minus sign
signed_decimal_columns = {
    "ARTICLE_COUNT0RPA_RLQ": "decimal(10, 3)",
    "SALES_PRICE_AT_CASH_DESK0RPA_SAT": "decimal(10, 2)",
    "SALES_PRICE_PLANNED/SOL/LOC0086C": "decimal(10, 2)",
    "VAT0RPA_TAM": "decimal(10, 2)",
}
for signed_column, signed_type in signed_decimal_columns.items():
    cleaned_transactions_df = cleaned_transactions_df.withColumn(
        signed_column, _signed_decimal(signed_column, signed_type)
    )

cleaned_transactions_df.show(5)

+-------------------------+-------------------------+-----------------+-----------------------+---------------------+-------------------+-----------------+------------------------+---------------------+--------------------------------+--------------------------------+-----------+
|TRANSACTION_ID/SOL/BONKEY|TRANSACTION_TIME0RPA_ETS2|SALESORG0SALESORG|TRANSACTION_DATE0CALDAY|DISTR_CHAN0DISTR_CHAN|ARTICLE_ID0MATERIAL|LOCATION_ID0PLANT|TRANSACTION_TYPE0RPA_TTC|ARTICLE_COUNT0RPA_RLQ|SALES_PRICE_AT_CASH_DESK0RPA_SAT|SALES_PRICE_PLANNED/SOL/LOC0086C|VAT0RPA_TAM|
+-------------------------+-------------------------+-----------------+-----------------------+---------------------+-------------------+-----------------+------------------------+---------------------+--------------------------------+--------------------------------+-----------+
|     00020210406202400...|                   121315|             1099|             2021-04-06|                   13| 000000002062042026|             2024|  

### Null values after the fix.

In [166]:
# Count the NULLs in every column of the transaction data after the sign fix with
# one aggregation: each expression adds 1 for a NULL cell and 0 otherwise.
null_count_exprs = [
    F.sum(F.when(F.col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in cleaned_transactions_df.columns
]
row = cleaned_transactions_df.select(*null_count_exprs).first()
null_counts = row.asDict()
# The loop variable is not named `count`, because that name is imported from
# pyspark.sql.functions at the top of the notebook and must not be overwritten.
for column, null_count in null_counts.items():
    print(f"Column '{column}' NULL value count {null_count}")


# Show the resulting column types
cleaned_transactions_df.printSchema()

Column 'TRANSACTION_ID/SOL/BONKEY' NULL value count 0
Column 'TRANSACTION_TIME0RPA_ETS2' NULL value count 0
Column 'SALESORG0SALESORG' NULL value count 0
Column 'TRANSACTION_DATE0CALDAY' NULL value count 0
Column 'DISTR_CHAN0DISTR_CHAN' NULL value count 0
Column 'ARTICLE_ID0MATERIAL' NULL value count 0
Column 'LOCATION_ID0PLANT' NULL value count 0
Column 'TRANSACTION_TYPE0RPA_TTC' NULL value count 0
Column 'ARTICLE_COUNT0RPA_RLQ' NULL value count 0
Column 'SALES_PRICE_AT_CASH_DESK0RPA_SAT' NULL value count 0
Column 'SALES_PRICE_PLANNED/SOL/LOC0086C' NULL value count 0
Column 'VAT0RPA_TAM' NULL value count 0
root
 |-- TRANSACTION_ID/SOL/BONKEY: string (nullable = true)
 |-- TRANSACTION_TIME0RPA_ETS2: string (nullable = true)
 |-- SALESORG0SALESORG: integer (nullable = true)
 |-- TRANSACTION_DATE0CALDAY: date (nullable = true)
 |-- DISTR_CHAN0DISTR_CHAN: integer (nullable = true)
 |-- ARTICLE_ID0MATERIAL: string (nullable = true)
 |-- LOCATION_ID0PLANT: integer (nullable = true)
 |-- TRA

### The "-" at the end of values in the last 4 columns was fixed, as these were negative numerical values, and the type conversion was successful.

## Let's join the dataframes

In [167]:

# Attach article attributes to each transaction. An inner join keeps only the
# transactions whose ARTICLE_ID0MATERIAL is present in the article data, so the
# joined row count can be lower than the transaction row count.
join_key = "ARTICLE_ID0MATERIAL"
article_transaction_df = cleaned_transactions_df.join(cleaned_articles_df, on=join_key, how="inner")
article_transaction_df.show(5, truncate=False)

# Rows of the joined result that are identical across every column
joined_columns = article_transaction_df.columns
dup_target = (
    article_transaction_df
    .groupBy(joined_columns)
    .count()
    .filter(col("count") > 1)
)
print("Duplicate rows in TARGET:", dup_target.count())

# Row counts before and after the join, plus distinct composite keys after it
print("Count of Transaction DataFrame:", cleaned_transactions_df.count())
print("Count of Joined Article Transaction DataFrame:", article_transaction_df.count())
joined_key_pairs = article_transaction_df.select(["TRANSACTION_ID/SOL/BONKEY", "ARTICLE_ID0MATERIAL"]).distinct()
print("Unique  TRANSACTION_ID, Article ID in Joined Article Transaction DataFrame:", joined_key_pairs.count())


article_transaction_df.printSchema()

+-------------------+-------------------------+-------------------------+-----------------+-----------------------+---------------------+-----------------+------------------------+---------------------+--------------------------------+--------------------------------+-----------+-------------------------+---------------------------+-------------+----------------------------------------+----------------------+----------------------------------------+--------------------------------+-------------------+-------------------+
|ARTICLE_ID0MATERIAL|TRANSACTION_ID/SOL/BONKEY|TRANSACTION_TIME0RPA_ETS2|SALESORG0SALESORG|TRANSACTION_DATE0CALDAY|DISTR_CHAN0DISTR_CHAN|LOCATION_ID0PLANT|TRANSACTION_TYPE0RPA_TTC|ARTICLE_COUNT0RPA_RLQ|SALES_PRICE_AT_CASH_DESK0RPA_SAT|SALES_PRICE_PLANNED/SOL/LOC0086C|VAT0RPA_TAM|ARTICLE_COLOR_ID0RT_COLOR|ARTICLE_GROUP_ID0RT_CONFMAT|EAN0EANUPC   |DESCRIPTION0TXTMD                       |BRAND_NAME/SOL/MDPROD1|PICTURE_PATH0EXT_URL                    |INITIAL_SEASON_NAME/

                                                                                

Duplicate rows in TARGET: 0
Count of Transaction DataFrame: 14779
Count of Joined Article Transaction DataFrame: 7184
Unique  TRANSACTION_ID, Article ID in Joined Article Transaction DataFrame: 7184
root
 |-- ARTICLE_ID0MATERIAL: string (nullable = true)
 |-- TRANSACTION_ID/SOL/BONKEY: string (nullable = true)
 |-- TRANSACTION_TIME0RPA_ETS2: string (nullable = true)
 |-- SALESORG0SALESORG: integer (nullable = true)
 |-- TRANSACTION_DATE0CALDAY: date (nullable = true)
 |-- DISTR_CHAN0DISTR_CHAN: integer (nullable = true)
 |-- LOCATION_ID0PLANT: integer (nullable = true)
 |-- TRANSACTION_TYPE0RPA_TTC: integer (nullable = true)
 |-- ARTICLE_COUNT0RPA_RLQ: decimal(10,3) (nullable = true)
 |-- SALES_PRICE_AT_CASH_DESK0RPA_SAT: decimal(10,2) (nullable = true)
 |-- SALES_PRICE_PLANNED/SOL/LOC0086C: decimal(10,2) (nullable = true)
 |-- VAT0RPA_TAM: decimal(10,2) (nullable = true)
 |-- ARTICLE_COLOR_ID0RT_COLOR: string (nullable = true)
 |-- ARTICLE_GROUP_ID0RT_CONFMAT: string (nullable = true)

### When we do an inner join, the target df count drops to 7184. Let's find the article ids which are not present in the article dataframe.

In [168]:
# Transactions whose article ID has no counterpart in the article data
# (left anti join: keeps only the unmatched transaction rows).
anti_join_key = 'ARTICLE_ID0MATERIAL'
missing_articles = cleaned_transactions_df.join(
    cleaned_articles_df,
    on=anti_join_key,
    how='left_anti',
)

# Unmatched row count
print("Count of Transactions with no matching ARTICLE:", missing_articles.count())

# Matched plus unmatched rows, printed next to the full transaction count for comparison
print("Total count of transactions:", cleaned_transactions_df.count())
matched_plus_missing = article_transaction_df.count() + missing_articles.count()
print("Total of missing and non-missing articles in TRANSACTIONS:", matched_plus_missing)

print("Transactions with no matching ARTICLE:")
missing_articles.show(5)

Count of Transactions with no matching ARTICLE: 7595
Total count of transactions: 14779
Total of missing and non-missing articles in TRANSACTIONS: 14779
Transactions with no matching ARTICLE:
+-------------------+-------------------------+-------------------------+-----------------+-----------------------+---------------------+-----------------+------------------------+---------------------+--------------------------------+--------------------------------+-----------+
|ARTICLE_ID0MATERIAL|TRANSACTION_ID/SOL/BONKEY|TRANSACTION_TIME0RPA_ETS2|SALESORG0SALESORG|TRANSACTION_DATE0CALDAY|DISTR_CHAN0DISTR_CHAN|LOCATION_ID0PLANT|TRANSACTION_TYPE0RPA_TTC|ARTICLE_COUNT0RPA_RLQ|SALES_PRICE_AT_CASH_DESK0RPA_SAT|SALES_PRICE_PLANNED/SOL/LOC0086C|VAT0RPA_TAM|
+-------------------+-------------------------+-------------------------+-----------------+-----------------------+---------------------+-----------------+------------------------+---------------------+--------------------------------+-----------

### Let's write the final tables as parquet.

In [169]:
# Persist both result sets as Parquet. Each frame is coalesced to one partition
# before writing, and any existing output at the target path is overwritten.
parquet_outputs = [
    (article_transaction_df, '../article_transaction/'),
    (missing_articles, '../missing_articles/'),
]
for result_df, target_path in parquet_outputs:
    result_df.coalesce(1).write.mode("overwrite").parquet(target_path)

                                                                                

In [None]:
#spark.stop()