In [2]:
import os
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz
!tar xf spark-3.5.4-bin-hadoop3.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"

In [3]:
# Import findspark and initialise.
import findspark
findspark.init()

In [4]:
from google.colab import files
uploaded = files.upload()

Saving PS_2025.02.03_02.08.19.csv to PS_2025.02.03_02.08.19.csv


In [5]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("ExoPlanet_ETL_Project").getOrCreate()

In [6]:
file_path = "/content/PS_2025.02.03_02.08.19.csv"

df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
df.show(5)

+----------+--------+------------+-------+-------+---------------+---------+--------------------+-------------------+---------------+--------------------+---------+-------------+-------------+------------+----------+--------------+--------------+-------------+-------+-----------+-----------+----------+-------+-----------+-----------+----------+----------+-------------+-------------+------------+---------+-------------+-------------+------------+------------+-----------+---------------+---------------+--------------+--------+------------+------------+-----------+------+----------+----------+---------+--------+--------------------+-----------+-------+-----------+-----------+----------+------+----------+----------+---------+-------+-----------+-----------+----------+------+----------+----------+---------+-----------+-------+-----------+-----------+----------+--------------------+------------+-----------+-------------+----------+-------+-----------+-----------+-------+-----------+---------

In [7]:
# Import necessary libraries
from pyspark.sql.functions import col, mean, when, lit, count, round
from pyspark.sql.types import IntegerType, DoubleType, DateType

In [8]:
# Total Rows
total_rows = df.count()
print(f"Total Rows: {total_rows}")

Total Rows: 5749


In [9]:
# Count missing values per column
missing_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

# Show missing values
missing_counts.show()

+-------+--------+------------+-------+-------+---------------+---------+-------------+-------+---------------+----------+---------+-------------+-------------+------------+----------+--------------+--------------+-------------+-------+-----------+-----------+----------+-------+-----------+-----------+----------+---------+-------------+-------------+------------+---------+-------------+-------------+------------+------------+-----------+---------------+---------------+--------------+--------+------------+------------+-----------+------+----------+----------+---------+--------+----------+-----------+-------+-----------+-----------+----------+------+----------+----------+---------+-------+-----------+-----------+----------+------+----------+----------+---------+-----------+-------+-----------+-----------+----------+----------+-----+---+------+---+-------+-----------+-----------+-------+-----------+-----------+-------+-----------+-----------+----------+--------------+--------------+------

In [10]:
#Drop columns with too many missing values (e.g., >50%)
threshold = int(0.5 * df.count())
df_cleaned = df.dropna(thresh=threshold)


In [11]:
#Fill missing categorical values with "Unknown"
for c in df.columns:
    df = df.fillna({c: "Unknown"})

In [12]:
#Convert Data types
from pyspark.sql.types import IntegerType, DoubleType, DateType

df = df.withColumn("disc_year", col("disc_year").cast(IntegerType()))
df = df.withColumn("pl_rade", col("pl_rade").cast(DoubleType()))
df = df.withColumn("st_teff", col("st_teff").cast(DoubleType()))
df = df.withColumn("releasedate", col("releasedate").cast(DateType()))

In [13]:
#Remove Duplicates
df = df.dropDuplicates()

In [14]:
# Define columns to check for outliers
outlier_cols = ["pl_rade", "pl_bmasse", "st_teff"]

# Remove outliers based on the 1st and 99th percentile
for col_name in outlier_cols:
    lower, upper = df.approxQuantile(col_name, [0.01, 0.99], 0)  # Compute quantiles
    df = df.filter((col(col_name) >= lower) & (col(col_name) <= upper))

df.show(5)

+-----------+---------+------------+-------+-------+---------------+---------+--------------------+-------------------+---------------+--------------------+---------+-------------+-------------+------------+----------+--------------+--------------+-------------+-------+-----------+-----------+----------+-------+-----------+-----------+----------+----------+-------------+-------------+------------+---------+-------------+-------------+------------+------------+-----------+---------------+---------------+--------------+--------+------------+------------+-----------+------+----------+----------+---------+--------+--------------------+-----------+-------+-----------+-----------+----------+------+----------+----------+---------+-------+-----------+-----------+----------+------+----------+----------+---------+-----------+-------+-----------+-----------+----------+--------------------+------------+-----------+-------------+-----------+-------+-----------+-----------+-------+-----------+------

In [15]:
#Check the Data Types for each Column in a DataFrame
df_types = spark.createDataFrame(df.dtypes, ["Column", "DataType"])
df_types.show(truncate=False)

+---------------+--------+
|Column         |DataType|
+---------------+--------+
|pl_name        |string  |
|hostname       |string  |
|default_flag   |int     |
|sy_snum        |int     |
|sy_pnum        |int     |
|discoverymethod|string  |
|disc_year      |int     |
|disc_facility  |string  |
|soltype        |string  |
|pl_controv_flag|int     |
|pl_refname     |string  |
|pl_orbper      |double  |
|pl_orbpererr1  |double  |
|pl_orbpererr2  |double  |
|pl_orbperlim   |int     |
|pl_orbsmax     |double  |
|pl_orbsmaxerr1 |double  |
|pl_orbsmaxerr2 |double  |
|pl_orbsmaxlim  |int     |
|pl_rade        |double  |
+---------------+--------+
only showing top 20 rows



In [16]:
#Convert the Data Types into boolean for 0 and 1 values
df = df.withColumn("pl_orbperlim", col("pl_orbperlim").cast("boolean"))
df = df.withColumn("pl_orbsmaxlim", col("pl_orbsmaxlim").cast("boolean"))
df = df.withColumn("pl_controv_flag", col("pl_controv_flag").cast("boolean"))

In [17]:
#disc_year should be a proper Date not int
from pyspark.sql.functions import to_date, concat_ws, lit

# Convert disc_year (int) to a proper date format
df = df.withColumn("disc_date", to_date(concat_ws("-", col("disc_year").cast("string"), lit("01"), lit("01")), "yyyy-MM-dd"))

# Verify the updated schema
df.select("disc_year", "disc_date").show(5)
df.printSchema()

+---------+----------+
|disc_year| disc_date|
+---------+----------+
|     2016|2016-01-01|
|     2018|2018-01-01|
|     2024|2024-01-01|
|     2021|2021-01-01|
|     2019|2019-01-01|
+---------+----------+
only showing top 5 rows

root
 |-- pl_name: string (nullable = false)
 |-- hostname: string (nullable = false)
 |-- default_flag: integer (nullable = true)
 |-- sy_snum: integer (nullable = true)
 |-- sy_pnum: integer (nullable = true)
 |-- discoverymethod: string (nullable = false)
 |-- disc_year: integer (nullable = true)
 |-- disc_facility: string (nullable = false)
 |-- soltype: string (nullable = false)
 |-- pl_controv_flag: boolean (nullable = true)
 |-- pl_refname: string (nullable = false)
 |-- pl_orbper: double (nullable = true)
 |-- pl_orbpererr1: double (nullable = true)
 |-- pl_orbpererr2: double (nullable = true)
 |-- pl_orbperlim: boolean (nullable = true)
 |-- pl_orbsmax: double (nullable = true)
 |-- pl_orbsmaxerr1: double (nullable = true)
 |-- pl_orbsmaxerr2: doubl

In [18]:
# Count null values per column
missing_counts = df.select([count(when(col(c).isNull(), 1)).alias(c) for c in df.columns])
missing_counts.show()


+-------+--------+------------+-------+-------+---------------+---------+-------------+-------+---------------+----------+---------+-------------+-------------+------------+----------+--------------+--------------+-------------+-------+-----------+-----------+----------+-------+-----------+-----------+----------+---------+-------------+-------------+------------+---------+-------------+-------------+------------+------------+-----------+---------------+---------------+--------------+--------+------------+------------+-----------+------+----------+----------+---------+--------+----------+-----------+-------+-----------+-----------+----------+------+----------+----------+---------+-------+-----------+-----------+----------+------+----------+----------+---------+-----------+-------+-----------+-----------+----------+----------+-----+---+------+---+-------+-----------+-----------+-------+-----------+-----------+-------+-----------+-----------+----------+--------------+--------------+------

In [19]:
# Get columns where more than 50% values are missing
cols_to_drop = [c for c in df.columns if df.select(count(when(col(c).isNull(), 1)) / total_rows).collect()[0][0] > 0.5]

# Drop those columns
df = df.drop(*cols_to_drop)

df.show(5)

+-----------+---------+------------+-------+-------+---------------+---------+--------------------+-------------------+---------------+--------------------+---------+-------------+-------------+------------+----------+--------------+--------------+-------------+-------+-----------+-----------+----------+-------+-----------+-----------+----------+----------+-------------+-------------+------------+---------+-------------+-------------+------------+------------+-----------+---------------+---------------+--------------+--------+------------+------------+-----------+------+----------+----------+---------+--------+--------------------+-----------+-------+-----------+-----------+----------+------+----------+----------+---------+-------+-----------+-----------+----------+------+----------+----------+---------+-----------+-------+-----------+-----------+----------+--------------------+------------+-----------+-------------+-----------+-------+-----------+-----------+-------+-----------+------

In [20]:
#Drop Rows where key Columns are missing
df = df.dropna(subset=["pl_name", "hostname"])
df.show(5)

+-----------+---------+------------+-------+-------+---------------+---------+--------------------+-------------------+---------------+--------------------+---------+-------------+-------------+------------+----------+--------------+--------------+-------------+-------+-----------+-----------+----------+-------+-----------+-----------+----------+----------+-------------+-------------+------------+---------+-------------+-------------+------------+------------+-----------+---------------+---------------+--------------+--------+------------+------------+-----------+------+----------+----------+---------+--------+--------------------+-----------+-------+-----------+-----------+----------+------+----------+----------+---------+-------+-----------+-----------+----------+------+----------+----------+---------+-----------+-------+-----------+-----------+----------+--------------------+------------+-----------+-------------+-----------+-------+-----------+-----------+-------+-----------+------

In [21]:
# Fill missing numerical values with Mean
from pyspark.sql.functions import mean

num_cols = [c for c, dtype in df.dtypes if dtype in ("int", "double")]

for c in num_cols:
    mean_value = df.select(mean(col(c))).collect()[0][0]
    if mean_value is not None:
        df = df.fillna({c: mean_value})

df.show(5)

+-----------+---------+------------+-------+-------+---------------+---------+--------------------+-------------------+---------------+--------------------+---------+-------------+-------------+------------+-------------------+--------------------+--------------------+-------------+-------+-----------+-----------+----------+-------+-----------+-----------+----------+----------+-------------+-------------+------------+---------+-------------+-------------+------------+------------+-----------+-------------------+--------------------+--------------+-----------------+------------------+-------------------+-----------+------+----------+----------+---------+--------+--------------------+-----------+-------+-----------+-----------+----------+------+----------+----------+---------+-------+-----------+-----------+----------+------+-------------------+-------------------+---------+-----------+-----------------+-------------------+--------------------+----------+--------------------+------------

In [22]:
#count the Rows with any NULL or Empty values
df.select(count(when(
    col(c).isNull() | (col(c) == ""), 1
)).alias("Rows_with_missing_values")).show()

+------------------------+
|Rows_with_missing_values|
+------------------------+
|                       0|
+------------------------+



In [23]:
# show the Rows that contain NULL or Empty Strings
df.filter(
    (col("pl_name").isNull()) | (col("pl_name") == "")
).show()

+-------+--------+------------+-------+-------+---------------+---------+-------------+-------+---------------+----------+---------+-------------+-------------+------------+----------+--------------+--------------+-------------+-------+-----------+-----------+----------+-------+-----------+-----------+----------+---------+-------------+-------------+------------+---------+-------------+-------------+------------+------------+-----------+---------------+---------------+--------------+--------+------------+------------+-----------+------+----------+----------+---------+--------+----------+-----------+-------+-----------+-----------+----------+------+----------+----------+---------+-------+-----------+-----------+----------+------+----------+----------+---------+-----------+-------+-----------+-----------+----------+----------+-----+---+------+---+-------+-----------+-----------+-------+-----------+-----------+-------+-----------+-----------+----------+--------------+--------------+------

In [24]:
#Columns Names
df.columns

['pl_name',
 'hostname',
 'default_flag',
 'sy_snum',
 'sy_pnum',
 'discoverymethod',
 'disc_year',
 'disc_facility',
 'soltype',
 'pl_controv_flag',
 'pl_refname',
 'pl_orbper',
 'pl_orbpererr1',
 'pl_orbpererr2',
 'pl_orbperlim',
 'pl_orbsmax',
 'pl_orbsmaxerr1',
 'pl_orbsmaxerr2',
 'pl_orbsmaxlim',
 'pl_rade',
 'pl_radeerr1',
 'pl_radeerr2',
 'pl_radelim',
 'pl_radj',
 'pl_radjerr1',
 'pl_radjerr2',
 'pl_radjlim',
 'pl_bmasse',
 'pl_bmasseerr1',
 'pl_bmasseerr2',
 'pl_bmasselim',
 'pl_bmassj',
 'pl_bmassjerr1',
 'pl_bmassjerr2',
 'pl_bmassjlim',
 'pl_bmassprov',
 'pl_orbeccen',
 'pl_orbeccenerr1',
 'pl_orbeccenerr2',
 'pl_orbeccenlim',
 'pl_insol',
 'pl_insolerr1',
 'pl_insolerr2',
 'pl_insollim',
 'pl_eqt',
 'pl_eqterr1',
 'pl_eqterr2',
 'pl_eqtlim',
 'ttv_flag',
 'st_refname',
 'st_spectype',
 'st_teff',
 'st_tefferr1',
 'st_tefferr2',
 'st_tefflim',
 'st_rad',
 'st_raderr1',
 'st_raderr2',
 'st_radlim',
 'st_mass',
 'st_masserr1',
 'st_masserr2',
 'st_masslim',
 'st_met',
 'st_me

In [25]:
# Rename Columns in PySpark
df = df.withColumnRenamed("pl_name", "Planet_Name") \
       .withColumnRenamed("hostname", "Host_Star") \
       .withColumnRenamed("default_flag", "Default_Flag") \
       .withColumnRenamed("sy_snum", "Num_Stars") \
       .withColumnRenamed("sy_pnum", "Num_Planets") \
       .withColumnRenamed("discoverymethod", "Discovery_Method") \
       .withColumnRenamed("disc_year", "Discovery_Year") \
       .withColumnRenamed("disc_facility", "Discovery_Facility") \
       .withColumnRenamed("soltype", "Solar_System_Type") \
       .withColumnRenamed("pl_controv_flag", "Controversial_Flag") \
       .withColumnRenamed("pl_refname", "Reference_Name") \
       .withColumnRenamed("pl_orbper", "Orbital_Period_Days") \
       .withColumnRenamed("pl_orbpererr1", "Orbital_Period_Error_Upper") \
       .withColumnRenamed("pl_orbpererr2", "Orbital_Period_Error_Lower") \
       .withColumnRenamed("pl_orbperlim", "Orbital_Period_Limit_Flag") \
       .withColumnRenamed("pl_orbsmax", "Semi_Major_Axis_AU") \
       .withColumnRenamed("pl_orbsmaxerr1", "Semi_Major_Axis_Error_Upper") \
       .withColumnRenamed("pl_orbsmaxerr2", "Semi_Major_Axis_Error_Lower") \
       .withColumnRenamed("pl_orbsmaxlim", "Semi_Major_Axis_Limit_Flag") \
       .withColumnRenamed("pl_rade", "Planet_Radius_Earth") \
       .withColumnRenamed("pl_bmasse", "Planet_Mass_Earth") \
       .withColumnRenamed("pl_orbeccen", "Orbital_Eccentricity") \
       .withColumnRenamed("pl_insol", "Insolation_Flux") \
       .withColumnRenamed("pl_eqt", "Equilibrium_Temperature") \
       .withColumnRenamed("st_teff", "Star_Temperature_K") \
       .withColumnRenamed("st_rad", "Star_Radius_Solar") \
       .withColumnRenamed("st_mass", "Star_Mass_Solar") \
       .withColumnRenamed("st_met", "Star_Metallicity") \
       .withColumnRenamed("st_logg", "Star_Surface_Gravity") \
       .withColumnRenamed("sy_dist", "System_Distance_PC") \
       .withColumnRenamed("sy_vmag", "System_Visual_Magnitude") \
       .withColumnRenamed("sy_kmag", "System_Infrared_Magnitude") \
       .withColumnRenamed("sy_gaiamag", "System_Gaia_Magnitude") \
       .withColumnRenamed("rowupdate", "Row_Last_Update") \
       .withColumnRenamed("pl_pubdate", "Publication_Date") \
       .withColumnRenamed("releasedate", "Release_Date") \
       .withColumnRenamed("disc_date", "Discovery_Date")

In [26]:
#verify Renamed Columns
df.show()

+-------------+-----------+------------+---------+-----------+----------------+--------------+--------------------+-------------------+------------------+--------------------+-------------------+--------------------------+--------------------------+-------------------------+-------------------+---------------------------+---------------------------+--------------------------+-------------------+------------------+--------------------+----------+-------+--------------------+--------------------+----------+-----------------+-----------------+------------------+------------+---------+-------------------+--------------------+------------+------------+--------------------+-------------------+--------------------+--------------+-----------------+------------------+-------------------+-----------+-----------------------+----------+----------+---------+--------+--------------------+-----------+------------------+-----------------+------------------+----------+-----------------+----------------

In [27]:
# Standardize Numerical Columns (Feature Scaling for ML Models)
from pyspark.ml.feature import StandardScaler, VectorAssembler

# Define numerical columns to scale
num_features = ["Planet_Radius_Earth", "Planet_Mass_Earth", "Star_Temperature_K"]

# Step 1: Assemble features into a single vector column
assembler = VectorAssembler(inputCols=num_features, outputCol="features")
df_transformed = assembler.transform(df)  # Apply transformation and create "features" column

# Step 2: Apply StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df_transformed)  # Fit scaler model
df_scaled = scaler_model.transform(df_transformed)  # Transform data

# Show the final dataset with the new scaled features
df_scaled.select("features", "scaled_features").show(truncate=False)

+--------------------------+--------------------------------------------------------------+
|features                  |scaled_features                                               |
+--------------------------+--------------------------------------------------------------+
|[2.578,22.2481,5766.0]    |[-1.0447632876255382,-0.5339018190218463,0.4553293055023975]  |
|[2.41,14.9,5368.0]        |[-1.0752931266583798,-0.5491033960478693,-0.03211352346981332]|
|[7.16,68.2,6177.0]        |[-0.21209827305124804,-0.4388376055051407,0.958693633913399]  |
|[12.173,1147.36055,6466.0]|[0.6988903166608681,1.7937044426801465,1.3126408137449792]    |
|[2.562,8.82,5664.0]       |[-1.0476708913429518,-0.5616815575206608,0.33040677144419267] |
|[1.381,3.9,6004.0]        |[-1.2622883907345352,-0.5718599381861436,0.7468152183048753]  |
|[2.89,111.0,5858.0]       |[-0.9880650151359749,-0.35029396882167385,0.5680045322999939] |
|[12.251,400.449,4790.0]   |[0.7130648847832588,0.24851133712208318,-0.740007883

In [28]:
# Encode categorical variables (for ML)
from pyspark.ml.feature import StringIndexer

categorical_cols = ["Discovery_Method", "Discovery_Facility", "Solar_System_Type"]

for col_name in categorical_cols:
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_Index")
    df = indexer.fit(df).transform(df)

df.show(5)

+-----------+---------+------------+---------+-----------+----------------+--------------+--------------------+-------------------+------------------+--------------------+-------------------+--------------------------+--------------------------+-------------------------+-------------------+---------------------------+---------------------------+--------------------------+-------------------+-----------+-----------+----------+-------+-----------+-----------+----------+-----------------+-------------+-------------+------------+---------+-------------+-------------+------------+------------+--------------------+-------------------+--------------------+--------------+-----------------+------------------+-------------------+-----------+-----------------------+----------+----------+---------+--------+--------------------+-----------+------------------+-----------+-----------+----------+-----------------+----------+----------+---------+---------------+-----------+-----------+----------+------

In [29]:
# Save the fully cleaned Dataset
df.write.csv("PS_dataset.csv", header=True)

