In [1]:
import json
import os

import pandas as pd
import requests as r

# Data Ingestion

The ingestion steps are:
- Fetch json data from API
- Save json data to local
- Transform from json to parquet
- Save parquet data to local
- Upload json and parquet data to data lake

## Fetch Data

| Column name | Description | Data type |
| --- | --- | --- |
| time | The time of the earthquake. | Datetime |
| latitude | The latitude of the earthquake. | Float |
| longitude | The longitude of the earthquake. | Float |
| depth | The depth of the earthquake. | Float |
| mag | The magnitude of the earthquake. | Float |
| magType | The type of magnitude measurement used. | String |
| nst | The number of seismic stations used to calculate the magnitude. | Integer |
| gap | The maximum angular distance between azimuthal gaps. | Float |
| dmin | The distance to the nearest station. | Float |
| rms | The root-mean-square travel time residual. | Float |
| net | The network detected. | String |
| id | A unique identifier for the event. | String |
| updated | The time the earthquake was last updated. | Datetime |
| place | The location of the earthquake. | String |
| type | Type of seismic event (e.g. `earthquake`). | String |
| horizontalError | The horizontal error of the earthquake. | Float |
| depthError | The depth error of the earthquake. | Float |
| magError | The magnitude error of the earthquake. | Float |
| magNst | The number of seismic stations used to calculate the magnitude error. | Integer |
| status | The status of the earthquake. | String |
| locationSource | The source of the location of the earthquake. | String |
| magSource | The source of the magnitude of the earthquake. | String |

In [2]:
# Query the USGS FDSN event service for all events between starttime and endtime.
# `event_format` avoids shadowing the built-in `format` function.
USGS_EVENT_URL = "https://earthquake.usgs.gov/fdsnws/event/1/query"

event_format = "geojson"
starttime = "2023-01-01"
endtime = "2023-01-02"

# Let requests build and encode the query string. Set a timeout so a stalled
# connection cannot hang the notebook.
response = r.get(
    USGS_EVENT_URL,
    params={"format": event_format, "starttime": starttime, "endtime": endtime},
    timeout=60,
)
# Fail loudly on HTTP errors instead of trying to parse an error page as JSON below.
response.raise_for_status()

In [3]:
# Parse the GeoJSON response body into a Python dict.
json_data = response.json()

I want to know which date range is actually retrieved for the parameters `starttime=2023-01-01` & `endtime=2023-01-02`.

In [4]:
import datetime

In [5]:
# Convert each event's epoch-millisecond `time` into a timezone-aware UTC datetime.
# This shows which window the starttime/endtime query actually covers.
# - Iterating over the features themselves (rather than range(metadata["count"]))
#   cannot index past the end of the list.
# - tz=UTC makes the result independent of the machine's local timezone.
time_ls = [
    datetime.datetime.fromtimestamp(feature["properties"]["time"] / 1000.0, tz=datetime.timezone.utc)
    for feature in json_data["features"]
]

print(max(time_ls), min(time_ls))

2023-01-01 23:59:29.195000 2023-01-01 00:06:14.840000


All retrieved events fall on 2023-01-01. The query range therefore includes `starttime` (2023-01-01) but does not include `endtime` (2023-01-02).

## Data Understanding

| Key | Description/Value |
| --- | --- |
| `type` | `FeatureCollection` |
| `metadata` | Information related to data fetched |
| `features` | List of earthquake events, one GeoJSON `Feature` per event |
| `bbox` | Bounding box of all fetched events, ordered `[min_long, min_lat, min_depth, max_long, max_lat, max_depth]` |

In [6]:
# Top-level keys of the GeoJSON FeatureCollection response.
json_data.keys()

dict_keys(['type', 'metadata', 'features', 'bbox'])

In [7]:
# GeoJSON object type of the whole response.
json_data["type"]

'FeatureCollection'

In [8]:
# Fields available in the response metadata.
json_data["metadata"].keys()

dict_keys(['generated', 'url', 'title', 'status', 'api', 'count'])

In [9]:
# Metadata of the retrieved feed (request URL, API version, event count, ...).
json_data["metadata"]

{'generated': 1700711620000,
 'url': 'https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&starttime=2023-01-01&endtime=2023-01-02',
 'title': 'USGS Earthquakes',
 'status': 200,
 'api': '1.14.0',
 'count': 349}

In [10]:
# Bounding box of all events: [min long, min lat, min depth, max long, max lat, max depth]
json_data["bbox"]

[-178.81666666667, -60.8195, -3.05, 179.599, 67.2198, 595.724]

In [11]:
# Keys of the first event record.
json_data["features"][0].keys()

dict_keys(['type', 'properties', 'geometry', 'id'])

In [12]:
# GeoJSON type of a single event record.
json_data["features"][0]["type"]

'Feature'

In [13]:
# Property keys of one event (26 of them).
json_data["features"][0]["properties"].keys()

dict_keys(['mag', 'place', 'time', 'updated', 'tz', 'url', 'detail', 'felt', 'cdi', 'mmi', 'alert', 'status', 'tsunami', 'sig', 'net', 'code', 'ids', 'sources', 'types', 'nst', 'dmin', 'rms', 'gap', 'magType', 'type', 'title'])

In [15]:
# Full first event record.
json_data["features"][0]

{'type': 'Feature',
 'properties': {'mag': 4.2,
  'place': '5 km NNE of Jayapura, Indonesia',
  'time': 1672617569195,
  'updated': 1678575106040,
  'tz': None,
  'url': 'https://earthquake.usgs.gov/earthquakes/eventpage/us7000j3yb',
  'detail': 'https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=us7000j3yb&format=geojson',
  'felt': None,
  'cdi': None,
  'mmi': None,
  'alert': None,
  'status': 'reviewed',
  'tsunami': 0,
  'sig': 271,
  'net': 'us',
  'code': '7000j3yb',
  'ids': ',us7000j3yb,',
  'sources': ',us,',
  'types': ',origin,phase-data,',
  'nst': 19,
  'dmin': 16.495,
  'rms': 0.45,
  'gap': 99,
  'magType': 'mb',
  'type': 'earthquake',
  'title': 'M 4.2 - 5 km NNE of Jayapura, Indonesia'},
 'geometry': {'type': 'Point', 'coordinates': [140.7385, -2.493, 10]},
 'id': 'us7000j3yb'}

In [14]:
# Full second event record, for comparison with the first.
json_data["features"][1]

{'type': 'Feature',
 'properties': {'mag': 3.62,
  'place': '103 km N of Suárez, Puerto Rico',
  'time': 1672617413930,
  'updated': 1672619526212,
  'tz': None,
  'url': 'https://earthquake.usgs.gov/earthquakes/eventpage/pr2023001004',
  'detail': 'https://earthquake.usgs.gov/fdsnws/event/1/query?eventid=pr2023001004&format=geojson',
  'felt': None,
  'cdi': None,
  'mmi': None,
  'alert': None,
  'status': 'reviewed',
  'tsunami': 0,
  'sig': 202,
  'net': 'pr',
  'code': '2023001004',
  'ids': ',pr2023001004,',
  'sources': ',pr,',
  'types': ',origin,phase-data,',
  'nst': 14,
  'dmin': 1.1104,
  'rms': 0.44,
  'gap': 270,
  'magType': 'md',
  'type': 'earthquake',
  'title': 'M 3.6 - 103 km N of Suárez, Puerto Rico'},
 'geometry': {'type': 'Point', 'coordinates': [-65.7256, 19.3601, 34]},
 'id': 'pr2023001004'}

In [16]:
# GeoJSON Point coordinates are ordered [longitude, latitude, depth]
# (e.g. [140.7385, -2.493, 10] for an event near Jayapura, Indonesia).
json_data["features"][0]["geometry"]

{'type': 'Point', 'coordinates': [140.7385, -2.493, 10]}

In [17]:
# Event identifier of the first record.
json_data["features"][0]["id"]

'us7000j3yb'

## Save json data to local

In [6]:
# Persist the raw API response, one file per query start date.
# Create the target directory if it is missing, so this cell also works on a fresh checkout.
# NOTE(review): later cells read from ../data/...; confirm which directory the notebook runs from.
json_path = f"data/raw/json/data_{starttime}.json"
os.makedirs(os.path.dirname(json_path), exist_ok=True)
with open(json_path, "w") as f:
    json.dump(json_data, f)

## Convert from json to parquet

### Flatten and preprocessing

Challenges:
- Flatten nested json data
- Convert data type

In [35]:
# One row per event; nested dicts are flattened into columns.
# sep="_" yields names such as `properties_mag` and `geometry_coordinates`, which every
# later cell uses. The default separator "." would produce `properties.mag` instead.
# `metadata` is attached to every row as a dict and is flattened further below.
df_json = pd.json_normalize(json_data, record_path=["features"], meta="metadata", sep="_")

df_json.head()

Unnamed: 0,type,id,properties_mag,properties_place,properties_time,properties_updated,properties_tz,properties_url,properties_detail,properties_felt,...,properties_nst,properties_dmin,properties_rms,properties_gap,properties_magType,properties_type,properties_title,geometry_type,geometry_coordinates,metadata
0,Feature,us7000j3yb,4.2,"5 km NNE of Jayapura, Indonesia",1672617569195,1678575106040,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,19.0,16.495,0.45,99.0,mb,earthquake,"M 4.2 - 5 km NNE of Jayapura, Indonesia",Point,"[140.7385, -2.493, 10]","{'generated': 1700308166000, 'url': 'https://e..."
1,Feature,pr2023001004,3.62,"103 km N of Suárez, Puerto Rico",1672617413930,1672619526212,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,14.0,1.1104,0.44,270.0,md,earthquake,"M 3.6 - 103 km N of Suárez, Puerto Rico",Point,"[-65.7256, 19.3601, 34]","{'generated': 1700308166000, 'url': 'https://e..."
2,Feature,av91082783,-0.76,"85 km NNW of Karluk, Alaska",1672617277550,1672860382700,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,6.0,,0.06,217.0,ml,earthquake,"M -0.8 - 85 km NNW of Karluk, Alaska",Point,"[-155.180333333333, 58.2275, 3.08]","{'generated': 1700308166000, 'url': 'https://e..."
3,Feature,nc73827681,0.57,"10km NW of The Geysers, CA",1672617223560,1673410751822,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,21.0,0.007542,0.02,96.0,md,earthquake,"M 0.6 - 10km NW of The Geysers, CA",Point,"[-122.8415, 38.8445, 2.06]","{'generated': 1700308166000, 'url': 'https://e..."
4,Feature,pr71390293,2.28,,1672617148580,1672618140720,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,5.0,0.09231,0.08,242.0,md,earthquake,M 2.3 -,Point,"[-66.8565, 17.8848333333333, 11.19]","{'generated': 1700308166000, 'url': 'https://e..."


In [36]:
# Column names, non-null counts and dtypes right after flattening.
df_json.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 349 entries, 0 to 348
Data columns (total 31 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   type                  349 non-null    object 
 1   id                    349 non-null    object 
 2   properties_mag        348 non-null    float64
 3   properties_place      327 non-null    object 
 4   properties_time       349 non-null    int64  
 5   properties_updated    349 non-null    int64  
 6   properties_tz         0 non-null      object 
 7   properties_url        349 non-null    object 
 8   properties_detail     349 non-null    object 
 9   properties_felt       31 non-null     float64
 10  properties_cdi        31 non-null     float64
 11  properties_mmi        7 non-null      float64
 12  properties_alert      5 non-null      object 
 13  properties_status     349 non-null    object 
 14  properties_tsunami    349 non-null    int64  
 15  properties_sig        3

Flatten `geometry_coordinates` feature

In [37]:
# Split the [x, y, z] coordinate list into three columns.
# GeoJSON orders Point coordinates as [longitude, latitude, depth]. Example: the first
# event near Jayapura, Indonesia is [140.7385, -2.493, 10], and the bbox starts with
# min longitude. The first element is therefore longitude, not latitude.
df_json[["geometry_coordinates_longitude", "geometry_coordinates_latitude", "geometry_coordinates_depth"]] = df_json["geometry_coordinates"].tolist()

In [39]:
# The list column is redundant once its three parts have their own columns.
df_json = df_json.drop(columns=["geometry_coordinates"])

Flatten `metadata` feature

In [40]:
# Metadata dict attached to the first row (identical on every row).
df_json["metadata"].loc[0]

{'generated': 1700308166000,
 'url': 'https://earthquake.usgs.gov/fdsnws/event/1/query?format=geojson&starttime=2023-01-01&endtime=2023-01-02',
 'title': 'USGS Earthquakes',
 'status': 200,
 'api': '1.14.0',
 'count': 349}

In [41]:
# Expand the per-row metadata dicts into `metadata_*` columns, aligned on the row index.
metadata_flat = pd.json_normalize(df_json["metadata"]).add_prefix("metadata_")
df_json = pd.concat([df_json, metadata_flat], axis=1)

In [43]:
# The dict column is redundant once flattened into `metadata_*` columns.
df_json = df_json.drop(columns=["metadata"])

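Null values and data types (nulls are kept as-is; integer columns that contain nulls are cast to the nullable `Int64` dtype below)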

In [58]:
# Inspect a sparsely populated column.
df_json.loc[:, "properties_cdi"]

0     NaN
1     NaN
2     NaN
3     NaN
4     NaN
       ..
344   NaN
345   NaN
346   NaN
347   NaN
348   NaN
Name: properties_cdi, Length: 349, dtype: float64

In [51]:
# info must be called; without the parentheses the cell only shows the bound method repr.
df_json.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 349 entries, 0 to 348
Data columns (total 38 columns):
 #   Column                          Non-Null Count  Dtype  
---  ------                          --------------  -----  
 0   type                            349 non-null    object 
 1   id                              349 non-null    object 
 2   properties_mag                  348 non-null    float64
 3   properties_place                327 non-null    object 
 4   properties_time                 349 non-null    int64  
 5   properties_updated              349 non-null    int64  
 6   properties_tz                   0 non-null      object 
 7   properties_url                  349 non-null    object 
 8   properties_detail               349 non-null    object 
 9   properties_felt                 31 non-null     float64
 10  properties_cdi                  31 non-null     float64
 11  properties_mmi                  7 non-null      float64
 12  properties_alert                5 no

In [71]:
# Epoch milliseconds -> datetime64 for the feed generation time.
df_json = df_json.assign(
    metadata_generated_datetime=pd.to_datetime(df_json["metadata_generated"], unit="ms")
)

In [72]:
# Epoch milliseconds -> datetime64 for the event time.
df_json = df_json.assign(
    properties_time_datetime=pd.to_datetime(df_json["properties_time"], unit="ms")
)

In [73]:
# Epoch milliseconds -> datetime64 for the last-update time.
df_json = df_json.assign(
    properties_updated_datetime=pd.to_datetime(df_json["properties_updated"], unit="ms")
)

In [81]:
# These counts were float64 only because of missing values; the nullable Int64 dtype keeps them integral.
df_json = df_json.astype({"properties_felt": "Int64", "properties_nst": "Int64"})

In [82]:
# Re-check dtypes after the datetime and Int64 conversions.
df_json.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 349 entries, 0 to 348
Data columns (total 41 columns):
 #   Column                          Non-Null Count  Dtype         
---  ------                          --------------  -----         
 0   type                            349 non-null    object        
 1   id                              349 non-null    object        
 2   properties_mag                  348 non-null    float64       
 3   properties_place                327 non-null    object        
 4   properties_time                 349 non-null    int64         
 5   properties_updated              349 non-null    int64         
 6   properties_tz                   0 non-null      object        
 7   properties_url                  349 non-null    object        
 8   properties_detail               349 non-null    object        
 9   properties_felt                 31 non-null     Int64         
 10  properties_cdi                  31 non-null     float64       
 11  proper

In [83]:
# Write the flattened frame as gzip-compressed parquet, one file per query start date.
# Create the target directory if it is missing, so this cell also works on a fresh checkout.
parquet_path = f"data/raw/parquet/data_{starttime}.parquet"
os.makedirs(os.path.dirname(parquet_path), exist_ok=True)
df_json.to_parquet(parquet_path, compression="gzip")

## Save parquet data to local (read back to verify)

In [3]:
# Read the saved parquet file back to check that it round-trips.
# NOTE(review): this path is under ../data while the writer above uses data/ — confirm the working directory.
raw_parquet_path = "../data/raw/parquet/data_2023-01-01.parquet"
df_pq = pd.read_parquet(raw_parquet_path)

df_pq.head()

Unnamed: 0,type,id,properties_mag,properties_place,properties_time,properties_updated,properties_tz,properties_url,properties_detail,properties_felt,...,geometry_coordinates_depth,metadata_generated,metadata_url,metadata_title,metadata_status,metadata_api,metadata_count,metadata_generated_datetime,properties_time_datetime,properties_updated_datetime
0,Feature,us7000j3yb,4.2,"5 km NNE of Jayapura, Indonesia",1672617569195,1678575106040,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,10.0,1700558827000,https://earthquake.usgs.gov/fdsnws/event/1/que...,USGS Earthquakes,200,1.14.0,349,2023-11-21 09:27:07,2023-01-01 23:59:29.195,2023-03-11 22:51:46.040
1,Feature,pr2023001004,3.62,"103 km N of Suárez, Puerto Rico",1672617413930,1672619526212,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,34.0,1700558827000,https://earthquake.usgs.gov/fdsnws/event/1/que...,USGS Earthquakes,200,1.14.0,349,2023-11-21 09:27:07,2023-01-01 23:56:53.930,2023-01-02 00:32:06.212
2,Feature,av91082783,-0.76,"85 km NNW of Karluk, Alaska",1672617277550,1672860382700,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,3.08,1700558827000,https://earthquake.usgs.gov/fdsnws/event/1/que...,USGS Earthquakes,200,1.14.0,349,2023-11-21 09:27:07,2023-01-01 23:54:37.550,2023-01-04 19:26:22.700
3,Feature,nc73827681,0.57,"10km NW of The Geysers, CA",1672617223560,1673410751822,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,2.06,1700558827000,https://earthquake.usgs.gov/fdsnws/event/1/que...,USGS Earthquakes,200,1.14.0,349,2023-11-21 09:27:07,2023-01-01 23:53:43.560,2023-01-11 04:19:11.822
4,Feature,pr71390293,2.28,,1672617148580,1672618140720,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,11.19,1700558827000,https://earthquake.usgs.gov/fdsnws/event/1/que...,USGS Earthquakes,200,1.14.0,349,2023-11-21 09:27:07,2023-01-01 23:52:28.580,2023-01-02 00:09:00.720


## Upload json and parquet data to data lake

In [None]:
def upload_to_gcs(local_path, bucket, dstination_path):
    """Upload a local file to the data lake (Google Cloud Storage).

    Placeholder: the upload is not implemented yet. It raises instead of silently
    returning, so a pipeline step that calls it cannot appear to succeed without
    uploading anything.

    Args:
        local_path: Path of the local json/parquet file to upload.
        bucket: Destination bucket (presumably the bucket name — TODO confirm type).
        dstination_path: Object path inside the bucket. The parameter name is kept
            as-is (including the typo) so keyword callers keep working.

    Raises:
        NotImplementedError: Always, until the upload is implemented.
    """
    raise NotImplementedError("upload_to_gcs is not implemented yet")

# Transform with Spark 

In [1]:
import pandas as pd

In [44]:
# Load the raw day file produced by the ingestion notebook.
raw_day_path = "../data/raw/parquet/data_2023-01-01.parquet"
df_pandas = pd.read_parquet(raw_day_path)

df_pandas.head()

Unnamed: 0,type,id,properties_mag,properties_place,properties_time,properties_updated,properties_tz,properties_url,properties_detail,properties_felt,...,geometry_coordinates_depth,metadata_generated,metadata_url,metadata_title,metadata_status,metadata_api,metadata_count,metadata_generated_datetime,properties_time_datetime,properties_updated_datetime
0,Feature,us7000j3yb,4.2,"5 km NNE of Jayapura, Indonesia",1672617569195,1678575106040,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,10.0,1700570933000,https://earthquake.usgs.gov/fdsnws/event/1/que...,USGS Earthquakes,200,1.14.0,349,2023-11-21 12:48:53,2023-01-01 23:59:29.195,2023-03-11 22:51:46.040
1,Feature,pr2023001004,3.62,"103 km N of Suárez, Puerto Rico",1672617413930,1672619526212,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,34.0,1700570933000,https://earthquake.usgs.gov/fdsnws/event/1/que...,USGS Earthquakes,200,1.14.0,349,2023-11-21 12:48:53,2023-01-01 23:56:53.930,2023-01-02 00:32:06.212
2,Feature,av91082783,-0.76,"85 km NNW of Karluk, Alaska",1672617277550,1672860382700,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,3.08,1700570933000,https://earthquake.usgs.gov/fdsnws/event/1/que...,USGS Earthquakes,200,1.14.0,349,2023-11-21 12:48:53,2023-01-01 23:54:37.550,2023-01-04 19:26:22.700
3,Feature,nc73827681,0.57,"10km NW of The Geysers, CA",1672617223560,1673410751822,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,2.06,1700570933000,https://earthquake.usgs.gov/fdsnws/event/1/que...,USGS Earthquakes,200,1.14.0,349,2023-11-21 12:48:53,2023-01-01 23:53:43.560,2023-01-11 04:19:11.822
4,Feature,pr71390293,2.28,,1672617148580,1672618140720,,https://earthquake.usgs.gov/earthquakes/eventp...,https://earthquake.usgs.gov/fdsnws/event/1/que...,,...,11.19,1700570933000,https://earthquake.usgs.gov/fdsnws/event/1/que...,USGS Earthquakes,200,1.14.0,349,2023-11-21 12:48:53,2023-01-01 23:52:28.580,2023-01-02 00:09:00.720


In [55]:
# Column names, non-null counts and dtypes of the stored parquet file.
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 349 entries, 0 to 348
Data columns (total 41 columns):
 #   Column                          Non-Null Count  Dtype         
---  ------                          --------------  -----         
 0   type                            349 non-null    object        
 1   id                              349 non-null    object        
 2   properties_mag                  348 non-null    float64       
 3   properties_place                327 non-null    object        
 4   properties_time                 349 non-null    int64         
 5   properties_updated              349 non-null    int64         
 6   properties_tz                   0 non-null      object        
 7   properties_url                  349 non-null    object        
 8   properties_detail               349 non-null    object        
 9   properties_felt                 31 non-null     Int64         
 10  properties_cdi                  31 non-null     float64       
 11  proper

| Column | Description | Type |
| --- | --- | --- |
| type | GeoJSON object type of the record (`Feature`). | String |
| id | A unique identifier for the event. | String |
| properties_mag | The magnitude of the earthquake. | Float |
| properties_place | The location of the earthquake. | String |
| properties_time | The time of the earthquake, in milliseconds since the Unix epoch. | Integer |
| properties_updated | The time the earthquake was last updated, in milliseconds since the Unix epoch. | Integer |
| properties_tz | Timezone offset from UTC in minutes at the event epicenter. | Integer |
| properties_url | Link to USGS Event Page for event. | String |
| properties_detail | Link to GeoJSON detail feed from a GeoJSON summary feed. | String |
| properties_felt | The total number of felt reports submitted to the DYFI? system. | Integer |
| properties_cdi | The maximum reported intensity for the event. | Float |
| properties_mmi | The maximum estimated instrumental intensity for the event. | Float |
| properties_alert | The alert level from the PAGER earthquake impact scale. | String |
| properties_status | Status is either automatic or reviewed. | String |
| properties_tsunami | This flag is set to "1" for large events in oceanic regions and "0" otherwise. | Integer |
| properties_sig | A number describing how significant the event is. | Integer |
| properties_net | The ID of a data contributor. | String |
| properties_code | An identifying code assigned by - and unique from - the corresponding source for the event. | String |
| properties_ids | A comma-separated list of event ids that are associated to an event. | String |
| properties_sources | A comma-separated list of network contributors. | String |
| properties_types | A comma-separated list of product types associated to this event. | String |
| properties_nst | Number of seismic stations which reported P- and S-arrival times for this earthquake. | Integer |
| properties_dmin | Horizontal distance from the epicenter to the nearest station (in degrees). | Float |
| properties_rms | The root-mean-square (RMS) travel time residual, in sec, using all weights. | Float |
| properties_gap | The largest azimuthal gap between azimuthally adjacent stations (in degrees). | Float |
| properties_magType | The method or algorithm used to calculate the preferred magnitude for the event. | String |
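| properties_type | Type of seismic event (e.g. `earthquake`). | String |
| properties_title | Human-readable title combining magnitude and place, e.g. `M 4.2 - 5 km NNE of Jayapura, Indonesia`. | String |
| geometry_type | GeoJSON geometry type of the event location (`Point`). | String |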
| geometry_coordinates_latitude | Decimal degrees latitude. Negative values for southern latitudes. | Float |
| geometry_coordinates_longitude | Decimal degrees longitude. Negative values for western longitudes. | Float |
| geometry_coordinates_depth | The depth where the earthquake begins to rupture. | Float |
| metadata_generated | Time when the feed was most recently updated. Times are reported in milliseconds since the epoch. | Integer |
| metadata_url | Url of the feed. | String |
| metadata_title | The title of the feed. | String |
| metadata_status | HTTP status code of response. | Integer |
| metadata_api | Version of API that generated feed. | String |
| metadata_count | Number of earthquakes in feed. | Integer |
| metadata_generated_datetime | Datetime format of metadata_generated | Datetime |
| properties_time_datetime | Datetime format of properties_time | Datetime |
| properties_updated_datetime | Datetime format of properties_updated | Datetime |

In [4]:
# Region = text after the last comma of `properties_place` (e.g. "..., Alaska" -> "Alaska").
# The vectorised `.str` methods pass missing values (None or NaN) through instead of
# raising, and value_counts() then leaves them out of the tally.
df_pandas["properties_place"].str.split(",").str[-1].str.strip().value_counts()

Alaska                            112
CA                                106
Hawaii                             24
Puerto Rico                        15
Indonesia                           7
Nevada                              5
Texas                               4
Puerto Rico region                  4
New Mexico                          3
Alaska Peninsula                    3
Papua New Guinea                    3
Philippines                         3
Vanuatu                             2
Oklahoma                            2
Central Alaska                      2
Washington                          2
Chile                               2
Southern Alaska                     2
Fiji region                         2
Wyoming                             1
Japan                               1
Utah                                1
Afghanistan                         1
Mid-Indian Ridge                    1
Myanmar                             1
Tonga                               1
Kashmir-Xinj

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [3]:
# Record the PySpark version this notebook was run with.
pyspark.__version__

'3.2.4'

In [4]:
import os

# Path to the GCP service-account JSON key used for gs:// access below.
GOOGLE_APPLICATION_CREDENTIALS = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
if not GOOGLE_APPLICATION_CREDENTIALS:
    # Fail here with a clear message. Otherwise the missing value only surfaces later,
    # when None is passed into the Spark/Hadoop configuration cells.
    raise RuntimeError(
        "GOOGLE_APPLICATION_CREDENTIALS is not set; point it at the service-account "
        "JSON key file used to read from GCS."
    )

In [5]:
# Local Spark configuration that can read gs:// paths through the GCS Hadoop connector,
# authenticating with the service-account key file from the previous cell.
# The connector jar can be overridden with the GCS_CONNECTOR_JAR environment variable;
# it defaults to the previously hard-coded location under the home directory.
GCS_CONNECTOR_JAR = os.environ.get(
    "GCS_CONNECTOR_JAR",
    os.path.join(os.path.expanduser("~"), "dev", "lib", "gcs-connector-hadoop3-2.2.5.jar"),
)

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", GCS_CONNECTOR_JAR) \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", GOOGLE_APPLICATION_CREDENTIALS)

In [6]:
# getOrCreate lets this cell be re-run: a second SparkContext(...) call in the same
# kernel raises because only one SparkContext can be active at a time.
sc = SparkContext.getOrCreate(conf=conf)

# Register the GCS connector as the filesystem for gs:// paths on the underlying Hadoop
# configuration (reached through the private `_jsc` handle), using the same key file.
hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", GOOGLE_APPLICATION_CREDENTIALS)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

23/11/24 07:47:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [7]:
# Build (or reuse) a SparkSession on top of the SparkContext configured above.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

In [30]:
# Parquet files carry their own schema, so no inference option is needed
# ("inferSchema" is a CSV reader option and has no effect on parquet).
df = spark.read.parquet("gs://earthquake-usgs_data/raw/parquet/data_2023-01-01.parquet")

In [31]:
# Preview the first rows of the raw Spark frame.
df.show(n=20)

+-------+------------+--------------+--------------------+---------------+------------------+-------------+--------------------+--------------------+---------------+--------------+--------------+----------------+-----------------+------------------+--------------+--------------+---------------+--------------------+------------------+--------------------+--------------+---------------+--------------+--------------+------------------+---------------+--------------------+-------------+-----------------------------+------------------------------+--------------------------+------------------+--------------------+----------------+---------------+------------+--------------+
|   type|          id|properties_mag|    properties_place|properties_time|properties_updated|properties_tz|      properties_url|   properties_detail|properties_felt|properties_cdi|properties_mmi|properties_alert|properties_status|properties_tsunami|properties_sig|properties_net|properties_code|      properties_ids|propert

In [32]:
# Inspect the schema read from the parquet file.
df.printSchema()

root
 |-- type: string (nullable = true)
 |-- id: string (nullable = true)
 |-- properties_mag: double (nullable = true)
 |-- properties_place: string (nullable = true)
 |-- properties_time: long (nullable = true)
 |-- properties_updated: long (nullable = true)
 |-- properties_tz: integer (nullable = true)
 |-- properties_url: string (nullable = true)
 |-- properties_detail: string (nullable = true)
 |-- properties_felt: long (nullable = true)
 |-- properties_cdi: double (nullable = true)
 |-- properties_mmi: double (nullable = true)
 |-- properties_alert: string (nullable = true)
 |-- properties_status: string (nullable = true)
 |-- properties_tsunami: long (nullable = true)
 |-- properties_sig: long (nullable = true)
 |-- properties_net: string (nullable = true)
 |-- properties_code: string (nullable = true)
 |-- properties_ids: string (nullable = true)
 |-- properties_sources: string (nullable = true)
 |-- properties_types: string (nullable = true)
 |-- properties_nst: long (nullabl

In [35]:
# Raw event times are epoch milliseconds.
df.select(df["properties_time"]).show()

+---------------+
|properties_time|
+---------------+
|  1672617569195|
|  1672617413930|
|  1672617277550|
|  1672617223560|
|  1672617148580|
|  1672616861690|
|  1672616392936|
|  1672616048220|
|  1672615658422|
|  1672614624070|
|  1672614609024|
|  1672614089036|
|  1672614027350|
|  1672613918250|
|  1672613113450|
|  1672613051930|
|  1672612866340|
|  1672612541510|
|  1672612276570|
|  1672612213740|
+---------------+
only showing top 20 rows



In [36]:
# Earliest and latest event time (epoch ms) in the file.
time_bounds = df.agg(
    F.min("properties_time").alias("min_time"),
    F.max("properties_time").alias("max_time"),
)
time_bounds.show()

+-------------+-------------+
|     min_time|     max_time|
+-------------+-------------+
|1672531574840|1672617569195|
+-------------+-------------+



In [37]:
from pyspark.sql.functions import split, size, trim, when, col

# `properties_place` looks like "5 km NNE of Jayapura, Indonesia".
# The text after the last comma is used as the region.
split_col = split(col("properties_place"), ",")

# Last element of the split array. It is trimmed so the space after the comma is dropped,
# matching the `.strip()` used in the pandas exploration.
# Places without a comma keep the whole string.
last_item = trim(split_col.getItem(size(split_col) - 1))

# Preview the derived region column (df itself is not modified here).
df.withColumn("region", when(col("properties_place").isNotNull(), last_item).otherwise(None)).select("region").show()



+-------------------+
|             region|
+-------------------+
|          Indonesia|
|        Puerto Rico|
|             Alaska|
|                 CA|
|               null|
|                 CA|
|              Texas|
|             Alaska|
|       Sea of Japan|
|                 CA|
|          Indonesia|
|   Papua New Guinea|
|                 CA|
|                 CA|
|             Alaska|
|             Alaska|
|             Alaska|
|                 CA|
| Dominican Republic|
|                 CA|
+-------------------+
only showing top 20 rows



In [38]:
# Columns kept in the transformed output. Everything not listed here is dropped:
# `type`, `geometry_type`, `metadata_*`, and the pandas-made `*_datetime` columns.
selected_columns = [
 'id',
 'properties_mag',
 'properties_place',
 'properties_time',
 'properties_updated',
 'properties_tz',
 'properties_url',
 'properties_detail',
 'properties_felt',
 'properties_cdi',
 'properties_mmi',
 'properties_alert',
 'properties_status',
 'properties_tsunami',
 'properties_sig',
 'properties_net',
 'properties_code',
 'properties_ids',
 'properties_sources',
 'properties_types',
 'properties_nst',
 'properties_dmin',
 'properties_rms',
 'properties_gap',
 'properties_magType',
 'properties_type',
 'properties_title',
 'geometry_coordinates_latitude',
 'geometry_coordinates_longitude',
 'geometry_coordinates_depth',
 'region']

# Split the place string on commas, e.g. "5 km NNE of Jayapura, Indonesia".
split_col = split(col("properties_place"), ",")

# Region = last comma-separated part, trimmed so the space after the comma is dropped
# (" Indonesia" -> "Indonesia"). Places without a comma keep the whole string.
last_item = F.trim(split_col.getItem(size(split_col)-1))

# Epoch milliseconds / 1000 -> seconds -> timestamp. Then add the region and keep only
# the selected columns.
df = df \
.withColumn("properties_time", F.to_timestamp(col("properties_time")/1000)) \
.withColumn("properties_updated", F.to_timestamp(col("properties_updated")/1000)) \
.withColumn("region", when(col("properties_place").isNotNull(), last_item).otherwise(None)) \
.select(selected_columns)



In [42]:
# Map each column to its name without the "properties_" prefix (other names map to themselves).
new_columns = dict((name, name.replace("properties_", "")) for name in df.columns)

new_columns

{'id': 'id',
 'properties_mag': 'mag',
 'properties_place': 'place',
 'properties_time': 'time',
 'properties_updated': 'updated',
 'properties_tz': 'tz',
 'properties_url': 'url',
 'properties_detail': 'detail',
 'properties_felt': 'felt',
 'properties_cdi': 'cdi',
 'properties_mmi': 'mmi',
 'properties_alert': 'alert',
 'properties_status': 'status',
 'properties_tsunami': 'tsunami',
 'properties_sig': 'sig',
 'properties_net': 'net',
 'properties_code': 'code',
 'properties_ids': 'ids',
 'properties_sources': 'sources',
 'properties_types': 'types',
 'properties_nst': 'nst',
 'properties_dmin': 'dmin',
 'properties_rms': 'rms',
 'properties_gap': 'gap',
 'properties_magType': 'magType',
 'properties_type': 'type',
 'properties_title': 'title',
 'geometry_coordinates_latitude': 'geometry_coordinates_latitude',
 'geometry_coordinates_longitude': 'geometry_coordinates_longitude',
 'geometry_coordinates_depth': 'geometry_coordinates_depth',
 'region': 'region'}

In [43]:
from functools import reduce

# Rename one column per step by folding over the (old, new) pairs. This avoids
# rebuilding the key/value lists on every iteration as the index-based version did.
df = reduce(lambda acc, names: acc.withColumnRenamed(*names), new_columns.items(), df)

In [44]:
# Preview the renamed, transformed frame.
df.show(n=20)

+------------+-----+--------------------+--------------------+--------------------+----+--------------------+--------------------+----+----+----+-----+--------+-------+---+---+----------+--------------------+-------+--------------------+----+------------+----+-----+-------+----------+--------------------+-----------------------------+------------------------------+--------------------------+-------------------+
|          id|  mag|               place|                time|             updated|  tz|                 url|              detail|felt| cdi| mmi|alert|  status|tsunami|sig|net|      code|                 ids|sources|               types| nst|        dmin| rms|  gap|magType|      type|               title|geometry_coordinates_latitude|geometry_coordinates_longitude|geometry_coordinates_depth|             region|
+------------+-----+--------------------+--------------------+--------------------+----+--------------------+--------------------+----+----+----+-----+--------+-------+--

In [45]:
# Confirm the column list after renaming.
df.columns

['id',
 'mag',
 'place',
 'time',
 'updated',
 'tz',
 'url',
 'detail',
 'felt',
 'cdi',
 'mmi',
 'alert',
 'status',
 'tsunami',
 'sig',
 'net',
 'code',
 'ids',
 'sources',
 'types',
 'nst',
 'dmin',
 'rms',
 'gap',
 'magType',
 'type',
 'title',
 'geometry_coordinates_latitude',
 'geometry_coordinates_longitude',
 'geometry_coordinates_depth',
 'region']

In [46]:
# Append the day's rows to the monthly output directory.
# NOTE(review): append mode means re-running this cell duplicates the rows.
df.write.parquet("../data/transformed/2023/01", mode="append")

                                                                                

In [47]:
# Second day of raw data. Parquet is self-describing, so "inferSchema" (a CSV option) is not needed.
df_2 = spark.read.parquet("../data/raw/parquet/data_2023-01-02.parquet")

In [48]:
# Apply the same transformation as for the 2023-01-01 file.
# The region expression is rebuilt here instead of reusing `last_item` from an earlier
# cell, so this cell does not depend on hidden kernel state. It is trimmed so the space
# after the last comma is not kept (" Alaska" -> "Alaska").
place_parts = split(col("properties_place"), ",")
place_region = F.trim(place_parts.getItem(size(place_parts) - 1))

# NOTE(review): `df` had its "properties_" prefixes stripped before it was appended to
# ../data/transformed/2023/01, but `df_2` keeps them. That directory therefore ends up
# with two different column layouts. TODO: align the names before writing.
df_2 = df_2 \
.withColumn("properties_time", F.to_timestamp(col("properties_time")/1000)) \
.withColumn("properties_updated", F.to_timestamp(col("properties_updated")/1000)) \
.withColumn("region", when(col("properties_place").isNotNull(), place_region).otherwise(None)) \
.select(selected_columns)

In [49]:
# Append the second day to the same monthly output directory.
# NOTE(review): append mode means re-running this cell duplicates the rows.
df_2.write.parquet("../data/transformed/2023/01", mode="append")

In [50]:
# Read back the whole month. Parquet carries its schema, so no "inferSchema" option is needed.
df_2023_01 = spark.read.parquet("../data/transformed/2023/01/")

In [51]:
# Use Spark's aggregate functions. The bare `min`/`max` are Python built-ins and would
# operate on the string "properties_time" itself rather than on the column.
# NOTE(review): this directory may mix prefixed and un-prefixed column layouts (see the
# df_2 transform cell); confirm `properties_time` is the column you expect here.
df_2023_01.agg(F.min("properties_time"), F.max("properties_time")).show()

+--------------------+--------------------+
|min(properties_time)|max(properties_time)|
+--------------------+--------------------+
|2023-01-01 00:06:...|2023-01-02 23:55:...|
+--------------------+--------------------+

