In [1]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import pyspark.sql.functions as F
from pyspark.sql.types import *

In [2]:
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

### Tabla detalle cliente

In [3]:
dir_archivo = "/home/tonatiuh/Documents/Desarrollo/ZophiaLearning/ejercicios/"
nombre_archivo = 'ecommerce/curated/detalle_cliente.parquet'

In [4]:
df = spark.read.format("parquet")\
        .load(dir_archivo+nombre_archivo)

In [5]:
df.select('direccion').show(n=10, truncate=False)

+----------------------------------------------------------------------------------------+
|direccion                                                                               |
+----------------------------------------------------------------------------------------+
|Pasaje Sur Hinojosa 546 Interior 968Vieja Myanmar, OAX 06497                            |
|Prolongacion Norte Guillen 815 833San Vicente de la Montaña, TAMPS 44107                |
|Avenida Norte Lozano 005 Edif. 213 , Depto. 580San Pascual de la Montaña, MEX 11015-7667|
|Ampliacion Cortes 072 Edif. 362 , Depto. 032Nueva Belarus, DGO 36879                    |
|Eje vial Republica Centroafricana 816 Interior 150San Gustavo los altos, SIN 75788-3053 |
|Retorno Veracruz de Ignacio de la Llave 424 Interior 386Nueva Tailandia, BCS 49534      |
|Calle Fernandez 479 Interior 434Nueva Etiopia, Q. ROO 93717-4366                        |
|Circunvalacion Hungria 372 350Vieja Iran, SIN 08028                                     |

#### Direccion

In [6]:
df_procesado = df.withColumn('direccion_split', F.split(F.col('direccion'), ','))
df_procesado = df_procesado.withColumn('numero_elementos', F.size(F.col('direccion_split')))

df_procesado = df_procesado.withColumn('estado_cp', F.col('direccion_split').getItem(F.col('numero_elementos') - 1 ))
df_procesado = df_procesado.withColumn('estado_cp', F.trim(F.col('estado_cp')))

df_procesado = df_procesado.withColumn('estado_cp_split', F.split(F.col('estado_cp'), ' '))
df_procesado = df_procesado.withColumn('numero_elementos_estado', F.size(F.col('estado_cp_split')))
df_procesado = df_procesado.withColumn('codigo_postal', F.col('estado_cp_split').getItem(F.col('numero_elementos_estado') - 1))

df_procesado = df_procesado.withColumn('estado_cp_split_drop', F.expr("slice(estado_cp_split, 1, numero_elementos_estado-1)"))
df_procesado = df_procesado.withColumn('estado', F.concat_ws(' ', 'estado_cp_split_drop'))

df_procesado = df_procesado.withColumn('direccion_split_drop', F.expr("slice(direccion_split, 1, numero_elementos-1)"))
df_procesado = df_procesado.withColumn('direccion_procesado', F.concat_ws(' ', 'direccion_split_drop'))

In [7]:
df_direccion = df_procesado.drop(
    'direccion_split',
    'numero_elementos',
    'estado_cp',
    'estado_cp_split',
    'numero_elementos_estado',
    'estado_cp_split_drop',
    'direccion_split_drop',
    'direccion')

df_direccion = df_direccion.withColumnRenamed('direccion_procesado', 'direccion')
df_direccion.cache()

DataFrame[cliente_id: string, cliente: string, email: string, telefono: string, metodo_pago: string, numero_tarjeta: string, codigo_postal: string, estado: string, direccion: string]

In [8]:
df_direccion.show(n=5, vertical=True, truncate=False)

-RECORD 0----------------------------------------------------------------------------------
 cliente_id     | C-31ij                                                                   
 cliente        | Ilse Carolina Arguello Vargas                                            
 email          | rolvera@yahoo.com                                                        
 telefono       | 592.549.7283x41724                                                       
 metodo_pago    | tarjeta                                                                  
 numero_tarjeta | 3516303758383771                                                         
 codigo_postal  | 06497                                                                    
 estado         | OAX                                                                      
 direccion      | Pasaje Sur Hinojosa 546 Interior 968Vieja Myanmar                        
-RECORD 1-----------------------------------------------------------------------

### Almacenamiento

In [9]:
nombre_destino = 'ecommerce/curated/detalle_cliente_direccion.parquet'

In [10]:
df_direccion.write.mode('overwrite').parquet(dir_archivo+nombre_destino)

#### Adicional

In [11]:
df_filtrado = df_direccion.limit(10)

In [12]:
df_filtrado.show(3)

+----------+--------------------+--------------------+------------------+-----------+----------------+-------------+------+--------------------+
|cliente_id|             cliente|               email|          telefono|metodo_pago|  numero_tarjeta|codigo_postal|estado|           direccion|
+----------+--------------------+--------------------+------------------+-----------+----------------+-------------+------+--------------------+
|    C-31ij|Ilse Carolina Arg...|   rolvera@yahoo.com|592.549.7283x41724|    tarjeta|3516303758383771|        06497|   OAX|Pasaje Sur Hinojo...|
|    C-95aQ|Isaac Berta Espin...|geronimo50@yahoo.com|  +90(3)9021386651|    tarjeta|2226879089263405|        44107| TAMPS|Prolongacion Nort...|
|    C-98oG|Genaro Noelia Oli...|   jorge02@yahoo.com|       08526507692|    tarjeta|2245122837899163|   11015-7667|   MEX|Avenida Norte Loz...|
+----------+--------------------+--------------------+------------------+-----------+----------------+-------------+------+-------

In [13]:
df_pandas = df_filtrado.toPandas()
nombre_csv = "output/detalle_cliente_direccion.csv"
df_pandas.to_csv(nombre_csv, index=False)

### Unpersist

In [14]:
df_direccion.unpersist()

DataFrame[cliente_id: string, cliente: string, email: string, telefono: string, metodo_pago: string, numero_tarjeta: string, codigo_postal: string, estado: string, direccion: string]