In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-procesamiento')\
  .getOrCreate()

ruta_curated_empresas_google_cloud = "gs://curso-apache-spark/datalake/curated/empresas"
ruta_curated_personas_google_cloud = "gs://curso-apache-spark/datalake/curated/personas/"

In [4]:
df_personas_curated = spark.read.format("parquet").option("header","true").load(ruta_curated_personas_google_cloud)

df_empresas_curated = spark.read.format("parquet").option("header","true").load(ruta_curated_empresas_google_cloud)

In [5]:
df_personas_curated.show()

[Stage 4:>                                                          (0 + 1) / 1]

+---+---------+-----------+--------------------+-------------+----+-------+----------+
| ID|   NOMBRE|   telefono|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA|
+---+---------+-----------+--------------------+-------------+----+-------+----------+
|  1|     Carl|17456339145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|
|  2|Priscilla|    1552498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|
|  3|  Jocelyn|12049568594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|
|  4|    Aidan|17198629385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10|
|  5|  Leandra|    8398044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|
|  6|     Bert|    7974453|a.felis.ullamcorp...|   2017-04-25|  70| 7800.0|         7|
|  7|     Mark|16801026792|Quisque.ac@placer...|   2006-04-21|  52| 8112.0|         5|
|  8|    Jonah|    2142975|eu.ultrices.sit@v...|   2017-10-07|  23|17040.0|         5|
|  9|    Hanae|    9352277|          eu@Nun

                                                                                

In [6]:
df_personas_curated.createOrReplaceTempView("tb_personas")
df_empresas_curated.createOrReplaceTempView("tb_empresas")

df_join = spark.sql("SELECT * FROM tb_personas p inner join tb_empresas e on e.ID = p.ID_EMPRESA")
df_join.show(5)

+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
| ID|   NOMBRE|   telefono|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA| ID|EMPRESA_NAME|
+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
|  1|     Carl|17456339145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|  5|      AMAZON|
|  2|Priscilla|    1552498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|  2|   MICROSOFT|
|  3|  Jocelyn|12049568594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|  3|       APPLE|
|  4|    Aidan|17198629385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10| 10|        SONY|
|  5|  Leandra|    8398044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|  1|     WALMART|
+---+---------+-----------+--------------------+-------------+----+-------+----------+---+------------+
only showing top 5 rows



In [7]:
df_select = df_join.select(col('EDAD'),col('SALARIO'),col('EMPRESA_NAME'))
df_select.show()

+----+-------+------------+
|EDAD|SALARIO|EMPRESA_NAME|
+----+-------+------------+
|  32|20095.0|      AMAZON|
|  34| 9298.0|   MICROSOFT|
|  27|10853.0|       APPLE|
|  29| 3387.0|        SONY|
|  41|22102.0|     WALMART|
|  70| 7800.0|     SAMSUNG|
|  52| 8112.0|      AMAZON|
|  23|17040.0|      AMAZON|
|  69| 6834.0|       APPLE|
|  19| 7996.0|     SAMSUNG|
|  48| 4913.0|          HP|
|  24|19943.0|          HP|
|  34| 9501.0|      AMAZON|
|  59|16289.0|   MICROSOFT|
|  27| 1539.0|      AMAZON|
|  26| 3377.0|   MICROSOFT|
|  60| 6851.0|      GOOGLE|
|  34| 4759.0|     SAMSUNG|
|  70|17403.0|      TOYOTA|
|  24|18752.0|         IBM|
+----+-------+------------+
only showing top 20 rows



In [8]:
ruta_functional = "gs://curso-apache-spark/datalake/functional/salario_empresa/"

df_select.repartition(1).write.mode("overwrite").format("parquet").save(ruta_functional)

                                                                                

In [9]:
df_salario_empresa = spark.read.format("parquet").option("header","true").load(ruta_functional)
df_salario_empresa.show(10)

+----+-------+------------+
|EDAD|SALARIO|EMPRESA_NAME|
+----+-------+------------+
|  32|20095.0|      AMAZON|
|  34| 9298.0|   MICROSOFT|
|  27|10853.0|       APPLE|
|  29| 3387.0|        SONY|
|  41|22102.0|     WALMART|
|  70| 7800.0|     SAMSUNG|
|  52| 8112.0|      AMAZON|
|  23|17040.0|      AMAZON|
|  69| 6834.0|       APPLE|
|  19| 7996.0|     SAMSUNG|
+----+-------+------------+
only showing top 10 rows



# Ejecución en consola de .py con apache spark

Comando para ejecutar **pyspark**
<code> spark-submit --master local[*] "path_archivo_.py" </code>

Comando para procesar con **bigquery**

<code> spark-submit --master local[*] "path_archivo_.py" --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar</code>


# CARGA DE ARCHIVO PARQUET A BIGQUERY 
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet

In [12]:
from google.cloud import bigquery

client = bigquery.Client()

# TODO: MODIFICAR EL NOMBRE DE TABLA DE BIGQUERY
table_id = "course-big-data-336218.datalake.salario_empresa "

job_config = bigquery.LoadJobConfig(write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
                                    source_format=bigquery.SourceFormat.PARQUET,
                                   )
uri = "gs://curso-apache-spark/datalake/functional/salario_empresa/*.parquet"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  

load_job.result()

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))


Loaded 100 rows.
