%md
# 🥈 **Silver Layer: Data Cleaning & Standardization**


The **Silver Layer** represents the "Cleaned" and "Validated" state of our data. In this stage, we transform the raw Bronze data into a queryable format by applying a fixed sequence of cleaning, deduplication, and renaming operations.

# 🚀 ETL Transformation: Customers Silver Layer
**Objective:** Clean, standardize, and deduplicate customer data to ensure a "Single Source of Truth" in the Silver Layer.

### ✅ Summary of Transformation Steps
| Step | Operation | Tool |
| :--- | :--- | :--- |
| **1** | Data Ingestion | PySpark `spark.read.table()`, then `.toPandas()` |
| **2** | Trim Whitespace | Pandas `.str.strip()` |
| **3** | Remove Duplicates | Pandas `.drop_duplicates()` |
| **4** | Rename Columns | Pandas `.rename()` |
| **5** | Save as Delta Table | PySpark (Delta Lake) |

---

### 1. Data Ingestion & Environment Switch
The process began by reading raw data from the Bronze Layer. Because the table is small (54 rows), it was converted from a Spark DataFrame to a Pandas DataFrame for string cleaning and data profiling.
* **Source Table:** `sales.bronz_layer.customers`
* **Conversion:** `.toPandas()` collects the full table into driver memory, so this approach only suits tables that fit on a single node.



---

In [0]:
import pandas as pd 
import pyspark.sql.functions as f 

# Load the Bronze table and convert it to a pandas DataFrame
customers = spark.read.table("sales.bronz_layer.customers")
customers = customers.toPandas()
print(type(customers))
customers.head()

<class 'pandas.core.frame.DataFrame'>


Unnamed: 0,_CustomerID,Customer_Names,ingestion_data
0,1,Avon Corp,2026-02-02 12:53:37.641545
1,2,WakeFern,2026-02-02 12:53:37.641545
2,3,"Elorac, Corp",2026-02-02 12:53:37.641545
3,4,ETUDE Ltd,2026-02-02 12:53:37.641545
4,5,Procter Corp,2026-02-02 12:53:37.641545


### 2. Advanced Whitespace Cleaning
A major issue identified in the raw data was leading and trailing whitespace. These invisible characters make otherwise identical values compare as unequal, which causes join mismatches and hides duplicates.

* **Action:** Applied `.str.strip()` to every string (`object` dtype) column.
* **Result:** Standardized all text entries, ensuring `"  Avon Corp"` became `"Avon Corp"`.
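
To illustrate why padded values break joins, here is a minimal sketch on two small, made-up DataFrames. The names `customers_demo` and `orders_demo` and all of their values are hypothetical and not part of the pipeline. It compares how many rows match in an inner join before and after trimming.

```python
import pandas as pd

# Hypothetical sample data: one customer name carries leading spaces
customers_demo = pd.DataFrame({"Customer_Names": ["  Avon Corp", "WakeFern"]})
orders_demo = pd.DataFrame({
    "Customer_Names": ["Avon Corp", "WakeFern"],
    "Amount": [100, 250],
})

# Inner join on the raw key: the padded name finds no partner
matched_raw = customers_demo.merge(orders_demo, on="Customer_Names", how="inner")

# Trim the key, then join again: both names now match
customers_demo["Customer_Names"] = customers_demo["Customer_Names"].str.strip()
matched_clean = customers_demo.merge(orders_demo, on="Customer_Names", how="inner")

print(len(matched_raw), len(matched_clean))  # prints: 1 2
```

The padded row is silently dropped from the first join rather than raising an error, which is why this kind of issue is easy to miss without an explicit trimming step.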



---

In [0]:
# Identify the columns that contain text (object dtype)
string_columns = customers.select_dtypes(include=['object']).columns
# Loop through them and trim leading and trailing whitespace
for col in string_columns:
    customers[col] = customers[col].astype(str).str.strip()

print(f"Trimmed {len(string_columns)} columns : {list(string_columns)}")
customers.head()

Trimmed 1 columns : ['Customer_Names']


Unnamed: 0,_CustomerID,Customer_Names,ingestion_data
0,1,Avon Corp,2026-02-02 12:53:37.641545
1,2,WakeFern,2026-02-02 12:53:37.641545
2,3,"Elorac, Corp",2026-02-02 12:53:37.641545
3,4,ETUDE Ltd,2026-02-02 12:53:37.641545
4,5,Procter Corp,2026-02-02 12:53:37.641545


### 3. Deduplication Logic
Once the whitespace was removed, hidden duplicate records were revealed. Records that previously looked unique (because of different space padding) were now correctly identified as identical.
* **Analysis:** Used `.duplicated().sum()` to count the fully duplicated rows.
* **Action:** Applied `.drop_duplicates(keep='first')` to keep the first occurrence of each duplicated row, in the DataFrame's current row order.
* **Integrity:** Executed `.reset_index(drop=True)` to restore a continuous row index after the removal.
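
The effect described above can be reproduced on a small, made-up frame. The following is only a sketch: the `demo` DataFrame and its values are hypothetical. It counts duplicates before and after trimming, then removes them.

```python
import pandas as pd

# Hypothetical sample: rows 0 and 1 differ only by trailing spaces
demo = pd.DataFrame({
    "_CustomerID": [1, 1, 2],
    "Customer_Names": ["Avon Corp", "Avon Corp  ", "WakeFern"],
})

# Before trimming, the padding hides the duplicate
before = int(demo.duplicated().sum())

# After trimming, the two rows are identical and are counted once as a duplicate
demo["Customer_Names"] = demo["Customer_Names"].str.strip()
after = int(demo.duplicated().sum())

# Keep the first occurrence and rebuild a continuous 0..n-1 index
deduped = demo.drop_duplicates(keep="first").reset_index(drop=True)

print(before, after, len(deduped))  # prints: 0 1 2
```

Running the duplicate count both before and after the trim step makes it possible to report how many duplicates were only detectable once the text was standardized.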

---

In [0]:
print(f"length of dataframe {len(customers)}")
duplicate_count = customers.duplicated().sum()  # count fully duplicated rows
print(f"found {duplicate_count} duplicated rows ")
customers = customers.drop_duplicates(keep='first')  # keep the first occurrence of each row
customers = customers.reset_index(drop=True)  # rebuild a continuous index
print(f"length after removing duplicates {len(customers)}")


length of dataframe 54
found 4 duplicated rows 
length after removing duplicates 50


### 4. Schema Standardization & Renaming
To align with the Silver Layer's business-ready standards, column headers were cleaned and renamed:
* **`_CustomerID`** → `Customer_ID` (removed the leading underscore)
* **`Customer_Names`** → `Customer_Name` (switched to the singular form)
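
One caveat is that `.rename()` ignores mapping keys that do not exist in the DataFrame by default, so a misspelled source column would leave the schema unchanged without any warning. A minimal sketch of a stricter variant, assuming a made-up one-row frame (`demo` and `column_map` are hypothetical names), uses the `errors="raise"` argument of `DataFrame.rename`:

```python
import pandas as pd

# Hypothetical one-row frame using the Bronze column names
demo = pd.DataFrame({"_CustomerID": [1], "Customer_Names": ["Avon Corp"]})

column_map = {
    "_CustomerID": "Customer_ID",
    "Customer_Names": "Customer_Name",
}

# errors="raise" makes rename fail with a KeyError if a source column is missing,
# instead of silently returning the frame with its old column names
renamed = demo.rename(columns=column_map, errors="raise")

print(list(renamed.columns))  # prints: ['Customer_ID', 'Customer_Name']
```

Whether the stricter behavior is desirable depends on how stable the Bronze schema is; the default lenient mode tolerates upstream changes, while the strict mode surfaces them immediately.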



---

### 5. Delta Lake Finalization
The refined Pandas DataFrame was converted back into a distributed Spark DataFrame and committed to the Lakehouse.
* **Format:** Saved as a **Delta Table** for ACID compliance and Time Travel.
* **Operation:** Used `.mode("overwrite")` to replace the existing table contents, with `.option("overwriteSchema", "true")` so that the renamed columns can replace the previous schema.
* **Destination:** `sales.silver_layer.customers`

---

In [0]:
# Rename columns to the Silver Layer naming standard
customers = customers.rename(columns={
    '_CustomerID': 'Customer_ID',
    'Customer_Names': 'Customer_Name',
})

# Convert back to a Spark DataFrame and save it as a Delta table
customers_spark = spark.createDataFrame(customers)
(
    customers_spark.write
    .mode("overwrite")
    .format("delta")
    .option("overwriteSchema", "true")
    .saveAsTable("sales.silver_layer.customers")
)

# Sanity check for the Silver table

In [0]:
%sql 
SELECT * FROM sales.silver_layer.customers LIMIT 10;

Customer_ID,Customer_Name,ingestion_data
1,Avon Corp,2026-02-02T12:53:37.641Z
2,WakeFern,2026-02-02T12:53:37.641Z
3,"Elorac, Corp",2026-02-02T12:53:37.641Z
4,ETUDE Ltd,2026-02-02T12:53:37.641Z
5,Procter Corp,2026-02-02T12:53:37.641Z
6,"PEDIFIX, Corp",2026-02-02T12:53:37.641Z
7,New Ltd,2026-02-02T12:53:37.641Z
8,Medsep Group,2026-02-02T12:53:37.641Z
9,Ei,2026-02-02T12:53:37.641Z
10,21st Ltd,2026-02-02T12:53:37.641Z
