In [None]:
# Settings
project_id = '<YOUR_PROJECT_ID>'
project_number = '<YOUR_PROJECT_NUMBER>'
branch_id = 1
catalog_id = 'default_catalog'
# Alias kept for backward compatibility: despite its name, this holds the
# catalog id (it was used as the catalog segment of the placement path).
placement_id = catalog_id

# Both resource names below live under the same catalog.
catalog_path = f'projects/{project_id}/locations/global/catalogs/{catalog_id}'

# Catalog branch represents the version of your catalog
branch = f'{catalog_path}/branches/{branch_id}'

# Catalog placement represents the search configs you're using on this catalog
placement = f'{catalog_path}/placements/default_search'

In [None]:
import gzip
import json
from multiprocessing import Pool

import google.auth
from google.cloud import retail_v2

# Application Default Credentials, with API quota billed to `project_id`.
# google.auth.default returns (credentials, inferred_project_id).
credentials, project = google.auth.default(
    quota_project_id=project_id,
)

In [None]:
def mapping_age_groups(age_groups):
    """Map a raw age-group label to a Retail API audience age group.

    The first '-'-separated token of the label is stripped and lower-cased
    (e.g. 'Kids-Boys' -> 'kids'). Plural tokens whose singular is one of the
    Retail API's standard values are normalised (e.g. 'Adults-Men' -> 'adult');
    any other token is returned unchanged.

    Args:
        age_groups: raw age-group string from a product record.

    Returns:
        The normalised, lower-case age group.
    """
    # Retail API strongly encourages: newborn, infant, toddler, kids, adult.
    plural_to_standard = {
        'adults': 'adult',
        'newborns': 'newborn',
        'infants': 'infant',
        'toddlers': 'toddler',
    }
    # split() on a label without '-' yields the whole label, so no membership check is needed.
    token = age_groups.split('-', 1)[0].strip().lower()
    return plural_to_standard.get(token, token)

def mapping_product(doc):
    """Convert one product record into a ``retail_v2.Product``.

    Args:
        doc: dict taken from the ``data`` member of one JSONL row. Required keys:
            ``id``, ``masterCategory``, ``subCategory`` and ``articleType`` (each a
            dict with ``typeName``), ``productDisplayName``, ``brandName``,
            ``discountedPrice``, ``price``, ``landingPageUrl``, ``gender`` and
            ``ageGroup``. ``productDescriptors`` and ``styleImages`` are optional.

    Returns:
        A populated ``retail_v2.Product``.

    Raises:
        KeyError: if a required key is missing.
    """
    # Retail API limits: description <= 5000 chars, each material <= 200 chars.
    max_description_len = 5000
    max_material_len = 200
    # Retail API strongly encourages the genders "male", "female", "unisex";
    # common audience labels are mapped onto them, anything else passes through lower-cased.
    standard_genders = {'men': 'male', 'boys': 'male', 'women': 'female', 'girls': 'female'}

    product = retail_v2.Product()
    product.id = str(doc['id'])
    # Single hierarchical category path: "master > sub > article type".
    product.categories = [' > '.join(
        doc[level]['typeName'] for level in ('masterCategory', 'subCategory', 'articleType')
    )]
    product.title = doc['productDisplayName']
    product.brands = [doc['brandName']]

    # `or {}` also tolerates keys that are present but null.
    descriptors = doc.get('productDescriptors') or {}
    description = (descriptors.get('description') or {}).get('value')
    if description:
        product.description = description[:max_description_len]

    product.price_info.price = doc['discountedPrice']
    product.price_info.original_price = doc['price']
    product.uri = 'http://localhost/' + doc['landingPageUrl']

    gender = doc['gender'].lower()
    product.audience.genders = [standard_genders.get(gender, gender)]
    product.audience.age_groups = [mapping_age_groups(doc['ageGroup'])]

    material = (descriptors.get('materials_care_desc') or {}).get('value')
    if material:
        product.materials = [material[:max_material_len]]

    # Every dict-valued entry of styleImages except 'search' that carries a URL is an image.
    product.images = [
        retail_v2.Image(uri=image['imageURL'])
        for key, image in (doc.get('styleImages') or {}).items()
        if isinstance(image, dict) and key != 'search' and image.get('imageURL')
    ]

    # Not mapped from this dataset: gtin, language_code, attributes, tags, rating,
    # expire_time, ttl, available_time, availability, available_quantity,
    # fulfillment_info, color_info, sizes, patterns, conditions,
    # retrievable_fields, publish_time, promotions.
    return product

In [None]:
def import_file(filename):
    """Create every product in one gzipped JSONL file through the Retail API.

    Each non-blank line must be a JSON object whose ``data`` member is accepted
    by ``mapping_product``. Products are created under ``branch`` using each
    product's own id. Processing stops at the first failing line.

    Args:
        filename: path to a ``.jsonl.gz`` file.

    Returns:
        None if every line was imported; otherwise a ``RuntimeError`` whose
        message names the file, the 1-based line number (0 if the file could not
        be read at all) and the original error.
    """
    # Created per call so each Pool worker builds its own client.
    client = retail_v2.ProductServiceClient(credentials=credentials)

    line_no = 0  # stays 0 if opening/reading fails before the first line
    try:
        with gzip.open(filename, 'rt') as f:
            for line_no, row in enumerate(f, start=1):
                if not row.strip():
                    continue  # tolerate blank lines, e.g. a trailing newline
                product = mapping_product(json.loads(row)['data'])
                request = retail_v2.CreateProductRequest(
                    parent=branch,
                    product=product,
                    product_id=product.id,
                )
                client.create_product(request=request)
    except Exception as e:
        print(e)
        print(f'error : {filename}#L{line_no}')
        # Return a plain exception instead of `e`: API errors may carry transport
        # objects that cannot be pickled back to the parent of a Pool worker.
        return RuntimeError(f'{filename}#L{line_no}: {e!r}')
    return None

In [None]:
# Import all shards in parallel. import_file returns None for a shard that
# imported cleanly, or the error it stopped on.
DATA_FILE_TEMPLATE = 'dataset-fashion-product/styles-{part:02d}.jsonl.gz'
N_PARTS = 60     # shards are numbered styles-01 .. styles-60
N_WORKERS = 5    # concurrent worker processes

files = [DATA_FILE_TEMPLATE.format(part=part) for part in range(1, N_PARTS + 1)]

# NOTE: Pool must pickle `import_file`, which lives in the notebook's __main__.
# That works with the 'fork' start method (Linux default) but typically fails
# under 'spawn' (macOS/Windows default); move the functions to a .py module there.
with Pool(N_WORKERS) as pool:
    results = pool.map(import_file, files)

print(results)