Cleaning up and simplifying the dateframe to reduce memory and runtime

In [1]:
# If not already installed, do: pip install pandas fastparquet
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import json
import dask.dataframe as ddf
from dask.diagnostics import ProgressBar
from tqdm.auto import tqdm

In [2]:
URL_LOOKUP = 'https://storage.googleapis.com/dosm-public-pricecatcher/lookup_item.parquet'
URL_PREMISE = 'https://storage.googleapis.com/dosm-public-pricecatcher/lookup_premise.parquet'
url_file = 'pricecatcher/pricecatcher/price_urls.json'

In [3]:
price_dfs = []

with open(url_file, 'r') as json_file: 
    price_urls_data = json.load(json_file)

for entry in price_urls_data:
        parquet_urls = entry['parquet_files']
        for url in parquet_urls:
            df = ddf.read_parquet(url, blocksize = '1GB', npartitions = 20)
            if 'date' in df.columns: df['date'] = ddf.to_datetime(df['date'])
            df = df[(df['item_code'] != -1) | (df['premise_code'] != -1)]
            price_dfs.append(df)

price = ddf.concat(price_dfs, ignore_index = True)

premise = ddf.read_parquet(URL_PREMISE, npartitions = 8)
lookup = ddf.read_parquet(URL_LOOKUP, npartitions = 8)
premise = premise.dropna()
lookup = lookup.dropna()
if 'date' in lookup.columns: lookup['date'] = ddf.to_datetime(lookup['date'])
if 'date' in premise.columns: premise['date'] = ddf.to_datetime(premise['date'])

#premise = premise.drop(columns = ['premise_type', 'address'])
price['premise_code'] = price['premise_code'].astype('int32')
premise['premise_code'] = premise['premise_code'].astype('int32')
price = premise.merge(price, on='premise_code',how='left',indicator=False)
price = price.drop(columns = ['premise_type', 'address', 'premise', 'premise_code', 'district'])

del price_dfs
del df
del entry
del json_file
del parquet_urls
del url
del URL_PREMISE
del URL_LOOKUP
del url_file
del price_urls_data
del premise


CREATE A DICT SO USER CAN ACCESS USING ITEM AND PREMISE CODE INSTEAD OF NAME

In [4]:
#print(lookup.head())

In [5]:
lookup_dict = {}

for index, row in tqdm(lookup.iterrows(), total=len(lookup)):
    key_tuple = tuple([row['item'], row['item_category']])
    lookup_dict[key_tuple] = row['item_code']

del key_tuple
del row
del index
del lookup


  0%|          | 0/756 [00:00<?, ?it/s]

Filter the DDF based on user input

In [6]:
from dask import delayed, compute
from distributed import Client, LocalCluster
from fuzzywuzzy import fuzz
import ipywidgets as widgets
from IPython.display import display, clear_output
import concurrent.futures

In [7]:
def match(input, choices, threshold=80):
        match_score = [(choice, fuzz.partial_ratio(input, choice.lower())) for choice in choices if isinstance(choice, str)]
        matched_item = max(match_score, key=lambda x: x[1], default=None)
        if matched_item[1] >= threshold:
            return matched_item[0]
        else:
            return None

def match(input, choices, threshold=80):
    match_score = [(choice, fuzz.partial_ratio(input, choice.lower())) for choice in choices if isinstance(choice, str)]
    matched_item = max(match_score, key=lambda x: x[1], default=None)
    if matched_item[1] >= threshold:
        return matched_item[0]
    else:
        return None

def process_chunk(keys_chunk, user_item, lookup_dict):
    matching_item_codes = []

    for key in keys_chunk:
        if match(user_item, [key[0], key[1]], threshold=80) is not None:
            item_codes = lookup_dict[key]
            matching_item_codes.append(item_codes)

    return matching_item_codes

def identify_item_code(user_input, lookup_dict, chunk_size=1000):
    user_item = user_input.lower()
    keys = list(lookup_dict.keys())
    keys_chunks = [keys[i:i + chunk_size] for i in range(0, len(keys), chunk_size)]

    delayed_partitions = []

    for chunk in keys_chunks:
        delayed_partitions.append(delayed(process_chunk)(chunk, user_item, lookup_dict))

    return delayed_partitions

def identify_state(user_input, df):
    state_name = user_input.lower()
    
    def process_chunk(chunk):
        matched_state = match(state_name, df['state'], threshold=80)
        return chunk[chunk['state'].isin(matched_state)]
    
    delayed_partitions = [delayed(process_chunk)(chunk) for chunk in df.to_delayed()]

    results = compute(*delayed_partitions, scheduler='threads')
    
    return ddf.from_delayed(results, meta=df)


Just search using the code below

In [8]:
user_input = []
item = 'ayam'
state = 'johor'

if (len(item.strip()) > 0) and (len(state.strip()) > 0): user_input = [item, state]
elif (len(item.strip()) > 0): user_input = [item]
del item
del state

In [9]:
#dask.config.set({"graphiz" : "C:\Program Files\Graphviz\bin\dot.exe"})
#dask.visualize()

cluster = LocalCluster()
client = Client(cluster)

if len(user_input) >= 1: 
    chunk_size = 100000

    delayed_partitions = identify_item_code(user_input[0], lookup_dict, chunk_size)
    results = compute(*delayed_partitions)
    matching_item_codes = [item for sublist in results for item in sublist if item is not None]
    del chunk_size

filtered_price = price[price['item_code'].isin(matching_item_codes)]
del matching_item_codes
del delayed_partitions
del lookup_dict
del results

with ProgressBar():
    if len(user_input) >= 2: 
        filtered_price.from_delayed = identify_state(user_input[1], filtered_price)


del user_input

print(filtered_price.compute())
client.close()
cluster.close()




[                                        ] | 0% Completed | 110.36 ms



[##############                          ] | 35% Completed | 17.15 ss


ignoring exception in ensure_cleanup_on_exception
Traceback (most recent call last):
  File "c:\Users\camel\OneDrive - Arizona State University\Desktop\MyInflasi\myenv\Lib\site-packages\dask\dataframe\shuffle.py", line 936, in ensure_cleanup_on_exception
    yield
  File "c:\Users\camel\OneDrive - Arizona State University\Desktop\MyInflasi\myenv\Lib\site-packages\dask\dataframe\shuffle.py", line 951, in shuffle_group_3
    p.append(d, fsync=True)
  File "c:\Users\camel\OneDrive - Arizona State University\Desktop\MyInflasi\myenv\Lib\site-packages\partd\encode.py", line 25, in append
    self.partd.append(data, **kwargs)
  File "c:\Users\camel\OneDrive - Arizona State University\Desktop\MyInflasi\myenv\Lib\site-packages\partd\buffer.py", line 45, in append
    self.flush(keys)
  File "c:\Users\camel\OneDrive - Arizona State University\Desktop\MyInflasi\myenv\Lib\site-packages\partd\buffer.py", line 99, in flush
    self.slow.append(dict(zip(keys, self.fast.get(keys))))
  File "c:\Users\c

OSError: [Errno 28] No space left on device