In [1]:
import pymysql
import prettytable as pt
import json
import pandas as pd
from tqdm import tqdm
import csv
import os
import random

In [14]:

# Connect to the local MySQL server, make sure the cs550 schema exists, and
# print which database the session is using.
# Credentials are read from the environment (MYSQL_HOST / MYSQL_USER /
# MYSQL_PASSWORD) instead of being hard-coded in the notebook; the password is
# prompted for interactively when MYSQL_PASSWORD is not set.
from getpass import getpass

mysqlPassword = os.environ.get("MYSQL_PASSWORD")
if mysqlPassword is None:
    mysqlPassword = getpass("MySQL password: ")

db = pymysql.connect(
    host=os.environ.get("MYSQL_HOST", "localhost"),
    user=os.environ.get("MYSQL_USER", "root"),
    password=mysqlPassword,
)
try:
    with db.cursor() as cursor:
        cursor.execute("create database if not exists cs550")
        cursor.execute("use cs550")
        cursor.execute("select database()")
        # Build the table while the cursor is still open: it needs the query results.
        table = pt.from_db_cursor(cursor)
    print(table)
finally:
    # Release the connection even if one of the statements above fails.
    db.close()

+------------+
| database() |
+------------+
|   cs550    |
+------------+


## Global Variables

### Data Directory

In [2]:
# Root directory for every dataset read or written below. Defaults to ../Data;
# set the CS550_DATA_DIR environment variable to point at data stored elsewhere.
dataDir = os.environ.get("CS550_DATA_DIR", "../Data")

### Record counts and file paths of the two large datasets

In [16]:
# Hard-coded record counts of the two large datasets and their file locations
# under dataDir. The counts feed the progress bars and the sampling step below.
totalReviews, totalMeta = 157260921, 15023060
totalReviewPath, totalMetaPath = (
    os.path.join(dataDir, fileName)
    for fileName in ("All_Amazon_Review_5.json", "All_Amazon_Meta.json")
)

## Simple Analysis on Review Dataset

In [108]:
# simple analysis on review dataset
def analyzeColumns(dataPath: str, totalRows: int) -> dict:
    """Scan a JSON-lines file and summarise the value type of every key.

    Args:
        dataPath (str): path of the JSON-lines file (one JSON object per line, UTF-8)
        totalRows (int): expected number of lines; only used for the progress bar

    Returns:
        dict: for each key seen, either
            * a type-name string (e.g. 'float') when only non-sized values were seen, or
            * a tuple (type name, 0-based line index, length) describing the longest
              list/dict/str value seen for that key.
    """
    sizedTypes = (list, dict, str)
    columnInfo = dict()
    with open(dataPath, 'r', encoding="utf-8") as f:
        with tqdm(total=totalRows, desc="Processing", leave=True, unit_scale=True) as pbar:
            for index, line in enumerate(f):
                pbar.update(1)
                row = json.loads(line)
                for key, value in row.items():
                    valueType = type(value)
                    if valueType in sizedTypes:
                        current = columnInfo.get(key)
                        # Scalar entries are plain strings, not tuples: replace them instead
                        # of indexing into the string (which raised TypeError before).
                        if not isinstance(current, tuple) or len(value) > current[2]:
                            columnInfo[key] = (valueType.__name__, index, len(value))
                    elif key not in columnInfo:
                        columnInfo[key] = valueType.__name__
    return columnInfo

column_dict = analyzeColumns(totalReviewPath, totalReviews)
print(column_dict)

Processing: 100%|█████████▉| 157M/157M [30:48<00:00, 85.1kit/s]   

{'overall': 'float', 'verified': 'bool', 'reviewTime': ('str', 1, 11), 'reviewerID': ('str', 11586, 20), 'asin': ('str', 0, 10), 'reviewerName': ('str', 28782337, 1725), 'reviewText': ('str', 140467823, 35094), 'summary': ('str', 151619415, 1730), 'unixReviewTime': 'int', 'vote': ('str', 257028, 6), 'image': ('list', 136488683, 508), 'style': ('dict', 2377009, 7)}





## Split columns

### Single Process

In [40]:
def splitColumns(selectedKeys:list, dataPath:str, totalRowNum:int, outputPath:str):
    """Project selected keys out of a JSON-lines file into a CSV file.

    Each input line is parsed as one JSON object, and one CSV row is written per
    object with the values of ``selectedKeys`` in that order. Keys missing from an
    object are written as empty cells.

    Args:
        selectedKeys (list): key names to keep; also the CSV header and column order
        dataPath (str): input JSON-lines file path (read as UTF-8)
        totalRowNum (int): expected number of input lines; only used for the progress bar
        outputPath (str): output CSV file path (written as UTF-8)
    """
    # Write UTF-8 explicitly: the platform default (e.g. cp1252 on Windows) cannot
    # encode arbitrary review text, and sampleData reads this CSV back as UTF-8.
    with open(outputPath, "w", newline="", encoding="utf-8") as outputFile:
        writer = csv.DictWriter(outputFile, selectedKeys)
        writer.writeheader()
        with open(dataPath, encoding="utf-8") as file:
            with tqdm(total=totalRowNum, desc="Processing", leave=True, unit_scale=True) as pbar:
                for line in file:
                    row = json.loads(line)
                    writer.writerow({key: row.get(key) for key in selectedKeys})
                    pbar.update()

# Reuse the dataset constants from the configuration cells instead of repeating
# the row count and hard-coded "../Data/..." paths here.
dataPath = totalReviewPath # input dataset path (JSON lines)
outputPath = os.path.join(dataDir, "All_Amazon_Review_User_Item_Rating.csv") # output dataset path
selectedKeys = ['reviewerID', 'asin', 'overall'] # columns to keep, in CSV column order
splitColumns(selectedKeys, dataPath, totalReviews, outputPath)

Processing: 100%|█████████▉| 157M/157M [23:15<00:00, 113kit/s]  


### Multi-process

In [3]:
import multiprocess_methods
from multiprocessing import cpu_count

if __name__ == '__main__':
    # Arguments for multiprocess_methods.splitDataset (local helper module), passed
    # positionally in the order that function expects.
    dataPath, outputPath = (
        os.path.join(dataDir, fileName)
        for fileName in ("All_Amazon_Review_5.json", "All_Amazon_Review_User_Item_Rating.csv")
    )
    totalSize = 157260921
    selectedKeys = ['reviewerID', 'asin', 'overall']
    processNum = cpu_count()  # number of worker processes; tune for the CPU at hand
    chunkSize = 1024  # size of each chunk split from the iterable
    printParameters = True
    multiprocess_methods.splitDataset(
        dataPath, totalSize, selectedKeys, outputPath,
        processNum, chunkSize, printParameters,
    )

Dataset File: ../Data\All_Amazon_Review_5.json
Total Size: 157260921
Selected Keys: ['reviewerID', 'asin', 'overall']
Output File: ../Data\All_Amazon_Review_User_Item_Rating.csv
Number of processes: 12
Size of each chunk: 1024


Processing: 100%|█████████▉| 157M/157M [11:32<00:00, 227kit/s]      


## Sample Data

### Single Process

In [30]:
def sampleData(sampleSize:int, totalSize:int, dataPath:str, outputPath:str, seed=None):
    """Write a uniform random sample of data rows from a large CSV file.

    The header row is always copied. ``sampleSize`` distinct row positions are
    drawn from ``range(totalSize)``, and the matching data rows are written in
    their original order during a single streaming pass over the input.

    Args:
        sampleSize (int): number of data rows to sample (0 writes only the header)
        totalSize (int): number of data rows (header excluded) in the original dataset
        dataPath (str): original dataset path (UTF-8 CSV with a header row)
        outputPath (str): sample dataset output path (written as UTF-8)
        seed (optional): seed for a private random generator, which makes the sample
            reproducible; when None the module-level ``random`` state is used as before

    Raises:
        ValueError: if sampleSize is negative or larger than totalSize (from random.sample)
    """
    rng = random if seed is None else random.Random(seed)
    # Sorted so the chosen positions can be matched against the input with one cursor.
    sampleIndices = sorted(rng.sample(range(totalSize), sampleSize))
    with open(dataPath, encoding="utf-8", newline='') as file:
        csvReader = csv.reader(file)
        csvHeader = next(csvReader) # read header
        # Use the same encoding as the input. Note: csv.writer's second positional
        # argument is the dialect, so the header must not be passed there.
        with open(outputPath, "w", encoding="utf-8", newline='') as outputFile:
            csvWriter = csv.writer(outputFile)
            csvWriter.writerow(csvHeader) # write header
            if not sampleIndices:
                return  # nothing to sample; sampleIndices[0] would raise IndexError
            sampleIndex = 0
            with tqdm(total=sampleSize, desc="Processing", leave=True, unit_scale=True) as pbar:
                for index, row in enumerate(csvReader):
                    if index == sampleIndices[sampleIndex]:
                        csvWriter.writerow(row)
                        sampleIndex += 1
                        pbar.update(1)
                        if sampleIndex == sampleSize:
                            break  # every sampled row written; skip the rest of the file
                    

# Draw a 2.5M-row sample from the user/item/rating CSV produced by the split step,
# reusing dataDir and the review count instead of a second hard-coded copy.
input_csv_path = os.path.join(dataDir, "All_Amazon_Review_User_Item_Rating.csv")
output_csv_path = os.path.join(dataDir, "sampled_data.csv")
total_records = totalReviews  # the split writes one CSV data row per review line
required_records = 2500000
sampleData(required_records, total_records, input_csv_path, output_csv_path)

Processing: 100%|██████████| 2.50M/2.50M [01:39<00:00, 25.0kit/s]


KeyboardInterrupt: 