### Mount Google Drive in Colab

In [None]:
from google.colab import drive
from pathlib import Path
# Mount Google Drive under /content/drive so Drive files are readable from this runtime
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
#@markdown To access data saved on Shared Folder, add them to "My drive" as shortcut first
# Root of the mounted Drive; later cells resolve data paths against `_root` (e.g. `_root.glob(...)`).
root_path="/content/drive/My Drive/" #@param {type:"string"}
_root = Path(root_path)
#!ls "$root_path"

### Import libraries

In [None]:
import pandas as pd
pd.set_option('display.max_columns', None)
import numpy as np
import sqlite3 as lite

In [None]:
import io
import json
import pyarrow as pa
import pyarrow.json as pj
import pyarrow.parquet as pq

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 40 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 47.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=d60b15e7d36700bdb1de4cc038dab5a19e6b8db79fbc25fc625c480ac11a98dc
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# Local Spark session; getOrCreate returns the existing one when the cell is re-run
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)
# Use Apache Arrow for columnar transfers between Spark and pandas
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

### Helper Function

In [None]:
def ArrowSchemaToPyDict(dt):
  """Convert a pyarrow schema or data type into plain nested Python values.

  - ``pa.Schema`` / ``pa.StructType`` -> dict of field name -> converted child type
  - ``pa.ListType`` -> one-element list holding the converted value type
  - any other ``pa.DataType`` -> its string name, e.g. ``'int64'`` or ``'null'``
  - anything else -> ``None``
  """
  # Struct check must precede the generic DataType check (a struct is a DataType too)
  if isinstance(dt, (pa.lib.Schema, pa.lib.StructType)):
    return {child.name: ArrowSchemaToPyDict(child.type) for child in dt}
  if isinstance(dt, pa.lib.ListType):
    return [ArrowSchemaToPyDict(dt.value_field.type)]
  if isinstance(dt, pa.lib.DataType):
    return str(dt)
  return None

In [None]:
def PyDictToArrowSchema(dt, level=0):
  """Rebuild a pyarrow schema from the nested form produced by ArrowSchemaToPyDict.

  Args:
    dt: at ``level == 0`` a dict ``{field name: spec}``. Below that, a spec is
      a dict (struct), a one-element list (list of that element type) or a
      type name string such as ``'int64'``, ``'double'`` or ``'null'``.
    level: recursion depth; only level 0 yields a ``pa.Schema``.

  Returns:
    ``pa.Schema`` at level 0, otherwise a ``pa.DataType``.

  Raises:
    KeyError: a type name that is neither listed below nor a pyarrow type alias.
    TypeError: a spec that is not a dict, list or str.
  """
  if level == 0:
    return pa.schema([
        (field, PyDictToArrowSchema(spec, level=level + 1)) for field, spec in dt.items()
    ])

  if isinstance(dt, dict):
    return pa.struct([
        (field, PyDictToArrowSchema(spec, level=level + 1)) for field, spec in dt.items()
    ])
  if isinstance(dt, list):
    return pa.list_(PyDictToArrowSchema(dt[0], level=level + 1))
  if isinstance(dt, str):
    scalar_types = {
        'string': pa.string(),
        'int64': pa.int64(),
        'bool': pa.bool_(),
        'null': pa.null(),
        'double': pa.float64()
    }
    if dt in scalar_types:
      return scalar_types[dt]
    # Other simple type names produced by str(DataType) (e.g. 'int32',
    # 'float', 'timestamp[ms]') are resolved through pyarrow's alias table.
    try:
      return pa.type_for_alias(dt)
    except (ValueError, KeyError) as err:
      # Keep the original KeyError contract for unknown names
      raise KeyError(dt) from err
  raise TypeError(f"Unsupported schema spec: {dt!r}")

In [None]:
def ArrowSchemaToDefinition(dt):
  """Render a pyarrow schema or data type as Python source that rebuilds it.

  Args:
    dt: a ``pa.Schema``, ``pa.StructType``, ``pa.ListType`` or a scalar
      ``pa.DataType``.

  Returns:
    str: source text, e.g. ``'pa.list_(pa.int64())'``. Schemas and structs
    become ``pa.schema([...])`` / ``pa.struct([...])`` with one
    ``pa.field("name", <type>)`` per child, in the original field order.

  Raises:
    KeyError: a scalar type with neither an explicit constructor below nor
      a pyarrow type alias.
    TypeError: ``dt`` is not a pyarrow schema or data type (this used to
      surface as an UnboundLocalError).
  """
  # Explicit constructors for the scalar types seen in the crawled data
  scalar_defs = {
      'string': 'pa.string()',
      'int64': 'pa.int64()',
      'bool': 'pa.bool_()',
      'null': 'pa.null()',
      'double': 'pa.float64()'
  }

  def _render_fields(constructor, fields):
    # Schema and struct share one layout: constructor([pa.field(...), ...]).
    # Backslashes and quotes in names are escaped so the output stays valid source.
    rendered = ','.join(
        'pa.field("' + field.name.replace('\\', '\\\\').replace('"', '\\"') + '", '
        + ArrowSchemaToDefinition(field.type) + ')'
        for field in fields
    )
    return constructor + '([\n      ' + rendered + '\n    ])'

  # Schema/struct before the generic DataType branch: a struct is a DataType too
  if isinstance(dt, pa.lib.Schema):
    return _render_fields('pa.schema', dt)
  if isinstance(dt, pa.lib.StructType):
    return _render_fields('pa.struct', dt)
  if isinstance(dt, pa.lib.ListType):
    return f"pa.list_({ArrowSchemaToDefinition(dt.value_field.type)})"
  if isinstance(dt, pa.lib.DataType):
    key = str(dt)
    if key in scalar_defs:
      return scalar_defs[key]
    # Other simple types (e.g. 'int32', 'float') are rebuilt through
    # pyarrow's alias table, but only when pyarrow knows the name.
    try:
      pa.type_for_alias(key)
    except (ValueError, KeyError) as err:
      raise KeyError(key) from err
    return f"pa.type_for_alias({key!r})"
  raise TypeError(f"Unsupported Arrow type: {dt!r}")

In [None]:
# Adapted from https://stackoverflow.com/questions/71035754/pyarrow-drop-a-column-in-a-nested-structure/71039389#71039389?newreg=0e5baf2fd7184da8adad65c8e1789db3
# and extended to handle tables, lists and chunked arrays.
def ArrowDropNull(array):
    """Recursively remove null-typed columns and nested fields from Arrow data.

    Accepts a ``pa.Table``, ``pa.ChunkedArray`` or ``pa.Array``:
    - Table: every column is processed; columns that come back as None are
      dropped and a new table is built from the rest. Only arrays and names
      are passed to ``Table.from_arrays``, so schema metadata is not kept.
    - null-typed data: returns None so the caller can drop it.
    - struct: children are processed and null ones removed; returns None when
      no child survives.
    - list: the value array is processed; returns None when it is dropped.
    - anything else: returned unchanged.

    NOTE(review): StructArray.from_arrays / ListArray.from_arrays below only
    receive the children / offsets, not the parent's validity bitmap, so
    top-level nulls of a struct or list may not be preserved (a null list
    likely becomes an empty list) — confirm whether that matters downstream.
    """
    # Catch table first (a Table has no `.type`, so this must precede the checks below)
    if isinstance(array, pa.Table):
        names = []
        arrays = []
        for field in array.schema:
          data = ArrowDropNull(array[field.name])
          if data is not None:
            names.append(field.name)
            arrays.append(data)
        return pa.Table.from_arrays(arrays, names)

    # Catch null type second
    if pa.types.is_null(array.type): return None

    # Catch other types later
    if pa.types.is_struct(array.type):
        # A chunked array must be combined into a single StructArray first
        # (`.field(index)` is used below)
        if isinstance(array, pa.ChunkedArray):
            array = array.combine_chunks()
        names = []
        arrays = []
        for index, field in enumerate(array.type):
          data = ArrowDropNull(array.field(index))
          if data is not None:
            names.append(field.name)
            arrays.append(data)
        # A struct with no remaining fields is dropped entirely
        if len(arrays) == 0: return None
        else: return pa.StructArray.from_arrays(arrays, names)

    elif pa.types.is_list(array.type):
        if isinstance(array, pa.ChunkedArray):
            array = array.combine_chunks()
        # Rebuild the list from its offsets and the cleaned value array
        offset = array.offsets
        value = ArrowDropNull(array.values)
        if value is None: return None
        else: return pa.ListArray.from_arrays(offset, value)
    else:
        return array

# Main

## Foody file

In [None]:
# List the raw Foody CSV snapshots (path is relative to the working directory, /content in Colab)
!ls drive/MyDrive/Data/'1 - raw data'/foody/hcmc-foody-0-101000/

hcmc-0-101000.csv	       hcmc-0-101000-update-2.csv
hcmc-0-101000-update-1.csv     hcmc-0-101000-update-3.csv
hcmc-0-101000-update-1.gsheet  url-hcmc-0-101000.csv


In [None]:
#@markdown Read latest foody data file
# Resolved against `_root` like every other data path in this notebook, instead
# of a hard-coded path relative to the working directory.
foody_file_path = "Data/1 - raw data/foody/hcmc-foody-0-101000/hcmc-0-101000-update-3.csv" #@param {type:"string"}
df = pd.read_csv(_root.joinpath(foody_file_path))
df.head(2)

Unnamed: 0,latitude,longitude,restaurant_id,district_id,city_id,name,street_address,area_address,district_address,region_address,category,cuisine,audience,average_rating,position_rating,price_rating,quality_rating,service_rating,space_rating,review_count,review_excellent_count,review_good_count,review_average_count,review_bad_count,view_count,checked_in_count,favourite_count,wanted_count,picture_count,status,time_clock,price_range,url
0,10.785022,106.698976,678064.0,1.0,217.0,Bún Miến Gà Vỉa Hè,"37 Mạc Đĩnh Chi, P. Đa Kao",Khu vực Đài Truyền Hình,Quận 1,TP. HCM,Quán ăn,Món Việt,"Gia đình, Nhóm hội, Giới văn phòng",8.0,7.5,9.0,8.5,8.5,6.5,2.0,0.0,2.0,0.0,0.0,237.0,0.0,2.0,6.0,7.0,Đang hoạt động,11:00 - 13:00,20.000đ - 25.000đ,http://www.foody.vn/ho-chi-minh/bun-mien-ga-vi...
1,10.771073,106.703596,210493.0,1.0,217.0,Thiên Nhiên - Vịt Quay Heo Quay,56 Hàm Nghi,Khu vực Bitexco Tower,Quận 1,TP. HCM,Quán ăn,Món Việt,"Gia đình, Nhóm hội, Giới văn phòng",8.0,10.0,10.0,10.0,5.0,5.0,1.0,0.0,1.0,0.0,0.0,635.0,1.0,1.0,3.0,2.0,Đang hoạt động,08:30 - 22:00,50.000đ - 200.000đ,http://www.foody.vn/ho-chi-minh/thien-nhien-vi...


In [None]:
#@markdown Drop rows without `restaurant_id` and convert ID columns to integer
# Nullable Int64 keeps missing counts as <NA> instead of forcing floats
df = df.dropna(subset=["restaurant_id"]).astype(dict.fromkeys(
    [
        "restaurant_id",
        "district_id",
        "city_id",
        "review_count",
        "review_excellent_count",
        "review_good_count",
        "review_average_count",
        "review_bad_count",
        "view_count",
        "checked_in_count",
        "favourite_count",
        "wanted_count",
        "picture_count",
    ],
    'Int64',
))
df.head(2)

Unnamed: 0,latitude,longitude,restaurant_id,district_id,city_id,name,street_address,area_address,district_address,region_address,category,cuisine,audience,average_rating,position_rating,price_rating,quality_rating,service_rating,space_rating,review_count,review_excellent_count,review_good_count,review_average_count,review_bad_count,view_count,checked_in_count,favourite_count,wanted_count,picture_count,status,time_clock,price_range,url
0,10.785022,106.698976,678064,1,217,Bún Miến Gà Vỉa Hè,"37 Mạc Đĩnh Chi, P. Đa Kao",Khu vực Đài Truyền Hình,Quận 1,TP. HCM,Quán ăn,Món Việt,"Gia đình, Nhóm hội, Giới văn phòng",8.0,7.5,9.0,8.5,8.5,6.5,2,0,2,0,0,237,0,2,6,7,Đang hoạt động,11:00 - 13:00,20.000đ - 25.000đ,http://www.foody.vn/ho-chi-minh/bun-mien-ga-vi...
1,10.771073,106.703596,210493,1,217,Thiên Nhiên - Vịt Quay Heo Quay,56 Hàm Nghi,Khu vực Bitexco Tower,Quận 1,TP. HCM,Quán ăn,Món Việt,"Gia đình, Nhóm hội, Giới văn phòng",8.0,10.0,10.0,10.0,5.0,5.0,1,0,1,0,0,635,1,1,3,2,Đang hoạt động,08:30 - 22:00,50.000đ - 200.000đ,http://www.foody.vn/ho-chi-minh/thien-nhien-vi...


In [None]:
#@markdown Drop rows without `restaurant_id`
# df = df[~df["restaurant_id"].isna()]

### Extract areas from dataframe

In [None]:
# Open the SQLite database that all following cells write into.
# NOTE(review): `db_path` is not defined in any visible cell — set it before running.
try:
    conn = lite.connect(db_path)
    # sqlite_version is the SQLite library version; the module attribute
    # `version` is deprecated since Python 3.12 and removed in 3.14.
    print(f"sqlite3 {lite.sqlite_version} has connected to database successfully")
except lite.Error as e:
    print("Error: ", e)
    # Every following cell uses `conn`; stop here instead of failing later with a NameError
    raise

In [None]:
# Area dimension: one row per distinct (area, district, region) triple.
# `with conn` commits the statement on success.
with conn:
    cur = conn.cursor()
    # cur.execute("DROP TABLE DIM_AREAS")
    # cur.execute("DELETE FROM DIM_AREAS")
    cur.execute("CREATE TABLE IF NOT EXISTS DIM_AREAS (\
      id INTEGER PRIMARY KEY AUTOINCREMENT,\
      area_address text,\
      district_address text,\
      region_address text,\
      UNIQUE(area_address, district_address, region_address)\
    )")

In [None]:
#@markdown Extract areas info from dataframe and insert into database
# Distinct (area, district, region) triples in the current crawl
areas_df = df[["area_address", "district_address", "region_address"]].drop_duplicates()
cols = areas_df.columns.to_list()

# Handle existing areas from database, by merge and drop overlapped rows
# (anti-join: keep only triples not yet stored in DIM_AREAS)
# NOTE(review): NaN in pandas vs NULL read back from SQLite may not match in
# the merge; SQLite UNIQUE does not reject repeated NULLs — verify null areas.
existing_areas_df = pd.read_sql_query("SELECT * FROM DIM_AREAS", conn)
areas_df = areas_df.merge(existing_areas_df, on=cols, how="left", indicator=True)
areas_df = areas_df[areas_df["_merge"]=="left_only"][cols]

# Append the new triples (ids come from AUTOINCREMENT) and reload the full dimension
areas_df.to_sql("DIM_AREAS", conn, if_exists="append", index=False)
existing_areas_df = pd.read_sql_query("SELECT * from DIM_AREAS", conn)
existing_areas_df.head()

Unnamed: 0,id,area_address,district_address,region_address
0,1,Khu vực Đài Truyền Hình,Quận 1,TP. HCM
1,2,Khu vực Bitexco Tower,Quận 1,TP. HCM
2,3,Khu vực Zen Plaza,Quận 1,TP. HCM
3,4,Khu vực Chợ Tân Định,Quận 1,TP. HCM
4,5,Khu vực Cầu Ông Lãnh,Quận 1,TP. HCM


In [None]:
#@markdown Replace old columns with ID
# Join keys are spelled out instead of relying on the global `cols` from
# another cell (it is reassigned by the audience/cuisine cells below, which made
# re-running this cell drop the wrong columns). validate="many_to_one" fails
# loudly if DIM_AREAS ever holds duplicate triples, which would duplicate restaurants.
# NOTE(review): this is an inner join; restaurants whose area did not match are dropped.
area_cols = ["area_address", "district_address", "region_address"]
df = (
    df.merge(existing_areas_df, on=area_cols, validate="many_to_one")
    .drop(area_cols + ["district_id", "city_id"], axis=1)
    .rename(columns={"id": "area_id"})
)
df.head(2)

Unnamed: 0,latitude,longitude,restaurant_id,name,street_address,category,cuisine,audience,average_rating,position_rating,price_rating,quality_rating,service_rating,space_rating,review_count,review_excellent_count,review_good_count,review_average_count,review_bad_count,view_count,checked_in_count,favourite_count,wanted_count,picture_count,status,time_clock,price_range,url,area_id
0,10.785022,106.698976,678064,Bún Miến Gà Vỉa Hè,"37 Mạc Đĩnh Chi, P. Đa Kao",Quán ăn,Món Việt,"Gia đình, Nhóm hội, Giới văn phòng",8.0,7.5,9.0,8.5,8.5,6.5,2,0,2,0,0,237,0,2,6,7,Đang hoạt động,11:00 - 13:00,20.000đ - 25.000đ,http://www.foody.vn/ho-chi-minh/bun-mien-ga-vi...,1
1,10.785622,106.699818,204951,Quán Ánh Việt - Bún Chả,"18Bis/13/1 Nguyễn Thị Minh Khai, P. Đa Kao",Quán ăn,Món Việt,"Sinh viên, Cặp đôi, Gia đình, Nhóm hội, Giới v...",8.0,7.0,8.0,9.0,8.0,8.0,1,0,1,0,0,578,0,0,4,9,Đang hoạt động,09:00 - 14:00,33.000đ - 33.000đ,http://www.foody.vn/ho-chi-minh/quan-anh-viet-...,1


### Extract foody audience list from dataframe

In [None]:
# Bridge table: one row per (restaurant, audience label); the pair is unique.
with conn:
    cur = conn.cursor()
    cur.execute("CREATE TABLE IF NOT EXISTS FOODY_RESTAURANT_AUDIENCE (\
      restaurant_id INTEGER,\
      audience text,\
      UNIQUE(restaurant_id, audience)\
    )")

In [None]:
#@markdown Extract audience column and explode the list
audience_df = df[["restaurant_id", "audience"]].drop_duplicates()

# Each restaurant must carry exactly one audience string before splitting
assert len(audience_df) == audience_df["restaurant_id"].nunique(), (
  "Exist restaurant with 2 different audience attribute"
)

# "Gia đình, Nhóm hội" -> one row per label; rows without an audience are dropped
audience_df = (
    audience_df
    .assign(audience=audience_df["audience"].str.split(", "))
    .explode("audience")
    .dropna()
)
audience_df

Unnamed: 0,restaurant_id,audience
0,678064,Gia đình
0,678064,Nhóm hội
0,678064,Giới văn phòng
1,204951,Sinh viên
1,204951,Cặp đôi
...,...,...
101398,695643,Nhóm hội
101398,695643,Giới văn phòng
101399,937313,Sinh viên
101399,937313,Nhóm hội


In [None]:
#@markdown Insert extracted attribute to database and drop audience column (warning: attribute may get changed over time, be careful with the data)
cols = audience_df.columns.to_list()

# Compare with the audience rows already stored: the outer merge flags each row
# as left_only (new), right_only (only in the database) or both.
existing_audience_df = pd.read_sql_query("SELECT * FROM FOODY_RESTAURANT_AUDIENCE", conn)
audience_df = audience_df.merge(existing_audience_df, on=cols, how="outer", indicator=True)
# Stored rows of restaurants that are not part of this crawl are kept, so they
# are relabelled "absent" rather than treated as deletions. `_merge` is
# categorical, so it is cast to str before adding a new label. The assignment
# goes through .loc: the former chained `audience_df[mask]["_merge"] = ...`
# only modified a temporary copy.
audience_df["_merge"] = audience_df["_merge"].astype(str)
audience_df.loc[~audience_df["restaurant_id"].isin(df["restaurant_id"]), "_merge"] = "absent"

# Warning in case of an attribute get deleted
removed_rows = audience_df[audience_df["_merge"]=="right_only"]
if len(removed_rows) > 0:
  print("Warning: These rows are going to be deleted, please comfirm first before continue")
  display(len(removed_rows))
  c = input("confirm (Y/N): ")
  if c.upper()!="Y":
    raise Exception(f"Operation terminated due to the choice: {c}")

# drop_duplicates guards the UNIQUE(restaurant_id, audience) constraint
audience_df = audience_df[audience_df["_merge"]!="right_only"][cols].drop_duplicates()
# Clear and append instead of to_sql(if_exists="replace"): "replace" drops the
# table and recreates it from pandas-inferred types, losing the UNIQUE
# constraint created above. Both statements run inside one `with conn`
# transaction, so a failed insert should roll the DELETE back too.
with conn:
  conn.execute("DELETE FROM FOODY_RESTAURANT_AUDIENCE")
  audience_df.to_sql("FOODY_RESTAURANT_AUDIENCE", conn, if_exists="append", index=False)

df = df.drop("audience", axis=1)

### Extract foody cuisine list

In [None]:
with conn:
    cur = conn.cursor()
    cur.execute("CREATE TABLE IF NOT EXISTS FOODY_RESTAURANT_CUISINE (\
      restaurant_id INTEGER,\
      cuisine text,\
      UNIQUE(restaurant_id, cuisine)\
    )")

In [None]:
#@markdown Extract cuisine column and explode the list
cuisine_df = df[["restaurant_id", "cuisine"]].drop_duplicates()

# Each restaurant must carry exactly one cuisine string before splitting
# (message previously said "audience", a copy-paste slip)
assert len(cuisine_df) == cuisine_df["restaurant_id"].nunique(), (
  "Exist restaurant with 2 different cuisine attribute"
)

# "Món Việt, Món Bắc" -> one row per label; rows without a cuisine are dropped.
# .assign builds a new frame, avoiding assignment into a slice of `df`.
cuisine_df = (
    cuisine_df
    .assign(cuisine=cuisine_df["cuisine"].str.split(", "))
    .explode("cuisine")
    .dropna()
)
cuisine_df

Unnamed: 0,restaurant_id,cuisine
0,678064,Món Việt
1,210493,Món Việt
2,1059918,Món Nhật
3,1043820,Món Việt
4,1044890,Món Việt
...,...,...
101394,863533,Ý
101395,678222,Món Việt
101396,317293,Mỹ
101397,1000030101,Món Bắc


In [None]:
#@markdown Insert extracted attribute to database and drop cuisine column (warning: attribute may get changed over time, be careful with the data)
cols = cuisine_df.columns.to_list()

# Compare with the cuisine rows already stored: the outer merge flags each row
# as left_only (new), right_only (only in the database) or both.
existing_cuisine_df = pd.read_sql_query("SELECT * FROM FOODY_RESTAURANT_CUISINE", conn)
cuisine_df = cuisine_df.merge(existing_cuisine_df, on=cols, how="outer", indicator=True)
# Stored rows of restaurants that are not part of this crawl are kept, so they
# are relabelled "absent" rather than treated as deletions. `_merge` is
# categorical, so it is cast to str before adding a new label. The assignment
# goes through .loc: the former chained `cuisine_df[mask]["_merge"] = ...`
# only modified a temporary copy.
cuisine_df["_merge"] = cuisine_df["_merge"].astype(str)
cuisine_df.loc[~cuisine_df["restaurant_id"].isin(df["restaurant_id"]), "_merge"] = "absent"

# Warning in case of an attribute get deleted
removed_rows = cuisine_df[cuisine_df["_merge"]=="right_only"]
if len(removed_rows) > 0:
  print("Warning: These rows are going to be deleted, please comfirm first before continue")
  display(len(removed_rows))
  c = input("confirm (Y/N): ")
  if c.upper()!="Y":
    raise Exception(f"Operation terminated due to the choice: {c}")

# drop_duplicates guards the UNIQUE(restaurant_id, cuisine) constraint
cuisine_df = cuisine_df[cuisine_df["_merge"]!="right_only"][cols].drop_duplicates()
# Clear and append instead of to_sql(if_exists="replace"): "replace" drops the
# table and recreates it from pandas-inferred types, losing the UNIQUE
# constraint created above. Both statements run inside one `with conn`
# transaction, so a failed insert should roll the DELETE back too.
with conn:
  conn.execute("DELETE FROM FOODY_RESTAURANT_CUISINE")
  cuisine_df.to_sql("FOODY_RESTAURANT_CUISINE", conn, if_exists="append", index=False)

df = df.drop("cuisine", axis=1)

### Extract price range

In [None]:
# "20.000đ - 25.000đ" -> ["20000", "25000"]: strip thousand separators, spaces
# and the currency sign, then split on the dash. (Not idempotent: re-running
# turns the already-split lists into NaN.)
df["price_range"] = df["price_range"].str.replace(r"[. đ]", "", regex=True).str.split("-")
# .str.get propagates NaN for missing price ranges; `lambda x: x[0]` raised
# TypeError on float NaN.
df["price_range_min"] = df["price_range"].str.get(0)
df["price_range_max"] = df["price_range"].str.get(-1)

In [None]:
# Keep a price range only when both ends are plain digit strings; anything else
# (missing or free-text prices) becomes <NA>. .str.isnumeric() yields NaN for
# missing values, which `~` cannot negate, hence fillna(False). Checking the
# max end too avoids astype("float") failing on a non-numeric upper bound.
mask = (
    df["price_range_min"].str.isnumeric().fillna(False).astype(bool)
    & df["price_range_max"].str.isnumeric().fillna(False).astype(bool)
)

df.loc[~mask, ["price_range_min", "price_range_max"]] = np.nan
df[["price_range_min", "price_range_max"]] = df[["price_range_min", "price_range_max"]].astype("float").astype("Int64")

### Handle special case (case-by-case)

In [None]:
# Deliberate stop: "Run all" halts here; the case-by-case cleanup below is run by hand.
raise Exception("Stop right there!")

Exception: ignored

In [None]:
# Show every row whose restaurant_id appears more than once (keep=False marks all copies)
df[df["restaurant_id"].duplicated(keep=False)]

Unnamed: 0,latitude,longitude,restaurant_id,name,street_address,category,average_rating,position_rating,price_rating,quality_rating,service_rating,space_rating,review_count,review_excellent_count,review_good_count,review_average_count,review_bad_count,view_count,checked_in_count,favourite_count,wanted_count,picture_count,status,time_clock,price_range,url,area_id
23070,10.762771,106.661509,1003421,Chiaki Hiro - Food & Drink - Nguyễn Kim,"163/3 Nguyễn Kim, P. 7",Quán ăn,7.7,7.3,7.3,8.0,8.0,7.7,3,1,1,1,0,126,0,2,0,6,Đang hoạt động,08:00 - 23:59,30.000đ - 50.000đ,http://www.foody.vn/ho-chi-minh/chiaki-hiro-fo...,61
59617,10.784192,106.65611,1003421,Chiaki Hiro - Food & Drink - Nghĩa Phát,"278 Nghĩa Phát, P. 6",Quán ăn,7.7,7.3,7.3,8.0,8.0,7.7,3,1,1,1,0,126,0,2,0,6,Đang hoạt động,08:00 - 23:59,30.000đ - 50.000đ,http://www.foody.vn/ho-chi-minh/chiaki-hiro-fo...,191
83860,10.827785,106.679709,1035077,Cá Viên Chiên 176 - Nguyễn Oanh,"36 Nguyễn Oanh, P. 7",Ăn vặt/vỉa hè,8.6,8.7,8.7,8.7,8.3,8.7,3,1,2,0,0,475,0,0,3,12,Đang hoạt động,00:01 - 03:00 | 13:00 - 23:59,10.000đ - 15.000đ,http://www.foody.vn/ho-chi-minh/ca-vien-chien-...,279
84105,10.827785,106.679709,1035077,Cá Viên Chiên 176 - Nguyễn Oanh,"36 Nguyễn Oanh, P. 7",Ăn vặt/vỉa hè,8.6,8.7,8.7,8.7,8.3,8.7,3,1,2,0,0,474,0,0,3,12,Đang hoạt động,00:01 - 03:00 | 13:00 - 23:59,10.000đ - 15.000đ,http://www.foody.vn/ho-chi-minh/ca-vien-chien-...,279


Đây là những trường hợp thiểu số, nên thực hiện case-by-case để tiết kiệm thời gian
- 23070: có thể nguyên nhân do quán đổi địa chỉ
- 84105: nguyên nhân do view tăng trong quá trình crawl

In [None]:
# Case-by-case fixes for duplicated restaurant_id (see the notes above):
# 23070 - same id listed under another address, 84105 - re-crawled with a changed view count.
df = df.drop([23070, 84105], axis=0)
# FOODY_RESTAURANTS_FACT uses restaurant_id as PRIMARY KEY, so it must be unique from here on
assert df["restaurant_id"].is_unique, "restaurant_id still has duplicates; review the case-by-case drops"

### Handle value error in rating

In [None]:
# Sub-ratings use "--" for "no rating"; turn it into NaN and store the columns as float64
df[["position_rating", "price_rating", "quality_rating", "service_rating", "space_rating"]] = df[["position_rating", "price_rating", "quality_rating", "service_rating", "space_rating"]].replace("--", np.nan)
df = df.astype({
    "position_rating": "float64", 
    "price_rating": "float64", 
    "quality_rating": "float64", 
    "service_rating": "float64", 
    "space_rating": "float64"
})

### Import the rest to database

In [None]:
# Recreate the restaurant fact table from scratch with an explicit schema.
with conn:
    cur = conn.cursor()
    # IF EXISTS so the cell also works on a fresh database (a plain DROP TABLE
    # raises OperationalError when the table does not exist yet)
    cur.execute("DROP TABLE IF EXISTS FOODY_RESTAURANTS_FACT")
    cur.execute("CREATE TABLE IF NOT EXISTS FOODY_RESTAURANTS_FACT (\
      restaurant_id integer PRIMARY KEY,\
      latitude real,\
      longitude real,\
      name text,\
      street_address text,\
      category text,\
      average_rating real,\
      position_rating real,\
      price_rating real,\
      quality_rating real,\
      service_rating real,\
      space_rating real,\
      review_count integer,\
      review_excellent_count integer,\
      review_good_count integer,\
      review_average_count integer,\
      review_bad_count integer,\
      view_count integer,\
      checked_in_count integer,\
      favourite_count integer,\
      wanted_count integer,\
      picture_count integer,\
      status text,\
      price_range_min integer,\
      price_range_max integer,\
      area_id integer\
    )")

# These attributes were omitted because they are not needed for BI:
# time_clock text,
# url text,\

In [None]:
df = df.drop(["time_clock", "price_range", "url"], axis=1)
# Clear and append instead of to_sql(if_exists="replace"): "replace" drops the
# table just created above and recreates it from pandas-inferred types, losing
# the restaurant_id PRIMARY KEY. Clearing first keeps the insert re-runnable.
with conn:
    conn.execute("DELETE FROM FOODY_RESTAURANTS_FACT")
    df.to_sql("FOODY_RESTAURANTS_FACT", conn, if_exists="append", index=False)

## Shopee

### Staging Shopee dish (2022/11/29)

In [None]:
# Glob pattern, relative to `_root`, for the raw Shopee dish parquet files of the 2022-11-29 batch
shopee_file_path = "Data/1 - raw data/shopee/shopee dish/20221129_dishes/*.parquet"

In [None]:
# Scratch directory shared by every staging section; remove parquet files left
# by another batch so the Spark read that follows only sees this batch's files.
tmp_dir = Path("tmp")
tmp_dir.mkdir(parents=True, exist_ok=True)
for stale in tmp_dir.glob("*.parquet"):
  stale.unlink()

for f in _root.glob(shopee_file_path):
  # Read the raw file and drop null-typed columns
  df = pq.read_table(f)
  df = ArrowDropNull(df)

  # Cast dishes.discount_price to double so files where it was inferred as
  # int64 merge with files where it is double. Only touch the field when it
  # survived ArrowDropNull: assigning a missing key appends a new field at the
  # end of the struct and the cast fails with "struct fields don't match or
  # are in the wrong order".
  s = ArrowSchemaToPyDict(df.schema)
  try:
    dishes = s["data"]["catalogs"][0]["dishes"][0]
    if "discount_price" in dishes:
      dishes["discount_price"] = "double"
      df = df.cast(PyDictToArrowSchema(s))
  except Exception as e:
    # Best effort: report and still stage the file uncast
    print(e)
    print(f)

  # Write the cleaned table to the scratch directory
  pq.write_table(df, str(tmp_dir / f.name))

struct fields don't match or are in the wrong order: Input fields: struct<total_order: int64, listing_status: bool, partner_dish_id: string, description: string, display_total_order: string, total_like: int64, restaurant_id: int64, rank: int64, id: int64, property_info: struct<has_alcohol: bool>, catalog_id: int64, stock_info: struct<start_time: int64, is_out_stocked: bool, end_time: int64>, is_hidden: bool, sale_time_info: struct<loop_sale_days: list<item: struct<time_for_sales: list<item: struct<start_time_sec: int64, end_time_sec: int64>>, weekday: int64>>, is_in_sale_time: bool>, price: double, is_group_discount_item: bool, name: string> output fields: struct<total_order: int64, listing_status: bool, partner_dish_id: string, description: string, display_total_order: string, total_like: int64, restaurant_id: int64, rank: int64, id: int64, property_info: struct<has_alcohol: bool>, catalog_id: int64, stock_info: struct<start_time: int64, is_out_stocked: bool, end_time: int64>, is_hidd

In [None]:
# Read every staged file at once; mergeSchema unions their (compatible) schemas
df = (
    spark.read
    .option("mergeSchema", True)
    .parquet(*(str(p) for p in Path("tmp/").glob("*.parquet")))
)

In [None]:
# Write the merged batch as a single parquet part under the Staging folder.
# mode("append"): re-running this cell appends the same rows again.
df.coalesce(1).write.format("parquet").mode("append").save(str(_root.joinpath("Data/2 - cleaned data/Staging/2022.11.29.dishes.parquet")))

### Staging Shopee dish (2022/11/26)

1. Pattern tới chỗ chứa các file để đọc hàng loạt và concat lại

In [None]:
# Glob pattern, relative to `_root`, for the raw Shopee dish parquet files of the 2022-11-26 batch
shopee_file_path = "Data/1 - raw data/shopee/shopee dish/20221126_dishes/*.parquet"

2. Bulk reading parquet

In [None]:
# Read only the schema (not the data) of every raw file, in glob order
schemas = [pq.read_schema(f) for f in _root.glob(shopee_file_path)]

In [None]:
# Cách đọc và merge với spark
# df = spark.read.option("mergeSchema", True).parquet(str(p.joinpath(f"dishes_schema005.parquet")), str(p.joinpath(f"dishes_schema001.parquet")))

#### Old method: compare, create pre-defined schema and enforce them

3. Compare conversion compatibility between all schemas

In [None]:
def compareSchema(mainSchema: dict, subSchema: dict, tree='', verbose=False):
  """Count the ways `subSchema` is not covered by `mainSchema`.

  Both arguments use the nested form produced by ArrowSchemaToPyDict
  (dict = struct, one-element list = list, str = scalar type name).
  `mainSchema` is the candidate superset, so keys that exist only in
  `mainSchema` are reported when verbose but not counted.

  Each of these counts as one error:
    - a key present in `subSchema` but missing from `mainSchema`;
    - a field that is 'null' (or ['null']) in `mainSchema` but not in `subSchema`;
    - a field whose types differ, including struct-vs-scalar and
      list-vs-scalar mismatches (which used to raise AttributeError).

  Args:
    mainSchema: candidate superset schema (dict form).
    subSchema: schema that should fit into `mainSchema` (dict form).
    tree: '->'-joined path of the current struct, printed in verbose mode.
    verbose: print the path and reason of every mismatch.

  Returns:
    int: number of mismatches.
  """
  error_count = 0
  main_keys = set(mainSchema.keys())
  sub_keys = set(subSchema.keys())
  if main_keys != sub_keys:
    if verbose:
      print(tree)
      print("Amount of key mismatch")
  missing_from_main = sub_keys - main_keys
  if len(missing_from_main) > 0:
    if verbose:
      print(tree)
      print("Missing from main: ", missing_from_main)
    error_count += len(missing_from_main)

  for k in mainSchema.keys():
    if k not in subSchema.keys():
      continue
    main_t, sub_t = mainSchema[k], subSchema[k]
    if main_t == ['null'] or sub_t == ['null'] or main_t == 'null' or sub_t == 'null':
      # Null on either side: an error only when main lacks a type that sub has
      if main_t == ['null'] and sub_t != ['null']:
        if verbose:
          print(tree)
          print("Missing NESTED from main: ", k)
        error_count += 1
      if main_t == 'null' and sub_t != 'null':
        if verbose:
          print(tree)
          print("Missing NESTED from main: ", k)
        error_count += 1
    elif isinstance(main_t, dict) and isinstance(sub_t, dict):
      error_count += compareSchema(main_t, sub_t, tree=tree+'->'+k, verbose=verbose)
    elif (isinstance(main_t, list) and isinstance(sub_t, list) and main_t and sub_t
          and isinstance(main_t[0], dict) and isinstance(sub_t[0], dict)):
      # list<struct>: compare the element structs
      error_count += compareSchema(main_t[0], sub_t[0], tree=tree+'->'+k, verbose=verbose)
    elif main_t != sub_t:
      # Scalars, lists of scalars, or a struct/list compared with something else
      if verbose:
        print(tree)
        print("Different type at: ", k)
      error_count += 1
  return error_count

In [None]:
# Score each schema as the "main" candidate: the sum of compareSchema errors
# against every schema (itself included). The lowest total covers the others
# best; res[i][j] keeps the individual pairwise scores.
# Each schema is converted to dict form once instead of twice per pair.
schema_dicts = [ArrowSchemaToPyDict(sch) for sch in schemas]
res = {}
# No magic ceiling (was 999): any real total beats infinity
best = {"idx": 0, "point": float("inf")}
for i, key_dict in enumerate(schema_dicts):
  res[i] = [compareSchema(key_dict, sub_dict) for sub_dict in schema_dicts]
  total = sum(res[i])
  if total < best["point"]:
    best.update({"idx": i, "point": total})

In [None]:
# Index of the schema with the lowest total mismatch score, and that score
best

{'idx': 20, 'point': 28}

Cluster all schemas that are truly the same (pyarrow also compares the order of columns in a schema, so we compare their dictionary form instead)

In [None]:
# Group schemas whose dict form is identical (dict equality ignores field order,
# Arrow schema equality does not). Keys are the index of the first schema of
# each group, values the indices of all its members.
# Each schema is converted once instead of once per comparison.
schema_dicts = [ArrowSchemaToPyDict(sch) for sch in schemas]
cluster = {}
for i, key_dict in enumerate(schema_dicts):
  for k, members in cluster.items():
    if schema_dicts[k] == key_dict:
      members.append(i)
      break
  else:
    # No existing group matched: this schema starts a new one
    cluster[i] = [i]

In [None]:
# {first index of group: [indices of all schemas identical to it]}
cluster

{0: [0],
 1: [1],
 2: [2],
 3: [3],
 4: [4],
 5: [5, 12],
 6: [6],
 7: [7],
 8: [8, 10],
 9: [9, 18],
 11: [11, 15],
 13: [13],
 14: [14, 22, 23],
 16: [16, 33, 60],
 17: [17, 25],
 19: [19],
 20: [20, 44, 58],
 21: [21, 49],
 24: [24, 42],
 26: [26, 45],
 27: [27],
 28: [28],
 29: [29],
 30: [30],
 31: [31],
 32: [32, 55],
 34: [34, 54, 64],
 35: [35],
 36: [36],
 37: [37],
 38: [38, 69],
 39: [39],
 40: [40],
 41: [41],
 43: [43, 59],
 46: [46],
 47: [47, 68],
 48: [48],
 50: [50, 61],
 51: [51],
 52: [52],
 53: [53],
 56: [56],
 57: [57],
 62: [62, 63],
 65: [65],
 66: [66],
 67: [67],
 70: [70]}

4. Export the compatibility scores to Excel for visualization, and check case by case for the best schema

In [None]:
# pd.DataFrame(res).to_excel("res.xlsx")

In [None]:
compareSchema(ArrowSchemaToPyDict(schemas[20]), ArrowSchemaToPyDict(schemas[28]), verbose=True)
# Missing "is_discount_topping": "bool" (16) (under id in dishes)
# Schema 20 has "discount_price" as "int64", schema 28 has it as "double"
# Missing "is_available" (31) (under catalog_id in dishes)

->data->catalogs
Amount of key mismatch
->data->catalogs->dishes
Amount of key mismatch
->data->catalogs->dishes
Missing from main:  {'is_discount_topping'}
->data->catalogs->dishes
Different type at:  discount_price


2

5. Define the schema, re-read, then merge (note: Arrow schemas are ordered, so don't reorder fields arbitrarily)

In [None]:
# ArrowSchemaToDefinition(schemas[20])

In [None]:
# Hand-written superset schema for the raw dish files. Presumably derived from
# ArrowSchemaToDefinition(schemas[20]) (commented out above) with
# discount_price as float64 and the dish fields noted as missing
# ("is_discount_topping", "is_available") added — TODO confirm.
# Field order matters: Arrow schema equality and casts are order-sensitive.
# NOTE(review): enforcing this schema on re-read failed (see "6. FAILED" below).
schema = pa.schema([
	pa.field("msg", pa.string()),
	pa.field("code", pa.int64()),
	pa.field("data", pa.struct([
		pa.field("catalogs", pa.list_(pa.struct([
			pa.field("dishes", pa.list_(pa.struct([
				pa.field("total_order", pa.int64()),
				pa.field("listing_status", pa.bool_()),
				pa.field("partner_dish_id", pa.string()),
				pa.field("display_total_order", pa.string()),
				pa.field("restaurant_id", pa.int64()),
				pa.field("discount_price", pa.float64()),
				pa.field("rank", pa.int64()),
				pa.field("discount_remaining_quantity", pa.int64()),
				pa.field("is_hidden", pa.bool_()),
				pa.field("id", pa.int64()),
				pa.field("is_discount_topping", pa.bool_()),
				pa.field("pictures", pa.list_(pa.struct([
					pa.field("url", pa.string()),
					pa.field("width", pa.int64()),
					pa.field("height", pa.int64())
				]))),
				pa.field("total_like", pa.int64()),
				pa.field("sale_time_info", pa.struct([
					pa.field("loop_sale_days", pa.list_(pa.struct([
						pa.field("time_for_sales", pa.list_(pa.struct([
							pa.field("start_time_sec", pa.int64()),
							pa.field("end_time_sec", pa.int64())
						]))),
						pa.field("weekday", pa.int64())
					]))),
					pa.field("custom_sale_days", pa.list_(pa.struct([
						pa.field("custom_date", pa.int64()),
						pa.field("time_for_sales", pa.list_(pa.struct([
							pa.field("start_time_sec", pa.int64()),
							pa.field("end_time_sec", pa.int64())
						])))
					]))),
					pa.field("is_in_sale_time", pa.bool_())
				])),
				pa.field("description", pa.string()),
				pa.field("price", pa.float64()),
				pa.field("period", pa.int64()),
				pa.field("property_info", pa.struct([
					pa.field("has_alcohol", pa.bool_())
				])),
				pa.field("is_searchable", pa.bool_()),
				pa.field("stock_info", pa.struct([
					pa.field("start_time", pa.int64()),
					pa.field("is_out_stocked", pa.bool_()),
					pa.field("end_time", pa.int64())
				])),
				pa.field("is_group_discount_item", pa.bool_()),
				pa.field("limit_per_order", pa.int64()),
				pa.field("name", pa.string()),
				pa.field("picture_label", pa.struct([
					pa.field("photos", pa.list_(pa.struct([
						pa.field("width", pa.int64()),
						pa.field("value", pa.string()),
						pa.field("height", pa.int64())
					]))),
					pa.field("label_position", pa.int64())
				])),
				pa.field("limit_type", pa.int64()),
				pa.field("catalog_id", pa.int64()),
				pa.field("is_available", pa.bool_()),
			]))),
			pa.field("is_group_discount", pa.bool_()),
			pa.field("id", pa.int64()),
			pa.field("rank", pa.int64()),
			pa.field("name", pa.string()),
			pa.field("restaurant_id", pa.int64()),
			pa.field("partner_catalog_id", pa.string()),
			pa.field("sort_type", pa.int64())
		])))
	])),
	pa.field("restaurant_id", pa.int64())
])

6. FAILED: Re-read the parquet files with new schema enforcement

#### New method: drop and cast all incompatible columns in Arrow, then merge them using Spark

In [None]:
# Stolen from https://stackoverflow.com/questions/71035754/pyarrow-drop-a-column-in-a-nested-structure/71039389#71039389?newreg=0e5baf2fd7184da8adad65c8e1789db3
# and improve
# def ArrowDropNull(array) (in helper function section)

3. Use Arrow to read all files, drop null-typed columns, convert the discount_price column to double, then temporarily write the results to disk

In [None]:
# Scratch directory shared by every staging section; remove parquet files left
# by another batch so the Spark read that follows only sees this batch's files.
tmp_dir = Path("tmp")
tmp_dir.mkdir(parents=True, exist_ok=True)
for stale in tmp_dir.glob("*.parquet"):
  stale.unlink()

for f in _root.glob(shopee_file_path):
  # Read the raw file and drop null-typed columns
  df = pq.read_table(f)
  df = ArrowDropNull(df)

  # Cast dishes.discount_price to double so files where it was inferred as
  # int64 merge with files where it is double. Only touch the field when it
  # survived ArrowDropNull: assigning a missing key appends a new field at the
  # end of the struct and the cast fails with "struct fields don't match or
  # are in the wrong order".
  s = ArrowSchemaToPyDict(df.schema)
  try:
    dishes = s["data"]["catalogs"][0]["dishes"][0]
    if "discount_price" in dishes:
      dishes["discount_price"] = "double"
      df = df.cast(PyDictToArrowSchema(s))
  except Exception as e:
    # Best effort: report and still stage the file uncast
    print(e)
    print(f)

  # Write the cleaned table to the scratch directory
  pq.write_table(df, str(tmp_dir / f.name))

struct fields don't match or are in the wrong order: Input fields: struct<total_order: int64, listing_status: bool, partner_dish_id: string, description: string, display_total_order: string, total_like: int64, pictures: list<item: null>, restaurant_id: int64, rank: int64, id: int64, property_info: struct<has_alcohol: bool>, catalog_id: int64, stock_info: struct<start_time: int64, is_out_stocked: bool, end_time: int64>, is_hidden: bool, sale_time_info: struct<loop_sale_days: list<item: struct<time_for_sales: list<item: struct<start_time_sec: int64, end_time_sec: int64>>, weekday: int64>>, custom_sale_days: list<item: null>, is_in_sale_time: bool>, price: double, is_group_discount_item: bool, name: string> output fields: struct<total_order: int64, listing_status: bool, partner_dish_id: string, description: string, display_total_order: string, total_like: int64, pictures: list<item: null>, restaurant_id: int64, rank: int64, id: int64, property_info: struct<has_alcohol: bool>, catalog_id: 

4. Use Spark (with `mergeSchema`) to read and merge all the temporary files, then write the merged output back to the staging folder

In [None]:
# Read every temporary Parquet file with Spark; mergeSchema reconciles the
# per-file schemas (which differ after dropping null columns) into one schema.
tmp_parquet_paths = [str(p) for p in Path("tmp/").glob("*.parquet")]
df = spark.read.option("mergeSchema", True).parquet(*tmp_parquet_paths)

In [None]:
# Inspect the schema Spark produced after merging all temporary files
df.printSchema()

root
 |-- msg: string (nullable = true)
 |-- code: long (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- catalogs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- dishes: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- total_order: long (nullable = true)
 |    |    |    |    |    |-- listing_status: boolean (nullable = true)
 |    |    |    |    |    |-- partner_dish_id: string (nullable = true)
 |    |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |    |-- display_total_order: string (nullable = true)
 |    |    |    |    |    |-- total_like: long (nullable = true)
 |    |    |    |    |    |-- restaurant_id: long (nullable = true)
 |    |    |    |    |    |-- rank: long (nullable = true)
 |    |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |  

In [None]:
# Write the merged result as a single-partition Parquet dataset. "overwrite"
# (instead of "append") keeps the cell idempotent: re-running it must not
# duplicate every row already written to this dated output.
df.coalesce(1).write.format("parquet").mode("overwrite").save(str(_root.joinpath("Data/2 - cleaned data/Staging/2022.11.26.dishes.parquet")))

In [None]:
# Read the folder Spark just wrote back with pyarrow to check the merged schema
t = pq.read_table(_root.joinpath("Data/2 - cleaned data/Staging/2022.11.26.dishes.parquet"))

In [None]:
# Show the read-back schema as a nested dict of type names
ArrowSchemaToPyDict(t.schema)

{'msg': 'string',
 'code': 'int64',
 'data': {'catalogs': [{'name': 'string',
    'dishes': [{'total_order': 'int64',
      'listing_status': 'bool',
      'partner_dish_id': 'string',
      'description': 'string',
      'display_total_order': 'string',
      'total_like': 'int64',
      'restaurant_id': 'int64',
      'rank': 'int64',
      'id': 'int64',
      'property_info': {'has_alcohol': 'bool'},
      'catalog_id': 'int64',
      'stock_info': {'start_time': 'int64',
       'is_out_stocked': 'bool',
       'end_time': 'int64'},
      'is_hidden': 'bool',
      'sale_time_info': {'loop_sale_days': [{'time_for_sales': [{'start_time_sec': 'int64',
           'end_time_sec': 'int64'}],
         'weekday': 'int64'}],
       'is_in_sale_time': 'bool',
       'custom_sale_days': [{'custom_date': 'int64',
         'time_for_sales': [{'start_time_sec': 'int64',
           'end_time_sec': 'int64'}]}]},
      'price': 'double',
      'is_group_discount_item': 'bool',
      'name': 'strin

### Staging Shopee topping (2022/11/21)



1. Pattern tới chỗ chứa các file để đọc hàng loạt và concat lại

In [None]:
# Glob pattern (relative to _root) matching the split topping CSV files
shopee_file_path = "Data/1 - raw data/shopee/dish topping/split_dish_topping/*.csv"

2. Bulk-read the CSV files

In [None]:
# Read every split CSV file (sorted, so the concatenation order is reproducible)
# and keep only rows that have a restaurant_id. With na_filter=False empty cells
# stay as '' instead of NaN, which is what the filter below compares against.
df = []
for f in sorted(_root.glob(shopee_file_path)):
  _df = pd.read_csv(f, na_filter=False)
  if "restaurant_id" not in _df.columns:
    # Unexpected file layout: show it for inspection and skip it, instead of
    # hiding every possible error behind a bare `except:`
    print(f)
    display(_df)
    continue
  df.append(_df[_df.restaurant_id != ''])

df = pd.concat(df).drop(columns="Unnamed: 0")

In [None]:
# Preview the concatenated topping rows
df

Unnamed: 0,restaurant_id,dish_id,topping_data
0,1114911,28885958,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
1,1114911,28885933,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
2,1114911,28885947,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
3,1114911,28885937,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
4,1114911,28885944,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
...,...,...,...
11244,1134408,71085623,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
11245,1134408,71085121,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
11246,1134408,71086298,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
11247,1134408,71085930,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."


In [None]:
# Arrow schema for one row of the 2022/11/21 topping data: the API response
# (msg / code / data.option_groups) plus the CSV row's restaurant_id and dish_id.
# Declaring it explicitly tells pyarrow.json the type of every field instead of
# letting it guess. Nested types are built innermost-first for readability.
_t1121_stock_info = pa.struct([
    pa.field('start_time', pa.int64()),
    pa.field('is_out_stocked', pa.bool_()),
    pa.field('end_time', pa.int64()),
])
_t1121_option = pa.struct([
    pa.field('original_price', pa.float64()),
    pa.field('price', pa.float64()),
    pa.field('weight', pa.int64()),
    pa.field('is_active', pa.bool_()),
    pa.field('rank', pa.int64()),
    pa.field('is_default', pa.bool_()),
    pa.field('stock_info', _t1121_stock_info),
    pa.field('max_qty', pa.int64()),
    pa.field('id', pa.int64()),
    pa.field('group_id', pa.int64()),
    pa.field('partner_option_id', pa.string()),
])
_t1121_option_group = pa.struct([
    pa.field('min_select', pa.int64()),
    pa.field('max_select', pa.int64()),
    pa.field('name', pa.string()),
    pa.field('rank', pa.int64()),
    pa.field('id', pa.int64()),
    pa.field('partner_option_group_id', pa.string()),
    pa.field('options', pa.list_(_t1121_option)),
])
pa_schema = pa.schema([
    pa.field('msg', pa.string()),
    pa.field('code', pa.int64()),
    pa.field('data', pa.struct([
        pa.field('option_groups', pa.list_(_t1121_option_group)),
    ])),
    pa.field('restaurant_id', pa.int64()),
    pa.field('dish_id', pa.int64()),
])

In [None]:
# Turn each CSV row into a one-row Arrow table that follows pa_schema: the row's
# ids are merged into the decoded topping JSON, re-serialised to bytes, and parsed
# by pyarrow.json (the dict -> pa.table approach kept below for reference was
# abandoned because of its memory use).
topping_parse_options = pj.ParseOptions(explicit_schema=pa_schema)
merged_df = []
for row in df.itertuples(index=False):
  payload = {
      "restaurant_id": int(row.restaurant_id),
      "dish_id": int(row.dish_id),
      **json.loads(row.topping_data),
  }
  payload_bytes = json.dumps(payload).encode("utf8")
  merged_df.append(pj.read_json(io.BytesIO(payload_bytes), parse_options=topping_parse_options))

In [None]:
#@markdown (DEPRECATED CODE, SAVED FOR REFERENCE: CONVERTING THE JSON TO A DICT WOULD RESULT IN A RAM EXPLOSION)
# Previous approach: decode each row into a Python dict and build a one-row Arrow
# table with pa.table(..., schema=pa_schema). Superseded by the pyarrow.json loop above.
# merged_df = []
# for i, r in df.iterrows():
#   obj = {
#       "restaurant_id": int(r.restaurant_id),
#       "dish_id": int(r.dish_id),
#       **json.loads(r.topping_data), 
#     }
#   # Wrap in a list since PyArrow require a map to array
#   obj = {k: [obj[k]] for k in obj}
#   data = pa.table(obj, schema=pa_schema)
#   merged_df.append(data)

In [None]:
# Stitch the per-row Arrow tables into a single table and persist it to the
# staging folder as one Parquet file.
topping_table = pa.concat_tables(merged_df)
pq.write_table(
    topping_table,
    _root / "Data/2 - cleaned data/Staging/2022.11.21.dish_topping.parquet",
)

### Staging Shopee topping (2022/11/20)



1. Pattern tới chỗ chứa các file để đọc hàng loạt và concat lại

In [None]:
# Glob pattern (relative to _root) matching the split topping CSV files
shopee_file_path = "Data/1 - raw data/shopee/dish topping/split_dish_topping/*.csv"

2. Bulk-read the CSV files

In [None]:
# Read every split CSV file (sorted, so the concatenation order is reproducible)
# and keep only rows that have a restaurant_id. With na_filter=False empty cells
# stay as '' instead of NaN, which is what the filter below compares against.
df = []
for f in sorted(_root.glob(shopee_file_path)):
  _df = pd.read_csv(f, na_filter=False)
  if "restaurant_id" not in _df.columns:
    # Unexpected file layout: show it for inspection and skip it, instead of
    # hiding every possible error behind a bare `except:`
    print(f)
    display(_df)
    continue
  df.append(_df[_df.restaurant_id != ''])

df = pd.concat(df)

In [None]:
# Preview the concatenated topping rows
df

Unnamed: 0.1,Unnamed: 0,restaurant_id,dish_id,topping_data
0,0,1114911,28885958,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
1,1,1114911,28885933,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
2,2,1114911,28885947,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
3,3,1114911,28885937,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
4,4,1114911,28885944,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
...,...,...,...,...
11244,11244,1134408,71085623,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
11245,11245,1134408,71085121,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
11246,11246,1134408,71086298,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."
11247,11247,1134408,71085930,"{""msg"": ""success"", ""code"": 0, ""data"": {""option..."


In [None]:
import ast

def dictString2dict(s, **kattrs):
  """Parse a topping_data string into {"options": <parsed value>, **kattrs}.

  `s` is expected to be the Python-literal text of a list of option groups
  (single quotes, True/False), as in the sample below. ast.literal_eval is used
  instead of eval: the text comes from scraped files and must never be executed
  as code.

  Raises ValueError / SyntaxError if `s` is not a plain Python literal.
  """
  return {"options": ast.literal_eval(s), **kattrs}

def dictString2JSON(s, **kattrs):
  """Same payload as dictString2dict, serialised to UTF-8 JSON bytes.

  ensure_ascii=False keeps non-ASCII text (e.g. Vietnamese names) unescaped.
  """
  return json.dumps(dictString2dict(s, **kattrs), ensure_ascii=False).encode('utf8')

In [None]:
# Preview the JSON payload built for the first row, using that row's real ids
# (restaurant_id / dish_id are int64 in pa_schema, so a placeholder string would
# not reflect what the bulk loop feeds to pyarrow)
dictString2JSON(
    df.iloc[0].topping_data,
    restaurant_id=int(float(df.iloc[0].restaurant_id)),
    dish_id=int(float(df.iloc[0].dish_id)),
)

In [None]:
# Arrow schema for one row of the 2022/11/20 topping data: the parsed
# topping_data list stored under "options" plus the row's restaurant_id / dish_id
# (the layout produced by dictString2dict). Declaring it explicitly tells
# pyarrow.json the type of every field. Nested types are built innermost-first.
_t1120_stock_info = pa.struct([
    pa.field('start_time', pa.int64()),
    pa.field('is_out_stocked', pa.bool_()),
    pa.field('end_time', pa.int64()),
])
_t1120_option = pa.struct([
    pa.field('original_price', pa.float64()),
    pa.field('price', pa.float64()),
    pa.field('weight', pa.int64()),
    pa.field('is_active', pa.bool_()),
    pa.field('rank', pa.int64()),
    pa.field('is_default', pa.bool_()),
    pa.field('stock_info', _t1120_stock_info),
    pa.field('max_qty', pa.int64()),
    pa.field('id', pa.int64()),
    pa.field('partner_option_id', pa.string()),
])
_t1120_option_group = pa.struct([
    pa.field('min_select', pa.int64()),
    pa.field('max_select', pa.int64()),
    pa.field('name', pa.string()),
    pa.field('rank', pa.int64()),
    pa.field('id', pa.int64()),
    pa.field('partner_option_group_id', pa.string()),
    pa.field('options', pa.list_(_t1120_option)),
])
pa_schema = pa.schema([
    pa.field('options', pa.list_(_t1120_option_group)),
    pa.field('restaurant_id', pa.int64()),
    pa.field('dish_id', pa.int64()),
])

In [None]:
# Convert each row to a one-row Arrow table that follows pa_schema.
# dictString2JSON turns the Python-literal topping_data into UTF-8 JSON bytes
# with the row's ids attached. The original code handed the *dict* from
# dictString2dict to io.BytesIO (TypeError) and called the non-existent
# pa.Table.read_json. dish_id is filled as well, since pa_schema declares it.
json_parse_options = pj.ParseOptions(explicit_schema=pa_schema)
merged_df = []
for r in df.itertuples(index=False):
  json_io = io.BytesIO(dictString2JSON(
      r.topping_data,
      restaurant_id=int(float(r.restaurant_id)),
      dish_id=int(float(r.dish_id)),
  ))
  merged_df.append(pj.read_json(json_io, parse_options=json_parse_options))

In [None]:
# Stitch the per-row Arrow tables together and store them as one Parquet file
# in the staging folder.
pq.write_table(
    pa.concat_tables(merged_df),
    _root / "Data/2 - cleaned data/Staging/2022.11.20.dish_topping_by_restaurant.parquet",
)

### Staging Shopee topping (2022/11/20) (Spark reference only, do not touch)

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 48 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 65.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845514 sha256=38e74c1d49fe336dc55ae3370c0a3631b7580e3a6620485dc4148f3aaf14ce12
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [None]:
from pyspark.sql import SparkSession

# Reuse the active local Spark session, or start one if none exists yet
spark = SparkSession.builder.master("local").getOrCreate()

In [None]:
from pyspark.sql.types import StructField, StructType, StringType, MapType, IntegerType, FloatType, ArrayType, BooleanType

In [None]:
# Spark schema for one topping record (reference only), built from the innermost
# struct outwards. Every field is nullable except the top-level 'options' array.
spark_stock_info_type = StructType([
    StructField('start_time', IntegerType(), True),
    StructField('is_out_stocked', BooleanType(), True),
    StructField('end_time', IntegerType(), True),
])
spark_option_type = StructType([
    StructField('original_price', FloatType(), True),
    StructField('price', FloatType(), True),
    StructField('weight', IntegerType(), True),
    StructField('is_active', BooleanType(), True),
    StructField('rank', IntegerType(), True),
    StructField('is_default', BooleanType(), True),
    StructField('stock_info', spark_stock_info_type, True),
    StructField('max_qty', IntegerType(), True),
    StructField('id', IntegerType(), True),
    StructField('partner_option_id', StringType(), True),
])
spark_option_group_type = StructType([
    StructField('min_select', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('rank', IntegerType(), True),
    StructField('options', ArrayType(spark_option_type, True), True),
    StructField('max_select', IntegerType(), True),
    StructField('id', IntegerType(), True),
    StructField('partner_option_group_id', StringType(), True),
])
schema = StructType([
    StructField('options', ArrayType(spark_option_group_type, True), False),
    StructField('restaurant_id', IntegerType(), True),
    StructField('dish_id', IntegerType(), True),
])

Test on a single sample

In [None]:
# Parse the first row's topping_data without attaching any ids, so
# restaurant_id / dish_id come out as null in the Spark preview below
d = dictString2dict(df.iloc[0].topping_data)
d

{'options': [{'min_select': 1,
   'name': 'Lựa chọn',
   'rank': 0,
   'options': [{'name': 'Nóng',
     'weight': 1,
     'price': 0.0,
     'partner_option_id': '',
     'is_active': True,
     'rank': 1,
     'is_default': True,
     'stock_info': {'start_time': 0, 'is_out_stocked': False, 'end_time': 0},
     'max_qty': 1,
     'group_id': 54703,
     'id': 215974},
    {'name': 'Đá',
     'weight': 1,
     'price': 0.0,
     'partner_option_id': '',
     'is_active': True,
     'rank': 2,
     'is_default': False,
     'stock_info': {'start_time': 0, 'is_out_stocked': False, 'end_time': 0},
     'max_qty': 1,
     'group_id': 54703,
     'id': 215975}],
   'max_select': 1,
   'id': 54703,
   'partner_option_group_id': ''}]}

In [None]:
# Build a one-row Spark DataFrame from the sample to check it fits `schema`
spark.createDataFrame(data=[d], schema=schema).show()

+--------------------+-------------+-------+
|             options|restaurant_id|dish_id|
+--------------------+-------------+-------+
|[{1, Lựa chọn, 0,...|         null|   null|
+--------------------+-------------+-------+



Bulk processing

In [None]:
# Build every row dict first, then create ONE Spark DataFrame. The original
# called spark.createDataFrame once per row (one driver round-trip each, ~11k
# times) and never filled dish_id, although `schema` declares it.
rows = [
    dictString2dict(
        r.topping_data,
        restaurant_id=int(float(r.restaurant_id)),
        dish_id=int(float(r.dish_id)),
    )
    for r in df.itertuples(index=False)
]
# Kept as a list of DataFrames, like the per-row version produced
merged_df = [spark.createDataFrame(data=rows, schema=schema)]

### Staging Shopee dishes (2022/11/19)

1. Pattern tới chỗ chứa các file để đọc hàng loạt và concat lại

In [None]:
# Glob pattern (relative to _root) matching the exported dish Excel files
shopee_file_path = "Data/1 - raw data/shopee/shopee dish/dishes2/*.xlsx"

2. Bulk-read the Excel files, concatenate them, and drop unneeded column(s)

In [None]:
# Read every Excel export (sorted, so the concatenation order, and therefore the
# row order of the staged file, is reproducible). na_filter=False keeps empty
# cells as '' rather than NaN.
df = [pd.read_excel(f, na_filter=False) for f in sorted(_root.glob(shopee_file_path))]
# 'Unnamed: 0' is presumably an index column saved by the exporter; drop it
stg_shopee_dishes = pd.concat(df).drop(columns=['Unnamed: 0'])

3. Check the data types and convert columns to appropriate ones

In [None]:
# Inspect the dtypes pandas inferred from the Excel files
stg_shopee_dishes.dtypes

catalog_id                       Int64
dish_total_order                 Int64
catalog_name                    object
catalog_rank                     Int64
catalog_partner_catalog_id      object
catalog_description             object
dish_restaurant_id               Int64
dish_id                          Int64
dish_name                       object
dish_partner_dish_id            object
dish_listing_status               bool
dish_description                object
dish_total_like                  Int64
dish_rank                        Int64
dish_picture_label              object
dish_is_hidden                    bool
dish_price                     float64
dish_is_group_discount_item       bool
dishes_property_info            object
dtype: object

In [None]:
# Enforce explicit dtypes: pandas' nullable 'Int64' for integer ids and counters
# (it tolerates missing values), plain bool for flags, str for the description
# and float64 for the price.
stg_shopee_dishes = stg_shopee_dishes.astype(dict(
    catalog_id='Int64',
    dish_total_order='Int64',
    catalog_rank='Int64',
    dish_restaurant_id='Int64',
    dish_id='Int64',
    dish_listing_status='bool',
    dish_description='str',
    dish_total_like='Int64',
    dish_rank='Int64',
    dish_is_hidden='bool',
    dish_price='float64',
    dish_is_group_discount_item='bool',
))

4. Save the condensed data to the staging folder for future reference

In [None]:
# Persist the typed staging frame (without the pandas index) to the staging folder
stg_shopee_dishes.to_parquet(
    _root / "Data/2 - cleaned data/Staging/2022.11.19.dim_shopee_dishes.parquet",
    index=False,
)

### Staging Shopee dishes (2022/10/16)

1. Pattern tới chỗ chứa các file để đọc hàng loạt và concat lại

In [None]:
# Glob pattern (relative to _root) matching the exported dish Excel files
shopee_file_path = "Data/1 - raw data/shopee/shopee dish/dishes/*.xlsx"

2. Bulk-read the Excel files, concatenate them, and drop unneeded column(s)

In [None]:
# Read every Excel export (sorted, so the concatenation order, and therefore the
# row order of the staged file, is reproducible). na_filter=False keeps empty
# cells as '' rather than NaN.
df = [pd.read_excel(f, na_filter=False) for f in sorted(_root.glob(shopee_file_path))]
# 'Unnamed: 0' is presumably an index column saved by the exporter; drop it
stg_shopee_dishes = pd.concat(df).drop(columns=['Unnamed: 0'])

3. Check the data types and convert columns to appropriate ones

In [None]:
# Inspect the dtypes pandas inferred from the Excel files
stg_shopee_dishes.dtypes

catalog_id                     float64
dish_total_order               float64
catalog_name                    object
catalog_rank                   float64
catalog_partner_catalog_id      object
catalog_description             object
dish_restaurant_id             float64
dish_id                        float64
dish_name                       object
dish_partner_dish_id            object
dish_listing_status               bool
dish_description                object
dish_total_like                 object
dish_rank                      float64
dish_picture_label              object
dish_is_hidden                    bool
dish_price                     float64
dish_is_group_discount_item       bool
dishes_property_info            object
dtype: object

In [None]:
# Blank cells were read as '' (na_filter=False); turn them into pd.NA so the
# column can be cast to the nullable 'Int64' dtype in the next cell. Assign the
# result back: replace(..., inplace=True) on the attribute-accessed column is
# chained assignment and does not update the frame under pandas Copy-on-Write.
stg_shopee_dishes["dish_total_like"] = stg_shopee_dishes["dish_total_like"].replace('', pd.NA)

In [None]:
# Enforce explicit dtypes: pandas' nullable 'Int64' for integer ids and counters
# (it tolerates missing values), str for the text columns, plain bool for flags
# and float64 for the price.
stg_shopee_dishes = stg_shopee_dishes.astype(dict(
    catalog_id='Int64',
    dish_total_order='Int64',
    catalog_rank='Int64',
    dish_restaurant_id='Int64',
    dish_id='Int64',
    dish_name='str',
    dish_listing_status='bool',
    dish_description='str',
    dish_total_like='Int64',
    dish_rank='Int64',
    dish_is_hidden='bool',
    dish_price='float64',
    dish_is_group_discount_item='bool',
))

4. Save the condensed data to the staging folder for future reference

In [None]:
# Persist the typed staging frame (without the pandas index) to the staging folder
stg_shopee_dishes.to_parquet(
    _root / "Data/2 - cleaned data/Staging/2022.10.16.dim_shopee_dishes.parquet",
    index=False,
)

### Staging Shopee dishes (2022/12/03)

In [None]:
shopee_file_path = "Data/1 - raw data/shopee/shopee dish/20221203_dishes/*.parquet"
Path("tmp").mkdir(parents=True, exist_ok=True)
# "tmp" is shared by every staging section in this notebook; delete Parquet files
# left over from another section (or an earlier run) so the Spark merge below
# only sees the files produced by this loop.
for stale in Path("tmp").glob("*.parquet"):
  stale.unlink()

for f in sorted(_root.glob(shopee_file_path)):
  # Read the file first, then drop its null columns (see ArrowDropNull)
  table = pq.read_table(f)
  table = ArrowDropNull(table)

  # Cast the dishes' discount_price column to double, but only when this file
  # still has that column. Unconditionally assigning the key adds a brand-new
  # field to the target schema when the column is absent, and cast then fails
  # with "struct fields don't match or are in the wrong order".
  s = ArrowSchemaToPyDict(table.schema)
  try:
    dish_fields = s["data"]["catalogs"][0]["dishes"][0]
  except (KeyError, IndexError, TypeError):
    # The nested path itself is missing in this file: nothing to cast
    dish_fields = {}
  if "discount_price" in dish_fields:
    dish_fields["discount_price"] = "double"
    try:
      table = table.cast(PyDictToArrowSchema(s))
    except Exception as e:
      # Keep the uncast table, but report which file needs attention
      print(f"{f}: {e}")

  # Write the intermediate result to the temporary folder
  pq.write_table(table, f"tmp/{f.name}")

struct fields don't match or are in the wrong order: Input fields: struct<listing_status: bool, partner_dish_id: string, description: string, total_like: int64, restaurant_id: int64, rank: int64, id: int64, property_info: struct<has_alcohol: bool>, catalog_id: int64, stock_info: struct<start_time: int64, is_out_stocked: bool, end_time: int64>, is_hidden: bool, sale_time_info: struct<loop_sale_days: list<item: struct<time_for_sales: list<item: struct<start_time_sec: int64, end_time_sec: int64>>, weekday: int64>>, is_in_sale_time: bool>, price: double, is_group_discount_item: bool, name: string> output fields: struct<listing_status: bool, partner_dish_id: string, description: string, total_like: int64, restaurant_id: int64, rank: int64, id: int64, property_info: struct<has_alcohol: bool>, catalog_id: int64, stock_info: struct<start_time: int64, is_out_stocked: bool, end_time: int64>, is_hidden: bool, sale_time_info: struct<loop_sale_days: list<item: struct<time_for_sales: list<item: stru

In [None]:
# Read every temporary Parquet file with Spark; mergeSchema reconciles the
# per-file schemas (which differ after dropping null columns) into one schema.
tmp_parquet_paths = [str(p) for p in Path("tmp/").glob("*.parquet")]
df = spark.read.option("mergeSchema", True).parquet(*tmp_parquet_paths)

In [None]:
# Write the merged result as a single-partition Parquet dataset. "overwrite"
# (instead of "append") keeps the cell idempotent: re-running it must not
# duplicate every row already written to this dated output.
df.coalesce(1).write.format("parquet").mode("overwrite").save(str(_root.joinpath("Data/2 - cleaned data/Staging/2022.12.03.dishes.parquet")))

### Staging Shopee dishes (2022/12/06)

In [None]:
shopee_file_path = "Data/1 - raw data/shopee/shopee dish/20221206_dishes/*.parquet"
Path("tmp").mkdir(parents=True, exist_ok=True)
# "tmp" is shared by every staging section in this notebook; delete Parquet files
# left over from another section (or an earlier run) so the Spark merge below
# only sees the files produced by this loop.
for stale in Path("tmp").glob("*.parquet"):
  stale.unlink()

for f in sorted(_root.glob(shopee_file_path)):
  # Read the file first, then drop its null columns (see ArrowDropNull)
  table = pq.read_table(f)
  table = ArrowDropNull(table)

  # Cast the dishes' discount_price column to double, but only when this file
  # still has that column. Unconditionally assigning the key adds a brand-new
  # field to the target schema when the column is absent, and cast then fails
  # with "struct fields don't match or are in the wrong order".
  s = ArrowSchemaToPyDict(table.schema)
  try:
    dish_fields = s["data"]["catalogs"][0]["dishes"][0]
  except (KeyError, IndexError, TypeError):
    # The nested path itself is missing in this file: nothing to cast
    dish_fields = {}
  if "discount_price" in dish_fields:
    dish_fields["discount_price"] = "double"
    try:
      table = table.cast(PyDictToArrowSchema(s))
    except Exception as e:
      # Keep the uncast table, but report which file needs attention
      print(f"{f}: {e}")

  # Write the intermediate result to the temporary folder
  pq.write_table(table, f"tmp/{f.name}")

struct fields don't match or are in the wrong order: Input fields: struct<listing_status: bool, partner_dish_id: string, description: string, total_like: int64, restaurant_id: int64, rank: int64, id: int64, property_info: struct<has_alcohol: bool>, catalog_id: int64, stock_info: struct<start_time: int64, is_out_stocked: bool, end_time: int64>, is_hidden: bool, sale_time_info: struct<loop_sale_days: list<item: struct<time_for_sales: list<item: struct<start_time_sec: int64, end_time_sec: int64>>, weekday: int64>>, is_in_sale_time: bool>, price: double, is_group_discount_item: bool, name: string> output fields: struct<listing_status: bool, partner_dish_id: string, description: string, total_like: int64, restaurant_id: int64, rank: int64, id: int64, property_info: struct<has_alcohol: bool>, catalog_id: int64, stock_info: struct<start_time: int64, is_out_stocked: bool, end_time: int64>, is_hidden: bool, sale_time_info: struct<loop_sale_days: list<item: struct<time_for_sales: list<item: stru

In [None]:
# Read every temporary Parquet file with Spark; mergeSchema reconciles the
# per-file schemas (which differ after dropping null columns) into one schema.
tmp_parquet_paths = [str(p) for p in Path("tmp/").glob("*.parquet")]
df = spark.read.option("mergeSchema", True).parquet(*tmp_parquet_paths)

In [None]:
# Write the merged result as a single-partition Parquet dataset. "overwrite"
# (instead of "append") keeps the cell idempotent: re-running it must not
# duplicate every row already written to this dated output.
df.coalesce(1).write.format("parquet").mode("overwrite").save(str(_root.joinpath("Data/2 - cleaned data/Staging/2022.12.06.dishes.parquet")))