In [1]:
import numpy
import pandas as pd
import json
import sqlalchemy as db
from sqlalchemy.orm import Session
from sqlalchemy import text
import matplotlib.pyplot as plt
import seaborn as sns
import warnings

#I do not guarantee that the following code is 100% sql injection proof

<h2>Connect to (MySQL) Server</h2>

In [33]:
username = "root"
password = "password"
server = "localhost"
database = "quakes"

engine = db.create_engine(f"mysql+pymysql://{username}:{password}@{server}/{database}")
metadata = db.MetaData()

In [3]:
engine.table_names()

  engine.table_names()


['Agencies',
 'Alerts',
 'Contributed',
 'Events',
 'idmap',
 'MagnitudeTypes',
 'Status']

In [4]:
events_tbl = db.Table('Events', metadata, autoload_with=engine)
idMap_tbl = db.Table('IdMap', metadata, autoload_with=engine)
agencies_tbl = db.Table('Agencies', metadata, autoload_with=engine)
status_tbl = db.Table('Status', metadata, autoload_with=engine)
alerts_tbl = db.Table('Alerts', metadata, autoload_with=engine)
contributed_tbl = db.Table('Contributed', metadata, autoload_with=engine)
mag_types_tbl = db.Table('MagnitudeTypes', metadata, autoload_with=engine)

<h2>Read data from files</h2>

In [54]:
def process_ids(local_id, local_ids, con):
    """
    Inserts (if not present) local_id + local_ids into IdMap table. 
    Throws an exception if:
        1. Some of local_id + local_ids are already assigned to different global ids, e.g. 
                local_ids contains 'us12345' and 'us67890' with 'us12345' --> 1 and 'us67890' --> 2.
        2. There was an event with the same local_id.
        
        Parameters:
            local_id: int --- primary id associated with the event.
            local_ids: Iterable[int] --- other ids associated with the event (possibly including local_id).
            con: sqlalchemy.engine.Connection --- connection to sql server.
            
        Output: 
            (primary_key_to_record_with_local_id: int, global_id_assigned_to_local_id: int).
    """
    
    #pk(IdMap) for local_id (or None if it wasn't inserted)
    res_id = con.execute(db.select(idMap_tbl.c.id).where(idMap_tbl.c.local_id == local_id)).scalar()
    
    #throw exception if there was an event with the same local_id
    if res_id is not None and \
       con.execute(db.select(events_tbl.c.id).where(events_tbl.c.local_id == res_id)).rowcount > 0:
        raise Exception(f"Duplicate local_id {local_id}.")
    
    #local_ids = set of local_id + local_ids
    local_ids = set(local_ids)
    local_ids.add(local_id)
    
    #pk(IdMap)'s for all local_ids
    res = con.execute(db.select(idMap_tbl.c.id).where(idMap_tbl.c.local_id.in_(local_ids)))
    
    #if all local_ids were already assigned some global_id, fetch that global_id.
    ###possibly removable
    if res.rowcount == len(local_ids):
        stmt = db.select(idMap_tbl.c.id, idMap_tbl.c.global_id).where(idMap_tbl.c.local_id == local_id)
        res2 = list(con.execute(stmt))
        return res2[0][0], res2[0][1]
    
    #find all global ids corresponding to local ids
    globals = list(con.execute(db.select(idMap_tbl.c.global_id).where(idMap_tbl.c.local_id.in_(local_ids)).distinct()))
    
    #throw if local ids are assigned more than one global id
    if len(globals) > 1:
        raise Exception('Overlapping is not supported.')
    
    
    new_global = None
    #if some global id was assigned, get it
    if (len(globals) > 0):
        new_global = globals[0][0]
    #otherwise define new global id as max of all global ids + 1 (if table is empty return 1)
    else:
        possible_id = con.execute(text("SELECT MAX(global_id) FROM IdMap")).scalar()
        new_global = 1 if possible_id is None else possible_id + 1

    #insert those local ids into IdMap table that are not already present
    for lid in local_ids:
        insert_stmt = db.dialects.mysql \
                                 .insert(idMap_tbl).values(local_id=lid, global_id=new_global)
        res = con.execute(insert_stmt.on_duplicate_key_update(global_id=insert_stmt.inserted.global_id))   
    
    #find pk(IdMap) for local_id
    db.select(idMap_tbl.c.id).where(idMap_tbl.c.local_id == local_id)
    id = con.execute(db.select(idMap_tbl.c.id).where(idMap_tbl.c.local_id == local_id)).scalar()
    return id, new_global


In [55]:
def process_sources(sources, event_global_id, con):
    """
    Updates Contributed table.
    Throws an exception if one of sources is not already present in Agencies table.
    
        Parameters:
            sources: Iterable[str] --- agencies that contributed to the event
            event_global_id: int --- primary key to the event in question
            con: sqlalchemy.engine.Connection --- connection to sql server
        
        Output:
            No output
    """
    
    sources = set(sources)

    #find pk(Agencies)'s for all sources
    stmt = db.select(agencies_tbl.c.id).where(agencies_tbl.c.abbreviation.in_(sources))
    res = con.execute(stmt)
    
    #throw if some agency is not found in Agencies table
    if res.rowcount != len(sources):
        raise Exception(f'Some agency among {sources} is not found. Add all new agencies to Agencies table.')
    
    #update Contributed table
    select_stmt = db.select(agencies_tbl.c.id, event_global_id).where(agencies_tbl.c.abbreviation.in_(sources))
    insert_stmt = db.dialects.mysql.insert(contributed_tbl) \
                            .from_select(['agency', 'event_global_id'], select_stmt)
    con.execute(insert_stmt.on_duplicate_key_update(agency=insert_stmt.inserted.agency))

In [56]:
def process_geojson_entry(entry, con):
    """
    Processes an event (entry) into schema.
    Throws if agency(network/net) is null.
    
        Parameters:
            entry: dict --- geojson for the event
            con: sqlalchemy.engine.Connection --- connection to sql server
        
        Output:
            No output
    """
    #do nothing if entry is not an earthquake
    if entry['properties']['type'] != 'earthquake':
        warnings.warn("Not an earthquake.")
        return
    
    ###debug
    if process_geojson_entry.j % 100 == 0:
        print(f"{process_geojson_entry.j} entries processed.")
    process_geojson_entry.j = process_geojson_entry.j + 1
    
    local_id = entry['id']
    [long, lat, depth] = entry['geometry']['coordinates']
    props = entry['properties']
    
    #process ids
    id_IdMap, global_id = process_ids(local_id, filter(None, props['ids'].split(',')), con)
    
    #process alert, status and magnitude type
    alert_id = con.execute(db.select(alerts_tbl.c.id).where(alerts_tbl.c.name == props['alert'])).scalar()
    status_id = con.execute(db.select(status_tbl.c.id).where(status_tbl.c.name == props['status'])).scalar()
    mag_type_id = con.execute(db.select(mag_types_tbl.c.id).where(mag_types_tbl.c.name == props['magType'])).scalar()
    
    #find pk(Agencies) for agency
    net_id = con.execute(db.select(agencies_tbl.c.id).where(agencies_tbl.c.abbreviation == props['net'])).scalar()
    if net_id is None:
        raise Exception('Agency is required.')
    
    #process sources
    sources = set(filter(None, props['sources'].split(',')))
    sources.add(props['net'])
    process_sources(sources, global_id, con)
    
    #insert event into Events table
    con.execute(events_tbl.insert().values(
                      magnitude=props['mag'], place=props['place'], time=props['time']//1000, \
                      updated=props['updated']//1000, timezone_offset=props['tz'], url=props['url'], \
                      detail=props['detail'], felt=props['felt'], cdi=props['cdi'], \
                      mmi=props['mmi'], alert=alert_id, status=status_id, \
                      tsunami=props['tsunami'], significance=props['sig'], network=net_id, \
                      n_stations=props['nst'], dmin=props['dmin'], rms=props['rms'], \
                      gap=props['gap'], magnitude_type=mag_type_id, title=props['title'], \
                      longitude=long, latitude=lat, depth=depth, \
                      local_id=id_IdMap, types=props['types'], code=props['code'] \
    ))

In [57]:
def process_geojson_file(geojson, con):
    """
    Processes a geojson file into schema 
    
        Parameters:
            geojson: dict --- geojson with events
            con: sqlalchemy.engine.Connection --- connection to sql server
        
        Output:
            No output
    """
    records = geojson['features']
    for record in records:
        process_geojson_entry(record, con)

<h4>Geojson files:</h4>

In [58]:
all_files = !ls
all_files = list(filter(lambda x: x.startswith("quakes-"), all_files))
all_files

['quakes-00000-20000.json',
 'quakes-20000-40000.json',
 'quakes-40000-60000.json',
 'quakes-60000-80000.json',
 'quakes-80000-100000.json']

<h4>Read them inside the server</h4>

In [61]:
all_files = [ 'quakes-80000-100000.json']
process_geojson_entry.j = 1
for file in all_files:
    with open(file) as f:
        with Session(engine) as session, session.begin():
            data = json.load(f)
            process_geojson_file(data, session)

100




200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000
2100
2200
2300
2400
2500
2600
2700
2800
2900
3000
3100
3200
3300
3400
3500
3600
3700
3800
3900
4000
4100
4200
4300
4400
4500
4600
4700
4800
4900
5000
5100
5200
5300
5400
5500
5600
5700
5800
5900
6000
6100
6200
6300
6400
6500
6600
6700
6800
6900
7000
7100
7200
7300
7400
7500
7600
7700
7800
7900
8000
8100
8200
8300
8400
8500
8600
8700
8800
8900
9000
9100
9200
9300
9400
9500
9600
9700
9800
9900
10000
10100
10200
10300
10400
10500
10600
10700
10800
10900
11000
11100
11200
11300
11400
11500
11600
11700
11800
11900
12000
12100
12200
12300
12400
12500
12600
12700
12800
12900
13000
13100
13200
13300
13400
13500
13600
13700
13800
13900
14000
14100
14200
14300
14400
14500
14600
14700
14800
14900
15000
15100
15200
15300
15400
15500
15600
15700
15800
15900
16000
16100
16200
16300
16400
16500
16600
16700
16800
16900
17000
17100
17200
17300
17400
17500
17600
17700
17800
17900
18000
18100
18200
18300
18400
18500
18