# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful.

In [24]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [25]:
conf = SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf=conf)  # reuses the existing SparkContext if this cell is re-run

Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data or use 
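
To make the `sample()` advice concrete: in PySpark, `rdd.sample(False, fraction, seed)` keeps each element independently with probability `fraction` (Bernoulli sampling), so the sample size is only approximately `fraction * N`. The plain-Python sketch below models that behaviour on a list so it can be reasoned about without a cluster. `bernoulli_sample` is a hypothetical helper, not part of PySpark, and Spark's own random stream will pick different elements.

```python
import random


def bernoulli_sample(items, fraction, seed=None):
    """Hypothetical helper: keep each item independently with probability
    `fraction`, a plain-Python model of rdd.sample(False, fraction, seed)."""
    rng = random.Random(seed)
    return [x for x in items if rng.random() < fraction]


# Synthetic stand-in for citation lines (not real data)
lines = [f"{3858241 + i},{956203 + i}" for i in range(10_000)]

small = bernoulli_sample(lines, 0.01, seed=42)
# The size is close to 1% of the input but rarely exactly 100;
# the kept lines stay in their original order.
print(len(small))
```

In the notebook itself the corresponding call would be something like `rddCitations.sample(False, 0.01, seed=42)`; passing a fixed seed makes repeated runs pick the same subset.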

The two RDDs created below (`rddCitations` and `rddPatents`) hold the raw lines of each file, so you will probably want to process them further (e.g. split the fields, convert them to integer types, etc.).

The `textFile` function returns each line as a string, with the trailing newline removed. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used Python's `bytes` type before, look it up. You can convert a value `x` of type `bytes` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `gzip.open` function to read in all the lines as UTF-8 strings like this:
```
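import gzip
# read().splitlines() drops the trailing '\n' from each line (readlines() would keep it),
# which matches what textFile() produces
with gzip.open('cite75_99.txt.gz', 'rt', encoding='utf-8') as f:
    rddCitations = sc.parallelize(f.read().splitlines())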
```
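This is less efficient than using `textFile`: `textFile` reads the file through the underlying HDFS (or other) file system on the worker nodes, while `gzip.open()` followed by reading every line pulls all the data into the frontend (driver) and then distributes it to the worker nodes. Note that a single gzip file cannot be split, so `textFile` still reads each `.gz` file in one task; calling `repartition()` afterwards can spread the later work across more cores.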
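
A small plain-Python check of the two points above: decoding `bytes` into `str`, and the fact that `readlines()` keeps the trailing `'\n'` on every line while `read().splitlines()` does not. A kept newline ends up in the last CSV field, so a cited patent such as `'956203\n'` would not match the key `'956203'` in a later join. The in-memory gzip data here is a made-up stand-in for `cite75_99.txt.gz`.

```python
import gzip
import io

# Some readers hand back raw bytes; decode them to get a str.
raw = b'3858241,956203'
line = raw.decode('utf-8')

# A tiny gzip "file" built in memory (stand-in for cite75_99.txt.gz)
compressed = gzip.compress('"CITING","CITED"\n3858241,956203\n3858241,1324234\n'.encode('utf-8'))

with gzip.open(io.BytesIO(compressed), 'rt', encoding='utf-8') as f:
    with_newlines = f.readlines()             # each line keeps its trailing '\n'

with gzip.open(io.BytesIO(compressed), 'rt', encoding='utf-8') as f:
    without_newlines = f.read().splitlines()  # newlines removed, like textFile()

print(line)                            # 3858241,956203
print(with_newlines[1].split(','))     # ['3858241', '956203\n']
print(without_newlines[1].split(','))  # ['3858241', '956203']
```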

In [26]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [27]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [28]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each element is a single string of comma-separated values. You will need to convert these to (K,V) pairs, probably convert the keys to `int`, and so on. You'll also need to `filter` out the header string, since RDDs have no simple "skip the first line" operation.
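
One way to do the parsing is to write small per-line functions and test them in plain Python before passing them to `map`. The sketch below assumes no field contains an embedded comma (true for the sample rows above) and treats any line whose first field is not all digits as the header; `parse_citation` and `parse_patent` are hypothetical helpers, not part of the lab handout. Another common idiom is `header = rdd.first()` followed by `rdd.filter(lambda line: line != header)`.

```python
def parse_citation(line):
    """Hypothetical helper: '3858241,956203' -> (3858241, 956203); None for the header."""
    citing, cited = line.split(',')
    if not citing.isdigit():  # the header row is '"CITING","CITED"'
        return None
    return int(citing), int(cited)


def parse_patent(line):
    """Hypothetical helper: return (PATENT, POSTATE) with quotes stripped; None for the header."""
    fields = line.split(',')  # assumes no field contains an embedded comma
    if not fields[0].isdigit():  # the header row starts with '"PATENT"'
        return None
    return int(fields[0]), fields[5].strip('"')  # POSTATE is field 5; '' when blank


patent_lines = [
    '"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE"',  # header, shortened
    '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
    '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
]
# In Spark this would be: rddPatents.map(parse_patent).filter(lambda kv: kv is not None)
patents = [kv for kv in map(parse_patent, patent_lines) if kv is not None]
print(patents)  # [(3070801, ''), (3070802, 'TX')]
```

If you convert keys to `int` on one side of a join, convert them on the other side too; `'3070801'` and `3070801` are different keys.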

*********** My Solution Starts Here **********

Step 1: Extract the PATENT and POSTATE columns (field indices 0 and 5) from the Patents table

In [29]:
patents= rddPatents.map(lambda x : x.split(',')).map(lambda x:(x[0], x[5]))
patents.take(5)

[('"PATENT"', '"POSTATE"'),
 ('3070801', '""'),
 ('3070802', '"TX"'),
 ('3070803', '"IL"'),
 ('3070804', '"OH"')]

Step 2: The header row is still in the output, so add a `filter` that drops it before splitting

In [6]:
patents= rddPatents.filter(lambda x:'PATENT' not in x).map(lambda x : x.split(',')).map(lambda x:(x[0], x[5]))
patents.take(5)

[('3070801', '""'),
 ('3070802', '"TX"'),
 ('3070803', '"IL"'),
 ('3070804', '"OH"'),
 ('3070805', '"CA"')]

Step 3: Similarly, extract the CITING and CITED columns from the Citations table and filter out the header row

In [7]:
citing= rddCitations.filter(lambda x:'CITED' not in x).map(lambda x : x.split(',')).map(lambda x: (x[0],x[1]))
citing.take(5)

[('3858241', '956203'),
 ('3858241', '1324234'),
 ('3858241', '3398406'),
 ('3858241', '3557384'),
 ('3858241', '3634889')]

Step 4: Join the citations (keyed on CITING) with the patents to get CITING_STATE, the state of the citing patent

In [22]:
initial_join = citing.join(patents)

In [23]:
initial_join.take(5)

[('3858365', ('1962481', '""')),
 ('3858365', ('3163971', '""')),
 ('3858365', ('3324621', '""')),
 ('3858365', ('3553924', '""')),
 ('3858365', ('3676980', '""'))]
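
`join` on two pair RDDs is an inner join on the key: for every key present on both sides it emits one `(k, (left_value, right_value))` record per matching pair of values, and keys found on only one side are dropped. The dictionary-based sketch below is a plain-Python model of that behaviour; `inner_join` is a hypothetical helper, the rows come from the outputs above, and `'9999999'` is a made-up citing patent with no patent row. Spark's output order depends on partitioning, so do not rely on it.

```python
from collections import defaultdict


def inner_join(left, right):
    """Hypothetical helper modelling rdd.join(): an inner join of (key, value) lists.
    Emits (k, (left_value, right_value)) for every matching pair of values."""
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]


citing = [('3858365', '1962481'), ('3858365', '3163971'),
          ('9999999', '3070801')]  # '9999999' is made up: it has no patent row
patents = [('3858365', '""'), ('3070802', '"TX"')]

joined = inner_join(citing, patents)
print(joined)
# [('3858365', ('1962481', '""')), ('3858365', ('3163971', '""'))]
```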

Step 5: CITING is currently the key. To get CITED_STATE with the next join, swap CITING and CITED so that CITED is the key.

In [10]:
def getCitations(x):
    citing, (cited, citing_state) = x
    return (cited, (citing, citing_state))

swapped = initial_join.map(getCitations)

In [11]:
swapped.take(5)

[('2507756', ('3859862', '"MI"')),
 ('2753812', ('3859862', '"MI"')),
 ('2757569', ('3859862', '"MI"')),
 ('3139304', ('3861637', '""')),
 ('3291525', ('3861637', '""'))]

Step 6: Join the swapped RDD with the patents again (now keyed on CITED) to get CITED_STATE

In [31]:
int_res = swapped.join(patents).cache()

In [32]:
int_res.take(5)

[('3922914', (('5400651', '""'), '"PA"')),
 ('3922914', (('4006637', '""'), '"PA"')),
 ('3922914', (('4786857', '"WA"'), '"PA"')),
 ('3922914', (('5233986', '"IN"'), '"PA"')),
 ('3922914', (('5554936', '"WA"'), '"PA"'))]

Step 7: Rearrange each record so that CITING is the key again: `(citing, (citing_state, cited, cited_state))`

In [13]:
def getCitationsWithNoFilter(x):
    cited, ((citing, citing_state), cited_state) = x
    return (citing, (citing_state, cited, cited_state))

int_res = int_res.map(getCitationsWithNoFilter)

In [14]:
int_res.take(5)

[('4482536', ('""', '4022881', '""')),
 ('5256402', ('"NJ"', '4022881', '""')),
 ('4426373', ('""', '4022881', '""')),
 ('5496541', ('"PA"', '4022881', '""')),
 ('4444747', ('""', '4022881', '""'))]

Step 8: Keep only citations where the citing state is non-blank and matches the cited state

In [15]:
def getCitationsWithFilter(x):
    citing, (citing_state, cited, cited_state) = x
    return citing_state != '""' and citing_state == cited_state  # equality implies cited_state is non-blank too

citing_counts = int_res.filter(getCitationsWithFilter)

Step 9: Count the number of same-state citations for each patent

In [16]:
def citeCount(x):
    citing, (citing_state, cited, cited_state) = x
    return (citing,1)

citing_counts = citing_counts.map(citeCount).reduceByKey(lambda acc, val: acc + val)
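
Steps 8 and 9 together amount to "filter, map each record to `(citing, 1)`, then sum per key". The plain-Python sketch below models that with `collections.Counter`; `same_state` mirrors the Step 8 predicate, the first record is copied from the Step 7 output, and the remaining records are made up to exercise each case. In Spark, `reduceByKey(operator.add)` (the `operator` module is already imported in the first cell) is equivalent to the lambda used above, and `reduceByKey` combines values within each partition before shuffling.

```python
from collections import Counter


def same_state(record):
    """Mirror of the Step 8 predicate: non-blank citing state equal to the cited state."""
    citing, (citing_state, cited, cited_state) = record
    return citing_state != '""' and citing_state == cited_state


# Records shaped like int_res after Step 7: (citing, (citing_state, cited, cited_state))
records = [
    ('5256402', ('"NJ"', '4022881', '""')),    # from the Step 7 output: cited state blank
    ('1000001', ('"CA"', '2000001', '"CA"')),  # made up: same state
    ('1000001', ('"CA"', '2000002', '"CA"')),  # made up: same state
    ('1000001', ('"CA"', '2000003', '"TX"')),  # made up: different states
    ('1000002', ('""', '2000004', '""')),      # made up: both blank, dropped
]

# Same result as .filter(same_state).map(lambda r: (r[0], 1)).reduceByKey(operator.add)
counts = Counter(citing for citing, _ in filter(same_state, records))
print(dict(counts))  # {'1000001': 2}
```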

In [17]:
citing_counts = citing_counts.sortBy(lambda x: x[1], ascending=False)
citing_counts.take(10)

[('5959466', 125),
 ('5983822', 103),
 ('6008204', 100),
 ('5952345', 98),
 ('5998655', 96),
 ('5958954', 96),
 ('5936426', 94),
 ('5978329', 90),
 ('5913855', 90),
 ('5739256', 90)]
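
Sorting the whole RDD just to take 10 rows is more work than needed, and ties (the two patents with 96 citations above) can come back in either order. One alternative, sketched here in plain Python with `heapq`, is to select the top rows by a key of `(-count, patent)`, which puts the highest count first and breaks ties by patent number. The pairs are copied from the output above; the Spark counterpart would be something like `citing_counts.takeOrdered(10, key=lambda kv: (-kv[1], kv[0]))`, since `takeOrdered` returns the smallest elements under the given key.

```python
import heapq

# (patent, same-state count) pairs copied from the Step 9 output above
counts = [('5959466', 125), ('5983822', 103), ('6008204', 100), ('5952345', 98),
          ('5998655', 96), ('5958954', 96), ('5936426', 94)]

# Highest count first; ties broken by patent number so the order is repeatable
top6 = heapq.nsmallest(6, counts, key=lambda kv: (-kv[1], kv[0]))
print(top6)
```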

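Step 10: Key every row of `rddPatents` by its PATENT number (the rest of the row becomes the value) and left-outer-join it with the same-state counts from above, so that patents without any same-state citations are kept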

In [18]:
def getPatentData(line):
    line_split = line.split(',')
    return (line_split[0],",".join(line_split[1:]))

patent_kv = rddPatents.filter(lambda x: 'PATENT' not in x).map(getPatentData)  # drop the header row, as in Step 2

In [19]:
patentJoinedWithCited = patent_kv.leftOuterJoin(citing_counts).cache()

Step 11: After the left outer join, patents with no same-state citations have `None` as their count. Replace it with 0, then print the 10 patents with the most same-state citations.

In [20]:
def fillNone(x):
    (key, (rest, count)) = x
    if count is None:
        count = 0
    return (key, (rest, count))

patentJoinedWithCited = patentJoinedWithCited.map(fillNone)
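
Why `None` shows up at all: `leftOuterJoin` keeps every record from the left RDD and pairs keys that have no match on the right with `None`. The plain-Python sketch below models that and then applies the same replacement as `fillNone`. `left_outer_join` and `fill_none` are hypothetical helpers; the two patent rows are shortened prefixes of rows shown in this notebook, and the single count comes from the Step 9 output.

```python
def left_outer_join(left, right):
    """Hypothetical helper modelling rdd.leftOuterJoin(): every left record survives;
    a key with no match on the right is paired with None."""
    right_by_key = {}
    for k, v in right:
        right_by_key.setdefault(k, []).append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [None])]


def fill_none(record):
    """Same idea as fillNone: a missing count becomes 0."""
    key, (rest, count) = record
    return (key, (rest, 0 if count is None else count))


# Patent rows (rest-of-row strings shortened) and one same-state count
patent_kv = [('5959466', '1999,14515,1997,"US","CA"'), ('3070801', '1963,1096,,"BE",""')]
counts = [('5959466', 125)]

joined = left_outer_join(patent_kv, counts)
filled = [fill_none(r) for r in joined]
print(filled)
# [('5959466', ('1999,14515,1997,"US","CA"', 125)), ('3070801', ('1963,1096,,"BE",""', 0))]
```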

In [21]:
def getKeyCount(x):
    (key, (rest, count)) = x
    return count

patentJoinedWithCited.sortBy(getKeyCount, ascending=False).take(10)

[('5959466',
  ('1999,14515,1997,"US","CA",5310,2,,326,4,46,159,0,1,,0.6186,,4.8868,0.0455,0.044,,',
   125)),
 ('5983822',
  ('1999,14564,1998,"US","TX",569900,2,,114,5,55,200,0,0.995,,0.7201,,12.45,0,0,,',
   103)),
 ('6008204',
  ('1999,14606,1998,"US","CA",749584,2,,514,3,31,121,0,1,,0.7415,,5,0.0085,0.0083,,',
   100)),
 ('5952345',
  ('1999,14501,1997,"US","CA",749584,2,,514,3,31,118,0,1,,0.7442,,5.1102,0,0,,',
   98)),
 ('5958954',
  ('1999,14515,1997,"US","CA",749584,2,,514,3,31,116,0,1,,0.7397,,5.181,0,0,,',
   96)),
 ('5998655',
  ('1999,14585,1998,"US","CA",,1,,560,1,14,114,0,1,,0.7387,,5.1667,,,,', 96)),
 ('5936426',
  ('1999,14466,1997,"US","CA",5310,2,,326,4,46,178,0,1,,0.58,,11.2303,0.0765,0.073,,',
   94)),
 ('5925042',
  ('1999,14445,1997,"US","CA",733846,2,,606,3,32,242,0,1,,0.7382,,8.3471,0,0,,',
   90)),
 ('5913855',
  ('1999,14417,1997,"US","CA",733846,2,,606,3,32,242,0,1,,0.7403,,8.3595,0,0,,',
   90)),
 ('5739256',
  ('1998,13983,1995,"US","CA",70060,2,15,528,1,1