Práctica: PySpark + MongoDB Atlas sample_supplies.sales

Alumna: Gabriela Gómez Ibarra 255504

Curso: MIA001524A - Ingeniería de Datos Avanzada


In [18]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz
!tar xf spark-3.4.1-bin-hadoop3.tgz
!pip install -q findspark pyspark


In [19]:
import os

# Tell PySpark where the JDK installed by apt-get lives and where the Spark
# distribution unpacked in the previous cell lives (presumably the notebook's
# working directory is /content, as on Colab).
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-11-openjdk-amd64",
    SPARK_HOME="/content/spark-3.4.1-bin-hadoop3",
)


In [20]:
import os
from getpass import getpass
from urllib.parse import quote_plus

from pyspark.sql import SparkSession

# Atlas cluster host and default namespace (<database>.<collection>) for reads.
MONGO_HOST = "cluster0.iabg5rl.mongodb.net"
MONGO_NAMESPACE = "sample_supplies.sales"

# Credentials are read from the environment (or prompted for) instead of being
# hard-coded in the URI, so they are never saved or shared with the notebook.
mongo_user = os.environ.get("MONGO_USER") or input("Usuario de MongoDB Atlas: ")
mongo_password = os.environ.get("MONGO_PASSWORD") or getpass("Contraseña de MongoDB Atlas: ")

# quote_plus percent-escapes characters such as '@', ':' or '/' that would
# otherwise corrupt the mongodb+srv:// URI.
mongo_uri = (
    f"mongodb+srv://{quote_plus(mongo_user)}:{quote_plus(mongo_password)}"
    f"@{MONGO_HOST}/{MONGO_NAMESPACE}"
)

spark = (
    SparkSession.builder
    .appName("MongoDB_PySpark_Integration")
    # MongoDB Spark connector (Scala 2.12 build), resolved by Spark when the session starts.
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1")
    .config("spark.mongodb.read.connection.uri", mongo_uri)
    .getOrCreate()
)

# NOTE: only the SparkSession has been created here. Spark reads are lazy, so
# Atlas is first contacted (and the credentials checked) when data is loaded.
print("✅ Conexión exitosa a MongoDB Atlas desde PySpark")


✅ Conexión exitosa a MongoDB Atlas desde PySpark


In [21]:
# Load the sales collection through the MongoDB Spark connector, then inspect
# the inferred schema, the first five rows, and the total number of rows.
sales_reader = (
    spark.read.format("mongodb")
    .option("database", "sample_supplies")
    .option("collection", "sales")
)
df = sales_reader.load()

df.printSchema()
df.show(5)
total_rows = df.count()
print("Total de filas:", total_rows)


root
 |-- _id: string (nullable = true)
 |-- couponUsed: boolean (nullable = true)
 |-- customer: struct (nullable = true)
 |    |-- gender: string (nullable = true)
 |    |-- age: integer (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- satisfaction: integer (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- price: decimal(6,2) (nullable = true)
 |    |    |-- quantity: integer (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |-- purchaseMethod: string (nullable = true)
 |-- saleDate: timestamp (nullable = true)
 |-- storeLocation: string (nullable = true)

+--------------------+----------+--------------------+--------------------+--------------+--------------------+-------------+
|                 _id|couponUsed|            customer|               items|purchaseMethod|            s

In [22]:
from pyspark.sql.functions import col, count, min as min_, max as max_, desc

# Register the DataFrame so it can also be queried with SQL as "sales_view".
df.createOrReplaceTempView("sales_view")


def _show_counts_by(column):
    """Show one row per distinct value of `column` with its document count, largest first."""
    counts = df.groupBy(column).agg(count("*").alias("total"))
    counts.orderBy(desc("total")).show()


# a) Number of documents in the collection.
print("a) Total de documentos:")
print(df.count())

# b) Documents per store location.
print("\nb) Agrupar por storeLocation y ordenar de mayor a menor:")
_show_counts_by("storeLocation")

# c) Customers older than 42 (only the first 10 matches are displayed).
print("\nc) Clientes con edad > 42:")
customer_columns = [
    col("customer.age").alias("age"),
    col("customer.email").alias("email"),
    col("storeLocation"),
    col("purchaseMethod"),
]
df.select(*customer_columns).where(col("customer.age") > 42).show(10)

# d) Range of the customer satisfaction score.
print("\nd) Valor mínimo y máximo de satisfaction:")
satisfaction = col("customer.satisfaction")
df.agg(
    min_(satisfaction).alias("min_satisfaction"),
    max_(satisfaction).alias("max_satisfaction"),
).show()

# e) Documents per purchase method.
print("\ne) Agrupar por purchaseMethod y ordenar:")
_show_counts_by("purchaseMethod")


a) Total de documentos:
5000

b) Agrupar por storeLocation y ordenar de mayor a menor:
+-------------+-----+
|storeLocation|total|
+-------------+-----+
|       Denver| 1549|
|      Seattle| 1134|
|       London|  794|
|       Austin|  676|
|     New York|  501|
|    San Diego|  346|
+-------------+-----+


c) Clientes con edad > 42:
+---+-----------------+-------------+--------------+
|age|            email|storeLocation|purchaseMethod|
+---+-----------------+-------------+--------------+
| 50|   keecade@hem.uy|      Seattle|         Phone|
| 51|worbiduh@vowbu.cg|       Denver|      In store|
| 45|    vatires@ta.pe|      Seattle|      In store|
| 44|      owtar@pu.cd|       London|      In store|
| 71|       man@bob.mz|       Austin|        Online|
| 57| ohaguwu@nufub.gi|       Denver|      In store|
| 49| merto@betosiv.pm|       London|      In store|
| 59|      la@cevam.tj|    San Diego|      In store|
| 55|        eja@ko.es|      Seattle|        Online|
| 53|     se@nacwev.an|     

In [23]:
# (column name, Spark type string) for every top-level column of the sales DataFrame.
[(field.name, field.dataType.simpleString()) for field in df.schema.fields]


[('_id', 'string'),
 ('couponUsed', 'boolean'),
 ('customer', 'struct<gender:string,age:int,email:string,satisfaction:int>'),
 ('items',
  'array<struct<name:string,price:decimal(6,2),quantity:int,tags:array<string>>>'),
 ('purchaseMethod', 'string'),
 ('saleDate', 'timestamp'),
 ('storeLocation', 'string')]

In [26]:
from pyspark.sql import DataFrame


def q(sql_str):
    """Run a Spark SQL statement and return its result as a PySpark DataFrame.

    NOTE(review): this calls the private ``spark._jsparkSession`` handle and wraps
    the returned Java DataFrame manually instead of using the public
    ``spark.sql(sql_str)``. This is presumably a workaround for a pyspark-client /
    Spark-JVM version mismatch; confirm, and prefer ``spark.sql`` once the
    versions match.
    """
    java_df = spark._jsparkSession.sql(sql_str)
    return DataFrame(java_df, spark)


df.createOrReplaceTempView("sales_view")

# The same five questions answered with the DataFrame API earlier, now in SQL.
SQL_TOTAL = "SELECT COUNT(*) AS total FROM sales_view"
SQL_BY_STORE = """
SELECT storeLocation, COUNT(*) AS total
FROM sales_view
GROUP BY storeLocation
ORDER BY total DESC
"""
SQL_OVER_42 = """
SELECT customer.age AS age, customer.email AS email, storeLocation, purchaseMethod
FROM sales_view
WHERE customer.age > 42
"""
SQL_SATISFACTION_RANGE = """
SELECT MIN(customer.satisfaction) AS min_satisfaction,
       MAX(customer.satisfaction) AS max_satisfaction
FROM sales_view
"""
SQL_BY_METHOD = """
SELECT purchaseMethod, COUNT(*) AS total
FROM sales_view
GROUP BY purchaseMethod
ORDER BY total DESC
"""

# (query, rows to display); 20 is DataFrame.show()'s default row limit.
for sql_text, n_rows in [
    (SQL_TOTAL, 20),
    (SQL_BY_STORE, 20),
    (SQL_OVER_42, 10),
    (SQL_SATISFACTION_RANGE, 20),
    (SQL_BY_METHOD, 20),
]:
    q(sql_text).show(n_rows)


+-----+
|total|
+-----+
| 5000|
+-----+

+-------------+-----+
|storeLocation|total|
+-------------+-----+
|       Denver| 1549|
|      Seattle| 1134|
|       London|  794|
|       Austin|  676|
|     New York|  501|
|    San Diego|  346|
+-------------+-----+

+---+-----------------+-------------+--------------+
|age|            email|storeLocation|purchaseMethod|
+---+-----------------+-------------+--------------+
| 50|   keecade@hem.uy|      Seattle|         Phone|
| 51|worbiduh@vowbu.cg|       Denver|      In store|
| 45|    vatires@ta.pe|      Seattle|      In store|
| 44|      owtar@pu.cd|       London|      In store|
| 71|       man@bob.mz|       Austin|        Online|
| 57| ohaguwu@nufub.gi|       Denver|      In store|
| 49| merto@betosiv.pm|       London|      In store|
| 59|      la@cevam.tj|    San Diego|      In store|
| 55|        eja@ko.es|      Seattle|        Online|
| 53|     se@nacwev.an|     New York|         Phone|
+---+-----------------+-------------+------------