In [1]:
import pyspark  
sc = pyspark.SparkContext('local[*]') 
# do something to prove it works 
rdd = sc.parallelize(range(1000)) 
rdd.takeSample(False, 5) 

[400, 329, 43, 34, 557]

In [3]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()
# accounts
df_clean_file = spark.read \
    .option("header", "true")  \
    .option("inferSchema", "true")  \
    .csv("clean_me.csv", sep=";")

df_clean_file.count()

499999

Write a function to format the file in a way that each appropriate value will correspond to only one column. If the data itself contains the splitter comma, it should be quoted by double quotes.  

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, split, concat, when

import pandas as pd  # Note: This import is not used in the provided code. Consider removing it.

def format_file():
    """
    Read a CSV file, format its content and return a DataFrame with the desired columns.
    """
    # Read the text file
    raw_file = spark.read.text("clean_me.csv")
    
    # Splitting the columns
    formatted_df = raw_file.withColumn("splitted_column", split(col("value"), ","))
    
    # Extracting fields from the splitted_column
    formatted_df = (formatted_df
                    .withColumn("order_id", col("splitted_column")[0])
                    .withColumn("delivery_company", col("splitted_column")[1])
                    .withColumn("quantity", when(col("splitted_column")[2].cast("int").isNotNull(), col("splitted_column")[2])
                                .otherwise(lit(0)))
                    .withColumn("price", when(col("splitted_column")[4].cast("int").isNotNull(),
                                               concat(col("splitted_column")[3], lit("."), col("splitted_column")[4]))
                                .otherwise(col("splitted_column")[3]))
                    .withColumn("ordered_date", when(col("splitted_column")[4].cast("int").isNotNull(), col("splitted_column")[5])
                                .otherwise(col("splitted_column")[4]))
                    .withColumn("address", concat(col("splitted_column")[6], lit(","), col("splitted_column")[7]))
                    )
    
    # Removing the header and selecting desired columns
    result_df = (formatted_df.select("order_id", "delivery_company", "quantity", "price", "ordered_date", "address")
                 .filter(col("order_id") != "order_id"))
    
    return result_df

# Creating a SparkSession
spark = SparkSession.builder.appName("Read CSV Example").getOrCreate()

# Calling the function and showing the result
format_file().show(truncate=False)


+--------+----------------+--------+--------+------------+-------------------------------+
|order_id|delivery_company|quantity|price   |ordered_date|address                        |
+--------+----------------+--------+--------+------------+-------------------------------+
|1       | delivery_comp_1| 1      | 245. 52| 9-2-2022   | Cedar Lane Houston, CA 90001  |
|2       |delivery_comp_2 |2       |114.77  |null        |Main Street,New York CA 60601  |
|3       |delivery_comp_3 |0       |739.43  |14-3-2022   |Main Street,Chicago TX 10001   |
|4       |delivery_comp_0 |1       |878.93  |20/4/2022   |null                           |
|5       |delivery_comp_1 |2       |481.44  |null        |Maple Drive Chicago,FL 60601   |
|6       |delivery_comp_2 |0       |78.13   |null        |Main Street,Houston NY 77001   |
|7       |delivery_comp_3 |1       |832.17  |20-2-2022   |null                           |
|8       |delivery_comp_0 |2       |687.8   |1/4/2022    |Maple Drive,Los Angeles        |

Write a function to unify column data values, ensure that: 
* Prices are truly double. 
* Order dates have the dd-MM-yyyy format. 
* Addresses have the $street, $city, $state, and $zipCode format. 

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, isnan, when, count, regexp_extract

# Initialize Spark session
spark = SparkSession.builder.appName("data_validation").getOrCreate()

# Assuming data is loaded into a DataFrame named df
df = format_file()

def get_df_file_format(df):
    # 1. Ensure all order_id are unique
    if df.count() != df.select("order_id").distinct().count():
        print("Duplicate order_ids found!")
    
    # 2. Check delivery_company format
    invalid_delivery_comp = df.filter(~col("delivery_company").rlike("^delivery_comp_[0-9]+$")).count()
    if invalid_delivery_comp > 0:
        print(f"Found {invalid_delivery_comp} rows with invalid delivery_company format!")
    
    # 3. Check quantity is non-negative
    if df.filter(col("quantity") < 0).count() > 0:
        print("Negative quantity values found!")
    
    # 4. Check if prices are truly doubles and positive
    if df.filter(~col("price").cast("double").isNotNull() | (col("price") <= 0)).count() > 0:
        print("Invalid price values found!")
    
    # 5. Standardize and validate date format
    df = df.withColumn("ordered_date", 
                       when(col("ordered_date").rlike("^[0-9]{1,2}-[0-9]{1,2}-[0-9]{4}$"), 
                            regexp_extract(col("ordered_date"), "([0-9]{1,2}-[0-9]{1,2}-[0-9]{4})", 1))
                       .otherwise(
                            regexp_extract(col("ordered_date"), "([0-9]{1,2}/[0-9]{1,2}/[0-9]{4})", 1)))
    if df.filter((col("ordered_date") == "") & (~col("ordered_date").isNull())).count() > 0:
        print("Invalid date formats found!")
    
    # 6. Check for null addresses
    if df.filter(col("address").isNull()).count() > 0:
        print("Null address values found!")
    
    print("Validation complete.")
    
    return df


get_df_file_format(df).show()

Found 1 rows with invalid delivery_company format!
Invalid price values found!
Invalid date formats found!
Null address values found!
Validation complete.
+--------+----------------+--------+--------+------------+--------------------+
|order_id|delivery_company|quantity|   price|ordered_date|             address|
+--------+----------------+--------+--------+------------+--------------------+
|       1| delivery_comp_1|       1| 245. 52|            | Cedar Lane Houst...|
|       2| delivery_comp_2|       2|  114.77|            |Main Street,New Y...|
|       3| delivery_comp_3|       0|  739.43|   14-3-2022|Main Street,Chica...|
|       4| delivery_comp_0|       1|  878.93|   20/4/2022|                null|
|       5| delivery_comp_1|       2|  481.44|            |Maple Drive Chica...|
|       6| delivery_comp_2|       0|   78.13|            |Main Street,Houst...|
|       7| delivery_comp_3|       1|  832.17|   20-2-2022|                null|
|       8| delivery_comp_0|       2|   687.8|

Write a function to resolve missing/error-prone values. It is required to: 
Replace missing quantity values with average values for each delivery company 
Replace a null date in the ordered_date column with the following chronological date after a valid date for orders fulfilled by the same delivery company. 

In [5]:
from pyspark.sql.functions import avg, last, coalesce
from pyspark.sql.window import Window
import sys


fomated_df = get_df_file_format(format_file())

def format_quantity(formated_df):
    avg_quantity_df = formated_df.groupBy("delivery_company").agg(avg("quantity").alias("avg_quantity"))
    return formated_df.join(avg_quantity_df, "delivery_company", "inner") \
        .withColumn("quantity", coalesce(col("quantity"), col("avg_quantity"))).drop("avg_quantity")

def fill_nulls_ordered_date(format_quantity):
    # Order by delivery_company and ordered_date to make sure the 'last' function gets the right value
    window_spec = Window.partitionBy("delivery_company").rowsBetween(-sys.maxsize, 0)    
    return format_quantity.withColumn("ordered_date",
                                     when(col("ordered_date") != "", col("ordered_date")).otherwise(lit(None))).withColumn("ordered_date",
                                      coalesce(col("ordered_date"), last("ordered_date", True).over(window_spec)))

df_fill_nulls_ordered_data = format_quantity(fomated_df)

fill_nulls_ordered_date(df_fill_nulls_ordered_data).show()


Found 1 rows with invalid delivery_company format!
Invalid price values found!
Invalid date formats found!
Null address values found!
Validation complete.
+----------------+--------+--------+------+------------+--------------------+
|delivery_company|order_id|quantity| price|ordered_date|             address|
+----------------+--------+--------+------+------------+--------------------+
| delivery_comp_1|       5|       2|481.44|        null|Maple Drive Chica...|
| delivery_comp_1|       9|       0|338.44|   13/4/2022|Cedar Lane Miami,...|
| delivery_comp_1|      13|       1|939.99|   13/4/2022| Main Street,Chicago|
| delivery_comp_1|      17|       2| 533.6|    2-3-2022|    Houston,TX 90001|
| delivery_comp_1|      21|       0|111.18|    4-2-2022|Maple Drive Los A...|
| delivery_comp_1|      25|       1|118.47|   15/4/2022|Main Street,Los A...|
| delivery_comp_1|      29|       2|749.48|   17-3-2022|Oak Avenue,Housto...|
| delivery_comp_1|      33|       0|713.49|    9/2/2022|         