# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Using PySpark RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data or use 

These two RDDs (`rddCitations` and `rddPatents` in the code below) hold the raw, unprocessed lines, so you will probably want to process them further (e.g. convert fields to integer types).

The `textFile` function returns data in strings. This should work fine for this lab.

Other methods you use might return data as `bytes`. If you haven't used the Python `bytes` type before, look it up. You can convert a `bytes` value `x` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` method of the gzip library to read in all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt',encoding='utf-8') as f:
    rddCitations = sc.parallelize( f.readlines() )
```
This is less efficient than using `textFile`, because `textFile` uses the underlying HDFS or other file system to read the file across the worker nodes, while `gzip.open()...readlines()` reads all the data in the frontend (the driver) and then distributes it to the worker nodes.
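
To make the `bytes` versus `str` distinction concrete, here is a minimal sketch that needs neither Spark nor the data files. It builds a tiny gzip payload in memory (the two lines are copied from the citation sample in this lab) and reads it back in binary mode and in text mode. The variable names are illustrative only.

```python
import gzip
import io

# Build a tiny gzip payload in memory so the example needs no file on disk.
payload = gzip.compress(b'"CITING","CITED"\n3858241,956203\n')

# Binary mode ('rb') yields bytes objects, which must be decoded by hand.
with gzip.open(io.BytesIO(payload), 'rb') as f:
    raw_lines = f.readlines()
decoded = [line.decode('utf-8').rstrip('\n') for line in raw_lines]

# Text mode ('rt') decodes for us and yields str objects directly.
with gzip.open(io.BytesIO(payload), 'rt', encoding='utf-8') as f:
    text_lines = [line.rstrip('\n') for line in f]

print(type(raw_lines[0]).__name__, type(text_lines[0]).__name__)
print(decoded == text_lines)
```

Both routes end with the same list of strings; text mode simply saves the explicit `decode` call.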

In [3]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [4]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [5]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each element is a single string of comma-separated values. You will need to convert these to (K,V) pairs, probably converting the keys to `int` and so on. You'll also need to `filter` out the header string, since there's no easy way to extract all the lines except the first.
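
As a rough illustration of that conversion, the sketch below parses a few of the sample lines shown above in plain Python. It uses the standard `csv` module, which is a substitution on my part: the solution in this notebook splits on `','` instead, which leaves the double quotes in string fields such as `'"TX"'`. The function names are hypothetical; with an RDD, functions like these could be passed to `map` after filtering out the header.

```python
import csv

def parse_fields(line):
    """Split one CSV line into fields; csv.reader strips the surrounding quotes."""
    return next(csv.reader([line]))

def to_citation_pair(line):
    """Turn 'citing,cited' into an (int, int) key-value pair."""
    citing, cited = parse_fields(line)
    return int(citing), int(cited)

def to_patent_pair(line):
    """Turn a patent line into (patent_id, list_of_remaining_fields)."""
    fields = parse_fields(line)
    return int(fields[0]), fields[1:]

# Lines copied from the sample output above.
citation_lines = ['"CITING","CITED"', '3858241,956203', '3858241,1324234']
header = citation_lines[0]
citation_pairs = [to_citation_pair(l) for l in citation_lines if l != header]

patent_key, patent_fields = to_patent_pair(
    '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,')
postate = patent_fields[4]  # POSTATE is the fifth field after the patent id

print(citation_pairs)
print(patent_key, postate)
```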

<b>Removing headers from both the citations and patents tables</b>

In [6]:
header = rddCitations.first()
rddCitations = rddCitations.filter(lambda line: line != header)

In [7]:
rddCitations.take(5)

['3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384',
 '3858241,3634889']

In [8]:
header1 = rddPatents.first()
rddPatents = rddPatents.filter(lambda line: line != header1)

In [9]:
rddPatents.take(5)

['3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,',
 '3070805,1963,1096,,"US","CA",,1,,2,6,63,,1,,0,,,,,,,']

<b>Extracting relevant info from both tables to join on CITING==PATENT</b>

In [10]:
patents = rddPatents.map(lambda k: (int(k.split(',')[0]),','.join(k.split(',')[1:])))

In [11]:
citations = rddCitations.map(lambda k: (int(k.split(',')[0]),int(k.split(',')[1])))

In [12]:
citedPatents = citations.join(patents).cache()
citedPatents.take(1)

[(3858504,
  (72573,
   '1975,5485,1973,"FR","",,1,11,100,5,59,6,2,0.5,0.5,0.4444,19,41.6667,,,,'))]

<b>Arranging data into (cited, (citing, citingState))</b>

In [13]:
citedPatents1 = citedPatents.map(lambda k: (k[1][0],(k[0],k[1][1].split(',')[4])))
citedPatents1.take(1)

[(72573, (3858504, '""'))]
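
The two steps above (join on the citing patent, then re-key by the cited patent) can be sketched without Spark. The `inner_join` helper below is a hypothetical plain-Python stand-in that mimics the shape of the result of an RDD `join`, namely `(key, (left_value, right_value))` with unmatched keys dropped. The first citation and the patent record are taken from the output above; the `(1, 2)` citation is made up to show an unmatched key.

```python
from collections import defaultdict

def inner_join(left, right):
    """Plain-Python stand-in for a key-based inner join of (K, V) pairs."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]

patent_info = '1975,5485,1973,"FR","",,1,11,100,5,59,6,2,0.5,0.5,0.4444,19,41.6667,,,,'
citations = [(3858504, 72573), (1, 2)]  # (1, 2) is a made-up, unmatched citation
patents = [(3858504, patent_info)]

joined = inner_join(citations, patents)

# Re-key by the cited patent and keep only the citing patent's state (field 4).
rekeyed = [(cited, (citing, info.split(',')[4]))
           for citing, (cited, info) in joined]

print(joined)
print(rekeyed)
```

The made-up citation disappears because an inner join keeps only keys present on both sides.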

<b>Joining citedPatents1 with patents on CITED==PATENT</b>

In [14]:
citedPatents2 = citedPatents1.join(patents).cache()
citedPatents2.take(10)

[(5135106,
  ((5662216, '"RI"'),
   '1992,11904,1988,"US","MA",,1,40,206,6,68,10,8,1,0.2188,0,5.875,8.5,,,,')),
 (5135106,
  ((5261534, '"CT"'),
   '1992,11904,1988,"US","MA",,1,40,206,6,68,10,8,1,0.2188,0,5.875,8.5,,,,')),
 (5135106,
  ((5515968, '"GA"'),
   '1992,11904,1988,"US","MA",,1,40,206,6,68,10,8,1,0.2188,0,5.875,8.5,,,,')),
 (5135106,
  ((5720386, '"WA"'),
   '1992,11904,1988,"US","MA",,1,40,206,6,68,10,8,1,0.2188,0,5.875,8.5,,,,')),
 (5135106,
  ((5573120, '"CA"'),
   '1992,11904,1988,"US","MA",,1,40,206,6,68,10,8,1,0.2188,0,5.875,8.5,,,,')),
 (5135106,
  ((5701997, '"NJ"'),
   '1992,11904,1988,"US","MA",,1,40,206,6,68,10,8,1,0.2188,0,5.875,8.5,,,,')),
 (5135106,
  ((5344039, '"GA"'),
   '1992,11904,1988,"US","MA",,1,40,206,6,68,10,8,1,0.2188,0,5.875,8.5,,,,')),
 (5135106,
  ((5253753, '"CA"'),
   '1992,11904,1988,"US","MA",,1,40,206,6,68,10,8,1,0.2188,0,5.875,8.5,,,,')),
 (5108526,
  ((6000746, '""'),
   '1992,11806,1991,"DE","",489515,3,20,156,1,19,13,30,0.8462,0.6356,0.79

<b>Arranging data as (citing, citing state, cited, cited state) to enable filtering</b>

In [15]:
arrangedData = citedPatents2.map(lambda k: (k[1][0][0],k[1][0][1],k[0],k[1][1].split(',')[4]))
arrangedData.take(5)

[(5662216, '"RI"', 5135106, '"MA"'),
 (5261534, '"CT"', 5135106, '"MA"'),
 (5515968, '"GA"', 5135106, '"MA"'),
 (5720386, '"WA"', 5135106, '"MA"'),
 (5573120, '"CA"', 5135106, '"MA"')]

In [16]:
filteredData = arrangedData.filter(lambda k: k[1] == k[3] and k[1] != '""')
filteredData.take(10)

[(5766704, '"MA"', 5274018, '"MA"'),
 (5939157, '"MA"', 5274018, '"MA"'),
 (5955159, '"MA"', 5274018, '"MA"'),
 (5985383, '"MA"', 5274018, '"MA"'),
 (5827459, '"MA"', 5274018, '"MA"'),
 (3864379, '"NJ"', 3465036, '"NJ"'),
 (4796222, '"NY"', 3685020, '"NY"'),
 (4484262, '"NY"', 3685020, '"NY"'),
 (4068304, '"NY"', 3685020, '"NY"'),
 (4636990, '"NY"', 3685020, '"NY"')]
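
The filter keeps a row only when both patents have the same, non-empty state. Because the lines were split on `','`, an empty state appears as the two-character string `'""'` rather than `''`. A small sketch of the predicate, using rows that appear in the outputs above (the function name is hypothetical):

```python
def is_same_state(row):
    """True when citing and cited patents share a non-empty state."""
    citing, citing_state, cited, cited_state = row
    # If the states are equal and the first is non-empty, the second is too,
    # so a separate check on cited_state is not needed.
    return citing_state == cited_state and citing_state != '""'

rows = [
    (5662216, '"RI"', 5135106, '"MA"'),  # different states: dropped
    (5766704, '"MA"', 5274018, '"MA"'),  # same state: kept
    (6000746, '""', 5108526, '""'),      # both empty (non-US): dropped
]
kept = [row for row in rows if is_same_state(row)]
print(kept)
```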

<b>Re-keying each record as (citing, (citing state, cited, cited state)) to enable grouping and sorting</b>

In [17]:
filteredData1 = filteredData.map(lambda k: (k[0],(k[1],k[2],k[3])))
filteredData1.take(10)

[(5766704, ('"MA"', 5274018, '"MA"')),
 (5939157, ('"MA"', 5274018, '"MA"')),
 (5955159, ('"MA"', 5274018, '"MA"')),
 (5985383, ('"MA"', 5274018, '"MA"')),
 (5827459, ('"MA"', 5274018, '"MA"')),
 (3864379, ('"NJ"', 3465036, '"NJ"')),
 (4796222, ('"NY"', 3685020, '"NY"')),
 (4484262, ('"NY"', 3685020, '"NY"')),
 (4068304, ('"NY"', 3685020, '"NY"')),
 (4636990, ('"NY"', 3685020, '"NY"'))]

In [18]:
sortedData = filteredData1.groupByKey().mapValues(len).sortBy(lambda k: k[1], ascending=False)

In [19]:
sortedData.take(10)

[(5959466, 125),
 (5983822, 103),
 (6008204, 100),
 (5952345, 98),
 (5958954, 96),
 (5998655, 96),
 (5936426, 94),
 (5925042, 90),
 (5913855, 90),
 (5951547, 90)]
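
`groupByKey().mapValues(len)` collects every value for a key before counting. A common alternative, not used in this solution, is to map each record to `(key, 1)` and sum the ones, which in Spark is typically written with `reduceByKey(operator.add)`. The sketch below shows the same counting idea in plain Python; the patent ids are made up for illustration.

```python
from collections import Counter

def count_by_key(pairs):
    """Sum the values for each key, like reducing (key, 1) pairs with addition."""
    counts = Counter()
    for key, one in pairs:
        counts[key] += one
    return counts

# Made-up (citing, 1) pairs; each pair stands for one same-state citation.
ones = [(101, 1), (102, 1), (101, 1), (103, 1), (101, 1), (102, 1)]
counts = count_by_key(ones)

# Sort by count, largest first.
ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
print(ranked)
```

Sorting by count alone leaves the relative order of keys with equal counts unspecified in a distributed sort, so tied entries may appear in a different order from run to run.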

<b>Joining sorted data with patent table to get final output</b>

In [20]:
output = patents.leftOuterJoin(sortedData).cache()
output.take(10)

[(3070828, ('1963,1096,,"US","CT",,2,,16,5,59,,4,,0.625,,,,,,,', None)),
 (3070996, ('1963,1096,,"US","MD",,6,,73,4,43,,2,,0,,,,,,,', None)),
 (3071368, ('1963,1096,,"US","NY",,1,,269,6,69,,7,,0.7755,,,,,,,', None)),
 (3071816, ('1963,1103,,"US","OH",,2,,264,5,51,,4,,0.625,,,,,,,', None)),
 (3072188, ('1963,1103,,"US","PA",,2,,166,6,64,,1,,0,,,,,,,', None)),
 (3072356, ('1963,1103,,"US","MN",,1,,242,5,51,,4,,0.375,,,,,,,', None)),
 (3072896, ('1963,1103,,"US","IL",,2,,340,2,21,,1,,0,,,,,,,', None)),
 (3073064, ('1963,1110,,"US","CA",,2,,47,6,61,,0,,,,,,,,,', None)),
 (3073716, ('1963,1110,,"US","NY",,2,,427,1,12,,2,,0,,,,,,,', None)),
 (3073940, ('1963,1110,,"CH","",,1,,219,4,49,,3,,0,,,,,,,', None))]

In [21]:
output.count()

2923922

<b>Replacing None values with 0 to allow sorting by count</b>

In [22]:
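def removeNone(item):
    # Patents with no same-state citations have no match in the left outer join,
    # so their count is None; replace it with 0.
    if item[1][1] is None:
        return (item[0], (item[1][0], 0))
    else:
        return item
output = output.map(removeNone)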

In [23]:
output.take(10)

[(3070828, ('1963,1096,,"US","CT",,2,,16,5,59,,4,,0.625,,,,,,,', 0)),
 (3070996, ('1963,1096,,"US","MD",,6,,73,4,43,,2,,0,,,,,,,', 0)),
 (3071368, ('1963,1096,,"US","NY",,1,,269,6,69,,7,,0.7755,,,,,,,', 0)),
 (3071816, ('1963,1103,,"US","OH",,2,,264,5,51,,4,,0.625,,,,,,,', 0)),
 (3072188, ('1963,1103,,"US","PA",,2,,166,6,64,,1,,0,,,,,,,', 0)),
 (3072356, ('1963,1103,,"US","MN",,1,,242,5,51,,4,,0.375,,,,,,,', 0)),
 (3072896, ('1963,1103,,"US","IL",,2,,340,2,21,,1,,0,,,,,,,', 0)),
 (3073064, ('1963,1110,,"US","CA",,2,,47,6,61,,0,,,,,,,,,', 0)),
 (3073716, ('1963,1110,,"US","NY",,2,,427,1,12,,2,,0,,,,,,,', 0)),
 (3073940, ('1963,1110,,"CH","",,1,,219,4,49,,3,,0,,,,,,,', 0))]
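
The `None` values come from the left outer join: every patent is kept, and patents without a same-state count get `None` on the right. The sketch below imitates that behavior in plain Python with two patent records and one count taken from the outputs in this notebook. Both helper functions are hypothetical stand-ins, not Spark API calls.

```python
def left_outer_join(left, right):
    """Keep every left record; pair it with None when the key has no match."""
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [None])
    ]

def fill_missing_count(item):
    """Replace a missing count with 0 so that records can be sorted by count."""
    key, (info, count) = item
    return key, (info, 0 if count is None else count)

patents_sample = [
    (3070828, '1963,1096,,"US","CT",,2,,16,5,59,,4,,0.625,,,,,,,'),
    (5959466, '1999,14515,1997,"US","CA",5310,2,,326,4,46,159,0,1,,0.6186,,4.8868,0.0455,0.044,,'),
]
counts_sample = [(5959466, 125)]

joined = left_outer_join(patents_sample, counts_sample)
filled = [fill_missing_count(item) for item in joined]
ranked = sorted(filled, key=lambda item: item[1][1], reverse=True)

print([(key, count) for key, (_, count) in ranked])
```

Comparing with `is None` rather than `== None` is the idiomatic Python test for a missing value.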

In [24]:
finalOutput = output.sortBy(lambda k: k[1][1], ascending=False)
finalOutput.take(10)

[(5959466,
  ('1999,14515,1997,"US","CA",5310,2,,326,4,46,159,0,1,,0.6186,,4.8868,0.0455,0.044,,',
   125)),
 (5983822,
  ('1999,14564,1998,"US","TX",569900,2,,114,5,55,200,0,0.995,,0.7201,,12.45,0,0,,',
   103)),
 (6008204,
  ('1999,14606,1998,"US","CA",749584,2,,514,3,31,121,0,1,,0.7415,,5,0.0085,0.0083,,',
   100)),
 (5952345,
  ('1999,14501,1997,"US","CA",749584,2,,514,3,31,118,0,1,,0.7442,,5.1102,0,0,,',
   98)),
 (5958954,
  ('1999,14515,1997,"US","CA",749584,2,,514,3,31,116,0,1,,0.7397,,5.181,0,0,,',
   96)),
 (5998655,
  ('1999,14585,1998,"US","CA",,1,,560,1,14,114,0,1,,0.7387,,5.1667,,,,', 96)),
 (5936426,
  ('1999,14466,1997,"US","CA",5310,2,,326,4,46,178,0,1,,0.58,,11.2303,0.0765,0.073,,',
   94)),
 (5739256,
  ('1998,13983,1995,"US","CA",70060,2,15,528,1,15,453,0,1,,0.8232,,15.1104,0.1124,0.1082,,',
   90)),
 (5978329,
  ('1999,14550,1995,"US","CA",148925,2,,369,2,24,145,0,1,,0.5449,,12.9241,0.4196,0.4138,,',
   90)),
 (5980517,
  ('1999,14557,1998,"US","CA",733846,2,,606,3