# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

## Aparajita Singh (apsi2875)
#### Resources: Lecture videos and links provided
#### Collaborators: Amit Baran Roy

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator
from itertools import islice

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

In [3]:
%pip install pyspark

Collecting pyspark
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612244 sha256=fff45e2d4c705346fb90a04231a1b56b5b79d821dad08113ad967f5522612ee1
  Stored in directory: /home/jovyan/.cache/pip/wheels/ea/21/84/970b03913d0d6a96ef51c34c878add0de9e4ecbb7c764ea21f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1
Note: you may need to restart the kernel to use updated packages.


Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the JVM-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task; the `sample()` method extracts a random sample of the data.
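For intuition about what a sample contains: `sample(False, fraction)` keeps each element independently with probability `fraction` (Bernoulli sampling), so the sample size is only approximately `fraction * N`. The sketch below models that idea in plain Python; the helper name `bernoulli_sample` and the data are hypothetical, and the random stream differs from Spark's, so it will not reproduce Spark's exact sample.

```python
import random

def bernoulli_sample(items, fraction, seed=None):
    """Keep each item independently with probability `fraction`.

    Models the idea behind rdd.sample(False, fraction, seed); it is not
    PySpark's own code and will not select the same elements as Spark.
    """
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]

data = list(range(10_000))
sample = bernoulli_sample(data, 0.05, seed=42)
print(len(sample))  # close to 500, but generally not exactly 500
```

In the notebook, the corresponding call would be something like `rddCita.sample(False, 0.05, seed=42).cache()`; fixing the seed makes debugging runs repeatable.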

The two RDDs loaded below (`rddCitations` and `rddPatents`) hold the raw lines of each file, so you will probably want to process them further (e.g. convert fields to integer types).

The `textFile` function returns data in strings. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used Python's `bytes` type before, look it up. You can convert a value `x` of type `bytes` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` method of the `gzip` library to read all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt',encoding='utf-8') as f:
    rddCitations = sc.parallelize( f.readlines() )
```
This is less efficient than `textFile`: `textFile` reads the file through the underlying HDFS or other file system on the worker nodes, whereas `gzip.open()...readlines()` reads all the data on the frontend (driver) and then distributes it to the worker nodes.
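To make the `bytes` versus `str` distinction concrete, here is a small standard-library sketch. The file name `demo.txt.gz` and its two lines are made up for illustration; it writes a tiny gzip file to a temporary directory and reads it back in binary and text mode.

```python
import gzip
import os
import tempfile

# Write a tiny, made-up gzip file (hypothetical data, not the lab's file).
path = os.path.join(tempfile.mkdtemp(), "demo.txt.gz")
with gzip.open(path, "wt", encoding="utf-8") as f:
    f.write('"CITING","CITED"\n3858241,956203\n')

# Binary mode ('rb') yields bytes objects that must be decoded explicitly.
with gzip.open(path, "rb") as f:
    raw_lines = f.readlines()
decoded = [line.decode("utf-8") for line in raw_lines]

# Text mode ('rt') decodes for you and yields str objects directly.
with gzip.open(path, "rt", encoding="utf-8") as f:
    text_lines = f.readlines()

print(type(raw_lines[0]), type(text_lines[0]))  # <class 'bytes'> <class 'str'>
print(decoded == text_lines)                     # True
```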

In [4]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [5]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [6]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each element is a single string containing comma-separated values. You will need to convert these to (K,V) pairs, probably converting the keys to `int` and so on. You'll also need to `filter` out the header string, since RDDs have no built-in method for dropping just the first line.
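One common alternative to the partition-index approach used in the next cell is to fetch the header with `rdd.first()` and keep every line that differs from it, i.e. `header = rdd.first()` followed by `rdd.filter(lambda line: line != header)`. A plain-Python sketch of that filter, assuming no data row is identical to the header (the helper name `drop_header` is hypothetical):

```python
def drop_header(lines):
    """Remove every line equal to the first line (the CSV header).

    Plain-list version of:
        header = rdd.first()
        rdd = rdd.filter(lambda line: line != header)
    """
    if not lines:
        return []
    header = lines[0]
    return [line for line in lines if line != header]

lines = ['"CITING","CITED"', '3858241,956203', '3858241,1324234']
print(drop_header(lines))  # ['3858241,956203', '3858241,1324234']
```

The trade-off is that `first()` runs a small extra job and the filter compares every line against the header string.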

#### Filter out the header string as we do not require it in the later steps.

In [7]:
rddCita = rddCitations.cache().mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)
rddPat = rddPatents.cache().mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)
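To see why the lambdas above remove only the header: `mapPartitionsWithIndex` calls the function once per partition, passing the partition index and an iterator over that partition's elements. The header is the first line of the file, so it sits at the start of partition 0. Below is a plain-Python simulation with two made-up partitions (the function name `skip_first_line` is hypothetical):

```python
from itertools import islice

def skip_first_line(idx, it):
    # Same logic as the lambdas above: drop the first element of partition 0 only.
    return islice(it, 1, None) if idx == 0 else it

# Hypothetical partitions; in Spark the header is the first line of partition 0.
partitions = [
    ['"CITING","CITED"', '3858241,956203', '3858241,1324234'],
    ['3858241,3398406', '3858241,3557384'],
]

result = [line for idx, part in enumerate(partitions)
          for line in skip_first_line(idx, iter(part))]
print(result)
# ['3858241,956203', '3858241,1324234', '3858241,3398406', '3858241,3557384']
```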

#### Extract the citing patent and the cited patent from the citations file. Split each line on ",". Convert both of them to int.

In [8]:
rddCita = rddCita.map( lambda x : (int(x.split(',')[0]), int(x.split(',')[1])) )
rddCita.take(5)

[(3858241, 956203),
 (3858241, 1324234),
 (3858241, 3398406),
 (3858241, 3557384),
 (3858241, 3634889)]

#### Extract the patent number and the state from the patents table. Split each line on ",", convert the value at index 0 (the patent number) to int, and keep the value at index 5 (POSTATE) as the state. Store these pairs in rddPat.

In [9]:
rddPat = rddPat.map( lambda x : (int(x.split(',')[0]), x.split(',')[5]) )
rddPat.take(5)

[(3070801, '""'),
 (3070802, '"TX"'),
 (3070803, '"IL"'),
 (3070804, '"OH"'),
 (3070805, '"CA"')]
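The lambdas above call `split(',')` twice per line. Below is a sketch of hypothetical helper functions that split once and also strip the quotes around the state. They could be passed to `map` on the header-free RDDs in place of the lambdas. One assumption to keep in mind: if the quotes are stripped, the later empty-state check has to compare against `''` instead of `'""'`.

```python
def parse_citation(line):
    """'3858241,956203' -> (3858241, 956203); splits the line only once."""
    citing, cited = line.split(',')
    return int(citing), int(cited)

def parse_patent_state(line):
    """Return (patent, state) with the surrounding quotes stripped.

    Column 0 is PATENT and column 5 is POSTATE in the header shown above.
    """
    fields = line.split(',')
    return int(fields[0]), fields[5].strip('"')

print(parse_citation('3858241,956203'))
# (3858241, 956203)
print(parse_patent_state('3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,'))
# (3070802, 'TX')
print(parse_patent_state('3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,'))
# (3070801, '')
```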

#### Optionally, take a sample of the data with `sample()` for testing / debugging.

In [None]:
#rddPatSample = rddPat.sample(False,0.05)
#rddCitaSample = rddCita.sample(False,0.05)

#### Join rddCita with rddPat on the citing patent number, which attaches the citing patent's state to each citation.

In [10]:
rdd1 = rddCita.join(rddPat)

In [11]:
rdd1.take(5)

[(3952244, (3176219, '"CA"')),
 (3952244, (3648163, '"CA"')),
 (3952244, (3753088, '"CA"')),
 (3952244, (3809961, '"CA"')),
 (3952244, (3810003, '"CA"'))]
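`join` on pair RDDs is an inner join on the key: citations whose citing patent has no row in rddPat are dropped, and a key that appears several times on either side produces every combination. A small plain-Python model of that behaviour, with a hypothetical `pair_join` helper and invented data (unlike Spark, this sketch returns results in a fixed order):

```python
from collections import defaultdict

def pair_join(left, right):
    """Inner join of two lists of (key, value) pairs, modelled on rdd.join():
    every left value is paired with every right value sharing its key."""
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]

# Made-up (citing, cited) and (patent, state) pairs.
cita = [(1, 10), (1, 11), (2, 10), (3, 12)]
pat = [(1, '"CA"'), (2, '"NY"'), (10, '"CA"')]

print(pair_join(cita, pat))
# [(1, (10, '"CA"')), (1, (11, '"CA"')), (2, (10, '"NY"'))]
```

Citation `(3, 12)` disappears because patent 3 has no state row, which is also why the real join can return fewer rows than rddCita.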

#### After the join, each record holds the citing patent (key), the cited patent, and the citing state. Next, swap the columns so that the cited patent becomes the key and the value is (citing patent, citing state).

In [12]:
rdd2 = rdd1.map(lambda x: (x[1][0], (x[0], x[1][1])))

In [13]:
rdd2.take(5)

[(2704577, (3861260, '"CT"')),
 (3288013, (3861260, '"CT"')),
 (3333495, (3861260, '"CT"')),
 (3381562, (3861260, '"CT"')),
 (3808912, (3861260, '"CT"'))]

#### Join rdd2 with rddPat on the cited patent number, which attaches the cited patent's state.

In [14]:
rdd3 = rdd2.join(rddPat)

In [15]:
rdd3.take(5)

[(4212471, ((4538256, '""'), '""')),
 (4212471, ((4535436, '""'), '""')),
 (4212471, ((4409712, '""'), '""')),
 (4212471, ((4480326, '""'), '""')),
 (4212471, ((4387540, '"NJ"'), '""'))]

#### Rearrange the columns: the key becomes the citing patent, and the value holds the citing state, the cited state, and the cited patent.

In [16]:
rdd5 = rdd3.map(lambda x: (x[1][0][0], (x[1][0][1], x[1][1], x[0])))

In [17]:
rdd5.take(10)

[(5805056, ('"MI"', '"NY"', 5268564)),
 (5811782, ('""', '"NY"', 5268564)),
 (5656804, ('"NY"', '"NY"', 5268564)),
 (5932860, ('"PA"', '"NY"', 5268564)),
 (5506394, ('"PA"', '"NY"', 5268564)),
 (5656805, ('"PA"', '"NY"', 5268564)),
 (5576531, ('"NY"', '"NY"', 5268564)),
 (5541397, ('"NY"', '"NY"', 5268564)),
 (4224118, ('"IN"', '"NY"', 3451902)),
 (4058432, ('""', '"NY"', 3451902))]
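The index-based lambdas that build rdd2 and rdd5 are easy to get wrong. Below is a sketch of equivalent named functions that unpack the nested tuples explicitly. The function names are hypothetical; they could be passed to `map` in place of the lambdas, and the sample rows are copied from the outputs above.

```python
def to_cited_key(row):
    """rdd1 row (citing, (cited, citing_state)) -> rdd2 row."""
    citing, (cited, citing_state) = row
    return cited, (citing, citing_state)

def to_citing_key(row):
    """rdd3 row (cited, ((citing, citing_state), cited_state)) -> rdd5 row."""
    cited, ((citing, citing_state), cited_state) = row
    return citing, (citing_state, cited_state, cited)

print(to_cited_key((3861260, (2704577, '"CT"'))))
# (2704577, (3861260, '"CT"'))
print(to_citing_key((5268564, ((5805056, '"MI"'), '"NY"'))))
# (5805056, ('"MI"', '"NY"', 5268564))
```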

#### Keep only the rows where the citing state equals the cited state, and drop rows whose state is empty (`""`).

In [18]:
rdd6 = rdd5.filter(lambda x: x[1][0]==x[1][1]).filter(lambda x: x[1][0] != '""').filter(lambda x: x[1][1] != '""')
rdd6.take(5)

[(3920628, ('"NJ"', '"NJ"', 3828021)),
 (4066752, ('"NJ"', '"NJ"', 3828021)),
 (4029882, ('"NJ"', '"NJ"', 3828021)),
 (4085208, ('"NJ"', '"NJ"', 3828021)),
 (4000262, ('"NJ"', '"NJ"', 3828021))]
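Because the two states must already be equal, checking one of them for the empty value is enough, so the three chained filters can be collapsed into a single predicate. A sketch with a hypothetical `same_nonempty_state` function, usable as `rdd5.filter(same_nonempty_state)`, on invented rows shaped like rdd5:

```python
def same_nonempty_state(row):
    """True when citing and cited states match and are not the empty '""'."""
    _, (citing_state, cited_state, _) = row
    return citing_state == cited_state and citing_state != '""'

rows = [
    (3920628, ('"NJ"', '"NJ"', 3828021)),
    (5805056, ('"MI"', '"NY"', 5268564)),
    (4058432, ('""', '""', 3451902)),
]
print([r[0] for r in rows if same_nonempty_state(r)])  # [3920628]
```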

#### Group the rows by the citing patent (the key).

In [19]:
rdd7 = rdd6.groupBy(lambda x: x[0])
rdd7.take(5)

[(4318176, <pyspark.resultiterable.ResultIterable at 0x7f6f141649d0>),
 (5771811, <pyspark.resultiterable.ResultIterable at 0x7f6f14164a90>),
 (3869664, <pyspark.resultiterable.ResultIterable at 0x7f6f141649a0>),
 (4127544, <pyspark.resultiterable.ResultIterable at 0x7f6f14164700>),
 (5429418, <pyspark.resultiterable.ResultIterable at 0x7f6f14164e80>)]

#### For each citing patent, count its same-state cited patents (the size of its group) and store the (citing, count) pairs in rdd8.

In [20]:
rdd8 = rdd7.map(lambda x: (x[0],len(x[1])))

In [21]:
rdd8.take(5)

[(4318176, 10), (5771811, 5), (3869664, 1), (4127544, 6), (5429418, 1)]
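`groupBy` followed by `len` gathers every matching row for a key before counting. An alternative that is usually cheaper in Spark is `rdd6.map(lambda x: (x[0], 1)).reduceByKey(operator.add)`, which combines partial counts within each partition before the shuffle and yields the same (citing, count) pairs. A plain-Python sketch of the counting it performs, on invented rows shaped like rdd6 (`count_by_key` is a hypothetical name):

```python
from collections import defaultdict

def count_by_key(rows):
    """Count rows per key, the result that
        rdd6.map(lambda x: (x[0], 1)).reduceByKey(operator.add)
    produces, without building the full group for each key."""
    counts = defaultdict(int)
    for key, _value in rows:
        counts[key] += 1
    return dict(counts)

rows = [
    (3920628, ('"NJ"', '"NJ"', 3828021)),
    (3920628, ('"NJ"', '"NJ"', 3000000)),
    (4066752, ('"NJ"', '"NJ"', 3828021)),
]
print(count_by_key(rows))  # {3920628: 2, 4066752: 1}
```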

#### Create an output RDD keyed by patent number, with the remaining patent fields (as a list of strings) as the value.

In [22]:
rddPatOut = rddPatents.cache().mapPartitionsWithIndex(lambda idx, it: islice(it, 1, None) if idx == 0 else it)
rddPatOut = rddPatOut.map( lambda x : (int(x.split(',')[0]), x.split(',')[1:]) )

#### Join rdd8 (citing patent, count) with the output RDD, rddPatOut.

In [23]:
#rddPatOutSample = rddPatOut.sample(False,0.05)

In [24]:
rdd9 = rdd8.join(rddPatOut)

In [25]:
#rdd9.take(5)
rdd10 = rdd9.sortBy(lambda x: x[1][0], ascending=False)
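Since only the top 20 rows are printed, a full `sortBy` does more work than needed. PySpark's `takeOrdered(20, key=lambda x: -x[1][0])` returns the 20 rows with the largest counts directly (ties may come out in a different order than with `sortBy`). The plain-Python sketch below shows the same top-k idea with `heapq.nlargest` on invented rows shaped like rdd9; it is an analogy, not Spark's implementation.

```python
import heapq

# Invented rows shaped like rdd9: (patent, (count, [other fields...])).
rows = [
    (5959466, (125, ['fields'])),
    (4318176, (10, ['fields'])),
    (5983822, (103, ['fields'])),
    (5771811, (5, ['fields'])),
]

# Keep only the k rows with the largest count instead of sorting everything.
top = heapq.nlargest(2, rows, key=lambda x: x[1][0])
print([patent for patent, _ in top])  # [5959466, 5983822]
```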

#### The cells below print the patent number and patent information for the 20 patents with the most same-state citations. The count is appended as the last field of the patent information.

In [26]:
#rdd10.take(5)

In [27]:
# Append the same-state citation count to each patent's fields, then take the top 20.
rdd11 = rdd10.map(lambda x: (x[0], x[1][1] + [x[1][0]])).take(20)
for line in rdd11:
    print(line)

(5959466, ['1999', '14515', '1997', '"US"', '"CA"', '5310', '2', '', '326', '4', '46', '159', '0', '1', '', '0.6186', '', '4.8868', '0.0455', '0.044', '', '', 125])
(5983822, ['1999', '14564', '1998', '"US"', '"TX"', '569900', '2', '', '114', '5', '55', '200', '0', '0.995', '', '0.7201', '', '12.45', '0', '0', '', '', 103])
(6008204, ['1999', '14606', '1998', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '121', '0', '1', '', '0.7415', '', '5', '0.0085', '0.0083', '', '', 100])
(5952345, ['1999', '14501', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '118', '0', '1', '', '0.7442', '', '5.1102', '0', '0', '', '', 98])
(5958954, ['1999', '14515', '1997', '"US"', '"CA"', '749584', '2', '', '514', '3', '31', '116', '0', '1', '', '0.7397', '', '5.181', '0', '0', '', '', 96])
(5998655, ['1999', '14585', '1998', '"US"', '"CA"', '', '1', '', '560', '1', '14', '114', '0', '1', '', '0.7387', '', '5.1667', '', '', '', '', 96])
(5936426, ['1999', '14466', '1997', '"US"', '"CA"