In [2]:
import requests
import pymongo
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
import pandas as pd

# Extract EONET data from NASA API

In [2]:
api_key = 'your_key' #need to be replaced

eonet_url = "https://eonet.gsfc.nasa.gov/api/v2.1/events"
params = {
    "status": "open",
    "limit": 10,
    "api_key": api_key,
}

try:
    response = requests.get(eonet_url, params=params)
    response.raise_for_status()

    data = response.json()
    events = data.get("events", [])

    # Define a function to extract subheaders and their values
    def extract_subheader(event, subheader):
        return event.get(subheader, "N/A")

except Exception as e:
    print("An error occurred:", str(e))

# Connect to Mongo database

In [3]:
uri = "your_uri"

client = MongoClient(uri, server_api=ServerApi('1'))

try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


In [4]:
database_name = client["Nasa_EONET"]
collection_name = database_name["natural_events"]

# Store EONET data in MongoDB

In [5]:
for event in events:
    # Create a document to insert into MongoDB events
    event_document = {
        "Event ID": extract_subheader(event, "id"),
        "Event Title": extract_subheader(event, "title"),
        "Event Description": extract_subheader(event, "description"),
        "Event Link": extract_subheader(event, "link"),
    }

    # Access and store categories 
    categories = event.get("categories", [])
    event_document["Categories"] = [{"Category ID": cat.get("id", "N/A"), "Category Title": cat.get("title", "N/A")} for cat in categories]

    # Access and store sources 
    sources = event.get("sources", [])
    event_document["Sources"] = [{"Source ID": source.get("id", "N/A"), "Source URL": source.get("url", "N/A")} for source in sources]

    # Access and store geometries 
    geometries = event.get("geometries", [])
    event_document["Geometries"] = [{"Magnitude Value": geometry.get("magnitudeValue", "N/A"),
                                     "Magnitude Unit": geometry.get("magnitudeUnit", "N/A"),
                                     "Geometry Date": geometry.get("date", "N/A"),
                                     "Geometry Type": geometry.get("type", "N/A"),
                                     "Geometry Coordinates": geometry.get("coordinates", [])} for geometry in geometries]

    # Insert the document into MongoDB
    collection_name.insert_one(event_document)

In [5]:
print(database_name)

Database(MongoClient(host=['ac-otesk9b-shard-00-01.c4dp6ex.mongodb.net:27017', 'ac-otesk9b-shard-00-00.c4dp6ex.mongodb.net:27017', 'ac-otesk9b-shard-00-02.c4dp6ex.mongodb.net:27017'], document_class=dict, tz_aware=False, connect=True, retrywrites=True, w='majority', authsource='admin', replicaset='atlas-11h46f-shard-0', tls=True, server_api=<pymongo.server_api.ServerApi object at 0x000001A51E9D5120>), 'Nasa_EONET')


# Fetch data from MongoDB using PySpark

In [6]:
import findspark
findspark.init()

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

In [8]:
# For pyspark running in cmd:
# pyspark --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1

In [8]:
spark = SparkSession.builder \
    .appName("MongoDBSparkExample") \
    .config("spark.mongodb.input.uri", uri) \
    .config("spark.mongodb.input.database", database_name) \
    .config("spark.mongodb.input.collection", collection_name) \
    .getOrCreate()

In [9]:
spark

In [184]:
df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("spark.mongodb.input.uri", uri) \
    .option("spark.mongodb.input.database", "Nasa_EONET") \
    .option("spark.mongodb.input.collection", "natural_events").load()

In [185]:
df.show()

+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          Categories|Event Description|  Event ID|          Event Link|         Event Title|          Geometries|             Sources|                 _id|
+--------------------+-----------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+
|   [{12, Volcanoes}]|                 |EONET_6425|https://eonet.gsf...|Kilauea Volcano, ...|[{N/A, N/A, 2023-...|[{SIVolcano, http...|{6505274dde7761e1...|
|[{15, Sea and Lak...|                 |EONET_6423|https://eonet.gsf...|        Iceberg D33A|[{N/A, N/A, 2023-...|[{NATICE, https:/...|{6505274dde7761e1...|
|[{15, Sea and Lak...|                 |EONET_6424|https://eonet.gsf...|        Iceberg D33B|[{N/A, N/A, 2023-...|[{NATICE, https:/...|{6505274dde7761e1...|
|[{10, Severe Stor...|                 |EONET_6421|https:/

In [186]:
df.printSchema()

root
 |-- Categories: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Category ID: integer (nullable = true)
 |    |    |-- Category Title: string (nullable = true)
 |-- Event Description: string (nullable = true)
 |-- Event ID: string (nullable = true)
 |-- Event Link: string (nullable = true)
 |-- Event Title: string (nullable = true)
 |-- Geometries: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Magnitude Value: string (nullable = true)
 |    |    |-- Magnitude Unit: string (nullable = true)
 |    |    |-- Geometry Date: string (nullable = true)
 |    |    |-- Geometry Type: string (nullable = true)
 |    |    |-- Geometry Coordinates: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |-- Sources: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Source ID: string (nullable = true)
 |    |    |-- Source URL: string (nullable = true)
 |

# Data manipulation using PySpark

In [191]:
df2 = df.withColumn('Categories', f.explode('Categories'))\
    .withColumn('Category ID', f.col('Categories').getItem('Category ID'))\
    .withColumn('Category Title', f.col('Categories').getItem('Category Title'))

In [192]:
df2 = df2.withColumn('Geometries', f.explode('Geometries'))\
    .withColumn('Magnitude Value', f.col('Geometries').getItem('Magnitude Value'))\
    .withColumn('Magnitude Unit', f.col('Geometries').getItem('Magnitude Unit'))\
    .withColumn('Geometry Date', f.col('Geometries').getItem('Geometry Date'))\
    .withColumn('Geometry Type', f.col('Geometries').getItem('Geometry Type'))\
    .withColumn('Geometry Coordinates', f.col('Geometries').getItem('Geometry Coordinates'))

In [193]:
df2 = df2.withColumn('Sources', f.explode('Sources'))\
    .withColumn('Source ID', f.col('Sources').getItem('Source ID'))\
    .withColumn('Source URL', f.col('Sources').getItem('Source URL'))

In [195]:
df2 = df2.drop('Categories', 'Geometries', 'Sources', 'Event Description')

In [196]:
df2.printSchema()

root
 |-- Event ID: string (nullable = true)
 |-- Event Link: string (nullable = true)
 |-- Event Title: string (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- Category ID: integer (nullable = true)
 |-- Category Title: string (nullable = true)
 |-- Magnitude Value: string (nullable = true)
 |-- Magnitude Unit: string (nullable = true)
 |-- Geometry Date: string (nullable = true)
 |-- Geometry Type: string (nullable = true)
 |-- Geometry Coordinates: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- Source ID: string (nullable = true)
 |-- Source URL: string (nullable = true)



In [197]:
 df2.show()

+----------+--------------------+--------------------+--------------------+-----------+----------------+---------------+--------------+--------------------+-------------+--------------------+---------+--------------------+
|  Event ID|          Event Link|         Event Title|                 _id|Category ID|  Category Title|Magnitude Value|Magnitude Unit|       Geometry Date|Geometry Type|Geometry Coordinates|Source ID|          Source URL|
+----------+--------------------+--------------------+--------------------+-----------+----------------+---------------+--------------+--------------------+-------------+--------------------+---------+--------------------+
|EONET_6425|https://eonet.gsf...|Kilauea Volcano, ...|{6505274dde7761e1...|         12|       Volcanoes|            N/A|           N/A|2023-09-10T00:00:00Z|        Point|  [-155.287, 19.421]|SIVolcano|https://volcano.s...|
|EONET_6423|https://eonet.gsf...|        Iceberg D33A|{6505274dde7761e1...|         15|Sea and Lake Ice|    

In [216]:
# split data into two columns
df1 = df2.withColumn('Date', split(df2['Geometry Date'], 'T').getItem(0)) \
       .withColumn('time', split(df2['Geometry Date'], 'T').getItem(1)) 

In [217]:
df1 = df1.drop('Geometry Date')

In [218]:
# convert PySpark DF to Pandas DF
pandasDF = df1.toPandas()

In [219]:
pandasDF.head()

Unnamed: 0,Event ID,Event Link,Event Title,_id,Category ID,Category Title,Magnitude Value,Magnitude Unit,Geometry Type,Geometry Coordinates,Source ID,Source URL,Date,time
0,EONET_6425,https://eonet.gsfc.nasa.gov/api/v2.1/events/EO...,"Kilauea Volcano, Hawaii","(6505274dde7761e115abc80a,)",12,Volcanoes,,,Point,"[-155.287, 19.421]",SIVolcano,https://volcano.si.edu/volcano.cfm?vn=332010,2023-09-10,00:00:00Z
1,EONET_6423,https://eonet.gsfc.nasa.gov/api/v2.1/events/EO...,Iceberg D33A,"(6505274dde7761e115abc80b,)",15,Sea and Lake Ice,,,Point,"[17.14, -69.38]",NATICE,https://usicecenter.gov/pub/Iceberg_Tabular.csv,2023-09-08,00:00:00Z
2,EONET_6424,https://eonet.gsfc.nasa.gov/api/v2.1/events/EO...,Iceberg D33B,"(6505274dde7761e115abc80c,)",15,Sea and Lake Ice,,,Point,"[18.3, -69.64]",NATICE,https://usicecenter.gov/pub/Iceberg_Tabular.csv,2023-09-08,00:00:00Z
3,EONET_6421,https://eonet.gsfc.nasa.gov/api/v2.1/events/EO...,Hurricane Margot,"(6505274dde7761e115abc80d,)",10,Severe Storms,,,Point,"[-28.3, 16.8]",GDACS,http://www.gdacs.org/report.aspx?eventid=10010...,2023-09-07,21:00:00Z
4,EONET_6421,https://eonet.gsfc.nasa.gov/api/v2.1/events/EO...,Hurricane Margot,"(6505274dde7761e115abc80d,)",10,Severe Storms,,,Point,"[-28.3, 16.8]",NOAA_NHC,https://www.nhc.noaa.gov/archive/2023/MARGOT.s...,2023-09-07,21:00:00Z


In [220]:
client.close()

In [221]:
spark.stop()