In [1]:
import pyspark  
sc = pyspark.SparkContext('local[*]') 
# do something to prove it works 
rdd = sc.parallelize(range(1000)) 
rdd.takeSample(False, 5) 

[773, 546, 975, 55, 797]

Write a function to format the file in a way that each appropriate value will correspond to only one column. If the data itself contains the splitter comma, it should be quoted by double quotes.  

In [6]:
from pyspark.sql.functions import col, lit, regexp_replace, split, concat, when,expr
from pyspark.sql import SparkSession
import pandas as pd

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()

def get_format_file():
    # Read the text file
    file_read = spark.read.text("clean_me.csv")
    # Format changues
    splited_df_1 = file_read.withColumn("splited_colum",split(file_read["value"],","))
    splited_df_1 = splited_df_1.withColumn("order_id", col("splited_colum")[0])
    splited_df_1 = splited_df_1.withColumn("delivery_company", col("splited_colum")[1])
    splited_df_1 = splited_df_1.withColumn("quantity", when(col("splited_colum")[2].cast("int").isNotNull(),
                                                            col("splited_colum")[2]) \
                                                            .otherwise(lit(0))) 
    splited_df_1 = splited_df_1.withColumn("price",  when(col("splited_colum")[4].cast("int").isNotNull(),
                                                        concat(col("splited_colum")[3],lit("."),col("splited_colum")[4])) 
                                                        .otherwise(col("splited_colum")[3])) 
    splited_df_1 = splited_df_1.withColumn("ordered_date", when(col("splited_colum")[4].cast("int").isNotNull(),
                                                        concat(col("splited_colum")[5])) \
                                                        .otherwise(col("splited_colum")[4]))
    
    splited_df_1 = splited_df_1.withColumn("address", concat(col("splited_colum")[6],lit(","),col("splited_colum")[7]))
    # Delete first record that was as header
    final_df = splited_df_1.select("order_id","delivery_company","quantity","price","ordered_date","address") \
        .filter(col("order_id") != "order_id")
    # Format changues
    return final_df
    
# Call the function to execute the code
get_format_file().show()

+--------+----------------+--------+--------+------------+--------------------+
|order_id|delivery_company|quantity|   price|ordered_date|             address|
+--------+----------------+--------+--------+------------+--------------------+
|       1| delivery_comp_1|       1| 245. 52|    9-2-2022| Cedar Lane Houst...|
|       2| delivery_comp_2|       2|  114.77|        null|Main Street,New Y...|
|       3| delivery_comp_3|       0|  739.43|   14-3-2022|Main Street,Chica...|
|       4| delivery_comp_0|       1|  878.93|   20/4/2022|                null|
|       5| delivery_comp_1|       2|  481.44|        null|Maple Drive Chica...|
|       6| delivery_comp_2|       0|   78.13|        null|Main Street,Houst...|
|       7| delivery_comp_3|       1|  832.17|   20-2-2022|                null|
|       8| delivery_comp_0|       2|   687.8|    1/4/2022|Maple Drive,Los A...|
|       9| delivery_comp_1|       0|  338.44|   13/4/2022|Cedar Lane Miami,...|
|      10| delivery_comp_2|       1|  46

Write a function to unify column data values, ensure that: 
* Prices are truly double. 
* Order dates have the dd-MM-yyyy format. 
* Addresses have the $street, $city, $state, and $zipCode format. 

In [8]:

from pyspark.sql.functions import date_format,col,size

data_set = get_format_file()

def get_format_address(value):
    # Method that formats addresses
    split_chars = [" ",","]
    
    column_splited = split(value,"["+"".join(split_chars)+"]")
 
    splited = when(size(column_splited)== 5, 
                   concat(column_splited[0],
                   lit(" "),
                   column_splited[1],
                   lit(","),
                   column_splited[2],
                   lit(","),
                   column_splited[3],
                   lit(","),
                   column_splited[4])) \
    .otherwise(when(size(column_splited)== 4, 
                   concat(column_splited[0],
                   lit(","),
                   column_splited[1],
                   lit(","),
                   column_splited[2],
                   lit(","),
                   column_splited[3])) \
    .otherwise(when(size(column_splited)== 3, 
                   concat(
                   column_splited[0],
                   lit(","),
                   column_splited[1],
                   lit(","),
                   column_splited[2])) \
    .otherwise(when(size(column_splited)== 2, 
                   concat(
                   column_splited[0],
                   lit(","),
                   column_splited[1])))))    
    return splited

    
def get_format_date(value):
    # format to handle date formats
    splited = when(value.like("%-%"), split(value,"-")).otherwise(split(value,"/"))
    return concat(splited[2],lit("-"),splited[1],lit("-"),splited[0])


def cast_prices_double(data_set):
    # format to cast double 
    return data_set.withColumn("price",col("price").cast("double"))

def cast_date_format(data_set):
    # format to handle date formats 
    return data_set.withColumn("ordered_date",
                               date_format(get_format_date(col("ordered_date")),"dd-MM-yyyy"))

def cast_address_format(data_set):
    return data_set.withColumn("address", get_format_address(col("address")))


def get_final_format():
    # method that applies all transformations
    data_set_1 = cast_prices_double(data_set)
    
    data_set_2 = cast_date_format(data_set_1)
    
    data_set_3 = cast_address_format(data_set_2)
    
    return data_set_3



get_final_format().show()




+--------+----------------+--------+------+------------+--------------------+
|order_id|delivery_company|quantity| price|ordered_date|             address|
+--------+----------------+--------+------+------------+--------------------+
|       1| delivery_comp_1|       1|  null|        null|                null|
|       2| delivery_comp_2|       2|114.77|        null|                null|
|       3| delivery_comp_3|       0|739.43|  14-03-2022|Main Street,Chica...|
|       4| delivery_comp_0|       1|878.93|  20-04-2022|                null|
|       5| delivery_comp_1|       2|481.44|        null|Maple Drive,Chica...|
|       6| delivery_comp_2|       0| 78.13|        null|Main Street,Houst...|
|       7| delivery_comp_3|       1|832.17|  20-02-2022|                null|
|       8| delivery_comp_0|       2| 687.8|  01-04-2022|Maple,Drive,Los,A...|
|       9| delivery_comp_1|       0|338.44|  13-04-2022|Cedar Lane,Miami,...|
|      10| delivery_comp_2|       1|461.33|        null|    Chic

write a function to resolve missing/error-prone values. It is required to: 
* Replace missing quantity values with average values for each delivery company 
* Replace a null date in the ordered_date column with the following chronological date after a valid date for orders fulfilled by the same delivery company. 

In [12]:
from pyspark.sql.functions import avg, last
from pyspark.sql.window import Window

fomated_df = get_final_format()

def format_quantity(formated_df):
    return formated_df.join(formated_df
                            .groupBy("delivery_company")
                            .agg(avg("quantity").alias("avg_quantity")),"delivery_company","inner") \
    .withColumn("quantity", 
                when(col("quantity")!=0, 
                     col("quantity"))
                .otherwise(col("avg_quantity"))).drop("avg_quantity")

def fill_nulls_ordered_date(format_quantity):
    window_spec = Window.partitionBy("delivery_company")

    return format_quantity.withColumn("ordered_date",
                                      when(col("ordered_date").isNotNull(), 
                                           col("ordered_date"))
                                      .otherwise(last("ordered_date", True).over(window_spec)))
        
df_fill_nulls_ordered_data = format_quantity(fomated_df)

fill_nulls_ordered_date(df_fill_nulls_ordered_data).show()


+----------------+--------+------------------+------+------------+--------------------+
|delivery_company|order_id|          quantity| price|ordered_date|             address|
+----------------+--------+------------------+------+------------+--------------------+
| delivery_comp_1|       5|                 2|481.44|  09-04-2022|Maple Drive,Chica...|
| delivery_comp_1|       9|1.0000080000640006|338.44|  13-04-2022|Cedar Lane,Miami,...|
| delivery_comp_1|      13|                 1|939.99|  09-04-2022| Main,Street,Chicago|
| delivery_comp_1|      17|                 2| 533.6|  02-03-2022|    Houston,TX,90001|
| delivery_comp_1|      21|1.0000080000640006|111.18|  04-02-2022|                null|
| delivery_comp_1|      25|                 1|118.47|  15-04-2022|Main,Street,Los,A...|
| delivery_comp_1|      29|                 2|749.48|  17-03-2022|Oak Avenue,Housto...|
| delivery_comp_1|      33|1.0000080000640006|713.49|  09-02-2022|                null|
| delivery_comp_1|      37|     