# The medallion lakehouse architecture

##### 1/. *BRONZE LAYER:* LOADING DATA FROM BLOB STORAGE.

A. Access to azure SQL database:

In [0]:
# Configure connection properties
jdbcHostname = "najma.database.windows.net"
jdbcPort = 1433
jdbcDatabase = "datatables"
jdbcUsername = "Fatima"
jdbcPassword = "Away!123"

# Create the JDBC URL
jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase}"

connectionProperties = {
    "user": jdbcUsername,
    "password": jdbcPassword,
    "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

# Load data from Azure SQL Database
table_name = "staging_table"
df = spark.read.jdbc(url=jdbcUrl, table=table_name, properties=connectionProperties)

# Show the loaded data
df.show()


+--------------------+--------------------+----------+--------------------+--------------------+--------------+--------+
|               title|            abstract|      date|            inventor|             country|   document_id|language|
+--------------------+--------------------+----------+--------------------+--------------------+--------------+--------+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|world intellectua...|wo2010145768a1|      fr|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|               china|    cn1431018a|      en|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|world intellectua...|   wo9909193a1|      fr|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|               china|    cn1696152a|      en|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|world intellectua...|  wo03095656a1|      fr|
|['vaccinia', 'vir...|['recombin

**Bronze Layer (Raw Data):**

In [0]:
%sql
DROP table if exists bronze_table_1;
CREATE TABLE bronze_table_1 (
    title STRING,
    abstract STRING,
    date STRING,
    inventor STRING,
    country STRING,
    document_id STRING,
    language STRING
)
USING DELTA
LOCATION '/mnt/bronze_table6'




**Load data on Bronze table:**

In [0]:
# Assuming raw_data_df is your DataFrame containing the raw data
df.write.format("delta").mode("append").save("/mnt/bronze_table6")


In [0]:
%sql
select * from bronze_table_1;


In [0]:
%sql
select count(*) from bronze_table_1;

count(1)
15405


**Silver data: DATA VALIDAZTION**

In [0]:
bronze_df = spark.read.format("delta").load("/mnt/bronze_table6")


In [0]:
bronze_df.show()

+--------------------+--------------------+----------+--------------------+--------------------+--------------+--------+
|               title|            abstract|      date|            inventor|             country|   document_id|language|
+--------------------+--------------------+----------+--------------------+--------------------+--------------+--------+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|world intellectua...|wo2010145768a1|      fr|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|               china|    cn1431018a|      en|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|world intellectua...|   wo9909193a1|      fr|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|               china|    cn1696152a|      en|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|world intellectua...|  wo03095656a1|      fr|
|['vaccinia', 'vir...|['recombin

1. Transformed to correct format date yyyy-MM-dd:

In [0]:
from pyspark.sql.functions import to_date, date_format

# Check if the 'date' column exists in the DataFrame
if 'date' in bronze_df.columns:
    df = bronze_df.withColumn('date', date_format(to_date(bronze_df['date'], 'yyyy-MM-dd'), 'yyyy-MM-dd'))
else:
    # Handle the case when 'date' column is missing
    print("The 'date' column does not exist in the DataFrame.")

In [0]:
df.show()

+--------------------+--------------------+----------+--------------------+--------------------+--------------+--------+
|               title|            abstract|      date|            inventor|             country|   document_id|language|
+--------------------+--------------------+----------+--------------------+--------------------+--------------+--------+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|world intellectua...|wo2010145768a1|      fr|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|               china|    cn1431018a|      en|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|world intellectua...|   wo9909193a1|      fr|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|               china|    cn1696152a|      en|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|world intellectua...|  wo03095656a1|      fr|
|['vaccinia', 'vir...|['recombin

1. Country column Processing:

In [0]:
from pyspark.sql.functions import col, when

In [0]:
def replace_country(dataframe, column_name):
    dataframe = dataframe.withColumn(column_name,
                                     when((col(column_name) == 'world intellectual property organization wipo') |
                                          (col(column_name) == 'World Intellectual Property Organization (WIPO)') |
                                          (col(column_name) == 'European Patent Office (EPO)') |
                                          (col(column_name) == 'Eurasian Patent Organization (EAPO)') |
                                          (col(column_name) == 'eurasian patent organization eapo') |
                                          (col(column_name) == 'european patent office epo') |
                                          (col(column_name) == 'European Patent Office') |
                                          (col(column_name) == 'WIPO (PCT)'),
                                          'unknown country').otherwise(col(column_name)))
    return dataframe

column_name = 'country'  # Name of the column to be replaced

df = replace_country(df, column_name)


In [0]:
# Show the DataFrame
df.show()

+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+
|               title|            abstract|      date|            inventor|        country|   document_id|language|
+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|unknown country|wo2010145768a1|      fr|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|          china|    cn1431018a|      en|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|unknown country|   wo9909193a1|      fr|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|          china|    cn1696152a|      en|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|unknown country|  wo03095656a1|      fr|
|['vaccinia', 'vir...|['recombinant', '...|1989-07-19|hoffmann la roche 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Function to determine distinct countries
def get_distinct_countries(df):
    countries_df = df.select(col("country")).distinct().filter(col("country") != "unknown country")
    countries = [row["country"] for row in countries_df.collect()]
    return countries

# Call the function
distinct_countries = get_distinct_countries(df)
print("Distinct Countries:", distinct_countries)


Distinct Countries: ['australia', 'Russia', 'portugal', 'us', 'south korea', 'jp', 'Germany', 'lv', 'vn', 'pl', 'cn', 'za', 'France', 'japan', 'Taiwan', 'pt', 'in', 'au', 'sg', 'Finland', 'China', 'United States', 'gb', 'serbia', 'br', 'Spain', 'es', 'canada', 'Denmark', 'kr', 'france', 'ar', 'ph', 'nl', 'United States of America', 'South Korea', 'ca', 'my', 'nz', 'ee', 'united states of america', 'ru', 'united kingdom great britain', 'lt', 'germany', 'Canada', 'russia', 'wo', 'Brazil', 'mx', 'Japan', 'china', 'fr', 'Country', 'Australia', 'spain', 'dk', 'id', 'Serbia', 'ep', 'United Kingdom', 'Netherlands']


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Mapping dictionary for normalization
country_mapping = {
    'australia': 'Australia',
    'russia': 'Russia',
    'portugal': 'Portugal',
    'us': 'United States',
    'south korea': 'South Korea',
    'jp': 'Japan',
    'germany': 'Germany',
    'lv': 'Latvia',
    'vn': 'Vietnam',
    'pl': 'Poland',
    'cn': 'China',
    'za': 'South Africa',
    'france': 'France',
    'japan': 'Japan',
    'taiwan': 'Taiwan',
    'pt': 'Portugal',
    'in': 'India',
    'au': 'Australia',
    'sg': 'Singapore',
    'finland': 'Finland',
    'united states': 'United States',
    'gb': 'United Kingdom',
    'serbia': 'Serbia',
    'br': 'Brazil',
    'spain': 'Spain',
    'es': 'Spain',
    'canada': 'Canada',
    'denmark': 'Denmark',
    'kr': 'South Korea',
    'ar': 'Argentina',
    'ph': 'Philippines',
    'nl': 'Netherlands',
    'united states of america': 'United States',
    'ca': 'Canada',
    'my': 'Malaysia',
    'nz': 'New Zealand',
    'ee': 'Estonia',
    'ru': 'Russia',
    'united kingdom great britain': 'United Kingdom',
    'lt': 'Lithuania',
    'wo': 'unknown country',
    'mx': 'Mexico',
    'spain': 'Spain',
    'dk': 'Denmark',
    'id': 'Indonesia',
    'united kingdom': 'United Kingdom',
    'ep': 'Europe',
    'china': 'China',
    'fr': 'France',
    'Country':'unknown country'
    
}

# UDF for transforming country names
@udf(StringType())
def normalize_country(country):
    return country_mapping.get(country.lower(), country)

# Function to apply transformation to country column
def transform_country_column(df, country_column):
    df_transformed = df.withColumn(country_column, normalize_country(df[country_column]))
    return df_transformed


# Apply transformation to country column
df_transformed = transform_country_column(df, "country")

# Show the transformed DataFrame
df_transformed.show()


+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+
|               title|            abstract|      date|            inventor|        country|   document_id|language|
+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|unknown country|wo2010145768a1|      fr|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|          China|    cn1431018a|      en|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|unknown country|   wo9909193a1|      fr|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|          China|    cn1696152a|      en|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|unknown country|  wo03095656a1|      fr|
|['vaccinia', 'vir...|['recombinant', '...|1989-07-19|hoffmann la roche 

In [0]:
# Select the 'country' column and get distinct values
distinct_countries = df_transformed.select('country').distinct()

# Collect the distinct country names
country_names = [row.country for row in distinct_countries.collect()]

print(country_names)

['Russia', 'Philippines', 'Malaysia', 'Singapore', 'Germany', 'Europe', 'France', 'Taiwan', 'Argentina', 'Finland', 'China', 'United States', 'India', 'Lithuania', 'Spain', 'Denmark', 'South Korea', 'Mexico', 'unknown country', 'Estonia', 'Indonesia', 'Latvia', 'Canada', 'Brazil', 'Japan', 'New Zealand', 'Country', 'Poland', 'Portugal', 'Australia', 'Serbia', 'South Africa', 'United Kingdom', 'Vietnam', 'Netherlands']


In [0]:
df_transformed.show()

+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+
|               title|            abstract|      date|            inventor|        country|   document_id|language|
+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|unknown country|wo2010145768a1|      fr|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|          China|    cn1431018a|      en|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|unknown country|   wo9909193a1|      fr|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|          China|    cn1696152a|      en|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|unknown country|  wo03095656a1|      fr|
|['vaccinia', 'vir...|['recombinant', '...|1989-07-19|hoffmann la roche 

In [0]:
from pyspark.sql.functions import col

# Check for null values in the 'country' column
null_country_count = df_transformed.filter((col('country').isNull()) | (col('country') == '') | (col('country') == 'NULL')).count()

print("Number of null values in the 'country' column:", null_country_count)

# Alternatively, you can check for non-null values using isNotNull()
non_null_country_count = df_transformed.filter(col('country').isNotNull() & (col('country') != '') & (col('country') != 'NULL')).count()

print("Number of non-null values in the 'country' column:", non_null_country_count)



Number of null values in the 'country' column: 0
Number of non-null values in the 'country' column: 15405


In [0]:
df_silver = df_transformed# No transformation for this example

df_transformed.show()

+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+
|               title|            abstract|      date|            inventor|        country|   document_id|language|
+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|unknown country|wo2010145768a1|      fr|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|          China|    cn1431018a|      en|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|unknown country|   wo9909193a1|      fr|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|          China|    cn1696152a|      en|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|unknown country|  wo03095656a1|      fr|
|['vaccinia', 'vir...|['recombinant', '...|1989-07-19|hoffmann la roche 

In [0]:
%sql
DROP TABLE IF EXISTS silver_table;  -- Correcting the table name
CREATE TABLE silver_table(
    title STRING,
    abstract STRING,
    date STRING,
    inventor STRING,
    country STRING,
    document_id STRING,
    language STRING
)
USING DELTA
LOCATION '/mnt/ SilverTable'; 

In [0]:
# List files in the '/mnt/' directory
dbutils.fs.ls("/mnt/")


[FileInfo(path='dbfs:/mnt/GoldTable/', name='GoldTable/', size=0, modificationTime=1717010002000),
 FileInfo(path='dbfs:/mnt/GoldTable1/', name='GoldTable1/', size=0, modificationTime=1717010194000),
 FileInfo(path='dbfs:/mnt/GoldTable2/', name='GoldTable2/', size=0, modificationTime=1717010269000),
 FileInfo(path='dbfs:/mnt/bronze_table5/', name='bronze_table5/', size=0, modificationTime=1716992043000),
 FileInfo(path='dbfs:/mnt/bronze_table6/', name='bronze_table6/', size=0, modificationTime=1717005227000),
 FileInfo(path='dbfs:/mnt/gold_data/', name='gold_data/', size=0, modificationTime=1716995587000),
 FileInfo(path='dbfs:/mnt/silver_table/', name='silver_table/', size=0, modificationTime=1716991800000),
 FileInfo(path='dbfs:/mnt/temp_delta_location/', name='temp_delta_location/', size=0, modificationTime=1716994272000)]

In [0]:
from pyspark.sql.functions import col



# Verify the schema after casting
df_transformed.printSchema()

# Append the DataFrame to the Delta table with mergeSchema option
df_transformed.write.format("delta").mode("append").option("mergeSchema", "true").save("/mnt/SilverTable")

root
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- date: string (nullable = true)
 |-- inventor: string (nullable = true)
 |-- country: string (nullable = true)
 |-- document_id: string (nullable = true)
 |-- language: string (nullable = true)



### Gold Layer

In [0]:
df_Gold= df_silver

In [0]:
df_Gold.show()

+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+
|               title|            abstract|      date|            inventor|        country|   document_id|language|
+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|unknown country|wo2010145768a1|      fr|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|          China|    cn1431018a|      en|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|unknown country|   wo9909193a1|      fr|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|          China|    cn1696152a|      en|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|unknown country|  wo03095656a1|      fr|
|['vaccinia', 'vir...|['recombinant', '...|1989-07-19|hoffmann la roche 

Performe some analysis:

In [0]:
from pyspark.sql.functions import col

# Group by 'country' and count the number of patents
patents_per_country = df_Gold.groupBy('country').count().orderBy(col('count').desc())

# Show the result
patents_per_country.show(truncate=False)


+---------------+-----+
|country        |count|
+---------------+-----+
|China          |5866 |
|United States  |4926 |
|unknown country|1838 |
|Japan          |760  |
|Australia      |504  |
|Russia         |398  |
|Canada         |249  |
|Spain          |229  |
|South Korea    |177  |
|Europe         |131  |
|Taiwan         |104  |
|India          |71   |
|Denmark        |60   |
|Germany        |30   |
|Mexico         |9    |
|Netherlands    |7    |
|France         |6    |
|Country        |6    |
|New Zealand    |5    |
|United Kingdom |5    |
+---------------+-----+
only showing top 20 rows



In [0]:
df_Gold.show()

+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+
|               title|            abstract|      date|            inventor|        country|   document_id|language|
+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|unknown country|wo2010145768a1|      fr|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|          China|    cn1431018a|      en|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|unknown country|   wo9909193a1|      fr|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|          China|    cn1696152a|      en|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|unknown country|  wo03095656a1|      fr|
|['vaccinia', 'vir...|['recombinant', '...|1989-07-19|hoffmann la roche 

In [0]:
# Set the legacy time parser policy
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")




In [0]:
import random
from pyspark.sql.functions import udf, col, when
from pyspark.sql.types import DateType
from datetime import datetime, timedelta

# Define a function to generate a random date between 2020-01-01 and 2023-12-31
def generate_random_date():
    start_date = datetime(2020, 1, 1)
    end_date = datetime(2023, 12, 31)
    delta = end_date - start_date
    random_days = random.randint(0, delta.days)
    random_date = start_date + timedelta(days=random_days)
    return random_date

# Register the function as a UDF
generate_random_date_udf = udf(generate_random_date, DateType())

# Replace NULL dates with random dates
df_gold = df.withColumn('date', col('date').cast(DateType()))  # Ensure 'date' column is of DateType
df_gold = df_gold.withColumn('date', when(col('date').isNotNull(), col('date'))
                                    .otherwise(generate_random_date_udf()))

# Verify the replacement
df_gold.show()

+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+----+
|               title|            abstract|      date|            inventor|        country|   document_id|language|year|
+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+----+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|unknown country|wo2010145768a1|      fr|2010|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|          China|    cn1431018a|      en|2003|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|unknown country|   wo9909193a1|      fr|1999|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|          China|    cn1696152a|      en|2005|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|unknown country|  wo03095656a1|      fr|2003|
|['vaccinia', 'vir...|['recombin

In [0]:
df_g.show()

+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+----+
|               title|            abstract|      date|            inventor|        country|   document_id|language|year|
+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+----+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|unknown country|wo2010145768a1|      fr|2010|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|          China|    cn1431018a|      en|2003|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|unknown country|   wo9909193a1|      fr|1999|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|          China|    cn1696152a|      en|2005|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|unknown country|  wo03095656a1|      fr|2003|
|['vaccinia', 'vir...|['recombin

In [0]:
from pyspark.sql.functions import year, col

# Check the column names in the DataFrame
df.printSchema()

# Extract year from 'date' column
df = df_g.withColumn('year', year(col('date')))

# Group by 'year' and count the number of patents
patents_per_year = df.groupBy('year').count().orderBy(col('year'))

# Show the result
patents_per_year.show()

root
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- date: date (nullable = true)
 |-- inventor: string (nullable = true)
 |-- country: string (nullable = true)
 |-- document_id: string (nullable = true)
 |-- language: string (nullable = true)
 |-- year: integer (nullable = true)

+----+-----+
|year|count|
+----+-----+
|1973|    1|
|1979|    1|
|1982|    1|
|1983|    5|
|1984|    2|
|1985|    6|
|1986|   19|
|1987|   15|
|1988|   16|
|1989|   15|
|1990|   13|
|1991|   14|
|1992|   22|
|1993|   24|
|1994|   20|
|1995|   24|
|1996|   30|
|1997|   16|
|1998|   19|
|1999|   33|
+----+-----+
only showing top 20 rows



In [0]:
# Group by 'inventor' and count the number of patents
top_inventors = df_gold.groupBy('inventor').count().orderBy(col('count').desc()).limit(50)

# Show the result
top_inventors.show()


+--------------------+-----+
|            inventor|count|
+--------------------+-----+
|         tian kegong|   66|
|                    |   61|
|                  ru|   60|
|          zhang xuke|   56|
|lanzhou veterinar...|   34|
|         wang hualin|   31|
|          univ fudan|   28|
|        zheng haixue|   24|
|         Katie Brown|   22|
|        Laura Taylor|   20|
|      Michael Wilson|   20|
|         Laura Moore|   20|
|         Laura Brown|   20|
|      yusibov vidadi|   20|
|         Chris Brown|   20|
|hubei xinzongke v...|   19|
|       univ yangzhou|   19|
|       Michael Smith|   19|
|univ huazhong agr...|   19|
|         Chris Davis|   19|
+--------------------+-----+
only showing top 20 rows



Check null values

In [0]:
# Check for NULL values in the 'date' column
df_g.filter(col('date').isNull()).show()

# Check for NULL values in other columns (example for 'abstract')
df_g.filter(col('abstract').isNull()).show()


+-----+--------+----+--------+-------+-----------+--------+----+
|title|abstract|date|inventor|country|document_id|language|year|
+-----+--------+----+--------+-------+-----------+--------+----+
+-----+--------+----+--------+-------+-----------+--------+----+

+----------------+--------+----------+------------+-------+--------------------+--------+----+
|           title|abstract|      date|    inventor|country|         document_id|language|year|
+----------------+--------+----------+------------+-------+--------------------+--------+----+
|    ????????????|    NULL|2022-08-18|Katie Wilson|  China|        CN214209201U|   zh-cn|NULL|
|????????????????|    NULL|2022-04-12|Sarah Miller|  Japan|JP2022058425A  JP...|      ja|2022|
+----------------+--------+----------+------------+-------+--------------------+--------+----+



In [0]:
# Check for NULL values in other columns (example for 'abstract')
df_gold.filter(col('inventor').isNull()).show()

+-----+--------+----+--------+-------+-----------+--------+----+
|title|abstract|date|inventor|country|document_id|language|year|
+-----+--------+----+--------+-------+-----------+--------+----+
+-----+--------+----+--------+-------+-----------+--------+----+



In [0]:
df_g.show()

+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+----+
|               title|            abstract|      date|            inventor|        country|   document_id|language|year|
+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+----+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|unknown country|wo2010145768a1|      fr|2010|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|          China|    cn1431018a|      en|2003|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|unknown country|   wo9909193a1|      fr|1999|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|          China|    cn1696152a|      en|2005|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|unknown country|  wo03095656a1|      fr|2003|
|['vaccinia', 'vir...|['recombin

In [0]:
from pyspark.sql.functions import regexp_replace

# Remove punctuation from the 'inventor' column
df_gold = df_g.withColumn('inventor', regexp_replace(col('inventor'), '[^\w\s]', ''))

# Show the updated DataFrame
df_gold.show()


+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+----+
|               title|            abstract|      date|            inventor|        country|   document_id|language|year|
+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+----+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|unknown country|wo2010145768a1|      fr|2010|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|          China|    cn1431018a|      en|2003|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|unknown country|   wo9909193a1|      fr|1999|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|          China|    cn1696152a|      en|2005|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|unknown country|  wo03095656a1|      fr|2003|
|['vaccinia', 'vir...|['recombin

In [0]:
df_gold.show()


+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+----+
|               title|            abstract|      date|            inventor|        country|   document_id|language|year|
+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+----+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|unknown country|wo2010145768a1|      fr|2010|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|          China|    cn1431018a|      en|2003|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|unknown country|   wo9909193a1|      fr|1999|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|          China|    cn1696152a|      en|2005|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|unknown country|  wo03095656a1|      fr|2003|
|['vaccinia', 'vir...|['recombin

In [0]:
%sql
DROP TABLE IF EXISTS GOLD_table;  -- Correcting the table name
CREATE TABLE GOLD_table(
    title STRING,
    abstract STRING,
    date DATE,
    inventor STRING,
    country STRING,
    document_id STRING,
    language STRING,
    year STRING
)
USING DELTA
LOCATION '/mnt/GoldTable2'; 

In [0]:
from pyspark.sql.functions import col

# Verify the schema of df_gold
df_gold.printSchema()

# Cast the year column to STRING if it is not already
df_gold = df_gold.withColumn("year", col("year").cast("STRING"))

# Verify the schema after casting
df_gold.printSchema()

# Append the DataFrame to the Delta table with mergeSchema option
df_gold.write.format("delta").mode("append").option("mergeSchema", "true").save("/mnt/GoldTable2")


root
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- date: date (nullable = true)
 |-- inventor: string (nullable = true)
 |-- country: string (nullable = true)
 |-- document_id: string (nullable = true)
 |-- language: string (nullable = true)
 |-- year: integer (nullable = true)

root
 |-- title: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- date: date (nullable = true)
 |-- inventor: string (nullable = true)
 |-- country: string (nullable = true)
 |-- document_id: string (nullable = true)
 |-- language: string (nullable = true)
 |-- year: string (nullable = true)



Top Words:

In [0]:
gold_df = spark.read.format("delta").load("/mnt/GoldTable2")



In [0]:
gold_df.show()

+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+----+
|               title|            abstract|      date|            inventor|        country|   document_id|language|year|
+--------------------+--------------------+----------+--------------------+---------------+--------------+--------+----+
|['auxiliaire', 'p...|['linvention', 'c...|2010-12-23|goracon engineeri...|unknown country|wo2010145768a1|      fr|2010|
|['syncretic', 'pr...|['antivirus', 'an...|2003-07-23|dongkanglong viru...|          China|    cn1431018a|      en|2003|
|['vecteur', 'de',...|['...', 'sutilise...|1999-02-25|hepavec ag fuer g...|unknown country|   wo9909193a1|      fr|1999|
|['lie7', 'recombi...|['...', 'hpv16', ...|2005-11-16|virus diseases pr...|          China|    cn1696152a|      en|2005|
|['virus', 'de', '...|['linvention', 'c...|2003-11-20|gsf forschungszen...|unknown country|  wo03095656a1|      fr|2003|
|['vaccinia', 'vir...|['recombin

In [0]:

# Convert Spark DataFrame to Pandas DataFrame
df_pandas = gold_df.toPandas()




In [0]:
df_pandas

Unnamed: 0,title,abstract,date,inventor,country,document_id,language,year
0,"['auxiliaire', 'permettant', 'la', 'monte', 'd...","['linvention', 'concerne', 'un', 'auxiliaire',...",2010-12-23,goracon engineering gmbh,unknown country,wo2010145768a1,fr,2010
1,"['syncretic', 'protein', 'product', 'human', '...","['antivirus', 'antalgic', 'methionine', 'enkep...",2003-07-23,dongkanglong virus biotechnolo,China,cn1431018a,en,2003
2,"['vecteur', 'de', 'baculovirus', 'modifie', 'p...","['...', 'sutiliser', 'en', 'gnothrapie', 'chez...",1999-02-25,hepavec ag fuer gentherapie de,unknown country,wo9909193a1,fr,1999
3,"['lie7', 'recombined', 'protein', '16', 'type'...","['...', 'hpv16', 'chitoprotein', 'gene', 'l1',...",2005-11-16,virus diseases prevention co,China,cn1696152a,en,2005
4,"['virus', 'de', 'variole', 'aviaire', 'recombi...","['linvention', 'concerne', 'un', 'virus', 'de'...",2003-11-20,gsf forschungszentrum umwelt de,unknown country,wo03095656a1,fr,2003
...,...,...,...,...,...,...,...,...
15400,materials methods treatment human genetic dise...,description crossreference related application...,2018-07-19,Matthew Hebden Porteus,United States,US-2018200387-A1,en,2018
15401,sweet pepper hybrid,description crossreference related application...,2011-09-06,William McCarthy,United States,US-8013222-B2,en,2011
15402,immunoconjugates methods producing,description translated korean immunoconjugates...,2019-03-26,John Taylor,South Korea,KR-101962476-B1,en,2019
15403,anticd antibody use treatment prevention cance...,description cross references related applicati...,2012-12-25,CheLeung Law Julie McEarchern Alan F Wahl,United States,US-8337838-B2,en,2012


In [0]:
import pandas as pd
import re
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS

# Function to join list of strings safely
def join_strings(lst):
    return ' '.join(lst) if isinstance(lst, list) else ''

# Combine title and abstract columns
df_pandas['text'] = df_pandas['title'].apply(join_strings) + ' ' + df_pandas['abstract'].apply(join_strings)

# Convert to lowercase
df_pandas['text'] = df_pandas['text'].str.lower()

# Function to preprocess text
def preprocess_text(text):
    # Remove punctuation and non-alphabetic characters
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    # Tokenize
    words = text.split()
    # Remove stop words
    words = [word for word in words if word not in ENGLISH_STOP_WORDS]
    return words

# Apply preprocessing
df_pandas['words'] = df_pandas['text'].apply(preprocess_text)

# Explode the words column into individual words
df_words = df_pandas.explode('words').dropna(subset=['words'])




In [0]:
df_pandas

Unnamed: 0,title,abstract,date,inventor,country,document_id,language,year,text,words
0,"['auxiliaire', 'permettant', 'la', 'monte', 'd...","['linvention', 'concerne', 'un', 'auxiliaire',...",2010-12-23,goracon engineering gmbh,unknown country,wo2010145768a1,fr,2010,,[]
1,"['syncretic', 'protein', 'product', 'human', '...","['antivirus', 'antalgic', 'methionine', 'enkep...",2003-07-23,dongkanglong virus biotechnolo,China,cn1431018a,en,2003,,[]
2,"['vecteur', 'de', 'baculovirus', 'modifie', 'p...","['...', 'sutiliser', 'en', 'gnothrapie', 'chez...",1999-02-25,hepavec ag fuer gentherapie de,unknown country,wo9909193a1,fr,1999,,[]
3,"['lie7', 'recombined', 'protein', '16', 'type'...","['...', 'hpv16', 'chitoprotein', 'gene', 'l1',...",2005-11-16,virus diseases prevention co,China,cn1696152a,en,2005,,[]
4,"['virus', 'de', 'variole', 'aviaire', 'recombi...","['linvention', 'concerne', 'un', 'virus', 'de'...",2003-11-20,gsf forschungszentrum umwelt de,unknown country,wo03095656a1,fr,2003,,[]
...,...,...,...,...,...,...,...,...,...,...
15400,materials methods treatment human genetic dise...,description crossreference related application...,2018-07-19,Matthew Hebden Porteus,United States,US-2018200387-A1,en,2018,,[]
15401,sweet pepper hybrid,description crossreference related application...,2011-09-06,William McCarthy,United States,US-8013222-B2,en,2011,,[]
15402,immunoconjugates methods producing,description translated korean immunoconjugates...,2019-03-26,John Taylor,South Korea,KR-101962476-B1,en,2019,,[]
15403,anticd antibody use treatment prevention cance...,description cross references related applicati...,2012-12-25,CheLeung Law Julie McEarchern Alan F Wahl,United States,US-8337838-B2,en,2012,,[]


In [0]:
def clean_text(text):
    if not isinstance(text, str):
        return ''
    # Remove square brackets
    text = re.sub(r'\[|\]', '', text)
    # Remove single quotes
    text = re.sub(r"'", '', text)
    # Remove any other punctuation and special characters except underscores and hyphens
    text = re.sub(r'[^a-zA-Z_\-\s]', '', text)
    return text


# Apply cleaning function to title and abstract columns
df_pandas['title'] = df_pandas['title'].apply(clean_text)
df_pandas['abstract'] = df_pandas['abstract'].apply(clean_text)


In [0]:
df_pandas

Unnamed: 0,title,abstract,date,inventor,country,document_id,language,year,text,words
0,auxiliaire permettant la monte dune personne o...,linvention concerne un auxiliaire permettant l...,2010-12-23,goracon engineering gmbh,unknown country,wo2010145768a1,fr,2010,,[]
1,syncretic protein product human methionine enk...,antivirus antalgic methionine enkephalin inter...,2003-07-23,dongkanglong virus biotechnolo,China,cn1431018a,en,2003,,[]
2,vecteur de baculovirus modifie par coque de pr...,sutiliser en gnothrapie chez lhomme le vecteu...,1999-02-25,hepavec ag fuer gentherapie de,unknown country,wo9909193a1,fr,1999,,[]
3,lie recombined protein type human papilomavir...,hpv chitoprotein gene l hpv gene engineering ...,2005-11-16,virus diseases prevention co,China,cn1696152a,en,2005,,[]
4,virus de variole aviaire recombinant,linvention concerne un virus de variole aviair...,2003-11-20,gsf forschungszentrum umwelt de,unknown country,wo03095656a1,fr,2003,,[]
...,...,...,...,...,...,...,...,...,...,...
15400,materials methods treatment human genetic dise...,description crossreference related application...,2018-07-19,Matthew Hebden Porteus,United States,US-2018200387-A1,en,2018,,[]
15401,sweet pepper hybrid,description crossreference related application...,2011-09-06,William McCarthy,United States,US-8013222-B2,en,2011,,[]
15402,immunoconjugates methods producing,description translated korean immunoconjugates...,2019-03-26,John Taylor,South Korea,KR-101962476-B1,en,2019,,[]
15403,anticd antibody use treatment prevention cance...,description cross references related applicati...,2012-12-25,CheLeung Law Julie McEarchern Alan F Wahl,United States,US-8337838-B2,en,2012,,[]


In [0]:
# Combine title and abstract columns for analysis
df_pandas['text'] = df_pandas['title'] + ' ' + df_pandas['abstract']

# Convert to lowercase
df_pandas['text'] = df_pandas['text'].str.lower()

In [0]:
df_pandas


Unnamed: 0,title,abstract,date,inventor,country,document_id,language,year,text,words,processed_text
0,auxiliaire permettant la monte dune personne o...,linvention concerne un auxiliaire permettant l...,2010-12-23,goracon engineering gmbh,unknown country,wo2010145768a1,fr,2010,auxiliaire permettant la monte dune personne o...,[],
1,syncretic protein product human methionine enk...,antivirus antalgic methionine enkephalin inter...,2003-07-23,dongkanglong virus biotechnolo,China,cn1431018a,en,2003,syncretic protein product human methionine enk...,[],
2,vecteur de baculovirus modifie par coque de pr...,sutiliser en gnothrapie chez lhomme le vecteu...,1999-02-25,hepavec ag fuer gentherapie de,unknown country,wo9909193a1,fr,1999,vecteur de baculovirus modifie par coque de pr...,[],
3,lie recombined protein type human papilomavir...,hpv chitoprotein gene l hpv gene engineering ...,2005-11-16,virus diseases prevention co,China,cn1696152a,en,2005,lie recombined protein type human papilomavir...,[],
4,virus de variole aviaire recombinant,linvention concerne un virus de variole aviair...,2003-11-20,gsf forschungszentrum umwelt de,unknown country,wo03095656a1,fr,2003,virus de variole aviaire recombinant linventio...,[],
...,...,...,...,...,...,...,...,...,...,...,...
15400,materials methods treatment human genetic dise...,description crossreference related application...,2018-07-19,Matthew Hebden Porteus,United States,US-2018200387-A1,en,2018,materials methods treatment human genetic dise...,[],
15401,sweet pepper hybrid,description crossreference related application...,2011-09-06,William McCarthy,United States,US-8013222-B2,en,2011,sweet pepper hybrid description crossreference...,[],
15402,immunoconjugates methods producing,description translated korean immunoconjugates...,2019-03-26,John Taylor,South Korea,KR-101962476-B1,en,2019,immunoconjugates methods producing description...,[],
15403,anticd antibody use treatment prevention cance...,description cross references related applicati...,2012-12-25,CheLeung Law Julie McEarchern Alan F Wahl,United States,US-8337838-B2,en,2012,anticd antibody use treatment prevention cance...,[],


In [0]:
# Explode the words column into individual words
df_words = df_pandas.explode('text').dropna(subset=['text'])

# Count word occurrences
word_counts = df_words['text'].value_counts().reset_index()
word_counts.columns = ['text', 'count']


In [0]:
df_pandas['text']

0        auxiliaire permettant la monte dune personne o...
1        syncretic protein product human methionine enk...
2        vecteur de baculovirus modifie par coque de pr...
3        lie recombined protein  type human papilomavir...
4        virus de variole aviaire recombinant linventio...
                               ...                        
15400    materials methods treatment human genetic dise...
15401    sweet pepper hybrid description crossreference...
15402    immunoconjugates methods producing description...
15403    anticd antibody use treatment prevention cance...
15404    repair antibodies cdr description translated s...
Name: text, Length: 15405, dtype: object

In [0]:
from collections import Counter
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS

# Tokenize the text and remove stop words
df_pandas['words'] = df_pandas['text'].apply(lambda x: [word for word in x.split()])

# Flatten the list of words and count word occurrences
all_words = [word for words_list in df_pandas['words'] for word in words_list]
word_counts = Counter(all_words)

# Convert to a DataFrame for better visualization
word_counts_df = pd.DataFrame(word_counts.items(), columns=['word', 'count']).sort_values(by='count', ascending=False)

# Display the top 20 words
top_words = word_counts_df.head(20)
print("Top 20 words:")
print(top_words)



Top 20 words:
             word   count
423          acid  218019
188      sequence  216811
632            id  191627
631           seq  188135
693           one  187253
380          cell  171104
169     invention  167848
301         cells  160100
422         amino  145960
1365          may  145415
236      antibody  130344
65        protein  124362
1612  embodiments  123336
1070        least  121412
281       present  107366
207             c  103244
92           used   99915
6901      example   96938
4375       herein   91498
63            fig   90743


In [0]:
word_counts_df

Unnamed: 0,word,count
423,acid,218019
188,sequence,216811
632,id,191627
631,seq,188135
693,one,187253
...,...,...
226979,proxied,1
226982,certifies,1
226985,fictitiously,1
226986,uninspected,1


In [0]:
word_count=spark.createDataFrame(word_counts_df)

In [0]:
word_count.printSchema

<bound method DataFrame.printSchema of DataFrame[word: string, count: bigint]>

In [0]:
%sql
DROP TABLE IF EXISTS Word_table;  -- Correcting the table name
CREATE TABLE Word_table(
    word STRING,
   count BIGINT
)
USING DELTA
LOCATION '/mnt/WordTable2'; 

In [0]:
# Append the DataFrame to the Delta table with mergeSchema option
word_count.write.format("delta").mode("append").option("mergeSchema", "true").save("/mnt/WordTable2")