# 1. Read "clean_me.csv" file with marked unknown columns.

In [4]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Spark CSV file reader") \
    .getOrCreate()

filePath = "input_csv_files/clean_me.csv"

# Read CSV file with the specified options
clean_meDF = spark.read \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .csv(filePath)

# Register the DataFrame as a SQL temporary view
clean_meDF.createOrReplaceTempView("clean_me")

clean_meDF.show()


+--------+----------------+--------+------+------------+-------------------+--------------------+--------------------+-----+-----+
|order_id|delivery_company|quantity| price|ordered_date|            address|                  x1|                  x2|   x3|   x4|
+--------+----------------+--------+------+------------+-------------------+--------------------+--------------------+-----+-----+
|       1| delivery_comp_1|       1| 245.0|          52|           9/2/2022|  Cedar Lane Houston|            CA 90001| null| null|
|       2| delivery_comp_2|       2| 114.0|          77|               null|         Main Street|   New York CA 60601| null| null|
|       3| delivery_comp_3|    null| 739.0|          43|          14-3-2022|         Main Street|    Chicago TX 10001| null| null|
|       4| delivery_comp_0|       1|878.93|   20/4/2022|         Oak Avenue|Los Angeles FL 90001|                null| null| null|
|       5| delivery_comp_1|       2| 481.0|          44|               null| Maple 

# 2. CSV Format Fixing

In [8]:
fixedFormatDF = spark.sql(
    "SELECT " +
      "order_id, " +
      "delivery_company, " +
      "CASE " +
        "WHEN (quantity IS NULL OR quantity IN ('1', '2', 'NA', 'null', '#NA', 'NaN', 'NULL')) AND ordered_date NOT LIKE '%-%' AND ordered_date NOT LIKE '%/%' " +
        "THEN ordered_date " +
        "ELSE quantity " +
      "END AS quantity, " +
      "price, " +
      "CASE " +
        "WHEN ordered_date NOT LIKE '%-%' AND ordered_date NOT LIKE '%/%' AND address = 'null' " +
        "THEN address " +
        "WHEN ordered_date NOT LIKE '%-%' AND ordered_date NOT LIKE '%/%' AND (address LIKE '%-%' OR address LIKE '%/%') " +
        "THEN address " +
        "ELSE ordered_date " +
      "END AS ordered_date, " +
      "CASE " +
        "WHEN CONTAINS( " +
          "CONCAT(IFNULL(x1,''), ' ', IFNULL(x2,''), ' ', IFNULL(x3,''), ' ', IFNULL(x4,'')), " + 
          "CASE " +
            "WHEN address != 'null' AND address NOT LIKE '%-%' AND address NOT LIKE '%/%' " +
            "THEN address " +
            "ELSE x1 " +
          "END) = TRUE " +
        "THEN CONCAT(IFNULL(x1,''), ' ', IFNULL(x2,''), ' ', IFNULL(x3,''), ' ', IFNULL(x4,'')) " +
        "ELSE CONCAT( " +
          "CASE " +
            "WHEN address != 'null' AND address NOT LIKE '%-%' AND address NOT LIKE '%/%' " +
            "THEN address " +
            "ELSE x1 " +
          "END, " + 
          "' ', " +
          "CONCAT(IFNULL(x1,''), ' ', IFNULL(x2,''), ' ', IFNULL(x3,''), ' ', IFNULL(x4,''))) " +
      "END AS address " +
    "FROM clean_me"
)

fixedFormatDF.show(20, truncate=False)

+--------+----------------+--------+------+------------+----------------------------------+
|order_id|delivery_company|quantity|price |ordered_date|address                           |
+--------+----------------+--------+------+------------+----------------------------------+
|1       |delivery_comp_1 |52      |245.0 |9/2/2022    |Cedar Lane Houston CA 90001       |
|2       |delivery_comp_2 |77      |114.0 |null        |Main Street New York CA 60601     |
|3       |delivery_comp_3 |43      |739.0 |14-3-2022   |Main Street Chicago TX 10001      |
|4       |delivery_comp_0 |1       |878.93|20/4/2022   |Oak Avenue Los Angeles FL 90001   |
|5       |delivery_comp_1 |44      |481.0 |null        |Maple Drive Chicago FL 60601      |
|6       |delivery_comp_2 |13      |78.0  |null        |Main Street Houston NY 77001      |
|7       |delivery_comp_3 |1       |832.17|20-2-2022   |Oak Avenue New York CA 10001      |
|8       |delivery_comp_0 |8       |687.0 |1/4/2022    |Maple Drive Los Angeles 

# 3. Data Types Fixing

In [15]:
fixedFormatDF.createOrReplaceTempView("fixed_format_df")

dataTypeFixingDF = spark.sql(
    "SELECT " +
      "order_id, " +
      "delivery_company, " +
      "quantity, " +
      "CAST(price AS DECIMAL(9,2)) AS price, " +
      "CASE " +
        "WHEN ordered_date != 'null' " +
        "THEN replace(ordered_date, '/', '-') " +
        "ELSE ordered_date " +
      "END AS ordered_date, " +
      "address " +
    "FROM fixed_format_df"
)

dataTypeFixingDF.show(50, truncate=False)

+--------+----------------+--------+------+------------+-----------------------------------+
|order_id|delivery_company|quantity|price |ordered_date|address                            |
+--------+----------------+--------+------+------------+-----------------------------------+
|1       |delivery_comp_1 |52      |245.00|9-2-2022    |Cedar Lane Houston CA 90001        |
|2       |delivery_comp_2 |77      |114.00|null        |Main Street New York CA 60601      |
|3       |delivery_comp_3 |43      |739.00|14-3-2022   |Main Street Chicago TX 10001       |
|4       |delivery_comp_0 |1       |878.93|20-4-2022   |Oak Avenue Los Angeles FL 90001    |
|5       |delivery_comp_1 |44      |481.00|null        |Maple Drive Chicago FL 60601       |
|6       |delivery_comp_2 |13      |78.00 |null        |Main Street Houston NY 77001       |
|7       |delivery_comp_3 |1       |832.17|20-2-2022   |Oak Avenue New York CA 10001       |
|8       |delivery_comp_0 |8       |687.00|1-4-2022    |Maple Drive Lo

# 4. Missing Values Handling

In [16]:
# Register the DataFrame as a SQL temporary view
dataTypeFixingDF.createOrReplaceTempView("data_type_fixing")

## • Replace missing quantity values with average values for each delivery company

In [18]:
avgQuantityByDeliveryCompanyDF = spark.sql(
    "SELECT " +
      "delivery_company, " +
      "AVG(quantity) AS avg_quantity " +
    "FROM data_type_fixing " +
    "WHERE ISNAN(quantity) != 1 AND quantity IS NOT null AND quantity NOT IN ('null', 'NULL', 'NaN', '#NA', 'NA') " +
    "GROUP BY delivery_company"
)

avgQuantityByDeliveryCompanyDF.show()

+----------------+------------------+
|delivery_company|      avg_quantity|
+----------------+------------------+
| delivery_comp_1|34.771702111136996|
| delivery_comp_3| 34.76116785079929|
| delivery_comp_2|  34.6890914946307|
| delivery_comp_0|34.621575989158686|
+----------------+------------------+



In [19]:
# Register the DataFrame as a SQL temporary view
avgQuantityByDeliveryCompanyDF.createOrReplaceTempView("avg_quantity_by_delivery_company")

In [20]:
missingQuantityValuesDF = spark.sql(
    "SELECT " +
      "dtf.order_id, " +
      "dtf.delivery_company," +
      "CASE " +
        "WHEN ISNAN(dtf.quantity) != 0 OR dtf.quantity IS null OR dtf.quantity IN ('null', 'NULL', 'NaN', '#NA', 'NA') " +
        "THEN aq.avg_quantity " +
        "ELSE dtf.quantity " +
      "END AS quantity, " +
      "dtf.price, " +
      "dtf.ordered_date, " +
      "dtf.address " +
    "FROM data_type_fixing AS dtf " +
      "LEFT OUTER JOIN avg_quantity_by_delivery_company AS aq ON aq.delivery_company = dtf.delivery_company"
)

missingQuantityValuesDF.show(50, truncate=False)

+--------+----------------+------------------+------+------------+-----------------------------------+
|order_id|delivery_company|quantity          |price |ordered_date|address                            |
+--------+----------------+------------------+------+------------+-----------------------------------+
|1       |delivery_comp_1 |52                |245.00|9-2-2022    |Cedar Lane Houston CA 90001        |
|2       |delivery_comp_2 |77                |114.00|null        |Main Street New York CA 60601      |
|3       |delivery_comp_3 |43                |739.00|14-3-2022   |Main Street Chicago TX 10001       |
|4       |delivery_comp_0 |1                 |878.93|20-4-2022   |Oak Avenue Los Angeles FL 90001    |
|5       |delivery_comp_1 |44                |481.00|null        |Maple Drive Chicago FL 60601       |
|6       |delivery_comp_2 |13                |78.00 |null        |Main Street Houston NY 77001       |
|7       |delivery_comp_3 |1                 |832.17|20-2-2022   |Oak Ave

## • Replace a null date in the ordered_date column with the following chronological date after a valid date for orders fulfilled by the same delivery company.

In [22]:
from pyspark.sql.functions import when, lit, col
from pyspark.sql.types import StringType

# Convert the values in 'ordered_date' column to null if they are "null" strings
setAsNullDF = missingQuantityValuesDF \
    .withColumn("ordered_date", when(col("ordered_date") == "null", lit(None).cast(StringType())).otherwise(col("ordered_date")))

# Register the DataFrame as a temporary SQL view
setAsNullDF.createOrReplaceTempView("setAsNullDF")


In [23]:
replaceNullDateDF = spark.sql(
    "SELECT " +
        "sn.order_id, " +
        "sn.delivery_company, " +
        "sn.quantity, " +
        "sn.price, " +
        "COALESCE(cte.ordered_date, cte.previous_ordered_date) AS ordered_date, " +
        "sn.address " +
    "FROM setAsNullDF AS sn " +
        "INNER JOIN ( " +
            "SELECT " +
                "order_id, " +
                "delivery_company, " +
                "ordered_date, " +
                "LAG(ordered_date) OVER (PARTITION BY delivery_company ORDER BY order_id) AS previous_ordered_date " +
            "FROM " +
                "setAsNullDF " +
        ") AS cte ON cte.order_id = sn.order_id " +
    "ORDER BY sn.order_id"
).show(50, truncate=False)

+--------+----------------+------------------+------+------------+-----------------------------------+
|order_id|delivery_company|quantity          |price |ordered_date|address                            |
+--------+----------------+------------------+------+------------+-----------------------------------+
|1       |delivery_comp_1 |52                |245.00|9-2-2022    |Cedar Lane Houston CA 90001        |
|2       |delivery_comp_2 |77                |114.00|null        |Main Street New York CA 60601      |
|3       |delivery_comp_3 |43                |739.00|14-3-2022   |Main Street Chicago TX 10001       |
|4       |delivery_comp_0 |1                 |878.93|20-4-2022   |Oak Avenue Los Angeles FL 90001    |
|5       |delivery_comp_1 |44                |481.00|9-2-2022    |Maple Drive Chicago FL 60601       |
|6       |delivery_comp_2 |13                |78.00 |null        |Main Street Houston NY 77001       |
|7       |delivery_comp_3 |1                 |832.17|20-2-2022   |Oak Ave

## • Set the previous valid record as the end of the month If the next valid record is missing. 

<span style="color:red; font-weight:bold">
    FYI. The last sub-task is not clear even based on the sugessted example in the description! I was just thinking aproximatly for 2 hours looking for approach on how to start doing that.
</span>.