# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf = SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")  # local[*]: use all local cores
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is very slow. Most of the code runs in Python, which is much less efficient than the Java-based execution behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task.

To that end, we've included code below to extract just the last 200,000 lines of each file using Python "slice" notation. The cells in this notebook read the last 800,000 lines instead. Using the 200,000-line subset, your "new patent" table should look like this:

![Top partial 10 RDD self-state citations](top-subsample-rdd.png)

When you're ready to run the whole thing, just include all the data and run it again (...and wait...).

These two RDDs (named `rddCitations` and `rddPatents` in the cells below) hold the raw lines, because you will probably want to process them further (e.g., convert fields to integer types). Each line is a Python `bytes` object; if you haven't used the `bytes` type before, look it up. You can convert a `bytes` variable `x` into a UTF-8 string using `x.decode('utf-8')`.
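The sketch below illustrates the decoding step in plain Python, with no Spark involved. It decodes one line and converts its fields to integers. The sample line is hypothetical. It is assembled from a citation pair that appears in the outputs below, with the citing patent first and the cited patent second.

```python
# gzip.open(..., 'r') opens the file in binary mode, so each line is bytes.
# Hypothetical sample line: citing patent 5935430 cites patent 5453769.
raw_line = b'5935430,5453769\n'

text = raw_line.decode('utf-8')            # bytes -> str
citing, cited = text.rstrip().split(',')   # drop the newline, split on commas
citing_id, cited_id = int(citing), int(cited)

print(type(raw_line).__name__, type(text).__name__)  # bytes str
print(citing_id, cited_id)                           # 5935430 5453769
```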

In [3]:
import gzip
with gzip.open('cite75_99.txt.gz', 'r') as f:
    print("start")
    rddCitations = sc.parallelize( f.readlines()[-800000:] )
    print("done")

start
done


This reads the last 800,000 lines of the citations file.
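`f.readlines()[-800000:]` first builds a list of every line in the decompressed file and then slices off the tail. If memory on the driver is tight, one alternative is to stream the file through a `collections.deque` with `maxlen`, which keeps only the last N lines. This is not what the lab's code does. The helper name `last_n_lines` and the tiny in-memory gzip file are hypothetical and for illustration only.

```python
import gzip
import io
from collections import deque

def last_n_lines(fileobj, n):
    """Hypothetical helper: return the last n lines of an open file.

    deque(maxlen=n) discards the oldest line whenever a new one arrives,
    so at most n lines are held in memory at any time.
    """
    return list(deque(fileobj, maxlen=n))

# Demo on a small in-memory gzip file with hypothetical contents "0,1" ... "9,10"
payload = b''.join(b'%d,%d\n' % (i, i + 1) for i in range(10))
buf = io.BytesIO(gzip.compress(payload))
with gzip.open(buf, 'r') as f:
    tail = last_n_lines(f, 3)

print(tail)  # [b'7,8\n', b'8,9\n', b'9,10\n']
```

The returned list can be passed to `sc.parallelize` in the same way as the sliced list above.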

In [4]:
with gzip.open('apat63_99.txt.gz', 'r') as f:
    print("start")
    rddPatents = sc.parallelize( f.readlines()[-800000:] )
    print("done")

start
done


This reads the last 800,000 lines of the patent file.

In [28]:
rddC = rddCitations.map( lambda x : tuple(x.decode("utf-8").rstrip().split(',')) )
# rddC.take(5)

This decodes each line from bytes into a UTF-8 string, strips the trailing newline, and splits it on the delimiter ','. Each row of the citations RDD is now a tuple of (citing, cited).
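Below is the same parsing lambda as a named function, so it can be tried without Spark. When you move to the full files, the first line is likely a quoted header row; for the citations file, something like `"CITING","CITED"`. This is an assumption, so check it with `f.readline()`. The hypothetical `is_data_row` helper drops such a row by requiring a numeric first field.

```python
def parse_line(raw):
    """Decode one bytes line and split it into a tuple of string fields (same as the lab's lambda)."""
    return tuple(raw.decode("utf-8").rstrip().split(','))

def is_data_row(fields):
    """Hypothetical header check: data rows start with a numeric patent id."""
    return fields[0].isdigit()

# Hypothetical lines: a quoted header followed by two citation pairs seen later in this notebook
lines = [b'"CITING","CITED"\n', b'5935430,5453769\n', b'5935431,5230624\n']
rows = [fields for fields in map(parse_line, lines) if is_data_row(fields)]
print(rows)  # [('5935430', '5453769'), ('5935431', '5230624')]
```

In Spark this would read roughly `rddCitations.map(parse_line).filter(is_data_row)`.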

In [6]:
rddP = rddPatents.map( lambda x : tuple(x.decode("utf-8").rstrip().split(',')) )

rddCitations = rddC.map(lambda x : (x[1],x[0]))  # swap to (cited, citing); reuses the name rddCitations
rddCitations.take(5)


[('5453769', '5935430'),
 ('5500071', '5935430'),
 ('5567868', '5935430'),
 ('5571410', '5935430'),
 ('5595650', '5935430')]

The citing and cited columns are swapped so that the cited patent becomes the key, because the next join is on the cited column. Each row of the patents file is also split on the delimiter ',' into a tuple of fields.

In [7]:
rddPatents = rddP.map(lambda x: (x[0],x[5]))
rddPatents.take(5)

[('5200108', '""'),
 ('5200109', '""'),
 ('5200110', '""'),
 ('5200111', '""'),
 ('5200112', '"NY"')]

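From the patents table, keep only the patent_id (column 0) and the patent's state (column 5). State values keep their surrounding double quotes, and a patent with no state has the value `'""'`.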

In [8]:
join1 = rddCitations.join(rddPatents)
join1.collect()  # brings the entire join back to the driver; join1.take(5) is cheaper for a quick look

[('5453769', ('5935430', '"CA"')),
 ('5453769', ('5988786', '"CA"')),
 ('5453769', ('5997708', '"CA"')),
 ('5453769', ('6003977', '"CA"')),
 ('5230624', ('5935431', '"WI"')),
 ('5250179', ('5935435', '"MN"')),
 ('5250179', ('5954849', '"MN"')),
 ('5250179', ('5984109', '"MN"')),
 ('5232048', ('5935439', '"OK"')),
 ('5232048', ('5937944', '"OK"')),
 ('5232048', ('5958234', '"OK"')),
 ('5232048', ('5980745', '"OK"')),
 ('5232048', ('6006829', '"OK"')),
 ('5248426', ('5935443', '"CA"')),
 ('5248426', ('5961934', '"CA"')),
 ('5536406', ('5935450', '"CA"')),
 ('5536406', ('5989357', '"CA"')),
 ('5759388', ('5935450', '""')),
 ('5683546', ('5935451', '""')),
 ('5683546', ('6007728', '""')),
 ('5452177', ('5935455', '"MA"')),
 ('5452177', ('5936829', '"MA"')),
 ('5452177', ('5969934', '"MA"')),
 ('5452177', ('5982607', '"MA"')),
 ('5452177', ('5986874', '"MA"')),
 ('5452177', ('5986875', '"MA"')),
 ('5677236', ('5935455', '""')),
 ('5495089', ('5935462', '"MA"')),
 ('5495089', ('5996222', '"M

The swapped citations RDD is joined with the patent RDD on the cited column. Each output row has the form `(cited_id, (citing_id, cited_state))`.
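The sketch below makes the join semantics concrete in plain Python. A pair-RDD `join` is an inner join: it pairs every left value with every right value that shares its key. The function name `pair_join` is hypothetical. The sample pairs are taken from the outputs above. Unlike this list-based version, Spark does not guarantee any output order.

```python
from collections import defaultdict

def pair_join(left, right):
    """Hypothetical sketch of an inner join on (key, value) pairs.

    Like a pair-RDD join, every left value is matched with every right value
    sharing its key, giving (key, (left_value, right_value)); unmatched keys vanish.
    """
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (left_value, right_value))
            for key, left_value in left
            for right_value in right_by_key.get(key, [])]

# (cited, citing) pairs and (patent_id, state) pairs taken from the outputs above
citations = [('5453769', '5935430'), ('5453769', '5988786'), ('5230624', '5935431')]
patents = [('5453769', '"CA"'), ('5230624', '"WI"'), ('5200108', '""')]

for row in pair_join(citations, patents):
    print(row)
# ('5453769', ('5935430', '"CA"'))
# ('5453769', ('5988786', '"CA"'))
# ('5230624', ('5935431', '"WI"'))
```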

In [9]:
citing = join1.map(lambda x: (x[1][0], (x[0],x[1][1],)))
# citing.collect()

The joined RDD is re-keyed on the citing patent, so each row now has the form `(citing_id, (cited_id, cited_state))`.

In [10]:
join2 = citing.join(rddPatents)
join2.take(5)

[('6006829', (('5232048', '"OK"'), '"NY"')),
 ('6006829', (('5411084', '"NC"'), '"NY"')),
 ('6006829', (('5624560', '"TX"'), '"NY"')),
 ('6006829', (('5339895', '"TX"'), '"NY"')),
 ('6006829', (('5642781', '"TX"'), '"NY"'))]

The re-keyed RDD is joined with the patent RDD again, this time on citing_id. Each row now has the form `(citing_id, ((cited_id, cited_state), citing_state))`.
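Each patent id appears once in the patent file. So the two joins (first on cited_id, then on citing_id) amount to looking up two states per citation. The plain-Python sketch below takes that view, with a dict standing in for `rddPatents`. The function name `citation_states` and the id `'9999999'` are hypothetical; that id is deliberately absent from the dict. The other ids and states come from the `join2` output above.

```python
def citation_states(citations, state_of):
    """Hypothetical helper: build (citing, ((cited, cited_state), citing_state)) rows.

    citations: iterable of (citing, cited) id pairs.
    state_of:  dict mapping patent id -> quoted state string.
    A pair is dropped when either patent is missing from state_of,
    mirroring the inner-join behaviour of the two RDD joins.
    """
    rows = []
    for citing, cited in citations:
        if citing in state_of and cited in state_of:
            rows.append((citing, ((cited, state_of[cited]), state_of[citing])))
    return rows

state_of = {'6006829': '"NY"', '5232048': '"OK"', '5411084': '"NC"'}
citations = [('6006829', '5232048'), ('6006829', '5411084'), ('6006829', '9999999')]

for row in citation_states(citations, state_of):
    print(row)
# ('6006829', (('5232048', '"OK"'), '"NY"'))
# ('6006829', (('5411084', '"NC"'), '"NY"'))
```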

In [11]:
join3 = join2.filter(lambda x : x[1][1] and x[1][1].strip() and x[1][1] != '""')
join3.take(5)

[('6006829', (('5232048', '"OK"'), '"NY"')),
 ('6006829', (('5411084', '"NC"'), '"NY"')),
 ('6006829', (('5624560', '"TX"'), '"NY"')),
 ('6006829', (('5339895', '"TX"'), '"NY"')),
 ('6006829', (('5642781', '"TX"'), '"NY"'))]

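This filter drops rows whose citing_state is missing, whitespace-only, or the quoted empty string `'""'`; it does not strip or modify any values. Only the citing state is checked here. Rows with an empty cited_state are removed by the equality filter below, since a non-empty citing state can never equal `'""'`.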
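Here is the filter condition as a named function. It is exercised on a few hypothetical rows in the `(citing, ((cited, cited_state), citing_state))` shape; `p1`…`p4` and `c1`…`c4` are placeholder ids. The function name `has_citing_state` is also hypothetical.

```python
def has_citing_state(row):
    """Same test as the join3 filter: keep rows whose citing_state is present.

    row has the shape (citing, ((cited, cited_state), citing_state)).
    Rejects None/empty strings, whitespace-only strings, and the quoted empty '""'.
    """
    citing_state = row[1][1]
    return bool(citing_state and citing_state.strip() and citing_state != '""')

rows = [
    ('p1', (('c1', '"OK"'), '"NY"')),  # kept
    ('p2', (('c2', '"CA"'), '""')),    # dropped: quoted empty state
    ('p3', (('c3', '"TX"'), '  ')),    # dropped: whitespace only
    ('p4', (('c4', '""'), '"MA"')),    # kept: only the citing state is checked here
]
kept = [row[0] for row in rows if has_citing_state(row)]
print(kept)  # ['p1', 'p4']
```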

In [12]:
join4 = join3.filter(lambda x: x[1][1] == x[1][0][1])
join4.take(5)

[('5935791', (('5547861', '"NC"'), '"NC"')),
 ('5935791', (('5455166', '"NC"'), '"NC"')),
 ('5935791', (('5691145', '"NC"'), '"NC"')),
 ('5935791', (('5270184', '"NC"'), '"NC"')),
 ('5981294', (('5213965', '"CA"'), '"CA"'))]

Only rows whose cited_state equals the citing_state are kept, i.e., the same-state citations.

In [13]:
join4_count = join4.reduceByKey(lambda x,y : x+y)
join4_count.take(5)

[('5935791',
  (('5547861', '"NC"'),
   '"NC"',
   ('5455166', '"NC"'),
   '"NC"',
   ('5691145', '"NC"'),
   '"NC"',
   ('5270184', '"NC"'),
   '"NC"')),
 ('5981294',
  (('5213965', '"CA"'),
   '"CA"',
   ('5458852', '"CA"'),
   '"CA"',
   ('5215886', '"CA"'),
   '"CA"',
   ('5354692', '"CA"'),
   '"CA"',
   ('5409664', '"CA"'),
   '"CA"',
   ('5416000', '"CA"'),
   '"CA"',
   ('5423989', '"CA"'),
   '"CA"')),
 ('5989409', (('5279543', '"CA"'), '"CA"', ('5362307', '"CA"'), '"CA"')),
 ('6004309',
  (('5622530', '"MN"'),
   '"MN"',
   ('5443442', '"MN"'),
   '"MN"',
   ('5591124', '"MN"'),
   '"MN"')),
 ('5962710',
  (('5541155', '"CT"'),
   '"CT"',
   ('5629020', '"CT"'),
   '"CT"',
   ('5650386', '"CT"'),
   '"CT"',
   ('5643957', '"CT"'),
   '"CT"'))]

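Reducing the `join4` rows directly with `lambda x,y : x+y` concatenates their value tuples rather than counting them, as the output above shows. To count instead, each row is first mapped to a `(patent_id, 1)` pair:

In [14]: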
join5 = join4.map(lambda x: (x[0],1))
join6 = join5.reduceByKey(lambda x,y : x+y)
join6.take(5)

[('5935791', 4),
 ('5981294', 7),
 ('5989409', 2),
 ('6004309', 3),
 ('5962710', 4)]

Each row of `join4` (the same-state rows) is mapped to a `(patent_id, 1)` pair keyed by the citing patent. `reduceByKey` then sums the ones per key. The result is each citing patent_id together with its number of same-state citations.
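Below is a plain-Python stand-in for the map-then-`reduceByKey` count, run on hypothetical same-state rows. It also shows that `operator.add` is equivalent to `lambda x,y : x+y`; the notebook already imports the `operator` module. `reduce_by_key` is a hypothetical helper, not a Spark function. PySpark also offers `countByKey()`, but that returns a Python dict to the driver rather than an RDD, so it would not feed the join that follows.

```python
import operator
from collections import Counter

def reduce_by_key(pairs, func):
    """Hypothetical stand-in for RDD.reduceByKey: fold the values of each key with func."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

# Hypothetical same-state rows in the join4 shape: (citing, ((cited, cited_state), citing_state))
rows = [('p1', (('c1', '"NC"'), '"NC"')),
        ('p1', (('c2', '"NC"'), '"NC"')),
        ('p2', (('c3', '"CA"'), '"CA"'))]

ones = [(row[0], 1) for row in rows]        # like join4.map(lambda x: (x[0], 1))
counts = reduce_by_key(ones, operator.add)  # like reduceByKey(lambda x, y: x + y)
print(counts)  # {'p1': 2, 'p2': 1}

# Cross-check against a simple Counter over the citing ids
print(counts == dict(Counter(row[0] for row in rows)))  # True
```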

In [15]:
patent = rddP.map(lambda x: (x[0], (x[1:])))
patent.take(5)

[('5200108',
  ('1993',
   '12149',
   '1989',
   '"JP"',
   '""',
   '265595',
   '3',
   '6',
   '252',
   '1',
   '19',
   '8',
   '12',
   '1',
   '0.4028',
   '0.6875',
   '4.4167',
   '3.125',
   '0.875',
   '0.875',
   '0.3333',
   '0.3333')),
 ('5200109',
  ('1993',
   '12149',
   '1992',
   '"JP"',
   '""',
   '87490',
   '3',
   '201',
   '252',
   '1',
   '19',
   '2',
   '9',
   '1',
   '0',
   '0.5',
   '3.1111',
   '6',
   '1',
   '0.5',
   '0.4444',
   '0.4444')),
 ('5200110',
  ('1993',
   '12149',
   '1992',
   '"JP"',
   '""',
   '503380',
   '3',
   '20',
   '252',
   '1',
   '19',
   '5',
   '1',
   '1',
   '0',
   '0.32',
   '3',
   '4',
   '0.4',
   '0.4',
   '0',
   '0')),
 ('5200111',
  ('1993',
   '12149',
   '1989',
   '"JP"',
   '""',
   '39150',
   '3',
   '2',
   '252',
   '1',
   '19',
   '3',
   '0',
   '1',
   '',
   '0.6667',
   '',
   '5',
   '0',
   '0',
   '',
   '')),
 ('5200112',
  ('1993',
   '12149',
   '1991',
   '"US"',
   '"NY"',
   '280070',


The patent RDD is converted to key-value form: the key is the patent_id, and the value is a tuple of the remaining patent fields.

In [16]:
join7 = join6.join(patent)
join7.take(1)

[('6004309',
  (3,
   ('1999',
    '14599',
    '1997',
    '"US"',
    '"MN"',
    '22715',
    '2',
    '',
    '604',
    '3',
    '32',
    '58',
    '0',
    '0.9828',
    '',
    '0.2278',
    '',
    '13.3966',
    '0.1333',
    '0.1034',
    '',
    '')))]

The counts RDD (patent_id and same-state citation count) is joined with the patent RDD. This gives rows of the form `(patent_id, (count, patent_fields))`.

In [25]:
patent_table_flattened = join7.map(lambda x : ((x[0],)+ x[1][1]+ (x[1][0],)))
patent_table_flattened.take(1)

[('6004309',
  '1999',
  '14599',
  '1997',
  '"US"',
  '"MN"',
  '22715',
  '2',
  '',
  '604',
  '3',
  '32',
  '58',
  '0',
  '0.9828',
  '',
  '0.2278',
  '',
  '13.3966',
  '0.1333',
  '0.1034',
  '',
  '',
  3)]

Each row is then flattened into a single tuple: the patent_id, followed by the 22 patent fields, with the same-state citation count appended at the end (index 23).
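The flattening is plain tuple concatenation. The sketch below runs it on a shortened copy of the `6004309` row above, keeping only the first five patent fields for brevity. With all 22 patent fields, the flattened tuple has 1 + 22 + 1 = 24 elements, which is why the count sits at index 23. `a[-1]` would select the count without depending on the field count. `flatten` is a hypothetical name.

```python
# Shortened copy of the 6004309 row from join7: (patent_id, (count, patent_fields)).
row = ('6004309', (3, ('1999', '14599', '1997', '"US"', '"MN"')))

def flatten(x):
    """Same result as the lab's lambda: (patent_id,) + patent_fields + (count,)."""
    patent_id, (count, fields) = x
    return (patent_id,) + fields + (count,)

flat = flatten(row)
print(flat)      # ('6004309', '1999', '14599', '1997', '"US"', '"MN"', 3)
print(flat[-1])  # 3
```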

In [26]:
patent_table_flattened.sortBy((lambda a:a[23]), ascending = False).take(10)

[('5959466',
  '1999',
  '14515',
  '1997',
  '"US"',
  '"CA"',
  '5310',
  '2',
  '',
  '326',
  '4',
  '46',
  '159',
  '0',
  '1',
  '',
  '0.6186',
  '',
  '4.8868',
  '0.0455',
  '0.044',
  '',
  '',
  94),
 ('6008204',
  '1999',
  '14606',
  '1998',
  '"US"',
  '"CA"',
  '749584',
  '2',
  '',
  '514',
  '3',
  '31',
  '121',
  '0',
  '1',
  '',
  '0.7415',
  '',
  '5',
  '0.0085',
  '0.0083',
  '',
  '',
  80),
 ('5952345',
  '1999',
  '14501',
  '1997',
  '"US"',
  '"CA"',
  '749584',
  '2',
  '',
  '514',
  '3',
  '31',
  '118',
  '0',
  '1',
  '',
  '0.7442',
  '',
  '5.1102',
  '0',
  '0',
  '',
  '',
  78),
 ('5999972',
  '1999',
  '14585',
  '1996',
  '"US"',
  '"CA"',
  '551495',
  '2',
  '',
  '709',
  '2',
  '22',
  '352',
  '0',
  '1',
  '',
  '0.8714',
  '',
  '4.0398',
  '0.0117',
  '0.0114',
  '',
  '',
  77),
 ('5998655',
  '1999',
  '14585',
  '1998',
  '"US"',
  '"CA"',
  '',
  '1',
  '',
  '560',
  '1',
  '14',
  '114',
  '0',
  '1',
  '',
  '0.7387',
  '',
  '5

The top 10 patents by same-state citation count are obtained by sorting in descending order on that count. These results come from the last 800,000 lines of each file.
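Sorting the whole RDD just to keep ten rows does more work than needed. PySpark's `RDD.takeOrdered(num, key=...)` returns the `num` smallest elements by key. So `patent_table_flattened.takeOrdered(10, key=lambda a: -a[23])` should give the same top 10, though ties may come back in a different order. The plain-Python analogue is `heapq.nlargest`, sketched below on hypothetical rows whose count is the last field.

```python
import heapq

# Hypothetical flattened rows: (patent_id, state, ..., same_state_count)
rows = [('p1', '"CA"', 5), ('p2', '"NY"', 9), ('p3', '"TX"', 2), ('p4', '"MN"', 7)]

# Top k rows by the last field, without sorting the entire list
top2 = heapq.nlargest(2, rows, key=lambda a: a[-1])
print([row[0] for row in top2])  # ['p2', 'p4']
```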