# CSCI 4253 / 5253 - Lab #4 - Patent Problem with Spark DataFrames
<div>
 <h2> CSCI 4253 / 5253 
  <IMG SRC="https://www.colorado.edu/cs/profiles/express/themes/cuspirit/logo.png" WIDTH=50 ALIGN="right"/> </h2>
</div>

This [Spark cheatsheet](https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf) is useful, as is [this reference on doing joins with Spark DataFrames](http://www.learnbymarketing.com/1100/pyspark-joins-by-example/).

[Databricks has one of the better reference manuals for PySpark](https://docs.databricks.com/spark/latest/dataframes-datasets/index.html) -- it shows how to perform many common data operations, such as joins and aggregations following `groupBy`.

```python
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
```

The following aggregation functions may be useful -- [these can be used to aggregate results of `groupby` operations](https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html#example-aggregations-using-agg-and-countdistinct). More documentation is at the [PySpark SQL Functions manual](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html). Feel free to use other functions from that library.

```python
from pyspark.sql.functions import col, count, countDistinct
```
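`count` and `countDistinct` give different results only when a group contains repeated values. The plain-Python sketch below models both functions on a single group. It assumes `None` stands in for SQL NULL, and both functions skip nulls. The helper names and sample values are hypothetical. If the citation file could contain duplicate (CITING, CITED) pairs, `countDistinct("CITED")` may be the safer aggregate.

```python
def spark_count(values):
    """Model of count(col): number of non-null values in the group."""
    return sum(1 for v in values if v is not None)


def spark_count_distinct(values):
    """Model of countDistinct(col): number of distinct non-null values."""
    return len({v for v in values if v is not None})


# Hypothetical CITED values for one CITING patent: one duplicate, one null
cited = [101, 102, 102, None]
print(spark_count(cited))           # 3
print(spark_count_distinct(cited))  # 2
```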

Create the Spark session as described in the tutorials.

```python
spark = SparkSession \
    .builder \
    .appName("Lab4-Dataframe") \
    .master("local[*]") \
    .getOrCreate()
```

Read in the citations and patents data and check that the data makes sense. Unlike in the RDD solution, `inferSchema="true"` makes Spark infer the column types, so numeric columns such as the patent numbers are read as `IntegerType` rather than as strings.

```python
citations = spark.read.load('cite75_99.txt.gz',
            format="csv", sep=",", header=True,
            compression="gzip",
            inferSchema="true")
```

```python
citations.show(5)
```

```
+-------+-------+
| CITING|  CITED|
+-------+-------+
|3858241| 956203|
|3858241|1324234|
|3858241|3398406|
|3858241|3557384|
|3858241|3634889|
+-------+-------+
only showing top 5 rows
```



```python
patents = spark.read.load('apat63_99.txt.gz',
            format="csv", sep=",", header=True,
            compression="gzip",
            inferSchema="true")
```

```python
patents.show(5)
```

```
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
| PATENT|GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT|SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
|3070801| 1963| 1096|   null|     BE|   null|    null|      1|  null|   269|  6|    69| null|       1|    null|    0.0|    null|    null|    null|    null|    null|    null|    null|
|3070802| 1963| 1096|   null|     US|     TX|    null|      1|  null|     2|  6|    63| null|       0|    null|   null|    null|    null|    null|    null|    null|    null|    null|
|3070803| 1963| 1096|   null|     US|     IL|    null|      1|  null|     2|  6|    6
```
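The sketch below is a rough mental model of what `inferSchema="true"` does for a single column of CSV strings. The column is treated as integer if every non-empty value parses as an integer, as double if every value parses as a float, and as string otherwise. This is a simplified assumption, not Spark's actual inference code, which also considers types such as long, decimal, boolean, and timestamp. The function name is hypothetical.

```python
def infer_column_type(values):
    """Simplified, hypothetical model of CSV column type inference.

    Empty strings are treated as nulls and skipped. The column is "int" if every
    remaining value parses as an integer, "double" if every value parses as a
    float, and "string" otherwise (including an all-null column).
    """
    non_null = [v for v in values if v != ""]
    if not non_null:
        return "string"

    def all_parse(parser):
        # True only if the parser accepts every non-null value
        for v in non_null:
            try:
                parser(v)
            except ValueError:
                return False
        return True

    if all_parse(int):
        return "int"
    if all_parse(float):
        return "double"
    return "string"


# Hypothetical column samples, not read from the lab files
print(infer_column_type(["3858241", "3858242"]))  # int
print(infer_column_type(["0.3704", "", "1.0"]))   # double
print(infer_column_type(["US", "BE"]))            # string
```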

# Solution:

I start by removing the rows with a NULL `POSTATE` from the patents DataFrame.

```python
# Remove rows where POSTATE is null from the patents DataFrame
patents_cleaned = patents.dropna(subset=["POSTATE"])
patents_cleaned.show()
```


```
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
| PATENT|GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT|SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
|3070802| 1963| 1096|   null|     US|     TX|    null|      1|  null|     2|  6|    63| null|       0|    null|   null|    null|    null|    null|    null|    null|    null|    null|
|3070803| 1963| 1096|   null|     US|     IL|    null|      1|  null|     2|  6|    63| null|       9|    null| 0.3704|    null|    null|    null|    null|    null|    null|    null|
|3070804| 1963| 1096|   null|     US|     OH|    null|      1|  null|     2|  6|    6
```
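`dropna(subset=["POSTATE"])` uses the default `how="any"`, so it keeps only the rows whose listed columns are all non-null. The plain-Python sketch below shows the same filtering on hypothetical rows stored as dicts, with `None` standing in for NULL. The helper name `dropna_subset` is made up for illustration.

```python
# Hypothetical rows; None plays the role of a SQL NULL
patents = [
    {"PATENT": 1, "COUNTRY": "BE", "POSTATE": None},
    {"PATENT": 2, "COUNTRY": "US", "POSTATE": "TX"},
    {"PATENT": 3, "COUNTRY": "US", "POSTATE": "IL"},
]


def dropna_subset(rows, subset):
    """Keep rows where no column in `subset` is None (how='any')."""
    return [r for r in rows if all(r[c] is not None for c in subset)]


patents_cleaned = dropna_subset(patents, ["POSTATE"])
print([r["PATENT"] for r in patents_cleaned])  # [2, 3]
```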

```python
# Alias the DataFrames so their columns can be referenced unambiguously in the joins
sc = citations.alias('sc')
sp = patents_cleaned.alias('sp')
```

## Joining POSTATE onto CITING

Join the citations with the cleaned patents on CITING == PATENT, keeping the citing patent's POSTATE column as 'CITING_POSTATE':

```python
inner_join = sc.join(sp, sc.CITING == sp.PATENT, 'inner')

# Select only CITING, POSTATE (renamed as CITING_POSTATE), and CITED
citing_postate = inner_join.select(
    sc["CITING"],
    sp["POSTATE"].alias("CITING_POSTATE"),
    sc["CITED"]
)
citing_postate.show()
```

```
+-------+--------------+-------+
| CITING|CITING_POSTATE|  CITED|
+-------+--------------+-------+
|3858242|            MI|1515701|
|3858242|            MI|3319261|
|3858242|            MI|3668705|
|3858242|            MI|3707004|
|3858244|            CT|  14040|
|3858244|            CT|  17445|
|3858244|            CT|2211676|
|3858244|            CT|2635670|
|3858244|            CT|2838924|
|3858244|            CT|2912700|
|3858245|            NY|2072303|
|3858245|            NY|3176316|
|3858245|            NY|3553737|
|3858245|            NY|3608095|
|3858245|            NY|3621837|
|3858245|            NY|3699969|
|3858245|            NY|3755824|
|3858250|            NY|2741776|
|3858250|            NY|2869141|
|3858250|            NY|2883675|
+-------+--------------+-------+
only showing top 20 rows
```
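Conceptually, the inner join on CITING == PATENT looks up each citing patent's state and drops any citation whose citing patent has no row in the cleaned patents table. The plain-Python sketch below does this with a dictionary lookup, i.e. a simple hash join. It uses toy, hypothetical data and only illustrates the result. It is not how Spark physically executes the join, which depends on the plan Spark chooses.

```python
# Hypothetical (CITING, CITED) pairs and a PATENT -> POSTATE lookup built from
# the cleaned patents (patents with a null POSTATE are already absent)
citations = [(10, 1), (10, 2), (11, 3), (12, 1)]
postate = {10: "MI", 11: "CT", 1: "MI", 2: "OH"}


def join_citing_state(citations, postate):
    """Inner join on CITING == PATENT, producing (CITING, CITING_POSTATE, CITED)."""
    return [
        (citing, postate[citing], cited)
        for citing, cited in citations
        if citing in postate  # unmatched citing patents are dropped
    ]


citing_postate = join_citing_state(citations, postate)
print(citing_postate)  # [(10, 'MI', 1), (10, 'MI', 2), (11, 'CT', 3)]
```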



Next, join `citing_postate` with the cleaned patents again, this time on CITED == PATENT, keeping the cited patent's POSTATE as 'CITED_POSTATE'. This gives an intermediate DataFrame from which we can count the SAME_STATE citations.

```python
inner_join_cited = citing_postate.join(sp, citing_postate.CITED == sp.PATENT, 'inner')

# Select CITING, CITING_POSTATE, CITED, and the cited patent's POSTATE (renamed as CITED_POSTATE)
CITED_CITING_POSTATE = inner_join_cited.select(
    inner_join_cited["CITING"],
    inner_join_cited["CITING_POSTATE"],
    inner_join_cited["CITED"],
    inner_join_cited["POSTATE"].alias("CITED_POSTATE")
)

# Show the result
CITED_CITING_POSTATE.show()
```

```
+-------+--------------+-------+-------------+
| CITING|CITING_POSTATE|  CITED|CITED_POSTATE|
+-------+--------------+-------+-------------+
|4483021|            MS|3070803|           IL|
|4133055|            NH|3070803|           IL|
|4921141|            CA|3070803|           IL|
|5557807|            FL|3070803|           IL|
|4484363|            CA|3070803|           IL|
|5850636|            CA|3070803|           IL|
|4400830|            FL|3070805|           CA|
|4058119|            WA|3070807|           OH|
|3976076|            ME|3070810|           IL|
|5946742|            SC|3070810|           IL|
|4060860|            UT|3070811|           CA|
|4195370|            WA|3070811|           CA|
|4385407|            CA|3070811|           CA|
|4407027|            CA|3070811|           CA|
|5890240|            CA|3070811|           CA|
|5533214|            CA|3070811|           CA|
|4370762|            OH|3070812|           LA|
|4733415|            PA|3070812|           LA|
|3927703|    
```
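The second join does the same lookup on the CITED side and appends CITED_POSTATE. Because this join is also an inner join, a citation survives only if both the citing and the cited patent have a state in the cleaned table. The plain-Python sketch below continues with the same hypothetical toy data used for illustration. It is not the notebook's data.

```python
# Hypothetical output of the first join and the same PATENT -> POSTATE lookup
citing_postate = [(10, "MI", 1), (10, "MI", 2), (11, "CT", 3)]
postate = {10: "MI", 11: "CT", 1: "MI", 2: "OH"}


def join_cited_state(rows, postate):
    """Inner join on CITED == PATENT, appending CITED_POSTATE to each row."""
    return [
        (citing, citing_state, cited, postate[cited])
        for citing, citing_state, cited in rows
        if cited in postate  # cited patents without a state are dropped
    ]


cited_citing_postate = join_cited_state(citing_postate, postate)
print(cited_citing_postate)  # [(10, 'MI', 1, 'MI'), (10, 'MI', 2, 'OH')]
```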

Now that we have the intermediate DataFrame `CITED_CITING_POSTATE`, we can use the `filter()` function to keep only the citations where **CITING_POSTATE == CITED_POSTATE**. This gives a DataFrame of citations in which the cited patent comes from the same state as the citing patent.



```python
# filter rows where CITING_POSTATE is equal to CITED_POSTATE
CITED_CITING_POSTATE_SAME_STATE = CITED_CITING_POSTATE.filter(
    col("CITING_POSTATE") == col("CITED_POSTATE")
)

CITED_CITING_POSTATE_SAME_STATE.show()
```


```
+-------+--------------+-------+-------------+
| CITING|CITING_POSTATE|  CITED|CITED_POSTATE|
+-------+--------------+-------+-------------+
|4067198|            AK|3217791|           AK|
|4676695|            AK|3217791|           AK|
|5190098|            AK|3217791|           AK|
|5238053|            AK|3217791|           AK|
|4075779|            AK|3373523|           AK|
|4130086|            AK|3464385|           AK|
|4178878|            AK|3464385|           AK|
|4344414|            AK|3472314|           AK|
|5618134|            AK|3472314|           AK|
|4205718|            AK|3472314|           AK|
|5172587|            AK|3797257|           AK|
|4944413|            AK|3870156|           AK|
|4014293|            AK|3886905|           AK|
|4344414|            AK|3908753|           AK|
|4205718|            AK|3908753|           AK|
|4304440|            AK|3967854|           AK|
|4557498|            AK|4165888|           AK|
|4742798|            AK|4180012|           AK|
|5697730|    
```
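In Spark SQL, `col("a") == col("b")` evaluates to NULL when either side is NULL, and `filter()` keeps a row only when the predicate is true. Plain Python differs here because `None == None` is `True`. The sketch below models the SQL behaviour with a hypothetical `sql_eq` helper on toy rows. It also shows why rows with missing states would not count as same-state matches even if they had survived the earlier steps.

```python
def sql_eq(a, b):
    """SQL-style equality: None (NULL) if either side is NULL, else a == b."""
    if a is None or b is None:
        return None
    return a == b


def filter_same_state(rows):
    # filter() keeps a row only when the predicate is exactly True; NULL is dropped
    return [r for r in rows if sql_eq(r[1], r[3]) is True]


# Hypothetical (CITING, CITING_POSTATE, CITED, CITED_POSTATE) rows
rows = [
    (10, "MI", 1, "MI"),
    (10, "MI", 2, "OH"),
    (11, None, 4, None),  # missing states on both sides
]
print(filter_same_state(rows))  # [(10, 'MI', 1, 'MI')]
```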

With `CITED_CITING_POSTATE_SAME_STATE` in hand, we can count the same-state citations made by each patent by grouping on the CITING column with `groupBy()` and aggregating with `count()`.

```python
# Group by CITING and count the occurrences of same-state citations
same_state_citations_count = CITED_CITING_POSTATE_SAME_STATE.groupBy("CITING").agg(
    count("CITED").alias("SAME_STATE")
)

# Order by the count in descending order
same_state_citations_count_desc = same_state_citations_count.orderBy("SAME_STATE", ascending=False)

same_state_citations_count_desc.show()
```



```
+-------+----------+
| CITING|SAME_STATE|
+-------+----------+
|5959466|       125|
|5983822|       103|
|6008204|       100|
|5952345|        98|
|5998655|        96|
|5958954|        96|
|5936426|        94|
|5913855|        90|
|5978329|        90|
|5951547|        90|
|5980517|        90|
|5739256|        90|
|5925042|        90|
|5618907|        89|
|5689485|        89|
|5978331|        89|
|5928229|        89|
|5917082|        89|
|5602226|        88|
|5847160|        87|
+-------+----------+
only showing top 20 rows
```
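The `groupBy("CITING").agg(count("CITED"))` step followed by a descending `orderBy` amounts to counting rows per key and sorting by the count. The plain-Python sketch below reproduces this with `collections.Counter` on hypothetical rows. Python's sort is stable, but Spark does not guarantee any particular order among tied counts, such as the several 90s in the table above.

```python
from collections import Counter

# Hypothetical same-state rows: (CITING, CITING_POSTATE, CITED, CITED_POSTATE)
same_state = [(10, "MI", 1, "MI"), (10, "MI", 5, "MI"), (11, "CT", 4, "CT")]

# groupBy("CITING").agg(count("CITED")): count non-null CITED values per CITING
same_state_counts = Counter(
    citing for citing, _, cited, _ in same_state if cited is not None
)

# orderBy("SAME_STATE", ascending=False)
ordered = sorted(same_state_counts.items(), key=lambda kv: kv[1], reverse=True)
print(ordered)  # [(10, 2), (11, 1)]
```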



The table above shows the number of same-state citations made by each patent, in descending order. Joining it back to the cleaned patents DataFrame (`sp`) on PATENT == CITING produces the final result.


```python
# Join same_state_citations_count_desc with sp on CITING and PATENT
final_result = sp.join(
    same_state_citations_count_desc,
    same_state_citations_count_desc["CITING"] == sp["PATENT"]
)

# Drop CITING and order by SAME_STATE in descending order
final_result_ordered = final_result.drop("CITING").orderBy(col("SAME_STATE").desc())

final_result_ordered.show()
```


```
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+----------+
| PATENT|GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT|SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|SAME_STATE|
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+---+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+----------+
|5959466| 1999|14515|   1997|     US|     CA|    5310|      2|  null|   326|  4|    46|  159|       0|     1.0|   null|  0.6186|    null|  4.8868|  0.0455|   0.044|    null|    null|       125|
|5983822| 1999|14564|   1998|     US|     TX|  569900|      2|  null|   114|  5|    55|  200|       0|   0.995|   null|  0.7201|    null|   12.45|     0.0|     0.0|    null|    null|       103|
|6008204| 1999|14606|   1998| 
```

The table above shows the final result: the patents table with one additional column, `SAME_STATE`.
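`DataFrame.join` defaults to an inner join, so patents that make no same-state citations are absent from `final_result_ordered`. If every patent with a state should appear, a left join followed by a zero fill should keep them. In PySpark that would look roughly like `sp.join(same_state_citations_count, sp["PATENT"] == same_state_citations_count["CITING"], "left").fillna(0, subset=["SAME_STATE"])`. This variant is an assumption about the desired output, not the notebook's approach. The plain-Python sketch below contrasts the two behaviours on hypothetical data.

```python
# Hypothetical cleaned patents and per-patent same-state counts
patents = [
    {"PATENT": 10, "POSTATE": "MI"},
    {"PATENT": 11, "POSTATE": "CT"},
    {"PATENT": 12, "POSTATE": "OH"},  # makes no same-state citations
]
same_state_counts = {10: 2, 11: 1}


def attach_same_state(patents, counts, left=True):
    """Attach SAME_STATE to each patent.

    left=False mimics an inner join (unmatched patents are dropped);
    left=True mimics a left join followed by filling missing counts with 0.
    """
    result = []
    for row in patents:
        n = counts.get(row["PATENT"])
        if n is None:
            if not left:
                continue
            n = 0
        result.append({**row, "SAME_STATE": n})
    # orderBy(col("SAME_STATE").desc())
    return sorted(result, key=lambda r: r["SAME_STATE"], reverse=True)


inner_rows = attach_same_state(patents, same_state_counts, left=False)
left_rows = attach_same_state(patents, same_state_counts)
print([r["PATENT"] for r in inner_rows])                    # [10, 11]
print([(r["PATENT"], r["SAME_STATE"]) for r in left_rows])  # [(10, 2), (11, 1), (12, 0)]
```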