##Data Cleaning & Data loading & Creating Hive Tables

####Please for each clinicaltrial file change the Year variable.

#####variables

In [0]:
#The year of the clinicaltrial file
#Please for each clinicaltrial file change the Year variable.
Year="2021"

In [0]:
#The location of the file 
location="/FileStore/tables/"

#The name of the csv file without year
filename="clinicaltrial"

#File name with year
fileroot =filename+"_"+Year

#Current database 
DB = spark.catalog.currentDatabase()

#Checking the list of content
dbutils.fs.ls(location+filename+"_"+Year+".csv")


Out[21]: [FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2021.csv', name='clinicaltrial_2021.csv', size=50359696, modificationTime=1652377608000)]

#####Functions

In [0]:
#Create a function to rename files from _csv.gz to .csv.gz
def Rename_Files(fileroot):
    #the location of file
    FilePath ="/FileStore/tables/" + fileroot+"_csv.gz"
    try:
        #if file exist:
        if len(dbutils.fs.ls(FilePath))==1:
            #rename file name
            dbutils.fs.mv(FilePath,"/FileStore/tables/"+fileroot+".csv.gz")
            dbutils.fs.ls("/FileStore/tables/" + fileroot+".csv.gz" )
    except:
        #if file does not exist:
        print("The file /FileStore/tables/" + fileroot+"_csv.gz does not exist.")

In [0]:
#Create function to copy .gz files from DBFS to /tmp directory
def Copy_Files_To_Tmp(fileroot):
    try:
        #Copy data from DBFS to file:/tmp/
        dbutils.fs.cp("/FileStore/tables/" + fileroot + ".csv.gz", "file:/tmp/")
        #Declare an environment variable to be usable in shell commands
        import os
        os.environ['fileroot']= fileroot
        dbutils.fs.ls("file:/tmp/" + fileroot + ".csv.gz") 
    except:
        #If the file .csv.gz does not exist
        print("The file /FileStore/tables/" + fileroot + ".csv.gz does not exist.")

In [0]:
#A function to create a Hive table from a csv file. 
def createTable_From_CSV (location, filename,delimiter):
    # File type
    file_type = "csv"
    #Declaring file location based on the file name
    if filename=="clinicaltrial":
        #Append the Year variable to a clinicaltrial file 
        FilePath= location+filename+"_"+Year+"."+file_type 
    else:   
        FilePath = location+filename+"."+file_type #file path  
    # CSV options
    infer_schema = True
    first_row_is_header = True
    
    #creating a dataframe from a csv file
    df = spark.read.format(file_type) \
    .option ("inferSchema", infer_schema) \
    .option ("header", first_row_is_header) \
    .option ("sep", delimiter) \
    .load (FilePath)

    try:
         #Removing the directory if it exists.By default, data is stored in the /user/hive/warehouse directory.
            if len(dbutils.fs.ls("dbfs:/user/hive/warehouse/"+filename))>=1:
                dbutils.fs.rm("dbfs:/user/hive/warehouse/"+filename,True)
                print("dbfs:/user/hive/warehouse/"+filename+ " was removed.")
    except:
            pass
    DB = spark.catalog.currentDatabase()#Current database
    tables = spark.catalog.listTables(DB)#List of tables existed in current database
    table_list= [table.name for table in tables]  #a list of the name of tables
    exists = filename in table_list  #If filename exists in the above list or not
    #if the table doesn not exists convert dataframe to a table
    if exists:
        print("A Table named "+filename+" exist.")
    else:
        df.write.saveAsTable(filename)
        print("A Table named "+filename+" was created.")

#####Rename a file from _csv.gz to .csv.gz

In [0]:
#Control the file exists or not
try:
    print(dbutils.fs.ls("/FileStore/tables/" + fileroot+"_csv.gz" ))
except:
  #If the file does not exist,you get this message: "The file ... does not exist".
    print("The file /FileStore/tables/" + fileroot+"_csv.gz does not exist.")

The file /FileStore/tables/clinicaltrial_2021_csv.gz does not exist.


In [0]:
#Execute the function
Rename_Files(fileroot)

The file /FileStore/tables/clinicaltrial_2021_csv.gz does not exist.


In [0]:
#List of contents
dbutils.fs.ls("/FileStore/tables/")

Out[27]: [FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2019.csv', name='clinicaltrial_2019.csv', size=42400056, modificationTime=1652047090000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2019.csv.gz', name='clinicaltrial_2019.csv.gz', size=10060669, modificationTime=1652045682000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2020.csv', name='clinicaltrial_2020.csv', size=46318151, modificationTime=1652047310000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2020.csv.gz', name='clinicaltrial_2020.csv.gz', size=10981608, modificationTime=1652047241000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2021.csv', name='clinicaltrial_2021.csv', size=50359696, modificationTime=1652377608000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2021.csv.gz', name='clinicaltrial_2021.csv.gz', size=11921810, modificationTime=1652047433000),
 FileInfo(path='dbfs:/FileStore/tables/devicestatus.zip', name='devicestatus.zip', size=23873574, modificationTime=

#####Copy .gz file from dbfs:/FileStore/tables/ to file:/tmp/

In [0]:
#Execute Function for .csv.gz file to copy file from dbfs:/FileStore/tables/ to file:/tmp/
Copy_Files_To_Tmp(fileroot)

In [0]:
#Controlling the temp directory
try:
    print(dbutils.fs.ls("file:/tmp/"+fileroot+".csv.gz"))
except:
    #If the file has not been copied, you get this message: "The file ... does not exist".
    print("The file file:/tmp/"+fileroot+".csv.gz does not exist.")

[FileInfo(path='file:/tmp/clinicaltrial_2021.csv.gz', name='clinicaltrial_2021.csv.gz', size=11921810, modificationTime=1652387341418)]


#####Extract a .gz file:

In [0]:
%sh
gunzip -d /tmp /tmp/$fileroot.csv.gz
  

gzip: /tmp is a directory -- ignored


In [0]:
dbutils.fs.ls("file:/tmp/"+fileroot+".csv")

Out[31]: [FileInfo(path='file:/tmp/clinicaltrial_2021.csv', name='clinicaltrial_2021.csv', size=50359696, modificationTime=1652387341418)]

#####Move the file from local /tmp directory into DBFS

In [0]:
dbutils.fs.mv("file:/tmp/"+fileroot+".csv","/FileStore/tables/" + fileroot + ".csv")

Out[32]: True

In [0]:
dbutils.fs.ls("/FileStore/tables/" + fileroot + ".csv")

Out[33]: [FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2021.csv', name='clinicaltrial_2021.csv', size=50359696, modificationTime=1652387345000)]

##Creating Hive Tables

####Creating Hive Table from Clinicaltrial files

In [0]:
%sql
--Drop table if the table exists
drop table if exists Clinicaltrial


In [0]:
#Create a table from Clinicaltrial file which is a pip delimited file.
createTable_From_CSV(location, filename,"|")

#Controlling the creation of the table
spark.catalog.listTables(DB)


A Table named clinicaltrial was created.
Out[34]: [Table(name='clinicaltrial', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='mesh', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='pharma', database='default', description=None, tableType='MANAGED', isTemporary=False)]

####Creating Hive Table from mesh.csv

In [0]:
%sql
--Dropping the table if the table exists
drop table if exists mesh

In [0]:
#creating mesh table fron mesh.csv
createTable_From_CSV("/FileStore/tables/","mesh",",")
spark.catalog.listTables(DB)

A Table named mesh was created.
Out[35]: [Table(name='clinicaltrial', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='mesh', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='pharma', database='default', description=None, tableType='MANAGED', isTemporary=False)]

####Creating Hive Table from Pharma.csv

In [0]:
%sql
--Dropping the table if the table exists
drop table if exists pharma

In [0]:
#creating pharma table fron pharma.csv
createTable_From_CSV("/FileStore/tables/","pharma",",")
spark.catalog.listTables(DB)

A Table named pharma was created.
Out[36]: [Table(name='clinicaltrial', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='mesh', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='pharma', database='default', description=None, tableType='MANAGED', isTemporary=False)]

In [0]:
#Controlling the hive directory 
dbutils.fs.ls("dbfs:/user/hive/warehouse/")

Out[37]: [FileInfo(path='dbfs:/user/hive/warehouse/clinicaltrial/', name='clinicaltrial/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/user/hive/warehouse/mesh/', name='mesh/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/user/hive/warehouse/pairdf/', name='pairdf/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/user/hive/warehouse/pairtable/', name='pairtable/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/user/hive/warehouse/pharma/', name='pharma/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/user/hive/warehouse/zip_level_risk_v1_1/', name='zip_level_risk_v1_1/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/user/hive/warehouse/zip_level_risk_v1_2/', name='zip_level_risk_v1_2/', size=0, modificationTime=0)]

In [0]:
#List of Tables in default database
spark.catalog.listTables(DB)

Out[38]: [Table(name='clinicaltrial', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='mesh', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='pharma', database='default', description=None, tableType='MANAGED', isTemporary=False)]