# ST446 Distributed Computing for Big Data
## Homework 1: Spark RDDs, Spark SQL and Hive
### Milan Vojnovic and Christine Yuen, edited by Simon Schoeller LT 2019

---

## Instructions:

**Deadline**: February 27, 2019, 5pm London time

**Datasets**: All the required datasets are available for download from here:

https://www.dropbox.com/sh/89xbpcjl4oq0j4w/AACrbtUzm3oCW1OcpL7BasRfa?dl=0
in the respective sub-directories.

Please make sure that you document your work appropriately. If you get stuck somewhere, make sure to give the other parts a try.


## A. Spark RDDs (30 points)

We continue to analyse the dblp dataset available in the file `author-large.txt`. This time, we want to find the top 10 pairs of authors who published the largest number of papers together (possibly with other collaborators). For example, if authors $a$, $b$ and $c$ published a paper with title $t$, then this contributes one joint publication to each author pair ($a$,$b$), ($b$,$c$) and ($a$,$c$). Use the first column of the input data for the author names and the third column for the publication title.
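
The counting rule can be written down as a minimal plain-Python sketch, assuming the input has already been reduced to (author, title) records. The function name `joint_publication_counts` and the toy records are hypothetical, and the sketch uses `itertools.combinations` over the distinct authors of each title rather than the self-join used in the answers below.

```python
from collections import Counter
from itertools import combinations


def joint_publication_counts(records):
    """Count joint papers per author pair.

    records: iterable of (author, title) tuples (columns 1 and 3 of the input).
    Repeated (author, title) rows are counted once.
    """
    authors_by_title = {}
    for author, title in records:
        authors_by_title.setdefault(title, set()).add(author)

    counts = Counter()
    for authors in authors_by_title.values():
        # Sorting makes ("a", "b") and ("b", "a") the same key, and
        # combinations() never pairs an author with themselves.
        for pair in combinations(sorted(authors), 2):
            counts[pair] += 1
    return counts


# Title t by a, b and c; title u by a and b (made-up records)
example = [("a", "t"), ("b", "t"), ("c", "t"), ("a", "u"), ("b", "u")]
print(joint_publication_counts(example).most_common())
```

A title with k distinct authors contributes k(k-1)/2 pairs: here t adds three and u adds one, so the pair (a, b) ends up with two joint publications.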

You need to solve this task by using RDD operations in pyspark like those in `pyspark_rdd.ipynb` in week 3 of the course and the [Spark RDD documentation](http://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD). You can run your code on your laptop or on GCP. Please make sure to give us all your code and to document what you have done outside of the notebook, for example using terminal input and output or screenshots. *Please make sure to delete your Dataproc clusters and buckets afterwards.*

## Answer

First, here are the commands to create the bucket, upload the data, create the cluster (with the Jupyter initialization action), and open the cluster's Jupyter notebook in Chrome through an SSH SOCKS proxy.

<pre>gsutil mb gs://anyabucket24feb2019/

gsutil cp "/Users/Anya/Documents/Grad School/LSE/Dist Comp for Big Data/author-large.txt" gs://anyabucket24feb2019/

gcloud dataproc clusters create anyacluster24feb2019 --project my-project-1519696393583 --bucket anyabucket24feb2019 --zone europe-west1-b --initialization-actions gs://dataproc-initialization-actions/jupyter/jupyter.sh

export PORT=8123
export HOSTNAME="anyacluster24feb2019-m"
export PROJECT="my-project-1519696393583"
export ZONE="europe-west1-b"


gcloud compute ssh ${HOSTNAME} \
    --project=${PROJECT} --zone=${ZONE}  -- \
    -D ${PORT} -N &

"/Applications/Google Chrome.app/Contents/MacOS/Google Chrome" \
      --proxy-server="socks5://localhost:${PORT}" \
      --user-data-dir=/tmp/${HOSTNAME}
</pre>

Next, here is my PySpark code to find the top 10 pairs of authors who published the most papers together. The steps are:

1. Import the libraries and load the data as an RDD of lines.
2. Split each line on tabs and keep only the title and author columns, as (title, author) pairs.
3. Join this RDD with itself on the title key. This gives one record (title, (author1, author2)) for every ordered pair of authors of the same title, including pairs of an author with themselves and both orders (a, b) and (b, a).
4. Apply the function `sortauthors` to every record. It marks pairs of the same author as `"SamePerson"`, and otherwise sorts the two names alphabetically, so that (a, b) and (b, a) become the same key and are not double counted. It also reorders the record so that the author pair is the key and the paper title is the value.
5. Keep only distinct (author pair, title) combinations, so that duplicate rows do not inflate the counts, and filter out the records marked `"SamePerson"`.
6. Count the number of titles for each author pair with `countByKey()`, sort the counts in descending order and print the top 10.
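
To see why both `distinct()` and the `"SamePerson"` filter are needed, the steps above can be traced on a toy input in plain Python. This is not Spark code: list comprehensions, a `set` and `collections.Counter` stand in for `join`, `distinct()` and `countByKey()`, and the records are made up.

```python
from collections import Counter

# Toy (title, author) records: title "t" has authors a, b and c,
# and ("t", "a") is repeated, as a duplicate input row would be.
paper_author = [("t", "a"), ("t", "b"), ("t", "c"), ("t", "a")]

# Stand-in for paper_author.join(paper_author): every ordered author
# combination for the same title, as (title, (author1, author2)).
paired = [(t1, (a1, a2))
          for t1, a1 in paper_author
          for t2, a2 in paper_author
          if t1 == t2]


def sortauthors(row):
    # Same logic as the RDD version: mark self-pairs, otherwise key by sorted pair
    if row[1][0] == row[1][1]:
        return "SamePerson"
    return (tuple(sorted(row[1])), row[0])


mapped = [sortauthors(row) for row in paired]
unique = set(mapped)                                   # stand-in for distinct()
kept = [row for row in unique if row != "SamePerson"]  # stand-in for filter()
counts = Counter(pair for pair, _title in kept)        # stand-in for countByKey()

print(len(paired), len(kept))
print(sorted(counts.items()))
```

Of the 16 ordered pairs produced by the join, 6 are self-pairs and the remaining 10 collapse to three (pair, title) records, so each pair is counted exactly once for the title.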

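<pre>
import numpy as np
from pyspark.sql.types import *   # schema types, used in Part B

# sc (the SparkContext) is provided by the notebook's PySpark kernel.
# Read the tab-separated file as an RDD of lines, in 4 partitions.
data_from_file = sc.\
    textFile(
        "gs://anyabucket24feb2019/author-large.txt",
        4)

# Split each line into its columns: author, journal, title, year
data_from_file_conv = data_from_file.map(lambda row: np.array(row.strip().split("\t")))
# Key each record by title: (title, author)
paper_author = data_from_file_conv.map(lambda row: (row[2], row[0]))
# Self-join on title: (title, (author1, author2)) for every ordered author pair
pairedAuths = paper_author.join(paper_author)

def sortauthors(row):
    # Mark pairs of an author with themselves so they can be filtered out
    if row[1][0] == row[1][1]:
        return "SamePerson"
    else:
        # Sort the names so (a, b) and (b, a) give the same key; emit (pair, title)
        return (tuple(sorted(row[1])), row[0])

# Keep each (pair, title) combination once, then drop the self-pairs
updatedpairs = pairedAuths.map(sortauthors).distinct()
updatedpairs = updatedpairs.filter(lambda row: row != "SamePerson")

# countByKey() returns a dict {pair: number of joint titles} to the driver
paircounts = updatedpairs.countByKey()
sortedcounts = [(i, paircounts[i]) for i in sorted(paircounts, key=paircounts.get, reverse=True)]
sortedcounts[0:10]
</pre>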

<pre>
[(('Irith Pomeranz', 'Sudhakar M. Reddy'), 246),
 (('Amr El Abbadi', 'Divyakant Agrawal'), 161),
 (('Makoto Takizawa', 'Tomoya Enokido'), 137),
 (('Didier Dubois', 'Henri Prade'), 122),
 (('Elizabeth Chang', 'Tharam S. Dillon'), 115),
 (('Mary Jane Irwin', 'Narayanan Vijaykrishnan'), 107),
 (('Mahmut T. Kandemir', 'Mary Jane Irwin'), 100),
 (('Chun Chen', 'Jiajun Bu'), 99),
 (('Shojiro Nishio', 'Takahiro Hara'), 96),
 (('Filip De Turck', 'Piet Demeester'), 90)]
</pre>
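
The notebook sorts the whole dictionary returned by `countByKey()`, which is collected to the driver, although only ten entries are needed. A minimal sketch of partial selection in plain Python with `heapq.nlargest`, on made-up counts, is below. In PySpark, a comparable route that keeps the counting distributed would be `map(lambda kv: (kv[0], 1))`, then `reduceByKey(operator.add)`, then `takeOrdered(10, key=lambda kv: -kv[1])`; that pipeline is only suggested here and was not run on the dataset.

```python
import heapq

# Made-up stand-in for the dict returned by countByKey(): {author pair: count}
paircounts = {("a", "b"): 5, ("a", "c"): 2, ("b", "c"): 7, ("c", "d"): 1}

# Full sort of every pair, as in the notebook cell above
sortedcounts = [(pair, paircounts[pair])
                for pair in sorted(paircounts, key=paircounts.get, reverse=True)]

# Partial selection: heapq.nlargest only keeps the k best candidates
top2 = heapq.nlargest(2, paircounts.items(), key=lambda kv: kv[1])

print(top2)
```

Both give the same leading entries; the difference only matters when there are millions of pairs and k is small.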

## B. Spark SQL (30 points)

Do the same as in problem A, but this time use the Spark SQL API, which we covered in week 4. You may find it useful to consult 'Querying with Spark SQL' in `spark-dataframe-sql.ipynb` of the week 4 class and the [Spark SQL documentation](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html).

## Answer

Next, we do the same using Spark SQL. First we specify the schema of the data and load it into a DataFrame, which we register as the temporary view `authors`. We then run a check query showing that the same author pair and title can occur more than once (the pair Irith Pomeranz and Sudhakar M. Reddy produces 4 join rows for a single title), so the final query must only use distinct (author, title) combinations. In the main query, we join the distinct (author, title) combinations with themselves on the title and keep only the rows where the first author comes alphabetically before the second; this removes pairs of an author with themselves as well as the mirrored duplicates (a, b) and (b, a). Finally, we group by the author pair, order by the count in descending order, and show the count with both author names.

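<pre>
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Columns of the tab-separated file: author, journal, title, year
schema = StructType([
    StructField("author", StringType(), True),
    StructField("journal", StringType(), True),
    StructField("title", StringType(), True),
    StructField("year", LongType(), True)
])

# spark (the SparkSession) is provided by the notebook's PySpark kernel
authors = spark.read.csv("gs://anyabucket24feb2019/author-large.txt",
                         header=False, schema=schema, sep='\t')
authors.createOrReplaceTempView("authors")

# Check: how many join rows one known pair produces per title (duplicates give more than 1)
spark.sql("""
    SELECT count(*), a.title
    FROM authors a JOIN authors b ON a.title = b.title
    WHERE a.author = 'Irith Pomeranz' AND b.author = 'Sudhakar M. Reddy'
    GROUP BY a.title
    ORDER BY count(*) DESC
""").show(1)

# Top 10 pairs: self-join distinct (author, title) rows on title;
# a.author < b.author drops self-pairs and the mirrored order (b, a)
spark.sql("""
    SELECT count(*), a.author, b.author
    FROM (SELECT DISTINCT author, title FROM authors) a
    JOIN (SELECT DISTINCT author, title FROM authors) b ON a.title = b.title
    WHERE a.author < b.author
    GROUP BY a.author, b.author
    ORDER BY count(*) DESC
""").show(10)
</pre>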

<pre>
+--------+--------------------+
|count(1)|               title|
+--------+--------------------+
|       4|A Low Power Pseud...|
+--------+--------------------+
only showing top 1 row

+--------+------------------+--------------------+
|count(1)|            author|              author|
+--------+------------------+--------------------+
|     246|    Irith Pomeranz|   Sudhakar M. Reddy|
|     161|     Amr El Abbadi|   Divyakant Agrawal|
|     137|   Makoto Takizawa|      Tomoya Enokido|
|     122|     Didier Dubois|         Henri Prade|
|     115|   Elizabeth Chang|    Tharam S. Dillon|
|     107|   Mary Jane Irwin|Narayanan Vijaykr...|
|     100|Mahmut T. Kandemir|     Mary Jane Irwin|
|      99|         Chun Chen|           Jiajun Bu|
|      96|    Shojiro Nishio|       Takahiro Hara|
|      90|    Filip De Turck|      Piet Demeester|
+--------+------------------+--------------------+
only showing top 10 rows
</pre>
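
The SQL logic can be tried without a cluster. The sketch below swaps in Python's built-in `sqlite3` as a stand-in for Spark SQL (SQLite is not Spark; the two only happen to agree on this query) and runs both queries on a small made-up `authors` table that contains one duplicate row. A tie-breaking `ORDER BY` on the names is added so the toy output is deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE authors (author TEXT, journal TEXT, title TEXT, year INTEGER)")
conn.executemany("INSERT INTO authors VALUES (?, ?, ?, ?)", [
    ("a", "j1", "t", 2000),
    ("b", "j1", "t", 2000),
    ("a", "j1", "t", 2000),   # duplicate of the first row
    ("a", "j2", "u", 2001),
    ("b", "j2", "u", 2001),
    ("c", "j2", "u", 2001),
])

# Without DISTINCT, the duplicate row inflates the count for the pair (a, b)
naive = conn.execute("""
    SELECT count(*) FROM authors a JOIN authors b ON a.title = b.title
    WHERE a.author = 'a' AND b.author = 'b'
""").fetchone()[0]

# Same shape as the Spark SQL query: distinct (author, title), self-join on title,
# and a.author < b.author to drop self-pairs and the mirrored order (b, a)
pairs = conn.execute("""
    SELECT count(*) AS n, a.author, b.author
    FROM (SELECT DISTINCT author, title FROM authors) a
    JOIN (SELECT DISTINCT author, title FROM authors) b ON a.title = b.title
    WHERE a.author < b.author
    GROUP BY a.author, b.author
    ORDER BY n DESC, a.author, b.author
""").fetchall()

print(naive)
print(pairs)
```

The naive join reports three joint rows for (a, b), while the distinct version reports the correct two papers.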



## C. Hive (40 points)

In this part we are going to use the Yelp data available in the following JSON file `Yelp/yelp_academic_dataset_user.json`. You may complete this task by using either Hive installed on your laptop or using Hive on Google Cloud Platform. Please complete the following steps:

_(Here, it is particularly important that you find a suitable way to document your work appropriately.)_

### 1. Load data into a Hive table

Create a Hive table and load the input data into this table.

Please describe any commands that you run in a command line interface, and provide all the code that you wrote and ran. For example, this may include any commands run in a terminal, Hive script files (\*.sql), and screenshots (if, for example, you used Google Cloud Platform through the browser interface). See the class examples for references.

Note:
* The dataset is in JSON format whereas in the class the datasets were in XML or TXT format. You will need to figure out (look up) how to load data from a JSON file to a Hive table. 
* You will need to infer the schema by looking at the data. 
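
One way to infer the schema is to parse a handful of records and map each value to a candidate Hive type. Below is a rough heuristic sketch in Python; the functions `hive_type` and `infer_schema`, the type mapping and the sample records are assumptions for illustration, and the guesses still need checking by eye (for instance, whether a count should be `INT` or `BIGINT`). On the real file, it could be fed the first few hundred lines, e.g. `itertools.islice(open("yelpuserdata.json"), 500)`.

```python
import json


def hive_type(value):
    """Heuristic Hive type for one parsed JSON value (None = undecided)."""
    if value is None:
        return None                       # JSON null says nothing about the type
    if isinstance(value, bool):           # test bool first: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "INT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, list):
        inner = hive_type(value[0]) if value else None
        return f"array<{inner}>" if inner else None   # empty list: undecided
    return "STRING"                       # strings (and, crudely, nested objects)


def infer_schema(json_lines):
    """Merge per-record guesses over several one-object-per-line JSON records."""
    schema = {}
    for line in json_lines:
        for key, value in json.loads(line).items():
            new, old = hive_type(value), schema.get(key)
            if new is None or new == old:
                continue
            if old is None:
                schema[key] = new
            elif {old, new} == {"INT", "DOUBLE"}:
                schema[key] = "DOUBLE"    # widen columns mixing values like 4 and 3.5
            else:
                schema[key] = "STRING"    # conflicting guesses: fall back to STRING
    return schema


# Made-up records shaped like the user file (not real Yelp data)
sample = [
    '{"user_id": "u1", "name": "Ann", "review_count": 3, "average_stars": 4, "friends": []}',
    '{"user_id": "u2", "name": "Bo", "review_count": 10, "average_stars": 3.5, "friends": ["u1"]}',
]
for column, column_type in infer_schema(sample).items():
    print(column, column_type)
```

Columns that are empty lists or null in every sampled record are left out, which is one reason to sample more than a couple of lines.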

Hints: 

* Some of the columns are of array type. For example, you should use array&lt;STRING&gt; for the friends column.
* The size of the dataset is large (about 1GB). You may want to create a smaller dataset first and work with this smaller dataset until you develop and test your code, and then apply it on the original dataset.
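
Assuming the file holds one JSON object per line (the layout a line-based JSON SerDe expects), a smaller development file can be made by keeping the first N lines, since each line is then a complete record. A minimal Python sketch follows; `head_sample` and the file names are hypothetical. From a shell, `head -n 10000 yelpuserdata.json > yelpuserdata_small.json` does the same.

```python
import os
import tempfile
from itertools import islice


def head_sample(src_path, dst_path, n_lines=10000):
    """Copy the first n_lines lines of src_path into dst_path; return the count."""
    written = 0
    with open(src_path, encoding="utf-8") as src, \
            open(dst_path, "w", encoding="utf-8") as dst:
        for line in islice(src, n_lines):
            dst.write(line)
            written += 1
    return written


# Demo on a throwaway five-line file; on the cluster the call would instead be
# something like head_sample("yelpuserdata.json", "yelpuserdata_small.json")
tmp_dir = tempfile.mkdtemp()
full_path = os.path.join(tmp_dir, "full.json")
small_path = os.path.join(tmp_dir, "small.json")
with open(full_path, "w", encoding="utf-8") as f:
    f.writelines(f'{{"user_id": "u{i}"}}\n' for i in range(5))

n_written = head_sample(full_path, small_path, n_lines=3)
print(n_written)
```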


### 2. Simple queries

Having created the Hive table and loaded the data into it, write and execute queries to:

i. retrieve the schema;

ii. show the number of rows in the table;

iii. select top 10 users who have provided the largest number of reviews (the output should consist of the user name and the number of reviews of the users).

For all the queries, please show both the commands you used and the output. You may copy and paste the commands that you run and the outputs, or provide screenshots.

## Answer

The commands used are shown below. First I opened an SSH connection to the master node of the cluster created above. Then I downloaded the file from Dropbox to the cluster and copied it into HDFS. Next I started Hive, added the hive-hcatalog-core JAR, created the database, and defined the table schema, specifying the SerDe and the HDFS location of the file. The last three statements are the queries for parts i, ii and iii.

<pre>
gcloud compute --project "my-project-1519696393583" ssh --zone "europe-west1-b" "anyacluster24feb2019-m"

wget "https://www.dropbox.com/sh/89xbpcjl4oq0j4w/AAC4_qW_wKyGIXXYZOwZC-Wia/Yelp/yelp_academic_dataset_user.json?dl=1" -O yelpuserdata.json

hadoop fs -put -f /home/Anya/yelpuserdata.json hdfs://anyacluster24feb2019-m/user/Anya/yelpuserdata.json

hive

ADD JAR /usr/lib/hive/lib/hive-hcatalog-core.jar;

CREATE DATABASE IF NOT EXISTS yelp;

USE yelp;

CREATE EXTERNAL TABLE users (
user_id STRING,
name STRING,
review_count INT,
yelping_since STRING,
friends array&lt;STRING&gt;,
useful INT,
funny INT,
cool INT,
fans INT,
elite array&lt;STRING&gt;,
average_stars DOUBLE,
compliment_hot INT,
compliment_more INT,
compliment_profile INT,
compliment_cute INT,
compliment_list INT,
compliment_note INT,
compliment_plain INT,
compliment_cool INT,
compliment_funny INT,
compliment_writer INT,
compliment_photos INT,
type STRING
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE
LOCATION 'hdfs://anyacluster24feb2019-m/user/Anya/';

-- i. Schema of the table
DESCRIBE users;
-- ii. Number of rows
SELECT Count(*) FROM users;
-- iii. Top 10 users by number of reviews
SELECT user_id, review_count FROM users ORDER BY review_count DESC limit 10;
</pre>

i. The command to show the schema and its output are shown below.
![2.i](Assignment1_C2i.png)

ii. The query that counts the rows in the table is shown below with its output. There are 1,029,432 rows in total.
![2.ii](Assignment1_C2ii.png)

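iii. The query that obtains the top 10 users with the largest number of reviews, together with their review counts, is shown below with its output. This query identifies users by `user_id`; since the task asks for the user name, `name` should be selected in place of (or alongside) `user_id`.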
![2.iii](Assignment1_C2iii.png)
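
As a cross-check of query iii on a sample of the data, the top reviewers can also be found in plain Python while reporting each user's `name` alongside `user_id`. This is a minimal sketch assuming one JSON object per line with the `user_id`, `name` and `review_count` fields from the table definition above; `top_reviewers` and the sample records are hypothetical. `heapq.nlargest` keeps only `k` records in memory, so it can stream through the full file.

```python
import heapq
import json


def top_reviewers(json_lines, k=10):
    """Return (name, user_id, review_count) for the k users with the most reviews."""
    users = (json.loads(line) for line in json_lines if line.strip())
    top = heapq.nlargest(k, users, key=lambda u: u["review_count"])
    return [(u["name"], u["user_id"], u["review_count"]) for u in top]


# Made-up records for illustration; with the real data, pass an open file instead,
# e.g. top_reviewers(open("yelpuserdata.json", encoding="utf-8"))
sample = [
    '{"user_id": "u1", "name": "Ann", "review_count": 3}',
    '{"user_id": "u2", "name": "Bo", "review_count": 10}',
    '{"user_id": "u3", "name": "Cy", "review_count": 7}',
]
print(top_reviewers(sample, k=2))
```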