In [None]:
## USE DOCKER

In [1]:
import os

def gen_pyspark_submit_args(packages, jars):
    submit_args = ""
    if len(packages):
        submit_args += ' --packages '
        submit_args += ",".join(packages)
    if len(jars):
        submit_args += ' --jars '
        submit_args += ",".join(jars)
    if submit_args == "":
        return None
    return submit_args + " pyspark-shell"

def pyspark_submit_args(packages, jars):
    args = gen_pyspark_submit_args(packages, jars)
    print(args)
    os.environ['PYSPARK_SUBMIT_ARGS'] = args

In [2]:
### Add java/python dependencies

jar_dir = "/home/jovyan/work/extra_jars"
packages = [
    "io.delta:delta-core_2.11:0.2.0"
]
jars = [
]
pyspark_submit_args(packages, jars)
!pip install -qr requirements.txt

 --packages io.delta:delta-core_2.11:0.2.0 pyspark-shell


In [3]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("WorldWineWeb") \
    .getOrCreate()

In [4]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
#url ="https://s3.amazonaws.com/dataviz-curriculum/day_2/yelp_reviews.csv"
url = "final_wine_data_172k.csv"
spark.sparkContext.addFile(url)
#df = spark.read.csv(SparkFiles.get("yelp_reviews.csv"), sep=",", header=True)
df = spark.read.csv(SparkFiles.get("final_wine_data_172k.csv"), sep=",", header=True)
# Show DataFrame
df.show()

+---+---------+--------------------+--------------------+------+-----+-----------------+-------------------+-----------------+------------------+---------------------+--------------------+------------------+-------------------+
| id|  country|         description|         designation|points|price|         province|           region_1|         region_2|       taster_name|taster_twitter_handle|               title|           variety|             winery|
+---+---------+--------------------+--------------------+------+-----+-----------------+-------------------+-----------------+------------------+---------------------+--------------------+------------------+-------------------+
|  1| Portugal|This is ripe and ...|            Avidagos|    87| 15.0|            Douro|               null|             null|        Roger Voss|           @vossroger|Quinta dos Avidag...|    Portuguese Red|Quinta dos Avidagos|
|  2|       US|Tart and snappy, ...|                null|    87| 14.0|           Oregon|

In [5]:
from pyspark.sql.functions import length
# Create a length column to be used as a future feature 
data_df = df.withColumn('length', length(df['description']))
data_df.show()

+---+---------+--------------------+--------------------+------+-----+-----------------+-------------------+-----------------+------------------+---------------------+--------------------+------------------+-------------------+------+
| id|  country|         description|         designation|points|price|         province|           region_1|         region_2|       taster_name|taster_twitter_handle|               title|           variety|             winery|length|
+---+---------+--------------------+--------------------+------+-----+-----------------+-------------------+-----------------+------------------+---------------------+--------------------+------------------+-------------------+------+
|  1| Portugal|This is ripe and ...|            Avidagos|    87| 15.0|            Douro|               null|             null|        Roger Voss|           @vossroger|Quinta dos Avidag...|    Portuguese Red|Quinta dos Avidagos|   227|
|  2|       US|Tart and snappy, ...|                null|   

# Feature Transformations

* where we stop passing into our pre-processing steps
* we are trying to predict price or rating from wine description
* find the TF/IDF for all words in all descriptions
* watch out for stopwords and stems...do b4 matrix construction
* construct the matrix then use it as our features to the prediction of price or rating using a linear regression model
   
 

In [6]:
# remove null descriptions
data_df = data_df.filter("length>10")

In [7]:
# remove null prices
from pyspark.sql.functions import col
data_df = data_df.filter(col("price").isNotNull())

In [8]:
from pyspark.sql.types import FloatType
data_df = data_df.withColumn("price_value", col("price").cast(FloatType()))

In [9]:
data_df.count()

171056

In [10]:
#from wine reviews...below
# target          text                length
#-----------------------------------------------
# price      the review text       length of text
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
tokenizer = Tokenizer(inputCol="description", outputCol="token_description")
stopremove = StopWordsRemover(inputCol=tokenizer.getOutputCol(),outputCol='stop_tokens')
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol='hash_token')
idf = IDF(inputCol=hashingTF.getOutputCol(), outputCol='idf_token')

In [11]:
#turn everything into a matrix
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vector
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Create feature vectors
clean_up = VectorAssembler(inputCols=['idf_token', 'length'], outputCol='features')
lr = LinearRegression(featuresCol = 'features', labelCol='price_value', maxIter=10, regParam=0.3, elasticNetParam=0.8)
pipeline = Pipeline(stages=[tokenizer, stopremove, hashingTF, idf, clean_up, lr])

In [12]:
model = pipeline.fit(data_df)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35921)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/IPython/core/interactiveshell.py

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35921)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3296, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-7b857ac0e5a0>", line 1, in <module>
    model = pipeline.fit(data_df)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/pipeline.py", line 109, in _fit
    model = stage.fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 295, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 292, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/ja

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35921)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3296, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-7b857ac0e5a0>", line 1, in <module>
    model = pipeline.fit(data_df)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/pipeline.py", line 109, in _fit
    model = stage.fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 295, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 292, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/ja

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35921)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3296, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-7b857ac0e5a0>", line 1, in <module>
    model = pipeline.fit(data_df)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/pipeline.py", line 109, in _fit
    model = stage.fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 295, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 292, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/ja

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35921)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3296, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-7b857ac0e5a0>", line 1, in <module>
    model = pipeline.fit(data_df)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/pipeline.py", line 109, in _fit
    model = stage.fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 295, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 292, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/ja

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35921)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3296, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-7b857ac0e5a0>", line 1, in <module>
    model = pipeline.fit(data_df)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/pipeline.py", line 109, in _fit
    model = stage.fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 295, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 292, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/ja

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35921)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3296, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-7b857ac0e5a0>", line 1, in <module>
    model = pipeline.fit(data_df)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/pipeline.py", line 109, in _fit
    model = stage.fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 295, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 292, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/ja

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35921)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3296, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-7b857ac0e5a0>", line 1, in <module>
    model = pipeline.fit(data_df)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/pipeline.py", line 109, in _fit
    model = stage.fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 295, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 292, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/ja

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:35921)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/IPython/core/interactiveshell.py", line 3296, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-12-7b857ac0e5a0>", line 1, in <module>
    model = pipeline.fit(data_df)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/pipeline.py", line 109, in _fit
    model = stage.fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/base.py", line 132, in fit
    return self._fit(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 295, in _fit
    java_model = self._fit_java(dataset)
  File "/usr/local/spark/python/pyspark/ml/wrapper.py", line 292, in _fit_java
    return self._java_obj.fit(dataset._jdf)
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/ja

Py4JError: An error occurred while calling o58.fit

In [None]:
# Transform the model with the testing data
# features are the input to the model
# features produces the rawPRediciton
# from the rawPrediction it produces a Probability
# then there's a prediction
test_results = model.transform(testing)
test_results.show(5)

In [None]:
# Use the Class Evaluator for a cleaner description
# from pyspark.ml.evaluation import 

# acc_eval = MulticlassClassificationEvaluator()
# acc = acc_eval.evaluate(test_results)
# print("Accuracy of model at predicting reviews was: %f" % acc)