# Sampling Large Datasets
In data processing, much of the computing work involves analysing large amounts of text mixed with numerical data, which is exactly what Spark is suited for. Sampling is an essential pre-processing step in machine learning, for example when building a proof of concept.

## RecBole datasets
RecBole is a powerful platform for training and evaluating recommendation systems. It ships with many built-in datasets (https://recbole.io/dataset_list.html), some of which are too large to process on a single computer. I will use Spark to preprocess them and shrink their size.

In [None]:

!rm url.yaml
!wget https://raw.githubusercontent.com/RUCAIBox/RecBole/master/recbole/properties/dataset/url.yaml
!pip install pyyaml

import yaml

# Specify the path to the YAML file
file_path = "url.yaml"

# Open the file and load the YAML contents
with open(file_path, "r") as file:
    dataset_urls = yaml.safe_load(file)
   
# only print the first 5 lines
for key in list(dataset_urls.keys())[:5]:
    print(key, ":", dataset_urls[key])

Set the datasets to download and process.

In [None]:
import os

# RecBole dataset names (keys of url.yaml) to download and process.
datasets_to_download = ['amazon-digital-music', 'amazon-video-games']
# datasets_to_download = ['amazon-cds-vinyl']

# Folder where each dataset's zip archive is extracted (one sub-folder per dataset)
input_folder_path = "input"
# Folder where processed files will be saved
output_folder_path = "output"

# exist_ok=True avoids the check-then-create race and makes re-running the
# cell harmless when the folders already exist.
for folder in (input_folder_path, output_folder_path):
    os.makedirs(folder, exist_ok=True)

In [None]:
!pip install requests
import requests
import zipfile
import io

def download_upzip(url, dataset_name):
    # Download the zip file
    response = requests.get(url)
    zip_file = zipfile.ZipFile(io.BytesIO(response.content))

    # Extract the zip file to the specified folder of dataset_name
    folder_path = os.path.join(input_folder_path, dataset_name)
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    zip_file.extractall(folder_path)

    #TODO: if extracted file is a directory, move all files to the parent directory
    # for root, dirs, files in os.walk(folder_path):
    #     for file in files:
    #         os.rename(os.path.join(root, file), os.path.join(folder_path, file))
    #     for dir in dirs:
    #         os.rmdir(os.path.join(root, dir))

    # Close the zip file
    zip_file.close()

#  download all dataset from datasets_to_download
for dataset in datasets_to_download:
    download_upzip(dataset_urls[dataset], dataset)

In [None]:
from pyspark.sql import SparkSession

# Reuse the running Spark session if there is one, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("Amazon Sampling")
    .getOrCreate()
)
# Drop any cached tables still held by the (possibly reused) session.
spark.catalog.clearCache()

In [None]:
from pyspark.sql.functions import col, when, count

# Read every file of every downloaded dataset into a Spark DataFrame and print
# a quick profile of it. Result layout: dfs[dataset_name][file_name] -> DataFrame
dfs = {}
for dataset in datasets_to_download:
    dataset_path = os.path.join(input_folder_path, dataset)
    dfs[dataset] = {}
    for file in os.listdir(dataset_path):
        file_path = os.path.join(dataset_path, file)
        # tab-separated, first row is the header (e.g. "item_id:token")
        df = spark.read.option("delimiter", '\t').option("header", True).csv(file_path)
        dfs[dataset][file] = df
        print(f"Dataset: {dataset}, File: {file}")
        df.show(5)

        print(f'num of {file}:', df.count())

        # Check key uniqueness. Key columns are assumed to be named
        # "<something>_id:token", e.g. "item_id:token". The comprehension
        # variable is `c` so it does not shadow the imported pyspark `col`.
        key_columns = [c for c in df.columns if c.endswith('_id:token')]
        for key_column in key_columns:
            print(f"Number of disintict {key_column}:", df.select(key_column).distinct().count())

        # Completeness of each column: count() over a column skips nulls.
        print("Number of non-null values in each column:")
        df.select([count(col(c)).alias(c) for c in df.columns]).show()


## Data Processing

In [None]:
# Map each dataset name to the file name of its interaction (.inter) file.
# If a dataset folder holds several .inter files, the last one listed wins;
# a dataset without any .inter file gets no entry.
inter_map = {}
for dataset in datasets_to_download:
    inter_files = [
        name
        for name in os.listdir(os.path.join(input_folder_path, dataset))
        if name.endswith('.inter')
    ]
    if inter_files:
        inter_map[dataset] = inter_files[-1]

### Filter out inactive users/items

In [None]:
user_inter_threshold = 10
item_inter_threshold = 10

# Keep only interactions whose user AND item each have at least the threshold
# number of interactions.
# NOTE(review): this is a single pass. Counts are computed on the unfiltered
# data, so after filtering, some users/items may fall below the threshold
# again. An iterative k-core filter would be needed to guarantee it.
for dataset in datasets_to_download:
    print('-----------------------------------')
    print(f"Dataset: {dataset}")
    inter_file = inter_map[dataset]
    interactions = dfs[dataset][inter_file]

    print('num of iteractions:', interactions.count())
    print('num of user_id:', interactions.select('user_id:token').distinct().count())
    print('num of item_id:', interactions.select('item_id:token').distinct().count())

    # interactions per user / per item, with distinct count-column names
    per_user = (
        interactions.groupBy('user_id:token').count()
        .withColumnRenamed('count', 'count_user')
    )
    per_item = (
        interactions.groupBy('item_id:token').count()
        .withColumnRenamed('count', 'count_item')
    )

    # attach both counts to every interaction row
    with_counts = (
        interactions
        .join(per_user, on='user_id:token', how='inner')
        .join(per_item, on='item_id:token', how='inner')
    )
    with_counts.show(5)

    active = with_counts.where(
        (col('count_user') >= user_inter_threshold)
        & (col('count_item') >= item_inter_threshold)
    )
    print('filtered num of iteractions:', active.count())

    # replace the stored interaction DataFrame with the (lazily) filtered one,
    # without the helper count columns
    dfs[dataset][inter_file] = active.drop('count_user', 'count_item')
    

### Output overlapping users between datasets

In [None]:
# Every output folder written by the export cells below is recorded here.
output_folder_list = list()

In [None]:
def _density(num_interactions, num_users, num_items):
    """Return the fill ratio of a user-item matrix: interactions / (users * items).

    Returns 0.0 when there are no users or no items, instead of raising
    ZeroDivisionError.
    """
    if num_users == 0 or num_items == 0:
        return 0.0
    return num_interactions / (num_users * num_items)


def _export_common_user_inter(inter, common_users, num_common_users, dataset_name, out_path):
    """Restrict *inter* to *common_users*, print its statistics and save it as TSV.

    Args:
        inter: interaction DataFrame of *dataset_name*.
        common_users: DataFrame whose only column is ``user_id:token``.
        num_common_users: row count of *common_users* (counted once by the
            caller and reused here).
        dataset_name: name used in the printed messages.
        out_path: Spark output directory; overwritten if it already exists.
    """
    inter_com_user = inter.join(common_users, 'user_id:token')
    inter_com_user_count = inter_com_user.count()
    inter_com_item_count = inter_com_user.select('item_id:token').distinct().count()
    print(f'num of interactino of common users in {dataset_name}:', inter_com_user_count)
    print('num of related items in the interaction:', inter_com_item_count)
    # density of the (common users x related items) interaction matrix
    print(f'density of {dataset_name} inetraction :',
          _density(inter_com_user_count, num_common_users, inter_com_item_count))

    inter_com_user.show(5)
    # repartition(1) so the folder holds a single tab-separated part file
    inter_com_user.repartition(1).write.option("header", "true").csv(out_path, mode='overwrite', sep='\t')


base_dataset = datasets_to_download[0]
# For each other dataset, find the users it shares with base_dataset and
# export both sides' interactions restricted to those users.
for dataset2 in datasets_to_download[1:]:
    dataset1 = base_dataset
    inter1 = dfs[dataset1][inter_map[dataset1]]
    inter2 = dfs[dataset2][inter_map[dataset2]]

    print(f"Common users between {dataset1} and {dataset2}")
    # distinct users of inter1, kept only if they also appear in inter2 (leftsemi)
    inter1_dist = inter1.select('user_id:token').distinct()
    common_users = inter1_dist.join(inter2, inter1_dist['user_id:token'] == inter2['user_id:token'], 'leftsemi')
    common_users.show(5)

    num_common_users = common_users.count()
    print('num of common_users:', num_common_users)

    inter1_out_path = os.path.join(output_folder_path, f"{dataset1}_{dataset2}")
    _export_common_user_inter(inter1, common_users, num_common_users, dataset1, inter1_out_path)
    output_folder_list.append(inter1_out_path)

    inter2_out_path = os.path.join(output_folder_path, f"{dataset2}_{dataset1}")
    _export_common_user_inter(inter2, common_users, num_common_users, dataset2, inter2_out_path)
    output_folder_list.append(inter2_out_path)

In [None]:
# Find the users that appear in the (filtered) interactions of EVERY
# downloaded dataset by intersecting the distinct user ids one dataset at a time.
print("Common users among all datasets")
common_users = None
for dataset in datasets_to_download:
    inter_dist = dfs[dataset][inter_map[dataset]].select('user_id:token').distinct()
    inter_dist.show(3)
    if common_users is None:
        common_users = inter_dist
    else:
        common_users = common_users.join(inter_dist, 'user_id:token', 'inner')
    num_common_users = common_users.count()
    print(f'num of common_users after merge with {dataset}:', num_common_users)

common_users.show(3)

# Export each dataset's interactions restricted to the common users.
for dataset in datasets_to_download:
    inter = dfs[dataset][inter_map[dataset]]
    inter_com_user = inter.join(common_users, 'user_id:token')
    inter_com_user_count = inter_com_user.count()
    inter_com_item_count = inter_com_user.select('item_id:token').distinct().count()
    print(f'num of interactino of common users in {dataset}:', inter_com_user_count)
    print('num of related items in the interaction:', inter_com_item_count)
    # density of the (common users x related items) matrix; 0.0 when empty
    matrix_size = num_common_users * inter_com_item_count
    density = inter_com_user_count / matrix_size if matrix_size else 0.0
    print(f'density of {dataset} inetraction :', density)

    # save filtered interactions; repartition(1) -> a single part file
    inter_out_path = os.path.join(output_folder_path, f"{dataset}_common")
    inter_com_user.show(5)
    inter_com_user.repartition(1).write.option("header", "true").csv(inter_out_path, mode='overwrite', sep='\t')
    output_folder_list.append(inter_out_path)

In [None]:
import shutil


def get_itemfile_path(dataset):
    """Return the path of a ``.item`` file in input/<dataset>, or None if there is none.

    If several ``.item`` files exist, the first one listed is returned.
    """
    dataset_path = os.path.join(input_folder_path, dataset)
    for file in os.listdir(dataset_path):
        if file.endswith('.item'):
            return os.path.join(dataset_path, file)
    return None


# Turn each exported folder into a dataset folder: copy the item file next to
# the interactions and rename the exported .csv part file to <dataset>.inter.
# shutil/os are used instead of `!cp`/`!mv` so paths with spaces work and the
# cell does not depend on a POSIX shell.
for output_folder in output_folder_list:
    # Folders are named "<dataset>_<other>" or "<dataset>_common". The dataset
    # names used here contain '-' but no '_', so the text before the first '_'
    # is the dataset name.
    dataset = os.path.basename(output_folder).split('_')[0]
    itemfile_path = get_itemfile_path(dataset)
    if itemfile_path:
        print(f"copy from {itemfile_path} to {output_folder} for {dataset} ")
        shutil.copy(itemfile_path, os.path.join(output_folder, f"{dataset}.item"))
    else:
        print(f"item file not found for {dataset}")

    for file in os.listdir(output_folder):
        # Only names ending exactly in ".csv" are renamed (names ending in
        # ".csv.crc" are not matched). The export writes with repartition(1),
        # so one such file per folder is expected.
        if file.endswith('.csv'):
            os.replace(
                os.path.join(output_folder, file),
                os.path.join(output_folder, f"{dataset}.inter"),
            )

## Analyze Chronicle Characteristics
TBD

## Sampling
Stratified sampling based on the hotness (interaction rate) of items. TBD

## Release all resources

In [None]:
# Release any cached DataFrames of every dataset.
for dataset in datasets_to_download:
    for frame in dfs[dataset].values():
        frame.unpersist()

# Shut down the Spark session; no Spark work can run after this.
spark.stop()

## Summary
Spark is a powerful and efficient tool for sampling large-scale data:
* flexible and powerful functionality
* runs very fast, even on my laptop
* easy to apply to similar datasets (Amazon has datasets for many different categories); I only focused on one category this time.