# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the JVM-based code behind PySpark DataFrames. Be patient and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task; the `sample()` method extracts just a sample of the data.

These two RDDs (`rddCitations` and `rddPatents` in the code below) hold the raw data, because you will probably want to process them further (e.g. convert fields to integer types).

The `textFile` function returns data in strings. This should work fine for this lab.

Other methods you use might return data as `bytes`. If you haven't used the Python `bytes` type before, look it up in the Python documentation. You can convert a value `x` of type `bytes` into, e.g., a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` function of the `gzip` library to read in all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt',encoding='utf-8') as f:
    rddCitations = sc.parallelize( f.readlines() )
```
This is less efficient than using `textFile`, because `textFile` uses the underlying HDFS or other file system to read the file across all the worker nodes, while `gzip.open()...readlines()` reads all the data in the frontend and then distributes it to the worker nodes.
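
As a small illustration of the `bytes`-to-string conversion mentioned above, the sketch below decodes one raw line and splits it into its two fields. The sample value is copied from the citation data shown in this lab; the variable names are only illustrative, and the file is assumed to be UTF-8 encoded.

```python
# A line as it might come back from a method that returns bytes.
raw_line = b'3858241,956203'

# bytes -> str, assuming the data is UTF-8 encoded.
text_line = raw_line.decode('utf-8')

# Split the decoded line into the CITING and CITED fields.
citing, cited = text_line.split(',')
print(citing, cited)
```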

In [3]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [4]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [5]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

### SOLUTION PROCESS

In other words, each record is a single string of comma-separated values. You will need to convert these to (K,V) pairs, probably convert the keys to `int`, and so on. You'll also need to `filter` out the header string, since an RDD offers no easy way to take all the lines except the first.
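
The conversion described above can be tried out on a few sample lines in plain Python before running it on the full RDD. This is only a sketch: `is_header` and `parse_citation` are hypothetical helper names, the header test assumes that only the header row begins with a double quote (true for the sample rows shown above), and the `int` conversion is optional (the solution cells below keep the ids as strings).

```python
def is_header(line):
    """Return True for the header row, which starts with a quoted column name."""
    return line.startswith('"')

def parse_citation(line):
    """Turn a 'CITING,CITED' line into a (cited, citing) pair of integers."""
    citing, cited = line.split(',')
    return (int(cited), int(citing))

# Lines copied from the rddCitations.take(5) output above.
sample_lines = ['"CITING","CITED"', '3858241,956203', '3858241,1324234']

pairs = [parse_citation(line) for line in sample_lines if not is_header(line)]
print(pairs)
```

With an RDD, the same helpers could be applied as `rddCitations.filter(lambda line: not is_header(line)).map(parse_citation)`.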

Remove the header and convert each line into a key-value pair (keyed by PATENT) for the patents RDD

In [7]:
patents_header = rddPatents.first()
key_value_patents = rddPatents.filter(lambda row: row != patents_header) \
        .map(lambda row: (row.split(",")[0], row.split(",")[1:]))
key_value_patents.take(1)

[('3070801',
  ['1963',
   '1096',
   '',
   '"BE"',
   '""',
   '',
   '1',
   '',
   '269',
   '6',
   '69',
   '',
   '1',
   '',
   '0',
   '',
   '',
   '',
   '',
   '',
   '',
   ''])]

Remove the header and convert each line into a key-value pair (keyed by CITED) for the citations RDD

In [8]:
citations_header = rddCitations.first()
key_value_citations = rddCitations.filter(lambda row: row != citations_header) \
        .map(lambda row: (row.split(",")[1], row.split(",")))
key_value_citations.take(1)

[('956203', ['3858241', '956203'])]

Get POSTATE info for each patent as a new rdd

In [9]:
state_rdd = key_value_patents.map(lambda row: (row[0], row[1][4]))
state_rdd.take(5)

[('3070801', '""'),
 ('3070802', '"TX"'),
 ('3070803', '"IL"'),
 ('3070804', '"OH"'),
 ('3070805', '"CA"')]
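
The index `4` used above follows from the header: POSTATE is column 5 of the raw line, and dropping the patent id (column 0) as the key shifts every value index down by one. A minimal sketch of deriving that index from the header instead of hardcoding it is shown here; the header is shortened to its first seven columns for brevity, and `postate_index` is an illustrative name.

```python
# First seven columns of the patents header (shortened for brevity).
header = '"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE"'
columns = [name.strip('"') for name in header.split(',')]

# The patent id (column 0) becomes the key, so value indexes shift by one.
postate_index = columns.index('POSTATE') - 1
print(postate_index)

# A (key, values) row built from the second data line shown earlier.
row = ('3070802', ['1963', '1096', '', '"US"', '"TX"', '', '1'])

# Optionally strip the surrounding quotes from the state code.
state = row[1][postate_index].strip('"')
print(state)
```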

Get POSTATE for CITED patent

In [10]:
cited_key_join = key_value_citations.join(state_rdd)
cited_key_join.take(5)

[('3770051', (['3860386', '3770051'], '""')),
 ('3770051', (['3879599', '3770051'], '""')),
 ('3770051', (['3902046', '3770051'], '""')),
 ('3770051', (['4001548', '3770051'], '""')),
 ('3770051', (['4029939', '3770051'], '""'))]

Update key to CITING from CITED 

In [11]:
key_citing_rdd = cited_key_join.map(lambda row: (row[1][0][0], row[1]))
key_citing_rdd.take(5)

[('3858577', (['3858577', '3471215'], '"MA"')),
 ('3963323', (['3963323', '3471215'], '"MA"')),
 ('4120293', (['4120293', '3471215'], '"MA"')),
 ('4161944', (['4161944', '3471215'], '"MA"')),
 ('4266548', (['4266548', '3471215'], '"MA"'))]
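
The nested indexing `row[1][0][0]` can be hard to read. As a sketch of what the re-keying step does to a single record, the hypothetical helper `rekey_by_citing` below unpacks the joined row by name; the sample row is reconstructed from the first output line above.

```python
def rekey_by_citing(row):
    """(cited, ([citing, cited], cited_state)) -> (citing, same value)."""
    _cited, value = row
    citation, _cited_state = value
    return (citation[0], value)

# A joined record keyed by the CITED patent.
joined_row = ('3471215', (['3858577', '3471215'], '"MA"'))
print(rekey_by_citing(joined_row))
```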

Get POSTATE for CITING patent

In [12]:
rdd_with_states = key_citing_rdd.join(state_rdd)
rdd_with_states.take(5)

[('5809756', ((['5809756', '3485314'], '"GA"'), '"WI"')),
 ('5809756', ((['5809756', '4920734'], '"NY"'), '"WI"')),
 ('5809756', ((['5809756', '5507138'], '"MD"'), '"WI"')),
 ('5809756', ((['5809756', '4878339'], '"MN"'), '"WI"')),
 ('5809756', ((['5809756', '4879867'], '"NY"'), '"WI"'))]

Check whether the POSTATE values for the CITING and CITED patents are the same and non-empty. If so, emit a count of 1 for the CITING patent.

In [13]:
same_state_rdd = rdd_with_states.filter(lambda row: row[1][1] == row[1][0][1] and row[1][1]!= '""' and row[1][1]!= 'null') \
             .map(lambda row: (row[0], 1))
same_state_rdd.take(5)

[('4344287', 1),
 ('5952671', 1),
 ('5952671', 1),
 ('5952671', 1),
 ('3956832', 1)]
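
The filter condition can also be written as a named function that unpacks the nested record, which makes it easy to check on individual rows. This is a sketch only: `same_known_state` is a hypothetical name, the first test row is copied from the join output above, and the other two rows are made up for illustration.

```python
def same_known_state(row):
    """True when the CITING and CITED states match and are not empty or 'null'."""
    _citing, ((_citation, cited_state), citing_state) = row
    return citing_state == cited_state and citing_state not in ('""', 'null')

# Row copied from the rdd_with_states output: "GA" vs "WI", so no match.
different = ('5809756', ((['5809756', '3485314'], '"GA"'), '"WI"'))
# Made-up rows: matching states, and matching but empty states.
matching = ('100', ((['100', '200'], '"CA"'), '"CA"'))
both_empty = ('300', ((['300', '400'], '""'), '""'))

print(same_known_state(different), same_known_state(matching), same_known_state(both_empty))
```

The function could be passed directly to `rdd_with_states.filter(same_known_state)`.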

Calculate the total count for each patent and sort in descending order of count 

In [15]:
count_rdd = same_state_rdd.reduceByKey(lambda x, y: x + y)
count_rdd = count_rdd.sortBy(lambda x: x[1], ascending=False)
count_rdd.take(5)

[('5959466', 125),
 ('5983822', 103),
 ('6008204', 100),
 ('5952345', 98),
 ('5998655', 96)]
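
To sanity-check the counting logic without Spark, the whole pipeline (look up both states, keep same-state citations, count per CITING patent) can be reproduced on a handful of rows in plain Python. The ids and states below are made-up toy data, and every id is assumed to appear in the state table, which mirrors what the inner joins above enforce.

```python
from collections import Counter

# Hypothetical toy data: patent id -> POSTATE, and (citing, cited) pairs.
states = {'1': '"CA"', '2': '"CA"', '3': '"TX"', '4': '""', '5': '"CA"'}
citations = [('1', '2'), ('1', '5'), ('1', '3'), ('3', '1'), ('4', '4'), ('2', '1')]

# Count, per citing patent, the citations whose two states match and are known.
counts = Counter(
    citing
    for citing, cited in citations
    if states[citing] == states[cited] and states[citing] not in ('""', 'null')
)

# most_common() lists the counts in descending order, like the sortBy above.
print(counts.most_common())
```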

#### FINAL RESULT

Append SAME_STATE count to patents data and compute results in descending order of SAME_STATE count 

In [16]:
result = key_value_patents.join(count_rdd)
result = result.sortBy(lambda x: x[1][1], ascending=False)

Display the top 10 results

In [17]:
result.take(10)

[('5959466',
  (['1999',
    '14515',
    '1997',
    '"US"',
    '"CA"',
    '5310',
    '2',
    '',
    '326',
    '4',
    '46',
    '159',
    '0',
    '1',
    '',
    '0.6186',
    '',
    '4.8868',
    '0.0455',
    '0.044',
    '',
    ''],
   125)),
 ('5983822',
  (['1999',
    '14564',
    '1998',
    '"US"',
    '"TX"',
    '569900',
    '2',
    '',
    '114',
    '5',
    '55',
    '200',
    '0',
    '0.995',
    '',
    '0.7201',
    '',
    '12.45',
    '0',
    '0',
    '',
    ''],
   103)),
 ('6008204',
  (['1999',
    '14606',
    '1998',
    '"US"',
    '"CA"',
    '749584',
    '2',
    '',
    '514',
    '3',
    '31',
    '121',
    '0',
    '1',
    '',
    '0.7415',
    '',
    '5',
    '0.0085',
    '0.0083',
    '',
    ''],
   100)),
 ('5952345',
  (['1999',
    '14501',
    '1997',
    '"US"',
    '"CA"',
    '749584',
    '2',
    '',
    '514',
    '3',
    '31',
    '118',
    '0',
    '1',
    '',
    '0.7442',
    '',
    '5.1102',
    '0',
    '0',
