In [1]:
from pyspark.sql.functions import *
from pyspark.sql.functions import sum as _sum
from pyspark.sql import SparkSession
from functools import reduce

### CLEANING FUNCTIONS

In [2]:
''' This function takes as input the hdfs path of a csv and converts it into a spark dataframe '''
def csv_to_sparkdf(csv_path):
    # Create Spark Session
    spark_session = (
    SparkSession.builder.appName("SparkProfiling")
    .master("local[*]")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
    )
    # Load the dataset in a spark dataframe
    spark_df = spark_session.read.csv(csv_path, header=True, inferSchema=True)
    return spark_df

In [3]:
''' This function takes a spark dataframe as input and prints statistics on null values and duplicate rows '''
def statistics(spark_df):
    # Count null values for each columns
    null_counts = spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns])
    # Count duplicates rows
    duplicate_rows = spark_df.groupBy(spark_df.columns).count().filter("count > 1")
    duplicate_count = duplicate_rows.count()

    null_counts.show()
    print(f"Number of duplicates rows: {duplicate_count}")

    # Show duplicate rows
    if duplicate_count > 0:
        print("Duplicate rows:")
        duplicate_rows.show(truncate=False) 

In [5]:
''' This function takes a spark dataframe as input and returns a 'clean' spark dataframe i.e. without
    duplicate rows and rows with too many null values '''
def cleaning(spark_df):
    # Initial number of rows
    initial_row_count = spark_df.count()
    print(f"Initial number of rows: {initial_row_count}")

    # Remove duplicate rows
    new_spark_df = spark_df.dropDuplicates()
    row_count_withoutDuplicates = new_spark_df.count()
    print(f"Remove duplicate rows: {initial_row_count - row_count_withoutDuplicates}")


    # Calculate the threshold of nulls for each row 
    # To delete even more lines, simply lower the threshold
    threshold = len(new_spark_df.columns) / 2

    # Create a temporary column to count null values per row
    null_counts_expr = reduce(
        lambda acc, c: acc + when(col(c).isNull(), 1).otherwise(0),
        new_spark_df.columns,
        lit(0)
    )
    spark_df_null = new_spark_df.withColumn("null_count", null_counts_expr)

    # Keep rows with less than the threshold (50%) of null values
    filtered_df = spark_df_null.filter(col("null_count") < threshold).drop("null_count")
    print(f"Number of rows with null values removed: {row_count_withoutDuplicates - filtered_df.count()}")


    # Final number of rows
    final_row_count = filtered_df.count()
    print(f"Final number of rows: {final_row_count}")

    # Number of removed rows
    deleted_row_count = initial_row_count - final_row_count
    print(f"Number of removed rows: {deleted_row_count}")

    filtered_df.show(10)
    return filtered_df

In [6]:
''' This function takes a spark dataframe as input and stores it in a csv on hdfs '''
def sparkdf_to_csv(spark_df):
    # Hdfs path where you want to save the CSV file
    output_path = "/output/cleaned_datasets/"
    spark_df = spark_df.coalesce(1)
    spark_df.write.csv(output_path, header=True)

### CLEANING AIRLINE DATASET

In [None]:
spark_df = csv_to_sparkdf("/input/Airline.csv")

In [9]:
statistics(spark_df)



+------------+----------+---------+------+---+-----------+------------+--------------------+------------+-----------------+----------+--------------+---------------+----------+-------------+
|Passenger ID|First Name|Last Name|Gender|Age|Nationality|Airport Name|Airport Country Code|Country Name|Airport Continent|Continents|Departure Date|Arrival Airport|Pilot Name|Flight Status|
+------------+----------+---------+------+---+-----------+------------+--------------------+------------+-----------------+----------+--------------+---------------+----------+-------------+
|           0|         0|        0|     0|  0|          0|           0|                   0|           0|                0|         0|             0|              0|         0|            0|
+------------+----------+---------+------+---+-----------+------------+--------------------+------------+-----------------+----------+--------------+---------------+----------+-------------+



                                                                                

### CLEANING BOOKS RATING DATASET

In [5]:
spark_df = csv_to_sparkdf("/input/Books_rating.csv")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/02 13:01:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [6]:
statistics(spark_df)

                                                                                

+---+-----+-------+-------+-----------+------------------+------------+-----------+--------------+-----------+
| Id|Title|  Price|User_id|profileName|review/helpfulness|review/score|review/time|review/summary|review/text|
+---+-----+-------+-------+-----------+------------------+------------+-----------+--------------+-----------+
|  0|  208|2517579| 562250|     562200|               367|         130|         27|            65|         43|
+---+-----+-------+-------+-----------+------------------+------------+-----------+--------------+-----------+

Number of duplicates rows: 7793
Duplicate rows:


                                                                                

+----------+----------------------------------------------------------------------------------------+-----+--------------+---------------------------------------------------+------------------+------------+-----------+----------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
filtered_df = cleaning(spark_df)

                                                                                

Numero iniziale di righe: 3000000


                                                                                

Numero di righe duplicate eliminate: 8865


                                                                                

Numero di righe con valori nulli eliminate: 13


                                                                                

Numero finale di righe: 2991122
Numero di righe eliminate: 8878


[Stage 37:>                                                         (0 + 1) / 1]

+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|        Id|               Title|Price|       User_id|         profileName|review/helpfulness|review/score|review/time|      review/summary|         review/text|
+----------+--------------------+-----+--------------+--------------------+------------------+------------+-----------+--------------------+--------------------+
|0595129463|Marshall Hollenze...|12.95|A39OBC2D154CCU|  Elizabeth Bookspan|               1/1|         5.0|  987897600|You Won't Be Able...|Marshall Hollenze...|
|0679751254|Lenin's Tomb: The...|12.11|A3PHHV3UJAUP8B|  "mobuto ""-----"""|               3/5|         5.0|  984873600|  Soviet Tocqueville|Remnick writes el...|
|0919345476|The Witches' God:...|16.75| ASH2T0XFJFLPQ|        John Culloty|               6/6|         5.0| 1011139200|A Primer of Pagan...|This book provide...|
|0833025147|In Athena's Camp

                                                                                

In [12]:
sparkdf_to_csv(filtered_df)

                                                                                

### CLEANING LONDON DATASET

In [6]:
spark_df = csv_to_sparkdf("/input/london.csv")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/04 08:42:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [7]:
statistics(spark_df)



+---------+--------+-------+--------------------+--------------+----------------+----------------------+----------------+------------------+
|rental_id|duration|bike_id|end_rental_date_time|end_station_id|end_station_name|start_rental_date_time|start_station_id|start_station_name|
+---------+--------+-------+--------------------+--------------+----------------+----------------------+----------------+------------------+
|        0|   67617|     17|               67617|         68213|           68213|                     0|               0|                 0|
+---------+--------+-------+--------------------+--------------+----------------+----------------------+----------------+------------------+

Number of duplicates rows: 0


                                                                                

In [14]:
filtered_df = cleaning(spark_df)

                                                                                

Initial number of rows: 38215560


                                                                                

Remove duplicate rows: 0


                                                                                

Number of rows with null values removed: 67548


                                                                                

Final number of rows: 38148012
Number of removed rows: 67548




+---------+--------+-------+--------------------+--------------+--------------------+----------------------+----------------+--------------------+
|rental_id|duration|bike_id|end_rental_date_time|end_station_id|    end_station_name|start_rental_date_time|start_station_id|  start_station_name|
+---------+--------+-------+--------------------+--------------+--------------------+----------------------+----------------+--------------------+
| 61344038|   420.0|13932.0| 2016-12-28 07:45:00|         815.0|Lambeth Palace Ro...|   2016-12-28 07:38:00|             152|Hampton Street, W...|
| 61344158|   360.0|  590.0| 2016-12-28 07:59:00|         746.0|Lots Road, West C...|   2016-12-28 07:53:00|             737|Fulham Broadway, ...|
| 61344288|  1140.0| 7057.0| 2016-12-28 08:27:00|         217.0|Wormwood Street, ...|   2016-12-28 08:08:00|              65|Gower Place , Euston|
| 61344374|  1380.0|12923.0| 2016-12-28 08:39:00|         160.0|Waterloo Place, S...|   2016-12-28 08:16:00|          

                                                                                

In [15]:
sparkdf_to_csv(filtered_df)

                                                                                

### CLEANING TITLES DATASET

In [7]:
spark_df = csv_to_sparkdf("/input/dirty_titles.csv")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/04 17:29:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/04 17:29:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

In [8]:
statistics(spark_df)

24/07/04 17:33:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:33:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:33:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:33:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:33:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:33:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:33:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:33:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:33:03 WARN RowBasedKeyValueBatch: Calling spill() on

+--------+---------+------------+-------------+--------+---------+--------+--------------+--------+
|  tconst|titleType|primaryTitle|originalTitle| isAdult|startYear| endYear|runtimeMinutes|  genres|
+--------+---------+------------+-------------+--------+---------+--------+--------------+--------+
|42684992| 42684992|    42685028|     42685028|42684992| 42684992|42684992|      42684992|42685434|
+--------+---------+------------+-------------+--------+---------+--------+--------------+--------+

Number of duplicates rows: 10671249
Duplicate rows:


24/07/04 17:37:58 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:37:59 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:38:00 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:38:00 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:38:00 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:38:00 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:38:00 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:38:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:38:01 WARN RowBasedKeyValueBatch: Calling spill() on

+---------+---------+-------------------------------+---------------------------------+-------+---------+-------+--------------+--------------------+-----+
|tconst   |titleType|primaryTitle                   |originalTitle                    |isAdult|startYear|endYear|runtimeMinutes|genres              |count|
+---------+---------+-------------------------------+---------------------------------+-------+---------+-------+--------------+--------------------+-----+
|tt0000030|short    |Rough Sea at Dover             |Rough Sea at Dover               |0      |1895     |\N     |1             |Documentary,Short   |2    |
|tt0000125|short    |The Terrible Railway Accident  |The Terrible Railway Accident    |0      |1896     |\N     |\N            |Short               |2    |
|tt0000380|short    |Sleeping Beauty                |La belle au bois dormant         |0      |1903     |\N     |\N            |Drama,Fantasy,Short |2    |
|tt0000567|short    |The 400 Tricks of the Devil    |Les quatre 

                                                                                

In [9]:
filtered_df = cleaning(spark_df)

                                                                                

Initial number of rows: 64027488


24/07/04 17:43:12 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:43:13 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:43:13 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:43:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:43:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:43:14 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:43:15 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:43:15 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:43:15 WARN RowBasedKeyValueBatch: Calling spill() on

Remove duplicate rows: 53356239


24/07/04 17:47:25 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:47:25 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:47:26 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:47:26 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:47:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:47:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:47:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:47:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:47:28 WARN RowBasedKeyValueBatch: Calling spill() on

Number of rows with null values removed: 1


24/07/04 17:51:23 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:51:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:51:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:51:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:51:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:51:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:51:24 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:51:25 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:51:25 WARN RowBasedKeyValueBatch: Calling spill() on

Final number of rows: 10671248
Number of removed rows: 53356240


24/07/04 17:55:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:55:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:55:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:55:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:55:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:55:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:55:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:55:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:55:28 WARN RowBasedKeyValueBatch: Calling spill() on

+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|   tconst|titleType|        primaryTitle|       originalTitle|isAdult|startYear|endYear|runtimeMinutes|              genres|
+---------+---------+--------------------+--------------------+-------+---------+-------+--------------+--------------------+
|tt0000033|    short|  Horse Trick Riders|          La voltige|      0|     1895|     \N|             1|Comedy,Documentar...|
|tt0000132|    short|          Card Party|Une partie de cartes|      0|     1896|     \N|             1|     Biography,Short|
|tt0001472|    short|      After the Ball|      After the Ball|      0|     1910|     \N|            \N|        Comedy,Short|
|tt0001672|    short|     His Dress Shirt|     His Dress Shirt|      0|     1911|     \N|            \N|        Comedy,Short|
|tt0001729|    short|The Law of the Range|The Law of the Range|      0|     1911|     \N|            \N| Drama,Short,W

                                                                                

In [10]:
sparkdf_to_csv(filtered_df)

24/07/04 17:59:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:59:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:59:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:59:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:59:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:59:48 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:59:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:59:49 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/07/04 17:59:49 WARN RowBasedKeyValueBatch: Calling spill() on

### CLEANING PRODUCTS MARKET NOV 2019 DATASET

In [15]:
spark_df = csv_to_sparkdf("/input/2019-Nov.csv")

                                                                                

In [17]:
statistics(spark_df)

                                                                                

+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|         0|          0|     21898171|9218235|    0|      0|          10|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+

Number of duplicates rows: 57553
Duplicate rows:


[Stage 55:>                                                         (0 + 1) / 1]

+-------------------+----------+----------+-------------------+-----------------------------+-------+------+---------+------------------------------------+-----+
|event_time         |event_type|product_id|category_id        |category_code                |brand  |price |user_id  |user_session                        |count|
+-------------------+----------+----------+-------------------+-----------------------------+-------+------+---------+------------------------------------+-----+
|2019-11-01 09:44:01|cart      |5000184   |2053013566100866035|appliances.sewing_machine    |janome |100.39|536869352|a62d675f-0ed9-4ffd-9251-b122c56b95b5|3    |
|2019-11-02 17:57:08|cart      |1004856   |2053013555631882655|electronics.smartphone       |samsung|127.89|549380844|478567aa-37e3-4168-81bd-32e1f7d290f9|2    |
|2019-11-02 16:09:29|cart      |1004768   |2053013555631882655|electronics.smartphone       |samsung|242.08|562538224|ab9b1d35-fdf4-4d97-a66c-e714e062b691|2    |
|2019-11-02 23:28:57|cart   

                                                                                

In [18]:
filtered_df = cleaning(spark_df)

                                                                                

Initial number of rows: 67501979


                                                                                

Remove duplicate rows: 100519


                                                                                

Numero di righe con valori nulli eliminate: 0


                                                                                

Final number of rows: 67401460
Number of removed rows: 100519


[Stage 79:>                                                         (0 + 1) / 1]

+-------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|  brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+-------+-------+---------+--------------------+
|2019-11-01 01:01:44|      view|   2701646|2053013563911439225|appliances.kitche...|indesit| 268.49|539701280|a9e5a322-c765-d52...|
|2019-11-01 01:03:31|      view|   2401540|2053013563743667055|appliances.kitche...|  midea| 167.29|515761354|b2da6f0a-6eeb-450...|
|2019-11-01 01:05:01|      view|   1004788|2053013555631882655|electronics.smart...| xiaomi| 138.71|532572658|b1df0fce-6e03-47c...|
|2019-11-01 01:07:13|      view|  28714418|2053013565127787455|       apparel.shoes|   etor| 100.39|545220871|fa5820b7-d974-4a8...|
|2019-11-01 01:11:13|      view|   1004159|2053013555631882655|electronics.s

                                                                                

In [21]:
sparkdf_to_csv(filtered_df)

                                                                                