In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create (or reuse) a Spark session running locally on all available cores.
# The MongoDB connector is requested through spark.jars.packages so that the
# "mongo" data source used further down is available.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Hello")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:2.4.1")
    .getOrCreate()
)

# Silence Python warnings for the rest of the notebook.
import warnings

warnings.filterwarnings("ignore")

In [4]:
# Load the raw CSV; the first line of the file supplies the column names.
df = spark.read.option("header", True).csv("database.csv")

In [5]:
# Peek at the first record as a list containing a single Row.
df.head(1)

[Stage 2:>                                                          (0 + 1) / 1]                                                                                

[Row(Date='01/02/1965', Time='13:44:18', Latitude='19.246', Longitude='145.616', Type='Earthquake', Depth='131.6', Depth Error=None, Depth Seismic Stations=None, Magnitude='6', Magnitude Type='MW', Magnitude Error=None, Magnitude Seismic Stations=None, Azimuthal Gap=None, Horizontal Distance=None, Horizontal Error=None, Root Mean Square=None, ID='ISCGEM860706', Source='ISCGEM', Location Source='ISCGEM', Magnitude Source='ISCGEM', Status='Automatic')]

In [6]:
# Print the column names and types of the freshly loaded frame.
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: string (nullable = true)
 |-- Depth Error: string (nullable = true)
 |-- Depth Seismic Stations: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- Magnitude Error: string (nullable = true)
 |-- Magnitude Seismic Stations: string (nullable = true)
 |-- Azimuthal Gap: string (nullable = true)
 |-- Horizontal Distance: string (nullable = true)
 |-- Horizontal Error: string (nullable = true)
 |-- Root Mean Square: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Location Source: string (nullable = true)
 |-- Magnitude Source: string (nullable = true)
 |-- Status: string (nullable = true)



In [7]:
# Names of the columns to remove from df (consumed by df.drop).
# The former "Azimuthal" entry was removed: the schema has no column with that
# name (the actual column, "Azimuthal Gap", is listed below) and
# DataFrame.drop silently ignores unknown names, so the entry had no effect.
list_a_supp = [
    "Depth Error",
    "Depth Seismic Stations",
    "Time",
    "Magnitude Seismic Stations",
    "Magnitude Error",
    "Root Mean Square",
    "Location Source",
    "Magnitude Source",
    "Source",
    "Status",
    "Azimuthal Gap",
    "Horizontal Distance",
    "Horizontal Error",
]


In [8]:
# Remove every column named in list_a_supp; the reduced frame replaces df.
df= df.drop(*list_a_supp)

In [9]:
# Check which columns remain after the drop.
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Depth: string (nullable = true)
 |-- Magnitude: string (nullable = true)
 |-- Magnitude Type: string (nullable = true)
 |-- ID: string (nullable = true)



In [10]:
# Display the first five rows of the reduced frame.
df.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|
+----------+--------+---------+----------+-----+---------+--------------+------------+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|
+----------+--------+---------+----------+-----+---------+--------------+------------+
only showing top 5 rows



In [11]:
# List (column, type) pairs; every column is still a string at this point.
df.dtypes

[('Date', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string'),
 ('Type', 'string'),
 ('Depth', 'string'),
 ('Magnitude', 'string'),
 ('Magnitude Type', 'string'),
 ('ID', 'string')]

In [12]:
# Derive the event year from the textual Date column.
# The sample rows (01/02/1965, 01/04/1965, 01/05/1965, ...) advance in the
# second field, i.e. the layout is month/day/year. In Spark datetime patterns
# "MM" is month-of-year whereas lowercase "mm" is minute-of-hour, so the
# previous pattern "dd/mm/yyyy" only produced the right year by accident.
df = df.withColumn("Year", year(to_timestamp("Date", "MM/dd/yyyy")))

In [13]:
# Display the first five rows, now including the derived Year column.
df.show(5)

+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|      Date|Latitude|Longitude|      Type|Depth|Magnitude|Magnitude Type|          ID|Year|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
|01/02/1965|  19.246|  145.616|Earthquake|131.6|        6|            MW|ISCGEM860706|1965|
|01/04/1965|   1.863|  127.352|Earthquake|   80|      5.8|            MW|ISCGEM860737|1965|
|01/05/1965| -20.579| -173.972|Earthquake|   20|      6.2|            MW|ISCGEM860762|1965|
|01/08/1965| -59.076|  -23.557|Earthquake|   15|      5.8|            MW|ISCGEM860856|1965|
|01/09/1965|  11.938|  126.427|Earthquake|   15|      5.8|            MW|ISCGEM860890|1965|
+----------+--------+---------+----------+-----+---------+--------------+------------+----+
only showing top 5 rows



In [14]:
# Confirm the type of the new Year column alongside the existing ones.
df.dtypes

[('Date', 'string'),
 ('Latitude', 'string'),
 ('Longitude', 'string'),
 ('Type', 'string'),
 ('Depth', 'string'),
 ('Magnitude', 'string'),
 ('Magnitude Type', 'string'),
 ('ID', 'string'),
 ('Year', 'int')]

In [15]:
# Number of recorded events per year, exposed as a "Counts" column.
df_freq = df.groupBy("Year").agg(count("*").alias("Counts"))
df_freq.show(5)

[Stage 5:>                                                          (0 + 1) / 1]

+----+------+
|Year|Counts|
+----+------+
|1990|   528|
|1975|   411|
|1977|   425|
|2003|   485|
|2007|   608|
+----+------+
only showing top 5 rows



                                                                                

In [16]:
# Convert the measurement columns from string to double, one column at a time
# and in the same order as before so the column positions are unchanged.
for numeric_col in ("Latitude", "Longitude", "Depth", "Magnitude"):
    df = df.withColumn(numeric_col, col(numeric_col).cast("double"))

In [17]:
# Verify that the four measurement columns are now of type double.
df.dtypes

[('Date', 'string'),
 ('Latitude', 'double'),
 ('Longitude', 'double'),
 ('Type', 'string'),
 ('Depth', 'double'),
 ('Magnitude', 'double'),
 ('Magnitude Type', 'string'),
 ('ID', 'string'),
 ('Year', 'int')]

In [18]:
# Bare expression: the cell output shows the DataFrame's column names and types.
df

DataFrame[Date: string, Latitude: double, Longitude: double, Type: string, Depth: double, Magnitude: double, Magnitude Type: string, ID: string, Year: int]

In [19]:
# Largest magnitude recorded in each year.
df_max = (
    df.groupBy("Year")
    .agg({"Magnitude": "max"})
    .withColumnRenamed("max(Magnitude)", "Max_magnitude")
)

In [20]:
# Mean magnitude of the events recorded in each year.
df_avg = (
    df.groupBy("Year")
    .agg({"Magnitude": "avg"})
    .withColumnRenamed("avg(Magnitude)", "Avg_magnitude")
)

In [21]:
# Display the first five yearly averages.
df_avg.show(5)

[Stage 8:>                                                          (0 + 1) / 1]

+----+-----------------+
|Year|    Avg_magnitude|
+----+-----------------+
|1990|5.860624999999987|
|1975|  5.8488807785888|
|1977|5.783764705882346|
|2003|5.885731958762881|
|2007|5.886019736842098|
+----+-----------------+
only showing top 5 rows



                                                                                

In [22]:
# Attach the yearly average and maximum magnitude to the yearly counts.
# NOTE: this reassigns df_freq, so the cell is not meant to be run twice.
df_freq = (
    df_freq
    .join(df_avg, on="Year")
    .join(df_max, on="Year")
)

In [23]:
# Display the first five rows of the combined per-year summary.
df_freq.show(5)

                                                                                

+----+------+-----------------+-------------+
|Year|Counts|    Avg_magnitude|Max_magnitude|
+----+------+-----------------+-------------+
|1990|   528|5.860624999999987|          7.8|
|1975|   411|  5.8488807785888|          7.9|
|1977|   425|5.783764705882346|          7.9|
|2003|   485|5.885731958762881|          8.3|
|2007|   608|5.886019736842098|          8.4|
+----+------+-----------------+-------------+
only showing top 5 rows



In [24]:
# DataFrame.dropna() returns a new DataFrame and leaves the receiver untouched,
# so the results must be assigned back; otherwise rows containing nulls are
# still present when the frames are written out below.
df = df.dropna()
df_freq = df_freq.dropna()

# Keep the summary frame as the cell output.
df_freq

DataFrame[Year: int, Counts: bigint, Avg_magnitude: double, Max_magnitude: double]

In [25]:
# Store the event table in the Quake.quakes collection of the local MongoDB,
# overwriting whatever the collection held before.
(
    df.write
    .format("mongo")
    .mode("overwrite")
    .option("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/Quake.quakes")
    .save()
)

                                                                                

In [26]:
# Store the per-year summary in the Quake.quakes_frequences collection of the
# local MongoDB, overwriting whatever the collection held before.
(
    df_freq.write
    .format("mongo")
    .mode("overwrite")
    .option("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/Quake.quakes_frequences")
    .save()
)

                                                                                

## Machine learning

In [28]:
# Load the second CSV; the first line of the file supplies the column names.
df_test = spark.read.option("header", True).csv("query.csv")

In [29]:
# Peek at the first record of the second dataset as a one-element list of Row.
df_test.head(1)

[Row(time='2017-01-02T00:13:06.300Z', latitude='-36.0365', longitude='51.9288', depth='10', mag='5.7', magType='mwb', nst=None, gap='26', dmin='14.685', rms='1.37', net='us', id='us10007p5d', updated='2017-03-27T23:53:17.040Z', place='Southwest Indian Ridge', type='earthquake', horizontalError='10.3', depthError='1.7', magError='0.068', magNst='21', status='reviewed', locationSource='us', magSource='us')]

In [30]:
# Read back the Quake.quakes collection written earlier from the local MongoDB.
df_train = (
    spark.read
    .format("mongo")
    .option("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/Quake.quakes")
    .load()
)


                                                                                

In [32]:
# Display the first five rows loaded from MongoDB.
df_train.show(5)

+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|      Date|Depth|          ID|Latitude|Longitude|Magnitude|Magnitude Type|      Type|Year|                 _id|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+--------------------+
|01/02/1965|131.6|ISCGEM860706|  19.246|  145.616|      6.0|            MW|Earthquake|1965|{620beafb741f9b69...|
|01/04/1965| 80.0|ISCGEM860737|   1.863|  127.352|      5.8|            MW|Earthquake|1965|{620beafb741f9b69...|
|01/05/1965| 20.0|ISCGEM860762| -20.579| -173.972|      6.2|            MW|Earthquake|1965|{620beafb741f9b69...|
|01/08/1965| 15.0|ISCGEM860856| -59.076|  -23.557|      5.8|            MW|Earthquake|1965|{620beafb741f9b69...|
|01/09/1965| 15.0|ISCGEM860890|  11.938|  126.427|      5.8|            MW|Earthquake|1965|{620beafb741f9b69...|
+----------+-----+------------+--------+---------+---------+--------------+----------+----+-----

In [33]:
# Print the column names and types of the second dataset.
df_test.printSchema()

root
 |-- time: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- depth: string (nullable = true)
 |-- mag: string (nullable = true)
 |-- magType: string (nullable = true)
 |-- nst: string (nullable = true)
 |-- gap: string (nullable = true)
 |-- dmin: string (nullable = true)
 |-- rms: string (nullable = true)
 |-- net: string (nullable = true)
 |-- id: string (nullable = true)
 |-- updated: string (nullable = true)
 |-- place: string (nullable = true)
 |-- type: string (nullable = true)
 |-- horizontalError: string (nullable = true)
 |-- depthError: string (nullable = true)
 |-- magError: string (nullable = true)
 |-- magNst: string (nullable = true)
 |-- status: string (nullable = true)
 |-- locationSource: string (nullable = true)
 |-- magSource: string (nullable = true)



In [34]:
# Keep only the five columns that have a counterpart in the first dataset.
df_test2 = df_test.select("time", "latitude", "longitude", "mag", "depth")

In [35]:
# Rename the columns so they use the same names as the first dataset.
df_test2 = (
    df_test2
    .withColumnRenamed("time", "Date")
    .withColumnRenamed("latitude", "Latitude")
    .withColumnRenamed("longitude", "Longitude")
    .withColumnRenamed("mag", "Magnitude")
    .withColumnRenamed("depth", "Depth")
)

In [36]:
# Display the first five rows of the renamed projection.
df_test2.show(5)

+--------------------+--------+---------+---------+------+
|                Date|Latitude|Longitude|Magnitude| Depth|
+--------------------+--------+---------+---------+------+
|2017-01-02T00:13:...|-36.0365|  51.9288|      5.7|    10|
|2017-01-02T13:13:...|  -4.895| -76.3675|      5.9|   106|
|2017-01-02T13:14:...|-23.2513| 179.2383|      6.3|551.62|
|2017-01-03T09:09:...| 24.0151|  92.0177|      5.7|    32|
|2017-01-03T21:19:...|-43.3527| -74.5017|      5.5| 10.26|
+--------------------+--------+---------+---------+------+
only showing top 5 rows

