In [52]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    udf, col, lower, trim, regexp_replace, explode, split, 
    array, when, lit, coalesce, concat_ws, collect_set, 
    length, first, array_distinct, flatten
)
from pyspark.sql.types import StringType, ArrayType, BooleanType
import re


In [53]:
spark = (
    SparkSession.builder
    .appName("WikiDrugJoin")
    .config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
    .config("spark.ui.enabled", "true")
    .config("spark.ui.port", "4040")
    .config("spark.driver.host", "0.0.0.0")
    .config("spark.executor.heartbeatInterval", "120s")
    .config("spark.network.timeout", "800s")
    .config("spark.rpc.askTimeout", "600s")
    .config("spark.sql.execution.arrow.pyspark.enabled", "false")
    .config("spark.python.worker.reuse", "true")
    .enableHiveSupport()
    .getOrCreate()
)

In [4]:
wiki_rdd = spark.sparkContext.textFile("data/enwiki-latest-pages-articles-multistream.xml.bz2")

In [5]:
def split_pages(lines):
    page = []
    for line in lines:
        page.append(line)
        if '</page>' in line:
            yield "\n".join(page)
            page = []

wiki_pages = wiki_rdd.mapPartitions(split_pages)
wiki_pages = wiki_pages.repartition(100)

In [6]:
def extract_page_fields_partition(pages):
    for page_xml in pages:
        title_match = re.search(r"<title>(.*?)</title>", page_xml)
        text_match = re.search(r"<text[^>]*?>([\s\S]*?)</text>", page_xml, re.DOTALL)
        if title_match and text_match:
            yield (title_match.group(1), text_match.group(1))

wiki_pairs = wiki_pages.mapPartitions(extract_page_fields_partition).filter(lambda x: x is not None)
wiki_df = wiki_pairs.toDF(["title", "text"])

                                                                                

In [60]:
def normalize(text):
    return re.sub(r'[^a-z0-9 ]', ' ', text.lower()).strip()

drugs = spark.read.csv("data/drugs_final.tsv", sep="\t", header=True)
drug_names = [r["drug_name"] for r in drugs.select("drug_name").collect() if r["drug_name"]]
active_ings = [r["active_ingredients"] for r in drugs.select("active_ingredients").collect() if r["active_ingredients"]]

drug_names = {normalize(n) for n in drug_names}
active_ings = {normalize(i) for i in active_ings}

all_names = drug_names.union(active_ings)

broadcast_names = spark.sparkContext.broadcast(all_names)

                                                                                

In [61]:
drugs.count()

14182

In [56]:
from pyspark.sql.functions import udf, col, lower
from pyspark.sql.types import BooleanType

infobox_pattern = r"\{\{(drugbox|infobox\s+(drug|chemical|pharmaceutical|medicine))"
identifier_pattern = r"atc\s*code|cas\s*number|drugbank|pubchem|unii|kegg"
# phrase_pattern = r"used\s+to\s+treat|prescribed\s+for|indicated\s+for"

wiki_druglike_df = wiki_df.filter(
    lower(col("text")).rlike(f"{infobox_pattern}|{identifier_pattern}")
)

wiki_druglike_df.count()

                                                                                

42561

In [None]:
wiki_druglike_df.write.mode("overwrite").parquet("data/wiki_druglike.parquet")

[Stage 40:>                                                      (0 + 64) / 100]

In [6]:
wiki_druglike_df = spark.read.parquet("data/wiki_druglike.parquet")

                                                                                

In [62]:
def extract_section(text, section_name):
    pattern = rf"==+\s*{section_name}\s*==+(.*?)(?:(?:==+)|\Z)"
    match = re.search(pattern, text, flags=re.IGNORECASE | re.DOTALL)
    if match:
        section_text = re.sub(r'\s+', ' ', match.group(1)).strip()
        return section_text[:5000]
    return False

def normalize(text):
    if not text:
        return ""
    normalized = re.sub(r'[^a-z0-9 ]', ' ', text.lower())
    normalized = re.sub(r'\s+', ' ', normalized).strip()
    return normalized

normalize_udf = udf(normalize, StringType())

def extract_section_any(text, section_list):
    if not text:
        return ""
    for section_name in section_list:
        section_text = extract_section(text, section_name)
        if section_text:
            return section_text
    return ""
def cleanup(parts):
    for part in parts:
        clean = re.sub(r'\{\{[^}]+\}\}', '', part)
        clean = re.sub(r'\[\[([^\]|]+\|)?([^\]]+)\]\]', r'\2', clean)
        clean = re.sub(r'<[^>]+>', '', clean)
        clean = re.sub(r'\([^)]*\)', '', clean)
        clean = clean.strip()
        if clean and len(clean) > 2 and len(clean) < 100:
            return clean
        else:
            return None
def extract_alternative_names(text):
    if not text:
        return []
    
    names = []
    other_names_pattern = r'\|\s*other[_\s]*names?\s*=\s*([^\n\|]+)'
    matches = re.findall(other_names_pattern, text, re.IGNORECASE)
    for match in matches:

        parts = re.split(r'[,;/]|\bor\b|\band\b', match)
        clean = cleanup(parts)
        if clean:
            names.append(normalize(clean))
    
    tradename_pattern = r'\|\s*tradename\s*=\s*([^\n\|]+)'
    matches = re.findall(tradename_pattern, text, re.IGNORECASE)
    for match in matches:
        parts = re.split(r'[,;/]|\bor\b|\band\b', match)
        clean = cleanup(parts)
        if clean:
            names.append(normalize(clean))
    
    iupac_pattern = r'\|\s*IUPAC[_\s]*name\s*=\s*([^\n\|]+)'
    matches = re.findall(iupac_pattern, text, re.IGNORECASE)
    for match in matches:
        parts = re.split(r'[,;/]|\bor\b|\band\b', match)
        clean = cleanup(parts)
        if clean:
            names.append(normalize(clean))

    aka_pattern = r'(?:also known as|also called|commonly known as|abbreviated as)\s+(?:\'\'\')?([^.,\n]{3,50})'
    matches = re.findall(aka_pattern, text[:1000], re.IGNORECASE)
    for match in matches:
        parts = re.split(r'[,;/]|\bor\b|\band\b', match)
        clean = cleanup(parts)
        if clean:
            names.append(normalize(clean))
    seen = set()
    unique_names = []
    for name in names:
        if name and name not in seen:
            seen.add(name)
            unique_names.append(name)
    
    return unique_names

extract_names_udf = udf(extract_alternative_names, ArrayType(StringType()))

wiki_with_names = (
    wiki_druglike_df
    .withColumn("alternative_names", extract_names_udf(col("text")))
    .withColumn("title_normalized", normalize_udf(col("title")))
    .withColumn("all_names", array_distinct(flatten(
        array(array(col("title_normalized")), col("alternative_names"))
    )))
)
extract_section_any_udf = udf(extract_section_any, StringType())

section_map = {
    "pharmacodynamics": ["Pharmacodynamics", "Pharmacology", "Mechanism of action"],
    "pharmacokinetics": ["Pharmacokinetics"],
    "medical_uses": ["Medical uses", "Clinical use", "Indications", "Therapeutic applications"],
    "adverse_effects": ["Adverse effects", "Side effects"],
    "chemistry": ["Chemistry", "Structure", "Synthesis"]
}

wiki_extracted = (
    wiki_with_names
    .withColumn("pharmacodynamics", extract_section_any_udf(col("text"), lit(section_map["pharmacodynamics"])))
    .withColumn("pharmacokinetics", extract_section_any_udf(col("text"), lit(section_map["pharmacokinetics"])))
    .withColumn("medical_uses", extract_section_any_udf(col("text"), lit(section_map["medical_uses"])))
    .withColumn("adverse_effects", extract_section_any_udf(col("text"), lit(section_map["adverse_effects"])))
    .withColumn("chemistry", extract_section_any_udf(col("text"), lit(section_map["chemistry"])))
)
wiki_extracted.printSchema()
wiki_extracted.show(3, truncate=120)


root
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- alternative_names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- title_normalized: string (nullable = true)
 |-- all_names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- pharmacodynamics: string (nullable = true)
 |-- pharmacokinetics: string (nullable = true)
 |-- medical_uses: string (nullable = true)
 |-- adverse_effects: string (nullable = true)
 |-- chemistry: string (nullable = true)



[Stage 448:>                                                        (0 + 1) / 1]

+---------+------------------------------------------------------------------------------------------------------------------------+----------------------------+----------------+-------------------------------------+----------------+----------------+------------+---------------+------------------------------------------------------------------------------------------------------------------------+
|    title|                                                                                                                    text|           alternative_names|title_normalized|                            all_names|pharmacodynamics|pharmacokinetics|medical_uses|adverse_effects|                                                                                                               chemistry|
+---------+------------------------------------------------------------------------------------------------------------------------+----------------------------+----------------+--------------------

                                                                                

In [58]:
def clean_wikitext(text):
    if not text:
        return ""

    text = re.sub(r"\{\{.*?\}\}", " ", text, flags=re.DOTALL)

    text = re.sub(r"\[\[(File|Image):.*?\]\]", " ", text, flags=re.IGNORECASE)

    text = re.sub(r"<ref.*?>.*?</ref>", " ", text, flags=re.DOTALL)

    text = re.sub(r"\[\[Category:.*?\]\]", " ", text, flags=re.IGNORECASE)

    text = re.sub(r"\[\[([^\]|]+)\|([^\]]+)\]\]", r"\2", text)
    text = re.sub(r"\[\[([^\]]+)\]\]", r"\1", text)

    text = re.sub(r"<[^>]+>", " ", text)

    text = re.sub(r"\s+", " ", text).strip()
    return text
clean_wikitext_udf = udf(clean_wikitext, StringType())

cleaned_wiki = (
    wiki_extracted
    .withColumn("pharmacodynamics", clean_wikitext_udf(col("pharmacodynamics")))
    .withColumn("pharmacokinetics", clean_wikitext_udf(col("pharmacokinetics")))
    .withColumn("medical_uses", clean_wikitext_udf(col("medical_uses")))
    .withColumn("adverse_effects", clean_wikitext_udf(col("adverse_effects")))
    .withColumn("chemistry", clean_wikitext_udf(col("chemistry")))
    .withColumn("text", clean_wikitext_udf(col("text")))
)
cleaned_wiki.select("title","all_names", "pharmacodynamics", "pharmacokinetics").show(5, truncate=120)



[Stage 398:>                                                        (0 + 1) / 1]

+-------------+-------------------------------------+------------------------------------------------------------------------------------------------------------------------+----------------+
|        title|                            all_names|                                                                                                        pharmacodynamics|pharmacokinetics|
+-------------+-------------------------------------+------------------------------------------------------------------------------------------------------------------------+----------------+
|    Tellurium|                          [tellurium]|                                                                                                                        |                |
|     Tungsten|               [tungsten, wolfram lt]|                                                                                                                        |                |
|      Alanine|[alanine, alanic acid lt,

25/11/20 14:13:37 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 398 (TID 5013): Attempting to kill Python Worker
                                                                                

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    udf, col, lower, trim, regexp_replace, explode, split, 
    array, when, lit, coalesce, concat_ws, collect_set, 
    length, first, array_distinct, flatten, array_union,broadcast,row_number
)
from pyspark.sql.types import StringType, ArrayType
import re
from pyspark.sql.window import Window

wiki_prepared = (
    cleaned_wiki
    .select(
        col("title").alias("wiki_title"),
        col("title_normalized"),
        col("text").alias("wiki_text"),
        col("pharmacodynamics"),
        col("pharmacokinetics"),
        col("medical_uses"),
        col("adverse_effects"),
        col("chemistry"),
        explode(col("all_names")).alias("name_variant")
    )
)

def extract_ingredients(ingredients_str):
    if not ingredients_str:
        return []

    parts = re.split(r'[,;]', ingredients_str)

    ingredients = []
    for part in parts:
        ingredient = re.sub(r'\([^)]*\)', '', part).strip()
        if ingredient:
            ingredients.append(normalize(ingredient))
    
    return ingredients

extract_ingredients_udf = udf(extract_ingredients, ArrayType(StringType()))

drugs_prepared = (
    drugs
    .withColumn("drug_name_norm", normalize_udf(col("drug_name")))
    .withColumn("ingredients_list", extract_ingredients_udf(col("active_ingredients")))
    .filter(length(col("drug_name_norm")) > 0)
    .select(
        col("setid"),
        col("drug_name"),
        col("drug_name_norm"),
        col("product_type"),
        col("active_ingredients"),
        col("ingredients_list"),
        col("inactive_ingredients"),
        col("indications_and_usage"),
        col("contraindications"),
        col("warnings"),
        col("filepath")
    )
)

drugs_exploded = (
    drugs_prepared
    .withColumn("ingredient_norm", explode(col("ingredients_list")))
    .filter(length(col("ingredient_norm")) > 0) 
    .select(
        col("setid"),
        col("drug_name"),
        col("drug_name_norm"),
        col("ingredient_norm"),
        col("product_type"),
        col("active_ingredients"),
        col("inactive_ingredients"),
        col("indications_and_usage"),
        col("contraindications"),
        col("warnings"),
        col("filepath")
    )
)

from pyspark.sql.functions import (
    col, explode, array_distinct, collect_list, concat_ws, 
    length, when, lit, size, struct
)

joined = (
    drugs_exploded
    .join(
        wiki_prepared.select("name_variant", "wiki_title", "pharmacodynamics", 
                            "pharmacokinetics", "medical_uses", "adverse_effects", "chemistry"),
        drugs_exploded["ingredient_norm"] == wiki_prepared["name_variant"],
        "inner"
    )
)

final_joined = (
    joined
    .groupBy("setid")
    .agg(
        collect_list("wiki_title").alias("wiki_titles"),
        collect_list("pharmacodynamics").alias("pharmacodynamics_array"),
        collect_list("pharmacokinetics").alias("pharmacokinetics_array"),
        collect_list("medical_uses").alias("medical_uses_array"),
        collect_list("adverse_effects").alias("adverse_effects_array"),
        collect_list("chemistry").alias("chemistry_array")
    )

    .withColumn("pharmacodynamics", concat_ws(" | ", col("pharmacodynamics_array")))
    .withColumn("pharmacokinetics", concat_ws(" | ", col("pharmacokinetics_array")))
    .withColumn("medical_uses", concat_ws(" | ", col("medical_uses_array")))
    .withColumn("adverse_effects", concat_ws(" | ", col("adverse_effects_array")))
    .withColumn("chemistry", concat_ws(" | ", col("chemistry_array")))
    .withColumn("wiki_sources", concat_ws(", ", col("wiki_titles")))
    .drop("pharmacodynamics_array", "pharmacokinetics_array", "medical_uses_array",
          "adverse_effects_array", "chemistry_array")
)

result = (
    drugs_prepared
    .join(final_joined.drop("drug_name", "product_type", "active_ingredients",
                            "indications_and_usage", "contraindications", "warnings", "filepath"),
          on="setid", how="left")
)


total = result.count()
matched = result.filter(col("wiki_sources").isNotNull()).count()
print(f"Matched: {matched}/{total} ({matched/total*100:.1f}%)")

result.filter(col("wiki_sources").isNotNull()).select("setid",
    "drug_name", "active_ingredients", "wiki_sources", "pharmacodynamics"
).show(5, truncate=60)

result.write.mode("overwrite").parquet("data/drugs_with_wiki.parquet")

                                                                                

Matched: 7375/14182 (52.0%)


                                                                                

+------------------------------------+-------------------------------------------------------+-------------------------------------------+---------------------------+------------------------------------------------------------+
|                               setid|                                              drug_name|                         active_ingredients|               wiki_sources|                                            pharmacodynamics|
+------------------------------------+-------------------------------------------------------+-------------------------------------------+---------------------------+------------------------------------------------------------+
|00046231-8fab-44a0-b986-9a9dd450881d|     NP THYROID 120- levothyroxine, liothyronine tablet|LEVOTHYROXINE (76 ug), LIOTHYRONINE (18 ug)|Levothyroxine, Liothyronine|T4 is a prohormone; T4 is a precursor to the hormone T3. ...|
|002f9748-b0e8-4b6e-a38b-56930a839491|BKEMV- eculizumab-aeeb injection, solution, concen

                                                                                

pharmacodynamics: 5625


                                                                                

pharmacokinetics: 3207


                                                                                

medical_uses: 4657


                                                                                

adverse_effects: 5746


                                                                                

chemistry: 3714




In [68]:
result_final = result.drop("drug_name_norm","ingredients_list","wiki_titles","wiki_sources")

root
 |-- setid: string (nullable = true)
 |-- drug_name: string (nullable = true)
 |-- product_type: string (nullable = true)
 |-- active_ingredients: string (nullable = true)
 |-- inactive_ingredients: string (nullable = true)
 |-- indications_and_usage: string (nullable = true)
 |-- contraindications: string (nullable = true)
 |-- filepath: string (nullable = true)
 |-- pharmacodynamics: string (nullable = true)
 |-- pharmacokinetics: string (nullable = true)
 |-- medical_uses: string (nullable = true)
 |-- adverse_effects: string (nullable = true)
 |-- chemistry: string (nullable = true)



In [69]:
result_final.write.mode("overwrite").csv("data/wiki_drug_merge.tsv", sep="\t", header=True)

                                                                                