In [137]:
import time

In [140]:
from pyspark import SparkContext, SparkConf

conf = SparkConf().set('spark.executor.memory', '16g')\
                    .set('spark.driver.memory', '16g')\
                    .setAppName("beer_analysis")\
                    .setMaster('local[*]')
sc = SparkContext(conf=conf)
sc

In [141]:
elapsed_times_list = []
start_time_overall = time.time()

In [142]:
# STEP 1
start_time1 = time.time()

rdd = sc.textFile('beer_reviews.csv')
header = rdd.first()
rdd = rdd.filter(lambda line: line != header)
split_rdd = rdd.map(lambda x: x.split(','))

end_time1 = time.time()
elapsed_time1 = end_time1 - start_time1
elapsed_times_list.append(elapsed_time1)
print(f"Elapsed time: {elapsed_time1} seconds")

Elapsed time: 0.6035358905792236 seconds


In [143]:
# STEP 2
start_time2 = time.time()

review_overall_rdd = split_rdd.map(lambda x: (float(x[4]), x))
mapped_rdd = review_overall_rdd.filter(lambda x: x[0] == 5)
mapped_rdd = mapped_rdd.map(lambda x: x[1])

end_time2 = time.time()
elapsed_time2 = end_time2 - start_time2
elapsed_times_list.append(elapsed_time2)
print(f"Elapsed time: {elapsed_time2} seconds")


mapped_rdd.collect()

Elapsed time: 0.0 seconds


[['11',
  '1075',
  'Caldera Brewing Company',
  '1283154365',
  '5.0',
  '5.0',
  '4.0',
  'MadeInOregon',
  'Herbed / Spiced Beer',
  '4.0',
  '4.0',
  'Caldera Ginger Beer',
  '4.7',
  '52159'],
 ['25',
  '1075',
  'Caldera Brewing Company',
  '1318802642',
  '5.0',
  '5.0',
  '3.5',
  'optimator13',
  'Rauchbier',
  '3.5',
  '5.0',
  'Rauch Ür Bock',
  '7.4',
  '58046'],
 ['31',
  '1075',
  'Caldera Brewing Company',
  '1315014054',
  '5.0',
  '4.5',
  '4.0',
  'Bung',
  'Rauchbier',
  '4.0',
  '5.0',
  'Rauch Ür Bock',
  '7.4',
  '58046'],
 ['42',
  '1075',
  'Caldera Brewing Company',
  '1305926255',
  '5.0',
  '4.5',
  '3.5',
  'Deuane',
  'Rauchbier',
  '4.0',
  '5.0',
  'Rauch Ür Bock',
  '7.4',
  '58046'],
 ['73',
  '1075',
  'Caldera Brewing Company',
  '1246913078',
  '5.0',
  '4.0',
  '4.5',
  'GratefulBeerGuy',
  'American Pale Ale (APA)',
  '4.5',
  '4.0',
  'Caldera Pale Ale',
  '5.5',
  '25414'],
 ['87',
  '1075',
  'Caldera Brewing Company',
  '1235940573',
  '5.0',
 

In [144]:
# STEP 3
start_time3 = time.time()

processed_rdd = split_rdd.map(lambda x: (x[2], (float(x[4]), x[11])))
top_reviews_rdd = processed_rdd.reduceByKey(lambda x,y: x if x[0] > y[0] else y)

end_time3 = time.time()
elapsed_time3 = end_time3 - start_time3
elapsed_times_list.append(elapsed_time3)
print(f"Elapsed time: {elapsed_time3} seconds")

top_reviews_rdd.collect()

Elapsed time: 0.013422489166259766 seconds


[('Pacific Coast Brewing Company', (4.5, 'Killer Whale Stout')),
 ('Asmara Breweries', (4.0, 'Asmara Lager Beer')),
 ('Bernard Family Brewery', (5.0, 'Bernard Sváte&#269;ní Le\x9eák')),
 ('Otro Mundo Brewing Company', (4.5, 'Otro Mundo Strong Red Ale')),
 ("Emmett's Brewing Company", (5.0, 'Dopplebock')),
 ('Becker Brewing Company', (4.0, 'Rooster Red')),
 ('Birrificio Indipendente Elav', (3.5, 'Raggea Stout')),
 ('Stakhanovitz', (4.5, 'Bogemia')),
 ('Richmodis-Bräu', (4.0, 'Richmodis Koelsch')),
 ('Privatbrauerei Hösl', (4.5, 'Helles Weissbier Resi')),
 ('Spanish Springs Brewing Company', (5.0, 'Spanish Springs Porter')),
 ('Archers Brewing & Wholesale Limited', (4.0, 'Village Bitter')),
 ('Lido Atputas Centrs', (4.0, 'LIDO Speci&#257;lais')),
 ('Sternquell Brauerei Plauen', (4.0, 'Sternquell Bockbier')),
 ('Cerveceros Krut S.L.', (4.0, 'Krut Gold')),
 ('Riverside Brewing Company', (3.5, 'Riverside Curleys Irish Style Stout')),
 ('Speciaalbierbrouwerij Oijen', (4.0, 'Eikelbier')),
 ('

In [145]:
# STEP 4
start_time4 = time.time()

processed_rdd = split_rdd.map(lambda x: (x[2], (float(x[4]),1)))

summed_ratings_rdd = processed_rdd.reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))

avg_reviews_rdd = summed_ratings_rdd.mapValues(lambda x: round(x[0]/x[1], 3))

end_time4 = time.time()
elapsed_time4 = end_time4 - start_time4
elapsed_times_list.append(elapsed_time4)
print(f"Elapsed time: {elapsed_time4} seconds")

avg_reviews_rdd.collect()

Elapsed time: 0.012362241744995117 seconds


[('Pacific Coast Brewing Company', 3.432),
 ('Asmara Breweries', 4.0),
 ('Bernard Family Brewery', 3.813),
 ('Otro Mundo Brewing Company', 3.227),
 ("Emmett's Brewing Company", 3.85),
 ('Becker Brewing Company', 3.75),
 ('Birrificio Indipendente Elav', 3.0),
 ('Stakhanovitz', 2.583),
 ('Richmodis-Bräu', 4.0),
 ('Privatbrauerei Hösl', 4.167),
 ('Spanish Springs Brewing Company', 3.542),
 ('Archers Brewing & Wholesale Limited', 3.357),
 ('Lido Atputas Centrs', 3.75),
 ('Sternquell Brauerei Plauen', 3.333),
 ('Cerveceros Krut S.L.', 4.0),
 ('Riverside Brewing Company', 2.75),
 ('Speciaalbierbrouwerij Oijen', 3.417),
 ('Benny Brewing Company', 4.5),
 ('Ancient Lakes Brewing Company', 3.5),
 ('Klosterbrauerei Weltenburg', 3.918),
 ('Mountaineer Brewing Co.', 3.616),
 ('Island Cafe & Brew Pub', 3.0),
 ("Gentle Ben's Brewing Company", 3.767),
 ('Ufford Ales Ltd.', 3.0),
 ('Streets Of New York / Uptown Brewery', 3.4),
 ('Harar Beer Factory', 3.407),
 ('Mornington Peninsula Brewery', 3.883),
 (

In [146]:
# STEP 5
start_time5 = time.time()

processed_rdd = split_rdd.map(lambda x: (x[8], 1))
counts_rdd = processed_rdd.reduceByKey(lambda x,y: x+y)

end_time5 = time.time()
elapsed_time5 = end_time5 - start_time5
elapsed_times_list.append(elapsed_time5)
print(f"Elapsed time: {elapsed_time5} seconds")

counts_rdd.collect()

Elapsed time: 0.00940847396850586 seconds


[('Rauchbier', 3758),
 ('Belgian Strong Dark Ale', 36869),
 ('American Strong Ale', 29570),
 ('Märzen / Oktoberfest', 20951),
 ('Altbier', 7207),
 ('Scotch Ale / Wee Heavy', 16369),
 ('English India Pale Ale (IPA)', 14722),
 ('English Pale Mild Ale', 460),
 ('Lambic - Fruit', 10178),
 ('Eisbock', 2645),
 ('Faro', 608),
 ('Bière de Champagne / Bière Brut', 1041),
 ('Russian Imperial Stout', 50997),
 ('American Amber / Red Ale', 40292),
 ('Euro Pale Lager', 16824),
 ('English Bitter', 8082),
 ('Euro Strong Lager', 2637),
 ('Schwarzbier', 9343),
 ('Munich Dunkel Lager', 7033),
 ('Vienna Lager', 7570),
 ('Witbier', 27479),
 ('Kölsch', 7540),
 ('Smoked Beer', 2804),
 ('Dunkelweizen', 6700),
 ('Winter Warmer', 19103),
 ('American Double / Imperial Pilsner', 5356),
 ('Quadrupel (Quad)', 17701),
 ('Kvass', 229),
 ('Roggenbier', 395),
 ('Rye Beer', 9604),
 ('English Barleywine', 13368),
 ('Belgian Strong Pale Ale', 31289),
 ('American Brown Ale', 24075),
 ('English Stout', 2736),
 ('Braggot', 9

In [147]:
# STEP 6
start_time6 = time.time()

processed_rdd = split_rdd.map(lambda x: (x[8], (float(x[5]), 1)))

sum_per_style_rdd = processed_rdd.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
# key(sum(review_aroma, count)
sum_per_style_rdd = sum_per_style_rdd.mapValues(lambda x: (x[0], x[1], (x[0]/x[1])))
# key(sum(aroma), count, mean)

squared_diffs = processed_rdd.join(sum_per_style_rdd).mapValues(lambda x: ((x[0][0] - x[1][2]) ** 2, 1))
sum_squared_diffs = squared_diffs.reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))
variance_per_style = sum_squared_diffs.mapValues(lambda x: round(x[0]/x[1], 3))

end_time6 = time.time()
elapsed_time6 = end_time6 - start_time6
elapsed_times_list.append(elapsed_time6)
print(f"Elapsed time: {elapsed_time6} seconds")

variance_per_style.collect()

Elapsed time: 0.02527451515197754 seconds


[('Rauchbier', 0.408),
 ('American Strong Ale', 0.353),
 ('Altbier', 0.337),
 ('Scotch Ale / Wee Heavy', 0.304),
 ('Eisbock', 0.28),
 ('Faro', 0.381),
 ('American Amber / Red Ale', 0.382),
 ('Euro Pale Lager', 0.485),
 ('English Bitter', 0.358),
 ('Euro Strong Lager', 0.568),
 ('Schwarzbier', 0.316),
 ('Munich Dunkel Lager', 0.362),
 ('Vienna Lager', 0.349),
 ('Witbier', 0.38),
 ('Smoked Beer', 0.334),
 ('Dunkelweizen', 0.34),
 ('Winter Warmer', 0.357),
 ('Rye Beer', 0.316),
 ('English Barleywine', 0.341),
 ('Belgian Strong Pale Ale', 0.31),
 ('English Stout', 0.318),
 ('Sahti', 0.415),
 ('German Pilsener', 0.473),
 ('Herbed / Spiced Beer', 0.495),
 ('Oatmeal Stout', 0.292),
 ('American IPA', 0.343),
 ('Belgian Pale Ale', 0.361),
 ('Low Alcohol Beer', 0.71),
 ('Extra Special / Strong Bitter (ESB)', 0.304),
 ('Bock', 0.414),
 ('Scottish Ale', 0.32),
 ('California Common / Steam Beer', 0.3),
 ('Czech Pilsener', 0.395),
 ('English Dark Mild Ale', 0.339),
 ('English Strong Ale', 0.314),
 (

In [148]:
end_time_overall = time.time()
elapsed_time_visualisation = end_time_overall - start_time_overall
elapsed_time_no_visualisation = sum(elapsed_times_list)
print(f"Elapsed time: {elapsed_time_no_visualisation} seconds without visualising data")
print(f"Elapsed time: {elapsed_time_visualisation} seconds with visualising data")

Elapsed time: 0.6640036106109619 seconds without visualising data
Elapsed time: 48.12921333312988 seconds with visualising data


In [149]:
sc.stop()