In [1]:
from difflib import SequenceMatcher
from pyspark.sql.functions import col, regexp_extract, explode, split, concat_ws, avg, length, desc, year, sum, count
from pyspark.sql import SparkSession
from pyspark.ml.feature import StopWordsRemover

# Fonction pour calculer le pourcentage de similarité entre deux chaînes de caractères
def similarite(titre1, titre2):
    return SequenceMatcher(None, titre1, titre2).ratio() * 100

# Fonction pour calculer la similarité entre tous les titres non vides dans le DataFrame
def calculer_similarite_entre_titres(df):
    similarites = []
    titres_non_vides = df.filter(col("title").isNotNull()).select("title").collect()
    for i in range(len(titres_non_vides)):
        titre1 = titres_non_vides[i]["title"].lower()
        for j in range(i + 1, len(titres_non_vides)):
            titre2 = titres_non_vides[j]["title"].lower()
            # Vérifier si les titres ne sont pas vides et que la similarité n'est pas de 100%
            if titre1 and titre2 and titre1 != titre2:
                pourcentage_similarite = similarite(titre1, titre2)
                similarites.append((titre1, titre2, pourcentage_similarite))
    return similarites

# Initialisation de la session Spark
spark = SparkSession.builder \
    .appName("Analyse des données MongoDB") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/spark.data") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/spark.data") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

# Chargement des données
try:
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
except Exception as e:
    print(f"Error loading data: {e}")
    df = None

if df:
    # Vérification de l'existence de la colonne 'title'
    if 'title' in df.columns:
        # Calculer les similarités entre tous les titres non vides
        similarites_titres = calculer_similarite_entre_titres(df)

        # Filtrer les similarités pour ne garder que celles qui ont une similarité inférieure à 60%
        similarites_sous_60 = [sim for sim in similarites_titres if sim[2] < 60]

        # Trier les similarités filtrées par ordre décroissant de similarité
        top_similarites_sous_60 = sorted(similarites_sous_60, key=lambda x: x[2], reverse=True)[:100]

        # Afficher les 20 meilleures paires de titres avec une similarité inférieure à 60%
        print("Top 20 paires de titres avec une similarité inférieure à 60% :")
        for titre1, titre2, pourcentage_similarite in top_similarites_sous_60[:20]:
            print(f"Titre 1 : {titre1}, Titre 2 : {titre2}, Pourcentage de similarité : {pourcentage_similarite}%")

    # Définition des mots par langue avec des préfixes uniques
    english_words = [
        "english_Agriculture", "english_System", "english_Agricole", "english_Farmer", "english_Harvest", "english_Crop", "english_Soil",
        "english_Fertilizer", "english_Irrigation", "english_Pesticide", "english_Tractor", "english_Greenhouse",
        "english_Weather", "english_Climate", "english_Rainfall", "english_Drought", "english_Flood",
        "english_Sunshine", "english_Temperature", "english_Forecast", "english_Wind", "english_Humidity"
    ]
    french_words = [
        "french_Agriculture", "french_Agriculteur", "french_Récolte", "french_Culture", "french_Sol",
        "french_Engrais", "french_Irrigation", "french_Pesticide", "french_Tracteur", "french_Serre",
        "french_Météo", "french_Climat", "french_Précipitations", "french_Sécheresse", "french_Inondation",
        "french_Ensoleillement", "french_Température", "french_Prévision", "french_Vent", "french_Humidité"
    ]
    german_words = [
        "german_Landwirtschaft", "german_Bauer", "german_Ernte", "german_Anbau", "german_Boden",
        "german_Dünger", "german_Bewässerung", "german_Pestizid", "german_Traktor", "german_Gewächshaus",
        "german_Wetter", "german_Klima", "german_Niederschlag", "german_Dürre", "german_Überschwemmung",
        "german_Sonnenschein", "german_Temperatur", "german_Vorhersage", "german_Wind", "german_Luftfeuchtigkeit"
    ]
    japanese_words = [
        "japanese_農業", "japanese_農家", "japanese_収穫", "japanese_作物", "japanese_土壌",
        "japanese_肥料", "japanese_灌漑", "japanese_農薬", "japanese_トラクター", "japanese_温室",
        "japanese_天気", "japanese_気候", "japanese_降水量", "japanese_干ばつ", "japanese_洪水",
        "japanese_日照", "japanese_温度", "japanese_予報", "japanese_風", "japanese_湿度"
    ]
    chinese_words = [
        "chinese_农业", "chinese_农民", "chinese_收获", "chinese_作物", "chinese_土壤",
        "chinese_肥料", "chinese_灌溉", "chinese_杀虫剂", "chinese_拖拉机", "chinese_温室",
        "chinese_天气", "chinese_气候", "chinese_降雨量", "chinese_干旱", "chinese_洪水",
        "chinese_阳光", "chinese_温度", "chinese_预报", "chinese_风", "chinese_湿度"
    ]

    # Combinaison de toutes les listes de mots
    combined_words = english_words + french_words + german_words + japanese_words + chinese_words

    # Utilisation de regex pour compter les occurrences des mots dans les titres
    for word in combined_words:
        regex_pattern = f"(?i)\\b{word.split('_', 1)[1]}\\b"
        df = df.withColumn(word, (regexp_extract(col("title"), regex_pattern, 0) != "").cast("int"))

    # Calcul de la somme des occurrences pour chaque mot
    word_counts = df.select([sum(col(word)).alias(word) for word in combined_words])

    # Filtrer les colonnes dont la somme est supérieure à zéro
    non_zero_word_counts = word_counts.select([col(word) for word in word_counts.columns if word_counts.select(sum(col(word))).first()[0] > 0])

    # Renommer les colonnes pour supprimer les préfixes de langue
    for word in combined_words:
        non_zero_word_counts = non_zero_word_counts.withColumnRenamed(word, word.split('_', 1)[1])

    # Affichage des résultats
    non_zero_word_counts.show(truncate=False)
else:
    print("La colonne 'title' n'existe pas dans le DataFrame.")

# Analyse des descriptions et des revendications
# Combinaison des stopwords en anglais, français et chinois
english_stopwords = StopWordsRemover.loadDefaultStopWords("english")
french_stopwords = StopWordsRemover.loadDefaultStopWords("french")
chinese_stopwords = ["的", "了", "在", "是", "我", "有", "和", "不", "就", "人", "都", "一", "一个", "上", "也", "很", "到", "说", "要", "去", "你", "会", "着", "没有", "看", "好", "自己"]
combined_stopwords = list(set(english_stopwords + french_stopwords + chinese_stopwords))

# Distribution des brevets par année de publication
distribution_par_annee = df.groupBy(year("publication_date").alias("year")).count().orderBy("year")

# Distribution des brevets par pays
distribution_par_pays = df.groupBy("country").count().orderBy(desc("count"))

# Distribution des brevets par inventeur ou titulaire actuel
distribution_par_inventeur_actuel = df.select(explode("current_assignees").alias("assignee")).groupBy("assignee").count().orderBy(desc("count"))

# Distribution des brevets par langue
distribution_par_langue = df.groupBy("other_language").count().orderBy(desc("count"))

# Concaténation des descriptions en une seule chaîne de caractères
concat_descriptions = df.select(concat_ws(" ", "description").alias("concat_description"))

# Tokenisation des descriptions
tokenized_descriptions = concat_descriptions.select(split(col("concat_description"), "\\s+").alias("tokens"))

# Suppression des stop words des descriptions
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens", stopWords=combined_stopwords)
filtered_descriptions = remover.transform(tokenized_descriptions)

# Filtrer les mots pour ne garder que ceux contenant des lettres
filtered_descriptions = filtered_descriptions.select(
    explode(col("filtered_tokens")).alias("mot")).filter(col("mot").rlike("^[a-zA-Z]+$"))

# Concaténation des revendications en une seule chaîne de caractères
concat_revendications = df.select(concat_ws(" ", "claims").alias("concat_claims"))

# Tokenisation des revendications
tokenized_revendications = concat_revendications.select(split(col("concat_claims"), "\\s+").alias("tokens"))

# Suppression des stop words des revendications
filtered_revendications = remover.transform(tokenized_revendications)

# Filtrer les mots pour ne garder que ceux contenant des lettres
filtered_revendications = filtered_revendications.select(
    explode(col("filtered_tokens")).alias("mot")).filter(col("mot").rlike("^[a-zA-Z]+$"))

# Analyse de la longueur moyenne des descriptions et des revendications
longueur_moyenne_description = concat_descriptions.select(avg(length("concat_description")).alias("avg_description_length"))
longueur_moyenne_revendications = concat_revendications.select(avg(length("concat_claims")).alias("avg_claims_length"))

# Calcul de la fréquence des mots dans les descriptions
freq_mots_descriptions = filtered_descriptions.groupBy("mot").count().orderBy(desc("count"))

# Calcul de la fréquence des mots dans les revendications
freq_mots_revendications = filtered_revendications.groupBy("mot").count().orderBy(desc("count"))

# Distribution des brevets par inventeur
distribution_par_inventeur = df.select(explode("inventors").alias("inventor")) \
                                .groupBy("inventor").agg(count("*").alias("nombre_brevets")) \
                                .orderBy(col("nombre_brevets").asc())

# Distribution des brevets par date de publication
distribution_par_date_publication = df.groupBy("publication_date").count().orderBy("publication_date")

# Distribution des brevets par pays d'origine et année de publication
distribution_par_pays_et_annee = df.groupBy("country", year("publication_date").alias("year")).count().orderBy("country", "year")

# Explode des revendications pour obtenir une ligne par revendication
df_exploded = df.select("country", explode("claims").alias("claim"))

# Distribution des brevets par pays et nombre de revendications
distribution_par_pays_et_revendications = df_exploded.groupBy("country").count().orderBy(col("count").desc())

results = {
    "distribution_par_annee": distribution_par_annee,
    "distribution_par_pays": distribution_par_pays,
    "distribution_par_inventeur_actuel": distribution_par_inventeur_actuel,
    "distribution_par_langue": distribution_par_langue,
    "longueur_moyenne_description": longueur_moyenne_description,
    "longueur_moyenne_revendications": longueur_moyenne_revendications,
    "freq_mots_descriptions": freq_mots_descriptions,
    "freq_mots_revendications": freq_mots_revendications,
    "distribution_par_inventeur": distribution_par_inventeur,
    "distribution_par_date_publication": distribution_par_date_publication,
    "distribution_par_pays_et_annee": distribution_par_pays_et_annee,
    "distribution_par_pays_et_revendications": distribution_par_pays_et_revendications
}

for key, df in results.items():
    print(f"{key.replace('_', ' ').capitalize()} :")
    df.show(truncate=False)
else:
    print("Les données n'ont pas pu être chargées.")


Top 20 paires de titres avec une similarité inférieure à 60% :
Titre 1 : a method for obtaining data from an image of an object of a user that has a biometric characteristic of the user, Titre 2 : method for verifying the identity of a user by identifying an object within an image that has a biometric characteristic of the user and mobile device for executing the method, Pourcentage de similarité : 59.93031358885017%
Titre 1 : dispositif de pulverisation de produits de traitement pour vegetaux, Titre 2 : dispositif pour l'application de micro-ondes en vue du traitement d'un materiau., Pourcentage de similarité : 59.863945578231295%
Titre 1 : method and apparatus for packing bunches of flowers into sleeves, Titre 2 : the method and apparatus for determining the number of living cells in a test fluid, Pourcentage de similarité : 59.863945578231295%
Titre 1 : composition comprenant du henne et/ou de l'indigo, une huile et un saccharide, et procede de coloration capillaire la mettant en œu

In [2]:
df.printSchema()


root
 |-- country: string (nullable = true)
 |-- count: long (nullable = false)


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, explode, desc

# Initialisation de la session Spark
spark = SparkSession.builder \
    .appName("Distribution des brevets") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/spark.data") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/spark.data") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

# Chargement des données
try:
    df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
except Exception as e:
    print(f"Error loading data: {e}")
    df = None

if df:
    # Filtrage des données pour s'assurer que les colonnes nécessaires ne sont pas nulles
    df_filtre = df.filter(col("country").isNotNull() & col("publication_date").isNotNull())

    # Distribution des brevets par pays, année de publication et inventeur
    distribution_par_pays_annee_inventeur = df_filtre.select(
        "country",
        year("publication_date").alias("year"),
        explode("inventors").alias("inventor")
    ).groupBy("country", "year", "inventor").count().orderBy("country", "year", desc("count"))

    # Affichage des résultats
    distribution_par_pays_annee_inventeur.show(truncate=False)
else:
    print("Les données n'ont pas pu être chargées.")


+---------+----+--------------------+-----+
|country  |year|inventor            |count|
+---------+----+--------------------+-----+
|Australia|2006|Thomas Giering      |1    |
|Australia|2006|Oliver Martin       |1    |
|Australia|2006|Graham Butler       |1    |
|Australia|2006|Joachim Voelkening  |1    |
|Australia|2006|Michael Hodges      |1    |
|Australia|2006|Wolfgang Rauscher   |1    |
|Australia|2006|Lysis Cubieres      |1    |
|Australia|2006|Gerhard Schwenk     |1    |
|Australia|2006|Yannick Mechine     |1    |
|Australia|2006|Nicholas John Gudde |1    |
|Australia|2007|Joanne J. Fillatti  |1    |
|Australia|2007|Toni Voelker        |1    |
|Australia|2007|Tim Ulmasov         |1    |
|Australia|2007|Neal A. Bringe      |1    |
|Australia|2007|Monica Colt         |1    |
|Australia|2007|Mircea Dan Bucevschi|1    |
|Australia|2007|Mendy Axlerad       |1    |
|Australia|2008|Jerold Brickler     |1    |
|Australia|2008|Roy Cooley          |1    |
|Australia|2008|Mark E. Peters  

In [4]:
from pyspark.sql.functions import asc

# Distribution des brevets par pays, langue et inventeur
distribution_par_pays_langue_inventeur = df_filtre.select(
    "country",
    "other_language",
    explode("inventors").alias("inventor")
).groupBy("country", "other_language", "inventor").count().orderBy("country", asc("count"))

# Affichage des résultats
distribution_par_pays_langue_inventeur.show(truncate=False)


+---------+---------------------+----------------------------+-----+
|country  |other_language       |inventor                    |count|
+---------+---------------------+----------------------------+-----+
|Australia|Helen Dacres         |Nam Cao Hoai Le             |1    |
|Australia|Helen Dacres         |Yonggang Zhu                |1    |
|Australia|Naohiko Hirota       |Naohiko Hirota              |1    |
|Australia|Helen Dacres         |Nan Wu                      |1    |
|Australia|Dieter H. Klaubert   |James Unch                  |1    |
|Australia|Naohiko Hirota       |Hirotaka Kaneda             |1    |
|Australia|Dieter H. Klaubert   |Poncho Meisenheimer         |1    |
|Australia|Christopher Cooper   |Marshall Medoff             |1    |
|Australia|Dignan Herbert Rayner|Ivan Herbert Godfrey RAYNER |1    |
|Australia|Stephen Ecob         |Stephen Ecob                |1    |
|Australia|Helen Dacres         |Helen Dacres                |1    |
|Australia|Naohiko Hirota       |H

In [5]:
from pyspark.sql.functions import year

# Filtrer les lignes avec des valeurs non nulles dans les colonnes "country" et "publication_date"
df_filtre = df.filter(col("country").isNotNull() & col("publication_date").isNotNull())

# Distribution des brevets par pays, année de publication et titre
distribution_par_pays_annee_titre = df_filtre.select(
    "country",
    year("publication_date").alias("year"),
    "title"
).groupBy("country", "year", "title").count().orderBy("country", "year", desc("count"))

# Affichage des résultats
distribution_par_pays_annee_titre.show(truncate=False)


+---------+----+-----------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|country  |year|title                                                                                                                                                |count|
+---------+----+-----------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|Australia|2006|Portable apparatus for analysis of a refinery feedstock or a product of a refinery process                                                           |1    |
|Australia|2006|Value document with luminescent properties                                                                                                           |1    |
|Australia|2007|Nucleic acid constructs and methods for producing altered seed oil compositions                                        

In [15]:
import pandas as pd
from sqlalchemy import create_engine
import logging

# Setup logging
logging.basicConfig(level=logging.INFO)

# Create SQLAlchemy engine
engine = create_engine('postgresql+psycopg2://postgres:12345678@localhost:5432/PostgreSQL_16')

# Dictionary containing Spark DataFrames
dataframes_spark = {
    'Distribution_par_annee': distribution_par_annee,
    'Distribution_par_pays': distribution_par_pays,
    'Distribution_par_inventeur': distribution_par_inventeur,
    'Distribution_par_langue': distribution_par_langue,
    'Longueur_moyenne_description': longueur_moyenne_description,
    'Longueur_moyenne_revendications': longueur_moyenne_revendications,
    'Freq_mots_descriptions': freq_mots_descriptions,
    'Freq_mots_revendications': freq_mots_revendications,
    'Distribution_par_pays_et_annee': distribution_par_pays_et_annee,
    'Distribution_par_pays_et_revendications': distribution_par_pays_et_revendications,
    'Distribution_par_pays_annee_inventeur': distribution_par_pays_annee_inventeur,
    'Distribution_par_pays_langue_inventeur': distribution_par_pays_langue_inventeur,
    'Distribution_par_pays_annee_titre': distribution_par_pays_annee_titre
}

# Convert Spark DataFrames to pandas DataFrames and insert into PostgreSQL
for df_name, df_spark in dataframes_spark.items():
    try:
        logging.info(f"Processing DataFrame: {df_name}")
        df_pandas = df_spark.toPandas()
        df_pandas.to_sql(df_name, engine, if_exists='replace', index=False)
        logging.info(f"Successfully inserted DataFrame: {df_name}")
    except Exception as e:
        logging.error(f"Error inserting DataFrame {df_name}: {e}")

print("All DataFrames have been processed.")


INFO:root:Processing DataFrame: Distribution_par_annee
  df_pandas.to_sql(df_name, engine, if_exists='replace', index=False)
ERROR:root:Error inserting DataFrame Distribution_par_annee: 'Engine' object has no attribute 'cursor'
INFO:root:Processing DataFrame: Distribution_par_pays
  df_pandas.to_sql(df_name, engine, if_exists='replace', index=False)
ERROR:root:Error inserting DataFrame Distribution_par_pays: 'Engine' object has no attribute 'cursor'
INFO:root:Processing DataFrame: Distribution_par_inventeur
  df_pandas.to_sql(df_name, engine, if_exists='replace', index=False)
ERROR:root:Error inserting DataFrame Distribution_par_inventeur: 'Engine' object has no attribute 'cursor'
INFO:root:Processing DataFrame: Distribution_par_langue
  df_pandas.to_sql(df_name, engine, if_exists='replace', index=False)
ERROR:root:Error inserting DataFrame Distribution_par_langue: 'Engine' object has no attribute 'cursor'
INFO:root:Processing DataFrame: Longueur_moyenne_description
  df_pandas.to_sql(

All DataFrames have been processed.


In [17]:
import psycopg2
from psycopg2 import sql

# Connexion à la base de données
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    database="PostgreSQL_16",
    user="postgres",
    password="12345678"
)

# Création d'un curseur
cur = conn.cursor()

# Récupération de la liste des tables dans la base de données
cur.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
tables = cur.fetchall()

# Parcours de chaque table et export des données
for table in tables:
    table_name = table[0]
    # Récupération des données de la table
    cur.execute(sql.SQL("SELECT * FROM {}").format(sql.Identifier(table_name)))
    data = cur.fetchall()
    
    # Écriture des données dans le fichier CSV avec l'encodage UTF-8
    with open(f"{table_name}.csv", "w", encoding="utf-8") as f:
        # Écriture de l'en-tête du fichier CSV avec les noms des colonnes
        cur.execute(sql.SQL("SELECT column_name FROM information_schema.columns WHERE table_name = %s"), (table_name,))
        columns = [col[0] for col in cur.fetchall()]
        f.write(','.join(columns) + '\n')
        
        # Écriture des données dans le fichier CSV
        for row in data:
            # Convertir chaque élément de la ligne en chaîne et le protéger contre les caractères spéciaux
            row_str = [str(elem).replace(',', '') if elem is not None else '' for elem in row]
            f.write(','.join(row_str) + '\n')


# Fermeture du curseur et de la connexion
cur.close()
conn.close()
