In [4]:
import datetime
import json
import os

import numpy as np
import pandas as pd

In [2]:
# Load the raw sales export. `header=0` combined with `names=` discards the
# file's own header row and applies the column names listed below.
# String columns use the builtin `str`: the `np.str` alias was deprecated in
# NumPy 1.20 and removed in NumPy 1.24, where it raises AttributeError.
raw_data = pd.read_csv('../data/raw/market_sales.csv',
                     header=0, parse_dates=['period'],
                     names=['period', 'user_id', 'store_id',
                            'item_id', 'license', 'type_by_nomenclature',
                            'rating'],
                     dtype={'user_id': str,
                            'store_id': str,
                            'item_id': str,
                            'license': np.int8,
                            'type_by_nomenclature': str,
                            'rating': np.int32})

In [3]:
# Print the column dtypes and memory footprint of the freshly loaded frame.
raw_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2495719 entries, 0 to 2495718
Data columns (total 7 columns):
 #   Column                Dtype         
---  ------                -----         
 0   period                datetime64[ns]
 1   user_id               object        
 2   store_id              object        
 3   item_id               object        
 4   license               int8          
 5   type_by_nomenclature  object        
 6   rating                int32         
dtypes: datetime64[ns](1), int32(1), int8(1), object(4)
memory usage: 69.0+ MB


In [4]:
# Trim leading/trailing whitespace from every identifier column.
for id_column in ('user_id', 'store_id', 'item_id'):
    raw_data[id_column] = raw_data[id_column].str.strip()

In [5]:
# Per-store summary: number of distinct items sold (`unique_goods`) and the
# earliest `period` seen for the store (`first_visit_date`).
# Both aggregates are computed in a single groupby pass; filtering `raw_data`
# once per store inside `apply` rescans the whole frame for every store.
stores = (
    raw_data
    .groupby('store_id')
    .agg(unique_goods=('item_id', 'nunique'),
         first_visit_date=('period', 'min'))
    .reset_index()
)

In [9]:
# Persist the per-store summary; the positional index is left out of the file.
stores.to_csv('../data/raw/stores.csv', index=False)

NameError: name 'stores' is not defined

## Get client data

In [6]:
def get_client_data(purchases_clear):
    """Aggregate a purchases frame into one row per client.

    Parameters
    ----------
    purchases_clear : pandas.DataFrame
        Must provide the columns ``user_id``, ``item_id``, ``period`` and
        ``license``.

    Returns
    -------
    pandas.DataFrame
        Columns ``user_id``, ``num_purchases`` (count of ``item_id`` values),
        ``first_visit`` (minimum ``period``) and ``license`` (maximum
        ``license``), one row per distinct ``user_id``.
    """
    per_user = purchases_clear.groupby('user_id')
    summary = per_user.agg(
        num_purchases=('item_id', 'count'),
        first_visit=('period', 'min'),
        license=('license', 'max'),
    )
    # Turn the group key back into an ordinary `user_id` column.
    return summary.reset_index()

In [16]:
# Discard rows whose user_id is not exactly 13 characters long.
purchases_clear = raw_data.loc[raw_data['user_id'].str.len().eq(13)]
# Build the per-client summary from the remaining purchases.
clients = get_client_data(purchases_clear)

In [17]:
# Persist the filtered purchases and the per-client summary; the positional
# index is left out of both files.
purchases_clear.to_csv('../data/raw/purchases_2k.csv', index=False)
clients.to_csv('../data/raw/clients.csv', index=False)

## Get goods info

In [9]:
# One row per item, indexed by item_id: maximum `license`, maximum
# `type_by_nomenclature` and mean `rating` over all purchases of the item.
goods = (
    purchases_clear
    .groupby('item_id')
    .agg(license=('license', 'max'),
         type_by_nomenclature=('type_by_nomenclature', 'max'),
         rating=('rating', 'mean'))
)

In [10]:
# `goods` comes straight from a groupby, so item_id lives in the index.
# Writing it with `index=False` alone would drop the item identifier from the
# file; move it back into a regular column first.
goods.reset_index().to_csv('../data/raw/goods.csv', index=False)

## Purchases

In [2]:
# Reload the cleaned purchases from disk. The string columns are pinned to
# `str`, matching the dtypes requested for the original raw load; left to
# inference, pandas would parse digit-only identifiers as integers (losing any
# leading zeros and the string type).
purchases_clear = pd.read_csv('../data/raw/purchases_2k.csv',
                              parse_dates=['period'],
                              dtype={'user_id': str,
                                     'store_id': str,
                                     'item_id': str,
                                     'type_by_nomenclature': str})

In [3]:
# Split `period` into separate calendar columns: year, month and day.
for date_part in ('year', 'month', 'day'):
    purchases_clear.loc[:, date_part] = getattr(purchases_clear['period'].dt, date_part)

In [4]:
def set_season_by_month(month):
    """Map a month number to a season code.

    Returns 1 for winter (months 1, 2, 12), 2 for spring (3, 4, 5),
    3 for summer (6, 7, 8) and 4 for autumn (9, 10, 11).
    Any other value yields ``None``.
    """
    season_table = (
        (1, [1, 2, 12]),   # winter
        (2, [3, 4, 5]),    # spring
        (3, [6, 7, 8]),    # summer
        (4, [9, 10, 11]),  # autumn
    )
    for season_code, season_months in season_table:
        if month in season_months:
            return season_code
    return None

In [5]:
# Derive the season code of every purchase from its month.
purchases_clear['season'] = purchases_clear['month'].apply(set_season_by_month)

In [6]:
# Number of purchase rows per calendar month, most frequent month first.
purchases_clear.month.value_counts().reset_index()

Unnamed: 0,index,month
0,8,275905
1,4,234282
2,3,234157
3,12,216411
4,9,214514
5,11,208051
6,7,202932
7,2,201582
8,10,197726
9,6,170913


In [8]:
# Write the frame, now carrying the year/month/day/season columns, back over
# the same file it was loaded from.
purchases_clear.to_csv('../data/raw/purchases_2k.csv', index=False)

In [5]:
class Transaction:
    """A single transaction together with the products recorded in it.

    All state lives in ``self.data``, a plain dict holding ``"tid"``,
    ``"datetime"``, ``"products"`` (a list of product dicts) and any extra
    keyword arguments passed to the constructor.
    """

    def __init__(self, transaction_id, transaction_datetime, **kwargs):
        """Create an empty transaction.

        Extra keyword arguments are merged into ``self.data`` last, so a
        keyword named ``tid``, ``datetime`` or ``products`` replaces the
        corresponding default entry.
        """
        self.data = {
            **{"tid": transaction_id, "datetime": transaction_datetime, "products": [],},
            **kwargs,
        }

    def add_item(
        self, product_id: str, season: int,
    ) -> None:
        """Append one product entry (``product_id`` and ``season``)."""
        p = {
            "product_id": product_id,
            # Store the `season` argument; the previous code referenced an
            # undefined name here and raised NameError on every call.
            "season": season,
        }
        self.data["products"].append(p)

    def as_dict(self,):
        """Return the underlying dict (the live object, not a copy)."""
        return self.data

    def transaction_id(self,):
        """Return the transaction identifier stored under ``"tid"``."""
        return self.data["tid"]

In [3]:
class ClientHistory:
    """Accumulates the transactions of one client.

    State is held in ``self.data``, a dict with the keys ``"client_id"`` and
    ``"transaction_history"`` (a list, in insertion order).
    """

    def __init__(self, client_id):
        """Start an empty history for ``client_id``."""
        self.data = dict(client_id=client_id, transaction_history=[])

    def add_transaction(self, transaction):
        """Append ``transaction`` to the end of the history."""
        history = self.data["transaction_history"]
        history.append(transaction)

    def as_dict(self):
        """Return the underlying dict (the live object, not a copy)."""
        return self.data

    def client_id(self):
        """Return the client identifier."""
        return self.data["client_id"]
        

In [None]:
class RowSplitter:
    """Group consecutive rows into per-client histories and write each history
    as one JSON line into one of ``n_shards`` output files.

    A change of ``row.client_id`` closes the current client and a change of
    ``row.transaction_id`` closes the current transaction, so rows belonging
    to the same client/transaction have to arrive consecutively.
    Call :meth:`finish` once all rows are consumed to write the last client
    and close the files.
    """

    def __init__(self, output_path, n_shards=16):
        """Create ``output_path`` if needed and open ``00.json`` ... shard files in it."""
        self.n_shards = n_shards
        # Initialise the buffers up front so they exist even if no shard file
        # is opened (n_shards == 0) or opening one fails.
        self._client = None
        self._transaction = None
        os.makedirs(output_path, exist_ok=True)
        self.outs = []
        for i in range(self.n_shards):
            self.outs.append(open(output_path + '/{:02d}.json'.format(i), "w",))

    def finish(self,):
        """Write the pending client (if any) and close every shard file."""
        self.flush()
        for out in self.outs:
            out.close()

    def flush(self,):
        """Write the buffered client to its shard and reset the buffers.

        Does nothing when no client is buffered.
        """
        if self._client is not None:
            # The transaction still being built belongs to this client too.
            self._client.add_transaction(self._transaction.as_dict())
            # rows are sharded by client_id
            # NOTE(review): `md5_hash` is not defined in the visible part of
            # this notebook - confirm it is defined/imported before this runs.
            shard_idx = md5_hash(self._client.client_id()) % self.n_shards
            data = self._client.as_dict()
            self.outs[shard_idx].write(json.dumps(data) + "\n")

            self._client = None
            self._transaction = None

    def consume_row(
        self, row,
    ):
        """Feed one row into the splitter.

        ``row`` must expose the attributes ``client_id``, ``transaction_id``,
        ``transaction_datetime``, ``season``, ``store_id`` and ``product_id``.
        """
        # A new client id means the previous client is complete.
        if self._client is not None and self._client.client_id() != row.client_id:
            self.flush()

        if self._client is None:
            self._client = ClientHistory(client_id=row.client_id)

        # A new transaction id means the previous transaction is complete.
        if self._transaction is not None and self._transaction.transaction_id() != row.transaction_id:
            self._client.add_transaction(self._transaction.as_dict())
            self._transaction = None

        if self._transaction is None:
            self._transaction = Transaction(
                transaction_id=row.transaction_id,
                transaction_datetime=row.transaction_datetime,
                season=row.season,
                store_id=row.store_id,
            )

        self._transaction.add_item(
            product_id=row.product_id,
            season=row.season,
        )