<a href="https://colab.research.google.com/github/Simbak13/Ingenieria-De-Datos-Avanzada/blob/main/01_PySpark_Con_MongoDB_Atlas.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**1. Agrega la base de datos de ejemplo sample_suplies en Atlas. Revisa para ello el Documento Word que se Adjunta. (Esto fue configurado en Mongo Atlas).**

**Instalacion de PySpark**

In [None]:
# Descarga e instala PySpark
!pip install pyspark



**Verificar que se instalo PySpark**

In [None]:
# Verificar la versión e imprimir
print("\nVersión de PySpark:")
!pyspark --version


Versión de PySpark:
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.5.1
      /_/
                        
Using Scala version 2.12.18, OpenJDK 64-Bit Server VM, 11.0.28
Branch HEAD
Compiled by user heartsavior on 2024-02-15T11:24:58Z
Revision fd86f85e181fc2dc0f50a096855acf83a6cc5d9c
Url https://github.com/apache/spark
Type --help for more information.


**3. Crea un notebook en Colab y construye la conexión de SparkSession con MongoDB usando la cadena SRV. Usa el nombre de usuario y password que creaste en el paso**

In [None]:
from pyspark.sql import SparkSession

# Configura tu cadena de conexión aquí
usuario = "user_256057"
password = "256057"
cluster = "cluster0.fahly7d.mongodb.net"  # por ejemplo: cluster0.abcd.mongodb.net
basedatos = "sample_supplies"
coleccion = "clientes"

uri = f"mongodb+srv://{usuario}:{password}@{cluster}/{basedatos}"

# Crear la sesión Spark con el conector de MongoDB
spark = SparkSession.builder \
    .appName("MongoDB_Atlas_Connection") \
    .config("spark.mongodb.read.connection.uri", uri) \
    .config("spark.mongodb.write.connection.uri", uri) \
    .config("spark.jars.packages",
            "org.mongodb.spark:mongo-spark-connector_2.12:3.0.2") \
    .getOrCreate()

print("Conexión SparkSession creada con éxito")


Conexión SparkSession creada con éxito


**2. Muestra que la conexión ha sido existosa.**

In [None]:
try:
    df = spark.read.format("mongo").option("uri", f"{uri}.{coleccion}").load()
    print("Conexión a MongoDB Atlas exitosa. Muestra de datos:")
    df.show(5)
except Exception as e:
    print("Error en la conexión o lectura de datos:", e)


Conexión a MongoDB Atlas exitosa. Muestra de datos:
++
||
++
++



**4. Usa la base de datos sample_supplies y lee la colección "sales" como un dataframe y explórala. Imprime el schema, número de filas estimadas (count) y el tipo de datos (df.dtypes).**

**Leer la colección sales como DataFrame**

In [None]:
# Leer la colección sales
df = spark.read.format("mongo").option("uri", f"{uri}.sales").load()

print("Lectura de la colección 'sales' completada.")

Lectura de la colección 'sales' completada.


**Imprimir el esquema**



In [None]:
print("Esquema del DataFrame:")
df.printSchema()

Esquema del DataFrame:
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- couponUsed: boolean (nullable = true)
 |-- customer: struct (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- age: integer (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- satisfaction: integer (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- price: decimal(6,2) (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- purchaseMethod: string (nullable = true)
 |-- saleDate: timestamp (nullable = true)
 |-- storeLocation: string (nullable = true)



**Contar número de filas**

In [None]:
print(f"Número de filas en 'sales': {df.count()}")

Número de filas en 'sales': 5000


**Mostrar los tipos de datos de cada columna**

In [None]:
print("Tipos de datos:")
print(df.dtypes)

Tipos de datos:
[('_id', 'struct<oid:string>'), ('couponUsed', 'boolean'), ('customer', 'struct<gender:string,age:int,email:string,satisfaction:int>'), ('items', 'array<struct<name:string,price:decimal(6,2),quantity:int,tags:array<string>>>'), ('purchaseMethod', 'string'), ('saleDate', 'timestamp'), ('storeLocation', 'string')]


**5. Crea una vista temporal llamada "sales_view" y ejecuta las siguientes consultas en Spark SQL**

In [None]:
# Crear la vista temporal
df.createOrReplaceTempView("sales_view")

print("Vista temporal 'sales_view' creada correctamente.")

Vista temporal 'sales_view' creada correctamente.


###**A partir de aquí, genera las consultas tanto en SQL y en expresiones Lambda. ⭐**

---
**a) Consulta cuántos documentos tiene sales_view.**

**SQL ✅**

In [None]:
# Contar número de documentos (filas) en la vista temporal
resultado = spark.sql("SELECT COUNT(*) AS total_documentos FROM sales_view")

# Mostrar el resultado
resultado.show()

+----------------+
|total_documentos|
+----------------+
|            5000|
+----------------+



**Expresiones Lambda ✅**

In [None]:
total_docs = df.count()
print(f"Total de documentos (sin SQL, método count()): {total_docs}")

Total de documentos (sin SQL, método count()): 5000


---
**b) Agrupa por storeLocation y ordena de mayor a menor.**

**SQL ✅**

In [None]:
# Agrupar por storeLocation y ordenar de mayor a menor
resultado_sql = spark.sql("""
    SELECT storeLocation, COUNT(*) AS total_ventas
    FROM sales_view
    GROUP BY storeLocation
    ORDER BY total_ventas DESC
""")

# Mostrar resultados
resultado_sql.show()

+-------------+------------+
|storeLocation|total_ventas|
+-------------+------------+
|       Denver|        1549|
|      Seattle|        1134|
|       London|         794|
|       Austin|         676|
|     New York|         501|
|    San Diego|         346|
+-------------+------------+



**Expresiones Lambda ✅**

In [None]:
from pyspark.sql import functions as F

# Agrupar y ordenar sin SQL
resultado_lambda = (
    df.groupBy("storeLocation")
      .count()
      .orderBy(F.desc("count"))
)

resultado_lambda.show()

+-------------+-----+
|storeLocation|count|
+-------------+-----+
|       Denver| 1549|
|      Seattle| 1134|
|       London|  794|
|       Austin|  676|
|     New York|  501|
|    San Diego|  346|
+-------------+-----+



---
**c) Imprime los clientes cuya edad es mayor 42**

**SQL ✅**

In [None]:
# Clientes con edad mayor a 42 usando SQL
resultado_sql = spark.sql("""
    SELECT customer.age AS edad, customer.email AS email, storeLocation
    FROM sales_view
    WHERE customer.age > 42
""")

resultado_sql.show(10, truncate=False)


+----+-----------------+-------------+
|edad|email            |storeLocation|
+----+-----------------+-------------+
|50  |keecade@hem.uy   |Seattle      |
|51  |worbiduh@vowbu.cg|Denver       |
|45  |vatires@ta.pe    |Seattle      |
|44  |owtar@pu.cd      |London       |
|71  |man@bob.mz       |Austin       |
|57  |ohaguwu@nufub.gi |Denver       |
|49  |merto@betosiv.pm |London       |
|59  |la@cevam.tj      |San Diego    |
|55  |eja@ko.es        |Seattle      |
|53  |se@nacwev.an     |New York     |
+----+-----------------+-------------+
only showing top 10 rows



**Expresiones Lambda ✅**

In [None]:
from pyspark.sql import functions as F

# Filtrar clientes con edad > 42
resultado_lambda = (
    df.filter(F.col("customer.age") > 42)
      .select("customer.age", "customer.email", "storeLocation")
)

resultado_lambda.show(10, truncate=False)


+---+-----------------+-------------+
|age|email            |storeLocation|
+---+-----------------+-------------+
|50 |keecade@hem.uy   |Seattle      |
|51 |worbiduh@vowbu.cg|Denver       |
|45 |vatires@ta.pe    |Seattle      |
|44 |owtar@pu.cd      |London       |
|71 |man@bob.mz       |Austin       |
|57 |ohaguwu@nufub.gi |Denver       |
|49 |merto@betosiv.pm |London       |
|59 |la@cevam.tj      |San Diego    |
|55 |eja@ko.es        |Seattle      |
|53 |se@nacwev.an     |New York     |
+---+-----------------+-------------+
only showing top 10 rows



---
**d) Imprime el valor mínimo y máximo de satisfaction que está dentro de customer.**

**SQL ✅**

In [None]:
# Consulta SQL para obtener el valor mínimo y máximo de satisfacción
resultado_sql = spark.sql("""
    SELECT
        MIN(customer.satisfaction) AS min_satisfaction,
        MAX(customer.satisfaction) AS max_satisfaction
    FROM sales_view
""")

resultado_sql.show()

+----------------+----------------+
|min_satisfaction|max_satisfaction|
+----------------+----------------+
|               1|               5|
+----------------+----------------+



**Expresiones Lambda ✅**

In [None]:
from pyspark.sql import functions as F

# Calcular valores mínimo y máximo de satisfacción
resultado_lambda = df.agg(
    F.min("customer.satisfaction").alias("min_satisfaction"),
    F.max("customer.satisfaction").alias("max_satisfaction")
)

resultado_lambda.show()

+----------------+----------------+
|min_satisfaction|max_satisfaction|
+----------------+----------------+
|               1|               5|
+----------------+----------------+



---
**e) Agrupa por el mètodo de compra purchaseMethod y ordena.**

**SQL ✅**

In [None]:
# Agrupar por método de compra y ordenar de mayor a menor
resultado_sql = spark.sql("""
    SELECT purchaseMethod, COUNT(*) AS total_compras
    FROM sales_view
    GROUP BY purchaseMethod
    ORDER BY total_compras DESC
""")

resultado_sql.show()


+--------------+-------------+
|purchaseMethod|total_compras|
+--------------+-------------+
|      In store|         2819|
|        Online|         1585|
|         Phone|          596|
+--------------+-------------+



**Expresiones Lambda ✅**

In [None]:
from pyspark.sql import functions as F

# Agrupar y ordenar con la API de DataFrame (lambda-style)
resultado_lambda = (
    df.groupBy("purchaseMethod")
      .count()
      .orderBy(F.desc("count"))
)

resultado_lambda.show()

+--------------+-----+
|purchaseMethod|count|
+--------------+-----+
|      In store| 2819|
|        Online| 1585|
|         Phone|  596|
+--------------+-----+

