In [58]:
import os
import sys

# Cluster-specific locations: the HDP Spark 2 client install and the conda
# environment whose interpreter PySpark should use.
SPARK_HOME = "/usr/hdp/current/spark2-client"
PYSPARK_PYTHON = "/opt/conda/envs/dsenv/bin/python"
os.environ.update({"SPARK_HOME": SPARK_HOME, "PYSPARK_PYTHON": PYSPARK_PYTHON})

# Make the pyspark and py4j archives shipped with Spark importable.
# Slice-assigning at the front leaves pyspark.zip at sys.path[0] and the py4j
# archive at sys.path[1].
PYSPARK_HOME = os.path.join(SPARK_HOME, "python/lib")
sys.path[:0] = [
    os.path.join(PYSPARK_HOME, archive)
    for archive in ("pyspark.zip", "py4j-0.10.7-src.zip")
]

In [59]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Put the Spark UI on your own port so users sharing the cluster don't
# collide. The instructions ask for a random five-digit number; the 4123 used
# here has only four digits. NOTE(review): pick your own value.
conf = SparkConf().set("spark.ui.port", 4123)

spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName("Spark ML Intro")
    .getOrCreate()
)

In [60]:
# Display the active SparkSession (its notebook repr includes the Spark UI link).
spark

In [5]:
# Small Amazon 5-core review sample, one JSON record per review.
path = '/datasets/amazon/all_reviews_5_core_train_small.json'
dataset = spark.read.format('json').load(path)

In [6]:
# Inspect the inferred JSON schema; `overall` (double) is the star rating used as the label.
dataset.printSchema()

root
 |-- asin: string (nullable = true)
 |-- id: long (nullable = true)
 |-- image: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- overall: double (nullable = true)
 |-- reviewText: string (nullable = true)
 |-- reviewTime: string (nullable = true)
 |-- reviewerID: string (nullable = true)
 |-- reviewerName: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- unixReviewTime: long (nullable = true)
 |-- verified: boolean (nullable = true)
 |-- vote: string (nullable = true)



In [7]:
# Which rating values occur at all (output shows 1.0-5.0).
dataset.select('overall').distinct().show()

+-------+
|overall|
+-------+
|    1.0|
|    4.0|
|    3.0|
|    2.0|
|    5.0|
+-------+



In [8]:
# Label column on its own, for looking at the class balance.
target = dataset.select('overall')

In [9]:
# Reviews per rating; the output shows 5.0 dominating (imbalanced target).
target.groupBy(target['overall']).count().show()

+-------+------+
|overall| count|
+-------+------+
|    1.0| 89431|
|    4.0|160880|
|    3.0| 80170|
|    2.0| 52944|
|    5.0|650617|
+-------+------+



In [10]:
# Total number of reviews (rows) in the small training file.
dataset.count()

1034042

In [11]:
# Number of distinct reviewers; compare with the row count above.
dataset.select(dataset.reviewerID).distinct().count()

956712

In [12]:
# Raw review text, the only input feature used below.
X = dataset.select(dataset['reviewText'])

In [13]:
X.show(n=20, truncate=True)

+--------------------+
|          reviewText|
+--------------------+
|quick shipping, g...|
|Most delicious Ever!|
|This item was eas...|
|good brand, good ...|
|Piece of junk. At...|
|I order xl. The o...|
|The case definite...|
|Liked most that m...|
|goodI think it's ...|
|I apparently don'...|
|WARM AND COMFORTA...|
|somehow this hat ...|
|I was worried abo...|
|The Best of the B...|
|The bag is small,...|
|This is a very br...|
|Great shirt! Very...|
|I'm a size 0- ord...|
|Perfect fit. Woul...|
|I ordered these a...|
+--------------------+
only showing top 20 rows



In [14]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF

In [15]:
# Lower-case and whitespace-split each review into a `reviewWords` array.
# NOTE(review): this rebinds `dataset`, so re-running the cell feeds the
# already-tokenized frame back in and should fail because `reviewWords` exists.
tokenizer = Tokenizer().setInputCol('reviewText').setOutputCol('reviewWords')

dataset = tokenizer.transform(dataset)

dataset.show(n=1, vertical=True)

In [21]:
# Drops Spark's default (English) stop words from the token arrays.
stop_words_remover = StopWordsRemover().setInputCol('reviewWords').setOutputCol('reviewWordsWithoutTrash')

In [22]:
# NOTE(review): rebinds `dataset`; re-running should fail because the output column already exists.
dataset = stop_words_remover.transform(dataset=dataset)

In [24]:
dataset.show(n=2, vertical=True)

-RECORD 0---------------------------------------
 asin                    | B00005MDZ8           
 id                      | 6500                 
 image                   | null                 
 overall                 | 5.0                  
 reviewText              | quick shipping, g... 
 reviewTime              | 10 23, 2014          
 reviewerID              | AEZ4DZCUL021H        
 reviewerName            | Stephen              
 summary                 | great product        
 unixReviewTime          | 1414022400           
 verified                | true                 
 vote                    | null                 
 reviewWords             | [quick, shipping,... 
 reviewWordsWithoutTrash | [quick, shipping,... 
-RECORD 1---------------------------------------
 asin                    | B000DZE0XK           
 id                      | 42580                
 image                   | null                 
 overall                 | 5.0                  
 reviewText         

In [25]:
# Binary bag-of-words hashed into 300 buckets (presence/absence, not counts).
hashing = HashingTF(inputCol='reviewWordsWithoutTrash', outputCol="word_vector", numFeatures=300, binary=True)

In [26]:
# NOTE(review): rebinds `dataset`; re-running should fail because `word_vector` already exists.
dataset = hashing.transform(dataset=dataset)

In [28]:
dataset.show(n=2, vertical=True)

-RECORD 0---------------------------------------
 asin                    | B00005MDZ8           
 id                      | 6500                 
 image                   | null                 
 overall                 | 5.0                  
 reviewText              | quick shipping, g... 
 reviewTime              | 10 23, 2014          
 reviewerID              | AEZ4DZCUL021H        
 reviewerName            | Stephen              
 summary                 | great product        
 unixReviewTime          | 1414022400           
 verified                | true                 
 vote                    | null                 
 reviewWords             | [quick, shipping,... 
 reviewWordsWithoutTrash | [quick, shipping,... 
 word_vector             | (300,[1,55,184,26... 
-RECORD 1---------------------------------------
 asin                    | B000DZE0XK           
 id                      | 42580                
 image                   | null                 
 overall            

In [29]:
feature = dataset.select(dataset['word_vector'])

In [30]:
feature.show(n=2)

+--------------------+
|         word_vector|
+--------------------+
|(300,[1,55,184,26...|
|(300,[14,165],[1....|
+--------------------+
only showing top 2 rows



In [43]:
from pyspark.ml.regression import LinearRegression, GBTRegressor
from pyspark.ml import Pipeline
from pyspark.ml.tuning import TrainValidationSplit

In [35]:
# 90/10 split with a fixed seed so results are reproducible.
train, test = dataset.randomSplit(weights=[0.9, 0.1], seed=12345)

In [36]:
# Unfitted estimator: regress the star rating on the hashed word vector.
model = LinearRegression(labelCol='overall', featuresCol='word_vector')

In [37]:
# NOTE(review): rebinds `model` from the estimator to the fitted model, so
# this cell cannot be re-run without first re-running the previous one.
model = model.fit(dataset=train)

In [42]:
# Eyeball true rating vs. prediction on the held-out split.
(model.transform(test)
      .select('overall', 'prediction')
      .show(n=10, vertical=True))

-RECORD 0------------------------
 overall    | 5.0                
 prediction | 4.9501105076740135 
-RECORD 1------------------------
 overall    | 5.0                
 prediction | 3.7980904669585294 
-RECORD 2------------------------
 overall    | 5.0                
 prediction | 3.6607454898977707 
-RECORD 3------------------------
 overall    | 5.0                
 prediction | 4.541608295651211  
-RECORD 4------------------------
 overall    | 4.0                
 prediction | 4.597739674896557  
-RECORD 5------------------------
 overall    | 5.0                
 prediction | 4.466037850805992  
-RECORD 6------------------------
 overall    | 5.0                
 prediction | 2.3104162628407563 
-RECORD 7------------------------
 overall    | 5.0                
 prediction | 5.205687406845654  
-RECORD 8------------------------
 overall    | 5.0                
 prediction | 4.241129982037104  
-RECORD 9------------------------
 overall    | 3.0                
 prediction | 

In [44]:
# Gradient-boosted trees on the same features/label, for comparison with the linear model.
gbt_model = GBTRegressor(labelCol='overall', featuresCol='word_vector')

In [46]:
# NOTE(review): rebinds `gbt_model` from estimator to fitted model; not re-runnable on its own.
gbt_model = gbt_model.fit(dataset=train)

In [47]:
(gbt_model.transform(test)
          .select('overall', 'prediction')
          .show(n=10, vertical=True))

-RECORD 0------------------------
 overall    | 5.0                
 prediction | 4.5194348302073735 
-RECORD 1------------------------
 overall    | 5.0                
 prediction | 3.949040996121991  
-RECORD 2------------------------
 overall    | 5.0                
 prediction | 3.5161389016311353 
-RECORD 3------------------------
 overall    | 5.0                
 prediction | 4.772838551671828  
-RECORD 4------------------------
 overall    | 4.0                
 prediction | 4.130910738147576  
-RECORD 5------------------------
 overall    | 5.0                
 prediction | 4.526417922022976  
-RECORD 6------------------------
 overall    | 5.0                
 prediction | 2.661097654673532  
-RECORD 7------------------------
 overall    | 5.0                
 prediction | 4.580194023083057  
-RECORD 8------------------------
 overall    | 5.0                
 prediction | 4.277918902654275  
-RECORD 9------------------------
 overall    | 3.0                
 prediction | 

In [48]:
from pyspark.ml.evaluation import RegressionEvaluator

In [49]:
# End-to-end pipeline on the small training file:
# tokenize -> drop stop words -> 300-bucket binary HashingTF -> LinearRegression
# on the star rating. All stages are built here, so this cell does not depend
# on the exploratory `dataset` transformations above.
path = '/datasets/amazon/all_reviews_5_core_train_small.json'
data = spark.read.json(path)
data = data.select('reviewText', 'overall')

# Hold out 30% for testing. The fixed seed makes the split, and therefore the
# reported RMSE, reproducible across Restart & Run All.
train, test = data.randomSplit([0.7, 0.3], seed=12345)

tokenizer = Tokenizer(inputCol='reviewText', outputCol='reviewWords')
stop_words_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='reviewWordsWithoutTrash')
hashing = HashingTF(numFeatures=300, binary=True, inputCol=stop_words_remover.getOutputCol(), outputCol="word_vector")
lr = LinearRegression(featuresCol=hashing.getOutputCol(), labelCol='overall')

pipeline = Pipeline(stages=[tokenizer, stop_words_remover, hashing, lr])

# Fit the pipeline on the training split. The feature stages are plain
# transformers; only LinearRegression learns parameters.
model = pipeline.fit(train)

# Predict on the held-out split.
predictions = model.transform(test)

# A few (true rating, prediction) pairs for a sanity check.
predictions.select('overall', 'prediction').show(5, truncate=True, vertical=True)

# Root mean squared error of the predicted rating against `overall` on the test split.
evaluator = RegressionEvaluator(
    labelCol="overall", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

-RECORD 0------------------------
 overall    | 5.0                
 prediction | 4.191688679090648  
-RECORD 1------------------------
 overall    | 5.0                
 prediction | 4.642542898602038  
-RECORD 2------------------------
 overall    | 5.0                
 prediction | 4.373698456892848  
-RECORD 3------------------------
 overall    | 4.0                
 prediction | 3.1519344908035647 
-RECORD 4------------------------
 overall    | 5.0                
 prediction | 2.9546876016495984 
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 1.17204


In [50]:
# Tokenize -> stop words -> 300-bucket binary HashingTF -> LinearRegression,
# trained on the full (non-"small") 5-core training file.
path = '/datasets/amazon/all_reviews_5_core_train.json'
data = spark.read.json(path)
data = data.select('reviewText', 'overall')

# Hold out 30% for testing. The fixed seed makes the split, and therefore the
# reported RMSE, reproducible across Restart & Run All.
train, test = data.randomSplit([0.7, 0.3], seed=12345)

tokenizer = Tokenizer(inputCol='reviewText', outputCol='reviewWords')
stop_words_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='reviewWordsWithoutTrash')
hashing = HashingTF(numFeatures=300, binary=True, inputCol=stop_words_remover.getOutputCol(), outputCol="word_vector")
lr = LinearRegression(featuresCol=hashing.getOutputCol(), labelCol='overall')

pipeline = Pipeline(stages=[tokenizer, stop_words_remover, hashing, lr])

# Fit the pipeline on the training split. The feature stages are plain
# transformers; only LinearRegression learns parameters.
model = pipeline.fit(train)

# Predict on the held-out split.
predictions = model.transform(test)

# A few (true rating, prediction) pairs for a sanity check.
predictions.select('overall', 'prediction').show(5, truncate=True, vertical=True)

# Root mean squared error of the predicted rating against `overall` on the test split.
evaluator = RegressionEvaluator(
    labelCol="overall", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

-RECORD 0------------------------
 overall    | 5.0                
 prediction | 4.252797332720861  
-RECORD 1------------------------
 overall    | 5.0                
 prediction | 4.477268602769726  
-RECORD 2------------------------
 overall    | 5.0                
 prediction | 3.8387137538392184 
-RECORD 3------------------------
 overall    | 5.0                
 prediction | 5.270086480898681  
-RECORD 4------------------------
 overall    | 5.0                
 prediction | 4.126752815293975  
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 1.17276


In [51]:
from pyspark.ml.feature import CountVectorizer

In [55]:
# Pipeline on the small training file with a learned vocabulary instead of
# feature hashing: tokenize -> drop stop words -> CountVectorizer (tokens that
# appear in at least 150 training documents) -> LinearRegression on the rating.
path = '/datasets/amazon/all_reviews_5_core_train_small.json'
data = spark.read.json(path)
data = data.select('reviewText', 'overall')

# Hold out 30% for testing. The fixed seed makes the split, and therefore the
# reported RMSE, reproducible across Restart & Run All.
train, test = data.randomSplit([0.7, 0.3], seed=12345)

tokenizer = Tokenizer(inputCol='reviewText', outputCol='reviewWords')
stop_words_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol='reviewWordsWithoutTrash')
vectorizer = CountVectorizer(inputCol=stop_words_remover.getOutputCol(), outputCol="word_vector", minDF=150)
# Take the feature column from this cell's own vectorizer. Reading it from a
# `hashing` stage defined in another cell only worked by coincidence (same
# column name) and raises NameError on a fresh kernel.
lr = LinearRegression(featuresCol=vectorizer.getOutputCol(), labelCol='overall')

pipeline = Pipeline(stages=[tokenizer, stop_words_remover, vectorizer, lr])

# Fit the pipeline on the training split. CountVectorizer learns its
# vocabulary and LinearRegression its coefficients from `train` only.
model = pipeline.fit(train)

# Predict on the held-out split.
predictions = model.transform(test)

# A few (true rating, prediction) pairs for a sanity check.
predictions.select('overall', 'prediction').show(5, truncate=True, vertical=True)

# Root mean squared error of the predicted rating against `overall` on the test split.
evaluator = RegressionEvaluator(
    labelCol="overall", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

-RECORD 0-----------------------
 overall    | 5.0               
 prediction | 4.052440714707367 
-RECORD 1-----------------------
 overall    | 5.0               
 prediction | 4.077742239037248 
-RECORD 2-----------------------
 overall    | 4.0               
 prediction | 4.805718073446657 
-RECORD 3-----------------------
 overall    | 5.0               
 prediction | 4.69261489758261  
-RECORD 4-----------------------
 overall    | 5.0               
 prediction | 4.28826739601142  
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data = 1.00306


In [61]:
# Full 5-core training file, for inspection.
path = '/datasets/amazon/all_reviews_5_core_train.json'
data = spark.read.format('json').load(path)

In [64]:
data.show(n=2, vertical=True)

-RECORD 0------------------------------
 asin           | 0783225911           
 id             | 234                  
 image          | null                 
 overall        | 5.0                  
 reviewText     | great                
 reviewTime     | 03 4, 2017           
 reviewerID     | A2U0QAHUCW6ZGZ       
 reviewerName   | Debop                
 summary        | Five Stars           
 unixReviewTime | 1488585600           
 verified       | true                 
 vote           | null                 
-RECORD 1------------------------------
 asin           | 630580785X           
 id             | 4637                 
 image          | null                 
 overall        | 3.0                  
 reviewText     | Another one banne... 
 reviewTime     | 09 16, 2001          
 reviewerID     | A1KLJA9E10SAGP       
 reviewerName   | Dave B               
 summary        | Now this is shock... 
 unixReviewTime | 1000598400           
 verified       | false                


In [65]:
import numpy as np

In [78]:
# A 2x1 integer column vector [[1], [2]].
a = np.array([1, 2]).reshape(2, 1)
np.shape(a)

(2, 1)

In [79]:
# Place N_COPIES copies of the (2, 1) column `a` side by side -> shape (2, N_COPIES).
# One hstack over a list is linear in the output size; re-hstacking the growing
# result inside a loop copied it on every iteration (quadratic).
N_COPIES = 5000
res = np.hstack([a] * N_COPIES)

In [80]:
np.shape(res)

(2, 5000)

In [84]:
# Same (2, 5000) result via tiling: repeat 1x along rows, 5000x along columns.
np.tile(a, (1, 5000))

array([[1, 1, 1, ..., 1, 1, 1],
       [2, 2, 2, ..., 2, 2, 2]])

In [70]:
# Shape of two copies of `a` side by side; `.shape` matches the recorded output (2, 2).
np.hstack((a, a)).shape

(2, 2)

In [57]:
# Shut down the Spark session; cells that use `spark` need a new session after this.
spark.stop()