In [8]:
# import sys

# # Only for Python 3.12+
# if sys.version_info >= (3, 12):
#     try:
#         from setuptools._distutils.version import LooseVersion
#         import types
#         sys.modules['distutils.version'] = types.ModuleType('distutils.version')
#         sys.modules['distutils.version'].LooseVersion = LooseVersion
#     except ImportError:
#         print("You need to install setuptools first. Run: pip install setuptools")


In [9]:
import requests
from datetime import datetime, timedelta, UTC
from sqlalchemy import create_engine, text
import _mysql_connector
from pyspark.sql import SparkSession
from pyspark.sql.types import * #imports all datatypes... to use for schema creation
import pandas as pd

import os
from dotenv import load_dotenv

load_dotenv()


True

In [10]:
# Spark session; the MySQL connector jar is pulled via spark.jars.packages so
# DataFrames can be written over JDBC.
spark = SparkSession.builder \
    .appName("Earthquake Data Processing") \
    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.32") \
    .getOrCreate()

# Connection settings come from the environment (populated from .env by load_dotenv()).
# Credentials have no default; host/port/db fall back to local development values.
user = os.getenv("MYSQL_USER")
password = os.getenv("MYSQL_PASSWORD")
host = os.getenv("MYSQL_HOST", "localhost")       # default: localhost
port = os.getenv("MYSQL_PORT", "3306")            # default: MySQL's standard port
database = os.getenv("MYSQL_DB", "earthquake_db")  # default: 'earthquake_db'

JDBC_URL = f"jdbc:mysql://{host}:{port}/{database}"

In [11]:
# Create the database and table if needed. `id` is the primary key, so the table
# rejects a second row carrying the same event id.
def create_db_and_table():
    """Ensure the MySQL database and the ``earthquake_events`` table exist.

    Reads the module-level settings ``user``, ``password``, ``host``, ``port``
    and ``database``. Both DDL statements use ``IF NOT EXISTS``, so repeated
    calls leave existing objects untouched.

    Raises:
        ValueError: if ``database`` is empty or None (MYSQL_DB not configured).
    """
    from sqlalchemy.engine import URL

    if not database:
        raise ValueError("MYSQL_DB is not set; cannot create the database.")

    def _mysql_url(db_name=None):
        # URL.create escapes special characters in the credentials, which a
        # hand-built "user:password@host" string does not.
        return URL.create(
            "mysql+mysqlconnector",
            username=user,
            password=password,
            host=host,
            port=int(port) if port else None,
            database=db_name,
        )

    # Step 1: connect to the server without selecting a database (it may not exist yet).
    engine_no_db = create_engine(_mysql_url(), pool_pre_ping=True)
    try:
        # begin() commits on success instead of relying on MySQL's implicit DDL commit.
        with engine_no_db.begin() as conn:
            print(f"Creating database if not exists: {database}")
            # Double any backticks so the name cannot break out of the quoted identifier.
            quoted_db = database.replace("`", "``")
            conn.execute(text(f"CREATE DATABASE IF NOT EXISTS `{quoted_db}`;"))
    finally:
        engine_no_db.dispose()

    # Step 2: connect to the target database and create the table.
    engine = create_engine(_mysql_url(database), pool_pre_ping=True)
    try:
        with engine.begin() as conn:
            conn.execute(text("""
                CREATE TABLE IF NOT EXISTS earthquake_events (
                    id VARCHAR(100) PRIMARY KEY,
                    place TEXT,
                    magnitude DOUBLE,
                    time DATETIME,
                    longitude DOUBLE,
                    latitude DOUBLE,
                    depth DOUBLE
                );
            """))
    finally:
        # Dispose only after the connection has been returned to the pool.
        engine.dispose()

    print("Table 'earthquake_events' updated in MySQL.")


In [12]:
# This script fetches earthquake data from the USGS Earthquake API for the past 1 hr

def fetch_earthquake_data():
    endtime = datetime.now(UTC)
    starttime = endtime - timedelta(minutes=60)

    # Format timestamps in ISO8601 as required by API
    start_str = starttime.strftime("%Y-%m-%dT%H:%M:%S")
    end_str = endtime.strftime("%Y-%m-%dT%H:%M:%S")

    url = (
        "https://earthquake.usgs.gov/fdsnws/event/1/query"
        f"?format=geojson&starttime={start_str}&endtime={end_str}"
    )
    response = requests.get(url)
    response.raise_for_status()  # Raise an error for bad responses
    data = response.json()

    features = data.get("features", [])
    earthquakes = []

    for f in features:
        props = f["properties"]
        coords = f["geometry"]["coordinates"]
        earthquakes.append({
            "id": f["id"],
            "place": props["place"],
            "magnitude": float(props["mag"]) if props["mag"] is not None else None,
            "time": datetime.fromtimestamp(props["time"] / 1000.0),
            "longitude": float(coords[0]),
            "latitude": float(coords[1]),
            "depth": float(coords[2])
        })

    return earthquakes

In [13]:

# Convert earthquake row dicts into a Spark DataFrame with a fixed schema
def earthquakes_to_df(data):
    """Build a Spark DataFrame from earthquake row dicts using an explicit schema.

    ``data`` is the list of dicts returned by ``fetch_earthquake_data()``. Only
    ``id`` is declared non-nullable; every other column may hold None.
    """
    # (column name, Spark type, nullable)
    column_specs = [
        ("id", StringType(), False),
        ("place", StringType(), True),
        ("magnitude", DoubleType(), True),
        ("time", TimestampType(), True),
        ("longitude", DoubleType(), True),
        ("latitude", DoubleType(), True),
        ("depth", DoubleType(), True),
    ]
    earthquake_schema = StructType(
        [StructField(col_name, col_type, nullable) for col_name, col_type, nullable in column_specs]
    )
    return spark.createDataFrame(data, schema=earthquake_schema)

In [14]:
# write to mysql with spark, skipping events that are already stored
def load_to_mysql(df):
    """Append the earthquake rows in ``df`` to ``earthquake_events`` over JDBC.

    ``id`` is the table's PRIMARY KEY, and the fetch window is a rolling hour.
    Runs less than an hour apart therefore return overlapping events, and a blind
    append would fail the whole batch with a duplicate-key error. To avoid this,
    rows whose ``id`` is already in the table are dropped before writing, as are
    duplicate ids within ``df`` itself.

    Uses the module-level ``spark``, ``JDBC_URL``, ``user`` and ``password``.
    """
    # head(1) only needs one row, unlike count(), which scans the whole DataFrame.
    if not df.head(1):
        print("No recent earthquakes")
        return

    jdbc_options = {
        "url": JDBC_URL,
        "user": user,
        "password": password,
        "driver": "com.mysql.cj.jdbc.Driver",
    }

    # Ids already stored; the anti-join keeps only events not yet in MySQL.
    existing_ids = (
        spark.read.format("jdbc")
        .options(**jdbc_options)
        .option("query", "SELECT id FROM earthquake_events")
        .load()
    )
    new_rows = (
        df.dropDuplicates(["id"])
        .join(existing_ids, on="id", how="left_anti")
    )

    if not new_rows.head(1):
        print("No new earthquakes to load.")
        return

    # write to MySQL
    new_rows.write \
        .format("jdbc") \
        .options(**jdbc_options) \
        .option("dbtable", "earthquake_events") \
        .mode("append") \
        .save()
    print("Earthquake data loaded into MySQL.")

    
    

In [15]:
# Preview: pull the latest events and render them as a Spark table
# (show's defaults spelled out: first 20 rows, long values truncated).
earthquakes = fetch_earthquake_data()
df = earthquakes_to_df(earthquakes)
df.show(n=20, truncate=True)

                                                                                

+------------+--------------------+---------+--------------------+---------+--------+------+
|          id|               place|magnitude|                time|longitude|latitude| depth|
+------------+--------------------+---------+--------------------+---------+--------+------+
|tx2025sgsxkm|34 km SSW of Los ...|      1.3|2025-09-16 17:51:...| -102.081|   32.44|6.9435|
|tx2025sgsjvw|55 km S of Whites...|      1.7|2025-09-16 17:36:...| -104.485|  31.681|6.0999|
|tx2025sgsira|40 km E of McKinn...|      1.2|2025-09-16 17:34:...| -102.111|  32.369|4.7221|
|ak025bwkl6p2|90 km N of Karluk...|      1.1|2025-09-16 17:10:...|-154.6913|  58.371|   0.0|
+------------+--------------------+---------+--------------------+---------+--------+------+



                                                                                

In [None]:
# Reference only: this cell evaluates a string literal, so the snippet inside is
# displayed as the cell's output but never executed.
"""
    alternative without schema... not recommended, but useful for quick testing:

    earthquakes = fetch_earthquake_data()
    df2 = spark.createDataFrame(earthquakes)  # No schema passed, datatypes are inferred automatically
    df2.show(truncate=False)
    df2.printSchema()
"""

'\n    alternative without schema... not recommended, but useful for quick testing:\n\n    earthquakes = fetch_earthquake_data()\n    df2 = spark.createDataFrame(earthquakes)  # No schema passed, datatypes are inferred automatically\n    df2.show(truncate=False)\n    df2.printSchema()\n'

: 

In [None]:
# MYSQL_URI = f"mysql+mysqlconnector://{user}:{password}@{host}:{port}/{database}"

# # Load into MySQL using Pandas
# def load_to_mysql(df):
#     if df.count() == 0:
#         print("No new earthquake data.")
#         return
    
#     from pyspark.sql.functions import col

#     # Cast all timestamp columns to string before converting to Pandas to avoid type errors
#     df = df.select([
#         col(c).cast("string") if dtype == "timestamp" else col(c)
#         for c, dtype in df.dtypes
#     ])
#     # Convert Spark DataFrame to Pandas DataFrame for SQLAlchemy
#     pdf = df.toPandas()
#     engine = create_engine(MYSQL_URI)
#     pdf.to_sql('earthquake_events', con=engine, if_exists='append', index=False)

: 

In [None]:
# Entry point: ensure the schema exists, fetch the latest events, and load them.
# Note that __name__ is "__main__" inside a notebook kernel too, so this cell runs there.
if __name__ == "__main__":
    create_db_and_table()

    data = fetch_earthquake_data()

    # Plain branching instead of exit(). In an IPython/Jupyter kernel, exit() asks
    # the kernel to shut down, which discards the Spark session and all notebook state.
    if not data:
        print("No recent earthquakes.")
    else:
        df = earthquakes_to_df(data)
        df.show()

        load_to_mysql(df)

Creating database if not exists: earthquakes
Table 'earthquake_events' updated in MySQL.


                                                                                

+------------+--------------------+---------+--------------------+-----------------+----------------+----------------+
|          id|               place|magnitude|                time|        longitude|        latitude|           depth|
+------------+--------------------+---------+--------------------+-----------------+----------------+----------------+
|  nn00900561|50 km E of Tonopa...|      2.1|2025-07-14 09:37:...|        -116.6702|         37.9838|            15.9|
|  nc75209667|4 km NW of Pinnac...|     1.26|2025-07-14 09:36:...|-121.176002502441|36.5511665344238|2.17000007629395|
|  us7000qcjp|228 km E of Ust’-...|      5.0|2025-07-14 09:28:...|         166.1011|         55.9153|            10.0|
|ak0258ylj2lf|61 km WNW of Nanw...|      1.7|2025-07-14 09:27:...|        -152.9585|         59.5043|            94.8|
|  nc75209662|8 km NNW of The G...|     0.65|2025-07-14 09:20:...|-122.817001342773|38.8296661376953|2.67000007629395|
+------------+--------------------+---------+---

                                                                                

Earthquake data loaded into MySQL.


: 