# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark RDD - SOLUTION
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is a useful reference.

In [3]:
from pyspark import SparkContext, SparkConf
import numpy as np
import operator

In [4]:
conf=SparkConf().setAppName("Lab4-rdd").setMaster("local[*]")
sc = SparkContext(conf=conf)

Using PySpark and RDDs on the https://coding.csel.io machines is slow: most of the code is executed in Python, which is much less efficient than the Java-based code behind PySpark DataFrames. Be patient, and try using `.cache()` to cache the output of joins. You may want to start with a reduced set of data before running the full task; the `sample()` method extracts a random sample of the data.

The two RDDs created below (`rddCitations` and `rddPatents`) hold the raw lines of each file, because you will probably want to process them further (e.g. convert fields to integer types).

The `textFile` function returns each line of the file as a string. This should work fine for this lab.

Other methods you use might return data of type `bytes`. If you haven't used Python's `bytes` type before, google it. You can convert a `bytes` value `x` into a UTF-8 string using `x.decode('utf-8')`. Alternatively, you can use the `open` function of the `gzip` library to read all the lines as UTF-8 strings like this:
```
import gzip
with gzip.open('cite75_99.txt.gz', 'rt', encoding='utf-8') as f:
    # splitlines() drops the trailing '\n' that readlines() keeps on every line
    rddCitations = sc.parallelize(f.read().splitlines())
```
This is less efficient than using `textFile`. With `textFile`, Spark reads the file through the underlying HDFS or other file system on the worker nodes (a single `.gz` file is not splittable, so one task reads it, but the data does not pass through the driver). The `gzip.open()` approach instead reads all the data in the frontend (the driver) and then distributes it to the worker nodes.
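The `bytes` decoding and the `gzip` text-mode reading described above can be tried without Spark. This is a minimal sketch, assuming a small in-memory gzip buffer stands in for `cite75_99.txt.gz`; the three lines written to it are copied from the citation data shown further down. It also shows `splitlines()`, which, unlike `readlines()`, does not keep a trailing `'\n'` on each line.

```python
import gzip
import io

# A bytes value, as some readers return it; decode it into a str.
raw = b'3858241,956203'
line = raw.decode('utf-8')

# Write a tiny gzip "file" into memory (stand-in for cite75_99.txt.gz).
buf = io.BytesIO()
with gzip.open(buf, 'wt', encoding='utf-8') as f:
    f.write('"CITING","CITED"\n3858241,956203\n3858241,1324234\n')

# Read it back in text mode; closing the gzip wrapper leaves buf open.
buf.seek(0)
with gzip.open(buf, 'rt', encoding='utf-8') as f:
    lines = f.read().splitlines()   # no trailing '\n' on the elements

print(line)
print(lines)
```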

In [5]:
rddCitations = sc.textFile("cite75_99.txt.gz")
rddPatents = sc.textFile("apat63_99.txt.gz")

The data looks like the following.

In [6]:
rddCitations.take(5)

['"CITING","CITED"',
 '3858241,956203',
 '3858241,1324234',
 '3858241,3398406',
 '3858241,3557384']

In [7]:
rddPatents.take(5)

['"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"',
 '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
 '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
 '3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,',
 '3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,']

In other words, each element is a single string holding comma-separated values. You will need to convert these to (K,V) pairs, probably converting the keys to `int` and so on. You'll also need to `filter` out the header string, since there's no easy way to extract all the lines except the first.
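A plain-Python sketch of that conversion, with the built-in `filter` and `map` standing in for the RDD methods of the same name. It assumes every non-header line has exactly two integer fields, which holds for the sample lines shown above; `to_pair` is a hypothetical helper name. On the real RDD the same shape would be `rddCitations.filter(lambda ln: ln != header).map(to_pair)`.

```python
# Lines copied from the rddCitations.take(5) output above.
lines = ['"CITING","CITED"', '3858241,956203', '3858241,1324234',
         '3858241,3398406', '3858241,3557384']

header = lines[0]   # what rddCitations.first() would return


def to_pair(line):
    """Turn 'citing,cited' into an (int, int) key/value pair."""
    citing, cited = line.split(",")
    return (int(citing), int(cited))


# Same pipeline shape as rdd.filter(...).map(...), using Python built-ins.
pairs = list(map(to_pair, filter(lambda ln: ln != header, lines)))
print(pairs)
```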

In [15]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Lab4-RDD").getOrCreate()
sc = spark.sparkContext

CITATIONS_PATH = "cite75_99.txt.gz"   # produced by Make.txt
PATENTS_PATH   = "apat63_99.txt.gz"


In [16]:
rddCitations = sc.textFile(CITATIONS_PATH).cache()
rddPatents   = sc.textFile(PATENTS_PATH).cache()

print("Citations sample:", rddCitations.take(5))
print("Patents header:", rddPatents.first())


Citations sample: ['"CITING","CITED"', '3858241,956203', '3858241,1324234', '3858241,3398406', '3858241,3557384']
Patents header: "PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"


In [17]:
import csv

# Grab header & indices
pat_header = rddPatents.first()
hdr = next(csv.reader([pat_header]))
i_PATENT  = hdr.index("PATENT")
i_COUNTRY = hdr.index("COUNTRY")
i_POSTATE = hdr.index("POSTATE")

# Full rows keyed by patent id: (pid, full_row_list)
patent_rows = (
    rddPatents
      .filter(lambda ln: ln != pat_header)
      .map(lambda ln: next(csv.reader([ln])))          # parse CSV line safely
      .map(lambda cols: (cols[i_PATENT], cols))        # (pid, full_row_list)
      .cache()
)

# (pid -> state) for US patents with non-empty state
def parse_pat_row(line: str):
    if line == pat_header:
        return None
    c = next(csv.reader([line]))
    pid  = c[i_PATENT]
    country = (c[i_COUNTRY] or "").upper()
    state   = (c[i_POSTATE] or "").upper()
    if country == "US" and state != "":
        return (pid, state)
    return None

patent_to_state = (
    rddPatents
      .map(parse_pat_row)
      .filter(lambda x: x is not None)
      .cache()
)

print("patent_to_state sample:", patent_to_state.take(5))


patent_to_state sample: [('3070802', 'TX'), ('3070803', 'IL'), ('3070804', 'OH'), ('3070805', 'CA'), ('3070806', 'PA')]
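The cells above parse patent lines with `csv.reader` rather than `str.split(",")`. A small sketch of why that matters, using two lines copied from the `rddPatents.take(5)` output; the column positions 4 and 5 are COUNTRY and POSTATE in the header.

```python
import csv

# Two patent lines copied from rddPatents.take(5) above.
lines = [
    '3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,',
    '3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,',
]

# Naive split keeps the literal quote characters around string fields.
naive_rows = [line.split(",") for line in lines]
# csv.reader strips the quotes, as parse_pat_row relies on.
csv_rows = [next(csv.reader([line])) for line in lines]

for naive, parsed in zip(naive_rows, csv_rows):
    print(repr(naive[4]), repr(naive[5]), "->", repr(parsed[4]), repr(parsed[5]))
```

With a plain `split`, the empty state of patent 3070801 would come back as the two-character string `'""'` and slip past the `state != ""` test, and `"US"` would not equal `US`.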


In [18]:
def parse_citation(line: str):
    line = line.strip()
    if not line:
        return None
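    parts = line.split(",") if "," in line else line.split()   # comma- or whitespace-separated
    if len(parts) < 2:
        return None
    # The header line becomes ('"CITING"', '"CITED"'); it is harmless because
    # no patent id in patent_to_state matches '"CITING"' in the joins below.
    return (parts[0], parts[1])   # (citing, cited)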

cit_pairs = (
    rddCitations
      .map(parse_citation)
      .filter(lambda x: x is not None)
      .cache()
)

print("cit_pairs sample:", cit_pairs.take(5))


cit_pairs sample: [('"CITING"', '"CITED"'), ('3858241', '956203'), ('3858241', '1324234'), ('3858241', '3398406'), ('3858241', '3557384')]


In [19]:
# 1) attach citing_state → re-key by cited
citing_with_state = (
    cit_pairs
      .join(patent_to_state)                              # (citing, (cited, citing_state))
      .map(lambda kv: (kv[1][0], (kv[0], kv[1][1])))      # (cited, (citing, citing_state))
)

# 2) attach cited_state
both_states = citing_with_state.join(patent_to_state)     # (cited, ((citing, citing_state), cited_state))

# 3) keep only same-state edges and count per citing
same_counts = (
    both_states
      .map(lambda kv: (kv[1][0][0], 1) if kv[1][0][1] == kv[1][1] else None)  # (citing, 1)
      .filter(lambda x: x is not None)
      .reduceByKey(lambda a, b: a + b)                    # (citing, SAME_STATE)
      .cache()
)

print("same_counts sample:", same_counts.take(5))


same_counts sample: [('4135224', 2), ('4219866', 1), ('4510175', 1), ('5011803', 2), ('5010317', 2)]
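The two-join pipeline above can be traced on toy data without a cluster. This is a minimal pure-Python sketch, assuming plain lists of pairs stand in for the pair RDDs; `pair_join` is a hypothetical helper that mimics the inner-join semantics of `RDD.join` (every matching combination yields `(key, (left, right))`), and the ids A to D are made up. It also shows why the header pair left in `cit_pairs` does no harm, and ends with the ranking used for the final table (SAME_STATE descending, PATENT ascending), here done with `heapq.nsmallest` on a negated count.

```python
from collections import defaultdict
import heapq


def pair_join(left, right):
    """Inner join of two lists of (key, value) pairs, shaped like RDD.join:
    every matching combination yields (key, (left_value, right_value))."""
    index = defaultdict(list)
    for k, w in right:
        index[k].append(w)
    return [(k, (v, w)) for k, v in left for w in index.get(k, [])]


# Hypothetical toy data: ids A-D are made up, not taken from the patent files.
patent_to_state = [("A", "CO"), ("B", "CO"), ("C", "TX"), ("D", "CO")]
cit_pairs = [
    ('"CITING"', '"CITED"'),   # leftover header pair: matches nothing
    ("A", "B"), ("A", "C"), ("A", "D"), ("C", "B"), ("D", "B"),
]

# 1) attach citing_state, then re-key by cited: (cited, (citing, citing_state))
citing_with_state = [
    (cited, (citing, state))
    for citing, (cited, state) in pair_join(cit_pairs, patent_to_state)
]

# 2) attach cited_state: (cited, ((citing, citing_state), cited_state))
both_states = pair_join(citing_with_state, patent_to_state)

# 3) keep same-state edges and count them per citing patent
same_counts = defaultdict(int)
for _cited, ((citing, citing_state), cited_state) in both_states:
    if citing_state == cited_state:
        same_counts[citing] += 1

# Rank by count descending, then id ascending, and keep the top 2
top = heapq.nsmallest(2, same_counts.items(), key=lambda kv: (-kv[1], kv[0]))
print(dict(same_counts))
print(top)
```

Here A cites B and D in its own state (CO) and scores 2, D scores 1, and C (TX) only cites B (CO), so C never appears in `same_counts`, just as patents with no same-state citations are absent from the Spark `reduceByKey` output.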


In [21]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as F

# schema: every original column is String, add SAME_STATE as Integer
schema = StructType(
    [StructField(c, StringType(), True) for c in hdr] +
    [StructField("SAME_STATE", IntegerType(), False)]
)

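# top10_full comes from an earlier cell that is not shown here. Assumed
# reconstruction: join the full rows with the SAME_STATE counts and keep the
# 10 largest counts, breaking ties by patent id (as in the orderBy below).
top10_full = (
    patent_rows
      .join(same_counts)                                   # (pid, (full_row, SAME_STATE))
      .takeOrdered(10, key=lambda kv: (-kv[1][1], kv[0]))
)

# build rows in the exact order of 'hdr', append SAME_STATE at the end
data = []
for pid, (full_row, same) in top10_full:
    # empty strings become None so they show as NULL in the DataFrame
    vals = [(full_row[i] if i < len(full_row) and full_row[i] != "" else None)
            for i in range(len(hdr))]
    vals.append(int(same))
    data.append(tuple(vals))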

df_top10 = spark.createDataFrame(data, schema=schema)

# show exactly like the PNG (sort by SAME_STATE desc, PATENT asc)
(df_top10
   .select(*hdr, "SAME_STATE")
   .orderBy(F.desc("SAME_STATE"), F.asc("PATENT"))
   .show(10, truncate=False))

+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+----------+
|PATENT |GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT|SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|SAME_STATE|
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+----------+
|5959466|1999 |14515|1997   |US     |CA     |5310    |2      |NULL  |326   |4  |46    |159  |0       |1       |NULL   |0.6186  |NULL    |4.8868  |0.0455  |0.044   |NULL    |NULL    |125       |
|5983822|1999 |14564|1998   |US     |TX     |569900  |2      |NULL  |114   |5  |55    |200  |0       |0.995   |NULL   |0.7201  |NULL    |12.45   |0       |0       |NULL    |NULL    |103       |
|6008204|1999 |14606|1998   |U