In [1]:
from pyspark.sql import SparkSession

In [26]:
# Local MongoDB used for both reads and writes.
MONGO_URI = "mongodb://localhost:27017/"

# Spark session with the MongoDB Spark Connector 10.x (Scala 2.12 build).
# NOTE: getOrCreate() returns an existing session if the kernel already has one,
# in which case static settings such as spark.jars.packages are not re-applied.
ss = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0")
    # The 10.x connector reads `spark.mongodb.read.*` / `spark.mongodb.write.*`;
    # the 3.x-era `spark.mongodb.input.uri` key is not used by 10.x.
    .config("spark.mongodb.read.connection.uri", MONGO_URI)
    .config("spark.mongodb.write.connection.uri", MONGO_URI)
    .config("spark.mongodb.read.partitioner", "com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner")
    # 10.x partitioner settings live under `partitioner.options.*`; the previous
    # `partitionerOptions` spelling is not recognised, so the value was ignored.
    .config("spark.mongodb.read.partitioner.options.samples.per.partition", "50")
    .getOrCreate()
)

In [25]:
# Intentionally no `ss.stop()` here: when the notebook is run top to bottom this
# cell executes right after the session is created, and stopping it would make
# every later cell fail. Call `ss.stop()` only once you are done with `ss`.

# Batch: read `crawl_news.urls` and write copies to `db_raw`

In [27]:
# Batch-read the crawled URL documents from crawl_news.urls.
# No schema is passed, so the connector infers one by sampling; the printed
# schema shows `last_crawl` as `void` (presumably every sampled value is null).
df = (
    ss.read.format("mongodb")
    .option("database", "crawl_news")
    .option("collection", "urls")
    .load()
)

In [30]:
# How many Spark partitions the configured MongoDB partitioner produced.
partition_count = df.rdd.getNumPartitions()
partition_count

1

In [13]:
# Print the schema the connector inferred for `df`.
df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- last_crawl: void (nullable = true)
 |-- url: string (nullable = true)



In [39]:
# Peek at the first few rows without truncating the long URLs.
df.show(n=5, truncate=False)

+------------------------+----------+-------------------------------------------------------------------------------------------------------------------------+
|_id                     |last_crawl|url                                                                                                                      |
+------------------------+----------+-------------------------------------------------------------------------------------------------------------------------+
|669a450a8dc7a611c997a827|NULL      |https://vnexpress.net/430-trieu-co-phieu-khong-duoc-trinh-van-quyet-day-len-san-hose-the-nao-4770573.html#box_comment_vne|
|669a450a8dc7a611c997a828|NULL      |https://vnexpress.net/messi-da-dan-cau-thu-argentina-dung-che-nhao-doi-thu-4771853.html#box_comment_vne                  |
|669a450a8dc7a611c997a829|NULL      |https://vnexpress.net/bo-giao-duc-cong-bo-diem-san-y-duoc-tu-19-den-22-5-4772121.html                                    |
|669a450a8dc7a611c997a82a|NULL      |htt

In [35]:
# Copy the whole batch, `_id` included, into db_raw.test.
# NOTE(review): because `_id` is kept, whether a re-run duplicates or replaces
# documents depends on the connector's write `operationType` — confirm.
(
    df.write.format("mongodb")
    .mode("append")
    .option("database", "db_raw")
    .option("collection", "test")
    .save()
)

In [52]:
# Row count of the source collection.
df.select(df["_id"]).count()

322

In [57]:
# Look up a single document by its `_id` string.
target_id = "669a450a8dc7a611c997a82b"
df.where(df["_id"] == target_id).show(n=20, truncate=False)

+------------------------+----------+-----------------------------------------------------------------------------------------+
|_id                     |last_crawl|url                                                                                      |
+------------------------+----------+-----------------------------------------------------------------------------------------+
|669a450a8dc7a611c997a82b|NULL      |https://vnexpress.net/chau-gai-ong-trump-gay-chu-y-tai-dai-hoi-dang-cong-hoa-4772029.html|
+------------------------+----------+-----------------------------------------------------------------------------------------+



In [65]:
# Small, reproducible sample of the source rows.
# `limit` on an unordered DataFrame is not guaranteed to return the same rows
# each time it is evaluated, and `df2` is lazy (it is evaluated once for `show`
# and again for the write), so order by `_id` first.
df2 = df.orderBy("_id").limit(5)

In [66]:
# Remove the source `_id` so the rows are written as new documents
# (presumably MongoDB then generates a fresh `_id` on insert).
columns_to_drop = ["_id"]
df2 = df2.drop(*columns_to_drop)

In [67]:
# Preview the sample (the default `show` settings, spelled out).
df2.show(n=20, truncate=True)

+----------+--------------------+
|last_crawl|                 url|
+----------+--------------------+
|      NULL|https://vnexpress...|
|      NULL|https://vnexpress...|
|      NULL|https://vnexpress...|
|      NULL|https://vnexpress...|
|      NULL|https://vnexpress...|
+----------+--------------------+



In [90]:
# Append the `_id`-less sample to db_raw.the_thao; `comment` is passed through
# as a connector option.
# NOTE(review): rows carry no `_id`, so re-running this cell likely inserts
# another copy of them — confirm before re-running.
(
    df2.write.format("mongodb")
    .mode("append")
    .option("database", "db_raw")
    .option("collection", "the_thao")
    .option("comment", "long đẹp trai")
    .save()
)

# Streaming: tail `db_raw.the_thao` to the console

In [84]:
from pyspark.sql.types import *

In [86]:
# Explicit schema for the streamed documents; every field is nullable.
read_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("last_crawl", TimestampType(), True),
    StructField("url", StringType(), True),
])

In [87]:
# Stream documents written to db_raw.the_thao to the console.
# NOTE: despite its name this is a DataStreamWriter; `.start()` launches it.
# NOTE(review): the MongoDB source is backed by change streams, which need a
# replica set / sharded cluster and only emit changes made after the query
# starts — confirm the local server supports this.
data_stream_read = (
    ss.readStream
    .format("mongodb")
    .option("database", "db_raw")
    .option("collection", "the_thao")
    # Emit the changed document itself rather than the change-event wrapper;
    # otherwise the top-level fields do not match `read_schema` and
    # `url` / `last_crawl` come back null.
    .option("change.stream.publish.full.document.only", "true")
    .schema(read_schema)
    .load()
    .writeStream
    .format("console")
    .trigger(continuous="1 seconds")
    .outputMode("append")
)

In [88]:
# Start the streaming query; `query` is the handle used later to stop it.
query = data_stream_read.start()

In [92]:
# Stop the console stream, then release the Spark session now that the
# notebook's work is finished.
query.stop()
ss.stop()