* MongoDB Spark Connector documentation: https://docs.mongodb.com/spark-connector/current/
* mongo-spark package: https://spark-packages.org/package/mongodb/mongo-spark
* Launch PySpark with the connector package: `pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.2`
    * Versions used:
    * Spark 2.4.7
    * Scala 2.11.12
    * Java 1.8
    * MongoDB 4.4.2



In [1]:
import os

from pyspark.sql import SparkSession

In [12]:
# Connection string of the collection used for both reading and writing.
# It can be overridden with the MONGODB_URI environment variable (keeps any
# credentials out of the notebook); otherwise the local test collection is used.
url_test_collection = os.environ.get(
    "MONGODB_URI", "mongodb://localhost/test_db.test_collection"
)

# Spark session with the MongoDB Spark Connector input/output URIs set.
# getOrCreate() returns an already running session if there is one
# (e.g. when the notebook is started through `pyspark --packages ...`).
my_spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.mongodb.input.uri", url_test_collection)
    .config("spark.mongodb.output.uri", url_test_collection)
    .getOrCreate()
)

### Leitura

In [13]:
# Load the collection configured in spark.mongodb.input.uri as a DataFrame.
# Uses `my_spark` (the session built in the previous cell) instead of `spark`,
# which is only predefined when running inside the pyspark shell.
dados = (
    my_spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .load()
)
type(dados)

pyspark.sql.dataframe.DataFrame

In [14]:
# Show the schema of the loaded collection (no explicit schema was passed to
# load()); nested documents appear as structs and arrays as array columns.
dados.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- item: string (nullable = true)
 |-- qty: double (nullable = true)
 |-- size: struct (nullable = true)
 |    |-- h: double (nullable = true)
 |    |-- w: double (nullable = true)
 |    |-- uom: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [15]:
# Number of documents in the collection.
dados.count()

3

In [16]:
# First row as a pyspark Row; nested documents (_id, size) appear as nested Rows.
dados.head()

Row(_id=Row(oid='5fef762ea361ebddd011487b'), item='Camisa Polo', qty=25.0, size=Row(h=14.0, w=21.0, uom='cm'), tags=['branco', 'vermelho'])

In [17]:
# Tabular preview of the collection; long values (e.g. _id) are truncated.
dados.show()

+--------------------+---------------+----+-----------------+------------------+
|                 _id|           item| qty|             size|              tags|
+--------------------+---------------+----+-----------------+------------------+
|[5fef762ea361ebdd...|    Camisa Polo|25.0| [14.0, 21.0, cm]|[branco, vermelho]|
|[5fef762ea361ebdd...|Vestido Bordado|85.0| [27.9, 35.5, cm]|           [cinza]|
|[5fef762ea361ebdd...|        Moleton|45.0|[19.0, 22.85, cm]|     [verde, azul]|
+--------------------+---------------+----+-----------------+------------------+



### Gravação

In [19]:
# One-row DataFrame holding the document to insert into the collection.
# Uses `my_spark` (the session configured earlier) instead of `spark`, which is
# only predefined when running inside the pyspark shell.
# NOTE(review): 50 is a Python int, so qty is inferred as a long here, while the
# existing documents have qty as double — use 50.0 if the stored type matters.
df = my_spark.createDataFrame([("Camisa T-Shirt", 50)], ["item", "qty"])

In [20]:
# First row of the DataFrame that will be written to MongoDB.
df.head()

Row(item='Camisa T-Shirt', qty=50)

In [21]:
# Tabular preview of the DataFrame that will be written.
df.show()

+--------------+---+
|          item|qty|
+--------------+---+
|Camisa T-Shirt| 50|
+--------------+---+



In [22]:
# collect() returns every row to the driver as a list of Rows — acceptable here
# because the DataFrame has a single row.
df.collect()

[Row(item='Camisa T-Shirt', qty=50)]

In [23]:
# Preview limited to the first row (same as show() here, since there is one row).
df.show(1)

+--------------+---+
|          item|qty|
+--------------+---+
|Camisa T-Shirt| 50|
+--------------+---+



In [24]:
# Append the rows of `df` to the collection configured in
# spark.mongodb.output.uri; documents already in the collection are kept.
(
    df.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .mode("append")
    .save()
)

In [25]:
# Display the collection again after the append. `dados` was not reloaded, yet
# the new document shows up: the DataFrame is evaluated lazily, so show() reads
# from MongoDB again. size and tags are null for the new document because it
# only has item and qty.
# NOTE(review): presumably `dados` keeps the schema inferred at load(); fields
# that exist only in newly written documents would likely not appear — confirm,
# and re-run load() if they are needed.
dados.show()

+--------------------+---------------+----+-----------------+------------------+
|                 _id|           item| qty|             size|              tags|
+--------------------+---------------+----+-----------------+------------------+
|[5fef762ea361ebdd...|    Camisa Polo|25.0| [14.0, 21.0, cm]|[branco, vermelho]|
|[5fef762ea361ebdd...|Vestido Bordado|85.0| [27.9, 35.5, cm]|           [cinza]|
|[5fef762ea361ebdd...|        Moleton|45.0|[19.0, 22.85, cm]|     [verde, azul]|
|[5fef775b5db57b1b...| Camisa T-Shirt|50.0|             null|              null|
+--------------------+---------------+----+-----------------+------------------+

