In [2]:
import os
import sys

# Locations of the cluster's Spark client and of the Python interpreter
# exported to Spark through the PYSPARK_PYTHON environment variable.
SPARK_HOME = "/usr/hdp/current/spark2-client"
PYSPARK_PYTHON = "/opt/conda/envs/dsenv/bin/python"
os.environ.update({
    "SPARK_HOME": SPARK_HOME,
    "PYSPARK_PYTHON": PYSPARK_PYTHON,
})

# Prepend the PySpark and Py4J archives that ship inside the Spark client so
# that `import pyspark` resolves in later cells. Resulting order at the front
# of sys.path: pyspark.zip first, then the Py4J sources.
PYSPARK_HOME = os.path.join(SPARK_HOME, "python/lib")
sys.path[:0] = [
    os.path.join(PYSPARK_HOME, archive)
    for archive in ("pyspark.zip", "py4j-0.10.7-src.zip")
]

In [3]:
import sys
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.ml.feature import *
from pyspark.ml import Estimator, Transformer
from pyspark.ml import Pipeline    

In [5]:
# Reuse the active SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()
# Set the SparkContext log level to WARN for the rest of the notebook.
spark.sparkContext.setLogLevel("WARN")

In [6]:
# Input: JSON file with the raw reviews that is read in the next cell.
raw_data_path = "/datasets/amazon/all_reviews_5_core_train_extra_small_sentiment.json"
# Output: Parquet destination for the processed dataset.
processed_data_path = "HW5/processed.parquet"

In [7]:
# Explicit schema for the raw reviews; every field is nullable (the
# StructField default).
# NOTE(review): `vote` is declared as IntegerType and shows up as null in the
# sample rows below -- confirm against the raw file that votes are stored as
# JSON integers and not as quoted strings.
schema = StructType([
    StructField(column, dtype)
    for column, dtype in [
        ("asin", StringType()),
        ("id", LongType()),
        ("label", IntegerType()),
        ("reviewText", StringType()),
        ("reviewTime", DateType()),
        ("reviewerID", StringType()),
        ("reviewerName", StringType()),
        ("vote", IntegerType()),
        ("summary", StringType()),
        ("unixReviewTime", TimestampType()),
        ("verified", BooleanType()),
    ]
])

# Read the raw JSON with the schema above; `dateFormat` is the pattern used to
# parse DateType columns (here `reviewTime`). The DataFrame is marked for
# caching because it is used more than once in later cells.
dataset = (
    spark.read
    .schema(schema)
    .option("dateFormat", 'MM dd, yyyy')
    .json(raw_data_path)
    .cache()
)

In [8]:
# Preview the first two parsed records, one field per line.
dataset.show(n=2, vertical=True)

-RECORD 0------------------------------
 asin           | B000VX4W78           
 id             | 72900                
 label          | 1                    
 reviewText     | Purchased these f... 
 reviewTime     | 2009-11-30           
 reviewerID     | A3BH3XJBBU7FF2       
 reviewerName   | The Dragon           
 vote           | null                 
 summary        | Works great, nice... 
 unixReviewTime | 2009-11-30 00:00:00  
 verified       | true                 
-RECORD 1------------------------------
 asin           | B0017U1KBK           
 id             | 104280               
 label          | 1                    
 reviewText     | This thing is too... 
 reviewTime     | 2017-01-04           
 reviewerID     | A408FUV9TO4EA        
 reviewerName   | Amber                
 vote           | null                 
 summary        | Five Stars           
 unixReviewTime | 2017-01-04 00:00:00  
 verified       | true                 
only showing top 2 rows



In [9]:
# Stage 1: keep only the rows that actually have a review text.
droper = SQLTransformer(
    statement="SELECT * FROM __THIS__ WHERE reviewText is not null"
)

# Stage 2: split `reviewText` into tokens, stored in the `words` column.
tokenizer = Tokenizer(outputCol="words", inputCol="reviewText")

# Stage 3: hash the tokens into a 100-dimensional sparse vector; binary=True
# records presence of a term rather than its count.
hasher = HashingTF(
    inputCol=tokenizer.getOutputCol(),
    outputCol="word_vector",
    numFeatures=100,
    binary=True,
)

pipeline = Pipeline(stages=[droper, tokenizer, hasher])

# Fit the pipeline on the cached dataset and apply it to the same data.
processed_dataset = pipeline.fit(dataset).transform(dataset)

In [16]:
# Preview two processed records, including the new `words` and `word_vector`
# columns, one field per line.
processed_dataset.show(n=2, vertical=True)

-RECORD 0------------------------------
 asin           | B000VX4W78           
 id             | 72900                
 label          | 1                    
 reviewText     | Purchased these f... 
 reviewTime     | 2009-11-30           
 reviewerID     | A3BH3XJBBU7FF2       
 reviewerName   | The Dragon           
 vote           | null                 
 summary        | Works great, nice... 
 unixReviewTime | 2009-11-30 00:00:00  
 verified       | true                 
 words          | [purchased, these... 
 word_vector    | (100,[2,3,4,10,11... 
-RECORD 1------------------------------
 asin           | B0017U1KBK           
 id             | 104280               
 label          | 1                    
 reviewText     | This thing is too... 
 reviewTime     | 2017-01-04           
 reviewerID     | A408FUV9TO4EA        
 reviewerName   | Amber                
 vote           | null                 
 summary        | Five Stars           
 unixReviewTime | 2017-01-04 00:00:00  


In [17]:
# Persist the full processed dataset as Parquet.
# mode="overwrite" replaces output left behind by an earlier run; the default
# save mode raises when the path already exists, which made this cell fail on
# every re-run of the notebook (Restart Kernel -> Run All).
processed_dataset.write.parquet(processed_data_path, mode="overwrite")

In [19]:
# Write only the `id` and `label` columns to a second Parquet output.
# A dedicated variable is used instead of rebinding `processed_data_path`, so
# re-running the cell that writes the full dataset can never end up targeting
# this path. mode="overwrite" keeps the cell re-runnable: the default save mode
# raises when the path already exists.
processed_test_path = 'HW5/processed_test.parquet'
(
    processed_dataset
    .select('id', 'label')
    .write
    .parquet(processed_test_path, mode="overwrite")
)

In [None]:
# Stop the SparkSession created earlier in the notebook; `spark` cannot be
# used after this call, so keep this as the last cell.
spark.stop()