In [1]:
from pathlib import Path
import numpy as np
import pandas as pd
import duckdb
import polars as pl

In [2]:
from sklearn.feature_extraction.text import TfidfVectorizer

In [3]:
source_fol = Path('parsed_files')
reviews_fol = Path(source_fol, 'reviews')
meta_fol = Path(source_fol, 'meta')

In [4]:
all_files = set(Path.rglob(source_fol, '*.parquet'))
meta_files = set(Path.rglob(source_fol, 'meta*.parquet'))
review_files = all_files.difference(meta_files)

In [10]:
db_file = 'e_com.duckdb'
duck = duckdb.connect(db_file, read_only=True)

## Создаём таблицу продуктов

In [6]:
meta_files

{WindowsPath('parsed_files/meta/meta_Amazon_Fashion.parquet'),
 WindowsPath('parsed_files/meta/meta_art.parquet'),
 WindowsPath('parsed_files/meta/meta_Baby_Products.parquet'),
 WindowsPath('parsed_files/meta/meta_Grocery_and_Gourmet_Food.parquet'),
 WindowsPath('parsed_files/meta/meta_Handmade_Products.parquet'),
 WindowsPath('parsed_files/meta/meta_Industrial_and_Scientific.parquet'),
 WindowsPath('parsed_files/meta/meta_Magazine_Subscriptions.parquet'),
 WindowsPath('parsed_files/meta/meta_Musical_Instruments.parquet'),
 WindowsPath('parsed_files/meta/meta_Software.parquet')}

In [7]:
mfiles = list(meta_files)[1:]+[list(meta_files)[0]]
mfiles

[WindowsPath('parsed_files/meta/meta_Industrial_and_Scientific.parquet'),
 WindowsPath('parsed_files/meta/meta_art.parquet'),
 WindowsPath('parsed_files/meta/meta_Amazon_Fashion.parquet'),
 WindowsPath('parsed_files/meta/meta_Grocery_and_Gourmet_Food.parquet'),
 WindowsPath('parsed_files/meta/meta_Software.parquet'),
 WindowsPath('parsed_files/meta/meta_Magazine_Subscriptions.parquet'),
 WindowsPath('parsed_files/meta/meta_Handmade_Products.parquet'),
 WindowsPath('parsed_files/meta/meta_Musical_Instruments.parquet'),
 WindowsPath('parsed_files/meta/meta_Baby_Products.parquet')]

### Создаём таблицу stores

In [8]:
duck.query("""
    create table stores(
        store_id bigint
        , store varchar
        , primary key (store_id)
    )
""")

In [9]:
duck.query(f"""
    create sequence if not exists seq_1;
    insert into stores
    with
    store_list as (
        select distinct store
        from parquet_scan({[str(x) for x in mfiles]}, hive_partitioning=True) 
    )
    select
        nextval('seq_1') as store_id
        , store
    from store_list
""")

In [10]:
duck.query("select count(store), count(distinct store) from stores")

┌──────────────┬───────────────────────┐
│ count(store) │ count(DISTINCT store) │
│    int64     │         int64         │
├──────────────┼───────────────────────┤
│       379473 │                379473 │
└──────────────┴───────────────────────┘

### Создаём таблицу categories

In [11]:
duck.query("""
    create table categories(
        category_id bigint
        , category varchar
        , primary key (category_id)
    )
""")

In [12]:
duck.query(f"""
    create sequence if not exists seq_1;
    insert into categories
    with
    category_list as (
        select distinct main_category as category
        from parquet_scan({[str(x) for x in mfiles]}, hive_partitioning=True) 
    )
    select
        nextval('seq_1') as category_id
        , category
    from category_list
""")

In [13]:
duck.query("select count(category), count(distinct category) from categories")

┌─────────────────┬──────────────────────────┐
│ count(category) │ count(DISTINCT category) │
│      int64      │          int64           │
├─────────────────┼──────────────────────────┤
│              44 │                       44 │
└─────────────────┴──────────────────────────┘

### Создаём таблицу items

In [14]:
duck.query("""
    create table items(
        item_id bigint
        , parent_asin varchar
        , title varchar
        , image varchar
        , primary key (item_id)
    )
""")

In [15]:
duck.query(f"""
    create sequence if not exists seq_1;
    insert into items
    with
    meta as (
        select
           parent_asin
           , title
           , images
        from parquet_scan({[str(x) for x in mfiles]}, hive_partitioning=True)
    )
    select
        nextval('seq_1') as item_id
        , parent_asin
        , m.title
        , m.images as image
    from meta m
""")

In [16]:
duck.query("""
    select * from items
""")

┌─────────┬─────────────┬───────────────────────────────────┬──────────────────────────────────────────────────────────┐
│ item_id │ parent_asin │               title               │                          image                           │
│  int64  │   varchar   │              varchar              │                         varchar                          │
├─────────┼─────────────┼───────────────────────────────────┼──────────────────────────────────────────────────────────┤
│  382911 │ B01C4319LO  │ Chicco Viaro Travel System, Teak  │ https://m.media-amazon.com/images/I/51fWzrTOFjL.jpg      │
│  382912 │ B07FM4MJJP  │ Kisbaby Four Layers Muslin Ligh…  │ https://m.media-amazon.com/images/I/31urSmul1mL._AC_.jpg │
│  382913 │ B08WCG372G  │ EZTOTZ Meals with Milton - USA …  │ https://m.media-amazon.com/images/I/41BitYt282L.jpg      │
│  382914 │ B0083SXABC  │ Nuby iMonster Toddler Bowl        │ https://m.media-amazon.com/images/I/31yFxn-V-HL.jpg      │
│  382915 │ B07N8GRHHK  │ mDesig

In [17]:
duck.query("select count(parent_asin), count(distinct parent_asin) from items")

┌────────────────────┬─────────────────────────────┐
│ count(parent_asin) │ count(DISTINCT parent_asin) │
│       int64        │            int64            │
├────────────────────┼─────────────────────────────┤
│            3347168 │                     3347168 │
└────────────────────┴─────────────────────────────┘

### Создаём таблицу items_info

In [18]:
duck.query("""
    create table if not exists items_info(
        item_id bigint
        , category_id bigint
        , store_id bigint
        , price double
        , has_image bigint
        , has_video bigint
        , foreign key (item_id) references items(item_id)
--        , foreign key (category_id) references categories(category_id)
--        , foreign key (store_id) references stores(store_id)
    )
""")

In [21]:
info = duck.query(f"""
    with
    meta as (
        select
           parent_asin
           , main_category as category
           , store
           , price
           , has_image
           , videos as has_video
        from parquet_scan({[str(x) for x in mfiles]}, hive_partitioning=True)
    )
    select
        i.item_id
        , c.category_id
        , s.store_id
        , m.price
        , m.has_image
        , m.has_video
    from meta m
        left join items i on i.parent_asin = m.parent_asin
        left join categories c on c.category = m.category
        left join stores s on s.store = m.store 
""").pl()

In [22]:
info.estimated_size("mb")

154.41823196411133

In [23]:
duck.query("""
    insert into items_info
    select *
    from info
""")

### Создаём матрицу наиболее часто встречающихся слов в текстовых метаданных

In [24]:
vectorizer = TfidfVectorizer(min_df=.1, max_df=.8)

In [28]:
meta = duck.query(f"""
    select 
        i.item_id
        , concat_ws(';', t.categories, t.features, t.description, t.details) as txt 
    from parquet_scan({[str(x) for x in mfiles]}, hive_partitioning=True) t
        left join items i on t.parent_asin=i.parent_asin
""").pl()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [29]:
meta.estimated_size('mb')

5455.077537536621

In [30]:
vectorized_data = vectorizer.fit_transform(
    meta.to_pandas()['txt'].fillna('')
)

In [42]:
tfidf_df=pd.DataFrame(
    vectorized_data.toarray(),
    columns = vectorizer.get_feature_names_out(),
    index = meta.select('item_id').to_series().to_numpy()
).reset_index(names='item_id')

In [44]:
del vectorized_data
del meta

In [45]:
duck.query("""
    create table tfidf as
    select *
    from tfidf_df
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

## Создаём таблицу users

In [10]:
duck.query("""
    create table if not exists users(
        user_id bigint
        , user varchar
        , primary key (user_id)
    )
""")

In [11]:
duck.query(f"""
    create sequence if not exists seq_1;
    insert into users
    with
    u as (
        select distinct
           user_id as user
        from parquet_scan({[str(x) for x in review_files]}, hive_partitioning=True)
    )
    select
        nextval('seq_1') as user_id
        , user
    from u
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [12]:
duck.query("select count(user), count(distinct user) from users")

┌───────────────┬────────────────────────┐
│ count("user") │ count(DISTINCT "user") │
│     int64     │         int64          │
├───────────────┼────────────────────────┤
│      17776687 │               17776687 │
└───────────────┴────────────────────────┘

### Создание таблицы reviews

In [13]:
duck.query("""
    create table if not exists reviews (
        user_id bigint
        , item_id bigint
        , review_timestamp timestamp
        , rating double
        , helpful_vote bigint
        , verified_purchase boolean
        , foreign key (user_id) references users(user_id)
        --, foreign key (item_id) references items(item_id)
    )
""")

In [14]:
reviews_df = duck.query(f"""
    with
    r as (
        select
            user_id as user
            , parent_asin
            , epoch_ms(timestamp) as review_timestamp
            , rating
            , helpful_vote
            , verified_purchase
        from parquet_scan({[str(x) for x in review_files]}, hive_partitioning=True)
    )
    select
        u.user_id
        , i.item_id
        , review_timestamp
        , rating
        , helpful_vote
        , verified_purchase
    from r
        left join users u on r.user=u.user
        left join items i on r.parent_asin=i.parent_asin
""").pl()

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [15]:
reviews_df.estimated_size("mb")

1746.1388816833496

In [16]:
duck.query(f"""
    insert into reviews
    select
        user_id
        , item_id
        , review_timestamp
        , rating
        , helpful_vote
        , verified_purchase
    from reviews_df
""")

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

In [17]:
duck.query("select * from reviews")

┌──────────┬─────────┬─────────────────────────┬────────┬──────────────┬───────────────────┐
│ user_id  │ item_id │    review_timestamp     │ rating │ helpful_vote │ verified_purchase │
│  int64   │  int64  │        timestamp        │ double │    int64     │      boolean      │
├──────────┼─────────┼─────────────────────────┼────────┼──────────────┼───────────────────┤
│ 17305926 │  548571 │ 2022-11-20 11:40:49.07  │    5.0 │            3 │ true              │
│ 17305926 │  508762 │ 2022-11-20 04:48:34.27  │    5.0 │            5 │ true              │
│  5226228 │  508762 │ 2022-04-09 05:11:37.942 │    5.0 │            0 │ true              │
│  5226228 │ 1017564 │ 2022-02-12 22:14:30.853 │    5.0 │            0 │ true              │
│  5226228 │  483284 │ 2021-08-18 03:59:52.372 │    5.0 │            0 │ true              │
│  5569176 │  770327 │ 2014-02-02 19:44:49     │    5.0 │            0 │ true              │
│ 17305927 │ 1017564 │ 2017-08-04 04:08:36.714 │    4.0 │            0

In [11]:
duck.query("show tables")

┌────────────┐
│    name    │
│  varchar   │
├────────────┤
│ categories │
│ items      │
│ items_info │
│ reviews    │
│ stores     │
│ tfidf      │
│ users      │
└────────────┘

In [12]:
duck.query("select * from users")

┌─────────┬──────────────────────────────┐
│ user_id │             user             │
│  int64  │           varchar            │
├─────────┼──────────────────────────────┤
│ 3785830 │ AFFBFLESGGNDQ2XSKASEQIID3DQA │
│ 3785831 │ AHSI6LPMRNXSZZCZORRVZR4KLKKQ │
│ 3785832 │ AHLZERHOULVEYT2F3VVNRDK2WYRQ │
│ 3785833 │ AHBB2RTYSZDO7AQSQ4C36OOVJVQA │
│ 3785834 │ AHTVBDEH37JBGADGJFZRADISJP5Q │
│ 3785835 │ AHQ6YSXS6HHJDNQH3A6MHDEIRJXA │
│ 3785836 │ AHZ3FWQZDRQ7KS2JPRVF2EMAA5PQ │
│ 3786811 │ AH6FDHYQISXJUDXFG6EYH4WZN3NA │
│ 3786812 │ AECBR5WWHXWNZHVSBNPI5Q4PN4ZA │
│ 3786813 │ AEAVEECIN4JDRYDNAVQ3BTWHR3PQ │
│    ·    │              ·               │
│    ·    │              ·               │
│    ·    │              ·               │
│ 3868514 │ AGGPRIHG7CKVRDD6ORA7EHSCNNPQ │
│ 3868515 │ AFITCZDO2QHEOMFTMRCES5WKGDAQ │
│ 3868516 │ AEOP5BHE2MBCRPJP2KYUKKTKWUAA │
│ 3868517 │ AFBSNWY7KQ4QPOJMNTPJOTOM3FFQ │
│ 3868518 │ AG5TL35QASNRKGRT5OT4AJKTNIVQ │
│ 3868519 │ AEXVKFLVUBYPJLWLKUF67662OF6A │
│ 3868520 │

In [9]:
duck.close()