# Part 0: Setting up

## Install and start MongoDB

In [None]:
!apt install -qq mongodb
!service mongodb start

The following additional packages will be installed:
  libpcap0.8 libyaml-cpp0.6 mongo-tools mongodb-clients mongodb-server
  mongodb-server-core
The following NEW packages will be installed:
  libpcap0.8 libyaml-cpp0.6 mongo-tools mongodb mongodb-clients mongodb-server
  mongodb-server-core
0 upgraded, 7 newly installed, 0 to remove and 24 not upgraded.
Need to get 55.8 MB of archives.
After this operation, 226 MB of additional disk space will be used.
Selecting previously unselected package libpcap0.8:amd64.
(Reading database ... 122352 files and directories currently installed.)
Preparing to unpack .../0-libpcap0.8_1.9.1-3_amd64.deb ...
Unpacking libpcap0.8:amd64 (1.9.1-3) ...
Selecting previously unselected package libyaml-cpp0.6:amd64.
Preparing to unpack .../1-libyaml-cpp0.6_0.6.2-4ubuntu1_amd64.deb ...
Unpacking libyaml-cpp0.6:amd64 (0.6.2-4ubuntu1) ...
Selecting previously unselected package mongo-tools.
Preparing to unpack .../2-mongo-tools_3.6.3-0ubuntu1_amd64.deb ...
Unpacki

## Download the datasets and push them to MongoDB

In [None]:
# It's already the 21st century and people are very impatient, so they use Brotli for text and Zstd for everything else.
# Reference: https://github.com/google/brotli
!apt-get install -qq brotli

Selecting previously unselected package brotli.
(Reading database ... 122419 files and directories currently installed.)
Preparing to unpack .../brotli_1.0.7-6ubuntu0.1_amd64.deb ...
Unpacking brotli (1.0.7-6ubuntu0.1) ...
Setting up brotli (1.0.7-6ubuntu0.1) ...
Processing triggers for man-db (2.9.1-1) ...


In [None]:
!wget -q https://csc14118.github.io/thuoc_raw.json.br
!wget -q https://csc14118.github.io/gia_ke_khai_raw.json.br
!wget -q https://csc14118.github.io/movies_lang.json.br 

In [None]:
!brotli -d *.br

In [None]:
!pip install -q pymongo


In [None]:
import json
from pymongo import MongoClient

client = MongoClient()

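# Create (or reuse) the database that holds the raw input collections
db = client['input_data']

collection_name = ["gia_ke_khai_raw", "movies_lang", "thuoc_raw"]

# Load each decompressed JSON file and insert its documents into the collection of the same name
for data in collection_name:
    collection = db[data]
    with open(f'{data}.json') as fh:  # close the file once it has been parsed
        chunks = json.load(fh)
    collection.insert_many(chunks)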

# Create a dummy database to test
db = client['dummy']
db['chunks'].insert_many([{'Banh xeo': 'Rat ngon'},{'Banh bao': 'Cung ngon'}])

client.list_database_names()

['admin', 'config', 'dummy', 'input_data', 'local']

## Install pyspark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q "https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz"
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.1.1-bin-hadoop2.7"
import findspark
findspark.init()
findspark.find()

'spark-3.1.1-bin-hadoop2.7'

In [None]:
import pyspark
print(pyspark.__version__)

3.1.1


## Dirty trick to connect spark to our mongodb

In a production environment, read the connector documentation carefully and set these dependencies up properly instead of copying jars by hand.

In [None]:
!rm $SPARK_HOME/jars/mongo*.jar
!rm $SPARK_HOME/jars/bson*.jar

rm: cannot remove 'spark-3.1.1-bin-hadoop2.7/jars/mongo*.jar': No such file or directory
rm: cannot remove 'spark-3.1.1-bin-hadoop2.7/jars/bson*.jar': No such file or directory


In [None]:
# Download connector and driver jars whose versions work together; the versions listed in the next cell caused a dependency conflict
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongodb-driver/3.12.13/mongodb-driver-3.12.13.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/3.12.13/mongo-java-driver-3.12.13.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/bson/4.9.1/bson-4.9.1.jar

--2023-04-24 16:47:43--  https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 156368 (153K) [application/java-archive]
Saving to: ‘mongo-spark-connector_2.12-10.1.1.jar’


2023-04-24 16:47:43 (11.5 MB/s) - ‘mongo-spark-connector_2.12-10.1.1.jar’ saved [156368/156368]

--2023-04-24 16:47:43--  https://repo1.maven.org/maven2/org/mongodb/mongodb-driver/3.12.13/mongodb-driver-3.12.13.jar
Resolving repo1.maven.org (repo1.maven.org)... 199.232.192.209, 199.232.196.209, 2a04:4e42:4c::209, ...
Connecting to repo1.maven.org (repo1.maven.org)|199.232.192.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 375474 (367K) [application/java-archive]
Saving to: ‘mongodb-

In [None]:
# These versions conflict and produce an error; kept commented out for reference
# !cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar
# !cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongodb-driver/3.12.12/mongodb-driver-3.12.12.jar
# !cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/3.12.12/mongo-java-driver-3.12.12.jar
# !cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/bson/4.6.0/bson-4.6.0.jar

In [None]:
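from pyspark.shell import spark  # starts a session as `spark` and prints the banner below
from pyspark.sql import SparkSession

uri = "mongodb://localhost:27017/input_data"

# Connector coordinates, kept for reference only: the jars were copied into
# $SPARK_HOME/jars by hand above, so this value is not passed to the builder.
spark_jb = "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1"

# getOrCreate() reuses the session already started by pyspark.shell,
# which is why views registered through my_spark are visible to spark.sql later on.
my_spark = SparkSession \
    .builder \
    .config("spark.executor.memory", "1g") \
    .appName("csc14112") \
    .config("spark.mongodb.read.connection.uri", uri) \
    .config("spark.mongodb.write.connection.uri", uri) \
    .getOrCreate()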

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/

Using Python version 3.9.16 (main, Dec  7 2022 01:11:51)
Spark context Web UI available at http://f70a865cf7e0:4040
Spark context available as 'sc' (master = local[*], app id = local-1682354873557).
SparkSession available as 'spark'.


In [None]:
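# Test reading data from MongoDB. The dummy collection was created as "chunks";
# the empty output recorded below came from reading the misspelled name "chunk".
p = my_spark.read.format("mongodb").option("database","dummy").option("collection", "chunks").load()
p.printSchema()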

root



In [None]:
p.show()

++
||
++
++



# Part 1: Introduction to PySpark


In this lab assignment, we will work with a movie dataset loaded into our MongoDB at `input_data.movies_lang`. We will use PySpark RDD and DataFrame to perform the following tasks:

In [None]:
# YOUR CODE HERE

from pyspark.sql.functions import col

# Read data from mongodb
dfMovies = my_spark.read.format("mongodb").option("database","input_data").option("collection","movies_lang").load()

dfMovies.printSchema()
dfMovies.show()

dfMovies.createOrReplaceTempView("Movies")
query = spark.sql("SELECT * FROM Movies")
query.show(5)

root
 |-- _id: string (nullable = true)
 |-- actors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _id: string (nullable = true)
 |    |    |-- first_name: string (nullable = true)
 |    |    |-- last_name: string (nullable = true)
 |    |    |-- birth_date: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- country: string (nullable = true)
 |-- director: struct (nullable = true)
 |    |-- _id: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |    |-- first_name: string (nullable = true)
 |    |-- birth_date: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)

+--------+--------------------+-------+--------------------+---------------+--------------------+--------------------+----+
|     _id|              actors|country|       

### (a) Count the number of movies by country. Sort by count in decreasing order.

In [None]:
# YOUR CODE HERE
from pyspark.sql import functions as f
dfMovies.groupBy('country') \
        .count() \
        .orderBy(f.desc('count')) \
        .show()

+-------+-----+
|country|count|
+-------+-----+
|    USA|   67|
|     FR|   17|
|     IT|    2|
|     DE|    1|
|     JP|    1|
+-------+-----+



### (b) Return the titles of the movies produced in France.

In [None]:
# YOUR CODE HERE
dfMovies.where(col('country') == "FR").select("title").distinct().show()

+--------------------+
|               title|
+--------------------+
|           Sacrifice|
|            Van Gogh|
|Le last_name de l...|
| Les dents de la mer|
|Le gendarme et le...|
|Les bronzés font ...|
|             Shining|
|        Jeanne d'Arc|
|Le cinquième élément|
|                Léon|
|              Nikita|
|       Le grand bleu|
|De bruit et de fu...|
|Les quatre cents ...|
|    Le dernier métro|
|         Un prophète|
|  Nous trois ou rien|
+--------------------+



### (c) Return the titles of the movies in which Sofia Coppola is one of the actresses.

In [None]:
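# YOUR CODE HERE
from pyspark.sql.functions import array_contains, expr, split

# Match first and last name on the same actor: two separate array_contains checks
# would also accept a film with one "Sofia" and a different "Coppola".
dfMovies.where(expr("exists(actors, a -> a.first_name = 'Sofia' AND a.last_name = 'Coppola')")) \
        .select("title").distinct().show()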

+--------------+
|         title|
+--------------+
|Le parrain III|
+--------------+
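
One subtlety in this kind of query: testing the first name and the last name independently across the `actors` array can also match a film in which one actor is named Sofia and a different actor is named Coppola. The following is a minimal pure-Python sketch of that difference. The records and the helper names `independent_match` and `same_actor_match` are made up for illustration and are not part of the dataset or of PySpark.

```python
# Made-up records shaped like the movies_lang documents (hypothetical data).
movies = [
    {"title": "Film A", "actors": [
        {"first_name": "Sofia", "last_name": "Coppola"},
        {"first_name": "Al", "last_name": "Pacino"},
    ]},
    {"title": "Film B", "actors": [
        {"first_name": "Sofia", "last_name": "Loren"},
        {"first_name": "Roman", "last_name": "Coppola"},
    ]},
]


def independent_match(movie, first, last):
    """True if some actor has the first name and some (possibly other) actor has the last name."""
    firsts = [a["first_name"] for a in movie["actors"]]
    lasts = [a["last_name"] for a in movie["actors"]]
    return first in firsts and last in lasts


def same_actor_match(movie, first, last):
    """True only if a single actor carries both names."""
    return any(a["first_name"] == first and a["last_name"] == last
               for a in movie["actors"])


loose = [m["title"] for m in movies if independent_match(m, "Sofia", "Coppola")]
strict = [m["title"] for m in movies if same_actor_match(m, "Sofia", "Coppola")]
print(loose)   # ['Film A', 'Film B']
print(strict)  # ['Film A']
```

On the real data both approaches may return the same single title; the difference only shows up when such a mixed cast exists.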



### (d) Return the names and birth dates of the directors of movies produced in France.


In [None]:
# YOUR CODE HERE
dfMovies.where(col('country') == "FR").selectExpr("director.last_name", "director.first_name", "director.birth_date").distinct().show()


+---------+------------+----------+
|last_name|  first_name|birth_date|
+---------+------------+----------+
|Tarkovski|      Andrei|      1932|
|   Pialat|     Maurice|      1925|
|   Annaud|Jean-Jacques|      1943|
|Spielberg|      Steven|      1946|
|  Girault|        Jean|      1924|
|  Leconte|     Patrice|      1947|
|  Kubrick|     Stanley|      1928|
|   Besson|         Luc|      1959|
|  Chabrol|      Claude|      1930|
| Truffaut|    François|      null|
|  Audiard|     Jacques|      1952|
|  Kheiron|       Tabib|      null|
+---------+------------+----------+



### (e) Return the average number of actors in a film.


In [None]:
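import math

# Collect the actors column once; calling collect() inside the loop reruns the query for every row
rows = spark.sql("select actors from Movies").collect()
n_rows = len(rows)
total_actors = 0  # avoid shadowing the built-in sum()
for row in rows:
  total_actors += len(row[0])
# The mean is rounded up to a whole number of actors
avg = math.ceil(total_actors / n_rows)
print(avg)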

3
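
Because the cell rounds the mean up with `math.ceil`, a printed 3 can stand for any mean above 2 and up to 3. A small sketch of the same arithmetic on made-up numbers (the list `cast_sizes` is hypothetical, not taken from the dataset) shows the gap between the exact mean and the rounded value:

```python
import math
from statistics import mean

# Hypothetical cast sizes for five films (not taken from the dataset).
cast_sizes = [2, 3, 1, 4, 1]

exact = mean(cast_sizes)        # arithmetic mean of the list lengths: 11 / 5
rounded_up = math.ceil(exact)   # the value math.ceil would report

print(exact, rounded_up)  # 2.2 3
```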


### (f) Return the name of the actor that acted in the most movies.

In [None]:
from collections import Counter

# Collect the actors column once instead of re-running the query for every row
rows = spark.sql("select actors from Movies").collect()

# all_actors holds every actor entry from every movie
all_actors = []
for row in rows:
  all_actors.extend(row[0])

# If all_actors is empty, no actor satisfies the condition
if len(all_actors) == 0:
  print("no one")
else:
  # Count occurrences with collections.Counter. Entries are whole Rows (role included),
  # so the same actor in a different role is counted under a separate key.
  counter = Counter(all_actors)
  # Find the most frequent entry
  temp = counter.most_common(1)
  # Extract the result
  actor = temp[0][0]
  amount = temp[0][1]
  print("Actor:", actor)
  print("Amount:", amount)

Actor: Row(_id='artist:27', first_name='Bruce', last_name='Willis', birth_date='1955', role='McClane')
Amount: 3
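
The count above uses the whole actor record as the key, and the record includes the `role` field. An actor who plays different roles in different films is therefore split across several keys. The sketch below illustrates the effect with made-up cast entries (every name and id here is hypothetical) and contrasts it with counting by `_id` alone:

```python
from collections import Counter

# Hypothetical cast entries: (actor _id, name, role). Not real dataset rows.
appearances = [
    ("artist:1", "Ann Lee", "Detective"),
    ("artist:1", "Ann Lee", "Detective"),
    ("artist:2", "Bo Kim", "Pilot"),
    ("artist:2", "Bo Kim", "Doctor"),
    ("artist:2", "Bo Kim", "Thief"),
]

# Counting whole records treats each (actor, role) combination as a separate key.
by_record = Counter(appearances).most_common(1)[0]

# Counting by _id alone groups every appearance of the same person.
by_actor = Counter(actor_id for actor_id, _, _ in appearances).most_common(1)[0]

print(by_record)  # (('artist:1', 'Ann Lee', 'Detective'), 2)
print(by_actor)   # ('artist:2', 3)
```

Whether the two counts differ on the real collection depends on how often actors change roles between films, which this sketch does not check.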


# Part 2: Real-world Data Manipulation

In this part of the lab, we will work with two collections in our MongoDB: `gia_ke_khai_raw` and `thuoc_raw` loaded at `input_data.gia_ke_khai_raw` and `input_data.thuoc_raw` respectively. We will use PySpark RDD and DataFrame to perform the following tasks:

### (a)  Read the datasets into a DataFrame and print out the schema and the number of records.

In [None]:
# YOUR CODE HERE
dfDrugs = my_spark.read.format("mongodb").option("database","input_data").option("collection","thuoc_raw").load()

dfDrugs.printSchema()
dfDrugs.show(5)
print(dfDrugs.count())

dfPrices = my_spark.read.format("mongodb").option("database","input_data").option("collection","gia_ke_khai_raw").load()

dfPrices.printSchema()
dfPrices.show(5)
print(dfPrices.count())

root
 |-- _id: string (nullable = true)
 |-- baoChe: string (nullable = true)
 |-- chuY: string (nullable = true)
 |-- congTyDk: string (nullable = true)
 |-- congTySx: string (nullable = true)
 |-- congTySxCode: string (nullable = true)
 |-- diaChiDk: string (nullable = true)
 |-- diaChiSx: string (nullable = true)
 |-- dongGoi: string (nullable = true)
 |-- dotPheDuyet: string (nullable = true)
 |-- giaKeKhai: null (nullable = true)
 |-- hieuLuc: null (nullable = true)
 |-- hoatChat: string (nullable = true)
 |-- huongDanSuDung: null (nullable = true)
 |-- huongDanSuDungBn: null (nullable = true)
 |-- id: string (nullable = true)
 |-- images: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- isHide: string (nullable = true)
 |-- meta: struct (nullable = true)
 |    |-- fileName: string (nullable = true)
 |-- nhomThuoc: string (nullable = true)
 |-- nongDo: string (nullable = true)
 |-- nuocDk: string (nullable = true)
 |-- nuocSx: string (nullable = true)
 

### (b) Show all records in the `thuoc_raw` collection that have the same active pharmaceutical ingredient (API) in their `hoatChat` field as their medicine name.


Note: In the context of medication, API stands for Active Pharmaceutical Ingredient: the biologically active component of a drug that produces the intended therapeutic effect. In other words, it is the chemical substance that gives a medicine its medicinal properties.
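
"Same as the medicine name" can be read as exact equality or as a substring match, and the two readings can select different records. The sketch below compares them on a few `hoatChat`/`tenThuoc` pairs. The first four pairs appear in this dataset; the last one is a made-up row added only to show where the two tests diverge.

```python
# (hoatChat, tenThuoc) pairs; the last row is hypothetical.
pairs = [
    ("Paracetamol 500 mg", "Paracetamol 500 mg"),
    ("Paracetamol 500 mg", "Paracetamol TW3 500 mg"),
    ("Berberin clorid 0,01g", "Berberin 0,01g"),
    ("Tetracyclin 250 mg", "Tetracyclin 250 mg"),
    ("Paracetamol 500 mg, cafein 65 mg", "Paracetamol 500 mg"),  # hypothetical
]

# Exact equality: hoatChat and tenThuoc are the same string.
exact = [name for api, name in pairs if api == name]

# Substring test: tenThuoc occurs somewhere inside hoatChat.
substring = [name for api, name in pairs if name in api]

print(exact)      # ['Paracetamol 500 mg', 'Tetracyclin 250 mg']
print(substring)  # ['Paracetamol 500 mg', 'Tetracyclin 250 mg', 'Paracetamol 500 mg']
```

Both tests are case-sensitive, so differences in capitalisation would also prevent a match.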

In [None]:
# YOUR CODE HERE
dfDrugs.where(dfDrugs.hoatChat.contains(dfDrugs.tenThuoc)).show()

+--------------------+------+----+--------------------+--------------------+------------+--------------------+--------------------+--------------------+-----------+---------+-------+--------------------+--------------+----------------+-----------+------+------+------+---------+------+--------+--------+--------+----------+-----+----+------+----------+-----------+-----------+-----+------+----+------------------+---------+--------+
|                 _id|baoChe|chuY|            congTyDk|            congTySx|congTySxCode|            diaChiDk|            diaChiSx|             dongGoi|dotPheDuyet|giaKeKhai|hieuLuc|            hoatChat|huongDanSuDung|huongDanSuDungBn|         id|images|isHide|  meta|nhomThuoc|nongDo|  nuocDk|  nuocSx|phanLoai|  pheDuyet| rate|rows|rutSdk|rutSdkFile|   soDangKy|soQuyetDinh|state|taDuoc| ten|          tenThuoc|tieuChuan| tuoiTho|
+--------------------+------+----+--------------------+--------------------+------------+--------------------+--------------------+---

### (c) Create a new DataFrame from the `thuoc_raw` collection that splits the API in the `hoatChat` field into multiple rows. For example, "paracetamol" is the API in "Paracetamol 500 mg," and "amoxicillin" is the API in various medications such as "Amogentine 500mg/125mg," "Augbactam 1g/200mg," and "Viamomentin." The resulting DataFrame should have two columns: `hoatChat` and `thuocTuongUng` as a list. After processing the data, write it back to our MongoDB at `output_data.thuocthaythe`.

In [None]:
# YOUR CODE HERE
import re

# split is used in the UDF cell below; importing it here keeps this part self-contained
from pyspark.sql.functions import collect_list, explode, split, udf
from pyspark.sql.types import ArrayType, StringType

# View 2 columns hoatChat and tenThuoc
dfDrugs.select('hoatChat', 'tenThuoc') \
       .show(10, truncate = False)

+----------------------------------------+----------------------+
|hoatChat                                |tenThuoc              |
+----------------------------------------+----------------------+
|amoxicilin  500mg, acid clavulanic 125mg|Amogentine 500mg/125mg|
|levofloxacin 250mg                      |PQAlevo               |
|Berberin clorid 0,01g                   |Berberin 0,01g        |
|Berberin clorid 0,05g                   |Berberin 0,05g        |
|Doxycyclin hydroclorid 100mg            |Doxycyclin TW3 100 mg |
|Paracetamol 500 mg                      |Paracetamol 500 mg    |
|Paracetamol 500 mg                      |Paracetamol TW3 500 mg|
|Tetracyclin 250 mg                      |Tetracyclin 250 mg    |
|Thiamin nitrat 0,01 g                   |Vitamin B1 0,01g      |
|ích mẫu, hương phụ, ngải cứu            |ích mẫu TW3           |
+----------------------------------------+----------------------+
only showing top 10 rows
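
Before building the Spark version, the intended transformation can be traced in plain Python on a few rows from the table above. This is only a sketch: the helper name `api_names` is hypothetical, the dosage pattern is the one used by this notebook's UDF with the unnecessary escapes dropped, and splitting on `", "` is the same assumption the notebook makes about how ingredients are separated.

```python
import re
from collections import defaultdict

# Dosage pattern, e.g. "500mg", "0,01g", "500 mg".
DOSAGE = re.compile(r"\d+(,\d+)? *(gam|mg|g|UI|ml|l|IU|%)(/\d+ ?ml)?")

# (hoatChat, tenThuoc) rows taken from the listing above.
rows = [
    ("amoxicilin  500mg, acid clavulanic 125mg", "Amogentine 500mg/125mg"),
    ("Paracetamol 500 mg", "Paracetamol 500 mg"),
    ("Paracetamol 500 mg", "Paracetamol TW3 500 mg"),
    ("Berberin clorid 0,01g", "Berberin 0,01g"),
]


def api_names(hoat_chat):
    """Split a hoatChat string into ingredient names with the dosage removed."""
    names = []
    for part in hoat_chat.split(", "):
        match = DOSAGE.search(part)
        cleaned = part.replace(match.group(0), "") if match else part
        names.append(cleaned.strip())
    return names


# Map each ingredient name to the list of medicines that contain it.
substitutes = defaultdict(list)
for hoat_chat, ten_thuoc in rows:
    for name in api_names(hoat_chat):
        substitutes[name].append(ten_thuoc)

for name, drugs in substitutes.items():
    print(name, "->", drugs)
```

Each key of `substitutes` corresponds to one `hoatChat` value in the target DataFrame, and each list to its `thuocTuongUng`.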



In [None]:
# ***Create a new column in 'dfDrugs' called 'tenHoatChat' 
# ***Each row in 'tenHoatChat' is extracted from the name of active ingredients in 'hoatChat'

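# UDF: strip the dosage (e.g. "500mg", "0,01 g") from each active ingredient
DOSAGE_RE = re.compile(r"\d+(,\d+)? *(gam|mg|g|UI|ml|l|IU|\%)(\/\d+ ?ml)?")

def extract_name_active(actives):
  names = []
  for active in actives or []:  # hoatChat may be null
    match = DOSAGE_RE.search(active)
    if match:
      # Remove the matched dosage text, then trim the remaining whitespace
      names.append(active.replace(match.group(0), "").strip())
    else:
      # No dosage found: keep the ingredient text as it is
      names.append(active.strip())
  return names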

# Wrap the function above with PySpark SQL udf()
extract_name_udf = udf(extract_name_active, ArrayType(StringType()))

# Create a new column 'tenHoatChat'
dfDrugs = dfDrugs.withColumn('tenHoatChat', 
                             extract_name_udf(split('hoatChat', ', ')))

dfDrugs.select('tenThuoc', 'hoatChat', 'tenHoatChat') \
       .show(truncate = False)

+----------------------+----------------------------------------+--------------------------------+
|tenThuoc              |hoatChat                                |tenHoatChat                     |
+----------------------+----------------------------------------+--------------------------------+
|Amogentine 500mg/125mg|amoxicilin  500mg, acid clavulanic 125mg|[amoxicilin, acid clavulanic]   |
|PQAlevo               |levofloxacin 250mg                      |[levofloxacin]                  |
|Berberin 0,01g        |Berberin clorid 0,01g                   |[Berberin clorid]               |
|Berberin 0,05g        |Berberin clorid 0,05g                   |[Berberin clorid]               |
|Doxycyclin TW3 100 mg |Doxycyclin hydroclorid 100mg            |[Doxycyclin hydroclorid]        |
|Paracetamol 500 mg    |Paracetamol 500 mg                      |[Paracetamol]                   |
|Paracetamol TW3 500 mg|Paracetamol 500 mg                      |[Paracetamol]                   |
|Tetracycl

In [None]:
# ***Explode the column 'tenHoatChat' and use 'groupby()' function to group into different names of active ingredients

# Explode the values column 'tenHoatChat' into separate rows
explode_tenHoatChat = dfDrugs.selectExpr('tenThuoc', 'explode(tenHoatChat) as hoatChat')

# Group into different names of active ingredients
result = explode_tenHoatChat.groupBy('hoatChat') \
                            .agg(collect_list('tenThuoc') \
                            .alias('thuocTuongUng'))
result.show(truncate = False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|hoatChat                                                                                                                                             |thuocTuongUng                                                                                                                                                                                                                                                                                                                                 |
+-------------------------

In [None]:
# ***Write it back to our MongoDB at `output_data.thuocthaythe`
result.write.format("mongodb")              \
      .option("database", "output_data")    \
      .option("collection", "thuocthaythe") \
      .mode("overwrite")                    \
      .save()

### (d) Create a new DataFrame from the two DataFrames above that contains `tenThuoc`, `hoatChat`, `dongGoi`, `dvt` and `giaBan`. After processing the data, write it back to our MongoDB at `output_data.giathuoc`.
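
The task amounts to an inner join followed by a projection and de-duplication. The sketch below traces that logic in plain Python, assuming the two collections are linked by registration number (`soDangKy` in `thuoc_raw`, `sdk` in `gia_ke_khai_raw`), as the join in this notebook does. All rows and registration numbers here are invented for illustration.

```python
# Hypothetical rows; the registration numbers and packaging values are invented.
drugs = [
    {"soDangKy": "REG-001", "tenThuoc": "Paracetamol 500 mg", "hoatChat": "Paracetamol 500 mg"},
    {"soDangKy": "REG-002", "tenThuoc": "Tetracyclin 250 mg", "hoatChat": "Tetracyclin 250 mg"},
]
prices = [
    {"sdk": "REG-001", "dongGoi": "Box A", "dvt": "Viên", "giaBan": 500},
    {"sdk": "REG-001", "dongGoi": "Box A", "dvt": "Viên", "giaBan": 500},  # duplicate declaration
    {"sdk": "REG-003", "dongGoi": "Box B", "dvt": "Viên", "giaBan": 900},  # no matching drug
]

# Index the price rows by registration number.
prices_by_sdk = {}
for p in prices:
    prices_by_sdk.setdefault(p["sdk"], []).append(p)

# Inner join: only drugs with at least one price row contribute.
# Building a set drops duplicate result rows, like distinct().
joined = {
    (d["tenThuoc"], d["hoatChat"], p["dongGoi"], p["dvt"], p["giaBan"])
    for d in drugs
    for p in prices_by_sdk.get(d["soDangKy"], [])
}

print(sorted(joined))
```

Here only REG-001 appears on both sides, so the result holds a single row even though two price declarations matched.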

In [None]:
# Join the two datasets on the registration number (soDangKy in thuoc_raw, sdk in gia_ke_khai_raw)
gia_thuoc = dfDrugs.join(dfPrices, dfDrugs.soDangKy == dfPrices.sdk, "inner") \
                   .select(dfDrugs.tenThuoc, dfDrugs.hoatChat, dfPrices.dongGoi, dfPrices.dvt, dfPrices.giaBan).distinct()
gia_thuoc.show(truncate = False)

# Save the result
gia_thuoc.write.format("mongodb")        \
      .option("database", "output_data") \
      .option("collection", "giathuoc")  \
      .mode("overwrite")                 \
      .save()

+----------------------+----------------------------------------+-------------------------------------------------------------------------------------------------------------+----+------+
|tenThuoc              |hoatChat                                |dongGoi                                                                                                      |dvt |giaBan|
+----------------------+----------------------------------------+-------------------------------------------------------------------------------------------------------------+----+------+
|Amogentine 500mg/125mg|amoxicilin  500mg, acid clavulanic 125mg|Hộp 2 vỉ x 7 viên                                                                                            |Viên|7500  |
|Amogentine 500mg/125mg|amoxicilin  500mg, acid clavulanic 125mg|Hộp 2 vỉ x 7 viên                                                                                            |Viên|7000  |
|Paracetamol 500 mg    |Paracetamol 500 mg                  