# Patent Applicants and Technology Distribution in Germany by Landkreis

This notebook analyzes patent applicants and technology distributions across Germany's NUTS Level 3 regions (Landkreise). Using SQL to query the PATSTAT database, we will load data into Pandas DataFrames and explore it interactively with Pygwalker. For optional geographical visualization, we will use Geopandas to create maps of applicants and technology distributions by Landkreis.


## Setup and import the library modules

In [None]:
# Import time library for measuring SQL execution time
import time

# Import Geopandas for mapping if needed later
import geopandas as gpd

# Import pandas for data frame handling
import pandas as pd

# Import pygwalker library for visualization
import pygwalker as pyg

# Import the EPO library module for PATSTAT
from epo.tipdata.patstat import PatstatClient

# Import xml lib for IPC sub group labels
from lxml import etree as ET

# Import sql library for easy sql execution
from sqlalchemy import create_engine, func
from sqlalchemy.sql import literal_column

In [None]:
# Instantiate the client with the reduced data set (TEST) or the full data set (PROD)
patstat = PatstatClient(env="TEST")
# patstat = PatstatClient(env='PROD')

# Instantiate the ORM
db = patstat.orm()

# import all the tables we need
from epo.tipdata.patstat.database.models import (
    TLS201_APPLN,
    TLS202_APPLN_TITLE,
    TLS206_PERSON,
    TLS207_PERS_APPLN,
    TLS224_APPLN_CPC,
    TLS231_INPADOC_LEGAL_EVENT,
)

### Test Query 1

List all granted applications filed at the EPO (direct + PCT) with filing year 2010.


In [None]:
# Test query 1

# Start the timer
start_time = time.time()

q = db.query(
    TLS201_APPLN.appln_id,
    TLS201_APPLN.appln_auth,
    TLS201_APPLN.appln_nr,
    TLS201_APPLN.appln_kind,
    TLS201_APPLN.appln_filing_date,
).filter(
    TLS201_APPLN.appln_filing_year == 2010,
    TLS201_APPLN.appln_auth == "EP",
    TLS201_APPLN.granted == "Y",
)

df = patstat.df(q)

# Stop the timer
end_time = time.time()

# Calculate and print the execution time
execution_time = end_time - start_time
print(f"Query execution time: {execution_time:.2f} seconds")

df

### Test Query 2

Create a hitlist of Chinese applicants filing patents at the EPO (direct or PCT).

In [None]:
# Test query 2

# Start the timer
start_time = time.time()

q = (
    db.query(
        TLS206_PERSON.psn_name,
        TLS206_PERSON.person_ctry_code,
        func.count(TLS201_APPLN.appln_id).label("APPLICATIONS_AT_EPO"),
    )
    .select_from(TLS206_PERSON)
    .join(TLS207_PERS_APPLN)
    .join(TLS201_APPLN)
    .filter(
        TLS206_PERSON.person_ctry_code == "CN",
        TLS207_PERS_APPLN.applt_seq_nr > 0,
        TLS207_PERS_APPLN.invt_seq_nr == 0,
        TLS201_APPLN.appln_auth == "EP",
    )
    .group_by(TLS206_PERSON.psn_name, TLS206_PERSON.person_ctry_code)
    .order_by(func.count(TLS201_APPLN.appln_id).desc())
    .limit(100)
)
df = patstat.df(q)

# Stop the timer
end_time = time.time()

# Calculate and print the execution time
execution_time = end_time - start_time
print(f"Query execution time: {execution_time:.2f} seconds")

df
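Both test queries repeat the same start/stop timer lines. As a minimal sketch, the pattern could be wrapped in a small context manager. The name `timed` is hypothetical and not part of the notebook or the PATSTAT client, and the summation below is only a stand-in for `df = patstat.df(q)`.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label="Query"):
    """Print the wall-clock time spent inside the with-block (hypothetical helper)."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        print(f"{label} execution time: {elapsed:.2f} seconds")


# Stand-in workload; in the notebook this would be `df = patstat.df(q)`.
with timed("Demo"):
    total = sum(range(1_000_000))
```

Because the timing lives in the `finally` branch, the elapsed time is printed even if the query raises an exception.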

## ChatGPT (GPT-4o) as a Co-Developer - Step 1 - Using pygwalker for Visualization

Using PATSTAT, we aim to create dynamic maps that visualize the inventions of German applicants. These maps serve as powerful tools to raise awareness about patent activity within local and societal communities. They also help uncover connections between political, commercial, and patenting activities over time, offering actionable insights for decision-makers.

### Initial Prompt with mtc.berlin "ChatGPT Team" with activated memory

>I need you to help me develop SQL statements for PATSTAT based on regional NUTS codes to identify applicants and technologies based on federal states in Germany. Please act as a co-developer and be so kind to help me with a map per federal state where we create lists for applicants and technologies distribution in level 3 NUTS. We want to visualize with pygwalker library. Please answer in markdown and code blocks to easily copy the results for testing in a jupyter notebook.

The following cells were co-developed over several iterations, starting from the prompt above.

### Pygwalker initial settings
1. Coordinate System: Geographic
2. Mark System: Choropleth
3. Download the NUTS boundaries as GeoJSON from Eurostat (https://ec.europa.eu/eurostat/web/gisco/geodata/statistical-units/territorial-units-statistics)
4. Open the Geographic configuration, load the GeoJSON file and enter NUTS_ID as Feature ID
5. Add the NUTS field to Geometry ID
6. Add Row Count to Color
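Depending on the download, the Eurostat file may contain regions for all countries and several NUTS levels. The following is a rough sketch of trimming it to German level 3 regions before loading it into the Geographic configuration. It assumes only that each feature carries a `NUTS_ID` property, as step 4 implies, and that level 3 codes are five characters long. The function name `german_nuts3` and the sample data are hypothetical.

```python
import json

# Tiny stand-in for the Eurostat GeoJSON; geometries are left empty for brevity.
sample_geojson = json.loads("""
{
  "type": "FeatureCollection",
  "features": [
    {"type": "Feature", "properties": {"NUTS_ID": "DE"}, "geometry": null},
    {"type": "Feature", "properties": {"NUTS_ID": "DE3"}, "geometry": null},
    {"type": "Feature", "properties": {"NUTS_ID": "DE300"}, "geometry": null},
    {"type": "Feature", "properties": {"NUTS_ID": "FR101"}, "geometry": null}
  ]
}
""")


def german_nuts3(geojson):
    """Keep only features whose NUTS_ID is a German level 3 code (DE + 3 characters)."""
    features = [
        feature
        for feature in geojson["features"]
        if feature["properties"]["NUTS_ID"].startswith("DE")
        and len(feature["properties"]["NUTS_ID"]) == 5
    ]
    return {"type": "FeatureCollection", "features": features}


filtered = german_nuts3(sample_geojson)
nuts_ids = [feature["properties"]["NUTS_ID"] for feature in filtered["features"]]
print(nuts_ids)
```

With a real file, the same function could be applied to the result of `json.load` and the output written back with `json.dump`.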

### Execute the following first query and familiarize yourself with the Pygwalker visualization


In [None]:
# Query patent applicants and technology for NUTS level 3 in Germany

# Start the timer
start_time = time.time()

# Define query
q = (
    db.query(
        TLS206_PERSON.person_name,
        TLS206_PERSON.nuts.label("nuts_code"),
        TLS224_APPLN_CPC.cpc_class_symbol.label(
            "technology_field"
        ),  # Adding CPC class as technology field
        func.count(TLS201_APPLN.appln_id).label("appln_count"),
    )
    .select_from(TLS206_PERSON)
    .join(TLS207_PERS_APPLN, TLS206_PERSON.person_id == TLS207_PERS_APPLN.person_id)
    .join(TLS201_APPLN, TLS207_PERS_APPLN.appln_id == TLS201_APPLN.appln_id)
    .join(TLS224_APPLN_CPC, TLS201_APPLN.appln_id == TLS224_APPLN_CPC.appln_id)
    .filter(
        TLS206_PERSON.nuts.startswith("DE"),  # Filter for Germany NUTS code
        TLS206_PERSON.nuts_level == 3,  # Limit to NUTS level 3
    )
    .group_by(
        TLS206_PERSON.nuts,
        TLS206_PERSON.person_name,
        TLS224_APPLN_CPC.cpc_class_symbol,  # Group by CPC class as well
    )
    .order_by(TLS206_PERSON.nuts)  # Sort by NUTS code
)

# Convert to DataFrame for further processing and visualization
df = patstat.df(q)

# Stop the timer
end_time = time.time()

# Calculate and print the execution time
execution_time = end_time - start_time
print(f"Query execution time: {execution_time:.2f} seconds")

pyg.walk(df)
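Note that "Row Count" in the Pygwalker settings counts result rows, that is, combinations of applicant, NUTS code and CPC symbol, and not the values in `appln_count`. The sketch below uses made-up rows shaped like the query result to show the difference. Summing `appln_count` is not a clean application total either, since an application with several CPC symbols appears in several rows.

```python
from collections import Counter, defaultdict

# Hypothetical rows: (person_name, nuts_code, technology_field, appln_count)
rows = [
    ("Applicant A", "DE300", "B66B   1/00", 4),
    ("Applicant A", "DE300", "B66B   5/00", 2),
    ("Applicant B", "DE212", "H04L   9/00", 7),
]

# What "Row Count" measures: number of result rows per region.
row_count = Counter(nuts for _, nuts, _, _ in rows)

# Alternative measure: sum of appln_count per region.
appln_sum = defaultdict(int)
for _, nuts, _, count in rows:
    appln_sum[nuts] += count

print(dict(row_count))
print(dict(appln_sum))
```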

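## Query patent applicants and technology distribution with filing year and grant status

This query adds two derived columns:

* the NUTS level 1 code (federal state), taken from the first three characters of the NUTS code
* the CPC subclass (CPC hierarchy level 3, e.g. B66B), taken from the first four characters of the CPC symbol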

In [None]:
# Query patent applicants and technology distribution with filing year and grant status
# - extract the NUTS level 1 code (federal state)
# - extract the CPC subclass (CPC hierarchy level 3, e.g. B66B)

# Start the timer
start_time = time.time()

q = (
    db.query(
        TLS206_PERSON.person_name.label("applicant"),
        TLS206_PERSON.nuts.label("nuts_code"),
        literal_column("SUBSTR(nuts, 1, 3)").label(
            "federal_state_code"
        ),  # Federal state (NUTS Level 1)
        TLS224_APPLN_CPC.cpc_class_symbol.label("technology_field"),  # Technology field
        literal_column("SUBSTR(cpc_class_symbol, 1, 4)").label("cpc_subclass"),
        TLS201_APPLN.appln_filing_year.label("filing_year"),  # Filing year
        TLS201_APPLN.granted.label("granted"),  # Grant status
        func.count(TLS201_APPLN.appln_id).label("appln_count"),  # Application count
    )
    .select_from(TLS206_PERSON)
    .join(TLS207_PERS_APPLN, TLS206_PERSON.person_id == TLS207_PERS_APPLN.person_id)
    .join(TLS201_APPLN, TLS207_PERS_APPLN.appln_id == TLS201_APPLN.appln_id)
    .join(TLS224_APPLN_CPC, TLS201_APPLN.appln_id == TLS224_APPLN_CPC.appln_id)
    .filter(
        TLS206_PERSON.nuts.startswith("DE"),  # Filter for Germany NUTS code
        TLS206_PERSON.nuts_level == 3,  # Limit to NUTS level 3
    )
    .group_by(
        TLS201_APPLN.appln_filing_year,  # Group by filing year
        TLS206_PERSON.nuts,  # Group by NUTS Level 3 code
        literal_column("SUBSTR(nuts, 1, 3)"),  # Group by federal state code
        TLS224_APPLN_CPC.cpc_class_symbol,  # Group by technology field
        TLS206_PERSON.person_name,  # Group by person name
        TLS201_APPLN.granted,  # Group by grant status
    )
    .order_by(TLS206_PERSON.nuts)  # Sort by NUTS code
)

# Execute the query
df = patstat.df(q)

# Stop the timer
end_time = time.time()

# Calculate and print the execution time
execution_time = end_time - start_time
print(f"Query execution time: {execution_time:.2f} seconds")


# Display the first few rows of the DataFrame
print(df.head())
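The two `SUBSTR` expressions in the query derive the federal state code and the CPC subclass by prefix. As an illustrative sketch, the same expressions can be tried on an in-memory SQLite table. SQLite is only a stand-in here: the table name `sample` and its rows are made up, and the PATSTAT backend may differ in other respects.

```python
import sqlite3

# In-memory stand-in table with the two columns the SUBSTR expressions use.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sample (nuts TEXT, cpc_class_symbol TEXT)")
conn.executemany(
    "INSERT INTO sample VALUES (?, ?)",
    [("DE300", "B66B   1/00"), ("DE212", "H04L   9/00")],
)

# SUBSTR is 1-based: the first 3 characters give the NUTS level 1 code,
# the first 4 characters of the CPC symbol give the subclass.
rows = conn.execute(
    "SELECT nuts, SUBSTR(nuts, 1, 3), SUBSTR(cpc_class_symbol, 1, 4) "
    "FROM sample ORDER BY nuts"
).fetchall()
conn.close()

print(rows)
```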

## Add mapping for Bundesland 

Extract the NUTS level 1 code for the Bundesland and the NUTS level 3 code for the Landkreis, then map both to region names using a mapping CSV from Eurostat.


In [None]:
# Map NUTS level 1 (Bundesland) and NUTS level 3 (Landkreis) codes to names with a mapping CSV from Eurostat

# Load the prepared CSV file
nuts_mapping = pd.read_csv("./mappings/nuts_mapping.csv", delimiter=",")

# Create separate mappings for federal states and districts
federal_state_mapping = (
    nuts_mapping[nuts_mapping["LEVEL"] == 1]
    .set_index("NUTS_ID")["NAME_LATIN"]
    .to_dict()
)
landkreis_mapping = (
    nuts_mapping[nuts_mapping["LEVEL"] == 3]
    .set_index("NUTS_ID")["NAME_LATIN"]
    .to_dict()
)

# Map federal states (NUTS level 1)
df["federal_state_name"] = df["nuts_code"].str[:3].map(federal_state_mapping)

# Map Landkreise (NUTS level 3)
df["landkreis_name"] = df["nuts_code"].map(landkreis_mapping)

# Display the first few rows of the DataFrame for checking
print(df.head())
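The cell above expects the CSV to provide the columns `NUTS_ID`, `LEVEL` and `NAME_LATIN`. The sketch below rebuilds the two lookups from a small inline excerpt using only the standard library, to show how a code resolves and what happens when an entry is missing. The sample rows and the helper `resolve` are illustrative and not taken from the real mapping file.

```python
import csv
import io

# Illustrative excerpt; the real file is ./mappings/nuts_mapping.csv.
sample_csv = io.StringIO(
    "NUTS_ID,LEVEL,NAME_LATIN\n"
    "DE2,1,Bayern\n"
    "DE3,1,Berlin\n"
    "DE300,3,Berlin\n"
)
records = list(csv.DictReader(sample_csv))

# csv.DictReader yields strings, so LEVEL is compared as text here.
federal_state_mapping = {
    r["NUTS_ID"]: r["NAME_LATIN"] for r in records if r["LEVEL"] == "1"
}
landkreis_mapping = {
    r["NUTS_ID"]: r["NAME_LATIN"] for r in records if r["LEVEL"] == "3"
}


def resolve(nuts_code):
    """Return (federal state name, Landkreis name); None marks a missing entry."""
    return federal_state_mapping.get(nuts_code[:3]), landkreis_mapping.get(nuts_code)


print(resolve("DE300"))
print(resolve("DE212"))
```

In the pandas version, codes without a match end up as `NaN` after `.map(...)`, so a quick `df["landkreis_name"].isna().sum()` can reveal gaps, for example when the NUTS revision used in the CSV differs from the one in the data.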

## Add mapping for CPC Subclass 

Add a mapping for level 3 of the CPC hierarchy so that the four-character CPC subclasses can be shown with their titles.
The titles are taken from the IPC scheme published on the WIPO website, because the CPC downloads were not available at the time.


In [None]:
# Start measuring time
start = time.time()

# File path to the IPC XML
filename = "./mappings/EN_ipc_scheme_20210101.xml"

# Define the namespace and parser
ipc_namespace = "{http://www.wipo.int/classifications/ipc/masterfiles}"
ipcEntry = f"{ipc_namespace}ipcEntry"
text_body = f"{ipc_namespace}textBody"
title_part = f"{ipc_namespace}titlePart"
text = f"{ipc_namespace}text"
parser = ET.XMLParser(remove_blank_text=True)

# Parse the XML file
tree = ET.parse(filename, parser=parser)
root = tree.getroot()

# Initialize dictionary for sub-class mapping
sub_class_mapping = {}

# Iterate through the XML to extract sub-class information
for element in root.iter(ipcEntry):
    if element.attrib.get("kind") == "u":  # Focus on sub-classes
        symbol = element.attrib.get("symbol")  # Extract sub-class symbol

        # Locate the title text within the nested structure
        text_element = element.find(f".//{text_body}//{title_part}//{text}")
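        # Fall back to "No Title" if the element is missing or has no direct text
        title = (
            text_element.text.strip()
            if text_element is not None and text_element.text
            else "No Title"
        )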

        sub_class_mapping[symbol] = title

# Print a sample of the extracted data
# for symbol, title in list(sub_class_mapping.items())[:20]:
#    print(f"{symbol}: {title}")

# Print execution time
print(
    f"Extracted {len(sub_class_mapping)} sub-classes in {(time.time() - start) * 1000:.0f} ms."
)

# execute the mapping
df["cpc_subclass_title"] = df["cpc_subclass"].map(sub_class_mapping)

# Display the first few rows of the DataFrame
print(df.head())

df
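To make the XML traversal easier to follow, here is a small sketch run against an inline snippet. The snippet is a simplified, hypothetical imitation of the IPC scheme layout. It keeps only the element names used in the cell above, and the real file is more deeply nested. The standard-library `xml.etree.ElementTree` is swapped in for `lxml` so the example has no extra dependencies.

```python
import xml.etree.ElementTree as ET

NS = "{http://www.wipo.int/classifications/ipc/masterfiles}"

# Simplified, hypothetical excerpt of the scheme structure.
sample_xml = """
<IPCScheme xmlns="http://www.wipo.int/classifications/ipc/masterfiles">
  <ipcEntry kind="c" symbol="B66">
    <ipcEntry kind="u" symbol="B66B">
      <textBody><title><titlePart>
        <text>ELEVATORS; ESCALATORS OR MOVING WALKWAYS</text>
      </titlePart></title></textBody>
      <ipcEntry kind="m" symbol="B66B0001000000">
        <textBody><title><titlePart>
          <text>Hypothetical main group title</text>
        </titlePart></title></textBody>
      </ipcEntry>
    </ipcEntry>
  </ipcEntry>
</IPCScheme>
"""

root = ET.fromstring(sample_xml)
titles = {}
for entry in root.iter(f"{NS}ipcEntry"):
    if entry.attrib.get("kind") != "u":  # keep subclass entries only
        continue
    # find() returns the first match in document order, i.e. the subclass's own title
    text_element = entry.find(f".//{NS}textBody//{NS}titlePart//{NS}text")
    if text_element is not None:
        # itertext() also collects text nested inside child markup
        titles[entry.attrib["symbol"]] = "".join(text_element.itertext()).strip()

print(titles)
```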

## Finalize our code with a better structure and the help of our co-developer

Prompt:
> I will extract and give you the python code from a jupyter notebook. Could you please help me structure it with a better architecture using maybe functions or classes?

Answer:
> Sure! Please paste the Python code extracted from your Jupyter notebook, and I’ll help refactor and structure it into a clean, modular script that can be executed outside Jupyter. I’ll use functions and classes where appropriate to improve readability, reusability, and maintainability.

Improvements made:
* Class-Based Structure: A `PatentDataProcessor` class handles mappings, queries, and processing.
* Modular Functions: Tasks are split into reusable methods (`load_nuts_mapping`, `query_patent_data`, `load_ipc_scheme`, `process_data`, and `visualize_data`).
* Readability: The steps are laid out in order in the `__main__` block, and print statements track execution progress.
* Maintainability: External files (e.g., NUTS mapping, IPC scheme) are passed as parameters for flexibility.
* Error Handling Ready: Methods can easily be extended to include error handling.
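As one possible way to follow up on the error handling point, the file-based loaders could check their input paths first. The helper `require_file` below is a hypothetical sketch and not part of the refactored class. It only shows the kind of check that `load_nuts_mapping` or `load_ipc_scheme` might call before reading.

```python
from pathlib import Path


def require_file(path, description):
    """Return the path as a Path object, or raise a descriptive error if it is missing."""
    file_path = Path(path)
    if not file_path.is_file():
        raise FileNotFoundError(f"{description} not found: {file_path}")
    return file_path


# Demonstrate the error message with a path that is assumed not to exist.
try:
    require_file("./mappings/does_not_exist.csv", "NUTS mapping")
except FileNotFoundError as exc:
    message = str(exc)
    print(message)
```

Failing early with a named file is usually easier to debug than a parser error raised from inside `pd.read_csv` or `ET.parse`.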

In [None]:
import time

import geopandas as gpd
import pandas as pd
import pygwalker as pyg
from epo.tipdata.patstat import PatstatClient
from epo.tipdata.patstat.database.models import (
    TLS201_APPLN,
    TLS202_APPLN_TITLE,
    TLS206_PERSON,
    TLS207_PERS_APPLN,
    TLS224_APPLN_CPC,
    TLS231_INPADOC_LEGAL_EVENT,
)
from lxml import etree as ET
from sqlalchemy import create_engine, func
from sqlalchemy.sql import literal_column


class PatentDataProcessor:
    def __init__(
        self,
        patstat_env="TEST",
        nuts_mapping_path="./mappings/nuts_mapping.csv",
        ipc_scheme_path="./mappings/EN_ipc_scheme_20210101.xml",
    ):
        self.patstat = PatstatClient(env=patstat_env)
        self.db = self.patstat.orm()
        self.nuts_mapping_path = nuts_mapping_path
        self.ipc_scheme_path = ipc_scheme_path
        self.nuts_mapping = None
        self.sub_class_mapping = {}

    def load_nuts_mapping(self):
        """Load NUTS mapping from CSV."""
        print("Loading NUTS mapping...")
        self.nuts_mapping = pd.read_csv(self.nuts_mapping_path, delimiter=",")
        self.federal_state_mapping = (
            self.nuts_mapping[self.nuts_mapping["LEVEL"] == 1]
            .set_index("NUTS_ID")["NAME_LATIN"]
            .to_dict()
        )
        self.landkreis_mapping = (
            self.nuts_mapping[self.nuts_mapping["LEVEL"] == 3]
            .set_index("NUTS_ID")["NAME_LATIN"]
            .to_dict()
        )

    def query_patent_data(self):
        """Query patent data using ORM."""
        print("Querying patent data...")
        start_time = time.time()
        query = (
            self.db.query(
                TLS206_PERSON.person_name.label("applicant"),
                TLS206_PERSON.nuts.label("nuts_code"),
                literal_column("SUBSTR(nuts, 1, 3)").label("federal_state_code"),
                TLS224_APPLN_CPC.cpc_class_symbol.label("technology_field"),
                literal_column("SUBSTR(cpc_class_symbol, 1, 4)").label("cpc_subclass"),
                TLS201_APPLN.appln_filing_year.label("filing_year"),
                TLS201_APPLN.granted.label("granted"),
                func.count(TLS201_APPLN.appln_id).label("appln_count"),
            )
            .select_from(TLS206_PERSON)
            .join(
                TLS207_PERS_APPLN,
                TLS206_PERSON.person_id == TLS207_PERS_APPLN.person_id,
            )
            .join(TLS201_APPLN, TLS207_PERS_APPLN.appln_id == TLS201_APPLN.appln_id)
            .join(TLS224_APPLN_CPC, TLS201_APPLN.appln_id == TLS224_APPLN_CPC.appln_id)
            .filter(TLS206_PERSON.nuts.startswith("DE"), TLS206_PERSON.nuts_level == 3)
            .group_by(
                TLS201_APPLN.appln_filing_year,
                TLS206_PERSON.nuts,
                literal_column("SUBSTR(nuts, 1, 3)"),
                TLS224_APPLN_CPC.cpc_class_symbol,
                TLS206_PERSON.person_name,
                TLS201_APPLN.granted,
            )
            .order_by(TLS206_PERSON.nuts)
        )

        df = self.patstat.df(query)
        print(f"Query execution time: {time.time() - start_time:.2f} seconds")
        return df

    def load_ipc_scheme(self):
        """Load IPC scheme for CPC subclass titles."""
        print("Loading IPC scheme...")
        start = time.time()
        ipc_namespace = "{http://www.wipo.int/classifications/ipc/masterfiles}"
        parser = ET.XMLParser(remove_blank_text=True)
        tree = ET.parse(self.ipc_scheme_path, parser=parser)
        root = tree.getroot()

        for element in root.iter(f"{ipc_namespace}ipcEntry"):
            if element.attrib.get("kind") == "u":
                symbol = element.attrib.get("symbol")
                text_element = element.find(
                    f".//{ipc_namespace}textBody//{ipc_namespace}titlePart//{ipc_namespace}text"
                )
                title = (
                    text_element.text.strip()
                    if text_element is not None and text_element.text
                    else "No Title"
                )
                self.sub_class_mapping[symbol] = title

        print(
            f"Loaded {len(self.sub_class_mapping)} IPC subclasses in {(time.time() - start) * 1000:.0f} ms."
        )

    def process_data(self, df):
        """Map federal states, districts, and subclass titles."""
        print("Processing data...")
        df["federal_state_name"] = (
            df["nuts_code"].str[:3].map(self.federal_state_mapping)
        )
        df["landkreis_name"] = df["nuts_code"].map(self.landkreis_mapping)
        df["cpc_subclass_title"] = df["cpc_subclass"].map(self.sub_class_mapping)
        return df

    def visualize_data(self, df):
        """Visualize data using Pygwalker."""
        print("Launching visualization...")
        pyg.walk(df)


if __name__ == "__main__":
    processor = PatentDataProcessor(patstat_env="TEST")

    # Step 1: Load mappings
    processor.load_nuts_mapping()
    processor.load_ipc_scheme()

    # Step 2: Query data
    patent_data = processor.query_patent_data()

    # Step 3: Process data
    processed_data = processor.process_data(patent_data)

    # Step 4: Visualize data
    processor.visualize_data(processed_data)