In [None]:
# Build the integers 0 through 1,000,000 (inclusive) as a local Python list.
nums = list(range(1000001))

# parallelize: hand the local list to `sc` (presumably the notebook's
# SparkContext) and keep the resulting RDD in nums_rdd.
nums_rdd = sc.parallelize(nums)

# collect: pull every element of the RDD back as a local list.
# The returned list is not stored anywhere here.
nums_rdd.collect()

# take: fetch only the first 5 elements instead of the whole RDD.
nums_rdd.take(5)

# map: apply a function to every element, producing a new RDD.
# First the square of each number ...
squared_nums_rdd = nums_rdd.map(lambda n: n * n)
# ... then a (number, digit count) tuple for each number.
pairs = nums_rdd.map(lambda n: (n, len(str(n))))
pairs.take(25)  # peek at the first 25 tuples to check the mapping

# filter: keep only the tuples whose digit count (second field) is even.
even_pairs = pairs.filter(lambda pair: pair[1] % 2 == 0)
even_pairs.take(25)

# groupByKey
# Flip each (number, digit count) tuple to (digit count, number) so the
# digit count becomes the key that groupByKey groups on.
swapped_pairs = even_pairs.map(lambda pair: (pair[1], pair[0]))

# Group the numbers under their digit count, then turn each group's iterable
# into a plain list so it prints readably.
grouped = swapped_pairs.groupByKey().mapValues(list)
grouped.take(25)

# Mean of the numbers in each group: (digit count, sum / count).
averaged = grouped.mapValues(lambda values: sum(values) / len(values))
averaged.collect()  # one entry per distinct digit count, so collecting is cheap

# sql vs pyspark

# example 1
# Product ids of verified purchases reviewed during 2020: first as SQL text,
# then as the equivalent method chain on tbl_books.
sql = """
SELECT product_id
FROM tbl_books
WHERE verified_purchase = 'Y'
AND review_date BETWEEN '2020-01-01' AND '2020-12-31'
"""

(
    tbl_books
    # WHERE clause, passed through as a SQL expression string
    .filter("verified_purchase = 'Y' AND review_date BETWEEN '2020-01-01' AND '2020-12-31'")
    # SELECT clause
    .select("product_id")
)

# example 2
# Per-product rating statistics for verified purchases reviewed during 2020,
# limited to the first 100 rows after sorting: first as SQL text, then as the
# equivalent method chain on tbl_books.
sql2 = """
SELECT product_id,
COUNT(star_rating) AS total_rating,
MAX(star_rating) AS best_rating,
MIN(star_rating) AS worst_rating
FROM tbl_books
WHERE verified_purchase = 'Y'
AND review_date BETWEEN '2020-01-01' AND '2020-12-31'
GROUP BY product_id
ORDER BY total_rating DESC, product_id ASC, best_rating
LIMIT 100
"""

# NOTE(review): count, max, min and col are assumed to be the
# pyspark.sql.functions versions (max/min would otherwise resolve to the
# Python builtins) -- confirm against the notebook's imports.
(
    tbl_books
    # WHERE
    .filter("verified_purchase = 'Y' AND review_date BETWEEN '2020-01-01' AND '2020-12-31'")
    # GROUP BY
    .groupBy("product_id")
    # aggregate columns, aliased to match the SQL column names
    .agg(
        count(col("star_rating")).alias("total_rating"),
        max(col("star_rating")).alias("best_rating"),
        min(col("star_rating")).alias("worst_rating"),
    )
    # SELECT
    .select("product_id", "total_rating", "best_rating", "worst_rating")
    # ORDER BY (best_rating has no direction in the SQL, i.e. ascending)
    .orderBy(
        col("total_rating").desc(),
        col("product_id").asc(),
        col("best_rating").asc(),
    )
    # LIMIT
    .limit(100)
)

