# Part 0: Setting up

## Install and start MongoDB

In [None]:
!apt install -qq mongodb
!service mongodb start

## Download the datasets and push them to our MongoDB

In [None]:
# It's already the 21st century and people are very impatient, so they use Brotli for text and Zstd for everything else.
# Reference: https://github.com/google/brotli
!apt-get install -qq brotli

In [None]:
!wget -q https://csc14118.github.io/thuoc_raw.json.br
!wget -q https://csc14118.github.io/gia_ke_khai_raw.json.br
!wget -q https://csc14118.github.io/movies_lang.json.br 

In [None]:
!brotli -d *.br

In [None]:
!pip install -q pymongo

In [None]:
import json
from pymongo import MongoClient

# Connect to the local MongoDB server started during setup (default host/port).
client = MongoClient()

# Database that holds the raw input collections.
db = client['input_data']

collection_name = ["gia_ke_khai_raw", "movies_lang", "thuoc_raw"]

# Push each decompressed <name>.json file into a collection of the same name.
# The collection is dropped first so re-running this cell does not insert
# every document a second time; the file is closed via `with`.
for data in collection_name:
    collection = db[data]
    collection.drop()
    with open(f'{data}.json', encoding='utf-8') as json_file:
        chunks = json.load(json_file)
    collection.insert_many(chunks)

# Create a tiny dummy database used to smoke-test the Spark <-> MongoDB link.
db = client['dummy']
db['chunks'].drop()
db['chunks'].insert_many([{'Banh xeo': 'Rat ngon'}, {'Banh bao': 'Cung ngon'}])

client.list_database_names()

## Install pyspark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q "https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz"
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# Tell findspark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "spark-3.1.1-bin-hadoop2.7",
})

import findspark

# Make pyspark importable from SPARK_HOME, then display the resolved path.
findspark.init()
findspark.find()

In [None]:
import pyspark

# Confirm which PySpark build findspark exposed to this kernel.
pyspark_version = pyspark.__version__
print(pyspark_version)

## Dirty trick to connect Spark to our MongoDB

In an industry environment, please read the documentation carefully before setting up these complicated components.

In [None]:
!rm $SPARK_HOME/jars/mongo*.jar
!rm $SPARK_HOME/jars/bson*.jar

In [None]:
# !cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar
# !cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongodb-driver/3.12.12/mongodb-driver-3.12.12.jar
# !cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/3.12.12/mongo-java-driver-3.12.12.jar
# !cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/bson/4.6.0/bson-4.6.0.jar

!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongodb-driver/3.12.13/mongodb-driver-3.12.13.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/3.12.13/mongo-java-driver-3.12.13.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/bson/4.9.1/bson-4.9.1.jar

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Connection string for the local MongoDB; the database/collection actually
# used are chosen per read/write via .option(...).
uri = "mongodb://localhost:27017/input_data"

# Maven coordinate of the connector whose jar was copied into $SPARK_HOME/jars.
# Kept for reference only; it is not passed to the builder.
spark_jb = "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1"

# Build the session directly rather than importing pyspark.shell first. If a
# session already exists, getOrCreate() reuses it, and static settings such as
# spark.executor.memory and the app name can no longer take effect.
my_spark = SparkSession \
    .builder \
    .config("spark.executor.memory", "1g") \
    .appName("csc14112") \
    .config("spark.mongodb.read.connection.uri", uri) \
    .config("spark.mongodb.write.connection.uri", uri) \
    .getOrCreate()

# Alias kept so code that refers to the conventional `spark` name still works.
spark = my_spark

In [None]:
# Smoke test: read back the dummy collection written during setup.
# The setup cell inserts into collection 'chunks' (plural). Reading 'chunk'
# would silently return an empty DataFrame and prove nothing about the link.
p = my_spark.read.format("mongodb").option("database", "dummy").option("collection", "chunks").load()
p.printSchema()

In [None]:
# Display the dummy documents (first 20 rows, long values truncated).
p.show(n=20)

# Part 1: Introduction to PySpark


In this lab assignment, we will work with a movie dataset loaded into our MongoDB at `input_data.movies_lang`. We will use PySpark RDD and DataFrame to perform the following tasks:

In [None]:
# Load input_data.movies_lang from MongoDB into a DataFrame and inspect it.
movies_reader = (
    my_spark.read.format("mongodb")
    .option("database", "input_data")
    .option("collection", "movies_lang")
)
df = movies_reader.load()
print(type(df))
df.printSchema()

### (a) Count the number of movies by country. Sort by count in decreasing order.

In [None]:
# DataFrame API: number of movies per country, most frequent first.
movies_per_country = df.groupBy('country').count()
movies_per_country.sort('count', ascending=False).show()

In [None]:
# RDD API: emit (country, 1) per movie and sum per country, then convert
# back to a DataFrame for sorted, tabular display.
country_pairs = df.rdd.map(lambda row: (row.country, 1))
country_counts = country_pairs.reduceByKey(lambda left, right: left + right)
country_counts.toDF(['country', 'count']).orderBy('count', ascending=False).show()


### (b) Return the titles of the movies produced in France.

In [None]:
from pyspark.sql import functions as f

# DataFrame API: filter on `country` *before* projecting to `title`, so the
# predicate never refers to a column the projection has already dropped.
df.filter(f.col('country') == 'FR').select('title').show()

In [None]:
# RDD API: keep French productions, then project each row to its title.
french_rows = df.rdd.filter(lambda row: row.country == 'FR')
french_rows.map(lambda row: row.title).collect()

### (c) Return the titles of the movies in which Sofia Coppola is one of the actresses.

In [None]:
# DataFrame API: keep movies in which a *single* actor entry is Sofia Coppola.
# Two independent array_contains checks (one on first names, one on last
# names) would also match a cast containing e.g. "Sofia X" and "Y Coppola".
# A null `actors` array makes exists() null, so such movies are dropped.
is_sofia_coppola = f.expr(
    "exists(actors, a -> a.first_name = 'Sofia' AND a.last_name = 'Coppola')"
)
df.filter(is_sofia_coppola).select('title').show()

In [None]:
# RDD API: the same question answered with a plain Python predicate per Row.
def _has_sofia_coppola(row):
    """Return True if one actor entry in ``row.actors`` is named Sofia Coppola.

    The first/last name are checked on the *same* actor, so a cast with
    "Sofia X" and "Y Coppola" does not match. A missing cast matches nothing.
    """
    return any(
        actor is not None
        and actor.first_name == 'Sofia'
        and actor.last_name == 'Coppola'
        for actor in (row.actors or [])
    )


temp2 = df.rdd.filter(_has_sofia_coppola).map(lambda row: row.title).collect()
temp2

### (d) Return the names and birth dates of the directors of movies produced in France.


In [None]:
# DataFrame API: director full name and birth date for movies produced in
# France. Filtering first keeps `country` available to the predicate.
# concat_ws skips null name parts.
(
    df.filter(f.col('country') == 'FR')
    .select(
        f.concat_ws(" ", df.director.first_name, df.director.last_name).alias("full_name"),
        df.director.birth_date.alias("birth_date"),
    )
    .show()
)

In [None]:
# RDD API: same result as the DataFrame cell.
def _director_fields(director):
    """Return (full_name, birth_date) for a director Row, null-safe.

    Name parts are joined like Spark's concat_ws: None parts are skipped and an
    all-missing name gives "". A missing director yields ("", None), matching
    what the DataFrame version shows, instead of raising.
    """
    if director is None:
        return ("", None)
    name_parts = (director.first_name, director.last_name)
    full_name = " ".join(part for part in name_parts if part is not None)
    return (full_name, director.birth_date)


df.rdd \
    .filter(lambda row: row.country == "FR") \
    .map(lambda row: _director_fields(row.director)) \
    .collect()


### (e) Return the average number of actors in a film.


In [None]:
# DataFrame API: average cast size per movie.
# Movies are de-duplicated on (title, year), and the *same* de-duplicated set
# feeds both numerator and denominator, so duplicate documents cannot skew the ratio.
unique_movies = df.dropDuplicates(['title', 'year'])
number_of_movies = unique_movies.count()
# explode() emits one row per actor; null or empty casts contribute no rows.
number_of_actors = unique_movies.select(f.explode('actors')).count()
print(number_of_actors)
print(number_of_movies)
# Guard against an empty collection instead of raising ZeroDivisionError.
print("Result: ", number_of_actors / number_of_movies if number_of_movies else float('nan'))

In [None]:
# RDD API: mirror the DataFrame cell. Key each movie by (title, year) and keep
# one cast size per key. Then count keys and sum cast sizes over that same set.
# `x.actors or []` treats a missing cast as zero actors instead of raising.
cast_size_by_movie = (
    df.rdd
    .map(lambda x: ((x.title, x.year), len(x.actors or [])))
    .reduceByKey(lambda kept, _duplicate: kept)
)
number_of_movies = cast_size_by_movie.count()
number_of_actors = cast_size_by_movie.values().sum()

print(number_of_actors)
print(number_of_movies)
# Guard against an empty collection instead of raising ZeroDivisionError.
print("Result: ", number_of_actors / number_of_movies if number_of_movies else float('nan'))

### (f) Return the name of the actor that acted in the most movies.

In [None]:
# DataFrame API: one row per (movie, actor), then count movies per full name.
exploded = df.withColumn('actor', f.explode(df.actors))
temp = (
    exploded
    .select(f.concat_ws(" ", exploded.actor.first_name, exploded.actor.last_name).alias("full_name"))
    .groupBy('full_name')
    .count()
    .orderBy('count', ascending=False)
)
# Highest per-actor movie count; every actor reaching it is shown (ties kept).
max_movies = temp.agg(f.max('count')).first()[0]
temp.filter(f.col('count') == max_movies).select('full_name').show()

In [None]:
# RDD API: count movies per actor and return *all* actors tied for the maximum,
# matching the DataFrame cell (which keeps ties). No sort is needed to find a max.
from operator import add


def _actor_full_name(actor):
    """Join the non-null name parts of ``actor`` with one space (concat_ws semantics).

    A null actor entry yields "", just as concat_ws does in the DataFrame cell.
    """
    if actor is None:
        return ""
    return " ".join(part for part in (actor.first_name, actor.last_name) if part is not None)


movies_per_actor = (
    df.rdd
    .flatMap(lambda x: x.actors or [])
    .map(lambda actor: (_actor_full_name(actor), 1))
    .reduceByKey(add)
    .cache()
)
# fold(0, max) returns 0 on an empty RDD, where .max() would raise ValueError.
top_count = movies_per_actor.values().fold(0, max)
movies_per_actor.filter(lambda name_count: name_count[1] == top_count).keys().collect()

# Part 2: Real-world Data Manipulation

In this part of the lab, we will work with two collections in our MongoDB: `gia_ke_khai_raw` and `thuoc_raw` loaded at `input_data.gia_ke_khai_raw` and `input_data.thuoc_raw` respectively. We will use PySpark RDD and DataFrame to perform the following tasks:

### (a) Read the datasets into DataFrames and print out the schema and the number of records of each.

In [None]:
def _load_collection(name):
    """Read input_data.<name> from MongoDB, print its schema and record count, return it."""
    frame = (
        my_spark.read.format("mongodb")
        .option("database", "input_data")
        .option("collection", name)
        .load()
    )
    print(f"The schema of {name}")
    frame.printSchema()
    print('Numbers of record: ', frame.count())
    return frame


gkk = _load_collection("gia_ke_khai_raw")
thuoc = _load_collection("thuoc_raw")

### (b) Show all records in the `thuoc_raw` collection whose active pharmaceutical ingredient (API) in the `hoatChat` field is the same as their medicine name (`tenThuoc`).


Notes: In the context of medication, API stands for Active Pharmaceutical Ingredient, which is the biologically active component in a drug that produces the intended therapeutic effect. In other words, it is the chemical substance that gives a medicine its medicinal properties.

In [None]:
# DataFrame API: medicines whose name equals their active-ingredient string.
# Rows where either side is null compare as null and are dropped by the filter.
same_name_mask = thuoc.tenThuoc == thuoc.hoatChat
thuoc.filter(same_name_mask).show()

In [None]:
# RDD API: same filter as the DataFrame cell. In Python `None == None` is True,
# whereas Spark SQL's null = null is null and the row is filtered out. Rows
# lacking a name are therefore excluded explicitly so both answers agree.
import pprint  # pretty print

result = thuoc.rdd.filter(
    lambda x: x.tenThuoc is not None and x.tenThuoc == x.hoatChat
).collect()
pprint.pprint(result[:20])  # only print the first 20 records (for readability)

### (c) Create a new DataFrame from the `thuoc_raw` collection that splits the API in the `hoatChat` field into multiple rows. For example, "paracetamol" is the API in "Paracetamol 500 mg," and "amoxicillin" is the API in various medications such as "Amogentine 500mg/125mg," "Augbactam 1g/200mg," and "Viamomentin." The resulting DataFrame should have two columns: `hoatChat` and `thuocTuongUng` as a list. After processing the data, write it back to our MongoDB at `output_data.thuocthaythe`.

In [None]:
# DataFrame API: invert the medicine -> ingredients relation.
#  1. split each `hoatChat` string into its individual APIs (comma-separated,
#     tolerant of surrounding whitespace; NOTE(review): confirm no other
#     delimiters such as ';' or '+' occur in the data),
#  2. normalise each API (trim + lower-case) so "Paracetamol" and "paracetamol"
#     land in the same group,
#  3. group by API and collect the medicine names (`tenThuoc`) containing it.
thuoc_split_df = thuoc.select("tenThuoc", f.split("hoatChat", r"\s*,\s*").alias("APIs"))
thuoc_explode_df = (
    thuoc_split_df
    # explode() must be applied on its own; Spark rejects generators nested in expressions.
    .select("tenThuoc", f.explode("APIs").alias("api"))
    .select("tenThuoc", f.lower(f.trim("api")).alias("hoatChat"))
    .filter((f.col("hoatChat") != "") & f.col("tenThuoc").isNotNull())
)
thuoc_grouped_df = thuoc_explode_df.groupBy("hoatChat").agg(
    # collect_set removes repeated medicine names; array_sort makes the order stable.
    f.array_sort(f.collect_set("tenThuoc")).alias("thuocTuongUng")
)
thuoc_grouped_df.show()

# Persist the result; "overwrite" keeps re-runs from appending duplicate documents.
thuoc_grouped_df.write.format("mongodb") \
    .mode("overwrite") \
    .option("database", "output_data") \
    .option("collection", "thuocthaythe") \
    .save()

### (d) Create a new DataFrame from the two collections mentioned above that contains `tenThuoc`, `hoatChat`, `dongGoi`, `dvt` and `giaBan`. After processing the data, write it back to our MongoDB at `output_data.giathuoc`.

In [None]:
# Outer-join registered medicines (thuoc) with declared prices (gkk) on name,
# active ingredient and packaging, keeping unmatched rows from both sides.
conditions = [thuoc.tenThuoc == gkk.tenThuoc,
              thuoc.hoatChat == gkk.hoatChat,
              thuoc.dongGoi == gkk.dongGoi]
table_join = thuoc.join(gkk, conditions, 'outer')

# For rows present only in gkk, the thuoc-side key columns are null. Fall back
# to the gkk values so those price records are not emitted nameless.
giathuoc_df = table_join.select(
    f.coalesce(thuoc.tenThuoc, gkk.tenThuoc).alias("tenThuoc"),
    f.coalesce(thuoc.hoatChat, gkk.hoatChat).alias("hoatChat"),
    f.coalesce(thuoc.dongGoi, gkk.dongGoi).alias("dongGoi"),
    gkk.dvt,
    gkk.giaBan,
)
giathuoc_df.show()

# Persist as the task requires; "overwrite" keeps re-runs idempotent.
giathuoc_df.write.format("mongodb") \
    .mode("overwrite") \
    .option("database", "output_data") \
    .option("collection", "giathuoc") \
    .save()