# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf = SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the JVM-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data or use …
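Note that `sample(withReplacement=False, fraction=0.1)` does not return exactly 10% of the rows: without replacement, each element is kept independently with probability `fraction` (Bernoulli sampling), so the sample size only approximates `fraction * count`. The plain-Python sketch below imitates that behaviour on a list; `bernoulli_sample` is a hypothetical helper, not a Spark API.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`.

    Imitates RDD.sample(withReplacement=False, fraction=...): the result
    size is only *expected* to be fraction * len(rows), and order is kept.
    """
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

lines = [f"line {i}" for i in range(10_000)]
sampled = bernoulli_sample(lines, fraction=0.1, seed=42)
print(len(sampled))  # close to 1000, but usually not exactly 1000
```

`RDD.sample` also accepts a `seed` argument; fixing it makes the reduced dataset repeatable between runs, which helps when comparing intermediate results.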

The two RDDs created below (`rddCitations` and `rddPatents`) hold the raw lines of each file, so you will probably want to process them further (e.g. convert them to integer types, etc.).

The `textFile` function returns an RDD of strings, one per line of the file (`.gz` files are decompressed automatically). This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used Python's `bytes` type before, look it up. You can convert a value `x` of type `bytes` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` function of the `gzip` library to read in all the lines as UTF-8 strings like this:
```
import gzip
# 'rt' opens the compressed file in text mode, so each line is a str
with gzip.open('cite75_99.txt.gz', 'rt', encoding='utf-8') as f:
    rddCitations = sc.parallelize(f.readlines())
```
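This is less efficient than using `textFile`: `textFile` reads the file through the underlying HDFS or other file system on the worker nodes, while `gzip.open()...readlines()` reads all the data into the driver (the frontend) and then distributes it to the worker nodes. (A single `.gz` file is not splittable, so `textFile` still loads it as one partition; calling `repartition()` afterwards spreads the later work across the workers.)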
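To see the `bytes`/`str` difference concretely, the sketch below writes a tiny gzip file (the file name and its two lines are made up, laid out like the citations file) and reads it back in binary mode (`'rb'`, which yields `bytes`) and in text mode (`'rt'`, which yields `str`).

```python
import gzip
import os
import tempfile

# Made-up content in the same layout as the citations file
content = '"CITING","CITED"\n3874228,2404584\n'

path = os.path.join(tempfile.mkdtemp(), "sample.txt.gz")
with gzip.open(path, "wt", encoding="utf-8") as f:
    f.write(content)

# Binary mode: each line is a bytes object that must be decoded
with gzip.open(path, "rb") as f:
    raw_lines = f.readlines()
decoded = [line.decode("utf-8") for line in raw_lines]

# Text mode: gzip decodes each line to str for us
with gzip.open(path, "rt", encoding="utf-8") as f:
    text_lines = f.readlines()

print(type(raw_lines[0]).__name__, type(text_lines[0]).__name__)  # bytes str
print(decoded == text_lines)  # True
```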

In [3]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

In [4]:
# rddCitations = sc.textFile("cite75_99.txt.gz").sample(fraction = 0.1,withReplacement = False).cache()
# rddPatents = sc.textFile("apat63_99.txt.gz").sample(fraction = 0.1,withReplacement = False).cache()

The data looks like the following.

In [5]:
rddCitations.take(5)

In [6]:
rddPatents.take(5)

In [7]:
# Remove the header lines (run on the full dataset, where first() returns the header)
citationheaders = rddCitations.first()
rddCitations = rddCitations.filter(lambda line: line != citationheaders)

patentheaders = rddPatents.first()
rddPatents = rddPatents.filter(lambda line: line != patentheaders)


Each element is a single string containing comma-separated values. You will need to convert these to (K, V) pairs, probably convert the keys to `int`, and so on. You'll need to `filter` out the header string as well, since there's no easy way to extract all the lines except the first.
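One alternative to filtering by value relies on the assumption that the header is the first line of partition 0 (the case for a single text file read with `textFile`) and drops only that line. In PySpark this is commonly written as `rdd.mapPartitionsWithIndex(drop_header)`; the sketch below applies the same hypothetical `drop_header` function to plain Python lists standing in for partitions, so it runs without Spark.

```python
from itertools import islice

def drop_header(partition_index, lines):
    """Skip the first line of partition 0; pass other partitions through.

    Has the (index, iterator) -> iterator shape that
    mapPartitionsWithIndex expects.
    """
    if partition_index == 0:
        return islice(lines, 1, None)
    return lines

# Plain lists standing in for an RDD's partitions (made-up sample data)
partitions = [
    ['"CITING","CITED"', "3874228,2404584"],
    ["3874228,3308654", "3880572,2972937"],
]
body = [line for i, part in enumerate(partitions)
        for line in drop_header(i, iter(part))]
print(body)  # ['3874228,2404584', '3874228,3308654', '3880572,2972937']
```

Unlike the value filter, this does not compare every line with the header string, but it silently drops a data line if partition 0 does not start with the header (for example after `sample()`).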

In [None]:
# With the headers removed, split each line into columns and turn the rows into key-value pairs

In [8]:
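# Patents: (patent number, [remaining columns as strings]); index 3 is the country, index 4 the state
rddPatentsSplitColumns = rddPatents.map(lambda x: x.split(','))

kvpatents = rddPatentsSplitColumns.map(lambda x: (int(x[0]), x[1:]))

# Citations: (citing patent, [cited patent]), both converted to int
rddCitationsSplitColumns = rddCitations.map(lambda x: x.split(','))

kvcitingtocited = rddCitationsSplitColumns.map(lambda x: (int(x[0]), [int(x[1])]))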
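Applied to single lines, the two parsing `map` steps above behave like the plain-Python functions below. The raw patent line is reconstructed from the row for patent 3877552 printed later in this notebook, and the sketch assumes, as the lab code does, that no field contains an embedded comma, so `split(',')` is enough. `parse_patent` and `parse_citation` are hypothetical names.

```python
def parse_patent(line):
    """'PATENT,col1,col2,...' -> (PATENT as int, [col1, col2, ...] as strings)."""
    fields = line.split(",")
    return int(fields[0]), fields[1:]

def parse_citation(line):
    """'CITING,CITED' -> (CITING, [CITED]), both as ints, like kvcitingtocited."""
    citing, cited = line.split(",")
    return int(citing), [int(cited)]

patent_line = ('3877552,1975,5583,1973,"US","CA",260265,2,16,188,5,53,5,7,'
               '0.8,0.4898,0.625,10.8571,11,0,0,0,0')
key, cols = parse_patent(patent_line)
print(key, cols[3], cols[4])              # 3877552 "US" "CA"
print(parse_citation("3874228,2404584"))  # (3874228, [2404584])
```

The quotes are kept inside the country and state strings, which is why the filters below compare against `'"US"'` and `'""'`.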

In [1]:
# Join patents to citations. Keep only United States patents with a known state, and drop patents with no citations (None values).
# Select the citing patent, its state and the cited patent


In [9]:
patents_citing_join = kvpatents.leftOuterJoin(kvcitingtocited)

In [11]:
patents_citing_join = patents_citing_join.filter(lambda line: line[1][0][3] == '"US"' and line[1][0][4] != '""' and line[1][1] is not None)
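Because this filter discards every row whose citation side is `None`, the `leftOuterJoin` followed by the filter keeps the same rows an inner `join` would. The plain-Python sketch below mimics both join semantics on small made-up pair lists (`inner_join` and `left_outer_join` are hypothetical helpers, not Spark functions) to check that equivalence.

```python
from collections import defaultdict

def inner_join(left, right):
    """(k, v) and (k, w) pairs -> (k, (v, w)) for every matching pair."""
    by_key = defaultdict(list)
    for k, w in right:
        by_key[k].append(w)
    return [(k, (v, w)) for k, v in left for w in by_key.get(k, [])]

def left_outer_join(left, right):
    """Like inner_join, but unmatched left keys yield (k, (v, None))."""
    by_key = defaultdict(list)
    for k, w in right:
        by_key[k].append(w)
    return [(k, (v, w)) for k, v in left for w in by_key.get(k, [None])]

patents = [(1, "A"), (2, "B"), (3, "C")]
citations = [(1, [10]), (1, [11]), (3, [12])]

outer_then_filter = [row for row in left_outer_join(patents, citations)
                     if row[1][1] is not None]
print(outer_then_filter == inner_join(patents, citations))  # True
```

This only holds because the citation values are one-element lists and never `None` themselves, so `kvpatents.join(kvcitingtocited)` plus the two state filters should give the same result.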


In [12]:
patents_citing_join = patents_citing_join.map(lambda line: (line[0],[line[1][0][4],line[1][1][0]]))

In [13]:
patents_citing_join.take(5)
# Citing, [CitingState, Cited]
# Next: make the cited patent the key, join with the patents again, and get the cited state

[(3874228, ['"CA"', 2404584]),
 (3874228, ['"CA"', 3308654]),
 (3880572, ['"MA"', 2972937]),
 (3880572, ['"MA"', 3585417]),
 (3880572, ['"MA"', 3732058])]

In [14]:
patents_citing_join_keyiscited = patents_citing_join.map(lambda line: (line[1][1],[line[0],line[1][0]]))

In [None]:
patents_citing_join_keyiscited.take(5)
# Cited,[Citing, CitingState]

[(2404584, [3874228, '"CA"']),
 (3308654, [3874228, '"CA"']),
 (2972937, [3880572, '"MA"']),
 (3585417, [3880572, '"MA"']),
 (3732058, [3880572, '"MA"'])]

In [None]:
# Join again to get the cited patent's columns, apply the same filtering as above, and keep only citations where the citing and cited states match

In [None]:
patents_citing_join2 = patents_citing_join_keyiscited.leftOuterJoin(kvpatents)

In [None]:
patents_citing_join2 = patents_citing_join2.filter(lambda line: line[1][1] is not None and line[1][1][3] == '"US"' and line[1][1][4] != '""' and line[1][0][1] == line[1][1][4])



In [None]:
citing_state_cited_state = patents_citing_join2.map(lambda line: (line[1][0][0],[line[1][0][1],line[0],line[1][1][4]]))

In [None]:
patents_citing_join2.take(1)

[(4009122,
  ([5137647, '"CT"'],
   ['1977',
    '6262',
    '1975',
    '"US"',
    '"CT"',
    '417130',
    '2',
    '22',
    '502',
    '1',
    '19',
    '2',
    '21',
    '1',
    '0.771',
    '0',
    '11.9524',
    '8',
    '0',
    '0',
    '0',
    '0']))]

In [None]:
citing_state_cited_state.take(5)

[(5137647, ['"CT"', 4009122, '"CT"']),
 (4889647, ['"CT"', 4009122, '"CT"']),
 (4284442, ['"NJ"', 3798090, '"NJ"']),
 (5520756, ['"NJ"', 3798090, '"NJ"']),
 (4672450, ['"CA"', 4254435, '"CA"'])]
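The two joins can be traced by hand on a tiny made-up dataset. The sketch below replaces the RDDs with a Python dict (patent number to country and state) and a list of (citing, cited) pairs, keeps only US patents with a known state on both sides, and counts, per citing patent, the cited patents from the same state. It mirrors the logic of the cells above, not Spark's execution; `us_state` is a hypothetical helper.

```python
from collections import Counter

# Made-up patents: number -> (country, state), quoted as in the raw data
patents = {
    1: ('"US"', '"CA"'),
    2: ('"US"', '"CA"'),
    3: ('"US"', '"NY"'),
    4: ('"JP"', '""'),
}
# Made-up (citing, cited) pairs; patent 5 is missing from `patents`
citations = [(1, 2), (1, 3), (3, 2), (2, 1), (2, 4), (5, 1)]

def us_state(patent):
    """Quoted state of a US patent with a known state, else None."""
    country, state = patents.get(patent, (None, None))
    return state if country == '"US"' and state != '""' else None

same_state = Counter(
    citing
    for citing, cited in citations
    if us_state(citing) is not None and us_state(citing) == us_state(cited)
)
print(dict(same_state))  # {1: 1, 2: 1}
```

Patent 3 drops out because it only cites a patent from another state, and patent 5 drops out because it has no patent record, just as the joins discard it.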

In [3]:
# Count the same-state citations of each citing patent

In [None]:
citing_countCited = citing_state_cited_state.groupByKey().mapValues(len)
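`groupByKey().mapValues(len)` ships every value across the shuffle just to count it. `reduceByKey` instead combines values within each partition first, so only one partial count per key per partition is shuffled; here it could be written as `citing_state_cited_state.mapValues(lambda _: 1).reduceByKey(operator.add)`. The plain-Python sketch below imitates that two-stage combine on made-up partitions and checks it against a group-then-count result; the helper names are hypothetical.

```python
from collections import defaultdict
from functools import reduce

# Made-up partitions of (citing patent, value) pairs
partitions = [
    [(5137647, "a"), (4889647, "b"), (5137647, "c")],
    [(5137647, "d"), (4284442, "e")],
]

def combine_partition(pairs):
    """Map-side step: partial count per key within one partition."""
    partial = defaultdict(int)
    for key, _ in pairs:
        partial[key] += 1
    return partial

def merge(a, b):
    """Reduce-side step: add up the partial counts for each key."""
    merged = defaultdict(int, a)
    for key, count in b.items():
        merged[key] += count
    return merged

reduced = dict(reduce(merge, map(combine_partition, partitions)))

# groupByKey().mapValues(len) equivalent: collect every value, then count
grouped = defaultdict(list)
for part in partitions:
    for key, value in part:
        grouped[key].append(value)
counted = {key: len(values) for key, values in grouped.items()}

print(reduced == counted)  # True
```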

In [None]:
# Final join: append the same-state citation count to each patent's columns

In [None]:
newPatents = kvpatents.join(citing_countCited)

In [None]:
newPatents = newPatents.map(lambda line: (line[0],line[1][0]+[line[1][1]])).cache()
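Note that `join` is an inner join, so patents with no same-state citations have no entry in `citing_countCited` and do not appear in `newPatents`. If every patent should be listed, a left outer join with a default count of 0 would keep them; whether that is wanted depends on the required output, which this section does not state. A plain-Python sketch of the difference on made-up data:

```python
# Made-up inputs shaped like kvpatents (number -> columns) and citing_countCited
patents = {1: ["1975", '"US"', '"CA"'], 2: ["1976", '"US"', '"NY"']}
counts = {1: 2}  # patent 2 has no same-state citations

# Inner join, as in the notebook: patents without a count are dropped
inner = {k: cols + [counts[k]] for k, cols in patents.items() if k in counts}

# Left outer join with a default count of 0: every patent is kept
left = {k: cols + [counts.get(k, 0)] for k, cols in patents.items()}

print(inner)  # {1: ['1975', '"US"', '"CA"', 2]}
print(left)   # {1: ['1975', '"US"', '"CA"', 2], 2: ['1976', '"US"', '"NY"', 0]}
```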

In [None]:
for line in newPatents.take(5):
    print(line)

(3877552, ['1975', '5583', '1973', '"US"', '"CA"', '260265', '2', '16', '188', '5', '53', '5', '7', '0.8', '0.4898', '0.625', '10.8571', '11', '0', '0', '0', '0', 2])
(3952280, ['1976', '5954', '1974', '"US"', '"CA"', '180110', '2', '5', '367', '2', '21', '5', '2', '0.8', '0', '0.5', '10.5', '10.4', '0', '0', '0', '0', 1])
(3965632, ['1976', '6024', '1975', '"US"', '"IL"', '461175', '2', '18', '52', '6', '69', '8', '2', '1', '0.5', '0.2188', '11', '6.75', '0', '0', '0', '0', 2])
(3971976, ['1976', '6052', '1975', '"US"', '"MA"', '357270', '2', '31', '363', '4', '45', '5', '2', '0.6', '0.5', '0.4444', '3', '15.8', '0.5', '0.2', '0', '0', 1])
(3972516, ['1976', '6059', '1975', '"US"', '"PA"', '154480', '2', '10', '266', '5', '52', '8', '4', '0.75', '0', '0.7222', '20.75', '11.5', '0', '0', '0', '0', 3])


In [4]:
# Sort by the same-state citation count (index 22) in descending order

In [None]:
newPatentsSorted = newPatents.sortBy(lambda x: x[1][22], ascending=False)
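Since only the top 10 rows are printed, a full sort is more work than needed. `RDD.takeOrdered(num, key=...)` returns the `num` smallest elements by `key`, so `newPatents.takeOrdered(10, key=lambda x: -x[1][22])` should give a top 10 by count (ties may come out in a different order than with `sortBy`). PySpark runs `heapq.nsmallest` on each partition and merges the results; the sketch below uses `heapq.nsmallest` on made-up rows, where the count is the last element instead of index 22.

```python
import heapq

# Made-up rows shaped like newPatents: (patent, [..., count]); count is last
rows = [(1, ["x", 3]), (2, ["y", 125]), (3, ["z", 96]), (4, ["w", 103])]

# nsmallest with a negated key selects the largest counts, in descending order
top2 = heapq.nsmallest(2, rows, key=lambda row: -row[1][-1])
print(top2)  # [(2, ['y', 125]), (4, ['w', 103])]

# Same result as sorting everything in descending order and taking 2
print(top2 == sorted(rows, key=lambda row: row[1][-1], reverse=True)[:2])  # True
```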

In [None]:
for line in newPatentsSorted.take(10):
    print(line)

(5959466, ['1999', '14515', '1997', '"US"', '"CA"', '5310', '2', '', '326', '4', '46', '159', '0', '1', '', '0.6186', '', '4.8868', '0.0455', '0.044', '', '', 125])
(5983822, ['1999', '14564', '1998', '"US"', '"TX"', '569900', '2', '', '114', '5', '55', '200', '0', '0.995', '', '0.7201', '', '12.45', '0', '0', '', '', 103])
(6008204, ['1999', '14606', '1998', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '121', '0', '1', '', '0.7415', '', '5', '0.0085', '0.0083', '', '', 100])
(5952345, ['1999', '14501', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '118', '0', '1', '', '0.7442', '', '5.1102', '0', '0', '', '', 98])
(5958954, ['1999', '14515', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '116', '0', '1', '', '0.7397', '', '5.181', '0', '0', '', '', 96])
(5998655, ['1999', '14585', '1998', '"US"', '"CA"', '', '1', '', '560', '1', '14', '114', '0', '1', '', '0.7387', '', '5.1667', '', '', '', '', 96])
(5936426, ['1999', '14466', '1997', '"US"', '"CA"