In [1]:
import pandas as pd
import os
import statsmodels.api as sm
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import ast
from sqlalchemy import create_engine, types as sat

In [2]:
path = os.getcwd()
file_list = []
for f in os.listdir(path):
        full_path = os.path.join(path, f)
        if os.path.isfile(full_path):
            file_list.append(f)


In [3]:
def upload_table(dtype_map, df, table_name, batch_size = 10_000):
    """
    Upload a pandas DataFrame to SQL Server.
    - Drops the table if it already exists.
    - Recreates it with the correct data types.
    - Loads data efficiently in chunks.
    """
    
    # Create a database connection
    engine = create_engine(
        'mssql+pyodbc://@localhost\\SQLEXPRESS/app?driver=ODBC+Driver+17+for+SQL+Server',
        fast_executemany=True
    )   
    
    # Drop the existing table if it exists
    query = f"IF OBJECT_ID('dbo.{table_name}', 'U') IS NOT NULL DROP TABLE dbo.{table_name};"
    
    try:
        with engine.begin() as conn:
            # Execute drop query
            conn.exec_driver_sql(query)
            # Upload DataFrame to SQL Server
            df.to_sql(
                name=table_name,
                con=conn,
                schema='dbo',
                if_exists='fail', # fail if table somehow already exists
                index=False, 
                dtype=dtype_map, # column type mapping
                chunksize=batch_size # batch size for faster upload
            )
        print(f"Table '{table_name}' successfully uploaded ({len(df)} rows).")
        return True  
    except Exception as e:
        print(f"Error uploading table '{table_name}': {e}")
        return False

1. Prepare transaction to rd_transaction

In [5]:
transactions = pd.read_csv('transactions.csv')
transactions.sample(5)

Unnamed: 0,created_at,customer_id,booking_id,session_id,product_metadata,payment_method,payment_status,promo_amount,promo_code,shipment_fee,shipment_date_limit,shipment_location_lat,shipment_location_long,total_amount
459002,2018-02-14T11:54:45.797470Z,1739,f5732263-d17b-4f20-9c0a-b3df08a29246,df437729-d8a7-4f1c-aa52-e15aae1d4c45,"[{'product_id': 26073, 'quantity': 1, 'item_pr...",Gopay,Success,4970,XX2022,0,2018-02-19T21:44:59.954226Z,-7.882427,112.92281,188704
125379,2019-01-16T17:47:40.136850Z,13514,1f08ae93-3fab-4eda-a9f8-b5b7ec91a046,e8587c07-bac5-44a6-a25c-b0e035e9f9c9,"[{'product_id': 42664, 'quantity': 1, 'item_pr...",Gopay,Success,0,,10000,2019-01-22T13:29:07.594780Z,-5.333423,104.036873,113296
598101,2016-07-09T20:33:57.744176Z,38239,f953611d-1183-4ba7-a632-d58cfe565e13,9cd613f4-7cc0-43d1-a8b4-d8fd37e442b1,"[{'product_id': 22593, 'quantity': 3, 'item_pr...",Credit Card,Success,3416,WEEKENDMANTAP,10000,2016-07-15T09:45:14.839672Z,1.354218,116.245514,985625
197794,2021-05-20T22:01:09.860198Z,56448,990de267-d3ca-4cc5-baa6-7473f5e34243,a72aebde-1068-410c-ad5a-6a38787daede,"[{'product_id': 55860, 'quantity': 1, 'item_pr...",OVO,Success,0,,10000,2021-05-25T22:30:51.344087Z,-6.141026,106.781025,195514
667553,2022-03-01T01:55:17.369746Z,88452,f0c5eb84-10d8-4d7a-9446-cefa4d443438,28f7adea-30db-4653-a84e-a45556f3d7da,"[{'product_id': 17149, 'quantity': 1, 'item_pr...",Gopay,Success,3506,AZ2022,10000,2022-03-06T11:25:38.342973Z,-7.737344,110.461554,1236382


In [6]:
transactions_1 = transactions.copy()

In [7]:
transactions_1['created_at'] = pd.to_datetime(transactions_1['created_at'])
transactions_1['created_at'] = transactions_1['created_at'].dt.floor('s')
transactions_1['shipment_date_limit'] = pd.to_datetime(transactions_1['shipment_date_limit'])
transactions_1['shipment_date_limit'] = transactions_1['shipment_date_limit'].dt.floor('s')

In [8]:
transactions_1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 852584 entries, 0 to 852583
Data columns (total 14 columns):
 #   Column                  Non-Null Count   Dtype              
---  ------                  --------------   -----              
 0   created_at              852584 non-null  datetime64[ns, UTC]
 1   customer_id             852584 non-null  int64              
 2   booking_id              852584 non-null  object             
 3   session_id              852584 non-null  object             
 4   product_metadata        852584 non-null  object             
 5   payment_method          852584 non-null  object             
 6   payment_status          852584 non-null  object             
 7   promo_amount            852584 non-null  int64              
 8   promo_code              326536 non-null  object             
 9   shipment_fee            852584 non-null  int64              
 10  shipment_date_limit     852584 non-null  datetime64[ns, UTC]
 11  shipment_location_lat   85

In [9]:
transactions_1['has_free_shipping'] = (transactions_1['shipment_fee'] == 0).astype(int)
transactions_1['has_promo'] = (transactions_1['promo_amount'] > 0).astype(int)

In [10]:
transactions_dtype_map = {
    'booking_id':            sat.NVARCHAR(None),  # None == NVARCHAR(MAX)
    'session_id':            sat.NVARCHAR(None),
    'customer_id':           sat.BigInteger(),
    'created_at':            sat.DateTime(),
    'shipment_date_limit':   sat.DateTime(),
    'days_to_shipment':      sat.Integer(),
    'promo_flag':            sat.Integer(),
    'promo_amount':          sat.Numeric(18, 2),
    'promo_code':            sat.NVARCHAR(None),
    'payment_method':        sat.NVARCHAR(None),
    'payment_status':        sat.NVARCHAR(None),
    'shipment_fee':          sat.Numeric(18, 2),
    'total_amount':          sat.Numeric(18, 2),
}

upload_table(transactions_dtype_map, transactions_1, table_name = 'rd_transactions')



Table 'rd_transactions' successfully uploaded (852584 rows).


True

2. Organize the purchase composition into a separate table - rd_transactions_prods


In [12]:
def parse_and_explode_products(df):
    """
    Explode product_metadata into separate rows with error handling.
    """
    # Parse product_metadata safely
    def safe_parse(x):
        if isinstance(x, str):
            try:
                return ast.literal_eval(x)
            except (ValueError, SyntaxError):
                return []
        return x if isinstance(x, list) else []
    
    df = df.copy()
    df['products'] = df['product_metadata'].apply(safe_parse)
    
    # Explode products
    exploded = df[['booking_id', 'products']].explode('products')
    
    # Filter out empty/None values
    exploded = exploded[exploded['products'].notna()].reset_index(drop=True)
    
    # Extract fields
    result = pd.DataFrame({
        'booking_id': exploded['booking_id'],
        'product_id': exploded['products'].apply(lambda x: x.get('product_id')),
        'quantity': exploded['products'].apply(lambda x: x.get('quantity')),
        'item_price': exploded['products'].apply(lambda x: x.get('item_price'))
    })
    
    # Calculate amount
    result['product_amount'] = result['quantity'] * result['item_price']
    
    return result

transactions_prods_1 = parse_and_explode_products(transactions_1)


In [13]:
transactions_prods_dtype_map = {
    'booking_id':     sat.NVARCHAR(None),  # NVARCHAR(MAX) — чтобы влезли любые UUID/строки
    'product_id':     sat.BigInteger(),
    'quantity':       sat.BigInteger(),
    'item_price':     sat.BigInteger(),
    'product_amount': sat.BigInteger(),
}
upload_table(transactions_prods_dtype_map, transactions_prods_1, table_name = 'rd_transactions_prods', batch_size = 20_000)


Table 'rd_transactions_prods' successfully uploaded (1254585 rows).


True

3. Prepare click stream

In [15]:
click_stream = pd.read_csv("click_stream.csv")
click_stream.sample(5)


Unnamed: 0,session_id,event_name,event_time,event_id,traffic_source,event_metadata
9256921,b586bb7c-7fe1-4219-ba8a-22100ed12d02,SCROLL,2021-08-09T22:33:12.891590Z,25cdf0e0-7350-47e2-a1be-10af48d0f27f,MOBILE,
2818314,a21ee564-6d50-4b43-a205-b85fd76fcc89,CLICK,2018-06-11T03:17:53.447372Z,c4b200ac-9c5d-4a15-8999-5ce6e865e182,MOBILE,
2416422,2c52329b-6644-4147-ab76-0ee4602aab63,BOOKING,2021-12-11T12:58:23.400385Z,c1ecb9a5-60e7-485d-9d07-d3f4080fe45d,MOBILE,{'payment_status': 'Success'}
2611089,4f9ab76e-b67f-444f-94f8-081bb326ce71,SCROLL,2018-08-31T01:19:21.892401Z,977f5b70-9412-44fe-8c60-9fcabb753031,MOBILE,
7073178,bb8fea8b-be58-4e48-844f-cf4ec10f865c,ITEM_DETAIL,2016-10-01T20:44:29.096315Z,899ecfcd-0676-4643-8693-64807aecdd53,MOBILE,


In [16]:
click_stream_1 = click_stream.copy()
click_stream_1['event_time'] = pd.to_datetime(click_stream_1['event_time'])
click_stream_1['event_time'] = click_stream_1['event_time'].dt.floor("s")

click_stream_1.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 12833602 entries, 0 to 12833601
Data columns (total 6 columns):
 #   Column          Dtype              
---  ------          -----              
 0   session_id      object             
 1   event_name      object             
 2   event_time      datetime64[ns, UTC]
 3   event_id        object             
 4   traffic_source  object             
 5   event_metadata  object             
dtypes: datetime64[ns, UTC](1), object(5)
memory usage: 587.5+ MB


4. Split event attributes into separate tables and prepare rd_events_add_to_cart
   

In [18]:
def make_event_table(df, event_name):

    # 1. Filter by event
    df_event = df[df['event_name'] == event_name].copy()
    
    # 2. Convert string to a dict (via ast.literal_eval)
    df_event['event_metadata'] = df_event['event_metadata'].apply(ast.literal_eval)
    
    # 3.  Expand the dictionary into columns
    metadata_df = pd.DataFrame(df_event['event_metadata'].tolist(), index=df_event.index)
    
    # 4. Merge with the remaining columns except event_metadata
    df_event = df_event.drop(columns=['event_metadata', 'event_name' ]).join(metadata_df)
    
    return df_event


In [19]:
events_add_to_cart = make_event_table(click_stream_1, 'ADD_TO_CART')
events_add_to_cart['prod_amount'] = events_add_to_cart['quantity']*events_add_to_cart['item_price']


In [20]:
events_add_to_cart['event_time'] = events_add_to_cart['event_time'].dt.tz_convert(None) # deleting UTC
events_add_to_cart['prod_id'] = events_add_to_cart['product_id'].astype('int64')
events_add_to_cart['quantity'] = events_add_to_cart['quantity'].astype('int64')
events_add_to_cart['item_price'] = events_add_to_cart['item_price'].astype('float64').round(2)


In [21]:
events_add_to_cart_1 = events_add_to_cart[[
    'event_id',
    'event_time',
    'session_id',
    'traffic_source',
    'product_id',
    'quantity',
    'item_price',
    'prod_amount'
]].copy()

events_add_to_cart_1.rename(columns={'product_id': 'prod_id'}, inplace=True)
events_add_to_cart_1.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1937157 entries, 3 to 12833599
Data columns (total 8 columns):
 #   Column          Dtype         
---  ------          -----         
 0   event_id        object        
 1   event_time      datetime64[ns]
 2   session_id      object        
 3   traffic_source  object        
 4   prod_id         int64         
 5   quantity        int64         
 6   item_price      float64       
 7   prod_amount     int64         
dtypes: datetime64[ns](1), float64(1), int64(3), object(3)
memory usage: 133.0+ MB


In [22]:
add_to_cart_dtype_map = {   
    'event_id':       sat.NVARCHAR(None),
    'event_time':     sat.DateTime(),
    'session_id':     sat.NVARCHAR(None),
    'traffic_source': sat.NVARCHAR(31),
    'prod_id':        sat.BigInteger(),
    'quantity':       sat.Integer(),
    'item_price':     sat.Numeric(10, 2),
    'prod_amount':    sat.Numeric(10, 2)
}

upload_table(add_to_cart_dtype_map, events_add_to_cart_1, table_name = 'rd_events_add_to_cart', batch_size = 20_000)


Table 'rd_events_add_to_cart' successfully uploaded (1937157 rows).


True

5.Preparation products

In [24]:
with open('product.csv', encoding='utf-8', errors='ignore') as f:
    lines = ['\t'.join(line.split(',')[:10]).replace('\n',' ').replace('\r',' ') for line in f]

products = pd.read_csv(pd.io.common.StringIO('\n'.join(lines)), sep='\t', quotechar='"')

In [25]:
products_1 = products.copy()
products_1['year'] = products_1['year'].astype('Int64')
products_1 = products_1.rename(columns={'id': 'product_id'})
products_1.head()

Unnamed: 0,product_id,gender,masterCategory,subCategory,articleType,baseColour,season,year,usage,productDisplayName
0,15970,Men,Apparel,Topwear,Shirts,Navy Blue,Fall,2011,Casual,Turtle Check Men Navy Blue Shirt
1,39386,Men,Apparel,Bottomwear,Jeans,Blue,Summer,2012,Casual,Peter England Men Party Blue Jeans
2,59263,Women,Accessories,Watches,Watches,Silver,Winter,2016,Casual,Titan Women Silver Watch
3,21379,Men,Apparel,Bottomwear,Track Pants,Black,Fall,2011,Casual,Manchester United Men Solid Black Track Pants
4,53759,Men,Apparel,Topwear,Tshirts,Grey,Summer,2012,Casual,Puma Men Grey T-shirt


In [26]:
products_1['season'] = products_1['season'].fillna('Not set')
products_1['season'].value_counts()

season
Summer     21476
Fall       11445
Winter      8519
Spring      2985
Not set       21
Name: count, dtype: int64

In [27]:
products_1['usage'] = products_1['usage'].fillna('Not set')
products_1['usage'].value_counts()

usage
Casual          34414
Sports           4025
Ethnic           3208
Formal           2359
Not set           317
Smart Casual       67
Party              29
Travel             26
Home                1
Name: count, dtype: int64

In [28]:
products_1['prod_id'] = products_1['product_id']


In [29]:
product_categories_renamed = pd.read_excel('product_categories_renaming.xlsx')

In [30]:
product_categories_renamed.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 170 entries, 0 to 169
Data columns (total 4 columns):
 #   Column                Non-Null Count  Dtype 
---  ------                --------------  ----- 
 0   original_name_concat  170 non-null    object
 1   masterCategory_new    170 non-null    object
 2   subCategory_new       170 non-null    object
 3   articleType_new       170 non-null    object
dtypes: object(4)
memory usage: 5.4+ KB


In [31]:
products_1['categories_concat'] = (
    products_1['masterCategory'].astype(str) + "-" +
    products_1['subCategory'].astype(str) + "-" +
    products_1['articleType'].astype(str)
)


In [32]:
products_1 = products_1.merge(
    product_categories_renamed[
        ['original_name_concat', 'masterCategory_new', 'subCategory_new', 'articleType_new', ]
    ],
    left_on='categories_concat',      
    right_on='original_name_concat',
    how='left'                       
)

In [33]:
products_1.columns = products_1.columns.str.strip()
rd_products = products_1[[
    'prod_id',
    'productDisplayName',
    'masterCategory_new',
    'subCategory_new',
    'articleType_new',
    'gender',
    'baseColour',
    'season',
    'year',
    'usage'
]].copy()
rd_products.columns = [
    'prod_id',
    'prod_name',
    'category_level_1',
    'category_level_2',
    'category_level_3',
    'gender',
    'baseColour',
    'season',
    'year',
    'usage'
]

In [34]:
prods_dtype_map = {
    'prod_id':          sat.BigInteger(),
    'prod_name':        sat.NVARCHAR(None),  # NVARCHAR(MAX) — чтобы влезли любые строки   
    'category_level_1': sat.NVARCHAR(255),
    'category_level_2': sat.NVARCHAR(255),
    'category_level_3': sat.NVARCHAR(255),
    'gender':           sat.NVARCHAR(255),
    'baseColour':       sat.NVARCHAR(255),
    'season':           sat.NVARCHAR(255),
    'year':             sat.BigInteger(),
    'usage':            sat.NVARCHAR(255)
}
upload_table(prods_dtype_map, rd_products, table_name = 'rd_prods')


Table 'rd_prods' successfully uploaded (44446 rows).


True

6. Prepare rd_customers

In [36]:
customers = pd.read_csv('customer.csv')

In [37]:
customers_1 = customers[[
'customer_id',
'gender',
'birthdate',
'device_type',
'device_version',  
'home_location'
]].copy()
customers_1['birthdate'] = pd.to_datetime(customers_1['birthdate'])


In [38]:
customers_1.info()  

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 6 columns):
 #   Column          Non-Null Count   Dtype         
---  ------          --------------   -----         
 0   customer_id     100000 non-null  int64         
 1   gender          100000 non-null  object        
 2   birthdate       100000 non-null  datetime64[ns]
 3   device_type     100000 non-null  object        
 4   device_version  100000 non-null  object        
 5   home_location   100000 non-null  object        
dtypes: datetime64[ns](1), int64(1), object(4)
memory usage: 4.6+ MB


In [39]:
customers_dtype_map = {
    'customer_id':     sat.BigInteger(),
    'gender':          sat.NVARCHAR(255),
    'birthdate':       sat.DateTime(),
    'device_type':     sat.NVARCHAR(255),
    'device_version':  sat.NVARCHAR(255),
    'home_location':   sat.NVARCHAR(255)
}

upload_table(customers_dtype_map, customers_1, table_name = 'rd_customers')


Table 'rd_customers' successfully uploaded (100000 rows).


True

7. Prepare sessions

In [41]:
# 1. Define event types 
event_types = [
    'ADD_PROMO', 'ADD_TO_CART', 'BOOKING', 'CLICK',
    'HOMEPAGE', 'ITEM_DETAIL', 'PROMO_PAGE', 'SCROLL', 'SEARCH'
]

In [42]:
# 2. Create *_time columns dynamically 
step1 = click_stream_1.copy()
for e in event_types:
    step1[f'{e}_time'] = step1.loc[step1['event_name'] == e, 'event_time']

In [43]:
# 3. Aggregate by session and traffic_source
agg_dict = {}
for e in event_types:
    if e in ['ADD_PROMO', 'BOOKING']:
        agg_dict[f'{e}_time'] = (f'{e}_time', 'min')
    else:
        agg_dict[f'{e}_cnt'] = (f'{e}_time', 'count')
        agg_dict[f'{e}_first_time'] = (f'{e}_time', 'min')
        agg_dict[f'{e}_last_time'] = (f'{e}_time', 'max')

sessions = (
    step1.groupby(['session_id', 'traffic_source'], as_index=False)
         .agg(**agg_dict)
)

In [44]:
# 4. Remove timezone info
for c in sessions.filter(like='_time').columns:
    sessions[c] = sessions[c].dt.tz_convert(None)


In [45]:
# 5. Build base session stats 
base = (
    click_stream_1.groupby('session_id', as_index=True)
                  .agg(
                      session_events_cnt=('event_id', 'count'),
                      session_start_time=('event_time', 'min'),
                      session_end_time=('event_time', 'max'),
                  )
)

In [46]:
# 6. Define step categories 
step2_events = ['CLICK', 'SCROLL', 'SEARCH', 'ITEM_DETAIL', 'PROMO_PAGE']
step3_events = ['ADD_TO_CART', 'ADD_PROMO']

In [47]:
# 7. Compute step times 
step2 = (
    click_stream_1[click_stream_1['event_name'].isin(step2_events)]
    .groupby('session_id')['event_time']
    .min()
    .rename('step2_time')
)

step3 = (
    click_stream_1[click_stream_1['event_name'].isin(step3_events)]
    .groupby('session_id')['event_time']
    .min()
    .rename('step3_time')
)




In [48]:
# 8. Merge session-level data 
sessions_agg = base.join([step2, step3]).reset_index()

for c in sessions_agg.filter(like='_time').columns:
    sessions_agg[c] = sessions_agg[c].dt.tz_convert(None)

sessions = (
    sessions.merge(sessions_agg, on='session_id', how='left')
            .drop(columns=['index'], errors='ignore')
)


In [49]:
sessions_dtype_map = {
    'session_id':              sat.NVARCHAR(None),
    'traffic_source':          sat.NVARCHAR(31),
    'ADD_PROMO_time':          sat.DateTime(),
    'ADD_TO_CART_cnt':         sat.Integer,     
    'ADD_TO_CART_first_time':  sat.DateTime(),
    'ADD_TO_CART_last_time':   sat.DateTime(),
    'BOOKING_time':            sat.DateTime(),
    'CLICK_cnt':               sat.Integer,
    'CLICK_first_time':        sat.DateTime(),
    'CLICK_last_time':         sat.DateTime(),
    'HOMEPAGE_cnt':            sat.Integer,
    'HOMEPAGE_first_time':     sat.DateTime(),
    'HOMEPAGE_last_time':      sat.DateTime(),
    'ITEM_DETAIL_cnt':         sat.Integer,   
    'ITEM_DETAIL_first_time':  sat.DateTime(),
    'ITEM_DETAIL_last_time':   sat.DateTime(),
    'PROMO_PAGE_cnt':          sat.Integer,   
    'PROMO_PAGE_first_time':   sat.DateTime(),
    'PROMO_PAGE_last_time':    sat.DateTime(),
    'SCROLL_cnt':              sat.Integer,
    'SCROLL_first_time':       sat.DateTime(),
    'SCROLL_last_time':        sat.DateTime(),
    'SEARCH_cnt':              sat.Integer,
    'SEARCH_first_time':       sat.DateTime(),
    'SEARCH_last_time':        sat.DateTime(),
    'session_events_cnt':      sat.Integer,         
    'session_start_time':      sat.DateTime(),
    'session_end_time':        sat.DateTime(),
    'step2_time':              sat.DateTime(),
    'step3_time':              sat.DateTime()
}

upload_table(sessions_dtype_map, sessions, table_name = 'rd_sessions', batch_size = 20_000)


Table 'rd_sessions' successfully uploaded (895203 rows).


True