In [2]:
from pyspark.sql import SparkSession

# Start (or reuse) a single local Spark session. spark.jars.packages makes Spark
# resolve the MongoDB Spark connector from Maven (see the Ivy log below), and the
# driver requests 8g of memory.
MONGO_SPARK_PACKAGE = 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1'

spark = (
    SparkSession.builder
    .appName("mongodbtest1")
    .master('local')
    .config('spark.jars.packages', MONGO_SPARK_PACKAGE)
    .config('spark.driver.memory', '8g')
    .getOrCreate()
)
spark

:: loading settings :: url = jar:file:/opt/conda/envs/bigdata/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-30053d75-148a-4909-9088-ca13d1fb95e0;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 578ms :: artifacts dl 48ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts

In [3]:
from pymongo import MongoClient       

# Direct pymongo connection, used below for ad-hoc queries and document updates.
# NOTE(review): the credentials are hardcoded here and in the Spark write URIs;
# consider reading them from os.environ or getpass instead.
mongo_conn_kwargs = {
    'host': 'mongo-csgyab-6513-spring.db',
    'username': 'abr9982',
    'password': "abr9982",
    'authSource': "abr9982",
}
client = MongoClient(**mongo_conn_kwargs)

db = client['abr9982']
db

Database(MongoClient(host=['mongo-csgyab-6513-spring.db:27017'], document_class=dict, tz_aware=False, connect=True, authsource='abr9982'), 'abr9982')

In [4]:
# read earthquake dataset into Spark
# Every file under datasets/earthquakes/ is parsed as CSV with a header row and
# inferred column types, then spread over 20 partitions.
earthquakes = spark.read.csv(
    "datasets/earthquakes/*",
    header=True,
    inferSchema=True,
).repartition(20)

                                                                                

In [5]:
# print the column names and the types Spark inferred (inferSchema=True) for the earthquake data
earthquakes.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- depth: double (nullable = true)
 |-- mag: double (nullable = true)
 |-- magType: string (nullable = true)
 |-- nst: integer (nullable = true)
 |-- gap: double (nullable = true)
 |-- dmin: double (nullable = true)
 |-- rms: double (nullable = true)
 |-- net: string (nullable = true)
 |-- id: string (nullable = true)
 |-- updated: timestamp (nullable = true)
 |-- place: string (nullable = true)
 |-- type: string (nullable = true)
 |-- horizontalError: double (nullable = true)
 |-- depthError: double (nullable = true)
 |-- magError: double (nullable = true)
 |-- magNst: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- locationSource: string (nullable = true)
 |-- magSource: string (nullable = true)



In [6]:
# write earthquake dataset into MongoDB
# mode('overwrite') drops and rewrites the collection, so this cell can be re-run
# (e.g. Restart & Run All). With the default ErrorIfExists save mode the connector
# refuses to write once abr9982.earthquakes already exists. This matches the
# volcanic_eruptions write. Overwriting also discards the color/radius fields that
# a later cell $sets on these documents, so re-run that cell afterwards.
# NOTE(review): credentials are embedded in the connection URI.
earthquakes.write.format('com.mongodb.spark.sql.DefaultSource')\
    .option('uri','mongodb://abr9982:abr9982@mongo-csgyab-6513-spring.db/abr9982.earthquakes')\
    .mode('overwrite')\
    .save()

                                                                                

In [7]:
# read wildfire dataset into Spark
# Parse the CSV with its header row and inferred types, drop the
# "AgencyCode.IncidentNumber" column, and spread the rows over 20 partitions.
wildfires_csv = spark.read.csv(
    "datasets/Wildfire History.csv",
    header=True,
    inferSchema=True,
)
wildfires = wildfires_csv.drop("AgencyCode.IncidentNumber").repartition(20)

In [8]:
# write wildfire dataset into MongoDB
# mode('overwrite') drops and rewrites the collection, so this cell can be re-run
# (e.g. Restart & Run All). With the default ErrorIfExists save mode the connector
# refuses to write once abr9982.wildfires already exists. This matches the
# volcanic_eruptions write.
# NOTE(review): credentials are embedded in the connection URI.
wildfires.write.format('com.mongodb.spark.sql.DefaultSource')\
    .option('uri','mongodb://abr9982:abr9982@mongo-csgyab-6513-spring.db/abr9982.wildfires')\
    .mode('overwrite')\
    .save()

                                                                                

In [9]:
# print the inferred wildfire schema; Latitude, Longitude and Acres come out as
# strings, presumably because of the "NA" placeholders the map cell filters out
wildfires.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Acres: string (nullable = true)
 |-- County: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Notes: string (nullable = true)



In [10]:
# read volcanic eruption dataset into Spark
# Single CSV file with a header row; Spark infers the column types.
volcanic_eruptions = spark.read.csv(
    "datasets/volcano_eruptions.csv",
    header=True,
    inferSchema=True,
)

In [11]:
# write volcanic eruption dataset into MongoDB
# mode='overwrite' replaces the existing collection, so re-running this cell is safe.
# NOTE(review): credentials are embedded in the connection URI.
volcanic_eruptions.write.save(
    format='com.mongodb.spark.sql.DefaultSource',
    mode='overwrite',
    uri='mongodb://abr9982:abr9982@mongo-csgyab-6513-spring.db/abr9982.volcanic_eruptions',
)

In [12]:
# print the column names and Spark-inferred types of the volcanic eruption data
volcanic_eruptions.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Dy: integer (nullable = true)
 |-- Tsu: integer (nullable = true)
 |-- Eq: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Elevation (m): integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- VEI: integer (nullable = true)
 |-- Agent: string (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Damage: integer (nullable = true)



In [13]:
# confirm that the collections written above now exist in the abr9982 database
db.list_collection_names()

['wildfires',
 'volcanic_eruptions',
 'foreclosures',
 'restaurants',
 'durham_restaurants',
 'earthquakes']

In [14]:
# folium is a mapping library

pip install folium

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [15]:
# exploring the dataset:
# find the average latitude and longitude of the earthquake dataset (geo-center of the dataset)
centroid_pipeline = [
    {"$group": {
        "_id": None,  # one group spanning every document
        "avgLatitude": {"$avg": "$latitude"},
        "avgLongitude": {"$avg": "$longitude"},
    }},
]

for centroid in db.earthquakes.aggregate(centroid_pipeline):
    avg_lat, avg_long = centroid['avgLatitude'], centroid['avgLongitude']
    print(avg_lat)
    print(avg_long)

19.349408765152237
-155.38710310312973


In [16]:
# exploring the dataset:
# find the oldest and most recent record in the dataset
# Sorting ascending by time yields the oldest record; sorting descending yields the
# most recent one. Only documents whose `time` is a BSON date are considered,
# because MongoDB orders missing/null values before dates in an ascending sort.
has_time = {'time': {'$type': 'date'}}
oldest_doc = db.earthquakes.find_one(has_time, sort=[('time', 1)])
most_recent_doc = db.earthquakes.find_one(has_time, sort=[('time', -1)])

if oldest_doc is None:
    print("no earthquake documents with a date-typed `time` field")
else:
    oldest_time = oldest_doc['time']
    most_recent_time = most_recent_doc['time']
    print(oldest_time)
    print(most_recent_time)
    

2000-01-01 19:38:29.600000
2024-04-17 22:48:26.220000


In [17]:
# convert rgb color code to hex color code
def rgb_to_hex(r, g, b):
    """Return the '#rrggbb' hex string for integer RGB channels in 0-255."""
    return f'#{r:02x}{g:02x}{b:02x}'

# get the shade based on the magnitude of an event from a color gradient scale
# got color gradient function from https://stackoverflow.com/questions/22218140/calculate-the-color-at-a-given-point-on-a-gradient-between-two-colors
def get_shade(low_color, high_color, mag, max_mag):
    """Linearly interpolate between two RGB colors and return a hex color string.

    Args:
        low_color: (r, g, b) used at mag <= 0 (for a positive max_mag).
        high_color: (r, g, b) used at mag >= max_mag.
        mag: value to place on the gradient (e.g. earthquake magnitude, damage score).
        max_mag: value that maps to high_color; must be non-zero.

    Returns:
        '#rrggbb' string; each channel is truncated toward zero with int().
    """
    # Clamp the gradient position to [0, 1]. Magnitudes can be negative or exceed
    # max_mag, and an unclamped ratio pushes channels outside 0-255, which
    # rgb_to_hex renders as an invalid color such as '#ff10b10b'.
    t = min(max(mag / max_mag, 0.0), 1.0)
    channels = [int(low_color[k] + t * (high_color[k] - low_color[k])) for k in range(3)]
    return rgb_to_hex(*channels)
    

In [18]:
# create a gradient effect for a circle marker by overlaying multiple circles, each circle is smaller and more opaque than the last

def create_gradient_circle(map_object, location, radius, color, steps=10):
    """Draw `steps` concentric circles that shrink and grow more opaque toward the centre.

    Args:
        map_object: folium map (or layer) the circles are added to.
        location: [latitude, longitude] of the centre.
        radius: outer radius in meters.
        color: stroke/fill color string, e.g. '#ffc4c4'.
        steps: number of rings to draw; must be >= 1.

    Raises:
        ValueError: if steps < 1.
    """
    if steps < 1:
        raise ValueError("steps must be >= 1")
    step_radius = radius / steps
    # Spread opacity linearly from 0.1 (outermost ring) to 1.0 (innermost ring) for
    # any `steps`. A fixed `steps*i/100 + 0.1` only stays within [0.1, 1.0] when
    # steps == 10; for the default steps=10 these values are the same.
    opacity_step = 0.9 / (steps - 1) if steps > 1 else 0.0
    for i in range(steps):
        opacity = 0.1 + i * opacity_step
        folium.Circle(
            location=location,
            radius=radius - i * step_radius,  # decrease radius with each step
            color=color,
            fill=True,
            fill_color=color,
            fill_opacity=opacity,
            opacity=opacity,  # Leaflet's stroke opacity (invisible anyway with weight=0)
            weight=0,
            popup="{} meters".format(radius),
        ).add_to(map_object)

In [19]:
import numpy as np
from pymongo import UpdateOne

max_magnitude = 10
UPDATE_BATCH_SIZE = 1000

# add a color and radius field to each document in the earthquake collection
# calculate the correct color with get_shade
# calculate the radius, based on the earthquake's magnitude, with the below equation from stack exchange: https://gis.stackexchange.com/questions/221931/calculate-radius-from-magnitude-of-earthquake-on-leaflet-map
# The $set updates are queued and sent with bulk_write in batches instead of one
# update_one round trip per document; only `mag` (plus `_id`) is fetched.
pending_updates = []
for quake in db.earthquakes.find({}, {'mag': 1}):
    color = get_shade((255,255,255), (255,0,0), quake['mag'], max_magnitude)
    radius = np.exp(quake['mag'] / 1.01 - 0.13) * 200
    pending_updates.append(
        UpdateOne({'_id': quake['_id']}, {'$set': {'color': color, 'radius': radius}})
    )
    if len(pending_updates) >= UPDATE_BATCH_SIZE:
        db.earthquakes.bulk_write(pending_updates)
        pending_updates = []
if pending_updates:
    db.earthquakes.bulk_write(pending_updates)

# show one updated document
for quake in db.earthquakes.find({}).limit(1):
    print(quake)
    

{'_id': ObjectId('663bffa07c3c3b7265911a6b'), 'time': datetime.datetime(2020, 9, 11, 13, 47, 27, 640000), 'latitude': 19.1638336181641, 'longitude': -155.478668212891, 'depth': 35.1399993896484, 'mag': 2.28999996, 'magType': 'md', 'nst': 46, 'gap': 156.0, 'rms': 0.119999997, 'net': 'hv', 'id': 'hv72129036', 'updated': datetime.datetime(2020, 9, 11, 13, 50, 47, 210000), 'place': '4 km S of P?hala, Hawaii', 'type': 'earthquake', 'horizontalError': 0.63, 'depthError': 0.930000007, 'magError': 1.26999998, 'magNst': 27, 'status': 'automatic', 'locationSource': 'hv', 'magSource': 'hv', 'color': '#ffc4c4', 'radius': 1695.3492734466142}


In [20]:
# peek at one stored eruption document; columns that were null in Spark (e.g. Tsu,
# Eq, VEI in this row) appear to be omitted from the document rather than stored as null
db.volcanic_eruptions.find_one()

{'_id': ObjectId('663bffad7c3c3b726592a06b'),
 'Year': 2000,
 'Month': 11,
 'Dy': 3,
 'Name': 'Kilauea',
 'Location': 'Hawaiian Is',
 'Country': 'United States',
 'Latitude': 19.421,
 'Longitude': -155.287,
 'Elevation (m)': 1222,
 'Type': 'Shield volcano',
 'Agent': 'G',
 'Deaths': 2,
 'Damage': 0}

In [21]:
max_damage = 5
NO_DAMAGE_COLOR = "#C8F9FF"

# add a color field to the volcanic eruptions dataset: eruptions that caused damage
# are shaded from white toward green by their Damage value (relative to max_damage);
# eruptions with no damage get a fixed default color
for eruption in db.volcanic_eruptions.find({}):
    damage = eruption['Damage']
    color = (
        get_shade((255,255,255), (0,255,0), damage, max_damage)
        if damage > 0
        else NO_DAMAGE_COLOR
    )
    db.volcanic_eruptions.update_one({'_id': eruption['_id']}, {'$set': {'color': color}})

# show the first two updated documents
for eruption in db.volcanic_eruptions.find({}).limit(2):
    print(eruption)

{'_id': ObjectId('663bffad7c3c3b726592a06b'), 'Year': 2000, 'Month': 11, 'Dy': 3, 'Name': 'Kilauea', 'Location': 'Hawaiian Is', 'Country': 'United States', 'Latitude': 19.421, 'Longitude': -155.287, 'Elevation (m)': 1222, 'Type': 'Shield volcano', 'Agent': 'G', 'Deaths': 2, 'Damage': 0, 'color': '#C8F9FF'}
{'_id': ObjectId('663bffad7c3c3b726592a06c'), 'Year': 2009, 'Month': 11, 'Dy': 30, 'Name': 'Kilauea', 'Location': 'Hawaiian Is', 'Country': 'United States', 'Latitude': 19.421, 'Longitude': -155.287, 'Elevation (m)': 1222, 'Type': 'Shield volcano', 'Damage': 2, 'color': '#99ff99'}


In [33]:
import math
import os

import folium
from folium.plugins import HeatMap
from datetime import datetime
import time
from IPython.display import display, clear_output

# the average location of the data did not center the map where we wanted, so we created a map centered around the coordinates Google gave for Hawaii
MAP_CENTER = [19.8987, -155.6659]
MAP_ZOOM = 8

# the earthquake magnitude scale has no upper limit, but almost all recorded earthquakes have a magnitude below 10
max_magnitude = 10

# one map per calendar month from January 2000 up to, but not including,
# END_MONTH; April 2024 is left out because the earthquake data stops partway
# through it (latest record 2024-04-17)
START_YEAR = 2000
END_MONTH = (2024, 4)  # exclusive (year, month)

OUTPUT_DIR = 'heatmaps'

# since our wildfire dataset did not have a column for intensity/heat of fire, we gave every wildfire event the same color
WILDFIRE_COLOR = "#FFFF00"
SQ_METERS_PER_ACRE = 4046.86
# had trouble scaling wildfire events for our map so we put a cap on wildfire size
WILDFIRE_MAX_RADIUS_M = 5000

# there was no column in the data with the radius of the event so we used 3000 to make the circle visible on the map
VOLCANO_RADIUS_M = 3000


def _month_bounds(year, month):
    """Return (start, end): midnight on the 1st of `month` and on the 1st of the next month."""
    start = datetime(year, month, 1)
    end = datetime(year + 1, 1, 1) if month == 12 else datetime(year, month + 1, 1)
    return start, end


def _wildfire_radius_m(acres):
    """Radius in meters of a circle whose area is `acres` acres, capped at WILDFIRE_MAX_RADIUS_M.

    area = acres * 4046.86 m^2 and radius = sqrt(area / pi). Without the square
    root the value has units of m^2 and overstates the radius enormously.
    Raises ValueError for non-numeric or negative acreage.
    """
    area_m2 = float(acres) * SQ_METERS_PER_ACRE
    return min(math.sqrt(area_m2 / math.pi), WILDFIRE_MAX_RADIUS_M)


def _add_earthquakes(map_, year, month):
    """Draw a gradient circle for every earthquake whose `time` falls in the month."""
    start, end = _month_bounds(year, month)
    # a half-open range on `time` selects the same documents as comparing
    # $year/$month inside $expr, but can use an index on `time` if one exists
    events = db.earthquakes.find(
        {'time': {'$gte': start, '$lt': end}},
        {'latitude': 1, 'longitude': 1, 'radius': 1, 'color': 1},
    )
    for event in events:
        location = [event['latitude'], event['longitude']]
        create_gradient_circle(map_, location, event['radius'], event['color'])


def _add_wildfires(map_, year, month):
    """Draw a fixed-color circle for every wildfire recorded in the month.

    Rows whose Latitude, Longitude or Acres is missing, "NA", or not numeric are skipped.
    """
    for event in db.wildfires.find({'Month': month, 'Year': year}):
        lat_raw = event.get('Latitude')
        lon_raw = event.get('Longitude')
        acres_raw = event.get('Acres')
        if any(value is None or value == "NA" for value in (lat_raw, lon_raw, acres_raw)):
            continue
        try:
            radius = _wildfire_radius_m(acres_raw)
            folium.Circle(
                location=[float(lat_raw), float(lon_raw)],
                radius=radius,
                color=WILDFIRE_COLOR,
                fill=True,
                fill_color=WILDFIRE_COLOR,
                fill_opacity=0.6,
                opacity=0.6,  # Leaflet's stroke opacity
                weight=0,
                popup=f"{radius} meters",
            ).add_to(map_)
        except (TypeError, ValueError):
            # unparseable coordinates or acreage: skip just this row
            continue


def _add_volcanic_eruptions(map_, year, month):
    """Draw a fixed-size circle, colored by the precomputed `color` field, per eruption in the month."""
    for event in db.volcanic_eruptions.find({'Month': month, 'Year': year}):
        folium.Circle(
            location=[event['Latitude'], event['Longitude']],
            radius=VOLCANO_RADIUS_M,
            color=event['color'],
            fill=True,
            fill_color=event['color'],
            fill_opacity=0.6,
            opacity=0.6,  # Leaflet's stroke opacity
            weight=0,
            # report this circle's own radius (not a value left over from another layer)
            popup=f"{VOLCANO_RADIUS_M} meters",
        ).add_to(map_)


os.makedirs(OUTPUT_DIR, exist_ok=True)

# create a map for each month in the data
for year in range(START_YEAR, END_MONTH[0] + 1):
    for month in range(1, 13):  # calendar months 1-12
        if (year, month) >= END_MONTH:
            break

        map_ = folium.Map(location=MAP_CENTER, zoom_start=MAP_ZOOM)
        _add_earthquakes(map_, year, month)
        _add_wildfires(map_, year, month)
        _add_volcanic_eruptions(map_, year, month)

        # save to HTML, keeping the heatmap<month><year>.html naming scheme
        map_.save(f'{OUTPUT_DIR}/heatmap{month}{year}.html')

In [27]:
# view example map
# `map_` is the last map assigned by the monthly map-building loop (the March 2024
# map, given that loop's bounds); it is only defined after that loop has run.
map_