In [244]:
import os
import re
import subprocess
import sys
from datetime import datetime

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrameReader
from pyspark.sql.functions import explode, col, lit
# pandas is only needed for the exploratory `.toPandas()` cells further down.
try:
    import pandas as pd
except ModuleNotFoundError:
    # Install with the interpreter that is running this kernel so the package
    # lands in the right environment; check_call raises if pip fails instead
    # of silently continuing to a second ImportError.
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pandas"])
    import pandas as pd

In [2]:
# JDBC connection string for the target Postgres database. Override with the
# AMAZON_DB_URL environment variable; the default is the original value.
url = os.environ.get('AMAZON_DB_URL', 'jdbc:postgresql://postgres/amazon')

In [3]:
# JDBC connection properties passed to every `.write.jdbc(...)` call below.
# Credentials can be supplied through AMAZON_DB_USER / AMAZON_DB_PASSWORD so
# they do not have to live in the notebook; the defaults keep the original
# values so existing setups continue to work unchanged.
properties = {
    'user': os.environ.get('AMAZON_DB_USER', 'postgres'),
    'password': os.environ.get('AMAZON_DB_PASSWORD', 'spark123'),
    'driver': 'org.postgresql.Driver',
}

In [33]:
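# Not executed: alternative that reads the `reviews` table back from Postgres
# over JDBC instead of loading the raw JSON file.
# reviews_df_1 = DataFrameReader(sqlContext).jdbc(url=url,
#                                               properties=properties,
#                                               table='reviews')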

## Reviews

In [4]:
# Location of the raw reviews JSON file, relative to the working directory.
reviews_path = './Movies_and_TV.json'

In [5]:
# Load the raw reviews; no schema is passed, so Spark infers it from the file.
# NOTE(review): `sqlContext` is not created in any visible cell — presumably it
# is injected by the PySpark kernel/shell; confirm before running elsewhere.
raw_reviews_df = sqlContext.read.json(reviews_path)

                                                                                

In [6]:
# Inspect the inferred schema of the raw reviews.
raw_reviews_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- style: struct (nullable = true)
 |    |-- Color:: string (nullable = true)
 |    |-- Format:: string (nullable = true)
 |    |-- Shape:: string (nullable = true)
 |    |-- Size:: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: string (nullable = true)



In [14]:
@F.udf(returnType=T.TimestampType())
def to_datetime(x):
    """Convert a Unix timestamp in seconds to a timestamp value.

    Args:
        x: Seconds since the epoch, or None for a missing value.

    Returns:
        A naive ``datetime`` in the worker's local timezone (the behaviour of
        ``datetime.fromtimestamp``), or None when ``x`` is None.
    """
    # The source column is nullable; without this guard a single null row
    # raises TypeError and fails the whole Spark job.
    if x is None:
        return None
    return datetime.fromtimestamp(x)

In [36]:
@F.udf(returnType=T.IntegerType())
def clean_vote(x):
    """Parse a vote count string such as '1,234' into an integer.

    A missing value (None) is treated as zero votes.
    """
    if x is None:
        return 0
    # Drop thousands separators before converting.
    without_separators = x.replace(',', '')
    return int(without_separators)

In [37]:
# Cleaned reviews: `reviewTime` is rebuilt from the Unix timestamp, `vote` is
# parsed to an integer, and only the columns destined for the database are kept.
reviews_df = (
    raw_reviews_df
    .withColumn('reviewTime', to_datetime(col('unixReviewTime')))
    .withColumn('vote', clean_vote(col('vote')))
    .select(
        'asin',
        'reviewerID',
        'reviewerName',
        'reviewTime',
        'verified',
        'vote',
        'summary',
        'reviewText',
        'overall',
    )
)

In [38]:
# Confirm the cleaned column types (timestamp `reviewTime`, integer `vote`).
reviews_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- reviewTime: timestamp (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: integer (nullable = true)
 |-- summary: string (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- overall: double (nullable = true)



In [40]:
# Preview the first 10 cleaned rows.
reviews_df.show(10)

+----------+--------------+-------------+-------------------+--------+----+--------------------+--------------------+-------+
|      asin|    reviewerID| reviewerName|         reviewTime|verified|vote|             summary|          reviewText|overall|
+----------+--------------+-------------+-------------------+--------+----+--------------------+--------------------+-------+
|0001527665|A3478QRKQDOPQ2|        jacki|2013-03-11 00:00:00|    true|   0|               great|really happy they...|    5.0|
|0001527665|A2VHSG6TZHU1OB|        Ken P|2013-02-18 00:00:00|    true|   3|Realistic and Acc...|Having lived in W...|    5.0|
|0001527665|A23EJWOW1TLENE|Reina Berumen|2013-01-17 00:00:00|   false|   0|         Peace Child|Excellent look in...|    5.0|
|0001527665|A1KM9FNEJ8Q171|      N Coyle|2013-01-10 00:00:00|    true|   0|Culturally releva...|More than anythin...|    5.0|
|0001527665|A38LY2SSHVHRYB| Jodie Vesely|2012-12-26 00:00:00|    true|   0|Good Movie! Great...|This is a great m...| 

In [41]:
# Persist the cleaned reviews to the `reviews` table over JDBC.
# mode='overwrite' replaces whatever the table currently holds.
reviews_df.write.jdbc(url=url, properties=properties, table='reviews', mode='overwrite')

                                                                                

## Products metadata

In [42]:
# Location of the raw product-metadata JSON file, relative to the working directory.
products_path = './meta_Movies_and_TV.json'

In [43]:
# Load the raw product metadata; no schema is passed, so Spark infers it.
# NOTE(review): relies on a `sqlContext` that no visible cell defines —
# presumably provided by the PySpark kernel; confirm.
raw_products_meta_df = sqlContext.read.json(products_path)

21/08/23 14:10:12 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [44]:
# Inspect the inferred schema; note the `details` struct keys carry stray
# whitespace, newlines and colons exactly as they appear in the source data.
raw_products_meta_df.printSchema()

root
 |-- also_buy: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- also_view: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- category: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- date: string (nullable = true)
 |-- description: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- details: struct (nullable = true)
 |    |-- 
    Item Weight: 
    : string (nullable = true)
 |    |-- 
    Package Dimensions: 
    : string (nullable = true)
 |    |-- 
    Product Dimensions: 
    : string (nullable = true)
 |    |-- ASIN:: string (nullable = true)
 |    |-- ASIN: : string (nullable = true)
 |    |-- Audio CD: string (nullable = true)
 |    |-- Audio Description:: string (nullable = true)
 |    |-- Blu-ray Audio: string (nullable = true)
 |    |-- DVD Audio: string (nullable = true)
 |    |-- Digital Co

In [68]:
@F.udf(returnType=T.FloatType())
def clean_price(x):
    """Parse a price string such as '$1,299.99' into a float.

    Args:
        x: Raw price text, or None for a missing value.

    Returns:
        The numeric price, or None when the value is missing or is not a
        plain decimal number after removing '$' and ',' (for example a price
        range or an HTML fragment).
    """
    # A null price would otherwise raise AttributeError on `.replace`.
    if x is None:
        return None
    price = x.replace("$", '').replace(',', '')
    if not price.replace('.', '').isdigit():
        return None
    try:
        return float(price)
    except ValueError:
        # Passes the digit check but is still not a number, e.g. '1.2.3'.
        return None

In [98]:
@F.udf(returnType=T.StringType())
def clean_main_cat(x):
    """Null out main-category values that are HTML image tags.

    Args:
        x: Raw `main_cat` text, or None for a missing value.

    Returns:
        None when the value is missing or starts with '<img'; otherwise the
        value unchanged.
    """
    # The None check must come first: calling `.startswith` on a null value
    # raises AttributeError and fails the job.
    if x is None or x.startswith('<img'):
        return None
    return x

In [124]:
@F.udf(returnType=T.StringType())
def clean_brand(x):
    """Turn an empty-string brand into null; return any other value as is."""
    if x == '':
        return None
    return x

In [429]:
@F.udf(returnType=T.StructType([T.StructField('rank_', T.IntegerType()), 
                                T.StructField('rank_cat', T.StringType())]))
def clean_rank(x):
    """Split a raw sales-rank string into its number and category.

    Handles inputs such as '2,550,757 in Beauty & Personal Care (' and
    '[">#1,208,228 in Toys & Games (See Top 100 in Toys & Games)"]'.

    Args:
        x: Raw `rank` text, or None for a missing value.

    Returns:
        A dict with keys 'rank_' (int) and 'rank_cat' (str). Both are None
        when the value is missing, is the literal '[]', or does not contain a
        '<number> in <category>' pattern.
    """
    no_rank = {'rank_': None, 'rank_cat': None}
    # A null value would otherwise raise AttributeError further down.
    if x is None or x == '[]':
        return no_rank

    # Thousands separators are matched as part of the number only, so commas
    # inside the category (e.g. 'Clothing, Shoes & Jewelry') are preserved.
    # The category runs up to the first '(' or the end of the string.
    found = re.search(r'(\d[\d,]*) in ([^(]+)', x)
    if found is None:
        return no_rank

    rank_text, category = found.groups()
    # Some values end in runs of tabs/newlines; strip them from the category.
    category = category.replace('&amp;', '&').strip()
    return {'rank_': int(rank_text.replace(',', '')), 'rank_cat': category or None}

In [156]:
# Fraction of products whose `date` is the empty string. The whole column is
# collected to the driver as a pandas frame first, so this is an exploratory
# check only. `date` is not among the columns selected into `product_df`.
(raw_products_meta_df.select(col('date')).toPandas().date == '').mean()

0.999813511577005

In [174]:
# First 1000 values that mention neither 'Movies & TV' nor 'Books'.
# NOTE(review): `tmp` is not defined in any visible cell — judging by the
# output it is presumably a pandas Series of the raw `rank` strings; confirm
# and define it explicitly so the notebook survives Restart & Run All.
tmp.loc[(~tmp.str.contains('Movies & TV')) & (~tmp.str.contains('Books'))].tolist()[:1000]

['2,550,757 in Beauty & Personal Care (',
 '63,129 in CDs & Vinyl (',
 '[]',
 '[]',
 '[]',
 '[]',
 '[">#1,208,228 in Toys & Games (See Top 100 in Toys & Games)"]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '525,695 in Sports & Outdoors (',
 '[]',
 '[]',
 '1,073,898 in CDs & Vinyl (',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '397,192 in Movies &amp; TV\n\t\t\t\t    \t\n\t\t\t\t\t',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '308,311 i

In [176]:
# Values 1000-1999 that mention neither 'Movies & TV' nor 'Books'.
# NOTE(review): depends on `tmp`, which no visible cell defines — presumably a
# pandas Series of raw `rank` strings; confirm.
tmp.loc[(~tmp.str.contains('Movies & TV')) & (~tmp.str.contains('Books'))].tolist()[1000:2000]

['[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '1,662,634 in Sports & Outdoors (',
 '435,168 in Sports & Outdoors (',
 '[]',
 '[]',
 '[]',
 '[]',
 '1,754,377 in Clothing, Shoes & Jewelry (',
 '[]',
 '[]',
 '3,117,693 in Sports & Outdoors (',
 '6,180,731 in Sports & Outdoors (',
 '2,887,772 in Sports & Outdoors (',
 '6,284,323 in Sports & Outdoors (',
 '4,221,836 in Sports & Outdoors (',
 '6,240,049 in Sports & Outdoors (',
 '4,810,087 in Sports & Outdoors (',
 '2,278,680 in Sports & Outdoors (',
 '2,281,465 in Sports & Outdoors (',
 '3,446,718 in Sports & Outdoors (',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '116,219 in Sports & Outdoors (',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '51,222 in Sports & Outdoors (',
 '[]',
 '[]',
 '[]',
 '[

In [177]:
# Remaining values (from index 2000 on) that mention neither 'Movies & TV' nor 'Books'.
# NOTE(review): depends on `tmp`, which no visible cell defines — presumably a
# pandas Series of raw `rank` strings; confirm.
tmp.loc[(~tmp.str.contains('Movies & TV')) & (~tmp.str.contains('Books'))].tolist()[2000:]

['[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[">#1,652,053 in Home & Kitchen (See Top 100 in Home & Kitchen)"]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '382,483 in CDs & Vinyl (',
 '[]',
 '[]',
 '[]',
 '[]',
 '508,778 in Sports & Outdoors (',
 '[]',
 '5,113,240 in Clothing, Shoes & Jewelry (',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '261,772 in Health & Household (',
 '1,421,051 in Beauty & Personal Care (',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '532,426 in Sports & Outdoors (',
 '[]',
 '[]',
 '[]',
 '[]',
 '11,811,506 in Clothing, Shoes & Jewelry (',
 '[">#2,776,393 in Toys & Games (See Top 100 in Toys & Games)"]',
 '638,651 in CDs & Vinyl (',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[]',
 '[">#3,897,567 in Home & Kitchen (See top 100)"]',
 '1,288,180 in Health & Household (',
 '146,101 in Sports & Outdoors (',
 

In [453]:
# (output column name, key inside the `details` struct), in output order.
# Keys are copied verbatim from the inferred schema, including the stray
# whitespace and newlines around some of them.
detail_fields = [
    ('item_weight', '\n    Item Weight: \n    '),
    ('package_dimensions', '\n    Package Dimensions: \n    '),
    ('product_dimensions', '\n    Product Dimensions: \n    '),
    ('asin1', 'ASIN:'),
    ('asin2', 'ASIN: '),
    ('audio_cd', 'Audio CD'),
    ('audio_description', 'Audio Description:'),
    ('blue_ray_audio', 'Blu-ray Audio'),
    ('dvd_audio', 'DVD Audio'),
    ('digital_copy_expiration_date', 'Digital Copy Expiration Date:'),
    ('domestic_shipping', 'Domestic Shipping: '),
    ('dubbed', 'Dubbed:'),
    ('isbn10', 'ISBN-10:'),
    ('isbn13', 'ISBN-13:'),
    ('international_shipping', 'International Shipping: '),
    ('item_model_number', 'Item model number:'),
    ('label', 'Label:'),
    ('language', 'Language:'),
    ('n_discs', 'Number of Discs:'),
    ('please_note', 'Please Note:'),
    ('publisher', 'Publisher:'),
    ('run_time', 'Run Time:'),
    ('spars_code', 'SPARS Code:'),
    ('series', 'Series:'),
    ('shipping_weight', 'Shipping Weight:'),
    ('subtitles', 'Subtitles:'),
    ('subtitles_hearing_impaired', 'Subtitles for the Hearing Impaired:'),
    ('upc', 'UPC:'),
]

# Flattened product table. A single select replaces the former chain of
# withColumn calls: every withColumn adds another projection to the query
# plan, and Spark's documentation recommends one select with many columns
# instead. Output columns and their order are unchanged.
product_df = (
    raw_products_meta_df
    # Parse `rank` into a struct once so both of its fields can be read below.
    .withColumn('rank', clean_rank(col('rank')))
    .select(
        col('asin'),
        col('title'),
        clean_main_cat(col('main_cat')).alias('main_cat'),
        clean_price(col('price')).alias('price'),
        clean_brand(col('brand')).alias('brand'),
        col('rank').getItem('rank_').alias('rank_'),
        col('rank').getItem('rank_cat').alias('rank_cat'),
        *[col('details').getItem(key).alias(name) for name, key in detail_fields]
    )
)

In [454]:
# Confirm the flattened product columns and their types.
product_df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- title: string (nullable = true)
 |-- main_cat: string (nullable = true)
 |-- price: float (nullable = true)
 |-- brand: string (nullable = true)
 |-- rank_: integer (nullable = true)
 |-- rank_cat: string (nullable = true)
 |-- item_weight: string (nullable = true)
 |-- package_dimensions: string (nullable = true)
 |-- product_dimensions: string (nullable = true)
 |-- asin1: string (nullable = true)
 |-- asin2: string (nullable = true)
 |-- audio_cd: string (nullable = true)
 |-- audio_description: string (nullable = true)
 |-- blue_ray_audio: string (nullable = true)
 |-- dvd_audio: string (nullable = true)
 |-- digital_copy_expiration_date: string (nullable = true)
 |-- domestic_shipping: string (nullable = true)
 |-- dubbed: string (nullable = true)
 |-- isbn10: string (nullable = true)
 |-- isbn13: string (nullable = true)
 |-- international_shipping: string (nullable = true)
 |-- item_model_number: string (nullable = true)
 |-- label: 

In [455]:
# Persist the flattened products to the `products` table over JDBC.
# mode='overwrite' replaces whatever the table currently holds.
product_df.write.jdbc(url=url, properties=properties, table='products', mode='overwrite')

                                                                                

In [457]:
def _explode_by_asin(source_column, output_column):
    """Return one (asin, element) row per element of an array column."""
    return raw_products_meta_df.select(
        col('asin'),
        explode(col(source_column)).alias(output_column),
    )


# One child frame per array-valued metadata field, keyed by `asin`.
also_buy_df = _explode_by_asin('also_buy', 'also_buy')
also_view_df = _explode_by_asin('also_view', 'also_view')
categories_df = _explode_by_asin('category', 'category')
products_description_df = _explode_by_asin('description', 'description')
products_feature_df = _explode_by_asin('feature', 'feature')

In [458]:
# Write each child frame to its own table, in the same order as before;
# mode='overwrite' replaces any existing contents.
child_tables = [
    ('also_buy', also_buy_df),
    ('also_view', also_view_df),
    ('categories', categories_df),
    ('products_description', products_description_df),
    ('products_feature', products_feature_df),
]
for table_name, frame in child_tables:
    frame.write.jdbc(url=url, properties=properties, table=table_name, mode='overwrite')

                                                                                