In [0]:
%py
dbutils.fs.rm("/user/hive/warehouse/bd_finals.db/", True)

True

Create the `bd_finals` database (if it does not already exist) and make it the current database.

In [0]:
%sql
-- Safe to re-run: the database is created only when missing, then becomes the session default.
CREATE DATABASE IF NOT EXISTS bd_finals;
USE bd_finals;

In [0]:
# Create the stocks folder under the bd_finals warehouse directory; the cell shows the boolean mkdirs returns.
dbutils.fs.mkdirs("dbfs:/user/hive/warehouse/bd_finals.db/stocks")

True

Copy the raw CSV file into the `bronze` folder of the database's `stocks` directory.

In [0]:
# Copy the source CSV from /FileStore into the Bronze folder of the stocks directory.
dbutils.fs.cp("dbfs:/FileStore/batch_1_data.csv", 
              "dbfs:/user/hive/warehouse/bd_finals.db/stocks/bronze/batch_1_data.csv")

True

Read the raw data from the Bronze layer.

In [0]:
# Bronze layer: raw CSV with a header row; Spark infers the column types
# (this costs an extra pass over the file).
df = spark.read.csv(
    "dbfs:/user/hive/warehouse/bd_finals.db/stocks/bronze/batch_1_data.csv",
    header=True,
    inferSchema=True,
)

In [0]:
# Peek at the first five Bronze rows.
display(df.limit(5))

symbol,t,o,h,l,c,v,n
GOOGL,2022-12-15T09:01:00Z,93.73,93.74,93.55,93.55,1519.0,52
GOOGL,2022-12-15T09:02:00Z,93.49,93.51,93.45,93.45,992.0,33
GOOGL,2022-12-15T09:03:00Z,93.45,93.45,93.25,93.31,2008.0,43
GOOGL,2022-12-15T09:04:00Z,93.31,93.5,93.31,93.36,1658.0,52
GOOGL,2022-12-15T09:05:00Z,93.38,93.38,93.34,93.34,605.0,19


In [0]:
# Show the column types that inferSchema chose during the CSV read.
df.printSchema()

2024-12-19 19:27:34,096 - INFO - Received command c on object id p0


root
 |-- symbol: string (nullable = true)
 |-- t: timestamp (nullable = true)
 |-- o: double (nullable = true)
 |-- h: double (nullable = true)
 |-- l: double (nullable = true)
 |-- c: double (nullable = true)
 |-- v: double (nullable = true)
 |-- n: integer (nullable = true)



**Size of dataset**

In [0]:
# Spark DataFrames have no .shape; count() runs a full Spark job over the data.
rows, columns = df.count(), len(df.columns)
display(f"shape: {rows},{columns}")

2024-12-19 19:27:34,195 - INFO - Received command c on object id p0


'shape: 6749358,8'

**Importing necessary libraries**

In [0]:
import logging
import time
import functools
from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    current_timestamp, to_date, date_format, col, 
    trim, encode, lit, regexp_replace, when
)
from pyspark.sql.types import StringType, VarcharType
from pyspark.ml import Pipeline
from pyspark.ml.base import Transformer
from pyspark.ml.param.shared import HasInputCols, HasOutputCols
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

2024-12-19 19:27:40,815 - INFO - Received command c on object id p0


This cell configures the root logger at INFO level and formats each log message as timestamp, log level, and message. Note that `logging.basicConfig` does nothing if the root logger already has handlers attached.

In [0]:
# Root logger: INFO and above, rendered as "<timestamp> - <LEVEL> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

2024-12-19 19:27:40,915 - INFO - Received command c on object id p0


In [0]:
def timer_logger(func):
    """Decorate ``func`` so each call logs its start, end and elapsed time.

    Logs "Starting <name>" before the call. On success it logs
    "Finished <name> in <s> seconds". If the call raises, it logs an error with
    the elapsed time and re-raises the original exception unchanged.

    Elapsed time comes from ``time.perf_counter()``. That clock is monotonic, so
    the duration is unaffected by wall-clock adjustments.

    Args:
        func: the callable to wrap; its name and docstring are preserved.

    Returns:
        The wrapping callable, which returns whatever ``func`` returns.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        logging.info(f"Starting {func.__name__}")
        try:
            result = func(*args, **kwargs)
        except Exception:
            # Record failures too, so a missing "Finished" line is never silent.
            execution_time = time.perf_counter() - start_time
            logging.error(f"{func.__name__} failed after {execution_time:.2f} seconds")
            raise
        execution_time = time.perf_counter() - start_time
        logging.info(f"Finished {func.__name__} in {execution_time:.2f} seconds")
        return result
    return wrapper

2024-12-19 19:27:41,017 - INFO - Received command c on object id p0


A Spark ML `Transformer` is a pipeline stage that turns an input DataFrame into an output DataFrame.

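Each custom transformer below implements `_transform()`, which defines how the input DataFrame becomes the output DataFrame. The public `transform()` method inherited from `Transformer` calls it.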

In [0]:
class DateTimeSplitTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Split the timestamp column ``t`` into separate ``date`` and ``time`` columns.

    ``date`` is ``to_date(t)`` and ``time`` is ``t`` formatted as ``HH:mm:ss``
    (a string). The output columns are, in order:
    date, time, symbol, o, h, l, c, v, n. Any other input column, including
    ``t`` itself, is dropped by the final ``select``.

    Raises:
        ValueError: if column ``t`` is missing from the input DataFrame.
    """

    # Output column order; ``t`` is replaced by the derived date/time pair.
    _OUTPUT_COLUMNS = ('date', 'time', 'symbol', 'o', 'h', 'l', 'c', 'v', 'n')

    def __init__(self):
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        try:
            if 't' not in df.columns:
                # AnalysisException was never imported in this notebook, so this
                # check used to fail with NameError; ValueError is a plain builtin.
                raise ValueError("Column 't' not found in DataFrame")

            # NOTE(review): to_date/date_format interpret ``t`` in the Spark session
            # time zone; the source timestamps end in 'Z' (UTC) — confirm the
            # session zone is UTC if the split should reflect UTC wall-clock time.
            return (
                df.withColumn('date', to_date(col('t')))
                .withColumn('time', date_format(col('t'), 'HH:mm:ss'))
                .select(*self._OUTPUT_COLUMNS)
            )

        except Exception as e:
            logging.error(f"Error in DateTimeSplitTransformer: {str(e)}")
            raise

2024-12-19 23:50:10,574 - INFO - Received command c on object id p0


In [0]:
class StringTrimmerTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Strip leading and trailing whitespace from every column whose dtype is ``string``.

    Columns of any other dtype pass through untouched, and column order is preserved.
    """

    def __init__(self):
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        try:
            trimmed = df
            # Iterate over the input schema; each string column is replaced in place.
            for name, dtype in df.dtypes:
                if dtype != "string":
                    continue
                trimmed = trimmed.withColumn(name, trim(col(name)))
            return trimmed

        except Exception as e:
            logging.error(f"Error in StringTrimmerTransformer: {str(e)}")
            raise

In [0]:
class ColumnRenameTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Rename the raw stock columns to descriptive names.

    The mapping is symbol->company, o->open_price, h->high_price, l->low_price,
    c->close_price, v->volume and n->number_of_trades. Columns not in the mapping
    keep their names.

    Raises:
        ValueError: if any source column of the mapping is missing from the input.
    """

    # Source column -> descriptive name (insertion order is the rename order).
    _COLUMN_MAPPING = {
        'symbol': 'company',
        'o': 'open_price',
        'h': 'high_price',
        'l': 'low_price',
        'c': 'close_price',
        'v': 'volume',
        'n': 'number_of_trades'
    }

    def __init__(self):
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        try:
            column_mapping = self._COLUMN_MAPPING

            # Read the schema once. The loop variable is `name` so that the
            # imported `col` function is not shadowed.
            existing = set(df.columns)
            missing_cols = [name for name in column_mapping if name not in existing]
            if missing_cols:
                # AnalysisException was never imported in this notebook (NameError);
                # ValueError is a plain builtin carrying the same message.
                raise ValueError(
                    f"Missing required columns: {', '.join(missing_cols)}"
                )

            renamed_df = df
            for old_col, new_col in column_mapping.items():
                renamed_df = renamed_df.withColumnRenamed(old_col, new_col)

            return renamed_df

        except Exception as e:
            logging.error(f"Error in ColumnRenameTransformer: {str(e)}")
            raise

In [0]:
class AuditColumnTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Append an ``ingestion_date`` audit column set to ``current_timestamp()``.

    Despite its name, the column holds a timestamp rather than a date.
    """

    def __init__(self):
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        try:
            stamped = df.withColumn("ingestion_date", current_timestamp())
        except Exception as e:
            logging.error(f"Error in AuditColumnTransformer: {str(e)}")
            raise
        return stamped

2024-12-19 19:27:41,429 - INFO - Received command c on object id p0


In [0]:
class EncodingTransformer(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    """Standardize the text encoding of every string or varchar column.

    Each such column is processed in three steps:

    1. Null values become the empty string.
    2. The value is round-tripped through ``target_encoding``: it is encoded to
       bytes and then decoded with the same charset.
    3. Characters in the code-point ranges U+0000-U+001F and U+007F-U+00FF are
       removed.

    Args:
        target_encoding: charset name passed to Spark's ``encode``/``decode``
            (default "UTF-8").
    """

    def __init__(self, target_encoding: str = "UTF-8"):
        super().__init__()
        # NOTE(review): this is a plain attribute, not a Spark ML Param.
        # DefaultParamsWritable/Readable therefore do not persist it, and a
        # reloaded instance falls back to the default "UTF-8".
        self.target_encoding = target_encoding

    def _transform(self, df: DataFrame) -> DataFrame:
        # decode is not part of the notebook's top-level functions import.
        from pyspark.sql.functions import decode

        try:
            string_columns = [
                field.name
                for field in df.schema.fields
                if isinstance(field.dataType, (StringType, VarcharType))
            ]
            for column_name in string_columns:
                source = col(column_name)
                non_null = when(source.isNotNull(), source).otherwise(lit(""))
                # Decode with the same charset used to encode. The former
                # .cast("string") always decoded the bytes as UTF-8, which garbled
                # the text for any other target_encoding (e.g. "UTF-16").
                round_tripped = decode(
                    encode(non_null, self.target_encoding),
                    self.target_encoding,
                )
                # NOTE(review): this range strips printable Latin-1 letters too
                # (e.g. é, ñ), yet keeps characters above U+00FF — confirm intent.
                df = df.withColumn(
                    column_name,
                    regexp_replace(round_tripped, r"[\x00-\x1F\x7F-\xFF]", ""),
                )
            return df

        except Exception as e:
            logging.error(f"Error in EncodingTransformer: {str(e)}")
            raise

2024-12-19 19:27:41,523 - INFO - Received command c on object id p0


In [0]:
@timer_logger
def create_standardization_pipeline() -> Pipeline:
    """Build the Silver-layer standardization Pipeline.

    Stage order matters. DateTimeSplitTransformer selects the raw column names
    (symbol, o, h, ...), so it must run before ColumnRenameTransformer renames them.
    """
    stages = [
        DateTimeSplitTransformer(),   # t -> date + time, fixed column order
        StringTrimmerTransformer(),   # trim whitespace on string columns
        ColumnRenameTransformer(),    # raw names -> descriptive names
        AuditColumnTransformer(),     # add ingestion_date timestamp
        EncodingTransformer(),        # normalize text encoding (UTF-8 default)
    ]
    return Pipeline(stages=stages)

2024-12-19 19:27:41,620 - INFO - Received command c on object id p0


In [0]:
@timer_logger
def standardize_dataframe(df: DataFrame) -> DataFrame:
    """Deduplicate ``df`` and run it through the standardization Pipeline.

    No Spark action is triggered here, so the time logged by ``timer_logger``
    covers only building the query plan.

    Args:
        df (DataFrame): the raw (Bronze) input DataFrame.

    Returns:
        DataFrame: the standardized (Silver) DataFrame.
    """
    try:
        # Exact-duplicate rows are removed before any transformer stage runs.
        deduplicated = df.dropDuplicates()

        pipeline = create_standardization_pipeline()
        # Every stage is a Transformer, so fit() has nothing to learn; it only
        # wraps the stages in a PipelineModel.
        model = pipeline.fit(deduplicated)
        return model.transform(deduplicated)

    except Exception as e:
        logging.error(f"Error in standardize_dataframe: {str(e)}")
        raise

In [0]:
# Build the Silver DataFrame lazily; no Spark job runs until an action (display/write) uses it.
df_silver = standardize_dataframe(df)

2024-12-19 19:27:41,827 - INFO - Starting standardize_dataframe
2024-12-19 19:27:41,832 - INFO - Starting create_standardization_pipeline
2024-12-19 19:27:41,834 - INFO - Finished create_standardization_pipeline in 0.00 seconds
2024-12-19 19:27:42,358 - INFO - Finished standardize_dataframe in 0.53 seconds


In [0]:
# Peek at five standardized rows (row order is not guaranteed after dropDuplicates).
display(df_silver.limit(5))

date,time,company,open_price,high_price,low_price,close_price,volume,number_of_trades,ingestion_date
2022-12-15,09:01:00,GOOGL,93.73,93.74,93.55,93.55,1519.0,52,2024-12-19T19:27:42.447Z
2022-12-15,09:05:00,GOOGL,93.38,93.38,93.34,93.34,605.0,19,2024-12-19T19:27:42.447Z
2022-12-15,09:04:00,GOOGL,93.31,93.5,93.31,93.36,1658.0,52,2024-12-19T19:27:42.447Z
2022-12-15,09:03:00,GOOGL,93.45,93.45,93.25,93.31,2008.0,43,2024-12-19T19:27:42.447Z
2022-12-15,09:02:00,GOOGL,93.49,93.51,93.45,93.45,992.0,33,2024-12-19T19:27:42.447Z


In [0]:
# Save the standardized DataFrame as an external Delta table in the Silver layer.
# The table name is qualified with the database, so the write does not depend on
# the session's current database (which the earlier USE statement sets).
(
    df_silver.write
    .format("delta")
    # NOTE(review): on an overwrite, mergeSchema merges new columns into the
    # existing table schema; use overwriteSchema if it should be replaced instead.
    .option("mergeSchema", "true")
    .mode("overwrite")
    .option("path", "dbfs:/user/hive/warehouse/bd_finals.db/stocks/silver/stocks_silver")
    .saveAsTable("bd_finals.stocks_silverdelta")
)

2024-12-19 19:28:29,816 - INFO - Received command c on object id p0


## Loading the Silver-layer data

In [0]:
# Delta tables store their own schema. The CSV-only options header and
# inferSchema do not apply to the Delta reader, so they are not passed.
df1 = (
    spark.read.format("delta")
    .load("dbfs:/user/hive/warehouse/bd_finals.db/stocks/silver/stocks_silver")
)

In [0]:
# Peek at five rows read back from the Silver Delta table.
display(df1.limit(5))

2024-12-19 19:31:44,591 - INFO - Received command c on object id p0


date,time,company,open_price,high_price,low_price,close_price,volume,number_of_trades,ingestion_date
2022-12-15,12:02:00,GOOGL,93.65,93.65,93.6,93.6,1221.0,19,2024-12-19T19:28:32.342Z
2022-12-15,14:16:00,GOOGL,93.15,93.15,93.04,93.04,4155.0,165,2024-12-19T19:28:32.342Z
2022-12-15,15:34:00,GOOGL,91.82,91.845,91.72,91.7388,87192.0,962,2024-12-19T19:28:32.342Z
2022-12-15,22:24:00,GOOGL,90.64,90.64,90.64,90.64,771.0,25,2024-12-19T19:28:32.342Z
2022-12-15,23:02:00,GOOGL,90.79,90.8,90.79,90.8,659.0,16,2024-12-19T19:28:32.342Z


In [0]:
# Row/column counts of the Silver table; count() runs a full Spark job.
rows, columns = df1.count(), len(df1.columns)
display(f"shape: {rows},{columns}")

2024-12-19 19:32:15,513 - INFO - Received command c on object id p0


'shape: 6749358,10'

In [0]:
# Inspect the schema of the Silver table as read back from Delta.
df1.printSchema()

root
 |-- date: date (nullable = true)
 |-- time: string (nullable = true)
 |-- company: string (nullable = true)
 |-- open_price: double (nullable = true)
 |-- high_price: double (nullable = true)
 |-- low_price: double (nullable = true)
 |-- close_price: double (nullable = true)
 |-- volume: double (nullable = true)
 |-- number_of_trades: integer (nullable = true)
 |-- ingestion_date: timestamp (nullable = true)

