In [27]:
# import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnull

In [28]:
# Start the Spark session for this notebook (getOrCreate reuses one if it already exists).
spark = (
    SparkSession.builder
    .appName("data_cleaning.ipynb")
    .getOrCreate()
)

In [29]:
# Read the semicolon-separated orders CSV: the first row holds the column names,
# and Spark infers each column's type from the data.
file_path = "project_dataset/python_raw_data/fake_orders_test_1.csv"
df = spark.read.csv(
    path=file_path,
    sep=";",
    header=True,
    inferSchema=True,
)

In [30]:
# Preview the first 10 rows with full (untruncated) cell values.
# (No second read of the CSV here: re-reading with inferSchema=True costs an extra
# pass over the file, and the result was never used.)
df.show(10, truncate=False)

+--------+---------------------+------------+-------------+---------------+--------------+--------+------------------+--------------------+
|order_id|activation_time_local|country_code|store_address|final_status   |payment_status|products|products_total    |purchase_total_price|
+--------+---------------------+------------+-------------+---------------+--------------+--------+------------------+--------------------+
|31503775|2019-03-01 11:43:08  |ES          |15871        |DeliveredStatus|PAID          |1       |1.85              |14.02               |
|31503965|2019-03-01 11:43:08  |ES          |15871        |DeliveredStatus|PAID          |3       |6.15              |12.21               |
|31636675|2019-03-01 08:58:01  |AR          |61807        |DeliveredStatus|PAID          |4       |1.1800000000000002|9.76                |
|31724509|2019-03-01 16:43:04  |ES          |16228        |DeliveredStatus|PAID          |5       |11.07             |12.52               |
|31839133|2019-03-01

In [31]:
# Print each column's name and the type Spark inferred when reading the CSV
# (inferSchema=True), to confirm the columns were parsed as expected.
df.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- activation_time_local: timestamp (nullable = true)
 |-- country_code: string (nullable = true)
 |-- store_address: integer (nullable = true)
 |-- final_status: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- products: integer (nullable = true)
 |-- products_total: double (nullable = true)
 |-- purchase_total_price: double (nullable = true)



In [32]:
# Add column "authorized_or_not" by comparing "products_total" with "purchase_total_price":
#   - "under_authorized"     when products_total <  purchase_total_price
#   - "correctly_authorized" when products_total >= purchase_total_price
#   - null                   when either amount is null (comparison result is unknown)
from pyspark.sql.functions import when

is_under = df["products_total"] < df["purchase_total_price"]

# A bare `.otherwise(...)` would also catch rows whose comparison is null and
# mislabel them "correctly_authorized"; the explicit second `when` leaves those
# rows null. For non-null amounts the result is the same as `.otherwise(...)`.
df = df.withColumn(
    "authorized_or_not",
    when(is_under, "under_authorized")
    .when(~is_under, "correctly_authorized"),
)

In [33]:
df.show(n=20, truncate=True)

+--------+---------------------+------------+-------------+---------------+--------------+--------+------------------+--------------------+--------------------+
|order_id|activation_time_local|country_code|store_address|   final_status|payment_status|products|    products_total|purchase_total_price|   authorized_or_not|
+--------+---------------------+------------+-------------+---------------+--------------+--------+------------------+--------------------+--------------------+
|31503775|  2019-03-01 11:43:08|          ES|        15871|DeliveredStatus|          PAID|       1|              1.85|               14.02|    under_authorized|
|31503965|  2019-03-01 11:43:08|          ES|        15871|DeliveredStatus|          PAID|       3|              6.15|               12.21|    under_authorized|
|31636675|  2019-03-01 08:58:01|          AR|        61807|DeliveredStatus|          PAID|       4|1.1800000000000002|                9.76|    under_authorized|
|31724509|  2019-03-01 16:43:04|  

In [34]:
# Handle to the SparkContext underlying the session.
# NOTE(review): `sc` is not referenced by any later cell here — candidate for removal.
sc = spark.sparkContext

In [35]:
# Count missing values in every column. Spark keeps NaN distinct from null in
# float/double columns (the CSV reader's default `nanValue` turns the string "NaN"
# into NaN), so for those columns NaN is counted as missing too. `isnan` is only
# applied to float/double columns; the other types cannot hold NaN.
from pyspark.sql.functions import col, isnan, when, count, countDistinct

float_columns = {name for name, dtype in df.dtypes if dtype in ("float", "double")}


def _is_missing(column_name):
    """Return a boolean Column: true where `column_name` is null, or NaN for float/double columns."""
    missing = col(column_name).isNull()
    if column_name in float_columns:
        missing = missing | isnan(col(column_name))
    return missing


# count() skips the nulls produced by `when` for non-missing rows, so each
# aggregate equals the number of missing entries in that column.
count_null = df.select([count(when(_is_missing(c), c)).alias(c) for c in df.columns])

count_null.show()


+--------+---------------------+------------+-------------+------------+--------------+--------+--------------+--------------------+-----------------+
|order_id|activation_time_local|country_code|store_address|final_status|payment_status|products|products_total|purchase_total_price|authorized_or_not|
+--------+---------------------+------------+-------------+------------+--------------+--------+--------------+--------------------+-----------------+
|       0|                    0|           0|            0|           0|             0|       0|             0|                   0|                0|
+--------+---------------------+------------+-------------+------------+--------------+--------+--------------+--------------------+-----------------+



In [36]:
# Number of distinct non-null values in each column (countDistinct does not count nulls).
distinct_count_exprs = [countDistinct(col(name)).alias(name) for name in df.columns]
count_distinct_values = df.agg(*distinct_count_exprs)
count_distinct_values.show()

+--------+---------------------+------------+-------------+------------+--------------+--------+--------------+--------------------+-----------------+
|order_id|activation_time_local|country_code|store_address|final_status|payment_status|products|products_total|purchase_total_price|authorized_or_not|
+--------+---------------------+------------+-------------+------------+--------------+--------+--------------+--------------------+-----------------+
|   60400|                56255|          23|         5755|           2|             3|      28|          3981|                4367|                2|
+--------+---------------------+------------+-------------+------------+--------------+--------+--------------+--------------------+-----------------+



In [37]:
# Export the DataFrame (including the new "authorized_or_not" column) as CSV.
# Spark writes `output_path` as a directory; coalesce(1) makes it hold a single
# part-*.csv file. mode("overwrite") keeps this cell re-runnable: the default
# mode ("errorifexists") raises once the directory already exists. Overwrite
# replaces whatever is currently stored at `output_path`.
# NOTE(review): the output uses the default "," separator while the input was
# read with ";" — confirm that is what downstream consumers expect.
output_path = "project_dataset/python_raw_data/fake_orders_test_updated.csv"
(
    df.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv(output_path)
)


In [39]:
# Stop the Spark session and release its resources; run this only after all work is done.
spark.stop()