In [1]:
# Intialization
import os
import sys

os.environ["SPARK_HOME"] = "/home/talentum/spark"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
# In below two lines, use /usr/bin/python2.7 if you want to use Python 2
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6" 
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

# NOTE: Whichever package you want mention here.
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0 pyspark-shell' 
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.3 pyspark-shell'
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.6.0,org.apache.spark:spark-avro_2.11:2.4.0 pyspark-shell'

In [2]:
#Entrypoint 2.x
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Earthquake Prediction").enableHiveSupport().getOrCreate()

# On yarn:
# spark = SparkSession.builder.appName("Spark SQL basic example").enableHiveSupport().master("yarn").getOrCreate()
# specify .master("yarn")

sc = spark.sparkContext

In [3]:
df = spark.read.option("header", True).csv("combined_data.csv")
df.show(5)

+--------------------+--------+---------+-----+---+-------+----+-----+----+----+-----+-------------+--------------------+--------------------+----------+---------------+----------+--------+------+--------+--------------+---------+
|                time|latitude|longitude|depth|mag|magType| nst|  gap|dmin| rms|  net|           id|             updated|               place|      type|horizontalError|depthError|magError|magNst|  status|locationSource|magSource|
+--------------------+--------+---------+-----+---+-------+----+-----+----+----+-----+-------------+--------------------+--------------------+----------+---------------+----------+--------+------+--------+--------------+---------+
|2013-05-31T23:58:...|  43.304| -105.404|  0.0|3.1|     ml|10.0|141.5|null|1.89|rusms|rusms00005459|2022-06-09T21:13:...|50 km S of Wright...| explosion|           null|      null|    null|  null|reviewed|            us|       us|
|2013-05-31T22:34:...|  51.127|  -178.08| 26.0|3.1|     ml|17.0|261.5|null|n

In [4]:
df.head()

Row(time='2013-05-31T23:58:13.560Z', latitude='43.304', longitude='-105.404', depth='0.0', mag='3.1', magType='ml', nst='10.0', gap='141.5', dmin=None, rms='1.89', net='rusms', id='rusms00005459', updated='2022-06-09T21:13:46.666Z', place='50 km S of Wright, Wyoming', type='explosion', horizontalError=None, depthError=None, magError=None, magNst=None, status='reviewed', locationSource='us', magSource='us')

In [5]:
df.columns

['time',
 'latitude',
 'longitude',
 'depth',
 'mag',
 'magType',
 'nst',
 'gap',
 'dmin',
 'rms',
 'net',
 'id',
 'updated',
 'place',
 'type',
 'horizontalError',
 'depthError',
 'magError',
 'magNst',
 'status',
 'locationSource',
 'magSource']

In [6]:
from pyspark.sql.functions import to_date, to_timestamp, col

# Directly convert string to timestamp and extract date
df = df.withColumn("date", to_date(to_timestamp(col("time"))))

In [7]:
df.columns

['time',
 'latitude',
 'longitude',
 'depth',
 'mag',
 'magType',
 'nst',
 'gap',
 'dmin',
 'rms',
 'net',
 'id',
 'updated',
 'place',
 'type',
 'horizontalError',
 'depthError',
 'magError',
 'magNst',
 'status',
 'locationSource',
 'magSource',
 'date']

In [8]:
df = df.drop('time',
 'magType',
 'nst',
 'gap',
 'dmin',
 'rms',
 'net',
 'id',
 'updated',
 'Type',
 'horizontalError',
 'depthError',
 'magError',
 'magNst',
 'status',
 'locationSource',
 'magSource')

In [9]:
df.columns

['latitude', 'longitude', 'depth', 'mag', 'place', 'date']

In [10]:
from pyspark.sql.functions import split, trim

# Split the string by comma and take the second part (after the comma)
df = df.withColumn("place", trim(split(col("place"), ",").getItem(1)))

In [11]:
df.show(5)

+--------+---------+-----+---+---------------+----------+
|latitude|longitude|depth|mag|          place|      date|
+--------+---------+-----+---+---------------+----------+
|  43.304| -105.404|  0.0|3.1|        Wyoming|2013-06-01|
|  51.127|  -178.08| 26.0|3.1|         Alaska|2013-06-01|
|  21.648|  143.032|322.5|4.4|           null|2013-06-01|
|  -10.28|   161.56| 58.8|4.7|Solomon Islands|2013-06-01|
|  -43.36|  172.945| 20.3|4.0|           null|2013-05-31|
+--------+---------+-----+---+---------------+----------+
only showing top 5 rows



In [14]:
df.count()

238992

In [15]:
from pyspark.sql.functions import avg
# Average magnitude per place
avg_mag_df = df.groupBy("place").agg(avg("mag").alias("avg_mag"))
avg_mag_df.show(5)

+-------------+-----------------+
|        place|          avg_mag|
+-------------+-----------------+
|         Utah|3.451642512077295|
|       Hawaii|3.350071983852591|
|       Russia|4.462530315278912|
|     Anguilla|3.736242774566475|
|Russia region|4.426767676767676|
+-------------+-----------------+
only showing top 5 rows



In [16]:
# Filter places with avg_mag > 3
dangerous_places_df = avg_mag_df.filter(col("avg_mag") > 3)
dangerous_places_df.show(5)

+-------------+-----------------+
|        place|          avg_mag|
+-------------+-----------------+
|         Utah|3.451642512077295|
|       Hawaii|3.350071983852591|
|       Russia|4.462530315278912|
|     Anguilla|3.736242774566475|
|Russia region|4.426767676767676|
+-------------+-----------------+
only showing top 5 rows



In [17]:
# Average coordinates for each place
coord_df = df.groupBy("place").agg(
    avg("latitude").alias("latitude"),
    avg("longitude").alias("longitude")
)

In [18]:
df = df.drop('latitude','longitude')
df.columns

['depth', 'mag', 'place', 'date']

In [19]:
# Merge coordinates back with original DataFrame (on 'place')
df = df.join(coord_df, on="place", how="inner")

In [20]:
df.show()

+------------------+------+----+----------+-------------------+-------------------+
|             place| depth| mag|      date|           latitude|          longitude|
+------------------+------+----+----------+-------------------+-------------------+
|           Wyoming|   0.0| 3.1|2013-06-01|  43.73179712249905|-105.60074515264581|
|            Alaska|  26.0| 3.1|2013-06-01|  55.57031222130659|-144.59539603275684|
|   Solomon Islands|  58.8| 4.7|2013-06-01|-10.610102945544568| 162.91138339108923|
|  Papua New Guinea|  35.0| 4.0|2013-05-31| -5.567790788234048|  150.3215314456188|
|             Tonga| 101.9| 4.8|2013-05-31|-18.905669998471673| -174.8927523154518|
|            Alaska|  13.1| 3.3|2013-05-31|  55.57031222130659|-144.59539603275684|
|                CA|10.487|3.09|2013-05-31| 36.103720015592565|-118.86856682093632|
|            Mexico| 108.5| 4.0|2013-05-31|  17.56200411031402| -98.26777818015127|
|           Vanuatu|  10.0| 5.1|2013-05-31|-16.342389658335247| 167.95876039

In [21]:
df.printSchema()

root
 |-- place: string (nullable = true)
 |-- depth: string (nullable = true)
 |-- mag: string (nullable = true)
 |-- date: date (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [22]:
from pyspark.sql.types import DoubleType

# Cast columns to appropriate types
df = df.withColumn("depth", col("depth").cast(DoubleType())) \
       .withColumn("mag", col("mag").cast(DoubleType()))

In [23]:
df.printSchema()

root
 |-- place: string (nullable = true)
 |-- depth: double (nullable = true)
 |-- mag: double (nullable = true)
 |-- date: date (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)



In [24]:
df.show()

+------------------+------+----+----------+-------------------+-------------------+
|             place| depth| mag|      date|           latitude|          longitude|
+------------------+------+----+----------+-------------------+-------------------+
|           Wyoming|   0.0| 3.1|2013-06-01|  43.73179712249905|-105.60074515264581|
|            Alaska|  26.0| 3.1|2013-06-01|  55.57031222130659|-144.59539603275684|
|   Solomon Islands|  58.8| 4.7|2013-06-01|-10.610102945544568| 162.91138339108923|
|  Papua New Guinea|  35.0| 4.0|2013-05-31| -5.567790788234048|  150.3215314456188|
|             Tonga| 101.9| 4.8|2013-05-31|-18.905669998471673| -174.8927523154518|
|            Alaska|  13.1| 3.3|2013-05-31|  55.57031222130659|-144.59539603275684|
|                CA|10.487|3.09|2013-05-31| 36.103720015592565|-118.86856682093632|
|            Mexico| 108.5| 4.0|2013-05-31|  17.56200411031402| -98.26777818015127|
|           Vanuatu|  10.0| 5.1|2013-05-31|-16.342389658335247| 167.95876039

In [25]:
df.repartition(1).write.csv("cleaned_data",header=True, mode="overwrite")

In [26]:
with open("schema.txt", "w") as f:
    f.write(df.schema.simpleString())

In [27]:
df.count()

198550