#Defining reusuable codes for Clinical Trial datasets
--------------------------------------------------
#Instructions: Upload the clinical trial and pharma zip file. 
#Change the year in the fileroot (in cmd 2) to the uploaded clinicaltrial file name year

In [0]:
#Defining reusuable codes for Clinical Trial datasets
fileroot = "clinicaltrial_2023"

In [0]:
#Extracting the year of clinical trial
fileName = fileroot.split("_")
trial_Year = fileName[1]
trial_Year

In [0]:
#Copying the file to /tmp directory on driver node
dbutils.fs.cp("/FileStore/tables/" + fileroot + ".zip", "file:/tmp/")

In [0]:
#Checking the fileroot.zip in DBFS directory
dbutils.fs.ls("/FileStore/tables/")

In [0]:
#Checking the fileroot.zip in local tmp directory
dbutils.fs.ls("file:/tmp/")

In [0]:
#Making 'fileroot' accessible by the command line
import os
os.environ['fileroot'] = fileroot

In [0]:
%sh
# Use the '-o' option to overwrite existing files without prompting
unzip -o -d /tmp /tmp/$fileroot.zip

In [0]:
#Moving the unzipped file to the DBFS directory
dbutils.fs.mv("file:/tmp/" + fileroot +".csv" , "/FileStore/tables/"+ fileroot +".csv", True )

In [0]:
#Checking the unzipped file in DBFS directory
dbutils.fs.ls("/FileStore/tables/")

In [0]:
#Checking the DBFS directory
dbutils.fs.ls ("/FileStore/tables/" )

In [0]:
file_path = "/FileStore/tables/"+ fileroot +".csv"
print(dbutils.fs.head(file_path))

In [0]:
dbutils.fs.head("/FileStore/tables/"+fileroot+".csv")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

# Create or get a Spark session (modify appName as needed)
spark = SparkSession.builder.appName("ClinicalTrialData").getOrCreate()

# Define the schema for clinicaltrial_2023
schema_2023 = StructType([
    StructField("Id", StringType(), True),
    StructField("Study Title", StringType(), True),
    StructField("Acronym", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("Conditions", StringType(), True),
    StructField("Interventions", StringType(), True),
    StructField("Sponsor", StringType(), True),
    StructField("Collaborators", StringType(), True),
    StructField("Enrollment", StringType(), True),
    StructField("Funder Type", StringType(), True),
    StructField("Type", StringType(), True),
    StructField("Study Design", StringType(), True),
    StructField("Start", StringType(), True),
    StructField("Completion", StringType(), True)
])

# Define the schema for clinicaltrial_2020
schema_2020 = StructType([
    StructField("Id", StringType(), True),
    StructField("Sponsor", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("Start", StringType(), True),
    StructField("Completion", StringType(), True),
    StructField("Type", StringType(), True),
    StructField("Submission", DateType(), True),
    StructField("Conditions", StringType(), True),
    StructField("Interventions", StringType(), True)
])

# Specify the file path
file_path = "/FileStore/tables/" + fileroot + ".csv"

# Determine the file type and apply the appropriate schema and delimiter
if "clinicaltrial_2023" in fileroot:
    # Read the CSV file using the schema for clinicaltrial_2023
    clinicaltrial_df = spark.read.format("csv")\
        .option("delimiter", "\t")\
        .option("quote","")\
        .option("header", "true")\
        .schema(schema_2023)\
        .load(file_path)
        
elif "clinicaltrial_2020" in fileroot or "clinicaltrial_2021" in fileroot:
    # Read the CSV file using the schema for clinicaltrial_2020
    clinicaltrial_df = spark.read.format("csv")\
        .option("delimiter", "|")\
        .option("header", "true")\
        .schema(schema_2020)\
        .load(file_path)
else:
    print("Unknown file type in fileroot")

# Display the DataFrame
display(clinicaltrial_df)


In [0]:
from pyspark.sql.functions import trim, regexp_replace, col

# Perform cleaning operations only if the fileroot is clinicaltrial_2023
if "clinicaltrial_2023" in fileroot:
    # Clean the first column of the DataFrame
    clinicaltrial_df = clinicaltrial_df.withColumn(
        clinicaltrial_df.columns[0],  # First column
        regexp_replace(col(clinicaltrial_df.columns[0]), r'^"|"$', '')  # Remove leading and trailing quotation marks
    ).withColumn(
        clinicaltrial_df.columns[0],  # First column
        trim(col(clinicaltrial_df.columns[0]))  # Trim leading and trailing spaces
    )

    # Clean the 14th column of the DataFrame
    clinicaltrial_df = clinicaltrial_df.withColumn(
        clinicaltrial_df.columns[13],  # 14th column (index 13)
        regexp_replace(col(clinicaltrial_df.columns[13]), r'("|,)+$', '')  # Remove trailing quotation marks and commas
    ).withColumn(
        clinicaltrial_df.columns[13],  # 14th column (index 13)
        trim(col(clinicaltrial_df.columns[13]))  # Trim leading and trailing spaces
    )

    # Display the cleaned DataFrame
    display(clinicaltrial_df)
else:
    # If not clinicaltrial_2023, display the DataFrame as is
    display(clinicaltrial_df)


#Question 1. The number of studies in the dataset. You must ensure that you explicitly check distinct studies.

In [0]:
def count_distinct_studies(df, id_col, title_col):
    """
    Calculate the distinct counts of studies based on 'Id' and 'Study Title' columns.

    Args:
        df (DataFrame): The DataFrame containing clinical trial data.
        id_col (str): The column name for 'Id'.
        title_col (str): The column name for 'Study Title'.

    Returns:
        dict: A dictionary containing the distinct counts for 'Id' and 'Study Title'.
    """
    # Count the distinct number of studies based on the 'Id' column
    distinct_studies_id = df.select(id_col).distinct().count()

    # Count the distinct number of studies based on the 'Study Title' column
    distinct_studies_title = df.select(title_col).distinct().count()

    # Display the results
    print(f"Number of distinct studies based on '{id_col}': {distinct_studies_id}")
    print(f"Number of distinct studies based on '{title_col}': {distinct_studies_title}")

    # Return a dictionary with the results
    return {
        "distinct_studies_id": distinct_studies_id,
        "distinct_studies_title": distinct_studies_title
    }

# Assuming 'clinicaltrial_df' is the DataFrame you want to work with
# Define the column names based on the DataFrame's structure
# For clinicaltrial_2023, the column names are 'Id' and 'Study Title'
# For clinicaltrial_2020 and clinicaltrial_2021, the column names may be different

if "clinicaltrial_2023" in fileroot:
    # Define column names for clinicaltrial_2023
    id_col = "Id"
    title_col = "Study Title"
elif "clinicaltrial_2020" in fileroot or "clinicaltrial_2021" in fileroot:
    # Define column names for clinicaltrial_2020 and clinicaltrial_2021
    # Assume the 'Id' column name is 'Id' and there is no 'Study Title' column
    id_col = "Id"
    title_col = None
else:
    # Default case (modify as needed based on data)
    id_col = "Id"
    title_col = "Study Title"

# Call the function to calculate distinct counts
if title_col is not None:
    results = count_distinct_studies(clinicaltrial_df, id_col, title_col)
else:
    # If there's no 'Study Title' column, only calculate the distinct count for 'Id'
    results = count_distinct_studies(clinicaltrial_df, id_col, id_col)

# Display the dictionary of results (optional)
print(results)


In [0]:
def display_first_20_distinct_ids(df, id_col):
    """
    Display the first 20 distinct Ids from the DataFrame.

    Args:
        df (DataFrame): The DataFrame containing clinical trial data.
        id_col (str): The column name for 'Id'.

    Returns:
        None
    """
    # Get the distinct Ids
    distinct_ids = df.select(id_col).distinct()

    # Select the first 20 distinct Ids
    first_20_distinct_ids = distinct_ids.limit(20)

    # Display the first 20 distinct Ids
    print(f"First 20 distinct Ids in column '{id_col}':")
    display(first_20_distinct_ids)

# Assuming 'clinicaltrial_df' is the DataFrame you want to work with
# Define the column name for 'Id'
id_col = "Id"  # Assuming the column name for 'Id' is 'Id'

# Call the function to display the first 20 distinct Ids
display_first_20_distinct_ids(clinicaltrial_df, id_col)


In [0]:
from pyspark.sql import DataFrame

def display_first_20_distinct_study_titles(df: DataFrame, title_col: str) -> None:
    """
    Display the first 20 distinct study titles from the DataFrame.

    Args:
        df (DataFrame): The DataFrame containing clinical trial data.
        title_col (str): The column name for 'Study Title'.

    Returns:
        None
    """
    # Check if the title_col exists in the DataFrame
    if title_col not in df.columns:
        print(f"The column '{title_col}' does not exist in the DataFrame.")
        return

    # Get the distinct study titles
    distinct_study_titles = df.select(title_col).distinct()

    # Select the first 20 distinct study titles
    first_20_distinct_study_titles = distinct_study_titles.limit(20)

    # Display the first 20 distinct study titles
    print(f"First 20 distinct study titles in column '{title_col}':")
    display(first_20_distinct_study_titles)

# Assuming 'clinicaltrial_df' is the DataFrame you want to work with
# Define the column name for 'Study Title'
title_col = "Study Title"  # Assuming the column name for 'Study Title' is 'Study Title'

# Call the function to display the first 20 distinct study titles
display_first_20_distinct_study_titles(clinicaltrial_df, title_col)


In [0]:
import matplotlib.pyplot as plt
import pandas as pd

def plot_distinct_studies(df, id_col, title_col=None):
    """
    Plot the distinct number of studies based on 'Id' and 'Study Title'.

    Args:
        df (DataFrame): The DataFrame containing clinical trial data.
        id_col (str): The column name for 'Id'.
        title_col (str, optional): The column name for 'Study Title'. Defaults to None.

    Returns:
        None
    """
    # Count distinct studies based on 'Id'
    distinct_studies_id = df.select(id_col).distinct().count()

    # Initialize the count of distinct studies based on 'Study Title' to 0
    distinct_studies_title = 0

    # If 'Study Title' column exists in the DataFrame, count distinct studies based on 'Study Title'
    if title_col and title_col in df.columns:
        distinct_studies_title = df.select(title_col).distinct().count()

    # Create a DataFrame for the data
    data = {
        'Category': ['Distinct Studies (Id)', 'Distinct Studies (Study Title)'],
        'Count': [distinct_studies_id, distinct_studies_title]
    }

    df_counts = pd.DataFrame(data)

    # Create a bar plot
    plt.figure(figsize=(8, 6))
    bars = plt.bar(df_counts['Category'], df_counts['Count'], color=['blue', 'green'])

    # Add labels and title
    plt.xlabel('Category')
    plt.ylabel('Count')
    plt.title('Distinct Number of Studies Based on "Id" and "Study Title"')

    # Add hover-over data: annotate bars
    for bar in bars:
        # Get the height of the bar (count value)
        yval = bar.get_height()
        # Add text annotation above the bar to display the count value
        plt.text(bar.get_x() + bar.get_width() / 2, yval, f'{int(yval)}',
                 ha='center', va='bottom', fontsize=10, fontweight='bold')

    # Display the plot
    plt.show()

# Example usage
# Assuming 'clinicaltrial_df' is the DataFrame you want to work with

# Define the column names for 'Id' and 'Study Title'
id_col = "Id"  # Column name for 'Id'
title_col = "Study Title"  # Column name for 'Study Title'

# Call the function to plot the distinct studies
plot_distinct_studies(clinicaltrial_df, id_col, title_col)


#Question 2. You should list all the types (as contained in the Type column) of studies in the dataset along with the frequencies of each type. These should be ordered from most frequent to least frequent.

In [0]:
from pyspark.sql.functions import desc
from pyspark.sql import DataFrame

def group_and_count(df: DataFrame, group_col: str, order_desc: bool = True):
    """
    Group the DataFrame by a specified column and count the occurrences.

    Args:
        df (DataFrame): The DataFrame containing clinical trial data.
        group_col (str): The column name to group by.
        order_desc (bool): Whether to order the results in descending order based on the count. Defaults to True.

    Returns:
        None
    """
    # Group by the specified column and count the occurrences of each group
    grouped_frequencies = df.groupBy(group_col).count()

    # If ordering in descending order is specified, order the result
    if order_desc:
        grouped_frequencies = grouped_frequencies.orderBy(desc("count"))

    # Display the result
    display(grouped_frequencies)

# Assuming 'clinicaltrial_df' is the DataFrame you want to work with
# Define the column name to group by
group_col = "Type"  # Example: you can change this to the column you want to group by

# Call the function to group and count the DataFrame
group_and_count(clinicaltrial_df, group_col)


In [0]:
import matplotlib.pyplot as plt
from pyspark.sql import DataFrame
from pyspark.sql.functions import desc

def plot_type_frequencies(df: DataFrame, group_col: str):
    """
    Plot the frequencies of each type of study in the dataset.

    Args:
        df (DataFrame): The DataFrame containing clinical trial data.
        group_col (str): The column name to group by.

    Returns:
        None
    """
    # Group by the specified column and count the occurrences of each group
    type_frequencies = df.groupBy(group_col).count()

    # Order the result in descending order based on the count
    type_frequencies_ordered = type_frequencies.orderBy(desc("count"))

    # Convert the Spark DataFrame to Pandas DataFrame
    type_frequencies_df = type_frequencies_ordered.toPandas()

    # Check for null values and replace them with 'Unknown'
    type_frequencies_df[group_col] = type_frequencies_df[group_col].fillna("Unknown")

    # Plotting the bar chart
    plt.figure(figsize=(12, 6))
    plt.bar(type_frequencies_df[group_col], type_frequencies_df["count"], color='blue')

    # Add labels and title
    plt.xlabel(f"{group_col} of Study")
    plt.ylabel("Frequency")
    plt.title(f"Frequencies of Each {group_col} of Study in the Dataset")

    # Rotate x-axis labels for better readability
    plt.xticks(rotation=45, ha='right')

    # Display the chart
    plt.show()

# Assuming 'clinicaltrial_df' is the DataFrame you want to work with
# Define the column name to group by
group_col = "Type"  # Example: you can change this to the column you want to group by

# Call the function to plot the frequencies
plot_type_frequencies(clinicaltrial_df, group_col)


#Question 3. The top 5 conditions (from Conditions) with their frequencies.

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import desc, col, split, explode, trim, isnull

def display_top_5_conditions(clinicaltrial_df: DataFrame, fileroot: str):
    """
    Display the top 5 conditions and their frequencies based on the given fileroot.

    Args:
        clinicaltrial_df (DataFrame): The DataFrame containing the data to work with.
        fileroot (str): The root of the file to determine which operations to perform.

    Returns:
        None
    """
    if fileroot == "clinicaltrial_2023":
        # For clinicaltrial_2023, group by "Conditions" column and count occurrences
        condition_frequencies = clinicaltrial_df.groupBy("Conditions").count()
        # Order the result from most frequent to least frequent and limit to the top 5
        top_5_conditions = condition_frequencies.orderBy(desc("count")).limit(5)
        # Display the top 5 conditions with their frequencies
        top_5_conditions.show()

    elif fileroot in ("clinicaltrial_2020", "clinicaltrial_2021"):
        # For clinicaltrial_2020 and clinicaltrial_2021, split the 'conditions' column by commas
        conditions_df = clinicaltrial_df.select(split('conditions', ',').alias('condition_list'))
        # Use explode to create a new DataFrame with one row per condition
        exploded_conditions_df = conditions_df.select(explode('condition_list').alias('condition'))
        # Filter out empty strings and whitespace-only conditions
        exploded_conditions_df = exploded_conditions_df.filter(
            ~isnull(exploded_conditions_df['condition']) & (trim(exploded_conditions_df['condition']) != "")
        )
        # Calculate the frequency of each condition using groupBy and count
        condition_frequency_df = exploded_conditions_df.groupBy('condition').count()
        # Order the conditions by frequency in descending order and limit to the top 5
        top_5_conditions_df = condition_frequency_df.orderBy(desc('count')).limit(5)
        # Display the top 5 conditions and their frequencies
        print("Top 5 conditions and their frequencies:")
        top_5_conditions_df.show()

display_top_5_conditions(clinicaltrial_df, fileroot)

In [0]:
pip install mplcursors

In [0]:
import matplotlib.pyplot as plt
import mplcursors  # Importing mplcursors for hover functionality

def plot_top_5_conditions(clinicaltrial_df, fileroot):
    """
    Plot a bar chart for the top 5 conditions with their frequencies in a given DataFrame
    and incorporate hover functionality using mplcursors to display tooltips.

    Args:
        clinicaltrial_df (DataFrame): The DataFrame containing the data to work with.
        fileroot (str): The root of the file to determine which operations to perform.

    Returns:
        None
    """
    if fileroot == "clinicaltrial_2023":
        # Group by the "Conditions" column and count the occurrences of each condition
        condition_frequencies = clinicaltrial_df.groupBy("Conditions").count()
        # Order the result from most frequent to least frequent and limit to the top 5
        top_5_conditions = condition_frequencies.orderBy(desc("count")).limit(5)
        # Convert the Spark DataFrame to a Pandas DataFrame
        top_5_conditions_df = top_5_conditions.toPandas()

    elif fileroot in ("clinicaltrial_2020", "clinicaltrial_2021"):
        # Step 1: Split the 'conditions' column by commas and alias it as 'condition_list'
        conditions_df = clinicaltrial_df.select(split('conditions', ',').alias('condition_list'))
        # Step 2: Use explode to create a new DataFrame with one row per condition
        exploded_conditions_df = conditions_df.select(explode('condition_list').alias('condition'))
        # Step 3: Filter out empty strings and whitespace-only conditions
        exploded_conditions_df = exploded_conditions_df.filter(
            ~isnull(exploded_conditions_df['condition']) & (trim(exploded_conditions_df['condition']) != "")
        )
        # Step 4: Calculate the frequency of each condition using groupBy and count
        condition_frequency_df = exploded_conditions_df.groupBy('condition').count()
        # Order the conditions by frequency in descending order and limit to the top 5
        top_5_conditions_df = condition_frequency_df.orderBy(desc('count')).limit(5)
        # Convert the result to a Pandas DataFrame
        top_5_conditions_df = top_5_conditions_df.toPandas()

    # Plotting the top 5 conditions
    plt.figure(figsize=(12, 6))

    bars = plt.bar(
        top_5_conditions_df['condition' if fileroot in ("clinicaltrial_2020", "clinicaltrial_2021") else "Conditions"],
        top_5_conditions_df['count'],
        color='blue'
    )

    # Add labels and title
    plt.xlabel('Condition')
    plt.ylabel('Frequency')
    plt.title('Top 5 Conditions with Frequencies')

    # Rotate x-axis labels for better readability
    plt.xticks(rotation=45, ha='right')

    # Enable hover functionality using mplcursors
    cursor = mplcursors.cursor(bars, hover=True)

    # Define the tooltip content
    def on_add(sel):
        condition = top_5_conditions_df.iloc[sel.index][0]  # Access condition value based on fileroot
        count = top_5_conditions_df.iloc[sel.index][1]
        sel.annotation.set_text(f'Condition: {condition}\nCount: {count}')

    # Connect the cursor to the on_add function
    cursor.connect("add", on_add)

    # Display the plot
    plt.show()

# Assuming 'clinicaltrial_df' is the DataFrame you want to work with and 'fileroot' is the identifier
plot_top_5_conditions(clinicaltrial_df, fileroot)


In [0]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, regexp_replace, split, explode, trim, expr, lower, count

def process_conditions(clinicaltrial_df: DataFrame, fileroot: str):
    """
    Process the conditions based on the fileroot value.

    Args:
        clinicaltrial_df (DataFrame): The DataFrame containing the data to work with.
        fileroot (str): The root of the file to determine which operations to perform.

    Returns:
        None
    """
    if fileroot == "clinicaltrial_2023":
        # Step 1: Split the 'Conditions' column by pipe ('|') to create an array
        cleaned_df = clinicaltrial_df.withColumn(
            "Conditions",
            split(col("Conditions"), r"\|")
        )

        # Step 2: Explode the 'Conditions' array to separate rows for each condition
        exploded_df = cleaned_df.withColumn(
            "Conditions",
            explode(col("Conditions"))
        )

        # Step 3: Group the exploded data by 'Conditions' and count occurrences
        condition_counts = exploded_df.groupBy("Conditions").count()

        # Step 4: Order the DataFrame by count in descending order
        condition_counts_sorted = condition_counts.orderBy(col("count").desc())

        # Step 5: Select the top 5 conditions
        top_5_conditions = condition_counts_sorted.limit(5)

        # Display the top 5 conditions with their frequencies
        top_5_conditions.show()

        # Convert PySpark DataFrame to Pandas DataFrame
        top_5_conditions_df = top_5_conditions.toPandas()

        # Plot the top 5 conditions with their frequencies
        plt.figure(figsize=(10, 6))
        sns.barplot(x='count', y='Conditions', data=top_5_conditions_df)
        plt.title('Top 5 Conditions with Their Frequencies')
        plt.xlabel('Frequency')
        plt.ylabel('Conditions')
        plt.show()

    elif fileroot in ("clinicaltrial_2020", "clinicaltrial_2021"):
        print(f"This operation is not applicable to {fileroot} file.")

# Call the function with the appropriate DataFrame and fileroot
process_conditions(clinicaltrial_df, fileroot)


###Additional Cleaning for Question 3 The top 5 conditions (from Conditions) with their frequencies.

In [0]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, regexp_replace, split, explode, trim, expr, lower

def process_clinicaltrial_data(clinicaltrial_df: DataFrame, fileroot: str):
    """
    Process the clinical trial data based on the fileroot.

    Args:
        clinicaltrial_df (DataFrame): The DataFrame containing the data to work with.
        fileroot (str): The root of the file to determine which operations to perform.

    Returns:
        None
    """
    if fileroot == "clinicaltrial_2023":
        # Step 1: Remove quotes and clean the 'Conditions' column and trim leading and trailing spaces
        cleaned_df = clinicaltrial_df.withColumn(
            "Conditions",
            trim(regexp_replace(col("Conditions").cast("string"), r'"', ''))
        )
        # Step 2: Replace square brackets (`[` and `]`) and parentheses (`(` and `)`) with no space in 'Conditions' column
        cleaned_df = cleaned_df.withColumn(
            "Conditions",
            regexp_replace(col("Conditions"), r'[\[\]\(\)]', '')
        )
        # Step 3: Split the 'Conditions' column by pipe ('|') to create an array
        cleaned_df = cleaned_df.withColumn(
            "Conditions",
            split(col("Conditions"), r"\|")
        )
        # Step 4: Explode the 'Conditions' array to separate rows for each condition
        cleaned_df = cleaned_df.withColumn(
            "Conditions",
            explode(col("Conditions"))
        )
        # Step 5: Split each condition by comma (',') and flatten the nested conditions
        cleaned_df = cleaned_df.withColumn(
            "Conditions",
            split(col("Conditions"), ",")
        )
        # Step 6: Explode the array created in Step 5
        cleaned_df = cleaned_df.withColumn(
            "Conditions",
            explode(col("Conditions"))
        )
        # Step 7: Trim leading and trailing whitespaces from the 'Conditions' column
        cleaned_df = cleaned_df.withColumn(
            "Conditions",
            trim(col("Conditions"))
        )
        # Step 8: Filter out rows with the condition 'e.g.' in a case-insensitive manner
        filtered_df = cleaned_df.filter(
            ~lower(col("Conditions")).like("e.g.")
        )

        # Calculate the top 5 conditions with their frequencies
        top_5_conditions = filtered_df.groupBy("Conditions").count()  # Group by 'Conditions' and count occurrences
        top_5_conditions = top_5_conditions.orderBy(col("count").desc())  # Order by count in descending order
        top_5_conditions = top_5_conditions.limit(5)  # Select the top 5 conditions

        # Display the top 5 conditions with their frequencies
        top_5_conditions.show()

        # Convert the top 5 conditions to a Pandas DataFrame
        top_5_conditions_df = top_5_conditions.toPandas()

        # Plot the top 5 conditions with their frequencies
        plt.figure(figsize=(10, 6))
        sns.barplot(x='count', y='Conditions', data=top_5_conditions_df)
        plt.title('Top 5 Conditions with Their Frequencies')
        plt.xlabel('Frequency')
        plt.ylabel('Conditions')
        plt.show()
    else:
        print(f"This operation is not applicable to {fileroot} file.")

# Call the function with the appropriate DataFrame and fileroot
process_clinicaltrial_data(clinicaltrial_df, fileroot)


In [0]:
display(clinicaltrial_df)

In [0]:
fileroot1= "pharma"

In [0]:
#Copying the file to /tmp directory on driver node
dbutils.fs.cp("/FileStore/tables/" + fileroot1+".zip", "file:/tmp/")

In [0]:
#Checking the fileroot.zip in DBFS directory
dbutils.fs.ls("/FileStore/tables/")

In [0]:
#Checking the fileroot.zip in local tmp directory
dbutils.fs.ls("file:/tmp/")

In [0]:
#Making 'fileroot' accessible by the command line
import os
os.environ['fileroot1'] = fileroot1

In [0]:
#Unzipping the file in the local directory

In [0]:
%sh
unzip -d /tmp /tmp/$fileroot1.zip

In [0]:
#Moving the unzipped file to the DBFS directory
dbutils.fs.mv("file:/tmp/" + fileroot1 +".csv" , "/FileStore/tables/", True )

In [0]:
#Checking the unzipped file in DBFS directory
dbutils.fs.ls("/FileStore/tables/")

In [0]:
#Checking the DBFS directory
dbutils.fs.ls ("/FileStore/tables/")

In [0]:
#Listing the contents of the csv file
dbutils.fs.head("/FileStore/tables/" + fileroot1 + ".csv" )

#Question 4. Find the 10 most common sponsors that are not pharmaceutical companies, along with the number of clinical trials they have sponsored. Hint: For a basic implementation, you can assume that the Parent Company column contains all possible pharmaceutical companies.

In [0]:
#reading the csv data into spark data frame using read.csv method
pharmaDF = spark.read.options(delimiter = ',').csv("/FileStore/tables/"+ fileroot1 + ".csv",
                             header = "true",
                             inferSchema = "true")

In [0]:
pharmaDF.display()

In [0]:
#printing the schema for pharma datset
pharmaDF.printSchema()

In [0]:
from pyspark.sql.functions import count, desc
 # From parent company column, extracting the list of all pharmaceutical companies
pharmaceutical_companies = [row[0] for row in pharmaDF.select('Parent_Company').distinct().collect()]
 
# Using clinical trial Sponsor column, filtering out all non pharmaceutical companies
non_pharmaceutical_sponsors = clinicaltrial_df.filter(~clinicaltrial_df['Sponsor'].isin(pharmaceutical_companies))
 
# Count the number of clinical trials sponsored by each sponsor and sort in descending order
tenTopSponsors = non_pharmaceutical_sponsors.groupBy('Sponsor') \
                                          .agg(count("*").alias("numberOfTrials")) \
                                          .sort(desc("numberOfTrials"))
                                          
# Show the top 10 sponsors
tenTopSponsors.show(10, truncate = False)

In [0]:
import matplotlib.pyplot as plt
import matplotlib.cm as cm

# Convert the top 10 sponsors DataFrame to Pandas DataFrame
top_10_sponsors = tenTopSponsors.limit(10).toPandas()

# Create a colormap
cmap = cm.get_cmap('viridis', 10)  # Choose a colormap, e.g., 'viridis'

# Normalize the `numberOfTrials` values to use with the colormap
norm = plt.Normalize(top_10_sponsors['numberOfTrials'].min(), top_10_sponsors['numberOfTrials'].max())

# Plot the data
plt.figure(figsize=(8, 6))  # Adjusting figure size
plt.barh(
    top_10_sponsors['Sponsor'], 
    top_10_sponsors['numberOfTrials'], 
    color=cmap(norm(top_10_sponsors['numberOfTrials']))
)
plt.xlabel('Number of Trials')
plt.ylabel('Sponsor')
plt.title('Top 10 Non-Pharmaceutical Sponsors by Number of Clinical Trials')
plt.gca().invert_yaxis()  # Invert the y-axis for better visualization
plt.colorbar(cm.ScalarMappable(norm=norm, cmap=cmap), label='Number of Trials')  # Add a color bar
plt.show()

#Question 5. Plot number of completed studies for each month in 2023. You need to include your visualization as well as a table of all the values you have plotted for each month.

In [0]:
import pandas as pd
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
from pyspark.sql import DataFrame
from pyspark.sql.functions import col

def process_clinicaltrial(clinicaltrial_df: DataFrame, fileroot: str, trial_year: int):
    """
    Process clinicaltrial data and generate bar and scatter plots for completed studies each month in the specified trial year.

    Args:
        clinicaltrial_df (DataFrame): Spark DataFrame containing the clinicaltrial data.
        fileroot (str): The file root string to determine which operations to perform.
        trial_year (int): The year of interest for the analysis.

    Returns:
        None
    """
    if fileroot == "clinicaltrial_2023":
        # Convert the 'Start' and 'Completion' columns to timestamp data type
        clinicaltrial_df = clinicaltrial_df.withColumn("Start", col("Start").cast("timestamp"))
        clinicaltrial_df = clinicaltrial_df.withColumn("Completion", col("Completion").cast("timestamp"))

        # Function to plot studies each month for the specified completion status and year
        def plot_studies_each_month(completion_status, study_year):
            # Convert the completion status parameter to lowercase
            completion_status = completion_status.lower()
            
            # Filter the DataFrame for studies with the specified completion status and in the specified study year
            studies_completed = clinicaltrial_df.filter(
                (F.lower(clinicaltrial_df.Status) == completion_status) & 
                (F.year(clinicaltrial_df.Completion) == study_year)
            )

            # Extract the month from the 'Completion' column
            studies_completed = studies_completed.withColumn(
                "month", F.month(clinicaltrial_df.Completion)
            )
            
            # Group by the extracted month and count the number of studies
            monthly_basis_results = studies_completed.groupBy("month").count()
            
            # Display the count of completed studies for each month, ordered by month
            monthly_basis_results.orderBy("month").show()
            
            # Collect the results to lists for plotting
            month_number = [row.month for row in monthly_basis_results.collect()]
            no_of_completed_studies = [row["count"] for row in monthly_basis_results.collect()]
            
            return month_number, no_of_completed_studies

        # Call the function with the desired completion status and study year
        month_number, no_of_completed_studies = plot_studies_each_month("Completed", trial_year)

        # Create a bar graph
        plt.figure(figsize=(10, 6))
        bars = plt.bar(month_number, no_of_completed_studies, color='blue')

        # Add counts on top of each bar
        for bar in bars:
            y_val = bar.get_height()  # Get the height of the bar (count)
            plt.text(
                bar.get_x() + bar.get_width() / 2,  # Center the text over the bar
                y_val,  # Place the text at the top of the bar
                str(y_val),  # Convert count to string
                ha='center',  # Center-align the text
                va='bottom'   # Place the text at the bottom (above the bar)
            )

        # Add labels and title
        plt.xlabel('Month')
        plt.ylabel('Number of Completed Studies')
        plt.title(f'Bar Graph of Completed Studies Each Month in {trial_year}')

        # Display the bar plot
        plt.show()

        # Create a scatter plot
        plt.figure(figsize=(10, 6))
        plt.scatter(month_number, no_of_completed_studies, color='blue', marker='o')

        # Add labels and title
        plt.xlabel('Month')
        plt.ylabel('Number of Completed Studies')
        plt.title(f'Scatter Plot of Completed Studies Each Month in {trial_year}')

        # Display the scatter plot
        plt.show()
    
    elif fileroot in ["clinicaltrial_2020", "clinicaltrial_2021"]:
        # Filter the DataFrame to only include completed studies in the specified trial year
        completed_studies_df = clinicaltrial_df.filter(
            (F.col('Status') == 'Completed') &
            (F.year(F.to_date(F.col('Completion'), 'MMM yyyy')) == trial_year)
        )

        # Group the data by month and count the number of completed studies
        monthly_completed_studies_df = completed_studies_df.withColumn(
            'month_year',
            F.date_format(F.to_date(F.col('Completion'), 'MMM yyyy'), 'yyyy-MM')
        ).groupBy('month_year').count()

        # Order the DataFrame by 'month_year' in ascending order
        monthly_completed_studies_df = monthly_completed_studies_df.orderBy('month_year', ascending=True)

        # Show the monthly completed studies in ascending order of months
        monthly_completed_studies_df.show()

        # Convert the DataFrame to a Pandas DataFrame
        monthly_completed_studies_pd_df = monthly_completed_studies_df.toPandas()

        # Plotting the data
        plt.figure(figsize=(10, 6))
        plt.plot(monthly_completed_studies_pd_df['month_year'], monthly_completed_studies_pd_df['count'], marker='o')
        plt.title(f'Number of Completed Studies Each Month in {trial_year}')
        plt.xlabel('Month')
        plt.ylabel('Number of Completed Studies')
        plt.xticks(rotation=45)  # Rotate the x-axis labels for better readability
        plt.grid(True)  # Add grid lines
        plt.show()  # Display the plot


process_clinicaltrial(clinicaltrial_df, fileroot, trial_Year)

#Additional Analyses and its Visualization only for Clinicaltrial_2023

#1. Analysis of Study Status Distribution

In [0]:
import pyspark.sql.functions as func
import matplotlib.pyplot as plt

def analyze_clinicaltrial_status(clinicaltrial_df, fileroot):
    """
    Analyze the status distribution in a clinical trial DataFrame and plot the distribution.

    Args:
        clinicaltrial_df (DataFrame): A Spark DataFrame containing the clinical trial data.
        fileroot (str): The root identifier of the file, e.g., 'clinicaltrial_2023', 'clinicaltrial_2020', etc.

    Returns:
        None
    """
    if fileroot == 'clinicaltrial_2023':
        # Filter the DataFrame to include only records with 'completed' or 'recruiting' status
        status_filtered_df = clinicaltrial_df.filter(
            (func.lower(clinicaltrial_df.Status) == 'completed') |
            (func.lower(clinicaltrial_df.Status) == 'recruiting')
        )

        # Group by the 'Status' column and count the frequency of each status
        status_distribution = status_filtered_df.groupBy("Status").count()

        # Display the status distribution
        status_distribution.show()

        # Plot the status distribution
        status_labels = [row.Status for row in status_distribution.collect()]
        status_counts = [row["count"] for row in status_distribution.collect()]

        plt.figure(figsize=(10, 6))
        plt.bar(status_labels, status_counts, color='blue')
        plt.xlabel('Status')
        plt.ylabel('Frequency')
        plt.title('Distribution of Study Statuses: Completed and Recruiting')
        plt.show()
    else:
        print(f"This additional analysis is not applicable for fileroot = {fileroot}")


analyze_clinicaltrial_status(clinicaltrial_df, fileroot)


#2. Analysis of Start Dates of Studies

In [0]:
import pyspark.sql.functions as func
import matplotlib.pyplot as plt
import seaborn as sns

def analyze_study_start_dates(clinicaltrial_df, fileroot):
    """
    Analyze and plot the distribution of study start dates in a clinical trial DataFrame.

    Args:
        clinicaltrial_df (DataFrame): A Spark DataFrame containing the clinical trial data.
        fileroot (str): The root identifier of the file, e.g., 'clinicaltrial_2023', 'clinicaltrial_2020', etc.

    Returns:
        None
    """
    if fileroot == 'clinicaltrial_2023':
        # Group the data by year and month of the 'Start' column
        start_dates_distribution = clinicaltrial_df.withColumn("year", func.year("Start")) \
            .withColumn("month", func.month("Start")) \
            .groupBy("year", "month") \
            .count()

        # Display the results
        start_dates_distribution.show()

        # Filter the data to include only the years from 2015 to 2023
        filtered_data = start_dates_distribution.filter(
            (start_dates_distribution.year >= 2015) & (start_dates_distribution.year <= 2023)
        )

        # Create a scatter plot
        plt.figure(figsize=(10, 6))

        # Create a color palette with 12 colors (one for each month)
        colors = sns.color_palette("husl", 12)

        # Iterate through each month and plot a scatter plot with each month in different colors
        for month in range(1, 13):  # Months from 1 to 12
            # Filter the data for the current month
            monthly_data = filtered_data.filter(filtered_data.month == month)

            # Extract year and count for the current month
            years = [row.year for row in monthly_data.collect()]
            counts = [row["count"] for row in monthly_data.collect()]

            # Plot the data points for the current month in a specific color
            plt.scatter(years, counts, color=colors[month - 1], label=f'Month {month}', alpha=0.7)

        # Add labels and title
        plt.xlabel('Year')
        plt.ylabel('Number of Studies Started')
        plt.title('Distribution of Study Start Dates from 2015 to 2023')

        # Add a legend with a reduced font size
        plt.legend(loc='upper left', fontsize='small')

        # Show the plot
        plt.show()
    else:
        print(f"This additional analysis is not applicable for fileroot = {fileroot}")


analyze_study_start_dates(clinicaltrial_df, fileroot)


#3. Analysis on Interventions

In [0]:
pip install plotly

In [0]:
import pyspark.sql.functions as func
import pandas as pd
import plotly.express as px

def analyze_interventions(clinicaltrial_df, fileroot):
    """
    Analyze the top 5 intervention types in a clinical trial DataFrame.

    Args:
        clinicaltrial_df (DataFrame): A Spark DataFrame containing the clinical trial data.
        fileroot (str): The root identifier of the file, e.g., 'clinicaltrial_2023', 'clinicaltrial_2020', etc.

    Returns:
        None
    """
    if fileroot == 'clinicaltrial_2023':
        # Step 1: Clean the 'Interventions' column by replacing all double quotes (") with no space
        cleaned_interventions_df = clinicaltrial_df.withColumn(
            "Cleaned_Interventions",
            func.regexp_replace(clinicaltrial_df["Interventions"], '"', '')
        )

        # Step 2: Extract the word before the colon (:)
        cleaned_interventions_df = cleaned_interventions_df.withColumn(
            "Intervention_Type",
            func.split(cleaned_interventions_df["Cleaned_Interventions"], ":").getItem(0)
        )

        # Step 3: Remove leading and trailing spaces from the intervention type
        cleaned_interventions_df = cleaned_interventions_df.withColumn(
            "Intervention_Type",
            func.trim(cleaned_interventions_df["Intervention_Type"])
        )

        # Step 4: Group by 'Intervention_Type' and count the occurrences, order by count in descending order
        intervention_counts = cleaned_interventions_df.groupBy("Intervention_Type").count().orderBy(func.desc("count"))

        # Step 5: Display only the top 5 counts
        top_5_intervention_counts = intervention_counts.limit(5).collect()
        for row in top_5_intervention_counts:
            print(row)

        # Convert the top 5 intervention counts to a pandas DataFrame
        df = pd.DataFrame({
            'Intervention_Type': [row['Intervention_Type'] for row in top_5_intervention_counts],
            'Count': [row['count'] for row in top_5_intervention_counts]
        })

        # Create an interactive bar chart using Plotly
        fig = px.bar(
            df,
            x='Intervention_Type',
            y='Count',
            color='Count',
            color_continuous_scale='Reds',
            title='Top 5 Intervention Types and Their Counts'
        )

        # Customize the layout for better visibility
        fig.update_layout(
            xaxis_title='Intervention Types',
            yaxis_title='Count'
        )

        # Display the interactive plot
        fig.show()
    else:
        print(f"This additional analysis is not applicable for fileroot = {fileroot}")


analyze_interventions(clinicaltrial_df, fileroot)


#Visualization in POWERBI

In [0]:
display(clinicaltrial_df)

In [0]:
#Changing the Start and Completion columns to timestamp data type

from pyspark.sql.functions import col
clinicaltrial_df = clinicaltrial_df.withColumn("Start",col("Start").cast("timestamp"))
clinicaltrial_df = clinicaltrial_df.withColumn("Completion",col("Completion").cast("timestamp"))
display(clinicaltrial_df)

In [0]:
# Write the DataFrame to a CSV file, overwriting any existing file at the specified path
clinicaltrial_df.write.mode("overwrite").csv("/FileStore/tables/CT2023.csv")

In [0]:
%sql
drop table if exists CT2023;
create table CT2023
using csv
options (path "dbfs:/FileStore/tables/CT2023.csv", header "False" , inferSchema "True")

In [0]:
%sql
SHOW TABLES

In [0]:
%sql
SELECT * FROM CT2023

In [0]:
# Define the path of the directory you want to delete
directory_path = "/FileStore/tables/"

# Use dbutils.fs.rm() to delete all files and directories in the specified location
dbutils.fs.rm(directory_path, recurse=True)

# Confirmation message
print(f"All files and directories in the location {directory_path} have been deleted.")