# Spark JDBC

At the heart of Spark's integration with relational databases is JDBC (Java Database Connectivity). JDBC is a standard Java API that lets Spark applications talk to any relational database that provides a JDBC driver. Through it, Spark can read tables and query results into DataFrames and write DataFrames back to database tables.

## Spark SQL and Its Relationship with JDBC

Spark SQL is one of the most powerful modules in the Spark ecosystem. It provides the DataFrame abstraction for working with structured data, and its JDBC data source exposes relational tables as DataFrames. Together they make a very versatile tool for data processing.

The integration goes beyond simple reads and writes. A query can be sent to the database as a subquery, and its result can then be joined, aggregated, or transformed with Spark's distributed engine, while the database keeps enforcing its own integrity constraints and ACID guarantees on the data it stores. This gives the best of both worlds: the scalability of Spark and the reliability of relational databases.

## Architecture

The Spark-JDBC integration rests on a few components that work together. The JDBC driver is the main interpreter: it carries the SQL that Spark generates to the database and maps database column types to JDBC types, which Spark then maps to its own data types. Query optimization itself happens in Spark's planner and in the database engine rather than in the driver.

Connection management is another crucial part of this architecture. Spark's built-in JDBC data source does not keep a shared connection pool: each partition task opens its own connection and closes it when the task finishes. The number of partitions therefore determines how many connections reach the database at the same time, and keeping it reasonable limits the overhead of establishing connections.

Partitioning deserves special attention. It splits a large table into ranges that Spark reads in parallel, one query per partition. The split is not arbitrary: it is driven by a numeric, date, or timestamp column, a lower and an upper bound, and a partition count, so it should be chosen with the data distribution and the available resources in mind.
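As a rough illustration of how a partitioned read divides the work, the sketch below builds one WHERE clause per partition from a column, two bounds, and a partition count. `partition_predicates` and `read_partitioned` are hypothetical helpers written for this example, and the stride arithmetic is a simplification rather than Spark's exact logic. The bounds 680 and 1000 are assumed values for `ProductID`. The keyword arguments `column`, `lowerBound`, `upperBound`, and `numPartitions` are the ones `spark.read.jdbc` accepts.

```python
def partition_predicates(column, lower, upper, num_partitions):
    """Build one WHERE clause per partition (simplified sketch, not Spark's code)."""
    if num_partitions < 1 or upper <= lower:
        raise ValueError("need num_partitions >= 1 and upper > lower")
    if num_partitions == 1:
        return ["1 = 1"]  # a single partition reads everything
    stride = max((upper - lower) // num_partitions, 1)
    predicates = []
    for i in range(num_partitions):
        start = lower + i * stride
        end = start + stride
        if i == 0:
            # First range is open below and also collects NULL keys
            predicates.append(f"{column} < {end} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # Last range is open above, so no row is lost
            predicates.append(f"{column} >= {start}")
        else:
            predicates.append(f"{column} >= {start} AND {column} < {end}")
    return predicates


def read_partitioned(spark, url, table, properties):
    """Partitioned read; the bounds below are assumed values for ProductID."""
    return spark.read.jdbc(
        url=url,
        table=table,
        column="ProductID",
        lowerBound=680,
        upperBound=1000,
        numPartitions=4,
        properties=properties,
    )


for clause in partition_predicates("ProductID", 680, 1000, 4):
    print(clause)
```

The bounds only set the width of each range; rows below the lower bound or above the upper bound still land in the first or last partition, so badly chosen bounds produce skew rather than missing data.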

## Operations and Optimization

Operations in the Spark-JDBC context fall into three main categories: reads, writes, and transformations. Reads range from fetching whole tables to complex queries with multiple joins and aggregations. Writes, in turn, may involve bulk inserts, updates, or upserts.
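A minimal sketch of the write side, assuming a DataFrame `df` and the same URL and properties used for reading. `write_to_sql_server` is a hypothetical wrapper; the underlying call is `DataFrame.write.jdbc`. The built-in writer works with save modes (append, overwrite, and so on), not row-level updates, so an upsert usually means writing to a staging table and merging on the database side.

```python
# Save modes accepted by the DataFrame writer
VALID_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def write_to_sql_server(df, url, table, properties, mode="append"):
    """Write a DataFrame to a database table over JDBC (hypothetical helper)."""
    if mode not in VALID_MODES:
        raise ValueError(f"Unsupported save mode: {mode!r}")
    # Each partition of df opens its own connection and inserts its rows
    df.write.jdbc(url=url, table=table, mode=mode, properties=properties)
```

In a notebook this could be called as `write_to_sql_server(df, jdbcUrl, "SalesLT.ProductCopy", connectionProperties)`, where `SalesLT.ProductCopy` is an illustrative target table name.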

Optimization in this context takes care. Predicate push-down is one of the most important techniques: filters run in the database before the data is transferred to Spark, which can sharply reduce the volume of data moved and improve overall performance.
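One explicit way to push a filter to the database is to pass a parenthesized subquery as the `table` argument, so only matching rows leave the server. The sketch below assumes the `SalesLT.Product` table from the AdventureWorksLT sample; `as_pushdown_table` and `read_expensive_products` are hypothetical helpers.

```python
def as_pushdown_table(select_sql, alias="temp"):
    """Wrap a SELECT so it can be used as the `table` argument of spark.read.jdbc."""
    sql = select_sql.strip().rstrip(";")
    if not sql.lower().startswith("select"):
        raise ValueError("expected a SELECT statement")
    return f"({sql}) AS {alias}"


def read_expensive_products(spark, url, properties, min_price=50):
    """Read only products above min_price; the WHERE clause runs in the database."""
    # int() keeps arbitrary text out of the generated SQL
    table = as_pushdown_table(
        "SELECT ProductID, Name, ListPrice "
        f"FROM SalesLT.Product WHERE ListPrice > {int(min_price)}"
    )
    return spark.read.jdbc(url=url, table=table, properties=properties)


print(as_pushdown_table("SELECT ProductID FROM SalesLT.Product WHERE ListPrice > 50;"))
```

Simple filters applied to a JDBC DataFrame with `.filter()` can also be pushed down by Spark; calling `.explain()` on the result is one way to check which filters reached the database.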

Resource management and parameter configuration play a crucial role in performance. The fetch size, the number of partitions, and the batch size must be tuned to the characteristics of each use case, because they directly affect both throughput and resource usage.
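The three parameters mentioned above map to the JDBC source options `fetchsize` (rows fetched per round trip when reading), `batchsize` (rows per insert batch when writing), and `numPartitions`. The sketch below only assembles an options dictionary; `tuned_jdbc_options` is a hypothetical helper, and the numbers in the usage line are illustrative, not recommendations.

```python
def tuned_jdbc_options(url, table, user, password,
                       fetch_size=None, batch_size=None, num_partitions=None):
    """Build an options dict for the JDBC data source (hypothetical helper)."""
    options = {"url": url, "dbtable": table, "user": user, "password": password}
    tuning = {
        "fetchsize": fetch_size,          # read side
        "batchsize": batch_size,          # write side
        "numPartitions": num_partitions,  # upper limit on parallel connections
    }
    for name, value in tuning.items():
        if value is None:
            continue  # leave the default in place
        if int(value) <= 0:
            raise ValueError(f"{name} must be a positive integer")
        options[name] = str(int(value))
    return options


opts = tuned_jdbc_options(
    "jdbc:sqlserver://example-host:1433;databaseName=example-db",  # placeholder URL
    "SalesLT.Product", "adminuser", "secret",
    fetch_size=500, num_partitions=4,
)
print(sorted(opts))
```

The dictionary can be passed with `spark.read.format("jdbc").options(**opts).load()` for reads, or `df.write.format("jdbc").options(**opts).mode("append").save()` for writes.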

## Example: Connecting Databricks Community Edition to Azure SQL Server over JDBC

### Creating the SQL Server in Azure

- Create a resource group. Then choose Create a resource > Databases > **SQL Database**.
- Configure the details:
  - SQL server name: databricks-sql-server
  - Region: Italy North or another available region
  - Authentication: enable SQL Authentication and set:
    1. User: `adminuser`
    2. Password: `ContraseñaFuerte123`
- Check the "Allow access to Azure services" box.
  

### Create the database: AdventureWorksLT

- Under Additional settings, set Data source to Sample so that [AdventureWorksLT](https://learn.microsoft.com/es-es/sql/samples/adventureworks-install-configure?view=sql-server-ver16&tabs=ssms) is loaded. Leave everything else at its default.

- In the same section, give the database any name (this example uses db-notebook-4). Select the server you created (databricks-sql-server). Pricing tier: pick the cheapest plan (Basic - DTU: 5).

- Go to the server you created and configure the firewall under Networking. Add the public IP of your local connection and enable "Allow all Azure services to access". Also allow the public IP of the Databricks node. That address is not fixed (54.200.13.2 is one value it has taken), so check it from a notebook as shown in the connection section. Save the changes.

### Configure Databricks Community Edition
Go to the Compute tab and select Create Compute with these settings:
  - Cluster Name: AdventureWorksCluster.
  - Databricks Runtime Version: 11.3 LTS (Scala 2.12, Spark 3.3.1)
  - Create the cluster.
  - While the cluster is being created, download the JDBC driver for SQL Server; this example uses [this one](https://tajamar365.sharepoint.com/:u:/s/3405-MasterIA2024-2025/EeR4l4udCBFClfiDcBhI7PMBG-VfNpuLHlAQ7a1FVBC5OA?e=Igqclx).
  - In Databricks, upload the driver to your workspace or to DBFS.
  - Once the cluster is running, open it, go to Libraries, click `Install New`, and give it the path where you stored the driver.

### Connecting Databricks to SQL Server

How to find the public IP of the Databricks node, so that it can be allowed in the Azure firewall:

In [0]:
import requests

# Get the public IP of the node
public_ip = requests.get('https://api.ipify.org').text
print(f"The node's public IP is: {public_ip}")

The node's public IP is: 34.222.221.121


Create a notebook in Databricks and add the following code (with your own configuration values):

In [0]:
# JDBC connection settings
jdbcHostname = "databricks-sql-server-andy.database.windows.net"  # SQL server
jdbcPort = 1433
jdbcDatabase = "db-notebook4"  # Exact name of your database
jdbcUsername = "adminuser"  # Replace with your configured user
jdbcPassword = "ContraseñaFuerte123"  # Replace with your configured password

jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};databaseName={jdbcDatabase}"

# Connection properties
connectionProperties = {
    "user": jdbcUsername,
    "password": jdbcPassword,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
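The configuration above keeps the password in the notebook and uses a bare URL. As a hedged alternative, the sketch below builds a URL that requests an encrypted connection and reads the password from an environment variable. `build_sqlserver_url` and `connection_properties` are hypothetical helpers, and `SQLSERVER_PASSWORD` is an assumed variable name. `encrypt`, `trustServerCertificate`, and `loginTimeout` are connection properties of the Microsoft JDBC driver.

```python
import os


def build_sqlserver_url(host, database, port=1433, encrypt=True, login_timeout=30):
    """Assemble a SQL Server JDBC URL with explicit encryption settings."""
    parts = [
        f"jdbc:sqlserver://{host}:{port}",
        f"databaseName={database}",
        f"encrypt={'true' if encrypt else 'false'}",
        "trustServerCertificate=false",
        f"loginTimeout={login_timeout}",
    ]
    return ";".join(parts)


def connection_properties(user, password_env="SQLSERVER_PASSWORD"):
    """Read the password from an environment variable (assumed name)."""
    password = os.environ.get(password_env)
    if password is None:
        raise KeyError(f"environment variable {password_env} is not set")
    return {
        "user": user,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }


print(build_sqlserver_url("databricks-sql-server.database.windows.net", "db-notebook-4"))
```

The returned URL and dictionary can replace `jdbcUrl` and `connectionProperties` in the calls to `spark.read.jdbc`.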

### Test Query

In [0]:
# Test query
query = "(SELECT TOP 10 * FROM SalesLT.Product) AS temp"  # Replace with another valid table if needed

# Read data from SQL Server
try:
    df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
    df.show()  # Display the data
except Exception as e:
    print(f"Connection error: {e}")

+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-------------------+----------------+--------------------+----------------------+--------------------+--------------------+
|ProductID|                Name|ProductNumber|Color|StandardCost|ListPrice|Size| Weight|ProductCategoryID|ProductModelID|      SellStartDate|        SellEndDate|DiscontinuedDate|      ThumbNailPhoto|ThumbnailPhotoFileName|             rowguid|        ModifiedDate|
+---------+--------------------+-------------+-----+------------+---------+----+-------+-----------------+--------------+-------------------+-------------------+----------------+--------------------+----------------------+--------------------+--------------------+
|      680|HL Road Frame - B...|   FR-R92B-58|Black|   1059.3100|1431.5000|  58|1016.04|               18|             6|2002-06-01 00:00:00|               null|            null|[47 49 46 38 39 6...|  no_i

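### Testing with simple queries using PySpark

Queries 1 to 3 send their SQL to the database. From query 4 onward the examples run on `df`, the DataFrame loaded by the test query above, so most of their outputs cover only those 10 rows.

#### 1. List all available tables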

In [0]:
query = "(SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'BASE TABLE') AS temp"
df_tables = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
df_tables.show(truncate=False)


+------------------------------+
|TABLE_NAME                    |
+------------------------------+
|Customer                      |
|ProductModel                  |
|ProductDescription            |
|Product                       |
|ProductModelProductDescription|
|ProductCategory               |
|BuildVersion                  |
|ErrorLog                      |
|Address                       |
|CustomerAddress               |
|SalesOrderDetail              |
|SalesOrderHeader              |
+------------------------------+



#### 2. Products priced above $50

In [0]:
query = "(SELECT ProductID, Name, ListPrice FROM SalesLT.Product WHERE ListPrice > 50) AS temp"
df_filtered = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
df_filtered.show()


+---------+--------------------+---------+
|ProductID|                Name|ListPrice|
+---------+--------------------+---------+
|      680|HL Road Frame - B...|1431.5000|
|      706|HL Road Frame - R...|1431.5000|
|      717|HL Road Frame - R...|1431.5000|
|      718|HL Road Frame - R...|1431.5000|
|      719|HL Road Frame - R...|1431.5000|
|      720|HL Road Frame - R...|1431.5000|
|      721|HL Road Frame - R...|1431.5000|
|      722|LL Road Frame - B...| 337.2200|
|      723|LL Road Frame - B...| 337.2200|
|      724|LL Road Frame - B...| 337.2200|
|      725|LL Road Frame - R...| 337.2200|
|      726|LL Road Frame - R...| 337.2200|
|      727|LL Road Frame - R...| 337.2200|
|      728|LL Road Frame - R...| 337.2200|
|      729|LL Road Frame - R...| 337.2200|
|      730|LL Road Frame - R...| 337.2200|
|      731|ML Road Frame - R...| 594.8300|
|      732|ML Road Frame - R...| 594.8300|
|      733|ML Road Frame - R...| 594.8300|
|      734|ML Road Frame - R...| 594.8300|
+---------+--------------------+---------+

#### 3. Count products by category

In [0]:
query = """
(SELECT ProductCategoryID, COUNT(*) AS TotalProducts
 FROM SalesLT.Product
 GROUP BY ProductCategoryID) AS temp
"""
df_count = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
df_count.show()


+-----------------+-------------+
|ProductCategoryID|TotalProducts|
+-----------------+-------------+
|                5|           32|
|                6|           43|
|                7|           22|
|                8|            8|
|                9|            3|
|               10|            2|
|               11|            1|
|               12|            3|
|               13|            2|
|               14|            3|
|               15|            3|
|               16|           28|
|               17|            7|
|               18|           33|
|               19|            9|
|               20|           18|
|               21|           14|
|               22|            3|
|               23|            1|
|               24|            6|
+-----------------+-------------+
only showing top 20 rows



> Activity: improve the previous query

#### 4. Count products by size

In [0]:
df.groupBy("Size").count().orderBy("count", ascending=False).show()


+----+-----+
|Size|count|
+----+-----+
|null|    4|
|   M|    2|
|  58|    2|
|   L|    1|
|   S|    1|
+----+-----+



In [0]:
from pyspark.sql import functions as F

# Group by "Size", count the rows, and rename the count column
df_grouped = (
    df.groupBy("Size")
      .count()
      .withColumnRenamed("count", "SizeCount")
)

# Sort in descending order by "SizeCount"
df_grouped.orderBy(F.col("SizeCount").desc()).show()

+----+---------+
|Size|SizeCount|
+----+---------+
|null|        4|
|   M|        2|
|  58|        2|
|   L|        1|
|   S|        1|
+----+---------+



> Improve or fix the previous query

#### 5. Compute the average product price

In [0]:
df.selectExpr("AVG(ListPrice) AS AveragePrice").show()


+------------+
|AveragePrice|
+------------+
|309.59400000|
+------------+



> What if we compute the average price for each product? Improve the query

In [0]:
from pyspark.sql import functions as F

# Group by ProductID and Name and compute the average ListPrice
df_avg_price_per_product = df.groupBy("ProductID", "Name").agg(
    F.avg("ListPrice").alias("AveragePrice")
)

# Show the result
df_avg_price_per_product.show()

+---------+--------------------+-------------+
|ProductID|                Name| AveragePrice|
+---------+--------------------+-------------+
|      712|        AWC Logo Cap|   8.99000000|
|      706|HL Road Frame - R...|1431.50000000|
|      713|Long-Sleeve Logo ...|  49.99000000|
|      709|Mountain Bike Soc...|   9.50000000|
|      710|Mountain Bike Soc...|   9.50000000|
|      708|Sport-100 Helmet,...|  34.99000000|
|      680|HL Road Frame - B...|1431.50000000|
|      711|Sport-100 Helmet,...|  34.99000000|
|      707|Sport-100 Helmet,...|  34.99000000|
|      714|Long-Sleeve Logo ...|  49.99000000|
+---------+--------------------+-------------+



#### 6. Find products with no category assigned

In [0]:
df.filter(df.ProductCategoryID.isNull()).select("ProductID", "Name", "ProductCategoryID").show()


+---------+----+-----------------+
|ProductID|Name|ProductCategoryID|
+---------+----+-----------------+
+---------+----+-----------------+



> Is the previous query correct? Confirm it with another query

In [0]:
# Filter directly and show the result
df.select("ProductID", "Name", "ProductCategoryID") \
  .where(F.col("ProductCategoryID").isNull()) \
  .show()

+---------+----+-----------------+
|ProductID|Name|ProductCategoryID|
+---------+----+-----------------+
+---------+----+-----------------+



#### 7. Count products by color

In [0]:
df.groupBy("Color").count().orderBy("count", ascending=False).show()


+-----+-----+
|Color|count|
+-----+-----+
|Multi|    3|
|White|    2|
|Black|    2|
|  Red|    2|
| Blue|    1|
+-----+-----+



#### 8. Compute the total cost of all products

In [0]:
df.selectExpr("SUM(StandardCost) AS TotalCost").show()


+---------+
|TotalCost|
+---------+
|2248.5784|
+---------+



> Improve the previous query

In [0]:
# Compute the total cost of all products
df.select(F.sum("StandardCost").alias("TotalCost")).show()

+---------+
|TotalCost|
+---------+
|2248.5784|
+---------+



#### 9. Products whose name contains a specific word

In [0]:
# Keep the products whose name contains the word 'Helmet'
df.filter(df.Name.contains("Helmet")).select("ProductID", "Name", "ListPrice").show()


+---------+--------------------+---------+
|ProductID|                Name|ListPrice|
+---------+--------------------+---------+
|      707|Sport-100 Helmet,...|  34.9900|
|      708|Sport-100 Helmet,...|  34.9900|
|      711|Sport-100 Helmet,...|  34.9900|
+---------+--------------------+---------+



> List the names of all products

In [0]:
df.select("ProductID", "Name").show(truncate=False)

+---------+----------------------+
|ProductID|Name                  |
+---------+----------------------+
|879      |All-Purpose Bike Stand|
|712      |AWC Logo Cap          |
|877      |Bike Wash - Dissolver |
|843      |Cable Lock            |
|952      |Chain                 |
|866      |Classic Vest, L       |
|865      |Classic Vest, M       |
|864      |Classic Vest, S       |
|878      |Fender Set - Mountain |
|948      |Front Brakes          |
+---------+----------------------+



#### 10. List products created after 2005

In [0]:
# This comparison includes 2005 itself; use "2006-01-01" to keep only later years
df.filter(df.SellStartDate >= "2005-01-01").select("ProductID", "Name", "SellStartDate").show()


+---------+--------------------+-------------------+
|ProductID|                Name|      SellStartDate|
+---------+--------------------+-------------------+
|      707|Sport-100 Helmet,...|2005-07-01 00:00:00|
|      708|Sport-100 Helmet,...|2005-07-01 00:00:00|
|      709|Mountain Bike Soc...|2005-07-01 00:00:00|
|      710|Mountain Bike Soc...|2005-07-01 00:00:00|
|      711|Sport-100 Helmet,...|2005-07-01 00:00:00|
|      712|        AWC Logo Cap|2005-07-01 00:00:00|
|      713|Long-Sleeve Logo ...|2005-07-01 00:00:00|
|      714|Long-Sleeve Logo ...|2005-07-01 00:00:00|
+---------+--------------------+-------------------+



#### 11. Highest price per category

In [0]:
from pyspark.sql.functions import col, max as spark_max

df.groupBy("ProductCategoryID").agg(spark_max("ListPrice").alias("MaxPrice")).orderBy("MaxPrice", ascending=False).show()


+-----------------+---------+
|ProductCategoryID| MaxPrice|
+-----------------+---------+
|               18|1431.5000|
|               25|  49.9900|
|               35|  34.9900|
|               27|   9.5000|
|               23|   8.9900|
+-----------------+---------+



#### 12. Compute the average price per category

In [0]:
from pyspark.sql.functions import avg

df.groupBy("ProductCategoryID").agg(avg("ListPrice").alias("AveragePrice")).orderBy("AveragePrice", ascending=False).show()


+-----------------+-------------+
|ProductCategoryID| AveragePrice|
+-----------------+-------------+
|               18|1431.50000000|
|               25|  49.99000000|
|               35|  34.99000000|
|               27|   9.50000000|
|               23|   8.99000000|
+-----------------+-------------+



> Improve the previous query

In [0]:
from pyspark.sql import functions as F

# Group by "ProductCategoryID" and compute the average price with a clearer alias
df.groupBy("ProductCategoryID") \
  .agg(F.avg("ListPrice").alias("AveragePrice")) \
  .orderBy(F.desc("AveragePrice")) \
  .show(truncate=False)

+-----------------+-------------+
|ProductCategoryID|AveragePrice |
+-----------------+-------------+
|18               |1431.50000000|
|25               |49.99000000  |
|35               |34.99000000  |
|27               |9.50000000   |
|23               |8.99000000   |
+-----------------+-------------+



#### 13. Find discontinued products (DiscontinuedDate is not NULL)

In [0]:
df.filter(df.DiscontinuedDate.isNotNull()).select("ProductID", "Name", "DiscontinuedDate").show()


+---------+----+----------------+
|ProductID|Name|DiscontinuedDate|
+---------+----+----------------+
+---------+----+----------------+



> Check with another query that the previous output is correct

In [0]:
df.where(df.DiscontinuedDate.isNotNull()) \
  .select("ProductID", "Name", "DiscontinuedDate") \
  .show()

+---------+----+----------------+
|ProductID|Name|DiscontinuedDate|
+---------+----+----------------+
+---------+----+----------------+



#### 14. Products priced above their standard cost

In [0]:
df.filter(df.ListPrice > df.StandardCost).select("ProductID", "Name", "ListPrice", "StandardCost").show()

+---------+--------------------+---------+------------+
|ProductID|                Name|ListPrice|StandardCost|
+---------+--------------------+---------+------------+
|      680|HL Road Frame - B...|1431.5000|   1059.3100|
|      706|HL Road Frame - R...|1431.5000|   1059.3100|
|      707|Sport-100 Helmet,...|  34.9900|     13.0863|
|      708|Sport-100 Helmet,...|  34.9900|     13.0863|
|      709|Mountain Bike Soc...|   9.5000|      3.3963|
|      710|Mountain Bike Soc...|   9.5000|      3.3963|
|      711|Sport-100 Helmet,...|  34.9900|     13.0863|
|      712|        AWC Logo Cap|   8.9900|      6.9223|
|      713|Long-Sleeve Logo ...|  49.9900|     38.4923|
|      714|Long-Sleeve Logo ...|  49.9900|     38.4923|
+---------+--------------------+---------+------------+



### Activity 1: Repeat the previous queries using SQL (not PySpark)

#### Register the tables you will use as temporary views, for example:

In [0]:
query = "(SELECT * FROM SalesLT.Product) AS temp"
df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
df.createOrReplaceTempView("Product")


In [0]:
%sql
SELECT ProductID, Name, ListPrice
FROM Product
ORDER BY ListPrice DESC
LIMIT 5;


ProductID,Name,ListPrice
749,"Road-150 Red, 62",3578.27
753,"Road-150 Red, 56",3578.27
750,"Road-150 Red, 44",3578.27
751,"Road-150 Red, 48",3578.27
752,"Road-150 Red, 52",3578.27


#### 1. List all available tables

In [0]:
%sql
SHOW TABLES;

database,tableName,isTemporary
,product,True


In [0]:
%sql
SELECT * FROM Product;

ProductID,Name,ProductNumber,Color,StandardCost,ListPrice,Size,Weight,ProductCategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,ThumbNailPhoto,ThumbnailPhotoFileName,rowguid,ModifiedDate
680,"HL Road Frame - Black, 58",FR-R92B-58,Black,1059.31,1431.5,58,1016.04,18,6,2002-06-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,43DD68D6-14A4-461F-9069-55309D90EA7E,2008-03-11T10:01:36.827+0000
706,"HL Road Frame - Red, 58",FR-R92R-58,Red,1059.31,1431.5,58,1016.04,18,6,2002-06-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,9540FF17-2712-4C90-A3D1-8CE5568B2462,2008-03-11T10:01:36.827+0000
707,"Sport-100 Helmet, Red",HL-U509-R,Red,13.0863,34.99,,,35,33,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,2E1EF41A-C08A-4FF6-8ADA-BDE58B64A712,2008-03-11T10:01:36.827+0000
708,"Sport-100 Helmet, Black",HL-U509,Black,13.0863,34.99,,,35,33,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,A25A44FB-C2DE-4268-958F-110B8D7621E2,2008-03-11T10:01:36.827+0000
709,"Mountain Bike Socks, M",SO-B909-M,White,3.3963,9.5,M,,27,18,2005-07-01T00:00:00.000+0000,2006-06-30T00:00:00.000+0000,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,18F95F47-1540-4E02-8F1F-CC1BCB6828D0,2008-03-11T10:01:36.827+0000
710,"Mountain Bike Socks, L",SO-B909-L,White,3.3963,9.5,L,,27,18,2005-07-01T00:00:00.000+0000,2006-06-30T00:00:00.000+0000,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,161C035E-21B3-4E14-8E44-AF508F35D80A,2008-03-11T10:01:36.827+0000
711,"Sport-100 Helmet, Blue",HL-U509-B,Blue,13.0863,34.99,,,35,33,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,FD7C0858-4179-48C2-865B-ABD5DFC7BC1D,2008-03-11T10:01:36.827+0000
712,AWC Logo Cap,CA-1098,Multi,6.9223,8.99,,,23,2,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,B9EDE243-A6F4-4629-B1D4-FFE1AEDC6DE7,2008-03-11T10:01:36.827+0000
713,"Long-Sleeve Logo Jersey, S",LJ-0192-S,Multi,38.4923,49.99,S,,25,11,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAyAPcAABQlZ/z8/J+fnyxLZ87OzuTk5Dtliqqqqiw0WMrKyvDw8EdXpRAYLsbGxubm5urq6qSkpO7u7uLi4mlmZa+vrxAhNiJai4iVqtLQ5rGxsY2v0zMzNfr6+i5zsiVLcmNwmLK8yHZ1esLCwhhHnfb29lB1rDI= (truncated),awc_jersey_male_small.gif,FD449C82-A259-4FAE-8584-6CA0255FAF68,2008-03-11T10:01:36.827+0000
714,"Long-Sleeve Logo Jersey, M",LJ-0192-M,Multi,38.4923,49.99,M,,25,11,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAyAPcAABQlZ/z8/J+fnyxLZ87OzuTk5Dtliqqqqiw0WMrKyvDw8EdXpRAYLsbGxubm5urq6qSkpO7u7uLi4mlmZa+vrxAhNiJai4iVqtLQ5rGxsY2v0zMzNfr6+i5zsiVLcmNwmLK8yHZ1esLCwhhHnfb29lB1rDI= (truncated),awc_jersey_male_small.gif,6A290063-A0CF-432A-8110-2EA0FDA14308,2008-03-11T10:01:36.827+0000


#### 2. Products priced above $50

In [0]:
%sql
SELECT ProductID, Name, ListPrice
FROM Product
WHERE ListPrice > 50;

ProductID,Name,ListPrice
680,"HL Road Frame - Black, 58",1431.5
706,"HL Road Frame - Red, 58",1431.5
717,"HL Road Frame - Red, 62",1431.5
718,"HL Road Frame - Red, 44",1431.5
719,"HL Road Frame - Red, 48",1431.5
720,"HL Road Frame - Red, 52",1431.5
721,"HL Road Frame - Red, 56",1431.5
722,"LL Road Frame - Black, 58",337.22
723,"LL Road Frame - Black, 60",337.22
724,"LL Road Frame - Black, 62",337.22


#### 3. Count products by category

In [0]:
%sql
SELECT ProductCategoryId, COUNT(*) AS ProductCount
FROM Product
GROUP BY ProductCategoryId;

ProductCategoryId,ProductCount
31,1
34,1
28,3
27,4
26,7
12,3
22,3
13,2
16,28
6,43


#### 4. Count products by size

In [0]:
%sql
SELECT Size, COUNT(*) AS ProductCount
FROM Product
GROUP BY Size;

Size,ProductCount
54,9
XL,3
42,15
52,16
,84
70,1
46,11
M,11
L,11
60,11


#### 5. Compute the average product price

In [0]:
%sql
SELECT AVG(ListPrice) AS AveragePrice
FROM Product;


AveragePrice
744.59522034


#### 6. Find products with no category assigned

In [0]:
%sql
SELECT *
FROM Product
WHERE ProductCategoryId IS NULL;

ProductID,Name,ProductNumber,Color,StandardCost,ListPrice,Size,Weight,ProductCategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,ThumbNailPhoto,ThumbnailPhotoFileName,rowguid,ModifiedDate


#### 7. Count products by color

In [0]:
%sql
SELECT Color, COUNT(*) AS ProductCount
FROM Product
GROUP BY Color;

Color,ProductCount
,50
Multi,8
Silver,36
Blue,26
White,4
Black,89
Yellow,36
Red,38
Grey,1
Silver/Black,7


#### 8. Compute the total cost of all products

In [0]:
%sql
-- This sums ListPrice (the sale price); SUM(StandardCost) gives the total cost, as in the PySpark version
SELECT SUM(ListPrice) AS TotalCost
FROM Product;

TotalCost
219655.59


#### 9. Products whose name contains a specific word

In [0]:
%sql
-- LIKE is case-sensitive in Spark SQL, so compare against the lowercased name
SELECT *
FROM Product
WHERE lower(Name) LIKE '%helmet%';

ProductID,Name,ProductNumber,Color,StandardCost,ListPrice,Size,Weight,ProductCategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,ThumbNailPhoto,ThumbnailPhotoFileName,rowguid,ModifiedDate
708,"Sport-100 Helmet, Black",HL-U509,Black,13.0863,34.99,,,35,33,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,A25A44FB-C2DE-4268-958F-110B8D7621E2,2008-03-11T10:01:36.827+0000
711,"Sport-100 Helmet, Blue",HL-U509-B,Blue,13.0863,34.99,,,35,33,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,FD7C0858-4179-48C2-865B-ABD5DFC7BC1D,2008-03-11T10:01:36.827+0000
707,"Sport-100 Helmet, Red",HL-U509-R,Red,13.0863,34.99,,,35,33,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,2E1EF41A-C08A-4FF6-8ADA-BDE58B64A712,2008-03-11T10:01:36.827+0000


> List the names of all products

In [0]:
%sql
SELECT Name
FROM Product;

Name
All-Purpose Bike Stand
AWC Logo Cap
Bike Wash - Dissolver
Cable Lock
Chain
"Classic Vest, L"
"Classic Vest, M"
"Classic Vest, S"
Fender Set - Mountain
Front Brakes


#### 10. List products created after 2005

In [0]:
%sql
SELECT ProductID, Name, SellStartDate
FROM Product
WHERE YEAR(SellStartDate) > 2005;

ProductID,Name,SellStartDate
779,"Mountain-200 Silver, 38",2006-07-01T00:00:00.000+0000
780,"Mountain-200 Silver, 42",2006-07-01T00:00:00.000+0000
781,"Mountain-200 Silver, 46",2006-07-01T00:00:00.000+0000
782,"Mountain-200 Black, 38",2006-07-01T00:00:00.000+0000
783,"Mountain-200 Black, 42",2006-07-01T00:00:00.000+0000
784,"Mountain-200 Black, 46",2006-07-01T00:00:00.000+0000
785,"Mountain-300 Black, 38",2006-07-01T00:00:00.000+0000
786,"Mountain-300 Black, 40",2006-07-01T00:00:00.000+0000
787,"Mountain-300 Black, 44",2006-07-01T00:00:00.000+0000
788,"Mountain-300 Black, 48",2006-07-01T00:00:00.000+0000


#### 11. Highest price per category

In [0]:
%sql
SELECT ProductCategoryId, MAX(ListPrice) AS MaxPrice
FROM Product
GROUP BY ProductCategoryId
ORDER BY ProductCategoryId ASC;

ProductCategoryId,MaxPrice
5,3399.99
6,3578.27
7,2384.07
8,120.27
9,121.49
10,106.5
11,20.24
12,404.99
13,121.46
14,229.49


#### 12. Compute the average price per category

In [0]:
%sql
SELECT ProductCategoryId, AVG(ListPrice) AS AvgPrice
FROM Product
GROUP BY ProductCategoryId;

ProductCategoryId,AvgPrice
31,159.0
34,21.98
28,74.99
27,9.245
26,64.27571429
12,278.99
22,89.99
13,106.475
16,678.25357143
6,1597.45


#### 13. Find discontinued products (DiscontinuedDate is not NULL)

In [0]:
%sql
SELECT ProductID, Name, DiscontinuedDate
FROM Product
WHERE DiscontinuedDate IS NOT NULL;

ProductID,Name,DiscontinuedDate


#### 14. Products priced above their standard cost

In [0]:
%sql
SELECT ProductID, Name, ListPrice, StandardCost
FROM Product
WHERE ListPrice > StandardCost;

ProductID,Name,ListPrice,StandardCost
680,"HL Road Frame - Black, 58",1431.5,1059.31
706,"HL Road Frame - Red, 58",1431.5,1059.31
707,"Sport-100 Helmet, Red",34.99,13.0863
708,"Sport-100 Helmet, Black",34.99,13.0863
709,"Mountain Bike Socks, M",9.5,3.3963
710,"Mountain Bike Socks, L",9.5,3.3963
711,"Sport-100 Helmet, Blue",34.99,13.0863
712,AWC Logo Cap,8.99,6.9223
713,"Long-Sleeve Logo Jersey, S",49.99,38.4923
714,"Long-Sleeve Logo Jersey, M",49.99,38.4923


### Activity 2. Using PySpark, answer the following questions:

#### 1. Write code to count how many products have a ListPrice above the average price of all products.

In [0]:
from pyspark.sql.functions import avg

# Compute the average price of all products
average_price = df.agg(avg("ListPrice")).collect()[0][0]

# Keep the products whose ListPrice is above the average
count_above_average = df.filter(df.ListPrice > average_price).count()

print(count_above_average)


102


#### 2. Filter all products whose name starts with the letter "A" and show their ProductID, Name, and ListPrice.

In [0]:
filtered_products = df.filter(df.Name.startswith("A")).select("ProductID", "Name", "ListPrice")
filtered_products.show()


+---------+--------------------+---------+
|ProductID|                Name|ListPrice|
+---------+--------------------+---------+
|      879|All-Purpose Bike ...| 159.0000|
|      712|        AWC Logo Cap|   8.9900|
+---------+--------------------+---------+



#### 3. Compute the standard deviation of the StandardCost column.

In [0]:
from pyspark.sql.functions import stddev

# Compute the standard deviation of StandardCost
stddev_standard_cost = df.select(stddev("StandardCost")).collect()[0][0]
print(stddev_standard_cost)


534.8956068198088


#### 4. Sort the products by ListPrice in ascending order and show the 10 cheapest products.

In [0]:
cheapest_products = df.orderBy("ListPrice").limit(10)
cheapest_products.show()


+---------+--------------------+-------------+-----+------------+---------+----+------+-----------------+--------------+-------------------+-----------+----------------+--------------------+----------------------+--------------------+--------------------+
|ProductID|                Name|ProductNumber|Color|StandardCost|ListPrice|Size|Weight|ProductCategoryID|ProductModelID|      SellStartDate|SellEndDate|DiscontinuedDate|      ThumbNailPhoto|ThumbnailPhotoFileName|             rowguid|        ModifiedDate|
+---------+--------------------+-------------+-----+------------+---------+----+------+-----------------+--------------+-------------------+-----------+----------------+--------------------+----------------------+--------------------+--------------------+
|      873| Patch Kit/8 Patches|      PK-7098| null|      0.8565|   2.2900|null|  null|               41|           114|2007-07-01 00:00:00|       null|            null|[47 49 46 38 39 6...|  tirepatch_kit_sma...|36E638E4-68DF-411..

#### 5. Filter the products whose size is "M" and whose price is greater than $50.

In [0]:
filtered_products_M_50 = df.filter((df.Size == "M") & (df.ListPrice > 50))
filtered_products_M_50.display()


ProductID,Name,ProductNumber,Color,StandardCost,ListPrice,Size,Weight,ProductCategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,ThumbNailPhoto,ThumbnailPhotoFileName,rowguid,ModifiedDate
849,"Men's Sports Shorts, M",SH-M897-M,Black,24.7459,59.99,M,,26,13,2006-07-01T00:00:00.000+0000,2007-06-30T00:00:00.000+0000,,R0lGODlhUAAyAPcAAJ+fn3t6XMLBsra0nbm5udnZ0u/u6EZISa2rk6+vr6qqql9eEPj49kVDG+Li4ltZPWRjQ+zs7OLh3mtqK3JyLH9+RPr6+ebm5nt6Verp4vb28oWEVHNxVmZlE2VkLeXl3rS0tKGehVpZDjw6Eqakiujo6Po= (truncated),shorts_male_small.gif,DB37B435-74B9-43D3-B363-ABBEAD107BC4,2008-03-11T10:01:36.827+0000
853,"Women's Tights, M",TG-W091-M,Black,30.9334,74.99,M,,28,38,2006-07-01T00:00:00.000+0000,2007-06-30T00:00:00.000+0000,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,4D8E186C-B8C9-4C64-B411-4995DD87E316,2008-03-11T10:01:36.827+0000
856,"Men's Bib-Shorts, M",SB-M891-M,Multi,37.1209,89.99,M,,22,12,2006-07-01T00:00:00.000+0000,2007-06-30T00:00:00.000+0000,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,E0CBEC04-F4FC-450F-9780-F8EA7691FEBD,2008-03-11T10:01:36.827+0000
865,"Classic Vest, M",VE-C304-M,Blue,23.749,63.5,M,,29,1,2007-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,2E52F96E-64A1-4069-911C-E3FD6E094A1E,2008-03-11T10:01:36.827+0000
868,"Women's Mountain Shorts, M",SH-W890-M,Black,26.1763,69.99,M,,26,37,2007-07-01T00:00:00.000+0000,,,R0lGODlhUAAyAPcAALOzs62IVo00CqWkjn57UEZFHUlFHczLwnV1RWZkRzw6EYxyQldVNXJxQoKBUXI/FFlXM/7+/v39/b6+vrm5uaSkpKqqqq+vr87Ozt3d3dfX18rKyt/f38bGxuTk5Obm5tHR0ejo6NTU1MLCwtra2rS0tJ8= (truncated),shorts_female_small.gif,968E3610-E583-42E8-8AB6-484A799B1774,2008-03-11T10:01:36.827+0000
882,"Short-Sleeve Classic Jersey, M",SJ-0194-M,Yellow,41.5723,53.99,M,,25,32,2007-07-01T00:00:00.000+0000,,,R0lGODlhTgAxAPcAALOyV8PDYYeHh/LwB2dlWevuTHl4SoyGQmloOLS3mLizHE5LOXBzb7S0tCZFbsnK4ZqVQjY/YH57UaSkpLGxsWpmLZKSkjMyMuPj4+zs7N7e3oF/SZOMRL29vVZSNTNFjzE3UpeXqnx7JdjYCeDg7H5/o0I= (truncated),awc_tee_male_yellow_small.gif,BBBF003B-367D-4D71-AF71-10F50B6234A0,2008-03-11T10:01:36.827+0000


#### 6. Write code to count the products where the Color column is null.

In [0]:
from pyspark.sql.functions import col

null_color_count = df.filter(col("Color").isNull()).count()
print(null_color_count)


50


#### 7. Write code to list all unique combinations of Color and Size in the table.

In [0]:
unique_combinations = df.select("Color", "Size").distinct()
unique_combinations.show()


+-----+----+
|Color|Size|
+-----+----+
|  Red|  58|
|  Red|  44|
|Multi|  XL|
|  Red|  62|
|  Red|  52|
|Multi|null|
|Black|  44|
|Multi|   L|
|Black|  62|
|Black|  60|
|White|   L|
|Multi|   M|
|Multi|   S|
|  Red|  56|
|  Red|  60|
| Blue|null|
|Black|  58|
|White|   M|
|Black|null|
|  Red|null|
+-----+----+
only showing top 20 rows



#### 8. Compute the average difference between ListPrice and StandardCost across all products.

In [0]:
from pyspark.sql.functions import avg, col

average_difference = df.select((col("ListPrice") - col("StandardCost")).alias("PriceDiff")).agg(avg("PriceDiff")).collect()[0][0]
print(average_difference)


306.37487288


#### 9. Which products have a modification date between March 11, 2008 at 10:01:00 and March 11, 2008 at 10:03:00? Show the ProductID, Name, and ModifiedDate.

In [0]:
from pyspark.sql.functions import to_timestamp, lit

# Define the start and end timestamps as strings
start_date = "2008-03-11 10:01:00"
end_date = "2008-03-11 10:03:00"

# Keep the products whose modification date falls within the range (inclusive)
filtered_products_date = df.filter(
    (df.ModifiedDate >= to_timestamp(lit(start_date), "yyyy-MM-dd HH:mm:ss")) &
    (df.ModifiedDate <= to_timestamp(lit(end_date), "yyyy-MM-dd HH:mm:ss"))
).select("ProductID", "Name", "ModifiedDate")

# Show the results
filtered_products_date.show()


+---------+--------------------+--------------------+
|ProductID|                Name|        ModifiedDate|
+---------+--------------------+--------------------+
|      680|HL Road Frame - B...|2008-03-11 10:01:...|
|      706|HL Road Frame - R...|2008-03-11 10:01:...|
|      707|Sport-100 Helmet,...|2008-03-11 10:01:...|
|      708|Sport-100 Helmet,...|2008-03-11 10:01:...|
|      709|Mountain Bike Soc...|2008-03-11 10:01:...|
|      710|Mountain Bike Soc...|2008-03-11 10:01:...|
|      711|Sport-100 Helmet,...|2008-03-11 10:01:...|
|      712|        AWC Logo Cap|2008-03-11 10:01:...|
|      713|Long-Sleeve Logo ...|2008-03-11 10:01:...|
|      714|Long-Sleeve Logo ...|2008-03-11 10:01:...|
|      715|Long-Sleeve Logo ...|2008-03-11 10:01:...|
|      716|Long-Sleeve Logo ...|2008-03-11 10:01:...|
|      717|HL Road Frame - R...|2008-03-11 10:01:...|
|      718|HL Road Frame - R...|2008-03-11 10:01:...|
|      719|HL Road Frame - R...|2008-03-11 10:01:...|
|      720|HL Road Frame - R

#### 10. Which products in each category (ProductCategoryID) have a price (ListPrice) greater than the standard cost (StandardCost), and how many products meet this condition per category?

In [0]:
from pyspark.sql.functions import count

category_products = df.filter(df.ListPrice > df.StandardCost).groupBy("ProductCategoryID").agg(count("*").alias("Count"))
category_products.show()


+-----------------+-----+
|ProductCategoryID|Count|
+-----------------+-----+
|               31|    1|
|               34|    1|
|               28|    3|
|               27|    4|
|               26|    7|
|               12|    3|
|               22|    3|
|               13|    2|
|               16|   28|
|                6|   43|
|               40|    2|
|               20|   18|
|                5|   32|
|               19|    9|
|               41|   11|
|               15|    3|
|               37|    3|
|               17|    7|
|                9|    3|
|               35|    3|
+-----------------+-----+
only showing top 20 rows



#### 11. Filter the products whose ListPrice is between $20 and $100.

In [0]:
filtered_price_range = df.filter((df.ListPrice >= 20) & (df.ListPrice <= 100))
filtered_price_range.display()


ProductID,Name,ProductNumber,Color,StandardCost,ListPrice,Size,Weight,ProductCategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,ThumbNailPhoto,ThumbnailPhotoFileName,rowguid,ModifiedDate
707,"Sport-100 Helmet, Red",HL-U509-R,Red,13.0863,34.99,,,35,33,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,2E1EF41A-C08A-4FF6-8ADA-BDE58B64A712,2008-03-11T10:01:36.827+0000
708,"Sport-100 Helmet, Black",HL-U509,Black,13.0863,34.99,,,35,33,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,A25A44FB-C2DE-4268-958F-110B8D7621E2,2008-03-11T10:01:36.827+0000
711,"Sport-100 Helmet, Blue",HL-U509-B,Blue,13.0863,34.99,,,35,33,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,FD7C0858-4179-48C2-865B-ABD5DFC7BC1D,2008-03-11T10:01:36.827+0000
713,"Long-Sleeve Logo Jersey, S",LJ-0192-S,Multi,38.4923,49.99,S,,25,11,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAyAPcAABQlZ/z8/J+fnyxLZ87OzuTk5Dtliqqqqiw0WMrKyvDw8EdXpRAYLsbGxubm5urq6qSkpO7u7uLi4mlmZa+vrxAhNiJai4iVqtLQ5rGxsY2v0zMzNfr6+i5zsiVLcmNwmLK8yHZ1esLCwhhHnfb29lB1rDI= (truncated),awc_jersey_male_small.gif,FD449C82-A259-4FAE-8584-6CA0255FAF68,2008-03-11T10:01:36.827+0000
714,"Long-Sleeve Logo Jersey, M",LJ-0192-M,Multi,38.4923,49.99,M,,25,11,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAyAPcAABQlZ/z8/J+fnyxLZ87OzuTk5Dtliqqqqiw0WMrKyvDw8EdXpRAYLsbGxubm5urq6qSkpO7u7uLi4mlmZa+vrxAhNiJai4iVqtLQ5rGxsY2v0zMzNfr6+i5zsiVLcmNwmLK8yHZ1esLCwhhHnfb29lB1rDI= (truncated),awc_jersey_male_small.gif,6A290063-A0CF-432A-8110-2EA0FDA14308,2008-03-11T10:01:36.827+0000
715,"Long-Sleeve Logo Jersey, L",LJ-0192-L,Multi,38.4923,49.99,L,,25,11,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAyAPcAABQlZ/z8/J+fnyxLZ87OzuTk5Dtliqqqqiw0WMrKyvDw8EdXpRAYLsbGxubm5urq6qSkpO7u7uLi4mlmZa+vrxAhNiJai4iVqtLQ5rGxsY2v0zMzNfr6+i5zsiVLcmNwmLK8yHZ1esLCwhhHnfb29lB1rDI= (truncated),awc_jersey_male_small.gif,34CF5EF5-C077-4EA0-914A-084814D5CBD5,2008-03-11T10:01:36.827+0000
716,"Long-Sleeve Logo Jersey, XL",LJ-0192-X,Multi,38.4923,49.99,XL,,25,11,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAyAPcAABQlZ/z8/J+fnyxLZ87OzuTk5Dtliqqqqiw0WMrKyvDw8EdXpRAYLsbGxubm5urq6qSkpO7u7uLi4mlmZa+vrxAhNiJai4iVqtLQ5rGxsY2v0zMzNfr6+i5zsiVLcmNwmLK8yHZ1esLCwhhHnfb29lB1rDI= (truncated),awc_jersey_male_small.gif,6EC47EC9-C041-4DDA-B686-2125D539CE9B,2008-03-11T10:01:36.827+0000
805,LL Headset,HS-0296,,15.1848,34.2,,,15,59,2006-07-01T00:00:00.000+0000,2007-06-30T00:00:00.000+0000,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,BB6BD7B3-A34D-4D64-822E-781FA6838E19,2008-03-11T10:01:36.827+0000
808,LL Mountain Handlebars,HB-M243,,19.7758,44.54,,,8,52,2006-07-01T00:00:00.000+0000,,,R0lGODlhRQAyAPcAAOLi4//ulPyPMcg2OPBKK/3dyvuWjv/++fhuNoqGifr6+tWSkNPS1Os1Lry8vdxpaOsBAMwAAPz8/OoWAfb19P2rjKGiofJxVbcBAf/3pfvGmdLHyJqTlNUAAPu3pdkAAJmamd3c3qZTVOX29fdaBf/+6/8= (truncated),handlebar_small.gif,B59B7BF2-7AFC-4A74-B063-F942F1E0DA19,2008-03-11T10:01:36.827+0000
809,ML Mountain Handlebars,HB-M763,,27.4925,61.92,,,8,54,2006-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,AE6020DF-D9C9-4D34-9795-1F80E6BBF5A5,2008-03-11T10:01:36.827+0000


#### 12. For each ProductCategoryID, show the 5 products with the highest cost (StandardCost).

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# Window partitioned by ProductCategoryID and ordered by StandardCost, descending
windowSpec = Window.partitionBy("ProductCategoryID").orderBy(df.StandardCost.desc())

# Keep the 5 highest-cost products per category
top_5_expensive = df.withColumn("row_num", row_number().over(windowSpec))\
                    .filter(col("row_num") <= 5)\
                    .select("ProductCategoryID", "ProductID", "StandardCost")

top_5_expensive.show()


+-----------------+---------+------------+
|ProductCategoryID|ProductID|StandardCost|
+-----------------+---------+------------+
|                5|      771|   1912.1544|
|                5|      772|   1912.1544|
|                5|      773|   1912.1544|
|                5|      774|   1912.1544|
|                5|      775|   1898.0944|
|                6|      749|   2171.2942|
|                6|      750|   2171.2942|
|                6|      751|   2171.2942|
|                6|      752|   2171.2942|
|                6|      753|   2171.2942|
|                7|      954|   1481.9379|
|                7|      955|   1481.9379|
|                7|      956|   1481.9379|
|                7|      957|   1481.9379|
|                7|      966|   1481.9379|
|                8|      810|     53.3999|
|                8|      813|     53.3999|
|                8|      947|     40.6571|
|                8|      809|     27.4925|
|                8|      812|     27.4925|
+----------
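
`row_number()` assigns consecutive positions even when several rows share the same StandardCost, so which of the tied products land in the top 5 is arbitrary (category 6 above shows five products at 2171.2942). `rank()` and `dense_rank()` give tied rows the same position instead. The following plain-Python sketch, on hypothetical costs, mirrors the three numbering schemes; `number_rows` is an illustrative helper, not a Spark function.

```python
def number_rows(costs):
    """Return (cost, row_number, rank, dense_rank) tuples for costs sorted descending."""
    ordered = sorted(costs, reverse=True)
    result = []
    rank = dense_rank = 0
    previous = None
    for position, cost in enumerate(ordered, start=1):
        if cost != previous:
            rank = position      # rank jumps to the current position after a tie
            dense_rank += 1      # dense_rank increases by exactly one
            previous = cost
        result.append((cost, position, rank, dense_rank))
    return result

# Hypothetical costs with a three-way tie at the top
for row in number_rows([50.0, 50.0, 50.0, 40.0, 30.0]):
    print(row)
```

Filtering on `rank() <= 5` instead of `row_number() <= 5` would keep every tied product, which may return more than five rows per category.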

#### 13. Filter the products whose ThumbNailPhoto column is not null and show their ProductID, Name, and ThumbNailPhotoFileName.

In [0]:
filtered_thumbnail = df.filter(df.ThumbNailPhoto.isNotNull()).select("ProductID", "Name", "ThumbNailPhotoFileName")
filtered_thumbnail.show()


+---------+--------------------+----------------------+
|ProductID|                Name|ThumbNailPhotoFileName|
+---------+--------------------+----------------------+
|      680|HL Road Frame - B...|  no_image_availabl...|
|      706|HL Road Frame - R...|  no_image_availabl...|
|      707|Sport-100 Helmet,...|  no_image_availabl...|
|      708|Sport-100 Helmet,...|  no_image_availabl...|
|      709|Mountain Bike Soc...|  no_image_availabl...|
|      710|Mountain Bike Soc...|  no_image_availabl...|
|      711|Sport-100 Helmet,...|  no_image_availabl...|
|      712|        AWC Logo Cap|  no_image_availabl...|
|      713|Long-Sleeve Logo ...|  awc_jersey_male_s...|
|      714|Long-Sleeve Logo ...|  awc_jersey_male_s...|
|      715|Long-Sleeve Logo ...|  awc_jersey_male_s...|
|      716|Long-Sleeve Logo ...|  awc_jersey_male_s...|
|      717|HL Road Frame - R...|  no_image_availabl...|
|      718|HL Road Frame - R...|  no_image_availabl...|
|      719|HL Road Frame - R...|  no_image_avail

#### 14. Group the products by ProductCategoryID and compute the total price (SUM(ListPrice)) per category.

In [0]:
from pyspark.sql import functions as F  # namespaced import avoids shadowing Python's built-in sum

category_price_total = df.groupBy("ProductCategoryID").agg(F.sum("ListPrice").alias("TotalPrice"))
category_price_total.show()


+-----------------+----------+
|ProductCategoryID|TotalPrice|
+-----------------+----------+
|               31|  159.0000|
|               34|   21.9800|
|               28|  224.9700|
|               27|   36.9800|
|               26|  449.9300|
|               12|  836.9700|
|               22|  269.9700|
|               13|  212.9500|
|               16|18991.1000|
|                6|68690.3500|
|               40|   44.9800|
|               20|11365.4800|
|                5|53867.6800|
|               19|  356.7000|
|               41|  214.3100|
|               15|  261.2200|
|               37|   93.9700|
|               17|  448.1300|
|                9|  276.7200|
|               35|  104.9700|
+-----------------+----------+
only showing top 20 rows



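#### 15. Filter the products whose sale start date (SellStartDate) falls between January 1, 2005 and December 31, 2006.

In [0]:
from pyspark.sql.functions import to_timestamp, lit

start_date = "2005-01-01"
end_exclusive = "2007-01-01"  # day after the last date in the range

# A date-only bound parses as midnight, so compare with "<" against the next day
# to keep every timestamp that falls on 2006-12-31
filtered_sell_date = df.filter(
    (df.SellStartDate >= to_timestamp(lit(start_date), "yyyy-MM-dd")) &
    (df.SellStartDate < to_timestamp(lit(end_exclusive), "yyyy-MM-dd"))
)

filtered_sell_date.display()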


ProductID,Name,ProductNumber,Color,StandardCost,ListPrice,Size,Weight,ProductCategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,ThumbNailPhoto,ThumbnailPhotoFileName,rowguid,ModifiedDate
707,"Sport-100 Helmet, Red",HL-U509-R,Red,13.0863,34.99,,,35,33,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,2E1EF41A-C08A-4FF6-8ADA-BDE58B64A712,2008-03-11T10:01:36.827+0000
708,"Sport-100 Helmet, Black",HL-U509,Black,13.0863,34.99,,,35,33,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,A25A44FB-C2DE-4268-958F-110B8D7621E2,2008-03-11T10:01:36.827+0000
709,"Mountain Bike Socks, M",SO-B909-M,White,3.3963,9.5,M,,27,18,2005-07-01T00:00:00.000+0000,2006-06-30T00:00:00.000+0000,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,18F95F47-1540-4E02-8F1F-CC1BCB6828D0,2008-03-11T10:01:36.827+0000
710,"Mountain Bike Socks, L",SO-B909-L,White,3.3963,9.5,L,,27,18,2005-07-01T00:00:00.000+0000,2006-06-30T00:00:00.000+0000,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,161C035E-21B3-4E14-8E44-AF508F35D80A,2008-03-11T10:01:36.827+0000
711,"Sport-100 Helmet, Blue",HL-U509-B,Blue,13.0863,34.99,,,35,33,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,FD7C0858-4179-48C2-865B-ABD5DFC7BC1D,2008-03-11T10:01:36.827+0000
712,AWC Logo Cap,CA-1098,Multi,6.9223,8.99,,,23,2,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAxAPcAAAAAAIAAAACAAICAAAAAgIAAgACAgICAgMDAwP8AAAD/AP//AAAA//8A/wD//////wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= (truncated),no_image_available_small.gif,B9EDE243-A6F4-4629-B1D4-FFE1AEDC6DE7,2008-03-11T10:01:36.827+0000
713,"Long-Sleeve Logo Jersey, S",LJ-0192-S,Multi,38.4923,49.99,S,,25,11,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAyAPcAABQlZ/z8/J+fnyxLZ87OzuTk5Dtliqqqqiw0WMrKyvDw8EdXpRAYLsbGxubm5urq6qSkpO7u7uLi4mlmZa+vrxAhNiJai4iVqtLQ5rGxsY2v0zMzNfr6+i5zsiVLcmNwmLK8yHZ1esLCwhhHnfb29lB1rDI= (truncated),awc_jersey_male_small.gif,FD449C82-A259-4FAE-8584-6CA0255FAF68,2008-03-11T10:01:36.827+0000
714,"Long-Sleeve Logo Jersey, M",LJ-0192-M,Multi,38.4923,49.99,M,,25,11,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAyAPcAABQlZ/z8/J+fnyxLZ87OzuTk5Dtliqqqqiw0WMrKyvDw8EdXpRAYLsbGxubm5urq6qSkpO7u7uLi4mlmZa+vrxAhNiJai4iVqtLQ5rGxsY2v0zMzNfr6+i5zsiVLcmNwmLK8yHZ1esLCwhhHnfb29lB1rDI= (truncated),awc_jersey_male_small.gif,6A290063-A0CF-432A-8110-2EA0FDA14308,2008-03-11T10:01:36.827+0000
715,"Long-Sleeve Logo Jersey, L",LJ-0192-L,Multi,38.4923,49.99,L,,25,11,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAyAPcAABQlZ/z8/J+fnyxLZ87OzuTk5Dtliqqqqiw0WMrKyvDw8EdXpRAYLsbGxubm5urq6qSkpO7u7uLi4mlmZa+vrxAhNiJai4iVqtLQ5rGxsY2v0zMzNfr6+i5zsiVLcmNwmLK8yHZ1esLCwhhHnfb29lB1rDI= (truncated),awc_jersey_male_small.gif,34CF5EF5-C077-4EA0-914A-084814D5CBD5,2008-03-11T10:01:36.827+0000
716,"Long-Sleeve Logo Jersey, XL",LJ-0192-X,Multi,38.4923,49.99,XL,,25,11,2005-07-01T00:00:00.000+0000,,,R0lGODlhUAAyAPcAABQlZ/z8/J+fnyxLZ87OzuTk5Dtliqqqqiw0WMrKyvDw8EdXpRAYLsbGxubm5urq6qSkpO7u7uLi4mlmZa+vrxAhNiJai4iVqtLQ5rGxsY2v0zMzNfr6+i5zsiVLcmNwmLK8yHZ1esLCwhhHnfb29lB1rDI= (truncated),awc_jersey_male_small.gif,6EC47EC9-C041-4DDA-B686-2125D539CE9B,2008-03-11T10:01:36.827+0000
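
One detail worth keeping in mind with date-only bounds: parsing `"2006-12-31"` as a timestamp yields midnight at the start of that day, so an inclusive `<=` comparison against it excludes any timestamp later on December 31. The SellStartDate values shown above all fall at midnight, so the rows listed are unaffected, but a half-open range is the more robust pattern. The plain-Python sketch below illustrates the effect with a hypothetical timestamp.

```python
from datetime import datetime

# Date-only bounds parse to midnight at the start of the day
start = datetime.strptime("2005-01-01", "%Y-%m-%d")
end_inclusive = datetime.strptime("2006-12-31", "%Y-%m-%d")
end_exclusive = datetime.strptime("2007-01-01", "%Y-%m-%d")

# Hypothetical sale start on the afternoon of the last day in the range
sell_start = datetime(2006, 12, 31, 15, 30)

in_closed_range = start <= sell_start <= end_inclusive     # False: 15:30 is after midnight
in_half_open_range = start <= sell_start < end_exclusive   # True: the whole day is covered

print(in_closed_range, in_half_open_range)
```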


### Activity 3.
The company decides to "migrate" from SQL Server to PostgreSQL. Connect Databricks Community to PostgreSQL on Azure, then run a few queries against PostgreSQL using PySpark and Scala. Any database may be used.  
If the Community edition causes too many problems, use Azure Databricks instead.

### Creating the PostgreSQL server on Azure

Create a resource group. Then go to Create a resource > Databases > Azure Database for PostgreSQL Flexible Server.

Configure the details:

- Server name: postgresql-server
- Region: Italy North or another available region
- High availability: Disabled
- Authentication: enable SQL Authentication and set:
  - User: adminuser
  - Password: ContraseñaFuerte123
- Check the "Enable access to Azure Services" box

Compute + Storage: -> Configure server
- Compute Tier: Burstable
- Compute size: Standard_B1ms
- Storage type: Premium SSD
- Storage size: 32GiB
- Performance Tier: P4 (120)

Install the [driver](https://jdbc.postgresql.org/download/) on the cluster/compute.

Check the public IP of the Databricks node:

In [0]:
import requests

# Get the node's public IP (the timeout keeps the cell from hanging if the service is unreachable)
public_ip = requests.get('https://api.ipify.org', timeout=10).text
print(f"La IP pública del nodo es: {public_ip}")

La IP pública del nodo es: 52.40.150.249


In [0]:
# JDBC connection settings for PostgreSQL
jdbcHostname = "postgre-sql-server.postgres.database.azure.com"  # PostgreSQL server
jdbcPort = 5432  # Default PostgreSQL port
jdbcDatabase = "postgresql-db"  # Exact name of your PostgreSQL database
jdbcUsername = "adminuser"  # Replace with your configured user
jdbcPassword = "ContrasenaFuerte123"  # Replace with your configured password

# URL format for PostgreSQL
jdbcUrl = f"jdbc:postgresql://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"

# Connection properties
connectionProperties = {
    "user": jdbcUsername,
    "password": jdbcPassword,
    "driver": "org.postgresql.Driver"  # JDBC driver for PostgreSQL
}
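
Azure Database for PostgreSQL Flexible Server typically requires encrypted connections by default, so it can help to make the SSL mode explicit in the JDBC URL. The sketch below is a hypothetical helper (`build_postgres_jdbc_url` is not part of Spark or of the PostgreSQL driver) that assembles the URL with the driver's `sslmode` parameter. The host and database names are the ones from the configuration cell, and `sslmode="require"` is an assumed setting.

```python
from urllib.parse import quote, urlencode

def build_postgres_jdbc_url(host, port, database, **params):
    """Assemble a PostgreSQL JDBC URL; extra keyword arguments become query parameters."""
    url = f"jdbc:postgresql://{host}:{port}/{quote(database)}"
    if params:
        url += "?" + urlencode(params)
    return url

# Host and database names from the configuration cell; sslmode is an assumed setting
jdbc_url_ssl = build_postgres_jdbc_url(
    "postgre-sql-server.postgres.database.azure.com",
    5432,
    "postgresql-db",
    sslmode="require",
)
print(jdbc_url_ssl)
```

The resulting string can be passed as the `url` argument of `spark.read.jdbc` in place of `jdbcUrl`.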



In [0]:
# Check that a table listing can be read

try:
    # Try to run the query
    query = "(SELECT table_name FROM information_schema.tables WHERE table_schema = 'public') AS tables"
    df_tables = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)

    # Show the tables
    df_tables.show()

except Exception as e:
    print("Error de conexión:", e)

Error de conexión: An error occurred while calling o620.jdbc.
: org.postgresql.util.PSQLException: The connection attempt failed.
	at org.postgresql.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:331)
	at org.postgresql.core.ConnectionFactory.openConnection(ConnectionFactory.java:49)
	at org.postgresql.jdbc.PgConnection.<init>(PgConnection.java:223)
	at org.postgresql.Driver.makeConnection(Driver.java:400)
	at org.postgresql.Driver.connect(Driver.java:259)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:123)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:119)
	at org.apache.spark.sql.execution.datasou
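
A `The connection attempt failed` error from the driver usually points to a network-level problem (for instance, a server firewall that does not allow the cluster's public IP, or a wrong host name) rather than to bad credentials. A quick way to narrow this down is a plain TCP check from the notebook. `can_reach` below is a hypothetical helper built on the standard `socket` module, offered as a diagnostic sketch only.

```python
import socket

def can_reach(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port can be opened within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts
        return False
```

Calling `can_reach(jdbcHostname, jdbcPort)` with the values from the configuration cell and getting `False` would suggest checking the host name and the server's firewall rules before looking at the user or password.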

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DecimalType, BooleanType
from decimal import Decimal

# Create a Spark session
spark = SparkSession.builder \
    .appName("Create Products Table") \
    .getOrCreate()

# Create the database
spark.sql("DROP DATABASE IF EXISTS PracticeDB CASCADE")
spark.sql("CREATE DATABASE PracticeDB")
spark.sql("USE PracticeDB")

# Define the table schema
schema = StructType([
    StructField("ProductID", IntegerType(), nullable=False),
    StructField("ProductName", StringType(), nullable=False),
    StructField("ListPrice", DecimalType(10, 2), nullable=False),
    StructField("StandardCost", DecimalType(10, 2), nullable=False),
    StructField("CategoryID", IntegerType(), nullable=True),
    StructField("InStock", BooleanType(), nullable=True),
])

# Build the rows (prices as Decimal rather than float, to match DecimalType)
data = [
    (1, "Smartphone", Decimal('699.99'), Decimal('450.00'), 1, True),
    (2, "Laptop", Decimal('1299.99'), Decimal('900.00'), 1, True),
    (3, "T-Shirt", Decimal('19.99'), Decimal('5.00'), 2, True),
    (4, "Blender", Decimal('49.99'), Decimal('30.00'), 3, True),
    (5, "Vacuum Cleaner", Decimal('149.99'), Decimal('80.00'), 3, False)
]

# Create the DataFrame
df = spark.createDataFrame(data, schema)

# Save the DataFrame as a managed Parquet table
df.write.format("parquet").mode("overwrite").saveAsTable("PracticeDB.Products")

In [0]:
%scala
import org.apache.spark.sql.functions._

// Sample data
val data = Seq(
  (1, "Smartphone", 699.99, 450.00, 1, true),
  (2, "Laptop", 1299.99, 900.00, 1, true),
  (3, "T-Shirt", 19.99, 5.00, 2, true),
  (4, "Blender", 49.99, 30.00, 3, true),
  (5, "Vacuum Cleaner", 149.99, 80.00, 3, false)
)

// Define the schema
val schema = Seq("ProductID", "ProductName", "ListPrice", "StandardCost", "CategoryID", "InStock")

// Create the DataFrame
val df_scala = spark.createDataFrame(data).toDF(schema: _*)


##### View all products

In [0]:
# View all products
spark.sql("SELECT * FROM Products").show()

+---------+--------------+---------+------------+----------+-------+
|ProductID|   ProductName|ListPrice|StandardCost|CategoryID|InStock|
+---------+--------------+---------+------------+----------+-------+
|        5|Vacuum Cleaner|   149.99|       80.00|         3|  false|
|        1|    Smartphone|   699.99|      450.00|         1|   true|
|        4|       Blender|    49.99|       30.00|         3|   true|
|        3|       T-Shirt|    19.99|        5.00|         2|   true|
|        2|        Laptop|  1299.99|      900.00|         1|   true|
+---------+--------------+---------+------------+----------+-------+



In [0]:
# View all products
df.show()

+---------+--------------+---------+------------+----------+-------+
|ProductID|   ProductName|ListPrice|StandardCost|CategoryID|InStock|
+---------+--------------+---------+------------+----------+-------+
|        1|    Smartphone|   699.99|      450.00|         1|   true|
|        2|        Laptop|  1299.99|      900.00|         1|   true|
|        3|       T-Shirt|    19.99|        5.00|         2|   true|
|        4|       Blender|    49.99|       30.00|         3|   true|
|        5|Vacuum Cleaner|   149.99|       80.00|         3|  false|
+---------+--------------+---------+------------+----------+-------+



In [0]:
%scala
// View all products
spark.sql("SELECT * FROM Products").show()

In [0]:
%scala
// View all products
df_scala.show()

##### View the products whose price (ListPrice) is greater than 100

In [0]:
# View the products whose price (ListPrice) is greater than 100
spark.sql("SELECT ProductID, ProductName, ListPrice FROM Products WHERE ListPrice > 100").show()

+---------+--------------+---------+
|ProductID|   ProductName|ListPrice|
+---------+--------------+---------+
|        5|Vacuum Cleaner|   149.99|
|        1|    Smartphone|   699.99|
|        2|        Laptop|  1299.99|
+---------+--------------+---------+



In [0]:
# View the products whose price (ListPrice) is greater than 100
df.filter(df.ListPrice > 100).select("ProductID", "ProductName", "ListPrice").show()

+---------+--------------+---------+
|ProductID|   ProductName|ListPrice|
+---------+--------------+---------+
|        1|    Smartphone|   699.99|
|        2|        Laptop|  1299.99|
|        5|Vacuum Cleaner|   149.99|
+---------+--------------+---------+



In [0]:
%scala
// View the products whose price (ListPrice) is greater than 100
spark.sql("SELECT ProductID, ProductName, ListPrice FROM Products WHERE ListPrice > 100").show()

In [0]:
%scala
// View the products whose price (ListPrice) is greater than 100
df_scala.filter($"ListPrice" > 100)
  .select("ProductID", "ProductName", "ListPrice")
  .show()


##### Get the average product price

In [0]:
# Get the average product price
spark.sql("SELECT AVG(ListPrice) AS AveragePrice FROM Products").show()


+------------+
|AveragePrice|
+------------+
|  443.990000|
+------------+
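
The six decimal places in `443.990000` come from Spark's typing rules: averaging a `DecimalType(p, s)` column produces a decimal with four extra digits of precision and scale, so `DecimalType(10, 2)` becomes `decimal(14, 6)`. As an illustration, the same arithmetic can be reproduced in plain Python with the `decimal` module, using the five ListPrice values from the Products table.

```python
from decimal import Decimal

# ListPrice values from the Products table defined earlier
list_prices = [
    Decimal("699.99"),
    Decimal("1299.99"),
    Decimal("19.99"),
    Decimal("49.99"),
    Decimal("149.99"),
]

# Exact decimal average, with no binary floating-point rounding
average_price = sum(list_prices) / len(list_prices)

# Render with scale 6, matching the decimal(14, 6) result type
average_with_scale = average_price.quantize(Decimal("0.000001"))

print(average_with_scale)  # 443.990000
```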



In [0]:
# Get the average product price
df.selectExpr("AVG(ListPrice) AS AveragePrice").show()

+------------+
|AveragePrice|
+------------+
|  443.990000|
+------------+



In [0]:
%scala
// Get the average product price
spark.sql("SELECT AVG(ListPrice) AS AveragePrice FROM Products").show()

In [0]:
%scala
// Get the average product price
df_scala.selectExpr("AVG(ListPrice) AS AveragePrice").show()


##### Products that are in stock (InStock = TRUE)

In [0]:
# Products that are in stock (InStock = TRUE)
spark.sql("SELECT ProductID, ProductName FROM Products WHERE InStock = TRUE").show()


+---------+-----------+
|ProductID|ProductName|
+---------+-----------+
|        1| Smartphone|
|        4|    Blender|
|        3|    T-Shirt|
|        2|     Laptop|
+---------+-----------+



In [0]:
df.filter(df.InStock == True).select("ProductID", "ProductName").show()


+---------+-----------+
|ProductID|ProductName|
+---------+-----------+
|        1| Smartphone|
|        2|     Laptop|
|        3|    T-Shirt|
|        4|    Blender|
+---------+-----------+



In [0]:
%scala
// Products that are in stock (InStock = TRUE)
spark.sql("SELECT ProductID, ProductName FROM Products WHERE InStock = TRUE").show()

In [0]:
%scala
// Products that are in stock (InStock = TRUE)
df_scala.filter($"InStock" === true)
  .select("ProductID", "ProductName")
  .show()


##### Products in a specific category

In [0]:
# Products in a specific category
spark.sql("SELECT ProductID, ProductName, CategoryID FROM Products WHERE CategoryID = 3").show()

+---------+--------------+----------+
|ProductID|   ProductName|CategoryID|
+---------+--------------+----------+
|        5|Vacuum Cleaner|         3|
|        4|       Blender|         3|
+---------+--------------+----------+



In [0]:
df.filter(df.CategoryID == 3).select("ProductID", "ProductName", "CategoryID").show()

+---------+--------------+----------+
|ProductID|   ProductName|CategoryID|
+---------+--------------+----------+
|        4|       Blender|         3|
|        5|Vacuum Cleaner|         3|
+---------+--------------+----------+



In [0]:
%scala
// Products in a specific category
spark.sql("SELECT ProductID, ProductName, CategoryID FROM Products WHERE CategoryID = 3").show()


In [0]:
%scala
// Products in a specific category
df_scala.filter($"CategoryID" === 3)
  .select("ProductID", "ProductName", "CategoryID")
  .show()

##### Count the number of products in each category

In [0]:
# Count the number of products in each category
spark.sql("SELECT CategoryID, COUNT(*) AS ProductCount FROM Products GROUP BY CategoryID").show()


+----------+------------+
|CategoryID|ProductCount|
+----------+------------+
|         3|           2|
|         1|           2|
|         2|           1|
+----------+------------+



In [0]:
df.groupBy("CategoryID").count().show()


+----------+-----+
|CategoryID|count|
+----------+-----+
|         1|    2|
|         2|    1|
|         3|    2|
+----------+-----+



In [0]:
%scala
// Count the number of products in each category
spark.sql("SELECT CategoryID, COUNT(*) AS ProductCount FROM Products GROUP BY CategoryID").show()

In [0]:
%scala
// Count the number of products in each category
df_scala.groupBy("CategoryID").count().show()


##### Get the most expensive product

In [0]:
# Get the most expensive product
spark.sql("SELECT ProductID, ProductName, ListPrice FROM Products ORDER BY ListPrice DESC LIMIT 1").show()


+---------+-----------+---------+
|ProductID|ProductName|ListPrice|
+---------+-----------+---------+
|        2|     Laptop|  1299.99|
+---------+-----------+---------+



In [0]:
# Get the most expensive product (no select, so all columns are returned)
df.orderBy(df.ListPrice.desc()).limit(1).show()


+---------+-----------+---------+------------+----------+-------+
|ProductID|ProductName|ListPrice|StandardCost|CategoryID|InStock|
+---------+-----------+---------+------------+----------+-------+
|        2|     Laptop|  1299.99|      900.00|         1|   true|
+---------+-----------+---------+------------+----------+-------+



In [0]:
%scala
// Get the most expensive product
spark.sql("SELECT ProductID, ProductName, ListPrice FROM Products ORDER BY ListPrice DESC LIMIT 1").show()

In [0]:
%scala
// Get the most expensive product
df_scala.orderBy($"ListPrice".desc).show(1)

##### Get products whose name contains the word "Smart"

In [0]:
# Get products whose name contains the word "Smart"
spark.sql("SELECT ProductID, ProductName FROM Products WHERE ProductName LIKE '%Smart%'").show()


+---------+-----------+
|ProductID|ProductName|
+---------+-----------+
|        1| Smartphone|
+---------+-----------+



In [0]:
# Get products whose name contains the word "Smart"
df.filter(df.ProductName.contains("Smart")) \
  .select("ProductID", "ProductName") \
  .show()


+---------+-----------+
|ProductID|ProductName|
+---------+-----------+
|        1| Smartphone|
+---------+-----------+



In [0]:
%scala
// Get products whose name contains the word "Smart"
spark.sql("SELECT ProductID, ProductName FROM Products WHERE ProductName LIKE '%Smart%'").show()

In [0]:
%scala
// Get products whose name contains the word "Smart"
df_scala.filter($"ProductName".contains("Smart"))
  .select("ProductID", "ProductName")
  .show()


##### Get the 3 cheapest products

In [0]:
# Get the 3 cheapest products
spark.sql("SELECT ProductID, ProductName, ListPrice FROM Products ORDER BY ListPrice ASC LIMIT 3").show()


+---------+--------------+---------+
|ProductID|   ProductName|ListPrice|
+---------+--------------+---------+
|        3|       T-Shirt|    19.99|
|        4|       Blender|    49.99|
|        5|Vacuum Cleaner|   149.99|
+---------+--------------+---------+



In [0]:
# Get the 3 cheapest products (no select, so all columns are returned)
df.orderBy(df.ListPrice.asc()).show(3)

+---------+--------------+---------+------------+----------+-------+
|ProductID|   ProductName|ListPrice|StandardCost|CategoryID|InStock|
+---------+--------------+---------+------------+----------+-------+
|        3|       T-Shirt|    19.99|        5.00|         2|   true|
|        4|       Blender|    49.99|       30.00|         3|   true|
|        5|Vacuum Cleaner|   149.99|       80.00|         3|  false|
+---------+--------------+---------+------------+----------+-------+
only showing top 3 rows



In [0]:
%scala
// Get the 3 cheapest products
spark.sql("SELECT ProductID, ProductName, ListPrice FROM Products ORDER BY ListPrice ASC LIMIT 3").show()

In [0]:
%scala
// Get the 3 cheapest products
df_scala.orderBy($"ListPrice".asc).show(3)

##### Cheapest price per category

In [0]:
# Lowest ListPrice in each category
spark.sql("SELECT CategoryID, MIN(ListPrice) AS CheapestPrice FROM Products GROUP BY CategoryID").show()

+----------+-------------+
|CategoryID|CheapestPrice|
+----------+-------------+
|         3|        49.99|
|         1|       699.99|
|         2|        19.99|
+----------+-------------+



In [0]:
from pyspark.sql import functions as F

# Group by CategoryID and compute the minimum ListPrice to get the cheapest price
df.groupBy("CategoryID") \
  .agg(F.min("ListPrice").alias("CheapestPrice")) \
  .show()


+----------+-------------+
|CategoryID|CheapestPrice|
+----------+-------------+
|         1|       699.99|
|         2|        19.99|
|         3|        49.99|
+----------+-------------+



In [0]:
%scala
// Lowest ListPrice in each category
spark.sql("SELECT CategoryID, MIN(ListPrice) AS CheapestPrice FROM Products GROUP BY CategoryID").show()

In [0]:
%scala
import org.apache.spark.sql.functions.min

// Group by CategoryID and compute the minimum ListPrice to get the cheapest price
df_scala.groupBy("CategoryID")
  .agg(min("ListPrice").alias("CheapestPrice"))
  .show()