# Lecture 2, Lab 2
## Word Count in Spark

Submit your work via [this form](https://docs.google.com/forms/d/e/1FAIpQLSfM8WyYLDtJxDFLInWjpW5-q72werVS2-x7W_3YQpyzZUmaJQ).

### Setup

Let's set up Spark on your Colab environment.  Run the cell below!

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=c754434126df612ae79968720a840a2e2ef938c3de82dcc450d330f23ebca007
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

Now we authenticate a Google Drive client to download the file we will be processing in our Spark job.

**Make sure to follow the interactive instructions.**

In [2]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [3]:
id='1xoZxyyQwUnLJr3TtXm-mTBlKWNCg4eeK'
downloaded = drive.CreateFile({'id': id})
# https://drive.google.com/file/d/1xoZxyyQwUnLJr3TtXm-mTBlKWNCg4eeK/view?usp=share_link
downloaded.GetContentFile('soc-LiveJournal1Adj.txt')

If you executed the cells above, the downloaded file should appear under the "Files" tab in the left panel, under the name passed to `GetContentFile`.

### Your task

If you ran the setup stage successfully, you are ready to work on the *pg100.txt* file, which contains a copy of the complete works of Shakespeare.

Write a Spark application that outputs the number of words that start with each letter. That is, for every letter, count the total number of words (counting repeats, not just unique words) that start with that letter.

In your implementation, **ignore the letter case**, i.e., treat all words as lower case. You can also ignore all words that **start** with a non-alphabetic character. You should output word counts for the **entire document**, including the title, author, and main text. If you encounter words broken across lines, e.g. "pro-ject" where the segment after the hyphen is on a new line, no special processing is needed and you can safely treat it as two words.
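
The counting rule above can be sketched in plain Python on a tiny made-up sample. This is only an illustration of the rule, not the Spark solution: the function name `first_letter_counts` and the sample text are hypothetical, and it assumes that words are separated by whitespace and that "alphabetic" means the letters a to z.

```python
from collections import Counter

def first_letter_counts(text):
    """Count words by first letter (hypothetical helper, not part of the lab)."""
    counts = Counter()
    # Lower-case everything, then split on any whitespace (assumption).
    for word in text.lower().split():
        first = word[0]
        # Skip words that start with a non-alphabetic character (assumed a-z).
        if "a" <= first <= "z":
            counts[first] += 1
    return counts

# Made-up sample; the broken word "pro-" / "ject" counts as two words.
sample = "The Tragedy of Hamlet, Prince of Denmark\n'Tis 1604 pro-\nject"
counts = first_letter_counts(sample)
for letter in sorted(counts):
    print(letter, counts[letter])
```

In an RDD pipeline the same steps would roughly correspond to `flatMap` (split into words), `filter` (drop non-alphabetic starts), `map` (emit the first letter with a count of 1), and `reduceByKey` (sum per letter).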

Your outputs will be graded within a tolerance: if your counts differ from the ground truth by no more than the error threshold of 5, you'll be considered correct.
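
As a rough self-check, the tolerance can be sketched as below. This assumes the threshold applies to each letter separately, which is only one reading of the rule; the helper `within_threshold` and the numbers are made up for illustration and are not the actual grader.

```python
def within_threshold(counts, truth, threshold=5):
    """Return True if every letter's count is within `threshold` of the truth.

    Hypothetical check; assumes a per-letter comparison.
    """
    letters = set(counts) | set(truth)
    return all(
        abs(counts.get(letter, 0) - truth.get(letter, 0)) <= threshold
        for letter in letters
    )

# Made-up numbers, not real ground truth for the lab.
truth = {"a": 100, "b": 40}
mine = {"a": 103, "b": 46}
print(within_threshold(mine, truth))  # "b" is off by 6, so this prints False
```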

Originally taken from CS246, Stanford.

In [3]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
import pandas as pd



# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context
sc = spark.sparkContext

In [30]:
# YOUR
# !wget https://course.ccs.neu.edu/cs6220/homework-2/data/soc-LiveJournal1Adj.txt
txt = sc.textFile("soc-LiveJournal1Adj.txt", 1)

In [None]:

# CODE
import itertools
# !head -n 100 soc-LiveJournal1Adj.txt > short_data.txt
# txt = sc.textFile("short_data.txt", 1)
lines = txt.map(lambda x: x.split())

# keep users that have a friend list: (user, [friend, ...])
friends = lines.filter(lambda x: len(x) == 2).map(lambda x: (x[0], x[1].split(",")))

# existing friendships get a large negative weight so their summed score stays below zero
directFriends = friends.flatMap(lambda data: [((data[0], friend), -1000000) for friend in data[1]])
# every ordered pair of a user's friends has that user as one mutual friend
mutualFriends = friends.flatMap(lambda data: [(pair, 1) for pair in itertools.permutations(data[1], 2)])
fullList = directFriends.union(mutualFriends)

# sum the weights per pair, drop existing friendships, regroup as (user, [(count, candidate), ...])
fullList = fullList.reduceByKey(lambda x, y: x + y)
mutualCount = fullList.filter(lambda x: x[1] > 0).map(lambda x: (x[0][0], (x[1], x[0][1]))).groupByKey().mapValues(list)
# sort by number of mutual friends (descending), then by numeric user ID; cap the list at 10
mutualCount = mutualCount.map(lambda x: (x[0], sorted(x[1], key=lambda c: (-c[0], int(c[1])))))\
        .map(lambda x: (x[0], x[1][:10]))\
        .map(lambda x: (x[0], [i[1] for i in x[1]]))
user_list = mutualCount.collect()

# to output format: "user<TAB>rec1,rec2,..."
user_list = [str(user) + "\t" + ",".join(str(rec) for rec in recs) for user, recs in user_list]
# users whose line has a single field have no friends; output the user ID alone
no_friend_list = lines.filter(lambda x: len(x) == 1).flatMap(lambda x: x).collect()
complete_list = user_list + no_friend_list
for c in complete_list:
    print(c)
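
To see what the pipeline in the cell above computes without starting Spark, here is a minimal plain-Python sketch of the same scoring idea on a made-up four-user graph. The function `recommend`, its parameters, and the sample `graph` are hypothetical names introduced for illustration; the sentinel value simply mirrors the `-1000000` weight used in the cell.

```python
import itertools
from collections import defaultdict

def recommend(adjacency, top_n=10, sentinel=-1000000):
    """Recommend non-friends ranked by number of mutual friends (illustrative sketch)."""
    scores = defaultdict(int)
    for user, friends in adjacency.items():
        # Existing friendships get a large negative weight so they are filtered out later.
        for friend in friends:
            scores[(user, friend)] += sentinel
        # Each ordered pair of this user's friends shares this user as a mutual friend.
        for pair in itertools.permutations(friends, 2):
            scores[pair] += 1

    candidates = defaultdict(list)
    for (user, other), score in scores.items():
        if score > 0:
            candidates[user].append((score, other))

    # Sort by mutual-friend count (descending), then numeric ID; keep the top_n.
    return {
        user: [other for _, other in sorted(found, key=lambda c: (-c[0], int(c[1])))[:top_n]]
        for user, found in candidates.items()
    }

# Made-up graph: a square where 0-3 and 1-2 are not directly connected.
graph = {"0": ["1", "2"], "1": ["0", "3"], "2": ["0", "3"], "3": ["1", "2"]}
print(recommend(graph))
```

Here users 0 and 3 share two mutual friends (1 and 2), as do users 1 and 2, so each user is recommended the one user they are not already connected to.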


In [None]:
# HERE


Once you have obtained the desired results, **head over to Gradescope and submit your solution for this Colab**!