Copyright (C) 2018 Software Platform Lab, Seoul National University

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

# KMOOC_HW1: Spark RDD / SparkSQL

## Colab Spark Setup

In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -q
!curl http://mirror.navercorp.com/apache/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz --output spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

0% [Working]            Get:1 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease [21.3 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Connecting to security.u0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.39)] [Co                                                                               Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (91.189.91.39)] [Co                                                                               Get:3 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
0% [3 InRelease 2,601 B/88.7 kB 3%] [Connecting to security.ubuntu.com (91.189.0% [1 InRelease gpgv 21.3 kB] [3 InRelease 2,601 B/88.7 kB 3%] [Connecting to s                                                                               Get:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease [3,626 B]
0% [1 InRelease gpgv 21.3 k

In [0]:
import os

# Export the locations that the later Spark start-up reads from the environment.
# NOTE(review): both paths are hard-coded; they presumably match the JDK package
# and the Spark tarball unpacked by the setup cell on Colab - confirm if the
# notebook is run elsewhere.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.5-bin-hadoop2.7",
})

## Wikipedia dataset sample

This time we are not loading the data from HDFS; the sample records are defined directly in Python code.

Each record is a single string with four space-separated fields: project, title, pageview count, and size.

In [0]:
# Sample of Wikipedia pageview records, one string per record.
# Each string holds four space-separated fields: project, title, count, size.
wikipedia_data_sample = [
    "commons.m File:Gemblong.JPG 1 9717",
    "pl Beata_Tyszkiewicz 10 207378",
    "en Special:RecentChangesLinked/Roswell_(TV_series) 1 14617",
    "de Grafische_Benutzeroberfl%C3%A4che 1 22549",
    "en Simeon_I_of_Bulgaria 5 385793",
    "en Rainbow_Six_(novel) 8 122792",
    "es Pediatr%C3%ADa 5 73598",
    "sv Ett_uts%C3%B6kt_universum 1 9499",
    "en Video_game_content_rating_system 4 112324",
    "es Yuno_Gasai 2 55260",
    "en File:Georg_Wilhelm_Friedrich_Hegel00.jpg 1 43395",
    "en Anestia_ombrophanes 1 8881",
    "et Seitse 2 84874",
    "en And_I_Am_Telling_You_I%27m_Not_Going 4 85690",
    "he %D7%A4%D7%A8%D7%93%D7%99%D7%92%D7%9E%D7%94 1 13887",
    "zh File:Pictogram_voting_keep-green.svg 1 15106",
    "sv Special:Senaste_relaterade_%C3%A4ndringar/Homestead,_Florida 1 7677",
    "pt Categoria:Ambientes_de_desenvolvimento_integrado_livres 1 8151",
    "de.voy Plattensee 1 43748",
    "en Sia 1 8938",
    "en Category:Toronto_Toros_players 2 0",
    "en Special:Export/Helsinki_Accords 1 19899",
    "xh Special:Contributions/Kpeterzell 1 5883",
    "nl Sia 1 0",
    "no Sia 5 87075",
    "en Special:Contributions/2.31.218.202 1 7402",
    "es Sia 1 10329",
    "de Datei:BSicon_uhKBHFe.svg 1 9786",
    "en Randolph_County,_Alabama 1 21431",
    "es Woodkid 3 70494",
    "en Tu_Bishvat 3 56438",
    "cs Radiohead 1 14325",
    "es Naturaleza_sangre 1 9286",
    "en Anatolia_(disambiguation) 1 7980",
    "pt Woodkid 1 8982",
    "pt Titanoboa_cerrejonensis 5 64540",
    "commons.m Woodkid 1 19278",
    "fi Matti_Inkinen 1 10138",
    "ja %E3%83%95%E3%82%A1%E3%82%A4%E3%83%AB:Esfahan_(Iran)_Emam_Mosque.JPG 1 33168",
    "en Psicobloc 1 12739",
    "en Macael,_Spain 1 12658",
    "fa Ryuichi_Sakamoto 1 22855",
    "fr Sp%C3%A9cial:Pages_li%C3%A9es/Fichier:Wiki-ezokuroten5.jpg 1 21955",
    "nl Overleg_gebruiker:82.171.157.232 1 0",
    "en Thomas_%26_Mack_Center 2 41010",
    "en Warren_Beatty 49 2631986",
    "uz Auberville 1 11401",
    "es Naturaleza_sangrea 10 9286",
    "pt Queima_de_sutis 2 8982",
]

In [0]:
import findspark

# findspark.init() is called before the pyspark import below; keep this order.
findspark.init()

from pyspark.sql import SparkSession

# Reuse an existing session if there is one, otherwise start a local one
# using all available cores ("local[*]").
ss = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
sc = ss.sparkContext

# Distribute the sample records as an RDD, then break each record into a
# tuple of its space-separated fields (all fields remain strings here).
lines = sc.parallelize(wikipedia_data_sample)
columns = lines.map(lambda record: tuple(record.split(" ")))

## Problem 1. Spark RDD 


In [13]:
# Build (project, count) pairs; the count is parsed as a Python int (not long).
project_count_tuples = columns.map(
    lambda fields: (fields[0], int(fields[2]))
)

# Hint: Try using ReduceByKey and Filter transforms!

########## ANSWER ##########
# Sum the pageview counts per project.
counts = project_count_tuples.reduceByKey(lambda left, right: left + right)

# Keep only the projects whose total is at least 20 and bring them to the driver.
popular_projects = counts.filter(lambda pair: pair[1] >= 20)
popular_projects.collect()

[('en', 87), ('es', 22)]

## Problem 2. SparkSQL

In [17]:
# Create a Spark DataFrame from wikipedia_data_sample (equivalent of an 'SQL table' in Spark).
# Every field in `columns` comes from line.split(" ") and is therefore a string.
# Convert the two numeric fields up front so that `count` and `size` are numeric
# columns; otherwise numeric filters depend on implicit casts and sorting on
# these columns would be lexicographic.
typed_rows = columns.map(lambda row: (row[0], row[1], int(row[2]), int(row[3])))
df = ss.createDataFrame(typed_rows, ['project', 'title', 'count', 'size'])

# Create a table view called "WikipediaTable"
df.createOrReplaceTempView("WikipediaTable")

########## ANSWER ##########
dfs = ss.sql("SELECT project,title,size FROM WikipediaTable")

# English-project pages with 1000 <= size < 10000, sorted by title in descending order.
seldf = (
    dfs
    .filter((dfs['project'] == 'en') & (dfs['size'] >= 1000) & (dfs['size'] < 10000))
    .orderBy('title', ascending=False)
)
seldf.show()

+-------+--------------------+----+
|project|               title|size|
+-------+--------------------+----+
|     en|Special:Contribut...|7402|
|     en|                 Sia|8938|
|     en| Anestia_ombrophanes|8881|
|     en|Anatolia_(disambi...|7980|
+-------+--------------------+----+



## Problem 3. SparkSQL 

In [20]:
# Each record maps a page title to its owner: "<title> <owner>".
title_owner = [
    "Woodkid Lila",
    "Sia Jane",
    "Ryuichi_Sakamoto Sam",
]

lines1 = sc.parallelize(title_owner)
columns1 = lines1.map(lambda line: tuple(line.split(" ")))

df1 = ss.createDataFrame(columns1, ['title', 'owner'])
df1.createOrReplaceTempView("OwnerTable")

########## ANSWER ##########
# Use a new name here instead of rebinding `df`, so the DataFrame behind
# "WikipediaTable" is not overwritten when this cell runs.
title_counts = ss.sql("SELECT count,title FROM WikipediaTable")

# Joining on the column name (inner join) keeps a single `title` column;
# joining on an equality expression would leave two columns named `title`.
df2 = (
    title_counts
    .join(df1, on='title')
    .groupby('owner')
    .agg({'count': 'mean'})
    .withColumnRenamed("avg(count)", "avg_count")
)
df2.show()

+-----+------------------+
|owner|         avg_count|
+-----+------------------+
| Lila|1.6666666666666667|
|  Sam|               1.0|
| Jane|               2.0|
+-----+------------------+

