In [0]:
%run /Repos/gowdhaman.bj@diggibyte.com/databricks_assignment/source_to_bronze/utils


In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import *

In [0]:
# Explicit read schemas for the three source CSV files.
# A schema must be a StructType (a list of StructFields); the previous StringType([...])
# call does not build a schema and makes this cell fail.
# Every field is nullable (third StructField argument is True).
dept_schema = StructType([
    StructField("DepartmentId", StringType(), True),
    StructField("DepartmentName", StringType(), True),
])

employee_schema = StructType([
    StructField("EmployeeId", IntegerType(), True),
    StructField("EmployeeName", StringType(), True),
    StructField("Department", StringType(), True),
    StructField("Country", StringType(), True),
    StructField("Salary", IntegerType(), True),
    StructField("Age", IntegerType(), True),
])

country_schema = StructType([
    StructField("CountryCode", StringType(), True),
    StructField("CountryName", StringType(), True),
])

In [0]:
# Q1 source extracts; all three files live in the same directory.
csv_source_dir = '/Volumes/databricks_assignment/documents/csvfiles'
country_csv_file = f'{csv_source_dir}/Country-Q1.csv'
department_csv_file = f'{csv_source_dir}/Department-Q1.csv'
employee_csv_file = f'{csv_source_dir}/Employee-Q1.csv'

# Format string handed to read_file for every source file above.
file_format = 'csv'

In [0]:
# Load each Q1 CSV extract with its explicit schema.
# NOTE(review): read_file is defined in the %run'd utils notebook; header=True presumably
# makes it treat the first line as a header row — confirm against that definition.
read_dept = read_file(
    department_csv_file, file_format, schema=dept_schema, header=True
)
read_country = read_file(
    country_csv_file, file_format, schema=country_schema, header=True
)
read_employee = read_file(
    employee_csv_file, file_format, schema=employee_schema, header=True
)

# Preview the raw frames, in the same order they were loaded.
display(read_dept)
display(read_country)
display(read_employee)

Convert the column names from CamelCase to snake_case using a user-defined Python function (UDF).

In [0]:
import re

In [0]:
def camel_to_snake_case(camel_str):
    """Convert a CamelCase identifier to snake_case.

    Examples:
        "EmployeeId"   -> "employee_id"
        "HTTPResponse" -> "http_response"
        "Employee_Id"  -> "employee_id"   (an existing underscore is not doubled)

    Args:
        camel_str: The identifier (e.g. a column name) to convert.

    Returns:
        The lower-cased snake_case form of ``camel_str``.
    """
    # Insert "_" before a capitalised word ("Id" in "EmployeeId", "Response" in
    # "HTTPResponse") unless the preceding character is already an underscore;
    # matching any character here used to turn "Employee_Id" into "employee__id".
    s1 = re.sub(r'([^_])([A-Z][a-z]+)', r'\1_\2', camel_str)
    # Separate a lower-case letter or digit from an immediately following capital.
    return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

def rename_columns_to_snake_case(df):
    """Return a new DataFrame whose column names are passed through camel_to_snake_case.

    Column order and data are unchanged; only the names differ.

    Args:
        df: The Spark DataFrame to rename.

    Returns:
        ``df.toDF(...)`` with one converted name per original column, in order.
    """
    snake_names = list(map(camel_to_snake_case, df.columns))
    return df.toDF(*snake_names)


In [0]:
# Rebind each source frame to its snake_case-column version.
read_dept, read_country, read_employee = (
    rename_columns_to_snake_case(read_dept),
    rename_columns_to_snake_case(read_country),
    rename_columns_to_snake_case(read_employee),
)

# Preview the renamed frames.
display(read_dept)
display(read_country)
display(read_employee)

Add the load_date column with the current date. 

In [0]:
# Stamp every employee row with the current date in a `load_date` column.
employee_load_date = read_employee.withColumns({'load_date': current_date()})
display(employee_load_date)

The primary key is `EmployeeId` (`employee_id` after the snake_case rename), the database name is `Employee_info`, and the table name is `dim_employee`.

Write the DataFrame as a Delta table to the location `/silver/<db_name>/<table_name>`.

In [0]:
# Persist the employee dimension as a Delta table: catalog databricks_assignment,
# schema employee_info, table dim_employee.
# The schema name was previously misspelled as "employe_info"; the spec names it Employee_info.
# NOTE(review): the spec's primary key (employee_id after renaming) is not enforced or
# checked anywhere in this cell.
# Create the target schema if it is missing so the corrected name does not fail on first run.
spark.sql("CREATE SCHEMA IF NOT EXISTS databricks_assignment.employee_info")
(
    employee_load_date.write
    .format('delta')
    .mode('overwrite')
    # Allow the table schema to be replaced on overwrite (e.g. after column renames).
    .option("overwriteSchema", "true")
    .saveAsTable('databricks_assignment.employee_info.dim_employee')
)

In [0]:
# Also write the same rows as path-based Delta files under /Volumes.
# NOTE(review): the task asks for the location /silver/db_name/table_name; this path does
# not follow that layout — confirm the intended destination.
(
    employee_load_date.write
    .format('delta')
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .save('/Volumes/databricks_assignment/default/employee_info_table')
)