# Install Java and Spark on Hadoop

In [1]:
# install java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# install spark (change the version number if needed)
!wget -q https://downloads.apache.org/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
# unzip the spark file to the current folder
!tar xf spark-3.5.5-bin-hadoop3.tgz

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:3 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Hit:8 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:9 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Packages [2,690 kB]
Get:12 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,243 kB]
Get:13 https://r2u.stat.illinois.edu/ubuntu jammy/main all Packages [

In [4]:
# set your spark folder to your system path environment.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.5-bin-hadoop3"

# Create a SparkSession in Python

In [17]:
from pyspark.sql.functions import size

In [5]:
# start pyspark
!pip install findspark
import findspark
findspark.init()



In [6]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local")\
          .appName("Spark APIs Exercises")\
          .config("spark.some.config.option", "some-value")\
          .getOrCreate()

# Example 1: WordCount with Spark DataFrames and Spark RDDs




In [7]:
# Load the data
!git clone https://github.com/nnthaofit/CSC14118.git

Cloning into 'CSC14118'...
remote: Enumerating objects: 17, done.[K
remote: Counting objects: 100% (17/17), done.[K
remote: Compressing objects: 100% (15/15), done.[K
remote: Total 17 (delta 1), reused 0 (delta 0), pack-reused 0 (from 0)[K
Receiving objects: 100% (17/17), 818.44 KiB | 4.20 MiB/s, done.
Resolving deltas: 100% (1/1), done.


### Spark DataFrame-based WordCount

In [8]:
linesDF = spark.read.text("CSC14118/ppap.txt")
linesDF.show(linesDF.count(),truncate = False)

from pyspark.sql import functions as f
wordsDF = linesDF.withColumn("word", f.explode(f.split(f.col("value"), " ")))\
    .groupBy("word")\
    .count()\
    .sort("count", ascending = False)
wordsDF.show()

+----------------------------+
|value                       |
+----------------------------+
|ppap                        |
|i have a pen                |
|i have an apple             |
|ah apple pen                |
|i have a pen                |
|i have a pineapple          |
|ah pineapple pen            |
|ppap pen pineapple apple pen|
+----------------------------+

+---------+-----+
|     word|count|
+---------+-----+
|      pen|    6|
|     have|    4|
|        i|    4|
|    apple|    3|
|pineapple|    3|
|        a|    3|
|     ppap|    2|
|       ah|    2|
|       an|    1|
+---------+-----+



###RDD-based WordCount

In [9]:
linesRdd = spark.sparkContext.textFile("CSC14118/ppap.txt")
wordsRdd = linesRdd.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)\
    .sortBy(lambda pair:-1*pair[1])
wordsRdd.collect()

[('pen', 6),
 ('i', 4),
 ('have', 4),
 ('a', 3),
 ('apple', 3),
 ('pineapple', 3),
 ('ppap', 2),
 ('ah', 2),
 ('an', 1)]

# Exercise 1: Data query with Spark DataFrame

In [10]:
# clone the example data files from GitHub to Drive
!git clone https://github.com/nnthaofit/CSC14118.git

fatal: destination path 'CSC14118' already exists and is not an empty directory.


###0. Load the data file: movies.json

In [11]:
df = spark.read.format("json").load("/content/CSC14118/movies.json")

### 1a. Show the schema of DataFrame that stores the movies dataset.

In [12]:
df.printSchema()

root
 |-- cast: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)



### 1b. Show the number of distinct movies in the dataset

In [13]:
df.distinct().count()

28789

### 2. Count the number of movies released during the years 2012 and 2015 (included)

In [14]:
df.distinct().where((2012 <= df.year) & (df.year <= 2015)).count()

1015

### 3. Show the year in which the number of movies released is highest. One highest year is enough

In [15]:
df.distinct().select("year").sort("year", ascending=False).show(1)

+----+
|year|
+----+
|2018|
+----+
only showing top 1 row



### 4. Show the list of movies such that for each film, the number of actors/actresses is at least five, and the number of genres it belongs to is at most two genres.

In [18]:
filtered_df = df.filter(
    (size("cast") >= 5) & (size("genres") <= 2)
)

filtered_df.select("title", "cast", "genres").show(truncate=False)

+--------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+
|title                           |cast                                                                                                                                                                          |genres          |
+--------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+
|A Desperate Chance              |[Earle Foxe, Alie Hollister, Robert G. Vignola, Helen Lidroth, Miriam Cooper]                                                                                                 |[Drama]         |
|The Archeologist                |[Charlotte Burton, Edward Coxen, George Field, Winifred Gr

### 5. Show the **movies** whose names are longest

In [19]:
from pyspark.sql.functions import length, max as spark_max

# Tính độ dài tên phim
df_with_length = df.withColumn("title_length", length("title"))

# Tìm độ dài lớn nhất
max_length = df_with_length.agg(spark_max("title_length")).collect()[0][0]

# Lọc ra các phim có độ dài tên bằng max_length
longest_title_movies = df_with_length.filter(df_with_length["title_length"] == max_length)

# Hiển thị
longest_title_movies.select("title").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------+
|title                                                                                                         |
+--------------------------------------------------------------------------------------------------------------+
|Cornell-Columbia-University of Pennsylvania Boat Race at Ithaca, N.Y., Showing Lehigh Valley Observation Train|
+--------------------------------------------------------------------------------------------------------------+



### 6. Show the movies whose name contains the word “fighting” (case-insensitive).

In [20]:
from pyspark.sql.functions import lower

filtered_df = df.filter(lower(df.title).contains("fighting"))
filtered_df.select("title").show(truncate=False)

+-----------------------+
|title                  |
+-----------------------+
|A Fighting Colleen     |
|Fighting Cressy        |
|Fighting Destiny       |
|Fighting for Gold      |
|The Fighting Heart     |
|The Fighting Line      |
|The Fighting Guide     |
|The Fighting Streak    |
|The Fighting Blade     |
|The Fighting Coward    |
|Fighting Fury          |
|The Fighting Adventurer|
|The Fighting Sap       |
|The Fighting Demon     |
|Fighting Fate          |
|The Fighting Heart     |
|Fighting Luck          |
|The Fighting Smile     |
|Fighting the Flames    |
|Fighting Youth         |
+-----------------------+
only showing top 20 rows



### 7. Show the list of distinct genres appearing in the dataset

In [21]:
from pyspark.sql.functions import explode

distinct_genres = df.select(explode("genres").alias("genre")).distinct()
distinct_genres.show(truncate=False)

+-------------+
|genre        |
+-------------+
|Crime        |
|Romance      |
|Thriller     |
|Slasher      |
|Found Footage|
|Adventure    |
|Teen         |
|Martial Arts |
|Sports       |
|Drama        |
|War          |
|Documentary  |
|Family       |
|Fantasy      |
|Silent       |
|Disaster     |
|Legal        |
|Mystery      |
|Supernatural |
|Suspense     |
+-------------+
only showing top 20 rows



### 8. List all movies in which the actor Harrison Ford has participated.

In [26]:
from pyspark.sql.functions import array_contains

# harrison_ford_movies = df.filter(array_contains(df.cast, "Harrison Ford"))
# harrison_ford_movies.select("title").show(truncate=False)

df.withColumn("actor", explode(df["cast"])).where(lower(col("actor")) == "harrison ford").select(df.columns).show()

+--------------------+-----------------+--------------------+----+
|                cast|           genres|               title|year|
+--------------------+-----------------+--------------------+----+
|[Constance Talmad...|[Romance, Comedy]|Experimental Marr...|1919|
|[Constance Talmad...|         [Comedy]| Happiness a la Mode|1919|
|[Constance Talmad...|         [Comedy]|Romance and Arabella|1919|
|[Vivian Martin, H...|         [Comedy]|      The Third Kiss|1919|
|[Harrison Ford, C...|         [Comedy]|The Veiled Adventure|1919|
|[Constance Talmad...|         [Comedy]|          Who Cares?|1919|
|[Vivian Martin, H...|          [Drama]|You Never Saw Suc...|1919|
|[Norma Talmadge, ...|          [Drama]| The Wonderful Thing|1921|
|[Alma Rubens, Har...|        [Mystery]|      Find the Woman|1922|
|[Constance Talmad...|          [Drama]| The Primitive Lover|1922|
|[Norma Talmadge, ...| [Romance, Drama]|     Smilin' Through|1922|
|[Helen Jerome Edd...|          [Drama]|     When Love Comes|1

### 9. List all movies in which the actors/actresses whose names include the word “Lewis“ (case-insensitive) have participated.

In [23]:
from pyspark.sql.functions import explode, lower, col

# Tách từng diễn viên ra dòng riêng, rồi lọc
lewis_cast_df = df.select("title", explode("cast").alias("actor"))
lewis_movies = lewis_cast_df.filter(lower(col("actor")).contains("lewis"))

# Lấy danh sách phim duy nhất
lewis_movies.select("title").distinct().show(truncate=False)


+---------------------------+
|title                      |
+---------------------------+
|Inez from Hollywood        |
|The Ballad of Jack and Rose|
|Salvage                    |
|At War with the Army       |
|Sex and the City 2         |
|Diary of a Madman          |
|Going Straight             |
|Cinderfella                |
|Gangs of New York          |
|That's My Boy              |
|Rock-A-Bye Baby            |
|The Crucible               |
|The Million Dollar Handicap|
|The Girl from Montmartre   |
|Romance                    |
|New Morals for Old         |
|The Hardys Ride High       |
|Andy Hardy Meets Debutante |
|Love Laughs at Andy Hardy  |
|Cheaper to Marry           |
+---------------------------+
only showing top 20 rows



### 10. Show top five actors/actresses that have participated in most movies.

In [24]:
from pyspark.sql.functions import explode, col

top_actors = (
    df.select(explode("cast").alias("actor"))
      .groupBy("actor")
      .count()
      .orderBy(col("count").desc())
      .limit(5)
)

top_actors.show(truncate=False)

+----------------+-----+
|actor           |count|
+----------------+-----+
|Harold Lloyd    |190  |
|Hoot Gibson     |142  |
|John Wayne      |136  |
|Charles Starrett|116  |
|Bebe Daniels    |103  |
+----------------+-----+



#Exercise 2: RDD-based mainpulation


*   The data is already in one ore more RDDs.
*   You must not convert RDD to DF or use pure Python code.


### 1. Consider a string s that includes only alphabetical letters and spaces. Check whether s is a palindrome (case-insensitive).

In [38]:
s = "U i a i u"
rdd = spark.sparkContext.parallelize(s.lower(),1)
rdd.collect()

['u', ' ', 'i', ' ', 'a', ' ', 'i', ' ', 'u']

In [39]:
rdd_id = spark.sparkContext.parallelize(range(0, rdd.count()), 1)
rdd_id.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8]

In [40]:
rdd_a = rdd_id.zip(rdd)
rdd_a.collect()

[(0, 'u'),
 (1, ' '),
 (2, 'i'),
 (3, ' '),
 (4, 'a'),
 (5, ' '),
 (6, 'i'),
 (7, ' '),
 (8, 'u')]

In [41]:
rdd_b = rdd_a.sortBy(lambda x: x[0], ascending=False)
rdd_b.collect()

[(8, 'u'),
 (7, ' '),
 (6, 'i'),
 (5, ' '),
 (4, 'a'),
 (3, ' '),
 (2, 'i'),
 (1, ' '),
 (0, 'u')]

In [42]:
rdd_compare = rdd_a.zip(rdd_b)
rdd_compare.collect()

[((0, 'u'), (8, 'u')),
 ((1, ' '), (7, ' ')),
 ((2, 'i'), (6, 'i')),
 ((3, ' '), (5, ' ')),
 ((4, 'a'), (4, 'a')),
 ((5, ' '), (3, ' ')),
 ((6, 'i'), (2, 'i')),
 ((7, ' '), (1, ' ')),
 ((8, 'u'), (0, 'u'))]

In [43]:
if (rdd_compare.filter(lambda row: row[0][1] != row[1][1]).count() == 0):
  print("Palindrome")
else:
  print("Not Palindrome")

Palindrome


### 2. Consider a string s that includes only alphabetical letters and spaces. Check whether s is a pangram (case-insensitive).

In [51]:
s = "Nhat Minh"
rdd_str = spark.sparkContext.parallelize(s.replace(" ","").lower(),1)
rdd_str.collect()

['n', 'h', 'a', 't', 'm', 'i', 'n', 'h']

In [52]:
rdd_str = rdd_str.sortBy(lambda x: x[0], ascending=False)
rdd_str.collect()

6

In [53]:
if (rdd_str.distinct().count() == 26):
  print("Pangram")
else:
  print("Not Pangram")

Not Pangram


#Exercise 3: Frequent patterns and association rules mining

### 0. Load the data file: foodmart.csv


*  A record is a tuple of binary values {0, 1}, each of which denotes the presence of an item (1: bought, 0: not bought).



In [None]:
!git clone https://github.com/nnthaofit/CSC14118.git
df = spark.read.csv("CSC14118/foodmart.csv", header=True, inferSchema = True)
df.show()

### 1. Convert the given data to the format required by Spark MLlib FPGrowth.

###2.	Apply Spark MLlib FPGrowth to the formatted data. Mine the set of frequent patterns with the minimum support of 0.1. Mine the set of association rules with the minimum confidence of 0.9.

#Exercise 4: Classification

###0. Load the data file: mushroom.csv
*   The data represents a collection of mushroom species.
*   There are 8124 examples, each of which has 22 attributes and it is categorized into either “edible” (e) or “poisonous” (p)


### 1.	Prepare the train and test sets following the ratio 8:2

### 2. Fit a decision tree model on the training set, using Spark MLlib DecisionTreeClassifier with default parameters

### 3. Fit a random forest model on the training set, using Spark MLlib RandomForestClassification with default parameters

### 4. Evaluate the two models on the same test set using the following metrics: areaUnderROC and areaUnderPR

###5. Chain the above steps into a single pipeline

# Exercise 5: Clustering

### 1.	Cluster the data by using Spark MLlib KMeans with k = 2, 3, and 5, using Euclidean distance and cosine distance

### 2. Evaluate each of the above clustering results using silhoutte score. Which configuration yeilds the best clustering?

###3. Chain the above steps into a single pipeline

###4. For each clustering result obtained above, count the number of examples that belong to each of the three species.

## Exercise 6: Network manipulation with Spark GraphFrames

###0. Load the data files: users.txt and followers.txt

###1.	Construct a graph from the given data to demonstrate a tiny social network


###2.	Apply Graphs graphPageRank to the network to obtain a ranking list of users in terms of followers

###3. Find connected components on the graph, using Graphs connectedComponents or stronglyConnectedComponents