# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data or use 

These two RDDs (`rddCitations` and `rddPatents` below) hold the raw lines of each file, because you will probably want to process them further (e.g. convert fields to integer types).

The `textFile` function returns each line of the file as a Python string. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used Python's `bytes` type before, look it up. You can convert a value `x` of type `bytes` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` function of the `gzip` module to read in all the lines as UTF-8 strings, like this:
```
import gzip
# Read the whole file on the driver as UTF-8 text. readlines() keeps each
# trailing '\n', so strip it to match what textFile() returns.
with gzip.open('cite75_99.txt.gz', 'rt', encoding='utf-8') as f:
    rddCitations = sc.parallelize([line.rstrip('\n') for line in f.readlines()])
```
This is less efficient than using `textFile`: `textFile` reads the file on the worker nodes through the underlying HDFS or other file system (a single `.gz` file cannot be split, so Spark reads it with one task, but the data never passes through the frontend), while `gzip.open()...readlines()` reads all the data in the frontend (the driver) and then distributes it to the worker nodes.
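To make the `bytes` versus `str` distinction concrete, here is a minimal sketch using only the standard library. It does not touch Spark: a tiny gzip payload built in memory (holding two lines from the `rddCitations.take(5)` output in this notebook) stands in for `cite75_99.txt.gz`, which is an assumption for illustration.

```python
import gzip
import io

# Two lines from the citations file, compressed in memory so the sketch
# runs without the real cite75_99.txt.gz (stand-in for illustration only).
data = gzip.compress('"CITING","CITED"\n3858241,956203\n'.encode('utf-8'))

# Binary mode ('rb') yields bytes objects; decode them to get str values.
with gzip.open(io.BytesIO(data), 'rb') as f:
    byte_lines = f.readlines()
first = byte_lines[1].decode('utf-8')
print(repr(first))  # '3858241,956203\n'

# Text mode ('rt') decodes for you, but each line still ends in '\n',
# which textFile() would have stripped.
with gzip.open(io.BytesIO(data), 'rt', encoding='utf-8') as f:
    text_lines = [line.rstrip('\n') for line in f.readlines()]
print(text_lines)  # ['"CITING","CITED"', '3858241,956203']
```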

In [3]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")
# rddPatents = sc.textFile("apat63_99.txt.gz").sample(fraction=.003, withReplacement=False, seed=42)

The data looks like the following.

In [4]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [5]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each element is a single string containing comma-separated values. You will need to convert these to (K,V) pairs, probably convert the keys to `int`, and so on. You'll also need to `filter` out the header line, since there's no easy way to extract all the lines except the first.
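A plain-Python sketch of that parsing step, using lines from the `rddCitations.take(5)` output above; the list comprehension mirrors `filter(...)` followed by `map(...)` on the RDD. `to_pair` and `drop_header` are hypothetical helper names. The second half shows one common alternative to comparing every line with the header: a function for `mapPartitionsWithIndex` that skips the first line of partition 0, assuming (as with `textFile`) that the header is the first line of the first partition.

```python
# Sample lines copied from the rddCitations.take(5) output above.
lines = ['"CITING","CITED"', '3858241,956203', '3858241,1324234']
header = lines[0]

def to_pair(line):
    """Turn 'citing,cited' into an (int, int) key-value pair."""
    citing, cited = line.split(',')
    return (int(citing), int(cited))

# Plain-Python equivalent of:
#   rddCitations.filter(lambda l: l != header).map(to_pair)
pairs = [to_pair(l) for l in lines if l != header]
print(pairs)  # [(3858241, 956203), (3858241, 1324234)]

def drop_header(index, iterator):
    """For rdd.mapPartitionsWithIndex: skip the first line of partition 0 only."""
    if index == 0:
        next(iterator, None)
    return iterator

# Simulate two partitions; only partition 0 starts with the header.
parts = [iter(lines[:2]), iter(lines[2:])]
kept = [l for i, p in enumerate(parts) for l in drop_header(i, p)]
print(kept)  # ['3858241,956203', '3858241,1324234']
```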

## Method

1. Join patents with citations to retrieve all of each patent's citations.
2. Join this RDD back with patents to retrieve each cited patent's POSTATE.
3. Keep only the rows where the citing POSTATE matches the cited POSTATE.
4. Group these rows by citing patent number to find cocitations (same-state citations), and count them.
5. Join the counts back to patents to get the output in the required form, and sort the rows in descending order of count.
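The five steps can be modelled in plain Python on a handful of records, which is a cheap way to check the logic before paying for a full Spark run. This is a minimal sketch with hypothetical toy data; dictionaries and list comprehensions stand in for the RDD joins, and the final join back to the full patent rows in step 5 is left out.

```python
from collections import Counter

# Hypothetical toy data (not from the real files):
# patent number -> POSTATE, and (citing, cited) pairs.
postate = {'1': '"CA"', '2': '"CA"', '3': '"TX"', '4': '"CA"'}
citations = [('1', '2'), ('1', '3'), ('1', '4'), ('3', '2'), ('4', '1'), ('4', '9')]

# Steps 1-2: attach the citing and the cited patent's POSTATE. Like the inner
# joins, this drops citations whose patents are missing (e.g. '9').
joined = [(citing, cited, postate[citing], postate[cited])
          for citing, cited in citations
          if citing in postate and cited in postate]

# Step 3: keep only same-state citations.
same_state = [row for row in joined if row[2] == row[3]]

# Step 4: count same-state citations per citing patent.
counts = Counter(row[0] for row in same_state)

# Step 5 (sorting only): highest count first.
ranking = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
print(ranking)  # [('1', 2), ('4', 1)]
```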

In [6]:
# preprocess rows to split fields
patents = rddPatents.map(lambda x: x.split(",")).cache()
citations = rddCitations.map(lambda x: x.split(",")).cache()

# save headers
patents_header = patents.first()
citations_header = citations.first()

# remove headers from rdd
patents = patents.filter(lambda row: row != patents_header)
citations = citations.filter(lambda row: row != citations_header).cache()

# filter out rows without POSTATE
patents = patents.filter(lambda r: r[5]!='""').cache()
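The index `5` and the `'""'` literal in the cell above come from the header line. A quick check in plain Python, using the header and the first data row from the `rddPatents.take(5)` output above (nothing else is assumed):

```python
# Header line copied from the rddPatents.take(5) output above.
header = ('"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE",'
          '"ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE",'
          '"RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB",'
          '"SELFCTLB","SECDUPBD","SECDLWBD"')
fields = header.split(',')
POSTATE = fields.index('"POSTATE"')
print(POSTATE)  # 5

# The Belgian patent in the sample has an empty, quoted POSTATE,
# so the filter above removes it.
row = '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,'.split(',')
print(row[POSTATE] == '""')  # True
```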

In [7]:
# make key value pairs
key_patents = patents.map(lambda r: (r[0], ','.join(r)))

In [8]:
# join with citations
nq1 = key_patents.join(citations).cache()
# parse and reformat
parsed_nq1 = nq1.map(lambda r: r[1][0].split(',') + [r[1][1]])
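`RDD.join` is an inner join on the key: it emits one `(key, (left_value, right_value))` record for every matching pair and drops keys that appear on only one side. The hypothetical `inner_join` helper below models that in plain Python on made-up records, together with the flattening done in `parsed_nq1`.

```python
from collections import defaultdict

def inner_join(left, right):
    """Plain-Python model of RDD.join: for every key present on both sides,
    emit (key, (left_value, right_value)) once per matching pair.
    (Spark does not guarantee this output order.)"""
    by_key = defaultdict(list)
    for k, v in right:
        by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in by_key.get(k, [])]

# Hypothetical toy records: (patent, row string) and (citing, cited).
key_patents = [('1', '1,"US","CA"'), ('2', '2,"US","NY"')]
citations = [('1', '2'), ('1', '7'), ('5', '1')]

nq1 = inner_join(key_patents, citations)
print(nq1)  # [('1', ('1,"US","CA"', '2')), ('1', ('1,"US","CA"', '7'))]

# Like parsed_nq1: the patent's fields, then the cited patent number.
parsed = [lv.split(',') + [rv] for _, (lv, rv) in nq1]
print(parsed[0])  # ['1', '"US"', '"CA"', '2']
```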

In [9]:
# make key value pairs
key_nq1 = parsed_nq1.map(lambda r: (r[-1], ','.join(r))).cache()

In [10]:
# join with patents
key_patents = patents.map(lambda r: (r[0], r[5]))
nq2 = key_nq1.join(key_patents).cache()
# parse and reformat
parsed_nq2 = nq2.map(lambda r: r[1][0].split(',') + [r[1][1]])
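After the second join, each `parsed_nq2` record is the citing patent's 23 fields, then the cited patent number (appended after the first join), then the cited patent's POSTATE. The sketch below spells out that layout with hypothetical named constants and a made-up record, which is why `r[5]` and `r[-1]` are the two fields compared in the next cell.

```python
# Named positions in a parsed_nq2 record (hypothetical names; the notebook
# uses the raw indices directly).
CITING, CITING_STATE = 0, 5
CITED, CITED_STATE = -2, -1

# Hypothetical record: the citing patent's 23 fields, then the cited patent
# number from the first join, then its POSTATE from the second join.
citing_fields = ['5959466', '1999', '14515', '1997', '"US"', '"CA"'] + [''] * 17
record = citing_fields + ['4000001', '"CA"']

print(len(record))  # 25
print(record[CITING], record[CITED])  # 5959466 4000001
print(record[CITING_STATE] == record[CITED_STATE])  # True, so In [11] keeps it
```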

In [11]:
# keep only rows where the citing POSTATE (r[5]) matches the cited POSTATE (r[-1])
nq3 = parsed_nq2.filter(lambda r: r[5]==r[-1])

In [12]:
# group by citing PATENT
grouped_nq3 = nq3.groupBy(lambda r: r[0])

In [13]:
# count cocitations
counted_nq3 = grouped_nq3.map(lambda r: (r[0], len(r[1])))
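`groupBy` followed by `len` ships every full row across the network just to count it. A common alternative, not what this solution does, is to map each row to `(citing, 1)` and sum with `reduceByKey(operator.add)` (the notebook already imports `operator`), which combines values within each partition before the shuffle. The plain-Python sketch below models both on hypothetical rows and checks that they agree; `reduce_by_key` is a hypothetical helper, not a Spark function.

```python
import operator
from collections import defaultdict
from functools import reduce

def reduce_by_key(pairs, func):
    """Plain-Python model of RDD.reduceByKey: combine all values per key with func."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return {k: reduce(func, vs) for k, vs in groups.items()}

# Hypothetical same-state rows (only the citing patent, r[0], matters here).
nq3 = [['1', 'x'], ['1', 'y'], ['4', 'z']]

# What In [12] and In [13] compute: groupBy(r[0]), then the size of each group.
grouped = defaultdict(list)
for r in nq3:
    grouped[r[0]].append(r)
counted_groupby = {k: len(v) for k, v in grouped.items()}

# The reduceByKey version: emit (citing, 1) and add the ones up.
counted_reduce = reduce_by_key(((r[0], 1) for r in nq3), operator.add)

print(counted_groupby == counted_reduce)  # True
print(counted_reduce)  # {'1': 2, '4': 1}
```

On the RDD itself this would read `nq3.map(lambda r: (r[0], 1)).reduceByKey(operator.add)`.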

In [14]:
# make key value pairs
key_patents = patents.map(lambda r: (r[0], ','.join(r)))
# join with count cocitations
nq4 = key_patents.join(counted_nq3).cache()
# parse and reformat
parsed_nq4 = nq4.map(lambda r: r[1][0].split(',') + [r[1][1]])

In [15]:
# sort in descending order of cocitation counts
sorted_nq4 = parsed_nq4.sortBy(lambda r: -r[-1])
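Since only the top rows are printed, sorting the whole RDD is more work than needed. `RDD.takeOrdered(10, key=lambda r: -r[-1])` returns the same first ten rows (ties aside) by keeping only the best candidates from each partition. The plain-Python sketch below uses `heapq.nsmallest` on hypothetical rows to show that the two approaches agree; `sortBy(lambda r: r[-1], ascending=False)` is another way to get descending order without negating the key.

```python
import heapq

# Hypothetical parsed_nq4-style rows: patent number first, count last.
rows = [['a', 3], ['b', 125], ['c', 100], ['d', 103]]

# What sortBy(lambda r: -r[-1]) followed by take(3) returns...
full_sort = sorted(rows, key=lambda r: -r[-1])[:3]

# ...matches a bounded top-k selection, which is the idea behind
# RDD.takeOrdered(3, key=lambda r: -r[-1]).
top_k = heapq.nsmallest(3, rows, key=lambda r: -r[-1])

print(full_sort == top_k)  # True
print([r[0] for r in top_k])  # ['b', 'd', 'c']
```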

In [16]:
# print answer
sorted_nq4.take(10)

[['5959466',
  '1999',
  '14515',
  '1997',
  '"US"',
  '"CA"',
  '5310',
  '2',
  '',
  '326',
  '4',
  '46',
  '159',
  '0',
  '1',
  '',
  '0.6186',
  '',
  '4.8868',
  '0.0455',
  '0.044',
  '',
  '',
  125],
 ['5983822',
  '1999',
  '14564',
  '1998',
  '"US"',
  '"TX"',
  '569900',
  '2',
  '',
  '114',
  '5',
  '55',
  '200',
  '0',
  '0.995',
  '',
  '0.7201',
  '',
  '12.45',
  '0',
  '0',
  '',
  '',
  103],
 ['6008204',
  '1999',
  '14606',
  '1998',
  '"US"',
  '"CA"',
  '749584',
  '2',
  '',
  '514',
  '3',
  '31',
  '121',
  '0',
  '1',
  '',
  '0.7415',
  '',
  '5',
  '0.0085',
  '0.0083',
  '',
  '',
  100],
 ['5952345',
  '1999',
  '14501',
  '1997',
  '"US"',
  '"CA"',
  '749584',
  '2',
  '',
  '514',
  '3',
  '31',
  '118',
  '0',
  '1',
  '',
  '0.7442',
  '',
  '5.1102',
  '0',
  '0',
  '',
  '',
  98],
 ['5958954',
  '1999',
  '14515',
  '1997',
  '"US"',
  '"CA"',
  '749584',
  '2',
  '',
  '514',
  '3',
  '31',
  '116',
  '0',
  '1',
  '',
  '0.7397',
  '',
  