# Installing Pyspark on Colab



In [0]:
#Install Java and Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget http://apache.mirror.rafal.ca/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar zxf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

--2020-04-24 22:35:39--  http://apache.mirror.rafal.ca/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
Resolving apache.mirror.rafal.ca (apache.mirror.rafal.ca)... 207.210.46.249, 2604:1500:f001:0:216:3eff:fe3f:746b
Connecting to apache.mirror.rafal.ca (apache.mirror.rafal.ca)|207.210.46.249|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 232530699 (222M) [application/x-gzip]
Saving to: ‘spark-2.4.5-bin-hadoop2.7.tgz’


2020-04-24 22:35:42 (72.6 MB/s) - ‘spark-2.4.5-bin-hadoop2.7.tgz’ saved [232530699/232530699]



In [0]:
#set the locations where Spark and Java are installed
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
#Run a local spark session to test your installation:
import sys
import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.master("local[*]").getOrCreate()
sc = SparkContext.getOrCreate()


# Upload Files

This portion is to upload files

In [0]:
#Upload Files     
from google.colab import files
files.upload()

Saving station_data1.csv to station_data1.csv
Saving trip_data1.csv to trip_data1.csv


# Then take the files and convert to variables (These will be dataframes)
Then to Tables


In [0]:
df = spark.read.csv('2015-12-12.csv', header =True)  

#type(df)

df.show(5) #shows 5 rows

print(spark.catalog.listTables()) #lists out tables

+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
|      date|    time|   size|r_version|r_arch|   r_os|  package|version|country|ip_id|
+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
|2015-12-12|13:42:10| 257886|    3.2.2|  i386|mingw32| HistData|  0.7-6|     CZ|    1|
|2015-12-12|13:24:37|1236751|    3.2.2|x86_64|mingw32|  RJSONIO|  1.3-0|     DE|    2|
|2015-12-12|13:42:35|2077876|    3.2.2|  i386|mingw32|   UsingR|  2.0-5|     CZ|    1|
|2015-12-12|13:42:01| 266724|    3.2.2|  i386|mingw32|gridExtra|  2.0.0|     CZ|    1|
|2015-12-12|13:00:21|3687766|       NA|    NA|     NA|     lme4| 1.1-10|     DE|    3|
+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
only showing top 5 rows

[]


Take the dataframes and convert to tables
https://opensource.com/article/19/3/apache-spark-and-dataframes-tutorial "Build temporary Table"

In [0]:
#help(df.registerTempTable)


df.registerTempTable("program") #takes the df variable and convert to connections table
print(spark.catalog.listTables()) #lists out tables

# Get the first 10 rows of flights
sampletable = spark.sql("""
select r_version, count(*) as freq 
from program
group by r_version
""")

# Show the results
sampletable.show()

[Table(name='program', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
+---------+------+
|r_version|  freq|
+---------+------+
|   2.15.3|   168|
|   2.11.1|    99|
|   2.15.1|   130|
|    3.2.2|114576|
|    3.0.3|   522|
|   2.14.0|     6|
|   2.13.2|     1|
|       NA| 93734|
|   2.15.0|   235|
|    3.0.1|   265|
|    3.2.1| 10001|
|    3.1.1|  9572|
|    3.0.0|   102|
|    3.2.0|  7697|
|   2.15.2|    63|
|    3.1.2| 10613|
|    3.3.0|  2604|
|   2.14.1|    15|
|    3.2.3|159577|
|   2.12.0|     4|
+---------+------+
only showing top 20 rows



# Lab 9

**Upload Files**

In [0]:
#Upload Files      ("2015-12-12.csv")
from google.colab import files
files.upload()

**Load the Data** (and do simple count of # of lines) 

In [0]:
#loading the data
raw_content = sc.textFile("2015-12-12.csv") 
#can also do compressed files and wildcards too.

raw_content.count() #counting number of lines

421970

**Use .take to retrieve a set # of lines**

In [0]:
#return first n rows 
raw_content.take (5)

['"date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"',
 '"2015-12-12","13:42:10",257886,"3.2.2","i386","mingw32","HistData","0.7-6","CZ",1',
 '"2015-12-12","13:24:37",1236751,"3.2.2","x86_64","mingw32","RJSONIO","1.3-0","DE",2',
 '"2015-12-12","13:42:35",2077876,"3.2.2","i386","mingw32","UsingR","2.0-5","CZ",1',
 '"2015-12-12","13:42:01",266724,"3.2.2","i386","mingw32","gridExtra","2.0.0","CZ",1']

**.takeSample** [takes random sample; has 3 conditions] 

(is there replacement?, how many lines?, random seed #)![alt text](https://)

In [0]:
#return random samples [if replacement, # of samples, seed #]
raw_content.takeSample(True, 5, 3)

['"2015-12-12","05:59:49",511365,"3.2.3","i386","mingw32","ks","1.10.0","US",10364',
 '"2015-12-12","05:10:24",287843,"3.2.2","x86_64","mingw32","xtable","1.8-0","US",5665',
 '"2015-12-12","13:06:32",494138,"3.2.3","x86_64","linux-gnu","rjson","0.2.15","KR",655',
 '"2015-12-12","23:29:36",1350975,"3.2.3","x86_64","mingw32","randtoolbox","1.17","US",6509',
 '"2015-12-12","13:13:10",45187,"3.2.3","x86_64","linux-gnu","spatial","7.3-11","GB",548']

**Transformation (map & flatMap)**

Each row of the data is a character string. More convenient to have an **array** instead. We use `map` to transform them 





In [0]:
#instead of one BIG array, seperate into row array
content = raw_content.map(lambda x: x.split(',')) #comma seperated file
content.take(3)

[['"date"',
  '"time"',
  '"size"',
  '"r_version"',
  '"r_arch"',
  '"r_os"',
  '"package"',
  '"version"',
  '"country"',
  '"ip_id"'],
 ['"2015-12-12"',
  '"13:42:10"',
  '257886',
  '"3.2.2"',
  '"i386"',
  '"mingw32"',
  '"HistData"',
  '"0.7-6"',
  '"CZ"',
  '1'],
 ['"2015-12-12"',
  '"13:24:37"',
  '1236751',
  '"3.2.2"',
  '"x86_64"',
  '"mingw32"',
  '"RJSONIO"',
  '"1.3-0"',
  '"DE"',
  '2']]

**Sample of map (removing double quotation marks)**

In [0]:
def clean(x): #making a function 
  return ([xx.replace('"','') for xx in x])

content = content.map(clean) 
content.take(3)

#Same result
#raw_content.map(lambda x: x.split(',')).map(clean)

[['date',
  'time',
  'size',
  'r_version',
  'r_arch',
  'r_os',
  'package',
  'version',
  'country',
  'ip_id'],
 ['2015-12-12',
  '13:42:10',
  '257886',
  '3.2.2',
  'i386',
  'mingw32',
  'HistData',
  '0.7-6',
  'CZ',
  '1'],
 ['2015-12-12',
  '13:24:37',
  '1236751',
  '3.2.2',
  'x86_64',
  'mingw32',
  'RJSONIO',
  '1.3-0',
  'DE',
  '2']]

**Flatmap vs Map?**




*   **MAP**
return sequence of same length as the original data
*   **Flatmap**
return sequence each element length of all subsequence (entire list)



In [0]:
text = ["a b c", "d e", "f g h"]
sc.parallelize(text).map(lambda x:x.split(" ")).collect()

[['a', 'b', 'c'], ['d', 'e'], ['f', 'g', 'h']]

In [0]:
sc.parallelize(text).flatMap(lambda x:x.split(" ")).collect()

['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']

**Reduce and Counting**


*   How many downloaded records each package has
*   ex. sp downloaded 1020 times



In [0]:
#reduceByKey
package_count = content.map(lambda x: (x[6], 1)).reduceByKey(lambda a,b: a+b)
package_count.count()
package_count.take(5)


[('HistData', 159),
 ('UsingR', 151),
 ('lme4', 1560),
 ('testthat', 1178),
 ('maps', 1586)]

In [0]:
#countByKey
package_count_2 = content.map(lambda x: (x[6], 1)).countByKey()
package_count_2['maps']
#package_count_2['"maps"']

0

**Sorting**
After count by reduce, we can find the `rankings`. We use `sortByKey` method

In [0]:
#sortByKey ('0') = descending
#sortByKey ('1') = ascending

# Sort DNSCNNDIN/ and get the frst 10
package_count.map(lambda x: (x[1], x[0])).sortByKey(0).take(10)

[(4783, 'Rcpp'),
 (3913, 'ggplot2'),
 (3748, 'stringi'),
 (3449, 'stringr'),
 (3436, 'plyr'),
 (3265, 'magrittr'),
 (3223, 'digest'),
 (3205, 'reshape2'),
 (3046, 'RColorBrewer'),
 (3007, 'scales')]

In [0]:
# Sort ASCNNDIN/ and get the frst 10
package_count.map(lambda x: (x[1], x[0])).sortByKey(1).take(10)

[(1, 'multic'),
 (1, 'RBerkeley'),
 (1, 'vimcom'),
 (1, 'waldwolf'),
 (1, 'bstats'),
 (1, 'parspatstat'),
 (1, 'WaveCGH'),
 (1, 'mixnet'),
 (1, 'postgwas'),
 (1, 'rolasized')]

**Filter** Specific search
x is array, attribute #

ex. x[6] = package column

In [0]:
content.filter(lambda x: x[6] == 'Rtts' and x[8] == 'CN').count()


1

In [0]:
content.filter(lambda x: x[6] == 'Rtts' and x[8] == 'CN').take(1)

[['2015-12-12',
  '20:15:24',
  '23820',
  '3.2.2',
  'x86_64',
  'mingw32',
  'Rtts',
  '0.3.3',
  'CN',
  '41']]

**Set Operation**
each line of our data is an array instead of a string: intersecton and
distnct methods can't work properly. This is why we used raw_content (string) instead of content here as example.

In [0]:
raw_content.count()

421970

In [0]:
# one set's union with itself equals to its "double"
raw_content.union(raw_content).count()

843940

In [0]:
# one set's intersecton with itself equals to its disctnct value set
raw_content.intersection(raw_content).count()

421553

In [0]:
raw_content.distinct().count()

421553

**Join**
 lefOuterJoin, rightOuterJoin, and fullOuterJoin
 Cartesian also available

In [0]:
# generate a new RDD in which the 'country' variable is KNY
content_modified=content.map(lambda x:(x[8], x))

In [0]:
# create a mapping table of the abbreviatons of four countries and their full names.
mapping=[('DE', 'Germany'), ('US', 'United States'), ('CN', 'China'),
('IN',"India")]

In [0]:
mapping=sc.parallelize(mapping)

In [0]:
#  JOIN 
#Check the last column of each array
content_modified.join(mapping).takeSample(False, 4)

[('US',
  (['2015-12-12',
    '17:44:44',
    '64515',
    '3.2.3',
    'x86_64',
    'mingw32',
    'gtable',
    '0.1.2',
    'US',
    '3675'],
   'United States')),
 ('CN',
  (['2015-12-12',
    '20:22:27',
    '511',
    'NA',
    'NA',
    'NA',
    'sn',
    '0.4-16',
    'CN',
    '4469'],
   'China')),
 ('CN',
  (['2015-12-12',
    '20:51:15',
    '512',
    'NA',
    'NA',
    'NA',
    'hints',
    '1.0.1-1',
    'CN',
    '14149'],
   'China')),
 ('CN',
  (['2015-12-12',
    '21:41:31',
    '513',
    'NA',
    'NA',
    'NA',
    'svUnit',
    '0.7-11',
    'CN',
    '4890'],
   'China'))]

In [0]:
#Left Outer Join

#check column of each array

#In the mapping table, we provided the mappings for only four countries, so we fnd some 'None' values in the
#returned result below.
content_modified.leftOuterJoin(mapping).takeSample(False, 3)

[('US',
  (['2015-12-12',
    '05:11:12',
    '10867',
    '3.1.3',
    'x86_64',
    'mingw32',
    'rstudioapi',
    '0.4.0',
    'US',
    '5768'],
   'United States')),
 ('MY',
  (['2015-12-12',
    '14:31:38',
    '3687766',
    '3.2.1',
    'x86_64',
    'linux-gnu',
    'lme4',
    '1.1-10',
    'MY',
    '8466'],
   None)),
 ('US',
  (['2015-12-12',
    '16:20:27',
    '1185269',
    '3.2.3',
    'i386',
    'mingw32',
    'rmarkdown',
    '0.8.1',
    'US',
    '3170'],
   'United States'))]

#Lab 10

In [0]:
#Upload Files      
from google.colab import files
files.upload()

**`BASIC NUMBER TRANSFORMATIONS`**

---



In [0]:
nums = sc.parallelize([1,2,3])
nums.take(3)

[1, 2, 3]

Map each element to zero or more others.
Then flatten into single large list

In [0]:
numrange = nums.flatMap(lambda x: range(x))
numrange.collect()  #collect displays all

[0, 0, 1, 0, 1, 2]

Pass each element through a function


In [0]:
squares = nums.map(lambda x: x*x) #recall nums in input
squares.collect()

[1, 4, 9]

Keep elements passing a predicate
(1,4,9 -> only 4 is even)

In [0]:
even = squares.filter(lambda x: x % 2 == 0)
even.collect()

[4]

Return first n elements (in nums, total was 3. But we only view 2)

In [0]:
nums.take(2)

[1, 2]

**count** the number of elements

In [0]:
nums.count()

3

**merge** elements with an another function
[sum of elements of nums (1,2,3) = 6

In [0]:
nums.reduce(lambda x,y : x+y)

#1+2+3

6

**`TEXT TRANSFORMATION EXAMPLE`**

---



In [0]:
text = sc.textFile("shakespeare.txt")

map each element to zero or more others and flatten into single large list

In [0]:
words = text.flatMap(lambda line: line.split())
words.take(5)

['The', 'Project', 'Gutenberg', 'EBook', 'of']

Pass each element through a function

In [0]:
wordWithCount = words.map(lambda word: (word, 1))
wordWithCount.take(5)

[('The', 1), ('Project', 1), ('Gutenberg', 1), ('EBook', 1), ('of', 1)]

**`Basic Action (Text)`**

Count how many words there are (after split was done)

In [0]:
words.count()

NameError: ignored

**`RDD Operations`**

Read in a text file

In [0]:
mydata = sc.textFile("shakespeare.txt")

Convert text to UPPERCASE

In [0]:
mydata_uc = mydata.map(lambda line: line.upper())
mydata_uc.take (5)

['THE PROJECT GUTENBERG EBOOK OF THE COMPLETE WORKS OF WILLIAM SHAKESPEARE, BY',
 'WILLIAM SHAKESPEARE',
 '',
 'THIS EBOOK IS FOR THE USE OF ANYONE ANYWHERE AT NO COST AND WITH',
 'ALMOST NO RESTRICTIONS WHATSOEVER.  YOU MAY COPY IT, GIVE IT AWAY OR']

**Filter** the lines that start with 'l'

In [0]:
mydata_filt = mydata_uc.filter(lambda line: line.startswith('l'))
mydata_filt.count()

#none since all are in UPPERCASE (no lowercase)

0

You can pipe Spark operations using the **dot notation**

**'\' stands for non breaking new line**

In [0]:
text = sc.textFile("full_text.txt")\
.map(lambda line : line.split("\t"))\ #tab delimited file
.map(lambda fields: (fields[0], fields[1])) #only take first 2 columns

text.take(5)

[('USER_79321756', '2010-03-03T04:15:26'),
 ('USER_79321756', '2010-03-03T04:55:32'),
 ('USER_79321756', '2010-03-03T05:13:34'),
 ('USER_79321756', '2010-03-03T05:28:02'),
 ('USER_79321756', '2010-03-03T05:56:13')]

Pair RDDs adding a key

In [0]:
text = sc.textFile("full_text.txt")\
.keyBy(lambda line: line.split("\t")[0])

text.take(5)

#Make key value pairs using first attribute details

[('USER_79321756',
  'USER_79321756\t2010-03-03T04:15:26\tÜT: 47.528139,-122.197916\t47.528139\t-122.197916\tRT @USER_2ff4faca: IF SHE DO IT 1 MORE TIME......IMA KNOCK HER DAMN KOOFIE OFF.....ON MY MOMMA&gt;&gt;haha. #cutthatout'),
 ('USER_79321756',
  'USER_79321756\t2010-03-03T04:55:32\tÜT: 47.528139,-122.197916\t47.528139\t-122.197916\t@USER_77a4822d @USER_2ff4faca okay:) lol. Saying ok to both of yall about to different things!:*'),
 ('USER_79321756',
  'USER_79321756\t2010-03-03T05:13:34\tÜT: 47.528139,-122.197916\t47.528139\t-122.197916\tRT @USER_5d4d777a: YOURE A FOR GETTING IN THE MIDDLE OF THIS @USER_ab059bdc WHO THE FUCK ARE YOU ? A FUCKING NOBODY !!!!&gt;&gt;Lol! Dayum! Aye!'),
 ('USER_79321756',
  'USER_79321756\t2010-03-03T05:28:02\tÜT: 47.528139,-122.197916\t47.528139\t-122.197916\t@USER_77a4822d yea ok..well answer that cheap as Sweden phone you came up on when I call.'),
 ('USER_79321756',
  'USER_79321756\t2010-03-03T05:56:13\tÜT: 47.528139,-122.197916\t47.528139\t-122

Pairs with Complex Values

In [0]:
text = sc.textFile("full_text.txt")\
.map(lambda line : line.split("\t"))\
.map(lambda fields: (fields[0],(fields[1],fields[2])))

text.take(5)

#First attribute as key and second and third attribute as values

[('USER_79321756', ('2010-03-03T04:15:26', 'ÜT: 47.528139,-122.197916')),
 ('USER_79321756', ('2010-03-03T04:55:32', 'ÜT: 47.528139,-122.197916')),
 ('USER_79321756', ('2010-03-03T05:13:34', 'ÜT: 47.528139,-122.197916')),
 ('USER_79321756', ('2010-03-03T05:28:02', 'ÜT: 47.528139,-122.197916')),
 ('USER_79321756', ('2010-03-03T05:56:13', 'ÜT: 47.528139,-122.197916'))]

**`WordCount Example`**

---



In [0]:
counts = sc.textFile("shakespeare.txt")\
.flatMap(lambda line: line.split())\
.map(lambda word : (word,1))\
.reduceByKey(lambda v1,v2 : v1+v2)

counts.take(10)

[('The', 3977),
 ('Project', 85),
 ('EBook', 2),
 ('of', 15649),
 ('Shakespeare', 45),
 ('is', 7874),
 ('use', 266),
 ('anyone', 4),
 ('anywhere', 4),
 ('at', 2227)]

# Assignment 3

In [0]:
#Upload Files for the 3 questions     
#1 -> number_list.txt
#2 -> shakespeare.txt
#3 -> fulltext.txt

#Upload Files     
from google.colab import files
files.upload()


{}

1. ODD/EVEN NUMBER 
(Hint: Note that you are reading the file as text and need to convert the numbers to `int()`)

Input: number_list.txt (a list of 1000 integers)



Output: **Count the number of `odd` numbers and `even` numbers in the file**

In [0]:
num_list = sc.textFile("number_list.txt").map(lambda x: int(x))

num_list_even = num_list.filter(lambda x: x % 2 == 0)
num_list_odd = num_list.filter(lambda x: x % 2 != 0)

print ("Even number count:", num_list_even.count())
print ("Odd number count:", num_list_odd.count())

Even number count: 521
Odd number count: 479


2. Top K and bottom K words 
(Hint: Search and use takeOrdered() method)

---

Input: shakespeare.txt 


Output: **10 words with the `highest count` and 10 words with `lowest count`**

In [0]:
text = sc.textFile("shakespeare.txt")

words = text.flatMap(lambda line: line.split())
count = words.map(lambda word: (word, 1)).reduceByKey(lambda v1,v2: v1+v2)

orderDesc = count.sortBy(lambda p:p[1],ascending=False)
orderAsce = count.sortBy(lambda p:p[1],ascending=True)

#orderAsce.take(10)
orderDesc.take(10)

[('the', 23407),
 ('I', 19540),
 ('and', 18358),
 ('to', 15682),
 ('of', 15649),
 ('a', 12586),
 ('my', 10825),
 ('in', 9633),
 ('you', 9129),
 ('is', 7874)]

3. Group and Count

---

Input: fulltext_txt

Output: **`Count` the number of tweets for each user_id and `save` the results in a text file**. 

Sample Output 

[0] User ID
USER_79321756	

[1] Timestamp
2010-03-03T16:57:24	

[2] Location
ÃœT: 47.528139,-122.197916	

[3] Longitude
47.528139	

[4] Latitude
-122.197916	

[5] Message
Alright twitters tryna take me over!

In [0]:
text = sc.textFile("full_text.txt").map(lambda line: line.split("\t"))
count = text.map(lambda fields: (fields[0],1)).reduceByKey(lambda v1,v2:v1+v2)

count.take(10)

#count.saveAsTextFile("q3.txt")

[('USER_79321756', 83),
 ('USER_6197f95d', 26),
 ('USER_2929b0da', 24),
 ('USER_f730f97b', 23),
 ('USER_d12c6a27', 27),
 ('USER_064b120e', 36),
 ('USER_942c68df', 57),
 ('USER_2e5f8774', 56),
 ('USER_47e93993', 33),
 ('USER_cd29704a', 47)]