# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful.

Collaborated with Tanmai Gajula (tanmai.gajula@colorado.edu)

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rddd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is very slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task.

To that end, we've included code below that extracts just the last 200,000 lines of each file using the Python "slice" notation. Using that subset of the data, your "new patent" table should look like:

![Top partial 10 RDD self-state citations](top-subsample-rdd.png)

When you're ready to run the whole thing, just include all the data and run it again (...and wait...).

These two RDDs are called "rawCitations" and "rawPatents" because you probably want to process them further (e.g. convert them to integer types, etc). If you haven't used Python "byte" types before, google it. You can convert a byte variable `x` into e.g. a UTF-8 string using `x.decode('utf-8')`.
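As a quick illustration of the bytes-to-string conversion described above, here is a minimal sketch in plain Python (no Spark needed). The sample line is reconstructed from the citation output shown later in this notebook; the variable names are only illustrative.

```python
# A raw line as gzip.open(..., 'r') yields it: a bytes object, newline included.
raw = b"5935430,5453769\n"

text = raw.decode("utf-8")          # bytes -> str
fields = text.strip().split(",")    # drop the newline, then split on commas

print(type(raw).__name__, type(text).__name__)   # bytes str
print(fields)                                    # ['5935430', '5453769']
```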

This creates an RDD (a distributed dataset) from the last 800,000 lines of the citations file, which the gzip module decompresses as it is read.

In [3]:
import gzip
with gzip.open('cite75_99.txt.gz', 'r') as f:
    #rddCitations_header = f.readline()
    rddCitations = sc.parallelize( f.readlines()[-800000:] )
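`f.readlines()` loads every line of the file into memory before the slice is taken. If memory is tight, one alternative is to stream the file and keep only the tail. The sketch below assumes a hypothetical helper named `tail_lines` and demonstrates it on a throwaway file rather than the lab data.

```python
import gzip
import os
import tempfile
from collections import deque

def tail_lines(path, n):
    """Return the last n lines of a gzip file as a list of bytes objects."""
    with gzip.open(path, "rb") as f:
        # A deque with maxlen=n discards older lines as newer ones arrive,
        # so at most n lines are held in memory at any time.
        return list(deque(f, maxlen=n))

# Tiny demonstration on a temporary file (not the lab data).
with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, "demo.txt.gz")
    with gzip.open(demo_path, "wb") as f:
        f.write(b"".join(b"%d,%d\n" % (i, i + 1) for i in range(10)))
    last_three = tail_lines(demo_path, 3)

print(last_three)
```

With such a helper, the cell above could be written as `sc.parallelize(tail_lines('cite75_99.txt.gz', 800000))`.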

Next, apply `map` to the RDD. `map` is a transformation that applies a function to each element (a one-to-one mapping); here it converts each line from bytes to a UTF-8 string.

In [4]:
rdd1 = rddCitations.map(lambda x: x.decode('utf-8'))

We chain three `map` transformations: the first strips the trailing newline from each element, the second splits each element on the delimiter `,`, and the third converts the key to an integer.

In [5]:
rdd1 = rdd1.map(lambda x: x.strip()).map(lambda x: tuple(x.split(","))).map(lambda x: (int(x[0]), x[1]))
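The decode, strip, split and convert steps can also be folded into one named function, which is easier to test outside Spark than a chain of lambdas. This is a sketch, not the notebook's method: `parse_citation` is a hypothetical name, and the header line in the example is an assumed format (the cells here skip the header by reading only the tail of the file).

```python
def parse_citation(raw):
    """Turn one raw citation line (bytes) into (citing:int, cited:str), or None.

    Assumption: a header or malformed line has a non-numeric first field;
    such lines yield None so they can be filtered out afterwards.
    """
    fields = raw.decode("utf-8").strip().split(",")
    if len(fields) != 2 or not fields[0].isdigit():
        return None
    return (int(fields[0]), fields[1])

print(parse_citation(b"5935430,5453769\n"))     # (5935430, '5453769')
print(parse_citation(b'"CITING","CITED"\n'))    # None (assumed header format)
```

It could then be applied as `rddCitations.map(parse_citation).filter(lambda x: x is not None)`, which would also tolerate the header when running on the full file.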

Create another RDD for the patents file by reading the last 800,000 lines of the gzip-compressed file.

In [6]:
with gzip.open('apat63_99.txt.gz', 'r') as f:
    #rddPatents_header = f.readline()
    rddPatents = sc.parallelize( f.readlines()[-800000:] )

This converts each line from bytes to a UTF-8 string.

In [7]:
rddp = rddPatents.map(lambda x: x.decode('utf-8'))

Next, we map over the RDD to remove the trailing newline from each element.

In [8]:
#header2 = rddp.first()
rddp = rddp.map(lambda x: x.strip())

Again we chain the mappers: first split on the delimiter `,`, then convert each element into the form (int(PATENT), POSTATE).

In [9]:
rddp1 = rddp.map(lambda x: x.split(",")).map(lambda x: (int(x[0]), x[5]))
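The state field arrives wrapped in double quotes, so a missing state is the literal string `""`. One option, shown here only as a sketch, is to strip the quotes while parsing. `parse_patent_state` is a hypothetical name, the sample lines are reconstructed from output shown later in this notebook, and the code assumes no field contains an embedded comma. The notebook itself keeps the quotes, so its later filter compares against `'""'`; with this variant the comparison would be against the empty string instead.

```python
def parse_patent_state(line):
    """Map one decoded patent line to (patent:int, state:str).

    The state is column 5; surrounding double quotes are stripped, so a
    missing state becomes the empty string rather than the literal '""'.
    """
    fields = line.strip().split(",")
    return (int(fields[0]), fields[5].strip('"'))

with_state = '5200160,1993,12149,1991,"US","PA",154240,2,18,423,1,19,9,6,1,0.5,0.1975,4,5.4444,0,0,0.8333,0.8333'
no_state = '5200128,1993,12149,1992,"IT","",690995,3,4,264,5,51,5,1,0.8,0,0.375,2,19,0,0,0,0'

print(parse_patent_state(with_state))   # (5200160, 'PA')
print(parse_patent_state(no_state))     # (5200128, '')
```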

In [11]:
rdd1.take(5) # prints first 5 elements of rdd1 as list of tuples - (key, value) : This is an action

[(5935430, '5453769'),
 (5935430, '5500071'),
 (5935430, '5567868'),
 (5935430, '5571410'),
 (5935430, '5595650')]

In [12]:
rddp1.take(5) # prints first 5 elements of rddp1 as list of tuples - (key, value)

[(5200108, '""'),
 (5200109, '""'),
 (5200110, '""'),
 (5200111, '""'),
 (5200112, '"NY"')]

Next, we perform an inner join (a transformation) of rdd1 and rddp1 to get (CITING, (CITED, CITING_STATE)), then apply `map` to make CITED the key. <br>
So each element ends up as (int(CITED), (CITING, CITING_STATE)).

In [13]:
rdd2 = rdd1.join(rddp1).map(lambda x: (int(x[1][0]), (x[0], x[1][1])))
#rdd2.cache()

In [14]:
rdd2.take(5) # prints first 5 elements of rdd2 as list of tuples - (key, value)

[(5019263, (5935440, '""')),
 (5362522, (5935440, '""')),
 (5716527, (5935440, '""')),
 (4499359, (5935456, '""')),
 (4518842, (5935456, '""'))]

Again perform an inner join, this time of rdd2 and rddp1. The result is rdd3, where each element is (CITED, ((CITING, CITING_STATE), CITED_STATE)).

In [15]:
rdd3 = rdd2.join(rddp1)
#rdd3.cache()

In [16]:
rdd3.take(5) # prints first 5 elements of rdd3 as list of tuples - (key, value)

[(5856120, ((5936256, '""'), '""')),
 (5717280, ((5936336, '""'), '""')),
 (5243280, ((5936400, '"MA"'), '""')),
 (5654928, ((5936432, '"CA"'), '""')),
 (5654928, ((5940338, '"ID"'), '""'))]
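To make the element shapes after each join easier to follow, here is a plain-Python stand-in for an inner join on (key, value) pairs, run on toy data. The `inner_join` helper and the patent numbers are hypothetical; the helper only mimics the shape of the result, not Spark's partitioning or ordering.

```python
from collections import defaultdict

def inner_join(left, right):
    """Pure-Python stand-in for an inner join on (key, value) pairs.

    Returns (key, (left_value, right_value)) for every matching pair,
    which is the element shape produced by joining two pair RDDs.
    """
    right_by_key = defaultdict(list)
    for k, v in right:
        right_by_key[k].append(v)
    return [(k, (lv, rv)) for k, lv in left for rv in right_by_key.get(k, [])]

# Toy data (hypothetical patent numbers, not from the lab files).
citations = [(10, "1"), (10, "2"), (11, "1")]        # (CITING, CITED)
states = [(1, '"CA"'), (2, '"NY"'), (10, '"CA"')]    # (PATENT, STATE)

# First join: (CITING, (CITED, CITING_STATE)); patent 11 has no state row and drops out.
step1 = inner_join(citations, states)
# Re-key on CITED: (CITED, (CITING, CITING_STATE)).
rekeyed = [(int(cited), (citing, st)) for citing, (cited, st) in step1]
# Second join: (CITED, ((CITING, CITING_STATE), CITED_STATE)).
step2 = inner_join(rekeyed, states)

print(step2)
# [(1, ((10, '"CA"'), '"CA"')), (2, ((10, '"CA"'), '"NY"'))]
```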

`filter` is also a transformation. We use it to drop records where either CITING_STATE or CITED_STATE is `'""'` or None.

In [17]:
rdd4 = rdd3.filter(lambda x: not((x[1][0][1] is None or x[1][0][1] == '""') or (x[1][1] is None or x[1][1] == '""')))

Next, select only records where CITED_STATE == CITING_STATE.

In [18]:
rdd5 = rdd4.filter(lambda x: x[1][0][1] == x[1][1])

In [19]:
rdd5.take(5) # prints first 5 elements of rdd5 as list of tuples - (key, value)

[(5361072, ((5990834, '"CA"'), '"CA"')),
 (5661792, ((5946299, '"NJ"'), '"NJ"')),
 (5661792, ((5949870, '"NJ"'), '"NJ"')),
 (5820048, ((5984242, '"IL"'), '"IL"')),
 (5487216, ((5937504, '"NJ"'), '"NJ"'))]
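The two filters can be expressed as a single named predicate: if the citing state is present and equals the cited state, the cited state is necessarily present too. The following is a sketch under that reasoning; `is_self_state_citation` is a hypothetical name, and the sample records are copied from the outputs above.

```python
def is_self_state_citation(record):
    """True when both states are present and equal.

    record has the shape (CITED, ((CITING, CITING_STATE), CITED_STATE)).
    A state counts as missing when it is None or the literal '""'.
    """
    (_citing, citing_state), cited_state = record[1]
    missing = (None, '""')
    return citing_state not in missing and citing_state == cited_state

print(is_self_state_citation((5361072, ((5990834, '"CA"'), '"CA"'))))   # True
print(is_self_state_citation((5243280, ((5936400, '"MA"'), '""'))))     # False
print(is_self_state_citation((5856120, ((5936256, '""'), '""'))))       # False
```

Used with Spark, this would replace both filter steps: `rdd3.filter(is_self_state_citation)`.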

For every element of the RDD we emit (CITING, 1) using a `map` transformation, then apply `reduceByKey` (also a transformation), which sums the values for each key.

In [37]:
rdd6 = rdd5.map(lambda x: (x[1][0][0], 1)).reduceByKey(lambda x, y: x + y)

In [38]:
rdd6.take(10) # prints first 10 elements of rdd6 as list of tuples - (key, value)

[(5937504, 7),
 (5940240, 5),
 (5940384, 1),
 (5973864, 1),
 (5944752, 7),
 (5944992, 3),
 (5946000, 3),
 (5952240, 2),
 (5952576, 3),
 (5999808, 11)]
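The map-then-reduce-by-key pattern is the classic way to count occurrences per key. A plain-Python analogue on toy records (hypothetical patent numbers) may make the logic concrete: each record contributes a 1 for its CITING patent, and the 1s are summed per key.

```python
from collections import Counter

# Toy self-state records in the shape (CITED, ((CITING, STATE), STATE)).
records = [
    (1, ((100, '"CA"'), '"CA"')),
    (2, ((100, '"CA"'), '"CA"')),
    (3, ((101, '"NJ"'), '"NJ"')),
]

# "map" step: emit (CITING, 1) for every record.
pairs = [(citing, 1) for _, ((citing, _), _) in records]

# "reduce by key" step: sum the values that share a key.
counts = Counter()
for key, value in pairs:
    counts[key] += value

print(sorted(counts.items()))   # [(100, 2), (101, 1)]
```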

Next, we apply `map` transformations to the initial patents RDD, rddp, splitting each element into (key, value) = (PATENT, comma-separated string of the remaining fields).

In [39]:
rddp2 = rddp.map(lambda x: x.split(",", 1)).map(lambda x: tuple(x))

Then we apply map transformation on rddp2 to convert the key to integer type.

In [41]:
rddp2 = rddp2.map(lambda x: (int(x[0]), x[1]))

Then we perform a `leftOuterJoin` transformation of rddp2 with rdd6 to get an RDD that pairs each patent with its count (None where the patent has no self-state citations).

In [42]:
rdd7 = rddp2.leftOuterJoin(rdd6)
#rdd7.cache()

In [43]:
rdd7.take(5) # prints first 5 elements of rdd7 as list of tuples - (key, value)

[(5200128,
  ('1993,12149,1992,"IT","",690995,3,4,264,5,51,5,1,0.8,0,0.375,2,19,0,0,0,0',
   None)),
 (5200160,
  ('1993,12149,1991,"US","PA",154240,2,18,423,1,19,9,6,1,0.5,0.1975,4,5.4444,0,0,0.8333,0.8333',
   None)),
 (5200192,
  ('1993,12149,1988,"DE","",,1,23,424,3,31,5,2,1,0,0.32,6.5,8,,,,', None)),
 (5200224,
  ('1993,12149,1991,"US","PA",416145,2,10,426,6,61,12,1,0.9167,0,0.5455,6,16,0.2,0.0833,0,0',
   None)),
 (5200256,
  ('1993,12149,1989,"US","LA",,1,16,428,6,69,7,16,1,0.8281,0.5714,5.8125,8.5714,,,,',
   None))]

Next, we filter out records whose count is None.

In [47]:
rdd8 = rdd7.filter(lambda x: x[1][1] is not None)

In [48]:
rdd8.take(5) # prints first 5 elements of rdd8 as list of tuples - (key, value)

[(5935488,
  ('1999,14466,1998,"US","NJ",746337,2,,252,1,19,10,0,1,,0.18,,8.1,0,0,,', 1)),
 (5935552,
  ('1999,14466,1998,"US","IL",600400,2,,423,1,19,8,0,0.875,,0,,17,0.2,0.125,,',
   1)),
 (5935680,
  ('1999,14466,1997,"US","WA",70060,2,,428,6,69,10,0,1,,0.66,,6,0.4,0.4,,',
   3)),
 (5935712,
  ('1999,14466,1997,"US","NY",160890,2,,428,6,69,10,0,1,,0.34,,4.3,0.7,0.7,,',
   8)),
 (5935840,
  ('1999,14466,1995,"US","NY",41565,2,,435,3,33,2,0,1,,0.5,,4.5,0.5,0.5,,',
   1))]
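A left outer join followed by dropping the rows whose right-hand value is None keeps exactly the rows an inner join would keep, provided the right-hand values are never None themselves (the counts here are integers). The plain-Python sketch below illustrates this on toy data; `left_outer_join` is a hypothetical stand-in, not Spark's implementation.

```python
def left_outer_join(left, right):
    """Pure-Python stand-in for a left outer join on (key, value) pairs.

    Every left record is kept; keys absent from the right side get None.
    """
    right_by_key = {}
    for k, v in right:
        right_by_key.setdefault(k, []).append(v)
    out = []
    for k, lv in left:
        for rv in right_by_key.get(k, [None]):
            out.append((k, (lv, rv)))
    return out

# Toy data (hypothetical): patent info and self-state citation counts.
patents = [(1, "info-a"), (2, "info-b"), (3, "info-c")]
counts = [(2, 5), (3, 1)]

joined = left_outer_join(patents, counts)
kept = [rec for rec in joined if rec[1][1] is not None]

print(joined)   # [(1, ('info-a', None)), (2, ('info-b', 5)), (3, ('info-c', 1))]
print(kept)     # [(2, ('info-b', 5)), (3, ('info-c', 1))]
```

If patents without any self-state citations are not needed downstream, `rddp2.join(rdd6)` would give the same records in one step.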

Then we apply the `sortBy` transformation, which sorts by count in ascending order, and run the `collect()` action, which returns the full result to the driver as a list.

In [51]:
rdd8.sortBy(lambda x: x[1][1]).collect()

[(5935488,
  ('1999,14466,1998,"US","NJ",746337,2,,252,1,19,10,0,1,,0.18,,8.1,0,0,,', 1)),
 (5935552,
  ('1999,14466,1998,"US","IL",600400,2,,423,1,19,8,0,0.875,,0,,17,0.2,0.125,,',
   1)),
 (5935840,
  ('1999,14466,1995,"US","NY",41565,2,,435,3,33,2,0,1,,0.5,,4.5,0.5,0.5,,',
   1)),
 (5936096,
  ('1999,14466,1998,"US","PA",480850,2,,548,1,14,2,0,1,,0.5,,7.5,1,1,,', 1)),
 (5936928,
  ('1999,14466,1996,"US","CA",744243,2,,369,2,24,43,0,1,,0.6544,,5.8605,0,0,,',
   1)),
 (5937344,
  ('1999,14466,1998,"US","GA",756870,2,,455,2,21,7,0,1,,0,,3.8571,0,0,,', 1)),
 (5937728,
  ('1999,14473,1997,"US","MI",589355,2,,91,5,53,7,0,1,,0.4898,,10.4286,0.1667,0.1429,,',
   1)),
 (5938112,
  ('1999,14473,1995,"US","WI",737877,2,,229,6,68,7,0,1,,0.6939,,11.2857,0,0,,',
   1)),
 (5938368,
  ('1999,14473,1997,"US","CT",685843,2,,403,6,67,13,0,0.9231,,0.625,,15.3846,0.1111,0.0769,,',
   1)),
 (5938688,
  ('1999,14473,1997,"US","NY",123370,2,,607,3,32,2,0,1,,0.5,,4,0,0,,', 1)),
 (5939456,
  ('1999,14473,199

Display the 10 patents with the most self-state citations.

In [53]:
rdd8.top(10, key=lambda x: x[1][1])

[(5959466,
  ('1999,14515,1997,"US","CA",5310,2,,326,4,46,159,0,1,,0.6186,,4.8868,0.0455,0.044,,',
   94)),
 (6008204,
  ('1999,14606,1998,"US","CA",749584,2,,514,3,31,121,0,1,,0.7415,,5,0.0085,0.0083,,',
   80)),
 (5952345,
  ('1999,14501,1997,"US","CA",749584,2,,514,3,31,118,0,1,,0.7442,,5.1102,0,0,,',
   78)),
 (5999972,
  ('1999,14585,1996,"US","CA",551495,2,,709,2,22,352,0,1,,0.8714,,4.0398,0.0117,0.0114,,',
   77)),
 (5958954,
  ('1999,14515,1997,"US","CA",749584,2,,514,3,31,116,0,1,,0.7397,,5.181,0,0,,',
   76)),
 (5987245,
  ('1999,14564,1996,"US","CA",551495,2,,709,2,22,341,0,1,,0.8737,,4.0587,0.0121,0.0117,,',
   76)),
 (5998655,
  ('1999,14585,1998,"US","CA",,1,,560,1,14,114,0,1,,0.7387,,5.1667,,,,', 76)),
 (5980517,
  ('1999,14557,1998,"US","CA",733846,2,,606,3,32,241,0,1,,0.7394,,8.3776,0,0,,',
   73)),
 (5951547,
  ('1999,14501,1997,"US","CA",733846,2,,606,3,32,242,0,1,,0.7382,,8.3471,0,0,,',
   73)),
 (5998471,
  ('1999,14585,1998,"US","CA",749584,2,,514,3,31,103,0,1,,0.
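Selecting the top N by a key does not require sorting the whole collection. Conceptually, `top(10, key=...)` behaves like Python's `heapq.nlargest`, which the sketch below applies to a small list. The patent numbers and counts are taken from the rdd8 sample above; the info string is shortened to a placeholder.

```python
import heapq

# (PATENT, (info, count)) records; "info" stands in for the full field string.
records = [
    (5935488, ("info", 1)),
    (5935680, ("info", 3)),
    (5935712, ("info", 8)),
    (5935840, ("info", 1)),
]

# Keep the two records with the largest count, largest first.
top_two = heapq.nlargest(2, records, key=lambda x: x[1][1])

print(top_two)   # [(5935712, ('info', 8)), (5935680, ('info', 3))]
```

A full descending sort is also available in Spark through `rdd8.sortBy(lambda x: x[1][1], ascending=False)`, but for a small N the top-N approach avoids collecting every record to the driver.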