In [None]:
import gzip
import io
import itertools
import urllib
import urllib.request

# This is not the "ideal" way to load data. Usually you would mount the S3 bucket
# (or Google Cloud Storage) to Databricks; however, that requires IAM (Identity and
# Access Management) roles, so the archive is fetched over plain HTTPS instead.
TARBALL_FULL_URL = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Camera_v1_00.tsv.gz"

# Only the first quarter million lines of the file are kept.
MAX_LINES = 250000

# Stream the gzip archive straight off the HTTP response and stop after MAX_LINES
# lines, instead of buffering and decompressing the whole download in memory first.
# The context managers close the gzip reader and the connection even on failure.
with urllib.request.urlopen(TARBALL_FULL_URL) as response:
    with gzip.GzipFile(fileobj=response, mode="rb") as decompressed_file:
        camera_reviews = [
            line.decode("UTF-8")
            for line in itertools.islice(decompressed_file, MAX_LINES)
        ]

# Write the subset to the cluster's distributed file system. Each line still ends
# with its original newline, so a plain join reassembles the TSV text; the final
# True is passed through to dbutils.fs.put exactly as before
# (presumably the overwrite flag -- TODO confirm against the dbutils docs).
dbutils.fs.put("/mnt/blob/cameras.tsv", "".join(camera_reviews), True)

In [None]:
# Development subset: cap the working DataFrame at 10,000 rows of `cameras_tsv`.
# LIMIT does not sample, so these are whichever rows Spark returns first rather
# than a random draw.
dev_subset_query = "select * from cameras_tsv limit 10000"
cameras = spark.sql(dev_subset_query)

In [None]:
from pyspark.sql import functions as F

# Average star rating and number of ratings per product title, most-rated first.
# The aggregates are given explicit aliases (the same names the SQL version of this
# query uses) so the sort does not depend on Spark's auto-generated column name
# "count(star_rating)".
selected_columns = ["product_title", "star_rating"]
(
    cameras.select(*selected_columns)
    .groupBy("product_title")
    .agg(
        F.avg("star_rating").alias("avg_rating"),
        F.count("star_rating").alias("count"),
    )
    .sort(F.desc("count"))
    .show()
)

In [None]:
# SQL flavour of the per-product aggregation: mean star rating and number of
# ratings per product title, ordered by rating count descending. This query reads
# the `cameras_tsv` table directly and prints only the first 5 result rows.
rating_summary_sql = '''
        SELECT product_title, AVG(star_rating) AS avg_rating, COUNT(star_rating) AS count 
        FROM cameras_tsv 
        GROUP BY product_title
        ORDER BY count DESC
'''
rating_summary = spark.sql(rating_summary_sql)
rating_summary.show(5)

In [None]:
# Keep only reviews that start with the word "I": a capital I at the very beginning
# of the text, immediately followed by a whitespace character.
# The pattern is a raw string so that `\s` reaches the regex engine untouched; in a
# normal string literal `\s` is an invalid escape sequence that Python warns about.
expr = r"^I\s.*"
cameras.filter(cameras["review_body"].rlike(expr)).select("review_body").show()

In [None]:
# Generate a very basic word count!
from operator import add


def _review_words(review):
    """Return the whitespace-separated words of one review row.

    `review` is a Row exposing a `review_body` attribute. A missing or empty body
    yields no words at all, and `str.split()` with no argument collapses runs of
    whitespace, so the empty string is never emitted as a "word".
    """
    body = review.review_body
    return body.split() if body else []


# (word, 1) pairs for every word in every review of the development subset.
mapped = cameras.select('review_body').rdd.flatMap(_review_words).map(lambda word: (word, 1))

# Sum the ones per word, then fetch the 100 most frequent words. takeOrdered keeps
# only the top entries instead of fully sorting the whole RDD before take(100);
# negating the count turns its ascending order into "largest count first".
mapped.reduceByKey(add).takeOrdered(100, key=lambda pair: -pair[1])