In [0]:
# Import required libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [0]:
# Load the raw order reviews from the bronze-layer Delta table
df_order_reviews = spark.read.load(
    "abfss://olist-data@retailds.dfs.core.windows.net/bronze/order_reviews",
    format="delta",
)

In [0]:
# Print the bronze schema (column names and types) to check it before the casts applied further down
df_order_reviews.printSchema()

In [0]:
# Preview the raw bronze records in the notebook's table view
display(df_order_reviews)

In [0]:
# Baseline row count before any cleaning; compare with the counts taken after each filtering step below
df_order_reviews.count()

In [0]:

# Null count per column. when() yields a value only for null cells and count()
# ignores nulls, so each aggregate equals the number of nulls in that column.
df_order_reviews.select(
    *[count(when(col(name).isNull(), name)).alias(name) for name in df_order_reviews.columns]
).display()

In [0]:
# Discard rows where any key field (review_id, order_id, review_score) is null
df_order_reviews = df_order_reviews.na.drop(subset=["review_id", "order_id", "review_score"])

In [0]:
# Re-check null counts per column after the drop; the three key columns should now show 0
df_order_reviews.select(
    *[count(when(col(name).isNull(), name)).alias(name) for name in df_order_reviews.columns]
).display()

In [0]:
# Row count after dropping rows with a null review_id, order_id or review_score
df_order_reviews.count()

In [0]:
# Show reviews whose order_id does not exist in silver.orders (orphan reviews).
# An empty result means every review points at a known order.
df_order_reviews.join(
    spark.table("silver.orders"),
    on="order_id",
    how="left_anti",
).display()

In [0]:
# Keep only reviews whose order_id exists in silver.orders. A left-semi join
# filters rows without adding any columns from the orders side.
df_order_reviews = df_order_reviews.join(
    spark.table("silver.orders").select("order_id"),
    on="order_id",
    how="left_semi",
)

In [0]:
# Number of distinct order_id values remaining in the reviews
df_order_reviews.select("order_id").dropDuplicates().count()

In [0]:
# Number of order_id values that appear on more than one review row
df_order_reviews.groupBy("order_id").count().where(col("count") > 1).count()

In [0]:
# Number of (order_id, review_id) pairs that occur on more than one row
df_order_reviews.groupBy("order_id", "review_id").count().where(col("count") > 1).count()

In [0]:
# Keep exactly one review per order: the most recently created one.
# review_creation_date can tie within an order, and row_number() then picks an
# arbitrary row, so the silver table could differ between runs. The extra
# orderings break ties deterministically (unless a whole
# (order_id, review_id) pair is repeated).
# NOTE(review): column types here come from bronze; if they are strings in
# yyyy-MM-dd HH:mm:ss form, lexical order matches chronological order — confirm.
window_spec = Window.partitionBy("order_id").orderBy(
    col("review_creation_date").desc(),
    col("review_answer_timestamp").desc(),
    col("review_id").desc(),
)
df_ranked = df_order_reviews.withColumn("Row_number", row_number().over(window_spec))
df_order_reviews = df_ranked.filter(col("Row_number") == 1).drop("Row_number")

In [0]:
# Row count after keeping one review per order_id; it should equal the distinct order_id count above
df_order_reviews.count()

### Review score validation & standardization

In [0]:
# Rows with a null review_score; expected 0 because those rows were dropped earlier
df_order_reviews.where(col("review_score").isNull()).count()

In [0]:
# Count review_score values outside the valid 1-5 scale; expected 0.
# isin() alone would count the *valid* rows, so it is negated here.
df_order_reviews.filter(~col("review_score").isin([1, 2, 3, 4, 5])).count()

In [0]:
# Standardize review_score to an integer and keep only the valid 1-5 scale.
# The cast runs after the earlier null drop. A value that cannot be cast
# (null when ANSI mode is off) or a score outside 1-5 would otherwise reach
# the silver table. between() on a null is null, so such rows are filtered out too.
df_order_reviews = (
    df_order_reviews
    .withColumn("review_score", col("review_score").cast("int"))
    .filter(col("review_score").between(1, 5))
)

### Normalize & Clean Review Text Fields

In [0]:
# How many reviews have no comment title
df_order_reviews.where(col("review_comment_title").isNull()).count()

In [0]:
# How many reviews have no comment message
df_order_reviews.where(col("review_comment_message").isNull()).count()

In [0]:
# Trim surrounding spaces from both comment fields and turn values that are
# empty after trimming into null, so a blank comment counts as missing in the
# *_is_Null flags instead of surviving as "".
# when() without otherwise() yields null when the condition is false or null,
# which covers both blank strings and original nulls.
# Note: Spark's trim() strips spaces only, not tabs or newlines.
df_order_reviews = (
    df_order_reviews
    .withColumn(
        "review_comment_title",
        when(trim(col("review_comment_title")) != "", trim(col("review_comment_title"))),
    )
    .withColumn(
        "review_comment_message",
        when(trim(col("review_comment_message")) != "", trim(col("review_comment_message"))),
    )
)

In [0]:
# Boolean flags marking reviews with a missing comment title / comment message
df_order_reviews = (
    df_order_reviews
    .withColumn("title_is_Null", col("review_comment_title").isNull())
    .withColumn("message_is_Null", col("review_comment_message").isNull())
)

## Date & timestamp cleaning

### Parse review_creation_date and review_answer_timestamp into timestamps (cast to string, validate the format, then convert)

In [0]:
# Cast both date columns to string so they can be matched against the timestamp pattern in the next cell
df_order_reviews = (
    df_order_reviews
    .withColumn("review_creation_date", col("review_creation_date").cast("string"))
    .withColumn("review_answer_timestamp", col("review_answer_timestamp").cast("string"))
)

In [0]:
# Accepted layout for both date columns: yyyy-MM-dd HH:mm:ss
timestamp_regex = r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$'

# Copy each value into a *_ts_str helper column only if it matches the layout.
# when() without otherwise() leaves non-matching (and null) values as null.
df_order_reviews = (
    df_order_reviews
    .withColumn(
        "review_creation_ts_str",
        when(col("review_creation_date").rlike(timestamp_regex), col("review_creation_date")),
    )
    .withColumn(
        "review_answer_ts_str",
        when(col("review_answer_timestamp").rlike(timestamp_regex), col("review_answer_timestamp")),
    )
)

In [0]:
 df_order_reviews = df_order_reviews.withColumn(
    "review_creation_ts",
    to_timestamp(col("review_creation_ts_str"), "yyyy-MM-dd HH:mm:ss")
)
 
df_order_reviews = df_order_reviews.withColumn(
    "review_answer_ts",
    to_timestamp(col("review_answer_ts_str"), "yyyy-MM-dd HH:mm:ss")
)

In [0]:
# Overwrite the original date columns with the parsed timestamps, then remove all helper columns
df_order_reviews = (
    df_order_reviews
    .withColumn("review_creation_date", col("review_creation_ts"))
    .withColumn("review_answer_timestamp", col("review_answer_ts"))
    .drop("review_creation_ts_str", "review_creation_ts", "review_answer_ts_str", "review_answer_ts")
)

In [0]:
display(df_order_reviews)  # inspect the cleaned records before deriving the sentiment flag

In [0]:
# Flag reviews scoring 4 or 5 as positive. Stored as a real boolean, consistent
# with title_is_Null / message_is_Null, instead of the strings "True"/"False".
# coalesce() keeps the original behaviour of mapping a null score to False.
# NOTE(review): the column type changes from string to boolean in the silver
# table — confirm downstream consumers accept it.
df_order_reviews = df_order_reviews.withColumn(
    "is_positive_review",
    coalesce(col("review_score") >= 4, lit(False)),
)

In [0]:
# Persist the cleaned reviews to the silver layer as Delta, replacing existing data and schema
df_order_reviews.write.option("overwriteSchema", "true").save(
    "abfss://olist-data@retailds.dfs.core.windows.net/silver/order_reviews",
    format="delta",
    mode="overwrite",
)