In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=ac59a718de7e90a78fba6b908b45a1e6f8887049ae89427aaa643551ecb31224
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Obtain the notebook's Spark session (an already-running one is reused) and
# echo it as the cell result.
spark = (
    SparkSession.builder
    .appName("test")
    .getOrCreate()
)
spark

In [None]:
# Load the raw asteroid CSV. No header option is set, so Spark names the
# columns _c0.._c24 and the file's own header line comes back as the first
# data row (visible in the output below); later cells rename the columns and
# remove that row.
df = spark.read.format("csv").load("/content/drive/MyDrive/data/Asteroid_Data_All.csv")
df.show()

+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+----------------+--------+--------------+----------+--------------+--------------+----+--------+-------------+------+--------+----+----+----+----+------+------+
|              _c0|              _c1|               _c2|              _c3|              _c4|              _c5|              _c6|              _c7|             _c8|     _c9|          _c10|      _c11|          _c12|          _c13|_c14|    _c15|         _c16|  _c17|    _c18|_c19|_c20|_c21|_c22|  _c23|  _c24|
+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+----------------+--------+--------------+----------+--------------+--------------+----+--------+-------------+------+--------+----+----+----+----+------+------+
|        full_name|                a|                 e|                i|     

The columns in the dataset are:

* Name: The name of the asteroid.
* Neo Reference ID: A unique identifier for the asteroid assigned by NASA.
* Absolute Magnitude: The brightness of the asteroid, as seen from Earth.
* Diameter (km): The estimated diameter of the asteroid in kilometers.
* Est Dia in M(min): The estimated minimum diameter of the asteroid in meters.
* Est Dia in M(max): The estimated maximum diameter of the asteroid in meters.
* H: The asteroid's absolute magnitude, i.e. its intrinsic brightness at a standard distance and viewing geometry (a smaller H means a brighter, and typically larger, object).
* G: The asteroid's geometric albedo, which is a measure of its reflectivity.
* T: The asteroid's synodic rotation period, which is the time it takes to rotate once on its axis.
* U: The asteroid's measured or inferred impact probability on Earth.
* Epoch: The date and time at which the asteroid's orbital elements were calculated.
* Per: The asteroid's perihelion distance, which is the closest distance it comes to the Sun.
* Aph: The asteroid's aphelion distance, which is the farthest distance it comes to the Sun.
* Q: The asteroid's semi-major axis, which is half the distance between its perihelion and aphelion.
* E: The asteroid's eccentricity, which is a measure of how elliptical its orbit is.
* I: The asteroid's inclination, which is the angle between its orbital plane and the ecliptic plane.
* Node: The point in the asteroid's orbit where it crosses the ecliptic plane from south to north.
* Mean Motion: The asteroid's average orbital speed.
* Opposition Date: The date and time when the asteroid will be closest to Earth in its orbit.
* Magnitude: The asteroid's apparent magnitude, which is a measure of its brightness as seen from Earth.
* Phase Angle: The angle between the Sun, the asteroid, and the observer.
* Spectral Class: The asteroid's spectral type, which is a measure of its surface composition.
* Rot Period (hrs): The asteroid's rotation period in hours.
* Spectral slope: The asteroid's spectral slope, which is a measure of the variation in its spectral reflectance with wavelength.
* Radar Albedo: The asteroid's radar albedo, which is a measure of its reflectivity to radar waves.
* Radar H: The asteroid's radar brightness, which is a measure of its radar cross-section.
* Hazardous: A flag indicating whether the asteroid is considered to be a potential hazard to Earth.
* diameter: The diameter of the asteroid in kilometers.
* extent: The extent of the asteroid in kilometers. This is the maximum length of the asteroid in any direction.
* albedo: The albedo of the asteroid, which is a measure of its reflectivity.
* rot per: The rotation period of the asteroid in hours.
* GM: The gravitational parameter of the asteroid. This is a measure of the asteroid's mass.
* BV: The B-V color index of the asteroid. This is a measure of the asteroid's color.
* UB: The U-B color index of the asteroid. This is another measure of the asteroid's color.
* IR: The I-R color index of the asteroid. This is a third measure of the asteroid's color.
* spec T: The taxonomic class of the asteroid in the Tholen classification. This indicates the asteroid's composition and surface features.
* spec B: The taxonomic class of the asteroid in the SMASS II (Bus) classification, an alternative spectral classification scheme.

## **Data Cleaning and Preprocessing**

In [None]:
# Target schema: 25 names, applied positionally to the DataFrame's columns.
new_column_names = [
    "full_name",
    "a", "e", "i", "om", "w", "q", "ad", "per_y",
    "data_arc", "condition_code",
    "n_obs_used", "n_del_obs_used", "n_dop_obs_used",
    "H", "diameter", "extent", "albedo", "rot_per", "GM",
    "BV", "UB", "IR",
    "spec_B", "spec_T",
]

# Walk the current columns by position and give each one the name at the same
# index in the list above; columns beyond the end of the list are left alone.
for position, current_name in enumerate(df.columns[:len(new_column_names)]):
    df = df.withColumnRenamed(current_name, new_column_names[position])

# Display the DataFrame under its new column names
df.show()

+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+----------------+--------+--------------+----------+--------------+--------------+----+--------+-------------+------+--------+----+----+----+----+------+------+
|        full_name|                a|                 e|                i|               om|                w|                q|               ad|           per_y|data_arc|condition_code|n_obs_used|n_del_obs_used|n_dop_obs_used|   H|diameter|       extent|albedo| rot_per|  GM|  BV|  UB|  IR|spec_B|spec_T|
+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+----------------+--------+--------------+----------+--------------+--------------+----+--------+-------------+------+--------+----+----+----+----+------+------+
|        full_name|                a|                 e|                i|     

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Remove the CSV header line that was ingested as a data row (its "full_name"
# cell holds the literal text "full_name").
#
# Filtering on the value instead of on a row number avoids a global,
# unpartitioned window over the whole dataset, and makes the cell idempotent:
# running it again no longer drops a real record. eqNullSafe is used so that
# rows whose full_name is null are kept rather than silently filtered out.
df = df.filter(~F.col("full_name").eqNullSafe("full_name"))

# Show the DataFrame without the header row
df.show()

+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+----------------+--------+--------------+----------+--------------+--------------+----+--------+-------------+------+--------+----+----+----+----+------+------+
|        full_name|                a|                 e|                i|               om|                w|                q|               ad|           per_y|data_arc|condition_code|n_obs_used|n_del_obs_used|n_dop_obs_used|   H|diameter|       extent|albedo| rot_per|  GM|  BV|  UB|  IR|spec_B|spec_T|
+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+----------------+--------+--------------+----------+--------------+--------------+----+--------+-------------+------+--------+----+----+----+----+------+------+
|          1 Ceres|2.765348506018043|.07913825487621974|10.58682160714853|80.39

In [None]:
# Keep a single row per asteroid designation: rows that share a "full_name"
# value are collapsed into one.
df_cleaned = df.drop_duplicates(["full_name"])
df_cleaned.show()

+-----------------+---------+---------+--------+---------+---------+---------+---------+----------------+--------+--------------+----------+--------------+--------------+----+--------+------+------+-------+----+----+----+----+------+------+
|        full_name|        a|        e|       i|       om|        w|        q|       ad|           per_y|data_arc|condition_code|n_obs_used|n_del_obs_used|n_dop_obs_used|   H|diameter|extent|albedo|rot_per|  GM|  BV|  UB|  IR|spec_B|spec_T|
+-----------------+---------+---------+--------+---------+---------+---------+---------+----------------+--------+--------------+----------+--------------+--------------+----+--------+------+------+-------+----+----+----+----+------+------+
|       (1077 T-2)|2.6440377|0.2097091| 4.51445|207.39193|167.43097|2.0895589|3.1985165|4.29941786830938|       6|          null|         8|          null|          null|17.0|    null|  null|  null|   null|null|null|null|null|  null|  null|
|       (1083 T-1)|2.6319796|0.30162

In [None]:
from pyspark.sql.types import DoubleType

# Convert the numeric columns from the strings produced by the CSV reader to
# doubles. Values that cannot be parsed become null.
columns_to_convert = ['a', 'e', 'i', 'om', 'w', 'q', 'ad', 'per_y', 'H', 'diameter', 'extent', 'albedo', 'rot_per']
for col_name in columns_to_convert:
    # Index the DataFrame directly: `col` has not been imported at this point
    # in the notebook, which is what raised the NameError.
    df_cleaned = df_cleaned.withColumn(col_name, df_cleaned[col_name].cast(DoubleType()))

df_cleaned.show()


NameError: ignored

In [None]:
# Tally the records currently held in `df` (this triggers a Spark job) ...
row_count = df.count()
# ... and report the total.
print("Number of rows:", row_count)


Number of rows: 620291


# **Working on the Orbital Analysis**

In [None]:
from pyspark.sql.functions import radians, degrees, sqrt, col
from pyspark.sql.types import DoubleType


import math

# The columns were already renamed in the data-cleaning section, so the rename
# loop that used to be repeated here was a no-op and has been dropped.

# Define necessary constants
G = 6.67430e-11  # Gravitational constant in m^3 kg^-1 s^-2
M_sun = 1.989e30  # Solar mass in kg
AU_M = 1.495978707e11  # One astronomical unit in metres

# Convert orbital elements to radians
df = df.withColumn("om_rad", radians(col("om")))
df = df.withColumn("w_rad", radians(col("w")))

# Calculate semi-major axis (a) in meters.
# q is given in astronomical units (values of ~2-3 for main-belt objects), so
# q / (1 - e) must be scaled by AU_M before it is combined with the SI value
# of G * M_sun below; without the scaling the period and velocity are wrong.
df = df.withColumn("a_m", col("q") / (1 - col("e")) * AU_M)

# Calculate orbital period (T) in seconds (Kepler's third law)
df = df.withColumn("orbital_period_s", 2 * math.pi * sqrt(col("a_m") ** 3 / (G * M_sun)))

# Calculate mean orbital velocity (v) in m/s: circumference of the circle of
# radius a divided by the period
df = df.withColumn("orbital_velocity_m_s", 2 * math.pi * col("a_m") / col("orbital_period_s"))

# Orbital inclination (i) in degrees.
# The dataset's `i` is already expressed in degrees (e.g. ~10.59 for Ceres),
# so it is only cast to a number; applying degrees() to it would wrongly treat
# it as radians.
df = df.withColumn("orbital_inclination_deg", col("i").cast(DoubleType()))

# Calculate synodic rotation period (T_syn) in hours
# NOTE(review): dividing the rotation period by (1 - e) is kept from the
# original notebook, but it is not a standard sidereal/synodic conversion —
# confirm the intended formula before relying on this column.
df = df.withColumn("synodic_rotation_period_hrs", col("rot_per") / (1 - col("e")))

# Show the DataFrame with calculated orbital parameters
df.show()


+-----------------+-----------------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+----------------+--------+--------------+----------+--------------+--------------+----+--------+-------------+------+--------+----+----+----+----+------+------+------------------+-------------------+------------------+--------------------+--------------------+-----------------------+---------------------------+
|        full_name|                a|                 e|                i|               om|                w|                q|               ad|           per_y|data_arc|condition_code|n_obs_used|n_del_obs_used|n_dop_obs_used|   H|diameter|       extent|albedo| rot_per|  GM|  BV|  UB|  IR|spec_B|spec_T|            om_rad|              w_rad|               a_m|    orbital_period_s|orbital_velocity_m_s|orbital_inclination_deg|synodic_rotation_period_hrs|
+-----------------+-----------------+------------------+-----------------+--------

In [None]:
import matplotlib.pyplot as plt

# Create a scatter plot of semi-major axis (a) vs orbital period (T).
# The period column is named "orbital_period_s" (there is no "T_s" column,
# which is what raised the AnalysisException). Both columns are fetched in a
# single job, rows with missing values are dropped, and the Row objects are
# unpacked into plain lists of floats before being handed to matplotlib.
orbit_rows = (
    df.select('a_m', 'orbital_period_s')
    .dropna()
    .collect()
)
semi_major_axes = [row['a_m'] for row in orbit_rows]
orbital_periods = [row['orbital_period_s'] for row in orbit_rows]

plt.scatter(semi_major_axes, orbital_periods, marker='o', alpha=0.5)
plt.xlabel('Semi-Major Axis (m)')
plt.ylabel('Orbital Period (s)')
plt.title('Semi-Major Axis vs Orbital Period')
plt.grid()
plt.show()



AnalysisException: ignored


The scatter plot above shows the relationship between the semi-major axis and the orbital period of the asteroids. The semi-major axis is half the longest diameter of an asteroid's elliptical orbit (a measure of its average distance from the Sun), and the orbital period is the time the asteroid takes to complete one orbit around the Sun.

The graph shows that there is a positive correlation between the semi-major axis and orbital period. This means that asteroids with larger semi-major axes also have longer orbital periods. This is consistent with Kepler's third law, which states that the square of the orbital period is proportional to the cube of the semi-major axis.

The features of the graph include:

* Semi-major axis: The average distance of an asteroid from the Sun.
* Orbital period: The time it takes an asteroid to complete one orbit around the Sun.
* Number of asteroids: The number of asteroids in the dataset.


Interventions that could reduce the risk of an asteroid impact include:

* Tracking asteroids: We need to track asteroids more closely so that we can identify potential hazards early on.
* Deflecting asteroids: We can deflect asteroids away from Earth using a variety of methods, such as kinetic impact, gravity tractor, or nuclear explosion.
* Destroying asteroids: We can destroy asteroids by using a nuclear explosion or a large laser.

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Define a function to calculate the position of the asteroid at a given time
def calculate_position(a, e, om, w, M, t, i=0.0):
    """Return the (x, y, z) position of a body on an elliptical Kepler orbit.

    Parameters
    ----------
    a : float
        Semi-major axis, in the length unit consistent with the module-level
        constants ``G`` and ``M_sun`` (metres for SI values).
    e : float
        Orbital eccentricity; must satisfy ``0 <= e < 1``.
    om : float
        Longitude of the ascending node, in radians.
    w : float
        Argument of perihelion, in radians.
    M : float
        Mean anomaly at ``t = 0``, in radians.
    t : float
        Time elapsed since ``t = 0``, in seconds.
    i : float, optional
        Orbital inclination, in radians. Defaults to 0.0 (orbit lies in the
        reference plane).

    Returns
    -------
    tuple of float
        ``(x, y, z)`` in the same length unit as ``a``, with the central body
        at the origin.

    Raises
    ------
    ValueError
        If ``e`` is outside ``[0, 1)``; the elliptical solver does not apply.
    """
    if not 0 <= e < 1:
        raise ValueError(f"eccentricity must be in [0, 1) for an elliptical orbit, got {e}")

    n = np.sqrt(G * M_sun / a**3)  # Mean motion (rad/s)

    # Mean anomaly at time t, wrapped to [-pi, pi) so the solver below starts
    # close to the root; the position is periodic, so wrapping changes nothing.
    mean_anomaly = (M + n * t + np.pi) % (2 * np.pi) - np.pi

    # Solve Kepler's equation E - e*sin(E) = mean_anomaly with Newton's method.
    # For very eccentric orbits, starting at +/-pi is the safer initial guess.
    E = mean_anomaly if e < 0.8 else np.copysign(np.pi, mean_anomaly)
    for _ in range(50):
        step = (E - e * np.sin(E) - mean_anomaly) / (1 - e * np.cos(E))
        E -= step
        if abs(step) < 1e-12:
            break

    # Position in the orbital plane (perihelion along +x)
    x = a * (np.cos(E) - e)
    y = a * np.sqrt(1 - e**2) * np.sin(E)
    r = np.hypot(x, y)
    v = np.arctan2(y, x)  # True anomaly

    # Rotate from the orbital plane into the reference frame
    x_prime = r * (np.cos(om) * np.cos(v + w) - np.sin(om) * np.sin(v + w) * np.cos(i))
    y_prime = r * (np.sin(om) * np.cos(v + w) + np.cos(om) * np.sin(v + w) * np.cos(i))
    z_prime = r * np.sin(v + w) * np.sin(i)
    return x_prime, y_prime, z_prime

# Select an asteroid by index (replace 'index' with the desired index)
index = 0
# 'a_m' has to be selected here because it is read below (its absence raised
# the ValueError); take() fetches only the rows needed, not the whole dataset.
asteroid = df.select('a_m', 'e', 'om_rad', 'w_rad', 'i', 'orbital_period_s').take(index + 1)[index]

# 'e' and 'i' still hold the text read from the CSV, so convert them; the
# dataset's inclination is in degrees and the function expects radians.
semi_major_axis = float(asteroid['a_m'])
eccentricity = float(asteroid['e'])
inclination_rad = np.radians(float(asteroid['i']))
period_s = float(asteroid['orbital_period_s'])

# Calculate the positions over one full orbital period. The dataset has no
# mean-anomaly column, so the orbit is traced starting from perihelion (M = 0).
time_range = np.linspace(0, period_s, 1000)
positions = [
    calculate_position(
        semi_major_axis, eccentricity, asteroid['om_rad'], asteroid['w_rad'],
        0.0, t, i=inclination_rad,
    )
    for t in time_range
]

# Extract the x and y coordinates (projection onto the reference plane)
x_coords = [position[0] for position in positions]
y_coords = [position[1] for position in positions]

# Plot the asteroid's orbit
plt.figure(figsize=(8, 8))
plt.plot(x_coords, y_coords, label='Orbit')
plt.scatter(0, 0, color='red', marker='o', label='Sun')
plt.xlabel('X (m)')
plt.ylabel('Y (m)')
plt.title('Asteroid Orbit')
plt.legend()
plt.grid()
plt.show()


ValueError: ignored