In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, IntegerType, DateType, StructField, StringType, TimestampType
import logging, traceback
import requests
import sys

In [2]:
"""
# Used when submitting job to spark master with parameters
start_year = int(sys.argv[1])
end_year = int(sys.argv[2])
"""
start_year = 2022
end_year = 2022

In [3]:
# For ingestion to local (used when developing)
URL_PREFIX = 'https://noaa-ghcn-pds.s3.amazonaws.com'
TEMP_STORAGE_PATH = '/home/marcos/ghcn-d/spark/data'

In [4]:
# For local spark master
conf = pyspark.SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "gcs-connector-hadoop3-latest.jar, spark-bigquery-with-dependencies_2.12-0.24.2.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/home/marcos/.google/credentials/google_credentials.json") \
    .set("spark.hadoop.fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") \
    .set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")

sc = pyspark.SparkContext(conf=conf)

spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()

22/04/21 14:55:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
22/04/21 14:55:33 WARN DependencyUtils: Local jar /home/marcos/ghcn-d/spark/gcs-connector-hadoop3-latest.jar does not exist, skipping.
22/04/21 14:55:33 WARN DependencyUtils: Local jar /home/marcos/ghcn-d/spark/spark-bigquery-with-dependencies_2.12-0.24.2.jar does not exist, skipping.
22/04/21 14:55:33 INFO SparkContext: Running Spark version 3.2.1
22/04/21 14:55:33 INFO ResourceUtils: No custom resources configured for spark.driver.
22/04/21 14:55:33 INFO SparkContext: Submitted application: test
22/04/21 14:55:33 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task res

22/04/21 14:55:35 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir.
22/04/21 14:55:35 INFO SharedState: Warehouse path is 'file:/home/marcos/ghcn-d/spark/spark-warehouse'.


In [5]:
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = "ghcnd_raw"
spark.conf.set('temporaryGcsBucket', bucket)

In [6]:
# Used only when developing with local spark master
def download_file(url, local_file_path):
    # NOTE the stream=True parameter below
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(local_file_path, 'wb') as f:
            for chunk in r.iter_content(chunk_size=8192): 
                # If you have chunk encoded response uncomment if
                # and set chunk_size parameter to None.
                #if chunk: 
                f.write(chunk)
    return local_file_path

In [7]:
def process_year(year, mode, df_stations, df_countries):

  """
  # For developing process read directly from origin
  csv_file_name = f'/{year}.csv'
  dataset_url = URL_PREFIX + '/csv' + csv_file_name
  csv_file_path = TEMP_STORAGE_PATH + csv_file_name

  download_file(dataset_url, csv_file_path)    

  schema = StructType([
      StructField("id", StringType(), True),
      StructField("date", IntegerType(), True),
      StructField("element", StringType(), True),   
      StructField("value", IntegerType(), True),   
      StructField("m_flag", StringType(), True),   
      StructField("q_flag", StringType(), True),   
      StructField("s_flag", StringType(), True),
      StructField("obs_time",IntegerType(), True)
  ])

  df = spark.read \
    .options(header=False)
    .schema(schema)
    .csv(csv_file_path)
  """


  # Option, read from BQ
  df = spark.read.format('bigquery') \
    .option('project','ghcn-d') \
    .option('dataset','ghcnd') \
    .option('table',f'{year}').load()


  # Option, read from GCS
  #df = spark.read.parquet(f'gs://ghcnd_raw/{year}.parquet')

  print(f'processing year {year}...')
  # Only used when reading from csv in order to convert to date. 
  # If reading from BQ, this is already done
  # df = df.withColumn("date", F.to_date(df.date.cast("string"), "yyyyMMdd"))

  df = df \
    .drop("q_flag") \
    .withColumn("tmax", 
          F.when(df.element == "TMAX", 
              F.when(df.value > 700, None).otherwise(
                  F.when(df.value < -700, None). otherwise(
                      df.value.cast("double")/10)
                  )
          ).otherwise("None")
      ) \
      .withColumn("tmin", 
          F.when(df.element == "TMIN", 
              F.when(df.value > 700, None).otherwise(
                  F.when(df.value < -700, None). otherwise(
                      df.value.cast("double")/10)
                  )
          ).otherwise("None")
      ) \
      .withColumn("prcp", F.when(df.element == "PRCP", df.value.cast("double")).otherwise(None)) \
      .withColumn("snow", F.when(df.element == "SNOW", df.value.cast("double")).otherwise(None)) \
      .withColumn("snwd", F.when(df.element == "SNWD", df.value.cast("double")).otherwise(None))

  df_daily = df \
      .groupBy("id", "date").agg( 
          F.avg("tmax"),
          F.avg("tmin"),
          F.avg("prcp"),
          F.avg("snow"),
          F.avg("snwd"),
          F.first("m_flag"),
          F.first("s_flag")
      ) \
      .join(df_stations, df.id == df_stations.station_id, "inner") \
      .join(df_countries, df_stations.country_code == df_countries.code, "inner") \
      .drop ('station_id', 'code') \
      .toDF('id','date','tmax','tmin','prcp','snow','snwd','m_flag','s_flag','latitude','longitude','elevation','station_name','country_code','country_name') 

  # Note: toDF after joins, otherwise join will raise error
  # Note: toDF since BQ does not allow field names with () and average generates these kind of names avg(tmax)

  df_yearly =  df \
    .withColumn("date", F.trunc("date", "year")) \
    .groupBy("id", "date").agg( 
      F.avg("tmax"),
      F.avg("tmin"),
      F.avg("prcp"),
      F.avg("snow"),
      F.avg("snwd"),
      F.first("m_flag"),
      F.first("s_flag")
    ) \
    .join(df_stations, df.id == df_stations.station_id, "inner") \
    .join(df_countries, df_stations.country_code == df_countries.code, "inner") \
    .drop ('station_id', 'code') \
    .toDF('id','date','tmax','tmin','prcp','snow','snwd','m_flag','s_flag','latitude','longitude','elevation','station_name','country_code','country_name') \

  # For some reason, partition by date does not work after F.year("date"). This has to be fixed
  # Also, partition is needed for clustering
  df_yearly.write \
    .format("bigquery") \
    .mode(mode) \
    .option("clusteredFields", "date, country_code") \
    .option('project','ghcn-d') \
    .option('dataset','production') \
    .option('table','fact_observations_spark_yearly') \
    .save()
    
  
  df_daily.write \
    .format("bigquery") \
    .mode(mode) \
    .option("partitionField", "date") \
    .option("partitionType", "YEAR") \
    .option("clusteredFields", "country_code") \
    .option('project','ghcn-d') \
    .option('dataset','production') \
    .option('table','fact_observations_spark') \
    .save()
  


  print(f'process {year} done')

In [8]:

# Use if needed to read from BigQuery instead of GCS
df_stations = spark.read.format('bigquery') \
  .option('project','ghcn-d') \
  .option('dataset','ghcnd') \
  .option('table', 'stations').load() \
  .drop('state', 'gsn_flag', 'hcn_crn_flag', 'wmo_id') \
  .withColumnRenamed('name', 'station_name') \
  .withColumnRenamed('id', 'station_id') \
  .withColumn('country_code', F.substring('station_id', 0, 2))

df_countries = spark.read.format('bigquery') \
  .option('project','ghcn-d') \
  .option('dataset','ghcnd') \
  .option('table', 'countries').load() \
  .withColumnRenamed('name', 'country_name')
  

In [9]:
"""
df_stations = spark.read.parquet('gs://ghcnd_raw/ghcnd-stations.parquet') \
  .drop('state', 'gsn_flag', 'hcn_crn_flag', 'wmo_id') \
  .withColumnRenamed('name', 'station_name') \
  .withColumnRenamed('id', 'station_id') \
  .withColumn('country_code', F.substring('station_id', 0, 2))

df_countries = spark.read.parquet('gs://ghcnd_raw/ghcnd-countries.parquet') \
  .withColumnRenamed('name', 'country_name')
"""

"\ndf_stations = spark.read.parquet('gs://ghcnd_raw/ghcnd-stations.parquet')   .drop('state', 'gsn_flag', 'hcn_crn_flag', 'wmo_id')   .withColumnRenamed('name', 'station_name')   .withColumnRenamed('id', 'station_id')   .withColumn('country_code', F.substring('station_id', 0, 2))\n\ndf_countries = spark.read.parquet('gs://ghcnd_raw/ghcnd-countries.parquet')   .withColumnRenamed('name', 'country_name')\n"

In [10]:
for year in range(start_year, end_year+1):
  if year == start_year:
    process_year(year, 'overwrite', df_stations, df_countries)
  else:
    process_year(year, 'append', df_stations, df_countries)

processing year 2022...


22/04/21 14:55:48 INFO DirectBigQueryRelation: Querying table ghcn-d.ghcnd.2022, parameters sent from Spark: requiredColumns=[id,m_flag,s_flag,date,element,value], filters=[]
22/04/21 14:55:48 INFO DirectBigQueryRelation: Going to read from ghcn-d.ghcnd.2022 columns=[id, m_flag, s_flag, date, element, value], filter=''
22/04/21 14:55:50 INFO DirectBigQueryRelation: Created read session for table 'ghcn-d.ghcnd.2022': projects/ghcn-d/locations/europe-west6/sessions/CAISDE5ZMmFmR1F3eWoxehoCaHcaAmh4
22/04/21 14:55:50 INFO DirectBigQueryRelation: Querying table ghcn-d.ghcnd.stations, parameters sent from Spark: requiredColumns=[id,latitude,longitude,elevation,name], filters=[IsNotNull(id)]
22/04/21 14:55:50 INFO DirectBigQueryRelation: Going to read from ghcn-d.ghcnd.stations columns=[id, latitude, longitude, elevation, name], filter='(`id` IS NOT NULL)'
22/04/21 14:55:50 INFO DirectBigQueryRelation: Created read session for table 'ghcn-d.ghcnd.stations': projects/ghcn-d/locations/europe-we

22/04/21 14:55:54 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 6158 bytes result sent to driver
22/04/21 14:55:54 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 1483 ms on ghcnd.europe-west6-a.c.ghcn-d.internal (executor driver) (1/1)
22/04/21 14:55:54 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
22/04/21 14:55:54 INFO DAGScheduler: ResultStage 1 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264) finished in 1.609 s
22/04/21 14:55:54 INFO DAGScheduler: Job 1 is finished. Cancelling potential speculative or zombie tasks for this job
22/04/21 14:55:54 INFO TaskSchedulerImpl: Killing all running tasks in stage 1: Stage finished
22/04/21 14:55:54 INFO DAGScheduler: Job 1 finished: $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264, took 1.966297 s
22/04/21 14:55:54 INFO CodeGenerator: Code generated in 15.479927 ms            
22/04/21 14:55:54 INFO MemoryStore: Block broadcast_3 stored as values in memory

22/04/21 14:57:23 INFO CodecPool: Got brand-new compressor [.snappy](0 + 1) / 1]
22/04/21 14:57:26 INFO GoogleCloudStorageFileSystem: Successfully repaired 'gs://ghcnd_raw/.spark-bigquery-local-1650552935200-01ca10a0-9961-46df-825e-97a7421cac08/_temporary/0/_temporary/' directory.
22/04/21 14:57:26 INFO FileOutputCommitter: Saved output of task 'attempt_20220421145722171570773572826873_0004_m_000000_3' to gs://ghcnd_raw/.spark-bigquery-local-1650552935200-01ca10a0-9961-46df-825e-97a7421cac08/_temporary/0/task_20220421145722171570773572826873_0004_m_000000
22/04/21 14:57:26 INFO SparkHadoopMapRedUtil: attempt_20220421145722171570773572826873_0004_m_000000_3: Committed
22/04/21 14:57:26 INFO Executor: Finished task 0.0 in stage 4.0 (TID 3). 4525 bytes result sent to driver
22/04/21 14:57:26 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 3) in 3038 ms on ghcnd.europe-west6-a.c.ghcn-d.internal (executor driver) (1/1)
22/04/21 14:57:26 INFO TaskSchedulerImpl: Removed TaskSet 4.0, 

22/04/21 14:57:32 INFO Executor: Finished task 0.0 in stage 6.0 (TID 5). 6072 bytes result sent to driver
22/04/21 14:57:32 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 5) in 257 ms on ghcnd.europe-west6-a.c.ghcn-d.internal (executor driver) (1/1)
22/04/21 14:57:32 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool 
22/04/21 14:57:32 INFO DAGScheduler: ResultStage 6 ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:264) finished in 0.270 s
22/04/21 14:57:32 INFO DAGScheduler: Job 5 is finished. Cancelling potential speculative or zombie tasks for this job
22/04/21 14:57:32 INFO TaskSchedulerImpl: Killing all running tasks in stage 6: Stage finished
22/04/21 14:57:32 INFO DAGScheduler: Job 5 finished: $anonfun$withThreadLocalCaptured$1 at FutureTask.java:264, took 0.277867 s
22/04/21 14:57:32 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 4.0 MiB, free 396.3 MiB)
22/04/21 14:57:32 INFO MemoryStore: Bloc

22/04/21 14:58:59 INFO UnsafeExternalSorter: Thread 198 spilling sort data of 100.0 MiB to disk (0  time so far)
22/04/21 14:58:59 INFO UnsafeExternalSorter: Thread 240 spilling sort data of 100.0 MiB to disk (0  time so far)
22/04/21 14:58:59 INFO BlockManagerInfo: Removed broadcast_8_piece0 on ghcnd.europe-west6-a.c.ghcn-d.internal:40217 in memory (size: 19.3 KiB, free: 428.3 MiB)
22/04/21 14:58:59 INFO UnsafeExternalSorter: Thread 238 spilling sort data of 100.0 MiB to disk (0  time so far)
22/04/21 14:58:59 INFO UnsafeExternalSorter: Thread 239 spilling sort data of 100.0 MiB to disk (0  time so far)
22/04/21 14:59:01 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
22/04/21 14:59:01 INFO FileOutputCommitter: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false
22/04/21 14:59:01 INFO SQLHadoopMapReduceCommitProtocol: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputComm

22/04/21 14:59:01 INFO ParquetWriteSupport: Initialized Parquet WriteSupport with Catalyst schema:
{
  "type" : "struct",
  "fields" : [ {
    "name" : "id",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "date",
    "type" : "date",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "tmax",
    "type" : "double",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "tmin",
    "type" : "double",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "prcp",
    "type" : "double",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "snow",
    "type" : "double",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "snwd",
    "type" : "double",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "m_flag",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "name" : "s_flag",
    "type" : "string",
    "nullable" : true,
    "metadata" : { }
  }, {
    "n

22/04/21 14:59:12 INFO Executor: Executor is trying to kill task 3.0 in stage 9.0 (TID 10), reason: Stage cancelled
22/04/21 14:59:12 INFO Executor: Executor is trying to kill task 1.0 in stage 9.0 (TID 8), reason: Stage cancelled
22/04/21 14:59:12 INFO TaskSchedulerImpl: Stage 9 was cancelled
22/04/21 14:59:12 INFO DAGScheduler: ResultStage 9 (save at BigQueryWriteHelper.scala:64) failed in 13.870 s due to Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 7) (ghcnd.europe-west6-a.c.ghcn-d.internal executor driver): org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:500)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:321)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
	at or

22/04/21 14:59:13 WARN Utils: Suppressing exception in catch: Java heap space
java.lang.OutOfMemoryError: Java heap space
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.buildContentChunk(MediaHttpUploader.java:579)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.resumableUpload(MediaHttpUploader.java:380)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.media.MediaHttpUploader.upload(MediaHttpUploader.java:308)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:528)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
	at com.google.cloud.hadoop.repackaged.gcs.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execut

ConnectionRefusedError: [Errno 111] Connection refused

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 480, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/clientserver.py", line 503, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
