In [1]:
from datetime import datetime

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, from_unixtime, to_timestamp

In [2]:
# Create (or reuse an already running) Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName('earthquake-data-cleaning')
    .getOrCreate()
)

25/08/10 21:07:05 WARN Utils: Your hostname, recurSe resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/08/10 21:07:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/08/10 21:07:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Note that we will only be using the sample file earthquake-data-wth-countries.csv, which contains just 10,000 rows taken from the original data.

In [3]:
# Load the sample CSV; the first line is the header and column types are inferred from the data.
df = spark.read.csv('earthquake-data-wth-countries.csv', header=True, inferSchema=True)

In [4]:
# Preview the first rows (long string values are truncated in the display).
df.show(n=20, truncate=True)

+--------------------+-------------+---------+----------+------------+-----+-----+-------+----+----------+--------------------+--------+
|               place|         time|magnitude|       lat|        long|depth|alert|tsunami|  tz|      type|             country|  region|
+--------------------+-------------+---------+----------+------------+-----+-----+-------+----+----------+--------------------+--------+
|8km SSW of Lytle ...|-631157391770|     2.58|34.1911667|    -117.522| 4.49| null|      0|null|earthquake|United States of ...|Americas|
|24km WNW of Searl...|-631215832260|     2.01|35.8593333|-117.6506667|  0.0| null|      0|null|earthquake|United States of ...|Americas|
|28km N of El Sauz...|-631241139690|      3.3|32.1433333|-116.6288333|  6.0| null|      0|null|earthquake|              Mexico|Americas|
|1km SSW of Artesi...|-631251141040|     1.83|33.8561667|-118.0893333| 0.25| null|      0|null|earthquake|United States of ...|Americas|
|16km SE of Primo ...|-631284369930|     

In [4]:
# Inspect the column types Spark inferred from the CSV (inferSchema was enabled on read).
df.printSchema()

root
 |-- place: string (nullable = true)
 |-- time: long (nullable = true)
 |-- magnitude: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- depth: double (nullable = true)
 |-- alert: string (nullable = true)
 |-- tsunami: integer (nullable = true)
 |-- tz: string (nullable = true)
 |-- type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)



Let's first write the DataFrame to Parquet and then read it back from there.

In [5]:
# Persist the CSV data as Parquet in 4 partitions, replacing any previous output in 'parquet/'.
df.repartition(4).write.mode('overwrite').parquet('parquet/')

                                                                                

In [6]:
# Continue from the Parquet copy (schema is stored in the files, no inference needed).
df = spark.read.format('parquet').load('parquet/')

I have targeted the data-cleaning steps based on the data exploration I did in pandas. First, let's convert the `time` column into a useful, readable format, and check that there are no invalid or out-of-range times.

In [7]:
# `time` holds milliseconds since the Unix epoch. Convert it directly to a
# TIMESTAMP with Spark's built-in timestamp_millis (available since Spark 3.1).
# The previous from_unixtime -> to_timestamp round trip has two problems:
#   - It formatted each instant as a session-local "yyyy-MM-dd HH:mm:ss" string
#     and parsed it back, which discarded the milliseconds.
#   - The parse is ambiguous for wall-clock times that occur twice at a
#     daylight-saving fall-back.
# Note: timestamps now keep millisecond precision, so later comparisons and the
# (place, earthquake_datetime) de-duplication use the full-precision instant.
df_fmt_time = df.withColumn('earthquake_datetime', expr('timestamp_millis(time)'))

In [8]:
# Confirm that earthquake_datetime was added with the timestamp type.
df_fmt_time.printSchema()

root
 |-- place: string (nullable = true)
 |-- time: long (nullable = true)
 |-- magnitude: double (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- depth: double (nullable = true)
 |-- alert: string (nullable = true)
 |-- tsunami: integer (nullable = true)
 |-- tz: string (nullable = true)
 |-- type: string (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- earthquake_datetime: timestamp (nullable = true)



In [9]:
# Sanity check: count rows whose converted timestamp lies strictly between
# 1900-01-01 and now. Null timestamps fail the comparison and are not counted,
# so a result equal to the total row count means every timestamp is valid.
earliest_allowed = datetime.fromisoformat('1900-01-01')
latest_allowed = datetime.now()
event_time = df_fmt_time['earthquake_datetime']
df_fmt_time.filter((event_time > earliest_allowed) & (event_time < latest_allowed)).count()

10000

The filtered DataFrame still has the same row count as the original DataFrame (10,000), and the new column has the timestamp type. Let's now proceed to the data cleaning.

In [10]:
# Continue the cleaning from the frame that carries the converted earthquake_datetime column.
# NOTE(review): rebinding `df` means the Parquet-loaded frame is no longer reachable under that name.
df = df_fmt_time

In [11]:
from pyspark.sql import functions as F

In [12]:
# Validity bounds for the cleaning filters below (inclusive unless noted).
MAGNITUDE_MIN, MAGNITUDE_MAX = -1, 10
LATITUDE_MIN, LATITUDE_MAX = -90, 90
LONGITUDE_MIN, LONGITUDE_MAX = -180, 180
# Keep events from 1900-01-01 through the whole of 2025-06-30. The upper bound is
# midnight of the following day, compared with `<`. Comparing a timestamp with
# `<= 2025-06-30` would only admit 2025-06-30 00:00:00 and silently drop every
# later event on that day.
DATETIME_MIN = datetime.fromisoformat('1900-01-01')
DATETIME_END_EXCLUSIVE = datetime.fromisoformat('2025-07-01')

df_clean = (
    df
    .withColumnRenamed('lat', 'latitude')
    .withColumnRenamed('long', 'longitude')
    # Keep only the analysis columns; the raw `time` and the `tz` column are dropped.
    .select('place', 'earthquake_datetime', 'magnitude', 'latitude', 'longitude',
            'depth', 'country', 'region', 'alert', 'tsunami', 'type')
    # Column.between is inclusive on both ends. A comparison with NULL yields NULL,
    # which filter() treats as false, so rows with null values here are dropped too.
    .filter(F.col('magnitude').between(MAGNITUDE_MIN, MAGNITUDE_MAX))
    .filter(F.col('latitude').between(LATITUDE_MIN, LATITUDE_MAX))
    .filter(F.col('longitude').between(LONGITUDE_MIN, LONGITUDE_MAX))
    .filter(
        (F.col('earthquake_datetime') >= DATETIME_MIN) &
        (F.col('earthquake_datetime') < DATETIME_END_EXCLUSIVE)
    )
    # Treat rows with the same place and timestamp as the same event.
    .dropDuplicates(subset=['place', 'earthquake_datetime'])
    # Missing depth is recorded as 0 (it becomes indistinguishable from a 0 km depth).
    .na.fill({'depth': 0})
)

In [13]:
# Rows remaining after the range filters and de-duplication (the input had 10,000 rows).
df_clean.count()

7635

In [14]:
# Preview the first cleaned rows (long string values are truncated in the display).
df_clean.show(n=20, truncate=True)

+--------------------+-------------------+---------+----------+------------+-----+--------------------+--------+-----+-------+------------+
|               place|earthquake_datetime|magnitude|  latitude|   longitude|depth|             country|  region|alert|tsunami|        type|
+--------------------+-------------------+---------+----------+------------+-----+--------------------+--------+-----+-------+------------+
|0 km E of Rio Del...|1940-10-22 19:01:00|      4.5|      40.5|      -124.1|  0.0|United States of ...|Americas| null|      0|  earthquake|
|0 km NNE of Ayahu...|1942-11-18 08:19:34|     5.66|   -14.201|     -73.056| 15.0|                Peru|Americas| null|      0|  earthquake|
|0 km NNE of K?r?k...|1938-05-14 14:55:25|     5.36|    39.853|      33.509| 15.0|              Turkey|    Asia| null|      0|  earthquake|
|0 km NNW of Kirtl...|1943-03-09 12:25:25|      4.5|    41.628|     -81.309|  7.0|United States of ...|Americas| null|      0|  earthquake|
|0 km NNW of Ridge..

In [15]:
# Final schema: renamed lat/long columns, no time/tz, and depth is non-nullable after na.fill.
df_clean.printSchema()

root
 |-- place: string (nullable = true)
 |-- earthquake_datetime: timestamp (nullable = true)
 |-- magnitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- depth: double (nullable = false)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- alert: string (nullable = true)
 |-- tsunami: integer (nullable = true)
 |-- type: string (nullable = true)



For the cleaning, we converted the `time` column (milliseconds since the epoch) into a useful, readable datetime format. We also removed the tz column, which has very few non-null values.<br>
We removed rows with null or out-of-range magnitudes, and did the same for latitude and longitude (depth is not range-filtered). We kept only events between 1900-01-01 and 2025-06-30, replaced null depth values with 0, and dropped duplicate rows based on place and time.<br>
We can now stage the data in the warehouse and apply further dbt transformations to create data models.

Note: this is only a trial run of the cleaning, so we can watch the process interactively. The official script is stored in the pipeline folder.