In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, round

In [2]:
# Inicializar Spark
spark = SparkSession.builder.appName("GCPFinalProject").getOrCreate()

25/04/11 23:44:26 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [9]:
# Ruta a los archivos en el bucket staging
bucket = "staging_pf"
ventas_path = f"gs://{bucket}/ventas_stg.csv"
productos_path = f"gs://{bucket}/productos_stg.csv"
clientes_path = f"gs://{bucket}/clientes_stg.csv"

In [10]:
# Leer los archivos CSV
ventas_df = spark.read.option("header", "true").csv(ventas_path, inferSchema=True)
productos_df = spark.read.option("header", "true").csv(productos_path, inferSchema=True)
clientes_df = spark.read.option("header", "true").csv(clientes_path, inferSchema=True)

In [11]:
# ---------- Lógica 1: Ventas Totales por Producto ----------
ventas_productos = ventas_df.join(productos_df, on="id_producto", how="inner")

ventas_por_producto = ventas_productos.groupBy("nombre_producto", "categoria") \
    .agg(_sum("cantidad_vendida").alias("total_vendido"))

In [13]:
ventas_por_producto.show()

[Stage 15:>                                                         (0 + 1) / 1]

+--------------------+-----------+-------------+
|     nombre_producto|  categoria|total_vendido|
+--------------------+-----------+-------------+
|         Monitor LED|Electronica|            1|
|    Cartera de Cuero| Accesorios|            4|
|Parlantes Portatiles|Electronica|            9|
|    Teclado Mecanico|Electronica|            2|
|          Laptop Pro|Electronica|            2|
|   Bicicleta Montana|   Deportes|            2|
|         Mouse Gamer|Electronica|            4|
|   Zapatillas Runner|   Deportes|            5|
|           Tablet 10|Electronica|            3|
|    Silla de Oficina|    Oficina|            7|
|     Pantalon Casual|       Ropa|            4|
|     Camiseta Basica|       Ropa|            1|
|      Boligrafo Pack|    Oficina|            6|
|Chaqueta de Invierno|       Ropa|            2|
|       Camara Reflex|Electronica|            8|
|      Mochila Urbana| Accesorios|            4|
|        Smartphone X|Electronica|            1|
|    Cuaderno Premiu

                                                                                

In [19]:
#Productos y sus Precios con Ventas Realizadas
ventas_con_ingresos = ventas_productos.groupBy("nombre_producto", "precio") \
    .agg(
        _sum("cantidad_vendida").alias("cantidad_total_vendida"),
        round(_sum(col("cantidad_vendida") * col("precio")), 2).alias("ingresos_totales")
    )

In [15]:
ventas_con_ingresos.show()

[Stage 19:>                                                         (0 + 1) / 1]

+--------------------+------+----------------------+----------------+
|     nombre_producto|precio|cantidad_total_vendida|ingresos_totales|
+--------------------+------+----------------------+----------------+
|       Lentes de Sol|  20.0|                    10|           200.0|
|    Cartera de Cuero| 120.0|                     4|           480.0|
|     Camiseta Basica|  15.0|                     1|            15.0|
|          Laptop Pro|1200.5|                     2|          2401.0|
|     Reloj Deportivo|  50.0|                     8|           400.0|
|    Teclado Mecanico|  85.0|                     2|           170.0|
|Parlantes Portatiles|  95.0|                     9|           855.0|
|     Pantalon Casual|  30.0|                     4|           120.0|
|       Camara Reflex| 900.0|                     8|          7200.0|
|   Bicicleta Montana| 450.0|                     2|           900.0|
|    Silla de Oficina| 150.0|                     7|          1050.0|
|         Monitor LE

                                                                                

In [17]:
ventas_por_producto.write \
    .format("bigquery") \
    .option("table", "dataset_proyfinal.ventas_por_producto") \
    .option("temporaryGcsBucket", "staging_pf") \
    .mode("overwrite") \
    .save()



                                                                                

In [18]:
ventas_con_ingresos.write \
    .format("bigquery") \
    .option("table", "dataset_proyfinal.productos_con_ingresos") \
    .option("temporaryGcsBucket", "staging_pf") \
    .mode("overwrite") \
    .save()

                                                                                