<a href="https://colab.research.google.com/github/Theseyh/Big-Data-Framework/blob/main/BDF_12_Exercises.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 00 - Configuration of Apache Spark on Colaboratory


### Installing Java, Spark, and Findspark


---


This code installs Apache Spark 3.5.3, Java 8, and [Findspark](https://github.com/minrk/findspark), a library that makes it easy for Python to find Spark.

In [None]:
import os

os.environ["SPARK_VERSION"] = "spark-3.5.3"
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!echo $SPARK_VERSION-bin-hadoop3.tgz
!rm $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

### Set Environment Variables
Set the locations where Spark and Java are installed.
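
Every `!` command runs in its own subshell, so an `!export PATH=...` line has no effect on later cells. The sketch below extends `PATH` from Python instead. It assumes the Spark `bin` and `sbin` folders are what you want on the `PATH`. `add_spark_to_path` is a hypothetical helper written for this illustration.

```python
import os

def add_spark_to_path(env, spark_home):
    """Append <spark_home>/bin and <spark_home>/sbin to env["PATH"] (hypothetical helper).

    Directories that are already on the PATH are not added a second time.
    """
    parts = [p for p in env.get("PATH", "").split(os.pathsep) if p]
    for sub in ("bin", "sbin"):
        directory = os.path.join(spark_home, sub)
        if directory not in parts:
            parts.append(directory)
    env["PATH"] = os.pathsep.join(parts)
    return env["PATH"]

# Usage in the notebook (this mutates the real process environment):
# add_spark_to_path(os.environ, os.environ["SPARK_HOME"])
```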

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark/"
os.environ["DRIVE_DATA"] = "/content/gdrive/My Drive/Enseignement/2024-2025/ING3/HPDA/BigDataFrameworks/data/"

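!rm -f /content/spark
!ln -s /content/$SPARK_VERSION-bin-hadoop3 /content/spark
# `!export` runs in a throw-away subshell, so extend PATH from Python instead
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["SPARK_HOME"], "bin") + os.pathsep + os.path.join(os.environ["SPARK_HOME"], "sbin")
!echo $SPARK_HOME
!env | grep "DRIVE_DATA"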

In [1]:


# Install Java and PySpark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install pyspark findspark



import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/lib/python3.11/dist-packages/pyspark"
os.environ["DRIVE_DATA"] = "/content/gdrive/My Drive/Big Data Framework/data/"

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


### Start a SparkSession
This starts a local `SparkContext` and then creates (or retrieves) a `SparkSession` on top of it.

In [2]:
!python -V

import findspark
findspark.init()

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

# Example: shows the PySpark version
print("PySpark version {0}".format(sc.version))

# Example: parallelize a list and show its first 2 elements
sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)

Python 3.11.11
PySpark version 3.5.4


[2, 3]

In [3]:
from pyspark.sql import SparkSession
# Create a SparkSession object (or retrieve it if it already exists)
spark = SparkSession \
    .builder \
    .appName("My application") \
    .config("spark.some.config.option", "some-value") \
    .master("local[4]") \
    .getOrCreate()
# The config key above is the placeholder from the Spark documentation example; it has no effect
# Get the underlying SparkContext
sc = spark.sparkContext

In [4]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive



---


# 12 - Exercises. Final assignment

## Exercise 12.1

Let us extract information from the cite75_99.txt and apat63_99.txt files. Write a script that performs the following operations:

1. From the cite75_99.txt file, obtain the number of citations received by each patent. You must produce a DataFrame with the following format:

| PatentNum | ncitations |
|-----------|------------|
| 3060453   |    3       |
| 3390168   |    6       |
| 3626542   |   18       |
| 3611507   |    5       |
| 3000113   |    4       |


2. From the apat63_99.txt file, create a DataFrame that shows the patent number, its country and the patent year, discarding the rest of the fields in the file. The DataFrame produced must have the following format:

|PatentNum |country|Year |
|----------|-------|-----|
| 3070801  | BE    | 1963|
| 3070802  | US    | 1963|
| 3070803  | US    | 1963|
| 3070804  | US    | 1963|
| 3070805  | US    | 1963|
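
The two transformations can be checked without a cluster using a small pure-Python sketch. It is not the Spark solution. It mirrors what the Spark code should do:

- skip the header line;
- count how many times each patent appears in the `CITED` column;
- keep only the patent number, country and grant year (`GYEAR`) from *apat63_99.txt*.

The function names are hypothetical. The citation rows are made-up toy values. The patent rows are a trimmed copy of the first lines printed by `head` later in this notebook, keeping only the first five columns.

```python
import csv
import io
from collections import Counter

def count_citations(cite_text):
    """Return {PatentNum: ncitations} from cite75_99-style CSV text (header skipped)."""
    reader = csv.reader(io.StringIO(cite_text))
    next(reader)  # skip the "CITING","CITED" header
    return Counter(int(cited) for _citing, cited in reader)

def patent_country_year(apat_text):
    """Return [(PatentNum, country, Year)] from apat63_99-style CSV text."""
    reader = csv.DictReader(io.StringIO(apat_text))  # the header provides the field names
    return [(int(r["PATENT"]), r["COUNTRY"], int(r["GYEAR"])) for r in reader]

# Toy citation rows (hypothetical values, same format as cite75_99.txt)
cite_sample = '"CITING","CITED"\n3858241,956203\n3858241,1324234\n3858242,956203\n'
# First rows of apat63_99.txt, trimmed to the first five columns
apat_sample = (
    '"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY"\n'
    '3070801,1963,1096,,"BE"\n'
    '3070802,1963,1096,,"US"\n'
)
```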


**Requirements**

 - Both DataFrames must be stored in Parquet format with gzip compression. Check the number of partitions of each DataFrame and the number of files generated.

 - It is **strongly advised** to copy the files from your Drive to a temporary directory in the notebook virtual machine and unzip them there. This reduces the execution times. See the cell below:
        

In [19]:
!mkdir -p /tmp/data/
!cp "$DRIVE_DATA"apat63_99.txt.tar.bz2 "$DRIVE_DATA"cite75_99.txt.tar.bz2 "$DRIVE_DATA"country_codes.txt /tmp/data
%cd /tmp/data
!ls
!tar -jxf apat63_99.txt.tar.bz2
!tar -jxf cite75_99.txt.tar.bz2
!rm /tmp/data/*.tar.bz2
!ls

mkdir: cannot create directory ‘/tmp/data/’: File exists
/tmp/data
apat63_99.txt  apat63_99.txt.tar.bz2  cite75_99.txt  cite75_99.txt.tar.bz2  country_codes.txt
apat63_99.txt  cite75_99.txt  country_codes.txt


In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Patent Analysis") \
    .config("spark.sql.parquet.compression.codec", "gzip") \
    .getOrCreate()

# Path to the data files
data_path = "/tmp/data/"

# Load cite75_99.txt and extract the number of citations per patent
cite_file = f"{data_path}cite75_99.txt"
cite_schema = "Citing INT, Cited INT"

# Load the cite75_99.txt file; header=True skips its "CITING","CITED" first line
cite_df = spark.read.csv(cite_file, schema=cite_schema, sep=',', header=True)

# Count citations for each patent
ncitations_df = cite_df.groupBy("Cited") \
    .agg(count("Citing").alias("ncitations")) \
    .withColumnRenamed("Cited", "PatentNum")

# Save ncitations DataFrame as Parquet with gzip compression (set explicitly on the writer)
ncitations_output = "/tmp/ncitations.parquet"
ncitations_df.write.mode("overwrite").parquet(ncitations_output, compression="gzip")

# Check the number of partitions and files generated
print(f"Number of partitions (ncitations): {ncitations_df.rdd.getNumPartitions()}")
!ls {ncitations_output}

# Load apat63_99.txt with a schema covering its 23 columns, then keep patent number, country and year
apat_file = f"{data_path}apat63_99.txt"
corrected_schema = """
PatentNum INT, GYear INT, GDate INT, AppYear INT, Country STRING,
PostState STRING, Assignee INT, AssCode INT, Claims INT, NClass INT,
Cat INT, SubCat INT, CMade INT, CReceive INT, RatioCit FLOAT,
General FLOAT, Original FLOAT, FwdAplag FLOAT, BckgtLag FLOAT,
SelfCtUb FLOAT, SelfCtLb FLOAT, SecDupBd FLOAT, SecDlwb FLOAT
"""

apat_df = spark.read.csv(
    apat_file, schema=corrected_schema, sep=",", header=True
)

# Select relevant columns
patent_info_df = apat_df.select("PatentNum", "Country", "GYear").withColumnRenamed("GYear", "Year")

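# Save as Parquet with gzip compression
patent_info_output = "/tmp/patent_info.parquet"
patent_info_df.write.mode("overwrite").parquet(patent_info_output, compression="gzip")

# Check the number of partitions and files generated
print(f"Number of partitions (patent_info): {patent_info_df.rdd.getNumPartitions()}")
!ls {patent_info_output}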

# Verify results
ncitations_df.show(5)
patent_info_df.show(5)



Number of partitions (ncitations): 2
part-00000-c1abb4b5-5495-4301-856e-2e86b352d8c9-c000.gz.parquet  _SUCCESS
part-00001-c1abb4b5-5495-4301-856e-2e86b352d8c9-c000.gz.parquet
+---------+----------+
|PatentNum|ncitations|
+---------+----------+
|  3060453|         3|
|  3390168|         6|
|  3626542|        18|
|  3611507|         5|
|  3000113|         4|
+---------+----------+
only showing top 5 rows

+---------+-------+----+
|PatentNum|Country|Year|
+---------+-------+----+
|  3070801|     BE|1963|
|  3070802|     US|1963|
|  3070803|     US|1963|
|  3070804|     US|1963|
|  3070805|     US|1963|
+---------+-------+----+
only showing top 5 rows
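
Spark typically writes one `part-*` file per partition that holds data, plus a `_SUCCESS` marker. Comparing the file count with `getNumPartitions()` therefore answers the requirement. The sketch below counts the data files from a directory listing. It assumes the default `part-*.parquet` naming seen in the listing above. `count_part_files` is a hypothetical helper.

```python
import fnmatch
import os

def count_part_files(filenames, suffix=".parquet"):
    """Count Spark part-* data files among filenames, ignoring _SUCCESS and .crc files."""
    return sum(1 for f in filenames if fnmatch.fnmatch(f, "part-*" + suffix))

# Usage after the writes above (the directories must already exist):
# print(count_part_files(os.listdir("/tmp/ncitations.parquet")))
# print(count_part_files(os.listdir("/tmp/patent_info.parquet")))
```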



In [11]:
# Show the first 5 lines of cite75_99.txt and apat63_99.txt

!head -n 5 /tmp/data/cite75_99.txt

!head -n 5 /tmp/data/apat63_99.txt



"CITING","CITED"
3858241,956203
3858241,1324234
3858241,3398406
3858241,3557384
"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"
3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
3070803,1963,1096,,"US","IL",,1,,2,6,63,,9,,0.3704,,,,,,,
3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,


## Exercise 12.2

Write code that, starting from the Parquet files created in the previous exercise, obtains for each country and for each year: the total number of patents, the total number of citations received by those patents, the average number of citations and the maximum number of citations. Compute only those entries that have values in both files (*inner join*). In addition, each country must be shown with its full name, obtained from the *country_codes.txt* file. The final DataFrame must look like this one:


|Country            |Year|PatentsNum |TotalCitations|AvgCitations      |MaxCitations|
|-------------------|----|-----------|--------------|------------------|------------|
|Algeria            |1963|2          |7             |3.5               |4           |
|Algeria            |1968|1          |2             |2.0               |2           |
|Algeria            |1970|1          |2             |2.0               |2           |
|Algeria            |1972|1          |1             |1.0               |1           |
|Algeria            |1977|1          |2             |2.0               |2           |
|Andorra            |1987|1          |3             |3.0               |3           |
|Andorra            |1993|1          |1             |1.0               |1           |
|Andorra            |1998|1          |1             |1.0               |1           |
|Antigua and Barbuda|1978|1          |6             |6.0               |6           |
|Antigua and Barbuda|1979|1          |14            |14.0              |14          |
|Antigua and Barbuda|1991|1          |8             |8.0               |8           |
|Antigua and Barbuda|1994|1          |19            |19.0              |19          |
|Antigua and Barbuda|1995|2          |12            |6.0               |11          |
|Antigua and Barbuda|1996|2          |3             |1.5               |2           |
|Argentina          |1963|14         |35            |2.5               |7           |
|Argentina          |1964|20         |60            |3.0               |8           |
|Argentina          |1965|10         |35            |3.5               |10          |
|Argentina          |1966|16         |44            |2.75              |9           |
|Argentina          |1967|13         |60            |4.615384615384615 |14          |

**Requirements**

- The output DataFrame must be saved in a single CSV file, with a header and without any compression.
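
The semantics of the two inner joins and the per-group aggregates can be checked with a plain-Python sketch, with no Spark involved. `country_year_stats` is a hypothetical helper and the test data are toy values. Like the inner joins, it drops patents that were never cited and patents whose country code is missing from *country_codes.txt*.

```python
from collections import defaultdict

def country_year_stats(patents, ncitations, country_names):
    """Inner-join patents with citation counts and country names, then aggregate.

    patents: iterable of (PatentNum, country_code, Year)
    ncitations: dict {PatentNum: ncitations}
    country_names: dict {country_code: full name}
    Returns (Country, Year, PatentsNum, TotalCitations, AvgCitations, MaxCitations)
    rows sorted by country name and year.
    """
    groups = defaultdict(list)
    for num, code, year in patents:
        if num in ncitations and code in country_names:  # both inner joins must match
            groups[(country_names[code], year)].append(ncitations[num])
    rows = [
        (name, year, len(c), sum(c), sum(c) / len(c), max(c))
        for (name, year), c in groups.items()
    ]
    return sorted(rows, key=lambda r: (r[0], r[1]))
```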


In [24]:
!head -n 5 /tmp/data/country_codes.txt
!ls


AF	Afghanistan
AX	Aland Islands
AL	Albania
DZ	Algeria
AS	American Samoa
apat63_99.txt  cite75_99.txt  country_codes.txt


In [63]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, max, sum, count

# Initialize SparkSession
spark = SparkSession.builder.appName("Exercise 12.2").getOrCreate()

# Load Parquet files
ncitations_path = "/tmp/ncitations.parquet"
patent_info_path = "/tmp/patent_info.parquet"

ncitations_df = spark.read.parquet(ncitations_path)
patent_info_df = spark.read.parquet(patent_info_path)

# Inner join on PatentNum
joined_df = patent_info_df.join(ncitations_df, on="PatentNum", how="inner")

# Group by Country and Year, then aggregate
aggregated_df = joined_df.groupBy("Country", "Year").agg(
    count("PatentNum").alias("PatentsNum"),       # Total number of patents
    sum("ncitations").alias("TotalCitations"),   # Total citations
    avg("ncitations").alias("AvgCitations"),     # Average citations
    max("ncitations").alias("MaxCitations")      # Maximum citations
)

# Load country codes and join with aggregated data
country_codes_path = "/tmp/data/country_codes.txt"
country_codes_df = spark.read.csv(country_codes_path, sep="\t", header=False, inferSchema=True) \
    .toDF("Code", "Name")  # Rename columns for clarity

# Join on the country code
final_df = aggregated_df.join(country_codes_df, aggregated_df["Country"] == country_codes_df["Code"]) \
    .select(col("Name").alias("Country"), "Year", "PatentsNum", "TotalCitations", "AvgCitations", "MaxCitations")

# Sort by country name and year
final_df = final_df.orderBy("Country", "Year")

# Display the first 20 rows of the final DataFrame
final_df.show(n=20, truncate=False)

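# Save the final DataFrame as a single, uncompressed CSV file with a header
# (coalesce(1) leaves one partition, hence one part-*.csv file inside output_path)
output_path = "/tmp/exercise_12_2.csv"
final_df.coalesce(1).write.csv(output_path, mode="overwrite", header=True)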



+-------------------+----+----------+--------------+-----------------+------------+
|Country            |Year|PatentsNum|TotalCitations|AvgCitations     |MaxCitations|
+-------------------+----+----------+--------------+-----------------+------------+
|Algeria            |1963|2         |7             |3.5              |4           |
|Algeria            |1968|1         |2             |2.0              |2           |
|Algeria            |1970|1         |2             |2.0              |2           |
|Algeria            |1972|1         |1             |1.0              |1           |
|Algeria            |1977|1         |2             |2.0              |2           |
|Andorra            |1987|1         |3             |3.0              |3           |
|Andorra            |1993|1         |1             |1.0              |1           |
|Andorra            |1998|1         |1             |1.0              |1           |
|Antigua and Barbuda|1978|1         |6             |6.0              |6     

## Exercise 12.3

From the apat63_99.txt file, obtain the number of patents per country and year **using RDDs** (do not use DataFrames). The resulting RDD must be a key/value RDD in which the key is a country and the value is a list of tuples. Each tuple is composed of a year and the number of patents of that country in that year. In addition, the resulting RDD must be sorted by country code and, for each country, the values must be sorted by year.

Example of output key/value entry:

    (u'PA', [(u'1963', 2), (u'1964', 2), (u'1965', 1), (u'1966', 1), (u'1970', 1), (u'1971', 1), (u'1972', 6), (u'1974', 3), (u'1975', 5), (u'1976', 3), (u'1977', 2), (u'1978', 2), (u'1980', 2), (u'1982', 1), (u'1983', 1), (u'1985', 2), (u'1986', 1), (u'1987', 2), (u'1988', 1), (u'1990', 1), (u'1991', 2), (u'1993', 1), (u'1995', 1), (u'1996', 1), (u'1999', 1)])

**Requirements:**

- You must remove the double quotation marks from the country code.
- Use 8 partitions to read the apat63_99.txt.bz2 file.
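
The pipeline can be traced on a few lines without a SparkContext. The sketch below is plain Python, not the RDD API. A `Counter` stands in for `reduceByKey` and a dictionary of lists stands in for `groupByKey`. `patents_per_country_year` is a hypothetical name. Like the RDD version, it splits on commas. That is safe here because the first five fields (patent, grant year, grant date, application year, country) never contain embedded commas.

```python
from collections import Counter, defaultdict

def patents_per_country_year(lines):
    """Plain-Python mirror of the RDD pipeline: [(country, [(year, count), ...])], sorted."""
    counts = Counter()
    for line in lines:
        if line.startswith('"PATENT"'):  # skip the header line
            continue
        fields = line.split(",")
        country, year = fields[4].replace('"', ""), fields[1]
        if country and year:
            counts[(country, year)] += 1  # plays the role of reduceByKey
    by_country = defaultdict(list)  # plays the role of groupByKey
    for (country, year), n in counts.items():
        by_country[country].append((year, n))
    # sort countries, then each country's (year, count) list by year
    return [(c, sorted(v)) for c, v in sorted(by_country.items())]
```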


In [31]:
from pyspark import SparkContext, SparkConf

# Reuse existing SparkContext if available
sc = SparkContext.getOrCreate()
# Path to the extracted (uncompressed) file
file_path = "/tmp/data/apat63_99.txt"

# Read the file with 8 partitions
rdd = sc.textFile(file_path, minPartitions=8)

# Process the RDD
result_rdd = (
    rdd.filter(lambda line: not line.startswith('"PATENT"'))  # Drop the header line
       .map(lambda line: line.split(","))  # Split each line by commas
       .map(lambda fields: (fields[4].replace('"', ''), fields[1]))  # Extract country (field 4) and year (field 1), removing quotes
       .filter(lambda x: x[0] and x[1])  # Filter out rows with empty country or year
       .map(lambda x: ((x[0], x[1]), 1))  # Create key as (country, year) and set value to 1
       .reduceByKey(lambda a, b: a + b)  # Aggregate counts for each (country, year)
       .map(lambda x: (x[0][0], (x[0][1], x[1])))  # Reformat to (country, (year, count))
       .groupByKey()  # Group by country
       .mapValues(lambda years: sorted(list(years), key=lambda y: y[0]))  # Sort years for each country
       .sortByKey()  # Sort by country code
)

# Collect and display the output
output = result_rdd.collect()
for entry in output:
    print(entry)

# Do not stop the SparkContext here: the following exercises reuse the same session


('AD', [('1987', 1), ('1993', 1), ('1995', 1), ('1998', 2), ('1999', 1)])
('AE', [('1984', 2), ('1985', 2), ('1987', 1), ('1989', 2), ('1990', 1), ('1991', 2), ('1992', 1), ('1993', 1), ('1994', 1), ('1996', 1), ('1998', 1), ('1999', 3)])
('AG', [('1978', 1), ('1979', 1), ('1991', 1), ('1994', 1), ('1995', 2), ('1996', 2), ('1999', 1)])
('AI', [('1998', 1)])
('AL', [('1999', 1)])
('AM', [('1995', 1), ('1999', 1)])
('AN', [('1979', 1), ('1980', 1), ('1989', 1), ('1991', 2), ('1995', 1), ('1997', 1), ('1998', 1)])
('AR', [('1963', 19), ('1964', 27), ('1965', 18), ('1966', 20), ('1967', 16), ('1968', 18), ('1969', 17), ('1970', 23), ('1971', 22), ('1972', 29), ('1973', 28), ('1974', 24), ('1975', 24), ('1976', 24), ('1977', 20), ('1978', 21), ('1979', 24), ('1980', 18), ('1981', 25), ('1982', 18), ('1983', 21), ('1984', 20), ('1985', 11), ('1986', 17), ('1987', 18), ('1988', 16), ('1989', 20), ('1990', 17), ('1991', 16), ('1992', 20), ('1993', 24), ('1994', 32), ('1995', 31), ('1996', 30)

## Exercise 12.4

From the Parquet files created in Exercise 12.1, create a DataFrame that gives the patent or patents with the highest number of citations per country and year. It must also give the average number of citations per country and year, and the difference between the maximum and the average values. The resulting DataFrame should look like this:


|Country|Year|PatentNum|max  |average       |diff              |
|-------|----|---------|-----|--------------|------------------|
|AD     |1987|4688621  |3    |3.0           |0.0               |
|AD     |1993|5193231  |1    |1.0           |0.0               |
|AD     |1998|5765303  |1    |1.0           |0.0               |
|AE     |1984|4482959  |5    |5.0           |0.0               |
|AE     |1985|4554981  |14   |14.0          |0.0               |
|AE     |1987|4663181  |3    |3.0           |0.0               |
|AE     |1989|4805221  |7    |5.0           |2.0               |
|AE     |1990|4909321  |2    |2.0           |0.0               |
|AE     |1991|5004552  |3    |2.0           |1.0               |
|AE     |1992|5104556  |4    |4.0           |0.0               |
|AE     |1993|5181569  |8    |8.0           |0.0               |
|AE     |1996|5580125  |1    |1.0           |0.0               |
|AG     |1978|4126850  |6    |6.0           |0.0               |
|AG     |1979|4172981  |14   |14.0          |0.0               |
|AG     |1991|5013035  |8    |8.0           |0.0               |
|AG     |1994|5345071  |19   |19.0          |0.0               |
|AG     |1995|5457307  |11   |6.0           |5.0               |
|AG     |1996|5525786  |2    |1.5           |0.5               |

**Requirements:**

- The DataFrame must be sorted by country code and year.
- Do **NOT** replace the country code by its whole name.
- The output must be saved as a single CSV file, with a header and without any compression.
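
The maximum can be reached by several patents in the same country and year, which is why the statement says "patent or patents". The plain-Python sketch below applies that rule. It keeps every tied patent and repeats the group's max, average and difference on each of its rows. It uses a hypothetical helper and toy numbers.

```python
from collections import defaultdict

def top_cited_per_group(rows):
    """rows: (Country, Year, PatentNum, ncitations) tuples.

    Returns (Country, Year, PatentNum, max, average, diff) for every patent that
    ties for the maximum in its (Country, Year) group, sorted.
    """
    groups = defaultdict(list)
    for country, year, num, n in rows:
        groups[(country, year)].append((num, n))
    out = []
    for (country, year), items in groups.items():
        mx = max(n for _, n in items)
        avg = sum(n for _, n in items) / len(items)
        for num, n in items:
            if n == mx:  # keep ties, like filtering on ncitations == max after the join
                out.append((country, year, num, mx, avg, mx - avg))
    return sorted(out)
```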


In [35]:
!ls -a

.  ..  apat63_99.txt  cite75_99.txt  country_codes.txt


In [38]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

# Create a SparkSession, or retrieve the existing one (getOrCreate covers both cases)
spark = SparkSession.builder.appName("Exercise 12.4").getOrCreate()


# Load the Parquet files
ncitations_df = spark.read.parquet("/tmp/ncitations.parquet")
patent_info_df = spark.read.parquet("/tmp/patent_info.parquet")

# Join the dataframes on PatentNum
joined_df = patent_info_df.join(ncitations_df, on="PatentNum", how="inner")

# Aggregating by country and year
agg_df = joined_df.groupBy("Country", "Year").agg(
    F.max("ncitations").alias("max"),  # Max citations
    F.avg("ncitations").alias("average")  # Average citations
)

# Calculate the difference between max and average
agg_df = agg_df.withColumn("diff", F.col("max") - F.col("average"))

# For each country and year, get the patent with the max number of citations
max_citation_patents_df = joined_df.join(agg_df, on=["Country", "Year"]) \
    .filter(joined_df["ncitations"] == agg_df["max"]) \
    .select("Country", "Year", "PatentNum", "max", "average", "diff")

# Sort by country and year (PatentNum orders tied patents deterministically)
sorted_final_df = max_citation_patents_df.orderBy("Country", "Year", "PatentNum")

# Save the result as a single, uncompressed CSV file with a header
output_path = "/tmp/exercise_12_4.csv"
sorted_final_df.coalesce(1).write.csv(output_path, mode="overwrite", header=True)

# Show the final output DataFrame (optional)
sorted_final_df.show(10)


+-------+----+---------+---+-------+----+
|Country|Year|PatentNum|max|average|diff|
+-------+----+---------+---+-------+----+
|     AD|1987|  4688621|  3|    3.0| 0.0|
|     AD|1993|  5193231|  1|    1.0| 0.0|
|     AD|1998|  5765303|  1|    1.0| 0.0|
|     AE|1984|  4482959|  5|    5.0| 0.0|
|     AE|1985|  4554981| 14|   14.0| 0.0|
|     AE|1987|  4663181|  3|    3.0| 0.0|
|     AE|1989|  4805221|  7|    5.0| 2.0|
|     AE|1990|  4909321|  2|    2.0| 0.0|
|     AE|1991|  5004552|  3|    2.0| 1.0|
|     AE|1992|  5104556|  4|    4.0| 0.0|
+-------+----+---------+---+-------+----+
only showing top 10 rows



## Exercise 12.5

From the Parquet file with the (PatentNum, Country, Year) information from Exercise 12.1, create a DataFrame that shows the number of patents associated with each country per decade. Here a *decade* means the years whose last digit goes from 0 to 9, e.g. from 1970 to 1979. In addition, the DataFrame must show the increase or decrease in the number of patents per country and decade with respect to the previous decade. The resulting DataFrame must look like this:

|Country|Decade|PatentsNum|Diff|
|-------|------|----------|----|
|AD     |1980  |1         |0   |
|AD     |1990  |5         |4   |
|AE     |1980  |7         |0   |
|AE     |1990  |11        |4   |
|AG     |1970  |2         |0   |
|AG     |1990  |7         |5   |
|AI     |1990  |1         |0   |
|AL     |1990  |1         |0   |
|AM     |1990  |2         |0   |
|AN     |1970  |1         |0   |
|AN     |1980  |2         |1   |
|AN     |1990  |5         |3   |
|AR     |1960  |135       |0   |
|AR     |1970  |239       |104 |
|AR     |1980  |184       |-55 |
|AR     |1990  |292       |108 |

**Requirements**

- The DataFrame must be sorted by country code and decade.
- Do **NOT** replace the country code by its whole name.
- The output must be saved as a single CSV file, with a header and without any compression.
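
The expected output compares each decade with the *previous decade present* for that country, not necessarily the calendar decade just before it: AG jumps from 1970 to 1990 with a difference of 5. That is exactly what `lag` over a window ordered by decade produces. The plain-Python sketch below reproduces the rule with a hypothetical helper and toy rows.

```python
from collections import Counter

def patents_per_decade(rows):
    """rows: (Country, Year) tuples.

    Returns (Country, Decade, PatentsNum, Diff) sorted by country and decade;
    Diff is 0 for a country's first decade, as with lag() + coalesce(..., 0).
    """
    counts = Counter((country, year // 10 * 10) for country, year in rows)
    out, prev = [], {}
    for (country, decade), n in sorted(counts.items()):
        diff = n - prev[country] if country in prev else 0
        out.append((country, decade, n, diff))
        prev[country] = n  # this decade becomes the "previous row" for the next one
    return out
```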

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

# Create a SparkSession, or retrieve the existing one (getOrCreate covers both cases)
spark = SparkSession.builder.appName("Exercise 12.5").getOrCreate()


# Load the Parquet file
patent_info_df = spark.read.parquet("/tmp/patent_info.parquet")

# Add the Decade column based on the Year column
result_df = patent_info_df.withColumn("Decade", (F.floor(F.col("Year") / 10) * 10))

# Group by Country and Decade, and count patents
result_df = result_df.groupBy("Country", "Decade") \
    .agg(F.count("PatentNum").alias("PatentsNum"))

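# Define the window specification: one window per country, ordered by decade
window_spec = Window.partitionBy("Country").orderBy("Decade")

# Difference with the previous decade present for the same country;
# coalesce turns the null returned by lag() for the first decade into 0
result_df = result_df.withColumn(
    "Diff",
    F.coalesce(F.col("PatentsNum") - F.lag("PatentsNum", 1).over(window_spec), F.lit(0))
)

# Sort by Country and Decade (after the window, so the final order is guaranteed)
result_df = result_df.orderBy("Country", "Decade")

# Show the result
result_df.show(20)

# Save the result as a single, uncompressed CSV file with a header
result_df.coalesce(1).write.csv("/tmp/exercise_12_5.csv", mode="overwrite", header=True)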

In [62]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

# Create a SparkSession, or retrieve the existing one (getOrCreate covers both cases)
spark = SparkSession.builder.appName("Exercise 12.5").getOrCreate()


# Load the Parquet file
patent_info_df = spark.read.parquet("/tmp/patent_info.parquet")

# Add the Decade column based on the Year column
result_df = patent_info_df.withColumn("Decade", (F.floor(F.col("Year") / 10) * 10))

# Group by Country and Decade, and count patents
result_df = result_df.groupBy("Country", "Decade") \
    .agg(F.count("PatentNum").alias("PatentsNum"))


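# Define the window specification here so the cell does not depend on an earlier one
window_spec = Window.partitionBy("Country").orderBy("Decade")

# Calculate the difference using lag function and handle first decade with coalesce
result_df = result_df.withColumn("Diff",
                                 F.coalesce(
                                     result_df["PatentsNum"] - F.lag(result_df["PatentsNum"], 1).over(window_spec),
                                     F.lit(0)  # Default to 0 for the first decade
                                 )
                                ).orderBy("Country", "Decade")

# Show the result
result_df.show(20)

# Save the result as a single, uncompressed CSV file with a header
result_df.coalesce(1).write.csv("/tmp/exercise_12_5.csv", mode="overwrite", header=True)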

+-------+------+----------+----+
|Country|Decade|PatentsNum|Diff|
+-------+------+----------+----+
|     AD|  1980|         1|   0|
|     AD|  1990|         5|   4|
|     AE|  1980|         7|   0|
|     AE|  1990|        11|   4|
|     AG|  1970|         2|   0|
|     AG|  1990|         7|   5|
|     AI|  1990|         1|   0|
|     AL|  1990|         1|   0|
|     AM|  1990|         2|   0|
|     AN|  1970|         1|   0|
|     AN|  1980|         2|   1|
|     AN|  1990|         5|   3|
|     AR|  1960|       135|   0|
|     AR|  1970|       239| 104|
|     AR|  1980|       184| -55|
|     AR|  1990|       292| 108|
|     AT|  1960|       950|   0|
|     AT|  1970|      2588|1638|
|     AT|  1980|      3057| 469|
|     AT|  1990|      3665| 608|
+-------+------+----------+----+
only showing top 20 rows

