In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.functions import col, to_timestamp

In [2]:
import os

# Path to the GCS service-account JSON key used by the GCS connector below.
# Can be overridden with the GCS_CREDENTIALS_KEYFILE environment variable;
# otherwise falls back to the relative location this notebook was written with.
# The key file is a secret — keep it out of version control.
credentials_location = os.environ.get(
    'GCS_CREDENTIALS_KEYFILE', '../../../.secrets/gcs_credentials_key.json'
)

In [4]:
# Adjust the Spark configuration: run locally on all cores, put the GCS connector
# jar on the classpath and point it at the service-account key for auth.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('test')
    .setAll([
        ("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar"),
        ("spark.hadoop.google.cloud.auth.service.account.enable", "true"),
        ("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location),
        # Parquet reader/writer options; the reason for these is not recorded here
        # (presumably compatibility with the source files) — TODO confirm.
        ("spark.sql.parquet.enableVectorizedReader", "false"),
        ("spark.sql.parquet.writeLegacyFormat", "true"),
        # Important: skip files that cannot be read instead of failing the whole
        # read of the bucket glob.
        ("spark.sql.files.ignoreCorruptFiles", "true"),
    ])
)

In [5]:
# Start the SparkContext from the configuration above, then register the GCS
# connector's filesystem classes and service-account credentials on its Hadoop
# configuration so that gs:// paths can be resolved.
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()
for setting_key, setting_value in (
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
    ("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"),
    ("fs.gs.auth.service.account.json.keyfile", credentials_location),
    ("fs.gs.auth.service.account.enable", "true"),
):
    hadoop_conf.set(setting_key, setting_value)


23/04/07 17:40:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [127]:
# Log threshold, e.g. "WARN" or "ERROR"; ERROR hides the WARN-level messages.
sc.setLogLevel(logLevel="ERROR")

In [36]:
#in case you need to stop the spark context + open/close the jupyter notebook
#sc.stop()

In [7]:
# Build the SparkSession on top of the SparkContext created above, passing it the
# same configuration; getOrCreate returns an existing session if one is active.
spark = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()

In [43]:
# Useful info: as the cell's last expression, the SparkSession is rendered in the
# notebook with a short summary of the session (Spark version, master, app name).
spark

In [68]:
from pyspark.sql import types

In [70]:
# Explicit schema for the METAR parquet files, as (column name, Spark type) pairs.
# Every column is nullable. `valid` is read as a string and parsed into a
# timestamp in a later cell.
schema = types.StructType([
    types.StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("station", types.StringType()),
        ("valid", types.StringType()),
        ("lon", types.DoubleType()),
        ("lat", types.DoubleType()),
        ("elevation", types.DoubleType()),
        ("tmpf", types.DoubleType()),
        ("dwpf", types.DoubleType()),
        ("relh", types.DoubleType()),
        ("drct", types.DoubleType()),
        ("sknt", types.DoubleType()),
        ("p01i", types.DoubleType()),
        ("alti", types.DoubleType()),
        ("mslp", types.DoubleType()),
        ("vsby", types.DoubleType()),
        ("gust", types.DoubleType()),
        ("skyc1", types.StringType()),
        ("skyc2", types.StringType()),
        ("skyc3", types.StringType()),
        ("skyc4", types.StringType()),
        ("skyl1", types.DoubleType()),
        ("skyl2", types.DoubleType()),
        ("skyl3", types.DoubleType()),
        ("skyl4", types.DoubleType()),
        ("wxcodes", types.StringType()),
        ("ice_accretion_1hr", types.DoubleType()),
        ("ice_accretion_3hr", types.DoubleType()),
        ("ice_accretion_6hr", types.DoubleType()),
        ("peak_wind_gust", types.DoubleType()),
        ("peak_wind_drct", types.DoubleType()),
        ("peak_wind_time", types.StringType()),
        ("feel", types.DoubleType()),
        ("metar", types.StringType()),
        ("snowdepth", types.DoubleType()),
    ]
])

In [115]:
# Get the data from the GCS bucket: read every parquet path matching the
# data/*/*/* glob, applying the explicit schema above instead of inferring one.
metar_all_data = (
    spark.read
    .schema(schema)
    .parquet('gs://batch-metar-bucket-v1/data/*/*/*')
)

In [116]:
# Parse the `valid` column (read as a string by the schema) into a timestamp.
# Pattern: four-digit year, month, day, 24-hour hour and minute,
# e.g. "2023-01-31 14:55".
metar_all_data = metar_all_data.withColumn(
    'valid', to_timestamp(col('valid'), 'yyyy-MM-dd HH:mm')
)

In [117]:
# Row count of the loaded data. count() is an action, so this is where Spark
# actually reads the parquet files (the read cell above is lazy).
metar_all_data.count()

                                                                                

104002

In [120]:
#schema check
#metar_all_data.printSchema()

Spark SQL

In [122]:
# Register the DataFrame as a session-scoped temp view so spark.sql can query it
# by its bare name (`FROM metar_data_all`). The createOrReplace variant also makes
# this cell safe to re-run.
# (createGlobalTempView registers the view in the `global_temp` database, so an
# unqualified `FROM metar_data_all` does not resolve. It also raises if the view
# already exists.)
metar_all_data.createOrReplaceTempView('metar_data_all')

In [123]:
# SQL test: select every row and column from the `metar_data_all` view.
data = spark.sql(sqlQuery='''
          SELECT *
          FROM metar_data_all
          ''')

In [125]:
# Save locally as CSV. coalesce(1) reduces the result to a single partition, so
# Spark writes a single part file into the output directory; mode='overwrite'
# replaces any previous output at that path.
# NOTE(review): the header option is not set, so no column-name row is written —
# consumers must know the column order.
(
    data.coalesce(1)
    .write
    .csv('../flows/data/reports', mode='overwrite')
)

                                                                                