#### 緣由：
1. 上傳資料至Azure時，需重新賦予每一欄資料型別的資訊(即date type、string type 等)，一般習慣讓spark自動辨認資料型別
2. 根據經驗，spark 自動辨認 data type 的方式常出現資料型別辨認錯誤的情況，常見的錯誤如下：
  * data_month 欄位的 date type 資料被誤認為 string type 或是 timestamp type
  * age 欄位的 integer type 被誤認為 string type
3. 某次上傳表格時發現100多個欄位的寬表格並不適合用「手動校正法」(手動一個一個調整會很辛苦也不切實際)，因此為了快速校正辨認錯誤的資料型別而撰寫此筆記本

#### 此Notebook用途：
* 上傳主表格(main table)後，以相對應的資料型別表(datatype table)來讀入正確的 data type，以此為依據校正主表格的data type之後，再寫入指定的資料庫
* **總共要上傳兩個table，但是只會有main table被寫進資料庫；datatype table僅是用來對照資料框架的，不會被寫進資料庫**

#### 此Notebook使用方法：
* 請clone到自己的Workspace使用，若非必要不須改動程式碼
* 將需要上傳的兩份檔案準備好，詳情請見【預先準備】
* 順著每一個step的順序執行各cell
* **有建立小Widget工具的地方會需要手動去填入需要的參數，填寫完才能繼續跑下一個cell**
* 如有任何疑問，請就近尋找單位中會使用python的同仁協助

#### 預先準備：需先至Azure Databricks首頁上傳兩個檔案
1. 上傳主表格，並複製上傳後Databricks所顯示的檔案名稱: /FileStore/tables/ **"main_table.csv"**
2. 上傳對應的資料型別表，並複製上傳後Databricks所顯示的檔案名稱: /FileStore/tables/ **"datatype_table.csv"**
  - **注意：欄位名稱必須包含COLUMN_NAME,TYPE_NAME**
  - **注意：資料型別表(datatype table)中的欄位順序要和主表格欄位順序完全一致**
  
  
@ version: 2020-08-30

In [0]:
#import package
from pyspark.sql.types import *
from pyspark.sql.functions import *

#remove all widgets
dbutils.widgets.removeAll()

## Step 1 - 讀入主表格

In [0]:
dbutils.widgets.removeAll()
#parameters for dataset
#創建Widget小工具
TFLst = ["true","false"]
FormatLst = ['csv','text','json']

dbutils.widgets.text("FileName","main_table.csv")
dbutils.widgets.dropdown("FileFormat","csv",FormatLst)
dbutils.widgets.dropdown("Header", "true", TFLst)
dbutils.widgets.text("Encoding","utf8")
dbutils.widgets.text("Delimiter",",")

In [0]:
#印出小工具內容物
FILENAME = dbutils.widgets.get("FileName")
FILEFORMAT = dbutils.widgets.get("FileFormat")
HEADER = dbutils.widgets.get("Header")
ENCODING = dbutils.widgets.get("Encoding")
DELIMITER = dbutils.widgets.get("Delimiter")

print('上傳後的檔案名稱:', FILENAME)
print('檔案類型:', FILEFORMAT)
print('讀入column名稱:',HEADER)
print('檔案編碼:',ENCODING)
print('分隔符號:',DELIMITER)
print('====若有輸入錯誤，上方小工具更正後，重跑此cell即可====')


#用上面的小工具讀入file content
df = spark.read.format(FILEFORMAT).options(header = HEADER,#是否具有表頭
                                           inferSchema = "true",#是否需要自動辨識data type
                                           encoding= ENCODING, #指定文檔編碼
                                           delimiter = DELIMITER
                                          ).load("/FileStore/tables/"+FILENAME)
#display df
display(df)


#get parameters for futher use
#get File Name as DatasetName (as the default value when creating the widgt for writing dataset name ) 
DatasetName = dbutils.widgets.get("FileName")

## Step 2 - 讀入資料型別對照表

In [0]:
#remove all widgets
dbutils.widgets.removeAll()

#parameters  for datatype table
#創建Widget小工具
TFLst = ["true","false"]
FormatLst = ['csv','text','json']
dbutils.widgets.text("DataTypeFileName","datatype_table.csv")
dbutils.widgets.dropdown("DataTypeFileFormat","csv",FormatLst)
dbutils.widgets.dropdown("DataTypeFileHeader", "true", TFLst)
dbutils.widgets.text("DataTypeFileEncoding","utf8")
dbutils.widgets.text("DataTypeFileDelimiter",",")

In [0]:
#印出小工具內容物
DataTypeFILENAME = dbutils.widgets.get("DataTypeFileName")
DataTypeFILEFORMAT = dbutils.widgets.get("DataTypeFileFormat")
DataTypeHEADER = dbutils.widgets.get("DataTypeFileHeader")
DataTypeENCODING = dbutils.widgets.get("DataTypeFileEncoding")
DataTypeDELIMITER = dbutils.widgets.get("DataTypeFileDelimiter")

print('上傳後的檔案名稱:', DataTypeFILENAME)
print('檔案類型:', DataTypeFILEFORMAT)
print('讀入column名稱:',DataTypeHEADER)
print('檔案編碼:',DataTypeENCODING)
print('分隔符號:',DataTypeDELIMITER)
print('====若有輸入錯誤，上方小工具更正後，重跑此cell即可====')


#用上面的小工具讀入file content
DataType_df = spark.read.format(DataTypeFILEFORMAT).options(header = DataTypeHEADER,#是否具有表頭
                                           inferSchema = "true",#是否需要自動辨識data type
                                           encoding= DataTypeENCODING, #指定文檔編碼
                                           delimiter = DataTypeDELIMITER
                                          ).load("/FileStore/tables/"+DataTypeFILENAME)

#DatasetName = dbutils.widgets.get("FileName")
display(DataType_df)

## Step 3 - 以資料型別對照表校正後，呈現主表格改動後的資料型態

In [0]:
#create a list for linking main table column name and data type content
ColLst = []
for c in range(len(df.schema.fields)):
  ColLst.append((c+1,
                  df.schema.fields[c].name,
                  DataType_df.select("TYPE_NAME").collect()[c].TYPE_NAME))
  

#display the linked result
ColLstDF =spark.createDataFrame(ColLst,["#",
                                        "主表格欄位名稱",
                                       #"datatype_df Col Name",
                                        "預計的資料型別"])
ColLstDF.show(10000,False)

In [0]:
#get distinct DF data type as a python list
DistinctTypeLst = [i.TYPE_NAME for i in DataType_df.select('TYPE_NAME').distinct().collect()]


#create a list that contains spark data types
TypeLst = [
  'ByteType',
  'ShortType',
  'IntegerType',
  'LongType',
  'FloatType',
  'DoubleType',
  #'DecimalType(n,0)',
  'StringType',
  'BinaryType',
  'BooleanType',
  'TimestampType',
  'DateType',
  #'ArrayType',
  #'MapType',
  #'StructType',
  #'StructField'
]

TypeLst.sort()

#remove widgets for data import
dbutils.widgets.removeAll()

#create widget for data type 
for n in range(len(DistinctTypeLst)):
  dbutils.widgets.combobox(DistinctTypeLst[n],TypeLst[0], TypeLst)

In [0]:
#create a dictionery for changing widget string to function name: 
Dic = {
  'ByteType':ByteType(),
  'ShortType':ShortType(),
  'IntegerType':IntegerType(),
  'LongType':LongType(),
  'FloatType':FloatType(),
  'DoubleType':DoubleType(),
  #'DecimalType':DecimalType(),
  'StringType':StringType(),
  'BinaryType':BinaryType(),
  'BooleanType':BooleanType(),
  'TimestampType':TimestampType(),
  'DateType':DateType(),
  #'ArrayType':ArrayType(),
  #'MapType':MapType(),
  #'StructType':StructType(),
  #'StructField':StructField()
}


#modify schema in a new dataframe
##column name: DFScheLst[n][0]
##DF data type: DFScheLst[n][1]
##new data type: dbutils.widgets.get(DFScheLst[n][0])
##transform data type name to function: Dic[dbutils.widgets.get(DFScheLst[n][0])]

df_2 = df
for n in range(len(ColLst)):
  df_2 = (df_2.withColumn(ColLst[n][1],df[ColLst[n][1]].cast(Dic[dbutils.widgets.get(ColLst[n][2])]))
         ) 

print('====預覽轉換過的資料表樣式====')
display(df_2)

## optional step - drop 不需要寫入資料庫的欄位

In [0]:
dbutils.widgets.removeAll()
dbutils.widgets.text("drop this column","column name")
drop_col = dbutils.widgets.get("drop this column")
print('drop column: ',drop_col)

In [0]:
df_2 = df_2.drop(drop_col)
display(df_2)

## Step 4 - 將表格寫入至資料庫

In [0]:
#check file name
dbutils.widgets.removeAll()
TableName = DatasetName[:-4]
WriteModeLst = ['append','overwrite','ignore','error']

#create widget
dbutils.widgets.text("InputTableName",TableName)
dbutils.widgets.text("InputDBName","自家單位使用的資料庫名稱")
dbutils.widgets.dropdown("WriteMode","overwrite",WriteModeLst)

#get input names
INPPUTDB = dbutils.widgets.get("InputDBName")
INPUTTABLE = dbutils.widgets.get("InputTableName")
WRITEMODE = dbutils.widgets.get("WriteMode")

#print input names
print('寫入資料庫:', INPPUTDB)
print('寫入資料表名稱:', INPUTTABLE)
print('寫入模式:', WRITEMODE)

In [0]:
#write to database
df_2.write.format('delta').saveAsTable(INPPUTDB+'.'+INPUTTABLE,mode = WRITEMODE)