# Star Wars - Data Quality Project

In [2]:
!pip install pyspark;



[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0.1[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [3]:
import pyspark
from pyspark.sql import functions as F


### Know your machine resources

In [4]:
# CPU Cores
!sysctl -n hw.logicalcpu

# Memory in Bytes
!sysctl -n hw.memsize

8
17179869184


In [5]:
# Memory in GB
17179869184/(1024**3)

16.0

In [6]:
MAX_MEMORY = '4g'
MAX_MEMORY_OVERHEAD = '512m'
MAX_DRIVER_MEMORY = '1g'

### Setup Config

In [7]:
conf = pyspark.SparkConf().setMaster("local[2]") \
        .set('spark.executor.memory', MAX_MEMORY) \
        .set('spark.executor.memoryOverhead', MAX_MEMORY_OVERHEAD) \
        .set('spark.driver.memory', MAX_DRIVER_MEMORY) \
        .set("spark.driver.extraJavaOptions", "-XX:ReservedCodeCacheSize=256m") # code cache stores compiled code, and it can be full some times
        # .set('spark.executor.heartbeatInterval', 10000) \
        # .set('spark.network.timeout', 10000) \
        # .set('spark.core.connection.ack.wait.timeout', '3600') \

In [8]:
from pyspark.sql import SparkSession

### Initialize Spark

In [9]:
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("StarWars - Data Quality Project") \
        .config(conf=conf) \
        .getOrCreate()
    return spark

In [10]:
spark = init_spark()

25/05/08 18:19:12 WARN Utils: Your hostname, Gowthams-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.4.246 instead (on interface en0)
25/05/08 18:19:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/08 18:19:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Data Profiling
Understand the structure, missing values, distinct favorites, anomalies.

In [11]:
df = spark.read.csv('star_wars_reviews.csv',header=True)

In [12]:
df.show(5)

+---------+----------------+--------------+--------------------+--------------------+-----------------+----------+------------+
|review_id|       fav_heroe|   fav_villain|            fav_film|      fav_soundtrack|    fav_spaceship|fav_planet|   fav_robot|
+---------+----------------+--------------+--------------------+--------------------+-----------------+----------+------------+
|        0|Anakin Skywalker|    Darth Maul|Episode IV - A Ne...|   Accross the Stars|Naboo Starfighter|  Tatooine|       R2-D2|
|        1|Anakin Skywalker|    Darth Maul|Episode IV - A Ne...|     The Throne Room|Naboo Starfighter|  Tatooine|Battle Droid|
|        2|  Luke Skywalker|   Count Dooku|Episode V - The E...|Star Wars (Main T...|Millennium Falcon|     Endor|       R2-D2|
|        3|Anakin Skywalker|Wilhuff Tarkin|Episode VI - Retu...|     The Throne Room|Millennium Falcon|  Tatooine|       R2-D2|
|        4|            Yoda|     Palpatine|Episode IV - A Ne...|Star Wars (Main T...|Millennium Falcon| 

### Structure

In [13]:
df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- fav_heroe: string (nullable = true)
 |-- fav_villain: string (nullable = true)
 |-- fav_film: string (nullable = true)
 |-- fav_soundtrack: string (nullable = true)
 |-- fav_spaceship: string (nullable = true)
 |-- fav_planet: string (nullable = true)
 |-- fav_robot: string (nullable = true)



### Basic Statistics

In [14]:
df.describe().show()

25/05/08 18:19:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 2:>                                                          (0 + 1) / 1]

+-------+------------------+----------------+--------------+--------------------+-----------------+-------------+----------+------------+
|summary|         review_id|       fav_heroe|   fav_villain|            fav_film|   fav_soundtrack|fav_spaceship|fav_planet|   fav_robot|
+-------+------------------+----------------+--------------+--------------------+-----------------+-------------+----------+------------+
|  count|             36927|           36927|         36927|               36927|            36927|        36927|     36927|       36927|
|   mean|           18463.0|            NULL|          NULL|                NULL|             NULL|         NULL|      NULL|        NULL|
| stddev|10660.051031772784|            NULL|          NULL|                NULL|             NULL|         NULL|      NULL|        NULL|
|    min|                 0|Anakin Skywalker|   Count Dooku|Episode I - The P...|Accross the Stars|   Death Star|  Alderaan|Battle Droid|
|    max|              9999|      

                                                                                

### Null Counts

In [15]:
df.select([F.count(F.when(F.col(column).isNull(), column)).alias(column) for column in df.columns]).show()

+---------+---------+-----------+--------+--------------+-------------+----------+---------+
|review_id|fav_heroe|fav_villain|fav_film|fav_soundtrack|fav_spaceship|fav_planet|fav_robot|
+---------+---------+-----------+--------+--------------+-------------+----------+---------+
|        0|        0|          0|       0|             0|            0|         0|        0|
+---------+---------+-----------+--------+--------------+-------------+----------+---------+



### Distinct Values in Fav Columns


In [16]:
for col_name in ['fav_heroe',
 'fav_villain',
 'fav_film',
 'fav_soundtrack',
 'fav_spaceship',
 'fav_planet',
 'fav_robot']:
    df.select(col_name).distinct().show()

+----------------+
|       fav_heroe|
+----------------+
|        Han Solo|
|            Leia|
|    Qui-Gon Jinn|
|   Jar Jar Binks|
|            Yoda|
|  Luke Skywalker|
|  Obi-Wan Kenobi|
|Anakin Skywalker|
|       Chewbacca|
+----------------+

+----------------+
|     fav_villain|
+----------------+
|  Wilhuff Tarkin|
|General Grievous|
|       Palpatine|
|     Count Dooku|
|      Darth Maul|
|     Darth Vader|
+----------------+

+--------------------+
|            fav_film|
+--------------------+
|Episode I - The P...|
|Episode III - Rev...|
|Episode VI - Retu...|
|Episode II - Atta...|
|Episode IV - A Ne...|
|Episode V - The E...|
+--------------------+

+--------------------+
|      fav_soundtrack|
+--------------------+
|Star Wars (Main T...|
|      Imperial March|
|     The Throne Room|
|   Accross the Stars|
|  Anakin vs. Obi-Wan|
+--------------------+

+-----------------+
|    fav_spaceship|
+-----------------+
|       Death Star|
|Millennium Falcon|
|      TIE Fighter|
|N

### Count distinct values/Cardinality in all columns

In [17]:
df.agg(*[F.countDistinct(column).alias(column) for column in df.columns]).show()

CodeCache: size=262144Kb used=35591Kb max_used=35822Kb free=226552Kb
 bounds [0x000000010a1f8000, 0x000000010c588000, 0x000000011a1f8000]
 total_blobs=12045 nmethods=11097 adapters=860
 compilation: disabled (not enough contiguous free space left)
+---------+---------+-----------+--------+--------------+-------------+----------+---------+
|review_id|fav_heroe|fav_villain|fav_film|fav_soundtrack|fav_spaceship|fav_planet|fav_robot|
+---------+---------+-----------+--------+--------------+-------------+----------+---------+
|    36927|        9|          6|       6|             5|            4|         5|        4|
+---------+---------+-----------+--------+--------------+-------------+----------+---------+





### Detect Data Skewness 
Some columns might have more repeated values which cause Data skewness when joining with the column_key

In [18]:
total_rows = df.count()
for col_name in ['fav_heroe',
 'fav_villain',
 'fav_film',
 'fav_soundtrack',
 'fav_spaceship',
 'fav_planet',
 'fav_robot']:
    unique_values_freq_df = df.groupBy(col_name).count().orderBy(F.desc("count"))


    
    key_with_skew_percent_df = unique_values_freq_df.withColumn(
        "skew_percentile", (F.col("count") / F.lit(total_rows))*100
    )
    key_with_skew_percent_df.show(truncate=False)

+----------------+-----+------------------+
|fav_heroe       |count|skew_percentile   |
+----------------+-----+------------------+
|Obi-Wan Kenobi  |6324 |17.125680396457877|
|Han Solo        |6028 |16.32409889782544 |
|Anakin Skywalker|5641 |15.276085249275598|
|Leia            |5234 |14.173910688655997|
|Yoda            |4385 |11.874779971294716|
|Luke Skywalker  |3679 |9.962899775232215 |
|Chewbacca       |2473 |6.696996777425731 |
|Qui-Gon Jinn    |1630 |4.414114333685379 |
|Jar Jar Binks   |1533 |4.1514339101470465|
+----------------+-----+------------------+

+----------------+-----+------------------+
|fav_villain     |count|skew_percentile   |
+----------------+-----+------------------+
|Darth Vader     |9165 |24.81923795596718 |
|Palpatine       |7469 |20.226392612451594|
|Darth Maul      |6703 |18.152029680179815|
|General Grievous|5506 |14.910499092804722|
|Count Dooku     |4727 |12.800931567687602|
|Wilhuff Tarkin  |3357 |9.090909090909092 |
+----------------+-----+-------

#### Columns with Skew above 50%
From the above data:
- `fav_spaceship` -> "Millennium Falcon" = 64.12% -> Severe Skew
- `fav_robot` -> "R2-D2" = 48.39% -> Borderline Skew
- `fav_soundtrack` -> "Star Wars (Main Theme)" = 43.79 -> Borderline Skew
These column require skew handling techniques like salting, especially if you're
- Joining these columns
- Aggregating/grouping by them
- Seeing straggler tasks or long stages during shuffle

## Data Quality Checks (Before Transformation)
Find incorrect data — for example, unexpected hero names.

In [19]:
# Example valid hero List
valid_heroes = [
                "Han Solo",
                "Leia",
                "Qui-Gon Jinn",
                "Jar Jar Binks",
                "Yoda",
                "Luke Skywalker",
                "Obi-Wan Kenobi",
                "Anakin Skywalker"
               ]


In [20]:
# Check for invalid heroes
invalid_heroes = df.filter(~F.col("fav_heroe").isin(valid_heroes))
invalid_heroes.show()

+---------+---------+----------------+--------------------+--------------------+-----------------+----------+------------+
|review_id|fav_heroe|     fav_villain|            fav_film|      fav_soundtrack|    fav_spaceship|fav_planet|   fav_robot|
+---------+---------+----------------+--------------------+--------------------+-----------------+----------+------------+
|       14|Chewbacca|General Grievous|Episode I - The P...|      Imperial March|      TIE Fighter|  Tatooine|    Droideka|
|       19|Chewbacca|     Count Dooku|Episode V - The E...|Star Wars (Main T...|      TIE Fighter|     Endor|       C-3PO|
|       44|Chewbacca|General Grievous|Episode III - Rev...|      Imperial March|Millennium Falcon|     Naboo|       R2-D2|
|       57|Chewbacca|      Darth Maul|Episode IV - A Ne...|Star Wars (Main T...|Naboo Starfighter|   Dagobah|       R2-D2|
|       74|Chewbacca|      Darth Maul|Episode V - The E...|      Imperial March|Naboo Starfighter|   Dagobah|       R2-D2|
|       77|Chewb

## Data Transformations
Clean and prepare

#### Fill Missing Values

In [21]:
df = df.fillna({
    "fav_heroe": "Unknown",
    "fav_villain": "Unknown",
    "fav_film": "Unknown",
    "fav_planet": "Unknown"
})

#### Remove leading/trailing spaces and standardize case

In [22]:
for col_name in ["fav_heroe", "fav_villain", "fav_film", "fav_planet"]:
    df = df.withColumn(col_name, F.trim(F.col(col_name)))
    df = df.withColumn(col_name, F.initcap(F.col(col_name)))  # Makes "luke skywalker" -> "Luke Skywalker"

#### Handling Skewed Data
Add salt column for skewed data, here 


In [37]:
skewed_value = "Millennium Falcon"
num_of_partitions = df.rdd.getNumPartitions()

# Do not overfragment the data across all available partitions)
salt_range = max(2, num_of_partitions//2) 

# add salt Column for rows with skewed value
df = df.withColumn("salt", F.when(F.col("fav_spaceship") == skewed_value, F.floor(F.rand() * salt_range))
                   .otherwise(F.lit(None))
                  )            

In [38]:
df = df.withColumn("fav_spaceship_salted", F.when(F.col("salt").isNotNull(), F.concat_ws("_", "fav_spaceship", "salt"))
                   .otherwise(F.col("fav_spaceship"))
                  )
df.show()

+---------+----------------+----------------+--------------------+--------------------+-----------------+----------+------------+----+--------------------+
|review_id|       fav_heroe|     fav_villain|            fav_film|      fav_soundtrack|    fav_spaceship|fav_planet|   fav_robot|salt|fav_spaceship_salted|
+---------+----------------+----------------+--------------------+--------------------+-----------------+----------+------------+----+--------------------+
|        0|Anakin Skywalker|      Darth Maul|Episode Iv - A Ne...|   Accross the Stars|Naboo Starfighter|  Tatooine|       R2-D2|NULL|   Naboo Starfighter|
|        1|Anakin Skywalker|      Darth Maul|Episode Iv - A Ne...|     The Throne Room|Naboo Starfighter|  Tatooine|Battle Droid|NULL|   Naboo Starfighter|
|        2|  Luke Skywalker|     Count Dooku|Episode V - The E...|Star Wars (Main T...|Millennium Falcon|     Endor|       R2-D2|   0| Millennium Falcon_0|
|        3|Anakin Skywalker|  Wilhuff Tarkin|Episode Vi - Retu..

In [39]:
# now the values are evenly distriuted across partitions
df.groupBy("fav_spaceship_salted").agg(F.count("*")).show()

+--------------------+--------+
|fav_spaceship_salted|count(1)|
+--------------------+--------+
| Millennium Falcon_0|   11810|
| Millennium Falcon_1|   11869|
|          Death Star|     718|
|         TIE Fighter|    4739|
|   Naboo Starfighter|    7791|
+--------------------+--------+



## Data Validation (After Transformation)

In [13]:
df.rdd.getNumPartitions()


1

In [27]:
df.columns

['review_id',
 'fav_heroe',
 'fav_villain',
 'fav_film',
 'fav_soundtrack',
 'fav_spaceship',
 'fav_planet',
 'fav_robot']

In [14]:
df = df.repartition(4)

In [15]:
df.rdd.getNumPartitions()

CodeCache: size=262144Kb used=22089Kb max_used=22105Kb free=240054Kb
 bounds [0x000000010b1f8000, 0x000000010c7b8000, 0x000000011b1f8000]
 total_blobs=7964 nmethods=7044 adapters=832
 compilation: disabled (not enough contiguous free space left)




4

In [16]:
def print_partition_rows(partition):
    print("Partition Starts ------------")
    c = 0
    for row in partition:
        c+=1
        print(row)
        if c==4:
            break

df.foreachPartition(print_partition_rows)


Partition Starts ------------
Row(review_id='19100', fav_heroe='Yoda', fav_villain='Palpatine', fav_film='Episode V - The Empire Strikes Back', fav_soundtrack='Star Wars (Main Theme)', fav_spaceship='Millennium Falcon', fav_planet='Tatooine', fav_robot='R2-D2')
Partition Starts ------------
Row(review_id='28930', fav_heroe='Anakin Skywalker', fav_villain='Darth Vader', fav_film='Episode VI - Return of the Jedi', fav_soundtrack='Imperial March', fav_spaceship='Millennium Falcon', fav_planet='Endor', fav_robot='R2-D2')
Row(review_id='25133', fav_heroe='Yoda', fav_villain='Wilhuff Tarkin', fav_film='Episode V - The Empire Strikes Back', fav_soundtrack='Star Wars (Main Theme)', fav_spaceship='Naboo Starfighter', fav_planet='Tatooine', fav_robot='C-3PO')
Row(review_id='17821', fav_heroe='Han Solo', fav_villain='Wilhuff Tarkin', fav_film='Episode VI - Return of the Jedi', fav_soundtrack='Star Wars (Main Theme)', fav_spaceship='Millennium Falcon', fav_planet='Tatooine', fav_robot='R2-D2')
Row

In [1]:
import time

start = time.time()
df.groupBy("fav_heroe").count().collect()
print("Time taken:", time.time() - start)


NameError: name 'df' is not defined

In [18]:
df = df.coalesce(1)

In [19]:
import time

start = time.time()
df.groupBy("fav_heroe").count().collect()
print("Time taken:", time.time() - start)


Time taken: 0.1858351230621338


In [20]:
df = df.repartition(4)

In [21]:
import time

start = time.time()
df.groupBy("fav_heroe").count().collect()
print("Time taken:", time.time() - start)


Time taken: 0.173295259475708


In [22]:
df.count()

36927