In [1]:
import pyspark  
sc = pyspark.SparkContext('local[*]') 
# do something to prove it works 
rdd = sc.parallelize(range(1000)) 
rdd.takeSample(False, 5)

[673, 33, 118, 314, 563]

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CSV File Cleanup") \
    .getOrCreate()

In [3]:
df = spark.read.csv("clean_me.csv",sep="|",header=True)


In [4]:
column_name = df.columns[0]

In [5]:
column_name

'order_id,delivery_company,quantity,price,ordered_date,address,,,,'

In [6]:
df = df.withColumnRenamed(column_name, "col")

In [7]:
df.show(5)

+--------------------+
|                 col|
+--------------------+
|1,delivery_comp_1...|
|2,delivery_comp_2...|
|3,delivery_comp_3...|
|4,delivery_comp_0...|
|5,delivery_comp_1...|
+--------------------+
only showing top 5 rows



In [8]:
df.createOrReplaceTempView("temp")

In [9]:
# Take the first 3 columns

In [10]:
clean_first2 = spark.sql("""
 SELECT SPLIT(col, ',')[0] as order_id,
        SPLIT(col, ',')[1] as delivery_company,
        SPLIT(col, ',')[2] as quantity,
        CONCAT_WS(',', SPLIT(col, ',')[3],
        SPLIT(col, ',')[4], SPLIT(col, ',')[5],
        SPLIT(col, ',')[6], SPLIT(col, ',')[7],
        SPLIT(col, ',')[8], SPLIT(col, ',')[9]) as tail
 FROM temp 
""")

In [11]:
clean_first2.createOrReplaceTempView("tmp2")

In [12]:
# Extract the floating digits separated by , and add . to them

In [13]:
from  pyspark.sql.functions import split, regexp_replace, regexp_extract

df_clean = clean_first2.withColumn("tail",
                        regexp_replace("tail",r'(\d+),(\d+),',r'\1.\2,'))

In [14]:
df_clean.createOrReplaceTempView("tmp3")

In [15]:
# Modify the date

In [16]:
df_clean2 = spark.sql("""
   SELECT order_id, delivery_company, quantity, 
   SPLIT(tail, ',')[0] as price,
   SPLIT(tail, ',')[1] as ordered_date,
        CONCAT_WS(',', SPLIT(tail, ',')[2],
        SPLIT(tail, ',')[3], SPLIT(tail, ',')[4],
        SPLIT(tail, ',')[5]) as tail
   FROM (
    SELECT order_id, delivery_company, quantity, regexp_replace(tail, "-", '/') as tail
      from tmp3 ) tb
""")

In [17]:
# Extract the zipcode and state. I use tail as the part of the text left unprocessed

In [18]:
df_clean2 = (df_clean2
             .withColumn("zipcode", regexp_extract("tail", r"(\d+)", 0))
             .withColumn("state", regexp_extract("tail", r"\b([A-Z]{2})\b", 0)))


In [19]:
df_clean2.show(5)

+--------+----------------+--------+------+------------+--------------------+-------+-----+
|order_id|delivery_company|quantity| price|ordered_date|                tail|zipcode|state|
+--------+----------------+--------+------+------------+--------------------+-------+-----+
|       1| delivery_comp_1|       1|   1.2|    9/2/2022|Cedar Lane Housto...|  90001|   CA|
|       2| delivery_comp_2|       2|   1.2|        null|Main Street,New Y...|  60601|   CA|
|       3| delivery_comp_3|    null|   1.2|   14/3/2022|Main Street,Chica...|  10001|   TX|
|       4| delivery_comp_0|       1|878.93|   20/4/2022|Oak Avenue,Los An...|  90001|   FL|
|       5| delivery_comp_1|       2|   1.2|        null|Maple Drive Chica...|  60601|   FL|
+--------+----------------+--------+------+------------+--------------------+-------+-----+
only showing top 5 rows



In [20]:
df_clean2.createOrReplaceTempView("tmp4")

In [21]:
df_clean3 = spark.sql("""
   SELECT order_id, delivery_company, CAST(quantity AS INTEGER) as quantity,price,state,
   TO_DATE(ordered_date,"d/m/yyyy") as ordered_date,zipcode,
   SPLIT(tail, ',')[0] as street,
   SPLIT(tail, ',')[1] as city,tail
   from tmp4 
""")

In [22]:
df_clean3.createOrReplaceTempView("tmp5")

In [23]:
cities = spark.sql(""" SELECT city from (
   SELECT city,count(1)
   from tmp5 group by city order by 2 desc limit 5 ) tb
""").rdd.collect()

In [24]:
# Use the global cities variable to remove the cities from street and extract city name from the text
# The top 5 is the only hardcoded variable that I had to infer based on the date in the CSV
cities = [city.city for city in cities]

In [25]:
cities

['Miami', 'Chicago', 'Los Angeles', 'New York', 'Houston']

In [26]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def extract_city(sentence):
    for city in cities:
        if city in cities:
            return city
    return None

@udf(returnType=StringType())
def replace_city_empty(sentence):
    for city in cities:
        sentence = sentence.replace(city, "")
    return sentence.strip()


In [27]:
df_clean3 = (df_clean3
             .withColumn("city_", extract_city("tail"))
             .withColumn("street_", replace_city_empty("street"))
             )

In [28]:
df_clean3.show(5)

+--------+----------------+--------+------+-----+------------+-------+-------------------+--------------------+--------------------+-----+-----------+
|order_id|delivery_company|quantity| price|state|ordered_date|zipcode|             street|                city|                tail|city_|    street_|
+--------+----------------+--------+------+-----+------------+-------+-------------------+--------------------+--------------------+-----+-----------+
|       1| delivery_comp_1|       1|   1.2|   CA|  2022-01-09|  90001| Cedar Lane Houston|            CA 90001|Cedar Lane Housto...|Miami| Cedar Lane|
|       2| delivery_comp_2|       2|   1.2|   CA|        null|  60601|        Main Street|   New York CA 60601|Main Street,New Y...|Miami|Main Street|
|       3| delivery_comp_3|    null|   1.2|   TX|  2022-01-14|  10001|        Main Street|    Chicago TX 10001|Main Street,Chica...|Miami|Main Street|
|       4| delivery_comp_0|       1|878.93|   FL|  2022-01-20|  90001|         Oak Avenue|Los 

In [29]:
df_clean3.createOrReplaceTempView("tmp6")

In [30]:
# Fill missing data

In [31]:
spark.sql(""" 
        SELECT 
          order_id, delivery_company,
        CASE WHEN quantity IS NULL THEN avg_quantity 
             ELSE quantity END as quantity,
             price,
             CASE WHEN ordered_date IS NULL THEN next_date
                  WHEN ordered_date IS NULL AND next_date IS NULL THEN LAST_DAY(prev_date)
                  ELSE ordered_date 
                  END as ordered_date,
             zipcode,street,city,state 
          FROM
      (SELECT 
      order_id, delivery_company, quantity,price,ordered_date,zipcode,city_ as city,street_ as street, state,
      avg(quantity) OVER (PARTITION BY delivery_company) as avg_quantity,
      LEAD(ordered_date) OVER (PARTITION BY delivery_company ORDER BY ordered_date asc) as next_date,
      LAG(ordered_date) OVER (PARTITION BY delivery_company ORDER BY ordered_date asc) as prev_date
      FROM tmp6) tb  where delivery_company = 'delivery_comp_1' order by ordered_date desc
      """).show(10)

+--------+----------------+--------+------+------------+-------+-----------+-----+-----+
|order_id|delivery_company|quantity| price|ordered_date|zipcode|     street| city|state|
+--------+----------------+--------+------+------------+-------+-----------+-----+-----+
|     105| delivery_comp_1|     1.5|   1.2|  2022-01-27|  90001|Maple Drive|Miami|   NY|
|    2433| delivery_comp_1|     1.5|980.53|  2022-01-27|  60601| Cedar Lane|Miami|   TX|
|     305| delivery_comp_1|     2.0|   1.2|  2022-01-27|  77001| Oak Avenue|Miami|   NY|
|    1157| delivery_comp_1|     2.0|   1.2|  2022-01-27|  10001| Oak Avenue|Miami|   FL|
|    1173| delivery_comp_1|     1.5|   1.2|  2022-01-27|  10001| Cedar Lane|Miami|   TX|
|    1209| delivery_comp_1|     1.5|   1.2|  2022-01-27|  33101|Maple Drive|Miami|   TX|
|    1353| delivery_comp_1|     1.5| 21.58|  2022-01-27|  33101| Oak Avenue|Miami|   TX|
|    1361| delivery_comp_1|     2.0|   1.2|  2022-01-27|  90001|Maple Drive|Miami|   TX|
|    1821| delivery_c