In [None]:
# Locate the Spark installation through the findspark package; this has to
# run before any pyspark import further down.
import findspark
findspark.init()

In [None]:
# Create (or reuse) a YARN-backed Spark session: 50 single-core executors
# with 5g each, a 30g driver, FIFO scheduling and a 4g driver result cap.
from pyspark.sql import SparkSession

# Settings are applied in the order listed.
session_settings = [
    ("spark.executor.instances", "50"),
    ("spark.executor.memory", "5g"),
    ("spark.driver.memory", "30g"),
    ("spark.executor.cores", '1'),
    ("spark.scheduler.mode", "FIFO"),
    ("spark.driver.maxResultSize", '4g'),
]

session_builder = SparkSession.builder.master("yarn").appName("testing")
for setting_key, setting_value in session_settings:
    session_builder = session_builder.config(setting_key, setting_value)

spark = session_builder.getOrCreate()

In [None]:
# Stops the Spark session; run this only when finished with the notebook.
# NOTE(review): on a top-to-bottom "Run All" this cell executes right after
# the session is created and before the cells below use `spark` — consider
# moving it to the end of the notebook.
spark.stop()

In [None]:
# Load the three Yelp tables (reviews, businesses, users) from parquet.
def _read_parquet_cached(path, num_partitions):
    """Read a parquet path, repartition it and mark the result as cached."""
    return spark.read.parquet(path).repartition(num_partitions).cache()


review = _read_parquet_cached('/yelp/review.parquet', 300)
business = _read_parquet_cached('/yelp/business.parquet', 100)
users = _read_parquet_cached('/yelp/users.parquet', 200)

In [None]:
from pyspark.sql.functions import greatest

# Filter thresholds used below.
# NOTE(review): the earlier comment here said "at least 50 %", but the code
# has always compared against 0.4 — confirm which value is intended.
MIN_MAX_RATIO = 0.4    # minimum share of all votes held by the dominant vote type
MIN_TOTAL_VOTES = 10   # minimum number of votes (cool + funny + useful) per review

# total number of votes a review received
review = review.withColumn('totalvotes', review.cool + review.funny + review.useful)

# largest single vote count and its share of the total
review = review.withColumn('max_vote', greatest(review.cool, review.funny, review.useful))
review = review.withColumn('max_ratio', review.max_vote / review.totalvotes)

# keep reviews whose dominant vote type reaches the minimum share
# (max_ratio is exactly max_vote / totalvotes, so the column is reused)
review = review.where(review.max_ratio >= MIN_MAX_RATIO)

# keep reviews that received at least MIN_TOTAL_VOTES votes
review = review.where(review.totalvotes >= MIN_TOTAL_VOTES)
review.count()

In [85]:
# Read the lemmatized reviews from JSON, spread them over 150 partitions and
# convert the result to a pandas DataFrame.
# NOTE(review): the path contains a space after "flatten_" — confirm it
# matches the real file name.
lemmatized_json_path = "/yelp/flatten_ lemmatized.json"
lemmatized = spark.read.json(lemmatized_json_path)
lemmatized = lemmatized.repartition(150)
lemma_pd = lemmatized.toPandas()

In [None]:
# Rebuild the Spark DataFrame from the pandas copy, repartition it to 150
# partitions and cache it. This replaces the `lemmatized` frame read from JSON.
# NOTE(review): the Spark -> pandas -> Spark round trip looks like a
# workaround; confirm it is still required.
lemmatized = spark.createDataFrame(lemma_pd).repartition(150).cache()

In [None]:
# Print the column names and types of the lemmatized frame.
lemmatized.printSchema()

In [None]:
# WORD2VEC fitting
# Tokenize the lemmatized text, remove stop words, fit a Word2Vec model
# and attach the resulting "word2vec" vector column to `dataset`.
import time
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec
start = time.time()

# tokenize the "text" column into a "words" array column
tokenizer = Tokenizer(inputCol="text", outputCol="words")
dataset = tokenizer.transform(lemmatized)

# drop original text column
dataset = dataset.drop("text")

# stop word removal: "words" -> "cleaned"
stopremove = StopWordsRemover(inputCol='words', outputCol='cleaned')
dataset = stopremove.transform(dataset)

dataset = dataset.drop('words').repartition(300).cache()

# fit a word2vec model (500-dimensional vectors; minCount=0 keeps every token)
word2Vec = Word2Vec(vectorSize=500, minCount=0, numPartitions=300, inputCol="cleaned", outputCol="word2vec")
model = word2Vec.fit(dataset)
dataset = model.transform(dataset).drop('cleaned')

end = time.time()

# Report the measured wall-clock time; previously start/end were computed but
# never shown.
# NOTE(review): Spark transformations are lazy, so this presumably measures
# mostly word2Vec.fit — confirm if a full end-to-end timing is wanted.
print("Word2Vec fitting took {:.1f} s".format(end - start))

In [None]:
def assignclass(row):
    """Return the dominant vote category of a review row.

    Parameters
    ----------
    row : object exposing numeric ``funny``, ``cool`` and ``useful``
        attributes (for example a pyspark ``Row``).

    Returns
    -------
    str
        ``"funny"``, ``"cool"`` or ``"useful"`` — the category with the
        highest vote count. Ties are resolved in that order.
    """
    # Comparing the raw counts with their maximum is equivalent to comparing
    # each category's share of the total with the largest share, but it needs
    # no float equality test, no division by the total, and no reference to
    # the (previously undefined) global `max_ratio`.
    top_votes = max(row.funny, row.cool, row.useful)
    if row.funny == top_votes:
        return "funny"
    if row.cool == top_votes:
        return "cool"
    return "useful"

# Label every row with its dominant vote category.
# toDF() cannot infer a schema from an RDD of bare strings, so each label is
# wrapped in a 1-tuple and the single column is given an explicit name
# (the same name the classifiers below use as labelCol).
# NOTE(review): assumes `dataset` rows carry the funny/cool/useful columns
# that assignclass reads — confirm; the labels are strings and would still
# need indexing before being used as a numeric label column.
temp = dataset.rdd.map(lambda row: (assignclass(row),)).toDF(['max_category'])
temp

In [None]:
# Model selection setup: four classifiers on the word2vec features, one
# parameter grid per classifier, and a 5-fold CrossValidator for each.
from pyspark.ml.classification import (
    DecisionTreeClassifier,
    GBTClassifier,
    LogisticRegression,
    RandomForestClassifier,
)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# ParamGridBuilder and CrossValidator live in pyspark.ml.tuning; they were
# used below without ever being imported.
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

FEATURES_COL = 'word2vec'
LABEL_COL = 'max_category'   # shared by every estimator and by the evaluator
NUM_FOLDS = 5

# Estimators
# NOTE(review): Spark's GBTClassifier is documented as binary-only — confirm
# the label has two classes or drop the GBT model.
logit = LogisticRegression(featuresCol=FEATURES_COL, labelCol=LABEL_COL)
cart = DecisionTreeClassifier(featuresCol=FEATURES_COL, labelCol=LABEL_COL)
gbt = GBTClassifier(featuresCol=FEATURES_COL, labelCol=LABEL_COL)
rf = RandomForestClassifier(featuresCol=FEATURES_COL, labelCol=LABEL_COL)

# Parameter grids
paramGrid_logit = ParamGridBuilder() \
    .addGrid(logit.regParam, [0, 0.01, 0.1]) \
    .build()

paramGrid_cart = ParamGridBuilder() \
    .addGrid(cart.maxDepth, [10, 12, 15]) \
    .build()

paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [15]) \
    .addGrid(rf.numTrees, [100]) \
    .build()

paramGrid_gbt = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [10, 12, 15]) \
    .addGrid(gbt.stepSize, [0.01]) \
    .addGrid(gbt.maxIter, [20]) \
    .build()

# The evaluator must read the same label column the estimators are trained on
# (it previously pointed at 'target', which no estimator uses).
evaluator = MulticlassClassificationEvaluator(labelCol=LABEL_COL)

# Cross-validators
cv_logit = CrossValidator(estimator=logit, evaluator=evaluator, estimatorParamMaps=paramGrid_logit, numFolds=NUM_FOLDS)
cv_cart = CrossValidator(estimator=cart, evaluator=evaluator, estimatorParamMaps=paramGrid_cart, numFolds=NUM_FOLDS)
cv_gbt = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid_gbt, numFolds=NUM_FOLDS)
# `pipeline_rf` was never defined in this notebook; use the random forest
# estimator directly, like the other three cross-validators.
cv_rf = CrossValidator(estimator=rf, evaluator=evaluator, numFolds=NUM_FOLDS, estimatorParamMaps=paramGrid_rf)

In [None]:
# Fit the cross-validated logistic regression.
# NOTE(review): `dat` is not defined anywhere in the visible notebook —
# confirm which DataFrame (word2vec features plus label column) is intended.
cvmodel_logit = cv_logit.fit(dat)

In [None]:
# Using PCA on the new data to understand variance
from pyspark.ml.feature import PCA

# Project the "word2vec" vectors onto 10 principal components, written to
# the "pca_text" column.
pca_settings = dict(k=10, inputCol="word2vec", outputCol="pca_text")
pca = PCA(**pca_settings)

pca_model = pca.fit(dataset)
pca_result = pca_model.transform(dataset)

In [None]:
# Show the explained-variance values of the fitted PCA model.
pca_model.explainedVariance.values

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Line plot of the variance explained by each principal component.
explained_variance = np.array(pca_model.explainedVariance.values)
# The x positions follow the number of fitted components instead of a
# hard-coded 10, so the plot keeps working if the PCA `k` changes.
component_index = np.arange(len(explained_variance))

plt.plot(component_index, explained_variance)
plt.title('Explained Variance - PCA')
plt.ylabel('Proportion of Variance Explained')
plt.xlabel('Principal Component')
plt.show()

In [None]:
# Print the column names and types of the PCA-transformed frame.
pca_result.printSchema()

In [None]:
# Convert the PCA output to pandas and derive, per row, which of the three
# vote columns is largest ("maxcat") plus its integer category code.
pca_result_pd = pca_result.toPandas()

vote_columns = ['cool', 'useful', 'funny']
dominant_vote = pca_result_pd[vote_columns].idxmax(axis=1)

pca_result_pd['maxcat'] = dominant_vote.astype('category')
pca_result_pd['maxcat_code'] = pca_result_pd['maxcat'].cat.codes

In [None]:
# NOTE(review): `pca_result` is the Spark DataFrame returned by
# pca_model.transform, so `pca_result.pca_text` is a column reference rather
# than collected data — this looks like a leftover scratch cell; confirm it
# is still needed.
np.array(pca_result.pca_text)

In [None]:
def extract_pca(row):
    """Return the components of ``row.pca_text`` as a plain tuple.

    ``row.pca_text`` must offer ``toArray()``, whose result must offer
    ``tolist()``; the list elements become the tuple elements, in order.
    """
    components = row.pca_text.toArray()
    component_list = components.tolist()
    return tuple(component_list)
# Replace pca_result with a frame built from the extract_pca tuples, i.e. one
# column per PCA component and nothing else.
# NOTE(review): this overwrites pca_result in place, so the cell cannot be
# re-run without first re-running the PCA cell (the pca_text column is gone
# afterwards) — consider a new variable name.
pca_result = pca_result.rdd.map(extract_pca).toDF()

In [None]:
# Convert the per-component frame to pandas.
# NOTE(review): this rebinds pca_result_pd, discarding the earlier pandas
# frame that carried the maxcat / maxcat_code columns.
pca_result_pd = pca_result.toPandas()

In [None]:
# Display the pandas frame of PCA components.
pca_result_pd

In [None]:
from MulticoreTSNE import MulticoreTSNE as TSNE

# t-SNE embedding of the PCA coordinates, computed with 20 worker jobs.
tsne = TSNE(early_exaggeration=10, n_jobs=20)
# pca_result_pd was rebuilt from the per-component frame, so it no longer has
# a `pca_text` column; its columns are the PCA components themselves, and the
# whole numeric matrix is passed to t-SNE.
tsne_output = tsne.fit_transform(pca_result_pd.values)

In [None]:
# 2-D scatter of the t-SNE embedding.
# The previous call ended in ",, c", which is a syntax error, and `c` was
# never defined. The points are drawn uncoloured.
# NOTE(review): if colouring by vote category was intended, pass an explicit
# `c=` array aligned with the rows of tsne_output.
plt.scatter(tsne_output[:, 0], tsne_output[:, 1])
plt.title('t-SNE Embedding')
plt.xlabel('t-SNE dimension 1')
plt.ylabel('t-SNE dimension 2')
plt.show()

In [None]:
# 3-D scatter of the first three PCA components.
# Axes3D is not referenced by name; presumably it is imported so that the
# '3d' projection is available to add_subplot — confirm for the matplotlib
# version in use.
from mpl_toolkits.mplot3d import Axes3D
fig = plt.figure(figsize=(15, 15))
ax = fig.add_subplot(111, projection='3d')

# `result_pd` is never defined in this notebook; pca_result_pd is the pandas
# frame that holds the _1/_2/_3 component columns.
# NOTE(review): confirm pca_result_pd is the intended source.
ax.scatter(xs=pca_result_pd._1, ys=pca_result_pd._2, zs=pca_result_pd._3)
plt.title('3d Representation of Word2Vec Embeddings')
plt.show()

In [None]:
# Derive the dominant vote column ("maxcat") and its integer category code.
# NOTE(review): neither `result_pd` nor `data` is defined in the visible
# notebook, and the same derivation is already done on pca_result_pd in an
# earlier cell — confirm whether this cell is a leftover.
result_pd['maxcat'] = data[['cool','useful','funny']].idxmax(axis = 1 )
result_pd['maxcat'] = result_pd['maxcat'].astype('category')
result_pd['maxcat_code'] = result_pd['maxcat'].cat.codes

In [None]:
# Display result_pd.
# NOTE(review): `result_pd` is not defined in the visible notebook — confirm.
result_pd