DATA CLEANING

In [15]:
from pyspark import SparkConf, SparkContext

In [16]:
# Initialize SparkContext.
# getOrCreate() returns the already-running context when there is one, so
# re-executing this cell in a live kernel does not fail with
# "Cannot run multiple SparkContexts at once".
spark_conf = SparkConf().setMaster("local").setAppName("Data Cleaning with Basic Routines")
sc = SparkContext.getOrCreate(spark_conf)

In [17]:
# Define paths for loading data
# Both inputs are plain-text files on HDFS with one ';'-delimited record per line.
txt_path_music = "hdfs:///mydata/music_info_unstructured_semicolon.txt"  # track metadata
txt_path_history = "hdfs:///mydata/user_listening_history_unstructured_semicolon.txt"  # per-user play counts

In [18]:
# Read the music_info text file as an RDD with one element per line
rdd_music = sc.textFile(txt_path_music)
music_partition_count = rdd_music.getNumPartitions()
print(music_partition_count)  # How many partitions Spark split the file into

2


In [19]:
# Read the user_listening_history text file as an RDD with one element per line
rdd_history = sc.textFile(txt_path_history)
history_partition_count = rdd_history.getNumPartitions()
print(history_partition_count)  # How many partitions Spark split the file into

5


In [20]:
# Baseline profiling: how many raw lines each dataset has before any cleaning
music_record_total = rdd_music.count()
print(f"Original music_info record count: {music_record_total}")
history_record_total = rdd_history.count()
print(f"Original user_listening_history record count: {history_record_total}")

                                                                                

Original music_info record count: 50683




Original user_listening_history record count: 9711301


                                                                                

In [21]:
# Eyeball a few raw music_info lines to confirm the ';' delimiter and field layout
rdd_music.take(5)  # Review the first 5 rows

['TRIOREW128F424EAF0;Mr. Brightside;The Killers;https://p.scdn.co/mp3-preview/4d26180e6961fd46866cd9106936ea55dfcbaa75?cid=774b29d4f13844c495f206cafdad9c86;09ZQ5TmUG8TSL56n0knqrj;rock, alternative, indie, alternative_rock, indie_rock, 00s;;2004;222200;0.355;0.918;1;-4.36;1;0.0746;0.00119;0.0;0.0971;0.24;148.114;4',
 'TRRIVDJ128F429B0E8;Wonderwall;Oasis;https://p.scdn.co/mp3-preview/d012e536916c927bd6c8ced0dae75ee3b7715983?cid=774b29d4f13844c495f206cafdad9c86;06UfBBDISthj1ZJAtX4xjj;rock, alternative, indie, pop, alternative_rock, british, 90s, love, britpop;;2006;258613;0.409;0.892;2;-4.373;1;0.0336;0.000807;0.0;0.207;0.651;174.426;4',
 'TROUVHL128F426C441;Come as You Are;Nirvana;https://p.scdn.co/mp3-preview/a1c11bb1cb231031eb20e5951a8bfb30503224e9?cid=774b29d4f13844c495f206cafdad9c86;0keNu0t0tqsWtExGM3nT1D;rock, alternative, alternative_rock, 90s, grunge;RnB;1991;218920;0.508;0.826;4;-5.783;0;0.04;0.000175;0.000459;0.0878;0.543;120.012;4',
 'TRUEIND128F93038C4;Take Me Out;Franz Ferdin

In [22]:
# Eyeball a few raw listening-history lines to confirm the ';' delimiter and field layout
rdd_history.take(5)  # Review the first 5 rows

['TRIRLYL128F42539D1;b80344d063b5ccb3212f76538f3d9e43d87dca9e;1',
 'TRFUPBA128F934F7E1;b80344d063b5ccb3212f76538f3d9e43d87dca9e;1',
 'TRLQPQJ128F42AA94F;b80344d063b5ccb3212f76538f3d9e43d87dca9e;1',
 'TRTUCUY128F92E1D24;b80344d063b5ccb3212f76538f3d9e43d87dca9e;1',
 'TRHDDQG12903CB53EE;b80344d063b5ccb3212f76538f3d9e43d87dca9e;1']

In [23]:
# Mark both raw RDDs for caching so the cleaning steps below do not have to
# re-read the source files on every action.
# NOTE(review): cache() is lazy, so the counts above ran uncached; each RDD is
# only materialized by the next action that touches it.
rdd_music.cache()
rdd_history.cache()

hdfs:///mydata/user_listening_history_unstructured_semicolon.txt MapPartitionsRDD[3] at textFile at NativeMethodAccessorImpl.java:0

In [24]:
# Configuration parameters
# Column counts are the number of fields a line must produce when split on ';'.
expected_column_count_music = 21  # fields per valid music_info line
expected_column_count_history = 3  # fields per valid user_listening_history line
# Zero-based positions of fields that must be non-empty for a row to be kept.
essential_indexes_music = [0]  # track_id for music_info
essential_indexes_history = [0, 1]  # track_id and user_id for user_listening_history

In [25]:
# Cleaning and profiling function
def basic_cleaning(rdd, expected_count, essential_indexes):
    """Deduplicate, validate, and profile an RDD of semicolon-delimited lines.

    The record count is printed after each step, so every step triggers an
    action on the RDD.

    Args:
        rdd: RDD whose elements are text lines with ';'-separated fields.
        expected_count: Number of fields a line must have to be kept.
        essential_indexes: Zero-based field positions that must be non-blank.

    Returns:
        The RDD with duplicate lines, lines with the wrong number of fields,
        and lines with a blank essential field removed.
    """
    # Step 1: Remove duplicates
    rdd = rdd.distinct()
    print(f"Record count after removing duplicates: {rdd.count()}")

    # Step 2: Filter rows with the correct column count
    rdd = rdd.filter(lambda line: len(line.split(";")) == expected_count)
    print(f"Record count after column count verification: {rdd.count()}")

    # Step 3: Filter rows where essential fields are not null.
    # Whitespace-only values count as null here, matching replace_nulls();
    # otherwise a blank key would pass this check and later become "Unknown".
    def has_essential_fields(line):
        fields = line.split(";")  # split once, not once per essential index
        return all(fields[index].strip() != "" for index in essential_indexes)

    rdd = rdd.filter(has_essential_fields)
    print(f"Record count after filtering essential non-null fields: {rdd.count()}")

    return rdd

In [26]:
# Run the shared cleaning routine on music_info, bracketed by progress messages
print("Starting cleaning operations for music_info dataset...")
rdd_music_cleaned = basic_cleaning(
    rdd=rdd_music,
    expected_count=expected_column_count_music,
    essential_indexes=essential_indexes_music,
)
print("Cleaning operations for music_info dataset completed.\n")

Starting cleaning operations for music_info dataset...
Record count after removing duplicates: 50683
Record count after column count verification: 50681
Record count after filtering essential non-null fields: 50681
Cleaning operations for music_info dataset completed.



In [27]:
# Run the shared cleaning routine on user_listening_history, bracketed by progress messages
print("Starting cleaning operations for user_listening_history dataset...")
rdd_history_cleaned = basic_cleaning(
    rdd=rdd_history,
    expected_count=expected_column_count_history,
    essential_indexes=essential_indexes_history,
)
print("Cleaning operations for user_listening_history dataset completed.")

Starting cleaning operations for user_listening_history dataset...


                                                                                

Record count after removing duplicates: 9711301


                                                                                

Record count after column count verification: 9711301




Record count after filtering essential non-null fields: 9711301
Cleaning operations for user_listening_history dataset completed.


                                                                                

In [28]:
# Spot-check one record that survived cleaning. The cleaning includes a
# distinct(), so this need not be the first line of the source file.
print("First row from cleaned music_info dataset:")
rdd_music_cleaned.first()

First row from cleaned music_info dataset:


'TRRIVDJ128F429B0E8;Wonderwall;Oasis;https://p.scdn.co/mp3-preview/d012e536916c927bd6c8ced0dae75ee3b7715983?cid=774b29d4f13844c495f206cafdad9c86;06UfBBDISthj1ZJAtX4xjj;rock, alternative, indie, pop, alternative_rock, british, 90s, love, britpop;;2006;258613;0.409;0.892;2;-4.373;1;0.0336;0.000807;0.0;0.207;0.651;174.426;4'

In [29]:
# Spot-check one record that survived cleaning. The cleaning includes a
# distinct(), so this need not be the first line of the source file.
print("First row from cleaned user_listening_history dataset:")
rdd_history_cleaned.first()

First row from cleaned user_listening_history dataset:


                                                                                

'TRAUCNU128F42671EB;b80344d063b5ccb3212f76538f3d9e43d87dca9e;1'

In [30]:
# Function to replace null values with "Unknown"
def replace_nulls(line):
    """Return `line` with every blank semicolon-separated field set to "Unknown".

    A field is blank when it is empty or contains only whitespace. Non-blank
    fields are passed through unchanged, including any surrounding whitespace.
    """
    normalized = []
    for piece in line.split(";"):
        # strip() is used only to detect blanks; the kept value is not trimmed
        normalized.append(piece if piece.strip() else "Unknown")
    return ";".join(normalized)

In [31]:
# Substitute "Unknown" for every blank field in the music records
rdd_music_cleaned = rdd_music_cleaned.map(lambda raw_line: replace_nulls(raw_line))

# Preview five transformed records, one per output line
music_preview_rows = rdd_music_cleaned.take(5)
for preview_row in music_preview_rows:
    print(preview_row)

TRRIVDJ128F429B0E8;Wonderwall;Oasis;https://p.scdn.co/mp3-preview/d012e536916c927bd6c8ced0dae75ee3b7715983?cid=774b29d4f13844c495f206cafdad9c86;06UfBBDISthj1ZJAtX4xjj;rock, alternative, indie, pop, alternative_rock, british, 90s, love, britpop;Unknown;2006;258613;0.409;0.892;2;-4.373;1;0.0336;0.000807;0.0;0.207;0.651;174.426;4
TROUVHL128F426C441;Come as You Are;Nirvana;https://p.scdn.co/mp3-preview/a1c11bb1cb231031eb20e5951a8bfb30503224e9?cid=774b29d4f13844c495f206cafdad9c86;0keNu0t0tqsWtExGM3nT1D;rock, alternative, alternative_rock, 90s, grunge;RnB;1991;218920;0.508;0.826;4;-5.783;0;0.04;0.000175;0.000459;0.0878;0.543;120.012;4
TRUMISQ128F9340BEE;Somebody Told Me;The Killers;https://p.scdn.co/mp3-preview/0d07673cfb46218a49c96eed639933f19b45cf9c?cid=774b29d4f13844c495f206cafdad9c86;0FNmIQ7u45Lhdn6RHhSLix;rock, alternative, indie, pop, alternative_rock, indie_rock;Unknown;2005;198480;0.508;0.979;10;-4.289;0;0.0847;8.71e-05;0.000643;0.0641;0.704;138.03;4
TRFNTDZ128F426B34D;In the End;Lin

In [32]:
# Substitute "Unknown" for every blank field in the listening-history records
rdd_history_cleaned = rdd_history_cleaned.map(lambda raw_line: replace_nulls(raw_line))

# Preview five transformed records, one per output line
history_preview_rows = rdd_history_cleaned.take(5)
for preview_row in history_preview_rows:
    print(preview_row)

[Stage 23:>                                                         (0 + 1) / 1]

TRAUCNU128F42671EB;b80344d063b5ccb3212f76538f3d9e43d87dca9e;1
TRADVZX128F426BF79;b80344d063b5ccb3212f76538f3d9e43d87dca9e;1
TRSGIYX128F149F01F;969cc6fb74e076a68e36a04409cb9d3765757508;1
TRDSFKT12903CB510F;4bd88bfb25263a75bbdd467e74018f4ae570e5df;4
TRUZPNY128F147FD23;e006b1a48f466bf59feefed32bec6494495a4436;1


                                                                                

In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

In [34]:
# Initialize Spark session (needed for the DataFrame API used below)
# NOTE(review): the app name "CSVtoTXT" does not describe this notebook —
# presumably left over from another script; confirm before renaming.
spark = (
    SparkSession.builder
    .appName("CSVtoTXT")
    .getOrCreate()
)

In [35]:
# Remove unnecessary columns from the cleaned music_info RDD.
# Of the 21 fields per line, the 8 positions listed here are discarded; the
# 13 that remain line up with the fields of the music DataFrame schema.
columns_to_drop_indices = [3, 4, 8, 11, 13, 18, 19, 20]  # Indices of columns to drop


def _drop_unneeded_columns(row):
    """Split a ';'-delimited row and return the fields whose position is kept."""
    kept_values = []
    for position, value in enumerate(row.split(";")):
        if position not in columns_to_drop_indices:
            kept_values.append(value)
    return kept_values


cleaned_music_rdd = rdd_music_cleaned.map(_drop_unneeded_columns)

In [36]:
# Define the schema for the music DataFrame.
# Every field is loaded as a nullable string; "year" and the audio-feature
# columns are cast to numeric types after the DataFrame has been created.
music_field_names = [
    "track_id",
    "name",
    "artist",
    "tags",
    "genre",
    "year",              # To be converted to IntegerType later
    "danceability",      # To be converted to FloatType
    "energy",            # To be converted to FloatType
    "loudness",          # To be converted to FloatType
    "speechiness",
    "acousticness",
    "instrumentalness",
    "liveness",
]
columns_music = StructType(
    [StructField(field_name, StringType(), True) for field_name in music_field_names]
)

In [37]:
# Build the music DataFrame from the projected rows; all columns are strings at this point
df_music_cleaned = spark.createDataFrame(cleaned_music_rdd, schema=columns_music)

In [38]:
# Cast columns to the correct types: "year" becomes an integer and each
# audio-feature column becomes a float. Column order is unchanged because
# withColumn replaces an existing column in place.
music_float_columns = [
    "danceability",
    "energy",
    "loudness",
    "speechiness",
    "acousticness",
    "instrumentalness",
    "liveness",
]
df_music_cleaned = df_music_cleaned.withColumn("year", col("year").cast(IntegerType()))
for float_column in music_float_columns:
    df_music_cleaned = df_music_cleaned.withColumn(
        float_column, col(float_column).cast(FloatType())
    )


In [39]:
# Check the schema to make sure the types are correct
# (year should now be integer and the seven audio-feature columns float)
df_music_cleaned.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- danceability: float (nullable = true)
 |-- energy: float (nullable = true)
 |-- loudness: float (nullable = true)
 |-- speechiness: float (nullable = true)
 |-- acousticness: float (nullable = true)
 |-- instrumentalness: float (nullable = true)
 |-- liveness: float (nullable = true)



In [40]:
# Count, per column of the music DataFrame, the rows whose value is null or ''.
# F.when yields a non-null value only for matching rows, and F.count ignores
# nulls, so each aggregate is the number of null/empty cells in that column.
music_null_or_empty_counts = []
for column_name in df_music_cleaned.columns:
    is_null_or_empty = (F.col(column_name).isNull()) | (F.col(column_name) == '')
    music_null_or_empty_counts.append(
        F.count(F.when(is_null_or_empty, column_name)).alias(column_name)
    )
df_music_cleaned.select(music_null_or_empty_counts).show()

                                                                                

+--------+----+------+----+-----+----+------------+------+--------+-----------+------------+----------------+--------+
|track_id|name|artist|tags|genre|year|danceability|energy|loudness|speechiness|acousticness|instrumentalness|liveness|
+--------+----+------+----+-----+----+------------+------+--------+-----------+------------+----------------+--------+
|       0|   0|     0|   0|    0|   0|           0|     0|       0|          0|           0|               0|       0|
+--------+----+------+----+-----+----+------------+------+--------+-----------+------------+----------------+--------+



In [41]:
# Display five rows without truncating long values such as the tags column
df_music_cleaned.show(5, truncate=False)

+------------------+----------------+-----------+------------------------------------------------------------------------------------------------------+-------+----+------------+------+--------+-----------+------------+----------------+--------+
|track_id          |name            |artist     |tags                                                                                                  |genre  |year|danceability|energy|loudness|speechiness|acousticness|instrumentalness|liveness|
+------------------+----------------+-----------+------------------------------------------------------------------------------------------------------+-------+----+------------+------+--------+-----------+------------+----------------+--------+
|TRRIVDJ128F429B0E8|Wonderwall      |Oasis      |rock, alternative, indie, pop, alternative_rock, british, 90s, love, britpop                          |Unknown|2006|0.409       |0.892 |-4.373  |0.0336     |8.07E-4     |0.0             |0.207   |
|TROUVHL128F426C

In [43]:
# Turn each cleaned listening-history line into a list of its ';'-separated fields
def _split_history_fields(line):
    """Return the semicolon-separated fields of one history line as a list."""
    return line.split(";")


history_cleaned_rdd = rdd_history_cleaned.map(_split_history_fields)

In [44]:
# Define the schema for the history DataFrame.
# Every field is loaded as a nullable string; "playcount" is cast to an
# integer after the DataFrame has been created.
history_field_names = [
    "track_id",
    "user_id",
    "playcount",  # To be converted to IntegerType later
]
columns_history = StructType(
    [StructField(field_name, StringType(), True) for field_name in history_field_names]
)

In [45]:
# Build the history DataFrame from the split rows; all columns are strings at this point
df_history_cleaned = spark.createDataFrame(history_cleaned_rdd, schema=columns_history)

In [46]:
# Cast "playcount" from string to integer; the other columns stay as strings
playcount_as_int = col("playcount").cast(IntegerType())
df_history_cleaned = df_history_cleaned.withColumn("playcount", playcount_as_int)

In [47]:
# Check the schema to make sure the types are correct
# (playcount should now be integer; track_id and user_id remain strings)
df_history_cleaned.printSchema()

root
 |-- track_id: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- playcount: integer (nullable = true)



In [48]:
# Count, per column of the history DataFrame, the rows whose value is null or ''.
# F.when yields a non-null value only for matching rows, and F.count ignores
# nulls, so each aggregate is the number of null/empty cells in that column.
history_null_or_empty_counts = []
for column_name in df_history_cleaned.columns:
    is_null_or_empty = (F.col(column_name).isNull()) | (F.col(column_name) == '')
    history_null_or_empty_counts.append(
        F.count(F.when(is_null_or_empty, column_name)).alias(column_name)
    )
df_history_cleaned.select(history_null_or_empty_counts).show()



+--------+-------+---------+
|track_id|user_id|playcount|
+--------+-------+---------+
|       0|      0|        0|
+--------+-------+---------+



                                                                                

In [49]:
# Display five rows without truncating the long user_id values
df_history_cleaned.show(5, truncate=False)

[Stage 39:>                                                         (0 + 1) / 1]

+------------------+----------------------------------------+---------+
|track_id          |user_id                                 |playcount|
+------------------+----------------------------------------+---------+
|TRAUCNU128F42671EB|b80344d063b5ccb3212f76538f3d9e43d87dca9e|1        |
|TRADVZX128F426BF79|b80344d063b5ccb3212f76538f3d9e43d87dca9e|1        |
|TRSGIYX128F149F01F|969cc6fb74e076a68e36a04409cb9d3765757508|1        |
|TRDSFKT12903CB510F|4bd88bfb25263a75bbdd467e74018f4ae570e5df|4        |
|TRUZPNY128F147FD23|e006b1a48f466bf59feefed32bec6494495a4436|1        |
+------------------+----------------------------------------+---------+
only showing top 5 rows



                                                                                

In [None]:
# Persist both cleaned DataFrames to HDFS as Parquet, replacing any earlier output
cleaned_parquet_path_music = "hdfs:///mydata/cleaned_music_info.parquet"
cleaned_parquet_path_history = "hdfs:///mydata/cleaned_user_listening_history.parquet"

parquet_outputs = (
    (df_music_cleaned, cleaned_parquet_path_music),
    (df_history_cleaned, cleaned_parquet_path_history),
)
for cleaned_frame, target_path in parquet_outputs:
    cleaned_frame.write.mode('overwrite').parquet(target_path)

In [50]:
# Stop Spark session
# NOTE(review): per the PySpark docs, SparkSession.stop() also stops the
# underlying SparkContext — confirm for the installed version.
spark.stop()

In [51]:
# Stop SparkContext
sc.stop()