In [1]:
# Google Cloud Storage location of the raw product listing CSV.
path = "gs://big_data_hw_zhl/project/20190928-items.csv"
# The first row supplies column names; no schema is given, so every column loads as a string.
# NOTE(review): `spark` is not created in this notebook — presumably the session is provided by the environment.
allProductsdf = spark.read.format("csv").option("header", "true").load(path)

In [2]:
# Preview up to the first 100 raw rows (long cell values are truncated in the printout).
allProductsdf.show(100)

+----------+--------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+---------------+
|      asin|   brand|               title|                 url|               image|     rating|           reviewUrl|        totalReviews|         prices|
+----------+--------+--------------------+--------------------+--------------------+-----------+--------------------+--------------------+---------------+
|B0000SX2UC|   Nokia|Dual-Band / Tri-M...|https://www.amazo...|https://m.media-a...|          3|https://www.amazo...|                  14|           null|
|B0009N5L7K|Motorola| Motorola I265 phone|https://www.amazo...|https://m.media-a...|        2.9|https://www.amazo...|                   7|         $49.95|
|B000SKTZ0S|Motorola|MOTOROLA C168i AT...|https://www.amazo...|https://m.media-a...|        2.6|https://www.amazo...|                  22|           null|
|B00198M12M|   Nokia|Nokia 6500 Slide ...|https://www.amazo...|https:/

In [3]:
# Total number of rows in the raw product table.
allProductsdf.count()

792

In [4]:
# Keep only products whose prices field is present; cache() because later cells reuse this frame.
validProductsdf = allProductsdf.filter(allProductsdf.prices.isNotNull()).cache()

In [5]:
# Preview up to 100 of the products that have a prices value.
validProductsdf.show(100)

+----------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+
|      asin|   brand|               title|                 url|               image|              rating|           reviewUrl|        totalReviews|         prices|
+----------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+
|B0009N5L7K|Motorola| Motorola I265 phone|https://www.amazo...|https://m.media-a...|                 2.9|https://www.amazo...|                   7|         $49.95|
|B001DZY4KI|    Sony|Sony Ericsson G70...|https://www.amazo...|https://m.media-a...|                   2|https://www.amazo...|                   1|         $78.99|
|B0027VKQPE|   Nokia|Nokia New 1100 fo...|https://www.amazo...|https://m.media-a...|                 3.2|https://www.amazo...|                   8|         $99.99|
|B00280QJFU| Sam

In [6]:
# Transform the prices column into a priceNum column (dollar part of the first listed price)

from pyspark.sql.functions import udf
from pyspark.sql.types import *

import re

def prices2priceList(prices):
    """Extract the whole-dollar amount of the first "$" price found in `prices`.

    Args:
        prices: Raw price text such as "$49.95" or "$1,299.99"; may be None
            or empty.

    Returns:
        The dollar part of the first price as an int (cents are dropped and
        thousands separators are ignored), or the string "invalid" when the
        text contains no "$<digits>" amount.
    """
    if not prices:
        return "invalid"
    # Raw string avoids invalid-escape warnings. Matching digits and commas only
    # fixes two defects of the old "\w+" pattern: it stopped at the comma in
    # "$1,299.99" (yielding 1) and it matched letters, which made int() raise.
    match = re.search(r"\$(\d[\d,]*)", prices)
    if match is None:
        return "invalid"
    return int(match.group(1).replace(",", ""))

# Wrap the parser as a Spark UDF declared to return strings, so parsed amounts and the
# "invalid" marker can live in the same column.
udfValueToCategory = udf(prices2priceList, StringType())
# Add the parsed price as a new priceNum column alongside the raw prices text.
resDf = validProductsdf.withColumn("priceNum", udfValueToCategory("prices"))

In [7]:
# Preview up to 100 rows to check the new priceNum column.
resDf.show(100)

+----------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+--------+
|      asin|   brand|               title|                 url|               image|              rating|           reviewUrl|        totalReviews|         prices|priceNum|
+----------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+--------+
|B0009N5L7K|Motorola| Motorola I265 phone|https://www.amazo...|https://m.media-a...|                 2.9|https://www.amazo...|                   7|         $49.95|      49|
|B001DZY4KI|    Sony|Sony Ericsson G70...|https://www.amazo...|https://m.media-a...|                   2|https://www.amazo...|                   1|         $78.99|      78|
|B0027VKQPE|   Nokia|Nokia New 1100 fo...|https://www.amazo...|https://m.media-a...|                 3.2|https://www.amazo...|         

In [8]:
# Drop rows whose price could not be parsed (the price UDF marks them with the literal "invalid").
resDf = resDf.filter(resDf.priceNum!="invalid")

In [9]:
# Number of products left after removing rows with an unparsable price.
resDf.count()

516

In [10]:
# Transform the string-valued totalReviews, rating and priceNum columns into float columns (totalReviewsNum, ratingNum, priceNum)

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def str2int(x):
    """Convert `x` to an int, passing missing values through.

    Args:
        x: A value accepted by int() (e.g. a digit string), or None.

    Returns:
        int(x), or None when x is None.

    Raises:
        ValueError: If x is not None and int() cannot parse it.
    """
    # SQL NULLs reach a Python UDF as None; int(None) would raise TypeError
    # and abort the job, so propagate the null instead.
    if x is None:
        return None
    return int(x)

def str2float(rating):
    """Convert `rating` to a float, passing missing values through.

    Args:
        rating: A value accepted by float() (e.g. "2.9" or "7"), or None.

    Returns:
        float(rating), or None when rating is None.

    Raises:
        ValueError: If rating is not None and float() cannot parse it.
    """
    # SQL NULLs reach a Python UDF as None; float(None) would raise TypeError
    # and abort the job, so propagate the null instead.
    if rating is None:
        return None
    return float(rating)

# str2int returns a Python int, so its UDF is declared IntegerType. It was declared
# FloatType before; a Python return value that does not match the declared Spark type
# is not coerced and comes back as null.
udfStr2Int = udf(str2int, IntegerType())
udfStr2Float = udf(str2float, FloatType())
# All three numeric columns are produced as floats (totalReviewsNum included), which is
# the schema the rest of the notebook and the exported files use. priceNum is
# overwritten in place; the other two are added as new columns.
f1Df = resDf.withColumn("totalReviewsNum", udfStr2Float("totalReviews")) \
             .withColumn("ratingNum", udfStr2Float("rating")) \
             .withColumn("priceNum", udfStr2Float("priceNum"))

In [11]:
# Preview up to 100 rows to check the numeric columns.
f1Df.show(100)

+----------+--------+--------------------+--------------------+--------------------+------+--------------------+------------+---------------+--------+---------------+---------+
|      asin|   brand|               title|                 url|               image|rating|           reviewUrl|totalReviews|         prices|priceNum|totalReviewsNum|ratingNum|
+----------+--------+--------------------+--------------------+--------------------+------+--------------------+------------+---------------+--------+---------------+---------+
|B0009N5L7K|Motorola| Motorola I265 phone|https://www.amazo...|https://m.media-a...|   2.9|https://www.amazo...|           7|         $49.95|    49.0|            7.0|      2.9|
|B001DZY4KI|    Sony|Sony Ericsson G70...|https://www.amazo...|https://m.media-a...|     2|https://www.amazo...|           1|         $78.99|    78.0|            1.0|      2.0|
|B0027VKQPE|   Nokia|Nokia New 1100 fo...|https://www.amazo...|https://m.media-a...|   3.2|https://www.amazo...|   

In [12]:
# Confirm that priceNum, totalReviewsNum and ratingNum are now float columns.
f1Df.printSchema()

root
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- image: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- reviewUrl: string (nullable = true)
 |-- totalReviews: string (nullable = true)
 |-- prices: string (nullable = true)
 |-- priceNum: float (nullable = true)
 |-- totalReviewsNum: float (nullable = true)
 |-- ratingNum: float (nullable = true)



In [32]:
# Keep the identifying/product columns plus the numeric columns; the raw string
# columns (rating, reviewUrl, totalReviews, prices) are dropped.
resDf = f1Df.select(
    "asin",
    "brand",
    "title",
    "url",
    "image",
    "priceNum",
    "totalReviewsNum",
    "ratingNum",
)

In [33]:
# Verify the column set and types of the trimmed product table.
resDf.printSchema()

root
 |-- asin: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- title: string (nullable = true)
 |-- url: string (nullable = true)
 |-- image: string (nullable = true)
 |-- priceNum: float (nullable = true)
 |-- totalReviewsNum: float (nullable = true)
 |-- ratingNum: float (nullable = true)



In [34]:
# Preview the trimmed product table (show() prints 20 rows by default).
resDf.show()

+----------+--------+--------------------+--------------------+--------------------+--------+---------------+---------+
|      asin|   brand|               title|                 url|               image|priceNum|totalReviewsNum|ratingNum|
+----------+--------+--------------------+--------------------+--------------------+--------+---------------+---------+
|B0009N5L7K|Motorola| Motorola I265 phone|https://www.amazo...|https://m.media-a...|    49.0|            7.0|      2.9|
|B001DZY4KI|    Sony|Sony Ericsson G70...|https://www.amazo...|https://m.media-a...|    78.0|            1.0|      2.0|
|B0027VKQPE|   Nokia|Nokia New 1100 fo...|https://www.amazo...|https://m.media-a...|    99.0|            8.0|      3.2|
|B00280QJFU| Samsung|Samsung T301G Pre...|https://www.amazo...|https://m.media-a...|    59.0|          133.0|      3.5|
|B0029X7UHC|Motorola|Motorola I205 cel...|https://www.amazo...|https://m.media-a...|    99.0|            2.0|      2.9|
|B002AS9WEA| Samsung|Samsung a167 Prep..

In [35]:
# Export the cleaned product table as tab-delimited CSV with a header row.
# coalesce(1) matches the other exports in this notebook and guarantees a single
# part file; the later `bq load` of F1DF reads only one part-00000 file, so a
# multi-partition write would silently leave rows out of BigQuery.
resDf.coalesce(1).write \
    .format('csv') \
    .options(delimiter='\t') \
    .option("header", "true") \
    .save('gs://big_data_hw_zhl/project/F1DF')

In [None]:
# for function 3

In [1]:
# Directory holding the per-review sentiment results.
path = "gs://big_data_hw_zhl/project/semtimentalRes"
# Tab-delimited with a header row; no schema is given, so every column loads as a string.
senDf = spark.read.format("csv").option("header", "true").options(delimiter='\t').load(path)

In [2]:
# Re-read the product table exported earlier in this notebook.
path = "gs://big_data_hw_zhl/project/F1DF"
# Tab-delimited with a header row; read back without a schema, so the numeric columns are strings again.
f1Df = spark.read.format("csv").option("header", "true").options(delimiter='\t').load(path)

In [3]:
# Number of review rows before any filtering.
senDf.count()

29562

In [4]:
from pyspark.sql.functions import col
# Keep only reviews where both sentiment fields are present, and cache the
# filtered frame for the cells that follow.
senDf = (
    senDf
    .filter(col("sentimental_score").isNotNull())
    .filter(col("sentimental_magnitude").isNotNull())
    .cache()
)

In [5]:
# Number of review rows that have both sentiment fields.
senDf.count()

26930

In [6]:
# All columns are still strings at this point.
senDf.printSchema()

root
 |-- asin: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- allReview: string (nullable = true)
 |-- sentimental_score: string (nullable = true)
 |-- sentimental_magnitude: string (nullable = true)



In [7]:
# Transform the string values in some columns to numeric types

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def str2int(x):
    """Convert `x` to an int, passing missing values through.

    Args:
        x: A value accepted by int() (e.g. a digit string), or None.

    Returns:
        int(x), or None when x is None.

    Raises:
        ValueError: If x is not None and int() cannot parse it.
    """
    # SQL NULLs reach a Python UDF as None; int(None) would raise TypeError
    # and abort the job, so propagate the null instead.
    if x is None:
        return None
    return int(x)

def str2float(x):
    """Convert `x` to a float, passing missing values through.

    Args:
        x: A value accepted by float() (e.g. "0.5" or "-0.3"), or None.

    Returns:
        float(x), or None when x is None.

    Raises:
        ValueError: If x is not None and float() cannot parse it.
    """
    # SQL NULLs reach a Python UDF as None; float(None) would raise TypeError
    # and abort the job, so propagate the null instead.
    if x is None:
        return None
    return float(x)

# Wrap the converters as Spark UDFs whose declared types match what they return.
udfStr2Int = udf(str2int, IntegerType())
udfStr2Float = udf(str2float, FloatType())

# Replace the three string columns, under the same names, with numeric versions.
senNumDf = (
    senDf
    .withColumn("rating", udfStr2Int("rating"))
    .withColumn("sentimental_score", udfStr2Float("sentimental_score"))
    .withColumn("sentimental_magnitude", udfStr2Float("sentimental_magnitude"))
)

In [9]:
# Inspect 5-star reviews whose sentiment score is zero or negative;
# show(20, False) prints up to 20 rows without truncating the review text.
senNumDf.filter(senNumDf.sentimental_score<=0).filter(senNumDf.rating==5).show(20, False)

+----------+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------+---------------------+
|asin      |rating|allReview                                                                                          

In [8]:
# Confirm rating is now an integer and both sentiment columns are floats.
senNumDf.printSchema()

root
 |-- asin: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- allReview: string (nullable = true)
 |-- sentimental_score: float (nullable = true)
 |-- sentimental_magnitude: float (nullable = true)



In [9]:
# Preview the typed review table (show() prints 20 rows by default).
senNumDf.show()

+----------+------+--------------------+-----------------+---------------------+
|      asin|rating|           allReview|sentimental_score|sentimental_magnitude|
+----------+------+--------------------+-----------------+---------------------+
|B07HK4JNV1|     5|Smart choice One ...|              0.5|                  1.0|
|B07HK4JNV1|     5|I recomended to b...|              0.9|                  0.9|
|B07HK4JNV1|     5|excelente recomen...|              0.9|                  0.9|
|B07HK4JNV1|     5|Love it Obsessed ...|              0.9|                  0.9|
|B07HK4JNV1|     5|Phone is a good o...|              0.5|                  1.6|
|B07HK4JNV1|     5|        Good Good 1;|              0.8|                  0.8|
|B07HK4JNV1|     5|Love it Takes muc...|              0.5|                  2.5|
|B07HK4JNV1|     5|Great phone for t...|              0.4|                  3.1|
|B07HK4JNV1|     5|Great battery lif...|              0.4|                  3.7|
|B07HK4JNV1|     5|Great buy

In [10]:
import pyspark.sql.functions as func

# One row per product (asin): mean sentiment score, mean sentiment magnitude,
# and the number of reviews that went into those means.
groupDf = (
    senNumDf
    .groupBy("asin")
    .agg(
        func.mean("sentimental_score").alias("avg_sentimental_score"),
        func.mean("sentimental_magnitude").alias("avg_sentimental_magnitude"),
        func.count(func.lit(1)).alias("reviewNum"),
    )
)

In [25]:
# Number of products with at least one scored review.
# NOTE(review): groupDf comes from groupBy("asin"), so distinct() looks redundant here — confirm.
groupDf.distinct().count()

658

In [14]:
# Spot check: number of scored reviews for one specific product.
senNumDf.filter(senNumDf.asin=="B01NB1IGR8").count()

40

In [17]:
# Preview the product table before joining it with the sentiment aggregates.
f1Df.show()

+----------+--------+--------------------+--------------------+--------------------+--------+---------------+---------+
|      asin|   brand|               title|                 url|               image|priceNum|totalReviewsNum|ratingNum|
+----------+--------+--------------------+--------------------+--------------------+--------+---------------+---------+
|B0009N5L7K|Motorola| Motorola I265 phone|https://www.amazo...|https://m.media-a...|    49.0|            7.0|      2.9|
|B001DZY4KI|    Sony|Sony Ericsson G70...|https://www.amazo...|https://m.media-a...|    78.0|            1.0|      2.0|
|B0027VKQPE|   Nokia|Nokia New 1100 fo...|https://www.amazo...|https://m.media-a...|    99.0|            8.0|      3.2|
|B00280QJFU| Samsung|Samsung T301G Pre...|https://www.amazo...|https://m.media-a...|    59.0|          133.0|      3.5|
|B0029X7UHC|Motorola|Motorola I205 cel...|https://www.amazo...|https://m.media-a...|    99.0|            2.0|      2.9|
|B002AS9WEA| Samsung|Samsung a167 Prep..

In [22]:
from pyspark.sql.functions import col

# Inner-join the per-product sentiment aggregates with the product table on asin.
# Product attributes come from f1Df, aggregate columns from groupDf; products
# missing from either side are dropped. The result is cached for reuse.
inner_join = (
    groupDf.alias('groupDf')
    .join(f1Df.alias('f1Df'), groupDf.asin == f1Df.asin, 'inner')
    .select(
        col("f1Df.asin"),
        col("f1Df.brand"),
        col("f1Df.title"),
        col("f1Df.url"),
        col("f1Df.image"),
        col("f1Df.priceNum"),
        col("f1Df.ratingNum"),
        col("groupDf.avg_sentimental_score"),
        col("groupDf.avg_sentimental_magnitude"),
        col("groupDf.reviewNum"),
    )
    .cache()
)

In [28]:
# List the brands that have at least one product with more than 10 scored reviews.
inner_join.filter(inner_join.reviewNum > 10).select("brand").distinct().show()

+--------+
|   brand|
+--------+
|   Nokia|
|    Sony|
|Motorola|
|  Xiaomi|
| Samsung|
|  Google|
| OnePlus|
|    ASUS|
|   Apple|
|  HUAWEI|
+--------+



In [29]:
# Export the joined product/sentiment table as one tab-delimited CSV file with a header row.
(
    inner_join
    .coalesce(1)
    .write
    .format('csv')
    .options(delimiter='\t')
    .option("header", "true")
    .save('gs://big_data_hw_zhl/project/projInnerJoin')
)

In [50]:
# Export the typed per-review sentiment table as one tab-delimited CSV file with a header row.
(
    senNumDf
    .coalesce(1)
    .write
    .format('csv')
    .options(delimiter='\t')
    .option("header", "true")
    .save('gs://big_data_hw_zhl/project/F3DF')
)

In [26]:
# Re-read the per-review sentiment table exported to F3DF.
path = "gs://big_data_hw_zhl/project/F3DF"
# Tab-delimited with a header row; read back without a schema, so every column is a string again.
senDf = spark.read.format("csv").option("header", "true").options(delimiter='\t').load(path)

In [27]:
# Preview the reloaded review table.
senDf.show()

+----------+------+--------------------+-----------------+---------------------+
|      asin|rating|           allReview|sentimental_score|sentimental_magnitude|
+----------+------+--------------------+-----------------+---------------------+
|B07HK4JNV1|     5|Smart choice One ...|              0.5|                  1.0|
|B07HK4JNV1|     5|I recomended to b...|              0.9|                  0.9|
|B07HK4JNV1|     5|excelente recomen...|              0.9|                  0.9|
|B07HK4JNV1|     5|Love it Obsessed ...|              0.9|                  0.9|
|B07HK4JNV1|     5|Phone is a good o...|              0.5|                  1.6|
|B07HK4JNV1|     5|        Good Good 1;|              0.8|                  0.8|
|B07HK4JNV1|     5|Love it Takes muc...|              0.5|                  2.5|
|B07HK4JNV1|     5|Great phone for t...|              0.4|                  3.1|
|B07HK4JNV1|     5|Great battery lif...|              0.4|                  3.7|
|B07HK4JNV1|     5|Great buy

In [28]:
# The reloaded columns are all strings, since the CSV was read without a schema.
senDf.printSchema()

root
 |-- asin: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- allReview: string (nullable = true)
 |-- sentimental_score: string (nullable = true)
 |-- sentimental_magnitude: string (nullable = true)



In [29]:
# Re-read the product table exported to F1DF.
path = "gs://big_data_hw_zhl/project/F1DF"
# Tab-delimited with a header row; no schema is given, so every column loads as a string.
f1Df = spark.read.format("csv").option("header", "true").options(delimiter='\t').load(path)

In [30]:
# Preview the reloaded product table.
f1Df.show()

+----------+--------+--------------------+--------------------+--------------------+--------+---------------+---------+
|      asin|   brand|               title|                 url|               image|priceNum|totalReviewsNum|ratingNum|
+----------+--------+--------------------+--------------------+--------------------+--------+---------------+---------+
|B0009N5L7K|Motorola| Motorola I265 phone|https://www.amazo...|https://m.media-a...|    49.0|            7.0|      2.9|
|B001DZY4KI|    Sony|Sony Ericsson G70...|https://www.amazo...|https://m.media-a...|    78.0|            1.0|      2.0|
|B0027VKQPE|   Nokia|Nokia New 1100 fo...|https://www.amazo...|https://m.media-a...|    99.0|            8.0|      3.2|
|B00280QJFU| Samsung|Samsung T301G Pre...|https://www.amazo...|https://m.media-a...|    59.0|          133.0|      3.5|
|B0029X7UHC|Motorola|Motorola I205 cel...|https://www.amazo...|https://m.media-a...|    99.0|            2.0|      2.9|
|B002AS9WEA| Samsung|Samsung a167 Prep..

In [31]:
from pyspark.sql.functions import col

# Left-join every review with its product's attributes on asin. Reviews whose
# asin is absent from f1Df are kept, with nulls in the product columns.
# The result is cached for reuse.
joinedSenDf = (
    senDf.alias('s')
    .join(f1Df.alias('f'), senDf.asin == f1Df.asin, 'left')
    .select(
        col("s.asin"),
        col("s.rating"),
        col("s.allReview"),
        col("s.sentimental_score"),
        col("s.sentimental_magnitude"),
        col("f.title"),
        col("f.brand"),
        col("f.url"),
        col("f.image"),
        col("f.priceNum"),
        col("f.totalReviewsNum"),
        col("f.ratingNum"),
    )
    .cache()
)

In [32]:
# Preview the review rows enriched with product attributes.
joinedSenDf.show()

+----------+------+--------------------+-----------------+---------------------+--------------------+------+--------------------+--------------------+--------+---------------+---------+
|      asin|rating|           allReview|sentimental_score|sentimental_magnitude|               title| brand|                 url|               image|priceNum|totalReviewsNum|ratingNum|
+----------+------+--------------------+-----------------+---------------------+--------------------+------+--------------------+--------------------+--------+---------------+---------+
|B07HK4JNV1|     5|Smart choice One ...|              0.5|                  1.0|"Xiaomi Redmi Not...|Xiaomi|https://www.amazo...|https://m.media-a...|   172.0|          393.0|      4.3|
|B07HK4JNV1|     5|I recomended to b...|              0.9|                  0.9|"Xiaomi Redmi Not...|Xiaomi|https://www.amazo...|https://m.media-a...|   172.0|          393.0|      4.3|
|B07HK4JNV1|     5|excelente recomen...|              0.9|            

In [33]:
# Count the distinct products among review rows whose joined title is non-null,
# presumably the reviews that found a matching product in the left join.
joinedSenDf.filter(joinedSenDf.title.isNotNull()).select("asin").distinct().count()

433

In [34]:
# Export the review-level joined table as one tab-delimited CSV file with a header row.
(
    joinedSenDf
    .coalesce(1)
    .write
    .format('csv')
    .options(delimiter='\t')
    .option("header", "true")
    .save('gs://big_data_hw_zhl/project/F1AllJoinDf')
)

In [None]:
# Shell command (not Python): load the exported product table into BigQuery table
# amazonPhoneReview.F1 as tab-delimited CSV, with quoting disabled (--quote ""),
# schema autodetection, and up to 10 bad records tolerated.
# NOTE(review): the part-file name is hardcoded to one specific export run — update it after re-running the write.
bq load \
--source_format CSV --quote "" \
--field_delimiter "\t" \
--max_bad_records 10 \
--autodetect \
-E UTF-8   \
amazonPhoneReview.F1 \
gs://big_data_hw_zhl/project/F1DF/part-00000-ce7a9476-c9a5-40eb-8896-e2c1a3da88e2-c000.csv

In [None]:
# Shell command (not Python): load the exported per-review sentiment table into BigQuery
# table amazonPhoneReview.F3 as tab-delimited CSV, with quoting disabled (--quote ""),
# schema autodetection, and up to 10 bad records tolerated.
# NOTE(review): the part-file name is hardcoded to one specific export run — update it after re-running the write.
bq load \
--source_format CSV --quote "" \
--field_delimiter "\t" \
--max_bad_records 10 \
--autodetect \
-E UTF-8   \
amazonPhoneReview.F3 \
gs://big_data_hw_zhl/project/F3DF/part-00000-2b7e2d5c-1d4b-40ec-94f0-145af3d7a9cb-c000.csv

In [None]:
# Shell command (not Python): load the product/sentiment inner-join export (projInnerJoin)
# into BigQuery table amazonPhoneReview.F1joinF3 in project rich-city-252820, as
# tab-delimited CSV with quoting disabled, schema autodetection, and up to 10 bad records tolerated.
# NOTE(review): the part-file name is hardcoded to one specific export run — update it after re-running the write.
bq load \
--project_id rich-city-252820 \
--source_format CSV --quote "" \
--field_delimiter "\t" \
--max_bad_records 10 \
--autodetect \
-E UTF-8   \
amazonPhoneReview.F1joinF3 \
gs://big_data_hw_zhl/project/projInnerJoin/part-00000-efaf930c-3cee-420f-9d27-4652750da0c4-c000.csv

In [None]:
# Shell command (not Python): load a file from project/F1JoinDf into BigQuery table
# amazonPhoneReview.F1joinF3, as tab-delimited CSV with quoting disabled, schema
# autodetection, and up to 10 bad records tolerated.
# NOTE(review): no cell visible in this notebook writes project/F1JoinDf, and the target is the
# same table as the projInnerJoin load — this looks like a leftover or earlier variant; confirm before running.
# NOTE(review): unlike the neighbouring F1joinF3 loads, no --project_id is passed here — confirm the default project.
bq load \
--source_format CSV --quote "" \
--field_delimiter "\t" \
--max_bad_records 10 \
--autodetect \
-E UTF-8   \
amazonPhoneReview.F1joinF3 \
gs://big_data_hw_zhl/project/F1JoinDf/part-00000-af77f2f2-b348-4159-bd5f-f04688787ecc-c000.csv

In [None]:
# Shell command (not Python): load the review-level left-join export (F1AllJoinDf) into
# BigQuery table amazonPhoneReview.F1joinF3_2 in project rich-city-252820, as tab-delimited
# CSV with quoting disabled, schema autodetection, and up to 10 bad records tolerated.
# NOTE(review): the part-file name is hardcoded to one specific export run — update it after re-running the write.
bq load \
--project_id rich-city-252820 \
--source_format CSV --quote "" \
--field_delimiter "\t" \
--max_bad_records 10 \
--autodetect \
-E UTF-8   \
amazonPhoneReview.F1joinF3_2 \
gs://big_data_hw_zhl/project/F1AllJoinDf/part-00000-742b7cfd-9f81-46b9-a7e3-87f5404c2f8b-c000.csv