# 1. Librerías

In [5]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 37 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 60.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=c51f6071f16e1fff835f1c60059598b61a9709909ba96804499394661c3f715f
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [6]:
%matplotlib inline
import pandas as pd
import numpy as np

#Importamos la librería de funciones clasicas
import pyspark.sql.functions as f

#Librería para crear funciones personalizadas
from pyspark.sql.functions import udf, struct

#Importamos los tipos de datos
from pyspark.sql.types import *

In [7]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) the Spark session used by every cell below:
# master "local", application name "Colab", Spark UI port set to 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

# 2. Lectura de datos

In [10]:
# Mount Google Drive at /content/drive; the JSON file loaded further down is
# read from a path under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [23]:
# We work with a JSON dataset whose records have three nested fields
# (see the printSchema() output further down for the inferred schema):
#
# 1. EMPRESA:     company     -> ID_EMPRESA, NOMBRE_EMPRESA
# 2. PERSONA:     person      -> ID_PERSONA, NOMBRE_PERSONA, EDAD, SALARIO and
#                                CONTACTO, an array of {PREFIJO, TELEFONO}
# 3. TRANSACCION: transaction -> FECHA, MONTO

# Read the JSON file.
# multiLine is "false", so Spark expects one complete JSON record per line of
# the file. Switch the option to "true" only if a single record spans several
# lines.
dfJson = spark.read.format("json").option("multiLine", "false").load("/content/drive/MyDrive/Data/transacciones.json")

In [24]:
# Show the data.
# With its default arguments show() truncates long cell values (note the "..." in the output).
dfJson.show()

+--------------+--------------------+--------------------+
|       EMPRESA|             PERSONA|         TRANSACCION|
+--------------+--------------------+--------------------+
|   {5, Amazon}|{[{59, 9811935}, ...|{2021-01-23, 2628.0}|
|      {9, IBM}|{[{50, 9912937}, ...|{2021-01-23, 4261.0}|
|  {7, Samsung}|{[{53, 9769557}, ...|{2021-01-23, 1429.0}|
|   {5, Amazon}|{[{51, 9733329}, ...|{2021-01-23, 3385.0}|
|   {4, Toyota}|{[{52, 9091334}, ...|{2021-01-23, 3514.0}|
|      {9, IBM}|{[{59, 9708669}, ...| {2021-01-23, 823.0}|
|{2, Microsoft}|{null, 47, 31, Ry...|{2021-01-23, 3724.0}|
|    {10, Sony}|{[{51, 9443174}],...|{2021-01-23, 3429.0}|
|   {4, Toyota}|{[{54, 9375039}, ...|{2021-01-23, 4267.0}|
|      {9, IBM}|{[{59, 9227653}, ...| {2021-01-23, 796.0}|
|   {4, Toyota}|{[{53, 9758464}, ...| {2021-01-23, 317.0}|
|      {9, IBM}|{[{51, 9058211}, ...| {2021-01-23, 938.0}|
|       {8, HP}|{[{57, 9251990}, ...| {2021-01-23, 887.0}|
|   {5, Amazon}|{[{51, 9993481}],...|{2021-01-23, 2067.0

In [25]:
# Print the schema -- it gives the overall semi-structured layout of the data.
## Not every record necessarily carries every field.
dfJson.printSchema()

root
 |-- EMPRESA: struct (nullable = true)
 |    |-- ID_EMPRESA: string (nullable = true)
 |    |-- NOMBRE_EMPRESA: string (nullable = true)
 |-- PERSONA: struct (nullable = true)
 |    |-- CONTACTO: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- PREFIJO: string (nullable = true)
 |    |    |    |-- TELEFONO: string (nullable = true)
 |    |-- EDAD: long (nullable = true)
 |    |-- ID_PERSONA: string (nullable = true)
 |    |-- NOMBRE_PERSONA: string (nullable = true)
 |    |-- SALARIO: double (nullable = true)
 |-- TRANSACCION: struct (nullable = true)
 |    |-- FECHA: string (nullable = true)
 |    |-- MONTO: double (nullable = true)



In [27]:
##########################################################################################################
#
# @section 3. Navigating fields and sub-fields
#
##########################################################################################################

# Keep only two of the top-level fields
df1 = dfJson.select(["EMPRESA", "PERSONA"])

# Display the first 5 rows without truncating the cell values
df1.show(n=5, truncate=False)

+------------+------------------------------------------------------------------------------------------------------+
|EMPRESA     |PERSONA                                                                                               |
+------------+------------------------------------------------------------------------------------------------------+
|{5, Amazon} |{[{59, 9811935}, {53, 9423163}], 33, 26, Brenden, 20549.0}                                            |
|{9, IBM}    |{[{50, 9912937}, {54, 9046676}, {55, 9874284}, {58, 9746053}, {53, 9058704}], 31, 21, Carissa, 1952.0}|
|{7, Samsung}|{[{53, 9769557}, {59, 9754523}, {52, 9063371}, {55, 9301624}, {56, 9770100}], 42, 73, Fiona, 9960.0}  |
|{5, Amazon} |{[{51, 9733329}, {57, 9619332}, {51, 9087416}, {50, 9486747}], 59, 14, Allen, 16289.0}                |
|{4, Toyota} |{[{52, 9091334}, {59, 9831571}], 59, 80, Ebony, 3600.0}                                               |
+------------+------------------------------------------

In [30]:
### A plain show() prints a struct field as one packed value; to get a
### sub-field as a column of its own it has to be addressed explicitly.

# Select the sub-fields
### A sub-field of a complex field is referenced as "<parent field>.<sub-field>".
df2 = dfJson.select([
    "EMPRESA.ID_EMPRESA",
    "EMPRESA.NOMBRE_EMPRESA",
    "PERSONA.ID_PERSONA",
    "PERSONA.NOMBRE_PERSONA",
])

# Display the first 5 rows without truncating the cell values
df2.show(n=5, truncate=False)

+----------+--------------+----------+--------------+
|ID_EMPRESA|NOMBRE_EMPRESA|ID_PERSONA|NOMBRE_PERSONA|
+----------+--------------+----------+--------------+
|5         |Amazon        |26        |Brenden       |
|9         |IBM           |21        |Carissa       |
|7         |Samsung       |73        |Fiona         |
|5         |Amazon        |14        |Allen         |
|4         |Toyota        |80        |Ebony         |
+----------+--------------+----------+--------------+
only showing top 5 rows



In [31]:
# Navigate the sub-fields of PERSONA
### Same dotted paths as before, now looked up through the DataFrame itself
### (dfJson["<parent>.<sub-field>"]) instead of being passed as plain strings.
df3 = dfJson.select([
    dfJson["PERSONA.ID_PERSONA"],
    dfJson["PERSONA.NOMBRE_PERSONA"],
    dfJson["PERSONA.CONTACTO"],
])

# Display the first 5 rows without truncating the cell values
df3.show(n=5, truncate=False)

+----------+--------------+---------------------------------------------------------------------------+
|ID_PERSONA|NOMBRE_PERSONA|CONTACTO                                                                   |
+----------+--------------+---------------------------------------------------------------------------+
|26        |Brenden       |[{59, 9811935}, {53, 9423163}]                                             |
|21        |Carissa       |[{50, 9912937}, {54, 9046676}, {55, 9874284}, {58, 9746053}, {53, 9058704}]|
|73        |Fiona         |[{53, 9769557}, {59, 9754523}, {52, 9063371}, {55, 9301624}, {56, 9770100}]|
|14        |Allen         |[{51, 9733329}, {57, 9619332}, {51, 9087416}, {50, 9486747}]               |
|80        |Ebony         |[{52, 9091334}, {59, 9831571}]                                             |
+----------+--------------+---------------------------------------------------------------------------+
only showing top 5 rows



In [32]:
### For example, focus on the complex field CONTACTO on its own.
### NOTE(review): the original comment spoke of e-mails, but per the printed
### schema each CONTACTO entry holds PREFIJO and TELEFONO.
# We concentrate on the CONTACTO sub-field
df4 = dfJson.select(dfJson["PERSONA.CONTACTO"])

# Show the data
df4.show(5, False)

+---------------------------------------------------------------------------+
|CONTACTO                                                                   |
+---------------------------------------------------------------------------+
|[{59, 9811935}, {53, 9423163}]                                             |
|[{50, 9912937}, {54, 9046676}, {55, 9874284}, {58, 9746053}, {53, 9058704}]|
|[{53, 9769557}, {59, 9754523}, {52, 9063371}, {55, 9301624}, {56, 9770100}]|
|[{51, 9733329}, {57, 9619332}, {51, 9087416}, {50, 9486747}]               |
|[{52, 9091334}, {59, 9831571}]                                             |
+---------------------------------------------------------------------------+
only showing top 5 rows



################################################################################
#
# @section 4. Navegando en campos arrays
#
################################################################################

In [33]:
# Get the first element of each array
### i.e. one contact entry per row: the element at index 0 of CONTACTO
df5 = dfJson.select(dfJson["PERSONA.CONTACTO"].getItem(0))

# Show the data
df5.show(5, False)

+-------------------------------+
|PERSONA.CONTACTO AS CONTACTO[0]|
+-------------------------------+
|{59, 9811935}                  |
|{50, 9912937}                  |
|{53, 9769557}                  |
|{51, 9733329}                  |
|{52, 9091334}                  |
+-------------------------------+
only showing top 5 rows



In [34]:
# Add an alias
### So that the column name is more readable
df6 = dfJson.select(dfJson["PERSONA.CONTACTO"].getItem(0).alias('CONTACTO_1'))

# Show the data
df6.show(5, False)
### This displays a struct with two fields, PREFIJO and TELEFONO.

+-------------+
|CONTACTO_1   |
+-------------+
|{59, 9811935}|
|{50, 9912937}|
|{53, 9769557}|
|{51, 9733329}|
|{52, 9091334}|
+-------------+
only showing top 5 rows



In [37]:
# Drill into the sub-fields of the first CONTACTO element
# (column[0] is the indexing form of column.getItem(0))
df7 = dfJson.select([
    dfJson["PERSONA.NOMBRE_PERSONA"],
    dfJson["PERSONA.CONTACTO"][0]['PREFIJO'],
    dfJson["PERSONA.CONTACTO"][0]['TELEFONO'],
])

# Display the first 5 rows without truncating the cell values
df7.show(n=5, truncate=False)

+--------------+---------------------------------------+----------------------------------------+
|NOMBRE_PERSONA|PERSONA.CONTACTO AS CONTACTO[0].PREFIJO|PERSONA.CONTACTO AS CONTACTO[0].TELEFONO|
+--------------+---------------------------------------+----------------------------------------+
|Brenden       |59                                     |9811935                                 |
|Carissa       |50                                     |9912937                                 |
|Fiona         |53                                     |9769557                                 |
|Allen         |51                                     |9733329                                 |
|Ebony         |52                                     |9091334                                 |
+--------------+---------------------------------------+----------------------------------------+
only showing top 5 rows



In [40]:
### At this level the information has been flattened into plain columns.

# Alias every column so the resulting names are readable
df8 = dfJson.select([
    dfJson["PERSONA.NOMBRE_PERSONA"].alias('NOMBRE_PERSONA'),
    dfJson["PERSONA.CONTACTO"][0]['PREFIJO'].alias('PREFIJO_1'),
    dfJson["PERSONA.CONTACTO"][0]['TELEFONO'].alias('TELEFONO_1'),
])

# Display the first 5 rows without truncating the cell values
df8.show(n=5, truncate=False)

+--------------+---------+----------+
|NOMBRE_PERSONA|PREFIJO_1|TELEFONO_1|
+--------------+---------+----------+
|Brenden       |59       |9811935   |
|Carissa       |50       |9912937   |
|Fiona         |53       |9769557   |
|Allen         |51       |9733329   |
|Ebony         |52       |9091334   |
+--------------+---------+----------+
only showing top 5 rows



In [41]:
# Walk over several array positions by index: the first four CONTACTO entries
# become the column pairs PREFIJO_1/TELEFONO_1 ... PREFIJO_4/TELEFONO_4.
df9 = dfJson.select(
    dfJson["PERSONA.NOMBRE_PERSONA"].alias('NOMBRE_PERSONA'),
    *[
        dfJson["PERSONA.CONTACTO"].getItem(posicion)[campo].alias(f"{campo}_{posicion + 1}")
        for posicion in range(4)
        for campo in ('PREFIJO', 'TELEFONO')
    ]
)

### Addressing positions one by one does not scale: the number of positions is
### fixed up front, rows with fewer contacts are padded with nulls, and any
### contact beyond the fourth is not shown.
# Display the first 5 rows without truncating the cell values
df9.show(n=5, truncate=False)

+--------------+---------+----------+---------+----------+---------+----------+---------+----------+
|NOMBRE_PERSONA|PREFIJO_1|TELEFONO_1|PREFIJO_2|TELEFONO_2|PREFIJO_3|TELEFONO_3|PREFIJO_4|TELEFONO_4|
+--------------+---------+----------+---------+----------+---------+----------+---------+----------+
|Brenden       |59       |9811935   |53       |9423163   |null     |null      |null     |null      |
|Carissa       |50       |9912937   |54       |9046676   |55       |9874284   |58       |9746053   |
|Fiona         |53       |9769557   |59       |9754523   |52       |9063371   |55       |9301624   |
|Allen         |51       |9733329   |57       |9619332   |51       |9087416   |50       |9486747   |
|Ebony         |52       |9091334   |59       |9831571   |null     |null      |null     |null      |
+--------------+---------+----------+---------+----------+---------+----------+---------+----------+
only showing top 5 rows



################################################################################
#
# @section 5. Aplanando campos arrays
#
################################################################################

In [43]:
# Bring in the function that flattens array fields
from pyspark.sql.functions import explode

# Pair an identifying column with the array to flatten: explode() emits one
# output row per array element, repeating NOMBRE_PERSONA on each of them.
df10 = dfJson.select([
    dfJson["PERSONA.NOMBRE_PERSONA"].alias('NOMBRE_PERSONA'),
    explode("PERSONA.CONTACTO").alias('DATOS_CONTACTO'),
])

# Display the first 5 rows without truncating the cell values
df10.show(n=5, truncate=False)

+--------------+--------------+
|NOMBRE_PERSONA|DATOS_CONTACTO|
+--------------+--------------+
|Brenden       |{59, 9811935} |
|Brenden       |{53, 9423163} |
|Carissa       |{50, 9912937} |
|Carissa       |{54, 9046676} |
|Carissa       |{55, 9874284} |
+--------------+--------------+
only showing top 5 rows



In [44]:
# Inspect the schema of the exploded DataFrame
df10.printSchema()

root
 |-- NOMBRE_PERSONA: string (nullable = true)
 |-- DATOS_CONTACTO: struct (nullable = true)
 |    |-- PREFIJO: string (nullable = true)
 |    |-- TELEFONO: string (nullable = true)



In [45]:
## After the explode the array is gone: DATOS_CONTACTO is a plain struct
## (PREFIJO, TELEFONO), as the schema printed above shows.

# Ordinary processing now works on it, e.g. keep only contacts with prefix '51'
df11 = df10.where(df10["DATOS_CONTACTO.PREFIJO"] == '51')

# Display the first 5 rows without truncating the cell values
df11.show(n=5, truncate=False)

+--------------+--------------+
|NOMBRE_PERSONA|DATOS_CONTACTO|
+--------------+--------------+
|Allen         |{51, 9733329} |
|Allen         |{51, 9087416} |
|Jennifer      |{51, 9443174} |
|Sigourney     |{51, 9058211} |
|Sigourney     |{51, 9805215} |
+--------------+--------------+
only showing top 5 rows

