# Mongo en Spark

Para usar pyspark en una notebook debemos tener

In [None]:
#pip install findspark

In [2]:
import findspark
findspark.init()

# Creamos la session de Spark
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext

### Cargamos el dataframe

En este ejemplo partimos de un texto plano que se descargo de mongo, un JSON que te llega en una sola linea

Pueden usar la pagina web para parsear el string y que se vea bonito en formato JSON
https://jsonformatter.curiousconcept.com/#

In [3]:
jsonString = """{"_id": {"$oid": "644187e5da7c796a5d6d7b3a"}, "isClient": false, "offerData": {"packages": [{"_id": "INFINITY_GOLD", "subProduct": "0001", "name": "Infinity Gold", "maxAssistance": 1148000.0, "cards": [{"_id": "VISA", "levels": [{"_id": "GOLD", "limits": [24000, 30000, 36000, 42000, 48000, 60000]}]}, {"_id": "AMERICAN_EXPRESS", "levels": [{"_id": "GOLD", "limits": [24000, 30000, 36000, 42000, 48000, 60000]}, {"_id": "INTERNATIONAL", "limits": [24000, 30000, 36000, 42000, 48000, 60000]}]}, {"_id": "MASTERCARD", "levels": [{"_id": "GOLD", "limits": [24000, 30000, 36000, 42000, 48000, 60000]}, {"_id": "INTERNATIONAL", "limits": [24000, 30000, 36000, 42000, 48000, 60000]}]}], "loans": [{"_id": "agreed_personal_loan", "minGrantable": 0, "maxGrantable": 1148000}], "accounts": [{"_id": "UNIQUE_ACCOUNT", "name": "Cuenta Unica", "currency": "ARS", "overdraftLimits": [75, 80, 100, 150, 200]}, {"_id": "UNIQUE_ACCOUNT", "name": "Cuenta Unica", "currency": "USD", "overdraftLimits": [0]}]}, {"_id": "INFINITY_ONE", "subProduct": "0500", "name": "Infinity Uno", "maxAssistance": 803600.0, "cards": [{"_id": "VISA", "levels": [{"_id": "INTERNATIONAL", "limits": [24000, 30000, 36000, 42000, 48000, 60000]}]}, {"_id": "AMERICAN_EXPRESS", "levels": [{"_id": "INTERNATIONAL", "limits": [24000, 30000, 36000, 42000, 48000, 60000]}]}, {"_id": "MASTERCARD", "levels": [{"_id": "INTERNATIONAL", "limits": [24000, 30000, 36000, 42000, 48000, 60000]}]}], "loans": [{"_id": "agreed_personal_loan", "minGrantable": 0, "maxGrantable": 1148000}], "accounts": [{"_id": "UNIQUE_ACCOUNT", "name": "Cuenta Unica", "currency": "ARS", "overdraftLimits": [75, 80, 100, 150, 200]}, {"_id": "UNIQUE_ACCOUNT", "name": "Cuenta Unica", "currency": "USD", "overdraftLimits": [0]}]}, {"_id": "SUPER_CUENTA_TRES", "subProduct": "1750", "name": "Supercuenta 3", "maxAssistance": 562520.0, "cards": [{"_id": "VISA", "levels": [{"_id": "INTERNATIONAL", "limits": [24000, 30000, 36000, 42000, 48000, 60000]}]}], "loans": [{"_id": "agreed_personal_loan", "minGrantable": 0, "maxGrantable": 1148000}], "accounts": [{"_id": "UNIQUE_ACCOUNT", "name": "Cuenta Unica", "currency": "ARS", "overdraftLimits": [75, 80, 100, 150, 200]}, {"_id": "UNIQUE_ACCOUNT", "name": "Cuenta Unica", "currency": "USD", "overdraftLimits": [0]}]}, {"_id": "SUPER_CUENTA_UNO", "subProduct": "2191", "name": "Supercuenta Uno con PPP", "maxAssistance": 1148000.0, "cards": [], "loans": [{"_id": "agreed_personal_loan", "minGrantable": 0, "maxGrantable": 1148000}], "accounts": []}], "offerId": "bfa196f12e0d639259bea3155cd34ef3"}, "createdDate": {"$date": 1682016229538}, "segment": "MA"}"""
df_json=spark.createDataFrame([(jsonString, "2023-05-01")],["json","partition_date"])

Esto es lo que tendremos cuando bajemos de mongo, una tabla muy similar a esta estructura

In [4]:
df_json.show()

+--------------------+--------------+
|                json|partition_date|
+--------------------+--------------+
|{"_id": {"$oid": ...|    2023-05-01|
+--------------------+--------------+



### Determine the schema of the JSON payload from the column

Aqui en base al archivo el propio spark va a inferir el formato del json (es decir su schema)

In [5]:
json_schema_df = spark.read.json(df_json.rdd.map(lambda row: row.json))
json_schema = json_schema_df.schema

In [6]:
json_schema

StructType([StructField('_id', StructType([StructField('$oid', StringType(), True)]), True), StructField('createdDate', StructType([StructField('$date', LongType(), True)]), True), StructField('isClient', BooleanType(), True), StructField('offerData', StructType([StructField('offerId', StringType(), True), StructField('packages', ArrayType(StructType([StructField('_id', StringType(), True), StructField('accounts', ArrayType(StructType([StructField('_id', StringType(), True), StructField('currency', StringType(), True), StructField('name', StringType(), True), StructField('overdraftLimits', ArrayType(LongType(), True), True)]), True), True), StructField('cards', ArrayType(StructType([StructField('_id', StringType(), True), StructField('levels', ArrayType(StructType([StructField('_id', StringType(), True), StructField('limits', ArrayType(LongType(), True), True)]), True), True)]), True), True), StructField('loans', ArrayType(StructType([StructField('_id', StringType(), True), StructField

### Apply the schema to payload to read the data

In [6]:
df_details = df_json.withColumn("parsed_data", from_json(df_json["json"], json_schema)).drop("json")
df_details.printSchema()

root
 |-- partition_date: string (nullable = true)
 |-- parsed_data: struct (nullable = true)
 |    |-- _id: struct (nullable = true)
 |    |    |-- $oid: string (nullable = true)
 |    |-- createdDate: struct (nullable = true)
 |    |    |-- $date: long (nullable = true)
 |    |-- isClient: boolean (nullable = true)
 |    |-- offerData: struct (nullable = true)
 |    |    |-- offerId: string (nullable = true)
 |    |    |-- packages: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- _id: string (nullable = true)
 |    |    |    |    |-- accounts: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- _id: string (nullable = true)
 |    |    |    |    |    |    |-- currency: string (nullable = true)
 |    |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |    |-- overdraftLimits: array (nullable = true)
 |    |    |    |   

## OJO

|-- element: struct (containsNull = true)
Esto no es un campo

In [7]:
# Bajamas un nivel para no hacer 'parsed_data.columna'
df = df_details.select(col("parsed_data.*"),col("partition_date"))
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- createdDate: struct (nullable = true)
 |    |-- $date: long (nullable = true)
 |-- isClient: boolean (nullable = true)
 |-- offerData: struct (nullable = true)
 |    |-- offerId: string (nullable = true)
 |    |-- packages: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- _id: string (nullable = true)
 |    |    |    |-- accounts: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- _id: string (nullable = true)
 |    |    |    |    |    |-- currency: string (nullable = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |    |-- overdraftLimits: array (nullable = true)
 |    |    |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- cards: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |  

Ahora que tenemos?

In [8]:
df.show()

+--------------------+---------------+--------+--------------------+-------+--------------+
|                 _id|    createdDate|isClient|           offerData|segment|partition_date|
+--------------------+---------------+--------+--------------------+-------+--------------+
|{644187e5da7c796a...|{1682016229538}|   false|{bfa196f12e0d6392...|     MA|    2023-05-01|
+--------------------+---------------+--------+--------------------+-------+--------------+



Correcto!

Tenemos solo el primer nivel de la estructura pero podemos acceder a los adentro con la notacion punto

In [9]:
df.select('_id','_id.$oid').show()

+--------------------+--------------------+
|                 _id|                $oid|
+--------------------+--------------------+
|{644187e5da7c796a...|644187e5da7c796a5...|
+--------------------+--------------------+



#### Aclaracion
El show no muestra las Keys, pero existen dentro de las columnas, por ejemplo en **"_id"** vemos

{"$oid":"644187e5da7c796a5d6d7b3a"}

**Si tu estructura no tiene listas se acabo, pasa a la parte final de guardar**

## LAS LISTAS

Si bien podemos desarmar los json en horizontal, como vemos las listas?

A primera vista en forma vertical, para rescatar y dividir las columnas

Para eso usamo **EXPLODE** para que cada elemento de la lista se transforme en un nuevo registro, manteniendo los demas valores iguales para cada uno

*UPDATE: Les recomiendo siempre usar explode_outer porque uno nunca sabe si todos los registros tienen la misma estructura*

In [10]:
df2 = df.withColumn("offerData", explode_outer("offerData.packages"))
df2.show()

+--------------------+---------------+--------+--------------------+-------+--------------+
|                 _id|    createdDate|isClient|           offerData|segment|partition_date|
+--------------------+---------------+--------+--------------------+-------+--------------+
|{644187e5da7c796a...|{1682016229538}|   false|{INFINITY_GOLD, [...|     MA|    2023-05-01|
|{644187e5da7c796a...|{1682016229538}|   false|{INFINITY_ONE, [{...|     MA|    2023-05-01|
|{644187e5da7c796a...|{1682016229538}|   false|{SUPER_CUENTA_TRE...|     MA|    2023-05-01|
|{644187e5da7c796a...|{1682016229538}|   false|{SUPER_CUENTA_UNO...|     MA|    2023-05-01|
+--------------------+---------------+--------+--------------------+-------+--------------+



In [None]:
#df2.printSchema()

```
|-- offerData: struct (nullable = true)
     |-- pepito =====> Lo perdiste
|    |-- offerId: string (nullable = true)     =====> Lo perdiste
|    |-- packages: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- _id: string (nullable = true)
|    |    |    |-- accounts: array (nullable = true)
                    
                    ||
                   _||_
                   \  /
                    \/

|-- offerData: struct (nullable = true)
|    |-- _id: string (nullable = true)
|    |-- accounts: array (nullable = true)
```

Epa!! borraste el campo **"offerId"**, cuidado **acabas de reemplazar totalmente la columna offerData** 

Usemos un nuevo nombre para la columna / para este ejemplo use df2 para no ensuciar el original df 

In [11]:
df = df.withColumn("pkg_explode", explode("offerData.packages"))

In [12]:
df.select("pkg_explode","pkg_explode._id","pkg_explode.subProduct","pkg_explode.maxAssistance", "offerData.packages").show()

+--------------------+-----------------+----------+-------------+--------------------+
|         pkg_explode|              _id|subProduct|maxAssistance|            packages|
+--------------------+-----------------+----------+-------------+--------------------+
|{INFINITY_GOLD, [...|    INFINITY_GOLD|      0001|    1148000.0|[{INFINITY_GOLD, ...|
|{INFINITY_ONE, [{...|     INFINITY_ONE|      0500|     803600.0|[{INFINITY_GOLD, ...|
|{SUPER_CUENTA_TRE...|SUPER_CUENTA_TRES|      1750|     562520.0|[{INFINITY_GOLD, ...|
|{SUPER_CUENTA_UNO...| SUPER_CUENTA_UNO|      2191|    1148000.0|[{INFINITY_GOLD, ...|
+--------------------+-----------------+----------+-------------+--------------------+



**Yyyyy si tengo una lista dentro de otra?**

No importa, hacemos exploide de ese campo y los demas se repiten automaticamente

In [13]:
df = df.withColumn("cards_explode", explode("pkg_explode.cards"))

In [14]:
df.select("pkg_explode._id", "cards_explode.*").show()

+-----------------+----------------+--------------------+
|              _id|             _id|              levels|
+-----------------+----------------+--------------------+
|    INFINITY_GOLD|            VISA|[{GOLD, [24000, 3...|
|    INFINITY_GOLD|AMERICAN_EXPRESS|[{GOLD, [24000, 3...|
|    INFINITY_GOLD|      MASTERCARD|[{GOLD, [24000, 3...|
|     INFINITY_ONE|            VISA|[{INTERNATIONAL, ...|
|     INFINITY_ONE|AMERICAN_EXPRESS|[{INTERNATIONAL, ...|
|     INFINITY_ONE|      MASTERCARD|[{INTERNATIONAL, ...|
|SUPER_CUENTA_TRES|            VISA|[{INTERNATIONAL, ...|
+-----------------+----------------+--------------------+



### Cargar el dataframe a HIVE

Armamos el datafarme final, consideraciones:

**Los nombres**: Usemos alias para no tener repetir los nombres, te salta error `(AnalysisException: Found duplicate column(s) in the table)`

**Los tipos de datos**: Hay que castear algunos tipos de datos, ejemplo boolean, list. Ver si es mejor castearlos como string

Usen el tipo de dato que corresponda su equivalente: https://spark.apache.org/docs/latest/sql-ref-datatypes.html

| Hive   | Spark       |
|--------|-------------|
| Int    | IntegerType |
| String | StringType  |

Los que tienen practica sabran que si haces un df = df2.show(). Quiten el `show()`, para los que recien empiezan ojo: https://cumsum.wordpress.com/2020/09/26/pyspark-attributeerror-nonetype-object-has-no-attribute/


In [15]:
df_final= df.select(col("_id.$oid").alias("id"),
        #col("isClient").cast(StringType().alias("is_client"),
        col("isClient").alias("is_client"),
        col("offerData.offerId").alias("offer_id"),
        col("pkg_explode._id").alias("pkg_id"),
        col("pkg_explode.subProduct").alias("pkg_sub_product"),
        col("pkg_explode.name").alias("pkg_name"),
        col("pkg_explode.maxAssistance").alias("pkg_max_assistance"),            
        col("createdDate.$date").alias("created_date"),         
        col("partition_date")
    )

In [16]:
df_final.show()

+--------------------+---------+--------------------+-----------------+---------------+-------------+------------------+-------------+--------------+
|                  id|is_client|            offer_id|           pkg_id|pkg_sub_product|     pkg_name|pkg_max_assistance| created_date|partition_date|
+--------------------+---------+--------------------+-----------------+---------------+-------------+------------------+-------------+--------------+
|644187e5da7c796a5...|    false|bfa196f12e0d63925...|    INFINITY_GOLD|           0001|Infinity Gold|         1148000.0|1682016229538|    2023-05-01|
|644187e5da7c796a5...|    false|bfa196f12e0d63925...|    INFINITY_GOLD|           0001|Infinity Gold|         1148000.0|1682016229538|    2023-05-01|
|644187e5da7c796a5...|    false|bfa196f12e0d63925...|    INFINITY_GOLD|           0001|Infinity Gold|         1148000.0|1682016229538|    2023-05-01|
|644187e5da7c796a5...|    false|bfa196f12e0d63925...|     INFINITY_ONE|           0500| Infinity Uno

### Filtrar

In [None]:
print("EXAMPLE INDIVIDUAL") # Poner a desicion tuya
df_final.filter("id = '6500fb91c8733d11d476d832'").show()

### Codigo de Guardado

Uso **.saveAsTable** porque si no existe la tabla la crea 

Muy util en la etapa de desarrollo. Sientanse libre de usar el metodo de guardado que prefieran

In [None]:
'''
df_final \
        .write \
        .partitionBy('partition_date') \
        .option("compression", 'gzip') \
        .saveAsTable(name='bi_corp_staging.pongan_el_suyo',
                        format='parquet',
                        mode='Overwrite',
                        path='/santander/bi-corp/staging/pongan_el_suyo')
'''

## Vamos al codigo en vivo

[RUTA S3: unico_spark](https://s3.console.aws.amazon.com/s3/buckets/sarp1ae1as3zonda0lake001?region=us-east-1&prefix=santander/bi-corp/staging/onboarding/unico_spark/&showversions=false)

<hr>

### ACLARATION Y PRECAUCION

Mi ejemplo hace el tratamiento y la seleccion al final. Eso no quiere decir que no lo puedan hacer en el trayecto pero cuidado.

`.drop("_id")\ ` # esto borra todos los que se llamen `_id` (me afecto cuando no tenia alias)

In [None]:
    # Si ya no se va a usar mas puedes borrarla asi no mantienes esa lista de elementos
    # print("EXPLOIDE offerData.packages")
    # print('-'*20)
    # df = df.select(col("*"), col("offerData.offerId"))\
    #   .withColumn("pkg_explode", explode("offerData.packages"))\
    #   .drop("offerData")

- Si se borra los que son con punto no se borran se almacenan en cache/ 
- Tienes que borrar la columna entera pero antes guardate las columnas de adentro que no hayas exploteado
- Si una clave tiene un valor vacio, igual lo infiero. Pero no puede aparecer cargarse porque esta vacio, en este caso castearlo como NULL. Muy raro pero a considerar

Si los datos salen de mongo, entonces es un json 100%

Pero si estan trabajando con estructuras que se aproximan y esto les rompe por no ser 100% json pueden usar la forma Manual

### FORMA MANUAL

Ustedes pueden definir el schema manualmente, el cual vimos en la notebook **Campo Lista Spark**

A mi se me acalambro y solo es la mitad

In [None]:
schema = StructType(
      [
        StructField('_id',  StringType(), True ),
        StructField('hasDebitCard', StringType(), True),
        StructField('isClient', StringType(), True),
        StructField('offerData', StructType([StructField('packages',ArrayType(StringType()), True), StructField('offerId',StringType(), True)]), True),
        StructField('segmentObu', StringType(), True),
        StructField('startAt', StringType(), True),
        StructField('flow', StringType(), True),
        StructField('listOffer', ArrayType(StringType()), True),
        StructField('obuId', StringType(), True),
        StructField('metadata', StringType(), True),
        StructField('track_id', StringType(), True),
        StructField('contact', StringType(), True),
        StructField('job', StringType(), True),
        StructField('product', StringType(), True),
        StructField('document_type', StringType(), True),
        StructField('document_number', StringType(), True),
        StructField('nup', StringType(), True),
        StructField('channel', StringType(), True),
        StructField('biometry_id', StringType(), True),
        StructField('prospect', StringType(), True),
        StructField('createdDate', StringType(), True),
        StructField('segment', StringType(), True)
      ]
    )
#Para los normales que no son listas podemos definir lo emebedido con la notacion de x.y    --------->
mapped_df = df_json.withColumn("json", from_json("value", schema))

Si no quieres usar un schema, puedes usar las funciones cotidianas `json_tuple` pero es como volver a HIVE no vale la pena