# Objective of the project 🚀

My objective is to create a machine learning model for object classification and then, try and compare it with a convolutional neural network (CNN) for images. For that, I will use Spark MLlib to train and evaluate the model. Secondly, create the CNN and compare both 🪐




# First Steps

## Download the data

The source of the data is: https://skyserver.sdss.org/CasJobs/
It is necessary to register and login a user to download the data you want. Then, you have to make a query specifying:
- Amount of rows
- Columns
- Where to keep the csv
- The database

As I want to get as much as possible data, I will not a maximum of rows.

I also add a "where" so I can get only data from planets, galaxies and stars:
- type = 3: Galaxies
- type = 6: Stars

and I downloaded the dataset to start working with it.

<img src="/home/haizeagonzalez/myproject/bigDataAstronomy/notebookImages/img1.png">

## Understanding the data

The columns we have are:
- objID: Unique identifier of the object → TYPE bigInt
- ra: Right ascension → TYPE float
- dec: Declination → TYPE float
- petroRad: Petrosian radius, used to know the size of galaxies in astronomical pictures. It is the amount of light that a galaxy emits in a sepecific radius. Very used because it is independent of the distance and brightness. We use different photometric filters:
    - petroRad_u: Near-ultraviolet
    - petroRad_g: Blue-Green
    - petroRad_r: Red
    - petroRad_i: Near-infrared
    - petroRad_z: Deeper infrared
 → TYPE: Real

- modelMag: Brightness measure adjusted to a galaxy model. Usual for galaxies. Also for all filters (u, g, r, i and z) → TYPE Real
- psfMag: Brightness measure based on the point source light profile. Usual for stars. Also for all filters (u, g, r, i and z) → TYPE Real
- u_g: (modelMag_u - modelMag_g)
- g_r: (modelMag_g - modelMag_r)
- r_i: (modelMag_r - modelMag_i)
- i_z: (modelMag_i - modelMag_z)
- fracDeV: The amount of brightness that the object has in the De Vaucouleurs profile. Also for all filters (u, g, r, i and z) → TYPE Real
- flags: Bit comination that explains different characteristics of the object. If we convert it to binary and check SDSS documentarion, we get a meaning for each bit → TYPE bigInt
- clean: Indicator that tell us if the object was cleaned → TYPE int



### What for?

PetroRad:
- Stars: Small and constant in all filters.
- Galaxies: Bigger and variates depending on the wavelengths.

ModelMag and psfMag:
- In the red filter:
    - Stars: modelMag_r ≈ psfMag_r
    - Galaxies: modelMag_r > psfMag_r
- In other filers:
    - Galaxies are usuarlly more red  (modelMag_g - modelMag_r is big).
    - Stars has different colors depending on their type.

fracDeV:
- Stars: fracDeV ≈ 0.
- Galaxies: fracDeV ≈ 1 (eliptic) or fracDeV < 1 (espiral).



# Spark

## Spark configuration

First, we need to create a spark sesion in case there is no one or get if there exists: "getOrCreate". I also decided to create a log in case there is any error during the process.

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("bigDataAstronomyProject") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.instances", "1") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

your 131072x1 screen size is bogus. expect trouble
25/03/16 17:45:09 WARN Utils: Your hostname, SS22-3006967600 resolves to a loopback address: 127.0.1.1; using 172.20.232.160 instead (on interface eth0)
25/03/16 17:45:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/16 17:45:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Then, we need to read de csv data.

In [4]:
path = "/home/haizeagonzalez/bigDataProject/primaryObjs.csv"
path2 = "/home/haizeagonzalez/myproject/primaryObjs_reduced.csv"

df = spark.read.csv(path, header=True)

Now, we are going to check if the data is correctly loaded.

In [5]:
df.show()

+-------------------+----------------+------------------+----+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+--------+--------+---------+---------+-------------+----------+---------+---------+---------+---------+----------+---------------+-----+
|              objID|              ra|               dec|type|petroRad_u|petroRad_g|petroRad_r|petroRad_i|petroRad_z|modelMag_u|modelMag_g|modelMag_r|modelMag_i|modelMag_z|psfMag_u|psfMag_g|psfMag_r|psfMag_i|psfMag_z|      u_g|      g_r|          r_i|       i_z|fracDeV_u|fracDeV_g|fracDeV_r|fracDeV_i| fracDeV_z|          flags|clean|
+-------------------+----------------+------------------+----+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+--------+--------+---------+---------+-------------+----------+---------+---------+---------+---------+----------+---------------

The schema and the chacacteristics of the data.

In [6]:
df.printSchema()

root
 |-- objID: string (nullable = true)
 |-- ra: string (nullable = true)
 |-- dec: string (nullable = true)
 |-- type: string (nullable = true)
 |-- petroRad_u: string (nullable = true)
 |-- petroRad_g: string (nullable = true)
 |-- petroRad_r: string (nullable = true)
 |-- petroRad_i: string (nullable = true)
 |-- petroRad_z: string (nullable = true)
 |-- modelMag_u: string (nullable = true)
 |-- modelMag_g: string (nullable = true)
 |-- modelMag_r: string (nullable = true)
 |-- modelMag_i: string (nullable = true)
 |-- modelMag_z: string (nullable = true)
 |-- psfMag_u: string (nullable = true)
 |-- psfMag_g: string (nullable = true)
 |-- psfMag_r: string (nullable = true)
 |-- psfMag_i: string (nullable = true)
 |-- psfMag_z: string (nullable = true)
 |-- u_g: string (nullable = true)
 |-- g_r: string (nullable = true)
 |-- r_i: string (nullable = true)
 |-- i_z: string (nullable = true)
 |-- fracDeV_u: string (nullable = true)
 |-- fracDeV_g: string (nullable = true)
 |-- fracDe

As all columns are string, we need to convert them into their type. For that:

In [7]:
from pyspark.sql.functions import col

df = df.withColumn("objID", col("objID").cast("long")) \
       .withColumn("ra", col("ra").cast("float")) \
       .withColumn("dec", col("dec").cast("float")) \
       .withColumn("petroRad_u", col("petroRad_u").cast("float")) \
       .withColumn("petroRad_g", col("petroRad_g").cast("float")) \
       .withColumn("petroRad_r", col("petroRad_r").cast("float")) \
       .withColumn("petroRad_i", col("petroRad_i").cast("float")) \
       .withColumn("petroRad_z", col("petroRad_z").cast("float")) \
       .withColumn("modelMag_u", col("modelMag_u").cast("float")) \
       .withColumn("modelMag_g", col("modelMag_g").cast("float")) \
       .withColumn("modelMag_r", col("modelMag_r").cast("float")) \
       .withColumn("modelMag_i", col("modelMag_i").cast("float")) \
       .withColumn("modelMag_z", col("modelMag_z").cast("float")) \
       .withColumn("psfMag_u", col("psfMag_u").cast("float")) \
       .withColumn("psfMag_g", col("psfMag_g").cast("float")) \
       .withColumn("psfMag_r", col("psfMag_r").cast("float")) \
       .withColumn("psfMag_i", col("psfMag_i").cast("float")) \
       .withColumn("psfMag_z", col("psfMag_z").cast("float")) \
       .withColumn("u_g", col("u_g").cast("float")) \
       .withColumn("g_r", col("g_r").cast("float")) \
       .withColumn("r_i", col("r_i").cast("float")) \
       .withColumn("i_z", col("i_z").cast("float")) \
       .withColumn("fracDeV_u", col("fracDeV_u").cast("float")) \
       .withColumn("fracDeV_g", col("fracDeV_g").cast("float")) \
       .withColumn("fracDeV_r", col("fracDeV_r").cast("float")) \
       .withColumn("fracDeV_i", col("fracDeV_i").cast("float")) \
       .withColumn("fracDeV_z", col("fracDeV_z").cast("float")) \
       .withColumn("flags", col("flags").cast("long")) \
       .withColumn("clean", col("clean").cast("int"))

df.printSchema()

root
 |-- objID: long (nullable = true)
 |-- ra: float (nullable = true)
 |-- dec: float (nullable = true)
 |-- type: string (nullable = true)
 |-- petroRad_u: float (nullable = true)
 |-- petroRad_g: float (nullable = true)
 |-- petroRad_r: float (nullable = true)
 |-- petroRad_i: float (nullable = true)
 |-- petroRad_z: float (nullable = true)
 |-- modelMag_u: float (nullable = true)
 |-- modelMag_g: float (nullable = true)
 |-- modelMag_r: float (nullable = true)
 |-- modelMag_i: float (nullable = true)
 |-- modelMag_z: float (nullable = true)
 |-- psfMag_u: float (nullable = true)
 |-- psfMag_g: float (nullable = true)
 |-- psfMag_r: float (nullable = true)
 |-- psfMag_i: float (nullable = true)
 |-- psfMag_z: float (nullable = true)
 |-- u_g: float (nullable = true)
 |-- g_r: float (nullable = true)
 |-- r_i: float (nullable = true)
 |-- i_z: float (nullable = true)
 |-- fracDeV_u: float (nullable = true)
 |-- fracDeV_g: float (nullable = true)
 |-- fracDeV_r: float (nullable = tr

Now that we have all the structure, we are going to explore and clean the data.

## Data cleaning

In principle, the data is cleaned because we get it from CasJobs and we apply clear filter to get good data. However, we are going to check whether there is any null value and the amount of galaxies and stars.  

In [8]:
from pyspark.sql.functions import col, when, count

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+-----+---+---+----+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+--------+--------+---+---+---+---+---------+---------+---------+---------+---------+-----+-----+
|objID| ra|dec|type|petroRad_u|petroRad_g|petroRad_r|petroRad_i|petroRad_z|modelMag_u|modelMag_g|modelMag_r|modelMag_i|modelMag_z|psfMag_u|psfMag_g|psfMag_r|psfMag_i|psfMag_z|u_g|g_r|r_i|i_z|fracDeV_u|fracDeV_g|fracDeV_r|fracDeV_i|fracDeV_z|flags|clean|
+-----+---+---+----+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+--------+--------+---+---+---+---+---------+---------+---------+---------+---------+-----+-----+
|    0|  0|  0|   0|         0|         0|         0|         0|         0|         0|         0|         0|         0|         0|       0|       0|       0|       0|       0|  0|  0|  0|  0|        0|        0|        0|        0|       

                                                                                

In [9]:
df.groupBy("type").count().show()

[Stage 5:>                                                          (0 + 8) / 8]

+----+-------+
|type|  count|
+----+-------+
|   3| 986539|
|   6|2513461|
+----+-------+



                                                                                

As we can see, there is no null values and the amount of galaxies are less than the amount of stars, which make sense. However, this can affect the model so we are going to balance the data.

First, as it is a binary classification, we will update stars to 0 and galaxies to 1.

In [10]:
df = df.withColumn("type", when(col("type") == 6,1).otherwise(0))

In [11]:
df_stars = df.filter(df["type"] == 0)
df_galaxies = df.filter(df["type"] == 1)

#Print to know the conversion is correctly done
df_stars.show(5)
df_galaxies.show(5)

+-------------------+---------+-----------+----+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+--------+--------+---------+---------+---------+----------+---------+---------+---------+---------+----------+---------------+-----+
|              objID|       ra|        dec|type|petroRad_u|petroRad_g|petroRad_r|petroRad_i|petroRad_z|modelMag_u|modelMag_g|modelMag_r|modelMag_i|modelMag_z|psfMag_u|psfMag_g|psfMag_r|psfMag_i|psfMag_z|      u_g|      g_r|      r_i|       i_z|fracDeV_u|fracDeV_g|fracDeV_r|fracDeV_i| fracDeV_z|          flags|clean|
+-------------------+---------+-----------+----+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+--------+--------+---------+---------+---------+----------+---------+---------+---------+---------+----------+---------------+-----+
|1237648704596673591|228.13321|  0.1894192|   

### Balance the data

As we have much more objects of one class, we need to balance the data to have a good model. Explicar más el por qué.

In [12]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

#Get the amount of stars and galaxies
count_stars = df.filter(col("type") == 0).count()
count_galaxies = df.filter(col("type") == 1).count()

#Select the minimum number of both clases
min_count = min(count_stars, count_galaxies)

# Submuestreo: Tomar solo 'min_count' elementos de cada clase
df_stars = df.filter(col("type") == 0).sample(fraction=min_count / count_stars, seed=1)
df_galaxies = df.filter(col("type") == 1).sample(fraction=min_count / count_galaxies, seed=1)

#Union the data
df_balanced = df_stars.union(df_galaxies)

#Check if everything is ok
df_balanced.groupBy("type").count().show()
df_balanced.show(5)


                                                                                

+----+------+
|type| count|
+----+------+
|   0|986539|
|   1|985461|
+----+------+

+-------------------+---------+-----------+----+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+--------+--------+---------+---------+---------+----------+---------+---------+---------+---------+----------+---------------+-----+
|              objID|       ra|        dec|type|petroRad_u|petroRad_g|petroRad_r|petroRad_i|petroRad_z|modelMag_u|modelMag_g|modelMag_r|modelMag_i|modelMag_z|psfMag_u|psfMag_g|psfMag_r|psfMag_i|psfMag_z|      u_g|      g_r|      r_i|       i_z|fracDeV_u|fracDeV_g|fracDeV_r|fracDeV_i| fracDeV_z|          flags|clean|
+-------------------+---------+-----------+----+----------+----------+----------+----------+----------+----------+----------+----------+----------+----------+--------+--------+--------+--------+--------+---------+---------+---------+----------+---------+---------+---------+-----

We don't get exactly the same amount of data beacuse PySpark approximates the data.

As for the supervised machine model we won't use objID, ra, dec, flags and clean columns, we are going to remove them. 

In [13]:
df_ml_model = df_balanced.select("type", "petroRad_u", "petroRad_g", "petroRad_r", "petroRad_i", "petroRad_z",
                        "modelMag_u", "modelMag_g", "modelMag_r", "modelMag_i", "modelMag_z",
                        "psfMag_u", "psfMag_g", "psfMag_r", "psfMag_i", "psfMag_z",
                        "u_g", "g_r", "r_i", "i_z",
                        "fracDeV_u", "fracDeV_g", "fracDeV_r", "fracDeV_i", "fracDeV_z")

df_ml_model.printSchema()

root
 |-- type: integer (nullable = false)
 |-- petroRad_u: float (nullable = true)
 |-- petroRad_g: float (nullable = true)
 |-- petroRad_r: float (nullable = true)
 |-- petroRad_i: float (nullable = true)
 |-- petroRad_z: float (nullable = true)
 |-- modelMag_u: float (nullable = true)
 |-- modelMag_g: float (nullable = true)
 |-- modelMag_r: float (nullable = true)
 |-- modelMag_i: float (nullable = true)
 |-- modelMag_z: float (nullable = true)
 |-- psfMag_u: float (nullable = true)
 |-- psfMag_g: float (nullable = true)
 |-- psfMag_r: float (nullable = true)
 |-- psfMag_i: float (nullable = true)
 |-- psfMag_z: float (nullable = true)
 |-- u_g: float (nullable = true)
 |-- g_r: float (nullable = true)
 |-- r_i: float (nullable = true)
 |-- i_z: float (nullable = true)
 |-- fracDeV_u: float (nullable = true)
 |-- fracDeV_g: float (nullable = true)
 |-- fracDeV_r: float (nullable = true)
 |-- fracDeV_i: float (nullable = true)
 |-- fracDeV_z: float (nullable = true)



## Spark ML

As our objective is to create a machine learning model, we need to convert the data in a correct format: Vectors.

In [14]:
from pyspark.ml.feature import VectorAssembler

features = df.columns[1:] #We don't get the type beacuse is the result we want to get.
assembler = VectorAssembler(inputCols = features, outputCol = "features")

In [15]:
df = assembler.transform(df).select("features", "Type")
df.show(5)

+--------------------+----+
|            features|Type|
+--------------------+----+
|[228.133209228515...|   0|
|[228.132659912109...|   0|
|[228.133590698242...|   1|
|[228.134521484375...|   0|
|[228.134750366210...|   0|
+--------------------+----+
only showing top 5 rows



Now, we are going to divide the dataset into train and test, so we can get the accuracy of the model.

In [16]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed = 1)

We are going to try different models to check which is the best for our case.

In [20]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, GBTClassifier, LinearSVC

models = {
    "Logistic Regression": LogisticRegression(labelCol = "Type", featuresCol = "features"),
    #"Decision Tree": DecisionTreeClassifier(labelCol="Type", featuresCol="features"),
    #"Random Forest": RandomForestClassifier(labelCol="Type", featuresCol="features", numTrees=100),
    #"Gradient Boosted Trees": GBTClassifier(labelCol="Type", featuresCol="features"),
    #"Linear SVM": LinearSVC(labelCol="Type", featuresCol="features")
}

lr = LogisticRegression(labelCol="Type", featuresCol="features")


In [18]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="Type", predictionCol="prediction", metricName="accuracy")

In [21]:
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.01, 0.001])  # Regularización
             .addGrid(lr.maxIter, [10, 20, 50])         # Número de iteraciones
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  # ElasticNet (L1 o L2)
             .build())

# Configurar el CrossValidator
crossval = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=5  # Número de pliegues (folds) para la validación cruzada
)

# Ajustar el modelo con CrossValidation
cv_model = crossval.fit(train_data)

# Hacer predicciones sobre el conjunto de test
predictions = cv_model.transform(test_data)

# Evaluar el modelo
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy después de Cross Validation: {accuracy}")

# Mostrar la matriz de confusión
from pyspark.mllib.evaluation import MulticlassMetrics
predictions_rdd = predictions.select("prediction", "Type").rdd
metrics = MulticlassMetrics(predictions_rdd)

print("Confusion Matrix:")
print(metrics.confusionMatrix())



Accuracy después de Cross Validation: 1.0


                                                                                

Confusion Matrix:


25/03/16 18:13:04 ERROR Executor: Exception in task 7.0 in stage 5922.0 (TID 30413)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^

Py4JJavaError: An error occurred while calling o34670.confusionMatrix.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 5922.0 failed 1 times, most recent failure: Lost task 7.0 in stage 5922.0 (TID 30413) (172.20.232.160 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/sql/session.py", line 1459, in prepare
    verify_func(obj)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/sql/types.py", line 2201, in verify
    verify_value(obj)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/sql/types.py", line 2174, in verify_struct
    verifier(v)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/sql/types.py", line 2201, in verify
    verify_value(obj)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/sql/types.py", line 2195, in verify_default
    verify_acceptable_types(obj)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/sql/types.py", line 2020, in verify_acceptable_types
    raise PySparkTypeError(
pyspark.errors.exceptions.base.PySparkTypeError: [CANNOT_ACCEPT_OBJECT_IN_TYPE] `DoubleType()` can not accept object `0` in type `int`.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$collectAsMap$1(PairRDDFunctions.scala:738)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
	at org.apache.spark.rdd.PairRDDFunctions.collectAsMap(PairRDDFunctions.scala:737)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions$lzycompute(MulticlassMetrics.scala:61)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusions(MulticlassMetrics.scala:52)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass$lzycompute(MulticlassMetrics.scala:78)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.tpByClass(MulticlassMetrics.scala:76)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.labels$lzycompute(MulticlassMetrics.scala:241)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.labels(MulticlassMetrics.scala:241)
	at org.apache.spark.mllib.evaluation.MulticlassMetrics.confusionMatrix(MulticlassMetrics.scala:113)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1247, in main
    process()
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1239, in process
    serializer.dump_stream(out_iter, outfile)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 274, in dump_stream
    vs = list(itertools.islice(iterator, batch))
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 83, in wrapper
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/sql/session.py", line 1459, in prepare
    verify_func(obj)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/sql/types.py", line 2201, in verify
    verify_value(obj)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/sql/types.py", line 2174, in verify_struct
    verifier(v)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/sql/types.py", line 2201, in verify
    verify_value(obj)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/sql/types.py", line 2195, in verify_default
    verify_acceptable_types(obj)
  File "/home/haizeagonzalez/bigDataProject/bigdataenv/lib/python3.12/site-packages/pyspark/sql/types.py", line 2020, in verify_acceptable_types
    raise PySparkTypeError(
pyspark.errors.exceptions.base.PySparkTypeError: [CANNOT_ACCEPT_OBJECT_IN_TYPE] `DoubleType()` can not accept object `0` in type `int`.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:784)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:766)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
for name, model in models.items():
    model_trained = model.fit(train_data)
    predictions = model_trained.transform(test_data)
    #auc = evaluator.evaluate(predictions)
    accuracy = evaluator.evaluate(predictions)
    #print(f"{name}: AUC = {auc:.4f}")
    print(f"{name}: Accuracy = {accuracy:.4f}")


Logistic Regression: Accuracy = 1.0000
Decision Tree: Accuracy = 1.0000


In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType

for name, model in models.items():
    # Entrenamiento del modelo
    model_trained = model.fit(train_data)

    # Predicciones
    predictions = model_trained.transform(test_data)
    
    # Evaluación
    accuracy = evaluator.evaluate(predictions)
    print(f"{name}: Accuracy = {accuracy:.4f}")
    
    # Seleccionar las columnas de predicción y etiquetas
    predictionAndLabels = predictions.select(
        F.col("prediction").cast(FloatType()), 
        F.col("type").cast(FloatType())  # Asegurarse de que 'type' es float
    )

    # Convertir a RDD para usar MulticlassMetrics
    metrics = MulticlassMetrics(predictionAndLabels.rdd.map(tuple))

    # Obtener la matriz de confusión
    conf_matrix = metrics.confusionMatrix().toArray()

    # Mostrar la matriz de confusión
    print(f"{name}: Confusion Matrix")
    print(conf_matrix)


Logistic Regression: Accuracy = 1.0000




Logistic Regression: Confusion Matrix
[[10563.     0.]
 [    0.  9195.]]
Decision Tree: Accuracy = 1.0000




Decision Tree: Confusion Matrix
[[10563.     0.]
 [    0.  9195.]]


cross validation

matriz confusion -> Si esta bien (cross validation si esta mal overfitting)

El accuracy el 100 pero puede ser que haya sido overfitting por tanto, puede ser que al haber más estrellas que galaxias, prediga el que más haya. Es por ello que vamos a bajar la cantidad de estrellas para que el modelo esté balanceado.

# Images

As I need also de images, I have downloaded from https://skyserver.sdss.org/dr18, specifying with a request:
- the location of the object (with right ascension (RA) and declination (dec))
- the zoom of the picture (scale)
- the dimmensions of the photo (with and height)