In [8]:
from pyspark.sql import SparkSession
from google.cloud import bigquery
from google.oauth2 import service_account

In [None]:
# Forma 1 de crear una instancia de SparkSession
spark = SparkSession.builder.appName("EjemploLecturaJSON").getOrCreate()

In [9]:
# Forma 2 de crear una instancia de SparkSession con lo necesario para poder cargar data a big query con el conector de BigQuery
spark = SparkSession.builder \
  .appName("EjemploLecturaJSON") \
  .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.21.0") \
  .getOrCreate()

In [10]:
ruta_json = "../Dataset_Original/Yelp/review-001.json"

#leemos el archivo JSON en un DataFrame de Spark
df = spark.read.json(ruta_json)

#mostramos el contenido del DataFrame
df.show()

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+
|XQfwVwDr-v0ZS3_Cb...|   0|2018-07-07 22:09:11|    0|KU_O5udG6zpxOg-Vc...|  3.0|If you decide to ...|     0|mh_-eMZ6K5RLWhZyI...|
|7ATYjTIgM3jUlt4UM...|   1|2012-01-03 15:28:18|    0|BiTunyQ73aT9WBnpR...|  5.0|I've taken a lot ...|     1|OyoGAe7OKpv6SyGZT...|
|YjUWPpI6HXG530lwP...|   0|2014-02-05 20:30:30|    0|saUsX_uimxRlCVr67...|  3.0|Family diner. Had...|     0|8g_iMtfSiwikVnbP2...|
|kxX2SOes4o-D3ZQBk...|   1|2015-01-04 00:01:03|    0|AqPFMleE6RsU23_au...|  5.0|Wow!  Yummy, diff...|     1|_7bHUi9Uuf5__HHc_...|
|e4Vwtrqf-wpJfwesg...|   1|2017-01-14 20:54:15|    0|Sx8TMOWLNuJBWer-0...|  4.0|Cute inter

Prueba con SQL

In [None]:
# Crea o reemplaza una vista temporal para el DataFrame
df.createOrReplaceTempView("pruebaTabla1")

In [None]:
# Ejecuta una consulta SQL
resultado_sql = spark.sql("SELECT count(*) FROM mi_tabla")

# Muestra el resultado
resultado_sql.show()


Cargamos la tabla en big query

In [None]:
# Proporciona la ruta al archivo de clave de tu cuenta de servicio
credentials = service_account.Credentials.from_service_account_file(
    '../credenciales.json')

table_id = 'proyecto-final-henry-412703.Datawarehouse_PF.pruebaTabla1'

# Crea un cliente de BigQuery con tus credenciales
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

Forma 1: Con el cliente de BigQuery

In [None]:
# Carga el DataFrame en BigQuery
job = client.load_table_from_dataframe(df, table_id)

# Espera a que la carga se complete
job.result()

print("Carga completada.")

Otra forma de hacer la carga el conector de BigQuery

In [None]:
# Suponiendo que `df` ya está definido y tienes `spark` como tu SparkSession
# Configura las opciones necesarias para BigQuery
bucket = "data_lake_pf_henry"
project = "proyecto-final-henry"
dataset = "Datawarehouse_PF"
table = "pruebaTabla1"

In [None]:

# Guarda el DataFrame en BigQuery
df.write.format("bigquery") \
    .option("temporaryGcsBucket", bucket) \
    .option("table", table_id) \
    .mode("append") \
    .save()


In [None]:
# Detén la sesión de Spark al finalizar
spark.stop()