In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,col
from pyspark.sql.types import StructType, StructField, StringType
import os
import psycopg2

# Obtain the Spark session for this notebook under the app name below
# (getOrCreate hands back an already-running session when there is one).
spark = (
    SparkSession.builder
    .appName("Create delta tables")
    .getOrCreate()
)


In [None]:
# Make demodb (in the hive_metastore catalog) the current database, so the
# unqualified table names used below are resolved inside it.
spark.sql("USE DATABASE demodb;")

# One entry per postgres table to copy:
#   "table"   - name of the table to save the rows under,
#   "query"   - SQL sent to postgres to fetch the rows,
#   "columns" - column names applied to the fetched rows, in SELECT order.
queries = [
    {
        "table": "airports_data",
        "query": "SELECT a.airport_code, a.airport_name, a.city, a.coordinates FROM public.airports_data a LIMIT 200",
        "columns": ["airport_code", "airport_name", "city", "coordinates"],
    },
    {
        "table": "aircrafts_data",
        "query": " SELECT b.aircraft_code, b.model, b.range FROM public.aircrafts_data b LIMIT 200",
        "columns": ["aircraft_code", "model", "range"],
    },
]

def query_database(query):
    """Run a query on the postgres database and return all of its rows.

    The connection and cursor are opened for this single call and are always
    closed again, even when the query fails.

    Args:
        query(Str): The query for getting data from postgres database.
    return:
        list: Every row produced by the query, as given by cursor.fetchall().
    """
    my_connection = psycopg2.connect(user="******", password="*****", host="******",port="5432",database="******")
    try:
        my_cursor = my_connection.cursor()
        try:
            my_cursor.execute(query)
            return my_cursor.fetchall()
        finally:
            my_cursor.close()
    finally:
        # Explicit close: psycopg2's `with connection:` only ends the
        # transaction, it does not close the connection, so without this
        # every call would leave a server session open.
        my_connection.close()

def data_postgres_database_to_delta_table():
    """Copy each table described in the module-level ``queries`` list into a saved table.

    Takes no arguments. For every entry in ``queries`` it fetches the rows with
    ``query_database``, builds a Spark DataFrame from them, applies the entry's
    column names, and overwrites the table named by the entry.
    """
    for spec in queries:
        fetched_rows = query_database(spec["query"])
        # Column types are inferred by Spark from the fetched rows; only the
        # names come from the spec.
        frame = spark.createDataFrame(fetched_rows)
        frame = frame.toDF(*spec["columns"])
        # NOTE(review): the function name says "delta" but the writer format
        # is "parquet" - confirm which storage format is intended.
        writer = frame.write.format("parquet").mode("overwrite")
        writer.saveAsTable(spec["table"])
    
def dbfs_to_delta_table(path_dbfs):
    """Read every file in a DBFS directory as csv and save each one as a table.

    The table name is the file name up to its first "."; any existing table
    of that name is dropped before the new one is written. Entries whose name
    starts with "." (no usable table name) are skipped.

    Args:
        path_dbfs(Str): The path where csv file are on Databrick File System,
            given with its "/dbfs" prefix so it can be listed with os.listdir.
    """
    # os.listdir needs the "/dbfs" path, while the Spark reader is given the
    # path without it. Strip only the leading prefix so that a "/dbfs"
    # appearing later in the path is left intact.
    mount_prefix = "/dbfs"
    if path_dbfs == mount_prefix or path_dbfs.startswith(mount_prefix + "/"):
        location = path_dbfs[len(mount_prefix):]
    else:
        location = path_dbfs

    for file_name in os.listdir(path_dbfs):
        # partition() never raises, unlike index(), when the name has no ".".
        table_name = file_name.partition(".")[0]
        if not table_name:
            # Dot-files would yield an empty, invalid table name.
            continue
        # Drop the table that is about to be written. Using the raw file name
        # here ("name.csv") would be parsed as database "name", table "csv".
        spark.sql("DROP TABLE IF EXISTS " + table_name)
        df = (
            spark.read.format("csv")
            .option("inferSchema", "true")
            .option("header", "true")
            .option("sep", ",")
            .load(location + "/" + file_name)
        )
        # NOTE(review): the function name says "delta" but the writer format
        # is "parquet" - confirm which storage format is intended.
        df.write.format("parquet").mode("overwrite").saveAsTable(table_name)
    
# Run both loads when the cell executes: first the postgres tables listed in
# `queries`, then the csv files found under /dbfs/data_of_demodb.
data_postgres_database_to_delta_table()
dbfs_to_delta_table("/dbfs/data_of_demodb")


