# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful.

In [1]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [2]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task. You can use the `sample()` method to extract just a sample of the data or use 

The two RDDs loaded below, `rddCitations` and `rddPatents`, hold raw text lines; you will probably want to process them further (e.g. convert fields to integer types).

The `textFile` function returns data in strings. This should work fine for this lab.

Other methods you use might return data as Python `bytes`. If you haven't used the `bytes` type before, look it up. You can convert a `bytes` value `x` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` method of the gzip library to read in all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt',encoding='utf-8') as f:
    rddCitations = sc.parallelize( f.readlines() )
```
This is less efficient than using `textFile`, because `textFile` uses the underlying HDFS or other file system to read the file across the worker nodes, while `gzip.open()...readlines()` reads all the data in the frontend (the driver) and then distributes it to the worker nodes.
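To make the `bytes` versus `str` distinction concrete, here is a minimal sketch that needs no Spark. It writes a tiny gzip file to a temporary directory and reads it back both ways. The header spelling and the file name `sample.txt.gz` are assumptions for illustration; the two data rows are taken from the citation output shown in this notebook.

```python
import gzip
import os
import tempfile

# Assumed header spelling, followed by two citation rows from this notebook.
sample_text = '"CITING","CITED"\n3858243,3156927\n3858244,2211676\n'

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "sample.txt.gz")  # hypothetical file name
    with gzip.open(path, "wt", encoding="utf-8") as f:
        f.write(sample_text)

    # Binary mode yields bytes; each line must be decoded explicitly.
    with gzip.open(path, "rb") as f:
        raw_lines = f.readlines()
    decoded_lines = [b.decode("utf-8").rstrip("\r\n") for b in raw_lines]

    # Text mode ('rt') decodes while reading and yields str directly.
    with gzip.open(path, "rt", encoding="utf-8") as f:
        text_lines = [line.rstrip("\r\n") for line in f]

print(type(raw_lines[0]).__name__)   # the binary read gives bytes objects
print(decoded_lines == text_lines)   # both routes end at the same strings
```

Either list of strings could then be handed to `sc.parallelize(...)`, with the efficiency caveat described above.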

**Download data files (run once on CSEL).** If you get `Path does not exist` for the .gz files, run the next cell to download them. Skip it if the files already exist.

In [3]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

# Sample 5% of each RDD for faster iteration. COMMENT THIS OUT BEFORE SUBMISSION.
rddCitations = rddCitations.sample(False, 0.05)
rddPatents = rddPatents.sample(False, 0.05)

The data looks like the following.

In [4]:
rddCitations.take(5)

['3858243,3156927',
 '3858244,2211676',
 '3858244,2635670',
 '3858246,3601877',
 '3858248,3706104']

In [5]:
rddPatents.take(5)

['3070808,1963,1096,,"US","IA",,1,,623,3,39,,4,,0.375,,,,,,,',
 '3070822,1963,1096,,"US","NY",,2,,401,1,12,,4,,0.375,,,,,,,',
 '3070830,1963,1096,,"US","NH",,2,,16,5,59,,0,,,,,,,,,',
 '3070836,1963,1096,,"US","OK",,2,,264,5,51,,24,,0.7569,,,,,,,',
 '3070866,1963,1096,,"US","NJ",,2,,264,5,51,,1,,0,,,,,,,']

In other words, each record is a single string of comma-separated values. You will need to convert these to (K,V) pairs, probably converting the keys to `int`, and so on. You'll also need to `filter` out the header line, since there's no easy way to extract all the lines except the first.
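As a rough illustration of the header filter and the (K,V) conversion, the sketch below works on plain Python lists so it runs without Spark. The helper names and the header strings are assumptions for illustration (the patent header is abbreviated); the data rows come from the `take(5)` outputs above. It detects data rows by checking that the first field is numeric, which is one possible approach among several.

```python
import csv

# Assumed header lines plus rows copied from the outputs above.
citation_lines = ['"CITING","CITED"', '3858243,3156927', '3858244,2211676']
patent_lines = [
    '"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE"',  # abbreviated, assumed
    '3070808,1963,1096,,"US","IA",,1,,623,3,39,,4,,0.375,,,,,,,',
    '3070830,1963,1096,,"US","NH",,2,,16,5,59,,0,,,,,,,,,',
]

def is_data_row(line):
    # Data rows start with a numeric patent number; a header starts with a quote.
    return line.split(",", 1)[0].isdigit()

def to_citation_pair(line):
    citing, cited = line.split(",")
    return (int(citing), int(cited))

def to_patent_pair(line):
    # csv.reader strips the quotes around fields such as "US".
    row = next(csv.reader([line]))
    return (int(row[0]), row)

citation_pairs = [to_citation_pair(l) for l in citation_lines if is_data_row(l)]
patent_pairs = [to_patent_pair(l) for l in patent_lines if is_data_row(l)]

print(citation_pairs)
print([(pid, row[4], row[5]) for pid, row in patent_pairs])
```

On an RDD the same helpers would be applied with `filter` and `map`, for example `rddCitations.filter(is_data_row).map(to_citation_pair)`.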

## Parsing and algorithm

- **Citations**: Skip header line; split by comma → (citing, cited) as ints.
- **Patents**: Skip header; parse CSV to get PATENT (col 0), COUNTRY (4), POSTATE (5). Build (patent_id, state) for US patents that have a state, and (patent_id, full_line) for every patent for later lookup.
- **Join**: Join citations with patent states for citing and cited; keep only same state; count per citing patent.
- **Top 10**: Join counts with full patent lines, sort by count desc, take 10 and append ",{count}" to each line.
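The steps above can be sketched on toy data in plain Python before running them on RDDs. Everything in this example (patent numbers, states, citation pairs) is hypothetical, and dictionary lookups stand in for the two RDD joins; it is meant only to show the counting logic, not the Spark implementation.

```python
from collections import Counter

# Hypothetical toy data: patent -> state, US patents with a known state only.
patent_state = {1: "CO", 2: "CO", 3: "CA", 4: "CO", 5: "CA"}
# Hypothetical (citing, cited) pairs; patent 99 has no state on record.
citations = [(4, 1), (4, 2), (4, 3), (5, 3), (5, 1), (2, 1), (5, 99)]

same_state_counts = Counter()
for citing, cited in citations:
    citing_state = patent_state.get(citing)
    cited_state = patent_state.get(cited)
    # Inner-join semantics: drop the pair if either side has no state.
    if citing_state is None or cited_state is None:
        continue
    # Keep only citations where both patents are in the same state.
    if citing_state == cited_state:
        same_state_counts[citing] += 1

# Sort by count, descending, and keep at most ten citing patents.
top = sorted(same_state_counts.items(), key=lambda kv: -kv[1])[:10]
print(top)
```

With RDDs, each `patent_state.get(...)` corresponds to a `join` against the (patent_id, state) pairs, and the counter corresponds to `reduceByKey`.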

In [6]:
import csv
import io

def is_header(line):
    return '"CITING"' in line or '"PATENT"' in line

def parse_citation(line):
    parts = line.split(",")
    return (int(parts[0]), int(parts[1]))

def parse_patent(line):
    # csv handles the quoted fields such as "US" and "IA".
    row = next(csv.reader(io.StringIO(line)))
    patent_id = int(row[0])
    country = row[4] if len(row) > 4 else ""
    # POSTATE may be empty; normalize blank values to None.
    state = (row[5].strip() if len(row) > 5 else "") or None
    if country == "US" and state:
        return (patent_id, ("state", state, line))
    return (patent_id, ("line", None, line))

citations_rdd = rddCitations.filter(lambda x: not is_header(x)).map(parse_citation)
patents_parsed = rddPatents.filter(lambda x: not is_header(x)).map(parse_patent)

patent_states = patents_parsed.filter(lambda x: x[1][0] == "state").map(lambda x: (x[0], x[1][1]))
patent_lines = patents_parsed.map(lambda x: (x[0], x[1][2]))

patent_states.cache()
patent_lines.cache()

PythonRDD[7] at RDD at PythonRDD.scala:53

In [7]:
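# Key by citing patent and attach its state: (citing, ((cited,), citing_state))
with_citing = citations_rdd.map(lambda x: (x[0], (x[1],))).join(patent_states)
# Re-key by cited patent: (cited, (citing, citing_state))
with_citing = with_citing.map(lambda x: (x[1][0][0], (x[0], x[1][1])))
# Attach the cited patent's state: (cited, ((citing, citing_state), cited_state))
with_both = with_citing.join(patent_states)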

same_state = (with_both
    .filter(lambda x: x[1][0][1] == x[1][1])
    .map(lambda x: (x[1][0][0], 1))
    .reduceByKey(operator.add)
)
same_state.cache()

PythonRDD[26] at RDD at PythonRDD.scala:53

In [8]:
top10_counts = same_state.takeOrdered(10, key=lambda x: -x[1])

patent_lines_dict = patent_lines.collectAsMap()
for patent_id, count_val in top10_counts:
    line = patent_lines_dict.get(patent_id, "")
    if line:
        print(line + "," + str(count_val))

5846365,1998,14221,1996,"US","OH",454215,2,19,156,1,19,35,0,0.9714,,0.808,,9.5714,0.6774,0.6,,,2
4105308,1978,6794,1977,"US","MA",445890,2,10,359,5,54,5,10,1,0.32,0,9,3.8,0.4,0.4,0.3,0.3,1
4643730,1987,9909,1985,"US","WI",313200,2,26,604,3,32,10,21,1,0.59,0.54,7.45,6.9,0.1,0.1,0.15,0.1429,1
4761474,1988,10441,1986,"US","IN",368060,2,5,544,1,14,4,0,1,,0.625,,6.5,0.25,0.25,,,1
5185158,1993,12093,1992,"US","CA",22715,2,9,424,3,31,19,2,0.8947,0,0.6021,2,11.6842,0.7647,0.6842,1,1,1
5557941,1996,13416,1995,"US","MN",572490,2,16,62,6,69,4,0,1,,0.375,,8.25,0.75,0.75,,,1
5252832,1993,12338,1992,"US","OK",238325,2,8,250,4,44,17,3,1,0,0,3,7.1765,0.4286,0.3529,0.6667,0.6667,1
5323008,1994,12590,1992,"US","CA",146580,2,32,250,4,44,8,3,1,0.4444,0.5625,3,6.75,0.2,0.125,0,0,1
5396262,1995,12849,1993,"US","CA",706475,2,5,345,2,23,11,6,1,0.5,0.6446,3.6667,11.6364,0,0,0,0,1
5409384,1995,12898,1993,"US","PA",695811,2,10,439,4,41,22,8,1,0.375,0.2479,3.5,7.4545,0,0,0.25,0.25,1
