In [1]:
# notebook ETL for a specific customer data problem - duplicate clients and ids

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 3, Finished, Available)

In [2]:
import pandas as pd

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 4, Finished, Available)

### Customer Data Loading (CSV file)

In [3]:
# Load data into pandas DataFrame from "/lakehouse/default/" + "Files/customers_novawines_oltp.csv"
customers_not_clean = pd.read_csv("/lakehouse/default/" + "Files/customers_novawines_oltp.csv", delimiter=';', encoding='utf-8')
customers_not_clean.head(20)

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 5, Finished, Available)

Unnamed: 0,customer_id,company_name,region,country,city,street_number,customer_phone_number
0,266,Basel Bites,Europe,Switzerland,Basel,789 Marktplatz,(41) 618 889 999
1,289,Bernese Bites,Europe,Switzerland,Bern,123 Zytglogge,(41) 314 445 555
2,336,Chur Cuisine,Europe,Switzerland,Chur,456 Old Town,(41) 814 445 555
3,369,Dnipro Delights,Europe,Ukraine,Dnipro,789 Monastyrsky lm,(380) 568 889 999
4,364,Fribourg Flavors,Europe,Switzerland,Fribourg,123 Place Georges-Python,(41) 268 889 999
5,257,Geneva Gourmet,Europe,Switzerland,Geneva,456 Jet d'Eau,(41) 225 556 666
6,357,La Chaux-de-Fonds Bites,Europe,Switzerland,La Chaux-de-Fonds,789 Espacité,(41) 325 556 666
7,270,Lausanne Luches,Europe,Switzerland,Lausanne,321 Olympic Museum,(41) 217 778 888
8,302,Lucerne Delights,Europe,Switzerland,Lucerne,789 Chapel Bridge,(41) 415 556 666
9,335,Lugano Luches,Europe,Switzerland,Lugano,123 Parco Ciani,(41) 918 889 999


### Data Exploration

In [4]:
customers_not_clean.shape

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 6, Finished, Available)

(519, 7)

In [5]:
customers_not_clean.describe()

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 7, Finished, Available)

Unnamed: 0,customer_id
count,519.0
mean,261.102119
std,150.602648
min,1.0
25%,130.5
50%,261.0
75%,390.5
max,634.0


In [6]:
customers_not_clean.info()

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 8, Finished, Available)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 519 entries, 0 to 518
Data columns (total 7 columns):
 #   Column                 Non-Null Count  Dtype 
---  ------                 --------------  ----- 
 0   customer_id            519 non-null    int64 
 1   company_name           519 non-null    object
 2   region                 519 non-null    object
 3   country                519 non-null    object
 4   city                   519 non-null    object
 5   street_number          519 non-null    object
 6   customer_phone_number  519 non-null    object
dtypes: int64(1), object(6)
memory usage: 28.5+ KB


No missing values: every column has 519 non-null entries.

### Customer Data Cleaning 

###### We are going to group by company name, country, and city. Why?
- The same company can have warehouses in different cities or countries, and because we use the customer ID to track the order location, those cases need distinct IDs.
- When the company name, country, and city all match, however, the rows are probably duplicates in which the street number or phone number was updated. We check this next.
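The rule above can be sketched on a small toy frame. The rows below are hypothetical and only mirror the notebook's column names; the point is that `keep=False` flags every member of a candidate group, and a per-group `nunique()` shows which attribute actually differs.

```python
import pandas as pd

# Hypothetical toy rows using the notebook's column names.
toy = pd.DataFrame({
    "customer_id": [1, 2, 3, 4],
    "company_name": ["Acme", "Acme", "Acme", "Bolt"],
    "country": ["Spain", "Spain", "France", "Spain"],
    "city": ["Madrid", "Madrid", "Paris", "Madrid"],
    "street_number": ["1 Main St", "2 Main St", "9 Rue A", "5 Side St"],
})

key = ["company_name", "country", "city"]

# keep=False marks every row of a duplicated group, not only the repeats.
candidates = toy[toy.duplicated(subset=key, keep=False)]

# Count distinct street numbers inside each candidate group.
street_variants = candidates.groupby(key)[["street_number"]].nunique()
print(candidates)
print(street_variants)
```

Acme in Paris is not flagged, because a different city is treated as a separate delivery location rather than a duplicate.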

In [7]:
columns_to_check = ['company_name', 'country', 'city']

# boolean mask for all occurrences of duplicated rows based on the specified columns
duplicate_mask = customers_not_clean.duplicated(subset=columns_to_check, keep=False)

duplicate_rows = customers_not_clean[duplicate_mask]
duplicate_rows

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 9, Finished, Available)

Unnamed: 0,customer_id,company_name,region,country,city,street_number,customer_phone_number
272,99,Luton Luxuries,Europe,Sweden,Luton,789 The Mall,(4415) 824 445 555
273,162,Luton Luxuries,Europe,Sweden,Luton,456 The Mall,(4415) 824 445 555
359,74,Plymouth Pleasures,Europe,Sweden,Plymouth,456 Royal Parade,(4417) 524 445 555
360,161,Plymouth Pleasures,Europe,Sweden,Plymouth,321 Royal Parade,(4417) 528 889 999


- <mark>Luton Luxuries: different house number. With additional records we could verify which address is correct, but this is all the information we have. We therefore assume that the last entry (the higher ID) is the correct one.</mark>

- <mark>Plymouth Pleasures: different house number and phone number. Following the same logic, we keep the last entry.</mark>
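As a sketch of the same decision without hard-coded IDs, the frame can be sorted by `customer_id` and deduplicated with `keep="last"`. This relies on the assumption stated above, that the higher ID is the more recent entry; the Basel Bites ID used here is a placeholder, and `id_remap` is a hypothetical helper name for the old-to-kept ID lookup.

```python
import pandas as pd

# The four duplicate rows from the output above, plus one unrelated row
# (its ID of 10 is a placeholder for illustration).
toy = pd.DataFrame({
    "customer_id": [162, 99, 161, 74, 10],
    "company_name": ["Luton Luxuries", "Luton Luxuries",
                     "Plymouth Pleasures", "Plymouth Pleasures", "Basel Bites"],
    "country": ["Sweden", "Sweden", "Sweden", "Sweden", "Switzerland"],
    "city": ["Luton", "Luton", "Plymouth", "Plymouth", "Basel"],
})

key = ["company_name", "country", "city"]

# Assumption: the higher customer_id is the later entry, so sort and keep the last.
ordered = toy.sort_values("customer_id")
deduped = ordered.drop_duplicates(subset=key, keep="last")

# Lookup from each dropped ID to the ID kept for the same company and location.
kept_id = ordered.groupby(key)["customer_id"].transform("max")
id_remap = {
    old: new
    for old, new in zip(ordered["customer_id"].tolist(), kept_id.tolist())
    if old != new
}
print(deduped)
print(id_remap)
```

The resulting lookup can then be applied to the sales data so that orders placed under a dropped ID follow the retained record.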

In [8]:
customers_not_clean = customers_not_clean[~customers_not_clean['customer_id'].isin([99, 74])]

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 10, Finished, Available)

In [9]:
customers_not_clean.shape

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 11, Finished, Available)

(517, 7)

<mark>We lost 2 rows, as expected. Now we check whether any customer IDs are repeated.</mark>

In [10]:
duplicate_IDs = customers_not_clean[customers_not_clean.duplicated(subset=['customer_id'], keep=False)]
duplicate_IDs

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 12, Finished, Available)

Unnamed: 0,customer_id,company_name,region,country,city,street_number,customer_phone_number
54,128,Beijing Beverage Co.,Asia,China,Beijing,456 Wangfujing Street,(86) 109 876 543
452,128,Tantalizing Tastes,North America,United States of America,North Dakota,1234 Redwood Drive,(13) 035 555 555


<mark>So, here's the issue: we have two different clients with the same ID. We need to add an index to create unique IDs for every row. Unfortunately, we have no other way to know which ID in the fact_sales data corresponds to each client, as it only contains the customer ID.</mark>

<mark>In another scenario, we would typically have more information to track clients' sales. Given the information we have, however, we need to delete the rows in the fact_sales data that contain this shared ID. This situation is most likely a data quality issue, and by removing these rows we keep only sales that can be attributed to a single, unambiguous client.</mark>
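A minimal sketch of this step, assuming toy frames with hypothetical `order_id` values: IDs that map to more than one company name are collected first, and the sales rows are then split into ambiguous and clean sets.

```python
import pandas as pd

# Customer rows taken from the output above; order_id values are hypothetical.
customers_toy = pd.DataFrame({
    "customer_id": [128, 128, 266],
    "company_name": ["Beijing Beverage Co.", "Tantalizing Tastes", "Basel Bites"],
})
sales_toy = pd.DataFrame({
    "order_id": [1, 2, 3, 4],
    "customer_id": [128, 266, 128, 266],
})

# An ID is ambiguous when it is attached to more than one company name.
names_per_id = customers_toy.groupby("customer_id")["company_name"].nunique()
ambiguous_ids = names_per_id[names_per_id > 1].index.tolist()

# Split the sales: rows that cannot be attributed versus rows that can.
is_ambiguous = sales_toy["customer_id"].isin(ambiguous_ids)
ambiguous_sales = sales_toy[is_ambiguous]
clean_sales = sales_toy[~is_ambiguous]
print(ambiguous_ids, len(ambiguous_sales), len(clean_sales))
```

Keeping `ambiguous_sales` in a separate frame, rather than discarding it outright, leaves the rows available in case a later source makes attribution possible.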

### Loading Sales Data to clean the repeated ID

In [11]:
# Load data into pandas DataFrame from "/lakehouse/default/" + "Files/orders_novawines_oltp.csv"
sales = pd.read_csv("/lakehouse/default/" + "Files/orders_novawines_oltp.csv", delimiter=';')
display(sales)

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 13, Finished, Available)

SynapseWidget(Synapse.DataFrame, 16b01c2f-0a3a-4cf5-aca0-f39d8bfb7787)

In [12]:
sales.shape

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 14, Finished, Available)

(8000, 10)

In [13]:
repeat_id_sales = sales[sales['customer_id'] == 128]
repeat_id_sales

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 15, Finished, Available)

Unnamed: 0,order_id,customer_id,product_id,quantity_sold,unit_price_eur,unit_cost_eur,order_placed_date,shipped_date,salesperson_id,supplier_id
8,906921821,128,5439,996,1.8,1.08,02/01/2020,20/02/2020,25,187654
1374,813931137,128,7810,11326,1.92,1.44,31/08/2020,08/10/2020,10,315827
1539,564382272,128,6709,13724,1.5,0.9,29/09/2020,26/10/2020,25,764129
1740,766429001,128,3209,3892,6.23,3.74,04/11/2020,10/11/2020,25,987654
2080,863317257,128,4365,439,2.7,2.03,09/01/2021,22/01/2021,25,315827
2505,615974670,128,6578,236,5.8,4.06,27/03/2021,04/05/2021,25,890123
2658,510860401,128,8763,561,20.0,14.0,22/04/2021,18/05/2021,6,698431
2815,897994142,128,1456,877,3.25,2.28,19/05/2021,18/06/2021,25,482903
3167,879234987,128,1098,9667,3.04,2.28,23/07/2021,02/09/2021,2,315827
3502,779303604,128,6719,7078,1.8,1.08,17/09/2021,17/10/2021,25,432109


In [14]:
repeat_id_sales.shape

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 16, Finished, Available)

(17, 10)

So, 17 records. Let's see what percentage of the total quantity sold they represent.

In [15]:
percentage_for_id_128 = (sales[sales['customer_id'] == 128]['quantity_sold'].sum() / sales['quantity_sold'].sum()) * 100
percentage_for_id_128

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 17, Finished, Available)

0.2504620995282921

<mark>We want to ensure data integrity, and although deleting rows is not ideal, we have decided to do so to avoid associating sales with the wrong client. The sales for customer ID 128 account for 0.25% of the total quantity sold, so their removal will not have a significant impact.</mark>
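The 0.25% figure is a share of `quantity_sold`, not of rows. The sketch below shows both measures side by side on a toy frame; `share_pct` is a hypothetical helper, and the last line reuses the row counts reported above (17 of 8000).

```python
import pandas as pd

def share_pct(df, mask, column=None):
    """Percentage of rows (column=None) or of a column's total selected by mask."""
    if column is None:
        return mask.sum() / len(df) * 100
    return df.loc[mask, column].sum() / df[column].sum() * 100

# Hypothetical toy data with the notebook's column names.
toy = pd.DataFrame({
    "customer_id": [128, 1, 2, 128],
    "quantity_sold": [10, 40, 40, 10],
})
mask = toy["customer_id"] == 128

row_share = share_pct(toy, mask)                    # 2 of 4 rows
qty_share = share_pct(toy, mask, "quantity_sold")   # 20 of 100 units

# Row share using the counts reported in this notebook.
notebook_row_share = 17 / 8000 * 100
print(row_share, qty_share, notebook_row_share)
```

With the notebook's counts the row share is about 0.21%, the same order of magnitude as the quantity share, so both measures support the decision.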

In [16]:
sales = sales[sales['customer_id'] != 128]

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 18, Finished, Available)

In [17]:
sales.shape

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 19, Finished, Available)

(7983, 10)

Back to customer data preparation: we are not going to remove the rows with the repeated ID. Instead, we create a new index so that every customer row has a unique identifier and this cannot occur again.

In [18]:
customers_not_clean['new_index'] = range(1, len(customers_not_clean) + 1)
customers = customers_not_clean.copy()

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 20, Finished, Available)

In [19]:
customers.shape

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 21, Finished, Available)

(517, 8)

Same number of rows, as expected; the only change is the added new_index column.

In [20]:
customers.head(20)

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 22, Finished, Available)

Unnamed: 0,customer_id,company_name,region,country,city,street_number,customer_phone_number,new_index
0,266,Basel Bites,Europe,Switzerland,Basel,789 Marktplatz,(41) 618 889 999,1
1,289,Bernese Bites,Europe,Switzerland,Bern,123 Zytglogge,(41) 314 445 555,2
2,336,Chur Cuisine,Europe,Switzerland,Chur,456 Old Town,(41) 814 445 555,3
3,369,Dnipro Delights,Europe,Ukraine,Dnipro,789 Monastyrsky lm,(380) 568 889 999,4
4,364,Fribourg Flavors,Europe,Switzerland,Fribourg,123 Place Georges-Python,(41) 268 889 999,5
5,257,Geneva Gourmet,Europe,Switzerland,Geneva,456 Jet d'Eau,(41) 225 556 666,6
6,357,La Chaux-de-Fonds Bites,Europe,Switzerland,La Chaux-de-Fonds,789 Espacité,(41) 325 556 666,7
7,270,Lausanne Luches,Europe,Switzerland,Lausanne,321 Olympic Museum,(41) 217 778 888,8
8,302,Lucerne Delights,Europe,Switzerland,Lucerne,789 Chapel Bridge,(41) 415 556 666,9
9,335,Lugano Luches,Europe,Switzerland,Lugano,123 Parco Ciani,(41) 918 889 999,10


In [21]:
# Redirect sales from the removed duplicate IDs (99, 74) to the retained IDs (162, 161)
custom_mappings = {99: 162, 74: 161}
sales['customer_id'] = sales['customer_id'].replace(custom_mappings)

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 23, Finished, Available)

In [22]:
# Compute the counts once and look up both removed IDs
id_counts = sales['customer_id'].value_counts()
count_99 = id_counts.get(99, 0)
count_74 = id_counts.get(74, 0)

print(f"Count of customer_id 99: {count_99}")
print(f"Count of customer_id 74: {count_74}")

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 24, Finished, Available)

Count of customer_id 99: 0
Count of customer_id 74: 0


In [23]:
# General mapping from customer_id to new_index for all customers
# (ID 128 is duplicated in customers, but its sales rows were already removed)
general_mapping = dict(zip(customers['customer_id'], customers['new_index']))

sales['new_customer_index'] = sales['customer_id'].map(general_mapping)
sales

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 25, Finished, Available)

Unnamed: 0,order_id,customer_id,product_id,quantity_sold,unit_price_eur,unit_cost_eur,order_placed_date,shipped_date,salesperson_id,supplier_id,new_customer_index
0,329218202,430,6897,686,2.70,2.03,01/01/2020,06/01/2020,24,315827,345
1,128355079,516,6709,474,1.50,0.90,01/01/2020,13/02/2020,6,764129,219
2,731184345,19,3214,696,1.50,0.90,01/01/2020,13/02/2020,17,482903,97
3,231559616,275,6391,788,1.50,0.90,01/01/2020,18/01/2020,8,315827,229
4,241060644,425,2810,702,14.30,10.01,02/01/2020,30/01/2020,11,890123,407
...,...,...,...,...,...,...,...,...,...,...,...
7995,688309770,346,7890,15788,4.34,3.04,05/11/2023,15/12/2023,13,764129,170
7996,464009720,186,4987,1600,10.80,7.56,06/11/2023,14/11/2023,18,698431,93
7997,804632669,39,5678,8664,4.67,3.27,06/11/2023,17/11/2023,25,764129,420
7998,826816469,382,6547,1924,4.65,3.26,06/11/2023,11/11/2023,8,764129,290


In [24]:
sales['new_customer_index'].isnull().sum()

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 26, Finished, Available)

0

In [25]:
# Check: the row at position 344 should carry new_index 345
customer_345 = customers.iloc[[344]]
customer_345


StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 27, Finished, Available)

Unnamed: 0,customer_id,company_name,region,country,city,street_number,customer_phone_number,new_index
345,430,Pacific Food Distributors,North America,Canada,"Victoria, British Columbia",234 Elmwood Avenue,(2) 504 567 890,345


In the sales DataFrame above, customer_id 430 maps to new index 345 (first row), which matches this customer record.
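A spot check on one row can be complemented by a check over the whole mapping. The sketch below is one possible approach rather than the notebook's method: a left merge with `validate="many_to_one"` raises if a `customer_id` appears more than once in the lookup, and `indicator=True` exposes sales rows with no matching customer. The toy values are hypothetical.

```python
import pandas as pd

# Hypothetical lookup and sales frames using the notebook's column names.
customers_toy = pd.DataFrame({"customer_id": [430, 516, 19], "new_index": [1, 2, 3]})
sales_toy = pd.DataFrame({"order_id": [10, 11, 12, 13],
                          "customer_id": [430, 516, 19, 430]})

# many_to_one: each sales row may match at most one customer row.
merged = sales_toy.merge(customers_toy, on="customer_id", how="left",
                         validate="many_to_one", indicator=True)
unmatched = int((merged["_merge"] == "left_only").sum())

# A lookup with a repeated customer_id is rejected instead of silently resolved.
dup_lookup = pd.concat(
    [customers_toy, pd.DataFrame({"customer_id": [430], "new_index": [4]})],
    ignore_index=True,
)
try:
    sales_toy.merge(dup_lookup, on="customer_id", how="left", validate="many_to_one")
    duplicate_detected = False
except pd.errors.MergeError:
    duplicate_detected = True

print(unmatched, duplicate_detected)
```

A `dict(zip(...))` mapping keeps only the last entry for a repeated key without any warning. Applied to this notebook's `customers` frame, the validated merge would raise, because both ID 128 rows are retained there; that is harmless only because the ID 128 sales were removed beforehand.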

In [26]:
customers = spark.createDataFrame(customers)

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 28, Finished, Available)

In [27]:
# Inspect the Spark DataFrame's schema
customers

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 29, Finished, Available)

DataFrame[customer_id: bigint, company_name: string, region: string, country: string, city: string, street_number: string, customer_phone_number: string, new_index: bigint]

In [28]:
table_name = "customers_novawines_oltp"
customers.write.mode("overwrite").format("delta").save(f"Tables/{table_name}")
print(f"Dataframe saved to table: {table_name}")

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 30, Finished, Available)

Dataframe saved to table: customers_novawines_oltp


In [29]:
sales = spark.createDataFrame(sales)

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 31, Finished, Available)

In [30]:
table_name = "orders_novawines_oltp"
sales.write.mode("overwrite").format("delta").save(f"Tables/{table_name}")
print(f"Dataframe saved to table: {table_name}")

StatementMeta(, 4e30d751-6fbd-400e-aac3-f3ddbfe8e31c, 32, Finished, Available)

Dataframe saved to table: orders_novawines_oltp
