Copyright (C) 2019 Software Platform Lab, Seoul National University

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


## Colab Spark Setup

In [1]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -q
!curl http://mirror.cogentco.com/pub/apache/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz --output spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install -q findspark

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.162)] [Waiting for headers] [Co                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran35/ InRelease [3,609 B]
0% [Connecting to archive.ubuntu.com (91.189.88.162)] [Waiting for headers] [2                                                                                Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Waiting for headers] [3 InRelease 2,586 B/88.7 kB 3%] [2 InRelease 0 B/3,60                                                                               Hit:4 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
0% [Waiting for headers] [3 InRelease 14.2 kB/88.7 kB 16%] [2 InRelease 0 B/3,60% [4 InRelease gpgv 21.3 kB] [Waiting for headers] [3 InRelease 14.2 kB/88.7 k

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

## Wikipedia dataset sample

This time we're not using HDFS to load the data. Sample data are loaded by Python code directly.

The data has four fields: project, title, pageview count and size.

In [0]:
wikipedia_data_sample = ["commons.m File:Gemblong.JPG 1 9717"
,"pl Beata_Tyszkiewicz 10 207378"
,"en Special:RecentChangesLinked/Roswell_(TV_series) 1 14617"
,"de Grafische_Benutzeroberfl%C3%A4che 1 22549"
,"en Simeon_I_of_Bulgaria 5 385793"
,"en Rainbow_Six_(novel) 8 122792"
,"es Pediatr%C3%ADa 5 73598"
,"sv Ett_uts%C3%B6kt_universum 1 9499"
,"en Video_game_content_rating_system 4 112324"
,"es Yuno_Gasai 2 55260"
,"en File:Georg_Wilhelm_Friedrich_Hegel00.jpg 1 43395"
,"en Anestia_ombrophanes 1 8881"
,"et Seitse 2 84874"
,"en And_I_Am_Telling_You_I%27m_Not_Going 4 85690"
,"he %D7%A4%D7%A8%D7%93%D7%99%D7%92%D7%9E%D7%94 1 13887"
,"zh File:Pictogram_voting_keep-green.svg 1 15106"
,"sv Special:Senaste_relaterade_%C3%A4ndringar/Homestead,_Florida 1 7677"
,"pt Categoria:Ambientes_de_desenvolvimento_integrado_livres 1 8151"
,"de.voy Plattensee 1 43748"
,"en Sia 1 8938"
,"en Category:Toronto_Toros_players 2 0"
,"en Special:Export/Helsinki_Accords 1 19899"
,"xh Special:Contributions/Kpeterzell 1 5883"
,"nl Sia 1 0"
,"no Sia 5 87075"
,"en Special:Contributions/2.31.218.202 1 7402"
,"es Sia 1 10329"
,"de Datei:BSicon_uhKBHFe.svg 1 9786"
,"en Randolph_County,_Alabama 1 21431"
,"es Woodkid 3 70494"
,"en Tu_Bishvat 3 56438"
,"cs Radiohead 1 14325"
,"es Naturaleza_sangre 1 9286"
,"en Anatolia_(disambiguation) 1 7980"
,"pt Woodkid 1 8982"
,"pt Titanoboa_cerrejonensis 5 64540"
,"commons.m Woodkid 1 19278"
,"fi Matti_Inkinen 1 10138"
,"ja %E3%83%95%E3%82%A1%E3%82%A4%E3%83%AB:Esfahan_(Iran)_Emam_Mosque.JPG 1 33168"
,"en Psicobloc 1 12739"
,"en Macael,_Spain 1 12658"
,"fa Ryuichi_Sakamoto 1 22855"
,"fr Sp%C3%A9cial:Pages_li%C3%A9es/Fichier:Wiki-ezokuroten5.jpg 1 21955"
,"nl Overleg_gebruiker:82.171.157.232 1 0"
,"en Thomas_%26_Mack_Center 2 41010"
,"en Warren_Beatty 49 2631986"
,"uz Auberville 1 11401"
,"es Naturaleza_sangrea 10 9286"
,"pt Queima_de_sutis 2 8982"]

In [0]:
import findspark

findspark.init()

from pyspark.sql import SparkSession

ss = SparkSession.builder.master("local[*]").getOrCreate()
sc = ss.sparkContext

# Parallelize the data and split into columns
lines = sc.parallelize(wikipedia_data_sample)
columns = lines.map(lambda line: tuple(line.split(" ")))

## Problem 1. SparkSQL 


In [0]:
# Create a Spark DataFrame from wikipedia_data_sample (equivalent of an 'SQL table' in Spark)
df = ss.createDataFrame(columns, ['project', 'title', 'count', 'size'])

# Create a table view called "WikipediaTable"
df.createOrReplaceTempView("WikipediaTable")

In [17]:
selected = ss.sql("SELECT project,title,size FROM WikipediaTable \
WHERE project == 'en' and size < 10000 and size >= 1000 \
ORDER BY title")

selected.show()

+-------+--------------------+----+
|project|               title|size|
+-------+--------------------+----+
|     en|Anatolia_(disambi...|7980|
|     en| Anestia_ombrophanes|8881|
|     en|                 Sia|8938|
|     en|Special:Contribut...|7402|
+-------+--------------------+----+



## Problem 2. SparkSQL 

In [0]:
title_owner = ["Woodkid Lila"
,"Sia Jane"
,"Ryuichi_Sakamoto Sam"]

lines1 = sc.parallelize(title_owner)
columns1 = lines1.map(lambda line: tuple(line.split(" ")))

df1 = ss.createDataFrame(columns1, ['title', 'owner'])
df1.createOrReplaceTempView("OwnerTable")


In [37]:
selected = ss.sql("SELECT OwnerTable.owner, MEAN(WikipediaTable.count) as avg_count_column FROM WikipediaTable \
INNER JOIN OwnerTable ON WikipediaTable.title=OwnerTable.title \
GROUP BY OwnerTable.owner")
                   
selected.show()

+-----+------------------+
|owner|  avg_count_column|
+-----+------------------+
| Lila|1.6666666666666667|
|  Sam|               1.0|
| Jane|               2.0|
+-----+------------------+



## Problem #3 SparkRDD 

In [38]:
project_count_tuples = columns.map(lambda t: (t[0], long(t[2])))
project_count_tuples.collect()

[('commons.m', 1L),
 ('pl', 10L),
 ('en', 1L),
 ('de', 1L),
 ('en', 5L),
 ('en', 8L),
 ('es', 5L),
 ('sv', 1L),
 ('en', 4L),
 ('es', 2L),
 ('en', 1L),
 ('en', 1L),
 ('et', 2L),
 ('en', 4L),
 ('he', 1L),
 ('zh', 1L),
 ('sv', 1L),
 ('pt', 1L),
 ('de.voy', 1L),
 ('en', 1L),
 ('en', 2L),
 ('en', 1L),
 ('xh', 1L),
 ('nl', 1L),
 ('no', 5L),
 ('en', 1L),
 ('es', 1L),
 ('de', 1L),
 ('en', 1L),
 ('es', 3L),
 ('en', 3L),
 ('cs', 1L),
 ('es', 1L),
 ('en', 1L),
 ('pt', 1L),
 ('pt', 5L),
 ('commons.m', 1L),
 ('fi', 1L),
 ('ja', 1L),
 ('en', 1L),
 ('en', 1L),
 ('fa', 1L),
 ('fr', 1L),
 ('nl', 1L),
 ('en', 2L),
 ('en', 49L),
 ('uz', 1L),
 ('es', 10L),
 ('pt', 2L)]

In [0]:
project_count_list = sorted(list(set(project_count_tuples.collect())))

from collections import defaultdict as dd

project_dict = dd(list)

for project,count in project_count_list:
  project_dict[project].append(count)

In [0]:
def median(lst):
    n = len(lst)
    if n < 1:
            return None
    if n % 2 == 1:
            return lst[n//2]
    else:
            return sum(lst[n//2-1:n//2+1])/2.0

In [82]:
for (i,j) in project_dict.items():
  print('median of project "{}" is {}'.format(i,median(j)))

median of project "fr" is 1
median of project "de.voy" is 1
median of project "nl" is 1
median of project "uz" is 1
median of project "pt" is 2
median of project "no" is 5
median of project "de" is 1
median of project "sv" is 1
median of project "fa" is 1
median of project "en" is 4
median of project "commons.m" is 1
median of project "pl" is 10
median of project "fi" is 1
median of project "cs" is 1
median of project "et" is 2
median of project "xh" is 1
median of project "zh" is 1
median of project "ja" is 1
median of project "es" is 3
median of project "he" is 1


## Problem #4 SparkRDD 

In [0]:
data = ["1 2 1.0"
,"1 3 2.0"
,"1 4 3.0"
,"2 3 1.0"
,"2 4 3.0"
,"2 5 5.0"]

########## ANSWER ##########


## Problem #5 Calculating Pi

In [0]:
from random import random

spark = SparkSession.builder.appName("Spark Pi").getOrCreate()

partitions = 2
n = 1000000 * partitions

def f(_):
  ## ANSWER ##


# Generate n random dots and for each dot, calculate its distance from (0, 0).
# Count the number of dots whose distance is less or equal to 1
num_dots_inside_circle = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(lambda a, b: a + b)

      
print("Pi is roughly %f" % (4.0 * num_dots_inside_circle / n))