## Spark SQL es muy fácil de usar, y punto. Quizá ya sepas que también es bastante difícil de dominar.

Para ser competente en Spark, hay que tener tres habilidades fundamentales:

-La capacidad de manipular y entender los datos
-El conocimiento de cómo adaptar la herramienta a las necesidades del programador
-El arte de encontrar un equilibrio entre los factores que afectan a las ejecuciones de los trabajos de Spark

He elaborado los siguientes seis ejercicios que se asemejan a algunas situaciones típicas a las que los desarrolladores de Spark se enfrentan a diario cuando construyen sus pipelines: estos ayudarán a evaluar las habilidades anteriores.

### El conjunto de datos
Describamos brevemente el conjunto de datos que vamos a utilizar: consta de tres tablas procedentes de la base de datos de una tienda, con productos, ventas y vendedores. Los datos están disponibles en archivos Parquet


<center><img src="https://miro.medium.com/max/700/1*wA4xJu3LMcm_vR5pFJkLpA.png" width=500 height=500 />
    



### Tabla de ventas
Cada fila de esta tabla es un pedido y cada pedido puede contener sólo un producto. Cada fila almacena los siguientes campos:

- order_id: El ID del pedido
- product_id: El único producto vendido en el pedido. Todos los pedidos tienen exactamente un producto)
- seller_id: El ID del empleado vendedor que vendió el producto
- num_pieces_sold: El número de unidades vendidas para el producto específico en el pedido
- bill_raw_text: Una cadena que representa el texto en bruto de la factura asociada al pedido
- date: La fecha del pedido.

### Tabla de productos
Cada fila representa un producto distinto. Los campos son

- product_id: El ID del producto
- product_name: El nombre del producto
- price: El precio del producto

### Tabla de vendedores
Esta tabla contiene la lista de todos los vendedores:

- seller_id: El ID del vendedor
- seller_name: El nombre del vendedor
- daily_target: El número de artículos (independientemente del tipo de producto) que el vendedor necesita para alcanzar su cuota. Por ejemplo, si el objetivo diario es 100.000, el empleado necesita vender 100.000 productos, puede alcanzar la cuota vendiendo 100.000 unidades del producto_0, pero también vendiendo 30.000 unidades del producto_1 y 70.000 unidades del producto_2


### Ejercicios
La mejor manera de aprovechar los ejercicios que se presentan a continuación es obtener los datos e implementar un código de trabajo que resuelva los problemas propuestos.

Consejo: He construido el conjunto de datos para poder trabajar en una sola máquina: cuando escribas el código, imagina lo que pasaría con un conjunto de datos 100 veces mayor.

Aunque sepas cómo resolverlas, ¡mi consejo es que no te saltes las preguntas de calentamiento! (si sabes que Spark te llevará unos segundos).


In [1]:
# Pyspark
!pip install pyspark pyspark-stubs

Collecting pyspark-stubs
  Downloading pyspark_stubs-3.0.0.post3-py3-none-any.whl.metadata (10 kB)
Collecting pyspark
  Downloading pyspark-3.0.3.tar.gz (209.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m209.1/209.1 MB[0m [31m33.4 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9 (from pyspark)
  Downloading py4j-0.10.9-py2.py3-none-any.whl.metadata (1.3 kB)
Downloading pyspark_stubs-3.0.0.post3-py3-none-any.whl (110 kB)
Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
[33m  DEPRECATION: Building 'pyspark' using the legacy setup.py bdist_wheel mechanism, which will be removed in a future version. pip 25.3 will enforce this behaviour change. A possible replacement is to use the standardized build interface by setting the `--use-pep517` option, (possibly combined with `--no-build-isolation`), or adding a `pyproject.toml` file to th

### 0.1 Averigüe cuántos pedidos, cuántos productos y cuántos vendedores hay en los datos.

¿Cuántos productos se han vendido al menos una vez? ¿Cuál es el producto que contiene más pedidos?

Crea la sesión de Spark y lea los archivos con el siguiente código

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .master("local") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.files.overwrite", "true") \
    .appName("Exercise1") \
    .getOrCreate()


products = spark.read.csv(
   'file:///clase8/products.csv', header=True, mode="DROPMALFORMED"
)

sellers = spark.read.csv(
   'file:///clase8/sellers.csv', header=True, mode="DROPMALFORMED"
)

sales = spark.read.csv(
   'file:///clase8/sales.csv', header=True, mode="DROPMALFORMED"
)
```
    

    

### 0.2 ¿Cuántos productos distintos se han vendido en cada día?

Cree la sesión de Spark utilizando el siguiente código

```
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .master("local") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.files.overwrite", "true") \
    .appName("Exercise1") \
    .getOrCreate()


products = spark.read.csv(
   'file:///clase8/products.csv', header=True, mode="DROPMALFORMED"
)

sellers = spark.read.csv(
   'file:///clase8/sellers.csv', header=True, mode="DROPMALFORMED"
)

sales = spark.read.csv(
   'file:///clase8/sales.csv', header=True, mode="DROPMALFORMED"
)
```

In [4]:
#Solución ejercicio 0.1
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
    .master("local") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.files.overwrite", "true") \
    .appName("Exercise1") \
    .getOrCreate()


products = spark.read.csv(
   'file:///clase8/products.csv', header=True, mode="DROPMALFORMED"
)

sellers = spark.read.csv(
   'file:///clase8/sellers.csv', header=True, mode="DROPMALFORMED"
)

sales = spark.read.csv(
   'file:///clase8/sales.csv', header=True, mode="DROPMALFORMED"
)

#   Print the number of orders
print("Number of Orders: {}".format(sales.count()))

#   Print the number of sellers
print("Number of sellers: {}".format(sellers.count()))

#   Print the number of products
print("Number of products: {}".format(products.count()))

spark.stop()

25/05/28 20:54:50 INFO SparkEnv: Registering MapOutputTracker
25/05/28 20:54:50 INFO SparkEnv: Registering BlockManagerMaster
25/05/28 20:54:50 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/05/28 20:54:50 INFO SparkEnv: Registering OutputCommitCoordinator
                                                                                

Number of Orders: 500001
Number of sellers: 10
Number of products: 40000


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create Spark session
spark = SparkSession.builder \
    .master("local") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.executor.memory", "500mb") \
    .appName("Exercise1") \
    .getOrCreate()

# Read Source tables
products = spark.read.csv(
   'file:///clase8/products.csv', header=True, mode="DROPMALFORMED"
)

sellers = spark.read.csv(
   'file:///clase8/sellers.csv', header=True, mode="DROPMALFORMED"
)

sales = spark.read.csv(
   'file:////clase8/sales.csv', header=True, mode="DROPMALFORMED"
)

sales.groupby(col("date")).agg(countDistinct(col("product_id")).alias("distinct_products_sold")).orderBy(
    col("distinct_products_sold").desc()).show()

spark.stop()

                                                                                

+----------+----------------------+
|      date|distinct_products_sold|
+----------+----------------------+
|2020-07-07|                  2500|
|2020-07-01|                  2478|
|2020-07-09|                  2469|
|2020-07-08|                  2439|
|2020-07-04|                  2429|
|2020-07-05|                  2419|
|2020-07-10|                  2403|
|2020-07-02|                  2391|
|2020-07-03|                  2367|
|2020-07-06|                  2330|
+----------+----------------------+



## Comparativa entre Spark y Pandas

In [6]:
import time
import pandas as pd
from pyspark.sql import SparkSession

def benchmark_pandas():
    start = time.perf_counter()

    # Read CSVs into pandas
    products = pd.read_csv('/clase8/products.csv')
    sellers  = pd.read_csv('/clase8/sellers.csv')
    sales    = pd.read_csv('/clase8/sales.csv')

    # Compute counts
    n_products = len(products)
    n_sellers  = len(sellers)
    n_sales    = len(sales)

    elapsed = time.perf_counter() - start
    return elapsed, (n_products, n_sellers, n_sales)

def benchmark_spark():
    start = time.perf_counter()

    # Start SparkSession
    spark = SparkSession.builder \
        .master("local") \
        .config("spark.sql.autoBroadcastJoinThreshold", -1) \
        .config("spark.files.overwrite", "true") \
        .appName("Benchmark") \
        .getOrCreate()

    # Read CSVs into DataFrames
    products = spark.read.csv('file:///clase8/products.csv', header=True, mode="DROPMALFORMED")
    sellers  = spark.read.csv('file:///clase8/sellers.csv', header=True, mode="DROPMALFORMED")
    sales    = spark.read.csv('file:///clase8/sales.csv', header=True, mode="DROPMALFORMED")

    # Trigger full scan counts
    n_sales    = sales.count()
    n_sellers  = sellers.count()
    n_products = products.count()

    spark.stop()

    elapsed = time.perf_counter() - start
    return elapsed, (n_products, n_sellers, n_sales)

def main():
    pd_time, (pd_prod, pd_seller, pd_sales) = benchmark_pandas()
    print(f"[pandas]   Products={pd_prod}, Sellers={pd_seller}, Sales={pd_sales}")
    print(f"[pandas]   Elapsed time: {pd_time:.2f} s\n")

    spark_time, (sp_prod, sp_seller, sp_sales) = benchmark_spark()
    print(f"[PySpark]  Products={sp_prod}, Sellers={sp_seller}, Sales={sp_sales}")
    print(f"[PySpark]  Elapsed time: {spark_time:.2f} s")

if __name__ == "__main__":
    main()


[pandas]   Products=4000000, Sellers=100, Sales=500000
[pandas]   Elapsed time: 8.64 s



25/05/28 22:02:56 INFO SparkEnv: Registering MapOutputTracker
25/05/28 22:02:56 INFO SparkEnv: Registering BlockManagerMaster
25/05/28 22:02:56 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
25/05/28 22:02:57 INFO SparkEnv: Registering OutputCommitCoordinator
                                                                                

[PySpark]  Products=4000000, Sellers=100, Sales=500000
[PySpark]  Elapsed time: 7.67 s
