# Table of Contents

1. Importing Necessary Packages
2. Shipping Address Data
    - Data Loading and Initial Inspection
    - Null and Invalid ID Filtering
    - Duplicate Record Handling
    - Postal Code Validation and Cleaning
    - Date Field Validation and Correction
    - Customer ID Validation and Cleaning
3. Product Data
    - Loading and Transforming Product Data
4. Region Data
    - Loading Region Data
5. Invoice Data
    - Loading Invoice Data
6. Customer Data
    - Loading Customer Data
7. Summary and Next Steps

*Use the notebook's outline or search to quickly navigate to each section.*

### Importing Necessary Packages

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_json, from_json, map_entries, explode, col, isnan, isnull, when, lower, regexp_replace, coalesce, to_date, lit, regexp_extract, concat_ws, trim
from pyspark.sql.types import MapType, StringType
import pandas as pd

In [None]:
# Stop any existing Spark session to avoid conflicts
if 'spark' in globals():
    spark.stop()

# Create a new Spark session with specific configurations.
# Note: in local mode the executors run inside the driver JVM, so spark.executor.memory
# has little effect; spark.driver.memory and spark.jars.packages only take effect when
# the JVM is first launched in this Python process.
spark = SparkSession.builder \
    .appName("ExplorationApp") \
    .master("local[*]") \
    .config("spark.sql.debug.maxToStringFields", "100") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "2g") \
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.15.0") \
    .getOrCreate()


### Shipping Address Data: Loading, Analysis, and Cleaning Approach

The shipping address dataset is first loaded and explored to uncover discrepancies and data quality issues. The cleaning process follows a systematic approach:

1. Attribute Analysis: Each attribute (column) is examined to identify potential inconsistencies, invalid values, or formatting issues.
2. Dirty Data Subset: A subset of the data containing only the problematic ("dirty") records is created for focused analysis.
3. Cleaning Logic Development: Appropriate cleaning logic is developed and tested on the dirty subset to ensure it addresses the identified issues.
4. Validation: The cleaning logic is validated on the subset to confirm it produces the expected results.
5. Full DataFrame Application: Once validated, the cleaning logic is applied to the entire dataset to standardize and correct all records. 


This iterative process helps ensure that the cleaning steps are robust and targeted, and that they do not introduce unintended side effects. Each major cleaning step is documented with its findings and rationale for transparency and reproducibility.
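The five steps above can be sketched as a reusable pattern: write each cleaning rule once as a function, validate it on the dirty subset, then apply the identical function to the full data. The sketch below is an illustration under assumptions, not the notebook's code: it uses plain Python lists of dicts in place of Spark DataFrames so the rule can be unit-tested without a Spark session, the names `is_valid_id`, `drop_invalid_ids` and `develop_then_apply` are hypothetical, and the rule shown mirrors the digits-only `id` filter used later.

```python
import re
from typing import Callable

Row = dict[str, object]

def is_valid_id(row: Row) -> bool:
    """A row is valid when 'id' is a non-null string made only of digits."""
    value = row.get("id")
    return isinstance(value, str) and re.fullmatch(r"[0-9]+", value) is not None

def drop_invalid_ids(rows: list[Row]) -> list[Row]:
    """The cleaning rule under development: keep only rows with a valid id."""
    return [row for row in rows if is_valid_id(row)]

def develop_then_apply(rows: list[Row],
                       is_dirty: Callable[[Row], bool],
                       clean: Callable[[list[Row]], list[Row]],
                       is_valid: Callable[[Row], bool]) -> list[Row]:
    # Step 2: isolate the dirty subset
    dirty = [row for row in rows if is_dirty(row)]
    # Steps 3-4: run the rule on the subset and confirm every surviving row is valid
    cleaned_subset = clean(dirty)
    if not all(is_valid(row) for row in cleaned_subset):
        raise ValueError("cleaning rule failed on the dirty subset")
    # Step 5: apply the identical rule to the full data set
    return clean(rows)

# Illustrative rows (not taken from the dataset)
rows = [{"id": "1001"}, {"id": None}, {"id": "10A5"}, {"id": "1002"}]
result = develop_then_apply(rows, lambda r: not is_valid_id(r), drop_invalid_ids, is_valid_id)
print(result)  # [{'id': '1001'}, {'id': '1002'}]
```

In the notebook itself, the same idea would mean defining each cleaning step as a function that takes and returns a DataFrame and calling it on both the dirty subset and `shipping_address_df`, instead of repeating the column expressions in two cells.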

In [None]:
# Loading shipping address data
shipping_address_file_path = "data/shippuingaddress_20240521.csv.csv"
address_rdd = spark.sparkContext.textFile(shipping_address_file_path)

# Skip the first 6 rows using zipWithIndex to get zero-based indices
shipping_address_rdd = address_rdd.zipWithIndex() \
    .filter(lambda x: x[1] >= 6) \
    .map(lambda x: x[0])

shipping_address_df = spark.read.options(header='true', inferSchema='true').csv(shipping_address_rdd)

shipping_address_df.printSchema()
shipping_address_df.show(truncate=False)

In [None]:
# Check for null values in the 'id' column
shipping_address_df.filter(col("id").isNull()).show(truncate=False)

In [None]:
# Drop rows where 'id' is null
shipping_address_df = shipping_address_df.dropna(subset=["id"])

In [None]:
# Find ids that are NOT all digits
invalid_id_df = shipping_address_df.filter(~col("id").rlike("^[0-9]+$"))
invalid_id_df.show(truncate=False)

In [None]:
# Keep only rows where 'id' is all digits
shipping_address_df = shipping_address_df.filter(col("id").rlike("^[0-9]+$"))

In [None]:
# Find duplicate ids
duplicate_ids = shipping_address_df.groupBy("id").count().filter(col("count") > 1).select("id")
duplicate_records = shipping_address_df.join(duplicate_ids, on="id", how="inner")
duplicate_records.show(truncate=False)

##### Summary of Findings: Duplicate `id` Records
Analysis of the duplicate records identified the following:
- **ID 102553:** the clean version is `id=102553`, `customerid=LB-16795`, `city=Antananarivo` (country Madagascar). This record serves as the canonical record among all variants of ID 102553.
- **ID 102608:** one representative instance will be retained through systematic selection.

**Production Implementation Rules**
1. **Complete Match Preservation**  
   Where all duplicate rows for an `id` match exactly across every column, a single copy is kept.

2. **Partial Duplicate Removal**  
   Where duplicate rows for an `id` match on some but not all attributes, every variant is dropped unless a verified canonical record exists (as for ID 102553), in which case only that record is kept.

3. **Quality Assurance**  
   This conservative approach ensures:
   - Consistent data integrity
   - Elimination of ambiguous records
   - Reliable downstream processing

**Technical Justification**

The strategy prioritizes data quality over completeness to:
- Avoid propagation of inconsistent records
- Maintain clean master data
- Support reliable analytics and operations
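The duplicate-handling rules above can be expressed as a small, testable function. This is a plain-Python sketch of the stated policy, not the notebook's Spark code: `resolve_duplicates` is a hypothetical name, rows are dicts, and a canonical record is selected by a per-id predicate (mirroring the `city`/`country` filter used for ID 102553). The sample rows are illustrative only.

```python
from collections import defaultdict
from typing import Callable

Row = dict[str, object]

def resolve_duplicates(rows: list[Row],
                       canonical: dict[str, Callable[[Row], bool]]) -> list[Row]:
    """Apply the duplicate policy to rows grouped by 'id'.

    - an id with one row, or with rows identical in every column, keeps one row;
    - an id whose rows differ keeps only the first row matching its canonical
      predicate, if one is registered;
    - any other partially duplicated id is dropped entirely.
    """
    groups: dict[object, list[Row]] = defaultdict(list)
    for row in rows:
        groups[row["id"]].append(row)

    kept: list[Row] = []
    for key, group in groups.items():  # dicts preserve first-seen order
        if all(row == group[0] for row in group):
            kept.append(group[0])
        elif key in canonical:
            kept.extend([row for row in group if canonical[key](row)][:1])
    return kept

rows = [
    {"id": "1", "city": "City A", "country": "X"},
    {"id": "2", "city": "Antananarivo", "country": "Madagascar"},
    {"id": "2", "city": "Antananarivo", "country": "Madagasca"},
    {"id": "3", "city": "City B", "country": "Y"},
    {"id": "3", "city": "City B", "country": "Y"},
    {"id": "4", "city": "City C", "country": "Z"},
    {"id": "4", "city": "City D", "country": "Z"},
]
canonical = {"2": lambda r: r["city"] == "Antananarivo" and r["country"] == "Madagascar"}
deduped = resolve_duplicates(rows, canonical)
# ids "1" and "3" keep one row, id "2" keeps its canonical row, id "4" is dropped
```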

In [None]:
# Canonical record for id=102553 (city=Antananarivo, country=Madagascar)
canonical_102553 = shipping_address_df.filter(
    (col("id") == "102553") & (col("city") == "Antananarivo") & (col("country") == "Madagascar")
)

# Keep exactly one record for id=102608; sorting first makes the choice deterministic
# (limit() on an unordered DataFrame may return a different row on each action)
one_102608 = shipping_address_df.filter(col("id") == "102608") \
    .orderBy(*shipping_address_df.columns).limit(1)

# Combine the selected records
final_selection = canonical_102553.unionByName(one_102608)

# Remove every record whose id is duplicated; a left anti join does not depend on
# column order, unlike exceptAll()
shipping_address_no_dupes = shipping_address_df.join(duplicate_ids, on="id", how="left_anti")

# Add back only the desired records (matched by column name)
shipping_address_df = shipping_address_no_dupes.unionByName(final_selection)

shipping_address_df.show(truncate=False)

In [None]:
# Filter for postal_code that is NOT 4 or 5 digits
invalid_postal_code_df = shipping_address_df.filter(~col("postal_code").rlike("^[0-9]{4,5}$"))
invalid_postal_code_df.show(truncate=False)

##### Summary of Findings After Postal Code Filtering

After filtering the shipping address data for invalid postal codes (i.e., those not matching a 4- or 5-digit numeric pattern), several key observations were made:

- **Identification of Invalid Postal Codes:**  
  - Rows whose postal codes do not conform to the expected 4- or 5-digit numeric format were isolated. These include values containing non-numeric characters and values formatted as dates (e.g., "DD/MM/YYYY"). Null postal codes are not captured by this filter, because `rlike` on a null value returns null rather than true.
  - In some rows, the postal code was wrongly inserted into another column.

- **Data Quality Issues:**  
  The presence of non-standard postal codes highlights inconsistencies in the data entry process. Some records use date formats or other unexpected values in the postal code field, which can hinder downstream processing and analytics.

- **Country and State Normalization:**  
  To address ambiguity, additional logic was applied to create a new column, `country_new`. When applied to the full DataFrame, rows whose postal code is already valid (or null) keep their original `country`; the remaining rows follow these rules:
  - If the normalized (lowercased and accent-removed) city and state are identical, the country value is retained.
  - Otherwise, if the postal code does not match a date format, the postal code value itself is used, on the assumption that the country name was shifted into `postal_code`.
  - Otherwise, the state value is used.

- **Impact on Data Integrity:**  
  This filtering and normalization process improves the reliability of the address data by:
  - Flagging and isolating problematic records for further review or correction.
  - Providing a more consistent and meaningful value in the `country_new` column, which can be used for subsequent analysis or reporting.
  - Creating a `postalcode_new` column by scanning the other attributes (`city`, `state`, `country`, `effstart`, `effend`, `streetadd`, in that order) for a misplaced 4- or 5-digit postal code.
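The `country_new` and `postalcode_new` rules above can be restated row by row in plain Python, which makes them easy to unit-test without Spark. This is a sketch under assumptions: rows are dicts, the function names are hypothetical, and the logic mirrors the full-DataFrame cell (rows with a valid or null postal code keep their `country`). Accent handling lowercases first and then maps each accented letter to its own base letter, so "Medellín" and "MEDELLIN" compare equal. The sample rows are invented for illustration.

```python
import re
from typing import Optional

POSTAL_RE = re.compile(r"[0-9]{4,5}")
DATE_RE = re.compile(r"\d{2}/\d{2}/\d{4}")
# Lowercase accented letters mapped to their base letter
ACCENTS = str.maketrans("áéíóúüñ", "aeiouun")
# Columns scanned, in order, for a misplaced postal code
SCAN_ORDER = ["city", "state", "country", "effstart", "effend", "streetadd"]

def matches(pattern: re.Pattern, value: Optional[str]) -> bool:
    """True when value is non-null and the whole string matches pattern."""
    return value is not None and pattern.fullmatch(value) is not None

def normalize(text: Optional[str]) -> Optional[str]:
    """Lowercase first, then strip the listed accents."""
    return None if text is None else text.lower().translate(ACCENTS)

def postalcode_new(row: dict) -> Optional[str]:
    if matches(POSTAL_RE, row.get("postal_code")):
        return row["postal_code"]
    for column in SCAN_ORDER:
        if matches(POSTAL_RE, row.get(column)):
            return row[column]
    return None

def country_new(row: dict) -> Optional[str]:
    postal = row.get("postal_code")
    # Valid or missing postal code: treat the row as not shifted
    if postal is None or matches(POSTAL_RE, postal):
        return row.get("country")
    city, state = normalize(row.get("city")), normalize(row.get("state"))
    if city is not None and city == state:
        return row.get("country")
    # A non-date value in postal_code is taken to be the shifted country name
    if not matches(DATE_RE, postal):
        return postal
    return row.get("state")

shifted_row = {"postal_code": "Peru", "city": "Lima", "state": "Lima Region",
               "country": "15001", "effstart": "01/01/2020", "effend": "31/12/2020",
               "streetadd": None}
accented_row = {"postal_code": "12/05/2021", "city": "Medellín", "state": "MEDELLIN",
                "country": "Colombia"}
print(country_new(shifted_row), postalcode_new(shifted_row))    # Peru 15001
print(country_new(accented_row), postalcode_new(accented_row))  # Colombia None
```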




In [None]:
from pyspark.sql.functions import translate

# Raw strings avoid Python's invalid-escape warning for "\d"
DATE_PATTERN = r"^\d{2}/\d{2}/\d{4}$"
POSTAL_PATTERN = r"^[0-9]{4,5}$"

# Normalize city and state: lowercase, then map each accented letter to its base letter
city_norm = translate(lower(col("city")), "áéíóúüñ", "aeiouun")
state_norm = translate(lower(col("state")), "áéíóúüñ", "aeiouun")

invalid_postal_code_df = invalid_postal_code_df.withColumn(
    "country_new",
    when(city_norm == state_norm, col("country"))
    .when(~col("postal_code").rlike(DATE_PATTERN), col("postal_code"))
    .otherwise(col("state"))
)

# Use postal_code if valid; otherwise take the first column holding a 4-5 digit value
invalid_postal_code_df = invalid_postal_code_df.withColumn(
    "postalcode_new",
    when(col("postal_code").rlike(POSTAL_PATTERN), col("postal_code"))
    .otherwise(
        coalesce(*[
            when(col(c).rlike(POSTAL_PATTERN), col(c))
            for c in ["city", "state", "country", "effstart", "effend", "streetadd"]
        ])
    )
)
invalid_postal_code_df.show(truncate=False)

In [None]:
from pyspark.sql.functions import translate

# Apply the same logic to the entire DataFrame
DATE_PATTERN = r"^\d{2}/\d{2}/\d{4}$"
POSTAL_PATTERN = r"^[0-9]{4,5}$"

# Normalize city and state: lowercase, then map each accented letter to its base letter
city_norm = translate(lower(col("city")), "áéíóúüñ", "aeiouun")
state_norm = translate(lower(col("state")), "áéíóúüñ", "aeiouun")

# Rows whose postal code is already valid (or null) keep their original country
shipping_address_df = shipping_address_df.withColumn(
    "country_new",
    when(col("postal_code").rlike(POSTAL_PATTERN) | col("postal_code").isNull(), col("country"))
    .when(city_norm == state_norm, col("country"))
    .when(~col("postal_code").rlike(DATE_PATTERN), col("postal_code"))
    .otherwise(col("state"))
)

# Use postal_code if valid; otherwise take the first column holding a 4-5 digit value
shipping_address_df = shipping_address_df.withColumn(
    "postalcode_new",
    when(col("postal_code").rlike(POSTAL_PATTERN), col("postal_code"))
    .otherwise(
        coalesce(*[
            when(col(c).rlike(POSTAL_PATTERN), col(c))
            for c in ["city", "state", "country", "effstart", "effend", "streetadd"]
        ])
    )
)
shipping_address_df.show(truncate=False)

In [None]:
# Filter for effstart values that are NOT in the required DD/MM/YYYY format
invalid_effstart_df = shipping_address_df.filter(~col("effstart").rlike(r"^\d{2}/\d{2}/\d{4}$"))
invalid_effstart_df.show(truncate=False)

##### Summary of Findings After Filtering `effstart`
During the validation of the `effstart` column, only one record was found that did not conform to the expected date format (`DD/MM/YYYY`). A closer inspection of this record revealed a pattern of misplacement among the date-related columns:

- The value in `effstart` was not a valid date.
- The value in `effend` was a valid date, and it appeared to be earlier than the value in `streetadd`, which was also a valid date.
- This sequence suggested that the values for `effstart`, `effend`, and `streetadd` had been shifted by one column.

**Business Reasoning:**
- The logical relationship between these columns is `effstart` (start date) < `effend` (end date), while `streetadd` should hold a street address rather than a date.
- In the affected record, the start/end pair sits one column to the right (in `effend` and `streetadd`), indicating that the data was likely misaligned during entry or import.
- This conclusion is supported by the fact that only one record is affected and its values match what would be expected if the columns had been shifted right by one position.

**Remediation Plan:**
- The affected record will be corrected by shifting the values back to their appropriate columns.
- `effstart` will be explicitly cast or converted to the correct data type to ensure consistency and prevent similar issues in downstream processing.

**Impact:**
- This targeted correction will restore data integrity for the affected record and reinforce the reliability of the date fields for subsequent analysis.
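The shift described above is order-sensitive: whether a row is misaligned must be decided from the original `effstart` before any column is overwritten. Otherwise, once `effstart` receives the date from `effend`, a re-test of `effstart` sees a valid date and the remaining moves are skipped. The plain-Python sketch below (hypothetical function name, illustrative row) makes that decision once, reads only from the original row, and writes into a copy.

```python
import re
from typing import Optional

DATE_RE = re.compile(r"\d{2}/\d{2}/\d{4}")

def is_date(value: Optional[str]) -> bool:
    return value is not None and DATE_RE.fullmatch(value) is not None

def shift_dates_back(row: dict) -> dict:
    """Undo a one-column right shift of effstart/effend/streetadd."""
    # Decide once, from the original effstart; a null effstart is left alone
    shifted = row.get("effstart") is not None and not is_date(row["effstart"])
    fixed = dict(row)
    if shifted:
        fixed["effstart"] = row["effend"]    # start date had moved into effend
        fixed["effend"] = row["streetadd"]   # end date had moved into streetadd
        fixed["streetadd"] = None            # as in the notebook, address set to null
    return fixed

bad = {"effstart": "not-a-date", "effend": "01/02/2019", "streetadd": "31/12/2019"}
print(shift_dates_back(bad))
# {'effstart': '01/02/2019', 'effend': '31/12/2019', 'streetadd': None}
```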

In [None]:
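# Clean up the shifted effstart, effend and streetadd columns.
# The "row is shifted" flag is computed once, BEFORE any column is overwritten;
# re-testing effstart after it has been replaced would see a valid date and skip
# the effend/streetadd moves.
date_pattern = r"^\d{2}/\d{2}/\d{4}$"
invalid_effstart_df = (
    invalid_effstart_df
    .withColumn("is_shifted", ~col("effstart").rlike(date_pattern))
    .withColumn("effstart", when(col("is_shifted"), col("effend")).otherwise(col("effstart")))
    .withColumn("effend", when(col("is_shifted"), col("streetadd")).otherwise(col("effend")))
    .withColumn("streetadd", when(col("is_shifted"), lit(None).cast("string")).otherwise(col("streetadd")))
    .drop("is_shifted")
)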

invalid_effstart_df.show(truncate=False)

In [None]:
# Implement the same logic on the original DataFrame, again computing the
# shift flag before any column is overwritten
date_pattern = r"^\d{2}/\d{2}/\d{4}$"
shipping_address_df = (
    shipping_address_df
    .withColumn("is_shifted", ~col("effstart").rlike(date_pattern))
    .withColumn("effstart", when(col("is_shifted"), col("effend")).otherwise(col("effstart")))
    .withColumn("effend", when(col("is_shifted"), col("streetadd")).otherwise(col("effend")))
    .withColumn("streetadd", when(col("is_shifted"), lit(None).cast("string")).otherwise(col("streetadd")))
    .drop("is_shifted")
)

shipping_address_df.show(truncate=False)
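The remediation plan for `effstart` also calls for converting it to a proper date type, which the cells in this section do not yet do. In PySpark this would typically be `to_date(col("effstart"), "dd/MM/yyyy")` (`to_date` is already imported). As a hedged, Spark-free illustration, the sketch below parses `DD/MM/YYYY` strings with the standard library and checks that a start date is not after its end date; `parse_ddmmyyyy` and `dates_in_order` are hypothetical helpers.

```python
from datetime import date, datetime
from typing import Optional

def parse_ddmmyyyy(value: Optional[str]) -> Optional[date]:
    """Parse a DD/MM/YYYY string; return None for missing or impossible dates."""
    if value is None:
        return None
    try:
        return datetime.strptime(value, "%d/%m/%Y").date()
    except ValueError:
        return None

def dates_in_order(effstart: Optional[str], effend: Optional[str]) -> bool:
    """True when both values parse and the start is not after the end."""
    start, end = parse_ddmmyyyy(effstart), parse_ddmmyyyy(effend)
    return start is not None and end is not None and start <= end

print(parse_ddmmyyyy("21/05/2024"))                # 2024-05-21
print(parse_ddmmyyyy("31/02/2024"))                # None (no 31 February)
print(dates_in_order("01/02/2019", "31/12/2019"))  # True
```

Note that `strptime` also accepts single-digit days and months such as `1/2/2019`, so it is more lenient than the `^\d{2}/\d{2}/\d{4}$` regex used in the notebook; apply the regex check first if strictness matters.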

In [None]:
# Filter for customerid values that do NOT match the required AA-12345 format
invalid_customerid_df = shipping_address_df.filter(~col("customerid").rlike(r"^[A-Za-z]{2}-\d+$"))
invalid_customerid_df.show(truncate=False)

##### Summary of Findings After Customer ID Validation and Cleaning

During the analysis of the `customerid` field, several records were identified that did not conform to the expected format (`AA-12345`, where `AA` is a two-letter code and the rest is a numeric identifier). The following issues and cleaning steps were observed and addressed:

**Findings:**
- Some records contained extra or malformed values appended to the `customerid` field, such as trailing text or misplaced data.
- In certain cases, the value that should have been in another column (e.g., `city` or `streetadd`) was incorrectly appended to the `customerid`.
- A few records had missing or null values in the `city` column, with the misplaced value only present in the malformed `customerid`.
- Occasionally, the `effend` field did not contain a valid date, and the misplaced value needed to be reassigned.

**Cleaning Logic:**
- The valid portion of the `customerid` (matching the pattern `^[A-Za-z]{2}-\d+$`) was extracted and retained in the `customerid` column.
- Any value trailing the valid pattern was extracted as a separate string.
- If the `city` column was null and a trailing value was present, this value was moved to the `city` column.
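- If `effend` did not match a date format and a trailing value was present, the `effend` value and the trailing value were joined with a space and written to the `streetadd` column. This rule is applied independently of the `city` rule.
- The helper column used for extraction was dropped after reassignment.

**Impact:**
- This approach restored the integrity of the `customerid` field: every non-null value containing a valid `AA-12345`-style identifier now holds only that identifier. Values with no such identifier become empty strings, because `regexp_extract` returns an empty string when the pattern does not match.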
- Misplaced data was reassigned to its appropriate column, reducing ambiguity and improving the overall quality of the dataset.
- The cleaning logic was designed to be robust, handling multiple edge cases and minimizing data loss.

This process ensures that the `customerid` field is reliable for downstream processing and that related address fields are as accurate as possible.
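The customer ID cleaning rules above can be sketched row by row in plain Python. The function name and sample rows are hypothetical. Like the Spark cells, the sketch applies the `city` and `streetadd` rules independently; unlike them, it leaves a `customerid` with no valid `AA-12345`-style prefix unchanged instead of extracting or blanking it.

```python
import re
from typing import Optional

# Valid prefix (two letters, hyphen, digits) followed by any trailing text
CUSTOMER_ID_RE = re.compile(r"([A-Za-z]{2}-\d+)(.*)")
DATE_RE = re.compile(r"\d{2}/\d{2}/\d{4}")

def clean_customer_id(row: dict) -> dict:
    """Split customerid into its valid prefix and trailing text, then reassign."""
    fixed = dict(row)
    match = CUSTOMER_ID_RE.match(row.get("customerid") or "")
    if match is None:
        return fixed  # no valid prefix: left unchanged in this sketch
    valid_id, trailing = match.group(1), match.group(2)
    fixed["customerid"] = valid_id
    if trailing:
        # Rule 1: a null city receives the trailing value
        if row.get("city") is None:
            fixed["city"] = trailing
        # Rule 2 (independent of rule 1): a non-date effend and the trailing
        # value, joined with a space, become streetadd
        effend: Optional[str] = row.get("effend")
        if effend is not None and DATE_RE.fullmatch(effend) is None:
            fixed["streetadd"] = f"{effend} {trailing}"
    return fixed

r1 = clean_customer_id({"customerid": "AB-12345Springfield", "city": None,
                        "effend": "01/01/2020", "streetadd": "1 High St"})
r2 = clean_customer_id({"customerid": "CD-678Main Road", "city": "Town",
                        "effend": "12", "streetadd": None})
print(r1)
print(r2)
```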

In [None]:
# Extract the value that comes after the valid customerid pattern
invalid_customerid_df = invalid_customerid_df.withColumn(
    "customerid_stripped",
    regexp_extract(col("customerid"), r"^[A-Za-z]{2}-\d+(.*)$", 1)
)

# If city is null, put the stripped value in city
invalid_customerid_df = invalid_customerid_df.withColumn(
    "city",
    when(col("city").isNull() & (col("customerid_stripped") != ""), col("customerid_stripped")).otherwise(col("city"))
)

# If effend is not a date, join effend and the stripped value with a space and write the result to streetadd
invalid_customerid_df = invalid_customerid_df.withColumn(
    "streetadd",
    when(
        ~col("effend").rlike(r"^\d{2}/\d{2}/\d{4}$") & (col("customerid_stripped") != ""),
        concat_ws(" ", col("effend"), col("customerid_stripped"))
    ).otherwise(col("streetadd"))
)
# Keep only the valid customerid pattern in customerid
invalid_customerid_df = invalid_customerid_df.withColumn(
    "customerid",
    regexp_extract(col("customerid"), r"([A-Za-z]{2}-\d+)", 1)
)
# Drop the customerid_stripped column
invalid_customerid_df = invalid_customerid_df.drop("customerid_stripped")
# Show the final DataFrame with invalid customerid corrections
invalid_customerid_df.show(truncate=False)

In [None]:
# Apply the same logic to the original DataFrame
shipping_address_df = shipping_address_df.withColumn(
    "customerid_stripped",
    regexp_extract(col("customerid"), r"^[A-Za-z]{2}-\d+(.*)$", 1)
)
shipping_address_df = shipping_address_df.withColumn(
    "city",
    when(col("city").isNull() & (col("customerid_stripped") != ""), col("customerid_stripped")).otherwise(col("city"))
) 
shipping_address_df = shipping_address_df.withColumn(
    "streetadd",
    when(
        ~col("effend").rlike(r"^\d{2}/\d{2}/\d{4}$") & (col("customerid_stripped") != ""),
        concat_ws(" ", col("effend"), col("customerid_stripped"))
    ).otherwise(col("streetadd"))
)
shipping_address_df = shipping_address_df.withColumn(
    "customerid",
    regexp_extract(col("customerid"), r"([A-Za-z]{2}-\d+)", 1)
)
shipping_address_df = shipping_address_df.drop("customerid_stripped")
shipping_address_df.show(truncate=False)    

In [None]:
# Filter for effend values that are NOT in the required DD/MM/YYYY format
invalid_effend_df = shipping_address_df.filter(~col("effend").rlike(r"^\d{2}/\d{2}/\d{4}$"))
invalid_effend_df.show(truncate=False)

### Findings: Invalid `effend` Field Analysis

After filtering the `effend` column to identify values that do not match the expected date format (`DD/MM/YYYY`), it was observed that the resulting records correspond to those previously identified as having dirty or malformed `customerid` values.

#### Key Observations
- The records with invalid `effend` values are the same as those affected by earlier data misalignment, where values were shifted between columns (notably, `customerid`, `city`, and `streetadd`).
- In these cases, the dirty value found in `effend` is actually the value that originally belonged in the `streetadd` column. This value has already been moved to its correct place during the previous transformation and cleaning steps for `customerid`.

#### Cleaning Decision
- Since the dirty records in `effend` are remnants of the earlier misalignment and have already been addressed, all such invalid `effend` values will be set to `null`.
- This ensures that the `effend` column contains only valid date values or is left empty (null) where appropriate, maintaining data integrity and consistency.

#### Impact
- This approach prevents the propagation of misaligned or duplicate data in the `effend` column.
- The dataset is now more reliable for downstream processing and analysis, with `effend` holding only valid `DD/MM/YYYY` strings or nulls.
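Nulling out invalid `effend` values pairs naturally with a post-check that no invalid non-null values remain. In Spark that check could be `shipping_address_df.filter(~col("effend").rlike(r"^\d{2}/\d{2}/\d{4}$")).count() == 0`. The plain-Python sketch below shows the same two steps on illustrative rows; `null_if_not_date` and `remaining_invalid` are hypothetical helper names.

```python
import re
from typing import Optional

DATE_RE = re.compile(r"\d{2}/\d{2}/\d{4}")

def null_if_not_date(value: Optional[str]) -> Optional[str]:
    """Keep DD/MM/YYYY strings unchanged; turn any other value into None."""
    if value is not None and DATE_RE.fullmatch(value):
        return value
    return None

def remaining_invalid(rows: list[dict], column: str) -> list[str]:
    """Post-check: non-null values in `column` that still fail the pattern."""
    return [row[column] for row in rows
            if row.get(column) is not None and not DATE_RE.fullmatch(row[column])]

# Illustrative rows: one valid date, one misplaced address fragment, one null
rows = [{"effend": "31/12/2020"}, {"effend": "12 Main Road"}, {"effend": None}]
print(remaining_invalid(rows, "effend"))     # ['12 Main Road']
cleaned = [{**row, "effend": null_if_not_date(row["effend"])} for row in rows]
print(remaining_invalid(cleaned, "effend"))  # []
```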

In [None]:
# Apply the logic explained above to clean the effend column
invalid_effend_df = invalid_effend_df.withColumn(
    "effend",
    when(~col("effend").rlike(r"^\d{2}/\d{2}/\d{4}$"), lit(None)).otherwise(col("effend")))
invalid_effend_df.show(truncate=False)

In [None]:
# Implement the same logic on the original DataFrame
shipping_address_df = shipping_address_df.withColumn(
    "effend",
    when(~col("effend").rlike(r"^\d{2}/\d{2}/\d{4}$"), lit(None)).otherwise(col("effend"))
)
shipping_address_df.show(truncate=False)    

In [None]:
# printing all the values in the 'country_new' column
print("Distinct values in 'country_new':")
for row in shipping_address_df.select('country_new').distinct().collect():
    print(row['country_new'])

### Findings and Recommendations: Country Column Analysis

#### Findings

- The analysis of the `country` column revealed significant inconsistencies in how country names are recorded. Examples include:
  - Multiple variants for the same country, such as:  
    - United States, US, USA, United States of America  
    - Alger, Algeria  
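    - Republic of the Congo, Democratic Republic of the Congo (these are two distinct countries, so merging them, as the mapping cell below does, should be confirmed against the source system before it is relied on)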
    - NZ, New Zealand  
    - UK, United Kingdom
  - Some entries contain extraneous characters preceding the country name, for example: `""Australia`.

#### Remediation Plan

- **Standardization:**  
  All country name variants will be consolidated into a single, standardized form for each country.
- **Cleaning:**  
  Any leading or extraneous characters before the country name will be stripped to ensure clean and consistent entries.

#### Recommendations for Future Data Collection

- **Lookup Table:**  
  Instead of manually reviewing distinct country values, a lookup table (mapping known variants to standardized names) should be used to efficiently identify and correct inconsistencies.
- **Data Entry Controls:**  
  To prevent such inconsistencies in production, implement data validation measures such as providing a predefined list of countries for users to select from, rather than allowing free-text entry.

This approach will improve data quality, facilitate accurate analysis, and reduce the risk of similar issues in future data collection.
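The lookup-table recommendation above can be sketched as a dictionary from lowercased variants to standardized names, combined with the same leading-character cleanup. This is a hypothetical illustration: `COUNTRY_LOOKUP` and `standardize_country` are invented names, and the entries only mirror variants listed above. The Congo variants are deliberately left out, because Republic of the Congo and Democratic Republic of the Congo are distinct countries.

```python
import re
from typing import Optional

# Hypothetical lookup table: lowercased variant -> standardized name.
# Extend it as new variants are discovered.
COUNTRY_LOOKUP = {
    "us": "United States of America",
    "usa": "United States of America",
    "united states": "United States of America",
    "alger": "Algeria",
    "nz": "New Zealand",
    "uk": "United Kingdom",
}

def standardize_country(raw: Optional[str]) -> Optional[str]:
    """Strip leading non-letters and whitespace, then map known variants."""
    if raw is None:
        return None
    cleaned = re.sub(r"^[^A-Za-z]*", "", raw).strip()
    return COUNTRY_LOOKUP.get(cleaned.lower(), cleaned)

for value in ['""Australia', "USA", " uk ", "Algeria"]:
    print(repr(value), "->", standardize_country(value))
```

In Spark, the same table could be turned into a small DataFrame (for example `spark.createDataFrame(list(COUNTRY_LOOKUP.items()), ["variant", "standard"])`) and left-joined on the lowercased `country_new`, so that adding a variant means adding a row rather than another `when` branch.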

In [None]:
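# Strip leading non-letter characters (e.g. stray quotes) and surrounding whitespace
shipping_address_df = shipping_address_df.withColumn(
    "country_new",
    trim(regexp_replace(col("country_new"), r"^[^A-Za-z]*", "")))

# Map known variants to a single standardized name (a CASE WHEN chain)
shipping_address_df = shipping_address_df.withColumn(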
    "country_new",
    when(col("country_new").isin("US", "USA", "United States"), "United States of America")
    .when(col("country_new") == "Alger", "Algeria")
    .when(col("country_new").isin("Republic of the Congo", "Congo"), "Democratic Republic of the Congo")
    .when(col("country_new") == "NZ", "New Zealand")
    .when(col("country_new") == "UK", "United Kingdom")
    .otherwise(col("country_new"))
)

# Show the distinct standardized country values
shipping_address_df.select("country_new").distinct().show(150, truncate=False)

### Product Data

In [None]:
# Load product data

product_file_path = "data/product.json"
product_single_json_df = spark.read.option("multiLine", "true").json(product_file_path)

# Show the first few records.
product_single_json_df.show(truncate=False)

# Print the schema.
product_single_json_df.printSchema()

The JSON file is structured as a single JSON object with several keys. Each key appears to hold a dictionary whose keys are record IDs and whose values are the corresponding values for that field. To work with the data as individual product records, these map columns need to be "flattened" (exploded) into rows and joined back together on the ID.

In [None]:
# We convert the struct to JSON and then parse it as a MapType(StringType(), StringType())
product_transformed_df = product_single_json_df.select(
    from_json(to_json(col("Category")), MapType(StringType(), StringType())).alias("CategoryMap"),
    from_json(to_json(col("Product_ID")), MapType(StringType(), StringType())).alias("Product_IDMap"),
    from_json(to_json(col("Product_Name")), MapType(StringType(), StringType())).alias("Product_NameMap"),
    from_json(to_json(col("Sub-Category")), MapType(StringType(), StringType())).alias("Sub_CategoryMap")
)

product_transformed_df.printSchema()
product_transformed_df.show(truncate=False)

# Now, for each map column, we convert map entries into separate rows
cat_df = product_transformed_df.select(explode(map_entries(col("CategoryMap"))).alias("kv_cat")) \
            .select(col("kv_cat.key").alias("id"),
                    col("kv_cat.value").alias("Category"))

pid_df = product_transformed_df.select(explode(map_entries(col("Product_IDMap"))).alias("kv_pid")) \
            .select(col("kv_pid.key").alias("id"),
                    col("kv_pid.value").alias("Product_ID"))

pname_df = product_transformed_df.select(explode(map_entries(col("Product_NameMap"))).alias("kv_pname")) \
            .select(col("kv_pname.key").alias("id"),
                    col("kv_pname.value").alias("Product_Name"))

sub_df = product_transformed_df.select(explode(map_entries(col("Sub_CategoryMap"))).alias("kv_sub")) \
            .select(col("kv_sub.key").alias("id"),
                    col("kv_sub.value").alias("Sub_Category"))

# Join all the exploded DataFrames on the common key "id".
product_df = cat_df.join(pid_df, "id") \
                 .join(pname_df, "id") \
                 .join(sub_df, "id")

product_df.show(truncate=False)
product_df.printSchema()
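To make the reshaping above concrete, the sketch below pivots a miniature, made-up version of the column-oriented layout into one record per ID using only the standard library. It mirrors the Spark approach of exploding each map and joining on `id`: because those joins are inner joins, an ID missing from any field is dropped, and the sketch reproduces that by keeping only IDs present in every field. `columns_to_records` is a hypothetical name.

```python
import json

# Made-up miniature of the column-oriented layout: field -> {row id -> value}
raw = json.loads("""
{
  "Category":     {"0": "Furniture", "1": "Technology", "2": "Office Supplies"},
  "Product_ID":   {"0": "P-0",       "1": "P-1",        "2": "P-2"},
  "Product_Name": {"0": "Chair",     "1": "Phone"},
  "Sub-Category": {"0": "Chairs",    "1": "Phones",     "2": "Paper"}
}
""")

def columns_to_records(columnar: dict[str, dict[str, str]]) -> list[dict[str, str]]:
    """Pivot {field: {id: value}} into one record per id.

    Only ids present in every field are kept, matching inner-join semantics.
    """
    field_maps = list(columnar.values())
    common_ids = set(field_maps[0]).intersection(*field_maps[1:])
    return [
        {"id": row_id, **{field: values[row_id] for field, values in columnar.items()}}
        for row_id in sorted(common_ids)  # sorted as strings, for a stable order
    ]

records = columns_to_records(raw)
for record in records:
    print(record)
# id "2" is dropped because it has no Product_Name
```

If dropping incomplete IDs is not desired, the Spark joins could use `how="left"` or `how="outer"` instead, leaving missing fields as nulls.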

In [None]:
# Load region data
region_file_path = "data/regiontxt"
region_df= spark.read.option("delimiter", "\t").option("header", 'true').option("inferSchema", "true").csv(region_file_path)
region_df.show(truncate=False)
region_df.printSchema()

In [None]:
# Load invoice data
invoice_file_path = "data/invoice.xml"

# Read the XML file
invoice_df = spark.read.format("xml") \
    .option("rowTag", "row") \
    .load(invoice_file_path)

# Show DataFrame
invoice_df.show(truncate=False)
invoice_df.printSchema()

In [None]:
# Load Customer data
customer_file_path = "data/cust.xlsx"
customer_pd_df = pd.read_excel(customer_file_path)
customer_df = spark.createDataFrame(customer_pd_df)
customer_df.show(truncate=False)
customer_df.printSchema()

