In [68]:
import import_ipynb

#import findspark
#findspark.init()

ModuleNotFoundError: No module named 'findspark'

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *

from pyspark.sql.functions import *


In [4]:
spark = SparkSession.builder.getOrCreate()
df =spark.sql("SELECT 'spark' as hello ")
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [5]:
#configure spark session
spark = SparkSession\
    .builder\
    .master('local[2]')\
    .appName('quakes_etl')\
    .config('spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:2.4.1')\
    .getOrCreate()

In [7]:
# Load the dataset
df_load = spark.read.csv(r"C:\Users\dubey\OneDrive\Desktop\sparkProj\database.csv", header=True)

In [8]:

# Remove all fields we don't need
lst_dropped_columns = ['Depth Error', 'Time', 'Depth Seismic Stations','Magnitude Error','Magnitude Seismic Stations','Azimuthal Gap', 'Horizontal Distance','Horizontal Error',
    'Root Mean Square','Source','Location Source','Magnitude Source','Status']

df_load = df_load.drop(*lst_dropped_columns)


In [9]:
# preview of data
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|
+----------+--------+---------+----------+-----+---------+--------------+------------+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|
+----------+--------+---------+----------+-----+---------+--------------+------------+
only showing top 5 rows



In [10]:
# Create a year field and add it to the df_load dataframe
df_load = df_load.withColumn('Year', year(to_timestamp('Date', 'dd/MM/yyyy')))                       

In [11]:
# preview of data
df_load.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [12]:
# Create the quakes freq dataframe form the year and count values
df_quake_freq = df_load.groupBy('Year').count().withColumnRenamed('count', 'Counts')

# preview of data
df_quake_freq.show(5)


+----+------+
|Year|Counts|
+----+------+
|1965|   156|
|1966|    98|
|1967|   103|
|1968|   106|
|1969|   114|
+----+------+
only showing top 5 rows



In [13]:
#preview of data frame schema
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [14]:
# Cast string fields to double types   ####lattitude & longitude were string
df_load= df_load.withColumn('Latitude', df_load['Latitude'].cast(DoubleType()))\
    .withColumn('Longitude', df_load['Longitude'].cast(DoubleType()))\
    .withColumn('Depth', df_load['Depth'].cast(DoubleType()))\
    .withColumn('Magnitude', df_load['Magnitude'].cast(DoubleType()))

In [15]:
#preview of data frame schema
df_load.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: double (nullable = true)
 |-- Magnitude: double (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Year: integer (nullable = true)



In [16]:
# Create avg and max magnitude fields and add to df_quake_freq
df_max = df_load.groupBy('Year').max('Magnitude').withColumnRenamed('max(Magnitude)', 'Max_Magnitude')
df_avg = df_load.groupBy('Year').avg('Magnitude').withColumnRenamed('avg(Magnitude)', 'Avg_Magnitude')


In [17]:
#preview 
df_max.show(5)

+----+-------------+
|Year|Max_Magnitude|
+----+-------------+
|1965|          8.7|
|1966|          7.7|
|1967|          7.2|
|1968|          7.6|
|1969|          7.5|
+----+-------------+
only showing top 5 rows



In [18]:
#preview 
df_avg.show(5)

+----+------------------+
|Year|     Avg_Magnitude|
+----+------------------+
|1965| 6.009615384615388|
|1966| 6.060714285714285|
|1967|5.9621359223300985|
|1968| 6.070754716981133|
|1969| 6.015789473684214|
+----+------------------+
only showing top 5 rows



In [19]:
# Join the max and avg dfs to df_quake_freq
df_quake_freq = df_quake_freq.join(df_avg, ['Year']).join(df_max, ['Year'])


In [20]:
#preview Df_quake_freq
df_quake_freq.show(5)

+----+------+------------------+-------------+
|Year|Counts|     Avg_Magnitude|Max_Magnitude|
+----+------+------------------+-------------+
|1965|   156| 6.009615384615388|          8.7|
|1966|    98| 6.060714285714285|          7.7|
|1967|   103|5.9621359223300985|          7.2|
|1968|   106| 6.070754716981133|          7.6|
|1969|   114| 6.015789473684214|          7.5|
+----+------+------------------+-------------+
only showing top 5 rows



In [21]:
# Remove records with null values
df_load.dropna()
df_quake_freq.dropna()

DataFrame[Year: int, Counts: bigint, Avg_Magnitude: double, Max_Magnitude: double]

In [22]:
# Load df_load into mongodb
df_load.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.quakes').save()


In [23]:
# Load df_quake_freq into mongodb
df_quake_freq.write.format('mongo')\
    .mode('overwrite')\
    .option('spark.mongodb.output.uri', 'mongodb://127.0.0.1:27017/Quake.quake_freq').save()


In [24]:
# Print dataframe heads
print(df_quake_freq.show(5))

+----+------+------------------+-------------+
|Year|Counts|     Avg_Magnitude|Max_Magnitude|
+----+------+------------------+-------------+
|1965|   156| 6.009615384615388|          8.7|
|1966|    98| 6.060714285714285|          7.7|
|1967|   103|5.9621359223300985|          7.2|
|1968|   106| 6.070754716981133|          7.6|
|1969|   114| 6.015789473684214|          7.5|
+----+------+------------------+-------------+
only showing top 5 rows

None
