<img alt="Pascual logo" height="120px" src="https://github.com/andresperez86/Data_Analysis_20252/blob/master/cropped-Institucion_Pascual_Bravo_Logo.png?raw=true" align="center" hspace="10px" vspace="10px" style="width:520px;height:152px;">
<h1><font color='01b3c2'> <center>  MapReduce.</font> </center>

<font  face="Courier New" size="3">
 <p><center>Prof. Andres Fernando Pérez  MSc.</center></p>
</font>



In [1]:
# --- (En Colab) instala PySpark: ---
# !pip install -q pyspark

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Crear sesión Spark
spark = SparkSession.builder.appName("MapReduceVentas").getOrCreate()

# =========================
# Datos de ejemplo
# =========================
data = [
    ("Tienda1", "Manzana", "Frutas", 5, 1000),
    ("Tienda1", "Pera",    "Frutas", 3, 1200),
    ("Tienda1", "Leche",   "Lácteos", 4, 2500),
    ("Tienda2", "Manzana", "Frutas", 2, 1100),
    ("Tienda2", "Queso",   "Lácteos", 1, 5000),
    ("Tienda2", "Pera",    "Frutas", 6, 1150),
    ("Tienda3", "Leche",   "Lácteos",10, 2400),
    ("Tienda3", "Manzana", "Frutas", 8, 1050),
]
cols = ["tienda", "producto", "categoria", "cantidad", "precio_unitario"]
df = spark.createDataFrame(data, cols)

# =========================
# (Opcional) Ver datos
# =========================
df.show(truncate=False)

# ============================================================
# MAP (conceptual): preparar columnas derivadas
#   - ingreso = cantidad * precio_unitario
#   - la "clave" conceptual sería (tienda, categoria)
# ============================================================
df_map = df.withColumn("ingreso", F.col("cantidad") * F.col("precio_unitario"))
df_map.show(truncate=False)

# ============================================================
# SHUFFLE: groupBy (tienda, categoria)
# REDUCE: agregaciones (sumatorias y promedio de precio)
#   precio_promedio_unitario = ingreso_total / unidades_totales
# ============================================================
agg = (
    df_map
    .groupBy("tienda", "categoria")
    .agg(
        F.sum("cantidad").alias("unidades_totales"),
        F.sum("ingreso").alias("ingreso_total")
    )
    .withColumn(
        "precio_prom_unit",
        (F.col("ingreso_total") / F.col("unidades_totales")).cast("double")
    )
    .orderBy("tienda", "categoria")
)

print("=== Totales por (tienda, categoría) ===")
agg.show(truncate=False)

# ============================================================
# EXTRA 1: Totales por categoría (en toda la cadena)
# ============================================================
totales_categoria = (
    df_map.groupBy("categoria")
    .agg(
        F.sum("cantidad").alias("unidades_totales"),
        F.sum("ingreso").alias("ingreso_total")
    )
    .withColumn(
        "precio_prom_unit",
        (F.col("ingreso_total") / F.col("unidades_totales")).cast("double")
    )
    .orderBy("categoria")
)

print("=== Totales por categoría (global) ===")
totales_categoria.show(truncate=False)

# ============================================================
# EXTRA 2: Producto TOP por categoría (por unidades vendidas)
#   - Window partition por categoría
#   - rank según unidades (desc)
# ============================================================
prod_categoria = (
    df_map.groupBy("categoria", "producto")
    .agg(F.sum("cantidad").alias("unidades"))
)

w = Window.partitionBy("categoria").orderBy(F.desc("unidades"))
top_prod_categoria = (
    prod_categoria
    .withColumn("rank", F.rank().over(w))
    .filter(F.col("rank") == 1)
    .drop("rank")
    .orderBy("categoria")
)

print("=== Producto top por categoría (por unidades) ===")
top_prod_categoria.show(truncate=False)

# ============================================================
# (Opcional) Versión RDD muy breve para ver el “sabor MapReduce”
# ============================================================
rdd = spark.sparkContext.parallelize(data)

# MAP: ((tienda, categoria), (cantidad, ingreso))
rdd_map = rdd.map(lambda x: ((x[0], x[2]), (x[3], x[3]*x[4])))

# REDUCE: sumar tupla (cantidad, ingreso) por clave
from operator import add
def add_pairs(a, b):
    return (a[0] + b[0], a[1] + b[1])

rdd_reduce = rdd_map.reduceByKey(add_pairs)

# precio_prom_unit = ingreso_total / unidades_totales
rdd_result = rdd_reduce.map(lambda kv: (kv[0][0], kv[0][1], kv[1][0], kv[1][1], kv[1][1]/kv[1][0]))

print("=== (RDD) Totales por (tienda, categoría) ===")
for row in rdd_result.collect():
    print({
        "tienda": row[0],
        "categoria": row[1],
        "unidades_totales": row[2],
        "ingreso_total": row[3],
        "precio_prom_unit": float(row[4])
    })

# spark.stop()  # Descomenta si quieres cerrar la sesión al finalizar


+-------+--------+---------+--------+---------------+
|tienda |producto|categoria|cantidad|precio_unitario|
+-------+--------+---------+--------+---------------+
|Tienda1|Manzana |Frutas   |5       |1000           |
|Tienda1|Pera    |Frutas   |3       |1200           |
|Tienda1|Leche   |Lácteos  |4       |2500           |
|Tienda2|Manzana |Frutas   |2       |1100           |
|Tienda2|Queso   |Lácteos  |1       |5000           |
|Tienda2|Pera    |Frutas   |6       |1150           |
|Tienda3|Leche   |Lácteos  |10      |2400           |
|Tienda3|Manzana |Frutas   |8       |1050           |
+-------+--------+---------+--------+---------------+

+-------+--------+---------+--------+---------------+-------+
|tienda |producto|categoria|cantidad|precio_unitario|ingreso|
+-------+--------+---------+--------+---------------+-------+
|Tienda1|Manzana |Frutas   |5       |1000           |5000   |
|Tienda1|Pera    |Frutas   |3       |1200           |3600   |
|Tienda1|Leche   |Lácteos  |4       |2500

## Bibliography and related resources.
<p align="justify"><font face="Verdana" size="2.5"> [1]&nbsp;&nbsp;&nbsp;&nbsp; J. Leskovec, A. Rajaraman, y J. D. Ullman, <cite>Mining of Massive Datasets</cite>, 2nd ed. New York, NY, USA: Cambridge University Press, 2014. <a href="http://www.mmds.org/"> here</a> </p>

<p align="justify"><font face="Verdana" size="2.5"> [2]&nbsp;&nbsp;&nbsp;&nbsp;Dean, J. and Ghemawat, S. 2004. <cite>MapReduce: Simplified data processing on large clusters. </cite>In Proceedings of Operating Systems Design and <br> &emsp; &emsp; Implementation (OSDI). San Francisco, CA. 137-150. <a href="https://static.googleusercontent.com/media/research.google.com/es//archive/mapreduce-osdi04.pdf"> here</a></p>

<p align="left"><b><font face='Courier New' color="white" align="left" size=4>Copyright.</font></b>
<img alt="GIIAM" height="120px" src="https://pascualbravo.edu.co/investigacion/giiam/" align="right" hspace="10px" vspace="0px" height="120" width="350"">
                                                                                                                              
<font face='Verdana' size="2.5">
Andres Fernando Perez G. <a href="https://scienti.minciencias.gov.co/cvlac/visualizador/generarCurriculoCv.do?cod_rh=0000347507">  CvLAC</a><br>
I.U Pascual Bravo.<br>
Calle 73 # 73A – 226<br>
Medellín, Colombia. South America.
    
</p>
</font>
    
</p>
</font>

<center><b><font color='01b3c2' face="Lucida Calligraphy,Comic Sans MS,Lucida Console" size="4">I.U Pascual Bravo.</font></b> </center>