In [30]:
import os

# --- Pipeline configuration -------------------------------------------------
# GCS bucket that holds the daily parquet folders; it is also handed to the
# BigQuery writer as its temporaryGcsBucket.
BUCKET_NAME = 'air-quality-data-0123'

# Service-account key file passed to the Hadoop GCS configuration.
# GOOGLE_APPLICATION_CREDENTIALS takes precedence when it is set, so the
# notebook is not tied to one user's home directory; the original path
# remains the fallback.
JSON_KEY_PATH = (
    os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
    or '/home/ali_marzouk/air-quality-421919-36f116eb9049.json'
)

# BigQuery dataset and the names of the four destination tables.
BQ_DATASET = 'air_quality_0123'
DATE_TABLE_NAME = 'date'
LOCATION_TABLE_NAME = 'location'
PARAMETER_TABLE_NAME = 'parameter'
MEASUREMENT_TABLE_NAME = 'measurement'

In [31]:
import os
# Diagnostic only: show which connector jars are on the Hadoop classpath.
# .get() is used so that an unset variable is reported instead of aborting
# the whole notebook run with a KeyError.
hadoop_classpath = os.environ.get('HADOOP_CLASSPATH')
print(hadoop_classpath if hadoop_classpath is not None else "HADOOP_CLASSPATH is not set")

:/home/ali_marzouk/gcs-connector/gcs-connector-hadoop3-latest.jar:/home/ali_marzouk/bigquery-connector/spark-bigquery-with-dependencies_2.12-0.38.0.jar


In [32]:
import pyspark
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session. spark.jars lists the GCS connector and
# the spark-bigquery connector jars by URL.
spark = (
    SparkSession.builder
    .appName('spark_gcs_to_big_query')
    .config("spark.jars", "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-latest.jar,https://github.com/GoogleCloudDataproc/spark-bigquery-connector/releases/download/0.38.0/spark-bigquery-with-dependencies_2.12-0.38.0.jar")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")

# Apply the GCS settings to the session's Hadoop configuration, in the same
# order as before: key file, gs:// filesystem implementation, auth switch.
hadoop_conf = spark._jsc.hadoopConfiguration()
for conf_key, conf_value in (
    ("google.cloud.auth.service.account.json.keyfile", JSON_KEY_PATH),
    ("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS"),
    ("google.cloud.auth.service.account.enable", "true"),
):
    hadoop_conf.set(conf_key, conf_value)

In [33]:
from datetime import date

def load_today_data_from_gcs(target_date=None):
    """Read one day's parquet folder from the GCS bucket.

    Parameters
    ----------
    target_date : datetime.date, optional
        Day whose folder should be read. When omitted, ``date.today()`` is
        used, which is the original behaviour. Passing an explicit date makes
        it possible to reload a past day.

    Returns
    -------
    The DataFrame returned by ``spark.read.format("parquet").load(...)`` for
    ``gs://{BUCKET_NAME}/{dd-mm-YYYY}/``.
    """
    day = date.today() if target_date is None else target_date
    # Folder names are day-month-year, e.g. 03-05-2024.
    folder = day.strftime("%d-%m-%Y")
    return spark.read.format("parquet").load(f"gs://{BUCKET_NAME}/{folder}/")

In [34]:
from pyspark.sql import DataFrame as SparkDataFrame
from typing import List

def drop_duplicates(dfs: List[SparkDataFrame]):
    """De-duplicate every DataFrame in ``dfs`` on its ``external_id`` column.

    Returns a new list, in the same order as ``dfs``, holding the result of
    ``dropDuplicates(["external_id"])`` for each input frame.
    """
    return [frame.dropDuplicates(["external_id"]) for frame in dfs]

In [35]:
from pyspark.sql.functions import explode, col, to_timestamp, udf, expr
from pyspark.sql.types import StringType

def get_tables_dfs(full_data_df):
    """Split the raw payload into the date, location, parameter and measurement frames.

    ``full_data_df`` must expose an array column ``result.results``; it is
    exploded into one row per element (aliased ``data``), and every exploded
    row receives its own ``date_uuid`` and ``parameter_uuid``. No
    de-duplication happens here, so the date, location and parameter frames
    contain one row per exploded element.

    Each returned frame carries an ``external_id`` column built from the
    fields of that row.

    Returns
    -------
    list
        ``[date_df, location_df, parameter_df, measurement_df]``.
    """
    # External ids are still formatted by Python UDFs so the generated strings
    # stay exactly as they were.
    date_external_id_udf = udf(lambda tmpstmp_utc, local: "{}.{}".format(tmpstmp_utc, local))
    location_external_id_udf = udf(lambda location_id: str(location_id))
    measurement_external_id_udf = udf(
        lambda tmpstmp_utc, local, location_id, parameter: "{}.{}.{}.{}".format(
            tmpstmp_utc, local, location_id, parameter
        )
    )
    parameter_external_id_udf = udf(lambda name, unit: "{}.{}".format(name, unit))

    # NOTE(review): uuid() is non-deterministic. The frames derived from
    # read_df are written by separate actions, so measurement.date_id /
    # parameter_id only match date.id / parameter.id if Spark regenerates the
    # same UUIDs on every evaluation. Consider persisting read_df. TODO confirm.
    read_df = (
        full_data_df.select(explode("result.results").alias("data"))
        .withColumn("date_uuid", expr("uuid()"))
        .withColumn("parameter_uuid", expr("uuid()"))
    )

    date_df = read_df.select(
        col("date_uuid").alias("id"),
        to_timestamp("data.date.utc").alias("timestamp_utc"),
        "data.date.local",
        date_external_id_udf("data.date.utc", "data.date.local").alias("external_id"),
    )

    location_df = read_df.select(
        "data.coordinates.longitude",
        "data.coordinates.latitude",
        col("data.locationId").cast(StringType()).alias("id"),
        "data.location",
        "data.country",
        "data.city",
        location_external_id_udf("data.locationId").alias("external_id"),
    )

    parameter_df = read_df.select(
        col("parameter_uuid").alias("id"),
        "data.unit",
        col("data.parameter").alias("name"),
        parameter_external_id_udf("data.parameter", "data.unit").alias("external_id"),
    )

    measurement_df = read_df.select(
        "data.value",
        col("parameter_uuid").alias("parameter_id"),
        col("date_uuid").alias("date_id"),
        # Cast first, then alias (same form as location_df.id) so the column
        # name does not depend on an alias surviving an outer cast.
        col("data.locationId").cast(StringType()).alias("location_id"),
        measurement_external_id_udf(
            to_timestamp("data.date.utc"),
            "data.date.local",
            col("data.locationId"),
            "data.parameter",
        ).alias("external_id"),
    )

    return [date_df, location_df, parameter_df, measurement_df]

In [36]:
def save_df_to_bq(table_df, table_name):
    """Append ``table_df`` to the BigQuery table ``{BQ_DATASET}.{table_name}``.

    The ``external_id`` column is dropped before writing. The write uses the
    ``bigquery`` format in ``append`` mode and passes ``BUCKET_NAME`` as the
    ``temporaryGcsBucket`` option. Nothing is returned.
    """
    destination = f"{BQ_DATASET}.{table_name}"
    payload = table_df.drop('external_id')

    writer = payload.write.format("bigquery")
    writer = writer.option("table", destination)
    writer = writer.option("temporaryGcsBucket", BUCKET_NAME)
    writer.mode('append').save()

In [37]:
# Load today's raw data and derive the four table frames.
full_data_df = load_today_data_from_gcs()
date_df, location_df, parameter_df, measurement_df = get_tables_dfs(full_data_df)

# NOTE(review): drop_duplicates() is defined in this notebook but never
# applied here, and save_df_to_bq() drops external_id before writing.
# Confirm whether de-duplication was meant to run before these saves.
save_plan = (
    ("date", date_df, DATE_TABLE_NAME),
    ("location", location_df, LOCATION_TABLE_NAME),
    ("parameter", parameter_df, PARAMETER_TABLE_NAME),
    ("measurement", measurement_df, MEASUREMENT_TABLE_NAME),
)

# Write the tables one after another, logging before and after each save.
for label, table_df, table_name in save_plan:
    print(f"[spark_gcs_to_big_query] - [SAVING] {label} data frame")
    save_df_to_bq(table_df, table_name)
    print(f"[spark_gcs_to_big_query] - [SAVED] {label} data frame")

                                                                                

[spark_gcs_to_big_query] - [SAVING] date data frame


                                                                                

[spark_gcs_to_big_query] - [SAVED] date data frame
[spark_gcs_to_big_query] - [SAVING] location data frame


                                                                                

[spark_gcs_to_big_query] - [SAVED] location data frame
[spark_gcs_to_big_query] - [SAVING] parameter data frame


                                                                                

[spark_gcs_to_big_query] - [SAVED] parameter data frame
[spark_gcs_to_big_query] - [SAVING] measurement data frame


                                                                                

[spark_gcs_to_big_query] - [SAVED] measurement data frame


In [63]:
# SparkSession.stop() also stops the underlying SparkContext, so a separate
# spark.sparkContext.stop() call beforehand is redundant.
spark.stop()