In [1]:
import os
import sys
os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: Whichever package you want mention here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark

In [2]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [3]:
# Path to the CSV file.
file_path = "file:///home/talentum/test-jupyter/test/P2/M1/SM2/Project/nearest-earth-objects(1910-2024).csv"

# Read the CSV file into a DataFrame.
neo = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the first few Rows of the DataFrame.
neo.show(5)

+-------+------------------+------------------+----------------------+----------------------+-------------+-----------------+-------------+------------+
| neo_id|              name|absolute_magnitude|estimated_diameter_min|estimated_diameter_max|orbiting_body|relative_velocity|miss_distance|is_hazardous|
+-------+------------------+------------------+----------------------+----------------------+-------------+-----------------+-------------+------------+
|2162117|162117 (1998 SD15)|             19.14|           0.394961694|           0.883161196|        Earth|      71745.40105|5.814362332E7|       false|
|2349507|  349507 (2008 QY)|              18.5|           0.530340723|           1.185877909|        Earth|      109949.7571|5.580104782E7|        true|
|2455415|  455415 (2003 GA)|             21.45|           0.136318556|           0.304817558|        Earth|       24865.5068|6.720688772E7|       false|
|3132126|         (2002 PB)|             20.63|           0.198863453|            

In [4]:
# Print all the Columns of the NEO DataFrame.
neo.printSchema()

root
 |-- neo_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- absolute_magnitude: double (nullable = true)
 |-- estimated_diameter_min: double (nullable = true)
 |-- estimated_diameter_max: double (nullable = true)
 |-- orbiting_body: string (nullable = true)
 |-- relative_velocity: double (nullable = true)
 |-- miss_distance: double (nullable = true)
 |-- is_hazardous: boolean (nullable = true)



In [5]:
# Print the First 20 Rows of the NEO DataFrame.
neo.show()

+--------+-------------------+------------------+----------------------+----------------------+-------------+-----------------+-------------+------------+
|  neo_id|               name|absolute_magnitude|estimated_diameter_min|estimated_diameter_max|orbiting_body|relative_velocity|miss_distance|is_hazardous|
+--------+-------------------+------------------+----------------------+----------------------+-------------+-----------------+-------------+------------+
| 2162117| 162117 (1998 SD15)|             19.14|           0.394961694|           0.883161196|        Earth|      71745.40105|5.814362332E7|       false|
| 2349507|   349507 (2008 QY)|              18.5|           0.530340723|           1.185877909|        Earth|      109949.7571|5.580104782E7|        true|
| 2455415|   455415 (2003 GA)|             21.45|           0.136318556|           0.304817558|        Earth|       24865.5068|6.720688772E7|       false|
| 3132126|          (2002 PB)|             20.63|           0.19886345

In [6]:
# Summary of NEO DataFrame.
neo.describe().show()

+-------+--------------------+--------------------+------------------+----------------------+----------------------+-------------+------------------+--------------------+
|summary|              neo_id|                name|absolute_magnitude|estimated_diameter_min|estimated_diameter_max|orbiting_body| relative_velocity|       miss_distance|
+-------+--------------------+--------------------+------------------+----------------------+----------------------+-------------+------------------+--------------------+
|  count|              338199|              338199|            338171|                338171|                338171|       338199|            338199|              338199|
|   mean|1.7599388682018574E7|                null| 22.93252495926504|    0.1578120466895513|   0.35287846403961476|         null| 51060.66290760665|4.1535350932192504E7|
| stddev|2.2872246428931765E7|                null|2.9112163902923123|    0.3138851378754725|    0.7018685054245288|         null|26399.238434932

In [7]:
# Counting the Number of Rows.
neo.count()

338199

In [8]:
neo.select("neo_id").distinct().count()

33514

In [9]:
neo.select("name").distinct().count()

33514

# Finding the NULL Values in the Features.

In [10]:
# Importing Functions.
from pyspark.sql.functions import col, isnan, when, count

In [11]:
null_counts_magnitude = neo.select([count(when(col("absolute_magnitude").isNull() | isnan("absolute_magnitude"), absolute_magnitude)).alias("absolute_magnitude") for absolute_magnitude in neo.columns])

null_counts_magnitude.show()

+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|absolute_magnitude|absolute_magnitude|absolute_magnitude|absolute_magnitude|absolute_magnitude|absolute_magnitude|absolute_magnitude|absolute_magnitude|absolute_magnitude|
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|                28|                28|                28|                28|                28|                28|                28|                28|                28|
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+



In [12]:
null_counts_dia_max = neo.select([count(when(col("estimated_diameter_max").isNull() | isnan("estimated_diameter_max"), estimated_diameter_max)).alias("estimated_diameter_max") for estimated_diameter_max in neo.columns])

null_counts_dia_max.show()

+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|                    28|                    28|                    28|                    28|                    28|                    28|                    28|                    28|                    28|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+--

In [13]:
null_counts_dia_min = neo.select([count(when(col("estimated_diameter_min").isNull() | isnan("estimated_diameter_min"), estimated_diameter_min)).alias("estimated_diameter_min") for estimated_diameter_min in neo.columns])

null_counts_dia_min.show()

+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|                    28|                    28|                    28|                    28|                    28|                    28|                    28|                    28|                    28|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+--

# Calculating MEAN Values of the Features.

In [14]:
from pyspark.sql.types import DoubleType, StructField, StructType

In [15]:
from pyspark.sql.functions import mean
from pyspark.sql.types import DoubleType

In [16]:
# Ensure the column "absolute_magnitude" exists and is of type DoubleType

if "absolute_magnitude" in neo.columns and isinstance(neo.schema["absolute_magnitude"].dataType, DoubleType):
    absolute_magnitude_mean = neo.select(mean("absolute_magnitude").alias("absolute_magnitude_mean"))
    absolute_magnitude_mean.show()
else:
    print("Column 'absolute_magnitude' does not exist or is not of type DoubleType.")


+-----------------------+
|absolute_magnitude_mean|
+-----------------------+
|      22.93252495926504|
+-----------------------+



In [17]:
# Ensure the column "absolute_magnitude" exists and is of type DoubleType

if "estimated_diameter_max" in neo.columns and isinstance(neo.schema["estimated_diameter_max"].dataType, DoubleType):
    estimated_diameter_max_mean = neo.select(mean("estimated_diameter_max").alias("estimated_diameter_max_mean"))
    estimated_diameter_max_mean.show()
else:
    print("Column 'estimated_diameter_max' does not exist or is not of type DoubleType.")


+---------------------------+
|estimated_diameter_max_mean|
+---------------------------+
|        0.35287846403961476|
+---------------------------+



In [18]:
# Ensure the column "absolute_magnitude" exists and is of type DoubleType

if "estimated_diameter_min" in neo.columns and isinstance(neo.schema["estimated_diameter_min"].dataType, DoubleType):
    estimated_diameter_min_mean = neo.select(mean("estimated_diameter_min").alias("estimated_diameter_min_mean"))
    estimated_diameter_min_mean.show()
else:
    print("Column 'estimated_diameter_min' does not exist or is not of type DoubleType.")

+---------------------------+
|estimated_diameter_min_mean|
+---------------------------+
|         0.1578120466895513|
+---------------------------+



# Filling the NULL with MEAN Values in the Features.

In [19]:
absolute_magnitude_mean.show()

+-----------------------+
|absolute_magnitude_mean|
+-----------------------+
|      22.93252495926504|
+-----------------------+



In [20]:
# Fill NA values in the `absolute_magnitude` column with the mean value
neo = neo.fillna({"absolute_magnitude": 22.93252495926504})

neo.select("absolute_magnitude").show()

+------------------+
|absolute_magnitude|
+------------------+
|             19.14|
|              18.5|
|             21.45|
|             20.63|
|              22.7|
|              25.0|
|              21.5|
|             19.75|
|              21.7|
|             23.45|
|             26.02|
|              21.5|
|             23.76|
|             16.71|
|              24.3|
|             24.32|
|              24.9|
|             21.72|
|             17.36|
|             19.29|
+------------------+
only showing top 20 rows



In [21]:
null_counts = neo.select([count(when(col("absolute_magnitude").isNull() | isnan("absolute_magnitude"), absolute_magnitude)).alias("absolute_magnitude") for absolute_magnitude in neo.columns])

null_counts.show()

+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|absolute_magnitude|absolute_magnitude|absolute_magnitude|absolute_magnitude|absolute_magnitude|absolute_magnitude|absolute_magnitude|absolute_magnitude|absolute_magnitude|
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|                 0|                 0|                 0|                 0|                 0|                 0|                 0|                 0|                 0|
+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+



In [22]:
estimated_diameter_max_mean.show()

+---------------------------+
|estimated_diameter_max_mean|
+---------------------------+
|        0.35287846403961476|
+---------------------------+



In [23]:
# Fill NA values in the `absolute_magnitude` column with the mean value
neo = neo.fillna({"estimated_diameter_max": 0.35287846403961476})
    
neo.select("estimated_diameter_max").show()

+----------------------+
|estimated_diameter_max|
+----------------------+
|           0.883161196|
|           1.185877909|
|           0.304817558|
|             0.4446722|
|           0.171411509|
|           0.059434687|
|           0.297879063|
|           0.666868155|
|           0.271668934|
|           0.121350055|
|           0.037156943|
|           0.297879063|
|           0.105205872|
|           2.704207287|
|           0.082042707|
|           0.081290534|
|           0.062235757|
|           0.269178258|
|           2.004656557|
|           0.824213984|
+----------------------+
only showing top 20 rows



In [24]:
null_counts = neo.select([count(when(col("estimated_diameter_max").isNull() | isnan("estimated_diameter_max"), estimated_diameter_max)).alias("estimated_diameter_max") for estimated_diameter_max in neo.columns])

null_counts.show()

+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|estimated_diameter_max|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|                     0|                     0|                     0|                     0|                     0|                     0|                     0|                     0|                     0|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+--

In [25]:
estimated_diameter_min_mean.show()

+---------------------------+
|estimated_diameter_min_mean|
+---------------------------+
|         0.1578120466895513|
+---------------------------+



In [26]:
# Fill NA values in the `absolute_magnitude` column with the mean value
neo = neo.fillna({"estimated_diameter_min": 0.1578120466895513})

neo.select("estimated_diameter_min").show()

+----------------------+
|estimated_diameter_min|
+----------------------+
|           0.394961694|
|           0.530340723|
|           0.136318556|
|           0.198863453|
|           0.076657557|
|               0.02658|
|           0.133215567|
|           0.298232505|
|           0.121494041|
|           0.054269395|
|            0.01661709|
|           0.133215567|
|           0.047049496|
|           1.209358264|
|           0.036690614|
|           0.036354232|
|           0.027832677|
|           0.120380177|
|           0.896509666|
|           0.368599699|
+----------------------+
only showing top 20 rows



In [27]:
null_counts = neo.select([count(when(col("estimated_diameter_min").isNull() | isnan("estimated_diameter_min"), estimated_diameter_min)).alias("estimated_diameter_min") for estimated_diameter_min in neo.columns])

null_counts.show()

+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|estimated_diameter_min|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+
|                     0|                     0|                     0|                     0|                     0|                     0|                     0|                     0|                     0|
+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+--