In [1]:
from datetime import datetime
import os
from functools import partial
from pyspark import SparkContext
import json
import re
import argparse

In [2]:
# Positions of the wanted fields in a tab-split Amazon review line, mapped to
# the key each field gets in the extracted dicts. In the sample line printed by
# amazon_rdd.take(1): index 2 is the review id, 6 the product category, 7 the
# star rating and 14 the review date. Insertion order fixes the key order of
# the extracted dicts.
index2amazon_column = {
    2: "review_id",
    14: "review_date",
    6: "product_category",
    7: "star_rating",
}

# Location of the raw review data; set AMAZON_DATA_DIR to point elsewhere
# instead of editing this machine-specific default.
basepath = os.environ.get("AMAZON_DATA_DIR", "/home/de2020/Documents/amazon")

In [3]:
# getOrCreate() reuses an already running context, so re-executing this cell
# does not fail with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate()

In [4]:
# Lazily read the raw data: one RDD element per text line under basepath.
amazon_rdd = sc.textFile(name=basepath)

In [5]:
# Peek at the first raw line.
amazon_rdd.take(num=1)

['US\t53096384\tR63J84G1LOX6R\t1563890119\t763187671\tThe Sandman Vol. 1: Preludes and Nocturnes\tBooks\t4\t0\t1\tN\tN\tignore the review below\tthis is the first 8 issues of the series. it is the starting point of all this... it also contains the sound of her wings. issue #8. which is the first appearance of death. and many peoples favorite issue. its not the best of the collected works. but its the start of them.\t1995-08-13']

In [6]:
def tab_split_func(input_line):
    """Split one raw line of the review dump on tab characters into its fields."""
    return input_line.split("\t")

In [7]:
# map() calls the one-argument tab_split_func on every line, so each element
# becomes the list of field strings of one review.
amazon_split_rdd = amazon_rdd.map(f=tab_split_func)

In [8]:
# Next, turn each split record into a JSON-like dict stored in a new RDD.
def transform_to_json(record, index2column):
    """Build a dict from selected positions of a split record.

    ``index2column`` maps a position in ``record`` to the key that value gets in
    the result; keys appear in the iteration order of ``index2column``.
    Raises IndexError if a position is out of range for ``record``.
    """
    return {column_name: record[position] for position, column_name in index2column.items()}

# Run transform_to_json on every split record (see amazon_split_rdd.take(1) for
# the input shape). partial() pre-binds index2column to index2amazon_column,
# which decides which columns are extracted and what key each one gets, so
# map() can call the function with just the record. Only review_id,
# review_date, product_category and star_rating are kept; customer_id is not.
amazon_json_rdd = amazon_split_rdd.map(
    partial(transform_to_json, index2column=index2amazon_column)
)
        

In [9]:
# Inspect one extracted record.
amazon_json_rdd.take(num=1)

[{'review_id': 'R63J84G1LOX6R',
  'review_date': '1995-08-13',
  'product_category': 'Books',
  'star_rating': '4'}]

In [10]:
def convert_dtstr_dt(dtstr):
    """Turn a ``YYYY-MM-DD`` date string into a ``YYYYMM`` month key.

    Despite the name, the result is a string such as ``"199508"``, not a
    datetime. Raises ValueError when ``dtstr`` does not match ``%Y-%m-%d``.
    """
    parsed = datetime.strptime(dtstr, "%Y-%m-%d")
    # Zero-pad the month so every key has the same width and sorts correctly.
    return "%d%02d" % (parsed.year, parsed.month)

# Per-field conversions applied by convert_types: the review date becomes a
# YYYYMM month key and the star rating an int.
type_converters = dict(
    review_date=convert_dtstr_dt,
    star_rating=int,
)

def convert_types(record, converters):
    """Return a copy of ``record`` with selected fields converted.

    Args:
        record: dict for one review.
        converters: mapping of field name -> one-argument callable applied to
            that field's value.

    Returns:
        A new dict with the same keys, in the same order. ``record`` itself is
        left untouched, so a dict shared with the caller (e.g. one obtained
        via ``take()``) is never mutated.

    Raises:
        KeyError: if a converter names a field missing from ``record``.
        Any exception raised by a converter (e.g. ValueError for a bad date).
    """
    converted = dict(record)
    for col_name, convert_func in converters.items():
        converted[col_name] = convert_func(record[col_name])
    return converted

In [11]:
# Apply type_converters to every extracted record.
amazon_converted_rdd = amazon_json_rdd.map(
    partial(convert_types, converters=type_converters)
)

In [12]:
# Inspect one converted record.
amazon_converted_rdd.take(num=1)

[{'review_id': 'R63J84G1LOX6R',
  'review_date': '199508',
  'product_category': 'Books',
  'star_rating': 4}]

In [13]:
# Count reviews per (month, category, star rating). collect() brings the
# aggregated pairs back to the driver, so despite its name this is a list.
star_date_count_rdd = (
    amazon_converted_rdd
    .map(lambda review: (
        (int(review["review_date"]), review["product_category"], review["star_rating"]),
        1,
    ))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)

In [14]:
# First ten aggregated ((month, category, rating), count) pairs.
star_date_count_rdd[0:10]

[((199806, 'Video DVD', 1), 1),
 ((199809, 'Books', 4), 49),
 ((199809, 'Books', 1), 30),
 ((200005, 'Video DVD', 5), 833),
 ((200005, 'Video DVD', 3), 136),
 ((200106, 'Video', 3), 51),
 ((200112, 'Video Games', 5), 4),
 ((200202, 'Books', 3), 84),
 ((200208, 'Video Games', 5), 9),
 ((200210, 'Video', 5), 239)]

In [15]:
# Order the pairs by their key: month first, then category, then star rating.
star_date_count_rdd_sorted = sorted(star_date_count_rdd, key=lambda pair: pair[0])

In [16]:
def unpacking(list_packed):
    """Flatten ``(key_tuple, count)`` pairs into flat tuples ``(*key_tuple, count)``.

    Returns a new list; e.g. ``[((199508, "Books", 4), 2)]`` becomes
    ``[(199508, "Books", 4, 2)]``.
    """
    return [tuple(key) + (count,) for key, count in list_packed]

In [17]:
# Flat (month, category, rating, count) tuples, still sorted by key.
star_date_count_unpacked = unpacking(star_date_count_rdd_sorted)

In [18]:
# Distribute the flattened tuples again so they can be written per partition.
star_date_count_unpacked_rdd = sc.parallelize(c=star_date_count_unpacked)

In [19]:
# Tuple position -> key for the flattened (month, category, rating, count) rows;
# the keys match the columns inserted into the Review table.
index2new_column = dict(enumerate((
    "review_date",
    "product_category",
    "star_rating",
    "review_count",
)))

In [20]:
# Turn every flat tuple back into a dict keyed by index2new_column.
amazon_mapped_json_rdd = star_date_count_unpacked_rdd.map(
    partial(transform_to_json, index2column=index2new_column)
)

In [21]:
# Inspect the first ten aggregated records.
amazon_mapped_json_rdd.take(num=10)

[{'review_date': 199508,
  'product_category': 'Books',
  'star_rating': 4,
  'review_count': 2},
 {'review_date': 199508,
  'product_category': 'Books',
  'star_rating': 5,
  'review_count': 1},
 {'review_date': 199509,
  'product_category': 'Books',
  'star_rating': 5,
  'review_count': 1},
 {'review_date': 199510,
  'product_category': 'Books',
  'star_rating': 5,
  'review_count': 1},
 {'review_date': 199511,
  'product_category': 'Books',
  'star_rating': 5,
  'review_count': 2},
 {'review_date': 199511,
  'product_category': 'Music',
  'star_rating': 5,
  'review_count': 1},
 {'review_date': 199511,
  'product_category': 'Video',
  'star_rating': 5,
  'review_count': 1},
 {'review_date': 199605,
  'product_category': 'Books',
  'star_rating': 5,
  'review_count': 13},
 {'review_date': 199605,
  'product_category': 'Music',
  'star_rating': 4,
  'review_count': 1},
 {'review_date': 199606,
  'product_category': 'Books',
  'star_rating': 4,
  'review_count': 4}]

In [27]:
def stringify_record(record):
    """Serialize one record to a JSON string using ``json.dumps`` defaults."""
    encoded = json.dumps(record)
    return encoded

In [28]:
# JSON text version of every aggregated record.
amazon_to_string = amazon_mapped_json_rdd.map(f=stringify_record)

In [None]:
# Inspect the first ten JSON strings.
amazon_to_string.take(num=10)

In [25]:
import getpass

import mysql.connector


def _open_mysql_connection(user, password, host, database):
    """Open a MySQL connection and return ``(connection, cursor)``.

    The caller owns both objects and is responsible for closing the connection.
    """
    cnx = mysql.connector.connect(
        user=user,
        password=password,
        host=host,
        database=database,
    )
    return cnx, cnx.cursor()


# Bind the connection settings once, so Spark tasks can call connection_factory()
# without arguments. The values are read here on the driver and travel inside the
# partial. They come from the environment so the password is not stored in the
# notebook. The non-secret settings fall back to the previous local defaults.
# The password is prompted for when AMAZON_DB_PASSWORD is unset or empty.
connection_factory = partial(
    _open_mysql_connection,
    user=os.environ.get("AMAZON_DB_USER", "root"),
    password=os.environ.get("AMAZON_DB_PASSWORD") or getpass.getpass("MySQL password: "),
    host=os.environ.get("AMAZON_DB_HOST", "localhost"),
    database=os.environ.get("AMAZON_DB_NAME", "Amazon2020"),
)

In [26]:
def store_records(records, connection_factory):
    """Insert one partition of aggregated review counts into the ``Review`` table.

    Meant for ``RDD.foreachPartition``.

    Args:
        records: iterable of dicts with the keys ``review_date``,
            ``product_category``, ``star_rating`` and ``review_count``.
        connection_factory: zero-argument callable returning
            ``(connection, cursor)``.

    Empty partitions return without opening a connection. The connection is
    always closed. If the insert raises, commit is skipped and the exception
    propagates. The statement is a plain INSERT, so running this twice stores
    the rows twice unless the table rejects duplicates.

    Raises:
        KeyError: if a record lacks one of the required keys. This is raised
            before any connection is opened.
    """
    insert_statement_str = "insert into Review(review_date,product_category,star_rating,review_count) VALUES(%s, %s, %s, %s)"

    # Materialize the partition first; ``records`` may be a one-shot iterator.
    record_list = [
        (
            record["review_date"],
            record["product_category"],
            record["star_rating"],
            record["review_count"],
        )
        for record in records
    ]
    if not record_list:
        return

    cnx, cursor = connection_factory()
    try:
        cursor.executemany(insert_statement_str, record_list)
        cnx.commit()
    finally:
        # Release the connection even when the insert or commit fails.
        cnx.close()


In [None]:
# Write every partition to MySQL through store_records. Re-running this cell
# inserts the rows again unless the Review table rejects duplicates.
amazon_mapped_json_rdd.foreachPartition(
    partial(store_records, connection_factory=connection_factory)
)