# Task Description

Write code to process and transform the sample data in the file `data.json`.
The data contains some duplicate rows.

The code should achieve the following:

1. remove duplicates over the columns `id` and `created_at` (considered simultaneously)
2. compute the rank of each user's `user_score` within each age group and output the rank in a new column called `sub_group_rank`
3. process the column `widget_list` by
    1. flattening the list items i.e. each item in the list is put into its own row
    2. extracting the values in the JSON elements into their own columns called `widget_name` and `widget_amount`
4. anonymize the column `email` and output the anonymized version in a new column `email_anon`.
This column `email_anon` should have the following properties.
    1. given an anonymized value the original value can be recovered
5. create a new table that is an inverted index that gives, for each country in `location`, which `id`s are located in that country
6. write the processed tables/data into separate `parquet` file(s).
Exactly how the files/tables are organized is not as important as having all the data present.

Your code will be evaluated for correctness, scalability and maintainability.

In [None]:
!pip install pandas fastparquet

In [None]:
import pandas as pd

In [None]:
raw_data = pd.read_json('data.json', lines=True)

In [None]:
len(raw_data)

In [None]:
raw_data.head()

In [None]:
# Question 1 - remove duplicates over the columns id and created_at (considered simultaneously)
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.drop_duplicates.html
# https://github.com/pandas-dev/pandas/blob/ad190575aa75962d2d0eade2de81a5fe5a2e285b/pandas/core/frame.py#L6033
df_drop_duplicates = raw_data.drop_duplicates(subset=['id','created_at'], keep='last')

In [None]:
len(df_drop_duplicates)
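
As a small illustration of what `keep='last'` does when duplicates are judged on `id` and `created_at` together, the sketch below runs `drop_duplicates` on a toy frame. The rows and the `user_score` values are made up for the example and are not taken from `data.json`.

```python
import pandas as pd

# Hypothetical toy rows: the first two share both id and created_at.
toy = pd.DataFrame({
    'id': ['a', 'a', 'a', 'b'],
    'created_at': ['2020-01-01', '2020-01-01', '2020-01-02', '2020-01-01'],
    'user_score': [1, 2, 3, 4],
})

# Rows count as duplicates only when id AND created_at both match;
# keep='last' retains the final occurrence of each duplicated pair.
toy_deduped = toy.drop_duplicates(subset=['id', 'created_at'], keep='last')
print(toy_deduped)
```

Only the first row is dropped here: the third row has the same `id` but a different `created_at`, so it is not a duplicate under this definition.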

In [None]:
# Question 2
# compute the rank of each user's user_score within each age group 
# and output the rank in a new column called sub_group_rank
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.rank.html
# https://dfrieds.com/data-analysis/rank-method-python-pandas.html#Find-Rank-of-Homes-Sold-for-Each-Seller-by-Close-Date
sub_group_rank = df_drop_duplicates.groupby('age_group')['user_score'].rank(method='first')
# https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.assign.html
df_sub_group_rank = df_drop_duplicates.assign(sub_group_rank=sub_group_rank)

In [None]:
df_sub_group_rank.groupby('age_group').size()

In [None]:
df_sub_group_rank.head()
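
The `method` argument of `rank` decides how tied scores are ranked, and `ascending` decides whether rank 1 goes to the lowest or the highest score. The task does not say which is wanted, so the sketch below only compares a few options on made-up data; the age group labels and scores are assumptions for illustration.

```python
import pandas as pd

# Hypothetical toy data: two age groups, with a tie in the first one.
toy = pd.DataFrame({
    'age_group': ['18-25', '18-25', '18-25', '26-35', '26-35'],
    'user_score': [10, 10, 30, 5, 7],
})

grouped = toy.groupby('age_group')['user_score']
toy_ranked = toy.assign(
    rank_first=grouped.rank(method='first'),  # ties broken by row order
    rank_min=grouped.rank(method='min'),      # ties share the lowest rank
    rank_desc=grouped.rank(method='min', ascending=False),  # rank 1 = highest score
)
print(toy_ranked)
```

With `method='first'` the two tied scores get ranks 1 and 2 depending on row order, while `method='min'` gives both rank 1.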

In [None]:
# Question 3
# process the column widget_list by
#     flattening the list items i.e. each item in the list is put into its own row
#     extracting the values in the JSON elements into their own columns called widget_name and widget_amount
widget_list = df_sub_group_rank['widget_list']

In [None]:
widget_list.head()

In [None]:
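# Flatten: explode() puts each list item in its own row and repeats the other columns.
# An empty list becomes a single row with a missing value.
df_exploded = df_sub_group_rank.explode('widget_list').reset_index(drop=True)

In [None]:
# Extract the fields of each widget dict; rows without a widget get None.
widgets = df_exploded['widget_list']
widget_name = widgets.map(lambda w: w['name'] if isinstance(w, dict) else None)
widget_amount = widgets.map(lambda w: w['amount'] if isinstance(w, dict) else None)

In [None]:
df_widget_list = df_exploded.assign(widget_name=widget_name, widget_amount=widget_amount)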

In [None]:
df_widget_list.head()
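
Task 3 asks for one row per widget, with the widget fields in their own columns. A minimal sketch of one way to do this with `explode` and `pd.json_normalize` follows. The toy rows are made up, and the layout of `widget_list` (a list of dicts with `name` and `amount` keys) is an assumption based on how the column is accessed in this notebook.

```python
import pandas as pd

# Hypothetical toy rows; widget_list is assumed to be a list of
# {'name': ..., 'amount': ...} dicts.
toy = pd.DataFrame({
    'id': ['a', 'b'],
    'widget_list': [
        [{'name': 'w1', 'amount': 3}, {'name': 'w2', 'amount': 5}],
        [{'name': 'w3', 'amount': 7}],
    ],
})

# explode() repeats the other columns once per list item.
flat = toy.explode('widget_list').reset_index(drop=True)

# json_normalize() turns the list of dicts into name/amount columns.
fields = pd.json_normalize(flat['widget_list'].tolist()).add_prefix('widget_')
toy_flat = pd.concat([flat.drop(columns='widget_list'), fields], axis=1)
print(toy_flat)
```

Rows whose list is empty explode to a missing value, which would need to be handled before the `json_normalize` step.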

In [None]:
# Question 4
# anonymize the column email and output the anonymized version in a new column email_anon. 
# This column email_anon should have the following properties.
#     given an anonymized value the original value can be recovered
email = df_widget_list['email']

In [None]:
# https://docs.python.org/3/library/base64.html#module-base64
import base64

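# b64encode returns bytes; decode to str so the column holds plain text.
# Note: Base64 is a reversible encoding, not encryption.
email_anon = email.map(lambda x: base64.b64encode(x.encode("utf-8")).decode("ascii"))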

In [None]:
df_email_anon = df_widget_list.assign(email_anon=email_anon)

In [None]:
df_email_anon.head()
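
The requirement that the original value can be recovered can be checked with a round trip. The sketch below uses only the standard library; `encode_email` and `decode_email` are hypothetical helper names and the address is made up. Anyone can reverse Base64 without a key, so whether it counts as sufficient anonymization is a judgment call that the task leaves open.

```python
import base64


def encode_email(email: str) -> str:
    """Base64-encode an email and return it as text (hypothetical helper)."""
    return base64.b64encode(email.encode('utf-8')).decode('ascii')


def decode_email(email_anon: str) -> str:
    """Invert encode_email, recovering the original address."""
    return base64.b64decode(email_anon).decode('utf-8')


sample = 'user@example.com'  # made-up address
encoded = encode_email(sample)
recovered = decode_email(encoded)
print(encoded, recovered)
```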

In [None]:
# Question 5
# create a new table that is an inverted index that gives, for each country in location, which ids are located in that country
# Deduplicate (location, id) pairs so that each id is listed once per location.
pairs = df_email_anon[['location', 'id']].drop_duplicates()
ids_by_location = pairs.groupby('location')['id'].agg(lambda ids: ','.join(ids.astype(str)))
df_inverted_index = ids_by_location.reset_index(name='inverted_index')

In [None]:
len(df_inverted_index)

In [None]:
df_inverted_index.head()
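
An alternative to a comma-separated string is to keep the ids of each location as a list, which avoids re-splitting the string later. The sketch below shows this on made-up rows; the column name `ids` is an assumption. An `id` can appear on several rows, for example after `widget_list` has been flattened, so the pairs are deduplicated first.

```python
import pandas as pd

# Hypothetical toy rows; id 'a' appears twice.
toy = pd.DataFrame({
    'id': ['a', 'a', 'b', 'c'],
    'location': ['France', 'France', 'France', 'Japan'],
})

# Keep each (location, id) pair once, then collect the ids per location.
toy_index = (
    toy[['location', 'id']]
    .drop_duplicates()
    .groupby('location')['id']
    .agg(list)
    .reset_index(name='ids')
)
print(toy_index)
```

Whether a list-valued column can be written to parquet as is may depend on the parquet engine in use.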

In [None]:
# Question 6
# write the processed tables/data into separate parquet file(s). 
# Exactly how the files/tables are organized is not as important as having all the data present.
raw_data.to_parquet('raw_data.parquet')
df_email_anon.to_parquet('processed_data.parquet')
df_inverted_index.to_parquet('inverted_index_data.parquet')