In [None]:
# Microsoft Fabric Exercises 
# Author: Claudio Mirti, Microsoft
# Disclaimer: This is a public data set and used for demo purpose only.

In [None]:
import requests
import json
import pandas as pd
import datetime
from pyspark.sql import SparkSession


In [None]:
CDC_BASE_URL = 'https://clinicaltrials.gov/api/query/study_fields?expr=COVID-19&max_rnk=1000&fmt=json'

In [None]:
cdc_extract_fields = [
    'BriefTitle',
    'DesignAllocation',
    'DesignMasking',
    'DesignMaskingDescription',
    'InterventionName',
    'InterventionType',
    'LastKnownStatus',
    'OfficialTitle',
    'OutcomeAnalysisStatisticalMethod',
    'OutcomeMeasureTimeFrame',
    'SecondaryOutcomeMeasure',
    'StartDate',
    'StudyFirstPostDate',
    'StudyFirstPostDateType',
    'StudyFirstSubmitDate',
    'StudyFirstSubmitQCDate',
    'StudyPopulation',
    'StudyType',
    'WhyStopped'
]

In [None]:
query_url = f'{CDC_BASE_URL}&fields={",".join(cdc_extract_fields)}'
print(query_url)

In [None]:
r = requests.get(query_url)

In [None]:
# Check we have a successful extract with code 200
r.status_code

In [None]:
# Load the JSON data to a dictionary
j = json.loads(r.content)

In [None]:
# This is quite a flat JSON structure, so can be loaded into a DataFrame
df = pd.DataFrame(j['StudyFieldsResponse']['StudyFields'])

In [None]:
# Some of the fields are single-item lists which can be cleaned
def de_list(input_field):
    if isinstance(input_field, list):
        if len(input_field) == 0:
            return None
        elif len(input_field) == 1:
            return input_field[0]
        else:
            return '; '.join(input_field)
    else:
        return input_field

In [None]:
for c in df.columns:
    df[c] = df[c].apply(de_list)

In [None]:
df['StudyFirstPostDate'] = pd.to_datetime(df.StudyFirstPostDate)
df = df.sort_values(by='StudyFirstPostDate', ascending=False)

In [None]:
df[df.StudyType == 'Interventional'].head(100)

In [None]:
#Create PySpark DataFrame from Pandas
sparkDF=spark.createDataFrame(df) 
sparkDF.printSchema()
sparkDF.show()

In [None]:
# no need to define portfolio_path before
sparkDF.write.mode("overwrite").format("delta").save("Tables/" + "clinicalT")