In [1]:
import gcsfs
import pandas as pd

In [2]:
pd.__version__

'1.3.5'

In [25]:
customer_overview_path = "gs://pg-explore/data/magento/OneMillionDataset/Customer_Table1k.csv/part-00000-3c8424df-7f2a-4880-a7a0-fd9d5b71868d-c000.csv"
products_overview_path = "gs://pg-explore/data/magento/OneMillionDataset/Product1k_coalesce.csv/part-00000-d60ccfcd-fce3-4747-b88c-6ac6f5bb4e19-c000.csv"
oders_history_path = "gs://pg-explore/data/magento/OneMillionDataset/Order_coalesce1M.csv/part-00000-1906bbca-26b1-48f7-8c75-14735d4d1772-c000.csv"

#### Transformation logic

In [26]:
def transform_order_data(df_order):
    """To do."""
    # convert datatime
#     df_order["sales_document_date"] = df_order["sales_document_date"] \
#         .astype(str).apply(lambda x: x[:4]+"-"+x[4:6]+"-"+x[6:])
    df_order["sales_document_date"] = pd.to_datetime(df_order["sales_document_date"])

    # keep processing, shipped or partially shipped orders, filter on-hold and cancelled orders
    df_order = df_order[df_order["order_header_status"]
                        .isin(["Shipped", "Partially Shipped", "Processing"])]

    # consolidate duplicated orders and keep selected fields
    cols = ["sales_organization", "sales_order_number", "sales_document_date",
            "sold_to_party", "product_key"]
    df_order = df_order.groupby(cols).agg({"order_qty": "sum", "net_value":
                                           "sum", "total": "first", "order_uom": "first",
                                           "order_header_status": "first"}).reset_index()

    # rename columns and keep only columns needed
    df_order = df_order.rename(columns={"sales_organization": "sales_org",
                                        "sold_to_party": "cust_id"})

    order_cols = ["sales_org", "cust_id", "sales_order_number", "sales_document_date",
                  "product_key", "order_qty", "net_value", "total", "order_uom",
                  "order_header_status"]

    df_order = df_order[order_cols]


    return df_order


def transform_customer_overview(df_customer_overview):
    """To do."""
    customeroverview_cols = ["sales_org", "cust_id", "city", "assortment_groups",
                             "customer_classification"]
    df_customer_overview = df_customer_overview[customeroverview_cols]
    return df_customer_overview


def transform_product_overview(df_product_overview):
    """To do."""
    productoverview_cols = ["sales_org", "product_key",
                            "subsector", "subsector_id", "category", "category_id", "brand",
                            "brand_id", "item_gtin", "prod_name"]
    df_product_overview = df_product_overview[productoverview_cols]
    return df_product_overview

### Joining orders, customer_overview and product_overview

In [27]:
def create_joined_data(df_order, df_customer_overview, df_product_overview):
    """To do."""
    df_joined = df_order.merge(df_customer_overview, on=["sales_org", "cust_id"]) \
        .merge(df_product_overview, on=["sales_org", "product_key"])
    return df_joined

#### Loading customer data

In [28]:
customers_fs = gcsfs.GCSFileSystem(project= "tiger-mle", token="/mnt/d/PnG/keys/tiger-mle-8c54fa5ce18f.json")
with customers_fs.open(customer_overview_path) as customers:
    df_customer_overview = pd.read_csv(customers)

#### Loading order history data

In [29]:
orders_fs = gcsfs.GCSFileSystem(project= "tiger-mle", token="/mnt/d/PnG/keys/tiger-mle-8c54fa5ce18f.json")
with orders_fs.open(oders_history_path) as orders:
    df_order = pd.read_csv(orders)

#### Loading prodcuts data

In [30]:
products_fs = gcsfs.GCSFileSystem(project= "tiger-mle", token="/mnt/d/PnG/keys/tiger-mle-8c54fa5ce18f.json")
with products_fs.open(products_overview_path) as products:
    df_product_overview = pd.read_csv(products)

In [31]:
df_order = transform_order_data(df_order)

In [24]:
df_order['cust_id']

0         6300659
1         6300343
2         6300382
3         6300619
4         6300355
           ...   
868321    6300473
868322    6300307
868323    6300459
868324    6300757
868325    6300761
Name: cust_id, Length: 868326, dtype: int64

In [32]:
df_customer_overview = transform_customer_overview(df_customer_overview)

In [33]:
df_product_overview = transform_product_overview(df_product_overview)

In [22]:
df_customer_overview

Unnamed: 0,sales_org,cust_id,city,assortment_groups,customer_classification
0,Argentina,63000086,1,1,1
1,Argentina,63000087,2,2,2
2,Brazil,63000088,3,3,3
3,Argentina,63000089,4,4,4
4,Argentina,63000090,5,5,5
...,...,...,...,...,...
1178,Brazil,63001264,255,2,9
1179,Argentina,63001265,256,3,10
1180,Argentina,63001266,257,4,11
1181,Brazil,63001267,258,5,12


In [18]:
df_product_overview.head()

Unnamed: 0,sales_org,product_key,subsector,subsector_id,category,category_id,brand,brand_id,item_gtin,prod_name
0,Argentina,39160,2,2,10,10,38,38,1010,1010
1,Brazil,39161,2,2,11,11,39,39,1011,1011
2,Argentina,39162,2,2,12,12,40,40,1012,1012
3,Argentina,39163,2,2,13,13,41,41,1013,1013
4,Brazil,39164,2,2,14,14,42,42,1014,1014


#### Joined data

In [35]:
df_joined = create_joined_data(df_order, df_customer_overview, df_product_overview)

In [37]:
df_joined.count()

sales_org                  356267
cust_id                    356267
sales_order_number         356267
sales_document_date        356267
product_key                356267
order_qty                  356267
net_value                  356267
total                      356267
order_uom                  356267
order_header_status        356267
city                       356267
assortment_groups          356267
customer_classification    356267
subsector                  356267
subsector_id               356267
category                   356267
category_id                356267
brand                      356267
brand_id                   356267
item_gtin                  356267
prod_name                  356267
dtype: int64

In [38]:
sales_org = "Argentina"

#### Filtering Argentina records

In [39]:
country_df_joined = df_joined[df_joined["sales_org"] == sales_org]

In [40]:
country_df_joined.count()

sales_org                  286166
cust_id                    286166
sales_order_number         286166
sales_document_date        286166
product_key                286166
order_qty                  286166
net_value                  286166
total                      286166
order_uom                  286166
order_header_status        286166
city                       286166
assortment_groups          286166
customer_classification    286166
subsector                  286166
subsector_id               286166
category                   286166
category_id                286166
brand                      286166
brand_id                   286166
item_gtin                  286166
prod_name                  286166
dtype: int64

#### save joined data

In [41]:
country_df_joined.to_csv(path_or_buf=f"gs://pg-explore/data/sample_dataset/intermediate/joined/df_joined_{sales_org}.csv",storage_options={"token":"/mnt/d/PnG/keys/tiger-mle-8c54fa5ce18f.json"})