# Part 0: Setting up

## Install and start MongoDB

In [None]:
!apt install -qq neofetch
!neofetch

The following additional packages will be installed:
  caca-utils chafa fonts-droid-fallback fonts-noto-mono fonts-urw-base35
  ghostscript gsfonts imagemagick imagemagick-6-common imagemagick-6.q16 jp2a
  libchafa0 libdjvulibre-text libdjvulibre21 libfftw3-double3 libgs9
  libgs9-common libid3tag0 libidn12 libijs-0.35 libimlib2 libjbig2dec0
  libjxr-tools libjxr0 liblqr-1-0 libmagickcore-6.q16-6
  libmagickcore-6.q16-6-extra libmagickwand-6.q16-6 libnetpbm10 libpci3
  libsixel-bin libsixel1 libwmflite-0.2-7 netpbm pci.ids pciutils poppler-data
  toilet toilet-fonts w3m w3m-img
Suggested packages:
  fonts-noto fonts-freefont-otf | fonts-freefont-ttf fonts-texgyre
  ghostscript-x imagemagick-doc autotrace cups-bsd | lpr | lprng enscript gimp
  gnuplot grads hp2xx html2ps libwmf-bin mplayer povray radiance sane-utils
  texlive-base-bin transfig ufraw-batch libfftw3-bin libfftw3-dev inkscape
  poppler-utils fonts-japanese-mincho | fonts-ipafont-mincho
  fonts-japanese-gothic | fonts-ipafo

In [None]:
!sudo apt-get install -y mongodb-org

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
mongodb-org is already the newest version (6.0.8).
0 upgraded, 0 newly installed, 0 to remove and 32 not upgraded.


Because Colab has upgraded its environment to Ubuntu 22.04, some setups [no longer work at all](https://medium.com/google-colab/colab-updated-to-ubuntu-22-04-lts-709a91555b3c). If you cannot resolve the environment conflict, run this lab locally against your own MongoDB server.
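
Before continuing, it can help to confirm that a MongoDB server is actually listening. The sketch below uses only the standard library and assumes the server runs on `localhost` at MongoDB's default port 27017; the helper name `mongo_port_open` is hypothetical and not part of the lab. It checks that the TCP port accepts connections, not that the server is healthy.

```python
import socket

def mongo_port_open(host="localhost", port=27017, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures
        return False

print("MongoDB reachable:", mongo_port_open())
```

If this prints `False`, the later `MongoClient()` calls will most likely time out, so fix the server setup first.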

In [None]:
!apt install -qq mongodb
!service mongodb start

Package mongodb is not available, but is referred to by another package.
This may mean that the package is missing, has been obsoleted, or
is only available from another source

E: Package 'mongodb' has no installation candidate
mongodb: unrecognized service


## Download the datasets and push them to MongoDB

In [None]:
# It's already the 21st century and people are very impatient, so they use Brotli for text and Zstd for everything else.
# Reference: https://github.com/google/brotli
!apt-get install -qq brotli

Selecting previously unselected package brotli.
(Reading database ... 120511 files and directories currently installed.)
Preparing to unpack .../brotli_1.0.9-2build6_amd64.deb ...
Unpacking brotli (1.0.9-2build6) ...
Setting up brotli (1.0.9-2build6) ...
Processing triggers for man-db (2.10.2-1) ...


In [None]:
!wget -q https://csc14118.github.io/thuoc_raw.json.br
!wget -q https://csc14118.github.io/gia_ke_khai_raw.json.br
!wget -q https://csc14118.github.io/movies_lang.json.br

In [None]:
!brotli -d *.br

In [None]:
!pip install -q pymongo


In [None]:
import json
from pymongo import MongoClient

client = MongoClient()

# Creation of the new database
db = client['input_data']

collection_name = ["gia_ke_khai_raw", "movies_lang", "thuoc_raw"]

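# Push our data to MongoDB
for data in collection_name:
    collection = db[data]
    # Open with a context manager so the file handle is closed after loading
    with open(f'{data}.json', encoding='utf-8') as fh:
        chunks = json.load(fh)
    collection.insert_many(chunks)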

# Create a dummy database to test
db = client['dummy']
db['chunks'].insert_many([{'Banh xeo': 'Rat ngon'},{'Banh bao': 'Cung ngon'}])

client.list_database_names()

## Install pyspark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q "https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz"
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.1.1-bin-hadoop2.7"
import findspark
findspark.init()
findspark.find()

'spark-3.1.1-bin-hadoop2.7'

In [None]:
import pyspark
print(pyspark.__version__)

3.1.1


## Dirty trick to connect Spark to our MongoDB

In a production environment, read the documentation carefully when setting up this kind of integration.

In [None]:
!rm $SPARK_HOME/jars/mongo*.jar
!rm $SPARK_HOME/jars/bson*.jar

rm: cannot remove 'spark-3.1.1-bin-hadoop2.7/jars/mongo*.jar': No such file or directory
rm: cannot remove 'spark-3.1.1-bin-hadoop2.7/jars/bson*.jar': No such file or directory


In [None]:
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongodb-driver/3.12.13/mongodb-driver-3.12.13.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/3.12.13/mongo-java-driver-3.12.13.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/bson/4.9.1/bson-4.9.1.jar

--2023-08-10 00:53:27--  https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 156368 (153K) [application/java-archive]
Saving to: ‘mongo-spark-connector_2.12-10.1.1.jar’


2023-08-10 00:53:27 (6.30 MB/s) - ‘mongo-spark-connector_2.12-10.1.1.jar’ saved [156368/156368]

--2023-08-10 00:53:27--  https://repo1.maven.org/maven2/org/mongodb/mongodb-driver/3.12.13/mongodb-driver-3.12.13.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 375474 (367K) [application/java-archive]
Saving to: ‘mongodb-

In [None]:
from pyspark.shell import spark
from pyspark import SparkContext,SparkConf

# uri = "mongodb://localhost:27017/input_data"

from pyspark.sql import SparkSession

# spark_jb = "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1"
# my_spark = SparkSession \
#     .builder \
#     .config("spark.executor.memory", "1g") \
#     .appName("csc14112") \
#     .config("spark.mongodb.read.connection.uri", uri) \
#     .config("spark.mongodb.write.connection.uri", uri) \
#     .getOrCreate()

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/

Using Python version 3.10.12 (main, Jun 11 2023 05:26:28)
Spark context Web UI available at http://740d1c094e54:4040
Spark context available as 'sc' (master = local[*], app id = local-1691628818198).
SparkSession available as 'spark'.


In [None]:
my_spark = SparkSession \
    .builder \
    .appName("csc14112") \
    .getOrCreate()

In [None]:
# # Test read data from our mongo db
# p = my_spark.read.format("mongodb").option("database","dummy").option("collection", "chunks").load()
# p.printSchema()

In [None]:
# 'header' is a CSV option and has no effect on the JSON reader, so it is omitted
p = my_spark.read.json('movies_lang.json')
p.printSchema()

root
 |-- _id: string (nullable = true)
 |-- actors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _id: string (nullable = true)
 |    |    |-- birth_date: string (nullable = true)
 |    |    |-- first_name: string (nullable = true)
 |    |    |-- last_name: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- country: string (nullable = true)
 |-- director: struct (nullable = true)
 |    |-- _id: string (nullable = true)
 |    |-- birth_date: string (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



In [None]:
p.show()

+--------+--------------------+-------+--------------------+---------------+--------------------+--------------------+----+
|     _id|              actors|country|            director|          genre|           languages|               title|year|
+--------+--------------------+-------+--------------------+---------------+--------------------+--------------------+----+
| movie:1|[{artist:15, 1908...|     DE|{artist:3, 1899, ...|          drama|                [el]|             Vertigo|1958|
| movie:2|[{artist:5, 1949,...|    USA|{artist:4, 1937, ...|Science-fiction|    [el, en, it, es]|               Alien|1979|
| movie:3|[{artist:109, 197...|    USA|{artist:6, 1954, ...|          drama|[el, de, it, en, ...|             Titanic|1997|
| movie:4|                  []|     FR|{artist:9, 1932, ...|          drama|[en, it, el, fr, de]|           Sacrifice|1986|
| movie:5|[{artist:11, 1954...|    USA|{artist:10, 1946,...|         Action|[es, fr, en, it, ...|          Volte/Face|1997|
| movie:

# Part 1: Introduction to PySpark


In this lab assignment, we will work with a movie dataset loaded into our MongoDB at `input_data.movies_lang`. We will use PySpark RDD and DataFrame to perform the following tasks:

In [None]:
# YOUR CODE HERE
# Read data from mongodb

raise NotImplementedError

### (a) Count the number of movies by country. Sort by count in decreasing order.

In [None]:
# YOUR CODE HERE
from pyspark.sql.functions import desc, col, concat_ws, length, max
p.groupBy('country').count().sort(desc('count')).show()

+-------+-----+
|country|count|
+-------+-----+
|    USA|   67|
|     FR|   17|
|     IT|    2|
|     DE|    1|
|     JP|    1|
+-------+-----+



### (b) Return the titles of the movies produced in France.

In [None]:
# YOUR CODE HERE
p.filter(p["country"] == "FR").select("title").show(truncate = False)

+-----------------------------------+
|title                              |
+-----------------------------------+
|Sacrifice                          |
|Van Gogh                           |
|Le last_name de la rose            |
|Les dents de la mer                |
|Le gendarme et les extra-terrestres|
|Les bronzés font du ski            |
|Shining                            |
|Jeanne d'Arc                       |
|Le cinquième élément               |
|Léon                               |
|Nikita                             |
|Le grand bleu                      |
|De bruit et de fureur              |
|Les quatre cents coups             |
|Le dernier métro                   |
|Un prophète                        |
|Nous trois ou rien                 |
+-----------------------------------+



### (c) Return the titles of the movies in which Sofia Coppola is one of the actresses.

In [None]:
# YOUR CODE HERE
from pyspark.sql import functions as f
# explode 'actors' column to get all actors
inv = p.withColumn("actor", f.explode(p.actors))
# concat first name and last name for full name of actor
inv = inv.withColumn('full_name', concat_ws(' ', inv.actor.first_name, inv.actor.last_name))
# filter by full name 'Sofia Coppola'
inv.filter(inv.full_name == 'Sofia Coppola').select('title').show(truncate = False)

+--------------+
|title         |
+--------------+
|Le parrain III|
+--------------+



### (d) Return the names and birth dates of the directors of movies produced in France.


In [None]:
# YOUR CODE HERE
# get first name, last name and birth date of the directors of movies having "FR" country
di = p.filter(p["country"] == "FR").select("director.first_name", "director.last_name", "director.birth_date")
# concatenate first name and last name into full name; remove duplicated rows
di.select(concat_ws(' ', di.first_name, di.last_name).alias('full_name'), di.birth_date).distinct().show()

+-------------------+----------+
|          full_name|birth_date|
+-------------------+----------+
|         Luc Besson|      1959|
|Jean-Jacques Annaud|      1943|
|     Maurice Pialat|      1925|
|    Patrice Leconte|      1947|
|     Tabib  Kheiron|      null|
|   Andrei Tarkovski|      1932|
|  François Truffaut|      null|
|       Jean Girault|      1924|
|    Jacques Audiard|      1952|
|     Claude Chabrol|      1930|
|    Stanley Kubrick|      1928|
|   Steven Spielberg|      1946|
+-------------------+----------+



### (e) Return the average number of actors in a film.
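
As a plain-Python illustration of what this task computes, the sketch below averages the length of the `actors` array over a few hypothetical records shaped like the documents in `movies_lang`. The sample data and the helper name `average_actor_count` are assumptions for illustration only. In PySpark the same idea can be expressed with `f.size` on the `actors` column followed by `f.avg`, with null arrays possibly needing separate handling.

```python
# Hypothetical sample records shaped like documents in movies_lang
movies = [
    {"title": "A", "actors": [{"last_name": "X"}, {"last_name": "Y"}]},
    {"title": "B", "actors": []},
    {"title": "C", "actors": [{"last_name": "Z"}]},
]

def average_actor_count(docs):
    """Mean length of the 'actors' array; a missing array counts as empty."""
    if not docs:
        return 0.0
    return sum(len(d.get("actors") or []) for d in docs) / len(docs)

print(average_actor_count(movies))  # (2 + 0 + 1) / 3 = 1.0
```

Note that movies with an empty cast, such as `movie:4` in the table above, still count in the denominator.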


In [None]:
# YOUR CODE HERE

raise NotImplementedError

### (f) Return the name of the actor that acted in the most movies.

In [None]:
# YOUR CODE HERE
# count the number of movies of actors
num_movie_actor = inv.select('full_name').groupBy('full_name').count()
# find the max number
max_movies = num_movie_actor.select(max('count')).collect()[0]['max(count)']
# list out all actors
num_movie_actor.filter(num_movie_actor['count'] == max_movies).select('full_name').show()

+------------+
|   full_name|
+------------+
|Bruce Willis|
+------------+



# Part 2: Real-world Data Manipulation

In this part of the lab, we will work with two collections in our MongoDB: `gia_ke_khai_raw` and `thuoc_raw` loaded at `input_data.gia_ke_khai_raw` and `input_data.thuoc_raw` respectively. We will use PySpark RDD and DataFrame to perform the following tasks:

### (a) Read each dataset into a DataFrame and print out its schema and number of records.

In [None]:
# YOUR CODE HERE
gkk = my_spark.read.json('gia_ke_khai_raw.json')
gkk.printSchema()
gkk.count()

root
 |-- doanhNghiepDk: string (nullable = true)
 |-- doanhNghiepSx: string (nullable = true)
 |-- dongGoi: string (nullable = true)
 |-- dvt: string (nullable = true)
 |-- giaBan: string (nullable = true)
 |-- hoatChat: string (nullable = true)
 |-- id: string (nullable = true)
 |-- image: string (nullable = true)
 |-- ngayBaoCao: string (nullable = true)
 |-- ngayVanBan: string (nullable = true)
 |-- nongDo: string (nullable = true)
 |-- sdk: string (nullable = true)
 |-- soVanBan: string (nullable = true)
 |-- stt: string (nullable = true)
 |-- tenThuoc: string (nullable = true)



62197

In [None]:
th1 = my_spark.read.json('thuoc_raw.json')
# th1.printSchema()
# th1.count()

### (b) Show all records in the `thuoc_raw` collection that have the same active pharmaceutical ingredient (API) in their `hoatChat` field as their medicine name.


Notes: In the context of medication, API stands for Active Pharmaceutical Ingredient, which is the biologically active component in a drug that produces the intended therapeutic effect. In other words, it is the chemical substance that gives a medicine its medicinal properties.
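
One possible reading of "the same" is equality after trimming whitespace and ignoring case. The plain-Python sketch below illustrates that interpretation on hypothetical records; the comparison rule, the sample rows and the helper name `same_api_as_name` are all assumptions, and the lab may intend a stricter or looser match.

```python
def same_api_as_name(record):
    """True when 'hoatChat' equals 'tenThuoc' after trimming and case-folding."""
    api = (record.get("hoatChat") or "").strip().casefold()
    name = (record.get("tenThuoc") or "").strip().casefold()
    return api != "" and api == name

# Hypothetical rows; real records come from thuoc_raw
sample = [
    {"tenThuoc": "Paracetamol", "hoatChat": "paracetamol "},
    {"tenThuoc": "Augbactam 1g/200mg", "hoatChat": "Amoxicillin"},
    {"tenThuoc": None, "hoatChat": None},
]
matches = [r for r in sample if same_api_as_name(r)]
print(matches)
```

Only the first record matches here; rows with missing values are skipped rather than treated as equal.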

In [None]:
# YOUR CODE HERE

### (c) Create a new DataFrame from the `thuoc_raw` collection that splits the API in the `hoatChat` field into multiple rows. For example, "paracetamol" is the API in "Paracetamol 500 mg," and "amoxicillin" is the API in various medications such as "Amogentine 500mg/125mg," "Augbactam 1g/200mg," and "Viamomentin." The resulting DataFrame should have two columns: `hoatChat` and `thuocTuongUng` as a list. After processing the data, write it back to our MongoDB at `output_data.thuocthaythe`.
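
The target shape can be sketched in plain Python before writing the Spark version: split each `hoatChat` string on commas and semicolons, then collect the medicine names per ingredient. The rows below are hypothetical (the real ingredient strings for these medicines may differ), and lowercasing the ingredient key is an assumption made so that spelling variants group together.

```python
import re
from collections import defaultdict

# Hypothetical rows; real records come from thuoc_raw
rows = [
    {"tenThuoc": "Paracetamol 500 mg", "hoatChat": "Paracetamol"},
    {"tenThuoc": "Amogentine 500mg/125mg", "hoatChat": "Amoxicillin; Acid clavulanic"},
    {"tenThuoc": "Viamomentin", "hoatChat": "amoxicillin, acid clavulanic"},
]

def group_by_ingredient(records):
    """Map each normalized ingredient to the list of medicines containing it."""
    groups = defaultdict(list)
    for rec in records:
        for part in re.split(r"\s*[,;]\s*", rec["hoatChat"]):
            key = part.strip().lower()
            if key:  # skip empty fragments left by trailing separators
                groups[key].append(rec["tenThuoc"])
    return dict(groups)

result = group_by_ingredient(rows)
for ingredient, medicines in sorted(result.items()):
    print(ingredient, "->", medicines)
```

Each key corresponds to a `hoatChat` value and each list to `thuocTuongUng` in the requested DataFrame.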

In [None]:
# YOUR CODE HERE
from pyspark.sql.functions import regexp_replace

In [None]:
th1.select('hoatChat', 'tenThuoc').filter(th1.hoatChat.contains('gồm')).show(truncate = False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------+
|hoatChat                                                                                                                                                                                                                                                                                                                                                                                           |tenThuoc                                    |
+---------------------------------------------------------------------------------------------------------------------------------

In [None]:
# Replace bad data (raw strings keep the regex backslashes intact)
th = th1.withColumn('hoatChat', regexp_replace('hoatChat', '\n', ' '))
th = th.withColumn('hoatChat', regexp_replace('hoatChat', r'\[[^]]*\]|\([^)]*\)', ''))
th = th.withColumn('hoatChat', regexp_replace('hoatChat', r'\(.*', ''))
th = th.withColumn('hoatChat', regexp_replace('hoatChat', '[)]', ''))
n = r'\s*[#\d]+\s*[,.\/]*\s*[\d]*\s*'
t = '[ ,;./]+'
# Build the unit pattern with an f-string so that {n} and {t} are interpolated
th = th.withColumn('hoatChat', regexp_replace('hoatChat', rf'{n}mg|{n}mcg|{n}iu|{n}ui|{n}g|{n}ml|{n}l|{n}kg|{n}mm|{n}m|{n}cm2|{n}\%|{n}đơn vị usp|{n}miu|{n}m iu|{n}lọ|{n}tuýp|{n}bơm tiêm|{n}que|{n}viên|{n}ống{t}', ''))
th = th.withColumn('hoatChat', regexp_replace('hoatChat', r'\d+:\d+', ''))
th = th.withColumn('hoatChat', regexp_replace('hoatChat', '.*:', ''))
th = th.withColumn('hoatChat', regexp_replace('hoatChat', r'\s*/', ''))
th = th.withColumn('hoatChat', regexp_replace('hoatChat', '(tương đương|tương ứng|dưới dạng|dạng|gồm)', ''))
th = th.withColumn('hoatChat', regexp_replace('hoatChat', r'\.\.\.|--|…|-', ''))
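
Dosage patterns like the ones above are easier to debug on a handful of strings before running them on the whole DataFrame. The sketch below uses Python's `re` module, whose syntax is close to but not identical to the Java regex engine behind `regexp_replace`. The shortened unit list, the sample strings and the helper name `strip_dosage` are assumptions for illustration.

```python
import re

# Assumed, shortened unit list for illustration
UNITS = ["mcg", "mg", "ml", "kg", "iu", "g", "l", "%"]
UNIT_ALT = "|".join(re.escape(u) for u in UNITS)

# A number (optionally with a decimal part), then a unit that is not
# followed by another letter, so "3 glucose" is not read as "3 g"
DOSAGE = re.compile(rf"\s*\d+(?:[.,]\d+)?\s*(?:{UNIT_ALT})(?![a-z])", re.IGNORECASE)

def strip_dosage(text):
    """Remove dosage tokens such as '500 mg' or '0,075g' from text."""
    return DOSAGE.sub("", text).strip()

for raw in ["Paracetamol 500 mg", "Allantoin 0,075g", "Glucose 5%", "3 glucose"]:
    print(repr(raw), "->", repr(strip_dosage(raw)))
```

The negative lookahead is one way to keep short units such as `g` or `l` from eating the first letter of the next word.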


In [None]:
th.select('hoatChat', 'tenThuoc').filter(th.hoatChat.contains(' ')).sort(desc('hoatChat')).show(truncate = False)

In [None]:
from pyspark.sql.functions import trim

a = th.select('hoatChat','tenThuoc')
# Split on commas/semicolons and emit one row per ingredient
a = a.withColumn("hoatChat", f.explode(f.split(f.col("hoatChat"), r'\s*[,;]\s*')))
a = a.withColumn("hoatChat", trim(a.hoatChat))
# Replace bad data
# a = a.withColumn('hoatChat', regexp_replace('hoatChat', '\s*[,.\/]+\s*', ''))
# a = a.withColumn('hoatChat', regexp_replace('hoatChat', '\s*[#\d]+\s*[\d]*\s*', ''))
a = a.select('hoatChat','tenThuoc').filter((a.hoatChat != '') & (a.hoatChat != '.'))
# a.sort(desc('hoatChat')).show(truncate = False)
ttt = a.groupBy('hoatChat').agg(f.collect_list('tenThuoc').alias('thuocTuongUng'))
ttt.sort(('hoatChat')).show()

+--------------------+--------------------+
|            hoatChat|       thuocTuongUng|
+--------------------+--------------------+
|       & các Vitamin|      [Silycardus-F]|
|        . Vitamin B1|          [Geromino]|
|        . Vitamin D3|            [Padeex]|
|        . vitamin B5|           [Cadi BC]|
|                  .3|[Hydrogen peroxyd...|
|                 000|            [Hes 6%]|
|01 hệ phân phối t...|[Absolox (Đóng gó...|
|                 075|            [Rejina]|
|                   1|[Capilusa, Pectol...|
|          1 bơm tiêm|[Orgalutran (Đóng...|
|           1 que cấy|          [Implanon]|
|              1 tuýp|         [Betacylic]|
|1 ống  chứa Ascor...|          [Ascorneo]|
|10hydroxy2decenoi...|[Topiderm (SXNQ: ...|
|17 viên nén màu v...|            [Qlaira]|
|      17 β estradiol|   [Oestrogel 0.06%]|
|                  1]|[Prospan lozenges...|
|                  1ọ|     [Cefotaxime 1g]|
|                   2|[Rinzup Lozenges ...|
|2 viên nén không ...|          

In [None]:
# ttt = a.groupBy('hoatChat').agg(f.collect_list('tenThuoc').alias('thuocTuongUng'))
# ttt.sort(('hoatChat')).show()

In [None]:
ttt.filter(ttt.hoatChat.contains('ích mẫu')).show(truncate = False)

+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|hoatChat       |thuocTuongUng                                                                                                                                                                             |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|cao đặc ích mẫu|[Hoàn điều kinh bổ huyết, Tibidine, Ích mẫu hoàn]                                                                                                                                         |
|ích mẫu        |[ích mẫu, ích mẫu, ích mẫu, Bạch đái hoàn Xuân quang, Viên nang ích mẫu, Bổ huyết điều kinh, Viên ích mẫu OP.CIM, Hoạt huyết Thephaco, Viên nang nguyệt quý, Cao kh

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
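client = MongoClient()
dbo = client['output_data']
collection = dbo["thuocthaythe"]
# insert_many expects an iterable of dicts, not a Spark DataFrame
collection.insert_many([row.asDict() for row in ttt.collect()])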

### (d) Create a new DataFrame from the two collections mentioned above that contains `tenThuoc`, `hoatChat`, `dongGoi`, `dvt` and `giaBan`. After processing the data, write it back to our MongoDB at `output_data.giathuoc`.
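
The join behind this task can be pictured in plain Python as a lookup on the registration number: `sdk` in the price collection and `soDangKy` in the medicine collection. The sketch below is an illustration on hypothetical rows, not the Spark solution; the helper name `join_prices` and all sample values are assumptions.

```python
def join_prices(declared_prices, medicines):
    """Inner-join price rows to medicine rows on the registration number."""
    by_reg = {m["soDangKy"]: m for m in medicines}
    joined = []
    for price in declared_prices:
        med = by_reg.get(price["sdk"])
        if med is None:
            continue  # inner join: drop prices without a matching medicine
        row = {
            "tenThuoc": med["tenThuoc"],
            "hoatChat": med["hoatChat"],
            "dongGoi": price["dongGoi"],
            "dvt": price["dvt"],
            "giaBan": price["giaBan"],
        }
        if row not in joined:  # mimic DataFrame.distinct()
            joined.append(row)
    return joined

# Hypothetical rows for illustration only
medicines = [{"soDangKy": "REG-001", "tenThuoc": "Medicine A", "hoatChat": "ingredient a"}]
declared_prices = [
    {"sdk": "REG-001", "dongGoi": "Box of 10", "dvt": "Tablet", "giaBan": "3000"},
    {"sdk": "REG-001", "dongGoi": "Box of 10", "dvt": "Tablet", "giaBan": "3000"},
    {"sdk": "REG-999", "dongGoi": "Box of 1", "dvt": "Vial", "giaBan": "450000"},
]
giathuoc = join_prices(declared_prices, medicines)
print(giathuoc)
```

The dictionary lookup relies on each registration number appearing once on the medicine side, which is an assumption worth verifying on the real data.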

In [None]:
# YOUR CODE HERE
gt = gkk.join(th, th.soDangKy == gkk.sdk).select(gkk.sdk, th.tenThuoc, th.hoatChat, gkk.dongGoi, gkk.dvt, gkk.giaBan).distinct()
gt.show(truncate = False)


+-----------+-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------+----+------+
|sdk        |tenThuoc               |hoatChat                                                                                                                                                                    |dongGoi                                               |dvt |giaBan|
+-----------+-----------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------+----+------+
|VD-31422-18|Rejina                 |Mỗi 15 gam mỡ chứa: Allantoin 0,075g; Ethyl aminobenzoate 0,3g; Hydrocortisone acetat 0,075                                      

In [None]:
collection = dbo["giathuoc"]
# Convert Spark rows to dicts before inserting with pymongo
collection.insert_many([row.asDict() for row in gt.collect()])

In [None]:
# gkk.show(5,truncate = False)

+------------------------------------+-------------+---------------------------------------+----+------+--------+------------------------+-----+----------+----------+------+-----------+--------+----+--------+
|doanhNghiepDk                       |doanhNghiepSx|dongGoi                                |dvt |giaBan|hoatChat|id                      |image|ngayBaoCao|ngayVanBan|nongDo|sdk        |soVanBan|stt |tenThuoc|
+------------------------------------+-------------+---------------------------------------+----+------+--------+------------------------+-----+----------+----------+------+-----------+--------+----+--------+
|Công ty cổ phần dược phẩm Đạt Vi Phú|null         |Hộp 6 vỉ x 10 viên; hộp 10 vỉ x 10 viên|Viên|3000  |null    |5d4947f46f6a36432da7e2cb|null |26/03/2019|null      |null  |VD-31633-19|null    |null|null    |
|Công ty cổ phần dược phẩm Đạt Vi Phú|null         |Hộp 2 vỉ x 7 viên                      |Viên|450000|null    |5d4947f46f6a36432da7e2cc|null |26/03/2019|null     

In [None]:
# print(gkk.count())
# gkk.select('sdk').distinct().count()

62197


39967

In [None]:
# th.show(5,truncate = False)

+------+----+-------------------------------------------------+-------------------------------------------------+------------+-----------------------------------------+-----------------------------------------+---------------------------------------------+-----------+---------+-------+----------------------------------------+--------------+----------------+-----------+------+------+------+---------+------+--------+--------+--------+----------+-----+----+------+----------+-----------+-----------+-----+------+----+----------------------+---------+--------+
|baoChe|chuY|congTyDk                                         |congTySx                                         |congTySxCode|diaChiDk                                 |diaChiSx                                 |dongGoi                                      |dotPheDuyet|giaKeKhai|hieuLuc|hoatChat                                |huongDanSuDung|huongDanSuDungBn|id         |images|isHide|meta  |nhomThuoc|nongDo|nuocDk  |nuocSx  |phanLoai|phe

In [None]:
# print(th.count())
# th.select('soDangKy').distinct().count()

38945


38945