# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the JVM-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data or use 

The two RDDs loaded below (`rddCitations` and `rddPatents`) hold the raw text of each file, because you will probably want to process them further (e.g. convert fields to integer types). 

The `textFile` function returns data in strings. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used Python `bytes` before, google it. You can convert a `bytes` value `x` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` function of the `gzip` library to read in all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt',encoding='utf-8') as f:
    rddCitations = sc.parallelize( f.readlines() )
```
This is less efficient than using `textFile`, because `textFile` uses the underlying HDFS or other file system to read the file across all the worker nodes, while `gzip.open()...readlines()` reads all the data in the frontend and then distributes it to the worker nodes.
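To make the `bytes` versus `str` distinction concrete, here is a minimal sketch that needs no Spark. It compresses two sample citation lines in memory (an assumption made so the example is self-contained; it stands in for the real `cite75_99.txt.gz`) and reads them back in binary mode and in text mode.

```python
import gzip
import io

# Two sample lines, compressed in memory as a stand-in for the real .gz file.
raw = gzip.compress(b"3858241,956203\n3858241,1324234\n")

# Binary mode ('rb') yields bytes, which must be decoded explicitly.
with gzip.open(io.BytesIO(raw), "rb") as f:
    first_bytes = f.readline()
first_str = first_bytes.decode("utf-8")

# Text mode ('rt') decodes for us and yields str directly.
with gzip.open(io.BytesIO(raw), "rt", encoding="utf-8") as f:
    lines = [line.rstrip("\n") for line in f]

print(type(first_bytes).__name__, type(first_str).__name__)  # bytes str
print(lines)  # ['3858241,956203', '3858241,1324234']
```

Note that `readlines()` keeps the trailing newline on every line, so stripping it as above avoids a stray `\n` at the end of the last field.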

In [3]:
# Add .sample(False, 0.05).cache() while debugging, then run on full dataset

rddCitations = sc.textFile("cite75_99.txt.gz")#.sample(False, 0.05).cache()
rddPatents = sc.textFile("apat63_99.txt.gz")#.sample(False, 0.05).cache()

header = rddCitations.first() # extract header
rddCitations = rddCitations.filter( lambda x: x != header) #filter out header

header_pat = rddPatents.first() # extract header
rddPatents = rddPatents.filter( lambda x: x != header_pat) # filter out header

The data looks like the following after filtering out the headers.

In [4]:
rddCitations.take(5)

['3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384',
 '3858241,3634889']

In [5]:
rddPatents.take(5)

['3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,',
 '3070805,1963,1096,,"US","CA",,1,,2,6,63,,1,,0,,,,,,,']

In other words, each record is a single string of comma-separated values. You will need to convert these to (K,V) pairs, probably convert the keys to `int`, and so on. You'll need to `filter` out the header string as well, since there's no easy way to extract all the lines except the first.
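As a rough sketch of that conversion, the two helper functions below turn one line of each file into a (K,V) pair. The names `parse_citation` and `parse_patent` are hypothetical, and the sketch uses the standard `csv` module, which strips the surrounding double quotes from the state field. That is a different choice from the solution in this notebook, which splits on commas and therefore keeps the quotes (states appear as `'"NY"'`).

```python
import csv

def parse_citation(line):
    """Turn 'CITING,CITED' into an (int, int) pair."""
    citing, cited = line.split(",")
    return int(citing), int(cited)

def parse_patent(line):
    """Turn one patent row into (patent number, state); state is '' when missing."""
    fields = next(csv.reader([line]))  # csv.reader removes the surrounding quotes
    return int(fields[0]), fields[5]

print(parse_citation("3858241,956203"))
# (3858241, 956203)
print(parse_patent('3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,'))
# (3070802, 'TX')
print(parse_patent('3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,'))
# (3070801, '')
```

Because these are plain functions, they could be passed straight to an RDD, for example `rddCitations.map(parse_citation)`.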

**My Solution Below: We need to join citations with patents on the cited patent number to get each cited patent's state, so that we can later add the citing patent's state and count same-state citations**

In [15]:
# Cleaning up our RDDs a bit

citations = rddCitations.map( lambda x: x.split(',') )\
    .map( lambda x: (int(x[0]), int(x[1])) )
    #.filter( lambda x: x[0] != '""' )
    #.groupByKey()
    #.take(20)

# Filtered out the '""' entries from Patent data State column

pats = rddPatents.map( lambda x: x.split(',') )\
    .map( lambda x: (int(x[0]), x[5]) )\
    .filter( lambda x: x[1] != '""' ) 
    #.groupByKey()
    #.take(20)


In [16]:
# pats.filter( lambda x: x[0] == 3187320 ).take(20) # Sanity check that we have the correct cited state (keys are ints, so compare against an int)

Swap the positions of CITING and CITED so that CITED becomes the key, then join with the patent data to create a new RDD called `joined`

In [17]:
# Joining our citations with patents on CITED as key after reorganizing data(swapping CITING with CITED to make CITED the key)

new_citations = citations.map( lambda x: (x[1], x[0]) )
joined = new_citations.join(pats)

joined.take(5)

# RDD named joined gives us (Cited, (Citing, Cited State))
# Result we want: (Cited, Cited State)

[(3606034, (3858746, '"NY"')),
 (3606034, (4573851, '"NY"')),
 (3606034, (4695103, '"NY"')),
 (3606034, (4789075, '"NY"')),
 (3515792, (3859884, '"CA"'))]

In [18]:
# IGNORE ALL OF THIS BRAINSTORMING
# # Joining our citations with patents on CITING as key(no swap)

# joined_alt = citations.join(pats)

# joined_alt.take(5)

# # Another RDD, but this time gives us (Citing, (Cited, Citing State))
# # Result (Citing, Citing State)

In [19]:
# Need to join into one intermediate result that has everything we need since we're missing citing state

# Swap again to make (Citing, (Cited, Cited State))
citing = joined.map( lambda x: (x[1][0], (x[0], x[1][1])))
citing.take(5)

[(3858570, (3464388, '"MO"')),
 (3897751, (3464388, '"MO"')),
 (3920000, (3464388, '"MO"')),
 (3924571, (3464388, '"MO"')),
 (4034740, (3464388, '"MO"'))]

Now that CITING is the key, we can left outer join with the patent data to get CITING state.

In [20]:
test_table = citing.leftOuterJoin(pats)
test_table.take(5)

[(3938145, ((3114148, '"NY"'), '"NM"')),
 (3938145, ((3149332, '"IA"'), '"NM"')),
 (3938145, ((3175214, '"CA"'), '"NM"')),
 (3938145, ((3603919, '"KS"'), '"NM"')),
 (4975659, ((3178648, '"IL"'), '"MA"'))]
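To illustrate why the choice between `join` and `leftOuterJoin` matters here, the following plain-Python sketch emulates both on lists of (key, value) pairs. The helper names and the toy data are hypothetical. Since `pats` has had its stateless rows filtered out, a citing patent with no state on record would be dropped by an inner join, but is kept with `None` by a left outer join.

```python
def inner_join(left, right):
    """Emulate RDD.join on lists of (key, value) pairs."""
    lookup = {}
    for k, w in right:
        lookup.setdefault(k, []).append(w)
    return [(k, (v, w)) for k, v in left for w in lookup.get(k, [])]

def left_outer_join(left, right):
    """Emulate RDD.leftOuterJoin: unmatched left keys are kept with None."""
    lookup = {}
    for k, w in right:
        lookup.setdefault(k, []).append(w)
    return [(k, (v, w)) for k, v in left for w in lookup.get(k, [None])]

# Hypothetical toy data: citing patent 2 has no state on record.
toy_citing = [(1, (10, '"NY"')), (2, (11, '"CA"'))]
toy_pats = [(1, '"NM"')]

print(inner_join(toy_citing, toy_pats))
# [(1, ((10, '"NY"'), '"NM"'))]
print(left_outer_join(toy_citing, toy_pats))
# [(1, ((10, '"NY"'), '"NM"')), (2, ((11, '"CA"'), None))]
```

A `None` citing state never equals a cited state string, so such rows simply contribute 0 to the same-state count.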

So now that we have (CITING, ((CITED, CITED STATE), CITING STATE)), we are able to reorganize

In [21]:
citation_states = test_table.map( lambda x: (x[0], x[1][1], x[1][0][0], x[1][0][1]))
citation_states.take(5)

# The following is of form (CITING, CITING STATE, CITED, CITED STATE)

[(4580865, '"NJ"', 3585564, '"NY"'),
 (4580865, '"NJ"', 3920304, '"NJ"'),
 (4580865, '"NJ"', 3649956, '"CA"'),
 (4580865, '"NJ"', 3112975, '"NY"'),
 (4580865, '"NJ"', 4239325, '"OH"')]

We create a function to apply over the `citation_states` data that emits 1 for a citation whose citing and cited states match, and 0 otherwise

In [22]:
def state_count(data):
    count = 0
    
    if data[1] == data[3]:
        count = count + 1
    
    return (data[0], count) # returns tuple with CITING # and the state count

In [23]:
same_states_ct = citation_states.map(state_count)
same_states_ct.take(5)

[(5066526, 0), (5066526, 0), (5066526, 0), (5066526, 0), (5066526, 0)]

Now we want to "reduce" by using .groupByKey() to aggregate all of the counts by CITING #

In [24]:
totals = same_states_ct.groupByKey()
totals.take(5)

[(5386821, <pyspark.resultiterable.ResultIterable at 0x7f81bcbbcac0>),
 (4715820, <pyspark.resultiterable.ResultIterable at 0x7f81bcc65700>),
 (4329825, <pyspark.resultiterable.ResultIterable at 0x7f81d4f0cdf0>),
 (5107227, <pyspark.resultiterable.ResultIterable at 0x7f81d4f0c3a0>),
 (4421259, <pyspark.resultiterable.ResultIterable at 0x7f81bcc734f0>)]

`groupByKey()` gives us an iterable of values per key, so we use `mapValues(sum)` to add up all the values for each key. We then swap to (count, CITING) and sort in descending order

In [28]:
final_reduction = totals.mapValues(sum).map( lambda x: (x[1], x[0])).sortByKey(False)
final_reduction.take(10)

[(125, 5959466),
 (103, 5983822),
 (100, 6008204),
 (98, 5952345),
 (96, 5958954),
 (96, 5998655),
 (94, 5936426),
 (90, 5913855),
 (90, 5925042),
 (90, 5951547)]
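The `groupByKey()` plus `mapValues(sum)` pattern can also be expressed as a single `reduceByKey(operator.add)`, which combines values pairwise per key rather than first building an iterable for each key. The Spark-free sketch below emulates that pairwise reduction; `reduce_by_key` and the toy data are hypothetical stand-ins, not the code that produced the output above.

```python
import operator

def reduce_by_key(pairs, func):
    """Emulate RDD.reduceByKey on a list of (key, value) pairs."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals

# Hypothetical (CITING, same-state flag) pairs, shaped like same_states_ct.
toy_flags = [(5066526, 0), (5959466, 1), (5959466, 1), (5066526, 1)]
toy_totals = reduce_by_key(toy_flags, operator.add)

# Highest counts first, mirroring the swap and sortByKey(False) step.
ranked = sorted(((count, patent) for patent, count in toy_totals.items()), reverse=True)
print(ranked)
# [(2, 5959466), (1, 5066526)]
```

In the notebook itself, the equivalent would be along the lines of `same_states_ct.reduceByKey(operator.add)`, which also puts the otherwise unused `operator` import to work.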

Last but not least, we have to join this final result back to the patent data, using the patent number as the key.

In [66]:
# Issues appending same state counts to our patent "table" of data

# final_rdd = final_reduction.map( lambda x: (x[1], x[0]) )

# # Parse the data again to get the full rows in 2 pieces
# new_pats = rddPatents.map( lambda x: x.split(',') )\
#     .map( lambda x: (int(x[0]), x[1:22]) )

# final_pats = new_pats.leftOuterJoin(final_rdd)

# # Lastly, reorganize
# final_clean = final_pats.map( lambda x: (x[0], x[1][1], x[1][0])).sortBy( lambda x: x[1][0]) # x[1][0] here is the same state count, so we want it at the end
# final_clean.take(10)
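One likely cause of the issue in the commented-out cell: after the final `map`, each tuple is (patent, count, fields), so `x[1][0]` tries to index into the count itself, and patents with no match carry `None` instead of a number. The sketch below shows one possible way to flatten and sort such records in plain Python. The helper `attach_count` and the toy records are hypothetical, and treating a missing count as 0 is an assumption.

```python
def attach_count(record):
    """Flatten (patent, (fields, count_or_None)) into (patent, fields + [count])."""
    patent, (fields, count) = record
    # Assumption: a patent with no same-state citations gets a count of 0.
    return patent, list(fields) + [0 if count is None else count]

# Hypothetical records shaped like the output of new_pats.leftOuterJoin(final_rdd).
toy_joined = [
    (1, (["f1", "f2"], None)),  # unmatched patent: no same-state count
    (2, (["g1", "g2"], 125)),
]

# Sort by the appended count (last field), highest first.
toy_rows = sorted(map(attach_count, toy_joined), key=lambda r: r[1][-1], reverse=True)
print(toy_rows)
# [(2, ['g1', 'g2', 125]), (1, ['f1', 'f2', 0])]
```

On the RDDs this might look like `new_pats.leftOuterJoin(final_rdd).map(attach_count).sortBy(lambda r: r[1][-1], ascending=False)`, though that has not been run against the full dataset here.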

## Co-working session notes:

In [None]:
counts = rddPatents.map( lambda x: x.split(',') )\
    .map( lambda x: (x[5], x[0]) )\
    .filter( lambda x: x[0] != '""' )\
    .groupByKey()
    #.take(20)

In [None]:
counts.take(5)

**My Notes:**

Hadoop vs. PySpark - Hadoop relies too much on us to process the input and direct the output; PySpark figures out the best way to structure/optimize the flow of the data in parallel
 
Hadoop - Everything is a (k,v) pair

PySpark - Everything is an element of an RDD
 
PySpark also remembers the sequence of computations needed to reach the final state we want. That is the benefit of the RDD: if a node holding 3 splits of the data fails, Spark still knows the steps and can reapply them on other nodes to rebuild those splits
 
Map - .map() which applies the lambda to each value x

Reduce - .groupByKey() which will aggregate all of the data by key

In the example above, we filter out empty state values

Create RDD - .parallelize(data_structure)

e.g. `counts.take()` will need to read all of our data again unless we use `.cache()`, so remember to add it, especially after doing joins
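The recompute-versus-cache point can be mimicked without Spark. In this loose analogy (not Spark's actual mechanism), `load_lines` is a hypothetical stand-in for reading the source file and counts how often it is called. Each "action" on the uncached pipeline re-reads the source, while materializing the result once lets later actions reuse it.

```python
reads = {"count": 0}

def load_lines():
    """Stand-in for reading the source file; counts how often it is called."""
    reads["count"] += 1
    return ["a,1", "b,2", "c,3"]

def pipeline():
    """Recompute the whole chain from the source, as an uncached RDD would."""
    return [line.split(",") for line in load_lines()]

# Two "actions" without caching: the source is read twice.
pipeline()[:2]
pipeline()[:2]
uncached_reads = reads["count"]

# "Caching": materialize once, then reuse the stored result.
reads["count"] = 0
cached = pipeline()
cached[:2]
cached[:2]
cached_reads = reads["count"]

print(uncached_reads, cached_reads)  # 2 1
```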