In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.functions import date_format

In [32]:
# Service-account key file used to authenticate against Google Cloud Storage.
credentials_location = '/workspaces/de-eq-asmnt-2024/google_credentials/de-eq-asmnt-2024-6ee51b1c99e1.json'

# `spark.jars` holds ONE comma-separated list of jars. Calling
# .set("spark.jars", ...) twice keeps only the last value, which silently
# dropped the GCS connector; both jars are therefore passed in a single value.
spark_jars = ",".join([
    "./lib/gcs-connector-hadoop3-2.2.5.jar",
    "./lib/spark-bigquery-with-dependencies_2.12-0.23.2.jar",
])

# Local-mode Spark configuration with service-account auth enabled for GCS.
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", spark_jars) \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [34]:
# Obtain the SparkContext for the configuration built above.
sc = SparkContext.getOrCreate(conf=conf)

# Hadoop-level settings that map the gs:// scheme onto the GCS connector
# classes and point it at the service-account key file.
gcs_hadoop_settings = {
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.gs.auth.service.account.json.keyfile": credentials_location,
    "fs.gs.auth.service.account.enable": "true",
}

hadoop_conf = sc._jsc.hadoopConfiguration()
for setting_name, setting_value in gcs_hadoop_settings.items():
    hadoop_conf.set(setting_name, setting_value)

In [40]:
# GCS bucket URIs used by this notebook; both share the same project prefix.
_bucket_prefix = 'gs://de-eq-asmnt-2024'
gs_bucket_raw = _bucket_prefix + '-raw-bucket'
gs_bucket_stage = _bucket_prefix + '-staging-bucket'

In [41]:
# Create (or reuse) the SparkSession, seeded with the SparkContext's configuration.
session_builder = SparkSession.builder.config(conf=sc.getConf())
spark = session_builder.getOrCreate()

In [44]:
# Read every raw CSV two directory levels below eq_events/raw, treating the
# first line of each file as the header. No schema is supplied here.
raw_events_glob = gs_bucket_raw + '/eq_events/raw/*/*'
df = spark.read.csv(raw_events_glob, header='true')

                                                                                

In [45]:
# Derive `year` (yyyy) and `month` (MM) columns from the `date` column;
# they are used later as partition columns when writing.
event_date = df.date
df = (
    df
    .withColumn("year", date_format(event_date, "yyyy"))
    .withColumn("month", date_format(event_date, "MM"))
)

In [46]:
# Preview the first 20 rows with long values truncated (the defaults of show()).
df.show(n=20, truncate=True)

24/04/17 04:26:06 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , event_id, datetime, location, latitude, longitude, depth, magnitude, significance, alert, country, date, timestamp, level
 Schema: _c0, event_id, datetime, location, latitude, longitude, depth, magnitude, significance, alert, country, date, timestamp, level
Expected: _c0 but found: 
CSV file: gs://de-eq-asmnt-2024-raw-bucket/eq_events/raw/raw/eq_events_2005/eq_events_2005_10.csv
+------+----------+--------------------+--------------------+--------+------------+------+---------+------------+-----+-------+----------+---------------+-----+----+-----+
|   _c0|  event_id|            datetime|            location|latitude|   longitude| depth|magnitude|significance|alert|country|      date|      timestamp|level|year|month|
+------+----------+--------------------+--------------------+--------+------------+------+---------+------------+-----+-------+----------+---------------+-----+----+-----+
|564993

In [47]:
# `_c0` is the name Spark gave to the CSV's unnamed first column (see the
# header warning in the preview output); it is discarded here.
columns_to_discard = ['_c0']
df = df.drop(*columns_to_discard)

In [48]:
# Expose the DataFrame to Spark SQL as `eq_events`.
# registerTempTable() is deprecated; createOrReplaceTempView() is its
# replacement and also makes this cell safe to re-run.
df.createOrReplaceTempView('eq_events')

In [49]:
# All event rows, repartitioned and sorted within partitions by country
# (Spark SQL `cluster by`).
final_events_query = """
select *
from eq_events
cluster by country
"""
df_final = spark.sql(final_events_query)

In [50]:
# Per-country monthly aggregates: event count plus avg/max/min of depth,
# magnitude and significance.
#
# The CSV is read without a schema, so the measure columns are strings.
# avg() casts them implicitly, but max()/min() on strings compare
# lexicographically (e.g. '9.5' > '100.2'), so the measures are cast to
# double once in a CTE before aggregating.
df_monthly = spark.sql("""
with typed_events as (
    select
        country,
        date,
        event_id,
        cast(depth as double) as depth,
        cast(magnitude as double) as magnitude,
        cast(significance as double) as significance
    from eq_events
)
select 
    country, 
    date_trunc('month', date) as eq_month,
    COUNT(event_id) as events_occ,
    avg(depth) as avg_depth_month,
    avg(magnitude) as avg_mag_month,
    avg(significance) as avg_sig_month,
    max(depth) as max_depth_month,
    max(magnitude) as max_mag_month,
    max(significance) as max_sig_month,
    min(depth) as min_depth_month,
    min(magnitude) as min_mag_month,
    min(significance) as min_sig_month
from typed_events
group by 1,2
cluster by country
""")

In [51]:
# Per-country weekly aggregates: event count plus avg/max/min of depth,
# magnitude and significance.
#
# The CSV is read without a schema, so the measure columns are strings.
# avg() casts them implicitly, but max()/min() on strings compare
# lexicographically (e.g. '9.5' > '100.2'), so the measures are cast to
# double once in a CTE before aggregating.
df_week = spark.sql("""
with typed_events as (
    select
        country,
        date,
        event_id,
        cast(depth as double) as depth,
        cast(magnitude as double) as magnitude,
        cast(significance as double) as significance
    from eq_events
)
select 
    country, 
    date_trunc('week', date) as eq_week,
    COUNT(event_id) as events_occ,
    avg(depth) as avg_depth_week,
    avg(magnitude) as avg_mag_week,
    avg(significance) as avg_sig_week,
    max(depth) as max_depth_week,
    max(magnitude) as max_mag_week,
    max(significance) as max_sig_week,
    min(depth) as min_depth_week,
    min(magnitude) as min_mag_week,
    min(significance) as min_sig_week
from typed_events
group by 1,2
cluster by country
""")

In [52]:
# Per-country daily aggregates: event count plus avg/max/min of depth,
# magnitude and significance.
#
# The CSV is read without a schema, so the measure columns are strings.
# avg() casts them implicitly, but max()/min() on strings compare
# lexicographically (e.g. '9.5' > '100.2'), so the measures are cast to
# double once in a CTE before aggregating.
df_daily = spark.sql("""
with typed_events as (
    select
        country,
        date,
        event_id,
        cast(depth as double) as depth,
        cast(magnitude as double) as magnitude,
        cast(significance as double) as significance
    from eq_events
)
select 
    country, 
    date,
    COUNT(event_id) as events_occ,
    avg(depth) as avg_depth_day,
    avg(magnitude) as avg_mag_day,
    avg(significance) as avg_sig_day,
    max(depth) as max_depth_day,
    max(magnitude) as max_mag_day,
    max(significance) as max_sig_day,
    min(depth) as min_depth_day,
    min(magnitude) as min_mag_day,
    min(significance) as min_sig_day
from typed_events
group by 1,2 
cluster by country
""")

In [53]:
# Number of rows in the monthly aggregate (one row per country/month group).
monthly_row_count = df_monthly.count()
monthly_row_count

                                                                                

34963

In [18]:
# Write the event-level data as Parquet, partitioned by year/month.
# `gs_bucket` is not defined in any cell shown in this notebook (only
# `gs_bucket_raw` and `gs_bucket_stage` are), so a fresh-kernel run raised
# NameError. NOTE(review): the staging bucket is assumed to be the intended
# target - confirm. The CSV-only "header" option was dropped: Parquet ignores it.
df.coalesce(1).write.partitionBy('year', 'month').parquet(gs_bucket_stage + '/eq_events/processed/final', mode='overwrite')

                                                                                

In [19]:
# Write the weekly aggregates as Parquet, partitioned by week.
# `gs_bucket` is not defined in any cell shown in this notebook (only
# `gs_bucket_raw` and `gs_bucket_stage` are), so a fresh-kernel run raised
# NameError. NOTE(review): the staging bucket is assumed to be the intended
# target - confirm. The CSV-only "header" option was dropped: Parquet ignores it.
df_week.coalesce(1).write.partitionBy('eq_week').parquet(gs_bucket_stage + '/eq_events/processed/weekly/', mode='overwrite')

                                                                                

In [20]:
# Write the monthly aggregates as Parquet, partitioned by month.
# `gs_bucket` is not defined in any cell shown in this notebook (only
# `gs_bucket_raw` and `gs_bucket_stage` are), so a fresh-kernel run raised
# NameError. NOTE(review): the staging bucket is assumed to be the intended
# target - confirm. The CSV-only "header" option was dropped: Parquet ignores it.
df_monthly.coalesce(1).write.partitionBy('eq_month').parquet(gs_bucket_stage + '/eq_events/processed/monthly/', mode='overwrite')

                                                                                

In [22]:
# Write the daily aggregates as a single unpartitioned Parquet dataset.
# `gs_bucket` is not defined in any cell shown in this notebook (only
# `gs_bucket_raw` and `gs_bucket_stage` are), so a fresh-kernel run raised
# NameError. NOTE(review): the staging bucket is assumed to be the intended
# target - confirm. The CSV-only "header" option was dropped: Parquet ignores it.
df_daily.coalesce(1).write.parquet(gs_bucket_stage + '/eq_events/processed/daily/', mode='overwrite')

                                                                                

In [None]:
# Combined version of the four Parquet writes (event-level, weekly, monthly, daily).
# `gs_bucket` is not defined in any cell shown in this notebook (only
# `gs_bucket_raw` and `gs_bucket_stage` are), so this cell raised NameError.
# NOTE(review): the staging bucket is assumed to be the intended target - confirm.
# The CSV-only "header" option was dropped: Parquet ignores it.
df.coalesce(1).write.partitionBy('year', 'month').parquet(gs_bucket_stage + '/eq_events/processed/final', mode='overwrite')
df_week.coalesce(1).write.partitionBy('eq_week').parquet(gs_bucket_stage + '/eq_events/processed/weekly/', mode='overwrite')
df_monthly.coalesce(1).write.partitionBy('eq_month').parquet(gs_bucket_stage + '/eq_events/processed/monthly/', mode='overwrite')
df_daily.coalesce(1).write.parquet(gs_bucket_stage + '/eq_events/processed/daily/', mode='overwrite')

In [54]:
# Load the daily aggregates into the BigQuery table eq_events.eq_daily.
#
# The connector's default (indirect) write method stages the data in GCS
# first and therefore needs `temporaryGcsBucket` (bucket name without the
# gs:// scheme). NOTE(review): the staging bucket is assumed to be the right
# scratch location - confirm. The CSV-only "header" option was removed.
#
# The "Failed to find data source: bigquery" error means the
# spark-bigquery connector jar was not on the classpath of the running JVM;
# it has to be listed in `spark.jars` before the SparkContext is first created.
df_daily.write.format('bigquery') \
    .option('table', 'eq_events.eq_daily') \
    .option('temporaryGcsBucket', gs_bucket_stage.replace('gs://', '', 1)) \
    .save()

Py4JJavaError: An error occurred while calling o378.save.
: java.lang.ClassNotFoundException: 
Failed to find data source: bigquery. Please find packages at
https://spark.apache.org/third-party-projects.html
       
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedToFindDataSourceError(QueryExecutionErrors.scala:587)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:675)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:725)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:864)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:256)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:661)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:661)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:661)
	... 16 more
