In [3]:
import os
import logging
import pandas as pd
import datetime as dt
import sys


In [8]:
# ---------------------------------------------------------
# 0. LOGGING CONFIGURATION
# ---------------------------------------------------------
# Send every record of level INFO and above to a log file AND to stdout
# (the notebook / console output).
#
# force=True (Python 3.8+) removes and closes any handlers already attached
# to the root logger before installing the ones below. Without it,
# basicConfig() silently does nothing once the root logger has handlers, so
# re-running this cell would ignore any edits made here and would leave the
# freshly created FileHandler open but unused.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("etl_process.log"),
        logging.StreamHandler(sys.stdout),
    ],
    force=True,
)

# Logger used by the ETL code in this notebook.
logger = logging.getLogger(__name__)

In [91]:
def run_etl_pipeline(
    sales_dir='C:\\Users\\karth\\Downloads\\Python Assignment\\Python Assignment\\Sales_Split',
    store_master_path='../store_master.csv',
    product_master_path='../product_master.csv',
    output_file='aggregated_sales.csv',
):
    """Merge the split sales CSVs, join the master data, and save aggregated totals.

    Steps:
      1. List the ``.csv`` files in ``sales_dir``.
      2. Read each file and concatenate the results into one table.
      3. Left-join the store master on ``store_id`` and the product master on ``sku``.
      4. Keep only rows whose ``status`` is ``'Active'`` (skipped with a warning
         when no ``status`` column exists after the joins).
      5. Sum ``sales_qty`` and ``sales_value`` per
         (``store_id``, ``electronics_type``, ``classification``).
      6. Write the result to ``output_file`` without the index.

    Parameters
    ----------
    sales_dir : str
        Directory holding the split sales CSV files.
    store_master_path : str
        Path of the store master CSV.
    product_master_path : str
        Path of the product master CSV.
    output_file : str
        Path the aggregated CSV is written to.

    Returns
    -------
    None
        The result is written to ``output_file``. Progress and failures are
        reported through the logger; on any handled failure the function
        logs the problem and returns early without writing the output file.
    """
    # Same logger object as the module-level ``logger`` (loggers are looked up
    # by name); fetching it here means the function does not depend on the
    # logging cell having been executed first.
    log = logging.getLogger(__name__)

    group_cols = ['store_id', 'electronics_type', 'classification']
    value_cols = ['sales_qty', 'sales_value']

    log.info(">>> ETL Process Started")

    # ---------------------------------------------------------
    # 1. Locate the sales files
    # ---------------------------------------------------------
    try:
        if not os.path.exists(sales_dir):
            log.error(f">>> Critical Error: Directory '{sales_dir}' not found.")
            return
        # os.listdir() returns names in arbitrary order; sort so that runs are
        # reproducible.
        file_list = sorted(f for f in os.listdir(sales_dir) if f.endswith('.csv'))
        file_count = len(file_list)
        log.info(">>> Scanning directory Ended.")
        log.info(f">>> Found {file_count} sales files to process.")
        if file_count == 0:
            log.warning(">>> No CSV files found in the directory. Stopping process.")
            return
    except Exception as e:
        log.error(f">>> Error accessing directory '{sales_dir}': {e}")
        return

    try:
        # ---------------------------------------------------------
        # 2. Load and concatenate the sales files
        # ---------------------------------------------------------
        data_frames = []
        log.info(">>> Reading files ...")
        for filename in file_list:
            try:
                data_frames.append(pd.read_csv(os.path.join(sales_dir, filename)))
            except Exception as e:
                # Best effort: one unreadable file does not abort the run.
                log.error(f">>> Failed to read file {filename}: {e}")

        if not data_frames:
            log.error(">>> No valid dataframes loaded.")
            return

        skipped = file_count - len(data_frames)
        if skipped:
            # Make it visible that the final totals will not cover every file.
            log.warning(f">>> {skipped} file(s) could not be read; totals will be partial.")

        all_sales_data = pd.concat(data_frames, ignore_index=True)
        total_rows = len(all_sales_data)
        log.info(f">>> Successfully merged {len(data_frames)} files. Total sales records: {total_rows}")
        # concat() produced a new table, so the per-file frames are no longer needed.
        del data_frames

        # Stop here when there is nothing to aggregate instead of writing an
        # empty output file and reporting success.
        if total_rows == 0:
            log.error(">>> Validation Failed: Sales data is empty. Stopping process.")
            return
        log.info(">>> Validation Passed: Sales data is not empty. Proceeding to Joins.")

        # ---------------------------------------------------------
        # 3. Load the master data
        # ---------------------------------------------------------
        try:
            store_master = pd.read_csv(store_master_path)
            product_master = pd.read_csv(product_master_path)
            log.info(f'>>> Total Rows in Store_Master Dataset {len(store_master)}')
            log.info(f'>>> Total Rows in Product_Master Dataset {len(product_master)}')
        except FileNotFoundError as e:
            log.error(f">>> Critical Error: Master file missing - {e}")
            return

        # ---------------------------------------------------------
        # 4. Left-join sales with the store and product masters
        # ---------------------------------------------------------
        merged_df = pd.merge(all_sales_data, store_master, on='store_id', how='left')
        log.info('>>> Joined Sales Data with Store Dataset')
        merged_df = pd.merge(merged_df, product_master, on='sku', how='left')
        log.info(">>> Joined Sales Data with Product Dataset.")

        missing_cols = [c for c in group_cols + value_cols if c not in merged_df.columns]
        if missing_cols:
            log.error(f">>> Critical Error: Required column(s) missing after joins: {missing_cols}")
            return

        # ---------------------------------------------------------
        # 5. Keep only sales from stores whose status is 'Active'
        # ---------------------------------------------------------
        if 'status' in merged_df.columns:
            no_status = int(merged_df['status'].isna().sum())
            if no_status:
                # After a left join these are typically sales whose store_id
                # has no match in the store master; the filter drops them.
                log.warning(f">>> {no_status} sales row(s) have no store status and are excluded.")
            # Boolean indexing already returns a new frame, so no extra
            # .copy() of the (large) filtered table is needed.
            active_sales_df = merged_df[merged_df['status'] == 'Active']
            log.info(">>> Filtered for Active stores.")
        else:
            log.warning(">>> Warning: 'status' column not found. Skipping filter.")
            active_sales_df = merged_df

        # ---------------------------------------------------------
        # 6. Aggregate the data with sum()
        # ---------------------------------------------------------
        # groupby() drops rows whose grouping key is NaN by default; report
        # them so missing product attributes do not silently shrink the totals.
        missing_keys = int(active_sales_df[group_cols].isna().any(axis=1).sum())
        if missing_keys:
            log.warning(
                f">>> {missing_keys} row(s) have a missing grouping value "
                f"and are excluded from the totals."
            )

        log.info(">>> Aggregating data...")
        aggregated_df = active_sales_df.groupby(group_cols)[value_cols].sum().reset_index()

        # ---------------------------------------------------------
        # 7. Save the aggregated data
        # ---------------------------------------------------------
        aggregated_df.to_csv(output_file, index=False)
        log.info(f">>> SUCCESS. Final output saved as '{output_file}'.")

    except Exception as e:
        # Catch-all for the load / join / aggregate / save stages; log the
        # traceback so the failing stage can be identified.
        log.exception(f">>> ETL process failed: {e}")
        return
    
    
    

In [93]:
# Entry point: run the pipeline only when this code is executed as the main
# module, not when it is imported from elsewhere.
if __name__ == "__main__":
    run_etl_pipeline()

2025-12-22 14:47:24,622 - INFO - >>> ETL Process Started
2025-12-22 14:47:24,626 - INFO - >>> Scanning directory Ended.
2025-12-22 14:47:24,628 - INFO - >>> Found 20 sales files to process.
2025-12-22 14:47:24,629 - INFO - >>> Reading files ...
2025-12-22 14:47:24,626 - INFO - >>> Scanning directory Ended.
2025-12-22 14:47:24,628 - INFO - >>> Found 20 sales files to process.
2025-12-22 14:47:24,629 - INFO - >>> Reading files ...
2025-12-22 14:47:37,811 - INFO - >>> Successfully merged 20 files. Total sales records: 20000000
2025-12-22 14:47:37,813 - INFO - >>> Validation Passed: Sales data is not empty. Proceeding to Joins.
2025-12-22 14:47:37,811 - INFO - >>> Successfully merged 20 files. Total sales records: 20000000
2025-12-22 14:47:37,813 - INFO - >>> Validation Passed: Sales data is not empty. Proceeding to Joins.
2025-12-22 14:47:37,914 - INFO - >>> Total Rows in Store_Master Dataset 20000
2025-12-22 14:47:37,915 - INFO - >>> Total Rows in Product_Master Dataset 100000
2025-12-22

In [1]:
# Folder that holds the split sales CSV files.
# NOTE(review): absolute, machine-specific path — it must be edited before the
# cells below can run on another machine.
sales_dir = 'C:\\Users\\karth\\Downloads\\Python Assignment\\Python Assignment\\Sales_Split'

In [4]:
# Names of the CSV files in sales_dir. os.listdir() returns entries in
# arbitrary order, so sort them to get the same order on every run.
file_list = sorted(f for f in os.listdir(sales_dir) if f.endswith('.csv'))

In [5]:
# Display the CSV file names that were found.
file_list

['sales_part_1.csv',
 'sales_part_10.csv',
 'sales_part_11.csv',
 'sales_part_12.csv',
 'sales_part_13.csv',
 'sales_part_14.csv',
 'sales_part_15.csv',
 'sales_part_16.csv',
 'sales_part_17.csv',
 'sales_part_18.csv',
 'sales_part_19.csv',
 'sales_part_2.csv',
 'sales_part_20.csv',
 'sales_part_3.csv',
 'sales_part_4.csv',
 'sales_part_5.csv',
 'sales_part_6.csv',
 'sales_part_7.csv',
 'sales_part_8.csv',
 'sales_part_9.csv']

In [6]:
# Holds one DataFrame per sales CSV file.
data_frames = []

In [7]:
# Read every sales CSV into its own DataFrame. The list is rebuilt from
# scratch each time, so re-running this cell cannot append the same files a
# second time.
data_frames = [
    pd.read_csv(os.path.join(sales_dir, filename))
    for filename in file_list
]
            

In [8]:
# Preview the first rows of the first loaded sales file.
data_frames[0].head()

Unnamed: 0,store_id,sku,date,sales_qty,sales_value
0,S00341,SKU064049,2024-11-02,1,364.9
1,S16112,SKU016687,2024-08-19,1,110.03
2,S13232,SKU079938,2024-11-27,1,65.94
3,S16882,SKU024958,2025-03-16,1,74.46
4,S13774,SKU024371,2025-02-05,1,266.55


In [9]:
# Stack the per-file frames into one table; ignore_index=True gives the result
# a fresh 0..n-1 row index instead of repeating each file's own index.
all_sales_data = pd.concat(data_frames, ignore_index=True)

In [10]:
# Preview the first 10 rows of the combined sales data.
all_sales_data.head(10)

Unnamed: 0,store_id,sku,date,sales_qty,sales_value
0,S00341,SKU064049,2024-11-02,1,364.9
1,S16112,SKU016687,2024-08-19,1,110.03
2,S13232,SKU079938,2024-11-27,1,65.94
3,S16882,SKU024958,2025-03-16,1,74.46
4,S13774,SKU024371,2025-02-05,1,266.55
5,S05891,SKU094915,2025-01-27,1,165.24
6,S03025,SKU010522,2024-09-10,2,282.45
7,S19024,SKU043792,2024-06-19,1,182.86
8,S17730,SKU055661,2024-12-09,1,351.27
9,S11860,SKU004353,2024-07-14,1,44.71


In [11]:
# Load the store and product master tables.
# NOTE(review): the paths are relative, so they resolve against the current
# working directory — presumably the notebook's folder; confirm.
store_master = pd.read_csv('../store_master.csv')
product_master = pd.read_csv('../product_master.csv')

In [12]:
# Preview the first 10 rows of the store master.
store_master.head(10)

Unnamed: 0,store_id,city,state,region,status
0,S00001,Kolkata,KA,East,Active
1,S00002,Chennai,TS,West,Inactive
2,S00003,Hyderabad,TN,East,Active
3,S00004,Kolkata,DL,East,Active
4,S00005,Bangalore,TN,West,Active
5,S00006,Hyderabad,TS,North,Active
6,S00007,Hyderabad,TS,North,Active
7,S00008,Kolkata,WB,West,Active
8,S00009,Mumbai,TN,South,Active
9,S00010,Bangalore,TN,East,Active


In [13]:
# Preview the first 10 rows of the product master.
product_master.head(10)

Unnamed: 0,sku,category,electronics_type,classification,mrp
0,SKU000001,ELECTRONICS,CHARGERS,Standard,453.55
1,SKU000002,ELECTRONICS,NETWORKING_DEVICES,Standard,487.13
2,SKU000003,ELECTRONICS,LAPTOPS,Economy,126.47
3,SKU000004,ELECTRONICS,SMARTWATCHES,Standard,330.04
4,SKU000005,ELECTRONICS,SMARTPHONES,Economy,407.68
5,SKU000006,ELECTRONICS,CAMERAS,Economy,168.71
6,SKU000007,ELECTRONICS,SMART_TV,Economy,435.71
7,SKU000008,ELECTRONICS,FITNESS_BANDS,Premium,187.58
8,SKU000009,ELECTRONICS,TABLETS,Economy,474.32
9,SKU000010,ELECTRONICS,MOBILES,Economy,319.49


In [14]:
# Left join on store_id: every sales row is kept and gains the store master's
# columns; rows whose store_id has no match get NaN in those columns.
merged_df = pd.merge(all_sales_data, store_master, on='store_id', how='left')

In [15]:
# Preview the sales rows with the store columns attached.
merged_df.head(10)

Unnamed: 0,store_id,sku,date,sales_qty,sales_value,city,state,region,status
0,S00341,SKU064049,2024-11-02,1,364.9,Bangalore,TS,West,Active
1,S16112,SKU016687,2024-08-19,1,110.03,Chennai,TN,North,Active
2,S13232,SKU079938,2024-11-27,1,65.94,Kolkata,DL,South,Active
3,S16882,SKU024958,2025-03-16,1,74.46,Mumbai,TS,South,Active
4,S13774,SKU024371,2025-02-05,1,266.55,Chennai,TN,South,Active
5,S05891,SKU094915,2025-01-27,1,165.24,Bangalore,MH,West,Inactive
6,S03025,SKU010522,2024-09-10,2,282.45,Bangalore,DL,East,Active
7,S19024,SKU043792,2024-06-19,1,182.86,Chennai,DL,West,Active
8,S17730,SKU055661,2024-12-09,1,351.27,Mumbai,TN,East,Active
9,S11860,SKU004353,2024-07-14,1,44.71,Delhi,MH,North,Active


In [16]:
# Left join on sku: every row is kept and gains the product master's columns.
# NOTE(review): this overwrites merged_df with a result derived from itself, so
# the cell is not idempotent — running it a second time joins the product
# columns again and pandas renames the clashing columns with _x/_y suffixes.
# Re-run the store merge cell first if this cell has to be repeated.
merged_df = pd.merge(merged_df, product_master, on='sku', how='left')

In [17]:
# Preview the fully joined table (sales + store + product columns).
merged_df.head(10)

Unnamed: 0,store_id,sku,date,sales_qty,sales_value,city,state,region,status,category,electronics_type,classification,mrp
0,S00341,SKU064049,2024-11-02,1,364.9,Bangalore,TS,West,Active,ELECTRONICS,MOBILES,Standard,483.39
1,S16112,SKU016687,2024-08-19,1,110.03,Chennai,TN,North,Active,ELECTRONICS,MODEMS,Economy,171.03
2,S13232,SKU079938,2024-11-27,1,65.94,Kolkata,DL,South,Active,ELECTRONICS,CABLES,Economy,103.32
3,S16882,SKU024958,2025-03-16,1,74.46,Mumbai,TS,South,Active,ELECTRONICS,SMARTWATCHES,Economy,110.4
4,S13774,SKU024371,2025-02-05,1,266.55,Chennai,TN,South,Active,ELECTRONICS,FITNESS_BANDS,Premium,434.85
5,S05891,SKU094915,2025-01-27,1,165.24,Bangalore,MH,West,Inactive,ELECTRONICS,ACTION_CAMERAS,Standard,260.5
6,S03025,SKU010522,2024-09-10,2,282.45,Bangalore,DL,East,Active,ELECTRONICS,TABLETS,Economy,182.83
7,S19024,SKU043792,2024-06-19,1,182.86,Chennai,DL,West,Active,ELECTRONICS,CABLES,Standard,237.06
8,S17730,SKU055661,2024-12-09,1,351.27,Mumbai,TN,East,Active,ELECTRONICS,DSLR_CAMERAS,Economy,453.68
9,S11860,SKU004353,2024-07-14,1,44.71,Delhi,MH,North,Active,ELECTRONICS,DSLR_CAMERAS,Standard,71.96


In [22]:
# Count missing values per column, restricted to rows whose status is 'Active'.
merged_df[merged_df['status']=='Active'].isna().sum()

store_id            0
sku                 0
date                0
sales_qty           0
sales_value         0
city                0
state               0
region              0
status              0
category            0
electronics_type    0
classification      0
mrp                 0
dtype: int64

In [23]:
# Keep only the rows whose store status is exactly 'Active', as an independent copy.
active_sales_df = merged_df.loc[merged_df['status'].eq('Active')].copy()

In [24]:
# Preview the first 10 rows that remain after the 'Active' filter.
active_sales_df.head(10)

Unnamed: 0,store_id,sku,date,sales_qty,sales_value,city,state,region,status,category,electronics_type,classification,mrp
0,S00341,SKU064049,2024-11-02,1,364.9,Bangalore,TS,West,Active,ELECTRONICS,MOBILES,Standard,483.39
1,S16112,SKU016687,2024-08-19,1,110.03,Chennai,TN,North,Active,ELECTRONICS,MODEMS,Economy,171.03
2,S13232,SKU079938,2024-11-27,1,65.94,Kolkata,DL,South,Active,ELECTRONICS,CABLES,Economy,103.32
3,S16882,SKU024958,2025-03-16,1,74.46,Mumbai,TS,South,Active,ELECTRONICS,SMARTWATCHES,Economy,110.4
4,S13774,SKU024371,2025-02-05,1,266.55,Chennai,TN,South,Active,ELECTRONICS,FITNESS_BANDS,Premium,434.85
6,S03025,SKU010522,2024-09-10,2,282.45,Bangalore,DL,East,Active,ELECTRONICS,TABLETS,Economy,182.83
7,S19024,SKU043792,2024-06-19,1,182.86,Chennai,DL,West,Active,ELECTRONICS,CABLES,Standard,237.06
8,S17730,SKU055661,2024-12-09,1,351.27,Mumbai,TN,East,Active,ELECTRONICS,DSLR_CAMERAS,Economy,453.68
9,S11860,SKU004353,2024-07-14,1,44.71,Delhi,MH,North,Active,ELECTRONICS,DSLR_CAMERAS,Standard,71.96
10,S19645,SKU097297,2024-09-25,1,93.13,Pune,DL,South,Active,ELECTRONICS,SMART_HOME_DEVICES,Premium,145.57


In [25]:
# Columns that define one aggregation group.
agg_list=['store_id', 'electronics_type', 'classification']
# Columns that are summed within each group.
name_qunty=['sales_qty', 'sales_value']

In [32]:
# Total the value columns per group, then turn the group keys back into
# ordinary columns.
aggregated_df = (
    active_sales_df
    .groupby(agg_list)[name_qunty]
    .sum(numeric_only=True)
    .reset_index()
)

In [33]:
# Preview the first 10 aggregated rows.
aggregated_df.head(10)

Unnamed: 0,store_id,electronics_type,classification,sales_qty,sales_value
0,S00001,ACTION_CAMERAS,Economy,14,2297.85
1,S00001,ACTION_CAMERAS,Luxury,5,1002.31
2,S00001,ACTION_CAMERAS,Premium,11,1887.16
3,S00001,ACTION_CAMERAS,Standard,30,5993.22
4,S00001,AUDIO_DEVICES,Economy,18,3675.57
5,S00001,AUDIO_DEVICES,Premium,9,2584.09
6,S00001,AUDIO_DEVICES,Standard,15,2815.85
7,S00001,CABLES,Economy,20,3844.09
8,S00001,CABLES,Premium,7,1177.49
9,S00001,CABLES,Standard,29,5564.92
