In [None]:
!pip install pyspark

In [None]:
import os
import urllib.request

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, isnan

# --- CONFIGURATION ---
# Remote location of the Gaia DR3 gaia_source CSV chunks.
base_url = "http://cdn.gea.esac.esa.int/Gaia/gdr3/gaia_source/"

# Local working directories (relative to the notebook).
raw_dir = "../data/raw"      # downloaded .csv.gz chunks
clean_dir = "../data/clean"  # cleaned parquet output

# Each chunk file is named after the index range it covers; build the
# 25 file names used by this notebook from those ranges.
files_to_download = [
    f"GaiaSource_{index_range}.csv.gz"
    for index_range in (
        "000000-003111", "003112-005263", "005264-006601", "006602-007952", "007953-010234",
        "010235-012597", "012598-014045", "014046-015369", "015370-016240", "016241-017018",
        "017019-017658", "017659-018028", "018029-018472", "018473-019161", "019162-019657",
        "019658-020091", "020092-020493", "020494-020747", "020748-020984", "020985-021233",
        "021234-021441", "021442-021665", "021666-021919", "021920-022158", "022159-022410",
    )
]



In [None]:
# --- STEP 1: DOWNLOAD (Only runs if files missing) ---
# exist_ok makes this a no-op when the directory is already there.
os.makedirs(raw_dir, exist_ok=True)


def _download_atomically(url, dest):
    """Download ``url`` to ``dest`` so that ``dest`` only ever holds a complete file.

    The payload is written to a hidden ``.<name>.part`` file in the same
    directory and renamed onto ``dest`` only after ``urlretrieve`` returns.
    Without this, an interrupted download would leave a truncated file under
    the final name, and the "already exists" check below would skip it on
    every later run. The temp name starts with "." because Spark's file
    listing ignores such names, so a leftover temp file is not read as data.

    Raises whatever ``urlretrieve`` raises (including KeyboardInterrupt),
    after removing the partial temp file.
    """
    directory, name = os.path.split(dest)
    tmp_path = os.path.join(directory, f".{name}.part")
    try:
        urllib.request.urlretrieve(url, tmp_path)
    except BaseException:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    # Same-directory rename: dest either does not exist or is complete.
    os.replace(tmp_path, dest)


print("Checking raw files...")
for f in files_to_download:
    local_path = os.path.join(raw_dir, f)
    if not os.path.exists(local_path):
        print(f"Downloading {f}...")
        _download_atomically(base_url + f, local_path)
    else:
        print(f"{f} already exists.")



In [None]:
# checking the size of the raw data 
!du -sh {raw_dir}

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr

# Create (or reuse, via getOrCreate) a Spark session that runs in-process
# using all available local cores ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Stellar Data Analysis")
    .getOrCreate()
)


In [None]:
# --- STEP 2: SPARK ETL ---
print("Reading raw data...")

# Reads every (non-hidden) file in raw_dir. Without a schema or inferSchema
# every column is read as a string; lines starting with "#" are skipped.
raw_df = (
    spark.read
    .option("header", "true")
    .option("comment", "#")
    .option("nullValue", "null")
    .option("nanValue", "NaN")
    .csv(raw_dir)  # Reads all chunks
)

# defining the features we need
cols = [
    "source_id", "ra", "dec", "parallax", "parallax_error",
    "pmra", "pmdec", "phot_g_mean_mag", "bp_rp", "teff_gspphot"
]

# Target Spark type per feature. Gaia source_id is a 64-bit integer whose
# values exceed 2**53, so a double cannot represent it exactly (ids would be
# rounded and could collide); keep it as a long. Everything else is numeric.
col_types = {c: "double" for c in cols}
col_types["source_id"] = "long"

# Cast all features in a single projection (withColumn in a loop adds one
# projection per call and inflates the query plan).
typed_df = raw_df.select([col(c).cast(col_types[c]).alias(c) for c in cols])

# Keep rows with a usable sky position and a strictly positive parallax.
# - A null parallax makes "> 0" null, so the row is already dropped.
# - Spark orders NaN above every number, so NaN > 0 is TRUE: NaN must be
#   excluded explicitly, and likewise for ra/dec, where isNotNull lets NaN through.
clean_df = typed_df.filter(
    (col("parallax") > 0) & ~isnan(col("parallax"))
    & col("ra").isNotNull() & ~isnan(col("ra"))
    & col("dec").isNotNull() & ~isnan(col("dec"))
)

# NOTE: transformations are lazy; the plan above only executes on the first
# action (e.g. a write or show).
print("data cleaned")

In [None]:
# --- STEP 3: OPTIMIZE WRITE ---
# Upper bound on the number of parquet part-files written to clean_dir.
# coalesce() merges partitions without a shuffle, which also caps the
# parallelism of the upstream read/filter stage at this many tasks; switch to
# repartition() if that stage should use more cores than this.
n_output_files = 5

print(f"Writing clean data to {clean_dir}...")
# mode("overwrite") replaces any previous output in clean_dir.
clean_df.coalesce(n_output_files).write.mode("overwrite").parquet(clean_dir)

In [None]:
!du -sh {clean_dir}

In [None]:
# Summary statistics of the cleaned data, computed from the parquet output just
# written. clean_df is lazy and not cached, so describing it directly would
# re-read and re-parse every gzipped CSV chunk.
spark.read.parquet(clean_dir).describe().show()