In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
spark = SparkSession.builder.getOrCreate()
spark

# Landing

**persona**

In [None]:
# 4.1 Estructura del dataframe.
df_schema = StructType([
StructField("ID", StringType(),True),
StructField("NOMBRE", StringType(),True),
StructField("TELEFONO", StringType(),True),
StructField("CORREO", StringType(),True),
StructField("FECHA_INGRESO", StringType(),True),
StructField("EDAD", IntegerType(),True),
StructField("SALARIO", DoubleType(),True),
StructField("ID_EMPRESA", StringType(),True),
])

In [None]:
# 4.2 Definimos ruta del archivo

#Archivo en Cloud Storage - Google Cloud Platform
ruta_persona_raw = "gs://bigdata-dmc-faru/datalake/WORKLOAD/PERSONAS/persona.data"

In [None]:
df_personas = spark.read.format("CSV").option("header","true").option("delimiter","|").schema(df_schema).load(ruta_persona_raw)
df_personas.printSchema()
df_personas.show(5)

root
 |-- ID: string (nullable = true)
 |-- NOMBRE: string (nullable = true)
 |-- TELEFONO: string (nullable = true)
 |-- CORREO: string (nullable = true)
 |-- FECHA_INGRESO: string (nullable = true)
 |-- EDAD: integer (nullable = true)
 |-- SALARIO: double (nullable = true)
 |-- ID_EMPRESA: string (nullable = true)



[Stage 0:>                                                          (0 + 1) / 1]

+---+---------+--------------+--------------------+-------------+----+-------+----------+
| ID|   NOMBRE|      TELEFONO|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA|
+---+---------+--------------+--------------------+-------------+----+-------+----------+
|  1|     Carl|1-745-633-9145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|
|  2|Priscilla|      155-2498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|
|  3|  Jocelyn|1-204-956-8594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|
|  4|    Aidan|1-719-862-9385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10|
|  5|  Leandra|      839-8044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|
+---+---------+--------------+--------------------+-------------+----+-------+----------+
only showing top 5 rows



                                                                                

In [None]:
num_rows = df_personas.count()
print("La cantidad de registro del dataframe es: ", num_rows)

[Stage 1:>                                                          (0 + 1) / 1]

La cantidad de registro del dataframe es:  100


                                                                                

In [None]:
df_personas.describe('salario').show()

+-------+-----------------+
|summary|          salario|
+-------+-----------------+
|  count|              100|
|   mean|         11684.55|
| stddev|6841.493958437246|
|    min|           1256.0|
|    max|          24575.0|
+-------+-----------------+



In [None]:
#Archivo en Cloud Storage - Google Cloud Platform
ruta_persona_landing = "gs://bigdata-dmc-faru/datalake/LANDING/PERSONAS/"
#Si alguna carpeta no existe, automáticamente Spark la va a crear

#Archivo DBFS - DataBricks
# ruta_persona_landing = "/FileStore/tables/landing/personas/"


#Archivo en HDFS - Hadoop
#ruta_persona_landing = "hdfs:/introduccion-apache-spark/datalake/landing/personas/"

df_personas.write.mode("overwrite").format("parquet").save(ruta_persona_landing)

                                                                                

**empresa**

In [None]:
#Estructura del dataframe.
df_schema_empresas = StructType([
StructField("ID", StringType(),True),
StructField("EMPRESA_NAME", StringType(),True)
])

#Archivo en Cloud Storage - Google Cloud Platform
ruta_empresa_raw = "gs://bigdata-dmc-faru/datalake/WORKLOAD/EMPRESAS/empresa.data"

df_empresa = spark.read.format("CSV").option("header","true").option("delimiter","|").schema(df_schema_empresas).load(ruta_empresa_raw)
df_empresa.printSchema()
df_empresa.show(5)

ruta_empresa_landing = "gs://bigdata-dmc-faru/datalake/LANDING/EMPRESAS/"
#Si alguna carpeta no existe, automáticamente Spark la va a crear

df_empresa.write.mode("overwrite").format("parquet").save(ruta_empresa_landing)

root
 |-- ID: string (nullable = true)
 |-- EMPRESA_NAME: string (nullable = true)

+---+------------+
| ID|EMPRESA_NAME|
+---+------------+
|  1|     Walmart|
|  2|   Microsoft|
|  3|       Apple|
|  4|      Toyota|
|  5|      Amazon|
+---+------------+
only showing top 5 rows



                                                                                

**TRANSACCION**

In [None]:
#Estructura del dataframe.
df_schema_transaccion = StructType([
StructField("ID_PERSONA", StringType(),True),
StructField("ID_EMPRESA", StringType(),True),
StructField("MONTO", DoubleType(),True),
StructField("FECHA", StringType(),True)
])

#Archivo en Cloud Storage - Google Cloud Platform
ruta_transaccion_raw = "gs://bigdata-dmc-faru/datalake/WORKLOAD/TRANSACCIONES/transacciones.data"

df_transaccion = spark.read.format("CSV").option("header","true").option("delimiter","|").schema(df_schema_transaccion).load(ruta_transaccion_raw)
df_transaccion.printSchema()
df_transaccion.show(5)

ruta_transaccion_landing = "gs://bigdata-dmc-faru/datalake/LANDING/TRANSACCIONES/"
#Si alguna carpeta no existe, automáticamente Spark la va a crear

df_transaccion.write.mode("overwrite").format("parquet").save(ruta_transaccion_landing)

root
 |-- ID_PERSONA: string (nullable = true)
 |-- ID_EMPRESA: string (nullable = true)
 |-- MONTO: double (nullable = true)
 |-- FECHA: string (nullable = true)

+----------+----------+------+----------+
|ID_PERSONA|ID_EMPRESA| MONTO|     FECHA|
+----------+----------+------+----------+
|        18|         3|1383.0|2018-01-21|
|        30|         6|2331.0|2018-01-21|
|        47|         2|2280.0|2018-01-21|
|        28|         1| 730.0|2018-01-21|
|        91|         4|3081.0|2018-01-21|
+----------+----------+------+----------+
only showing top 5 rows



                                                                                

# Curated

**persona**

In [None]:
df_personas_landing = spark.read.format("parquet").option("header","true").load(ruta_persona_landing)
df_personas_landing.printSchema()
df_personas_landing.show(10)
# transformar según reglas de negocio
df_personas_procesado = df_personas_landing.withColumn('telefono', regexp_replace('telefono', '-', ''))
df_personas_procesado.show(10)

ruta_persona_curated = 'bigdata-dmc-faru/datalake/CURATED/PERSONAS'
df_personas_procesado.write.mode("overwrite").format("parquet").save(ruta_persona_curated)

root
 |-- ID: string (nullable = true)
 |-- NOMBRE: string (nullable = true)
 |-- TELEFONO: string (nullable = true)
 |-- CORREO: string (nullable = true)
 |-- FECHA_INGRESO: string (nullable = true)
 |-- EDAD: integer (nullable = true)
 |-- SALARIO: double (nullable = true)
 |-- ID_EMPRESA: string (nullable = true)



                                                                                

+---+---------+--------------+--------------------+-------------+----+-------+----------+
| ID|   NOMBRE|      TELEFONO|              CORREO|FECHA_INGRESO|EDAD|SALARIO|ID_EMPRESA|
+---+---------+--------------+--------------------+-------------+----+-------+----------+
|  1|     Carl|1-745-633-9145|arcu.Sed.et@ante....|   2004-04-23|  32|20095.0|         5|
|  2|Priscilla|      155-2498|Donec.egestas.Ali...|   2019-02-17|  34| 9298.0|         2|
|  3|  Jocelyn|1-204-956-8594|amet.diam@loborti...|   2002-08-01|  27|10853.0|         3|
|  4|    Aidan|1-719-862-9385|euismod.et.commod...|   2018-11-06|  29| 3387.0|        10|
|  5|  Leandra|      839-8044|at@pretiumetrutru...|   2002-10-10|  41|22102.0|         1|
|  6|     Bert|      797-4453|a.felis.ullamcorp...|   2017-04-25|  70| 7800.0|         7|
|  7|     Mark|1-680-102-6792|Quisque.ac@placer...|   2006-04-21|  52| 8112.0|         5|
|  8|    Jonah|      214-2975|eu.ultrices.sit@v...|   2017-10-07|  23|17040.0|         5|
|  9|    H

                                                                                

**empresas**

In [None]:
df_empresas_landing = spark.read.format("parquet").option("header","true").load(ruta_empresa_landing)
df_empresas_landing.printSchema()
df_empresas_landing.show(10)
# transformar según reglas de negocio
df_empresas_procesado = df_empresas_landing.withColumn('EMPRESA_NAME',upper(col('EMPRESA_NAME')))

ruta_empresas_curated = 'bigdata-dmc-faru/datalake/CURATED/EMPRESAS'
df_empresas_procesado.write.mode("overwrite").format("parquet").save(ruta_empresas_curated)

root
 |-- ID: string (nullable = true)
 |-- EMPRESA_NAME: string (nullable = true)

+---+------------+
| ID|EMPRESA_NAME|
+---+------------+
|  1|     Walmart|
|  2|   Microsoft|
|  3|       Apple|
|  4|      Toyota|
|  5|      Amazon|
|  6|      Google|
|  7|     Samsung|
|  8|          HP|
|  9|         IBM|
| 10|        Sony|
+---+------------+



                                                                                