# Integraci√≥n de Apache Spark y MongoDB con PySpark

**Objetivo del Notebook:** Aprender a conectar Apache Spark con una base de datos MongoDB para realizar operaciones de lectura, an√°lisis y escritura. Este es un patr√≥n muy com√∫n en arquitecturas de Big Data, donde MongoDB se utiliza como un sistema de almacenamiento flexible y escalable (Operational Datastore) y Spark se usa para el procesamiento y an√°lisis de datos a gran escala.
azure-synapse-analytics-integrate-mongodb-atlas-architecture.svg
**Aprenderemos a:**
1.  **Configurar el entorno:** Iniciar una sesi√≥n de Spark con el conector oficial de MongoDB.
2.  **Conectar con MongoDB Atlas:** Usaremos una base de datos en la nube para un ejemplo realista.
3.  **Poblar MongoDB:** Insertaremos datos de ejemplo directamente desde el notebook.
4.  **Leer datos:** Cargar una colecci√≥n de MongoDB en un DataFrame de Spark.
5.  **Analizar datos:** Utilizar el poder de Spark SQL y las operaciones de DataFrame para procesar la informaci√≥n.
6.  **Escribir datos:** Guardar los resultados de nuestro an√°lisis de vuelta en una nueva colecci√≥n en MongoDB.

## 1. Configuraci√≥n del Entorno

Esta es la parte m√°s importante. Necesitamos tres cosas:
1.  **Una base de datos MongoDB:** Usaremos el servicio gratuito de **MongoDB Atlas**.
2.  **Las librer√≠as necesarias:** `pyspark` para Spark y `pymongo` para interactuar con MongoDB.
3.  **El Conector de Spark para MongoDB:** Un paquete que le permite a Spark comunicarse con MongoDB.

### 1.1. Pasos para Configurar MongoDB Atlas (¬°Acci√≥n Requerida!)

Si no tienes una cuenta, sigue estos pasos (toma 5-10 minutos):
1.  **Ve a [MongoDB Atlas](https://www.mongodb.com/cloud/atlas) y reg√≠strate.**
2.  **Crea un cl√∫ster gratuito (Tier M0).** Puedes elegir cualquier proveedor de nube y regi√≥n.
3.  **Crea un usuario de base de datos:** En la secci√≥n "Database Access", crea un usuario. **Guarda bien el nombre de usuario y la contrase√±a.**
4.  **Configura el acceso de red:** En la secci√≥n "Network Access", agrega tu direcci√≥n IP actual o permite el acceso desde cualquier lugar (`0.0.0.0/0` - **solo para este tutorial, no es seguro para producci√≥n**).
5.  **Obt√©n la cadena de conexi√≥n:** Ve a "Database", haz clic en "Connect" en tu cl√∫ster, selecciona "Connect your application" y copia la cadena de conexi√≥n. **Se ver√° algo as√≠:** `mongodb+srv://<username>:<password>@clustername.mongodb.net/`.

In [1]:
# =================================================================
# 1.2. Instalar las dependencias
# =================================================================
!pip install pyspark pymongo -q

[?25l   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.0/1.7 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m‚îÅ‚îÅ[0m[90m‚ï∫[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.1/1.7 MB[0m [31m3.6 MB/s[0m eta [36m0:00:01[0m[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[91m‚ï∏[0m[90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m0.6/1.7 MB[0m [31m8.2 MB/s[0m eta [36m0:00:01[0m[2K   [91m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m[91m‚ï∏[0m [32m1.7/1.7 MB[0m [31m16.6 MB/s[0m eta [36m0:00:01[0m[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m1.7/1.7 MB[0m [31m14.1 MB/s[0m

In [2]:
# =================================================================
# 1.3. Iniciar la Sesi√≥n de Spark con el Conector de MongoDB
# =================================================================
from pyspark.sql import SparkSession

# Reemplaza esta versi√≥n si es necesario. B√∫scala en el repositorio de Maven.
mongo_spark_connector_version = "3.0.1"
mongo_spark_connector = f"org.mongodb.spark:mongo-spark-connector_2.12:{mongo_spark_connector_version}"

spark = SparkSession.builder\
    .appName("SparkMongoDBIntegration")\
    .config("spark.jars.packages", mongo_spark_connector)\
    .getOrCreate()

print("SparkSession iniciada con el conector de MongoDB.")

SparkSession iniciada con el conector de MongoDB.


## 2. Poblando MongoDB con Datos de Ejemplo

Para que nuestro notebook sea autocontenido, vamos a insertar algunos datos en nuestra base de datos de Atlas usando la librer√≠a `pymongo`. Esto tambi√©n nos sirve para verificar que nuestra cadena de conexi√≥n es correcta.

In [None]:
import pymongo

# =================================================================
# ¬°ACCI√ìN REQUERIDA!
# Reemplaza la cadena de conexi√≥n con la tuya de MongoDB Atlas.
# Aseg√∫rate de poner tu usuario y contrase√±a.
# =================================================================
# =================================================================
# ¬°ACCI√ìN REQUERIDA!
# Reemplaza la cadena de conexi√≥n con la tuya de MongoDB Atlas.
# =================================================================
MONGO_URI = ""
# Definimos el nombre de nuestra base de datos y colecci√≥n
DB_NAME = "universidad"
COLLECTION_NAME = "cientificos"

# Datos de ejemplo que vamos a insertar
cientificos_data = [
    {"nombre": "Albert", "apellido": "Einstein", "campo": "F√≠sica", "nacimiento": 1879},
    {"nombre": "Marie", "apellido": "Curie", "campo": "Qu√≠mica", "nacimiento": 1867},
    {"nombre": "Isaac", "apellido": "Newton", "campo": "F√≠sica", "nacimiento": 1643},
    {"nombre": "Charles", "apellido": "Darwin", "campo": "Biolog√≠a", "nacimiento": 1809},
    {"nombre": "Rosalind", "apellido": "Franklin", "campo": "Qu√≠mica", "nacimiento": 1920}
]

# Conectamos a MongoDB y poblamos la colecci√≥n
try:
    client = pymongo.MongoClient(MONGO_URI)
    db = client[DB_NAME]
    collection = db[COLLECTION_NAME]

    # Limpiamos la colecci√≥n por si ya exist√≠an datos
    collection.delete_many({})

    # Insertamos los nuevos datos
    collection.insert_many(cientificos_data)

    print(f"‚úÖ Datos insertados correctamente en la colecci√≥n '{COLLECTION_NAME}'.")
    client.close()
except Exception as e:
    print(f"‚ùå Error al conectar o insertar datos: {e}")

‚úÖ Datos insertados correctamente en la colecci√≥n 'cientificos'.


## 3. Leer Datos de MongoDB con Spark

Ahora que tenemos datos en MongoDB, vamos a usar Spark para leerlos y cargarlos en un DataFrame. Un DataFrame es una tabla de datos distribuida con columnas nombradas.

In [6]:
# Leemos los datos desde MongoDB a un DataFrame de Spark
df_cientificos = spark.read.format("mongo")\
    .option("uri", MONGO_URI)\
    .option("database", DB_NAME)\
    .option("collection", COLLECTION_NAME)\
    .load()

# Mostramos el esquema inferido por Spark
print("Esquema del DataFrame:")
df_cientificos.printSchema()

# Mostramos los datos cargados
print("\nDatos cargados desde MongoDB:")
df_cientificos.show()

Esquema del DataFrame:
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- apellido: string (nullable = true)
 |-- campo: string (nullable = true)
 |-- nacimiento: integer (nullable = true)
 |-- nombre: string (nullable = true)


Datos cargados desde MongoDB:
+--------------------+--------+--------+----------+--------+
|                 _id|apellido|   campo|nacimiento|  nombre|
+--------------------+--------+--------+----------+--------+
|{68ccb513e7510f01...|Einstein|  F√≠sica|      1879|  Albert|
|{68ccb513e7510f01...|   Curie| Qu√≠mica|      1867|   Marie|
|{68ccb513e7510f01...|  Newton|  F√≠sica|      1643|   Isaac|
|{68ccb513e7510f01...|  Darwin|Biolog√≠a|      1809| Charles|
|{68ccb513e7510f01...|Franklin| Qu√≠mica|      1920|Rosalind|
+--------------------+--------+--------+----------+--------+



## 4. Analizar los Datos con Spark

Una vez que los datos est√°n en un DataFrame de Spark, podemos usar todo su poder para el an√°lisis. Realizaremos un an√°lisis simple: **encontrar a los cient√≠ficos nacidos en el siglo XIX y agruparlos por campo.**

In [7]:
from pyspark.sql.functions import count

# Filtramos los cient√≠ficos nacidos en el siglo XIX (entre 1801 y 1900)
df_siglo_xix = df_cientificos.filter(
    (df_cientificos.nacimiento > 1800) & (df_cientificos.nacimiento <= 1900)
)

print("Cient√≠ficos nacidos en el siglo XIX:")
df_siglo_xix.show()

# Agrupamos por campo y contamos cu√°ntos hay en cada uno
df_conteo_por_campo = df_siglo_xix.groupBy("campo").agg(
    count("*").alias("numero_de_cientificos")
)

print("\nConteo de cient√≠ficos del siglo XIX por campo:")
df_conteo_por_campo.show()

Cient√≠ficos nacidos en el siglo XIX:
+--------------------+--------+--------+----------+-------+
|                 _id|apellido|   campo|nacimiento| nombre|
+--------------------+--------+--------+----------+-------+
|{68ccb513e7510f01...|Einstein|  F√≠sica|      1879| Albert|
|{68ccb513e7510f01...|   Curie| Qu√≠mica|      1867|  Marie|
|{68ccb513e7510f01...|  Darwin|Biolog√≠a|      1809|Charles|
+--------------------+--------+--------+----------+-------+


Conteo de cient√≠ficos del siglo XIX por campo:
+--------+---------------------+
|   campo|numero_de_cientificos|
+--------+---------------------+
|  F√≠sica|                    1|
| Qu√≠mica|                    1|
|Biolog√≠a|                    1|
+--------+---------------------+



## 5. Escribir los Resultados de Vuelta a MongoDB

Finalmente, guardaremos el resultado de nuestro an√°lisis (el DataFrame `df_conteo_por_campo`) en una nueva colecci√≥n en MongoDB.

In [8]:
# Definimos el nombre de la nueva colecci√≥n para los resultados
RESULTS_COLLECTION_NAME = "conteo_por_campo"

# Escribimos el DataFrame de resultados en MongoDB
# El modo "overwrite" borrar√° la colecci√≥n si ya existe y la crear√° de nuevo.
df_conteo_por_campo.write.format("mongo")\
    .option("uri", MONGO_URI)\
    .option("database", DB_NAME)\
    .option("collection", RESULTS_COLLECTION_NAME)\
    .mode("overwrite")\
    .save()

print(f"‚úÖ Resultados guardados en la colecci√≥n '{RESULTS_COLLECTION_NAME}'.")

# Verificaci√≥n final usando pymongo para confirmar que los datos se escribieron
try:
    client = pymongo.MongoClient(MONGO_URI)
    db = client[DB_NAME]
    results_collection = db[RESULTS_COLLECTION_NAME]
    print("\nVerificando los datos escritos en MongoDB:")
    for doc in results_collection.find():
        print(doc)
    client.close()
except Exception as e:
    print(f"‚ùå Error al verificar los datos: {e}")

# Detenemos la sesi√≥n de Spark
spark.stop()

‚úÖ Resultados guardados en la colecci√≥n 'conteo_por_campo'.

Verificando los datos escritos en MongoDB:
{'_id': ObjectId('68ccb54c98f4a667c551597b'), 'campo': 'F√≠sica', 'numero_de_cientificos': 1}
{'_id': ObjectId('68ccb54c98f4a667c551597c'), 'campo': 'Qu√≠mica', 'numero_de_cientificos': 1}
{'_id': ObjectId('68ccb54c98f4a667c551597d'), 'campo': 'Biolog√≠a', 'numero_de_cientificos': 1}


#Ejemplo con datos reales

Vamos a reemplazar los datos de los cient√≠ficos con un conjunto de datos p√∫blico muy popular: T√≠tulos de Netflix. Este dataset contiene informaci√≥n sobre pel√≠culas y shows de TV, su tipo, pa√≠s de origen, a√±o de lanzamiento, etc. Es perfecto para realizar agregaciones y filtros realistas.

# Integraci√≥n de Apache Spark y MongoDB con PySpark: Caso Pr√°ctico con Datos de Netflix

**Objetivo del Notebook:** Aprender a conectar Apache Spark con una base de datos MongoDB para realizar operaciones de lectura, an√°lisis y escritura. Este es un patr√≥n muy com√∫n en arquitecturas de Big Data, donde MongoDB se utiliza como un sistema de almacenamiento flexible y Spark se usa para el procesamiento a gran escala.


**Aprenderemos a:**
1.  **Configurar el entorno:** Iniciar una sesi√≥n de Spark con el conector oficial de MongoDB.
2.  **Conectar con MongoDB Atlas:** Usaremos una base de datos en la nube para un ejemplo realista.
3.  **Poblar MongoDB:** Descargaremos un dataset real (T√≠tulos de Netflix) y lo insertaremos en nuestra base de datos.
4.  **Leer datos:** Cargar la colecci√≥n de Netflix en un DataFrame de Spark.
5.  **Analizar datos:** Utilizar el poder de Spark para realizar agregaciones y filtros sobre los datos.
6.  **Escribir datos:** Guardar los resultados de nuestro an√°lisis de vuelta en una nueva colecci√≥n en MongoDB.

In [1]:
# =================================================================
# Paso 1: Instalar las dependencias
# =================================================================
!pip install pyspark pymongo pandas -q

In [2]:
# =================================================================
# Paso 2: Iniciar la Sesi√≥n de Spark con el Conector de MongoDB
# =================================================================
from pyspark.sql import SparkSession

# Versi√≥n del conector de Spark para MongoDB
mongo_spark_connector_version = "3.0.1"
mongo_spark_connector = f"org.mongodb.spark:mongo-spark-connector_2.12:{mongo_spark_connector_version}"

spark = SparkSession.builder\
    .appName("SparkMongoDBNetflix")\
    .config("spark.jars.packages", mongo_spark_connector)\
    .getOrCreate()

print("‚úÖ SparkSession iniciada con el conector de MongoDB.")

‚úÖ SparkSession iniciada con el conector de MongoDB.


## 3. Poblando MongoDB con el Dataset de Netflix

Ahora, vamos a descargar el dataset de Netflix, procesarlo con la librer√≠a Pandas y subirlo a nuestra base de datos en MongoDB Atlas.

In [None]:
import pymongo
import pandas as pd
import requests
from io import StringIO

# =================================================================
# ¬°ACCI√ìN REQUERIDA!
# Reemplaza la cadena de conexi√≥n con la tuya de MongoDB Atlas.
# Aseg√∫rate de poner tu usuario y contrase√±a.
# =================================================================
MONGO_URI = ""

# Definimos el nombre de nuestra base de datos y colecci√≥n
DB_NAME = "netflix"
COLLECTION_NAME = "titles"

# URL del dataset de Netflix en formato CSV
url = "https://raw.githubusercontent.com/rfordatascience/tidytuesday/master/data/2021/2021-04-20/netflix_titles.csv"

print("üì• Descargando el dataset de Netflix...")
try:
    # Descargar el contenido del archivo CSV
    response = requests.get(url)
    response.raise_for_status() # Lanza un error si la descarga falla

    # Leer el CSV en un DataFrame de Pandas
    csv_data = StringIO(response.text)
    df_pandas = pd.read_csv(csv_data)

    # Limpieza b√°sica: Reemplazar valores nulos (NaN) para evitar errores en MongoDB
    df_pandas.fillna("", inplace=True)

    # Convertir el DataFrame a una lista de diccionarios (formato JSON)
    data_to_insert = df_pandas.to_dict('records')

    print(f"üìÑ Se leyeron {len(data_to_insert)} documentos del archivo.")

    # Conectar a MongoDB y poblar la colecci√≥n
    print("‚è≥ Conectando a MongoDB Atlas y poblando la colecci√≥n...")
    client = pymongo.MongoClient(MONGO_URI)
    db = client[DB_NAME]
    collection = db[COLLECTION_NAME]

    # Limpiamos la colecci√≥n por si ya exist√≠an datos
    collection.delete_many({})

    # Insertamos los nuevos datos
    collection.insert_many(data_to_insert)

    print(f"‚úÖ ¬°√âxito! {len(data_to_insert)} documentos insertados en la colecci√≥n '{COLLECTION_NAME}'.")
    client.close()

except Exception as e:
    print(f"‚ùå Ocurri√≥ un error: {e}")

üì• Descargando el dataset de Netflix...
üìÑ Se leyeron 7787 documentos del archivo.
‚è≥ Conectando a MongoDB Atlas y poblando la colecci√≥n...
‚úÖ ¬°√âxito! 7787 documentos insertados en la colecci√≥n 'titles'.


## 4. Leer Datos de MongoDB con Spark

Con los datos ya en MongoDB, usaremos Spark para leer la colecci√≥n completa y cargarla en un DataFrame distribuido.

In [6]:
# Leemos los datos desde la colecci√≥n de Netflix
df_netflix = spark.read.format("mongo")\
    .option("uri", MONGO_URI)\
    .option("database", DB_NAME)\
    .option("collection", COLLECTION_NAME)\
    .load()

print("üìù Esquema del DataFrame de Netflix:")
df_netflix.printSchema()

print("\nüé¨ Algunos datos cargados desde la colecci√≥n de Netflix:")
df_netflix.show(5)

üìù Esquema del DataFrame de Netflix:
root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- country: string (nullable = true)
 |-- date_added: string (nullable = true)
 |-- description: string (nullable = true)
 |-- director: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- listed_in: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- show_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- type: string (nullable = true)


üé¨ Algunos datos cargados desde la colecci√≥n de Netflix:
+--------------------+--------------------+-------------+-----------------+--------------------+-----------------+---------+--------------------+------+------------+-------+-----+-------+
|                 _id|                cast|      country|       date_added|         description|         director| duration|           listed_in|rating|rel

## 5. Analizar los Datos con Spark

Ahora realizaremos un an√°lisis sobre el DataFrame:
1.  Contar cu√°ntas producciones son **Pel√≠culas (Movie)** vs. **Shows de TV (TV Show)**.
2.  Encontrar el **Top 5 de pa√≠ses** con la mayor cantidad de producciones en Netflix.

In [7]:
from pyspark.sql.functions import col, desc

# 1. Conteo por tipo (Movie vs. TV Show)
df_conteo_tipo = df_netflix.groupBy("type").count()

print("üìä Conteo de producciones por tipo:")
df_conteo_tipo.show()


# 2. Top 5 pa√≠ses con m√°s producciones
# Filtramos para excluir registros donde el pa√≠s no est√° especificado
df_conteo_pais = df_netflix.filter(col("country") != "")\
    .groupBy("country")\
    .count()\
    .orderBy(desc("count"))\
    .limit(5)

print("\nüèÜ Top 5 pa√≠ses con m√°s producciones en Netflix:")
df_conteo_pais.show()

üìä Conteo de producciones por tipo:
+-------+-----+
|   type|count|
+-------+-----+
|TV Show| 2410|
|  Movie| 5377|
+-------+-----+


üèÜ Top 5 pa√≠ses con m√°s producciones en Netflix:
+--------------+-----+
|       country|count|
+--------------+-----+
| United States| 2555|
|         India|  923|
|United Kingdom|  397|
|         Japan|  226|
|   South Korea|  183|
+--------------+-----+



## 6. Escribir Resultados de Vuelta a MongoDB

Finalmente, guardaremos el resultado de nuestro an√°lisis (el Top 5 de pa√≠ses) en una **nueva colecci√≥n** en MongoDB para que pueda ser consultado por otras aplicaciones.

In [8]:
# Definimos el nombre de la nueva colecci√≥n para los resultados
RESULTS_COLLECTION_NAME = "top_paises_productores"

# Escribimos el DataFrame de resultados en MongoDB
# El modo "overwrite" borrar√° la colecci√≥n si ya existe y la crear√° de nuevo.
df_conteo_pais.write.format("mongo")\
    .option("uri", MONGO_URI)\
    .option("database", DB_NAME)\
    .option("collection", RESULTS_COLLECTION_NAME)\
    .mode("overwrite")\
    .save()

print(f"‚úÖ Resultados guardados en la colecci√≥n '{RESULTS_COLLECTION_NAME}'.")

# Verificaci√≥n final usando pymongo para confirmar que los datos se escribieron
try:
    client = pymongo.MongoClient(MONGO_URI)
    db = client[DB_NAME]
    results_collection = db[RESULTS_COLLECTION_NAME]
    print("\nüßê Verificando los datos escritos en MongoDB:")
    for doc in results_collection.find():
        print(doc)
    client.close()
except Exception as e:
    print(f"‚ùå Error al verificar los datos: {e}")

# Detenemos la sesi√≥n de Spark para liberar recursos
spark.stop()

‚úÖ Resultados guardados en la colecci√≥n 'top_paises_productores'.

üßê Verificando los datos escritos en MongoDB:
{'_id': ObjectId('68ccb6b9a0f4ab6fc60db1ae'), 'country': 'United States', 'count': 2555}
{'_id': ObjectId('68ccb6b9a0f4ab6fc60db1af'), 'country': 'India', 'count': 923}
{'_id': ObjectId('68ccb6b9a0f4ab6fc60db1b0'), 'country': 'United Kingdom', 'count': 397}
{'_id': ObjectId('68ccb6b9a0f4ab6fc60db1b1'), 'country': 'Japan', 'count': 226}
{'_id': ObjectId('68ccb6b9a0f4ab6fc60db1b2'), 'country': 'South Korea', 'count': 183}
