## Data Transformation from JSON to Dataframe

### All Imports

In [1]:
import pandas as pd
import numpy as np
import seaborn as sns
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, explode, col, arrays_zip
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
import pyspark.sql.functions as F
from pyspark.sql.functions import sum,avg,max
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
import findspark
findspark.init()

### Spark Session

In [2]:
spark = SparkSession.builder \
    .master("local") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "12g") \
    .appName("data_cleaning") \
    .getOrCreate()

### Read File

In [3]:
# json_data = spark.read.option("multiline","true").json(["./Data/JSON/drug-event-0008-of-0030.json","./Data/JSON/drug-event-0008-of-0034.json"])
json_data = spark.read.option("multiline","true").json("./Data/JSON")

In [4]:
json_data.printSchema()

root
 |-- meta: struct (nullable = true)
 |    |-- disclaimer: string (nullable = true)
 |    |-- last_updated: string (nullable = true)
 |    |-- license: string (nullable = true)
 |    |-- results: struct (nullable = true)
 |    |    |-- limit: long (nullable = true)
 |    |    |-- skip: long (nullable = true)
 |    |    |-- total: long (nullable = true)
 |    |-- terms: string (nullable = true)
 |-- results: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- authoritynumb: string (nullable = true)
 |    |    |-- companynumb: string (nullable = true)
 |    |    |-- duplicate: string (nullable = true)
 |    |    |-- fulfillexpeditecriteria: string (nullable = true)
 |    |    |-- occurcountry: string (nullable = true)
 |    |    |-- patient: struct (nullable = true)
 |    |    |    |-- drug: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- actiondrug: string (nullable = true)
 |    

In [5]:
# json_data.show()

In [6]:
exploded_results = json_data.select(explode(F.col("results")).alias("exploded_results"))

In [7]:
# exploded_results.count()
#12000

In [8]:
# exploded_results.show()

In [9]:
all_keys = []

### Converting Nested JSON Data into Columns

In [10]:
# temp_data.withColumn("keys", F.json_object_keys(temp_data.exploded_array)).show()
keys = exploded_results.select(F.col("exploded_results.*")).columns
keys = ["exploded_results."+str(i) for i in keys]
all_keys.extend(keys)

In [11]:
patient_keys = exploded_results.select(F.col("exploded_results.patient.*")).columns
patient_keys = ["exploded_results.patient."+str(i) for i in patient_keys]
all_keys.extend(patient_keys)
updated_data = exploded_results.select(all_keys)
updated_data = updated_data.drop(F.col("patient"))
all_keys = updated_data.columns

In [12]:
# updated_data.count()
#12000

In [13]:
updated_data = updated_data.select(all_keys)\
            .withColumn("explode_drug",F.explode(F.col("drug")))
drug_keys = updated_data.select(F.col("explode_drug.*")).columns
drug_keys = ["explode_drug."+i for i in drug_keys]
all_keys.extend(drug_keys)

In [14]:
updated_data = updated_data.select(all_keys)

In [15]:
updated_data = updated_data.drop(*['authoritynumb','duplicate','reportduplicate','patientagegroup','patientweight','summary'])

In [16]:
updated_data = updated_data.where(F.col("drugindication") != 'NULL')

In [17]:
updated_data = updated_data.where(F.col("drugindication") != "Product used for unknown indication")

In [18]:
# updated_data.cache()

In [19]:
# updated_data.show()

In [20]:
all_keys = updated_data.columns

In [21]:
all_keys = updated_data.columns
updated_data = updated_data.select(all_keys)\
            .withColumn("explode_reaction",F.explode(F.col("reaction")))
all_keys = updated_data.columns
reaction_keys = updated_data.select(F.col("explode_reaction.*")).columns
reaction_keys = ["explode_reaction."+i for i in reaction_keys]
all_keys.extend(reaction_keys)

In [22]:
updated_data = updated_data.select(all_keys)

In [23]:
updated_data = updated_data.where(F.col("reactionmeddrapt") != 'NULL')

In [24]:
updated_data = updated_data.select(["seriousnessdeath","seriousnesslifethreatening","seriousnesshospitalization","seriousnessdisabling","seriousnesscongenitalanomali","seriousnessother","patientonsetage","reactionmeddrapt","reactionoutcome","drugindication","activesubstance.activesubstancename","medicinalproduct","openfda.route","openfda.brand_name","openfda.generic_name"])

In [25]:
# updated_data.cache()

In [26]:
updated_data = updated_data.dropna()

In [27]:
grouped_data = updated_data.groupby(F.col("activesubstancename")).count()

In [28]:
grouped_data.orderBy(F.col("count").desc()).agg(avg("count")).show()

+-----------------+
|       avg(count)|
+-----------------+
|60.51837769328264|
+-----------------+



In [29]:
updated_data = updated_data.join(grouped_data.filter(F.col("count") > 60).select("activesubstancename"),on="activesubstancename", how="inner")

In [30]:
grouped_data = updated_data.groupby(F.col("medicinalproduct")).count()
grouped_data.orderBy(F.col("count").desc()).agg(avg("count")).show()

+-----------------+
|       avg(count)|
+-----------------+
|162.0207468879668|
+-----------------+



In [31]:
updated_data = updated_data.join(grouped_data.filter(F.col("count") > 162).select("medicinalproduct"),on="medicinalproduct", how="inner")

In [32]:
grouped_data = updated_data.groupby(F.col("reactionmeddrapt")).count()
grouped_data.orderBy(F.col("count").desc()).agg(avg("count")).show()

+------------------+
|        avg(count)|
+------------------+
|11.653800708940528|
+------------------+



In [33]:
updated_data = updated_data.join(grouped_data.filter(F.col("count") > 11).select("reactionmeddrapt"),on="reactionmeddrapt", how="inner")
updated_data = updated_data.where(F.col("reactionmeddrapt") != 'Off label use')

In [34]:
grouped_data = updated_data.groupby(F.col("drugindication")).count()
grouped_data.orderBy(F.col("count").desc()).agg(avg("count")).show()

+-----------------+
|       avg(count)|
+-----------------+
|39.72875226039783|
+-----------------+



In [35]:
updated_data = updated_data.join(grouped_data.filter(F.col("count") > 39).select("drugindication"),on="drugindication", how="inner")
updated_data = updated_data.where(F.col("drugindication") != 'Off label use')

In [36]:
grouped_data = updated_data.groupby(F.col("medicinalproduct")).count()
grouped_data.orderBy(F.col("count").desc()).agg(avg("count")).show()

+-----------------+
|       avg(count)|
+-----------------+
|324.8813559322034|
+-----------------+



In [37]:
updated_data = updated_data.join(grouped_data.filter(F.col("count") > 324).select("medicinalproduct"),on="medicinalproduct", how="inner")

In [38]:
# updated_data = updated_data.select(all_keys)
# all_keys = updated_data.columns
# openfda_keys = updated_data.select(F.col("openfda.*")).columns
# openfda_keys = ["openfda."+i for i in openfda_keys]
# all_keys.extend(openfda_keys)
# all_keys.append("activesubstance.activesubstancename")

In [39]:
# updated_data = updated_data.withColumn("openfda_{}".format("application_number"),F.explode(F.col('openfda.application_number')))\
#             .withColumn("openfda_{}".format("brand_name"),F.explode(F.col('openfda.brand_name')))\
#             .withColumn("openfda_{}".format("generic_name"),F.explode(F.col('openfda.generic_name')))\
#             .withColumn("openfda_{}".format("manufacturer_name"),F.explode(F.col('openfda.manufacturer_name')))\
#             .withColumn("openfda_{}".format("product_type"),F.explode(F.col('openfda.product_type')))\
#             .withColumn("openfda_{}".format("substance_name"),F.explode(F.col('openfda.substance_name')))
#             # .withColumn("openfda_{}".format("route"),F.explode(F.col('openfda.route')))\
#             # .withColumn("openfda_{}".format("rxcui"),F.explode(F.col('openfda.rxcui')))\
#             # .withColumn("openfda_{}".format("spl_id"),F.explode(F.col('openfda.spl_id')))\
#             # .withColumn("openfda_{}".format("spl_set_id"),F.explode(F.col('openfda.spl_set_id')))\
#             # .withColumn("openfda_{}".format("nui"),F.explode(F.col('openfda.nui')))\
#             # .withColumn("openfda_{}".format("package_ndc"),F.explode(F.col('openfda.package_ndc')))\
#             # .withColumn("openfda_{}".format("pharm_class_cs"),F.explode(F.col('openfda.pharm_class_cs')))\
#             # .withColumn("openfda_{}".format("pharm_class_epc"),F.explode(F.col('openfda.pharm_class_epc')))\
#             # .withColumn("openfda_{}".format("pharm_class_moa"),F.explode(F.col('openfda.pharm_class_moa')))\
#             # .withColumn("openfda_{}".format("pharm_class_pe"),F.explode(F.col('openfda.pharm_class_pe')))\
#             # .withColumn("openfda_{}".format("product_ndc"),F.explode(F.col('openfda.product_ndc')))\
#             # .withColumn("openfda_{}".format("unii"),F.explode(F.col('openfda.unii'))).show()

In [40]:
# updated_data.count()

In [41]:
# all_keys = updated_data.columns
# primarysource_keys = updated_data.select(F.col("primarysource.*")).columns
# primarysource_keys = ["primarysource."+i for i in primarysource_keys]
# all_keys.extend(primarysource_keys)

In [42]:
# updated_data.show()

In [43]:
# try:
#     updated_data = updated_data.select(all_keys)\
#                 .withColumn("explode_drugrecurrence",F.explode(F.col("drugrecurrence")))
#     all_keys = updated_data.columns
#     drugrecurrence_keys = updated_data.select(F.col("explode_drugrecurrence.*")).columns
#     drugrecurrence_keys = ["explode_drugrecurrence."+i for i in drugrecurrence_keys]
#     all_keys.extend(drugrecurrence_keys)
# except Exception as e:
#     print(e)

In [44]:
# updated_data = updated_data.drop(F.col("primarysource"))
# updated_data = updated_data.drop(F.col("explode_drugrecurrence"))
# updated_data = updated_data.drop(F.col("drug"))
# updated_data = updated_data.drop(F.col("reaction"))
# updated_data = updated_data.drop(F.col("receiver"))
# updated_data = updated_data.drop(F.col("sender"))
# updated_data = updated_data.drop(F.col("summary"))
# updated_data = updated_data.drop(F.col("activesubstance"))
# updated_data = updated_data.drop(F.col("drugrecurrence"))
# updated_data = updated_data.drop(F.col("openfda"))
# updated_data = updated_data.drop(F.col("explode_reaction"))

## Sample Data Storing

In [45]:
df_sample = updated_data.sample(withReplacement=False, fraction=0.1)

In [49]:
pandas_df = df_sample.toPandas()

In [50]:
pandas_df.to_csv("transformed_data.csv")

## Data Encoding

In [45]:
spark = SparkSession.builder \
    .master("local") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "12g") \
    .appName("data_encoding") \
    .getOrCreate()

In [46]:
data = spark.read.csv("./transformed_data.csv",header=True)

In [47]:
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- medicinalproduct: string (nullable = true)
 |-- drugindication: string (nullable = true)
 |-- reactionmeddrapt: string (nullable = true)
 |-- activesubstancename: string (nullable = true)
 |-- seriousnessdeath: string (nullable = true)
 |-- seriousnesslifethreatening: string (nullable = true)
 |-- seriousnesshospitalization: string (nullable = true)
 |-- seriousnessdisabling: string (nullable = true)
 |-- seriousnesscongenitalanomali: string (nullable = true)
 |-- seriousnessother: string (nullable = true)
 |-- patientonsetage: string (nullable = true)
 |-- reactionoutcome: string (nullable = true)
 |-- route: string (nullable = true)
 |-- brand_name: string (nullable = true)
 |-- generic_name: string (nullable = true)



In [48]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [49]:
indexer = StringIndexer(inputCol="medicinalproduct", outputCol="op_medicinalproduct")

In [50]:
encoder = OneHotEncoder(inputCol="op_medicinalproduct", outputCol="vec_medicinalproduct")

In [51]:
data = indexer.fit(data).transform(data)
data = encoder.fit(data).transform(data)

In [None]:
from pyspark.ml.functions import vector_to_array
data = data.withColumn('vec_medicinalproduct_dense', vector_to_array('vec_medicinalproduct'))

In [53]:
data.select(["medicinalproduct","op_medicinalproduct","vec_medicinalproduct"]).show(5)

+----------------+-------------------+--------------------+
|medicinalproduct|op_medicinalproduct|vec_medicinalproduct|
+----------------+-------------------+--------------------+
|        REVLIMID|                0.0|      (16,[0],[1.0])|
|        REVLIMID|                0.0|      (16,[0],[1.0])|
|        REVLIMID|                0.0|      (16,[0],[1.0])|
|         OCREVUS|                3.0|      (16,[3],[1.0])|
|     VEDOLIZUMAB|                6.0|      (16,[6],[1.0])|
+----------------+-------------------+--------------------+
only showing top 5 rows



In [61]:
data.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- medicinalproduct: string (nullable = true)
 |-- drugindication: string (nullable = true)
 |-- reactionmeddrapt: string (nullable = true)
 |-- activesubstancename: string (nullable = true)
 |-- seriousnessdeath: string (nullable = true)
 |-- seriousnesslifethreatening: string (nullable = true)
 |-- seriousnesshospitalization: string (nullable = true)
 |-- seriousnessdisabling: string (nullable = true)
 |-- seriousnesscongenitalanomali: string (nullable = true)
 |-- seriousnessother: string (nullable = true)
 |-- patientonsetage: string (nullable = true)
 |-- reactionoutcome: string (nullable = true)
 |-- route: string (nullable = true)
 |-- brand_name: string (nullable = true)
 |-- generic_name: string (nullable = true)
 |-- op_medicinalproduct: double (nullable = false)
 |-- vec_medicinalproduct: vector (nullable = true)
 |-- vec_medicinalproduct_dense: array (nullable = false)
 |    |-- element: double (containsNull = false)



In [66]:
data = [(0.0, Vectors.dense(0.5, 10.0)),
        (0.0, Vectors.dense(1.5, 20.0)),
        (1.0, Vectors.dense(1.5, 30.0)),
        (0.0, Vectors.dense(3.5, 30.0)),
        (0.0, Vectors.dense(3.5, 40.0)),
        (1.0, Vectors.dense(3.5, 40.0))]
df = spark.createDataFrame(data, ["label", "features"])

In [68]:
df.printSchema()

root
 |-- label: double (nullable = true)
 |-- features: vector (nullable = true)



In [69]:
from pyspark.ml.feature import VectorAssembler

# Assuming seriousnessdeath is a string column
updated_data = updated_data.withColumn("seriousnessdeath", updated_data["seriousnessdeath"].cast("double"))\
                           .withColumn("reactionoutcome", updated_data["reactionoutcome"].cast("double"))\

# vec_assembler_seriousnessdeath = VectorAssembler(inputCols=["seriousnessdeath"], outputCol="seriousnessdeath_vector")
# updated_data = vec_assembler_seriousnessdeath.transform(updated_data)

# # Create vector for 'reactionoutcome'
# vec_assembler_reactionoutcome = VectorAssembler(inputCols=["reactionoutcome"], outputCol="reactionoutcome_vector")
# updated_data = vec_assembler_reactionoutcome.transform(updated_data)


In [70]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import ChiSquareTest

In [71]:
updated_data.columns

['medicinalproduct',
 'drugindication',
 'reactionmeddrapt',
 'activesubstancename',
 'seriousnessdeath',
 'seriousnesslifethreatening',
 'seriousnesshospitalization',
 'seriousnessdisabling',
 'seriousnesscongenitalanomali',
 'seriousnessother',
 'patientonsetage',
 'reactionoutcome',
 'route',
 'brand_name',
 'generic_name',
 'seriousnessdeath_vector',
 'reactionoutcome_vector']

In [72]:
# updated_data.select("seriousnessdeath", "reactionoutcome").show(truncate=False)

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 59458)
ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "C:\Users\Akshay\anaconda3\envs\tf\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Akshay\anaconda3\envs\tf\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "C:\Users\Akshay\anaconda3\envs\tf\lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "C:\Users\Akshay\anaconda3\envs\tf\lib\socket.py", line 704, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
Traceback (most recent

KeyboardInterrupt: 

In [None]:
r = ChiSquareTest.test(updated_data, "seriousnessdeath", "reactionoutcome").head()
print("pValues: " + str(r.pValues))
print("degreesOfFreedom: " + str(r.degreesOfFreedom))
print("statistics: " + str(r.statistics))

## Data Cleaning

In [48]:
pandas_df = updated_data.toPandas()

In [49]:
pandas_df.dropna()

Unnamed: 0,medicinalproduct,drugindication,reactionmeddrapt,activesubstancename,seriousnessdeath,seriousnesslifethreatening,seriousnesshospitalization,seriousnessdisabling,seriousnesscongenitalanomali,seriousnessother,patientonsetage,reactionoutcome,route,brand_name,generic_name
0,NAPROXEN SODIUM,Headache,Asthenia,NAPROXEN\NAPROXEN SODIUM,2,1,1,1,2,1,43,3,[ORAL],"[NAPROXEN SODIUM, LIL DRUG STORE PAIN RELIEF A...","[NAPROXEN SODIUM, NAPROXEN SODIUM, COATED TABL..."
1,NAPROXEN SODIUM,Headache,Arthropathy,NAPROXEN\NAPROXEN SODIUM,2,1,1,1,2,1,43,3,[ORAL],"[NAPROXEN SODIUM, LIL DRUG STORE PAIN RELIEF A...","[NAPROXEN SODIUM, NAPROXEN SODIUM, COATED TABL..."
2,NAPROXEN SODIUM,Headache,Arthralgia,NAPROXEN\NAPROXEN SODIUM,2,1,1,1,2,1,43,2,[ORAL],"[NAPROXEN SODIUM, LIL DRUG STORE PAIN RELIEF A...","[NAPROXEN SODIUM, NAPROXEN SODIUM, COATED TABL..."
3,NAPROXEN SODIUM,Headache,Arthralgia,NAPROXEN\NAPROXEN SODIUM,1,1,1,1,2,1,43,5,[ORAL],"[NAPROXEN SODIUM, LIL DRUG STORE PAIN RELIEF A...","[NAPROXEN SODIUM, NAPROXEN SODIUM, COATED TABL..."
4,NAPROXEN SODIUM,Headache,Nausea,NAPROXEN\NAPROXEN SODIUM,2,1,1,1,2,1,43,2,[ORAL],"[NAPROXEN SODIUM, LIL DRUG STORE PAIN RELIEF A...","[NAPROXEN SODIUM, NAPROXEN SODIUM, COATED TABL..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
462088,ADALIMUMAB,Psoriasis,Crohn^s disease,ADALIMUMAB,2,2,2,2,2,1,26,1,[SUBCUTANEOUS],"[IDACIO, HUMIRA, ADALIMUMAB, YUFLYMA, ADALIMUM...","[ADALIMUMAB, ADALIMUMAB-AATY, ADALIMUMAB-FKJP]"
462089,ADALIMUMAB,Psoriasis,Psoriasis,ADALIMUMAB,2,2,1,2,2,2,60,6,[SUBCUTANEOUS],"[IDACIO, HUMIRA, ADALIMUMAB, YUFLYMA, ADALIMUM...","[ADALIMUMAB, ADALIMUMAB-AATY, ADALIMUMAB-FKJP]"
462090,ADALIMUMAB,Psoriasis,Psoriasis,ADALIMUMAB,2,2,1,2,2,2,60,6,[SUBCUTANEOUS],"[IDACIO, HUMIRA, ADALIMUMAB, YUFLYMA, ADALIMUM...","[ADALIMUMAB, ADALIMUMAB-AATY, ADALIMUMAB-FKJP]"
462091,ADALIMUMAB,Psoriasis,Psoriasis,ADALIMUMAB,2,2,1,2,2,2,60,6,[SUBCUTANEOUS],"[IDACIO, HUMIRA, ADALIMUMAB, YUFLYMA, ADALIMUM...","[ADALIMUMAB, ADALIMUMAB-AATY, ADALIMUMAB-FKJP]"


In [50]:
# pandas_df["medicinalproduct"].value_counts()

### Removing Null Values

In [None]:
updated_data.na.drop().toPandas().to_csv("test1.csv")

In [None]:
# os.environ["HADOOP_HOME"] = "C:/hadoop/hadoop-2.8.3"
# os.environ["PATH"] = "C:/hadoop/hadoop-2.8.3/bin"
# updated_data.write.csv("./test1.csv", mode="overwrite", header=True)

In [None]:
# updated_data.write.parquet("output_parquet_path", mode="overwrite")