## <font color='red'> INSTRUCTIONS </font>

<b> 
1. Write your code only in cells below the "WRITE CODE BELOW" title. Do not modify the code below the "DO NOT MODIFY" title. <br>
2. The expected data types of the output answers for each question are given in the last cell through assertion statements. Your answers must match these expected output data types. Hint: Many of the answers need to be a Python dictionary. Consider methods like to_dict() to convert a Pandas Series to a dictionary. <br>
3. The answers are then written to a JSON file named my_results_PA1.json. You can compare this with the provided expected output file "expected_results_PA1.json". <br>
4. After you complete writing your code, click "Kernel -> Restart Kernel and Run All Cells" on the top toolbar. There should NOT be any syntax/runtime errors, otherwise points will be deducted. <br>
5. For submitting your solution, first download your notebook by clicking "File -> Download". Rename the file as "&lt;TEAM_ID&gt;.ipynb" and upload it to Canvas.</b>


## <font color='red'> DO NOT MODIFY </font>

In [1]:
import time
import json
import dask
import dask.dataframe as dd
import pandas as pd
import ast
import re
from dask.distributed import Client, as_completed
import ctypes
import numpy as np
import warnings
warnings.simplefilter('ignore')

def trim_memory() -> int:
    """
    Ask glibc to release free heap memory back to the operating system.

    Loads ``libc.so.6`` via ctypes (so this only works on glibc-based Linux)
    and calls ``malloc_trim(0)``. This shrinks the process footprint after
    large allocations are freed; it does not reclaim memory that is still
    referenced, i.e. it cannot fix a genuine leak.

    Returns:
        The int returned by ``malloc_trim`` (per the glibc man page: 1 if
        some memory was released, 0 otherwise).
    """
    libc = ctypes.CDLL("libc.so.6")
    return libc.malloc_trim(0)

# Connect to the Dask scheduler listening at this address.
client = Client("127.0.0.1:8786")
# Run trim_memory on the workers so they start with freed heap returned to the OS.
client.run(trim_memory)
# Restart the worker processes for a clean state; the captured output below
# shows the reassigned value is still a Client in this environment.
client = client.restart()
print(client)

<Client: 'tcp://172.31.26.81:8786' processes=4 threads=4, memory=15.62 GiB>


In [2]:
# Wall-clock start of the timed solution; (end - start) is reported as "runtime".
start = time.time()

## <font color='blue'> WRITE CODE BELOW </font>

In [3]:
# Read the join key ``asin`` as a string in BOTH tables. products.csv already
# forces this; without it, dask infers the reviews' asin dtype from a sample,
# so the Q3 merge key and the Q6 membership checks may compare values of
# different types (or lose leading zeros if the sample looks numeric).
df_reviews = dd.read_csv('user_reviews.csv', dtype={'asin': 'object'})
df_products = dd.read_csv('products.csv', dtype={'asin': 'object'})

### Question 1 & 2 ###

In [4]:
def count_missing(df):
    """Return the percentage of missing values in each column of ``df``.

    The fraction of nulls per column is the mean of the boolean ``isna()``
    mask, so one reduction gives ``missing / total_rows``. It is scaled to a
    percentage. For a dask DataFrame the result is a lazy dask Series, to be
    materialized with ``dask.compute``. For a pandas DataFrame it is a pandas
    Series.

    Args:
        df: A dask or pandas DataFrame.

    Returns:
        Series indexed by column name with values in [0, 100].
    """
    # No dask.delayed wrapper: wrapping a dask collection in delayed is
    # discouraged and forces a separate length computation.
    return df.isna().mean() * 100

# Lazy per-column missing percentages for both tables (computed later in one pass).
missing_reviews, missing_products = map(count_missing, (df_reviews, df_products))

### Question 3 ###

In [5]:
# Attach each review's product price (left join on asin), then correlate rating with price.
df_merge = dd.merge(
    df_reviews[['asin', 'overall']],
    df_products[['asin', 'price']],
    how='left',
    on='asin',
)
ans3 = df_merge.overall.corr(df_merge.price)

### Question 4 ###

In [6]:
# Lazy summary statistics of product prices; the needed fields are picked after compute.
ans4 = df_products.price.describe()

### Question 5 ###

In [7]:
def extract_text_between_quotes(text):
    """Return the contents of the first quoted string literal in ``text``.

    ``text`` is expected to be the string form of a (nested) Python list such
    as ``"[['Books', 'Fiction']]"``. The first literal may be delimited by
    either quote character: Python's repr switches to double quotes when the
    value contains an apostrophe, e.g. ``[["Women's Clothing", ...]]``. The
    first quote character found is treated as the opening delimiter. The
    match runs to the next unescaped occurrence of that same character.
    Escape sequences inside the literal are returned verbatim.

    Returns:
        The text between the delimiters, or ``''`` when ``text`` contains no
        quoted literal (e.g. ``"[]"``).
    """
    # Group 1 is the opening delimiter; the closing delimiter must be the
    # same character and not preceded by a backslash.
    match = re.search(r"""(['"])(.*?)(?<!\\)\1""", text)
    return match.group(2) if match else ''



# Top-level category of each product (rows without categories stay NaN and are
# dropped by the groupby), then the number of products per category.
df_products['super_category'] = (
    df_products['categories']
    .dropna()
    .astype(str)
    .apply(extract_text_between_quotes, meta=('super_category', 'object'))
)
category_counts = df_products.groupby(by='super_category', sort=False).size()

In [8]:
def _parse_related_asins(text):
    """Return every ASIN referenced in one product's ``related`` field.

    The field is read from CSV as a string and parsed with
    ``ast.literal_eval``. A dict is assumed to map relation names to lists
    of ASINs (TODO confirm against the data), and all of its list values are
    concatenated. A bare list is returned as-is. Unparseable text or any
    other literal yields an empty list.
    """
    try:
        parsed = ast.literal_eval(text)
    except (ValueError, SyntaxError, TypeError):
        return []
    if isinstance(parsed, dict):
        return [
            asin
            for group in parsed.values()
            if isinstance(group, (list, tuple))
            for asin in group
        ]
    if isinstance(parsed, (list, tuple)):
        return list(parsed)
    return []


valid_product_ids = df_products['asin'].unique()
# 'related' is a string column, so it must be parsed into lists before
# .explode() can split it into one ASIN per row. Empty lists explode to NaN,
# which dropna removes.
related_asins = (
    df_products['related']
    .dropna()
    .astype(str)
    .apply(_parse_related_asins, meta=('related', 'object'))
    .explode()
    .dropna()
)

In [9]:
# Materialize every lazy result in a single dask.compute so shared inputs are read once.
ans1, ans2, ans3, ans4, valid_product_ids, related_asins, category_counts = dask.compute(
    missing_reviews, missing_products, ans3, ans4,
    valid_product_ids, related_asins, category_counts,
)
ans1 = ans1.round(2).to_dict()
ans2 = ans2.round(2).to_dict()
# The reduction may come back as a NumPy scalar; the final check requires
# type(ans3) == float exactly, so convert before rounding.
ans3 = round(float(ans3), 2)
ans4 = ans4[['mean', 'std', 'min', 'max', '50%']].to_dict()
ans4['median'] = ans4.pop('50%')
# Categories by descending count, ties broken alphabetically.
ans5 = dict(sorted(category_counts.items(), key=lambda x: (-x[1], x[0])))
# Drop the empty-name bucket if present (pop with a default avoids a KeyError
# when no product produced an empty category).
ans5.pop('', None)

### Question 6 ###

In [10]:
def check_dangling(partition, products_ids):
    """Return 1 if any ``asin`` in ``partition`` is not a known product id, else 0.

    Args:
        partition: A pandas DataFrame (one review partition) with an ``asin`` column.
        products_ids: The known product ids, as a pandas Series, set, or list.

    Membership is tested against the VALUES of ``products_ids`` via
    ``Series.isin``. The ``in`` operator would be wrong here: on a pandas
    Series it tests the index labels, so every ASIN would look dangling.
    """
    known = partition['asin'].isin(products_ids)
    return int((~known).any())


# Check review partitions for dangling ASINs with at most 5 partitions in
# flight, stopping as soon as one partition reports a dangling reference.
delayed_results = df_reviews.map_partitions(check_dangling, valid_product_ids, meta=int, align_dataframes=False).to_delayed()

delayed_iter = iter(delayed_results)

futures = [client.compute(next(delayed_iter)) for _ in range(min(5, len(delayed_results)))]
# as_completed only tracks futures passed to it or registered via .add();
# appending to the `futures` list alone would never be awaited.
completed = as_completed(futures)

ans6 = 0

for future in completed:
    if future.result() == 1:
        ans6 = 1
        # Found one: no need to finish the remaining partitions.
        for item in futures:
            item.cancel()
        break
    # Keep the pipeline full. Once the iterator is exhausted, keep draining
    # the in-flight futures instead of breaking out early.
    next_delayed = next(delayed_iter, None)
    if next_delayed is not None:
        next_future = client.compute(next_delayed)
        futures.append(next_future)
        completed.add(next_future)


### Question 7 ###

In [11]:
def check_dangling_2(related_asins, product_ids):
    """Return 1 if any referenced ASIN is not a known product id, else 0.

    Args:
        related_asins: Iterable (e.g. pandas Series) of referenced ASINs.
            Missing entries (None/NaN) are ignored; they are not references.
        product_ids: The known product ids, as a pandas Series, set, or list.

    ``Series.isin`` tests against the values of ``product_ids``. The ``in``
    operator on a pandas Series tests index labels instead.
    """
    referenced = pd.Series(related_asins, dtype=object).dropna()
    return int((~referenced.isin(product_ids)).any())

# Q7: 1 if any related-product reference points at an unknown ASIN, else 0.
ans7 = check_dangling_2(related_asins=related_asins, product_ids=valid_product_ids)

## <font color='red'> DO NOT MODIFY </font>

In [12]:
# Wall-clock end of the timed solution cells.
end = time.time()

In [13]:
# Report elapsed wall-clock time of the solution cells, in seconds.
print(f"execution time = {end-start}s")

execution time = 456.8798100948334s


In [14]:
# DO NOT MODIFY
# Type/range checks on every answer before they are written out.
assert type(ans1) == dict, f"answer to question 1 must be a dictionary like {{'reviewerID':0.2, ..}}, got type = {type(ans1)}"
assert type(ans2) == dict, f"answer to question 2 must be a dictionary like {{'asin':0.2, ..}}, got type = {type(ans2)}"
assert type(ans3) == float, f"answer to question 3 must be a float like 0.8, got type = {type(ans3)}"
assert type(ans4) == dict, f"answer to question 4 must be a dictionary like {{'mean':0.4,'max':0.6,'median':0.6...}}, got type = {type(ans4)}"
assert type(ans5) == dict, f"answer to question 5 must be a dictionary, got type = {type(ans5)}"         
assert ans6 == 0 or ans6==1, f"answer to question 6 must be 0 or 1, got value = {ans6}" 
assert ans7 == 0 or ans7==1, f"answer to question 7 must be 0 or 1, got value = {ans7}" 

# Collect all answers plus the measured runtime (seconds) and write them as JSON.
ans_dict = {
    "q1": ans1,
    "q2": ans2,
    "q3": ans3,
    "q4": ans4,
    "q5": ans5,
    "q6": ans6,
    "q7": ans7,
    "runtime": end-start
}
with open('my_results_PA1.json', 'w') as outfile: json.dump(ans_dict, outfile)         