In [None]:
!pip install pyspark
!pip install langdetect translate


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, lit

spark = (
    SparkSession.builder
    .appName("Lecture de CSV avec Spark")
    .getOrCreate()
)


def _read_export(path):
    """Read one raw CSV export whose first line is the header (no schema inference)."""
    return spark.read.csv(path, header=True)


# One Spark DataFrame per patent-office export.
df_wipo = _read_export("/content/big_data_project.wipo_end.csv")
df_es = _read_export("/content/big_data_project.es.csv")
df_cana = _read_export("/content/big_data_project.cn.csv")
df_fpo = _read_export("/content/big_data_project.fpn.csv")
df_gp = _read_export("/content/big_data_project.gp.csv")

In [None]:
df_fpo.show(n=10, truncate=False)

In [None]:
from functools import reduce

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, lit, to_date

# Every source stores ISO-8601 timestamps except the FPO export, which uses MM/dd/yyyy.
ISO_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"

df_wipo = df_wipo.withColumn("Application_Date", to_date("Application_Date", ISO_TIMESTAMP_FORMAT))
df_es = df_es.withColumn("Date_application", to_date("Date_application", ISO_TIMESTAMP_FORMAT))
df_cana = df_cana.withColumn("Filled Date", to_date("Filled Date", ISO_TIMESTAMP_FORMAT))
df_fpo = df_fpo.withColumn("Filing Date", to_date("Filing Date", "MM/dd/yyyy"))
df_gp = df_gp.withColumn("Filing Date", to_date("Filing Date", ISO_TIMESTAMP_FORMAT))


def to_common_schema(df, abstract_col, date_col, country_col, source):
    """Project one source onto the shared merged schema.

    Returns the columns Title, Abstract, Inventors, Application_Date, country and
    Source (a constant label), renaming the source-specific column names so that
    all sources can be combined by column name.
    """
    return df.select(
        col("Title"),
        col(abstract_col).alias("Abstract"),
        col("Inventors"),
        col(date_col).alias("Application_Date"),
        col(country_col).alias("country"),
        lit(source).alias("Source"),
    )


df_wipo_titles = to_common_schema(df_wipo, "Abstract", "Application_Date", "Office", "Wipo")
df_es_titles = to_common_schema(df_es, "Patent_abstract", "Date_application", "Country_Name", "Es")
df_cana_titles = to_common_schema(df_cana, "Abstract", "Filled Date", "Country", "Cana")
df_fpo_titles = to_common_schema(df_fpo, "Summary", "Filing Date", "Country", "Fpo")
df_gp_titles = to_common_schema(df_gp, "Abstract", "Filing Date", "Country Name", "Gp")

# unionByName matches columns by name rather than by position, so a missing or
# misspelled column raises instead of silently landing under another header.
dfs_merged = reduce(
    DataFrame.unionByName,
    [df_wipo_titles, df_es_titles, df_cana_titles, df_fpo_titles, df_gp_titles],
)

dfs_merged.show(3)



In [None]:

# Deduplicate on every column and report how many rows were removed.
count_before = dfs_merged.count()
dfs_merged = dfs_merged.dropDuplicates()
count_after = dfs_merged.count()

if count_before != count_after:
    print("Il y a des duplicatas et ils ont été supprimés.")
    print(f"Nombre de lignes avant suppression : {count_before}")
    print(f"Nombre de lignes après suppression : {count_after}")
else:
    print("Il n'y a pas de duplicatas.")


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import LongType, StructField, StructType

spark = SparkSession.builder.appName("IndexingRows").getOrCreate()

# Sort on every column before numbering. Row order right after dropDuplicates() is not
# guaranteed, and dfs_merged is lazy (recomputed by every action), so an unsorted
# zipWithIndex() could give the same row a different index on each recomputation.
# The rows are distinct after deduplication, so sorting on all columns is a total order.
rdd = dfs_merged.orderBy(*dfs_merged.columns).rdd.zipWithIndex()

# Reuse the known column types instead of letting toDF() infer them from sampled
# rows, which fails when a column is null in every row it samples.
indexed_schema = StructType(
    dfs_merged.schema.fields + [StructField("row_index", LongType(), nullable=False)]
)
dfs_merged = spark.createDataFrame(
    rdd.map(lambda pair: tuple(pair[0]) + (pair[1],)),
    schema=indexed_schema,
)

dfs_merged.show(5)


In [None]:
import pandas as pd

# Materialise the indexed Spark frame on the driver and peek at the first rows.
df_pandas = dfs_merged.toPandas()
df_pandas.head(n=3)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, lower, regexp_replace, isnull, explode
from pyspark.ml.feature import RegexTokenizer, Tokenizer, StopWordsRemover
from wordcloud import WordCloud
import matplotlib.pyplot as plt

spark = SparkSession.builder \
    .appName("WordCloudExample") \
    .getOrCreate()

dfs_merged = dfs_merged.filter(col("Title").isNotNull() & col("Abstract").isNotNull())


def clean_and_tokenize(df, source_col, prefix):
    """Lower-case, strip non-letters, tokenize and drop English then French stopwords.

    Adds the columns ``{prefix}_cleaned``, ``{prefix}_tokens``,
    ``{prefix}_no_stopwords`` and ``{prefix}_no_stopwords_fr`` to ``df`` and
    returns the new DataFrame.
    """
    cleaned_col = f"{prefix}_cleaned"
    tokens_col = f"{prefix}_tokens"
    no_en_col = f"{prefix}_no_stopwords"
    no_fr_col = f"{prefix}_no_stopwords_fr"

    # \p{L} keeps accented and non-Latin letters; [a-zA-Z] would mangle French words
    # ("électrique" -> "lectrique") and stop accented stopwords like "été" from matching.
    df = df.withColumn(cleaned_col, regexp_replace(lower(col(source_col)), "[^\\p{L}\\s]", ""))
    # Split on runs of whitespace: the plain Tokenizer splits on single whitespace
    # characters and emits empty tokens where punctuation removal left double spaces.
    df = RegexTokenizer(inputCol=cleaned_col, outputCol=tokens_col, pattern="\\s+").transform(df)
    df = StopWordsRemover(inputCol=tokens_col, outputCol=no_en_col,
                          stopWords=StopWordsRemover.loadDefaultStopWords("english")).transform(df)
    df = StopWordsRemover(inputCol=no_en_col, outputCol=no_fr_col,
                          stopWords=StopWordsRemover.loadDefaultStopWords("french")).transform(df)
    return df


dfs_merged = clean_and_tokenize(dfs_merged, "Title", "title")
dfs_merged = clean_and_tokenize(dfs_merged, "Abstract", "abstract")


def generate_wordcloud(df=None, col_name=None, title=None, max_words=100):
    """Plot a word cloud of the tokens stored in the array column ``col_name``.

    Only the ``max_words`` most frequent tokens are collected to the driver; the
    same limit is passed to WordCloud. Prints a message and plots nothing when the
    column holds no tokens, because WordCloud cannot build a cloud from zero words.
    """
    word_counts = (
        df.select(explode(col(col_name)).alias("word"))
        .groupBy("word")
        .count()
        .orderBy(desc("count"))
        .limit(max_words)
    )
    word_counts_dict = {row["word"]: row["count"] for row in word_counts.collect()}
    if not word_counts_dict:
        print(f"Aucun mot à afficher pour la colonne '{col_name}'.")
        return

    wordcloud = WordCloud(width=800, height=400, background_color='white', colormap='viridis',
                          max_words=max_words, max_font_size=120).generate_from_frequencies(word_counts_dict)

    fig, ax = plt.subplots(figsize=(12, 6))
    ax.imshow(wordcloud, interpolation='bilinear')
    ax.axis('off')
    ax.set_title(title, fontsize=20, pad=20)
    plt.show()


generate_wordcloud(dfs_merged, "title_no_stopwords_fr", "Nuage de Mots pour la Colonne 'Title' ")

generate_wordcloud(dfs_merged, "abstract_no_stopwords_fr", "Nuage de Mots pour la Colonne 'Abstract'")



In [None]:
# The French-stopword-filtered token arrays get the shorter *_tokens names in the export.
nouvelles_colonnes = dict(
    title_no_stopwords_fr='title_tokens',
    abstract_no_stopwords_fr='abstract_tokens',
)

dfs_merged_fin = (
    dfs_merged
    .select('Title', 'Abstract', 'Inventors', 'Application_Date', 'country', 'Source',
            'title_no_stopwords_fr', 'abstract_no_stopwords_fr')
    .toPandas()
    .rename(columns=nouvelles_colonnes)
)


In [None]:

# Persist the tokenised export without the pandas index.
csv_path = "/content/df_merged.csv"
dfs_merged_fin.to_csv(path_or_buf=csv_path, index=False)

In [None]:
# Bring every column of the cleaned Spark frame (token arrays and row_index included) to pandas.
dfs_merged_pandas = (
    dfs_merged
    .toPandas()
)

In [None]:
dfs_merged_pandas.head(5)

In [None]:
import re

# Keep the Spark-assigned row_index column rather than the pandas positional index:
# rows were filtered after indexing, so the two differ, and row_index is the key
# used by the similarity tables.
df_inventors = dfs_merged_pandas[['Inventors', 'row_index']].copy()

def split_inventors(df):
    """Explode the ``Inventors`` column into one row per inventor.

    Each non-null ``Inventors`` string is lower-cased, bracketed annotations such
    as ``[us]`` are removed, and the remainder is split on commas and on the
    standalone word "and".

    Args:
        df: DataFrame with ``Inventors`` and ``row_index`` columns.

    Returns:
        DataFrame with columns ``Inventor`` and ``row_index`` (copied from the
        source row). The columns are present even when no inventor is found.
    """
    rows = []
    for inventors_text, row_index in zip(df['Inventors'], df['row_index']):
        if pd.isna(inventors_text):
            continue
        # Remove bracketed annotations before splitting so a comma inside them
        # (e.g. "[us, fr]") cannot cut them into unmatched halves.
        text = re.sub(r'\[.*?\]', '', inventors_text.lower())
        # \b stops "and" from matching inside names such as "alexander" or "sandra".
        for inventor in re.split(r'\s*(?:,|\band\b)\s*', text):
            inventor = inventor.strip()
            if inventor:
                rows.append({'Inventor': inventor, 'row_index': row_index})

    return pd.DataFrame(rows, columns=['Inventor', 'row_index'])

df_result = split_inventors(df=df_inventors)

In [None]:
df_result.head(n=5)

In [None]:

# One row per (inventor, row_index) pair, without the pandas index.
excel_path = "/content/inventors.xlsx"
df_result.to_excel(excel_writer=excel_path, index=False)

In [None]:

# Number of patents per inventor, most frequent first.
occurrences_df = (
    df_result['Inventor']
    .value_counts()
    .rename_axis('Inventor')
    .reset_index(name='Occurrences')
)
occurrences_df.head()

In [None]:

# Inventor frequency table, without the pandas index.
excel_path = "/content/inventors_ocurrence.xlsx"
occurrences_df.to_excel(excel_writer=excel_path, index=False)

In [None]:
dfs_merged.show(n=3)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from langdetect import DetectorFactory, detect
from translate import Translator

spark = SparkSession.builder.appName("TitleTranslation").getOrCreate()


def detect_language(text):
    """Return the language code langdetect assigns to ``text``, or "unknown" on failure.

    langdetect is randomised unless ``DetectorFactory.seed`` is fixed. The seed is
    set inside the function so it also applies in the Python workers that run the UDF,
    which makes repeated evaluations of this column agree.
    """
    DetectorFactory.seed = 0
    try:
        return detect(text)
    except Exception:  # e.g. null text or text without detectable features
        return "unknown"


def translate_to_english(text, lang):
    """Translate ``text`` from the detected language ``lang`` into English.

    Returns ``text`` unchanged when it is already English, when its language is
    "unknown", or when the translation request fails.
    """
    if lang == "en" or lang == "unknown":
        return text
    try:
        # Pass the detected source language explicitly: Translator's from_lang
        # defaults to "en", which would request an en->en "translation".
        translator = Translator(to_lang="en", from_lang=lang)
        return translator.translate(text)
    except Exception:
        return text


detect_language_udf = udf(detect_language, StringType())
translate_udf = udf(translate_to_english, StringType())

# Cache the result: the translation UDF issues one Translator request per row, and an
# uncached frame would repeat all of them on every later action (show, toPandas, joins).
dfs_merged = (
    dfs_merged
    .withColumn("language_Title", detect_language_udf(col("Title")))
    .withColumn("title_translated", translate_udf(col("Title"), col("language_Title")))
    .cache()
)

dfs_merged.show(truncate=False)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
from langdetect import DetectorFactory, detect
from translate import Translator

spark = SparkSession.builder.appName("TitleTranslation").getOrCreate()


def detect_language(text):
    """Return the language code langdetect assigns to ``text``, or "unknown" on failure.

    langdetect is randomised unless ``DetectorFactory.seed`` is fixed. The seed is
    set inside the function so it also applies in the Python workers that run the UDF,
    which makes repeated evaluations of this column agree.
    """
    DetectorFactory.seed = 0
    try:
        return detect(text)
    except Exception:  # e.g. null text or text without detectable features
        return "unknown"


def translate_to_english(text, lang):
    """Translate ``text`` from the detected language ``lang`` into English.

    Returns ``text`` unchanged when it is already English, when its language is
    "unknown", or when the translation request fails.
    """
    if lang == "en" or lang == "unknown":
        return text
    try:
        # Pass the detected source language explicitly: Translator's from_lang
        # defaults to "en", which would request an en->en "translation".
        translator = Translator(to_lang="en", from_lang=lang)
        return translator.translate(text)
    except Exception:
        return text


detect_language_udf = udf(detect_language, StringType())
translate_udf = udf(translate_to_english, StringType())

# Cache the result: the translation UDF issues one Translator request per row, and an
# uncached frame would repeat all of them on every later action (show, toPandas, joins).
dfs_merged = (
    dfs_merged
    .withColumn("language_Abstract", detect_language_udf(col("Abstract")))
    .withColumn("abstract_translated", translate_udf(col("Abstract"), col("language_Abstract")))
    .cache()
)

dfs_merged.show(truncate=False)


In [None]:
dfs_merged.show(n=3)

In [None]:

# The French-stopword-filtered token arrays get the shorter *_tokens names in the export.
nouvelles_colonnes = dict(
    title_no_stopwords_fr='title_tokens',
    abstract_no_stopwords_fr='abstract_tokens',
)

dfs_merged_final = (
    dfs_merged
    .select('Title', 'Abstract', 'Inventors', 'Application_Date', 'country', 'Source',
            'title_no_stopwords_fr', 'abstract_no_stopwords_fr', 'language_Title',
            'title_translated', 'language_Abstract', 'abstract_translated')
    .toPandas()
    .rename(columns=nouvelles_colonnes)
)


In [None]:
dfs_merged_final.head(n=2)

In [None]:

# Persist the translated export without the pandas index.
csv_path = "/content/df_merged_final.csv"
dfs_merged_final.to_csv(path_or_buf=csv_path, index=False)

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.sql.functions import col, expr
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf


spark = SparkSession.builder.appName("SimilarityCalculation").getOrCreate()

# Drop missing titles before tokenizing so the Tokenizer never receives a null string.
source_df_filtered = dfs_merged.filter(col("title_translated").isNotNull())
tokenized_df = Tokenizer(inputCol="title_translated", outputCol="words").transform(source_df_filtered)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10000)
tf_df = hashingTF.transform(tokenized_df)
tfidf_model = IDF(inputCol="rawFeatures", outputCol="features").fit(tf_df)
# Cache the (row_index, features) pairs: both sides of the self-join below read them,
# and without caching each side would recompute the whole upstream lineage.
tfidf_df = tfidf_model.transform(tf_df).select("row_index", "features").cache()


def cosine_similarity(v1, v2):
    """Cosine similarity of two MLlib vectors; 0.0 when either vector is all zeros."""
    dot_product = float(v1.dot(v2))
    norm_product = float(v1.norm(2) * v2.norm(2))
    return dot_product / norm_product if norm_product != 0 else 0.0


cosine_similarity_udf = udf(cosine_similarity, FloatType())

# Pair the TF-IDF rows with themselves (each unordered pair once) so both features
# come straight from the cached frame instead of two extra joins.
left = tfidf_df.select(col("row_index").alias("row_index1"), col("features").alias("features1"))
right = tfidf_df.select(col("row_index").alias("row_index2"), col("features").alias("features2"))

similarity_df_1 = (
    left.crossJoin(right)
    .where(col("row_index1") < col("row_index2"))
    .select(
        "row_index1",
        "row_index2",
        cosine_similarity_udf(col("features1"), col("features2")).alias("cosine_similarity"),
    )
)

similarity_df_1.show()


In [7]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.linalg import SparseVector
from pyspark.sql.functions import col, expr
from itertools import combinations
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf

spark = SparkSession.builder.appName("SimilarityCalculation").getOrCreate()

# Drop missing abstracts before tokenizing so the Tokenizer never receives a null string.
source_df_filtered = dfs_merged.filter(col("abstract_translated").isNotNull())
tokenized_df = Tokenizer(inputCol="abstract_translated", outputCol="words").transform(source_df_filtered)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10000)
tf_df = hashingTF.transform(tokenized_df)
tfidf_model = IDF(inputCol="rawFeatures", outputCol="features").fit(tf_df)
# Cache the (row_index, features) pairs: both sides of the self-join below read them,
# and without caching each side would recompute the whole upstream lineage.
tfidf_df = tfidf_model.transform(tf_df).select("row_index", "features").cache()


def cosine_similarity(v1, v2):
    """Cosine similarity of two MLlib vectors; 0.0 when either vector is all zeros."""
    dot_product = float(v1.dot(v2))
    norm_product = float(v1.norm(2) * v2.norm(2))
    return dot_product / norm_product if norm_product != 0 else 0.0


cosine_similarity_udf = udf(cosine_similarity, FloatType())

# Build pairs from the filtered TF-IDF rows themselves (each unordered pair once), so
# only rows with a translated abstract are paired, consistent with the title cell.
left = tfidf_df.select(col("row_index").alias("row_index1"), col("features").alias("features1"))
right = tfidf_df.select(col("row_index").alias("row_index2"), col("features").alias("features2"))

similarity_df_2 = (
    left.crossJoin(right)
    .where(col("row_index1") < col("row_index2"))
    .select(
        "row_index1",
        "row_index2",
        cosine_similarity_udf(col("features1"), col("features2")).alias("cosine_similarity"),
    )
)

similarity_df_2.show()


In [None]:

# Inspect the pairs whose first row index is in the inclusive range 200..220.
subset_similarity_df = similarity_df_1.where(col("row_index1").between(200, 220))
subset_similarity_df.show()


In [None]:

# Inspect the pairs whose first row index is in the inclusive range 200..220.
subset_similarity_df = similarity_df_2.where(col("row_index1").between(200, 220))
subset_similarity_df.show()


In [None]:

# Total number of title pairs scored.
num_rows = similarity_df_1.count()
print(f"Nombre de lignes dans similarity_df : {num_rows}")


In [None]:

# Total number of abstract pairs scored.
num_rows = similarity_df_2.count()
print(f"Nombre de lignes dans similarity_df : {num_rows}")


In [8]:

# Spark writes a directory of part files at this path. mode="overwrite" is needed
# because the default save mode errors when the target already exists, which would
# make a second Restart & Run All fail on this cell.
similarity_df_2.write.csv('/content/similarity_abstract.csv', header=True, mode="overwrite")
