# Connection to blob

In [0]:
# Storage account, container, and the DBFS location where the container is exposed.
storageAccount = "datalakefabdeprojetos"
containerName = "ingrid-sollim"
mountpoint = "/mnt/ingrid-sollim/"

# WASBS URI of the container to mount.
storageEndpoint = f"wasbs://{containerName}@{storageAccount}.blob.core.windows.net"
# The account key is read from a secret scope so it never appears in the notebook.
storageKey = dbutils.secrets.get(scope="Kv-fab-de-proj-dev",key="kv-ingrid-sollim")
# Spark configuration key under which the account key is passed to the mount.
storageConn = f"fs.azure.account.key.{storageAccount}.blob.core.windows.net"

try:
    # Mount only when the mount point is not registered yet, so re-running the cell is safe.
    already_mounted = mountpoint in [m.mountPoint for m in dbutils.fs.mounts()]
    if already_mounted:
        print(f"Mount point '{mountpoint}' is already mounted.")
    else:
        dbutils.fs.mount(
            source=storageEndpoint,
            mount_point=mountpoint,
            extra_configs={storageConn: storageKey},
        )
        print(f"{mountpoint} has been mounted")
except Exception as e:
    # The handler only propagates the failure; nothing is swallowed.
    raise e


Mount point '/mnt/ingrid-sollim/' is already mounted.


In [0]:
# Folder inside the mounted container that holds the source CSV files.
path = f"{mountpoint}bikes"
display(dbutils.fs.ls(path))

path,name,size,modificationTime
dbfs:/mnt/ingrid-sollim/bikes/customer.csv,customer.csv,1813963,1694474860000
dbfs:/mnt/ingrid-sollim/bikes/person.csv,person.csv,13646947,1694474860000
dbfs:/mnt/ingrid-sollim/bikes/product.csv,product.csv,104823,1694474860000
dbfs:/mnt/ingrid-sollim/bikes/salesorderdetail.csv,salesorderdetail.csv,13801182,1694474860000
dbfs:/mnt/ingrid-sollim/bikes/salesorderheader.csv,salesorderheader.csv,8267704,1694474862000
dbfs:/mnt/ingrid-sollim/bikes/spoffer.csv,spoffer.csv,36680,1694474860000


In [0]:
# Load the raw sales order detail file: first line is the header, fields are ';'-separated.
df = (
    spark.read
    .options(header=True, delimiter=";")
    .csv(f"{path}/salesorderdetail.csv")
)
display(df.limit(10))

SalesOrderID,SalesOrderDetailID,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate
43659,1,4911-403C-98,1,776,1,2024994,0,2024.994,B207C96D-D9E6-402B-8470-2CC176C42283,2011-05-31 00:00:00.000
43659,2,4911-403C-98,3,777,1,2024994,0,6074.982,7ABB600D-1E77-41BE-9FE5-B9142CFC08FA,2011-05-31 00:00:00.000
43659,3,4911-403C-98,1,778,1,2024994,0,2024.994,475CF8C6-49F6-486E-B0AD-AFC6A50CDD2F,2011-05-31 00:00:00.000
43659,4,4911-403C-98,1,771,1,2039994,0,2039.994,04C4DE91-5815-45D6-8670-F462719FBCE3,2011-05-31 00:00:00.000
43659,5,4911-403C-98,1,772,1,2039994,0,2039.994,5A74C7D2-E641-438E-A7AC-37BF23280301,2011-05-31 00:00:00.000
43659,6,4911-403C-98,2,773,1,2039994,0,4079.988,CE472532-A4C0-45BA-816E-EEFD3FD848B3,2011-05-31 00:00:00.000
43659,7,4911-403C-98,1,774,1,2039994,0,2039.994,80667840-F962-4EE3-96E0-AECA108E0D4F,2011-05-31 00:00:00.000
43659,8,4911-403C-98,3,714,1,288404,0,86.5212,E9D54907-E7B7-4969-80D9-76BA69F8A836,2011-05-31 00:00:00.000
43659,9,4911-403C-98,1,716,1,288404,0,28.8404,AA542630-BDCD-4CE5-89A0-C1BF82747725,2011-05-31 00:00:00.000
43659,10,4911-403C-98,6,709,1,570,0,34.2,AC769034-3C2F-495C-A5A7-3B71CDB25D4E,2011-05-31 00:00:00.000


In [0]:
# Inspect the raw schema; the read above sets no schema or inferSchema option,
# and the output shows every column typed as string.
df.printSchema()

root
 |-- SalesOrderID: string (nullable = true)
 |-- SalesOrderDetailID: string (nullable = true)
 |-- CarrierTrackingNumber: string (nullable = true)
 |-- OrderQty: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- SpecialOfferID: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- UnitPriceDiscount: string (nullable = true)
 |-- LineTotal: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)



In [0]:
#display(df.select("SalesOrderDetailID").distinct())

In [0]:
from pyspark.sql.functions import expr, col, regexp_replace,regexp_extract,when, to_date,to_timestamp

# Check for missing data

In [0]:
def replace_string_null(df, columns):
    """Replace the literal string "NULL" with a real null in the given columns.

    Args:
        df: DataFrame to transform.
        columns: Iterable of column names to process.

    Returns:
        A DataFrame in which every listed column holds null wherever it
        previously held the exact string "NULL"; other values are unchanged.
    """
    for name in columns:
        current = col(name)
        cleaned = when(current == "NULL", None).otherwise(current)
        df = df.withColumn(name, cleaned)
    return df

# Apply the "NULL" -> null replacement to every column of the raw frame
# and preview the first rows of the result.
columns_to_replace = df.columns
null_replaced = replace_string_null(df, columns_to_replace)
display(null_replaced.limit(10))

SalesOrderID,SalesOrderDetailID,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate
43659,1,4911-403C-98,1,776,1,2024994,0,2024.994,B207C96D-D9E6-402B-8470-2CC176C42283,2011-05-31 00:00:00.000
43659,2,4911-403C-98,3,777,1,2024994,0,6074.982,7ABB600D-1E77-41BE-9FE5-B9142CFC08FA,2011-05-31 00:00:00.000
43659,3,4911-403C-98,1,778,1,2024994,0,2024.994,475CF8C6-49F6-486E-B0AD-AFC6A50CDD2F,2011-05-31 00:00:00.000
43659,4,4911-403C-98,1,771,1,2039994,0,2039.994,04C4DE91-5815-45D6-8670-F462719FBCE3,2011-05-31 00:00:00.000
43659,5,4911-403C-98,1,772,1,2039994,0,2039.994,5A74C7D2-E641-438E-A7AC-37BF23280301,2011-05-31 00:00:00.000
43659,6,4911-403C-98,2,773,1,2039994,0,4079.988,CE472532-A4C0-45BA-816E-EEFD3FD848B3,2011-05-31 00:00:00.000
43659,7,4911-403C-98,1,774,1,2039994,0,2039.994,80667840-F962-4EE3-96E0-AECA108E0D4F,2011-05-31 00:00:00.000
43659,8,4911-403C-98,3,714,1,288404,0,86.5212,E9D54907-E7B7-4969-80D9-76BA69F8A836,2011-05-31 00:00:00.000
43659,9,4911-403C-98,1,716,1,288404,0,28.8404,AA542630-BDCD-4CE5-89A0-C1BF82747725,2011-05-31 00:00:00.000
43659,10,4911-403C-98,6,709,1,570,0,34.2,AC769034-3C2F-495C-A5A7-3B71CDB25D4E,2011-05-31 00:00:00.000


# Change data types

In [0]:
# Check the schema before any casting is applied.
null_replaced.printSchema()

root
 |-- SalesOrderID: string (nullable = true)
 |-- SalesOrderDetailID: string (nullable = true)
 |-- CarrierTrackingNumber: string (nullable = true)
 |-- OrderQty: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- SpecialOfferID: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- UnitPriceDiscount: string (nullable = true)
 |-- LineTotal: string (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- ModifiedDate: string (nullable = true)



In [0]:
# Replace comma with period
def replace_comma_with_period(df, columns_to_replace):
    """Replace every "," with "." in the values of the given columns.

    Args:
        df: DataFrame to transform.
        columns_to_replace: Iterable of column names to process.

    Returns:
        A DataFrame with the listed columns rewritten through regexp_replace.
    """
    for name in columns_to_replace:
        with_period = regexp_replace(col(name), ",", ".")
        df = df.withColumn(name, with_period)
    return df
# Only the numeric columns that use a decimal comma need the replacement.
# The list gets its own name so the call does not pick up an unrelated
# column list left in the notebook namespace by an earlier cell.
decimal_comma_columns = ["UnitPrice", "UnitPriceDiscount", "LineTotal"]

# DataFrame with decimal commas converted to periods in those columns only.
comma_replaced = replace_comma_with_period(null_replaced, decimal_comma_columns)
display(comma_replaced.limit(10))


SalesOrderID,SalesOrderDetailID,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate
43659,1,4911-403C-98,1,776,1,2024.994,0.0,2024.994,B207C96D-D9E6-402B-8470-2CC176C42283,2011-05-31 00:00:00.000
43659,2,4911-403C-98,3,777,1,2024.994,0.0,6074.982,7ABB600D-1E77-41BE-9FE5-B9142CFC08FA,2011-05-31 00:00:00.000
43659,3,4911-403C-98,1,778,1,2024.994,0.0,2024.994,475CF8C6-49F6-486E-B0AD-AFC6A50CDD2F,2011-05-31 00:00:00.000
43659,4,4911-403C-98,1,771,1,2039.994,0.0,2039.994,04C4DE91-5815-45D6-8670-F462719FBCE3,2011-05-31 00:00:00.000
43659,5,4911-403C-98,1,772,1,2039.994,0.0,2039.994,5A74C7D2-E641-438E-A7AC-37BF23280301,2011-05-31 00:00:00.000
43659,6,4911-403C-98,2,773,1,2039.994,0.0,4079.988,CE472532-A4C0-45BA-816E-EEFD3FD848B3,2011-05-31 00:00:00.000
43659,7,4911-403C-98,1,774,1,2039.994,0.0,2039.994,80667840-F962-4EE3-96E0-AECA108E0D4F,2011-05-31 00:00:00.000
43659,8,4911-403C-98,3,714,1,28.8404,0.0,86.5212,E9D54907-E7B7-4969-80D9-76BA69F8A836,2011-05-31 00:00:00.000
43659,9,4911-403C-98,1,716,1,28.8404,0.0,28.8404,AA542630-BDCD-4CE5-89A0-C1BF82747725,2011-05-31 00:00:00.000
43659,10,4911-403C-98,6,709,1,5.7,0.0,34.2,AC769034-3C2F-495C-A5A7-3B71CDB25D4E,2011-05-31 00:00:00.000


In [0]:
# Change date
def change_date_format(df, columns_to_change):
    """Convert timestamp-formatted string columns to dates.

    Each listed column is parsed with the pattern "yyyy-MM-dd HH:mm:ss.SSS"
    and the result is then passed to to_date with the pattern "yyyy-MM-dd".

    Args:
        df: DataFrame to transform.
        columns_to_change: Iterable of column names to convert.

    Returns:
        A DataFrame with the listed columns replaced by their converted values.
    """
    timestamp_pattern = "yyyy-MM-dd HH:mm:ss.SSS"
    date_pattern = "yyyy-MM-dd"
    for name in columns_to_change:
        parsed = to_timestamp(col(name), timestamp_pattern)
        df = df.withColumn(name, to_date(parsed, date_pattern))
    return df

# Change to int
def change_data_type_to_int(df, columns_to_change):
    """Cast the given columns to the Spark "int" type.

    Args:
        df: DataFrame to transform.
        columns_to_change: Iterable of column names to cast.

    Returns:
        A DataFrame with the listed columns cast to "int".
    """
    for name in columns_to_change:
        as_int = col(name).cast("int")
        df = df.withColumn(name, as_int)
    return df

# Change to float
def change_data_type_to_float(df, columns_to_change):
    """Cast the given columns to the Spark "float" type.

    Args:
        df: DataFrame to transform.
        columns_to_change: Iterable of column names to cast.

    Returns:
        A DataFrame with the listed columns cast to "float".
    """
    for name in columns_to_change:
        as_float = col(name).cast("float")
        df = df.withColumn(name, as_float)
    return df

In [0]:
# Column groups, one per target type.
columns_to_change_date = ["ModifiedDate"]
columns_to_change_int = [
    "SalesOrderID",
    "SalesOrderDetailID",
    "OrderQty",
    "ProductID",
    "SpecialOfferID",
]
columns_to_change_float = ["UnitPrice", "UnitPriceDiscount", "LineTotal"]

# Apply the conversions in sequence: dates first, then integers, then floats.
result_date = change_date_format(comma_replaced, columns_to_change_date)
result_int = change_data_type_to_int(result_date, columns_to_change_int)
result_float = change_data_type_to_float(result_int, columns_to_change_float)

display(result_float.limit(10))


SalesOrderID,SalesOrderDetailID,CarrierTrackingNumber,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,rowguid,ModifiedDate
43659,1,4911-403C-98,1,776,1,2024.994,0.0,2024.994,B207C96D-D9E6-402B-8470-2CC176C42283,2011-05-31
43659,2,4911-403C-98,3,777,1,2024.994,0.0,6074.982,7ABB600D-1E77-41BE-9FE5-B9142CFC08FA,2011-05-31
43659,3,4911-403C-98,1,778,1,2024.994,0.0,2024.994,475CF8C6-49F6-486E-B0AD-AFC6A50CDD2F,2011-05-31
43659,4,4911-403C-98,1,771,1,2039.994,0.0,2039.994,04C4DE91-5815-45D6-8670-F462719FBCE3,2011-05-31
43659,5,4911-403C-98,1,772,1,2039.994,0.0,2039.994,5A74C7D2-E641-438E-A7AC-37BF23280301,2011-05-31
43659,6,4911-403C-98,2,773,1,2039.994,0.0,4079.988,CE472532-A4C0-45BA-816E-EEFD3FD848B3,2011-05-31
43659,7,4911-403C-98,1,774,1,2039.994,0.0,2039.994,80667840-F962-4EE3-96E0-AECA108E0D4F,2011-05-31
43659,8,4911-403C-98,3,714,1,28.8404,0.0,86.5212,E9D54907-E7B7-4969-80D9-76BA69F8A836,2011-05-31
43659,9,4911-403C-98,1,716,1,28.8404,0.0,28.8404,AA542630-BDCD-4CE5-89A0-C1BF82747725,2011-05-31
43659,10,4911-403C-98,6,709,1,5.7,0.0,34.2,AC769034-3C2F-495C-A5A7-3B71CDB25D4E,2011-05-31


In [0]:
# Show the current column names of the typed frame.
result_float.columns

Out[41]: ['SalesOrderID',
 'SalesOrderDetailID',
 'CarrierTrackingNumber',
 'OrderQty',
 'ProductID',
 'SpecialOfferID',
 'UnitPrice',
 'UnitPriceDiscount',
 'LineTotal',
 'rowguid',
 'ModifiedDate']

In [0]:
# Source column name -> snake_case name used in the target table.
column_mapping = {
    'SalesOrderID': 'sales_order_id',
    'SalesOrderDetailID': 'sales_order_detail_id',
    'CarrierTrackingNumber': 'carrier_tracking_number',
    'OrderQty': 'order_qtd',
    'ProductID': 'product_id',
    'SpecialOfferID': 'special_offer_id',
    'UnitPrice': 'unit_price',
    'UnitPriceDiscount': 'unit_price_discount',
    'LineTotal': 'line_total',
    'rowguid': 'rowguid',
    'ModifiedDate': 'modified_date',
}

# One aliased column expression per mapping entry, in mapping order.
select_expr = [
    col(source_name).alias(target_name)
    for source_name, target_name in column_mapping.items()
]

# Selecting the aliased expressions renames the columns in a single projection.
sales_detail = result_float.select(*select_expr)
display(sales_detail.limit(10))

sales_order_id,sales_order_detail_id,carrier_tracking_number,order_qtd,product_id,special_offer_id,unit_price,unit_price_discount,line_total,rowguid,modified_date
43659,1,4911-403C-98,1,776,1,2024.994,0.0,2024.994,B207C96D-D9E6-402B-8470-2CC176C42283,2011-05-31
43659,2,4911-403C-98,3,777,1,2024.994,0.0,6074.982,7ABB600D-1E77-41BE-9FE5-B9142CFC08FA,2011-05-31
43659,3,4911-403C-98,1,778,1,2024.994,0.0,2024.994,475CF8C6-49F6-486E-B0AD-AFC6A50CDD2F,2011-05-31
43659,4,4911-403C-98,1,771,1,2039.994,0.0,2039.994,04C4DE91-5815-45D6-8670-F462719FBCE3,2011-05-31
43659,5,4911-403C-98,1,772,1,2039.994,0.0,2039.994,5A74C7D2-E641-438E-A7AC-37BF23280301,2011-05-31
43659,6,4911-403C-98,2,773,1,2039.994,0.0,4079.988,CE472532-A4C0-45BA-816E-EEFD3FD848B3,2011-05-31
43659,7,4911-403C-98,1,774,1,2039.994,0.0,2039.994,80667840-F962-4EE3-96E0-AECA108E0D4F,2011-05-31
43659,8,4911-403C-98,3,714,1,28.8404,0.0,86.5212,E9D54907-E7B7-4969-80D9-76BA69F8A836,2011-05-31
43659,9,4911-403C-98,1,716,1,28.8404,0.0,28.8404,AA542630-BDCD-4CE5-89A0-C1BF82747725,2011-05-31
43659,10,4911-403C-98,6,709,1,5.7,0.0,34.2,AC769034-3C2F-495C-A5A7-3B71CDB25D4E,2011-05-31


In [0]:
# Confirm the renamed columns carry the expected types before writing to the database.
sales_detail.printSchema()

root
 |-- sales_order_id: integer (nullable = true)
 |-- sales_order_detail_id: integer (nullable = true)
 |-- carrier_tracking_number: string (nullable = true)
 |-- order_qtd: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- special_offer_id: integer (nullable = true)
 |-- unit_price: float (nullable = true)
 |-- unit_price_discount: float (nullable = true)
 |-- line_total: float (nullable = true)
 |-- rowguid: string (nullable = true)
 |-- modified_date: date (nullable = true)



# Ingest the cleaned data into the database

### Connection to the database

In [0]:
# Database credentials are read from the secret scope rather than written in the notebook.
username = dbutils.secrets.get(scope="Kv-fab-de-proj-dev", key="kv-soll-dblogin")
password = dbutils.secrets.get(scope="Kv-fab-de-proj-dev", key="kv-soll-dbpass")

# Target SQL Server instance.
jdbcHostname = "srv-fab-projetos.database.windows.net"
jdbcPort = 1433
jdbcDatabase = "db-fab-projetos"

jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase}"

# Options handed to the JDBC writer alongside the URL.
connectionProperties = dict(
    user=username,
    password=password,
    driver="com.microsoft.sqlserver.jdbc.SQLServerDriver",
)

# Destination table (schema-qualified).
table_name = 'sollim.sales_detail'

### Insert into the database


In [0]:
# Write the cleaned frame to the target table over JDBC.
# "overwrite" mode replaces whatever the table currently contains.
(
    sales_detail.write
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable", table_name)
    .mode("overwrite")
    .options(**connectionProperties)
    .save()
)

In [0]:
# Detach the blob container mount now that the notebook no longer reads from it.
dbutils.fs.unmount(mountpoint)

/mnt/ingrid-sollim/ has been unmounted.
Out[46]: True