
**Source**
/FileStore/tables/articles/2024_2025.csv

**Imports**

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import when, col
from pyspark.sql.types import IntegerType, StringType, BooleanType

In [0]:
%fs

ls /FileStore/tables/articles/2024_2025.csv

path,name,size,modificationTime
dbfs:/FileStore/tables/articles/2024_2025.csv,2024_2025.csv,195961853,1730978546000


In [0]:
df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("/FileStore/tables/articles/2024_2025.csv")

In [0]:
df = (
    df.withColumn("Authors", df["Authors"].cast(StringType()))
      .withColumn("Author full names", df["Author full names"].cast(StringType()))
      .withColumn("Author(s) ID", df["Author(s) ID"].cast(StringType()))
      .withColumn("Title", df["Title"].cast(StringType()))
      .withColumn("Year", df["Year"].cast(IntegerType()))
      .withColumn("Source title", df["Source title"].cast(StringType()))
      .withColumn("Volume", df["Volume"].cast(IntegerType()))
      .withColumn("Issue", df["Issue"].cast(IntegerType()))
      .withColumn("Art. No.", df["`Art. No.`"].cast(IntegerType()))  # Add backticks here
      .withColumn("Page start", df["Page start"].cast(IntegerType()))
      .withColumn("Page end", df["Page end"].cast(IntegerType()))
      .withColumn("Page count", df["Page count"].cast(IntegerType()))
      .withColumn("Cited by", df["Cited by"].cast(IntegerType()))
      .withColumn("DOI", df["DOI"].cast(StringType()))
      .withColumn("Link", df["Link"].cast(StringType()))
      .withColumn("Affiliations", df["Affiliations"].cast(StringType()))
      .withColumn("Authors with affiliations", df["Authors with affiliations"].cast(StringType()))
      .withColumn("Abstract", df["Abstract"].cast(StringType()))
      .withColumn("Author Keywords", df["Author Keywords"].cast(StringType()))
      .withColumn("Index Keywords", df["Index Keywords"].cast(StringType()))
      .withColumn("Molecular Sequence Numbers", df["Molecular Sequence Numbers"].cast(StringType()))
      .withColumn("Chemicals/CAS", df["Chemicals/CAS"].cast(StringType()))
      .withColumn("Tradenames", df["Tradenames"].cast(StringType()))
      .withColumn("Manufacturers", df["Manufacturers"].cast(StringType()))
      .withColumn("Funding Details", df["Funding Details"].cast(StringType()))
      .withColumn("Funding Texts", df["Funding Texts"].cast(StringType()))
      .withColumn("References", df["References"].cast(StringType()))
      .withColumn("Correspondence Address", df["Correspondence Address"].cast(StringType()))
      .withColumn("Editors", df["Editors"].cast(StringType()))
      .withColumn("Publisher", df["Publisher"].cast(StringType()))
      .withColumn("Sponsors", df["Sponsors"].cast(StringType()))
      .withColumn("Conference name", df["Conference name"].cast(StringType()))
      .withColumn("Conference date", df["Conference date"].cast(StringType()))
      .withColumn("Conference location", df["Conference location"].cast(StringType()))
      .withColumn("Conference code", df["Conference code"].cast(StringType()))
      .withColumn("ISSN", df["ISSN"].cast(StringType()))
      .withColumn("ISBN", df["ISBN"].cast(StringType()))
      .withColumn("CODEN", df["CODEN"].cast(StringType()))
      .withColumn("PubMed ID", df["PubMed ID"].cast(StringType()))
      .withColumn("Language of Original Document", df["Language of Original Document"].cast(StringType()))
      .withColumn("Abbreviated Source Title", df["Abbreviated Source Title"].cast(StringType()))
      .withColumn("Document Type", df["Document Type"].cast(StringType()))
      .withColumn("Publication Stage", df["Publication Stage"].cast(StringType()))
      .withColumn("Open Access", when(col("Open Access").isNotNull(), True).otherwise(False).cast(BooleanType()))  # Boolean based on null check
      .withColumn("Source", df["Source"].cast(StringType()))
      .withColumn("EID", df["EID"].cast(StringType()))
)


In [0]:
df.printSchema()

root
 |-- Authors: string (nullable = true)
 |-- Author full names: string (nullable = true)
 |-- Author(s) ID: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Source title: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Issue: integer (nullable = true)
 |-- Art. No.: integer (nullable = true)
 |-- Page start: integer (nullable = true)
 |-- Page end: integer (nullable = true)
 |-- Page count: integer (nullable = true)
 |-- Cited by: integer (nullable = true)
 |-- DOI: string (nullable = true)
 |-- Link: string (nullable = true)
 |-- Affiliations: string (nullable = true)
 |-- Authors with affiliations: string (nullable = true)
 |-- Abstract: string (nullable = true)
 |-- Author Keywords: string (nullable = true)
 |-- Index Keywords: string (nullable = true)
 |-- Molecular Sequence Numbers: string (nullable = true)
 |-- Chemicals/CAS: string (nullable = true)
 |-- Tradenames: string (nullable = true)
 |-- Manu

In [0]:
from pyspark.sql import functions as F

df = df.withColumn(
    "Title",
    F.ltrim(F.regexp_replace(F.col("Title"), "[å‘˜å·¥çš„æ•Œäººè¿˜æ˜¯æœ‹å‹ï¼šäººå·¥æ™ºèƒ½ä¸Žå‘˜å·¥ç¦»èŒå€¾å‘çš„å…³ç³»ç ”ç©¶\"]", ""))
)

In [0]:
# Corrected transformation for the 'CODEN' column
df = df.withColumn(
    "CODEN",
    F.when(
        F.col("CODEN").rlike(".*\\d.*") & F.col("CODEN").rlike(".*[^a-zA-Z].*"),  # Contains numbers and non-letter characters
        "no coden"
    ).when(
        F.size(F.split(F.col("CODEN"), " ")) > 1,  # More than one word
        "non coden"
    ).otherwise(
        F.col("CODEN")  # Keep original value
    )
)



In [0]:
# Sort the DataFrame by the 'Title' column in dictionary order
df_sorted = df.orderBy(F.col("Tradenames").asc())

# Show the sorted result
df_sorted.filter(F.col("Tradenames").isNotNull()).select("Tradenames").show()

+--------------------+
|          Tradenames|
+--------------------+
| 100306]. Referen...|
|       67 ppm “Cd67”|
| Abdelmalek Essaâ...|
| Association of A...|
|           Beni-Suef|
|           Brazilian|
| COMSATS Institut...|
|          Casablanca|
|      DarijaBert-mix|
|            Edmonton|
|                FSTH|
| Faculty of Scien...|
| Fourier transfor...|
|             France"|
| Germany; Fakultä...|
| Germany; Fakultä...|
| Germany; Fakultä...|
| Germany; Fakultä...|
|          HIV status|
|             Hamburg|
+--------------------+
only showing top 20 rows



In [0]:
df = df.withColumn(
    "Sponsors",
    F.when(
        ~F.col("Sponsors").rlike("^[a-zA-Z]"),  # If the value does not start with a letter
        None  # Set it to null
    ).when(
        F.size(F.split(F.col("Sponsors"), " ")) < 3,  # If it has less than 3 words
        None  # Set it to null
    ).when(
        F.col("Sponsors").rlike("^[a-z]"),  # If it starts with a lowercase letter
        None  # Set it to null
    ).otherwise(F.col("Sponsors"))
)

In [0]:
df = df.withColumn(
    "Tradenames",
    F.when(
        ~F.col("Tradenames").rlike("^[a-zA-Z]"),  # If the value does not start with a letter
        None  # Set it to null
    ).when(
        F.size(F.split(F.col("Tradenames"), " ")) < 3,  # If it has less than 3 words
        None  # Set it to null
    ).when(
        F.col("Tradenames").rlike("^[a-z]"),  # If it starts with a lowercase letter
        None  # Set it to null
    ).otherwise(F.col("Tradenames"))
)

In [0]:
df = df.withColumn(
    "PubMed ID",
    F.when(
        F.col("PubMed ID").rlike("[a-zA-Z]"),  # Contains any text (letters)
        None
    ).when(
        ~F.col("PubMed ID").rlike("^[0-9]"),  # Does not start with a number
        None
    ).when(
        F.length(F.regexp_replace(F.col("PubMed ID"), "[^0-9]", "")) < 5,  # Fewer than 5 digits
        None
    ).otherwise(F.col("PubMed ID"))  # Keep original value if none of the conditions are met
)

In [0]:

df = df.withColumn(
    "Publisher",
    F.trim(
        F.when(
            ~F.col("Publisher").rlike("^[a-zA-Z]"),  # If the value does not start with a letter
            None  # Set it to null
        ).otherwise(
            F.expr(
                """
                regexp_replace(
                    case 
                        when Publisher rlike '^[a-z]' then regexp_extract(Publisher, '[A-Z].*', 0)
                        else Publisher 
                    end, 
                    '"', ''
                )
                """
            )
        )
    )
)


In [0]:
df = df.withColumn(
    "Correspondence Address",
    F.when(
        ~F.col("Correspondence Address").rlike("^[a-zA-Z]"),  # Does not start with a letter
        "Unknown"
    ).otherwise(F.col("Correspondence Address"))
)


In [0]:
df = df.withColumn(
    "ISSN",
    F.when(
        F.col("ISSN").rlike("[a-zA-Z]{3,}"),  # Contains more than 2 letters
        None
    ).when(
        F.col("ISSN").rlike("^[0-9]"),  # Does not start with a number
        None
    ).when(
        F.length(F.regexp_replace(F.col("ISSN"), "[^0-9]", "")) < 5,  # Fewer than 5 digits
        None
    ).otherwise(F.col("ISSN"))  # Keep original value if none of the conditions are met
)


In [0]:
df = df.withColumn(
    "ISBN",
    F.when(
        F.col("ISBN").rlike("[a-zA-Z]"),  # Contains any text (letters)
        None
    ).when(
        ~F.col("ISBN").rlike("^[0-9]"),  # Does not start with a number
        None
    ).when(
        F.length(F.regexp_replace(F.col("ISBN"), "[^0-9]", "")) < 5,  # Fewer than 5 digits
        None
    ).otherwise(F.col("ISBN"))  # Keep original value if none of the conditions are met
)


In [0]:
df = df.withColumn(
    "Manufacturers",
    F.when(
        F.col("Manufacturers").rlike("^[^a-zA-Z]"),  
        None  
    ).otherwise(F.col("Manufacturers"))  
)

In [0]:
df = df.withColumn(
    "Chemicals/CAS",
    F.when(
        F.col("Chemicals/CAS").rlike("^[^a-zA-Z0-9]"),  
        None  
    ).otherwise(F.col("Chemicals/CAS"))  
)


In [0]:
df = df.withColumn(
    "Authors",
    F.when(F.col("Authors").rlike("^[^a-zA-Z]"), "Unknown").otherwise(F.col("Authors"))
)


In [0]:
df = df.withColumn(
    "Author full names",
    F.when(F.col("Author full names").rlike("^[^a-zA-Z]"), "Unknown").otherwise(F.col("Author full names"))
)


In [0]:
df = df.withColumn(
    "Abstract",
    F.when(
        F.col("Abstract").rlike("^[^a-zA-Z]"),  # Starts with non-letter character
        F.regexp_replace(F.col("Abstract"), "^[^a-zA-Z]+", "")  # Remove non-letters at the beginning
    ).otherwise(F.col("Abstract"))
)

# Remove all double quotes (") from the entire 'Abstract' column
df = df.withColumn("Abstract", F.regexp_replace(F.col("Abstract"), '"', ''))


In [0]:
df[['Open Access']].show(40)


+-----------+
|Open Access|
+-----------+
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|       true|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|       true|
|       true|
|      false|
|      false|
|       true|
|      false|
|      false|
|       true|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
|      false|
+-----------+
only showing top 40 rows



In [0]:
df.columns

Out[21]: ['Authors',
 'Author full names',
 'Author(s) ID',
 'Title',
 'Year',
 'Source title',
 'Volume',
 'Issue',
 'Art. No.',
 'Page start',
 'Page end',
 'Page count',
 'Cited by',
 'DOI',
 'Link',
 'Affiliations',
 'Authors with affiliations',
 'Abstract',
 'Author Keywords',
 'Index Keywords',
 'Molecular Sequence Numbers',
 'Chemicals/CAS',
 'Tradenames',
 'Manufacturers',
 'Funding Details',
 'Funding Texts',
 'References',
 'Correspondence Address',
 'Editors',
 'Publisher',
 'Sponsors',
 'Conference name',
 'Conference date',
 'Conference location',
 'Conference code',
 'ISSN',
 'ISBN',
 'CODEN',
 'PubMed ID',
 'Language of Original Document',
 'Abbreviated Source Title',
 'Document Type',
 'Publication Stage',
 'Open Access',
 'Source',
 'EID']

In [0]:
df.select("Author full names").show()


+--------------------+
|   Author full names|
+--------------------+
|Elbour, Aziz (259...|
|El Ouardi, Otmane...|
|Benallel, Abderra...|
|Bousseta, Mohamme...|
|Lemnawar, A. (589...|
|Mounnan, Oussama ...|
|Li, Xue (57221400...|
|Ouchn, Rachid (57...|
|Ahmida, Youssef (...|
|Farooq, Muhammad ...|
|Laamari, Yassine ...|
|Ettalibi, Nouhail...|
|Bifari, Elham N. ...|
|Laamari, Yassine ...|
|Adardour, Mohamed...|
|Alaoui, S. Bouham...|
|Rouzi, Khouloud (...|
|Abad, Nadeem (571...|
|Haloui, Doha (585...|
|Seqqat, Yousra (5...|
+--------------------+
only showing top 20 rows



In [0]:
df = df.dropDuplicates(["Title"])


In [0]:
df = df.filter(df['Title'].isNotNull())


In [0]:
df.filter(df['Title'].isNull()).count()


Out[25]: 0

In [0]:
df.filter(df['Page start'] == -1).count()


Out[26]: 0

In [0]:
#Add a Column where the authors are splitted as an Array
df = df.withColumn("Authors", F.split(F.col("Authors"), ",|;")) 
df = df.withColumn("Authors", F.expr("transform(Authors, x -> trim(lower(x)))"))


In [0]:
df.select(df["Author Full Names"]).show(100)


+--------------------+
|   Author Full Names|
+--------------------+
|Mesrar, M. (57201...|
|Oulhakem, Oussama...|
|Ouhaddou, Madani ...|
|Amenchar, Ouiam (...|
|Koucham, M’hamed ...|
|Abbad, Abdelghafo...|
|Aboussaleh, Ilyas...|
|Haboubi, Chaimae ...|
|Benatiya Andalous...|
|Oumoussa, Idris (...|
|Al Awaidy, Salah ...|
|Kim, Hwankoo (569...|
|Bouhssini, Ahmed ...|
|Belkadi, Said (57...|
|Douhi, Saïd (5813...|
|Briache, Ayoub (5...|
|Seridi, Karim (58...|
|Belatik, Assia (5...|
|Akiirne, Ziad (59...|
|Refki, Siham (566...|
|Qasery, Mouna El ...|
|Hakkoum, Hajar (5...|
|Houmma, Ismaguil ...|
|Boumajdi, Nasma (...|
|Ben Hssain, Idris...|
|Achagar, Redouane...|
|Bouanani, Oumaima...|
|Boussatta, Hicham...|
|Mimi, Safaa (5855...|
|Maskar, E. (57221...|
|Moustabchir, Abde...|
|Dahbi, Houda (593...|
|Zarrouk, Yassine ...|
|Boutyour, Youness...|
|Chait, Fatima (58...|
|Rbah, Yahya (5757...|
|Mohamed, Ait Yous...|
|Ed-daoudy, Lhouss...|
|Hamdoune, Meryem ...|
|Yamni, Mohamed (5...|
|Elmhaia, O

In [0]:
#Add two columns where the authors id and full names are splitted as an Array
df = df.withColumn("Author(s) ID", F.split(F.col("`Author(s) ID`"), ",|;"))
df = df.withColumn("Author(s) ID", F.expr("transform(`Author(s) ID`, x -> trim(x))"))


In [0]:
df = df.withColumn(
    "Author Full Names",
    F.expr("""
        transform(
            split(`Author Full Names`, ';'),
            x -> trim(
                    regexp_replace(regexp_replace(x, '[0-9()]+', ''), ',', '')  -- Remove numbers, parentheses, and commas
                )
        )
    """)
)




In [0]:
df.columns

Out[31]: ['Authors',
 'Author Full Names',
 'Author(s) ID',
 'Title',
 'Year',
 'Source title',
 'Volume',
 'Issue',
 'Art. No.',
 'Page start',
 'Page end',
 'Page count',
 'Cited by',
 'DOI',
 'Link',
 'Affiliations',
 'Authors with affiliations',
 'Abstract',
 'Author Keywords',
 'Index Keywords',
 'Molecular Sequence Numbers',
 'Chemicals/CAS',
 'Tradenames',
 'Manufacturers',
 'Funding Details',
 'Funding Texts',
 'References',
 'Correspondence Address',
 'Editors',
 'Publisher',
 'Sponsors',
 'Conference name',
 'Conference date',
 'Conference location',
 'Conference code',
 'ISSN',
 'ISBN',
 'CODEN',
 'PubMed ID',
 'Language of Original Document',
 'Abbreviated Source Title',
 'Document Type',
 'Publication Stage',
 'Open Access',
 'Source',
 'EID']

****Done with authors****

In [0]:
df = df.withColumn("Affiliations_Cleaned", F.regexp_replace(F.col("Affiliations"), ",", " "))
df = df.withColumn("Affiliations", F.split(F.col("Affiliations_Cleaned"), ";"))

# Optionally, drop the temporary 'Affiliations_Cleaned' column
df = df.drop("Affiliations_Cleaned")

In [0]:
from pyspark.sql import functions as F

# Truncate the 'Abstract' column to 3000 characters
df = df.withColumn("Abstract", F.when(F.length(F.col("Abstract")) > 3000, F.expr("substring(Abstract, 1, 3000)"))
                  .otherwise(F.col("Abstract")))


**done with affiliations**

In [0]:
df = df.withColumnRenamed("Art. No.", "article_number")


In [0]:
df.columns

Out[35]: ['Authors',
 'Author Full Names',
 'Author(s) ID',
 'Title',
 'Year',
 'Source title',
 'Volume',
 'Issue',
 'article_number',
 'Page start',
 'Page end',
 'Page count',
 'Cited by',
 'DOI',
 'Link',
 'Affiliations',
 'Authors with affiliations',
 'Abstract',
 'Author Keywords',
 'Index Keywords',
 'Molecular Sequence Numbers',
 'Chemicals/CAS',
 'Tradenames',
 'Manufacturers',
 'Funding Details',
 'Funding Texts',
 'References',
 'Correspondence Address',
 'Editors',
 'Publisher',
 'Sponsors',
 'Conference name',
 'Conference date',
 'Conference location',
 'Conference code',
 'ISSN',
 'ISBN',
 'CODEN',
 'PubMed ID',
 'Language of Original Document',
 'Abbreviated Source Title',
 'Document Type',
 'Publication Stage',
 'Open Access',
 'Source',
 'EID']

In [0]:
df[['Authors with affiliations']].show(truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
df = df.fillna({"Page start": -1, "Page end": -1})


In [0]:
df = df.fillna({"Cited by": 0})


In [0]:
df = df.fillna({"DOI": 0})


In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

# Define the UDF to create the dictionary
def create_author_affiliation_dict(authors_with_affiliations):
    # Split the authors and affiliations by ';'
    author_affiliation_pairs = authors_with_affiliations.split(';')
    
    # Create the dictionary
    author_affiliation_dict = {}
    for pair in author_affiliation_pairs:
        # Split each pair into author and affiliation
        parts = pair.split(',')
        author = parts[0].strip()  # Get the author's name
        affiliation = ', '.join(parts[1:]).strip()  # Join the remaining parts as the affiliation
        author_affiliation_dict[author] = affiliation

    return author_affiliation_dict

# Register the UDF
create_author_affiliation_dict_udf = udf(create_author_affiliation_dict, MapType(StringType(), StringType()))

# Apply the UDF to the DataFrame
df = df.withColumn("'Authors with affiliations", create_author_affiliation_dict_udf(df['Authors with affiliations']))

# Show the results
df.select("'Authors with affiliations").show(truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
#Add two columns where the authors id and full names are splitted as an Array
df = df.withColumn("Author Keywords", F.split(F.col("Author Keywords"), ",|;"))
df = df.withColumn("Author Keywords", F.expr("transform(`Author Keywords`, x -> trim(x))"))

In [0]:
#Add two columns where the authors id and full names are splitted as an Array
df = df.withColumn("Index Keywords", F.split(F.col("Index Keywords"), ",|;"))
df = df.withColumn("Index Keywords", F.expr("transform(`Index Keywords`, x -> trim(x))"))

In [0]:
df.printSchema()

root
 |-- Authors: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Author Full Names: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Author(s) ID: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Title: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Source title: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Issue: integer (nullable = true)
 |-- article_number: integer (nullable = true)
 |-- Page start: integer (nullable = false)
 |-- Page end: integer (nullable = false)
 |-- Page count: integer (nullable = true)
 |-- Cited by: integer (nullable = false)
 |-- DOI: string (nullable = false)
 |-- Link: string (nullable = true)
 |-- Affiliations: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Authors with affiliations: string (nullable = true)
 |-- Abstract: string (nullable = true)
 |-- Author Keywords: array (nullable = true)

In [0]:
df = df.fillna({'Tradenames': 'Unknown'})


In [0]:
df = df.fillna({'Manufacturers': 'Unknown'})


In [0]:
df = df.fillna({'Funding Details': 'Unknown'})


In [0]:
df = df.fillna({'Funding Texts': 'Unknown'})


In [0]:
df = df.withColumn("References", F.regexp_replace(F.col("References"), ",", "."))
df = df.withColumn("References", F.split(F.col("References"), ";"))
df = df.withColumn("References", F.expr("transform(`References`, x -> trim(x))"))

In [0]:
df = df.withColumn(
    "References",
    F.when(F.col("References").isNull(), F.array(F.lit("Unknown"))).otherwise(F.col("References"))
)

In [0]:
df = df.fillna({'Correspondence Address': 'Unknown'})


In [0]:
df = df.fillna({'Editors': 'Unknown'})


In [0]:
df = df.fillna({'Sponsors': 'Unknown'})


In [0]:
df = df.fillna({'Conference location': 'Unknown'})
df = df.fillna({'Conference name': 'Unknown'})



In [0]:
# Get the number of rows
row_count = df.count()

print(f"Total number of rows: {row_count}")


Total number of rows: 14935


In [0]:
# Get the data type of a specific column
column_type = df.schema['Conference date'].dataType
print(f"Data type of the column 'Conference date': {column_type}")


Data type of the column 'Conference date': StringType()


In [0]:


# Extract start and end dates using regex and convert them to DATE type
df = df.withColumn("Start_date", F.to_date(F.regexp_extract(F.col("Conference date"), r"(\d{1,2} \w+ \d{4})", 1), "d MMM yyyy"))
df = df.withColumn("End_date", F.to_date(F.regexp_extract(F.col("Conference date"), r"through (\d{1,2} \w+ \d{4})", 1), "d MMM yyyy"))



In [0]:
df = df.withColumn("ISSN", F.when(df['ISSN'].isNull(), "No ISSN").otherwise(df['ISSN']))
df = df.withColumn("ISBN", F.when(df['ISBN'].isNull(), "No ISBN").otherwise(df['ISBN']))
df = df.withColumn("CODEN", F.when(df['CODEN'].isNull(), "No CODEN").otherwise(df['CODEN']))
df = df.withColumn("PubMed ID", F.when(df['PubMed ID'].isNull(), "No PubMed ID").otherwise(df['PubMed ID']))





In [0]:
df.select("CODEN").distinct().show(truncate=False)
distinct_count = df.select("CODEN").distinct().count()

print(f"Number of distinct CODEN values: {distinct_count}")


+---------+
|CODEN    |
+---------+
|OCPHC    |
|No CODEN |
|ECMAD    |
|IJHMA    |
|IJHED    |
|OCMAE    |
|JECMA    |
|PSISD    |
|no coden |
|PPPYA    |
|JCEAA    |
|RPCHD    |
|SSIOD    |
|MCSID    |
|non coden|
|CMSTC    |
|CAGDE    |
|CPEAE    |
|ESPLE    |
|JMOSB    |
+---------+
only showing top 20 rows

Number of distinct CODEN values: 1014


In [0]:
df.select("Language of Original Document").distinct().show(truncate=False)
distinct_count = df.select("Language of Original Document").distinct().count()

print(f"Number of distinct Language of Original Document values: {distinct_count}")

+---------------------------------------------------------------------------------+
|Language of Original Document                                                    |
+---------------------------------------------------------------------------------+
| Annu Rev Pathol Mech Dis                                                        |
| (2021); Mikolov T.                                                              |
| (2023); Jeroundi D.                                                             |
| (2013);  World Health Assembly: Progress reports (A68/36)                       |
| Li H.                                                                           |
| Smart locks: Exploring security breaches and access extensions                  |
|null                                                                             |
|IGI Global                                                                       |
| Estimation of Electricity Production for a Moroccan Wind Farm; Triantaphyl

In [0]:
# Count the occurrences of each distinct value in the 'Language of Original Document' column
language_counts = df.groupBy("Language of Original Document").count()

# Show the counts for verification
language_counts.show(truncate=False)


+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|Language of Original Document                                                                                                                                                                                                                                                                                     |count|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
| 370                                                  

In [0]:
from pyspark.sql import functions as F

# Step 1: Count the occurrences of each distinct value in the 'Language of Original Document' column
language_counts = df.groupBy("Language of Original Document").count()

# Step 2: Join the original DataFrame with the counts DataFrame
df_with_counts = df.join(language_counts, on="Language of Original Document", how="left")

# Step 3: Replace values that appear fewer than 10 times or are non-language values (like numbers) with "Unknown"
df = df_with_counts.withColumn(
    "Language of Original Document",
    F.when(df_with_counts["count"] < 10, "Unknown")  # Replace values with count less than 10
    .when(df_with_counts["Language of Original Document"].rlike("^\d+$"), "Unknown")  # Replace numeric values
    .otherwise(df_with_counts["Language of Original Document"])  # Keep values with count >= 10
)

# Step 4: Drop the count column as it's no longer needed
df = df.drop("count")





In [0]:
df = df.fillna({'Language of Original Document': 'Unknown'})
df.select("Language of Original Document").distinct().show(truncate=False)

+-----------------------------+
|Language of Original Document|
+-----------------------------+
|English                      |
|Unknown                      |
|Spanish                      |
|French                       |
+-----------------------------+



In [0]:
df.select("Document Type").show(truncate=False)


+------------------------+
|Document Type           |
+------------------------+
|Article                 |
|Article                 |
|Article                 |
| Klix.ba Magazin Kultura|
|Article                 |
|Article                 |
|Article                 |
|Article                 |
|Article                 |
|Article                 |
|Article                 |
|Article                 |
|Article                 |
|Conference paper        |
|Article                 |
|Article                 |
|Article                 |
|Article                 |
|Article                 |
|Article                 |
+------------------------+
only showing top 20 rows



In [0]:
df = df.fillna({'Abbreviated Source Title': 'Unknown'})


In [0]:
df.select("Document Type").distinct().show(truncate=True)

+--------------------+
|       Document Type|
+--------------------+
|    Malays. J. Chem.|
|              Review|
|    (2021); Singh B.|
|                Note|
|        Book chapter|
|                null|
|            Leung D.|
|       pp. 2380-2384|
| (2019); Kurgalin S.|
|         pp. 321-344|
|          Data Brief|
|           Editorial|
| Multicriteria De...|
|          Corrado G.|
| Klix.ba Magazin ...|
|         pp. 579-598|
| (2007); Agence n...|
|             Article|
| A feedforward ne...|
|    Conference paper|
+--------------------+
only showing top 20 rows



In [0]:
# Step 1: Group by 'Document Type' and count the occurrences of each value
document_type_counts = df.groupBy("Document Type").count()

# Step 2: Order the counts in descending order to get the most frequent values first
document_type_counts_sorted = document_type_counts.orderBy(F.col("count").desc())

# Step 3: Show the result
document_type_counts_sorted.show(truncate=False)


+-------------------------------------+-----+
|Document Type                        |count|
+-------------------------------------+-----+
|Article                              |10383|
|Conference paper                     |2472 |
|Review                               |669  |
|Book chapter                         |616  |
|null                                 |95   |
|Erratum                              |72   |
|Letter                               |55   |
|Editorial                            |38   |
|Note                                 |36   |
|Book                                 |33   |
|English                              |19   |
|Data paper                           |18   |
|Short survey                         |15   |
| Et al.                              |6    |
| 2                                   |5    |
| Freiburg                            |4    |
| 6                                   |3    |
| 1                                   |3    |
| 7                               

In [0]:
document_type_counts = df.groupBy("Document Type").count()

filtered_document_types = document_type_counts.filter(document_type_counts["count"] > 14)

df_with_counts = df.join(filtered_document_types, on="Document Type", how="left")

df = df_with_counts.withColumn(
    "Document Type",
    F.when(df_with_counts["count"].isNull(), "Unknown")  # Replace low-frequency values with 'Unknown'
    .when(df_with_counts["Document Type"] == "English", "Unknown")  # Replace 'English' with 'Unknown'
    .otherwise(df_with_counts["Document Type"])  # Keep values that appear more than 15 times
)
df = df.drop("count")





In [0]:
df.select("Document Type").distinct().show(truncate=False)


+----------------+
|Document Type   |
+----------------+
|Review          |
|Note            |
|Book chapter    |
|Book            |
|Unknown         |
|Short survey    |
|Editorial       |
|Erratum         |
|Letter          |
|Article         |
|Conference paper|
|Data paper      |
+----------------+



In [0]:
df= df.withColumn(
    "Publication Stage",
    F.when(df["Publication Stage"].isNull(), "Unknown")  
    .when(~df["Publication Stage"].isin("Final", "Article in press"), "Unknown")  
    .otherwise(df["Publication Stage"])  
)

In [0]:
df.filter(df['Source'].isNull()).count()


Out[70]: 78

In [0]:
df.fillna({'Funding Details': 'Unknown'})
df.select("Publication Stage").distinct().show(truncate=False)


+-----------------+
|Publication Stage|
+-----------------+
|Final            |
|Unknown          |
|Article in press |
+-----------------+



In [0]:
df.select("Open Access").distinct().show(truncate=False)


+-----------+
|Open Access|
+-----------+
|true       |
|false      |
+-----------+



In [0]:
df.select("EID").show(truncate=False)


+------------------+
|EID               |
+------------------+
|2-s2.0-85186625423|
| Sarajevo         |
|2-s2.0-85199263123|
|2-s2.0-85204184204|
|2-s2.0-85187793265|
|2-s2.0-85195665789|
|2-s2.0-85171750642|
|2-s2.0-85189164135|
|2-s2.0-85192757665|
|2-s2.0-85192671410|
|2-s2.0-85196429577|
|2-s2.0-85206664621|
|2-s2.0-85188596227|
|2-s2.0-85205061817|
|2-s2.0-85189529793|
|2-s2.0-85167914092|
|2-s2.0-85190685603|
|2-s2.0-85184996217|
|2-s2.0-85182150679|
|2-s2.0-85191006324|
+------------------+
only showing top 20 rows



In [0]:
df = df.fillna({'Source': 'Scopus'})


In [0]:
from pyspark.sql.functions import to_date, to_timestamp

# Example if you have a date column
df = df.withColumn("Start_date", to_date(df["Start_date"], "yyyy-MM-dd"))
df = df.withColumn("End_date", to_date(df["End_date"], "yyyy-MM-dd"))



In [0]:
df.printSchema()

root
 |-- Document Type: string (nullable = true)
 |-- Language of Original Document: string (nullable = false)
 |-- Authors: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Author Full Names: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Author(s) ID: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Title: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Source title: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Issue: integer (nullable = true)
 |-- article_number: integer (nullable = true)
 |-- Page start: integer (nullable = false)
 |-- Page end: integer (nullable = false)
 |-- Page count: integer (nullable = true)
 |-- Cited by: integer (nullable = false)
 |-- DOI: string (nullable = false)
 |-- Link: string (nullable = true)
 |-- Affiliations: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Authors with affiliations: str

In [0]:
# Set the legacy time parser policy to handle datetime parsing like Spark 2.x
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")


In [0]:
from pyspark.sql.functions import to_date

# Convert Start_date and End_date to date format (assuming the format is 'yyyy-MM-dd')
df = df.withColumn("Start_date", to_date(df["Start_date"], "yyyy-MM-dd"))
df = df.withColumn("End_date", to_date(df["End_date"], "yyyy-MM-dd"))

# Check the results
df.select("Start_date", "End_date").distinct().show(5, truncate=False)


+----------+----------+
|Start_date|End_date  |
+----------+----------+
|2024-05-16|2024-05-17|
|null      |null      |
|2024-07-29|2024-07-31|
|2024-01-11|2024-01-12|
|2022-11-27|2022-11-30|
+----------+----------+
only showing top 5 rows



In [0]:
# Reset the legacy time parser policy to the default behavior (Spark >= 3.0)
spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")


In [0]:
df.select("Start_date", "End_date").distinct().show(5, truncate=False)


+----------+----------+
|Start_date|End_date  |
+----------+----------+
|2024-05-16|2024-05-17|
|2024-05-15|2024-05-17|
|null      |null      |
|2024-05-27|2024-05-31|
|2023-05-29|2023-05-30|
+----------+----------+
only showing top 5 rows



In [0]:
df.show()

+----------------+-----------------------------+--------------------+--------------------+--------------------+--------------------+----+--------------------+------+-----+--------------+----------+--------+----------+--------+--------------------+--------------------+--------------------+-------------------------+--------------------+--------------------+--------------------+--------------------------+--------------------+----------+-------------+--------------------+--------------------+--------------------+----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-------+---------------+--------+------------+------------------------+-----------------+-----------+--------+------------------+--------------------------+----------+--------+
|   Document Type|Language of Original Document|             Authors|   Author Full Names|        Author(s) ID|               Title|Year|   

**Dividing to tables**

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [0]:
# Generate Article_ID as a unique key from EID or DOI
df = df.withColumn("Article_ID", F.monotonically_increasing_id())


In [0]:
articles_df = df.select(
    "Article_ID", "Title", "Year", "Source title", "Volume", "Issue", "article_number",
    "Page start", "Page end", "Page count", "Cited by", "DOI", "Link", "Abstract",
    "Molecular Sequence Numbers", "Chemicals/CAS", "Tradenames", "Manufacturers",
    "Correspondence Address", "Editors", "Publisher", "Sponsors", "ISSN", "ISBN",
    "CODEN", "PubMed ID", "Language of Original Document", "Abbreviated Source Title",
    "Document Type", "Publication Stage", "Open Access"
)

In [0]:
articles_df.printSchema()


root
 |-- Article_ID: long (nullable = false)
 |-- Title: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Source title: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Issue: integer (nullable = true)
 |-- article_number: integer (nullable = true)
 |-- Page start: integer (nullable = false)
 |-- Page end: integer (nullable = false)
 |-- Page count: integer (nullable = true)
 |-- Cited by: integer (nullable = false)
 |-- DOI: string (nullable = false)
 |-- Link: string (nullable = true)
 |-- Abstract: string (nullable = true)
 |-- Molecular Sequence Numbers: string (nullable = true)
 |-- Chemicals/CAS: string (nullable = true)
 |-- Tradenames: string (nullable = false)
 |-- Manufacturers: string (nullable = false)
 |-- Correspondence Address: string (nullable = false)
 |-- Editors: string (nullable = false)
 |-- Publisher: string (nullable = true)
 |-- Sponsors: string (nullable = false)
 |-- ISSN: string (nullable = true)
 |-- ISBN: string (nulla

In [0]:
articles_df.show(1, truncate=False)


In [0]:
# 2. Authors Table
# Explode authors to create individual rows per author
authors_df = df.withColumn(
    "author_struct",
    F.explode(F.arrays_zip(F.col("Authors"), F.col("Author full names"), F.col("Author(s) ID")))
).select(
    "Article_ID",
    F.col("author_struct.Authors").alias("Author_Name"),
    F.col("author_struct.Author full names").alias("Author_Full_Name"),
    F.col("author_struct.Author(s) ID").alias("Author_ID")
)


In [0]:
authors_df.show()



In [0]:
df = df.drop("City")
df = df.drop("Country")

df.printSchema()

root
 |-- Document Type: string (nullable = true)
 |-- Language of Original Document: string (nullable = false)
 |-- Authors: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Author Full Names: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Author(s) ID: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Title: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Source title: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Issue: integer (nullable = true)
 |-- article_number: integer (nullable = true)
 |-- Page start: integer (nullable = false)
 |-- Page end: integer (nullable = false)
 |-- Page count: integer (nullable = true)
 |-- Cited by: integer (nullable = false)
 |-- DOI: string (nullable = false)
 |-- Link: string (nullable = true)
 |-- Affiliations: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Authors with affiliations: str

In [0]:
df.select("Affiliations").show(100,truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:

# 3. Affiliations Table
# Explode affiliations to create individual rows per affiliation
affiliations_df = df.select(
    "Article_ID",
    F.explode(F.col("Affiliations")).alias("Affiliation")
)

In [0]:
affiliations_df.show(truncate=False)


+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Article_ID|Affiliation                                                                                                                                                                                      |
+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0         |Signals  Systems and Components Laboratory (LSSC)  Faculty of Sciences and Technologies of Fez  Sidi Mohamed Ben Abdellah University  B.P. 2022  Fez  Morocco                                    |
|0         | Materials Science Energy and Nanoengineering Department  Mohamed VI Polytechnic University  Benguerir  Morocco                                                 

In [0]:
affiliations_df.printSchema()


root
 |-- Article_ID: long (nullable = false)
 |-- Affiliation: string (nullable = false)



In [0]:
from pyspark.sql.functions import split, col

# Split the Affiliation column by two spaces
affiliations_df = affiliations_df.withColumn("split_affiliation", split(col("Affiliation"), "  "))

# Show the new column with the split elements
affiliations_df.select("split_affiliation").show(500,truncate=False)



+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|split_affiliation                                                                                                                                                                                                                                               |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[Signals, Systems and Components Laboratory (LSSC), Faculty of Sciences and Technologies of Fez, Sidi Mohamed Ben Abdellah University, B.P. 2022, Fez, Morocco]                                                               

In [0]:
affiliations_df.select("split_affiliation").show(truncate=False)


+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|split_affiliation                                                                                                                                                                                  |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[Signals, Systems and Components Laboratory (LSSC), Faculty of Sciences and Technologies of Fez, Sidi Mohamed Ben Abdellah University, B.P. 2022, Fez, Morocco]                                    |
|[ Materials Science Energy and Nanoengineering Department, Mohamed VI Polytechnic University, Benguerir, Morocco]                                                                                  |
|[ Univers

In [0]:
from pyspark.sql.functions import col

# Extract the last element of the array using `getItem` function
affiliations_df = affiliations_df.withColumn("Country", col("split_affiliation").getItem(-1))

# Show the result with the new 'country' column
affiliations_df.select("split_affiliation", "Country").show(truncate=False)



+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|split_affiliation                                                                                                                                                                                  |Country|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|[Signals, Systems and Components Laboratory (LSSC), Faculty of Sciences and Technologies of Fez, Sidi Mohamed Ben Abdellah University, B.P. 2022, Fez, Morocco]                                    |null   |
|[ Materials Science Energy and Nanoengineering Department, Mohamed VI Polytechnic University, Benguerir, Morocco]                                                              

In [0]:
affiliations_df = affiliations_df.drop("Country")
affiliations_df.printSchema()

root
 |-- Article_ID: long (nullable = false)
 |-- Affiliation: string (nullable = false)
 |-- split_affiliation: array (nullable = false)
 |    |-- element: string (containsNull = false)



In [0]:
from pyspark.sql.functions import element_at, col

# Use element_at to get the last item from 'split_affiliation' and create a new column 'Country'
affiliations_df = affiliations_df.withColumn("Country", element_at(col("split_affiliation"), -1))

# Show the updated DataFrame
affiliations_df.show()


+----------+--------------------+--------------------+------------+
|Article_ID|         Affiliation|   split_affiliation|     Country|
+----------+--------------------+--------------------+------------+
|         0|Signals  Systems ...|[Signals, Systems...|     Morocco|
|         0| Materials Scienc...|[ Materials Scien...|     Morocco|
|         0| University of Ch...|[ University of C...|       China|
|         1|Nanoscience and N...|[Nanoscience and ...|     Morocco|
|         1| Nanomaterials an...|[ Nanomaterials a...|     Morocco|
|         1| College of Scien...|[ College of Scie...|Saudi Arabia|
|         1| Department of Ph...|[ Department of P...|      Kuwait|
|         2|Materials Science...|[Materials Scienc...|     Morocco|
|         3|Molecular Chemist...|[Molecular Chemis...|     Morocco|
|         3| Institut de Chim...|[ Institut de Chi...|      France|
|         4|University of Abd...|[University of Ab...|     Morocco|
|         4| LCCPS  ENSAM of ...|[ LCCPS, ENSAM 

In [0]:

affiliations_df = affiliations_df.withColumn(
    "Country",
    F.when(
        (F.col("Country").rlike("^[^a-zA-Z]")) |                # Starts with non-letter character
        (F.col("Country").rlike("[^a-zA-Z]")) |                  # Contains non-letter characters
        (F.size(F.split(F.col("Country"), " ")) > 3),             # More than 3 words
        "Unknown"
    ).otherwise(F.col("Country"))
)


In [0]:
# Show the distinct values in the 'Country' column
affiliations_df.select("Country").distinct().show(truncate=False)


+----------+
|Country   |
+----------+
|Turkey    |
|Germany   |
|Jordan    |
|France    |
|Qatar     |
|China     |
|India     |
|Kuwait    |
|Unknown   |
|Spain     |
|Morocco   |
|Oman      |
|Palestine |
|Tunisia   |
|Uganda    |
|Canada    |
|Uzbekistan|
|Lebanon   |
|Poland    |
|Egypt     |
+----------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import element_at, col, when, regexp_extract

# Extract the second-to-last element from the array (before the country)
second_to_last_element = element_at(col("split_affiliation"), -2)

# Check if the second-to-last element is a number
is_numeric = regexp_extract(second_to_last_element, r'^\d+$', 0)

# Use conditional logic to either take the second-to-last element or the third-to-last
affiliations_df = affiliations_df.withColumn(
    "City",
    when(is_numeric == "", second_to_last_element)  # If it's not a number, take the second-to-last element
    .otherwise(element_at(col("split_affiliation"), -3))  # If it's a number, take the third-to-last element
)

# Show the updated DataFrame
affiliations_df.show(truncate=False)


+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+---------------+
|Article_ID|Affiliation                                                                                                                                                                                      |split_affiliation                                                                                                                                                                                  |Country|City           |
+----------+--------------------------------------------------------------------------------------------------------------------------------------

In [0]:
affiliations_df.select("City").distinct().show(truncate=False)




In [0]:
affiliations_df.select("City").distinct().show(100,truncate=False)




In [0]:
from pyspark.sql.functions import col, when, size, split, regexp_replace, lit

# Replace city names with "Unknown" if they contain numbers, have more than two words, or contain the word "University"
affiliations_df = affiliations_df.withColumn(
    "City",
    when(
        (col("City").rlike(r"\d")) |  # Check if the city contains any digit
        (size(split(col("City"), " ")) > 2) | 
        (col("City").rlike("(?i)University")),  # Case-insensitive check for "University"
        lit("Unknown")
    ).otherwise(col("City"))
)

# Show the transformed DataFrame
affiliations_df.select("City").distinct().show(truncate=False)

+---------------+
|City           |
+---------------+
|Casablanca     |
|Beijing        |
|Tetouan        |
|Rabat          |
|Oujda          |
|Takaddoum Rabat|
|Muscat MOH     |
|Ankara         |
|Montreal       |
|El Jadida      |
|Safat          |
|Unknown        |
|Tetuan         |
|Lyon           |
|Riyadh         |
|Beni-Mellal    |
|Beirut         |
|Nablus         |
|Benguerir      |
|Muscat         |
+---------------+
only showing top 20 rows



In [0]:
affiliations_df = affiliations_df.withColumn(
    "City",
    F.when(
        F.col("City").rlike("^[^a-zA-Z]"),  
        "Unknown"
    ).otherwise(
        F.trim(F.col("City")) 
    )
)

In [0]:
from pyspark.sql.functions import array_except, concat_ws, col, regexp_replace, lit, size

# Create a list of City and Country values to exclude from split_affiliation
affiliations_df = affiliations_df.withColumn(
    "Institution",
    concat_ws(" ", 
        F.expr("filter(split_affiliation, x -> x NOT IN (City, Country) AND NOT rlike(x, '\\\\d{3,}'))")
    )
)

# Clean up: Remove leading/trailing spaces and handle empty institutions
affiliations_df = affiliations_df.withColumn(
    "Institution",
    F.when(F.trim(F.col("Institution")) == "", lit("Unknown")).otherwise(F.trim(F.col("Institution")))
)

In [0]:
affiliations_df = affiliations_df.drop("split_affiliation")
affiliations_df.show(100,truncate=False)


+----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Article_ID|Affiliation                                                                                                                                                                                                   |Country   |City           |Institution                                                                                                                                                                              |
+----------+--------------------------------------------------------------------------------------------------------------------------

In [0]:
df.printSchema()

root
 |-- Document Type: string (nullable = true)
 |-- Language of Original Document: string (nullable = false)
 |-- Authors: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Author Full Names: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Author(s) ID: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Title: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Source title: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Issue: integer (nullable = true)
 |-- article_number: integer (nullable = true)
 |-- Page start: integer (nullable = false)
 |-- Page end: integer (nullable = false)
 |-- Page count: integer (nullable = true)
 |-- Cited by: integer (nullable = false)
 |-- DOI: string (nullable = false)
 |-- Link: string (nullable = true)
 |-- Affiliations: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Authors with affiliations: str

In [0]:
keywords_df = df.select(
    "Article_ID",
    F.explode(F.col("Author Keywords")).alias("Keyword")
).union(
    df.select(
        "Article_ID",
        F.explode(F.col("Index Keywords")).alias("Keyword")
    )
)


In [0]:

keywords_df = keywords_df.withColumn(
    "keyword",
    F.when(
        F.col("keyword").rlike("^[^a-zA-Z]") | F.col("keyword").rlike("[^a-zA-Z]"),
        None
    ).otherwise(F.col("keyword"))
)


In [0]:
keywords_df.count()

Out[110]: 266129

In [0]:
articles_df.printSchema()
authors_df.printSchema()
affiliations_df.printSchema()
keywords_df.printSchema()


root
 |-- Article_ID: long (nullable = false)
 |-- Title: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Source title: string (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Issue: integer (nullable = true)
 |-- article_number: integer (nullable = true)
 |-- Page start: integer (nullable = false)
 |-- Page end: integer (nullable = false)
 |-- Page count: integer (nullable = true)
 |-- Cited by: integer (nullable = false)
 |-- DOI: string (nullable = false)
 |-- Link: string (nullable = true)
 |-- Abstract: string (nullable = true)
 |-- Molecular Sequence Numbers: string (nullable = true)
 |-- Chemicals/CAS: string (nullable = true)
 |-- Tradenames: string (nullable = false)
 |-- Manufacturers: string (nullable = false)
 |-- Correspondence Address: string (nullable = false)
 |-- Editors: string (nullable = false)
 |-- Publisher: string (nullable = true)
 |-- Sponsors: string (nullable = false)
 |-- ISSN: string (nullable = true)
 |-- ISBN: string (nulla

In [0]:
keywords_df = keywords_df.filter(F.col("Keyword").isNotNull())


In [0]:
row_count = keywords_df.count()
print("Number of rows:", row_count)


Number of rows: 79284


In [0]:
articles_df.write.format("csv").option("header", "true").save("/dbfs/FileStore/Article/articles_data.csv")


In [0]:
authors_df.write.format("csv").option("header", "true").save("/dbfs/FileStore/Article/authors_data.csv")

In [0]:
affiliations_df.write.format("csv").option("header", "true").save("/dbfs/FileStore/Article/affiliations_data.csv")

In [0]:
keywords_df.write.format("csv").option("header", "true").save("/dbfs/FileStore/Article/keywords_data1.csv")