## Overview

This notebook will show you how to create and query a table or DataFrame that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is a Databricks File System that allows you to store data for querying inside of Databricks. This notebook assumes that you have a file already inside of DBFS that you would like to read from.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [None]:
# Mount from Azure Blob
dbutils.fs.mount(
  source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
  mount_point = "/mnt/<mount-name>",
  extra_configs = {"<conf-key>":dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")})


In [None]:
# File location and type
file_location1 = "/mnt/<mount-file/Financial_Complaints_Product.csv"
file_location2 = "/mnt/<mount-file/Financial_Complaints_Issue.csv"
file_location3 = "/mnt/<mount-file/Financial_Complaints_Response.csv"
file_location4 = "/mnt/<mount-file/Financial_Complaints_Region.csv"
file_type = "csv"


# The applied options are for CSV files. For other file types, these will be ignored.
df1 = spark.read.csv(file_location1, header=True, inferSchema=True)
df2 = spark.read.csv(file_location2, header=True, inferSchema=True)
df3 = spark.read.csv(file_location3, header=True, inferSchema=True)
df4 = spark.read.csv(file_location4, header=True, inferSchema=True)

In [None]:
df1.printSchema()

In [None]:
# create merged dataframe
from pyspark.sql.functions import col
df = df1.join(df2,["Complaint ID"]) \
     .join(df3,["Complaint ID"]) \
        .join(df4,["Complaint ID"])

In [None]:
df.printSchema()

In [None]:
#conver string to date format
from pyspark.sql.functions import unix_timestamp, from_unixtime
df = df.select(
    'Date Sumbited', 
    from_unixtime(unix_timestamp('Date Sumbited', 'MM/dd/yyy')).alias('Date_Sumbited'), 'Date Received', 
    from_unixtime(unix_timestamp('Date Received', 'MM/dd/yyy')).alias('Date_Received'))

In [None]:
#selected columns

column_list = ['Complaint ID', 'Date_Sumbited', 'Product', 'Sub-product', 'Issue',
       'Sub-issue', 'Company public response', 'State', 'ZIP code',
       'Tags', 'Consumer consent provided?', 'Submitted via', 'Date_Received',
       'Company response to consumer', 'Timely response?',
       'Consumer disputed?']
df = df.select([column for column in df.columns if column in column_list])  

In [None]:
#Load to ADLS
df.write.csv("/mnt/<mount-name>"/Trans_Financial_Complaints, header = True, mode='overwrite')

In [None]:
#try to create a delta table and save the data back to Azure blob.....so delta table appear in dafault table in data bricks but location on Azure blob
deltapath = "/mnt/<mount-name/deltadata"
df.write.format("delta").mode("append").option("path", deltapath).saveAsTable("default.mytableblob")
