#### Dimension and CartItem ETL

In [None]:
# ETL BrandDim: extract brands from MongoDB and load them into MSSQL
# (there is no transform step for brands).
from jobs.extract.extracts import extract_mongo_brands_to_df
from jobs.load.loads import load_brand_df_to_mssql

brand_df = extract_mongo_brands_to_df()  # extract
load_brand_df_to_mssql(brand_df)  # load



In [None]:
# ETL CatalogDim: extract catalogs from MongoDB and load them into MSSQL.
# `os` and `pd` are also used by later cells of this notebook, so keep them here.
import os
import pandas as pd
from jobs.extract.extracts import extract_mongo_catalogs_to_df
from jobs.load.loads import load_catalog_df_to_mssql

# Named category_df (not catalog_df) because later cells reference it by this name.
category_df = extract_mongo_catalogs_to_df()
# TODO: add a transform step that drops rows with a null catalog name
#       so they are not saved to MSSQL.
load_catalog_df_to_mssql(category_df)


In [None]:
# ETL SubCatalogDim: extract sub-catalogs from MongoDB, print them for a quick
# visual check, then load them into MSSQL.
from jobs.extract.extracts import extract_mongo_subcatalogs_to_df
from jobs.load.loads import load_subcatalog_df_to_mssql

subcatalog_df = extract_mongo_subcatalogs_to_df()
print(subcatalog_df)  # eyeball the extracted rows before loading
load_subcatalog_df_to_mssql(subcatalog_df)


In [None]:
# Preview the usage instructions stored in MongoDB (the last expression is displayed).
# NOTE(review): the next two cells re-read usage instructions from
# resources/product_usage_instruction.csv and rebind usage_instruction_df.
from jobs.extract.extracts import extract_mongo_usage_instruction_to_df

usage_instruction_df = extract_mongo_usage_instruction_to_df()
usage_instruction_df

In [None]:
# ETL UsageInstructionTypeDim using SpaCy and vector similarity.
# (This cell reads and loads *usage* instructions, not preserve instructions.)
# NOTE(review): the LDA cell below also writes through
# load_usage_instruction_df_to_mssql. Depending on whether that loader appends
# or replaces, running both cells may duplicate or overwrite rows — TODO confirm
# and run only the approach you intend to keep.
import pandas as pd

from jobs.transform.usage_instruction_transform import (
    transform_lemm_stem_sublists,
    transform_preprocess_usage_instruction_df_to_sublists_tokens,
    transform_sublists_to_reduced_usage_instruction_df,
)
from jobs.load.loads import load_usage_instruction_df_to_mssql

# Read the exported usage instructions; drop rows that have no instruction text.
usage_instruction_df = pd.read_csv('resources/product_usage_instruction.csv')
usage_instruction_df.dropna(inplace=True, subset=['ProductUsageInstruction'])

# Pipeline: tokenize into sublists -> lemmatize/stem -> reduce to a usage-instruction-type table.
# Another approach is topic modeling: the most important words in the usage
# instructions become the labels for each usage-instruction type (see the LDA cell below).
sublists_list = transform_preprocess_usage_instruction_df_to_sublists_tokens(usage_instruction_df)
lemm_stemm_sublists_list = transform_lemm_stem_sublists(sublists_list)
reduced_usage_instruction_type_df = transform_sublists_to_reduced_usage_instruction_df(lemm_stemm_sublists_list)
load_usage_instruction_df_to_mssql(reduced_usage_instruction_type_df)

In [None]:
# ETL UsageInstructionTypeDim using LDA topic modeling.
import pandas as pd
from jobs.transform.usage_instruction_transform import transform_to_usage_instruction_df_with_topic_and_word_representation
from jobs.load.loads import load_usage_instruction_df_to_mssql

# TODO create a util func to get the best number of topic
USAGE_INSTRUCTION_TOPIC_COUNT = 8

# Keep only rows that actually contain instruction text.
usage_instruction_df = pd.read_csv('resources/product_usage_instruction.csv')
usage_instruction_df.dropna(inplace=True, subset=['ProductUsageInstruction'])

usage_instruction_df_with_topic_and_wp = transform_to_usage_instruction_df_with_topic_and_word_representation(
    usage_instruction_df,
    USAGE_INSTRUCTION_TOPIC_COUNT,
)
print(usage_instruction_df_with_topic_and_wp)

# References for persisting the model and scoring new data later:
# how to save the model to evaluate the new data: https://stackoverflow.com/a/22034166
# how to use this model and evaluate the new data: https://datascience.stackexchange.com/a/107383
# another document with all the above: https://medium.com/@rayhantithokharisma/latent-dirichlet-allocation-topic-modelling-with-online-learning-feature-5051ca9df749
load_usage_instruction_df_to_mssql(usage_instruction_df_with_topic_and_wp)
    

In [None]:
# ETL PreserveInstructionTypeDim: extract preserve instructions from MongoDB,
# run the topic/word-representation transform, and load the result into MSSQL.
import pandas as pd
from jobs.extract.extracts import extract_mongo_preserve_instruction_to_df
from jobs.transform.preserve_instruction_transform import transform_to_preserve_instruction_df_with_topic_and_word_representation
from jobs.load.loads import load_preserve_instruction_df_to_mssql

# TODO create a util func to get the best number of topic
PRESERVE_INSTRUCTION_TOPIC_COUNT = 3

preserve_instruction_df = extract_mongo_preserve_instruction_to_df()
preserve_instruction_df_with_topic_and_wp = transform_to_preserve_instruction_df_with_topic_and_word_representation(
    preserve_instruction_df,
    PRESERVE_INSTRUCTION_TOPIC_COUNT,
)
print(preserve_instruction_df_with_topic_and_wp)
load_preserve_instruction_df_to_mssql(preserve_instruction_df_with_topic_and_wp)

In [None]:
# ETL Product: extract products from MongoDB, attach dimension keys, load into MSSQL.
import pandas as pd
from jobs.extract.extracts import extract_mongo_product_to_df
from jobs.transform.product_product_models_transform import transform_product_df_to_product_df_with_fkey
from jobs.load.loads import load_product_df_to_mssql

# Keep the untransformed frame under its own name: the product-features cell
# further down reads product_not_transformed_df.
product_not_transformed_df = extract_mongo_product_to_df()
product_df_with_dim_keys = transform_product_df_to_product_df_with_fkey(product_not_transformed_df)
load_product_df_to_mssql(product_df_with_dim_keys)

In [None]:
# ETL ProductModel: extract product models from MongoDB and load them into MSSQL.
# NOTE(review): product_model_df is rebound later by the Mongo product-model cell.
from jobs.extract.extracts import extract_mongo_product_model_to_df
from jobs.load.loads import load_product_model_df_to_mssql

product_model_df = extract_mongo_product_model_to_df()
load_product_model_df_to_mssql(product_model_df)


In [None]:
# ETL Date: generate a date table for a hard-coded range and load it into MSSQL.
from jobs.utils.datetime_utils import generate_date_df
from jobs.load.loads import load_date_df_to_mssql

DATE_RANGE_START = '2023-12-01 00:00:00'
DATE_RANGE_END = '2024-08-31 00:00:00'

date_df = generate_date_df(start=DATE_RANGE_START, end=DATE_RANGE_END)
load_date_df_to_mssql(date_df)

In [None]:
# ETL CartItem: pull cart items from MSSQL, attach dimension keys, and load the result.
from jobs.extract.extracts import extract_mssql_cart_item_to_df
from jobs.transform.cart_items_orders_transform import transform_cart_item_df_to_cart_item_df_with_fkey
from jobs.load.loads import load_cart_item_df_to_mssql

cart_item_df = extract_mssql_cart_item_to_df()
cart_item_df_with_dim_keys = transform_cart_item_df_to_cart_item_df_with_fkey(cart_item_df)
load_cart_item_df_to_mssql(cart_item_df_with_dim_keys)

#### OrderCart ETL

In [None]:
# Tables generated by jobs.utils.enum_utils. Every generator/loader is imported
# here once; the next three cells use the remaining ones.
from jobs.utils.enum_utils import (
    generate_address_df,
    generate_discount_type_df,
    generate_order_status_df,
    generate_payment_method_df,
)
from jobs.load.loads import (
    load_address_df_to_mssql,
    load_discount_type_df_to_mssql,
    load_order_status_df_to_mssql,
    load_payment_method_df_to_mssql,
)

# Payment methods
payment_method_df = generate_payment_method_df()
load_payment_method_df_to_mssql(payment_method_df)

In [None]:
# Discount types (generator and loader are imported in the payment-method cell).
discount_type_df = generate_discount_type_df()
load_discount_type_df_to_mssql(discount_type_df)

In [None]:
# Order statuses (generator and loader are imported in the payment-method cell).
order_status_df = generate_order_status_df()
load_order_status_df_to_mssql(order_status_df)

In [None]:
# Addresses (generator and loader are imported in the payment-method cell).
address_df = generate_address_df()
load_address_df_to_mssql(address_df)

In [None]:
# ETL CartOrder: pull cart orders from MSSQL, attach dimension keys, and load the result.
from jobs.extract.extracts import extract_mssql_cart_order_to_df
from jobs.transform.cart_items_orders_transform import transform_cart_order_df_to_cart_order_df_with_fkey
from jobs.load.loads import load_cart_order_df_to_mssql

cart_order_df = extract_mssql_cart_order_to_df()
cart_order_df_with_dim_keys = transform_cart_order_df_to_cart_order_df_with_fkey(cart_order_df)
load_cart_order_df_to_mssql(cart_order_df_with_dim_keys)

#### Recommendation and Cross-Sell ETL

In [None]:
# Export product/user interactions from MongoDB as a CSV for the
# recommendAndCrossSellApi project (uploaded to HDFS further down).
from jobs.extract.extracts import extract_mongo_interactions_to_df
from jobs.transform.product_interactions_transform import transform_product_user_interactions_df

interactions_df = transform_product_user_interactions_df(extract_mongo_interactions_to_df())
interactions_df.to_csv('../recommendAndCrossSellApi/resources/product_user_interactions.csv', index=False)

In [None]:
# Export cart-item transactions from MSSQL as a CSV for the
# recommendAndCrossSellApi project (uploaded to HDFS further down).
from jobs.extract.extracts import extract_mssql_cart_item_transactions_to_df
from jobs.transform.cart_items_orders_transform import transform_cart_item_transactions_df

cart_item_transactions_df = transform_cart_item_transactions_df(extract_mssql_cart_item_transactions_to_df())
cart_item_transactions_df.to_csv('../recommendAndCrossSellApi/resources/cart_item_transactions.csv', index=False)

#### Move resources to HDFS

In [None]:
# Upload the CSVs exported above into HDFS under /user/root/upload, first
# setting up that folder (ownership, permissions, creation) when it is missing.
from hdfs import InsecureClient

namenode_url = 'http://localhost:9870'
# NOTE(review): 'PC' presumably is the HDFS superuser on this machine
# (set_owner/chown requires superuser rights) — confirm.
hdfs_client_pc = InsecureClient(namenode_url, user='PC')
hdfs_client = InsecureClient(namenode_url, user='root')
parent_folder = '/user/root'
upload_folder = '/user/root/upload'
upload_folder_name = upload_folder.rsplit('/', 1)[-1]

# list() returns the child names of parent_folder.
# NOTE(review): it raises HdfsError if parent_folder itself does not exist.
parent_entries = hdfs_client_pc.list(parent_folder)
# Set up the upload folder whenever it is missing, not only when the parent is
# empty. upload() to a non-existent path writes a single *file* at that path, so
# the second upload below would silently overwrite the first.
if not parent_entries or upload_folder_name not in parent_entries:
    print("Creating upload folder")
    # Hand the parent to root so the 'root' client below can change its
    # permissions and create the subfolder.
    hdfs_client_pc.set_owner(parent_folder, owner='root', group='supergroup')
    # WebHDFS takes permissions as octal strings.
    hdfs_client.set_permission(parent_folder, '777')
    hdfs_client.makedirs(upload_folder)
    print(hdfs_client.list(upload_folder))
    hdfs_client.set_permission(upload_folder, '777')
hdfs_client.upload(upload_folder, '../recommendAndCrossSellApi/resources/product_user_interactions.csv', overwrite=True)
hdfs_client.upload(upload_folder, '../recommendAndCrossSellApi/resources/cart_item_transactions.csv', overwrite=True)
print("After upload")
print(hdfs_client.list(upload_folder))


In [None]:
# Build one row per MongoDB product from that product's *first* ProductModel
# (id, product name, business key, product id, CublicType key, price).
# NOTE(review): this rebinds product_model_df from the earlier ProductModel ETL
# cell; the product-features cell below uses this version.
import os

import pandas as pd

from shared.get_connections import get_db_mongo

MONGODB_PRODUCT_COLLECTION = os.environ.get('MONGODB_PRODUCT_COLLECTION')
MONGO_CONNECTION_STRING = os.environ.get('MONGO_CONNECTION_STRING')
MONGO_DATABASE = os.environ.get('MONGO_DATABASE')
db = get_db_mongo(MONGO_CONNECTION_STRING, MONGO_DATABASE)
product_collection = db.get_collection(MONGODB_PRODUCT_COLLECTION)

product_model_columns = ['ProductModelId', 'ProductModelName', 'BusinessKey', 'ProductId', 'CublicTypeKey', 'Price']
# Collect plain rows and build the DataFrame once. Calling pd.concat per
# document is quadratic in the number of products.
product_model_rows = []
try:
    for p in product_collection.find({}):
        try:
            first_model = p.get('ProductModels')[0]
            product_model_rows.append([
                first_model.get('_id'),
                p.get('ProductName'),
                p.get('BusinessKey'),
                p.get('_id'),
                first_model.get('CublicType'),
                first_model.get('Price'),
            ])
        except (TypeError, IndexError, KeyError, AttributeError) as e:
            # ProductModels is missing (None), empty, or not a list of
            # documents: skip this product but say which one.
            print(f"Skipping product {p.get('_id')}: {e!r}")
finally:
    # Close the Mongo client even if iterating the cursor fails.
    db.client.close()

# Passing columns= also yields a correctly-shaped (empty) frame when no product
# could be parsed; assigning .columns to an empty frame would raise instead.
return_df = pd.DataFrame(product_model_rows, columns=product_model_columns)
product_model_df = return_df.copy()

In [None]:
# Build product_features.csv (name, brand, sub-catalog, catalog, price, business
# key) and push it to HDFS. Relies on frames from earlier cells:
# product_not_transformed_df, product_model_df (Mongo cell above), subcatalog_df
# and category_df.
product_base_columns = ['ProductName', 'ProductBrandName', 'ProductSubCatalogName', 'BusinessKey']

# Map each sub-catalog name to the name of its parent catalog.
subcatalog_with_catalog_df = (
    subcatalog_df.merge(category_df, on='CatalogId', how='inner')
    [['SubCatalogName', 'CatalogName']]
)

# Attach the price (by business key), then the catalog name (by sub-catalog name);
# inner joins drop products that have no match on either side.
product_features_df = (
    product_not_transformed_df[product_base_columns]
    .merge(product_model_df, on='BusinessKey', how='inner')
    [product_base_columns + ['Price']]
    .merge(subcatalog_with_catalog_df, left_on='ProductSubCatalogName', right_on='SubCatalogName', how='inner')
    [['ProductName', 'ProductBrandName', 'ProductSubCatalogName', 'CatalogName', 'Price', 'BusinessKey']]
)

product_features_df.to_csv('resources/product_features.csv', index=False)
hdfs_client.upload(upload_folder, 'resources/product_features.csv', overwrite=True)
print(hdfs_client.list(upload_folder))

In [None]:
# Sanity check of the product-features table: print column dtypes and non-null
# counts, then display the first two rows.
product_features_df.info()
product_features_df.head(2)

In [None]:
# Rename the GA4 product export's columns to short snake_case names, keep only
# the name/category/id and funnel counts, and push the result to HDFS.
ga4_column_map = {
    'Item name': 'p_name',
    'Item category': 'p_subcatalog',
    'Item ID': 'p_business_key',
    'Items viewed': 'views',
    'Items added to cart': 'add_to_carts',
    'Items purchased': 'purchases',
    'Bounce rate': 'bounce_rate',
    'Engaged sessions per user': 'engaged_sessions_per_user',
}
ga4_kept_columns = ['p_name', 'p_subcatalog', 'p_business_key', 'views', 'add_to_carts', 'purchases']

product_ga4_df = (
    pd.read_csv('resources/product_ga4_data_unmodified.csv')
    .rename(columns=ga4_column_map)
    [ga4_kept_columns]
)
product_ga4_df.to_csv('resources/product_ga4_data.csv', index=False)
hdfs_client.upload(upload_folder, 'resources/product_ga4_data.csv', overwrite=True)
print(hdfs_client.list(upload_folder))

In [None]:
# Inspect the trimmed GA4 table: column dtypes and non-null counts.
product_ga4_df.info()