# Part 0: Setting up

## Note: 
Link colab: https://colab.research.google.com/drive/1ddG9NCV7K_SB8JWOxojBwVwI6-gtFC9D#scrollTo=O8YNrN4lAZa5

## Install and start MongoDB

In [None]:
!apt install -qq mongodb
!service mongodb start

## Download the datasets and push them to our MongoDB

In [None]:
# Install the Brotli command-line tool: the datasets below are published as
# Brotli-compressed (.br) JSON files.
# Reference: https://github.com/google/brotli
!apt-get install -qq brotli

In [None]:
!wget -q https://csc14118.github.io/thuoc_raw.json.br
!wget -q https://csc14118.github.io/gia_ke_khai_raw.json.br
!wget -q https://csc14118.github.io/movies_lang.json.br 

In [None]:
!brotli -d *.br

In [None]:
!pip install -q pymongo

In [None]:
import json
from pymongo import MongoClient

# Connect to the local MongoDB server started above (default host/port).
client = MongoClient()

# Database that holds the raw input collections.
db = client['input_data']

collection_name = ["gia_ke_khai_raw", "movies_lang", "thuoc_raw"]

# Push our data to MongoDB: one collection per decompressed JSON file.
for data in collection_name:
    collection = db[data]
    # `with` closes the file handle (json.load(open(...)) leaked it).
    with open(f'{data}.json', encoding='utf-8') as fh:
        chunks = json.load(fh)
    # Drop first so that re-running this cell does not duplicate documents.
    collection.drop()
    # insert_many raises on an empty list, so skip an empty dump.
    if chunks:
        collection.insert_many(chunks)

# Create a dummy database to test the connection (also reset on re-run).
db = client['dummy']
db['chunks'].drop()
db['chunks'].insert_many([{'Banh xeo': 'Rat ngon'}, {'Banh bao': 'Cung ngon'}])

client.list_database_names()

## Install pyspark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q "https://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz"
!tar xf spark-3.1.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# JAVA_HOME / SPARK_HOME are read by findspark and by the later
# `!cd $SPARK_HOME/jars` shell commands.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Resolve the extracted Spark directory to an absolute path so SPARK_HOME
# stays valid even if the working directory changes later.
os.environ["SPARK_HOME"] = os.path.abspath("spark-3.1.1-bin-hadoop2.7")

import findspark
findspark.init()
findspark.find()

In [None]:
import pyspark

# Confirm which PySpark installation findspark put on the path.
print(f"{pyspark.__version__}")

## Dirty trick to connect Spark to our MongoDB

In an industry environment, please read the documentation carefully before setting up these components.

In [None]:
!rm $SPARK_HOME/jars/mongo*.jar
!rm $SPARK_HOME/jars/bson*.jar

In [None]:
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.12/10.1.1/mongo-spark-connector_2.12-10.1.1.jar
# Because having a new version of mongodb-driver and mongodb-java-driver in https://repo1.maven.org/maven2/org/mongodb so we need change
# this link.
# !cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongodb-driver/3.12.12/mongodb-driver-3.12.12.jar
# !cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/3.12.12/mongo-java-driver-3.12.12.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongodb-driver/3.12.13/mongodb-driver-3.12.13.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/3.12.13/mongo-java-driver-3.12.13.jar
!cd $SPARK_HOME/jars && wget https://repo1.maven.org/maven2/org/mongodb/bson/4.6.0/bson-4.6.0.jar


In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Default connection URI for the connector; individual reads/writes below
# select the database/collection with .option(...).
uri = "mongodb://localhost:27017/input_data"

# Maven coordinate of the connector jar copied into $SPARK_HOME/jars above.
# Informational only: it is not passed to spark.jars.packages.
spark_jb = "org.mongodb.spark:mongo-spark-connector_2.12:10.1.1"

# Build the session directly. Importing pyspark.shell (as before) created a
# default session first; getOrCreate() then reused it, so settings that must
# be fixed at SparkContext start-up (spark.executor.memory) were not applied.
my_spark = (
    SparkSession.builder
    .appName("csc14112")
    .config("spark.executor.memory", "1g")
    .config("spark.mongodb.read.connection.uri", uri)
    .config("spark.mongodb.write.connection.uri", uri)
    .getOrCreate()
)

In [None]:
# Smoke test: load input_data.gia_ke_khai_raw through the connector and show its schema.
p = (
    my_spark.read.format("mongodb")
    .options(database="input_data", collection="gia_ke_khai_raw")
    .load()
)
p.printSchema()

In [None]:
p.show(n=20, truncate=True)

# Part 1: Introduction to PySpark


In this lab assignment, we will work with a movie dataset loaded into our MongoDB at `input_data.movies_lang`. We will use PySpark RDD and DataFrame to perform the following tasks:

In [None]:
# YOUR CODE HERE
# Load input_data.movies_lang as a DataFrame and inspect its inferred schema.
df_movies_lang = (
    my_spark.read.format("mongodb")
    .options(database="input_data", collection="movies_lang")
    .load()
)
df_movies_lang.printSchema()


In [None]:
df_movies_lang.select(["_id", "actors"]).show()

### (a) Count the number of movies by country. Sort by count in decreasing order.

In [None]:
# YOUR CODE HERE
from pyspark.sql.functions import col

# Number of movies per country, largest first; ties are ordered by country
# so the output is deterministic.
movies_per_country = (
    df_movies_lang
    .groupBy("country")
    .count()
    .orderBy(col("count").desc(), col("country"))
)
for country, count in movies_per_country.collect():
    # Movies without a country form a null group; a width format spec on None
    # raises TypeError, so convert to str before padding.
    print("country:", f"{str(country):<3}", "|number of movies:", count)

### (b) Return the titles of the movies produced in France.

In [None]:
# YOUR CODE HERE

# Filter the full rows on country first, then project the title. The previous
# order filtered on a column already dropped by select(), relying on Spark
# silently re-adding the missing reference.
title_FR = df_movies_lang.where(df_movies_lang.country == "FR").select("title")
print("The titles of the movies produced in France:")
for row in title_FR.collect():
    print(row.title)

### (c) Return the titles of the movies in which Sofia Coppola is one of the actresses.

In [None]:
# YOUR CODE HERE
from pyspark.sql.functions import col, explode

# One row per (movie title, actor) pair.
movie_actors = df_movies_lang.select("title", explode("actors").alias("actor"))

# Match both name parts: filtering on first_name alone also returned movies
# featuring any other actress named Sofia. distinct() avoids printing a title
# twice if she is listed more than once in a cast.
sofia_coppola_titles = (
    movie_actors
    .filter((col("actor.first_name") == "Sofia") & (col("actor.last_name") == "Coppola"))
    .select("title")
    .distinct()
)
print("The title of the movies of which Sofia Coppola is one of the actresses:")
for row in sofia_coppola_titles.collect():
    print(row.title)

### (d) Return the names and birth dates of the directors of movies produced in France.


In [None]:
# YOUR CODE HERE
from pyspark.sql.functions import col, concat_ws

# Distinct (name, birth date) of directors of movies produced in France.
# Rows without a director are skipped (indexing a null director used to raise
# TypeError), and concat_ws skips null name parts instead of failing on
# Python string concatenation with None.
france_directors = (
    df_movies_lang
    .filter((col("country") == "FR") & col("director").isNotNull())
    .select(
        concat_ws(" ", col("director.last_name"), col("director.first_name")).alias("name"),
        col("director.birth_date").alias("birth_date"),
    )
    .distinct()
)

# Print the results
for name, birthdate in france_directors.collect():
    print("Director Name: ", name, " | Birth Date: ", birthdate)

### (e) Return the average number of actors in a film.


In [None]:
# YOUR CODE HERE

from pyspark.sql.functions import size, avg, col, when

# Number of actors per movie. With Spark's default settings size() returns -1
# (not 0) for a null array, which would pull the average down; movies without
# an actors array are counted as having 0 actors instead.
df_movies_lang_with_count = df_movies_lang.withColumn(
    'actor_count',
    when(col('actors').isNull(), 0).otherwise(size(col('actors'))),
)
df_movies_lang_with_count.select("_id", "actor_count").show()

# Average over all movies; avg() yields None when there are no rows.
avg_actors = df_movies_lang_with_count.agg(avg('actor_count')).collect()[0][0]

if avg_actors is None:
    print('Average number of actors: n/a (no movies)')
else:
    print(f'Average number of actors: {avg_actors:.2f}')

### (f) Return the name of the actor that acted in the most movies.

In [None]:
# YOUR CODE HERE

from pyspark.sql.functions import explode, col, countDistinct, first

# One row per (movie, actor) pair; the movie id is renamed so it does not
# collide with the actor's own _id field.
exploded_df = df_movies_lang.select(
    col("_id").alias("movie_id"), explode(col("actors")).alias("actor")
)

# Distinct movies per actor (a duplicated cast entry no longer counts twice).
# The name is carried along so no second lookup pass is needed.
most_frequent_actor = (
    exploded_df
    .groupBy(col("actor._id").alias("actor_id"))
    .agg(
        countDistinct("movie_id").alias("count"),
        first("actor.last_name", ignorenulls=True).alias("last_name"),
        first("actor.first_name", ignorenulls=True).alias("first_name"),
    )
    # Secondary key makes the winner deterministic when counts tie.
    .orderBy(col("count").desc(), col("actor_id"))
)
most_frequent_actor.show()

top_actor = most_frequent_actor.first()
if top_actor is None:
    print("No actors found.")
else:
    # Join only the name parts that are present (either may be null).
    full_name = " ".join(part for part in (top_actor["last_name"], top_actor["first_name"]) if part)
    print(f'Name of the actor that acted in the most movies: {top_actor["actor_id"]} -', full_name)

# Part 2: Real-world Data Manipulation

In this part of the lab, we will work with two collections in our MongoDB: `gia_ke_khai_raw` and `thuoc_raw` loaded at `input_data.gia_ke_khai_raw` and `input_data.thuoc_raw` respectively. We will use PySpark RDD and DataFrame to perform the following tasks:

### (a) Read the datasets into DataFrames and print out the schema and the number of records of each.

In [None]:
# YOUR CODE HERE
def _load_input_collection(collection):
    """Return `input_data.<collection>` from MongoDB as a Spark DataFrame."""
    reader = my_spark.read.format("mongodb")
    return reader.option("database", "input_data").option("collection", collection).load()


gia_ke_khai_df = _load_input_collection("gia_ke_khai_raw")
print("gia_ke_khai_raw schema:")
gia_ke_khai_df.printSchema()
print("number of records gia_ke_khai_raw:", gia_ke_khai_df.count())

thuoc_df = _load_input_collection("thuoc_raw")
print("thuoc_raw schema:")
thuoc_df.printSchema()
print("number of records thuoc_raw :", thuoc_df.count())

### (b) Show all records in the `thuoc_raw` collection that have the same active pharmaceutical ingredient (API) in their `hoatChat` field as their medicine name.


Notes: In the context of medication, API stands for Active Pharmaceutical Ingredient, which is the biologically active component in a drug that produces the intended therapeutic effect. In other words, it is the chemical substance that gives a medicine its medicinal properties.

In [None]:
# YOUR CODE HERE
from pyspark.sql.functions import col, lower, trim

# Records whose own active ingredient (hoatChat) equals their own medicine
# name (tenThuoc), compared case-insensitively and ignoring surrounding
# whitespace. The previous self-join matched hoatChat against *any* record's
# tenThuoc and repeated a row once per matching record.
same_apis_df = (
    thuoc_df
    .filter(lower(trim(col("hoatChat"))) == lower(trim(col("tenThuoc"))))
    .select("_id", "baoChe", "tenThuoc", "hoatChat")
)

same_apis_df.show(truncate=False)

### (c) Create a new DataFrame from the `thuoc_raw` collection that splits the API in the `hoatChat` field into multiple rows. For example, "paracetamol" is the API in "Paracetamol 500 mg," and "amoxicillin" is the API in various medications such as "Amogentine 500mg/125mg," "Augbactam 1g/200mg," and "Viamomentin." The resulting DataFrame should have two columns: `hoatChat` and `thuocTuongUng` as a list. After processing the data, write it back to our MongoDB at `output_data.thuocthaythe`.

In [None]:
# YOUR CODE HERE
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import re
from pyspark.sql.functions import collect_list
# Normalize a raw `hoatChat` string into a list of API names (used as a UDF below).
def extract_keys(value):
    """Split a raw ``hoatChat`` string into lower-cased API names.

    The string is split on ", ". In each part the first dose/strength token
    matched by the regex (e.g. "500mg", "1,5 g", "0,9%", "5mg/5ml") is
    removed, whitespace runs are collapsed to single spaces, and the result
    is lower-cased.

    Args:
        value: raw ``hoatChat`` text; None for records without it.

    Returns:
        List of non-empty API names (empty list for None or empty input).
    """
    if value is None:
        # A null hoatChat used to raise AttributeError inside the UDF and fail the job.
        return []
    keys = []
    for part in value.split(", "):
        # Raw string: the previous non-raw f-string relied on invalid escapes like "\d".
        match = re.search(r"\d+(,\d+)? *(gam|mg|g|UI|ml|l|IU|%)(/\d+ ?ml)?", part)
        if match is not None:
            # Removes every occurrence of the matched token, as before.
            part = part.replace(match.group(0), "")
        # Collapse whitespace runs rather than deleting double spaces, which
        # used to glue neighbouring words together ("vitamin  b1" -> "vitaminb1").
        key = " ".join(part.split()).lower()
        if key:
            keys.append(key)
    return keys

from pyspark.sql.functions import array_sort, col, collect_set, explode

# Wrap extract_keys as a Spark UDF returning array<string>.
extract_keys_udf = udf(extract_keys, ArrayType(StringType()))

# Add the per-record list of normalized API names. thuoc_df is reassigned
# because part (d) below reads this hoatChat_values column.
thuoc_df = thuoc_df.withColumn('hoatChat_values', extract_keys_udf('hoatChat'))
thuoc_df.select("tenThuoc", "hoatChat_values").show()

# One row per (medicine name, API) pair; empty API names are dropped so they
# do not form a bogus "" group.
df_exploded = (
    thuoc_df
    .select('tenThuoc', explode('hoatChat_values').alias('hoatChat'))
    .filter(col('hoatChat') != '')
)
df_exploded.show()

# Group medicines by API. collect_set drops a medicine name repeated across
# several records; array_sort makes the list order deterministic.
result_df = df_exploded.groupBy("hoatChat").agg(
    array_sort(collect_set("tenThuoc")).alias("thuocTuongUng")
)
result_df.show()

# Write it back to our MongoDB at `output_data.thuocthaythe`
result_df.write.format("mongodb").option("database", "output_data").option("collection", "thuocthaythe").mode("overwrite").save()

In [None]:
# Read back output_data.thuocthaythe to verify the write from part (c).
test_c_output_df = (
    my_spark.read.format("mongodb")
    .options(database="output_data", collection="thuocthaythe")
    .load()
)
test_c_output_df.show()

### (d) Create a new DataFrame from the two collections mentioned above that contains `tenThuoc`, `hoatChat`, `dongGoi`, `dvt` and `giaBan`. After processing the data, write it back to our MongoDB at `output_data.giathuoc`.

In [None]:
# YOUR CODE HERE
from pyspark.sql.functions import col, explode

# Part (c) adds the hoatChat_values column to thuoc_df; fail with a clear
# message (instead of an AnalysisException) if that cell was not run.
assert 'hoatChat_values' in thuoc_df.columns, \
    "Run part (c) first: thuoc_df needs the hoatChat_values column"

# One row per (id, medicine name, API); id is kept for the join below.
thuoc_df_exploded = thuoc_df.select('id', 'tenThuoc', explode('hoatChat_values').alias('hoatChat'))
thuoc_df_exploded.show()

# Join each medicine with its declared prices by comparing thuoc_raw `id`
# with gia_ke_khai_raw `sdk` (presumably the registration number — TODO confirm).
gia_thuoc_df = (
    thuoc_df_exploded.alias("thuoc")
    .join(gia_ke_khai_df.alias("giakekhai"), col("thuoc.id") == col("giakekhai.sdk"))
    .select(
        col("thuoc.tenThuoc"),
        col("thuoc.hoatChat"),
        col("giakekhai.dongGoi"),
        col("giakekhai.dvt"),
        col("giakekhai.giaBan"),
    )
)
gia_thuoc_df.show()

# Write it back to our MongoDB at `output_data.giathuoc`
gia_thuoc_df.write.format("mongodb") \
    .option("database", "output_data") \
    .option("collection", "giathuoc") \
    .mode("overwrite") \
    .save()

In [None]:
# Read back output_data.giathuoc to verify the write from part (d).
test_d_output_df = (
    my_spark.read.format("mongodb")
    .options(database="output_data", collection="giathuoc")
    .load()
)
test_d_output_df.show()