In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import regexp_replace,col,lower
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *       # for datatype conversion
from pyspark.sql.functions import *   # for col() function
from pyspark.ml.feature import StopWordsRemover, Tokenizer
from pyspark.mllib.feature import HashingTF, IDF

import os
import re
import string


spark = SparkSession \
    .builder \
    .master("local") \
    .appName("Exploratory_Analysis") \
    .config("spark.executor.memory", '8g') \
    .config("spark.executor.cores", '4') \
    .config('spark.cores.max', '4') \
    .config('spark.driver.memory', '8g') \
    .getOrCreate()

sc = spark.sparkContext

In [3]:
beers = spark.read.format('csv'). \
    option("header", "true"). \
    option("inferSchema", "true"). \
    load("C:/Users/Eri/Documents/PSTAT 135/beers.csv")

In [4]:
breweries = spark.read.format('csv'). \
    option("header", "true"). \
    option("inferSchema", "true"). \
    load("C:/Users/Eri/Documents/PSTAT 135/breweries.csv")

In [5]:
reviews = spark.read.format('csv'). \
    option("header", "true"). \
    option("inferSchema", "true"). \
    load("C:/Users/Eri/Documents/PSTAT 135/reviews.csv")

In [6]:
beers.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- brewery_id: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- style: string (nullable = true)
 |-- availability: string (nullable = true)
 |-- abv: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- retired: string (nullable = true)



In [7]:
breweries.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- types: string (nullable = true)



In [8]:
reviews.printSchema()

root
 |-- beer_id: integer (nullable = true)
 |-- username: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- text: string (nullable = true)
 |-- look: string (nullable = true)
 |-- smell: string (nullable = true)
 |-- taste: string (nullable = true)
 |-- feel: string (nullable = true)
 |-- overall: string (nullable = true)
 |-- score: string (nullable = true)



In [9]:
beers.show(5)

+------+--------------------+----------+-----+-------+--------------------+------------+----+--------------------+-------+
|    id|                name|brewery_id|state|country|               style|availability| abv|               notes|retired|
+------+--------------------+----------+-----+-------+--------------------+------------+----+--------------------+-------+
|202522|      Olde Cogitator|      2199|   CA|     US|English Oatmeal S...|    Rotating| 7.3|No notes at this ...|      f|
| 82352|Konrads Stout Rus...|     18604| null|     NO|Russian Imperial ...|    Rotating|10.4|No notes at this ...|      f|
|214879|      Scottish Right|     44306|   IN|     US|        Scottish Ale|  Year-round|   4|No notes at this ...|      t|
|320009|MegaMeow Imperial...|      4378|   WA|     US|American Imperial...|      Winter| 8.7|Every time this year|      f|
|246438|     Peaches-N-Cream|     44617|   PA|     US|  American Cream Ale|    Rotating| 5.1|No notes at this ...|      f|
+------+--------

In [10]:
beers.count()

358873

In [11]:
beers.groupBy('style').count().sort('count', ascending = False).show(20)

+--------------------+-----+
|               style|count|
+--------------------+-----+
|        American IPA|44719|
|American Pale Ale...|22159|
|American Imperial...|18336|
|      Belgian Saison|18166|
|   American Wild Ale|12972|
|American Imperial...|11180|
|     American Porter|10168|
|American Amber / ...| 9748|
|      American Stout| 9103|
|Fruit and Field Beer| 7729|
| American Blonde Ale| 7089|
|  American Brown Ale| 7008|
|   German Hefeweizen| 6019|
|     Belgian Witbier| 5613|
|American Pale Whe...| 5266|
|     Berliner Weisse| 5036|
|      German Pilsner| 4748|
|    Belgian Pale Ale| 4523|
|Russian Imperial ...| 4426|
|English Sweet / M...| 4192|
+--------------------+-----+
only showing top 20 rows



In [12]:
breweries.show(5)

+-----+--------------------+--------------+-----+-------+--------------------+--------------------+
|   id|                name|          city|state|country|               notes|               types|
+-----+--------------------+--------------+-----+-------+--------------------+--------------------+
|19730|     Brouwerij Danny|     Erpe-Mere| null|     BE|No notes at this ...|             Brewery|
|32541|Coachella Valley ...|Thousand Palms|   CA|     US|No notes at this ...|Brewery, Bar, Bee...|
|44736|    Beef 'O' Brady's|    Plant City|   FL|     US|No notes at this ...|         Bar, Eatery|
|23372|Broadway Wine Mer...| Oklahoma City|   OK|     US|No notes at this ...|               Store|
|35328|Brighton Beer Dis...|      Brighton|  GB2|     GB|Duplicate of http...|         Bar, Eatery|
+-----+--------------------+--------------+-----+-------+--------------------+--------------------+
only showing top 5 rows



In [13]:
reviews.show(5)

+-------+---------------+-------------------+--------------------+--------------------+--------------------+------+--------------------+-----------------+------------------+
|beer_id|       username|               date|                text|                look|               smell| taste|                feel|          overall|             score|
+-------+---------------+-------------------+--------------------+--------------------+--------------------+------+--------------------+-----------------+------------------+
| 271781|   bluejacket74|2017-03-17 00:00:00|   750 ml bottle,...|                   4|                   4|     4|                4.25|                4|              4.03|
| 125646|        _dirty_|2017-12-21 00:00:00|                    |                 4.5|                 4.5|   4.5|                 4.5|              4.5|               4.5|
| 125646|        CJDUBYA|2017-12-21 00:00:00|                    |                4.75|                4.75|  4.75|               

In [14]:
(reviews.filter(reviews['text'] != '\xa0\xa0')).count()

2987993

In [15]:
non_empty_reviews = reviews.filter(reviews['text'] != '\xa0\xa0')

In [16]:
non_empty_reviews.show(5)

+-------+---------------+-------------------+--------------------+--------------------+--------------------+------+--------------------+-----------------+------------------+
|beer_id|       username|               date|                text|                look|               smell| taste|                feel|          overall|             score|
+-------+---------------+-------------------+--------------------+--------------------+--------------------+------+--------------------+-----------------+------------------+
| 271781|   bluejacket74|2017-03-17 00:00:00|   750 ml bottle,...|                   4|                   4|     4|                4.25|                4|              4.03|
| 125646|GratefulBeerGuy|2017-12-20 00:00:00|"   0% 16 oz can....| bloomin' like a ...| totally unfilter...| thick| all-white clumps...| mellon and mango| grainy earthiness|
| 125646|       LukeGude|2017-12-20 00:00:00|   Classic TH NEI...|                4.25|                 4.5|  4.25|               

In [17]:
#dtype = datatype 
non_empty_reviews.dtypes

[('beer_id', 'int'),
 ('username', 'string'),
 ('date', 'timestamp'),
 ('text', 'string'),
 ('look', 'string'),
 ('smell', 'string'),
 ('taste', 'string'),
 ('feel', 'string'),
 ('overall', 'string'),
 ('score', 'string')]

In [18]:
non_empty_reviews.describe().show()

+-------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|          beer_id|            username|                text|                look|               smell|               taste|                feel|             overall|               score|
+-------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  count|          2987993|             2984209|             2987993|             2831961|             2830771|             2829717|             2828842|             2828088|             2983385|
|   mean|63296.20292818624|1.8038932242394958E9|                null|  3.9394156210280866|  3.8445426409163534|   3.870134391442983|   3.835104948841218|   3.864983629690377|  3.8468598629637483|
| stddev|76771.34267

In [111]:
non_empty_reviews.filter(F.col("look").cast("int").isNotNull() == False).show(5)

+-------+---------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|beer_id|       username|               date|                text|                look|               smell|               taste|                feel|             overall|               score|
+-------+---------------+-------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
| 125646|GratefulBeerGuy|2017-12-20 00:00:00|"   0% 16 oz can....| bloomin' like a ...| totally unfilter...|               thick| all-white clumps...|    mellon and mango|   grainy earthiness|
| 206623|   rodbeermunch|2016-01-27 00:00:00|"   Dark brown po...| whisps away quic...| possibly the bes...| good irish malt ...| bourbon and oak ...| relatively easy ...| good bourbon del...|
|  96331|       dirtylou|2013-07-09

In [112]:
non_empty_reviews.filter(F.col("look").cast("int").isNotNull() == False).count()

372068

In [113]:
372068/2987993

0.12452104138128837

In [114]:
non_empty_reviews.groupBy('beer_id').count().sort('count', ascending = False).show(10)

+-------+-----+
|beer_id|count|
+-------+-----+
|    645| 4364|
|  11757| 4300|
|   2093| 4252|
|   7971| 4155|
|   1093| 4054|
|    412| 4001|
|  17112| 3905|
|    695| 3786|
|  19960| 3738|
|   1904| 3675|
+-------+-----+
only showing top 10 rows



In [115]:
non_empty_reviews.agg(F.countDistinct("beer_id")).show()

+-----------------------+
|count(DISTINCT beer_id)|
+-----------------------+
|                 210311|
+-----------------------+



In [19]:
beerStyles = beers.select("id","style")

In [20]:
beerStyles.show(5)

+------+--------------------+
|    id|               style|
+------+--------------------+
|202522|English Oatmeal S...|
| 82352|Russian Imperial ...|
|214879|        Scottish Ale|
|320009|American Imperial...|
|246438|  American Cream Ale|
+------+--------------------+
only showing top 5 rows



In [21]:
beerStyles = beerStyles.withColumnRenamed('id', 'beer_id')

In [22]:
beerStyles.show(5)

+-------+--------------------+
|beer_id|               style|
+-------+--------------------+
| 202522|English Oatmeal S...|
|  82352|Russian Imperial ...|
| 214879|        Scottish Ale|
| 320009|American Imperial...|
| 246438|  American Cream Ale|
+-------+--------------------+
only showing top 5 rows



In [23]:
#joint dataframe with review + beer style 
test = non_empty_reviews.join(beerStyles, "beer_id")

In [24]:
test.show(5)

+-------+---------------+-------------------+--------------------+--------------------+--------------------+------+--------------------+-----------------+------------------+--------------------+
|beer_id|       username|               date|                text|                look|               smell| taste|                feel|          overall|             score|               style|
+-------+---------------+-------------------+--------------------+--------------------+--------------------+------+--------------------+-----------------+------------------+--------------------+
| 271781|   bluejacket74|2017-03-17 00:00:00|   750 ml bottle,...|                   4|                   4|     4|                4.25|                4|              4.03|American Imperial...|
| 125646|GratefulBeerGuy|2017-12-20 00:00:00|"   0% 16 oz can....| bloomin' like a ...| totally unfilter...| thick| all-white clumps...| mellon and mango| grainy earthiness|     New England IPA|
| 125646|       LukeGude|

In [27]:
test.count()

2987925

In [28]:
test.groupBy('style').count().sort('count', ascending = False).show(20)

+--------------------+------+
|               style| count|
+--------------------+------+
|        American IPA|301774|
|American Imperial...|212697|
|American Imperial...|150160|
|American Pale Ale...|126489|
|      Belgian Saison| 91000|
|Russian Imperial ...| 86117|
|     American Porter| 71189|
|   American Wild Ale| 63393|
|American Amber / ...| 62818|
|Fruit and Field Beer| 58342|
|Belgian Strong Da...| 53097|
|     Belgian Witbier| 46545|
|Belgian Strong Pa...| 45732|
|      Belgian Tripel| 45686|
|  American Brown Ale| 44774|
| American Strong Ale| 43575|
|   German Hefeweizen| 42930|
|      American Stout| 41879|
| American Barleywine| 40873|
|American Adjunct ...| 39404|
+--------------------+------+
only showing top 20 rows



In [29]:
test.groupBy(['beer_id', 'style']).count().sort('count', ascending = False).show(20)

+-------+--------------------+-----+
|beer_id|               style|count|
+-------+--------------------+-----+
|    645|Belgian Quadrupel...| 4364|
|  11757|American Imperial...| 4300|
|   2093|American Imperial...| 4252|
|   7971|American Imperial...| 4155|
|   1093|        American IPA| 4054|
|    412|Russian Imperial ...| 4001|
|  17112|American Imperial...| 3905|
|    695|Belgian Strong Pa...| 3786|
|  19960|American Imperial...| 3738|
|   1904|        American IPA| 3675|
|  10672|American Imperial...| 3422|
|    276|American Pale Ale...| 3312|
|     88|        American IPA| 3304|
|     92| American Strong Ale| 3280|
|  30420|        American IPA| 3234|
|   4083|American Imperial...| 3196|
|   2671| American Barleywine| 3175|
|     34|      Belgian Tripel| 3160|
|  16814|     New England IPA| 3081|
|   1708|Belgian Quadrupel...| 3043|
+-------+--------------------+-----+
only showing top 20 rows



In [30]:
test.select('beer_id').distinct().count()

210294

In [126]:
test.groupBy('beer_id').agg(F.concat_ws('; ', F.collect_list('text'))).show(20)

+-------+---------------------------------+
|beer_id|concat_ws(; , collect_list(text))|
+-------+---------------------------------+
|    148|                This one was s...|
|    463|                22oz : tulip C...|
|    471|                Pours a clear,...|
|    496|                Presentation: ...|
|    833|                Out of the sum...|
|   1088|                Midnight Sun O...|
|   1238|                From a six-pac...|
|   1580|                This beer is a...|
|   1591|                Very good wint...|
|   1645|                Poured into sn...|
|   1959|                Okay beer. Not...|
|   2122|                Muddy and thic...|
|   2142|                Had on tap. Ni...|
|   2659|                This beer pour...|
|   2866|                Fair head for ...|
|   3175|             "   This was serv...|
|   3794|                Tan head settl...|
|   3918|                12oz bottle po...|
|   3997|                Pours a clear ...|
|   4519|                Clear b

In [25]:
combinedReviews = test.groupBy('beer_id').agg(F.concat_ws(' ', F.collect_list('text')))

In [26]:
countStyles = combinedReviews.join(beerStyles, "beer_id")

In [27]:
countStyles.show(1)

+-------+--------------------------------+--------------------+
|beer_id|concat_ws( , collect_list(text))|               style|
+-------+--------------------------------+--------------------+
|    148|               This one was s...|American Amber / ...|
+-------+--------------------------------+--------------------+
only showing top 1 row



In [34]:
countStyles.groupBy('style').count().sort('count', ascending = False).show(10, truncate = False)

+------------------------+-----+
|style                   |count|
+------------------------+-----+
|American IPA            |24380|
|American Pale Ale (APA) |12216|
|American Imperial IPA   |11517|
|Belgian Saison          |9744 |
|American Wild Ale       |7390 |
|American Imperial Stout |7016 |
|American Porter         |5889 |
|American Amber / Red Ale|5573 |
|American Stout          |4782 |
|Fruit and Field Beer    |4471 |
+------------------------+-----+
only showing top 10 rows



In [28]:
styleTargets = ['American IPA', 'American Pale Ale (APA)', 'American Imperial IPA', 'Belgian Saison']

In [133]:
test.filter(test['style'].isin(styleTargets)).count()

731960

In [29]:
mainStyles = test.filter(test['style'].isin(styleTargets))

In [30]:
mainStyles.printSchema()

root
 |-- beer_id: integer (nullable = true)
 |-- username: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- text: string (nullable = true)
 |-- look: string (nullable = true)
 |-- smell: string (nullable = true)
 |-- taste: string (nullable = true)
 |-- feel: string (nullable = true)
 |-- overall: string (nullable = true)
 |-- score: string (nullable = true)
 |-- style: string (nullable = true)



In [142]:
mainStyles.filter(F.col("look").cast("int").isNotNull() == True)\
            .filter(F.col("smell").cast("int").isNotNull() == True)\
            .filter(F.col("taste").cast("int").isNotNull() == True)\
            .filter(F.col("feel").cast("int").isNotNull() == True)\
            .filter(F.col("overall").cast("int").isNotNull() == True)\
            .filter(F.col("score").cast("int").isNotNull() == True)\
            .count()

641356

In [31]:
#creating model_df from mainStyles (has top 4 styles)
model_df = mainStyles.filter(F.col("look").cast("int").isNotNull() == True)\
            .filter(F.col("smell").cast("int").isNotNull() == True)\
            .filter(F.col("taste").cast("int").isNotNull() == True)\
            .filter(F.col("feel").cast("int").isNotNull() == True)\
            .filter(F.col("overall").cast("int").isNotNull() == True)\
            .filter(F.col("score").cast("int").isNotNull() == True)

In [32]:
model_df = model_df.drop("username", "date")

In [33]:
model_df.printSchema()

root
 |-- beer_id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- look: string (nullable = true)
 |-- smell: string (nullable = true)
 |-- taste: string (nullable = true)
 |-- feel: string (nullable = true)
 |-- overall: string (nullable = true)
 |-- score: string (nullable = true)
 |-- style: string (nullable = true)



In [41]:
model_df.show(5)

+-------+--------------------+----+-----+-----+----+-------+-----+------------+
|beer_id|                text|look|smell|taste|feel|overall|score|       style|
+-------+--------------------+----+-----+-----+----+-------+-----+------------+
| 150672|   Beautiful, cry...|4.75|    4| 4.25|4.25|   4.25| 4.22|American IPA|
| 150672|   Poured a bit l...|3.75| 3.75| 3.75|3.75|   3.75| 3.75|American IPA|
| 150672|   355ml can. Bri...|4.25|    4|    4|4.25|      4| 4.04|American IPA|
| 150672|   Quite balanced...|4.25|  4.5| 4.25| 4.5|   4.25| 4.34|American IPA|
| 150672|   Can: Poured a ...|3.75| 3.75| 3.75|3.75|   3.75| 3.75|American IPA|
+-------+--------------------+----+-----+-----+----+-------+-----+------------+
only showing top 5 rows



In [42]:
model_df.groupBy('style').count().sort('count', ascending = False).show(20)

+--------------------+------+
|               style| count|
+--------------------+------+
|        American IPA|264697|
|American Imperial...|188767|
|American Pale Ale...|109490|
|      Belgian Saison| 78402|
+--------------------+------+



In [43]:
model_df.groupBy(['beer_id', 'style']).count().sort('count', ascending = False).show(20)

+-------+--------------------+-----+
|beer_id|               style|count|
+-------+--------------------+-----+
|   2093|American Imperial...| 3808|
|   7971|American Imperial...| 3706|
|   1093|        American IPA| 3680|
|  17112|American Imperial...| 3525|
|   1904|        American IPA| 3321|
|    276|American Pale Ale...| 3000|
|     88|        American IPA| 2971|
|  30420|        American IPA| 2876|
|   4083|American Imperial...| 2793|
|  29619|        American IPA| 2665|
|   1005|        American IPA| 2416|
|   2751|        American IPA| 2264|
|   9086|American Imperial...| 2260|
|  35738|American Imperial...| 2116|
|    141|      Belgian Saison| 2085|
|   5441|        American IPA| 2036|
|   6108|        American IPA| 1989|
|   3158|        American IPA| 1941|
|     39|American Pale Ale...| 1890|
|   6518|American Pale Ale...| 1879|
+-------+--------------------+-----+
only showing top 20 rows



In [34]:
model_df = model_df.drop('beer_id')

In [35]:
model_df.printSchema()

root
 |-- text: string (nullable = true)
 |-- look: string (nullable = true)
 |-- smell: string (nullable = true)
 |-- taste: string (nullable = true)
 |-- feel: string (nullable = true)
 |-- overall: string (nullable = true)
 |-- score: string (nullable = true)
 |-- style: string (nullable = true)



In [36]:
model_df = model_df.withColumn('look', model_df['look'].cast("float"))\
        .withColumn('smell', model_df['smell'].cast("float"))\
        .withColumn('feel', model_df['feel'].cast("float"))\
        .withColumn('taste', model_df['taste'].cast("float"))\
        .withColumn('overall', model_df['overall'].cast("float"))\
        .withColumn('score', model_df['score'].cast("float"))\

In [37]:
model_df.printSchema()

root
 |-- text: string (nullable = true)
 |-- look: float (nullable = true)
 |-- smell: float (nullable = true)
 |-- taste: float (nullable = true)
 |-- feel: float (nullable = true)
 |-- overall: float (nullable = true)
 |-- score: float (nullable = true)
 |-- style: string (nullable = true)



In [48]:
#explore data by aaron 
model_df.groupBy('style')\
        .agg(F.mean('look'), F.mean('smell'), F.mean('taste'), F.mean('feel'), F.mean('overall'),
            F.mean('score')).show(truncate = True)

+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|               style|         avg(look)|        avg(smell)|        avg(taste)|         avg(feel)|      avg(overall)|        avg(score)|
+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|        American IPA| 3.986425988960963|3.9190130602160207|3.9318579356773973|3.9012531309383935|3.9446924219012685| 3.932736264468442|
|American Imperial...| 4.107345298701574|4.0981487760042805|4.1012994856092435| 4.058349711549159|4.0595800643120885| 4.089419074101148|
|American Pale Ale...|3.8744497214357474|3.7821011051237554|3.8132181021097815|3.7911087770572656|3.8725111882363685|3.8200986432859327|
|      Belgian Saison|3.9922546618708705|3.9455913114461367|3.9475109053340476|3.9248201576490396| 3.955256243463177|3.9501903062389685|
+--------------------+------------------+

In [53]:
model_df.printSchema()

root
 |-- text: string (nullable = true)
 |-- look: float (nullable = true)
 |-- smell: float (nullable = true)
 |-- taste: float (nullable = true)
 |-- feel: float (nullable = true)
 |-- overall: float (nullable = true)
 |-- score: float (nullable = true)
 |-- style: string (nullable = true)



In [87]:
#clean data
data1 = model_df.select('text', 'style')
#for multiple regex expressions use OR |
data1 = data1.withColumn('text', regexp_replace(col('text'), "\\.|\xa0|!|,|:", ""))\
             .withColumn('text',lower(col('text')))

In [88]:
data1.show(10)

+--------------------+------------+
|                text|       style|
+--------------------+------------+
| beautiful crysta...|American IPA|
| poured a bit liv...|American IPA|
| 355ml can bright...|American IPA|
| quite balanced a...|American IPA|
| can poured a cle...|American IPA|
| can bought onlin...|American IPA|
| passing through ...|American IPA|
| yes please i've ...|American IPA|
| a well put toget...|American IPA|
| 355ml can the la...|American IPA|
+--------------------+------------+
only showing top 10 rows



In [89]:
#get rid of white space and separate each word 
data1 = data1.withColumn('text', split(data1['text'], ' '))\
            .withColumn('text', array_remove('text', ''))

In [86]:
#data1 = data1.withColumn('text', array_remove('text', ''))

In [90]:
#remove stop words 
remover = StopWordsRemover(inputCol="text", outputCol="filtered")
data2 = remover.transform(data1)
data2 = data2.select('filtered', 'style')

In [91]:
data2.show(10)

+--------------------+------------+
|            filtered|       style|
+--------------------+------------+
|[beautiful, cryst...|American IPA|
|[poured, bit, liv...|American IPA|
|[355ml, bright, c...|American IPA|
|[quite, balanced,...|American IPA|
|[poured, clear, a...|American IPA|
|[bought, online, ...|American IPA|
|[passing, swift, ...|American IPA|
|[yes, please, hop...|American IPA|
|[well, put, toget...|American IPA|
|[355ml, latest, n...|American IPA|
+--------------------+------------+
only showing top 10 rows



In [105]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
hashtf = HashingTF(numFeatures=2**16, inputCol="filtered", outputCol="rawFeatures")

In [106]:
featurizedData = hashtf.transform(data2)

In [107]:
idf = IDF(inputCol="rawFeatures", outputCol="features")

In [108]:
idfModel = idf.fit(featurizedData)

In [109]:
rescaledData = idfModel.transform(featurizedData)

In [114]:
rescaledData.select("filtered","features").show(5, truncate = False)

+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
# https://www.tutorialkart.com/apache-spark/spark-mllib-tf-idf/

In [92]:
#TF-IDF 
text = data2.select('filtered')
textrdd = text.rdd
#hashingTF = HashingTF()
#tf = hashingTF.transform(textrdd)

In [94]:
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=2**16) #not sure what numFeatures is 
featurizedData = hashingTF.transform(data2)

TypeError: __init__() got an unexpected keyword argument 'inputCol'

In [65]:
# While applying HashingTF only needs a single pass to the data, applying IDF needs two passes:
# First to compute the IDF vector and second to scale the term frequencies by IDF.
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

Py4JJavaError: An error occurred while calling o377.fitIDF.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 41.0 failed 1 times, most recent failure: Lost task 0.0 in stage 41.0 (TID 325, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 35 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
	at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1098)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1092)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1161)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1137)
	at org.apache.spark.mllib.feature.IDF.fit(IDF.scala:54)
	at org.apache.spark.mllib.feature.IDF.fit(IDF.scala:67)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.fitIDF(PythonMLLibAPI.scala:669)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 35 more


In [57]:
htf = HashingTF(100)

In [58]:
doc = "a a b b c d".split(" ")

In [60]:
type(doc)

list

In [59]:
htf.transform(doc)

SparseVector(100, {8: 1.0, 15: 2.0, 61: 2.0, 69: 1.0})

In [66]:
grouped_df = data2.groupby('style').agg(F.collect_list('filtered').alias("text"))

In [67]:
grouped_df.show(4)

+--------------------+--------------------+
|               style|                text|
+--------------------+--------------------+
|        American IPA|[[Beautiful, crys...|
|American Imperial...|[[Drank, 6/6/18, ...|
|American Pale Ale...|[[16oz, PKG, 5/8/...|
|      Belgian Saison|[[0%], [tap, CBC,...|
+--------------------+--------------------+



In [69]:
#make into rdd 
dat = grouped_df.rdd.map(lambda x : ''.join(x))

In [72]:
dat.take(1)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 96.0 failed 1 times, most recent failure: Lost task 0.0 in stage 96.0 (TID 2392, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 14 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 14 more


In [153]:
model_df.toPandas().to_csv('/home/aaron/BigData135/135-project/model_df.csv', header = True)