In [0]:
# import all necessary functions
import logging
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# Logging setup for the notebook: INFO-level records, timestamped, sent to a
# stream handler so they show up next to the cell output.
_console_handler = logging.StreamHandler()  # no stream argument -> default stream
logging.basicConfig(
    handlers=[_console_handler],
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Named logger shared by every ETL cell below.
logger = logging.getLogger("ETL_PIPELINE")

# ABFSS

In [0]:
# Name of the storage account used to build every abfss:// URL in this notebook.
storage = "storagedbrics"

# Application id, tenant id and client secret are fetched from the "kamilScope"
# secret scope (in that order) rather than being written into the notebook.
app_id, ten_id, secret_id = (
    dbutils.secrets.get(scope="kamilScope", key=secret_key)
    for secret_key in ("appID", "tenID", "secretID")
)

In [0]:
# OAuth settings for the storage account's dfs endpoint, as (config key, value)
# pairs. They are applied in the listed order to the active Spark session.
abfs_oauth_settings = [
    (f"fs.azure.account.auth.type.{storage}.dfs.core.windows.net", "OAuth"),
    (
        f"fs.azure.account.oauth.provider.type.{storage}.dfs.core.windows.net",
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    ),
    (f"fs.azure.account.oauth2.client.id.{storage}.dfs.core.windows.net", app_id),
    (f"fs.azure.account.oauth2.client.secret.{storage}.dfs.core.windows.net", secret_id),
    (
        f"fs.azure.account.oauth2.client.endpoint.{storage}.dfs.core.windows.net",
        f"https://login.microsoftonline.com/{ten_id}/oauth2/token",
    ),
]

for conf_key, conf_value in abfs_oauth_settings:
    spark.conf.set(conf_key, conf_value)

In [0]:
# List the "source" container and use the first listed entry as the input file.
source_url = f"abfss://source@{storage}.dfs.core.windows.net"
file = dbutils.fs.ls(source_url)

# Fail with an explicit message instead of a bare IndexError when nothing is there.
if not file:
    raise FileNotFoundError(f"No files found in {source_url}")

path = file[0].path

# Bronze Layer

In [0]:
# Load the source CSV into `df`: first row is the header, column types are inferred.
logger.info("Begginig of Bronze layer file")
try:
    df = (
        spark.read.format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .load(path)
    )
    logger.info("File readed succesfully")
except Exception as e:
    logger.error(f"Error reading file: {e}")
    # Re-raise so the run stops here; otherwise `df` is undefined and the next
    # cell fails with an unrelated NameError that hides the real cause.
    raise

In [0]:
# Count real (SQL) nulls in each column.
# All counts are computed in ONE aggregation instead of launching a separate
# filter().count() job per column; count() only counts the rows for which the
# when() expression is non-null, i.e. the rows where the column is null.
columns = df.columns

null_counts = df.select(
    [count(when(col(column).isNull(), 1)).alias(column) for column in columns]
).first()

for column in columns:
    print(f" Number of nulls in {column}: {null_counts[column]}")



 Number of nulls in LK_SO_NUMBER_TXT: 0
 Number of nulls in LK_SO_ITEM_TXT: 0
 Number of nulls in GEOHI_ID_LVL1_TXT: 0
 Number of nulls in GEOHI_ID_LVL2_TXT: 0
 Number of nulls in GEOHI_ID_LVL3_TXT: 0
 Number of nulls in GEOHI_ID_LVL4_TXT: 0
 Number of nulls in LK_CUSTOMER_ID_TXT: 0
 Number of nulls in LK_MATERIAL_NUMBER_TXT: 0
 Number of nulls in SAHI_ID_LVL4_TXT: 0
 Number of nulls in SAHI_ID_LVL5_TXT: 0
 Number of nulls in SAHI_ID_LVL6_TXT: 0
 Number of nulls in SAHI_ID_LVL7_TXT: 0
 Number of nulls in OTDR_EXT_FLG: 0
 Number of nulls in SI_CPO_CREATION_DAT: 0
 Number of nulls in SI_SO_CREATION_DAT: 0
 Number of nulls in READY_TO_SHIP_DAT: 0
 Number of nulls in CUST_REQ_DELIVERY_DATE_DAT: 0
 Number of nulls in SI_VENDOR_ID_TXT: 0
 Number of nulls in VAL_PLANT_ID_TXT: 0


We see that there are no real null values in df. But when we display the df, we can clearly see that some columns contain the literal string "NULL" instead.

In [0]:
# Count values that are the *text* of a null marker rather than a real null.
# Every column is cast to string for the comparison, and all columns are counted
# in one aggregation rather than one filter().count() job per column.
null_like_values = ("NULL", "Null", "Nan", "nan")

null_str_counts = df.select(
    [
        count(when(col(column).cast(StringType()).isin(*null_like_values), 1)).alias(column)
        for column in df.columns
    ]
).first()

for column in df.columns:
    print(f" Number of 'null' string types in {column}: {null_str_counts[column]} ")

 Number of 'null' string types in LK_SO_NUMBER_TXT: 0 
 Number of 'null' string types in LK_SO_ITEM_TXT: 0 
 Number of 'null' string types in GEOHI_ID_LVL1_TXT: 2049 
 Number of 'null' string types in GEOHI_ID_LVL2_TXT: 2049 
 Number of 'null' string types in GEOHI_ID_LVL3_TXT: 2049 
 Number of 'null' string types in GEOHI_ID_LVL4_TXT: 2049 
 Number of 'null' string types in LK_CUSTOMER_ID_TXT: 0 
 Number of 'null' string types in LK_MATERIAL_NUMBER_TXT: 0 
 Number of 'null' string types in SAHI_ID_LVL4_TXT: 2049 
 Number of 'null' string types in SAHI_ID_LVL5_TXT: 2049 
 Number of 'null' string types in SAHI_ID_LVL6_TXT: 2049 
 Number of 'null' string types in SAHI_ID_LVL7_TXT: 2049 
 Number of 'null' string types in OTDR_EXT_FLG: 0 
 Number of 'null' string types in SI_CPO_CREATION_DAT: 13803 
 Number of 'null' string types in SI_SO_CREATION_DAT: 0 
 Number of 'null' string types in READY_TO_SHIP_DAT: 6753 
 Number of 'null' string types in CUST_REQ_DELIVERY_DATE_DAT: 2855 
 Number o

In [0]:
# Write the raw DataFrame to the bronze folder as Delta, replacing what is there.

logger.info("Saving data to bronze layer")
try:
    (
        df.write.format("delta")
        .mode("overwrite")
        .save(f"abfss://destination@{storage}.dfs.core.windows.net/bronze")
    )
    logger.info("Data saved succesfully to bronze layer")
except Exception as e:
    # Log the actual cause (the bare `except:` used to hide it) and stop the
    # run, so later cells do not read a missing or stale bronze table.
    logger.error(f"Error during saving data to bronze layer: {e}")
    raise

# Silver layer

In [0]:
# Read the bronze Delta data back as the starting point for the silver layer.
# No "header"/"inferSchema" options here: those are CSV reader options, and the
# Delta table already carries its own schema.

logger.info("Reading data from Bronze layer to Silver layer")
try:
    df_silver = (
        spark.read.format("delta")
        .load(f"abfss://destination@{storage}.dfs.core.windows.net/bronze")
    )
    logger.info("Data succesfully readed from Bronze layer to Silver layer")
except Exception as e:
    logger.error(f"Error reading file from Bronze layer to Silver layer: {e}")
    # Re-raise so the run stops here instead of failing later on an undefined df_silver.
    raise


## 1. Changing column values from the string "Null" to real nulls

In [0]:
# Replace the text "null" (any casing, surrounding spaces ignored) with a real
# null in every column. Done as a single select over df_silver's own columns,
# so the cell does not depend on the bronze `df` and does not stack one
# projection per column onto the query plan.
# NOTE(review): the exploratory count earlier also treated "Nan"/"nan" as fake
# nulls; only "null" is converted here - confirm whether that is intended.
df_silver = df_silver.select(
    [
        when(trim(lower(col(column))).isin("null"), None)
        .otherwise(col(column))
        .alias(column)
        for column in df_silver.columns
    ]
)

# total number of rows in df_silver after the replacement (no rows dropped yet)
df_silver.count()

138075

## 2. Dropping null values, dropping duplicates, casting date and timestamp columns

In [0]:
# Columns holding date/time text, all written in the same pattern.
timestamp_columns = [
    'SI_CPO_CREATION_DAT',
    'SI_SO_CREATION_DAT',
    'READY_TO_SHIP_DAT',
    'CUST_REQ_DELIVERY_DATE_DAT',
]
timestamp_pattern = "dd/MM/yyyy HH:mm"

# Drop rows where every column is null, then drop exact duplicate rows.
df_silver = df_silver.dropna(how="all").dropDuplicates()

# Columns casting to timestamp
for ts_column in timestamp_columns:
    df_silver = df_silver.withColumn(
        ts_column, to_timestamp(col(ts_column), timestamp_pattern)
    )

LK_SO_NUMBER_TXT,LK_SO_ITEM_TXT,GEOHI_ID_LVL1_TXT,GEOHI_ID_LVL2_TXT,GEOHI_ID_LVL3_TXT,GEOHI_ID_LVL4_TXT,LK_CUSTOMER_ID_TXT,LK_MATERIAL_NUMBER_TXT,SAHI_ID_LVL4_TXT,SAHI_ID_LVL5_TXT,SAHI_ID_LVL6_TXT,SAHI_ID_LVL7_TXT,OTDR_EXT_FLG,SI_CPO_CREATION_DAT,SI_SO_CREATION_DAT,READY_TO_SHIP_DAT,CUST_REQ_DELIVERY_DATE_DAT,SI_VENDOR_ID_TXT,VAL_PLANT_ID_TXT
1132770344,300,RG_MEA,RU_NA,RU MEA-NA-NA,LY,206146,473751A,40010.0,50018.0,600583.0,700936.0,1,2024-01-04T00:00:00Z,2024-01-25T13:08:00Z,2024-06-03T00:00:00Z,2024-06-26T00:00:00Z,,FI25
1132693206,100,RG_LAM,RU_LAS,RU LAT-LAS-LAS,BR,270099,475647A,4007.0,500858.0,600862.0,700295.0,1,,2024-01-05T18:47:00Z,2024-01-05T00:00:00Z,2024-01-24T00:00:00Z,,BR49
1132687821,130,RG_INDIA,RU_IND,RU IND-IND-IND,IN,273692,3HE00027CA,4009.0,50040.0,600539.0,700540.0,0,2023-12-29T00:00:00Z,2024-01-04T19:54:00Z,,2025-09-22T00:00:00Z,NIN8017,IN08
1132693440,270,RG_NAM,RU_US,RU NAM-US-US,US,219396,475124A.102,4006.0,500654.0,60082.0,700307.0,1,2024-01-05T19:00:00Z,2024-01-05T20:27:00Z,2024-01-18T00:00:00Z,2024-02-29T00:00:00Z,,FI42
1132705809,470,RG_EUROPE,RU_SEE,RU EUR-SEE-IBI,IT,206823,473246A,4004.0,500102.0,600794.0,700809.0,1,2023-12-21T00:00:00Z,2024-01-09T16:42:00Z,2024-02-08T00:00:00Z,2024-03-09T00:00:00Z,,FI25
1132711356,1060,RG_NAM,RU_CA,RU NAM-CA-CA,CA,219485,471605A,4006.0,50039.0,60085.0,701015.0,1,2023-12-25T00:00:00Z,2024-01-10T23:53:00Z,2024-03-14T00:00:00Z,2024-04-15T00:00:00Z,,FI25
1132718149,140,RG_MEA,RU_NA,RU MEA-NA-NA,LY,206146,474580A,40010.0,50018.0,600583.0,700936.0,0,2024-01-04T00:00:00Z,2024-01-12T10:15:00Z,2024-05-21T00:00:00Z,2024-04-20T00:00:00Z,,FI25
1132720622,70,RG_NAM,RU_US,RU NAM-US-US,US,215642,474901A,4006.0,500534.0,60086.0,700315.0,1,2024-01-10T23:48:00Z,2024-01-12T21:12:00Z,2024-02-29T00:00:00Z,2024-04-01T00:00:00Z,,FI42
1132713094,650,RG_EUROPE,RU_SEE,RU EUR-SEE-IBI,IT,206823,475339A,4004.0,500102.0,600794.0,700809.0,1,2023-12-27T00:00:00Z,2024-01-11T11:26:00Z,2024-02-06T00:00:00Z,2024-03-08T00:00:00Z,,FI25
1132717493,340,RG_MEA,RU_NA,RU MEA-NA-NA,LY,206146,CS7000156.00,40010.0,50018.0,600583.0,700936.0,1,2024-01-04T00:00:00Z,2024-01-12T07:58:00Z,2024-05-20T00:00:00Z,2024-06-26T00:00:00Z,,FI26


In [0]:
# Add the derived columns used by the gold layer.
df_silver = (
    df_silver
    # days between CPO creation and ready-to-ship
    .withColumn(
        "DIFF_DATE_CPO_SHIP_DAYS",
        date_diff(end=col("READY_TO_SHIP_DAT"), start=col("SI_CPO_CREATION_DAT")),
    )
    # year-month of CPO creation; date_format already returns a string column,
    # so the former extra cast to StringType was a no-op and has been removed
    .withColumn("PERIOD", date_format(col("SI_CPO_CREATION_DAT"), "yyyy-MM"))
    # rename columns
    .withColumnRenamed("GEOHI_ID_LVL4_TXT", "COUNTRY")
)


In [0]:
# Write the cleaned DataFrame to the silver folder as Delta.
# NOTE(review): mode is "append" while bronze and gold use "overwrite"; since
# silver is rebuilt from the whole bronze table, re-running the notebook looks
# like it would append the same rows again - confirm this is intended.

logger.info("Saving data to silver layer")
try:
    (
        df_silver.write.format("delta")
        .mode("append")
        .save(f"abfss://destination@{storage}.dfs.core.windows.net/silver")
    )
    # was `legger.info(...)`: an undefined name that raised NameError after a successful write
    logger.info("Data succesfully saved to silver layer")
except Exception as e:
    # was `loger.error(...)`: an undefined name that masked the real error
    logger.error(f"Error during saving data to silver layer: {e}")
    # Stop the run so the gold layer is not built from a missing or stale silver table.
    raise

# GOLD LAYER

In [0]:
# Read the silver Delta data as the input of the gold layer.

logger.info("Reading data from silver to gold layer")
try:
    df_gold = (
        spark.read.format("delta")
        .load(f"abfss://destination@{storage}.dfs.core.windows.net/silver")
    )
    logger.info("Data succesfully readed from silver to gold layer")
except Exception as e:
    logger.error(f"Error reading data from silver to gold layer: {e}")
    # Re-raise so the run stops here instead of failing later on an undefined df_gold.
    raise

## Transforming and aggregating data

In [0]:
# Per COUNTRY and PERIOD: sum of OTDR_EXT_FLG divided by the count of its
# non-null values, times 100, rounded to two decimals. Computed directly inside
# the aggregation, so no helper columns have to be created and dropped.
otdr_flag = col("OTDR_EXT_FLG")

df_gold_grouped = (
    df_gold
    .groupBy("COUNTRY", "PERIOD")
    .agg(
        round((sum(otdr_flag) / count(otdr_flag)) * 100, 2).alias("PTC_OTDR")
    )
)

In [0]:
# Write the aggregated result to the gold folder as Delta, replacing what is there.

logger.info("Saving data to gold layer")
try:
    (
        df_gold_grouped.write.format("delta")
        .mode("overwrite")
        .save(f"abfss://destination@{storage}.dfs.core.windows.net/gold")
    )
    logger.info("Data succesfully saved to gold layer")
except Exception as e:
    # The stray `4` that followed this call made the whole cell a syntax error.
    logger.error(f"Error during saving data to gold layer: {e}")
    # Surface the failure instead of letting the run finish as if it succeeded.
    raise