In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from helpers import reader, raw_details, data_cleaner_aggregator, writer

# Local environment configuration: replace these with the JAVA and HADOOP
# locations of your machine. They must be applied BEFORE the SparkSession is
# created: getOrCreate() launches the JVM as a child process that inherits the
# environment at that moment, so changing it afterwards does not reach the JVM.
HADOOP_BIN_DIR = r"C:\hadoop-3.3.6\bin"
JAVA_HOME_DIR = r"C:\Program Files\Eclipse Adoptium\jdk-21.0.7.6-hotspot"

os.environ["JAVA_HOME"] = JAVA_HOME_DIR

# Add Hadoop's bin dir to PATH only once, so re-running this cell does not keep
# growing PATH. os.pathsep is ";" on Windows and ":" elsewhere.
current_path = os.environ.get("PATH", "")
if HADOOP_BIN_DIR not in current_path.split(os.pathsep):
    os.environ["PATH"] = os.pathsep.join(filter(None, [current_path, HADOOP_BIN_DIR]))

spark = SparkSession.builder.appName("App").getOrCreate()

# Lectura principal del insumo RAW

In [2]:
# Raw input: a CSV file, given as a path relative to the working directory.
reader_type = "csv"  # input format handed to helpers.reader.Reader
path = "raw/data_prueba_tecnica.csv"  # set this to wherever the input file lives

rd = reader.Reader(spark, reader_type, path)
df = rd.reader_process()


INFO:prueba_técnica:Lectura correcta para una ruta csv


+----------------------------------------+----------+----------------------------------------+------+---------------+----------+----------+
|id                                      |name      |company_id                              |amount|status         |created_at|paid_at   |
+----------------------------------------+----------+----------------------------------------+------+---------------+----------+----------+
|48ba4bdbfb56ceebb32f2bd0263e759be942af3d|MiPasajefy|cbf1c8b09cd5b549416d49d220a40cbd317f952e|3.0   |voided         |2019-03-19|NULL      |
|05fc6f5ac66b6ee7e4253aa5d0c2299eb47aaaf4|MiPasajefy|cbf1c8b09cd5b549416d49d220a40cbd317f952e|3.0   |pending_payment|2019-05-06|NULL      |
|2cdce231c1fc6a2061bfa2f1d978351fe217245d|MiPasajefy|cbf1c8b09cd5b549416d49d220a40cbd317f952e|3.0   |voided         |2019-02-22|NULL      |
|81633ba310a50b673efd469c37139576982901aa|MiPasajefy|cbf1c8b09cd5b549416d49d220a40cbd317f952e|102.61|paid           |2019-02-27|2019-02-27|
|6ccfc4c24e788e4bca4

In [3]:
# Show atypical created_at values: anything that is not a plain ISO date
# (yyyy-MM-dd), plus missing values. Matching the exact pattern also catches
# malformed values of length 10 (e.g. "2019/05/16" or "16-05-2019") that a
# length-only check (length != 10) would let through.
ISO_DATE_PATTERN = r"^\d{4}-\d{2}-\d{2}$"
df.select("created_at").where(
    F.col("created_at").isNull() | ~F.col("created_at").rlike(ISO_DATE_PATTERN)
).show()

+-------------------+
|         created_at|
+-------------------+
|2019-02-27T00:00:00|
|           20190516|
|           20190121|
+-------------------+



# Detalles generales del insumo RAW

In [4]:
# Primary-key column(s): list one or more columns that together form the key.
# NOTE(review): the recorded output reports 9998 unique keys for 10000 rows
# and 3 null ids, so the raw key is not clean; the cleaning step must handle it.
campos_llave = ["id"]

rwd = raw_details.RawDetails(df, campos_llave)
rwd.crude_stage_details()

 10000 registros 
 7 columnas 
Detalles de schema raw
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- company_id: string (nullable = true)
 |-- amount: string (nullable = true)
 |-- status: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- paid_at: string (nullable = true)

Conteo de nulos por campo
id: 3
name: 3
company_id: 4
amount: 0
status: 0
created_at: 0
paid_at: 3991

        Registros unicos por campo/s llave)
        9998
        


# Etapa de limpieza y generación de información (ventas por cliente y día)

In [5]:
# Clean the raw frame and aggregate it into the final sales detail.
dca = data_cleaner_aggregator.DataCleanerAggregator(df)
final_df = dca.final_sales_details()  # aggregated result, persisted in the next cell

INFO:prueba_técnica: Se ha generado el DF con los detalles de venta por cliente con 211 registros


# Escritura del DF final con el detalle de ventas

In [6]:
# Persist the aggregated frame; per its log message, the helper writes into
# the 'processed' folder.
wr = writer.Writer(final_df)
wr.processed_df()

El df ha sido escrito correctamente en la carpeta 'processed'


In [7]:
# Read the persisted parquet output back for verification.
# NOTE(review): this reads the absolute path "/processed", while the writer
# reports writing to the folder 'processed' and the input is read from a
# relative path ("raw/..."). Confirm both resolve to the same location;
# otherwise this may read stale data or fail.
df_final = spark.read.format("parquet").load("/processed")

In [8]:
# Row count of the persisted output. It must equal the in-memory final_df;
# a mismatch means the parquet read back is not the output produced in this
# run (e.g. stale files from an earlier run, or a different folder).
expected_rows = final_df.count()
persisted_rows = df_final.count()
assert persisted_rows == expected_rows, (
    f"Persisted output has {persisted_rows} rows but final_df has {expected_rows}"
)
persisted_rows

211

In [11]:
# Preview the first rows of the persisted output with full (untruncated) columns.
# NOTE: total_sales_amount appears to be an unrounded floating-point sum
# (e.g. 4150.039999999999); round it for presentation if needed.
df_final.show(n=5, truncate=False)

+--------------+-------------+------------------+
|cliente_name  |purchase_date|total_sales_amount|
+--------------+-------------+------------------+
|MiPasajefy    |2019-01-01   |4150.039999999999 |
|MiPasajefy    |2019-01-02   |17044.920000000002|
|Muebles chidos|2019-01-03   |3199.0            |
|MiPasajefy    |2019-01-03   |6735.659999999998 |
|MiPasajefy    |2019-01-04   |6349.69           |
+--------------+-------------+------------------+
only showing top 5 rows
