# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task; you can use the `sample()` method to extract just a sample of the data.

The two RDDs below are called `rddCitations` and `rddPatents`. They hold raw text, so you probably want to process them further (e.g. convert them to integer types, etc).

The `textFile` function returns data in strings. This should work fine for this lab.

Other methods you use might return data as `bytes`. If you haven't used the Python `bytes` type before, look it up. You can convert a value `x` of type `bytes` into, e.g., a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` method of the gzip library to read in all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt',encoding='utf-8') as f:
    rddCitations = sc.parallelize( f.readlines() )
```
This is less efficient than using `textFile` because `textFile` would use the underlying HDFS or other file system to read the file across all the worker nodes, while `gzip.open()...readlines()` reads all the data in the frontend and then distributes it to all the worker nodes.
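To make the difference between `bytes` and strings concrete, here is a minimal sketch in plain Python (no Spark needed). It writes two citation lines to a temporary gzip file, whose name is made up for this example, and reads them back in both binary and text mode.

```python
import gzip
import os
import tempfile

# Hypothetical scratch file standing in for cite75_99.txt.gz
path = os.path.join(tempfile.mkdtemp(), "sample.txt.gz")
with gzip.open(path, "wt", encoding="utf-8") as f:
    f.write('"CITING","CITED"\n3858241,956203\n')

# Binary mode yields bytes objects, which must be decoded before string handling
with gzip.open(path, "rb") as f:
    raw_lines = f.readlines()
decoded = [line.decode("utf-8").rstrip("\n") for line in raw_lines]

# Text mode decodes for us and yields str objects directly
with gzip.open(path, "rt", encoding="utf-8") as f:
    text_lines = [line.rstrip("\n") for line in f]
```

Both `decoded` and `text_lines` end up holding the same strings; only the binary read needed the explicit `decode` step.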

In [3]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [4]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [5]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each line is a single string of comma-separated values. You will need to convert these to (K,V) pairs, probably convert the keys to `int`, and so on. You'll also need to `filter` out the header string, since there's no easy way to extract all the lines except the first.
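As a rough illustration of that conversion, the sketch below works on a plain Python list holding the first few citation lines shown above. The helper name `parse_citation` is made up for this example.

```python
# First few lines of the citations file, as shown by rddCitations.take(5)
lines = ['"CITING","CITED"', '3858241,956203', '3858241,1324234']

header = lines[0]

def parse_citation(line):
    """Turn a 'citing,cited' string into an (int, int) key-value pair."""
    citing, cited = line.split(",")
    return (int(citing), int(cited))

# Drop the header, then parse every remaining line
pairs = [parse_citation(line) for line in lines if line != header]
```

On an RDD the same helper could be applied with `rddCitations.filter(lambda line: line != header).map(parse_citation)`. Converting to `int` is optional; string keys join just as well, as long as both sides of a join use the same type.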

### Solution

We first remove the column-names row from each RDD.

In [6]:
h1 = rddCitations.first()
h2 = rddPatents.first()
c1 = rddCitations.filter(lambda line: line != h1)
p1 = rddPatents.filter(lambda line: line != h2)

Since each row is read from the gzip file as a single comma-separated string, we first split each row on the comma to get (citing, cited) tuples.

In [7]:
c2 = c1.map(lambda x: (x.split(',')[0], x.split(',')[1]))

We do the same with the patents RDD, keying each full row by its patent number. We make several identical copies (`p2`, `p3`, `p6`) and derive two reduced RDDs (`p4`, `p5`) that hold just the patent number and its POSTATE.

In [8]:
p2 = p1.map(lambda x: (x.split(',')[0], x))
p3 = p1.map(lambda x: (x.split(',')[0], x))
p4 = p2.map(lambda x: (x[0], x[1].split(',')[5]))
p5 = p3.map(lambda x: (x[0], x[1].split(',')[5]))
p6 = p1.map(lambda x: (x.split(',')[0], x))

We join the citations rdd with the reduced patents rdd on the key which is citing patent in the citations rdd and patent in the patents rdd

In [9]:
c3 = c2.join(p4)

We then use map to convert this joined result into a key value pair with cited as the new key and (citing, citing_postate) as the value

In [10]:
c4 = c3.map(lambda x: (x[1][0], (x[0], x[1][1])))

We join the citations rdd again with another reduced patents rdd on the key which is cited patent in the citations rdd and patent in the patents rdd

In [11]:
c5 = c4.join(p5)

We then flatten this complicated nested tuple into a regular tuple with 4 elements (citing, citing_postate, cited, cited_postate)

In [12]:
c6 = c5.map(lambda x: (x[1][0][0], x[1][0][1], x[0], x[1][1]))
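The nested tuples produced by the two joins can be hard to picture. The following sketch mimics the same four steps in plain Python on made-up toy data, using a dict lookup in place of each join (which behaves like an inner join here only because every patent number is unique in the lookup).

```python
# Toy data: (citing, cited) pairs and a patent -> POSTATE lookup (made-up values)
citations = [("10", "1"), ("10", "2"), ("11", "1")]
state = {"1": '"CA"', "2": '"TX"', "10": '"CA"', "11": '"TX"'}

# Like c2.join(p4): key is citing, value is (cited, citing_state)
c3 = [(citing, (cited, state[citing])) for citing, cited in citations if citing in state]

# Like the map that builds c4: re-key on cited
c4 = [(v[0], (k, v[1])) for k, v in c3]

# Like c4.join(p5): value becomes ((citing, citing_state), cited_state)
c5 = [(k, (v, state[k])) for k, v in c4 if k in state]

# Like the map that builds c6: flatten to (citing, citing_state, cited, cited_state)
c6 = [(x[1][0][0], x[1][0][1], x[0], x[1][1]) for x in c5]
```

Printing `c5` and `c6` side by side shows why the index chain `x[1][0][0]` is needed to reach the citing patent after the second join.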

We then remove the cases where the postates are not equal

In [13]:
c7 = c6.filter(lambda x: x[1] == x[3])

Then we remove the cases where the postate is empty

In [14]:
c8 = c7.filter(lambda x: x[1] != '""')
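The comparison against `'""'` rather than `''` follows from how the rows were split. A small sketch, using the first data row shown by `rddPatents.take(5)` above, compares a plain `split` with Python's standard `csv` module:

```python
import csv

# First data row of the patents file, as shown earlier
row = '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,'

# A plain split keeps the CSV quotes, so an empty POSTATE is the
# two-character string '""' rather than an empty string
postate_split = row.split(",")[5]

# The csv module strips the quotes, giving a truly empty string instead
postate_csv = next(csv.reader([row]))[5]
```

Because the solution uses `split(',')`, filtering on `'""'` is the matching test. If the rows were parsed with `csv.reader` instead, the filter would need to compare against `''`.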

We once again convert this into a key value pair with citing as the key and (citing_postate, cited, cited_postate) as the value

In [15]:
c9 = c8.map(lambda x: (x[0], (x[1], x[2], x[3])))

We group by key and count the distinct values for each citing patent. The result holds just (citing, count), with citing still the key.

In [16]:
c10 = c9.groupByKey().mapValues(lambda x: len(set(x)))
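What `groupByKey().mapValues(lambda x: len(set(x)))` computes can be checked on a handful of records without Spark. This sketch uses made-up toy values and a `defaultdict` of sets to stand in for the grouping:

```python
from collections import defaultdict

# Toy (citing, (citing_state, cited, cited_state)) records; values are made up
records = [
    ("10", ('"CA"', "1", '"CA"')),
    ("10", ('"CA"', "3", '"CA"')),
    ("10", ('"CA"', "3", '"CA"')),  # duplicate citation, counted once
    ("11", ('"TX"', "2", '"TX"')),
]

# Collect the distinct values seen for each key
groups = defaultdict(set)
for key, value in records:
    groups[key].add(value)

# The count per key is the number of distinct values
counts = {key: len(values) for key, values in groups.items()}
```

Using a set means a citation that appears twice is counted once; taking `len` of the raw group would count duplicates separately.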

We perform the final join to add back the patent data

In [17]:
c11 = c10.join(p6).cache()

We sort by descending order of count

In [18]:
c12 = c11.sortBy(lambda x: x[1][0], False)

Finally, we drop the redundant citing key, since the patent number is already part of the patent row (this step is optional). The result has the form (patent_row_string, count). We could use map and split to expand the row into a larger tuple, but I found the current format easier to read, so I kept it.

In [19]:
c13 = c12.map(lambda x: (x[1][1], x[1][0]))

In [20]:
c13.take(10)

[('5959466,1999,14515,1997,"US","CA",5310,2,,326,4,46,159,0,1,,0.6186,,4.8868,0.0455,0.044,,',
  125),
 ('5983822,1999,14564,1998,"US","TX",569900,2,,114,5,55,200,0,0.995,,0.7201,,12.45,0,0,,',
  103),
 ('6008204,1999,14606,1998,"US","CA",749584,2,,514,3,31,121,0,1,,0.7415,,5,0.0085,0.0083,,',
  100),
 ('5952345,1999,14501,1997,"US","CA",749584,2,,514,3,31,118,0,1,,0.7442,,5.1102,0,0,,',
  98),
 ('5958954,1999,14515,1997,"US","CA",749584,2,,514,3,31,116,0,1,,0.7397,,5.181,0,0,,',
  96),
 ('5998655,1999,14585,1998,"US","CA",,1,,560,1,14,114,0,1,,0.7387,,5.1667,,,,',
  96),
 ('5936426,1999,14466,1997,"US","CA",5310,2,,326,4,46,178,0,1,,0.58,,11.2303,0.0765,0.073,,',
  94),
 ('5913855,1999,14417,1997,"US","CA",733846,2,,606,3,32,242,0,1,,0.7403,,8.3595,0,0,,',
  90),
 ('5739256,1998,13983,1995,"US","CA",70060,2,15,528,1,15,453,0,1,,0.8232,,15.1104,0.1124,0.1082,,',
  90),
 ('5925042,1999,14445,1997,"US","CA",733846,2,,606,3,32,242,0,1,,0.7382,,8.3471,0,0,,',
  90)]
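If a structured record is preferred over the raw row string, one possible approach is to pair the header names with the split fields. The sketch below does this in plain Python for the top result above; the key `SAME_STATE_CITATIONS` is a made-up name for the count column.

```python
# Header line of the patents file and the top result, copied from the output above
header = ('"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE",'
          '"ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE",'
          '"RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB",'
          '"SELFCTLB","SECDUPBD","SECDLWBD"')
row, count = ('5959466,1999,14515,1997,"US","CA",5310,2,,326,4,46,159,0,1,,'
              '0.6186,,4.8868,0.0455,0.044,,', 125)

# Strip the CSV quotes from both the column names and the values
names = [name.strip('"') for name in header.split(",")]
fields = [value.strip('"') for value in row.split(",")]

# Pair names with values, then attach the count under a hypothetical column name
record = dict(zip(names, fields))
record["SAME_STATE_CITATIONS"] = count
```

All field values remain strings here; converting numeric columns such as `GYEAR` to `int` would be a further step, with empty fields needing special handling.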