In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, StringType

spark = SparkSession.builder \
    .appName("PatentJoinDataFrame") \
    .master("local[*]") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")


In [2]:

patent_path = "apat63_99.txt"   
citations_path = "cite75_99.txt" 


In [3]:
patents_df = spark.read.csv(patent_path, header=False, quote='"', multiLine=False).cache()

print("Patents columns:", patents_df.columns)
patents_df.show(5, truncate=False)


Patents columns: ['_c0', '_c1', '_c2', '_c3', '_c4', '_c5', '_c6', '_c7', '_c8', '_c9', '_c10', '_c11', '_c12', '_c13', '_c14', '_c15', '_c16', '_c17', '_c18', '_c19', '_c20', '_c21', '_c22']
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+----+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
|_c0    |_c1  |_c2  |_c3    |_c4    |_c5    |_c6     |_c7    |_c8   |_c9   |_c10|_c11  |_c12 |_c13    |_c14    |_c15   |_c16    |_c17    |_c18    |_c19    |_c20    |_c21    |_c22    |
+-------+-----+-----+-------+-------+-------+--------+-------+------+------+----+------+-----+--------+--------+-------+--------+--------+--------+--------+--------+--------+--------+
|PATENT |GYEAR|GDATE|APPYEAR|COUNTRY|POSTATE|ASSIGNEE|ASSCODE|CLAIMS|NCLASS|CAT |SUBCAT|CMADE|CRECEIVE|RATIOCIT|GENERAL|ORIGINAL|FWDAPLAG|BCKGTLAG|SELFCTUB|SELFCTLB|SECDUPBD|SECDLWBD|
|3070801|1963 |1096 |NULL   |BE     |NULL   |NULL    |1      |NULL  |269

In [4]:

pat_meta = patents_df.select(
    F.col("_c0").alias("patent_id").cast(IntegerType()),
    F.col("_c4").alias("country"),
    F.col("_c5").alias("state")
).withColumn("state", F.when(F.col("state") == "", None).otherwise(F.col("state"))) \
 .cache()

pat_meta.show(10, truncate=False)


+---------+-------+-------+
|patent_id|country|state  |
+---------+-------+-------+
|NULL     |COUNTRY|POSTATE|
|3070801  |BE     |NULL   |
|3070802  |US     |TX     |
|3070803  |US     |IL     |
|3070804  |US     |OH     |
|3070805  |US     |CA     |
|3070806  |US     |PA     |
|3070807  |US     |OH     |
|3070808  |US     |IA     |
|3070809  |US     |AZ     |
+---------+-------+-------+
only showing top 10 rows



In [5]:

citations_df = spark.read.text(citations_path).select(
    F.split(F.col("value"), "\\s+").alias("parts")
).filter(F.size("parts") == 2).select(
    F.col("parts").getItem(0).cast(IntegerType()).alias("citing"),
    F.col("parts").getItem(1).cast(IntegerType()).alias("cited")
).cache()

citations_df.show(6, truncate=False)


+------+-----+
|citing|cited|
+------+-----+
+------+-----+



In [6]:
cited_join = citations_df.join(
    pat_meta.select(F.col("patent_id").alias("cited"), F.col("state").alias("cited_state")),
    on="cited",
    how="left"
)

cited_with_states = cited_join.join(
    pat_meta.select(F.col("patent_id").alias("citing"), F.col("state").alias("citing_state")),
    on="citing",
    how="left"
)

cited_with_states.select("citing", "cited", "cited_state", "citing_state").show(8, truncate=False)


+------+-----+-----------+------------+
|citing|cited|cited_state|citing_state|
+------+-----+-----------+------------+
+------+-----+-----------+------------+



In [7]:
same_state_counts = cited_with_states \
    .filter((F.col("cited_state").isNotNull()) & (F.col("citing_state").isNotNull())) \
    .withColumn("same", F.when(F.col("cited_state") == F.col("citing_state"), 1).otherwise(0)) \
    .groupBy("citing") \
    .agg(F.sum("same").cast(IntegerType()).alias("same_state_citations")) \
    .cache()

same_state_counts.show(10)


+------+--------------------+
|citing|same_state_citations|
+------+--------------------+
+------+--------------------+



In [8]:
patents_full = patents_df.withColumn("patent_id", F.col("_c0").cast(IntegerType()))

patents_aug = patents_full.join(
    same_state_counts,
    patents_full.patent_id == same_state_counts.citing,
    how="left"
)

patents_aug = patents_aug.withColumn(
    "same_state_citations",
    F.when(
        (F.col("_c4") == "US") & (F.col("_c5").isNotNull()) & (F.col("_c5") != ""),
        F.coalesce(F.col("same_state_citations"), F.lit(0))
    ).otherwise(F.lit(None))
)

patents_aug.select("_c0", "_c4", "_c5", "same_state_citations").show(12, truncate=False)


+-------+-------+-------+--------------------+
|_c0    |_c4    |_c5    |same_state_citations|
+-------+-------+-------+--------------------+
|PATENT |COUNTRY|POSTATE|NULL                |
|3070801|BE     |NULL   |NULL                |
|3070802|US     |TX     |0                   |
|3070803|US     |IL     |0                   |
|3070804|US     |OH     |0                   |
|3070805|US     |CA     |0                   |
|3070806|US     |PA     |0                   |
|3070807|US     |OH     |0                   |
|3070808|US     |IA     |0                   |
|3070809|US     |AZ     |0                   |
|3070810|US     |IL     |0                   |
|3070811|US     |CA     |0                   |
+-------+-------+-------+--------------------+
only showing top 12 rows



In [9]:
top10 = patents_aug \
    .select(F.col("_c0").cast(IntegerType()).alias("patent_id"), "same_state_citations") \
    .filter(F.col("same_state_citations").isNotNull()) \
    .orderBy(F.col("same_state_citations").desc()) \
    .limit(10)

top10.show(10, truncate=False)


+---------+--------------------+
|patent_id|same_state_citations|
+---------+--------------------+
|4649756  |0                   |
|3070802  |0                   |
|4649759  |0                   |
|3070803  |0                   |
|4649760  |0                   |
|3070804  |0                   |
|4649766  |0                   |
|3070805  |0                   |
|4649767  |0                   |
|3070806  |0                   |
+---------+--------------------+



In [11]:
original_cols = patents_df.columns  # _c0, _c1, ... as strings
cols_to_concat = [F.coalesce(F.col(c).cast(StringType()), F.lit('')) for c in original_cols] \
                 + [F.coalesce(F.col("same_state_citations").cast(StringType()), F.lit(''))]

patents_with_augmented_line = patents_aug.withColumn("augmented_line", F.concat_ws(",", *cols_to_concat))
patents_with_augmented_line.select("augmented_line").show(5, truncate=False)


patents_with_augmented_line.select("augmented_line").write.text("patents_augmented_out")


+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|augmented_line                                                                                                                                                                      |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|PATENT,GYEAR,GDATE,APPYEAR,COUNTRY,POSTATE,ASSIGNEE,ASSCODE,CLAIMS,NCLASS,CAT,SUBCAT,CMADE,CRECEIVE,RATIOCIT,GENERAL,ORIGINAL,FWDAPLAG,BCKGTLAG,SELFCTUB,SELFCTLB,SECDUPBD,SECDLWBD,|
|3070801,1963,1096,,BE,,,1,,269,6,69,,1,,0,,,,,,,,                                                                                                                                   |
|3070802,1963,1096,,US,TX,,1,,2,6,63,,0,,,,,,,,,,0                                   

In [2]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
    .appName("PatentJoinLab") \
    .getOrCreate()

# Get the SparkContext from the SparkSession
sc = spark.sparkContext


In [3]:
# RDD solution: run in the same Spark session, using sc
sc = spark.sparkContext


In [4]:
# RDD Cell 1: helper CSV parsing inside mapPartitions
import csv
from io import StringIO

def parse_csv_partition(lines):
    # csv.reader expects an iterable of lines -- we can feed it directly
    for row in csv.reader(lines):
        # yield list of fields
        yield row

In [12]:
# RDD Cell 2: read patents lines and parse
patent_lines = sc.textFile("apat63_99.txt")
patent_parsed = patent_lines.mapPartitions(parse_csv_partition).cache()
# patent_parsed: each element is a list of fields, e.g. ["6009554","1999","14606","1997","US","NY", ...]


In [13]:
# RDD Cell 3: build (patent_id, state) and also (patent_id, fields_list) for reassembly
def extract_patent_state(fields):
    try:
        pid = int(fields[0])
    except Exception:
        return None
    # state at index 5 if present
    state = None
    if len(fields) > 5:
        st = fields[5]
        if st != '':
            state = st
    return (pid, state)

patent_states = patent_parsed.map(lambda fields: extract_patent_state(fields)).filter(lambda x: x is not None).cache()
# patent_states: (pid, state or None)


In [14]:
# RDD Cell 4: read citations and map to (citing, cited)
cit_rdd = sc.textFile("cite75_99.txt") \
    .map(lambda l: l.strip().split()) \
    .filter(lambda parts: len(parts) == 2) \
    .map(lambda parts: (int(parts[0]), int(parts[1]))) \
    .cache()
# cit_rdd: (citing, cited)

In [15]:
# RDD Cell 5: Get cited_state per (citing, cited)
# First map citations keyed by cited: (cited, citing)
cit_by_cited = cit_rdd.map(lambda citing_cited: (citing_cited[1], citing_cited[0]))  # (cited, citing)

# join with patent_states (keyed by patent_id)
# ensure patent_states keyed by pid
pat_states_kv = patent_states  # (pid, state)
# join on cited -> (cited, (citing, cited_state))
joined_cited = cit_by_cited.join(pat_states_kv)  # join uses RDD.join, result only where state exists in pat_states_kv (but pat_states_kv contains even None states)
# NOTE: if patent_states has entries with state==None, join still produces (cited, (citing, None)), we keep that so we can filter later

# map to (citing, cited_state) for each citation
citing_to_cited_state = joined_cited.map(lambda cid_citingState: (cid_citingState[1][0], cid_citingState[1][1]))  # (citing, cited_state)


In [16]:
# RDD Cell 6: bring in citing_state by joining citing_to_cited_state with patent_states keyed by citing
# First ensure patent_states keyed by patent id - it's already (pid, state)
# join on citing as key
joined_for_citing = citing_to_cited_state.join(pat_states_kv)  # key=citing -> (citing, (cited_state, citing_state))

# Now filter where both states are non-null and compare equality; produce (citing, 1) for same-state
same_pairs = joined_for_citing.filter(lambda kv: (kv[1][0] is not None) and (kv[1][1] is not None) and (kv[1][0] == kv[1][1])) \
    .map(lambda kv: (kv[0], 1))

# reduceByKey to count same-state cites
same_state_counts = same_pairs.reduceByKey(lambda a,b: a+b)  # (citing, count)
same_state_counts.cache()


PythonRDD[42] at RDD at PythonRDD.scala:53

In [22]:
# Load patents
lines_rdd = sc.textFile("apat63_99.txt")

# Remove header
header = lines_rdd.first()
data_rdd = lines_rdd.filter(lambda row: row != header)

# Split into columns
parsed_rdd = data_rdd.map(lambda line: line.split(","))

# Extract (patent_id, state)
patent_states = parsed_rdd.map(lambda cols: (cols[0], cols[5]))

# Check first few
patent_states.take(5)


[('3070801', '""'),
 ('3070802', '"TX"'),
 ('3070803', '"IL"'),
 ('3070804', '"OH"'),
 ('3070805', '"CA"')]

In [21]:
lines_rdd = sc.textFile("apat63_99.txt")

# Grab header
header = lines_rdd.first()

# Filter it out
data_rdd = lines_rdd.filter(lambda row: row != header)

# Now parse safely
parsed_rdd = data_rdd.map(lambda line: line.split(","))

"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"
3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,
3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,


In [23]:
# Load citations
citations_rdd = sc.textFile("cite75_99.txt")

# Remove header
cite_header = citations_rdd.first()
data_citations_rdd = citations_rdd.filter(lambda row: row != cite_header)

# Split
parsed_citations_rdd = data_citations_rdd.map(lambda line: line.split(","))

# Extract (cited, citing)
cit_by_cited = parsed_citations_rdd.map(lambda cols: (cols[1], cols[0]))

# Check
cit_by_cited.take(5)


[('956203', '3858241'),
 ('1324234', '3858241'),
 ('3398406', '3858241'),
 ('3557384', '3858241'),
 ('3634889', '3858241')]

In [24]:
# (cited, (citing, cited_state))
joined_cited = cit_by_cited.join(patent_states)

# (citing, cited_state)
citing_to_cited_state = joined_cited.map(lambda x: (x[1][0], x[1][1]))

citing_to_cited_state.take(5)


[('3858315', '"MO"'),
 ('3861040', '"MO"'),
 ('3863340', '"MO"'),
 ('3872592', '"MO"'),
 ('3889369', '"MO"')]

In [25]:
# (citing, (cited_state, citing_state))
joined_with_states = citing_to_cited_state.join(patent_states)

joined_with_states.take(5)


[('4125939', ('"MO"', '"NY"')),
 ('4125939', ('"CT"', '"NY"')),
 ('4409735', ('"MO"', '"CT"')),
 ('4409735', ('"MA"', '"CT"')),
 ('4409735', ('"NY"', '"CT"'))]

In [27]:
# Map to ((citing_state, cited_state), 1)
state_pairs = joined_with_states.map(lambda x: ((x[1][1], x[1][0]), 1))

# Reduce
state_pair_counts = state_pairs.reduceByKey(lambda a, b: a + b)

# Show results
state_pair_counts.take(20)


[(('""', '""'), 2382043),
 (('""', '"NY"'), 252405),
 (('"MA"', '"AZ"'), 3843),
 (('"NC"', '"NC"'), 12975),
 (('"GA"', '"OH"'), 6168),
 (('"NJ"', '"LA"'), 2439),
 (('"UT"', '"WI"'), 933),
 (('"MO"', '"KS"'), 864),
 (('"CA"', '"WV"'), 1403),
 (('"LA"', '""'), 12816),
 (('"SC"', '"NY"'), 3611),
 (('"SC"', '"NJ"'), 2889),
 (('"KY"', '"CO"'), 416),
 (('"MN"', '"TX"'), 9103),
 (('"TX"', '"OH"'), 15006),
 (('"CT"', '"OH"'), 10489),
 (('"TX"', '"MA"'), 16788),
 (('"TX"', '"IL"'), 20830),
 (('"WI"', '"CA"'), 16353),
 (('"CA"', '"AZ"'), 17059)]

In [28]:
# Save results to folder
state_pair_counts.saveAsTextFile("state_citation_counts")

In [29]:
state_pair_counts.coalesce(1).saveAsTextFile("state_citation_counts_single")


In [30]:
for pair, count in state_pair_counts.take(20):
    print(pair, count)

('""', '""') 2382043
('""', '"NY"') 252405
('"MA"', '"AZ"') 3843
('"NC"', '"NC"') 12975
('"GA"', '"OH"') 6168
('"NJ"', '"LA"') 2439
('"UT"', '"WI"') 933
('"MO"', '"KS"') 864
('"CA"', '"WV"') 1403
('"LA"', '""') 12816
('"SC"', '"NY"') 3611
('"SC"', '"NJ"') 2889
('"KY"', '"CO"') 416
('"MN"', '"TX"') 9103
('"TX"', '"OH"') 15006
('"CT"', '"OH"') 10489
('"TX"', '"MA"') 16788
('"TX"', '"IL"') 20830
('"WI"', '"CA"') 16353
('"CA"', '"AZ"') 17059


In [33]:
# Dataframe version 2
from pyspark.sql import SparkSession

# Start SparkSession (if not already started)
spark = SparkSession.builder.appName("PatentCitationsDF").getOrCreate()

# Load patents file
patents_df = spark.read.csv("apat63_99.txt", header=True, inferSchema=True)

# Load citations file
citations_df = spark.read.csv("cite75_99.txt", header=True, inferSchema=True)


In [34]:
patents_states = patents_df.select("PATENT", "POSTATE")
citations = citations_df.select("CITING", "CITED")


In [37]:
# Join citations with patents (for citing side)
from pyspark.sql import functions as F

citing_with_state = citations.join(
    patents_states.withColumnRenamed("PATENT", "CITING_PATENT")
                  .withColumnRenamed("POSTATE", "CITING_STATE"),
    citations["CITING"] == F.col("CITING_PATENT"),
    "inner"
)

# Join again for cited side
joined_df = citing_with_state.join(
    patents_states.withColumnRenamed("PATENT", "CITED_PATENT")
                  .withColumnRenamed("POSTATE", "CITED_STATE"),
    citing_with_state["CITED"] == F.col("CITED_PATENT"),
    "inner"
)


In [38]:
state_pair_counts_df = (
    joined_df.groupBy("CITING_STATE", "CITED_STATE")
             .count()
)


In [39]:
state_pair_counts_df.show(10)


+------------+-----------+-----+
|CITING_STATE|CITED_STATE|count|
+------------+-----------+-----+
|          SC|         PA| 2987|
|          MN|         IL|14046|
|          KS|         MN|  962|
|          CO|         ID|  543|
|          DC|         MA|  258|
|          NC|         OR|  874|
|          IL|         ND|  417|
|          KS|         OK|  558|
|          IA|         CO|  703|
|          NE|         AZ|  165|
+------------+-----------+-----+
only showing top 10 rows



In [40]:
state_pair_counts_df.write.csv("state_citation_counts_df", header=True, mode="overwrite")


In [41]:
df_results = {
    (row["CITING_STATE"], row["CITED_STATE"]): row["count"]
    for row in state_pair_counts_df.collect()
}


In [43]:
rdd_results = dict(state_pair_counts.collect())


In [46]:
def normalize_state(s):
    if s is None:
        return None
    s = s.strip().upper().replace('"', '')
    return s if s else None


In [47]:
print("Do RDD and DataFrame outputs match?", df_results == rdd_results)

Do RDD and DataFrame outputs match? False


In [49]:
rdd = sc.textFile("file.csv").map(lambda line: line.split(","))
pairs_rdd = rdd.map(lambda row: ((row[0], row[1]), 1))

In [50]:
pairs_rdd = rdd.map(lambda row: (
    (normalize_state(row[0]), normalize_state(row[1])), 1
))


In [51]:
counts_rdd = pairs_rdd.reduceByKey(lambda a, b: a + b)


Py4JJavaError: An error occurred while calling o592.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/jovyan/lab4-pyspark-patent/file.csv
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:210)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at jdk.internal.reflect.GeneratedMethodAccessor50.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.io.IOException: Input path does not exist: file:/home/jovyan/lab4-pyspark-patent/file.csv
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 24 more
