In [0]:
from pyspark.sql.functions import col, to_date, trim, expr, input_file_name, current_timestamp, lit

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, TimestampType

import os

In [0]:
# Set widgets with your ABFSS paths
dbutils.widgets.text("source_path", "abfss://chireviews@chireviewstorage.dfs.core.windows.net/raw_dataset/", "Source Path")
dbutils.widgets.text("output_path", "abfss://chireviews@chireviewstorage.dfs.core.windows.net/silver/", "Unified Output Path")
dbutils.widgets.text("checkpoint_path", "abfss://chireviews@chireviewstorage.dfs.core.windows.net/checkpoints/", "Checkpoint Base")
dbutils.widgets.text("file_format", "both", "File Format (ecxel, csv, or both)")

# Retrieve the values
source_path = dbutils.widgets.get("source_path")
output_path = dbutils.widgets.get("output_path")
checkpoint_path = dbutils.widgets.get("checkpoint_path")
file_format = dbutils.widgets.get("file_format")

print(f' Processing {file_format} files from {source_path} to {output_path}')
print(f'Checkpoint: {checkpoint_path}')



excel_source_path = f'{source_path}.csv'
csv_source_path = f'{source_path}.xlsx'
excel_checkpoint_path = f'{checkpoint_path}excel/'
csv_checkpoint_path = f'{checkpoint_path}csv/'


 Processing both files from abfss://chireviews@chireviewstorage.dfs.core.windows.net/raw_dataset/ to abfss://chireviews@chireviewstorage.dfs.core.windows.net/silver/
Checkpoint: abfss://chireviews@chireviewstorage.dfs.core.windows.net/checkpoints/


In [None]:
raw_path = "abfss://.../bronze/customer_reviews"
df_raw = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.schemaLocation", "abfss://.../schemas/crypto_market") \
    .load(raw_path)

In [None]:
excel_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "binaryFile") \
    .option("cloudFiles.schemaLocation", "abfss://.../schemas/crypto_excel") \
    .load(raw_path) \
    .filter("path LIKE '%.xlsx'") \
    .selectExpr("path as file_path")


In [0]:

excel_files = dbutils.fs.ls(source_path)
excel_files = [f.path for f in excel_files if f.path.endswith(".xlsx") or f.path.endswith(".xls")]


In [0]:
from pyspark.sql.functions import col, to_date, trim, expr, current_timestamp, substring

def process_excel_df(file_path):
    # Read the Excel file
    rename_map = {
        "Review ID/author": "review_id",
        "Review ID": "review_id",
        "Customer Review": "review_content",
        "Review Content": "review_content",
        "Rating": "rating",
        "Date": "date",
        "Country": "country"
    }

    # Rename columns based on mapping
    for original, renamed in rename_map.items():
        if original in excel_df.columns:
            excel_df = excel_df.withColumnRenamed(original, renamed)

    # Select only the columns we need
    excel_df_selected = excel_df.select("review_id", "review_content", "rating", "date", "country")

    # Convert "rating" to integer and handle date conversion
    excel_df_clean = excel_df_selected.withColumn("rating", col("rating").cast("int")) \
                                        .withColumn("date", to_date(col("date")))
    
    # Filter out invalid dates - keep only reasonable date ranges
    excel_df_clean = excel_df_clean.filter(
        col("date").isNotNull() & 
        (col("date") >= "1900-01-01") & 
        (col("date") <= "2030-12-31")
    )
    
    # Alternative approach if the above doesn't work - filter by year string
    # excel_df_clean = excel_df_clean.filter(
    #     substring(col("date").cast("string"), 1, 2).isin(["19", "20"])
    # )
    
    # Trim whitespace from text columns
    excel_df_clean = excel_df_clean.withColumn("review_content", trim(col("review_content")))\
                    .withColumn("processed_timestamp", current_timestamp())
    
    return excel_df_clean

In [0]:
from functools import reduce
from pyspark.sql import DataFrame

# Initialize list to store processed dataframes
processed_exceldfs = []

# Process each Excel file and add to list
for i, file_path in enumerate(excel_files, 1):
    print(f"\n----- Processing Excel File #{i}: {file_path} -----")
    
    # Process the file
    exceldf_clean = process_excel_df(file_path)
    
    # Print summary information
    print(f"Processed {exceldf_clean.count()} rows")
    exceldf_clean.show(2)
    
    # Add to our list
    processed_exceldfs.append(exceldf_clean)

 

excel_combined_df = reduce(DataFrame.unionByName, processed_exceldfs)

print(f"Total rows: {excel_combined_df.count()}")
excel_combined_df.show(5)


----- Processing Excel File #1: abfss://chireviews@chireviewstorage.dfs.core.windows.net/raw_dataset/Amazon Customer Review.xlsx -----
Processed 1263 rows
+-----------+--------------------+------+----------+------------+--------------------+
|  review_id|      review_content|rating|      date|     country| processed_timestamp|
+-----------+--------------------+------+----------+------------+--------------------+
|      Chely|I honestly canâ€™...|     5|2024-09-18|United State|2025-05-29 19:02:...|
|Your Angel |I recently had th...|     5|2025-02-19|United State|2025-05-29 19:02:...|
+-----------+--------------------+------+----------+------------+--------------------+
only showing top 2 rows


----- Processing Excel File #2: abfss://chireviews@chireviewstorage.dfs.core.windows.net/raw_dataset/Mayfair Extracted Data (2).xlsx -----
Processed 2147 rows
+--------------+--------------------+------+----------+--------------+--------------------+
|     review_id|      review_content|rating| 

In [0]:
excel_combined_df.select('date').display()

date
2024-09-18
2025-02-19
2025-02-12
2025-01-22
2025-03-11
2025-03-04
2025-02-10
2025-02-11
2025-03-02
2025-03-18


In [0]:
csv_files = dbutils.fs.ls(source_path)
csv_files = [f.path for f in csv_files if f.path.endswith("csv")]

In [0]:
from pyspark.sql.functions import col, to_date, trim, coalesce, current_timestamp, substring
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType

def process_csv_file(file_path):
        
        # Handle column renaming
        rename_map = {
            "ReviewID": "review_id", "reviewID": "review_id", "comment ID": "review_id",
            "Review ID": "review_id", "ID": "review_id", "Comment Id": "review_id",
            "Review Content": "review_content", "ReviewContent": "review_content",
            "review": "review_content", "Content": "review_content",
            "Buyer Translation Feedback": "review_content",
            "Date5": "date", "Date": "date", "date": "date", "Review Date": "date",
            "country": "country", "Country": "country", "Buyer Country": "country",
            "Rating": "rating", "rating": "rating"
        }

        for old_col, new_col in rename_map.items():
            if old_col in csv_df.columns:
                csv_df = csv_df.withColumnRenamed(old_col, new_col)

        csv_df_selected = csv_df.select("review_id", "review_content", "rating", "date", "country")

        # Cast rating to integer
        csv_df_clean = csv_df_selected.withColumn("rating", col("rating").cast("int"))

        # Coalesce multi-format date parsing
        # Handle multiple date formats with coalesce
        csv_df_clean = csv_df_clean.withColumn("date", 
            coalesce(
                # DD/MM/YYYY format (like 25/02/2025)
                to_date(col("date"), "dd/MM/yyyy"),
                # DD-MMM-YY format (like 23-Feb-22)
                to_date(col("date"), "dd-MMM-yy"),
                # YYYY-MM-DD format
                to_date(col("date"), "yyyy-MM-dd"),
                # MM/DD/YYYY format
                to_date(col("date"), "MM/dd/yyyy"),
                # Original date column if none of the formats match
                col("date")
            )
        )
        
        # Filter out invalid dates - keep only reasonable date ranges
        csv_df_clean = csv_df_clean.filter(
            col("date").isNotNull() & 
            (col("date") >= "1900-01-01") & 
            (col("date") <= "2030-12-31")
        )
        
        # Alternative approach if the above doesn't work - filter by year string
        # csv_df_clean = csv_df_clean.filter(
        #     substring(col("date").cast("string"), 1, 2).isin(["19", "20"])
        # )
         
        # Trim whitespace from text columns
        csv_df_clean = csv_df_clean.withColumn("review_content", trim(col("review_content")))\
                    .withColumn("country", trim(col("country")))\
                    .withColumn("date", to_date(col("date")))\
                    .withColumn("processed_timestamp", current_timestamp())
        
        return csv_df_clean

In [0]:
# Initialize list to store processed dataframes
processed_csvdfs = []

# Process each CSV file and add to list
for i, file_path in enumerate(csv_files, 1):
    print(f"\n----- Processing csv File #{i}: {file_path} -----")
    
    # Process the file
    csvdf_clean = process_csv_file(file_path)
    
    # Print summary information
    print(f"Processed {csvdf_clean.count()} rows")
    csvdf_clean.show(2)
    
    # Add to our list
    processed_csvdfs.append(csvdf_clean)
    
csv_combined_df = reduce(DataFrame.unionByName, processed_csvdfs)
csv_combined_df.show(5)
print(f"Total rows: {csv_combined_df.count()}")


----- Processing csv File #1: abfss://chireviews@chireviewstorage.dfs.core.windows.net/raw_dataset/AliExpress Reviews Extracted Data.csv -----
Processed 34 rows
+-----------------+--------------------+------+----------+-------+--------------------+
|        review_id|      review_content|rating|      date|country| processed_timestamp|
+-----------------+--------------------+------+----------+-------+--------------------+
|30080265285578220|Fan to take on ho...|     5|2024-10-24|     ES|2025-05-29 19:04:...|
|50147142395704160|super!!!!*****5A ...|     5|2024-09-30|     IL|2025-05-29 19:04:...|
+-----------------+--------------------+------+----------+-------+--------------------+
only showing top 2 rows


----- Processing csv File #2: abfss://chireviews@chireviewstorage.dfs.core.windows.net/raw_dataset/Amazon Review Scraper.csv -----
Processed 35 rows
+--------------+--------------------+------+----------+--------------+--------------------+
|     review_id|      review_content|rating

In [0]:
csv_combined_df.select('date').display()

date
2024-10-24
2024-09-30
2024-09-30
2024-09-30
2024-11-08
2024-09-30
2024-09-30
2024-09-30
2024-10-18
2024-09-30


In [0]:
df = csv_combined_df.unionByName(excel_combined_df)

In [0]:
df.count()

11190

In [0]:
df = df.select(
    col('review_id').cast('string'),
    col('review_content').cast('string'),
    col('rating').cast('int'),
    col('date').cast('date'),
    col('country').cast('string'),
    col('processed_timestamp').cast('timestamp'),
    col('month').cast('string'),
    col('year').cast('int')
)

In [0]:
df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- review_content: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- country: string (nullable = true)
 |-- processed_timestamp: timestamp (nullable = false)
 |-- month: string (nullable = true)
 |-- year: integer (nullable = true)



In [0]:
from pyspark.sql.functions import month, year, monthname

df = df.withColumn('month', monthname(col('date')))\
    .withColumn('year', year(col('date')))\
   

In [0]:
from pyspark.sql.functions import col, sum, when

df.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns
]).show()

+---------+--------------+------+----+-------+-------------------+-----+----+
|review_id|review_content|rating|date|country|processed_timestamp|month|year|
+---------+--------------+------+----+-------+-------------------+-----+----+
|        0|             0|     0|   0|      0|                  0|    0|   0|
+---------+--------------+------+----+-------+-------------------+-----+----+



In [0]:
df.select('date').distinct().display()

date
2024-10-24
2024-10-02
2024-09-30
2024-10-22
2025-02-22
2025-02-23
2024-11-06
2024-10-18
2025-02-18
2024-11-07


In [0]:
from pyspark.sql.functions import col, when, avg

# Step 1: Ensure 'rating' is numeric
df = df.withColumn("rating", col("rating").cast("int"))

# Step 2: Compute mean of valid ratings (1–5)
mean_rating = df.filter((col("rating") >= 1) & (col("rating") <= 5)) \
                .agg(avg("rating").alias("mean_rating")) \
                .first()["mean_rating"]

# Step 3: Replace NULL, 0, and 13 with mean, then cast to int
df = df.withColumn(
    "rating",
    when(col("rating").isin(0, 13) | col("rating").isNull(), mean_rating)
    .otherwise(col("rating")).cast("int")  # ✅ cast applied here
)


In [0]:
from pyspark.sql.functions import trim

df = df.withColumn("review_id", trim(col("review_id"))) \
               .filter(~col("review_id").rlike(r'[ "\/:]')) \
               .filter(col("review_id").rlike(r'^.{4,}$')) 

In [0]:
df.dropDuplicates(['review_id'])

DataFrame[review_id: string, review_content: string, rating: int, date: date, country: string, processed_timestamp: timestamp, month: string, year: int]

In [0]:
df = df.dropna(subset=['review_content','country', 'date'])

In [0]:
df.show(5)

+-----------------+--------------------+------+----------+-------+--------------------+-----+----+
|        review_id|      review_content|rating|      date|country| processed_timestamp|month|year|
+-----------------+--------------------+------+----------+-------+--------------------+-----+----+
|30080265285578220|Fan to take on ho...|     5|2024-10-24|     ES|2025-05-29 19:08:...|  Oct|2024|
|50147142395704160|super!!!!*****5A ...|     5|2024-09-30|     IL|2025-05-29 19:08:...|  Sep|2024|
|50147940932785790|All right, good. ...|     5|2024-09-30|     KR|2025-05-29 19:08:...|  Sep|2024|
|50147143507730984|Product as expect...|     5|2024-09-30|     US|2025-05-29 19:08:...|  Sep|2024|
|60086610431263560|It met my expecta...|     5|2024-11-08|     MX|2025-05-29 19:08:...|  Nov|2024|
+-----------------+--------------------+------+----------+-------+--------------------+-----+----+
only showing top 5 rows



In [0]:
df.select('country').display()

country
Spain
Israel
"Korea, Republic of"
United States
Mexico
"Korea, Republic of"
Singapore
France
Netherlands
Canada


In [0]:
import pycountry
from pyspark.sql.functions import col, when

# Step 1: Create (alpha_2, full name) mapping
country_map = [(c.alpha_2, c.name) for c in pycountry.countries]

# Step 2: Create Spark DataFrame from mapping
country_df = spark.createDataFrame(country_map, ['country_code', 'country_name'])

# Step 3: Join with your main df
df = df.join(country_df, df.country == country_df.country_code, how='left')

# Step 4: Replace ISO codes with full names, keep others as-is
df = df.withColumn(
    'country',
    when(col('country_name').isNotNull(), col('country_name')).otherwise(col('country'))
)

# Step 5: Drop temporary join columns
df = df.drop('country_code', 'country_name')


In [0]:
df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- review_content: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- country: string (nullable = true)
 |-- processed_timestamp: timestamp (nullable = false)
 |-- month: string (nullable = true)
 |-- year: integer (nullable = true)



In [None]:
# transformation
df_final = combined_df.withColumn("date_partition", to_date("ingest_timestamp"))

In [0]:
df.write \
  .format("parquet") \
  .mode("overwrite") \
  .save(output_path)

In [0]:
silver_path = "abfss://.../silver/customer_reviews"

df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "abfss://.../checkpoints/reviews") \
    .partitionBy("date_partition") \
    .start(silver_path)