# OLIST E-COMMERCE DATA ANALYSIS

## Data Cleaning and Transformation  
✅ Data quality assessment (identifying NULLs and duplicates)  
✅ Cleaning and standardizing geographic (City/State) and textual data  
✅ Validating logical consistency

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [2]:
import pandas as pd
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [3]:
spark = SparkSession.builder.appName("Olist Ecommerce Preprocessing").master("local[2]").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/16 16:52:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# GEOLOCATION

In [4]:
# --- Define Geolocation Schema ---
# The zip code prefix is read as StringType to preserve leading zeros, which matter for Brazilian CEPs (postal codes).
# Latitude and Longitude are DoubleType for high-precision coordinate math.
geo_schema = StructType([
    StructField("geolocation_zip_code_prefix", StringType(), True),
    StructField("geolocation_lat", DoubleType(), True),
    StructField("geolocation_lng", DoubleType(), True),
    StructField("geolocation_city", StringType(), True),
    StructField("geolocation_state", StringType(), True)
])

# --- Load Geolocation Dataset ---
geolocation = spark.read.format("csv") \
    .schema(geo_schema) \
    .option("header", "true") \
    .option("sep", ",") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("encoding", "UTF-8") \
    .option("multiLine", "true") \
    .load("/opt/examples/datasets/brazilian_ecommerce/olist_geolocation_dataset.csv")

In [5]:
# Display schema
geolocation.printSchema()

root
 |-- geolocation_zip_code_prefix: string (nullable = true)
 |-- geolocation_lat: double (nullable = true)
 |-- geolocation_lng: double (nullable = true)
 |-- geolocation_city: string (nullable = true)
 |-- geolocation_state: string (nullable = true)



In [7]:
# Total number of records in the geolocation dataset
geolocation.count()

                                                                                

1000163

In [8]:
# Show first 5 rows
geolocation.limit(5).toPandas()

Unnamed: 0,geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_city,geolocation_state
0,1037,-23.545621,-46.639292,sao paulo,SP
1,1046,-23.546081,-46.64482,sao paulo,SP
2,1046,-23.546129,-46.642951,sao paulo,SP
3,1041,-23.544392,-46.639499,sao paulo,SP
4,1035,-23.541578,-46.641607,sao paulo,SP


### Trim String Columns

In [6]:
def trim_and_nullify_strings(df):
    """
    Cleans all string columns by:
    1. Trimming whitespace
    2. Converting to lowercase
    3. Converting empty strings to NULL (None)
    """
    clean_df = df.select([
        F.when(F.trim(F.col(c)) == "", None)                # If trimmed value is empty -> make it NULL
         .otherwise(F.lower(F.trim(F.col(c))))              # Otherwise -> keep the trimmed value and make them lowercase
         .alias(c)
        if t == 'string' else F.col(c) # Only apply this logic to string columns
        for c, t in df.dtypes
    ])
    
    return clean_df

In [10]:
# Cleans the geolocation dataset by removing leading/trailing whitespace, lowercasing
# all string values, and converting empty or blank strings into proper NULL (None) values.
geolocation_nullified_trimmed = trim_and_nullify_strings(geolocation)
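
The per-value rule that `trim_and_nullify_strings` applies to each string column can be sketched in plain Python. This is only an illustration: `clean_value` is a hypothetical helper, not part of the notebook, and Python's `str.strip()` also removes tabs and newlines, whereas Spark's `trim` is documented as trimming spaces.

```python
from typing import Optional


def clean_value(value: Optional[str]) -> Optional[str]:
    """Pure-Python mirror of the trim / lowercase / nullify rule (hypothetical helper)."""
    if value is None:
        return None
    trimmed = value.strip()
    # A value that is blank after trimming is treated as missing
    if trimmed == "":
        return None
    return trimmed.lower()


for raw in ["  Sao Paulo ", "   ", "SP", None]:
    print(repr(raw), "->", repr(clean_value(raw)))
```

Because every string column is lowercased, state codes such as `SP` appear as `sp` in all later outputs.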

### Normalize Brazilian Characters

In [7]:
import unicodedata

# strip_accents uses Unicode Normalization Form D (NFD) to separate 
# characters from their accent marks, then filters out the marks.
# This removes accents and diacritics from a string (e.g., converts 'á' to 'a').

def strip_accents(text):
    if text is None:
        return None
    # Normalize to NFD (decomposes characters like 'ã' into 'a' + a combining tilde)
    # Then filter out the non-spacing marks (Unicode category 'Mn'), i.e. the accents
    return "".join(c for c in unicodedata.normalize('NFD', text)
                   if unicodedata.category(c) != 'Mn')

# Register as a UDF
strip_accents_udf = F.udf(strip_accents)

In [12]:
# Remove accents to ensure 'são paulo' and 'sao paulo' are treated as the same city
geolocation_normalized_brazilian_characters = geolocation_nullified_trimmed.withColumn(
    "geolocation_city_clean",
    strip_accents_udf(F.col("geolocation_city"))
)

In [13]:
# Validation: List raw city names containing anything other than a-z or whitespace,
# next to their cleaned value, to verify that the accent stripping was successful.
geolocation_normalized_brazilian_characters.filter(F.col("geolocation_city").rlike(r"[^a-z\s]")) \
    .select("geolocation_city", "geolocation_city_clean").distinct() \
    .toPandas().head()

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean
0,são paulo,sao paulo
1,olímpia,olimpia
2,sobrália,sobralia
3,açucena,acucena
4,olhos d'água,olhos d'agua
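
To see what the NFD-based approach does and does not remove, the same function can be run outside Spark on a few strings. The sample values are taken from outputs in this notebook. The point of the sketch is that only combining marks (category `Mn`) are dropped, so standalone symbols such as `´`, `º`, or `£` survive and need the later cleaning steps.

```python
import unicodedata


def strip_accents(text):
    if text is None:
        return None
    # Decompose, then drop combining marks (category 'Mn')
    return "".join(c for c in unicodedata.normalize("NFD", text)
                   if unicodedata.category(c) != "Mn")


samples = [
    "são paulo",        # precomposed 'ã'
    "sa\u0303o paulo",  # 'a' followed by a combining tilde
    "açucena",
    "d´oeste",          # spacing acute accent, not a combining mark
    "4º centenario",    # ordinal indicator is a letter-like symbol
    "sa£o paulo",       # currency sign from an encoding error
]
for s in samples:
    print(repr(s), "->", repr(strip_accents(s)))
```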


### Handling Hyphenated City Entries

In [8]:
# Normalizes string formatting by replacing hyphens with spaces, 
# collapsing multiple spaces into one, and trimming edges.
    
def clean_spacing_and_hyphens(col_name):
    # 1. Replace hyphens (-) with a single space
    step1 = F.regexp_replace(F.col(col_name), "-", " ")
    
    # 2. Collapse any run of whitespace into a single space
    # \s+ means "one or more whitespace characters"
    step2 = F.regexp_replace(step1, r"\s+", " ")
    
    return F.trim(step2)

In [15]:
# Remove hyphens and multiple spaces
geolocation_cleaned_spacing_and_hyphens = geolocation_normalized_brazilian_characters.withColumn(
    "geolocation_city_clean", 
    clean_spacing_and_hyphens("geolocation_city_clean")
)

In [16]:
# Validation: Check for city names to verify if the normalization function was successful.
(geolocation_cleaned_spacing_and_hyphens
    .filter(F.col("geolocation_city").like("%-%"))
    .select("geolocation_city", "geolocation_city_clean")
    .distinct()
).toPandas()

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean
0,biritiba-mirim,biritiba mirim
1,colônia z-3,colonia z 3
2,entre-ijuís,entre ijuis
3,embu-guaçu,embu guacu
4,bacaxa (saquarema) - distrito,bacaxa (saquarema) distrito
5,sapucaí-mirim,sapucai mirim
6,venda nova do imigrante-es,venda nova do imigrante es
7,mogi-mirim,mogi mirim
8,ji-parana,ji parana
9,colonia z-3,colonia z 3
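
The hyphen and spacing rule is easy to check on single strings. The following is a plain-Python sketch of the same two replacements; `clean_spacing_and_hyphens_py` is a hypothetical name used only for this illustration.

```python
import re


def clean_spacing_and_hyphens_py(text: str) -> str:
    """Plain-Python mirror of clean_spacing_and_hyphens (illustrative only)."""
    step1 = text.replace("-", " ")      # 1. hyphens become spaces
    step2 = re.sub(r"\s+", " ", step1)  # 2. collapse runs of whitespace
    return step2.strip()


for raw in ["biritiba-mirim", "bacaxa (saquarema) - distrito", "colonia z-3"]:
    print(raw, "->", clean_spacing_and_hyphens_py(raw))
```

Replacing the hyphen with a space first and collapsing whitespace second is what turns `" - "` into a single space rather than three.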


### Regional Naming Normalization

In [9]:
def normalize_doeste_and_others(col_name):
    # 1. Basic cleaning: lowercase and trim
    c = F.lower(F.trim(F.col(col_name)))
    
    
    # 2. Replace URL-encoded HTML apostrophes (%26apos%3b, i.e. &apos;) with a space
    c = F.regexp_replace(c, "%26apos%3b", " ")
    
    # 3. The concatenation regex
    # (d)                -> Group 1: The connector ('d')
    # [^a-z]+            -> The "messy" separator (spaces, apostrophes, symbols, etc.)
    # (agua|alho|...)    -> Group 2: All the target words
    # We use $1$2 to join the 'd' directly to the word (e.g., 'd' + 'oeste' = 'doeste')
    pattern = r"(d)[^a-z]+(agua|alho|alianca|arco|abadia|ajuda|alcantara|avila|oeste|arca|anta)"
    
    # Replacement: $1 (d) + $2 (target word)
    return F.regexp_replace(c, pattern, "$1$2")

In [18]:
# Refines specific regional naming conventions by mapping variations like "d' oeste" 
# to a standardized format, ensuring consistent grouping across the 'geolocation_city_clean' column.
geolocation_normalized_doeste_and_others = geolocation_cleaned_spacing_and_hyphens.withColumn(
    "geolocation_city_clean", 
    normalize_doeste_and_others("geolocation_city_clean")
)

In [19]:
# Validation: Check for city names to verify if the normalization function was successful.

# Define the pattern for specialized regional naming and characters
special_cities_pattern = "agua|alho|alianca|arco|abadia|ajuda|alcantara|avila|oeste|arca|anta"

# Filter for rows that match normalization criteria
(geolocation_normalized_doeste_and_others
    .filter(F.col("geolocation_city").rlike(special_cities_pattern))
    .select("geolocation_city", "geolocation_city_clean")
    .distinct()
).toPandas().head(20)

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean
0,ouroeste,ouroeste
1,limeira d oeste,limeira doeste
2,floresta do araguaia,floresta do araguaia
3,sao joao d alianca,sao joao dalianca
4,são joão do oeste,sao joao do oeste
5,santa rita de cássia,santa rita de cassia
6,taguaí,taguai
7,paraguacu,paraguacu
8,cícero dantas,cicero dantas
9,santa bárbara do sul,santa barbara do sul
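
The concatenation regex can be tried on individual names with Python's `re` module, which writes group references as `\1\2` where Spark's Java-style replacement uses `$1$2`. The sketch below is an illustration, not the notebook's code: `join_d_prefix` and `STRICT_PATTERN` are hypothetical names, and `"trinidad oeste"` is a made-up input used to show that the pattern has no word boundary before the `d`.

```python
import re

TARGETS = "agua|alho|alianca|arco|abadia|ajuda|alcantara|avila|oeste|arca|anta"

# Same pattern as in normalize_doeste_and_others
PATTERN = re.compile(rf"(d)[^a-z]+({TARGETS})")
# Hypothetical stricter variant: the 'd' must start a word
STRICT_PATTERN = re.compile(rf"\b(d)[^a-z]+({TARGETS})")


def join_d_prefix(text: str, pattern: re.Pattern = PATTERN) -> str:
    text = text.lower().strip().replace("%26apos%3b", " ")
    return pattern.sub(r"\1\2", text)


for raw in ["limeira d oeste", "olhos d'agua", "floresta do araguaia",
            "santa barbara d%26apos%3boeste", "trinidad oeste"]:
    print(f"{raw!r:35} -> {join_d_prefix(raw)!r:25} strict: {join_d_prefix(raw, STRICT_PATTERN)!r}")
```

Names written as `do oeste` are left alone at this stage because the `d` is followed directly by a letter. Whether the stricter variant is worth adopting depends on whether names ending in `d` actually occur in the data, which this notebook does not show.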


### Removing Parentheses 

In [20]:
# There are some rows where geolocation_city contains '(' or ')'

# Filter for rows where geolocation_city contains '(' or ')'
df_with_parentheses = geolocation_normalized_doeste_and_others.filter(F.col("geolocation_city").rlike(r"\(|\)"))

# Show the distinct results to see what kind of data is there
df_with_parentheses.select("geolocation_city").distinct().toPandas()

                                                                                

Unnamed: 0,geolocation_city
0,praia grande (fundão) - distrito
1,tamoios (cabo frio)
2,monte gordo (camacari) - distrito
3,jacare (cabreuva)
4,bacaxa (saquarema) - distrito
5,california da barra (barra do pirai)
6,jacaré (cabreúva)
7,realeza (manhuacu)
8,antunes (igaratinga)
9,penedo (itatiaia)


📍 The text outside the parentheses is retained because it represents the specific city or district name (e.g., 'bacaxa'). The text inside typically refers to the parent municipality (e.g., 'saquarema'), which becomes redundant when a separate municipality or state column already exists.

In [10]:
def remove_parentheses(col_name):
    # 1. Basic cleaning: lowercase and trim
    c = F.lower(F.trim(F.col(col_name)))
    
    # 2. Remove the first opening parenthesis and everything after it
    # \s* -> matches any whitespace before the parenthesis
    # \(  -> matches the opening parenthesis
    # .*  -> matches everything until the end of the string
    c = F.regexp_replace(c, r"\s*\(.*", "")
    
    # 3. Final Polish: trim any unexpected trailing spaces
    return F.trim(c)

In [22]:
# Apply the 'remove_parentheses' function to normalize the city names
geolocation_removed_parentheses = geolocation_normalized_doeste_and_others\
    .withColumn("geolocation_city_clean", remove_parentheses("geolocation_city_clean"))


In [23]:
# Validation: Check for city names to verify if the normalization function was successful.
(geolocation_removed_parentheses\
    .filter(F.col("geolocation_city").contains("("))
    .select("geolocation_city", "geolocation_city_clean")
    .distinct()).toPandas()

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean
0,praia grande (fundão) - distrito,praia grande
1,monte gordo (camacari) - distrito,monte gordo
2,jacaré (cabreúva),jacare
3,jacare (cabreuva),jacare
4,bacaxa (saquarema) - distrito,bacaxa
5,itabatan (mucuri),itabatan
6,antunes (igaratinga),antunes
7,california da barra (barra do pirai),california da barra
8,penedo (itatiaia),penedo
9,realeza (manhuacu),realeza
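
The parenthesis rule can be checked on single values in plain Python. This sketch assumes the input has already gone through the earlier steps (accents stripped, hyphens replaced); `remove_parentheses_py` is a hypothetical name for the illustration.

```python
import re


def remove_parentheses_py(text: str) -> str:
    """Plain-Python mirror of remove_parentheses (illustrative only)."""
    text = text.lower().strip()
    # Drop the first '(' and everything after it, plus whitespace before it
    return re.sub(r"\s*\(.*", "", text).strip()


for raw in ["bacaxa (saquarema) distrito", "jacare (cabreuva)", "sao paulo"]:
    print(raw, "->", remove_parentheses_py(raw))
```

Since `.*` runs to the end of the string, any text after the closing parenthesis (such as `distrito`) is removed as well.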


### Cleaning Special Characters

In [24]:
# There are still special characters to fix
# Filters for city names containing special characters (not a-z or space)
geolocation_removed_parentheses.filter(F.col("geolocation_city_clean").rlike(r"[^a-z\s]")) \
    .select("geolocation_city", "geolocation_city_clean").distinct() \
    .toPandas()

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean
0,sa£o paulo,sa£o paulo
1,colônia z-3,colonia z 3
2,colonia z-3,colonia z 3
3,florian&oacute;polis,florian&oacute;polis
4,´teresopolis,´teresopolis
5,4º centenario,4º centenario
6,maceia³,maceia³
7,quilometro 14 do mutum,quilometro 14 do mutum
8,riacho fundo 2,riacho fundo 2
9,sant'ana do livramento,sant'ana do livramento


✂️ Standardizes city names by splitting at common delimiters ('/', '\\', or ',') and keeping only the first part. This removes redundant geographical suffixes like state or country names.  
Example: rio de janeiro, rio de janeiro, brasil -> rio de janeiro

In [26]:
# Split the city names 
geolocation_split_city = geolocation_removed_parentheses.withColumn(
    "geolocation_city_clean",
    F.trim(F.split(F.col("geolocation_city_clean"), r"/|\\|,").getItem(0))
)

In [27]:
# Validation: Checking if city names with delimiters (/, \, ,) were correctly split.
(geolocation_split_city
    .filter(F.col("geolocation_city").rlike(r"[/\\,]")) 
    .select(
        "geolocation_city",        # The raw original column
        "geolocation_city_clean"   # The new cleaned column
    ).distinct()
).toPandas()

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean
0,"rio de janeiro, rio de janeiro, brasil",rio de janeiro
1,"campo alegre de lourdes, bahia, brasil",campo alegre de lourdes
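
The split-and-keep-first rule behaves the same way in plain Python, which makes it convenient to try on individual values. `first_segment` is a hypothetical helper for this sketch; the pattern is the one passed to `F.split`.

```python
import re


def first_segment(text: str) -> str:
    """Keep only the text before the first '/', backslash, or ','."""
    return re.split(r"/|\\|,", text)[0].strip()


for raw in ["rio de janeiro, rio de janeiro, brasil",
            "maua/sao paulo",
            "rio de janeiro \\rio de janeiro",
            "sao paulo"]:
    print(repr(raw), "->", repr(first_segment(raw)))
```

A value without any delimiter comes back unchanged, because the split then yields a single element.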


In [11]:
# Cleaning special characters 
def clean_special_characters(col_name):
    # 1. Trim
    c = F.trim(F.col(col_name))
    
    # 2. Fix HTML entities (like florian&oacute;polis)
    # This regex looks for '&', followed by letters, and ending with ';'
    # Note: every named entity is replaced with 'o', which is only correct for &oacute;
    c = F.regexp_replace(c, r"&[a-z]+;", "o") 
    
    # 3. Fix encoding errors (like sa£o paulo)
    c = F.regexp_replace(c, "sa£o", "sao")
    
    # 4. Replace every character that is not a letter, digit, or whitespace with a space
    # This covers leading symbols, punctuation, and superscripts (like ³ or º)
    c = F.regexp_replace(c, r"[^a-z0-9\s]", " ")
    
    # 5. Fix ordinal abbreviations (like '4o.'): '4o' becomes '4'
    c = F.regexp_replace(c, r"(\d+)o\b", "$1")
    
    # 6. Final Polish: remove double spaces and trim again
    return F.trim(F.regexp_replace(c, r"\s+", " "))

In [29]:
# Apply the 'clean_special_characters' function to normalize the city names
geolocation_remove_special = geolocation_split_city.withColumn("geolocation_city_clean", clean_special_characters("geolocation_city_clean"))

In [30]:
# Validation: Check for city names to verify if the normalization function was successful.
geolocation_remove_special.filter(F.col("geolocation_city_clean").rlike(r"[^a-z\s]")) \
    .select("geolocation_city", "geolocation_city_clean").distinct() \
    .toPandas()

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean
0,colônia z-3,colonia z 3
1,colonia z-3,colonia z 3
2,4o. centenario,4 centenario
3,4º centenario,4 centenario
4,quilometro 14 do mutum,quilometro 14 do mutum
5,riacho fundo 2,riacho fundo 2
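
The order of the replacements in `clean_special_characters` matters, so it helps to trace a few values step by step. The sketch below mirrors the same regexes in plain Python (with `\1` instead of `$1`); `clean_special_characters_py` is a hypothetical name, and the inputs are values shown in the outputs above.

```python
import re


def clean_special_characters_py(text: str) -> str:
    """Plain-Python mirror of clean_special_characters (illustrative only)."""
    c = text.strip()
    c = re.sub(r"&[a-z]+;", "o", c)        # HTML entity -> 'o'
    c = c.replace("sa£o", "sao")           # known encoding error
    c = re.sub(r"[^a-z0-9\s]", " ", c)     # any other symbol -> space
    c = re.sub(r"(\d+)o\b", r"\1", c)      # ordinal '4o' -> '4'
    return re.sub(r"\s+", " ", c).strip()  # collapse and trim


for raw in ["florian&oacute;polis", "sa£o paulo", "4o. centenario",
            "4º centenario", "maceia³", "sant'ana do livramento"]:
    print(repr(raw), "->", repr(clean_special_characters_py(raw)))
```

One side effect worth knowing: an apostrophe inside a name is turned into a space, so `sant'ana` becomes `sant ana` rather than `santana`.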


### Identifying and Correcting State Codes in City Fields

📍 There are city names that are only two letters long.

In [31]:
# Identify rows where city name is only 2 characters long (likely a state code)
(geolocation_remove_special
    .filter(F.length(F.col("geolocation_city")) < 3)
    .select("geolocation_city", "geolocation_city_clean", "geolocation_state")
    .distinct()
).toPandas()

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean,geolocation_state
0,rj,rj,rj
1,bh,bh,mg
2,sp,sp,sp


In [32]:
# Standardize city abbreviations to their full descriptive names
geolocation_cities_fixed_state_codes = geolocation_remove_special.withColumn(
    "geolocation_city_clean",
    F.when(F.col("geolocation_city_clean") == "sp", "sao paulo")
     .when(F.col("geolocation_city_clean") == "rj", "rio de janeiro")
     .when(F.col("geolocation_city_clean") == "bh", "belo horizonte")
     .otherwise(F.col("geolocation_city_clean"))
)

In [33]:
# Validation: Check for city names to verify if the normalization function was successful.
(geolocation_cities_fixed_state_codes
    .filter(F.length(F.col("geolocation_city")) < 3)
    .select("geolocation_city", "geolocation_city_clean", "geolocation_state")
    .distinct()
).toPandas()

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean,geolocation_state
0,bh,belo horizonte,mg
1,sp,sao paulo,sp
2,rj,rio de janeiro,rj
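
The chained `when` expressions amount to a small lookup table with a fallback to the original value. A minimal plain-Python sketch of that idea, using only the three abbreviations found above (`CITY_ABBREVIATIONS` and `expand_city_abbreviation` are hypothetical names):

```python
# Abbreviations observed in the two-letter check above
CITY_ABBREVIATIONS = {
    "sp": "sao paulo",
    "rj": "rio de janeiro",
    "bh": "belo horizonte",
}


def expand_city_abbreviation(city: str) -> str:
    # Unknown values fall through unchanged, like .otherwise(...)
    return CITY_ABBREVIATIONS.get(city, city)


for raw in ["sp", "rj", "bh", "campinas"]:
    print(raw, "->", expand_city_abbreviation(raw))
```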


### Normalizing Regional Suffixes

✂️ This step merges the spelled-out forms 'do oeste' and 'de alcantara' into the contracted forms 
'doeste' and 'dalcantara' to prevent duplicate city entries (e.g., 'do oeste' to 'doeste').

In [34]:
# Standardize regional suffixes by merging multi-part names 
geolocation_regional_suffixes = geolocation_cities_fixed_state_codes.withColumn(
    "geolocation_city_clean",
    F.regexp_replace(F.col("geolocation_city_clean"), r"do\s+oeste", "doeste")
).withColumn(
    "geolocation_city_clean",
    F.regexp_replace(F.col("geolocation_city_clean"), r"de\s+alcantara", "dalcantara")
)

In [35]:
# Validation: Check for city names to verify if the normalization function was successful.
(geolocation_regional_suffixes
    .filter(F.col("geolocation_city").rlike("do oeste|de alcantara"))
    .select("geolocation_city", "geolocation_city_clean")
    .distinct()
).toPandas()

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean
0,cruzeiro do oeste,cruzeiro doeste
1,sao sebastiao do oeste,sao sebastiao doeste
2,sao jorge do oeste,sao jorge doeste
3,ouro preto do oeste,ouro preto doeste
4,iracema do oeste,iracema doeste
5,vera cruz do oeste,vera cruz doeste
6,limeira do oeste,limeira doeste
7,dom pedro de alcantara,dom pedro dalcantara
8,sao miguel do oeste,sao miguel doeste
9,limeira do oeste mg,limeira doeste mg
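
The two suffix replacements can be verified on single names in plain Python. This is a sketch with a hypothetical helper name, `merge_regional_suffixes`, using the same patterns as the Spark code.

```python
import re


def merge_regional_suffixes(text: str) -> str:
    """Plain-Python mirror of the two regexp_replace calls (illustrative only)."""
    text = re.sub(r"do\s+oeste", "doeste", text)
    return re.sub(r"de\s+alcantara", "dalcantara", text)


for raw in ["cruzeiro do oeste", "dom pedro de alcantara",
            "limeira do oeste mg", "rio do sul"]:
    print(raw, "->", merge_regional_suffixes(raw))
```

Names that do not contain either phrase, such as `rio do sul`, pass through unchanged.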


### Removing Trailing State Abbreviations

✂️ Some city names end with a state abbreviation (e.g., 'franca sp'), 
while others end in two letters that are part of the name rather than a state code, like 'santa fe'.

In [36]:
# Inspect rows where 'geolocation_city' ends with a space followed by two letters (a possible state code).
(geolocation_regional_suffixes
    .filter(F.col("geolocation_city").rlike(r"\s[a-z]{2}$"))
    .select("geolocation_city", "geolocation_city_clean", "geolocation_state")
    .distinct()
).toPandas()

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean,geolocation_state
0,franca sp,franca sp,sp
1,maria da fe,maria da fe,mg
2,coronel joao sa,coronel joao sa,ba
3,holambra ii,holambra ii,sp
4,sento se,sento se,ba
5,francisco sa,francisco sa,mg
6,pedro ii,pedro ii,pi
7,pio ix,pio ix,pi
8,limeira do oeste mg,limeira doeste mg,mg
9,senador sa,senador sa,ce


In [37]:
# This pattern only matches specific state codes at the end of the string
state_codes_pattern_geolocation = r"\s(mg|sp)$"

geolocation_trailing_state_code = geolocation_regional_suffixes.withColumn(
    "geolocation_city_clean",
    F.regexp_replace(F.col("geolocation_city_clean"), state_codes_pattern_geolocation, "")
)

In [38]:
# Validation: Check unique examples of city names ending in state codes to ensure correct processing.
(geolocation_trailing_state_code
    .filter(F.col("geolocation_city").rlike(r"\s[a-z]{2}$"))
    .select("geolocation_city", "geolocation_city_clean", "geolocation_state")
    .distinct()
).toPandas()

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean,geolocation_state
0,maria da fe,maria da fe,mg
1,coronel joao sa,coronel joao sa,ba
2,franca sp,franca,sp
3,holambra ii,holambra ii,sp
4,sento se,sento se,ba
5,francisco sa,francisco sa,mg
6,pedro ii,pedro ii,pi
7,pio ix,pio ix,pi
8,limeira do oeste mg,limeira doeste,mg
9,senador sa,senador sa,ce
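
The explicit `(mg|sp)` list protects names such as 'sento se' or 'pedro ii' from being truncated. The sketch below mirrors that rule in plain Python and, as an assumption-laden alternative that is not in the notebook, shows a variant that strips the trailing token only when it equals the row's own state code. Both function names are hypothetical.

```python
import re

TRAILING_STATE = re.compile(r"\s(mg|sp)$")


def strip_listed_state(city: str) -> str:
    """Mirror of the notebook's rule: only the listed codes are removed."""
    return TRAILING_STATE.sub("", city)


def strip_matching_state(city: str, state: str) -> str:
    """Hypothetical alternative: remove the suffix only if it equals the row's state."""
    suffix = " " + state
    if state and city.endswith(suffix):
        return city[: -len(suffix)].rstrip()
    return city


rows = [("franca sp", "sp"), ("limeira doeste mg", "mg"),
        ("sento se", "ba"), ("pedro ii", "pi")]
for city, state in rows:
    print(city, "->", strip_listed_state(city), "|", strip_matching_state(city, state))
```

The alternative avoids maintaining a code list, but it would wrongly shorten a name whose last word happens to match its own state code, so it would need the same manual review as the list-based rule.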


### 🕵️ Final validation: Reviewing rows to ensure all cleaning transformations were applied correctly.

In [39]:
# 🎲 Random Sampling: Selecting up to 100 unique examples for manual verification
(geolocation_trailing_state_code
    .filter(F.col("geolocation_city").rlike(r"[^a-z\s]"))
    .select("geolocation_city", "geolocation_city_clean")
    .distinct()
    .orderBy(F.rand()) # This shuffles the rows randomly
    .limit(100)        # This picks the first 100 from the shuffle
    .toPandas()
)

                                                                                

Unnamed: 0,geolocation_city,geolocation_city_clean
0,olho d'agua grande,olho dagua grande
1,timbó,timbo
2,são josé do jacuípe,sao jose do jacuipe
3,santa vitória do palmar,santa vitoria do palmar
4,colônia,colonia
5,cipotânea,cipotanea
6,nova santa bárbara,nova santa barbara
7,santo antônio,santo antonio
8,piraúba,pirauba
9,bom princípio,bom principio


### Removing Duplicates

In [12]:
def check_duplicates(df):
    """
    Checks the dataframe for exact duplicate rows across all columns.

    This function compares the total row count against the distinct row count 
    to identify if any identical records exist in the dataset.
    """
    total_count = df.count()
    unique_count = df.distinct().count()

    if total_count == unique_count:
        print("✅ No exact duplicate rows found. Every row is unique.")
    else:
        duplicate_count = total_count - unique_count
        print(f"⚠️ Warning: Found {duplicate_count} identical duplicate rows.")

In [41]:
# Final cleanup: Trimming whitespace and converting empty strings to Null.
geolocation_final_trim = trim_and_nullify_strings(geolocation_trailing_state_code)

In [42]:
check_duplicates(geolocation_final_trim)


In [43]:
# Removing rows that are 100% identical across all columns
geolocation_drop_duplicates = geolocation_final_trim.dropDuplicates()

### 📊 Row Count Comparison

In [44]:
# 1. Get the initial and final counts
initial_count = geolocation.count()
final_count = geolocation_drop_duplicates.count()

# 2. Calculate the difference (rows removed)
rows_removed = initial_count - final_count
# Calculate the percentage of data removed
reduction_pct = (rows_removed / initial_count) * 100

# 3. Display the results clearly 
print("--- Data Cleaning Summary ---")
print(f"Initial Rows:    {initial_count:,}")
print(f"Final Rows:      {final_count:,}")
print(f"Rows Removed:    {rows_removed:,}")
print(f"Data Reduction:  {reduction_pct:.2f}%")

--- Data Cleaning Summary ---
Initial Rows:    1,000,163
Final Rows:      738,332
Rows Removed:    261,831
Data Reduction:  26.18%
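
The summary figures can be re-derived from the two counts printed above. This short check uses only those numbers and repeats the arithmetic from the cell.

```python
# Counts copied from the summary output above
initial_count = 1_000_163
final_count = 738_332

rows_removed = initial_count - final_count
reduction_pct = rows_removed / initial_count * 100

print(f"Rows Removed:    {rows_removed:,}")
print(f"Data Reduction:  {reduction_pct:.2f}%")
```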


                                                                                

### Missing Values

In [13]:
def check_missing_values(dataframe):
    """
    Dynamically checks for missing values (Null and NaN) across all columns 
    based on their data types.
    
    This function identifies the data type of each column. For numeric 
    floating-point columns (Double/Float), it checks for both NaN and Null. 
    For all other types, it checks only for Null. It returns a summary 
    table showing counts and percentages.

    Args:
        dataframe (pyspark.sql.DataFrame): The Spark DataFrame to analyze.

    Returns:
        pandas.DataFrame: A summary containing 'missing_count' and 'percentage' for each column.
    """
    # 1. Get the total number of rows
    total_rows = dataframe.count()
    
    # 2. Map column names to their types in a dictionary
    # Example: {'order_id': 'string', 'price': 'double'}
    dtypes_dict = dict(dataframe.dtypes)

    # 3. Create a selection of counts for each column
    null_counts = dataframe.select([
        F.count(
            F.when(
                # If column is Double or Float, check both NaN and Null
                (F.isnan(c) | F.col(c).isNull()) if dtypes_dict[c] in ("double", "float")
                # For other types (Timestamp, String, Int), check only Null
                else F.col(c).isNull(), 
                c
            )
        ).alias(c)
        for c in dataframe.columns
    ])
    
    # 4. Transpose the result into a vertical table for better readability
    summary = null_counts.toPandas().transpose().rename(columns={0: 'missing_count'})

    # 5. Calculate the percentage of missing values
    summary['percentage'] = (summary['missing_count'] / total_rows) * 100
    
    return summary

In [46]:
# Check for missing values in the deduplicated 'geolocation_drop_duplicates' dataset.
geolocation_missing_values = check_missing_values(geolocation_drop_duplicates)
print(geolocation_missing_values)

                             missing_count  percentage
geolocation_zip_code_prefix              0         0.0
geolocation_lat                          0         0.0
geolocation_lng                          0         0.0
geolocation_city                         0         0.0
geolocation_state                        0         0.0
geolocation_city_clean                   0         0.0
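
The distinction `check_missing_values` makes between NULL and NaN can be illustrated without Spark. In the sketch below the rows are made up for the example, and `is_missing` and `count_missing` are hypothetical helpers; the idea is only that floating-point columns need a NaN check in addition to the NULL check.

```python
import math


def is_missing(value) -> bool:
    """None always counts as missing; NaN counts only for floats."""
    if value is None:
        return True
    return isinstance(value, float) and math.isnan(value)


def count_missing(rows, columns):
    """Return {column: (missing_count, percentage)} for a list of row tuples."""
    total = len(rows)
    summary = {}
    for i, name in enumerate(columns):
        missing = sum(1 for row in rows if is_missing(row[i]))
        pct = missing / total * 100 if total else 0.0
        summary[name] = (missing, pct)
    return summary


# Made-up rows for illustration only
columns = ["geolocation_lat", "geolocation_city"]
rows = [(-23.5, "sao paulo"), (float("nan"), "campinas"),
        (None, None), (-22.9, "rio de janeiro")]

summary = count_missing(rows, columns)
for name, (missing, pct) in summary.items():
    print(f"{name:20} {missing:3d} {pct:6.1f}")
```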


                                                                                

🧩 No missing values in the final geolocation dataset.

### Dropping the dirty column

In [47]:
# Remove the original 'dirty' column to prevent redundancy and confusion.
# Promote the 'clean' version to the original column name.
geolocation_final_clean = geolocation_drop_duplicates.drop("geolocation_city") \
              .withColumnRenamed("geolocation_city_clean", "geolocation_city")

In [48]:
# Data is cleaned and ready for EDA

# 🎲 Selecting 100 random rows for verification
geolocation_final_clean.orderBy(F.rand()).limit(100).toPandas()

                                                                                

Unnamed: 0,geolocation_zip_code_prefix,geolocation_lat,geolocation_lng,geolocation_state,geolocation_city
0,72251,-15.787382,-48.123714,df,brasilia
1,8255,-23.552119,-46.427861,sp,sao paulo
2,35248,-18.982912,-41.21701,mg,goiabeira
3,36060,-21.734177,-43.328326,mg,juiz de fora
4,92500,-30.120294,-51.329486,rs,guaiba
5,39648,-17.091683,-42.538661,mg,chapada do norte
6,30644,-19.995822,-44.006024,mg,belo horizonte
7,89120,-26.838116,-49.248543,sc,timbo
8,3372,-23.570794,-46.556099,sp,sao paulo
9,27345,-22.544378,-44.184281,rj,barra mansa


### 🥈 Silver Layer: Storing Refined Data
The data cleaning and transformation phase is now complete. In this step, the refined and validated dataset is being  
persisted into the Silver Layer. This provides a reliable and high-quality foundation for the upcoming Exploratory Data  
Analysis (EDA) and reporting stages.

In [50]:
# Storing the refined geolocation dataset to the Silver Layer in Parquet format
geolocation_final_clean.write.mode("overwrite") \
    .parquet("/opt/examples/datasets/brazilian_ecommerce/silver/olist_geolocation.parquet")

                                                                                

# SELLERS


In [51]:
# --- Define Sellers Schema ---
# The zip code prefix is read as StringType to preserve leading zeros, which matter for Brazilian CEPs (postal codes).
sellers_schema = StructType([
    StructField("seller_id", StringType(), True),
    StructField("seller_zip_code_prefix", StringType(), True),
    StructField("seller_city", StringType(), True),
    StructField("seller_state", StringType(), True)
])

# --- Load Sellers Dataset ---
sellers = (spark.read.format("csv")
    .option("header", "true")
    .schema(sellers_schema)  
    .option("sep", ",")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("encoding", "UTF-8")
    .option("multiLine", "true")
    .load("/opt/examples/datasets/brazilian_ecommerce/olist_sellers_dataset.csv")
)

In [52]:
# Display schema
sellers.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_zip_code_prefix: string (nullable = true)
 |-- seller_city: string (nullable = true)
 |-- seller_state: string (nullable = true)



In [53]:
# Total number of records
sellers.count()

3095

In [54]:
# Show first 5 rows
sellers.limit(5).toPandas()

Unnamed: 0,seller_id,seller_zip_code_prefix,seller_city,seller_state
0,3442f8959a84dea7ee197c632cb2df15,13023,campinas,SP
1,d1b65fc7debc3361ea86b5f14c68d2e2,13844,mogi guacu,SP
2,ce3ad9de960102d0677a81f5d0bb7b2d,20031,rio de janeiro,RJ
3,c0f3eea2e14555b6faeea3dd58c1b1c3,4195,sao paulo,SP
4,51a04a8a6bdcb23deccc82b0b80742cf,12914,braganca paulista,SP


### Trim String Columns

In [55]:
# Cleans the sellers dataset by removing leading/trailing whitespace and 
# converting empty or blank strings into proper NULL (None) values.
sellers_nullified_trimmed = trim_and_nullify_strings(sellers)

### Normalize Brazilian Characters

In [56]:
# Remove accents to ensure 'são paulo' and 'sao paulo' are treated as the same city
sellers_normalized_brazilian_characters = sellers_nullified_trimmed.withColumn(
    "seller_city_clean",
    strip_accents_udf(F.col("seller_city"))
)

In [57]:
# Validation: Check for city names that still contain special characters (not a-z or space)
# to verify if the normalization function was successful.
sellers_normalized_brazilian_characters.filter(F.col("seller_city").rlike(r"[^a-z\s]")) \
    .select("seller_city", "seller_city_clean", "seller_state") \
    .toPandas().head()

Unnamed: 0,seller_city,seller_city_clean,seller_state
0,lages - sc,lages - sc,sc
1,auriflama/sp,auriflama/sp,sp
2,sao paulo / sao paulo,sao paulo / sao paulo,sp
3,são paulo,sao paulo,sp
4,santa barbara d´oeste,santa barbara d´oeste,sp


### Handling Hyphenated City Entries

In [58]:
# Remove hyphens and multiple spaces
sellers_cleaned_spacing_and_hyphens = sellers_normalized_brazilian_characters.withColumn(
    "seller_city_clean", 
    clean_spacing_and_hyphens("seller_city_clean")  
)

In [59]:
# Validation: Check for city names to verify if the normalization function was successful.
(sellers_cleaned_spacing_and_hyphens
    .filter(F.col("seller_city").like("%-%"))
    .select("seller_city", "seller_city_clean", "seller_state")
    .distinct()
).toPandas()

Unnamed: 0,seller_city,seller_city_clean,seller_state
0,andira-pr,andira pr,pr
1,lages - sc,lages sc,sc
2,sao paulo - sp,sao paulo sp,sp


### Regional Naming Normalization

In [60]:
# Refines specific regional naming conventions by mapping variations like "d' oeste" 
# to a standardized format, ensuring consistent grouping across the 'seller_city_clean' column.
sellers_normalized_doeste_and_others = sellers_cleaned_spacing_and_hyphens.withColumn(
    "seller_city_clean", 
    normalize_doeste_and_others("seller_city_clean")
)

In [61]:
# Validation: Check for city names to verify if the normalization function was successful.

# Define the pattern for specialized regional naming and characters
special_cities_pattern = "agua|alho|alianca|arco|abadia|ajuda|alcantara|avila|oeste|arca|anta"

# Filter for rows that match normalization criteria
(sellers_normalized_doeste_and_others
    .filter(F.col("seller_city").rlike(special_cities_pattern))
    .select("seller_city", "seller_city_clean")
    .distinct()
).toPandas()

Unnamed: 0,seller_city,seller_city_clean
0,garca,garca
1,sao miguel d'oeste,sao miguel doeste
2,rio do oeste,rio do oeste
3,jaragua,jaragua
4,arraial d'ajuda (porto seguro),arraial dajuda (porto seguro)
5,santa barbara d'oeste,santa barbara doeste
6,cataguases,cataguases
7,abadia de goias,abadia de goias
8,jaguaruna,jaguaruna
9,santa cruz do sul,santa cruz do sul


###  Removing Parentheses

In [62]:
# There are some rows where seller_city contains '(' or ')'

# Apply the 'remove_parentheses' function to normalize the city names
sellers_removed_parentheses = sellers_normalized_doeste_and_others\
    .withColumn("seller_city_clean", remove_parentheses("seller_city_clean"))

In [63]:
# Validation: confirm that parenthesized text was removed from the cleaned city names.
(sellers_removed_parentheses\
    .filter(F.col("seller_city").contains("("))
    .select("seller_city", "seller_city_clean")
    .distinct()).toPandas()

Unnamed: 0,seller_city,seller_city_clean
0,arraial d'ajuda (porto seguro),arraial dajuda


### Cleaning Special Characters

In [64]:
# There are still special characters to fix
# Filters for city names containing special characters (not a-z or space)
sellers_removed_parentheses.filter(F.col("seller_city_clean").rlike(r"[^a-z\s]")) \
    .select("seller_city", "seller_city_clean").distinct() \
    .toPandas()

Unnamed: 0,seller_city,seller_city_clean
0,vendas@creditparts.com.br,vendas@creditparts.com.br
1,sao paulo / sao paulo,sao paulo / sao paulo
2,maua/sao paulo,maua/sao paulo
3,carapicuiba / sao paulo,carapicuiba / sao paulo
4,mogi das cruzes / sp,mogi das cruzes / sp
5,auriflama/sp,auriflama/sp
6,sp / sp,sp / sp
7,jacarei / sao paulo,jacarei / sao paulo
8,"novo hamburgo, rio grande do sul, brasil","novo hamburgo, rio grande do sul, brasil"
9,rio de janeiro \rio de janeiro,rio de janeiro \rio de janeiro


In [65]:
# Keep only the text before the first '/', '\' or ',' (the city name) and trim it
sellers_split_city = sellers_removed_parentheses.withColumn(
    "seller_city_clean",
    F.trim(F.split(F.col("seller_city_clean"), r"/|\\|,").getItem(0))
)
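
The split step keeps only the text before the first separator. A minimal plain-Python sketch of the same idea follows. It uses the standard `re` module rather than Spark, and the helper name `first_city_token` is hypothetical. Python's regex engine stands in for Spark's Java-based one here, which should behave the same for a pattern this simple.

```python
import re

# Same separator pattern as the Spark expression: '/', '\' or ','
SEPARATORS = re.compile(r"/|\\|,")


def first_city_token(raw_city):
    """Return the text before the first separator, trimmed of whitespace."""
    return SEPARATORS.split(raw_city)[0].strip()


examples = [
    "mogi das cruzes / sp",
    "maua/sao paulo",
    "novo hamburgo, rio grande do sul, brasil",
    r"rio de janeiro \rio de janeiro",  # raw string keeps the literal backslash
]
for raw in examples:
    print(f"{raw!r} -> {first_city_token(raw)!r}")
```

Note that an entry such as 'sp / sp' is reduced to 'sp' by this step, which is why the state-code check comes next.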

In [66]:
# Validation: confirm that only the city name remains for entries that contained ',', '/' or '\'.
(sellers_split_city
    .filter(F.col("seller_city").rlike(r"[,/\\]"))
    .select("seller_city", "seller_city_clean", "seller_state")
    .distinct()
).toPandas()

Unnamed: 0,seller_city,seller_city_clean,seller_state
0,mogi das cruzes / sp,mogi das cruzes,sp
1,maua/sao paulo,maua,sp
2,sbc/sp,sbc,sp
3,sao paulo / sao paulo,sao paulo,sp
4,auriflama/sp,auriflama,sp
5,"novo hamburgo, rio grande do sul, brasil",novo hamburgo,rs
6,rio de janeiro \rio de janeiro,rio de janeiro,rj
7,sao sebastiao da grama/sp,sao sebastiao da grama,sp
8,barbacena/ minas gerais,barbacena,mg
9,"rio de janeiro, rio de janeiro, brasil",rio de janeiro,rj


### Identifying and Correcting State Codes in City Fields

In [67]:
# Identify rows where the city name is only 2 characters long (likely a state code)
(sellers_split_city
    .filter(F.length(F.col("seller_city")) < 3)
    .select("seller_city", "seller_city_clean", "seller_state")
    .distinct()
).toPandas()

Unnamed: 0,seller_city,seller_city_clean,seller_state
0,sp,sp,sp


In [68]:
# Replaces the state code "sp" with "sao paulo"
sellers_cities_fixed_state_codes = sellers_split_city.withColumn(
    "seller_city_clean",
    F.when(F.col("seller_city_clean") == "sp", "sao paulo")
     .otherwise(F.col("seller_city_clean"))
)

In [69]:
# Validation: confirm that the two-letter entry 'sp' was replaced with 'sao paulo'.
(sellers_cities_fixed_state_codes
    .filter(F.length(F.col("seller_city")) < 3)
    .select("seller_city", "seller_city_clean", "seller_state")
    .distinct()
).toPandas()

Unnamed: 0,seller_city,seller_city_clean,seller_state
0,sp,sao paulo,sp


### Normalizing Regional Suffixes

✂️ This step merges variations of 'do oeste' and 'de alcantara' into a single unified format 
to prevent duplicate city entries (e.g., 'do oeste' becomes 'doeste').

In [70]:
# Standardize regional suffixes by merging multi-part names 
sellers_regional_suffixes = sellers_cities_fixed_state_codes.withColumn(
    "seller_city_clean",
    F.regexp_replace(F.col("seller_city_clean"), r"do\s+oeste", "doeste")
).withColumn(
    "seller_city_clean",
    F.regexp_replace(F.col("seller_city_clean"), r"de\s+alcantara", "dalcantara")
)
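
One thing worth checking with patterns like `do\s+oeste` is that they are not anchored to word boundaries, so they could in principle match across the end of a longer word. The sketch below compares the notebook's pattern with a `\b`-anchored variant in plain Python. The anchored variant is a suggestion rather than what the notebook uses, and 'machado oeste' is a made-up name used only to trigger the edge case.

```python
import re

UNANCHORED = re.compile(r"do\s+oeste")       # pattern used in the notebook
ANCHORED = re.compile(r"\bdo\s+oeste\b")     # assumption: word boundaries added


def merge_suffix(city, pattern):
    """Merge 'do oeste' into 'doeste' using the given compiled pattern."""
    return pattern.sub("doeste", city)


for city in ["rio do oeste", "machado oeste"]:
    print(city, "|", merge_suffix(city, UNANCHORED), "|", merge_suffix(city, ANCHORED))
```

Both patterns turn 'rio do oeste' into 'rio doeste', but only the unanchored one rewrites the made-up 'machado oeste'.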

In [71]:
# Validation: confirm that 'do oeste' and 'de alcantara' were merged into 'doeste' and 'dalcantara'.
(sellers_regional_suffixes
    .filter(F.col("seller_city").rlike("do oeste|de alcantara"))
    .select("seller_city", "seller_city_clean")
    .distinct()
).toPandas()

Unnamed: 0,seller_city,seller_city_clean
0,sao miguel do oeste,sao miguel doeste
1,rio do oeste,rio doeste
2,entre rios do oeste,entre rios doeste
3,formosa do oeste,formosa doeste


### Removing Trailing State Abbreviations

✂️ Some rows have a state abbreviation at the end of the city name (e.g., 'franca sp'), 
while others end in two letters that belong to the name and are not state codes, like 'santa fe'.

In [72]:
# Inspect rows where 'seller_city_clean' ends with a space followed by two letters (a possible state code).
(sellers_regional_suffixes
    .filter(F.col("seller_city_clean").rlike(r"\s[a-z]{2}$"))
    .select("seller_city", "seller_city_clean", "seller_state")
    .distinct()
).toPandas()

Unnamed: 0,seller_city,seller_city_clean,seller_state
0,sao paulo sp,sao paulo sp,sp
1,angra dos reis rj,angra dos reis rj,rj
2,andira-pr,andira pr,pr
3,brasilia df,brasilia df,df
4,aguas claras df,aguas claras df,sp
5,lages - sc,lages sc,sc
6,sao paulo - sp,sao paulo sp,sp


⚠️ Instead of a general pattern covering every Brazilian state code, a dataset-specific regex was used to remove state suffixes. This prevents the accidental truncation of city names whose final two letters merely look like a state code, such as 'Sento Se'.

In [73]:
# This pattern only matches specific state codes at the end of the string
state_codes_pattern_sellers = r"\s(mg|sp|rj|pr|df|sc)$"

sellers_trailing_state_code = sellers_regional_suffixes.withColumn(
    "seller_city_clean",
    F.regexp_replace(F.col("seller_city_clean"), state_codes_pattern_sellers, "")
)
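
To illustrate why the narrower pattern is safer, the sketch below compares it with a pattern built from all 27 Brazilian state codes, using plain Python `re`. The `ALL_STATES` pattern and the helper `strip_state` are illustrative assumptions and are not part of the notebook.

```python
import re

# Assumption: a general pattern covering every Brazilian state code
ALL_STATE_CODES = (
    "ac|al|ap|am|ba|ce|df|es|go|ma|mt|ms|mg|pa|pb|pr|pe|pi|"
    "rj|rn|rs|ro|rr|sc|sp|se|to"
)
ALL_STATES = re.compile(rf"\s({ALL_STATE_CODES})$")

# Dataset-specific pattern from the notebook
DATASET_SPECIFIC = re.compile(r"\s(mg|sp|rj|pr|df|sc)$")


def strip_state(city, pattern):
    """Remove a trailing state code matched by the given pattern."""
    return pattern.sub("", city)


for city in ["sao paulo sp", "brasilia df", "santa fe", "sento se"]:
    print(city, "|", strip_state(city, ALL_STATES), "|", strip_state(city, DATASET_SPECIFIC))
```

With the general pattern, 'sento se' loses its last word because 'se' is also the code for Sergipe, while the dataset-specific pattern leaves it intact.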

In [74]:
# Validation: Check unique examples of city names ending in state codes to ensure correct processing.
(sellers_trailing_state_code
    .filter(F.col("seller_city").rlike(r"\s[a-z]{2}$"))
    .select("seller_city", "seller_city_clean", "seller_state")
    .distinct()
).toPandas()

Unnamed: 0,seller_city,seller_city_clean,seller_state
0,mogi das cruzes / sp,mogi das cruzes,sp
1,sao paulo sp,sao paulo,sp
2,aguas claras df,aguas claras,sp
3,angra dos reis rj,angra dos reis,rj
4,cariacica / es,cariacica,es
5,sp / sp,sao paulo,sp
6,lages - sc,lages,sc
7,sao paulo - sp,sao paulo,sp
8,brasilia df,brasilia,df


**⚠️ Correction:** Manually fixing a state mismatch for **Águas Claras**. Although the original data says 'sp', this city belongs to 'df'.

In [75]:
# Fixing state mismatch
sellers_trailing_state_code = sellers_trailing_state_code.withColumn(
    "seller_state",
    F.when(F.col("seller_city_clean") == "aguas claras", "df")
     .otherwise(F.col("seller_state"))
)

In [76]:
# Validation: Check unique examples of city names ending in state codes to ensure correct processing.
(sellers_trailing_state_code
    .filter(F.col("seller_city").rlike(r"\s[a-z]{2}$"))
    .select("seller_city", "seller_city_clean", "seller_state")
    .distinct()
).toPandas()

Unnamed: 0,seller_city,seller_city_clean,seller_state
0,mogi das cruzes / sp,mogi das cruzes,sp
1,sao paulo sp,sao paulo,sp
2,aguas claras df,aguas claras,df
3,angra dos reis rj,angra dos reis,rj
4,cariacica / es,cariacica,es
5,sp / sp,sao paulo,sp
6,lages - sc,lages,sc
7,sao paulo - sp,sao paulo,sp
8,brasilia df,brasilia,df


### Manually Fixing Incorrect Data

In [44]:
# Manual data cleaning for the seller_city column:
# 's jose do rio preto' -> 'sao jose do rio preto', 
# 'sbc' -> 'sao bernardo do campo'
# 'vendas@creditparts.com.br' and '04482255' -> replaced with the correct city name associated with their zip codes


In [77]:
# Validation: Check for city names that still contain special characters (not a-z or space)
# to verify if the normalization function was successful.
sellers_trailing_state_code.filter(F.col("seller_city_clean").rlike(r"[^a-z\s]")) \
    .select("seller_city", "seller_city_clean", "seller_state") \
    .toPandas()


Unnamed: 0,seller_city,seller_city_clean,seller_state
0,04482255,04482255,rj
1,vendas@creditparts.com.br,vendas@creditparts.com.br,pr


In [78]:
manual_fixes = {
    "s jose do rio preto": "sao jose do rio preto",
    "sbc": "sao bernardo do campo",
    "vendas@creditparts.com.br": "maringa",  # City associated with this seller's zip code
    "04482255": "rio de janeiro",  # City associated with this seller's zip code
    "ao bernardo do campo": "sao bernardo do campo",
    "scao jose do rio pardo": "sao jose do rio pardo",
}
sellers_manual_fix = sellers_trailing_state_code.replace(manual_fixes, subset=["seller_city_clean"])
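
A dictionary-based replacement of this kind swaps whole values only, so it relies on the earlier steps having already reduced each entry to a bare city name. The plain-Python sketch below mimics that exact-match behaviour with a subset of the mapping. The helper `apply_manual_fix` is hypothetical and only illustrates the ordering dependency.

```python
# Subset of the manual mapping, for illustration only
manual_fixes_sample = {
    "s jose do rio preto": "sao jose do rio preto",
    "sbc": "sao bernardo do campo",
}


def apply_manual_fix(city):
    """Return the corrected name when the whole value matches a key, else the input."""
    return manual_fixes_sample.get(city, city)


print(apply_manual_fix("sbc"))        # whole value matches a key
print(apply_manual_fix("sbc/sp"))     # no match: separators must be split off first
print(apply_manual_fix("campinas"))   # untouched
```

This is why 'sbc/sp' only becomes 'sao bernardo do campo' after the separator split has turned it into 'sbc'.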

In [79]:
# Validation: Check for city names that still contain special characters (not a-z or space)
# to verify if the normalization function was successful.
sellers_manual_fix.filter(F.col("seller_city_clean").rlike(r"[^a-z\s]")) \
    .select("seller_city", "seller_city_clean", "seller_state") \
    .toPandas()

Unnamed: 0,seller_city,seller_city_clean,seller_state


### 🕵️ Final validation: Reviewing rows to ensure all cleaning transformations were applied correctly.

In [81]:
# 🎲 Random Sampling: Selecting up to 100 unique examples for manual verification
(sellers_manual_fix
    .filter(F.col("seller_city").rlike(r"[^a-z\s]"))
    .select("seller_city", "seller_city_clean", "seller_state")
    .distinct()
    .orderBy(F.rand()) # This shuffles the rows randomly
    .limit(100)        # This picks the first 100 from the shuffle
    .toPandas()
)

Unnamed: 0,seller_city,seller_city_clean,seller_state
0,cariacica / es,cariacica,es
1,maua/sao paulo,maua,sp
2,sp / sp,sao paulo,sp
3,santo andre/sao paulo,santo andre,sp
4,são paulo,sao paulo,sp
5,carapicuiba / sao paulo,carapicuiba,sp
6,vendas@creditparts.com.br,maringa,pr
7,sao paulo / sao paulo,sao paulo,sp
8,sbc/sp,sao bernardo do campo,sp
9,andira-pr,andira,pr


### Removing Duplicates

In [88]:
# Final cleanup: Trimming whitespace and converting empty strings to Null.
sellers_final_trim = trim_and_nullify_strings(sellers_manual_fix)

In [89]:
# Check for duplicates
check_duplicates(sellers_final_trim)

✅ No exact duplicate rows found. Every row is unique.



### Missing Values

In [87]:
# Check for missing values in the trimmed sellers dataset ('sellers_final_trim').
sellers_missing_values = check_missing_values(sellers_final_trim)
print(sellers_missing_values)

                        missing_count  percentage
seller_id                           0         0.0
seller_zip_code_prefix              0         0.0
seller_city                         0         0.0
seller_state                        0         0.0
seller_city_clean                   0         0.0


🧩 No missing values in the final sellers dataset.

### Dropping the Dirty Column

In [90]:
# Step 1: Remove the original unrefined 'seller_city' column to prevent redundancy.
# Step 2: Promote the 'seller_city_clean' version to the original column name.
# Verified: The dataset is free of duplicate records and ready for the Silver Layer. ✅
sellers_final_clean = sellers_final_trim.drop("seller_city") \
    .withColumnRenamed("seller_city_clean", "seller_city")

In [91]:
# 🎲 Selecting 100 random rows for verification and quality check.
sellers_final_clean.orderBy(F.rand()).limit(100).toPandas()

Unnamed: 0,seller_id,seller_zip_code_prefix,seller_state,seller_city
0,f1198c871e0171278b63f40d112caeec,1220,sp,sao paulo
1,3afc536624bc2e65bddfb5db33cc01f3,30150,mg,belo horizonte
2,fe87f472055fbcf1d7e691c00b1560dc,13085,sp,campinas
3,c97aa4ee7420f937da13b7f9e2228b99,93510,rs,novo hamburgo
4,d9e8c084b68fe958861d8f2c21202e6b,5528,sp,sao paulo
5,60562ab00b8054280520d390c8c0045c,4809,sp,sao paulo
6,95f83f51203c626648c875dd41874c7f,37564,mg,borda da mata
7,15aac934c58d886785ac1b17953ea898,29480,es,muqui
8,62760d278921b5f352461620d68a9cee,13320,sp,salto
9,daeb5653dd96c1b11860f72209795012,31310,mg,belo horizonte


### 🥈 Silver Layer: Storing Refined Data
The data cleaning and transformation phase is now complete. In this step, the refined and validated dataset is persisted to the Silver Layer. This provides a reliable, high-quality foundation for the upcoming Exploratory Data Analysis (EDA) and reporting stages.

In [120]:
# Storing the refined sellers dataset to the Silver Layer in Parquet format
sellers_final_clean.write.mode("overwrite") \
    .parquet("/opt/examples/datasets/brazilian_ecommerce/silver/olist_sellers.parquet")

# CUSTOMERS


In [93]:
# --- Define Customers Schema ---
# customer_zip_code_prefix is StringType to preserve leading zeros, essential for Brazilian CEPs (Postal Addressing Codes).
customers_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("customer_unique_id", StringType(), True),
    StructField("customer_zip_code_prefix", StringType(), True),
    StructField("customer_city", StringType(), True),
    StructField("customer_state", StringType(), True)
])

# Load the dataset using the manual schema
customers = (spark.read.format("csv")
    .option("header", "true")
    .schema(customers_schema)  
    .option("sep", ",")
    .option("quote", "\"")
    .option("escape", "\"")
    .option("encoding", "UTF-8")
    .option("multiLine", "true")
    .load("/opt/examples/datasets/brazilian_ecommerce/olist_customers_dataset.csv")
)

In [94]:
# Display schema
customers.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)



🆔 **customer_id:** A unique key for each order placed. It changes with every new purchase.  
👤 **customer_unique_id:** A unique identifier for the actual person. It remains the same across multiple orders.

In [95]:
# Show first 5 rows
customers.limit(5).toPandas()

Unnamed: 0,customer_id,customer_unique_id,customer_zip_code_prefix,customer_city,customer_state
0,06b8999e2fba1a1fbc88172c00ba8bc7,861eff4711a542e4b93843c6dd7febb0,14409,franca,SP
1,18955e83d337fd6b2def6b18a428ac77,290c77bc529b7ac935b93aa66c333dc3,9790,sao bernardo do campo,SP
2,4e7b3e00288586ebd08712fdd0374a03,060e732b5b29e8181a18229c7b0b2b5e,1151,sao paulo,SP
3,b2b6027bc5c5109e529d4dc6358b12c3,259dac757896d24d7702b9acbbff3f3c,8775,mogi das cruzes,SP
4,4f2d8ab171c80ec8364f7c12e35b23ad,345ecd01c38d18a9036ed96c73b8d066,13056,campinas,SP
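
The distinction between `customer_id` and `customer_unique_id` described above matters whenever people, not orders, are being counted. The sketch below uses a handful of made-up ID pairs (not taken from the dataset) to show how counting by `customer_unique_id` reveals repeat customers that counting by `customer_id` would hide.

```python
from collections import Counter

# Hypothetical toy rows: (customer_id, customer_unique_id)
rows = [
    ("c1", "u1"),
    ("c2", "u1"),  # same person, second order
    ("c3", "u2"),
    ("c4", "u3"),
    ("c5", "u3"),  # same person, second order
]

# One customer_id per order, so this equals the number of rows
order_level_ids = {customer_id for customer_id, _ in rows}

# Count how many order-level IDs map to each real person
orders_per_person = Counter(unique_id for _, unique_id in rows)
repeat_customers = {uid: n for uid, n in orders_per_person.items() if n > 1}

print("order-level ids:", len(order_level_ids))
print("distinct people:", len(orders_per_person))
print("repeat customers:", repeat_customers)
```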


In [96]:
# Total number of records of the customers dataset
customers.count()

99441

### Trim String Columns

In [97]:
# Cleans the customers dataset by removing leading/trailing whitespace and 
# converting empty or blank strings into proper NULL (None) values.
customers_nullified_trimmed = trim_and_nullify_strings(customers)

In [98]:
# Inspection: Check for city names that contain special characters (not a-z or space)
customers_nullified_trimmed.filter(F.col("customer_city").rlike(r"[^a-z\s]")) \
    .select("customer_city").distinct() \
    .toPandas()

Unnamed: 0,customer_city
0,igarape-miri
1,machadinho d'oeste
2,sao jorge d'oeste
3,olho d'agua das cunhas
4,entre-ijuis
5,santa barbara d'oeste
6,alvorada d'oeste
7,governador dix-sept rosado
8,varre-sai
9,pau d'arco


### Normalize Brazilian Characters

In [99]:
# Remove accents to ensure 'são paulo' and 'sao paulo' are treated as the same city
customers_normalized_brazilian_characters = customers_nullified_trimmed.withColumn(
    "customer_city_clean",
    strip_accents_udf(F.col("customer_city"))
)
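
The `strip_accents_udf` helper is defined elsewhere in the notebook, so its exact implementation is not shown here. A common way to strip accents, sketched below under that assumption, is to decompose the text with Unicode NFD normalization and drop the combining marks. The function name `strip_accents` is illustrative.

```python
import unicodedata


def strip_accents(text):
    """Remove diacritics by decomposing characters and dropping combining marks."""
    if text is None:
        return None
    decomposed = unicodedata.normalize("NFD", text)
    return "".join(ch for ch in decomposed if not unicodedata.combining(ch))


# "\u00e3" is a precomposed 'a' with tilde; "a\u0303" is 'a' plus a combining tilde
print(strip_accents("s\u00e3o paulo"))
print(strip_accents("sa\u0303o paulo"))
print(strip_accents("\u00e1guas claras"))
```

Handling both the precomposed and the decomposed form matters because the raw data can contain either.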

In [101]:
# Validation: Check for city names that still contain special characters (not a-z or space)
# to verify if the normalization function was successful.
customers_normalized_brazilian_characters.filter(F.col("customer_city").rlike(r"[^a-z\s]")) \
    .select("customer_city", "customer_city_clean", "customer_state").distinct() \
    .toPandas().head(50)

Unnamed: 0,customer_city,customer_city_clean,customer_state
0,diamante d'oeste,diamante d'oeste,pr
1,mae d'agua,mae d'agua,pb
2,mirassol d'oeste,mirassol d'oeste,mt
3,pingo-d'agua,pingo-d'agua,mg
4,figueiropolis d'oeste,figueiropolis d'oeste,mt
5,nova brasilandia d'oeste,nova brasilandia d'oeste,ro
6,nao-me-toque,nao-me-toque,rs
7,guarda-mor,guarda-mor,mg
8,santa rita d'oeste,santa rita d'oeste,sp
9,alta floresta d'oeste,alta floresta d'oeste,ro


### Handling Hyphenated City Entries

In [102]:
# Replace hyphens with spaces and collapse repeated spaces
customers_cleaned_spacing_and_hyphens = customers_normalized_brazilian_characters.withColumn(
    "customer_city_clean", 
    clean_spacing_and_hyphens("customer_city_clean")   
)
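
`clean_spacing_and_hyphens` is defined earlier in the notebook. Judging only from the validation outputs (for example 'lages - sc' becoming 'lages sc'), its effect can be approximated in plain Python as below. The function name `clean_spacing_and_hyphens_py` and its body are assumptions, not the notebook's actual implementation.

```python
import re


def clean_spacing_and_hyphens_py(city):
    """Replace hyphens with spaces, collapse whitespace runs, and trim."""
    without_hyphens = city.replace("-", " ")
    return re.sub(r"\s+", " ", without_hyphens).strip()


for raw in ["guajara-mirim", "lages - sc", "governador dix-sept rosado"]:
    print(f"{raw!r} -> {clean_spacing_and_hyphens_py(raw)!r}")
```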

In [103]:
# Validation: list hyphenated city names next to their cleaned values to confirm hyphens were replaced by spaces.
(customers_cleaned_spacing_and_hyphens
    .filter(F.col("customer_city").like("%-%"))
    .select("customer_city", "customer_city_clean", "customer_state")
    .distinct()
).toPandas()

Unnamed: 0,customer_city,customer_city_clean,customer_state
0,guajara-mirim,guajara mirim,ro
1,sapucai-mirim,sapucai mirim,mg
2,embu-guacu,embu guacu,sp
3,tome-acu,tome acu,pa
4,ceara-mirim,ceara mirim,rn
5,guarda-mor,guarda mor,mg
6,igarape-acu,igarape acu,pa
7,mogi-guacu,mogi guacu,sp
8,governador dix-sept rosado,governador dix sept rosado,rn
9,cipo-guacu,cipo guacu,sp


### Regional Naming Normalization

In [104]:
# Refines specific regional naming conventions by mapping variations like "d' oeste" 
# to a standardized format, ensuring consistent grouping across the 'customer_city_clean' column.
customers_normalized_doeste_and_others = customers_cleaned_spacing_and_hyphens.withColumn(
    "customer_city_clean", 
    normalize_doeste_and_others("customer_city_clean")
)

In [105]:
# Validation: compare original and cleaned names for cities that match the regional naming patterns.

# Define the pattern for specialized regional naming and characters
special_cities_pattern = "agua|alho|alianca|arco|abadia|ajuda|alcantara|avila|oeste|arca|anta"

# Filter for rows that match normalization criteria
(customers_normalized_doeste_and_others
    .filter(F.col("customer_city").rlike(special_cities_pattern))
    .select("customer_city", "customer_city_clean")
    .distinct()
).toPandas().head(50)

Unnamed: 0,customer_city,customer_city_clean
0,ouroeste,ouroeste
1,floresta do araguaia,floresta do araguaia
2,paraguacu,paraguacu
3,olhos d'agua,olhos dagua
4,central de santa helena,central de santa helena
5,paraiso das aguas,paraiso das aguas
6,garca,garca
7,santa rita do itueto,santa rita do itueto
8,santa fe de goias,santa fe de goias
9,vera cruz do oeste,vera cruz do oeste


In [106]:
# Check for special characters to fix
# Filters for city names containing special characters (not a-z or space)
customers_normalized_doeste_and_others.filter(F.col("customer_city_clean").rlike(r"[^a-z\s]")) \
    .select("customer_city", "customer_city_clean").distinct() \
    .toPandas()

Unnamed: 0,customer_city,customer_city_clean
0,quilometro 14 do mutum,quilometro 14 do mutum


🧹 Special characters were removed from all entries. The only remaining match, 'quilometro 14 do mutum', contains digits that are part of the place name.

### Identifying and Correcting State Codes in City Fields

In [108]:
# Identify rows where the city name is only 2 characters long (likely a state code)
customer_2letter_city = (customers_normalized_doeste_and_others
    .filter(F.length(F.col("customer_city")) < 3)
    .select("customer_city", "customer_city_clean", "customer_state")
    .distinct()
)

# Display the results to verify which cities need fixing
customer_2letter_city.toPandas()

Unnamed: 0,customer_city,customer_city_clean,customer_state


🧹 There are no cities consisting solely of state codes.

### Normalizing Regional Suffixes

In [109]:
# Standardize regional suffixes by merging multi-part names 
customers_regional_suffixes = customers_normalized_doeste_and_others.withColumn(
    "customer_city_clean",
    F.regexp_replace(F.col("customer_city_clean"), r"do\s+oeste", "doeste")
).withColumn(
    "customer_city_clean",
    F.regexp_replace(F.col("customer_city_clean"), r"de\s+alcantara", "dalcantara")
)

In [110]:
# Validation: confirm that 'do oeste' and 'de alcantara' were merged into 'doeste' and 'dalcantara'.
(customers_regional_suffixes
    .filter(F.col("customer_city").rlike("do oeste|de alcantara"))
    .select("customer_city", "customer_city_clean")
    .distinct()
).toPandas().head(50)

Unnamed: 0,customer_city,customer_city_clean
0,cruzeiro do oeste,cruzeiro doeste
1,sao jorge do oeste,sao jorge doeste
2,ouro preto do oeste,ouro preto doeste
3,vera cruz do oeste,vera cruz doeste
4,limeira do oeste,limeira doeste
5,sao miguel do oeste,sao miguel doeste
6,luizlandia do oeste,luizlandia doeste
7,sao pedro de alcantara,sao pedro dalcantara
8,sao lourenco do oeste,sao lourenco doeste
9,rio do oeste,rio doeste


### Removing Trailing State Abbreviations

In [111]:
# Identify city names that end with a space followed by exactly two letters.
(customers_regional_suffixes
    .filter(F.col("customer_city").rlike(r"\s[a-z]{2}$"))
    .select("customer_city", "customer_city_clean", "customer_state")
    .distinct()
).toPandas()

Unnamed: 0,customer_city,customer_city_clean,customer_state
0,maria da fe,maria da fe,mg
1,coronel joao sa,coronel joao sa,ba
2,holambra ii,holambra ii,sp
3,sento se,sento se,ba
4,francisco sa,francisco sa,mg
5,pedro ii,pedro ii,pi
6,pio ix,pio ix,pi
7,santa fe,santa fe,pr
8,bonito de santa fe,bonito de santa fe,pb


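🧹 None of these names carries a trailing state code: endings such as 'fe', 'sa', 'se', 'ii' and 'ix' are part of the city name, so no suffix removal is applied to the customers dataset.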

### 🕵️ Final validation: Reviewing rows to ensure all cleaning transformations were applied correctly.

In [112]:
# 🎲 Selecting up to 100 random examples for verification
(customers_regional_suffixes
    .filter(F.col("customer_city").rlike(r"[^a-z\s]"))
    .select("customer_city", "customer_city_clean")
    .distinct()
    .orderBy(F.rand()) # This shuffles the rows randomly
    .limit(100)        # This picks the first 100 from the shuffle
    .toPandas()
)

Unnamed: 0,customer_city,customer_city_clean
0,santa clara d'oeste,santa clara doeste
1,ceara-mirim,ceara mirim
2,nova brasilandia d'oeste,nova brasilandia doeste
3,lagoa d'anta,lagoa danta
4,tome-acu,tome acu
5,pau d'arco,pau darco
6,embu-guacu,embu guacu
7,arraial d'ajuda,arraial dajuda
8,machadinho d'oeste,machadinho doeste
9,perola d'oeste,perola doeste


### Removing Duplicates

In [113]:
# Final cleanup: Trimming whitespace and converting empty strings to Null.
customers_final_trim = trim_and_nullify_strings(customers_regional_suffixes)

In [114]:
# Check for duplicates
check_duplicates(customers_final_trim)



✅ No exact duplicate rows found. Every row is unique.


                                                                                


### Missing Values

In [115]:
# Check for missing values in the final customers dataset.
customers_missing_values = check_missing_values(customers_final_trim)
print(customers_missing_values)


                          missing_count  percentage
customer_id                           0         0.0
customer_unique_id                    0         0.0
customer_zip_code_prefix              0         0.0
customer_city                         0         0.0
customer_state                        0         0.0
customer_city_clean                   0         0.0


🧩 No missing values in the final customers dataset.

### Dropping the Dirty Column

In [117]:
# Step 1: Remove the original unrefined 'customer_city' column.
# Step 2: Promote the 'customer_city_clean' version to the original column name.
# Verified: The dataset is free of duplicate records and ready for the Silver Layer. ✅
customers_final_clean = customers_final_trim.drop("customer_city") \
    .withColumnRenamed("customer_city_clean", "customer_city")

In [118]:
# 🎲 Selecting 100 random rows for verification and quality check.
customers_final_clean.orderBy(F.rand()).limit(100).toPandas()

Unnamed: 0,customer_id,customer_unique_id,customer_zip_code_prefix,customer_state,customer_city
0,2f7001da9e597c4b707313a341a20614,38c576114024f3c3e45f83e7b5ee0c56,59965,rn,alexandria
1,e8ac9b27d2b90428825cedfe54b8c455,ff0d4869d957ba46496a840133691be6,4726,sp,sao paulo
2,8638b28024054f46722ade334452c048,8e2362db1bcd46ad2de37f1d63e7203e,18065,sp,sorocaba
3,4365561a2a4c1d768592aa0536e503c6,580c11d5df4eda0d717f716067b52de7,38400,mg,uberlandia
4,3909643728e469c11fe39b47013e93e9,7f94443aaca299f1234b9618d3f5805b,36240,mg,santos dumont
5,14e618e8adb379c1d9727e4cfb189daa,58b2572b34b182a7b5b04766a86bfb95,15057,sp,sao jose do rio preto
6,557680a586ced70b6898cbe01a35c87d,21bc36e41f9312e83d48bdba400f702d,22450,rj,rio de janeiro
7,438c7f59588a3d370200db20fb5c5a85,d4254847c058df4dbe92d4e6206da691,73350,df,brasilia
8,2004beab18003b04d2e2603e1055d589,d936da095a235937dbcc81ab08bac901,23860,rj,mangaratiba
9,ece0add9b18630501bd9a4020bbf2637,5a740d409f9eed7b1051a2b08f8198a5,13049,sp,campinas


### 🥈 Silver Layer: Storing Refined Data
The data cleaning and transformation phase is now complete. In this step, the refined and validated dataset is persisted to the Silver Layer. This provides a reliable, high-quality foundation for the upcoming Exploratory Data Analysis (EDA) and reporting stages.

In [119]:
# Storing the refined customers dataset to the Silver Layer in Parquet format
customers_final_clean.write.mode("overwrite") \
    .parquet("/opt/examples/datasets/brazilian_ecommerce/silver/olist_customers.parquet")

# ORDERS

In [14]:
# --- Define Orders Schema ---
orders_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("order_status", StringType(), True),
    StructField("order_purchase_timestamp", TimestampType(), True),
    StructField("order_approved_at", TimestampType(), True),
    StructField("order_delivered_carrier_date", TimestampType(), True),
    StructField("order_delivered_customer_date", TimestampType(), True),
    StructField("order_estimated_delivery_date", TimestampType(), True)
])

# Load the dataset using the manual schema
orders = spark.read.format("csv") \
    .schema(orders_schema) \
    .option("header", "true") \
    .option("sep", ",") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("encoding", "UTF-8") \
    .option("multiLine", "true") \
    .load("/opt/examples/datasets/brazilian_ecommerce/olist_orders_dataset.csv")

In [15]:
# Display schema
orders.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: timestamp (nullable = true)
 |-- order_approved_at: timestamp (nullable = true)
 |-- order_delivered_carrier_date: timestamp (nullable = true)
 |-- order_delivered_customer_date: timestamp (nullable = true)
 |-- order_estimated_delivery_date: timestamp (nullable = true)



In [16]:
# Show first 5 rows
orders.limit(5).toPandas()

Unnamed: 0,order_id,customer_id,order_status,order_purchase_timestamp,order_approved_at,order_delivered_carrier_date,order_delivered_customer_date,order_estimated_delivery_date
0,e481f51cbdc54678b7cc49136f2d6af7,9ef432eb6251297304e76186b10a928d,delivered,2017-10-02 10:56:33,2017-10-02 11:07:15,2017-10-04 19:55:00,2017-10-10 21:25:13,2017-10-18
1,53cdb2fc8bc7dce0b6741e2150273451,b0830fb4747a6c6d20dea0b8c802d7ef,delivered,2018-07-24 20:41:37,2018-07-26 03:24:27,2018-07-26 14:31:00,2018-08-07 15:27:45,2018-08-13
2,47770eb9100c2d0c44946d9cf07ec65d,41ce2a54c0b03bf3443c3d931a367089,delivered,2018-08-08 08:38:49,2018-08-08 08:55:23,2018-08-08 13:50:00,2018-08-17 18:06:29,2018-09-04
3,949d5b44dbf5de918fe9c16f97b45f8a,f88197465ea7920adcdbec7375364d82,delivered,2017-11-18 19:28:06,2017-11-18 19:45:59,2017-11-22 13:39:59,2017-12-02 00:28:42,2017-12-15
4,ad21c59c0840e6cb83a9ceb5573f8159,8ab97904e6daea8866dbdbc4fb7aad2c,delivered,2018-02-13 21:18:39,2018-02-13 22:20:29,2018-02-14 19:46:34,2018-02-16 18:17:02,2018-02-26


### Trim String Columns

In [17]:
# Cleans the orders dataset by removing leading/trailing whitespace and 
# converting empty or blank strings into proper NULL (None) values.
orders_nullified_trimmed = trim_and_nullify_strings(orders)

### Distinct Order Status

In [18]:
# Select the 'order_status' column and display only its unique (distinct) values
orders.select("order_status").distinct().toPandas()

Unnamed: 0,order_status
0,shipped
1,canceled
2,approved
3,invoiced
4,created
5,delivered
6,unavailable
7,processing


### Missing Values

In [19]:
# Check for missing values in the orders dataset.
orders_missing_values = check_missing_values(orders_nullified_trimmed)
print(orders_missing_values)


                               missing_count  percentage
order_id                                   0    0.000000
customer_id                                0    0.000000
order_status                               0    0.000000
order_purchase_timestamp                   0    0.000000
order_approved_at                        160    0.160899
order_delivered_carrier_date            1783    1.793023
order_delivered_customer_date           2965    2.981668
order_estimated_delivery_date              0    0.000000


🔍 Core columns such as order_id have no missing values. However, three timestamp columns contain NULLs: order_approved_at (160 rows, 0.16%), order_delivered_carrier_date (1,783 rows, 1.79%), and order_delivered_customer_date (2,965 rows, 2.98%). These values are not filled at this stage. Instead, they will be analyzed during the EDA phase to determine whether the orders were canceled or are still in transit.

In [21]:
# Display the earliest and latest purchase timestamps.
# F.min / F.max are used instead of importing min and max directly,
# which would shadow Python's built-in functions.
orders_nullified_trimmed.select(
    F.min("order_purchase_timestamp"), 
    F.max("order_purchase_timestamp")
).show()

+-----------------------------+-----------------------------+
|min(order_purchase_timestamp)|max(order_purchase_timestamp)|
+-----------------------------+-----------------------------+
|          2016-09-04 21:15:19|          2018-10-17 17:30:18|
+-----------------------------+-----------------------------+



The dataset covers orders purchased between 2016-09-04 and 2018-10-17.
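
Returning to the missing delivery dates noted in the Missing Values step: one way to judge whether they are expected is to count them per order status. The sketch below does this in plain Python on a few made-up rows, so the values are not from the dataset. In Spark the same idea would be a null filter followed by a group-by count on `order_status`.

```python
from collections import Counter

# Hypothetical toy rows: (order_status, order_delivered_customer_date or None)
orders_sample = [
    ("delivered", "2017-10-10 21:25:13"),
    ("shipped", None),
    ("canceled", None),
    ("delivered", None),  # a delivered order without a date would need a closer look
    ("shipped", None),
]

# Count rows with a missing delivery date, grouped by status
missing_by_status = Counter(
    status for status, delivered_at in orders_sample if delivered_at is None
)

for status, missing in missing_by_status.most_common():
    print(f"{status}: {missing} missing delivery date(s)")
```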

### 🥈 Silver Layer: Storing Refined Data
The data cleaning and transformation phase is now complete. In this step, the refined and validated dataset is persisted to the Silver Layer. This provides a reliable, high-quality foundation for the upcoming Exploratory Data Analysis (EDA) and reporting stages.

In [22]:
# Storing the refined orders dataset to the Silver Layer in Parquet format
orders_nullified_trimmed.write.mode("overwrite") \
    .parquet("/opt/examples/datasets/brazilian_ecommerce/silver/olist_orders.parquet")

# ORDER ITEMS

This table uses a composite primary key made up of (order_id, order_item_id).  
🛒 **order_id:** Identifies the transaction.  
🛍️ **order_item_id:** A sequential number identifying each product within that transaction.  

Both columns must be used to uniquely identify a specific row or to join accurately.
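
A composite key can be sanity-checked by confirming that the pair is unique even though `order_id` on its own repeats. The sketch below shows the idea in plain Python on made-up rows. The helper `duplicated_keys` is hypothetical and is not the notebook's `check_duplicates` function.

```python
from collections import Counter

# Hypothetical toy rows: (order_id, order_item_id)
items = [
    ("o1", 1),
    ("o1", 2),  # second product in the same order
    ("o2", 1),
]


def duplicated_keys(keys):
    """Return the keys that occur more than once, in first-seen order."""
    counts = Counter(keys)
    return [key for key, n in counts.items() if n > 1]


print("duplicated (order_id, order_item_id):", duplicated_keys(items))
print("duplicated order_id alone:", duplicated_keys([order_id for order_id, _ in items]))
```

An empty list for the pair and a non-empty list for `order_id` alone is the expected pattern for a table keyed this way.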

In [13]:
# --- Define Order Items Schema ---
order_items_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("order_item_id", IntegerType(), True),
    StructField("product_id", StringType(), True),
    StructField("seller_id", StringType(), True),
    StructField("shipping_limit_date", TimestampType(), True),
    StructField("price", DecimalType(10, 2), True),
    StructField("freight_value", DecimalType(10, 2), True)
])

# Load the dataset using the manual schema
order_items = spark.read.format("csv") \
    .schema(order_items_schema) \
    .option("header", "true") \
    .option("sep", ",") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("encoding", "UTF-8") \
    .option("multiLine", "true") \
    .load("/opt/examples/datasets/brazilian_ecommerce/olist_order_items_dataset.csv")

In [6]:
# Display schema
order_items.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: decimal(10,2) (nullable = true)
 |-- freight_value: decimal(10,2) (nullable = true)



In [7]:
# Show first 5 rows
order_items.limit(5).toPandas()

                                                                                

Unnamed: 0,order_id,order_item_id,product_id,seller_id,shipping_limit_date,price,freight_value
0,00010242fe8c5a6d1ba2dd792cb16214,1,4244733e06e7ecb4970a6e2683c13e61,48436dade18ac8b2bce089ec2a041202,2017-09-19 09:45:35,58.9,13.29
1,00018f77f2f0320c557190d7a144bdd3,1,e5f2d52b802189ee658865ca93d83a8f,dd7ddc04e1b6c2c614352b383efe2d36,2017-05-03 11:05:13,239.9,19.93
2,000229ec398224ef6ca0657da4fc703e,1,c777355d18b72b67abbeef9df44fd0fd,5b51032eddd242adc84c38acab88f23d,2018-01-18 14:48:30,199.0,17.87
3,00024acbcdf0a6daa1e931b038114c75,1,7634da152a4610f1595efa32f14722fc,9d7a1d34a5052409006425275ba1c2b4,2018-08-15 10:10:18,12.99,12.79
4,00042b26cf59d7ce69dfabb4e55b4fd9,1,ac6c3623068f30de03045865e4e10089,df560393f3a51e74553ab94004ba5c87,2017-02-13 13:57:51,199.9,18.14


In [12]:
# Total number of records
order_items.count()

                                                                                

112650

### Trim String Columns

In [14]:
# Cleans the order items dataset by removing leading/trailing whitespace and 
# converting empty or blank strings into proper NULL (None) values.
order_items_nullified_trimmed = trim_and_nullify_strings(order_items)

### Missing Values

In [15]:
# Check for missing values 
order_items_missing_values = check_missing_values(order_items_nullified_trimmed)
print(order_items_missing_values)

                     missing_count  percentage
order_id                         0         0.0
order_item_id                    0         0.0
product_id                       0         0.0
seller_id                        0         0.0
shipping_limit_date              0         0.0
price                            0         0.0
freight_value                    0         0.0


                                                                                

🧩 No missing values in the final order items dataset.

### 📊 Statistical Summary

In [16]:
order_items_stats = order_items_nullified_trimmed.select(
    "price", 
    "freight_value", 
).summary("count", "mean", "stddev", "min", "50%", "max")

order_items_stats.show()

                                                                                

+-------+-----------------+------------------+
|summary|            price|     freight_value|
+-------+-----------------+------------------+
|  count|           112650|            112650|
|   mean|       120.653739|         19.990320|
| stddev|183.6339280502597|15.806405412296998|
|    min|             0.85|              0.00|
|    50%|            74.99|             16.26|
|    max|          6735.00|            409.68|
+-------+-----------------+------------------+



🕵️ The **freight_value** column has a minimum of **0.00**, so at least one item was shipped with no freight charge. 
This suggests that free-shipping transactions are present in the dataset.

In [17]:
# Calculating the earliest and latest shipping limit dates on the cleaned dataset
date_range = order_items_nullified_trimmed.agg(
    F.min("shipping_limit_date").alias("earliest_date"),
    F.max("shipping_limit_date").alias("latest_date")
)

date_range.toPandas()

Unnamed: 0,earliest_date,latest_date
0,2016-09-19 00:15:34,2020-04-09 22:35:08
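
The latest shipping_limit_date (2020-04-09) lies well after the last purchase timestamp reported for the orders table (2018-10-17). A minimal plain-Python sketch of how such rows might be flagged for the EDA stage follows; the sample rows, the helper `late_shipping_limits`, and the 60-day tolerance are assumptions for illustration, not values taken from the notebook.

```python
from datetime import datetime, timedelta

# Last purchase timestamp reported for the orders table
LAST_PURCHASE = datetime(2018, 10, 17, 17, 30, 18)
# Assumed tolerance; choose a value that fits the actual business rules
TOLERANCE = timedelta(days=60)

# Hypothetical (order_id, shipping_limit_date) rows
sample_limits = [
    ("order_a", datetime(2017, 9, 19, 9, 45, 35)),
    ("order_b", datetime(2020, 4, 9, 22, 35, 8)),
]


def late_shipping_limits(rows, last_purchase=LAST_PURCHASE, tolerance=TOLERANCE):
    """Return rows whose shipping limit falls beyond last_purchase + tolerance."""
    cutoff = last_purchase + tolerance
    return [(order_id, ts) for order_id, ts in rows if ts > cutoff]


flagged = late_shipping_limits(sample_limits)
print([order_id for order_id, _ in flagged])  # ['order_b']
```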


### 🥈 Silver Layer: Storing Refined Data  
The data cleaning and transformation phase is now complete. In this step, the refined and validated dataset is being persisted into the Silver Layer. This provides a reliable and high-quality foundation for the upcoming Exploratory Data Analysis (EDA) and reporting stages.

In [18]:
# Storing the refined order items dataset to the Silver Layer in Parquet format
order_items_nullified_trimmed.write.mode("overwrite") \
    .parquet("/opt/examples/datasets/brazilian_ecommerce/silver/olist_order_items.parquet")

                                                                                

# ORDER PAYMENTS

In [20]:
# --- Define Order Payments Schema ---
order_payments_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("payment_sequential", IntegerType(), True),
    StructField("payment_type", StringType(), True),
    StructField("payment_installments", IntegerType(), True),
    StructField("payment_value", DecimalType(10, 2), True) 
])

# Load the dataset using the manual schema
order_payments = spark.read.format("csv") \
    .schema(order_payments_schema) \
    .option("header", "true") \
    .option("sep", ",") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("encoding", "UTF-8") \
    .option("multiLine", "true") \
    .load("/opt/examples/datasets/brazilian_ecommerce/olist_order_payments_dataset.csv")

In [21]:
# Display Schema
order_payments.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- payment_sequential: integer (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: integer (nullable = true)
 |-- payment_value: decimal(10,2) (nullable = true)



💳 Payment Columns:  
**order_id:** Identifier of the order the payment belongs to (one order can have several payment rows).  
**payment_sequential:** Sequence number distinguishing the payment methods used for one order.  
**payment_type:** Method of payment (e.g., credit card, voucher).  
**payment_installments:** Number of installments selected by the customer.  
**payment_value:** Amount paid with this specific payment method.
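
Because one order can be split across several payment rows, the order total is the sum of payment_value over all rows sharing an order_id. A plain-Python sketch under that reading; the sample rows and the helper `total_per_order` are hypothetical.

```python
from collections import defaultdict
from decimal import Decimal

# Hypothetical payment rows: (order_id, payment_sequential, payment_type, payment_value)
sample_payments = [
    ("order_a", 1, "credit_card", Decimal("80.00")),
    ("order_a", 2, "voucher", Decimal("19.33")),
    ("order_b", 1, "credit_card", Decimal("24.39")),
]


def total_per_order(payments):
    """Sum payment_value across all payment rows of each order."""
    totals = defaultdict(Decimal)  # Decimal() defaults to 0
    for order_id, _sequential, _payment_type, value in payments:
        totals[order_id] += value
    return dict(totals)


totals = total_per_order(sample_payments)
print(totals["order_a"])  # 99.33
```

Decimal is used here for the same reason the schema uses DecimalType(10, 2): monetary sums stay exact instead of accumulating binary floating-point error.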

In [22]:
# Show first 5 rows
order_payments.limit(5).toPandas()

Unnamed: 0,order_id,payment_sequential,payment_type,payment_installments,payment_value
0,b81ef226f3fe1789b1e8b2acac839d17,1,credit_card,8,99.33
1,a9810da82917af2d9aefd1278f1dcfa0,1,credit_card,1,24.39
2,25e8ea4e93396b6fa0d3dd708e76c1bd,1,credit_card,1,65.71
3,ba78997921bbcdc1373bb41e913ab953,1,credit_card,8,107.78
4,42fdf880ba16b47b59251dd489d4441a,1,credit_card,2,128.45


### Trim String Columns

In [23]:
# Cleans the order payments dataset by removing leading/trailing whitespace and 
# converting empty or blank strings into proper NULL (None) values.
order_payments_nullified_trimmed = trim_and_nullify_strings(order_payments)

### Missing Values

In [24]:
# Check for missing values in the order payments dataset.
order_payments_missing_values = check_missing_values(order_payments_nullified_trimmed)
print(order_payments_missing_values)

                      missing_count  percentage
order_id                          0         0.0
payment_sequential                0         0.0
payment_type                      0         0.0
payment_installments              0         0.0
payment_value                     0         0.0


🧩 No missing values in the final order payments dataset.

### 📊 Statistical Summary

In [25]:
order_payments_stats = order_payments_nullified_trimmed.select(
    "payment_value", 
    "payment_installments", 
    "payment_sequential"
).summary("count", "mean", "stddev", "min", "50%", "max")

order_payments_stats.show()

+-------+------------------+--------------------+------------------+
|summary|     payment_value|payment_installments|payment_sequential|
+-------+------------------+--------------------+------------------+
|  count|            103886|              103886|            103886|
|   mean|        154.100380|   2.853348863176944|1.0926785129853878|
| stddev|217.49406386472384|   2.687050673856486|0.7065837791949948|
|    min|              0.00|                   0|                 1|
|    50%|             100.0|                   1|                 1|
|    max|          13664.08|                  24|                29|
+-------+------------------+--------------------+------------------+



                                                                                

⚠️ **Anomaly:** The statistical summary shows a minimum of 0 for **payment_installments**, although business logic requires at least 1. These records may indicate technical glitches or failed vouchers. 

✅ The affected records are retained despite the logical inconsistencies in **payment_sequential** and **payment_installments**, because deleting them would understate revenue (the sum of payment_value). 
Financial data integrity is prioritized over sequence perfection to keep sales aggregation accurate.
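
The trade-off between dropping and repairing can be made concrete with a short plain-Python sketch. The two payment values (58.69 and 129.94) belong to the anomalous rows in this dataset; the third row and the helper `revenue` are hypothetical and added only so the totals differ visibly.

```python
from decimal import Decimal

sample_payments = [
    {"order_id": "744bade1fcf9ff3f31d860ace076d422", "payment_installments": 0,
     "payment_value": Decimal("58.69")},
    {"order_id": "1a57108394169c0b47d8f876acc9ba2d", "payment_installments": 0,
     "payment_value": Decimal("129.94")},
    # Hypothetical valid row for comparison
    {"order_id": "hypothetical_order", "payment_installments": 3,
     "payment_value": Decimal("100.00")},
]


def revenue(rows):
    """Total payment_value over the given rows."""
    return sum((row["payment_value"] for row in rows), Decimal("0"))


# Option A: drop rows that violate the "at least 1 installment" rule
dropped = [row for row in sample_payments if row["payment_installments"] > 0]
# Option B: keep every row and lift invalid installment counts to 1
fixed = [{**row, "payment_installments": max(row["payment_installments"], 1)}
         for row in sample_payments]

print(revenue(sample_payments), revenue(dropped), revenue(fixed))  # 288.63 100.00 288.63
```

Repairing the rows keeps the revenue total intact, whereas dropping them removes 188.63 from the sum.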


In [46]:
# Filter rows where payment_installments is 0
order_payments_nullified_trimmed.filter(F.col("payment_installments") == 0).toPandas()

Unnamed: 0,order_id,payment_sequential,payment_type,payment_installments,payment_value
0,744bade1fcf9ff3f31d860ace076d422,2,credit_card,0,58.69
1,1a57108394169c0b47d8f876acc9ba2d,2,credit_card,0,129.94


In [26]:
# Specific list of IDs
target_ids = ["744bade1fcf9ff3f31d860ace076d422", "1a57108394169c0b47d8f876acc9ba2d"]

# Flag only the anomalous rows (installments == 0) of the target orders,
# so any other payment rows of the same orders keep their original values
is_anomalous = F.col("order_id").isin(target_ids) & (F.col("payment_installments") == 0)

# Fix payment_sequential first: the flag depends on the original payment_installments
order_payments_fixed_payment_sequential = order_payments_nullified_trimmed.withColumn(
    "payment_sequential",
    F.when(is_anomalous, 1).otherwise(F.col("payment_sequential"))
).withColumn(
    "payment_installments",
    F.when(is_anomalous, 1).otherwise(F.col("payment_installments"))
)

In [27]:
# Validation: Check payment_installments to verify that the fixing process was successful.
order_payments_fixed_payment_sequential.filter(F.col("payment_installments") == 0).toPandas()

Unnamed: 0,order_id,payment_sequential,payment_type,payment_installments,payment_value


### 🥈 Silver Layer: Storing Refined Data
The data cleaning and transformation phase is now complete. In this step, the refined and validated dataset is being persisted into the Silver Layer. This provides a reliable and high-quality foundation for the upcoming Exploratory Data Analysis (EDA) and reporting stages.

In [28]:
# Storing the refined order payments dataset to the Silver Layer in Parquet format
order_payments_fixed_payment_sequential.write.mode("overwrite") \
    .parquet("/opt/examples/datasets/brazilian_ecommerce/silver/olist_order_payments.parquet")

                                                                                

# ORDER REVIEWS

In [29]:
# --- Define Order Reviews Schema ---
order_reviews_schema = StructType([
    StructField("review_id", StringType(), True),
    StructField("order_id", StringType(), True),
    StructField("review_score", IntegerType(), True),
    StructField("review_comment_title", StringType(), True),
    StructField("review_comment_message", StringType(), True),
    StructField("review_creation_date", TimestampType(), True),
    StructField("review_answer_timestamp", TimestampType(), True)
])

# Load the dataset using the manual schema
order_reviews = spark.read.format("csv") \
    .schema(order_reviews_schema) \
    .option("header", "true") \
    .option("sep", ",") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("encoding", "UTF-8") \
    .option("multiLine", "true") \
    .load("/opt/examples/datasets/brazilian_ecommerce/olist_order_reviews_dataset.csv")

In [30]:
# Display Schema
order_reviews.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- review_score: integer (nullable = true)
 |-- review_comment_title: string (nullable = true)
 |-- review_comment_message: string (nullable = true)
 |-- review_creation_date: timestamp (nullable = true)
 |-- review_answer_timestamp: timestamp (nullable = true)



In [31]:
# Show first 5 rows
order_reviews.limit(5).toPandas()

Unnamed: 0,review_id,order_id,review_score,review_comment_title,review_comment_message,review_creation_date,review_answer_timestamp
0,7bc2406110b926393aa56f80a40eba40,73fc7af87114b39712e6da79b0a377eb,4,,,2018-01-18,2018-01-18 21:46:59
1,80e641a11e56f04c1ad469d5645fdfde,a548910a1c6147796b98fdf73dbeba33,5,,,2018-03-10,2018-03-11 03:05:13
2,228ce5500dc1d8e020d8d1322874b6f0,f9e4b658b201a9f2ecdecbb34bed034b,5,,,2018-02-17,2018-02-18 14:36:24
3,e64fb393e7b32834bb789ff8bb30750e,658677c97b385a9be170737859d3511b,5,,Recebi bem antes do prazo estipulado.,2017-04-21,2017-04-21 22:02:06
4,f7c4243c7fe1938f181bec41a392bdeb,8e6bfb81e283fa7e4f11123a3fb894f1,5,,Parabéns lojas lannister adorei comprar pela I...,2018-03-01,2018-03-02 10:26:53


### Trim String Columns

In [32]:
# Cleans the order reviews dataset by removing leading/trailing whitespace and 
# converting empty or blank strings into proper NULL (None) values.
order_reviews_nullified_trimmed = trim_and_nullify_strings(order_reviews)

### Missing Values

In [33]:
# Check for missing values in the order reviews dataset.
order_reviews_missing_values = check_missing_values(order_reviews_nullified_trimmed)
print(order_reviews_missing_values)

                         missing_count  percentage
review_id                            0    0.000000
order_id                             0    0.000000
review_score                         0    0.000000
review_comment_title             87658   88.343546
review_comment_message           58256   58.711602
review_creation_date                 0    0.000000
review_answer_timestamp              0    0.000000


                                                                                

⚠️ Analysis shows that **review_score** has **no missing values**, while the text fields 
**review_comment_title** (about 88.3% NULL) and **review_comment_message** (about 58.7% NULL) are mostly empty.
No imputation is performed; these columns are kept as they are and reserved for future analysis.
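
The percentages in the missing-values table are the missing count divided by the total row count (99,224 reviews). A minimal arithmetic sketch; `missing_percentage` is a hypothetical helper written for this illustration and is not the notebook's `check_missing_values` implementation.

```python
def missing_percentage(missing_count, total_rows):
    """Share of NULL values in a column, as a percentage of all rows."""
    if total_rows == 0:
        return 0.0
    return missing_count / total_rows * 100


TOTAL_REVIEWS = 99224  # row count of the reviews table

title_pct = missing_percentage(87658, TOTAL_REVIEWS)
message_pct = missing_percentage(58256, TOTAL_REVIEWS)

print(round(title_pct, 2), round(message_pct, 2))  # 88.34 58.71
```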

### 📊 Statistical Summary

In [31]:
order_reviews_stats = order_reviews_nullified_trimmed.select(
    "review_score"
).summary("count", "mean", "stddev", "min", "50%", "max")

order_reviews_stats.show()

+-------+------------------+
|summary|      review_score|
+-------+------------------+
|  count|             99224|
|   mean|  4.08642062404257|
| stddev|1.3475791311150984|
|    min|                 1|
|    50%|                 5|
|    max|                 5|
+-------+------------------+



                                                                                

📊 Statistical summary of 'review_score' indicates a valid range [1, 5] with no anomalies or outliers detected.

### 🥈 Silver Layer: Storing Refined Data
The data cleaning and transformation phase is now complete. In this step, the refined and validated dataset is being persisted into the Silver Layer. This provides a reliable and high-quality foundation for the upcoming Exploratory Data Analysis (EDA) and reporting stages.

In [34]:
# Storing the refined order reviews dataset to the Silver Layer in Parquet format
order_reviews_nullified_trimmed.write.mode("overwrite") \
    .parquet("/opt/examples/datasets/brazilian_ecommerce/silver/olist_order_reviews.parquet")

                                                                                

# PRODUCTS

In [35]:
# --- Define Products Schema ---
products_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("product_category_name", StringType(), True),
    StructField("product_name_lenght", IntegerType(), True),
    StructField("product_description_lenght", IntegerType(), True),
    StructField("product_photos_qty", IntegerType(), True),
    StructField("product_weight_g", IntegerType(), True),
    StructField("product_length_cm", IntegerType(), True),
    StructField("product_height_cm", IntegerType(), True),
    StructField("product_width_cm", IntegerType(), True)
])

# Load the dataset using the manual schema
products = spark.read.format("csv") \
    .schema(products_schema) \
    .option("header", "true") \
    .option("sep", ",") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("encoding", "UTF-8") \
    .option("multiLine", "true") \
    .load("/opt/examples/datasets/brazilian_ecommerce/olist_products_dataset.csv")

In [36]:
# Display schema
products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_category_name: string (nullable = true)
 |-- product_name_lenght: integer (nullable = true)
 |-- product_description_lenght: integer (nullable = true)
 |-- product_photos_qty: integer (nullable = true)
 |-- product_weight_g: integer (nullable = true)
 |-- product_length_cm: integer (nullable = true)
 |-- product_height_cm: integer (nullable = true)
 |-- product_width_cm: integer (nullable = true)



In [37]:
# Show first 5 rows
products.limit(5).toPandas()

Unnamed: 0,product_id,product_category_name,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
0,1e9e8ef04dbcff4541ed26657ea517e5,perfumaria,40,287,1,225,16,10,14
1,3aa071139cb16b67ca9e5dea641aaa2f,artes,44,276,1,1000,30,18,20
2,96bd76ec8810374ed1b65e291975717f,esporte_lazer,46,250,1,154,18,9,15
3,cef67bcfe19066a932b7673e239eb23d,bebes,27,261,1,371,26,4,26
4,9dc1a7de274444849c219cff195d0b71,utilidades_domesticas,37,402,4,625,20,17,13


### Trim String Columns

In [38]:
# Cleans the products dataset by removing leading/trailing whitespace and 
# converting empty or blank strings into proper NULL (None) values.
products_nullified_trimmed = trim_and_nullify_strings(products)

### Product Category Name Translation

In [40]:
# --- Define Products Translation Schema ---
products_translation_schema = StructType([
    StructField("product_category_name", StringType(), True),
    StructField("product_category_name_english", StringType(), True),
])

# Load the dataset using the manual schema
products_translation = spark.read.format("csv") \
    .schema(products_translation_schema) \
    .option("header", "true") \
    .option("sep", ",") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("encoding", "UTF-8") \
    .option("multiLine", "true") \
    .load("/opt/examples/datasets/brazilian_ecommerce/product_category_name_translation.csv")

In [41]:
# Display schema
products_translation.printSchema()

root
 |-- product_category_name: string (nullable = true)
 |-- product_category_name_english: string (nullable = true)



In [42]:
# Show first 5 rows
products_translation.limit(5).toPandas()

Unnamed: 0,product_category_name,product_category_name_english
0,beleza_saude,health_beauty
1,informatica_acessorios,computers_accessories
2,automotivo,auto
3,cama_mesa_banho,bed_bath_table
4,moveis_decoracao,furniture_decor


In [43]:
# Apply trimming to remove whitespace and convert empty strings to NULL values
products_translation_nullified_trimmed = trim_and_nullify_strings(products_translation)

### Joining Tables for English Product Names

In [44]:
# Joining tables
products_translated = products_nullified_trimmed.join(
    products_translation_nullified_trimmed, 
    on="product_category_name", 
    how="left"
)

In [45]:
# Preview the results
products_translated.limit(5).toPandas()

Unnamed: 0,product_category_name,product_id,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm,product_category_name_english
0,perfumaria,1e9e8ef04dbcff4541ed26657ea517e5,40,287,1,225,16,10,14,perfumery
1,artes,3aa071139cb16b67ca9e5dea641aaa2f,44,276,1,1000,30,18,20,art
2,esporte_lazer,96bd76ec8810374ed1b65e291975717f,46,250,1,154,18,9,15,sports_leisure
3,bebes,cef67bcfe19066a932b7673e239eb23d,27,261,1,371,26,4,26,baby
4,utilidades_domesticas,9dc1a7de274444849c219cff195d0b71,37,402,4,625,20,17,13,housewares


### Missing Values

In [46]:
# Check for missing values 
products_missing_values = check_missing_values(products_translated)
print(products_missing_values)

                               missing_count  percentage
product_category_name                    610    1.851234
product_id                                 0    0.000000
product_name_lenght                      610    1.851234
product_description_lenght               610    1.851234
product_photos_qty                       610    1.851234
product_weight_g                           2    0.006070
product_length_cm                          2    0.006070
product_height_cm                          2    0.006070
product_width_cm                           2    0.006070
product_category_name_english            623    1.890686


🚨 The product_category_name_english column has **623** missing values, which is **13 more** than the original category column (610). The extra 13 belong to products whose category has no matching row in the translation table, so the left join leaves their English name NULL.

⚠️ Missing values in **product_category_name** and **product_category_name_english** will be handled, as the category is a key feature.
Other columns, such as 'product_name_lenght' or 'product_photos_qty', are ignored for now 
because they are not required for the upcoming steps of this EDA.
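
How a left join can add NULLs on top of those already present can be shown with a tiny plain-Python sketch. The miniature tables and the helper `left_join` are hypothetical; they only mimic the shape of the products and translation tables.

```python
# Hypothetical miniature versions of the two tables
sample_products = [
    {"product_id": "p1", "product_category_name": "perfumaria"},
    {"product_id": "p2", "product_category_name": "pc_gamer"},  # no translation row
    {"product_id": "p3", "product_category_name": None},        # category already missing
]
sample_translation = {"perfumaria": "perfumery"}


def left_join(products, translation):
    """Keep every product; unmatched or NULL categories get None as English name."""
    return [
        {**p, "product_category_name_english": translation.get(p["product_category_name"])}
        for p in products
    ]


joined = left_join(sample_products, sample_translation)

missing_original = sum(p["product_category_name"] is None for p in joined)
missing_english = sum(p["product_category_name_english"] is None for p in joined)
untranslated = sorted({
    p["product_category_name"]
    for p in joined
    if p["product_category_name"] is not None
    and p["product_category_name_english"] is None
})

print(missing_original, missing_english, untranslated)  # 1 2 ['pc_gamer']
```

The difference between the two missing counts equals the number of products whose category exists but has no translation, which is the group to inspect next.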

### Handling Missing Values

In [47]:
# This filter identifies products that have a name in the original language 
# but are missing the English translation.

missing_translations = products_translated.filter(
    F.col('product_category_name').isNotNull() & 
    F.col('product_category_name_english').isNull()
)

# To see the unique values
missing_translations.select('product_category_name').distinct().toPandas()

Unnamed: 0,product_category_name
0,pc_gamer
1,portateis_cozinha_e_preparadores_de_alimentos


🚨 **MANUAL MAPPING:** Fixing the 13 missing English translations.  
The following entries in **product_category_name** are updated in the product_category_name_english column as follows:  
**pc_gamer -> pc_gamer**  
**portateis_cozinha_e_preparadores_de_alimentos -> kitchen_portables_and_food_preparators**

In [48]:
# Updating the English column
products_translated_missing_13 = products_translated.withColumn(
    'product_category_name_english',
    F.when(
        F.col('product_category_name') == 'pc_gamer', 
        'pc_gamer' # Rule 1: If it's pc_gamer, keep it as pc_gamer
    ).when(
        F.col('product_category_name') == 'portateis_cozinha_e_preparadores_de_alimentos', 
        'kitchen_portables_and_food_preparators' # Rule 2: Translate this long name
    ).otherwise(
        F.col('product_category_name_english') # Otherwise: Don't change anything!
    )
)

In [49]:
# Check for missing values 
products_missing_values = check_missing_values(products_translated_missing_13)
print(products_missing_values)

                               missing_count  percentage
product_category_name                    610    1.851234
product_id                                 0    0.000000
product_name_lenght                      610    1.851234
product_description_lenght               610    1.851234
product_photos_qty                       610    1.851234
product_weight_g                           2    0.006070
product_length_cm                          2    0.006070
product_height_cm                          2    0.006070
product_width_cm                           2    0.006070
product_category_name_english            610    1.851234


In [50]:
# Identifying rows where both category columns are empty.
missing_translations = products_translated_missing_13.filter(
    F.col('product_category_name').isNull() & 
    F.col('product_category_name_english').isNull()
)

# Count the rows where both columns are NULL
missing_translations.select('product_category_name').count()

610

There are 610 instances where both fields are missing simultaneously.

In [51]:
# Fill null values with 'unknown' in the specified columns
products_translated_handled_missing = products_translated_missing_13.fillna(
    value='unknown', 
    subset=['product_category_name', 'product_category_name_english']
)

In [52]:
# Validation: Check for product_category_name to verify if the filling process was successful.
products_missing_values = check_missing_values(products_translated_handled_missing)
print(products_missing_values)

                               missing_count  percentage
product_category_name                      0    0.000000
product_id                                 0    0.000000
product_name_lenght                      610    1.851234
product_description_lenght               610    1.851234
product_photos_qty                       610    1.851234
product_weight_g                           2    0.006070
product_length_cm                          2    0.006070
product_height_cm                          2    0.006070
product_width_cm                           2    0.006070
product_category_name_english              0    0.000000


🧩 Missing values in the **product_category_name** and **product_category_name_english** columns are handled.

### Duplicated Rows

In [53]:
# Check for duplicate records in the products dataset
check_duplicates(products_translated_handled_missing)

✅ No exact duplicate rows found. Every row is unique.


                                                                                

### 📊 Statistical Summary

In [54]:
# Select the numerical columns from the products table
products_numerical_stats = products_translated_handled_missing.select(
    "product_name_lenght",
    "product_description_lenght",
    "product_photos_qty",
    "product_weight_g",
    "product_length_cm",
    "product_height_cm",
    "product_width_cm"
).summary("count", "mean", "stddev", "min", "50%", "max")

# Display the statistics
products_numerical_stats.toPandas()

                                                                                

Unnamed: 0,summary,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm
0,count,32341.0,32341.0,32341.0,32949.0,32949.0,32949.0,32949.0
1,mean,48.47694876472589,771.4952846232337,2.18898611669398,2276.472487784152,30.81507784758263,16.937661234028347,23.196728277034204
2,stddev,10.245740725237289,635.1152246349538,1.7367656379315437,4282.038730977024,16.914458054065953,13.637554061749569,12.079047453227794
3,min,5.0,4.0,1.0,0.0,7.0,2.0,6.0
4,50%,51.0,595.0,1.0,700.0,25.0,13.0,20.0
5,max,76.0,3992.0,20.0,40425.0,105.0,105.0,118.0


🔍 Preliminary check of min/max values: most ranges look plausible, but the minimum **product_weight_g** of 0 is not physically meaningful and should be revisited.
A comprehensive statistical analysis will be performed during the EDA stage.

### 🕵️ Final validation: Reviewing product category names to ensure transformations were applied correctly.

In [58]:
# Retrieving unique rows for product names
products_translated_handled_missing.select("product_category_name", "product_category_name_english").distinct().toPandas()

                                                                                

Unnamed: 0,product_category_name,product_category_name_english
0,automotivo,auto
1,pcs,computers
2,eletroportateis,small_appliances
3,moveis_decoracao,furniture_decor
4,fashion_roupa_masculina,fashion_male_clothing
5,beleza_saude,health_beauty
6,informatica_acessorios,computers_accessories
7,livros_interesse_geral,books_general_interest
8,artes,art
9,eletronicos,electronics


✅ Mapping validation successful: every product category now carries an English label, with 'unknown' standing in for missing categories.
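
The visual check above could also be expressed as an assertion-style test. The sketch below is plain Python over (original, English) pairs; the helper `unlabeled_categories` is hypothetical, and the sample pairs are a small subset chosen for illustration.

```python
def unlabeled_categories(pairs):
    """Return original category names whose English label is None or blank."""
    return [original for original, english in pairs
            if english is None or not english.strip()]


# A few (product_category_name, product_category_name_english) pairs
sample_pairs = [
    ("automotivo", "auto"),
    ("pc_gamer", "pc_gamer"),
    ("unknown", "unknown"),
]

print(unlabeled_categories(sample_pairs))  # []
```

An empty list means every category has a usable label; any name returned would need a manual mapping before the Portuguese column is dropped.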

### Dropping the Original Portuguese Product Category Name Column

In [59]:
# Drop the original Portuguese product category name column and rename the English version for consistency.
products_final_clean = products_translated_handled_missing.drop("product_category_name") \
    .withColumnRenamed("product_category_name_english", "product_category_name")

In [60]:
# Finalized Silver Layer dataset for products is ready for EDA.
products_final_clean.limit(10).toPandas()

Unnamed: 0,product_id,product_name_lenght,product_description_lenght,product_photos_qty,product_weight_g,product_length_cm,product_height_cm,product_width_cm,product_category_name
0,1e9e8ef04dbcff4541ed26657ea517e5,40,287,1,225,16,10,14,perfumery
1,3aa071139cb16b67ca9e5dea641aaa2f,44,276,1,1000,30,18,20,art
2,96bd76ec8810374ed1b65e291975717f,46,250,1,154,18,9,15,sports_leisure
3,cef67bcfe19066a932b7673e239eb23d,27,261,1,371,26,4,26,baby
4,9dc1a7de274444849c219cff195d0b71,37,402,4,625,20,17,13,housewares
5,41d3672d4792049fa1779bb35283ed13,60,745,1,200,38,5,11,musical_instruments
6,732bd381ad09e530fe0a5f457d81becb,56,1272,4,18350,70,24,44,cool_stuff
7,2548af3e6e77a690cf3eb6368e9ab61e,56,184,2,900,40,8,40,furniture_decor
8,37cc742be07708b53a98702e77a21a02,57,163,1,400,27,13,17,home_appliances
9,8c92109888e8cdf9d66dc7e463025574,36,1156,1,600,17,10,12,toys


### 🥈 Silver Layer: Storing Refined Data
The data cleaning and transformation phase is now complete. In this step, the refined and validated dataset is being persisted into the Silver Layer. This provides a reliable and high-quality foundation for the upcoming Exploratory Data Analysis (EDA) and reporting stages.

In [61]:
# Storing the refined products dataset to the Silver Layer in Parquet format
products_final_clean.write.mode("overwrite") \
    .parquet("/opt/examples/datasets/brazilian_ecommerce/silver/olist_products.parquet")

                                                                                

✅ All data cleaning and transformation tasks for the Olist E-commerce dataset are finalized. The processed tables have been stored in the Silver Layer.

**Next Step:** Transitioning to **Exploratory Data Analysis (EDA)** to uncover trends, customer behavior patterns, and operational insights. 🔍🚀