In [1]:
from pyspark.sql import SparkSession, functions as F, Window as W
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField,ArrayType,LongType,StringType,DoubleType,IntegerType
from pyspark.sql.functions import split, explode,col
from pyspark.sql.functions import lit


spark = SparkSession.builder.appName('earthquake_api').getOrCreate()

In [2]:
earthquake_df = spark.read\
    .format('json')\
    .option('multiLine', 'true')\
    .load('data.json')

In [3]:
earthquake_df.printSchema()

root
 |-- count: long (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- alert: string (nullable = true)
 |    |    |-- cdi: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- continent: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- depth: string (nullable = true)
 |    |    |-- detailUrl: string (nullable = true)
 |    |    |-- dmin: string (nullable = true)
 |    |    |-- felt: string (nullable = true)
 |    |    |-- gap: string (nullable = true)
 |    |    |-- geometryType: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- ids: string (nullable = true)
 |    |    |-- latitude: string (nullable = true)
 |    |    |-- locality: string (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- loca

In [4]:
# number of rows
earthquake_df.count()

1

In [5]:
earthquake_df.show()

+-----+--------------------+---------+------+-------------+----------+-----------+-------+----+
|count|                data|errorCode|errors|friendlyError|httpStatus|       noun| result|verb|
+-----+--------------------+---------+------+-------------+----------+-----------+-------+----+
|  100|[{, 5, , 2022wnac...|     none|    []|             |       200|earthquakes|success| GET|
+-----+--------------------+---------+------+-------------+----------+-----------+-------+----+



In [6]:
earthquake_df = earthquake_df.drop("count").drop("errorCode").drop("errors").drop("friendlyError").drop("httpStatus").drop("noun").drop( "result").drop("verb")


In [7]:
earthquake_df.printSchema()

root
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- alert: string (nullable = true)
 |    |    |-- cdi: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- continent: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- date: string (nullable = true)
 |    |    |-- depth: string (nullable = true)
 |    |    |-- detailUrl: string (nullable = true)
 |    |    |-- dmin: string (nullable = true)
 |    |    |-- felt: string (nullable = true)
 |    |    |-- gap: string (nullable = true)
 |    |    |-- geometryType: string (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- ids: string (nullable = true)
 |    |    |-- latitude: string (nullable = true)
 |    |    |-- locality: string (nullable = true)
 |    |    |-- location: string (nullable = true)
 |    |    |-- locationDetails: array (nullable = true

In [8]:
earthquake_df = earthquake_df.withColumn('data', F.explode('data'))

In [9]:
earthquake_df.printSchema()

root
 |-- data: struct (nullable = true)
 |    |-- alert: string (nullable = true)
 |    |-- cdi: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- continent: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- date: string (nullable = true)
 |    |-- depth: string (nullable = true)
 |    |-- detailUrl: string (nullable = true)
 |    |-- dmin: string (nullable = true)
 |    |-- felt: string (nullable = true)
 |    |-- gap: string (nullable = true)
 |    |-- geometryType: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- ids: string (nullable = true)
 |    |-- latitude: string (nullable = true)
 |    |-- locality: string (nullable = true)
 |    |-- location: string (nullable = true)
 |    |-- locationDetails: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- adminLevel: string (nullable = true)
 |    |    |    |-- descriptio

In [10]:
earthquake_df = earthquake_df\
            .withColumn('id', F.col('data').getItem('id'))\
            .withColumn('magnitude', F.col('data').getItem('magnitude').cast('int'))\
            .withColumn('type', F.col('data').getItem('type'))\
            .withColumn('title', F.col('data').getItem('title'))\
    	    .withColumn('date', F.col('data').getItem('date'))\
    	    .withColumn('time', F.col('data').getItem('time'))\
    	    .withColumn('updated', F.col('data').getItem('updated').cast('int'))\
    	    .withColumn('status', F.col('data').getItem('status'))\
    	    .withColumn('latitude', F.col('data').getItem('latitude').cast('int'))\
    	    .withColumn('longitude', F.col('data').getItem('longitude').cast('int'))\
    	    .withColumn('place', F.col('data').getItem('place'))\
    	    .withColumn('location', F.col('data').getItem('location'))\
    	    .withColumn('continent', F.col('data').getItem('continent'))\
    	    .withColumn('country', F.col('data').getItem('country'))
           



In [11]:
earthquake_df = earthquake_df\
                .select('id','magnitude', 'type', 'title', 'date', 'time', 'updated', 'status', 'latitude','longitude', 'place','location','continent','country')


In [14]:
# earthquake_df.select(earthquake_df['counts']).show()

In [15]:
earthquake_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- magnitude: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- updated: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- latitude: integer (nullable = true)
 |-- longitude: integer (nullable = true)
 |-- place: string (nullable = true)
 |-- location: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- country: string (nullable = true)



In [16]:
earthquake_df.show(10)

+----------+---------+----------+--------------------+-------------------+-------------+-------+--------+--------+---------+--------------------+----------------+-------------+--------------------+
|        id|magnitude|      type|               title|               date|         time|updated|  status|latitude|longitude|               place|        location|    continent|             country|
+----------+---------+----------+--------------------+-------------------+-------------+-------+--------+--------+---------+--------------------+----------------+-------------+--------------------+
|tx2022wnac|        3|earthquake|M 3.8 - 37 km WSW...|2022-11-17T04:38:31|1668659911981|   null|reviewed|      31|     -103|37 km WSW of Ment...|  Mentone, Texas|North America|United States of ...|
|tx2022wmtd|        3|earthquake|M 3.5 - 37 km WSW...|2022-11-17T01:04:09|1668647049070|   null|reviewed|      31|     -103|37 km WSW of Ment...|  Mentone, Texas|North America|United States of ...|
|us7000iq9

In [16]:
# number of rows
earthquake_df.count()

100

In [None]:
# Transformation 
# 1. Find the average of magnitude of earthquake.

In [17]:
average_magnitude_df = earthquake_df.agg({'magnitude':'avg'})
average_magnitude_df.select(average_magnitude_df['avg(magnitude)'])
average_magnitude_df.show()

+--------------+
|avg(magnitude)|
+--------------+
|          3.36|
+--------------+



In [1]:
# average_magnitude_df.write.format('jdbc').options(url='jdbc:postgresql://localhost:5432/kafka', driver= 'org.postgresql.Driver',
#                                                     dbtable='earthquake_table_1', user='memosha',password='1234').mode('overwrite').save()

In [None]:
# 2. Highest magnitude for each country in the month of November in 2022.

In [17]:
highest_magnitude_november_df = earthquake_df\
                    .groupBy(F.year('date').alias('year'), F.month('date').alias('month'), 'location')\
                    .agg(F.max('magnitude').alias('highest_magnitude'))\
                    .filter('month == 11')\
                    .filter('year == 2022')\
                    .orderBy('location', 'year')

highest_magnitude_november_df.show()

+----+-----+--------------------+-----------------+
|year|month|            location|highest_magnitude|
+----+-----+--------------------+-----------------+
|2022|   11|Bahía de Kino, Me...|                6|
|2022|   11|Colorado City, Ar...|                3|
|2022|   11|       Corinne, Utah|                3|
|2022|   11|     Dubois, Wyoming|                3|
|2022|   11|  Lake Pillsbury, CA|                3|
|2022|   11| Maneadero, B.C., MX|                4|
|2022|   11|      Mentone, Texas|                5|
|2022|   11|    Ravalli, Montana|                3|
|2022|   11|      Stanley, Idaho|                3|
|2022|   11|     Taylor, Wyoming|                3|
|2022|   11|        Toyah, Texas|                3|
|2022|   11|Whites City, New ...|                3|
+----+-----+--------------------+-----------------+



In [18]:
# highest_magnitude_november_df.write.format('jdbc').options(url='jdbc:postgresql://localhost:5432/kafka', driver= 'org.postgresql.Driver',
# dbtable='earthquake_table_1', user='memosha',password='1234').mode('overwrite').save()

In [None]:
# 3. Find the status of earthquake on the basis of each place.

In [75]:
status_place_df = earthquake_df.withColumn('status',F.collect_set('status').over(W.partitionBy('place'))).select('place', 'status').distinct()
status_place_df.show()

+--------------------+----------+
|               place|    status|
+--------------------+----------+
|100km S of San Cl...|[reviewed]|
|12km WSW of Petro...|[reviewed]|
|13km NNW of Grape...|[reviewed]|
|13km SE of Bodfis...|[reviewed]|
|14km ESE of Alum ...|[reviewed]|
|14km ESE of Woffo...|[reviewed]|
|14km SE of Baysid...|[reviewed]|
|15km E of Seven T...|[reviewed]|
|15km ESE of Alum ...|[reviewed]|
|16 km ESE of Laco...|[reviewed]|
|16km WNW of Lake ...|[reviewed]|
|16km WSW of Weitc...|[reviewed]|
|17 km NNW of Tayl...|[reviewed]|
|21 km NW of Bickl...|[reviewed]|
|21 km SSE of Golf...|[reviewed]|
|22 km SSW of Mamm...|[reviewed]|
|23km E of Julian, CA|[reviewed]|
|23km NE of Therma...|[reviewed]|
|24 km NW of Stanl...|[reviewed]|
|25km W of Petroli...|[reviewed]|
+--------------------+----------+
only showing top 20 rows

