<a href="https://colab.research.google.com/github/cahya-wirawan/quake/blob/main/jupyter/spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Earthquake Data Collection from the USGS API using Apache Spark

## EDA of Earthquake Data from USGS

In [None]:
!pip install pyspark requests folium



In [None]:
import requests
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, ArrayType
import folium

In [None]:
def get_earthquake_data():
    """
    Fetches earthquake data from the USGS GeoJSON summary feed.

    Returns:
        A list of earthquake features, or an empty list if the request fails,
        the body is not valid JSON, or the payload carries no features.
    """
    # Alternative feed (this could contain > 9000 entries):
    # url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_month.geojson"
    url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/all_day.geojson"  # This contains around 250 entries
    try:
        # Without a timeout, requests waits indefinitely on a stalled connection.
        response = requests.get(url, timeout=30)
        response.raise_for_status()  # Raise an exception for bad status codes (4xx or 5xx)
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on requests versions whose
        # decode error is not a RequestException subclass.
        print(f"Error fetching data: {e}")
        return []

    # Guard against a payload that is not a JSON object or whose "features" is null.
    features = data.get('features') if isinstance(data, dict) else None
    return features or []

In [None]:
# 1. Start (or reuse) a Spark session running in local mode
spark = (
    SparkSession.builder
    .appName("USGSEarthquakeDataCollector")
    .master("local[*]")
    .getOrCreate()
)

# 2. Describe the nested GeoJSON layout explicitly instead of relying on
# schema inference. Every field is declared nullable.

# Subset of the per-event attributes found under "properties"
properties_schema = (
    StructType()
    .add("mag", DoubleType(), True)
    .add("place", StringType(), True)
    .add("time", LongType(), True)
    .add("url", StringType(), True)
    .add("tsunami", LongType(), True)
    .add("magType", StringType(), True)
    .add("type", StringType(), True)
    .add("title", StringType(), True)
)

# "geometry" holds the geometry type and a flat array of doubles
geometry_schema = (
    StructType()
    .add("type", StringType(), True)
    .add("coordinates", ArrayType(DoubleType()), True)
)

# One GeoJSON feature = type + properties + geometry + id
feature_schema = (
    StructType()
    .add("type", StringType(), True)
    .add("properties", properties_schema, True)
    .add("geometry", geometry_schema, True)
    .add("id", StringType(), True)
)

In [None]:
# 3. Fetch data on the driver node
# get_earthquake_data() is an ordinary Python call; it returns a list of
# features, or an empty list when the request fails.
print("Fetching earthquake data from USGS API...")
earthquake_features = get_earthquake_data()
# Last expression of the cell, so the notebook displays the number of features.
len(earthquake_features)

Fetching earthquake data from USGS API...


247

In [None]:
# earthquake_features[0]['properties']['mag']=6

In [None]:
from pyspark.sql.functions import col, from_unixtime

if earthquake_features:
    # 4. Create the Spark DataFrame from the collected data
    print(f"Successfully fetched {len(earthquake_features)} earthquake records.")

    # The schema declares 'mag' and the coordinates as DoubleType, so whole
    # numbers that arrive as Python ints are converted to float first.
    for feature in earthquake_features:
        # `or {}` guards against a missing or null 'properties' / 'geometry' entry.
        properties = feature.get('properties') or {}
        if isinstance(properties.get('mag'), int):
            properties['mag'] = float(properties['mag'])

        geometry = feature.get('geometry') or {}
        coordinates = geometry.get('coordinates')
        if coordinates is not None:
            geometry['coordinates'] = [float(c) if isinstance(c, int) else c for c in coordinates]

    df = spark.createDataFrame(earthquake_features, schema=feature_schema)

    # 5. Show the DataFrame schema and some data
    print("DataFrame Schema:")
    df.printSchema()

    print("Sample Data:")
    df.show(5, truncate=False)

    # 6. Perform a simple analysis: flatten the structure and find the strongest earthquakes
    analysis_df = df.select(
        col("id"),
        col("properties.place").alias("location"),
        col("properties.mag").alias("magnitude"),
        col("properties.magType").alias("magnitude_type"),
        # Convert Unix timestamp (milliseconds) to a readable format
        from_unixtime(col("properties.time") / 1000).alias("event_time"),
        # The first two coordinate entries are used as longitude and latitude
        col("geometry.coordinates").getItem(0).alias("longitude"),
        col("geometry.coordinates").getItem(1).alias("latitude")
    ).orderBy(col("magnitude").desc())

    # The label names the fetched feed rather than a fixed period, because the
    # period depends on which feed URL get_earthquake_data() uses.
    print("Analysis - Top 5 Strongest Earthquakes in the Fetched Feed:")
    analysis_df.show(5, truncate=False)

else:
    # NOTE: analysis_df is not defined in this case, so the cells below that use it will fail.
    print("Could not fetch data, skipping DataFrame creation.")

Successfully fetched 247 earthquake records.
DataFrame Schema:
root
 |-- type: string (nullable = true)
 |-- properties: struct (nullable = true)
 |    |-- mag: double (nullable = true)
 |    |-- place: string (nullable = true)
 |    |-- time: long (nullable = true)
 |    |-- url: string (nullable = true)
 |    |-- tsunami: long (nullable = true)
 |    |-- magType: string (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- title: string (nullable = true)
 |-- geometry: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |-- id: string (nullable = true)

Sample Data:
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------+------------+
|type   |properties                     

In [None]:

# Persist the flattened results under output_data/ in three formats,
# replacing whatever an earlier run left behind.
(
    analysis_df.write
    .mode("overwrite")
    .parquet("output_data/earthquakes.parquet")
)

# coalesce(1) reduces the DataFrame to one partition before the CSV is written
(
    analysis_df.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .csv("output_data/earthquakes.csv")
)

(
    analysis_df.write
    .mode("overwrite")
    .json("output_data/earthquakes.json")
)

print("Data successfully saved to disk in Parquet, CSV, and JSON formats.")

Data successfully saved to disk in Parquet, CSV, and JSON formats.


In [None]:
# Preview the first five rows; analysis_df is ordered by magnitude, strongest first.
analysis_df.show(5)

+----------+--------------------+---------+--------------+-------------------+---------+--------+
|        id|            location|magnitude|magnitude_type|         event_time|longitude|latitude|
+----------+--------------------+---------+--------------+-------------------+---------+--------+
|us7000qthm|15 km SSW of Puyo...|      5.3|           mww|2025-09-03 14:56:44| -78.0697| -1.6058|
|us7000qteq|180 km SSE of Vil...|      5.3|           mww|2025-09-03 03:38:19| 159.3851| 51.4268|
|us7000qte0|southeast of the ...|      5.0|           mww|2025-09-03 00:37:57| 170.3701|-23.5586|
|us7000qtdv|78 km SSW of Lata...|      5.0|            mb|2025-09-03 00:15:24| 165.5859|-11.4024|
|us7000qtdt|53 km E of Palu, ...|      5.0|           mww|2025-09-02 23:53:17|  120.344| -0.9703|
+----------+--------------------+---------+--------------+-------------------+---------+--------+
only showing top 5 rows



In [None]:
# Convert the Spark DataFrame to a pandas DataFrame for plotting with folium.
# Only the columns the maps need are selected before toPandas() is called.
earthquake_pandas_df = analysis_df.select("location", "magnitude", "event_time", "longitude", "latitude").toPandas()
# Alternative: plot only the events with magnitude above 6
# earthquake_pandas_df = analysis_df[analysis_df['magnitude']>6].select("location", "magnitude", "event_time", "longitude", "latitude").toPandas()


In [None]:


# World-level base map: centred on (0, 0) and zoomed far out
m = folium.Map(location=[0, 0], zoom_start=2)

# Drop one pin per earthquake; the popup carries the details, the tooltip the place name
for _, quake in earthquake_pandas_df.iterrows():
    position = [quake['latitude'], quake['longitude']]
    details = f"Magnitude: {quake['magnitude']}<br>Location: {quake['location']}<br>Time: {quake['event_time']}"
    pin = folium.Marker(location=position, popup=details, tooltip=quake['location'])
    pin.add_to(m)

# Render the map in the notebook
display(m)

In [None]:
# Create a base map centered around a general location (e.g., the world)
m = folium.Map(location=[0, 0], zoom_start=2)

# Rows with a missing magnitude or position cannot be drawn: a NaN magnitude
# would produce a NaN radius, so those rows are dropped first.
plottable_df = earthquake_pandas_df.dropna(subset=['magnitude', 'latitude', 'longitude'])

# Add markers for each earthquake with size based on magnitude
for index, row in plottable_df.iterrows():
    # Scale the magnitude for better visualization on the map (adjust the
    # factor as needed). The radius is clamped to at least 1 so that a
    # magnitude at or below zero never yields a zero or negative radius.
    marker_size = max(row['magnitude'] * 2, 1)

    folium.CircleMarker(
        location=[row['latitude'], row['longitude']],
        radius=marker_size,
        color='red',
        fill=True,
        fill_color='red',
        fill_opacity=0.6,
        popup=f"Magnitude: {row['magnitude']}<br>Location: {row['location']}<br>Time: {row['event_time']}",
        tooltip=row['location']
    ).add_to(m)

# Display the map
display(m)

**Filter by Magnitude:**

You can filter earthquakes above a certain magnitude. For example, to see earthquakes with a magnitude greater than 5.0:

In [None]:
# Keep only the events whose magnitude is strictly above 5.0
is_strong = analysis_df["magnitude"] > 5.0
filtered_by_magnitude_df = analysis_df.where(is_strong)
print("Earthquakes with Magnitude > 5.0:")
filtered_by_magnitude_df.show(truncate=False)

Earthquakes with Magnitude > 5.0:
+----------+---------------------------------+---------+--------------+-------------------+---------+--------+
|id        |location                         |magnitude|magnitude_type|event_time         |longitude|latitude|
+----------+---------------------------------+---------+--------------+-------------------+---------+--------+
|us7000qteq|180 km SSE of Vilyuchinsk, Russia|5.3      |mww           |2025-09-03 03:38:19|159.3851 |51.4268 |
|us7000qthm|15 km SSW of Puyo, Ecuador       |5.3      |mww           |2025-09-03 14:56:44|-78.0697 |-1.6058 |
+----------+---------------------------------+---------+--------------+-------------------+---------+--------+



**Filter by Location (using a keyword):**

You can filter earthquakes that occurred in a specific place by searching for a keyword in the 'location' column. For example, to find earthquakes in "Indonesia":

In [None]:
# Keep only the events whose location text contains the keyword
mentions_indonesia = analysis_df["location"].contains("Indonesia")
filtered_by_location_df = analysis_df.where(mentions_indonesia)
print("Earthquakes in Indonesia:")
filtered_by_location_df.show(truncate=False)

Earthquakes in Indonesia:
+----------+--------------------------------+---------+--------------+-------------------+---------+--------+
|id        |location                        |magnitude|magnitude_type|event_time         |longitude|latitude|
+----------+--------------------------------+---------+--------------+-------------------+---------+--------+
|us7000qtdt|53 km E of Palu, Indonesia      |5.0      |mww           |2025-09-02 23:53:17|120.344  |-0.9703 |
|us7000qtk7|192 km WNW of Abepura, Indonesia|4.9      |mb            |2025-09-03 16:41:12|139.0965 |-1.7907 |
|us7000qtgg|177 km WNW of Abepura, Indonesia|4.7      |mb            |2025-09-03 10:18:41|139.1728 |-1.9569 |
+----------+--------------------------------+---------+--------------+-------------------+---------+--------+



In [None]:
# Register the flattened data as the table 'earthquakes_table',
# replacing any table of that name left by an earlier run
(
    analysis_df.write
    .mode("overwrite")
    .saveAsTable("earthquakes_table")
)

print("DataFrame saved as 'earthquakes_table'.")

# From here on the data can be queried with plain Spark SQL
print("Querying the newly created table:")
spark.sql(
    "SELECT location, magnitude "
    "FROM earthquakes_table "
    "WHERE magnitude > 6.0"
).show()

DataFrame saved as 'earthquakes_table'.
Querying the newly created table:
+--------------------+---------+
|            location|magnitude|
+--------------------+---------+
|2025 Southern Dra...|      7.5|
|108 km SSE of Lat...|      6.3|
|193 km WNW of Abe...|      6.3|
|8 km SSW of Bigad...|      6.1|
+--------------------+---------+



In [None]:
# Combine both filters in SQL: magnitude above 6.0 AND a location mentioning Indonesia
magnitude_3 = spark.sql(
    "SELECT location, magnitude FROM earthquakes_table "
    "WHERE magnitude > 6.0 and location LIKE '%Indonesia%'"
)
magnitude_3.show(truncate=False)

+--------------------------------+---------+
|location                        |magnitude|
+--------------------------------+---------+
|193 km WNW of Abepura, Indonesia|6.3      |
+--------------------------------+---------+



You can combine these filters as well, as the query above does with `AND`; with the DataFrame API, chain `.filter()` calls or join conditions with `&`.

## Natural Language conversion into a Spark SQL query

This section uses the Gemini API to convert a natural language question into a Spark SQL query, and then executes the generated query with Spark.

You can create a Gemini API key at https://aistudio.google.com/app/apikey

### Install and configure the gemini api

#### Subtask:
Install the necessary library and configure the Gemini API key.


In [None]:
!pip install google-generativeai

In [None]:
from google.colab import userdata

**Reasoning**:
The first step is to install the required library and import it, then configure the API key.



In [None]:
from google.colab import userdata
import google.generativeai as genai
import os

# Configure the Gemini API key.
# The key is read through google.colab's userdata under the name 'GOOGLE_API_KEY',
# so it is never written into the notebook itself.
# Alternative: read it from an environment variable instead:
# genai.configure(api_key=os.environ.get("GEMINI_API_KEY"))
gemini_api_key = userdata.get('GOOGLE_API_KEY')
genai.configure(api_key=gemini_api_key)

### Define a function to generate spark sql

#### Subtask:
Create a Python function that takes a natural language query as input and uses the Gemini API to generate a Spark SQL query.


**Reasoning**:
Define a Python function that uses the Gemini API to convert a natural language query to a Spark SQL query based on the provided instructions.



In [None]:
import re

# Opening markdown fence: "```" followed either by an "sql" tag (any case) or
# by an arbitrary language tag that ends the line.
_FENCE_OPEN_RE = re.compile(r"^```(?:sql\b|[\w+-]*[ \t]*\n)?", re.IGNORECASE)


def _strip_code_fence(text: str) -> str:
    """Return *text* without surrounding whitespace or markdown code-fence delimiters."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        cleaned = _FENCE_OPEN_RE.sub("", cleaned, count=1).strip()
    if cleaned.endswith("```"):
        cleaned = cleaned[:-len("```")].strip()
    return cleaned


def generate_spark_sql(natural_language_query: str) -> str:
    """
    Converts a natural language query into a Spark SQL query using the Gemini API.

    The prompt describes the 'earthquakes_table' columns to the model. The
    model's reply is returned with any markdown code fence removed, whatever
    its language tag (or none). The returned text is model output and is not
    validated as SQL.

    Args:
        natural_language_query: The natural language query.

    Returns:
        The generated Spark SQL query string.
    """
    # Configure the model
    generation_config = {
      "temperature": 0.7,
      "top_p": 0.95,
      "top_k": 0,
      "max_output_tokens": 8192,
    }

    # Create a Generative Model instance
    model = genai.GenerativeModel(model_name="gemini-1.5-flash-latest",
                                  generation_config=generation_config)

    # Create the prompt for the API
    prompt_parts = [
        f"""Convert the following natural language query into a Spark SQL query that can be executed on a table named 'earthquakes_table' with columns: id (string), location (string), magnitude (double), magnitude_type (string), event_time (string), longitude (double), latitude (double).

Natural Language Query: {natural_language_query}

Spark SQL Query:
""",
    ]

    # Generate content using the model
    response = model.generate_content(prompt_parts)

    # The reply is expected to be the SQL itself, possibly wrapped in a
    # markdown code block; strip the wrapper so the text can go to spark.sql().
    return _strip_code_fence(response.text)

### Test the function

#### Subtask:
Use the defined function with a sample natural language query to generate and print the Spark SQL query.


**Reasoning**:
Call the generate_spark_sql function with a sample natural language query and print the returned SQL query.



In [None]:
# Natural-language request to translate into Spark SQL
sample_query = "Show all earthquakes with magnitude greater than 3.0 in Indonesia"
generated_sql_query = generate_spark_sql(sample_query)

# Show what the model produced: heading on one line, the query below it
print("Generated Spark SQL Query:", generated_sql_query, sep="\n")

Generated Spark SQL Query:
SELECT *
FROM earthquakes_table
WHERE magnitude > 3.0 AND location LIKE '%Indonesia%';


### Execute the spark sql query

#### Subtask:
Execute the generated Spark SQL query using Spark and display the results.


**Reasoning**:
Check if the generated_sql_query variable is available and not empty, then execute the query using spark.sql() and display the results.



In [None]:
# SECURITY NOTE: generated_sql_query is language-model output and must be
# treated as untrusted input. Only statements that begin with SELECT or WITH
# are run; this is a coarse guard against destructive statements (DROP,
# INSERT, ...), not a full SQL validator.
if 'generated_sql_query' in locals() and generated_sql_query:
    candidate_sql = generated_sql_query.strip()
    if candidate_sql.lower().startswith(("select", "with")):
        print("Executing Spark SQL Query:")
        try:
            query_result_df = spark.sql(candidate_sql)
            query_result_df.show(truncate=False)
        except Exception as e:
            # Notebook-level boundary: report the failure instead of aborting the cell.
            print(f"Error executing Spark SQL query: {e}")
    else:
        print("Generated statement is not a read-only query (SELECT/WITH). Refusing to execute:")
        print(candidate_sql)
else:
    print("Generated SQL query not available or is empty. Cannot execute.")

Executing Spark SQL Query:
+----------+--------------------------------+---------+--------------+-------------------+---------+--------+
|id        |location                        |magnitude|magnitude_type|event_time         |longitude|latitude|
+----------+--------------------------------+---------+--------------+-------------------+---------+--------+
|us6000r01m|193 km WNW of Abepura, Indonesia|6.3      |mww           |2025-08-12 08:24:22|138.9733 |-2.057  |
|us6000r1n3|12 km NNW of Poso, Indonesia    |5.8      |mww           |2025-08-16 22:38:52|120.7271 |-1.2819 |
+----------+--------------------------------+---------+--------------+-------------------+---------+--------+



In [None]:
# Stop the Spark session
# Run this last: every cell above that uses `spark` or a Spark DataFrame needs the session.
spark.stop()