In [1]:
%env SPARK_RAPIDS_PLUGIN_JAR=/opt/spark/jars/rapids-4-spark_2.13-24.04.1.jar

env: SPARK_RAPIDS_PLUGIN_JAR=/opt/spark/jars/rapids-4-spark_2.13-24.04.1.jar


In [3]:
from pyspark.sql import SparkSession
import os


spark_rapids:str = os.getenv("SPARK_RAPIDS_PLUGIN_JAR")

# SPARK SESSION CONFIGURATION
print("Running Spark session...")
session_builder = SparkSession.Builder() \
    .master("local[*]") \
    .appName("Learning Spark") \
    .config("spark.ui.enabled", True) \
    .config("spark.driver.bindAddress", "localhost") \
    .config("spark.ui.port", "8080") \
    .config("spark.driver.memory", "10g") \
    .config("spark.executor.memory", "10g") \
    .config("spark.driver.extraClassPath", spark_rapids) \
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin") \
    .config("spark.rapids.memory.gpu.pooling.enabled", True) \
    .config("spark.rapids.sql.enabled", True) \
    .config("spark.rapids.sql.explain", "NONE")
spark = session_builder.getOrCreate()
print("Session started")

Running Spark session...
Session started


In [5]:
datasets_excel = os.listdir("datasets-private/amarnameh-excel")
datasets_csv = sorted(os.listdir("datasets-private/amarnameh-csv"))

In [10]:
from pyspark.sql import DataFrame
import pandas as pd

dataframes: dict[int, DataFrame] = {}

for file_name in datasets_csv:
    print(f"[INFO] Reading: {str(file_name)}")
    filePath = f"datasets-private/amarnameh-csv/{file_name}"
    df = spark.read.csv(header=True, path=filePath, inferSchema=True)
    df_name = file_name.rstrip(".csv")
    dataframes.update({int(df_name): df})
    print("---- Done ----")

[INFO] Reading: 1398.csv
---- Done ----
[INFO] Reading: 1399.csv
---- Done ----
[INFO] Reading: 1400.csv
---- Done ----
[INFO] Reading: 1401.csv
---- Done ----
[INFO] Reading: 1402.csv
---- Done ----


In [15]:
for df_name in dataframes:
    df = dataframes.get(df_name)
    print(f"~~~~~~~~ Dataset[{df_name}]: ~~~~~~~~")
    print(f'----> Contains: {df.count()} Rows')
    print(f"----> Headers Len: {len(df.columns)}")
    for i in range(len(df.columns)):
        print(df.columns[i])

~~~~~~~~ Dataset[1398]: ~~~~~~~~
----> Contains: 36999 Rows
----> Headers Len: 27
تولید کننده / وارد کننده
صاحب پروانه
توزیع کننده
کد فرآورده
کد ATC
نام انگلیسی فرآورده
نام فارسی فرآورده
نام ژنریک
Column1
کد ژنریک
تولیدی/وارداتی
فوریتی/غیرفوریتی
نوع فرآورده
نام انگلیسی فهرست
کد ATC سطح1
عنوان ATC سطح 1
کد ATC سطح 2
عنوان ATC سطح 2
کد ATC سطح 3
عنوان ATC سطح 3
کد ATC سطح 4
عنوان ATC سطح 4
حجم فروش
تعداد در بسته
 فروش عددی 
 فروش ریالی 
_c26
~~~~~~~~ Dataset[1399]: ~~~~~~~~
----> Contains: 1048575 Rows
----> Headers Len: 20
صاحب پروانه
توزیع کننده
IRC
نام فرآورده (برند)
تولیدی/وارداتی
OTC
بیولوژیک
کشور تولید کننده
تحت لیسانس
111
نام ژنریک
فرآورده
کد ژنریک
ماده موثره
ATC Code
 تعداد فروش (بسته) 
 تعداد در بسته 
 فروش عددی  
 فروش ریالی مصرف کننده  
_c19
~~~~~~~~ Dataset[1400]: ~~~~~~~~
----> Contains: 52445 Rows
----> Headers Len: 19
نام شرکت تولید کننده
نام شرکت تامین کننده
نام صاحب برند
توزیع کننده
کشور تولید  کننده
نام برند
نام لاتین برند
نام ژنریک
نام ماده موثره
 تعداد فروش (بسته) 
 ت

In [160]:
schema = ["atc_code", "brand_owner", "drug_generic_name", "type"]

print("Normalizing...")
df_1398 = dataframes.get("1398")
df_1399 = dataframes.get("1399")
df_1400 = dataframes.get("1400")
df_1401 = dataframes.get("1401")
df_1402 = dataframes.get("1402")

df_1398 = df_1398.withColumnsRenamed({"صاحب پروانه": "brand_owner", "کد ATC": "atc_code", "نام انگلیسی فرآورده": "en_name"}) \
                            .select(["brand_owner", "atc_code", "en_name"])
df_1399 = df_1399.withColumnsRenamed({"صاحب پروانه":"brand_owner", "ATC Code": "atc_code", "نام فرآورده (برند)": "en_name"}) \
                            .select(["brand_owner", "atc_code", "en_name"])
df_1400 = df_1400.withColumnsRenamed({"نام صاحب برند": "brand_owner", "ATC Code": "atc_code", "نام لاتین برند": "en_name"}) \
                            .select(["brand_owner","atc_code", "en_name"])
df_1401 = df_1401.withColumnsRenamed({"صاحب پروانه" : "brand_owner", "کد ATC": "atc_code" , "نام انگلیسی فرآورده": "en_name"}) \
                            .select(["brand_owner", "atc_code", "en_name"])
df_1402 = df_1402.withColumnsRenamed({"صاحب پروانه" : "brand_owner", "کد ATC": "atc_code", "نام انگلیسی فرآورده": "en_name"}) \
                            .select(["brand_owner", "atc_code", "en_name"])
dataframes.update({"1398": df_1398, "1399":df_1399,  "1400": df_1400, "1401": df_1401, "1402": df_1402})
for df_year in dataframes:
    print(dataframes.get(df_year).show(truncate=False, n=1))
print("Done")


Normalizing...
+-------------+--------+---------------------------------+
|brand_owner  |atc_code|en_name                          |
+-------------+--------+---------------------------------+
|داروسازی امین|N03AX16 |PREGABALIN   CAPSULE ORAL 50 mg  |
+-------------+--------+---------------------------------+
only showing top 1 row

None
+---------------+--------+---------------------------------------+
|brand_owner    |atc_code|en_name                                |
+---------------+--------+---------------------------------------+
|اکترو خاورمیانه|L02BX03 |ABIRATERONE ACTE   TABLET ORAL 250 mg  |
+---------------+--------+---------------------------------------+
only showing top 1 row

None
+-----------+--------+--------------------------------+
|brand_owner|atc_code|en_name                         |
+-----------+--------+--------------------------------+
|اکتوورکو   |J05AX   |FAVIPIRAVIR   TABLET ORAL 200 mg|
+-----------+--------+--------------------------------+
only showing top 

In [161]:
antibiotics = readData(format="csv", fromPath="datasets-private/antibiotics/ghotb-isfahan.csv")
antibiotics = antibiotics.select(["ATC", "LABEL"]).withColumnsRenamed(colsMap={"ATC": "atc_code",
                                                                               "LABEL": "label"})
antibiotics.show(n=2, truncate=False)

Reading the dataset...
Completed
+--------+----------------------------------------------------------+
|atc_code|label                                                     |
+--------+----------------------------------------------------------+
|J01GB06 |AMIKACIN (AS SULFATE) INJECTION PARENTERAL 250 mg/1mL 2 mL|
|J01GB06 |AMIKACIN (AS SULFATE) INJECTION PARENTERAL 50 mg/1mL 2 mL |
+--------+----------------------------------------------------------+
only showing top 2 rows



In [162]:
from pyspark.sql.functions import countDistinct
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType, StringType

dosage_form_words = ["INJECTION", "CAPSULE", "SUSP", "TABLET", "SUSPENSION", "CREAM"]

def DosageForm(name: str):
     if name == None:
          return "None"
     words: list[str] = name.split(" ")
     for word in words:
          word = word.rstrip(",").upper()
          for form in dosage_form_words:
               if word == form:
                    return form
               
findDosageFormUDF = udf(DosageForm, StringType())

def FindDosageForm(dataframe: DataFrame, col_name: str) -> DataFrame:
     return dataframe \
          .withColumn("dosage_form", findDosageFormUDF(col(f"{col_name}"))) \

def countManufacturers(df: dict, year:int):
     df: DataFrame = FindDosageForm(df, "en_name")
     df = df.groupBy("atc_code", "dosage_form").agg(countDistinct("brand_owner").alias(f"manufacturers_{year}")) 
     return df
        
     #    print(f"All manufacturers with their ATC codes: {result_df.count()}")
     #    print("-=-=-=-=-=-=-=-")
     # #    result_dfs.update(  {i: result_df}  )

antibiotics_dossed = FindDosageForm(antibiotics, "label")
for df_year in dataframes:
     df = dataframes.get(df_year)
     counted_df = countManufacturers(df, int(df_year))
     result_df = counted_df.join(antibiotics_dossed, on=["atc_code" ,"dosage_form"], how="inner")
     dataframes.update({df_year : result_df})

In [163]:
df = dataframes.get("1398") \
    .join(dataframes.get("1399").drop("label"), on=["atc_code", "dosage_form"], how="outer") \
    .join(dataframes.get("1400").drop("label"), on=["atc_code", "dosage_form"], how="outer") \
    .join(dataframes.get("1401").drop("label"), on=["atc_code", "dosage_form"], how="outer") \
    .join(dataframes.get("1402").drop("label"), on=["atc_code", "dosage_form"], how="outer")

df = df.select("atc_code", "dosage_form", "manufacturers_1398",
               "manufacturers_1399" , "manufacturers_1400" ,
               "manufacturers_1401" , "manufacturers_1402" , "label") \
        .dropDuplicates(["atc_code", "dosage_form", "label"]) \
        .sort("label")

print(f"ROWS: {df.count()}")
df.show(truncate=False)


                                                                                

ROWS: 151


[Stage 5433:====>           (2 + 5) / 7][Stage 5439:>               (0 + 1) / 1]

+--------+-----------+------------------+------------------+------------------+------------------+------------------+---------------------------------------------------------------------------+
|atc_code|dosage_form|manufacturers_1398|manufacturers_1399|manufacturers_1400|manufacturers_1401|manufacturers_1402|label                                                                      |
+--------+-----------+------------------+------------------+------------------+------------------+------------------+---------------------------------------------------------------------------+
|J01DC02 |INJECTION  |NULL              |NULL              |NULL              |2                 |NULL              |NULL                                                                       |
|J01GB06 |INJECTION  |3                 |3                 |3                 |3                 |3                 |AMIKACIN (AS SULFATE) INJECTION PARENTERAL 250 mg/1mL 2 mL                 |
|J01GB06 |INJECTION  |3       

                                                                                

In [164]:
def ZeroHandler(input):
    if input == None:
        return 0
    else:
        return input
    
zeroHandlerUDF = udf(ZeroHandler, IntegerType())

for df_year in dataframes:
    df = df.withColumn(f"{df_year}", zeroHandlerUDF(col(f"manufacturers_{df_year}")))

In [165]:
from pyspark.sql.functions import array
from pyspark.sql.types import FloatType
import numpy as np
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans

    
def Var(array):
    return float(np.var(array))/float(np.mean(array))

varUDF = udf(Var, FloatType())

years_columns = ['1398', '1399', '1400', '1401', '1402']

df = df.withColumn("years_array", array([col(x) for x in years_columns]))
df = df.withColumn("variance", varUDF(col("years_array")))

In [166]:
# Assemble the variance into a feature vector
assembler = VectorAssembler(inputCols=["variance"], outputCol="variance_vec")
df = assembler.transform(df)

# Normalize the variance
scaler = StandardScaler(inputCol="variance_vec", outputCol="normalized_variance", withMean=True, withStd=True)
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)

# Cluster the data
k_mean = KMeans(k=3, seed=0, featuresCol="normalized_variance", predictionCol="cluster")
model = k_mean.fit(df)
df = model.transform(df)

# Define cluster to category mapping
cluster_mapping = {0: 'little', 1: 'most', 2: 'moderate'}

# Convert mapping
mapping_df = spark.createDataFrame(list(cluster_mapping.items()), ["cluster", "category"])
df = df.join(mapping_df, "cluster")

df.select("atc_code", "variance", "cluster", "category").show(truncate=False)



                                                                                

+--------+----------+-------+--------+
|atc_code|variance  |cluster|category|
+--------+----------+-------+--------+
|J01CA01 |0.6       |0      |little  |
|J01CA01 |0.6       |0      |little  |
|J01FA10 |0.10666667|0      |little  |
|J01FA10 |0.10666667|0      |little  |
|J01FA10 |0.07619048|0      |little  |
|J01FA10 |0.13333334|0      |little  |
|J01FA10 |0.07619048|0      |little  |
|J01FA10 |0.07619048|0      |little  |
|J01FA10 |0.07619048|0      |little  |
|J01FA10 |0.07619048|0      |little  |
|J01FA10 |0.17704917|0      |little  |
|J01FA10 |0.17704917|0      |little  |
|J01CE01 |1.7       |2      |moderate|
|J01CE01 |1.7       |2      |moderate|
|J01CE01 |1.7       |2      |moderate|
|J01DB01 |1.75      |2      |moderate|
|J01DB01 |1.75      |2      |moderate|
|J01DB01 |0.5777778 |0      |little  |
|J01DB01 |0.5777778 |0      |little  |
|J01DB04 |1.4689655 |2      |moderate|
+--------+----------+-------+--------+
only showing top 20 rows



In [168]:
save = True

if save:
    print("Saving final result...")
    with pd.ExcelWriter("datasets-private/result.xlsx") as writer:
        print("result page ...")
        df.drop("cluster", "1398", "1399", "1400", "1401", "1402").toPandas().to_excel(writer, sheet_name="result", index=True)
        print("joined data frames pages ...")
        for i in range(1398, 1403):
            print(f"page {i} ...")
            dataframes.get(str(i)).toPandas().to_excel(writer, sheet_name=f"{i}", index=False)
        print("Done")
else:
    print("NOT SAVED")

Saving final result...
result page ...


                                                                                

joined data frames pages ...
page 1398 ...
page 1399 ...
page 1400 ...
page 1401 ...
page 1402 ...
Done
