# ExoPredict-Classifying-and-Predicting-Exoplanet-Characteristics

# Extract
1. Initialize a PySpark session to handle big data efficiently.
2. Read the files containing exoplanet data.


In [1]:
import os
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz
!tar xf spark-3.5.4-bin-hadoop3.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.4-bin-hadoop3"

In [2]:
# Import findspark and initialise.
import findspark
findspark.init()

In [3]:
from google.colab import files
uploaded = files.upload()

Saving PS_2025.02.03_02.08.19.csv to PS_2025.02.03_02.08.19.csv


In [4]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("ExoPlanet_ETL_Project").getOrCreate()

In [5]:
# Read the file
file_path = "/content/PS_2025.02.03_02.08.19.csv"

df = spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
df.show(5)

+----------+--------+------------+-------+-------+---------------+---------+--------------------+-------------------+---------------+--------------------+---------+-------------+-------------+------------+----------+--------------+--------------+-------------+-------+-----------+-----------+----------+-------+-----------+-----------+----------+----------+-------------+-------------+------------+---------+-------------+-------------+------------+------------+-----------+---------------+---------------+--------------+--------+------------+------------+-----------+------+----------+----------+---------+--------+--------------------+-----------+-------+-----------+-----------+----------+------+----------+----------+---------+-------+-----------+-----------+----------+------+----------+----------+---------+-----------+-------+-----------+-----------+----------+--------------------+------------+-----------+-------------+----------+-------+-----------+-----------+-------+-----------+---------

# Transform
1. Drop unnecessary columns – A new dataframe is created with only the required columns.
2. Remove columns with excessive missing values to improve data quality.
3. Change column data types where necessary for consistency and accuracy.
4. Rename columns for better readability and ease of use.
5. Standardize numerical columns using feature scaling (important for ML models).
6. Create a unique identifier to set as an index for efficient querying.
7. Export the cleaned data for further analysis.

In [6]:
# Data Selection
df = df.select("pl_name", "hostname", "discoverymethod", "disc_year", "disc_facility", "pl_refname", "pl_orbper", "pl_orbpererr1", "pl_orbpererr2", "pl_orbsmax", "pl_rade", "pl_radj","pl_bmasse", "pl_bmassj", "pl_bmassprov", "pl_orbeccen", "pl_insol", "pl_eqt", "pl_eqterr1", "pl_eqterr2", "pl_eqtlim", "ttv_flag", "st_refname", "st_spectype", "st_teff", "st_tefferr1", "st_tefferr2", "st_tefflim", "st_rad", "st_mass", "st_met", "st_logg", "sy_refname", "rastr", "ra", "dec", "sy_dist", "sy_vmag", "sy_kmag")
df.show(5)

+----------+--------+---------------+---------+--------------------+--------------------+---------+-------------+-------------+----------+-------+-------+----------+---------+------------+-----------+--------+------+----------+----------+---------+--------+--------------------+-----------+-------+-----------+-----------+----------+------+-------+------+-------+--------------------+------------+-----------+----------+-------+-------+-------+
|   pl_name|hostname|discoverymethod|disc_year|       disc_facility|          pl_refname|pl_orbper|pl_orbpererr1|pl_orbpererr2|pl_orbsmax|pl_rade|pl_radj| pl_bmasse|pl_bmassj|pl_bmassprov|pl_orbeccen|pl_insol|pl_eqt|pl_eqterr1|pl_eqterr2|pl_eqtlim|ttv_flag|          st_refname|st_spectype|st_teff|st_tefferr1|st_tefferr2|st_tefflim|st_rad|st_mass|st_met|st_logg|          sy_refname|       rastr|         ra|       dec|sy_dist|sy_vmag|sy_kmag|
+----------+--------+---------------+---------+--------------------+--------------------+---------+-----------

In [7]:
# Import necessary libraries
from pyspark.sql.functions import col, mean, when, lit, count, round
from pyspark.sql.types import IntegerType, DoubleType, DateType

In [8]:
# Total Rows
total_rows = df.count()
print(f"Total Rows: {total_rows}")

Total Rows: 5749


In [9]:
# Count missing values per column
missing_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])

# Show missing values
missing_counts.show()

+-------+--------+---------------+---------+-------------+----------+---------+-------------+-------------+----------+-------+-------+---------+---------+------------+-----------+--------+------+----------+----------+---------+--------+----------+-----------+-------+-----------+-----------+----------+------+-------+------+-------+----------+-----+---+---+-------+-------+-------+
|pl_name|hostname|discoverymethod|disc_year|disc_facility|pl_refname|pl_orbper|pl_orbpererr1|pl_orbpererr2|pl_orbsmax|pl_rade|pl_radj|pl_bmasse|pl_bmassj|pl_bmassprov|pl_orbeccen|pl_insol|pl_eqt|pl_eqterr1|pl_eqterr2|pl_eqtlim|ttv_flag|st_refname|st_spectype|st_teff|st_tefferr1|st_tefferr2|st_tefflim|st_rad|st_mass|st_met|st_logg|sy_refname|rastr| ra|dec|sy_dist|sy_vmag|sy_kmag|
+-------+--------+---------------+---------+-------------+----------+---------+-------------+-------------+----------+-------+-------+---------+---------+------------+-----------+--------+------+----------+----------+---------+--------+

In [10]:
#Fill missing categorical values with "Unknown"
for c in df.columns:
    df = df.fillna({c: "Unknown"})

In [11]:
#Convert Data types
from pyspark.sql.types import IntegerType, DoubleType, DateType

df = df.withColumn("disc_year", col("disc_year").cast(IntegerType()))


In [12]:
#Remove Duplicates
df = df.dropDuplicates()
df.show(2)

+-----------+---------+---------------+---------+--------------------+--------------------+----------+-------------+-------------+----------+-------+-------+---------+---------+------------+-----------+--------+------+----------+----------+---------+--------+--------------------+----------------+-------+-----------+-----------+----------+------+-------+------+-------+--------------------+------------+-----------+-----------+-------+-------+-------+
|    pl_name| hostname|discoverymethod|disc_year|       disc_facility|          pl_refname| pl_orbper|pl_orbpererr1|pl_orbpererr2|pl_orbsmax|pl_rade|pl_radj|pl_bmasse|pl_bmassj|pl_bmassprov|pl_orbeccen|pl_insol|pl_eqt|pl_eqterr1|pl_eqterr2|pl_eqtlim|ttv_flag|          st_refname|     st_spectype|st_teff|st_tefferr1|st_tefferr2|st_tefflim|st_rad|st_mass|st_met|st_logg|          sy_refname|       rastr|         ra|        dec|sy_dist|sy_vmag|sy_kmag|
+-----------+---------+---------------+---------+--------------------+--------------------+---

In [13]:
# Define columns to check for outliers
outlier_cols = ["pl_rade", "pl_bmasse", "st_teff"]

# Remove outliers based on the 1st and 99th percentile
for col_name in outlier_cols:
    lower, upper = df.approxQuantile(col_name, [0.01, 0.99], 0)  # Compute quantiles
    df = df.filter((col(col_name) >= lower) & (col(col_name) <= upper))

df.show(5)

+-----------+---------+---------------+---------+--------------------+--------------------+----------+-------------+-------------+----------+-------+-------+----------+---------+------------+-----------+--------+------+----------+----------+---------+--------+--------------------+-----------+-------+-----------+-----------+----------+------+-------+------+-------+--------------------+------------+-----------+-----------+-------+-------+-------+
|    pl_name| hostname|discoverymethod|disc_year|       disc_facility|          pl_refname| pl_orbper|pl_orbpererr1|pl_orbpererr2|pl_orbsmax|pl_rade|pl_radj| pl_bmasse|pl_bmassj|pl_bmassprov|pl_orbeccen|pl_insol|pl_eqt|pl_eqterr1|pl_eqterr2|pl_eqtlim|ttv_flag|          st_refname|st_spectype|st_teff|st_tefferr1|st_tefferr2|st_tefflim|st_rad|st_mass|st_met|st_logg|          sy_refname|       rastr|         ra|        dec|sy_dist|sy_vmag|sy_kmag|
+-----------+---------+---------------+---------+--------------------+--------------------+----------+

In [14]:
# count columns
column_count = len(df.columns)
print("Number of columns:", column_count)


Number of columns: 39


In [15]:
#Check the Data Types for each Column in a DataFrame
df_types = spark.createDataFrame(df.dtypes, ["Column", "DataType"])
df_types.show(truncate=False)

+---------------+--------+
|Column         |DataType|
+---------------+--------+
|pl_name        |string  |
|hostname       |string  |
|discoverymethod|string  |
|disc_year      |int     |
|disc_facility  |string  |
|pl_refname     |string  |
|pl_orbper      |double  |
|pl_orbpererr1  |double  |
|pl_orbpererr2  |double  |
|pl_orbsmax     |double  |
|pl_rade        |double  |
|pl_radj        |double  |
|pl_bmasse      |double  |
|pl_bmassj      |double  |
|pl_bmassprov   |string  |
|pl_orbeccen    |double  |
|pl_insol       |double  |
|pl_eqt         |int     |
|pl_eqterr1     |int     |
|pl_eqterr2     |int     |
+---------------+--------+
only showing top 20 rows



In [16]:
#disc_year should be a proper Date not int
from pyspark.sql.functions import to_date, concat_ws, lit

# Convert disc_year (int) to a proper date format
df = df.withColumn("disc_date", to_date(concat_ws("-", col("disc_year").cast("string"), lit("01"), lit("01")), "yyyy-MM-dd"))

# Verify the updated schema
df.select("disc_year", "disc_date").show(5)
df.printSchema()

+---------+----------+
|disc_year| disc_date|
+---------+----------+
|     2015|2015-01-01|
|     2019|2019-01-01|
|     2019|2019-01-01|
|     2023|2023-01-01|
|     2015|2015-01-01|
+---------+----------+
only showing top 5 rows

root
 |-- pl_name: string (nullable = false)
 |-- hostname: string (nullable = false)
 |-- discoverymethod: string (nullable = false)
 |-- disc_year: integer (nullable = true)
 |-- disc_facility: string (nullable = false)
 |-- pl_refname: string (nullable = false)
 |-- pl_orbper: double (nullable = true)
 |-- pl_orbpererr1: double (nullable = true)
 |-- pl_orbpererr2: double (nullable = true)
 |-- pl_orbsmax: double (nullable = true)
 |-- pl_rade: double (nullable = true)
 |-- pl_radj: double (nullable = true)
 |-- pl_bmasse: double (nullable = true)
 |-- pl_bmassj: double (nullable = true)
 |-- pl_bmassprov: string (nullable = false)
 |-- pl_orbeccen: double (nullable = true)
 |-- pl_insol: double (nullable = true)
 |-- pl_eqt: integer (nullable = true)
 |-

In [17]:
# Count null values per column
missing_counts = df.select([count(when(col(c).isNull(), 1)).alias(c) for c in df.columns])
missing_counts.show()


+-------+--------+---------------+---------+-------------+----------+---------+-------------+-------------+----------+-------+-------+---------+---------+------------+-----------+--------+------+----------+----------+---------+--------+----------+-----------+-------+-----------+-----------+----------+------+-------+------+-------+----------+-----+---+---+-------+-------+-------+---------+
|pl_name|hostname|discoverymethod|disc_year|disc_facility|pl_refname|pl_orbper|pl_orbpererr1|pl_orbpererr2|pl_orbsmax|pl_rade|pl_radj|pl_bmasse|pl_bmassj|pl_bmassprov|pl_orbeccen|pl_insol|pl_eqt|pl_eqterr1|pl_eqterr2|pl_eqtlim|ttv_flag|st_refname|st_spectype|st_teff|st_tefferr1|st_tefferr2|st_tefflim|st_rad|st_mass|st_met|st_logg|sy_refname|rastr| ra|dec|sy_dist|sy_vmag|sy_kmag|disc_date|
+-------+--------+---------------+---------+-------------+----------+---------+-------------+-------------+----------+-------+-------+---------+---------+------------+-----------+--------+------+----------+----------

In [18]:
# Get columns where more than 50% values are missing
cols_to_drop = [c for c in df.columns if df.select(count(when(col(c).isNull(), 1)) / total_rows).collect()[0][0] > 0.5]

# Drop those columns
df = df.drop(*cols_to_drop)

df.show(5)

+-----------+---------+---------------+---------+--------------------+--------------------+----------+-------------+-------------+----------+-------+-------+----------+---------+------------+-----------+--------+------+----------+----------+---------+--------+--------------------+-----------+-------+-----------+-----------+----------+------+-------+------+-------+--------------------+------------+-----------+-----------+-------+-------+-------+----------+
|    pl_name| hostname|discoverymethod|disc_year|       disc_facility|          pl_refname| pl_orbper|pl_orbpererr1|pl_orbpererr2|pl_orbsmax|pl_rade|pl_radj| pl_bmasse|pl_bmassj|pl_bmassprov|pl_orbeccen|pl_insol|pl_eqt|pl_eqterr1|pl_eqterr2|pl_eqtlim|ttv_flag|          st_refname|st_spectype|st_teff|st_tefferr1|st_tefferr2|st_tefflim|st_rad|st_mass|st_met|st_logg|          sy_refname|       rastr|         ra|        dec|sy_dist|sy_vmag|sy_kmag| disc_date|
+-----------+---------+---------------+---------+--------------------+----------

In [19]:
column_count = len(df.columns)
print("Number of columns:", column_count)

Number of columns: 40


In [20]:
#Drop Rows where key Columns are missing
df = df.dropna(subset=["pl_name", "hostname"])
df.show(5)

+-----------+---------+---------------+---------+--------------------+--------------------+----------+-------------+-------------+----------+-------+-------+----------+---------+------------+-----------+--------+------+----------+----------+---------+--------+--------------------+-----------+-------+-----------+-----------+----------+------+-------+------+-------+--------------------+------------+-----------+-----------+-------+-------+-------+----------+
|    pl_name| hostname|discoverymethod|disc_year|       disc_facility|          pl_refname| pl_orbper|pl_orbpererr1|pl_orbpererr2|pl_orbsmax|pl_rade|pl_radj| pl_bmasse|pl_bmassj|pl_bmassprov|pl_orbeccen|pl_insol|pl_eqt|pl_eqterr1|pl_eqterr2|pl_eqtlim|ttv_flag|          st_refname|st_spectype|st_teff|st_tefferr1|st_tefferr2|st_tefflim|st_rad|st_mass|st_met|st_logg|          sy_refname|       rastr|         ra|        dec|sy_dist|sy_vmag|sy_kmag| disc_date|
+-----------+---------+---------------+---------+--------------------+----------

In [21]:
# Fill missing numerical values with Mean
from pyspark.sql.functions import mean

num_cols = [c for c, dtype in df.dtypes if dtype in ("int", "double")]

for c in num_cols:
    mean_value = df.select(mean(col(c))).collect()[0][0]
    if mean_value is not None:
        df = df.fillna({c: mean_value})

df.show(5)

+-----------+---------+---------------+---------+--------------------+--------------------+----------+-------------+-------------+----------+-------+-------+----------+---------+------------+-----------+--------+------+----------+----------+---------+--------+--------------------+-----------+-------+-----------+-----------+----------+------+-------+------+-------+--------------------+------------+-----------+-----------+-------+-------+-------+----------+
|    pl_name| hostname|discoverymethod|disc_year|       disc_facility|          pl_refname| pl_orbper|pl_orbpererr1|pl_orbpererr2|pl_orbsmax|pl_rade|pl_radj| pl_bmasse|pl_bmassj|pl_bmassprov|pl_orbeccen|pl_insol|pl_eqt|pl_eqterr1|pl_eqterr2|pl_eqtlim|ttv_flag|          st_refname|st_spectype|st_teff|st_tefferr1|st_tefferr2|st_tefflim|st_rad|st_mass|st_met|st_logg|          sy_refname|       rastr|         ra|        dec|sy_dist|sy_vmag|sy_kmag| disc_date|
+-----------+---------+---------------+---------+--------------------+----------

In [22]:
#count the Rows with any NULL or Empty values
df.select(count(when(
    col(c).isNull() | (col(c) == ""), 1
)).alias("Rows_with_missing_values")).show()

+------------------------+
|Rows_with_missing_values|
+------------------------+
|                       0|
+------------------------+



In [23]:
# show the Rows that contain NULL or Empty Strings
df.filter(
    (col("pl_name").isNull()) | (col("pl_name") == "")
).show()

+-------+--------+---------------+---------+-------------+----------+---------+-------------+-------------+----------+-------+-------+---------+---------+------------+-----------+--------+------+----------+----------+---------+--------+----------+-----------+-------+-----------+-----------+----------+------+-------+------+-------+----------+-----+---+---+-------+-------+-------+---------+
|pl_name|hostname|discoverymethod|disc_year|disc_facility|pl_refname|pl_orbper|pl_orbpererr1|pl_orbpererr2|pl_orbsmax|pl_rade|pl_radj|pl_bmasse|pl_bmassj|pl_bmassprov|pl_orbeccen|pl_insol|pl_eqt|pl_eqterr1|pl_eqterr2|pl_eqtlim|ttv_flag|st_refname|st_spectype|st_teff|st_tefferr1|st_tefferr2|st_tefflim|st_rad|st_mass|st_met|st_logg|sy_refname|rastr| ra|dec|sy_dist|sy_vmag|sy_kmag|disc_date|
+-------+--------+---------------+---------+-------------+----------+---------+-------------+-------------+----------+-------+-------+---------+---------+------------+-----------+--------+------+----------+----------

In [24]:
#Columns Names
df.columns

['pl_name',
 'hostname',
 'discoverymethod',
 'disc_year',
 'disc_facility',
 'pl_refname',
 'pl_orbper',
 'pl_orbpererr1',
 'pl_orbpererr2',
 'pl_orbsmax',
 'pl_rade',
 'pl_radj',
 'pl_bmasse',
 'pl_bmassj',
 'pl_bmassprov',
 'pl_orbeccen',
 'pl_insol',
 'pl_eqt',
 'pl_eqterr1',
 'pl_eqterr2',
 'pl_eqtlim',
 'ttv_flag',
 'st_refname',
 'st_spectype',
 'st_teff',
 'st_tefferr1',
 'st_tefferr2',
 'st_tefflim',
 'st_rad',
 'st_mass',
 'st_met',
 'st_logg',
 'sy_refname',
 'rastr',
 'ra',
 'dec',
 'sy_dist',
 'sy_vmag',
 'sy_kmag',
 'disc_date']

In [25]:
# Rename Columns in PySpark
df = df.withColumnRenamed("pl_name", "Planet_Name") \
       .withColumnRenamed("hostname", "Host_Star") \
       .withColumnRenamed("discoverymethod", "Discovery_Method") \
       .withColumnRenamed("disc_year", "Discovery_Year") \
       .withColumnRenamed("disc_facility", "Discovery_Facility") \
       .withColumnRenamed("pl_refname", "Reference_Name") \
       .withColumnRenamed("pl_orbper", "Orbital_Period_Days") \
       .withColumnRenamed("pl_orbpererr1", "Orbital_Period_Error_Upper") \
       .withColumnRenamed("pl_orbpererr2", "Orbital_Period_Error_Lower") \
       .withColumnRenamed("pl_orbsmax", "Semi_Major_Axis_AU") \
       .withColumnRenamed("pl_rade", "Planet_Radius_Earth") \
       .withColumnRenamed("pl_bmasse", "Planet_Mass_Earth") \
       .withColumnRenamed("pl_orbeccen", "Orbital_Eccentricity") \
       .withColumnRenamed("pl_eqt", "Equilibrium_Temperature") \
       .withColumnRenamed("pl_eqterr1", "UEB_temp") \
       .withColumnRenamed("pl_eqterr2", "LEB_temp") \
       .withColumnRenamed("st_teff", "Star_Temperature_K") \
       .withColumnRenamed("st_tefferr1", "Star_UEBTemp") \
       .withColumnRenamed("st_tefferr2", "Star_LEBTemp") \
       .withColumnRenamed("st_rad", "Star_Radius_Solar") \
       .withColumnRenamed("st_mass", "Star_Mass_Solar") \
       .withColumnRenamed("st_met", "Star_Metallicity") \
       .withColumnRenamed("st_logg", "Star_Surface_Gravity") \
       .withColumnRenamed("sy_dist", "System_Distance_PC") \
       .withColumnRenamed("sy_vmag", "System_Visual_Magnitude") \
       .withColumnRenamed("sy_kmag", "System_Infrared_Magnitude") \
       .withColumnRenamed("disc_date", "Discovery_Date")

In [26]:
#verify Renamed Columns
df.show()

+------------+----------+----------------+--------------+--------------------+--------------------+-------------------+--------------------------+--------------------------+------------------+-------------------+-------+-----------------+---------+------------+--------------------+--------+-----------------------+--------+--------+---------+--------+--------------------+-----------+------------------+------------+------------+----------+-----------------+---------------+----------------+--------------------+--------------------+------------+-----------+-----------+------------------+-----------------------+-------------------------+--------------+
| Planet_Name| Host_Star|Discovery_Method|Discovery_Year|  Discovery_Facility|      Reference_Name|Orbital_Period_Days|Orbital_Period_Error_Upper|Orbital_Period_Error_Lower|Semi_Major_Axis_AU|Planet_Radius_Earth|pl_radj|Planet_Mass_Earth|pl_bmassj|pl_bmassprov|Orbital_Eccentricity|pl_insol|Equilibrium_Temperature|UEB_temp|LEB_temp|pl_eqtlim|t

In [27]:
# Standardize Numerical Columns (Feature Scaling for ML Models)
from pyspark.ml.feature import StandardScaler, VectorAssembler

# Define numerical columns to scale
num_features = ["Planet_Radius_Earth", "Planet_Mass_Earth", "Star_Temperature_K"]

# Step 1: Assemble features into a single vector column
assembler = VectorAssembler(inputCols=num_features, outputCol="features")
df_transformed = assembler.transform(df)  # Apply transformation and create "features" column

# Step 2: Apply StandardScaler
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df_transformed)  # Fit scaler model
df_scaled = scaler_model.transform(df_transformed)  # Transform data

# Show the final dataset with the new scaled features
df_scaled.select("features", "scaled_features").show(truncate=False)

+-------------------------+--------------------------------------------------------------+
|features                 |scaled_features                                               |
+-------------------------+--------------------------------------------------------------+
|[1.192,1.83705,3229.0]   |[-1.2966344596464798,-0.5761277207865202,-2.651812546513928]  |
|[1.664,4.72,5664.0]      |[-1.2108601499827818,-0.5701635414085622,0.33040677144419733] |
|[2.63,24.5,5725.0]       |[-1.035313575543942,-0.5292431410908292,0.40511534573390806]  |
|[2.91,1112.39942,5592.0] |[-0.9844305104892058,1.721377676494085,0.24222615916781742]   |
|[1.458,2.2,3844.0]       |[-1.2482955478444804,-0.575376858334785,-1.8986031499865164]  |
|[14.415,375.0394,5404.0] |[1.106318287563435,0.1959445522932938,0.011976782668381062]   |
|[1.533,3.09,3803.0]      |[-1.234666155419105,-0.5735356471981429,-1.9488171097550104]  |
|[2.69,9.13,4580.0]       |[-1.0244100616036416,-0.5610402367876723,-0.9972013356057443] |

In [33]:
# Updated the DataFrame
df = df_scaled

In [34]:
# Drop existing indexed columns if they exist
for col_name in ["Discovery_Method_Index", "Discovery_Facility_Index"]:
    if col_name in df.columns:
        df = df.drop(col_name)

In [35]:
# Encode categorical variables (for ML)
from pyspark.ml.feature import StringIndexer

categorical_cols = ["Discovery_Method", "Discovery_Facility"]

for col_name in categorical_cols:
    indexer = StringIndexer(inputCol=col_name, outputCol=col_name + "_Index")
    df = indexer.fit(df).transform(df)

df.show(5)

+-----------+---------+----------------+--------------+--------------------+--------------------+-------------------+--------------------------+--------------------------+------------------+-------------------+-------+-----------------+---------+------------+--------------------+--------+-----------------------+--------+--------+---------+--------+--------------------+-----------+------------------+------------+------------+----------+-----------------+---------------+----------------+--------------------+--------------------+------------+-----------+-----------+------------------+-----------------------+-------------------------+--------------+--------------------+--------------------+----------------------+------------------------+
|Planet_Name|Host_Star|Discovery_Method|Discovery_Year|  Discovery_Facility|      Reference_Name|Orbital_Period_Days|Orbital_Period_Error_Upper|Orbital_Period_Error_Lower|Semi_Major_Axis_AU|Planet_Radius_Earth|pl_radj|Planet_Mass_Earth|pl_bmassj|pl_bmasspr

# Export

In [36]:
# Convert vector columns to string format
df_cleaned = df.withColumn("features", col("features").cast("string")) \
               .withColumn("scaled_features", col("scaled_features").cast("string"))

# Save as CSV
df_cleaned.write.csv("Cleaned_Dataset.csv", header=True)