###  Data Integration Project

Transaction and article CSV files are cleaned, validated, and combined into a target table using PySpark.

#### Import Libraries & Read CSV Files

In [None]:
import pyspark
from pyspark.sql.functions import col, isnan, count, when, regexp_replace, to_date, monotonically_increasing_id
from pyspark.sql.types import IntegerType, FloatType

In [None]:
def read_csv_dbfs(file_name, infer_schema, first_row_is_header, delimiter):
    """
    Load a CSV file stored under DBFS's /FileStore/tables/ directory.

    Args:
        file_name: name of the file inside /FileStore/tables/.
        infer_schema: value for the CSV reader's "inferSchema" option.
        first_row_is_header: value for the CSV reader's "header" option.
        delimiter: value for the CSV reader's "sep" option.
    return: PySpark DataFrame holding the file contents.
    """
    # Reader options collected in one place; the reader applies them all the same
    # way chained .option(...) calls would.
    csv_options = {
        "inferSchema": infer_schema,
        "header": first_row_is_header,
        "sep": delimiter,
    }
    source_path = "/FileStore/tables/" + file_name
    return spark.read.format("csv").options(**csv_options).load(source_path)

In [None]:
def describe_df(df):
    """
    Print a DataFrame's schema, shape, and distinct/duplicate row counts.

    ``df.count()`` and ``df.distinct().count()`` are Spark actions, so each one
    is run once and its result reused rather than recomputed.

    Args:
        df: DataFrame to describe.
    return: None (all output is printed).
    """
    # Schema
    df.printSchema()
    # Shape
    row_count = df.count()
    print(f"Row Count: {row_count}, Column Count: {len(df.columns)}")
    # Distinct & duplicate row counts (reuse row_count instead of counting again)
    distinct_row_count = df.distinct().count()
    duplicate_row_count = row_count - distinct_row_count
    print(f"Distinct Row Count: {distinct_row_count}, Duplicate Row Count: {duplicate_row_count}")

##### Read and Describe TRANSACTION data

In [None]:
# Load TRANSACTION.csv (header row, inferred schema, comma separated) into a DataFrame
transaction_df = read_csv_dbfs(
    file_name="TRANSACTION.csv",
    infer_schema="true",
    first_row_is_header="true",
    delimiter=",",
)

In [None]:
# Print the schema, shape and duplicate statistics of the raw transaction data
describe_df(df=transaction_df)

##### Read and Describe ARTICLE data

In [None]:
# Read ARTICLE.csv (header row, inferred schema, comma separated) into a DataFrame
article_df = read_csv_dbfs(
    file_name="ARTICLE.csv",
    infer_schema="true",
    first_row_is_header="true",
    delimiter=",",
)

In [None]:
# Print the schema, shape and duplicate statistics of the raw article data
describe_df(df=article_df)

### Cleaning Transaction Data

- Dirty float and int values such as "31.99-" and "1.000-" (trailing minus sign) were detected in the "SALES_PRICE_AT_CASH_DESK0RPA_SAT", "SALES_PRICE_PLANNED/SOL/LOC0086C", "VAT0RPA_TAM" and "ARTICLE_COUNT0RPA_RLQ" columns. These values are transformed and cast to the desired types.

In [None]:
# Transaction columns to be cast to float (prices, VAT) and to int (article count)
transaction_float_columns = [
    "SALES_PRICE_AT_CASH_DESK0RPA_SAT",
    "SALES_PRICE_PLANNED/SOL/LOC0086C",
    "VAT0RPA_TAM",
]
transaction_int_columns = [
    "ARTICLE_COUNT0RPA_RLQ",
]

In [None]:
# All numeric values here are expected to cast to int, so cast("int") is used as the "already clean" check.
def transform_numeric_columns(df, column_list):
    """
    Strip '-' characters from values in ``column_list`` that do not cast to int.

    Values such as "31.99-" fail ``cast("int")``. For those rows, every '-' is
    removed so the remaining text can be cast to a number later. Rows that
    already cast cleanly are left untouched.

    Args:
        df: input DataFrame.
        column_list: names of the columns to clean.
    return: df with the listed columns cleaned.
    """
    # NOTE(review): a trailing '-' (e.g. "31.99-", "1.000-") may be a sign marker for
    # negative amounts in the source export; stripping it would turn such values
    # positive. Confirm against the source system before relying on the sign.
    # Iterate the caller-supplied column_list (not a notebook global) so the helper
    # works for any DataFrame/column set.
    for column in column_list:
        df = df.withColumn(
            column,
            when(col(column).cast("int").isNull(), regexp_replace(column, "-", ""))
            .otherwise(col(column)),
        )
    return df
    

In [None]:
def cast_numeric_columns(df, column_list, data_type):
    """
    Cast each column in ``column_list`` to ``data_type`` when every row casts cleanly.

    A row counts as unmatched when ``col(column).cast(data_type)`` is null. This
    includes rows whose value was already null. A column with any unmatched
    rows is left uncast, and those rows are displayed.

    Args:
        df: input DataFrame.
        column_list: names of the columns to cast.
        data_type: Spark type name passed to ``Column.cast`` (e.g. "float", "int", "long").
    return: df with every fully-castable column converted.
    """
    for column in column_list:
        unmatched_rows = df.filter(col(column).cast(data_type).isNull())
        unmatched_row_count = unmatched_rows.count()
        print(f"Column Name: {column}, Not{data_type} row Count: {unmatched_row_count}")
        if unmatched_row_count == 0:
            print(f"All values can be casted to {data_type} data type.")
            df = df.withColumn(column, col(column).cast(data_type))
        else:
            print("Ups ! Some values still include dirty characters like: ")
            # DataFrame.show() prints the rows itself and returns None; wrapping it in
            # print() would emit a stray "None" line.
            unmatched_rows.show()
    return df

In [None]:
def duplicate_controller(df):
    """
    Drop fully duplicated rows from ``df`` if any exist.

    Args:
        df: DataFrame to deduplicate.
    return: df without duplicate rows (the input object itself when none are found).
    """
    distinct_row_count = df.distinct().count()
    duplicate_row_count = df.count() - distinct_row_count
    if duplicate_row_count > 0:
        # Deduplicate on this frame's own columns (not a notebook global) so the
        # helper works for any input DataFrame.
        df = df.dropDuplicates(df.columns)
        print("Droped duplicate rows")
    else:
        print("No duplicates")
    return df

In [None]:
# Remove stray '-' characters from every numeric transaction column
transaction_numeric_columns = transaction_int_columns + transaction_float_columns
transaction_df = transform_numeric_columns(df=transaction_df, column_list=transaction_numeric_columns)

# Cast the cleaned columns: prices/VAT to float first, then the article count to int
transaction_df = cast_numeric_columns(df=transaction_df, column_list=transaction_float_columns, data_type="float")
transaction_df = cast_numeric_columns(df=transaction_df, column_list=transaction_int_columns, data_type="int")

In [None]:
# Check the transaction column types after cleaning/casting (list of (column, type) pairs)
transaction_df.dtypes

### Cleaning Article Dataset

In [None]:
# The barcode (EAN) column should be a bigint/long. For values that do not cast to
# bigint, the text "null" is replaced with "0" so the column can then be cast to long.
# NOTE(review): regexp_replace leaves real (SQL) nulls unchanged, so the cast check
# below would still report them; also "0" becomes a placeholder EAN — confirm that
# is the intended representation of a missing barcode.
article_long_columns = ["EAN0EANUPC"]

for column in article_long_columns:
    article_df = article_df.withColumn(
        column,
        when(col(column).cast("bigint").isNull(), regexp_replace(column, 'null', '0'))
        .otherwise(col(column)),
    )

article_df = cast_numeric_columns(df=article_df, column_list=article_long_columns, data_type="long")

In [None]:
# Check the article column types after the EAN cast (list of (column, type) pairs)
article_df.dtypes

### Join Transaction and Article DataFrames by Article_Id

In [None]:
# Give the transaction key columns shorter, clearer names before the join
transaction_df = (
    transaction_df
    .withColumnRenamed("ARTICLE_ID0MATERIAL", "FKARTICLE_ID")
    .withColumnRenamed("TRANSACTION_ID/SOL/BONKEY", "TRANSACTION_ID")
)

In [None]:
# Inner-join transactions to articles on the article id (FKARTICLE_ID references
# ARTICLE_ID0MATERIAL): only transactions whose article exists in the article data
# are kept. The article-side key column is dropped because it duplicates FKARTICLE_ID.
targettable_df = (
    transaction_df
    .join(
        article_df,
        on=transaction_df.FKARTICLE_ID == article_df.ARTICLE_ID0MATERIAL,
        how="inner",
    )
    .drop("ARTICLE_ID0MATERIAL")
)

In [None]:
# Describe the joined target table (transactions + article attributes)
describe_df(targettable_df)

In [None]:
# TRANSACTION_DATE0CALDAY holds yyyyMMdd values typed as integers: go through a string,
# parse it into a proper date column TRANSACTION_DATE, then drop the raw column.
targettable_df = (
    targettable_df
    .withColumn("TRANSACTION_DATE0CALDAY", col("TRANSACTION_DATE0CALDAY").cast("string"))
    .withColumn("TRANSACTION_DATE", to_date(col("TRANSACTION_DATE0CALDAY"), "yyyyMMdd").cast("date"))
    .drop("TRANSACTION_DATE0CALDAY")
)

In [None]:
# Order rows chronologically: by transaction date first, then by transaction time.
# NOTE(review): later steps (e.g. dropDuplicates when duplicates are found) may shuffle
# the data, so this ordering is not guaranteed to survive; re-sort wherever order matters.
targettable_df = targettable_df.orderBy(
    col("TRANSACTION_DATE").asc(),
    col("TRANSACTION_TIME0RPA_ETS2").asc(),
)

In [None]:
# Remove fully duplicated rows; duplicate_controller prints whether any were found
targettable_df = duplicate_controller(df=targettable_df)

In [None]:
# TRANSACTION_ID repeats across rows (one row per article in a transaction), so it is
# not a unique key. Add a surrogate INDEX column: monotonically_increasing_id() yields
# unique, increasing (but not necessarily consecutive) ids.
targettable_df = targettable_df.withColumn(
    "INDEX",
    monotonically_increasing_id(),
)

In [None]:
# Final dataframe overview (notebook display of the cleaned, joined, indexed table)
display(targettable_df)

TRANSACTION_ID,TRANSACTION_TIME0RPA_ETS2,SALESORG0SALESORG,DISTR_CHAN0DISTR_CHAN,FKARTICLE_ID,LOCATION_ID0PLANT,TRANSACTION_TYPE0RPA_TTC,ARTICLE_COUNT0RPA_RLQ,SALES_PRICE_AT_CASH_DESK0RPA_SAT,SALES_PRICE_PLANNED/SOL/LOC0086C,VAT0RPA_TAM,ARTICLE_COLOR_ID0RT_COLOR,ARTICLE_GROUP_ID0RT_CONFMAT,EAN0EANUPC,DESCRIPTION0TXTMD,BRAND_NAME/SOL/MDPROD1,PICTURE_PATH0EXT_URL,INITIAL_SEASON_NAME/SOL/FMSSEASO,CURRENT_SEASON_NAME,MATL_TYPE0MATL_TYPE,TRANSACTION_DATE,INDEX
2021040660480000000018,73108,1099,13,2061332020,6048,1005,1,37.49,49.99,0.0,5760,2061332,4065206011492,5223833e5b8fb5a4ce7b2b78b0a9cfe18c77e519,10,fc4b0394df1dfdff72c996ae898b99ada8b95d3e,202103,202103,ZMO3,2021-04-06,0
2021040660480000000015,73112,1099,13,2063756004,6048,1005,1,31.99,39.99,0.0,5959,2063756,4065206022993,774f42a810ca602872e9ca74aa08255f0afe1ec1,10,463e1051034ff34b6062dc31c5e7d87413810b3d,202103,202103,ZMO3,2021-04-06,1
2021040660480000000013,73126,1099,13,2040421030,6048,1005,1,39.79,49.99,0.0,9400,2040421,4063614074085,2a57c4e5da1a3187c6232f7ecb72b37ca45b9c83,10,ca9ef352a5419fbccd4bfbdcbfcfc442fe05d820,202009,202009,ZMO3,2021-04-06,2
2021040660480000000012,73135,1099,13,2064691023,6048,1005,1,43.92,59.99,0.0,59Z6,2064691,4063615258798,f58bf0e43af3ca930db64f053cb292b1b1414418,10,c90323514196b56f6036cdcf87a7a89d9ad14af0,202101,202101,ZMO3,2021-04-06,3
2021040660480000000017,73150,1099,13,2064590005,6048,1005,1,22.49,29.99,0.0,0100,2064590,4065206040188,eae9c711aad183ceb4b69995c0e4322c85b24a11,10,70437f3cc17a55194e7841426a86b91424fd5ca6,202103,202103,ZMO3,2021-04-06,4
2021040660480000000023,75816,1099,13,2061332020,6048,1005,1,39.99,49.99,0.0,5760,2061332,4065206011492,5223833e5b8fb5a4ce7b2b78b0a9cfe18c77e519,10,fc4b0394df1dfdff72c996ae898b99ada8b95d3e,202103,202103,ZMO3,2021-04-06,5
2021040660480000000030,75826,1099,13,2061332016,6048,1005,1,49.99,49.99,0.0,3781,2061332,4065206011454,5223833e5b8fb5a4ce7b2b78b0a9cfe18c77e519,10,416d33a140805abe83b3dbb9548219e80f9c3bef,202103,202103,ZMO3,2021-04-06,6
2021040660480000000036,75836,1099,13,2064625023,6048,1005,1,20.79,25.99,0.0,7940,2064625,4063615256541,860ad9b1fa23204364bbbadcdde53b3b761b1aeb,10,53f6f2652d423a7ff0dfd9ac7896583d323400de,202102,202102,ZMO3,2021-04-06,7
2021040660480000000021,75850,1099,13,2058758006,6048,1005,1,57.06,69.99,0.0,53Z5,2058758,4063615262764,579b5a82f37da61f8ec033e177c5316fc2b1e4ad,10,897501077fd316b2d8498ca1932e7ebb673434b0,202102,202102,ZMO3,2021-04-06,8
2021040660480000000028,75856,1099,13,2063722037,6048,1005,1,59.99,79.99,0.0,7940,2063722,4063615053577,858d6b6f4947361fd1f75453a346b6924604f0df,10,3de73f0734ad4cdad439b27a14903d59cc8b9d9c,202101,202101,ZMO3,2021-04-06,9


In [None]:
# Persist the final DataFrame as the table "TransactionArticle".
# NOTE(review): no .mode(...) is set, so Spark's default save mode applies and the write
# fails if the table already exists; re-running the notebook needs e.g. .mode("overwrite").
targettable_df.write.saveAsTable(name="TransactionArticle")