# requirements

In [1]:
# !pip install sqlalchemy
# !pip install py7zr
# !pip install pandas
# !pip install pyarrow
# !pip install py7zr sqlalchemy pandas pyarrow

# Imports

In [2]:
import os
import logging
import uuid
from datetime import (
    datetime,
    timezone
)
import re
import unicodedata

In [3]:
# import sqlalchemy as sa
import pandas as pd
import py7zr

# Utils & functions

### logger

In [4]:
# Logger
class __UUIDFilter(logging.Filter):
    """
    Internal utils
    Filter that adds a UUID4 to the log record.

    The value is stored on ``record.uuid4`` so that a formatter can reference
    it as ``%(uuid4)s``.
    """
    def filter(self, record):
        # A fresh UUID is generated for every record; returning True keeps it.
        record.uuid4 = uuid.uuid4()
        return True

def create_logger(log_file=None):
    """
    Creates a logger that writes messages to the console and, optionally, to a file.

    The function is safe to call repeatedly (e.g. when a notebook cell is
    re-run): handlers and UUID filters installed by a previous call on the
    same logger are removed first, so each message is emitted only once.

    Timestamps are rendered in UTC from the record's own creation time. The
    UTC conversion is configured on this logger's formatter only; the global
    ``logging.Formatter.converter`` is left untouched.

    :param log_file: Name of the log file. When falsy, only the console
        handler is installed.
    :return: Configured logger (level DEBUG) named after the current module.
    """
    # Logger creation: getLogger returns the SAME object for the same name,
    # which is why stale handlers/filters must be cleaned up below.
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)

    # Remove handlers left by a previous call so messages are not duplicated.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    # Remove previously installed UUID filters. The comparison is done by
    # class name because re-running the defining cell creates a new class
    # object, which would defeat an isinstance() check.
    for stale_filter in list(logger.filters):
        if type(stale_filter).__name__ == __UUIDFilter.__name__:
            logger.removeFilter(stale_filter)

    # Custom log format with UTC timestamp and UUID4
    log_format = logging.Formatter(
        '%(asctime)s - %(levelname)s - [%(funcName)s] - [%(uuid4)s] - %(message)s',
        datefmt="%Y-%m-%dT%H:%M:%S%z"
    )

    # Render the record's creation time in UTC. Set on the formatter instance
    # (called as converter(record.created)) instead of patching the class.
    log_format.converter = (
        lambda timestamp: datetime.fromtimestamp(timestamp, tz=timezone.utc).timetuple()
    )

    logger.addFilter(__UUIDFilter())

    # File writer handler
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.DEBUG)
        file_handler.setFormatter(log_format)
        logger.addHandler(file_handler)

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(log_format)
    logger.addHandler(console_handler)

    return logger

### extract_7z

In [5]:
def extract_7z(bg_logger, file_path):
    """
    Extracts a 7z file to the same directory as the compressed file.

    :param bg_logger: initialized logger
    :param file_path: Path to the 7z file to extract. May be relative or a
        bare file name; it is resolved against the current working directory
        to determine the extraction directory.
    """
    # Record start time
    start_time = datetime.now()

    # Resolve to an absolute path first: os.path.dirname() of a bare file
    # name is '', which would leave the target (and the log line) empty.
    extract_dir = os.path.dirname(os.path.abspath(file_path))

    # Extract the 7z file; the context manager closes the archive afterwards
    with py7zr.SevenZipFile(file_path, mode='r') as archive:
        archive.extractall(path=extract_dir)

    # Calculate extraction duration
    duration = datetime.now() - start_time

    # Log success message with extraction details
    bg_logger.info(
        "Data successfully extracted to directory %s. Duration: %s",
        extract_dir, duration
    )


### sanitize_column_data

In [6]:
def sanitize_column_data(bg_logger, df, column, c_dtype=str):
    """
    Strips leading/trailing whitespace from a text column and logs the timing.

    The stripped values are written back into ``df[column]`` (the DataFrame
    passed in is modified) and the resulting column is also returned.

    Args:
        bg_logger: Logger instance for logging.
        df: DataFrame containing the data.
        column: Name of the column to clean; accessed through ``.str``.
        c_dtype: Target data type (default is str). Only reported in the log
            message; no dtype conversion is performed here.
    Returns:
        The cleaned column, read back from ``df``.
    """
    began_at = datetime.now()

    # Vectorised strip through the pandas string accessor
    stripped = df[column].str.strip()
    df[column] = stripped

    elapsed = datetime.now() - began_at
    bg_logger.info(
        "Specializing column data '%s' to '%s'. It took %s",
        column, str(c_dtype), str(elapsed)
    )
    return df[column]

### sanitize_text

In [7]:
def sanitize_text(text):
    """
    Normalizes a piece of text in three passes:
    - Accented characters are replaced by their unaccented base character
      (NFD decomposition, then combining marks of category 'Mn' are dropped).
        Note: This is a symbol-based approach and only covers simple cases.
        Characters that need language-specific handling would require a
        custom mapping or a specialized library.
    - Every character that is not an ASCII letter, a digit or whitespace
      is removed.
    - Runs of whitespace are collapsed to a single space and the result
      is trimmed.

    Args:
        text (str): The input string to normalize.

    Returns:
        str: The normalized text. Non-string inputs are returned unchanged.
    """
    # Anything that is not a string is handed back untouched
    if isinstance(text, str):
        # 1. Decompose and drop the combining marks (accents)
        decomposed = unicodedata.normalize('NFD', text)
        base_chars = [ch for ch in decomposed if unicodedata.category(ch) != 'Mn']
        without_accents = ''.join(base_chars)

        # 2. Keep only ASCII letters, digits and whitespace
        alnum_only = re.sub(r'[^a-zA-Z0-9\s]', '', without_accents)

        # 3. Collapse whitespace runs and trim both ends
        return re.sub(r'\s+', ' ', alnum_only).strip()

    return text

### data maps

In [8]:
# Lookup table from country labels/acronyms to the normalized location name.
# It is applied with Series.replace() on the country column AFTER that column
# has gone through sanitize_text, so keys are written in sanitized form
# (punctuation already stripped, e.g. a raw "U.K." arrives here as "UK").
# Labels that are not listed are left unchanged by replace().
NORMATIZE_LOCATION_MAP = {
    "USA": "United States",
    "US": "United States",
    "UK": "United Kingdom",
    "EIRE": "Ireland",
    "RSA": "South Africa",
    # NOTE(review): "co0z0" looks like a placeholder code for an unknown
    # location; its meaning is not defined in this notebook — confirm.
    "Unspecified": "co0z0",
}

# Pipeline

### Logger init

In [9]:
# initialize logger
bg_logger = create_logger()

### base path reference

In [10]:
# Get the current working directory
cwd_path = os.getcwd()

## Base data process

### Importing base DF

In [11]:
base_df_path = os.path.join(
    cwd_path,
    'ingestion',
    'Invoices_Year_2009-2010.7z'
)

In [12]:
extract_7z(bg_logger, base_df_path)

2024-11-24T05:48:21 - INFO - [extract_7z] - [945df8ac-784c-4d4d-bca3-039dd7b1dae5] - Data successfully extracted to directory /workspaces/betsson_group/ingestion. Duration: 0:00:00.428891


In [13]:
# os path join to avoid os-specific path issues

# get csv base df
base_retails_df = pd.read_csv(
    os.path.join(
        cwd_path,
        'ingestion',
        'Invoices_Year_2009-2010.csv'
    ),
    sep=',',
    encoding='latin1',
    low_memory=False
)

### 0 - Briefly check the data

##### A - dataset base info

In [14]:
base_retails_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 525461 entries, 0 to 525460
Data columns (total 8 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
 0   Invoice      525461 non-null  object 
 1   StockCode    525461 non-null  object 
 2   Description  522533 non-null  object 
 3   Quantity     525461 non-null  int64  
 4   InvoiceDate  525461 non-null  object 
 5   Price        525439 non-null  float64
 6   Customer ID  417541 non-null  object 
 7   Country      525430 non-null  object 
dtypes: float64(1), int64(1), object(6)
memory usage: 32.1+ MB


##### B - understanding data variations

In [15]:
base_retails_df.describe()

Unnamed: 0,Quantity,Price
count,525461.0,525439.0
mean,10.337667,4.688669
std,107.42411,146.130044
min,-9600.0,-53594.36
25%,1.0,1.25
50%,3.0,2.1
75%,10.0,4.21
max,19152.0,25111.09


In [16]:
base_retails_df.isnull().sum()

Invoice             0
StockCode           0
Description      2928
Quantity            0
InvoiceDate         0
Price              22
Customer ID    107920
Country            31
dtype: int64

##### C - Understanding base memory usage

In [17]:
base_memory_usage = base_retails_df.reset_index(drop=True).memory_usage(deep=True)
format_base_memory_usage = base_memory_usage.sum() / (1024 * 1024)
bg_logger.info(f'{format_base_memory_usage:.2f} MB')
base_memory_usage

2024-11-24T05:48:23 - INFO - [<module>] - [ef3bf194-b2fd-4926-b6dc-a2c1d8d6209d] - 188.98 MB


Index               132
Invoice        28910564
StockCode      28447549
Description    39656495
Quantity        4203688
InvoiceDate    33927016
Price           4203688
Customer ID    26000647
Country        32814455
dtype: int64

##### D - Overview Top and Bottom data

In [18]:
base_retails_df.head()

Unnamed: 0,Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
0,489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,12/01/2009 07:45,6.95,13085,United Kingdom
1,489434,79323P,PINK CHERRY LIGHTS,12,12/01/2009 07:45,6.75,13085,United Kingdom
2,489434,79323W,WHITE CHERRY LIGHTS,12,12/01/2009 07:45,6.75,13085,United Kingdom
3,489434,22041,"RECORD FRAME 7"" SINGLE SIZE",48,12/01/2009 07:45,2.1,13085,United Kingdom
4,489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,12/01/2009 07:45,1.25,13085,United Kingdom


In [19]:
base_retails_df.tail()

Unnamed: 0,Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
525456,538171,22271,FELTCRAFT DOLL ROSIE,2,12/09/2010 20:01,2.95,17530,United Kingdom
525457,538171,22750,FELTCRAFT PRINCESS LOLA DOLL,1,12/09/2010 20:01,3.75,17530,United Kingdom
525458,538171,22751,FELTCRAFT PRINCESS OLIVIA DOLL,1,12/09/2010 20:01,3.75,17530,United Kingdom
525459,538171,20970,PINK FLORAL FELTCRAFT SHOULDER BAG,2,12/09/2010 20:01,3.75,17530,United Kingdom
525460,538171,21931,JUMBO STORAGE BAG SUKI,2,12/09/2010 20:01,1.95,17530,United Kingdom


---
Based on the top and bottom data, it's likely a general retail store. I'll maintain the previously defined generic 'retail' nomenclature.

##### E - Reviewing data uniqueness

In [20]:
base_retails_df.nunique()

Invoice        28816
StockCode       4632
Description     4681
Quantity         825
InvoiceDate    25296
Price           1607
Customer ID     4384
Country           41
dtype: int64

In [21]:
list(base_retails_df['Country'].unique())

['United Kingdom',
 'France',
 'USA',
 'Belgium',
 'Australia',
 'EIRE',
 'Germany',
 'Portugal',
 'Japan',
 'Denmark',
 'Nigeria',
 'Netherlands',
 'Poland',
 'Spain',
 'Channel Islands',
 'Italy',
 'Cyprus',
 'Greece',
 'Norway',
 'Austria',
 'Sweden',
 'United Arab Emirates',
 'Finland',
 'Switzerland',
 'Unspecified',
 'Malta',
 'Bahrain',
 'RSA',
 'Bermuda',
 'Hong Kong',
 'Singapore',
 'Thailand',
 'Israel',
 'Lithuania',
 nan,
 'West Indies',
 'Lebanon',
 'Korea',
 'Brazil',
 'Canada',
 'Iceland',
 'U.K.']

### 1 - Going deeper into base errors and data types

##### A - Correcting data types to consistent formats

In [22]:
retails_df_stage = base_retails_df.copy()

# base_retails_df = None
# del base_retails_df

---
Stage 0 - Dtypes base format

In [23]:
# column name normalization
retails_df_stage.columns = ['invoice', 'stock_code', 'description', 'quantity', 'invoice_date', 'price', 'customer_id', 'country']

In [24]:
# base dtypes

# formating dtypes on data
# Apply the same whitespace clean-up to every text column, in a fixed order
for text_column in ('invoice', 'stock_code', 'description', 'customer_id', 'country'):
    retails_df_stage[text_column] = sanitize_column_data(bg_logger, retails_df_stage, text_column)

2024-11-24T05:48:23 - INFO - [sanitize_column_data] - [2609426b-546a-42af-bbcf-91b0ce26ca5a] - Specializing column data 'invoice' to '<class 'str'>'. It took 0:00:00.067334
2024-11-24T05:48:23 - INFO - [sanitize_column_data] - [0418f301-10eb-4eca-936a-5035563a0448] - Specializing column data 'stock_code' to '<class 'str'>'. It took 0:00:00.072845
2024-11-24T05:48:23 - INFO - [sanitize_column_data] - [39770a0f-6301-4cb9-9331-c28b57852399] - Specializing column data 'description' to '<class 'str'>'. It took 0:00:00.071535
2024-11-24T05:48:24 - INFO - [sanitize_column_data] - [376c2572-6a9d-4385-832e-4c343031efa8] - Specializing column data 'customer_id' to '<class 'str'>'. It took 0:00:00.057748
2024-11-24T05:48:24 - INFO - [sanitize_column_data] - [8b77d26a-b742-43a4-b3c0-383470ff5c9b] - Specializing column data 'country' to '<class 'str'>'. It took 0:00:00.080778


In [25]:
# checking stage corrections
retails_df_stage.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 525461 entries, 0 to 525460
Data columns (total 8 columns):
 #   Column        Non-Null Count   Dtype  
---  ------        --------------   -----  
 0   invoice       525461 non-null  object 
 1   stock_code    525461 non-null  object 
 2   description   522533 non-null  object 
 3   quantity      525461 non-null  int64  
 4   invoice_date  525461 non-null  object 
 5   price         525439 non-null  float64
 6   customer_id   417541 non-null  object 
 7   country       525430 non-null  object 
dtypes: float64(1), int64(1), object(6)
memory usage: 32.1+ MB


In [26]:
# specialized DTYPES
retails_df_stage['quantity'] = pd.to_numeric(retails_df_stage['quantity'], errors='coerce')
retails_df_stage['price'] = pd.to_numeric(retails_df_stage['price'], errors='coerce')

In [27]:
# checking stage corrections
retails_df_stage.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 525461 entries, 0 to 525460
Data columns (total 8 columns):
 #   Column        Non-Null Count   Dtype  
---  ------        --------------   -----  
 0   invoice       525461 non-null  object 
 1   stock_code    525461 non-null  object 
 2   description   522533 non-null  object 
 3   quantity      525461 non-null  int64  
 4   invoice_date  525461 non-null  object 
 5   price         525439 non-null  float64
 6   customer_id   417541 non-null  object 
 7   country       525430 non-null  object 
dtypes: float64(1), int64(1), object(6)
memory usage: 32.1+ MB


In [28]:
# Parse invoice_date and re-serialize it as an ISO 8601 string (YYYY-MM-DDTHH:MM:SS).
# errors='coerce' makes values that cannot be parsed become NaT instead of raising.
# NOTE(review): no explicit format=/dayfirst= is passed, so parsing relies on
# pandas' inference; the sample rows look like MM/DD/YYYY HH:MM — confirm and
# consider passing the format explicitly.
retails_df_stage['invoice_date'] = pd.to_datetime(retails_df_stage['invoice_date'], errors='coerce')
# After strftime the column holds formatted strings (object dtype), not datetimes.
retails_df_stage['invoice_date'] = retails_df_stage['invoice_date'].dt.strftime('%Y-%m-%dT%H:%M:%S')

In [29]:
retails_df_stage.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 525461 entries, 0 to 525460
Data columns (total 8 columns):
 #   Column        Non-Null Count   Dtype  
---  ------        --------------   -----  
 0   invoice       525461 non-null  object 
 1   stock_code    525461 non-null  object 
 2   description   522533 non-null  object 
 3   quantity      525461 non-null  int64  
 4   invoice_date  525461 non-null  object 
 5   price         525439 non-null  float64
 6   customer_id   417541 non-null  object 
 7   country       525430 non-null  object 
dtypes: float64(1), int64(1), object(6)
memory usage: 32.1+ MB


In [30]:
memory_usage = retails_df_stage.reset_index(drop=True).memory_usage(deep=True) / (1024 * 1024)
bg_logger.info('Old memory usage: %.2f MB x New memory usage: %.2f MB', format_base_memory_usage.sum(), memory_usage.sum())

2024-11-24T05:48:28 - INFO - [<module>] - [638d65da-69ef-4b45-8abf-c2be71389025] - Old memory usage: 188.98 MB x New memory usage: 190.61 MB


In [31]:
# The memory-usage figures are no longer needed: unbind the names from the
# notebook scope so the objects they referenced can be reclaimed.
del memory_usage
del base_memory_usage
del format_base_memory_usage

##### B - Overall data consistency

In [32]:
retails_df_stage.describe()

Unnamed: 0,quantity,price
count,525461.0,525439.0
mean,10.337667,4.688669
std,107.42411,146.130044
min,-9600.0,-53594.36
25%,1.0,1.25
50%,3.0,2.1
75%,10.0,4.21
max,19152.0,25111.09


In [33]:
retails_df_stage.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 525461 entries, 0 to 525460
Data columns (total 8 columns):
 #   Column        Non-Null Count   Dtype  
---  ------        --------------   -----  
 0   invoice       525461 non-null  object 
 1   stock_code    525461 non-null  object 
 2   description   522533 non-null  object 
 3   quantity      525461 non-null  int64  
 4   invoice_date  525461 non-null  object 
 5   price         525439 non-null  float64
 6   customer_id   417541 non-null  object 
 7   country       525430 non-null  object 
dtypes: float64(1), int64(1), object(6)
memory usage: 32.1+ MB


In [34]:
retails_df_stage.describe()

Unnamed: 0,quantity,price
count,525461.0,525439.0
mean,10.337667,4.688669
std,107.42411,146.130044
min,-9600.0,-53594.36
25%,1.0,1.25
50%,3.0,2.1
75%,10.0,4.21
max,19152.0,25111.09


---
checkpoint lineage: 0

In [35]:
retails_lineage_0_path = os.path.join(
    cwd_path,
    'temp_stage_1.parquet'
)

In [36]:
retails_df_stage.to_parquet(
    retails_lineage_0_path,
    index=False,
    compression='snappy'
)

In [37]:
retails_df_stage = None
del retails_df_stage

##### C - Specialized Data Corrections

In [38]:
retails_df_stage_I = pd.read_parquet(retails_lineage_0_path)

---
Stage 1 - Data-specific and column corrections

> Country column - understanding overall logic

In [39]:
list(retails_df_stage_I['country'].unique())

['United Kingdom',
 'France',
 'USA',
 'Belgium',
 'Australia',
 'EIRE',
 'Germany',
 'Portugal',
 'Japan',
 'Denmark',
 'Nigeria',
 'Netherlands',
 'Poland',
 'Spain',
 'Channel Islands',
 'Italy',
 'Cyprus',
 'Greece',
 'Norway',
 'Austria',
 'Sweden',
 'United Arab Emirates',
 'Finland',
 'Switzerland',
 'Unspecified',
 'Malta',
 'Bahrain',
 'RSA',
 'Bermuda',
 'Hong Kong',
 'Singapore',
 'Thailand',
 'Israel',
 'Lithuania',
 None,
 'West Indies',
 'Lebanon',
 'Korea',
 'Brazil',
 'Canada',
 'Iceland',
 'U.K.']

> Country column - Cleaning and mapping correct values

In [40]:
# sanitize data
retails_df_stage_I['country'] = retails_df_stage_I['country'].apply(sanitize_text)

In [41]:
# correct acronyms and normalizing location names
retails_df_stage_I['country'] = retails_df_stage_I['country'].replace(NORMATIZE_LOCATION_MAP)

In [42]:
# overall data consistency
retails_df_stage_I['country'].value_counts(normalize=True) * 100

country
United Kingdom          92.463316
Ireland                  1.838685
Germany                  1.547114
France                   1.098529
Netherlands              0.526997
Spain                    0.243229
Switzerland              0.225910
Portugal                 0.209543
Belgium                  0.200598
Channel Islands          0.172430
Sweden                   0.171669
Italy                    0.139124
Australia                0.124469
Cyprus                   0.105437
Austria                  0.102202
Greece                   0.098396
United Arab Emirates     0.082218
Denmark                  0.081457
Norway                   0.070228
Finland                  0.067373
co0z0                    0.058999
United States            0.046438
Japan                    0.042632
Poland                   0.036922
Malta                    0.032735
Lithuania                0.029309
Singapore                0.022267
South Africa             0.021126
Bahrain                  0.020364
Canada

> Customer ID column - understanding overall logic

In [None]:
# Validating the customer id format: list the rows whose customer_id contains
# at least one ASCII letter (na=False treats missing ids as "no match").
# NOTE: this pattern matches ANY letter, so it is only meaningful for this
# customer_id check — applied to free-text columns such as description or
# country it would match nearly every row.
pattern = r'[a-zA-Z]'
retails_df_stage_I[retails_df_stage_I['customer_id'].str.contains(pattern, na=False, regex=True)]

Unnamed: 0,invoice,stock_code,description,quantity,invoice_date,price,customer_id,country
176263,506141,21670,BLUE SPOT CERAMIC DRAWER KNOB,1,2010-04-27T16:16:00,0.0,TEST,United Kingdom
176264,506141,79160,HEART SHAPE WIRELESS DOORBELL,1,2010-04-27T16:16:00,0.0,TEST,United Kingdom
176265,506141,90112,PINK DOLLY HAIR CLIPS,1,2010-04-27T16:16:00,0.0,TEST,United Kingdom
176266,506141,90100,NECKLACE+BRACELET SET PINK DAISY,1,2010-04-27T16:16:00,0.0,TEST,United Kingdom
176267,506141,85226A,WHITE/BLUE PULL BACK RACING CAR,1,2010-04-27T16:16:00,0.0,TEST,United Kingdom
176268,506141,21890,S/6 WOODEN SKITTLES IN COTTON BAG,1,2010-04-27T16:16:00,0.0,TEST,United Kingdom
176269,506141,21826,EIGHT PIECE DINOSAUR SET,1,2010-04-27T16:16:00,-100.0,TEST,United Kingdom


> Search for test in all columns

In [56]:
# Flag rows where ANY column contains the substring "test" (case-insensitive).
# A dedicated pattern is defined here on purpose: the customer-id pattern
# r'[a-zA-Z]' matches any letter and, applied to every column, would flag
# practically every row (descriptions and country names are alphabetic).
# NOTE: this is a plain substring match, so a value that merely contains
# "test" inside a longer word is flagged too — review the listing before
# dropping anything.
test_pattern = r'test'
has_test_marker = (
    retails_df_stage_I
    .astype(str)  # stringify every column so .str.contains works uniformly
    .apply(lambda col: col.str.contains(test_pattern, case=False, regex=True))
    .any(axis=1)
)
filtered_data_with_test = retails_df_stage_I[has_test_marker]

In [57]:
filtered_data_with_test

Unnamed: 0,invoice,stock_code,description,quantity,invoice_date,price,customer_id,country
27994,491725,TEST001,This is a test product.,10,2009-12-14T08:34:00,4.5,12346.0,United Kingdom
28251,491742,TEST001,This is a test product.,5,2009-12-14T11:00:00,4.5,12346.0,United Kingdom
28254,491744,TEST001,This is a test product.,5,2009-12-14T11:02:00,4.5,12346.0,United Kingdom
39398,492718,TEST001,This is a test product.,5,2009-12-18T10:47:00,4.5,12346.0,United Kingdom
39411,492722,TEST002,This is a test product.,1,2009-12-18T10:55:00,1.0,12346.0,United Kingdom
44614,493294,TEST002,,1,2009-12-22T15:15:00,0.0,,United Kingdom
45228,493410,TEST001,This is a test product.,5,2010-01-04T09:24:00,4.5,12346.0,United Kingdom
45230,493412,TEST001,This is a test product.,5,2010-01-04T09:53:00,4.5,12346.0,United Kingdom
56117,494450,TEST001,This is a test product.,5,2010-01-14T13:50:00,4.5,12346.0,United Kingdom
66084,495295,TEST001,This is a test product.,5,2010-01-22T13:30:00,4.5,12346.0,United Kingdom


In [None]:
# removing test data
# Rows are dropped when ANY column contains the substring "test"
# (case-insensitive). The pattern is defined in this cell so the result does
# not depend on a variable left over from another cell: the customer-id
# pattern r'[a-zA-Z]' matches any letter and would remove almost every row.
test_pattern = r'test'
is_test_row = (
    retails_df_stage_I
    .astype(str)  # stringify every column so .str.contains works uniformly
    .apply(lambda col: col.str.contains(test_pattern, case=False, regex=True))
    .any(axis=1)
)
retails_df_stage_I = retails_df_stage_I[~is_test_row]

# Safety net: a filter that is too broad must fail loudly, not silently
# hand an empty frame to the following stages.
assert len(retails_df_stage_I) > 0, "test-data filter removed every row"

> Price column - understanding overall logic

In [53]:
price_negative = retails_df_stage_I.query("price < 0")
price_negative.info()

<class 'pandas.core.frame.DataFrame'>
Index: 3 entries, 179403 to 403472
Data columns (total 8 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   invoice       3 non-null      object 
 1   stock_code    3 non-null      object 
 2   description   3 non-null      object 
 3   quantity      3 non-null      int64  
 4   invoice_date  3 non-null      object 
 5   price         3 non-null      float64
 6   customer_id   0 non-null      object 
 7   country       3 non-null      object 
dtypes: float64(1), int64(1), object(6)
memory usage: 216.0+ bytes


In [54]:
price_negative.head()

Unnamed: 0,invoice,stock_code,description,quantity,invoice_date,price,customer_id,country
179403,A506401,B,Adjust bad debt,1,2010-04-29T13:36:00,-53594.36,,United Kingdom
276274,A516228,B,Adjust bad debt,1,2010-07-19T11:24:00,-44031.79,,United Kingdom
403472,A528059,B,Adjust bad debt,1,2010-10-20T12:04:00,-38925.87,,United Kingdom


0   Invoice      525461 non-null  object   
1   StockCode    525461 non-null  object   
2   Description  522533 non-null  object   
3   Quantity     525461 non-null  int64    
4   InvoiceDate  525461 non-null  object   
5   Price        525439 non-null  float64  
6   Customer ID  417541 non-null  object   
7   Country      525430 non-null  object   

# Warehouse definitions