# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task; the `sample()` method extracts just a sample of the data.
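As a rough illustration of what sampling without replacement does, here is a plain-Python sketch that needs no Spark. The helper `bernoulli_sample` and the toy `lines` list are hypothetical stand-ins: each element is kept independently with probability `fraction`, so the result is only approximately 5% of the input, which is also why `rdd.sample(False, 0.05)` does not return an exact count.

```python
import random

def bernoulli_sample(items, fraction, seed):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)  # fixed seed makes the sample repeatable
    return [item for item in items if rng.random() < fraction]

# Hypothetical toy data standing in for the lines of an RDD.
lines = [f"{n},{n + 1}" for n in range(10_000)]
subset = bernoulli_sample(lines, 0.05, seed=42)

# The size is close to, but not exactly, 5% of the input.
print(len(subset), "of", len(lines), "lines kept")
```

Developing against a small sample and switching to the full RDD only for the final run keeps iteration time manageable.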

The two RDDs loaded below (`rddCitations` and `rddPatents`) hold the raw data; you will probably want to process them further (e.g. convert them to integer types).

The `textFile` function returns data in strings. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used the Python `bytes` type before, look it up. You can convert a `bytes` value `x` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` method of the gzip library to read in all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt',encoding='utf-8') as f:
    rddCitations = sc.parallelize( f.readlines() )
```
This is less efficient than using `textFile` because `textFile` uses the underlying HDFS or other file system to read the file across the worker nodes, while `gzip.open()...readlines()` reads all the data in the frontend (the driver) and then distributes it to the worker nodes.
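The difference between `bytes` and `str` lines can be tried out without the real data files. The sketch below compresses a tiny, made-up sample in memory (the `raw_text` content is an assumption that only imitates the first lines of the citations file) and reads it back in binary and in text mode.

```python
import gzip
import io

# Hypothetical sample imitating the first lines of cite75_99.txt.gz.
raw_text = '"CITING","CITED"\n3858241,956203\n3858241,1324234\n'
compressed = gzip.compress(raw_text.encode("utf-8"))

# Binary mode yields bytes; each line must be decoded explicitly.
with gzip.open(io.BytesIO(compressed), "rb") as f:
    byte_lines = f.readlines()
decoded_lines = [b.decode("utf-8") for b in byte_lines]

# Text mode ("rt") decodes for us and yields str directly.
with gzip.open(io.BytesIO(compressed), "rt", encoding="utf-8") as f:
    text_lines = f.readlines()

# readlines() keeps the trailing newline, so strip it before parsing.
clean_lines = [line.rstrip("\n") for line in text_lines]
print(clean_lines)
```

Note that lines read this way still carry their newline characters, whereas the `textFile` output shown in this notebook does not, so the stripping step matters if you go the `gzip` route.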

In [3]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [4]:
rddCitations.take(5)

                                                                                

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [5]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In [None]:
rddCitations1 = rddCitations.sample(False, 0.05)

In other words, each element is a single string of comma-separated values. You will need to convert these to (K,V) pairs, probably converting the keys to `int`, and so on. You'll also need to `filter` out the header string, since there's no easy way to extract all the lines except the first.
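One possible way to do the header filtering and the conversion is sketched below in plain Python. The helpers `is_header` and `parse_citation` are hypothetical names, and the header test assumes that only the header line starts with a double quote, which holds for the sample lines shown above. On an RDD the same functions could presumably be passed to `filter` and `map`.

```python
def is_header(line):
    # Assumption: only the header row starts with a quoted column name.
    return line.startswith('"')

def parse_citation(line):
    # "3858241,956203" -> (3858241, 956203)
    citing, cited = line.split(",")
    return int(citing), int(cited)

# Sample lines copied from the rddCitations.take(5) output above.
lines = ['"CITING","CITED"', '3858241,956203', '3858241,1324234']
pairs = [parse_citation(line) for line in lines if not is_header(line)]
print(pairs)
```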

In [None]:
rddPatents1 = rddPatents.sample(False, 0.05)

In [6]:
from itertools import islice

In [7]:
rddCit = rddCitations.cache().mapPartitionsWithIndex(lambda index, iterator:islice(iterator, 1,None ) if index == 0 else iterator)

In [8]:
rddCit.take(5)

                                                                                

['3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384',
 '3858241,3634889']

In [9]:
rddPat = rddPatents.cache().mapPartitionsWithIndex(lambda index, iterator:islice(iterator, 1,None ) if index == 0 else iterator)

In [10]:
rddPat.take(5)

                                                                                

['3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,',
 '3070805,1963,1096,,"US","CA",,1,,2,6,63,,1,,0,,,,,,,']

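The cells above get rid of the column headers in the citations and patents RDDs: the header is the first line of partition 0, so `islice(iterator, 1, None)` skips it there, and every other partition passes through unchanged.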
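The header-skipping function can be exercised without Spark by treating a list of lists as the partitions. This is only a simulation: `drop_header` is a hypothetical name for the same logic as the lambda used above, and the two toy partitions are made up.

```python
from itertools import islice

def drop_header(index, iterator):
    # Skip the first element of partition 0 only; other partitions are untouched.
    return islice(iterator, 1, None) if index == 0 else iterator

# Two made-up "partitions"; the header sits at the start of partition 0.
partitions = [
    ['"CITING","CITED"', '3858241,956203'],
    ['3858241,1324234', '3858241,3398406'],
]

result = [
    line
    for index, part in enumerate(partitions)
    for line in drop_header(index, iter(part))
]
print(result)
```

The approach relies on the header being the very first line of the first partition; a header located anywhere else would not be removed.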

In [11]:
rddPat.cache()

PythonRDD[8] at RDD at PythonRDD.scala:53

In [12]:
rddCitSplit = rddCit.map(lambda x : x.split(',')).map(lambda y: (int(y[0]),int(y[1])))

In [13]:
rddCitSplit.take(5)

[(3858241, 956203),
 (3858241, 1324234),
 (3858241, 3398406),
 (3858241, 3557384),
 (3858241, 3634889)]

The cells above extract the citing and cited patent numbers from `rddCit` by splitting each line on `,` and converting both fields to `int`.

In [14]:
rddPatSplit = rddPat.map( lambda x: (int(x.split(',')[0]),str(x.split(',')[5])))

In [15]:
rddPatSplit.take(5)

                                                                                

[(3070801, '""'),
 (3070802, '"TX"'),
 (3070803, '"IL"'),
 (3070804, '"OH"'),
 (3070805, '"CA"')]

In [16]:
rdd1 = rddCitSplit.join(rddPatSplit)

In [17]:
rdd1.take(10)

                                                                                

[(3859644, (3290486, '"IL"')),
 (3859644, (3413446, '"IL"')),
 (3859644, (3611336, '"IL"')),
 (3859644, (3632986, '"IL"')),
 (3859644, (3688295, '"IL"')),
 (3859644, (3745308, '"IL"')),
 (3859644, (3751632, '"IL"')),
 (3859644, (3778798, '"IL"')),
 (3860958, (3126535, '"MN"')),
 (3861330, (3396681, '"PA"'))]

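Joining the citations RDD with the patents RDD, which holds only patent numbers and states. The join key is the citing patent, so each row has the form (citing, (cited, citing_state)).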
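To make the shape of the joined rows concrete, here is a small plain-Python imitation of an inner join on (key, value) pairs. `inner_join` is a hypothetical helper, not part of PySpark; it only mimics the (k, (v, w)) output layout seen in the `rdd1.take(10)` result, and the input rows are toy data.

```python
from collections import defaultdict

def inner_join(left, right):
    """Mimic the (k, (v, w)) output shape of an RDD join on lists of pairs."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Keys missing on the right side produce no output rows (inner join).
    return [
        (key, (v, w))
        for key, v in left
        for w in right_by_key.get(key, [])
    ]

# Toy data: (citing, cited) pairs and (patent, state) pairs.
citations = [(3859644, 3290486), (3859644, 3413446), (9999999, 3290486)]
patent_state = [(3859644, '"IL"'), (3290486, '"NY"')]

joined = inner_join(citations, patent_state)
print(joined)
```

The row keyed by 9999999 disappears because that patent has no entry on the right side, which is also how citations to or from patents missing in the patents file drop out of the real join.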

In [18]:
rdd1.cache()

PythonRDD[19] at RDD at PythonRDD.scala:53

In [19]:
rdd1map = rdd1.map(lambda x: (x[1][0],(x[0],x[1][1])))

In [20]:
rdd1map.take(10)

                                                                                

[(2506598, (3859078, '""')),
 (2593505, (3859078, '""')),
 (2665982, (3859078, '""')),
 (2855293, (3859078, '""')),
 (3330645, (3859078, '""')),
 (3706549, (3859078, '""')),
 (1692980, (3859684, '"IN"')),
 (3161899, (3859684, '"IN"')),
 (3451086, (3859684, '"IN"')),
 (3551932, (3859684, '"IN"'))]

Re-arranging the joined RDD into the form (cited, (citing, citing_state)) so that the cited patent becomes the key.

In [21]:
rdd2 = rdd1map.join(rddPatSplit)

In [22]:
rdd2.take(10)

                                                                                

[(3433218, ((3917458, '""'), '"NY"')),
 (3433218, ((5235804, '"CT"'), '"NY"')),
 (3433218, ((5968320, '""'), '"NY"')),
 (3433218, ((3984196, '""'), '"NY"')),
 (3433218, ((5318436, '"CT"'), '"NY"')),
 (3246456, ((3993464, '""'), '"CA"')),
 (3246456, ((5500028, '""'), '"CA"')),
 (3246456, ((3873286, '"KY"'), '"CA"')),
 (3246456, ((5942952, '"AZ"'), '"CA"')),
 (3246456, ((5230720, '"GA"'), '"CA"'))]

Joining the re-keyed RDD with the patents RDD again, this time on the cited patent, giving rows of the form (cited, ((citing, citing_state), cited_state)).
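The nested tuples produced by the two joins are easy to misindex, so it may help to unpack one row by name. The sketch below uses a hypothetical `CitationPair` named tuple and a row copied from the `rdd2.take(10)` output; it is only a readability aid and not part of the original solution.

```python
from typing import NamedTuple

class CitationPair(NamedTuple):  # hypothetical name, for illustration only
    citing: int
    citing_state: str
    cited: int
    cited_state: str

def flatten(record):
    # record has the shape (cited, ((citing, citing_state), cited_state))
    cited, ((citing, citing_state), cited_state) = record
    return CitationPair(citing, citing_state, cited, cited_state)

# One row copied from the rdd2.take(10) output above.
record = (3433218, ((5235804, '"CT"'), '"NY"'))
pair = flatten(record)

# Index-based access used in the notebook, next to the named equivalent.
print(record[1][0][1], pair.citing_state)  # citing state
print(record[1][1], pair.cited_state)      # cited state
```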

In [None]:
counts = rdd2.count()
print(counts)

In [23]:
rdd2.cache()

PythonRDD[29] at RDD at PythonRDD.scala:53

In [24]:
rdd4 = rdd2.filter(lambda x: ((x[1][0][1] != '""') or (x[1][1] != '""')))

In [25]:
rdd4.take(10)

                                                                                

[(3945930, ((4160370, '"MI"'), '""')),
 (3945930, ((5332452, '"IL"'), '""')),
 (3945930, ((4342658, '"MI"'), '""')),
 (3945930, ((5080814, '"MI"'), '""')),
 (3945930, ((4731190, '"GA"'), '""')),
 (3945930, ((4260502, '"IL"'), '""')),
 (3945930, ((4172802, '"CA"'), '""')),
 (3945930, ((4313836, '"MI"'), '""')),
 (3945930, ((4108785, '"OH"'), '""')),
 (3945930, ((4636321, '"PA"'), '""'))]

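Filtering out the rows in which both 'cited_state' and 'citing_state' are empty. Because the two tests are combined with `or`, rows with exactly one empty state survive this step (as the output above shows); they are removed by the equality filter in the next step.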
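The filter in `In [24]` combines its two tests with `or`, so it keeps every row in which at least one state is non-empty. The plain-Python sketch below compares that predicate with a stricter `and` version on a few made-up state pairs; the function names are hypothetical. Once the later same-state test is applied, both variants end up with the same rows.

```python
EMPTY = '""'  # how a missing state appears after splitting the CSV line

def keep_or(citing_state, cited_state):
    # Same logic as the notebook's filter: drop only rows where both are empty.
    return citing_state != EMPTY or cited_state != EMPTY

def keep_and(citing_state, cited_state):
    # Stricter variant: drop rows where either state is empty.
    return citing_state != EMPTY and cited_state != EMPTY

def same_state(citing_state, cited_state):
    return citing_state == cited_state

# Made-up (citing_state, cited_state) pairs covering each case.
cases = [
    ('"CA"', '"CA"'),
    ('"MI"', EMPTY),
    (EMPTY, '"NY"'),
    (EMPTY, EMPTY),
    ('"CA"', '"TX"'),
]

via_or = [c for c in cases if keep_or(*c) and same_state(*c)]
via_and = [c for c in cases if keep_and(*c) and same_state(*c)]
print(via_or, via_and)
```

The stricter variant would shrink the intermediate RDD earlier, which could matter for run time on the full data set, but it does not change the final counts.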

In [None]:
count2 = rdd4.count()
print(count2)

In [26]:
rdd5 = rdd4.filter(lambda x: (x[1][0][1] == x[1][1]))

In [27]:
rdd5.take(10)

[(5182659, ((5686960, '"MA"'), '"MA"')),
 (3373107, ((4756370, '"TX"'), '"TX"')),
 (3373107, ((5900213, '"TX"'), '"TX"')),
 (3373107, ((5009799, '"TX"'), '"TX"')),
 (5067948, ((5562616, '"CA"'), '"CA"')),
 (5067948, ((5372586, '"CA"'), '"CA"')),
 (5067948, ((5372590, '"CA"'), '"CA"')),
 (5067948, ((5395326, '"CA"'), '"CA"')),
 (5067948, ((5445614, '"CA"'), '"CA"')),
 (5067948, ((5286258, '"CA"'), '"CA"'))]

Filtering rdd4 to keep only co-citations, i.e. rows where 'cited_state' == 'citing_state'.

In [28]:
rdd6 = rdd5.map(lambda x: (x[1][0][0], x[1][0][1], x[0], x[1][1]))

In [29]:
rdd6.take(10)

[(5686960, '"MA"', 5182659, '"MA"'),
 (4756370, '"TX"', 3373107, '"TX"'),
 (5900213, '"TX"', 3373107, '"TX"'),
 (5009799, '"TX"', 3373107, '"TX"'),
 (5562616, '"CA"', 5067948, '"CA"'),
 (5372586, '"CA"', 5067948, '"CA"'),
 (5372590, '"CA"', 5067948, '"CA"'),
 (5395326, '"CA"', 5067948, '"CA"'),
 (5445614, '"CA"', 5067948, '"CA"'),
 (5286258, '"CA"', 5067948, '"CA"')]

Re-arranging rdd5 in the order 'citing', 'citing_state', 'cited', 'cited_state'

In [30]:
rdd7 = rdd6.map(lambda x: (x[0],1))

In [31]:
rdd7.take(10)

[(5686960, 1),
 (4756370, 1),
 (5900213, 1),
 (5009799, 1),
 (5562616, 1),
 (5372586, 1),
 (5372590, 1),
 (5395326, 1),
 (5445614, 1),
 (5286258, 1)]

Mapping each citing patent to a count of 1, as in the classic word-count program.

In [32]:
rdd8 = rdd7.reduceByKey(lambda x,y : x+y)

In [33]:
rdd8.take(10)

                                                                                

[(5009799, 1),
 (5372586, 6),
 (5395326, 7),
 (5286258, 4),
 (4858464, 2),
 (4458522, 1),
 (4570485, 2),
 (5065617, 10),
 (4646559, 8),
 (3962376, 3)]

Reducing by key to sum the 1s, which gives the co-citation count per citing patent, again as in the word-count program.
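The map-then-reduce counting pattern can be imitated in plain Python to see what the reduction does with repeated keys. `reduce_by_key` below is a hypothetical helper that folds values sharing a key with a two-argument function; the input pairs are toy data in the same (patent, 1) shape as `rdd7`.

```python
import operator

def reduce_by_key(pairs, func):
    """Fold the values of each key with `func`, like a word-count reduce."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals

# Toy (citing_patent, 1) pairs; 5372586 appears three times.
pairs = [(5372586, 1), (5009799, 1), (5372586, 1), (5372586, 1)]
counts = reduce_by_key(pairs, operator.add)
print(counts)
```

Since the notebook already imports `operator`, `operator.add` could be used in place of `lambda x, y: x + y` in the reduce step.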

In [37]:
rdd10 = rdd8.filter(lambda x: x[1] >= 90)

In [38]:
rdd10.take(15)

                                                                                

[(5958954, 96),
 (5952345, 98),
 (5925042, 90),
 (5913855, 90),
 (5951547, 90),
 (5983822, 103),
 (5739256, 90),
 (5978329, 90),
 (5959466, 125),
 (6008204, 100),
 (5998655, 96),
 (5980517, 90),
 (5936426, 94)]

In [39]:
rdd11 = rdd8.takeOrdered(10, key = lambda x: -x[1])

                                                                                

In [40]:
print(rdd11)

[(5959466, 125), (5983822, 103), (6008204, 100), (5952345, 98), (5958954, 96), (5998655, 96), (5936426, 94), (5925042, 90), (5913855, 90), (5951547, 90)]


Taking the 10 patents with the highest counts. Negating the count in the key makes `takeOrdered` return them in descending order.
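The negated-key trick can be checked on a small list with the standard library. The sketch below uses `heapq.nsmallest` as a plain-Python stand-in for taking the first n elements under a key; the (patent, count) pairs are copied from the filtered output above.

```python
import heapq

# A few (patent, count) pairs copied from the rdd10.take(15) output above.
counts = [
    (5958954, 96), (5952345, 98), (5925042, 90),
    (5983822, 103), (5959466, 125), (6008204, 100),
]

# Smallest by negated count == largest by count, in descending order.
top3 = heapq.nsmallest(3, counts, key=lambda x: -x[1])

# The same result expressed directly as "largest by count".
top3_alt = heapq.nlargest(3, counts, key=lambda x: x[1])
print(top3)
```

PySpark's RDD API also offers `top(num, key=...)`, which returns the largest elements directly and avoids the negation.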

In [42]:
rddPatfinal = rddPat.map( lambda x: x.split(","))

In [None]:
rddPatfinal.take(5)

In [44]:
rddPat1 = rddPatfinal.map(lambda x: (int(x[0]), (x[1:])))

In [None]:
rddPat1.take(5)

In [46]:
rddPat1.cache()

PythonRDD[46] at RDD at PythonRDD.scala:53

In [47]:
rddfinal = rddPat1.join(rdd8)

In [None]:
rddfinal.take(5)

Joining the per-patent co-citation counts with the full patents table, so that each row carries all the information pertaining to the patent.

In [49]:
rdd9 = rddfinal.map(lambda x: (x[0], x[1][0][0:], x[1][1]))

In [None]:
rdd9.take(5)

In [57]:
rddex1 = rddfinal.filter(lambda x: x[1][1] >= 90)

In [None]:
rddex1.take(15)

In [60]:
rdd10 = rddex1.takeOrdered(10, key = lambda x: -x[1][1])

                                                                                

In [61]:
print(rdd10)

[(5959466, (['1999', '14515', '1997', '"US"', '"CA"', '5310', '2', '', '326', '4', '46', '159', '0', '1', '', '0.6186', '', '4.8868', '0.0455', '0.044', '', ''], 125)), (5983822, (['1999', '14564', '1998', '"US"', '"TX"', '569900', '2', '', '114', '5', '55', '200', '0', '0.995', '', '0.7201', '', '12.45', '0', '0', '', ''], 103)), (6008204, (['1999', '14606', '1998', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '121', '0', '1', '', '0.7415', '', '5', '0.0085', '0.0083', '', ''], 100)), (5952345, (['1999', '14501', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '118', '0', '1', '', '0.7442', '', '5.1102', '0', '0', '', ''], 98)), (5958954, (['1999', '14515', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '116', '0', '1', '', '0.7397', '', '5.181', '0', '0', '', ''], 96)), (5998655, (['1999', '14585', '1998', '"US"', '"CA"', '', '1', '', '560', '1', '14', '114', '0', '1', '', '0.7387', '', '5.1667', '', '', '', ''], 96)), (5936426, (['1999', '14466', '

In [65]:
for i in range(0,10):
    print(rdd10[i])

(5959466, (['1999', '14515', '1997', '"US"', '"CA"', '5310', '2', '', '326', '4', '46', '159', '0', '1', '', '0.6186', '', '4.8868', '0.0455', '0.044', '', ''], 125))
(5983822, (['1999', '14564', '1998', '"US"', '"TX"', '569900', '2', '', '114', '5', '55', '200', '0', '0.995', '', '0.7201', '', '12.45', '0', '0', '', ''], 103))
(6008204, (['1999', '14606', '1998', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '121', '0', '1', '', '0.7415', '', '5', '0.0085', '0.0083', '', ''], 100))
(5952345, (['1999', '14501', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '118', '0', '1', '', '0.7442', '', '5.1102', '0', '0', '', ''], 98))
(5958954, (['1999', '14515', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '116', '0', '1', '', '0.7397', '', '5.181', '0', '0', '', ''], 96))
(5998655, (['1999', '14585', '1998', '"US"', '"CA"', '', '1', '', '560', '1', '14', '114', '0', '1', '', '0.7387', '', '5.1667', '', '', '', ''], 96))
(5936426, (['1999', '14466', '1997', 

Iterating through each tuple in the list `rdd10` (by now a plain Python list, not an RDD) to print one patent per line. This displays the top 10 patents.
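If a more compact display is wanted, each (patent, (fields, count)) tuple can be unpacked and formatted. The sketch below is one hypothetical layout: `format_row` is a made-up helper, and the sample row keeps only the first five fields of the first result above. It assumes the field order follows the file header after the patent number (GYEAR, GDATE, APPYEAR, COUNTRY, POSTATE).

```python
def format_row(row):
    patent, (fields, count) = row
    year = fields[0]               # GYEAR
    state = fields[4].strip('"')   # POSTATE, with the CSV quotes removed
    return f"{patent} year={year} state={state} same_state_citations={count}"

# First result from above, truncated to its first five fields for brevity.
row = (5959466, (['1999', '14515', '1997', '"US"', '"CA"'], 125))
print(format_row(row))
```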

In [63]:
print(len(rdd10))

10
