In [1]:
import pandas as pd
import time as time
time.time()

1671475485.2257507

## Initialize spark

In [2]:
# find spark
import findspark
findspark.init()

# override cloudpickle in order for reading to work
import cloudpickle
import pyspark.serializers
pyspark.serializers.cloudpickle = cloudpickle
import pyspark.sql.functions as f
from pyspark.ml.feature import Tokenizer, HashingTF, IDF, CountVectorizer
from pyspark.mllib.linalg import SparseVector, DenseVector
from pyspark.mllib.linalg import Vector

In [3]:
# start session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").config("spark.driver.memory", "15g").appName("YeplReviews").getOrCreate()
sc = spark.sparkContext
df = spark.read.json("hdfs://localhost:9000/test/yelp_academic_dataset_review.json")
df = df[["stars", "text"]]
# add an index column
df = df.withColumn('index', f.monotonically_increasing_id())

# sort ascending and take first 100 rows for df1
df = df.sort('index').limit(5)
df.show(5, False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|stars|text                                                                                                                                             

In [4]:
print((df.count(), len(df.columns)))
df.printSchema()

(5, 3)
root
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- index: long (nullable = false)



In [5]:
tokenizer = Tokenizer()
tokenizer.setInputCol('text').setOutputCol('words')
df = tokenizer.transform(df)
df.show(5, False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------

In [6]:
cv = CountVectorizer()
cvModel = cv.setInputCol("words").setOutputCol("features").fit(df)
counted = cvModel.transform(df)
counted.head()

Row(stars=3.0, text="If you decide to eat here, just be aware it is going to take about 2 hours from beginning to end. We have tried it multiple times, because I want to like it! I have been to it's other locations in NJ and never had a bad experience. \n\nThe food is good, but it takes a very long time to come out. The waitstaff is very young, but usually pleasant. We have just had too many experiences where we spent way too long waiting. We usually opt for another diner or restaurant on the weekends, in order to be done quicker.", index=0, words=['if', 'you', 'decide', 'to', 'eat', 'here,', 'just', 'be', 'aware', 'it', 'is', 'going', 'to', 'take', 'about', '2', 'hours', 'from', 'beginning', 'to', 'end.', 'we', 'have', 'tried', 'it', 'multiple', 'times,', 'because', 'i', 'want', 'to', 'like', 'it!', 'i', 'have', 'been', 'to', "it's", 'other', 'locations', 'in', 'nj', 'and', 'never', 'had', 'a', 'bad', 'experience.', '', '', 'the', 'food', 'is', 'good,', 'but', 'it', 'takes', 'a', 'ver

In [7]:
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import udf
from pyspark.sql.types import *
schema = StructType([
    StructField("stars", DecimalType(), False),
    StructField("dict", MapType(StringType(),DecimalType()), False)
])

def indices_to_terms(vocabulary):
    def indices_to_terms(xs):
        sv = xs
        mydict = {}
        for i in list(sv.indices):
            print(i)
            mydict[vocabulary[int(i)]] = int(sv[int(i)])
        return mydict
    return udf(indices_to_terms, MapType(StringType(),IntegerType()))

df2 = counted.withColumn("result", indices_to_terms(cvModel.vocabulary)("features"))
df2.show(5, False)

+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+---------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
time.time()

1671475608.3070838

In [9]:
from pyspark.sql.functions import explode,map_keys,col
keysDF = df2.select(explode(map_keys(df2.result))).distinct()
keysList = keysDF.rdd.map(lambda x:x[0]).collect()
keyCols = list(map(lambda x: col("result").getItem(x).alias(str(x)), keysList))
df = df2.select(df2.stars, *keyCols)
df.show()

+-----+------+----+----+-----+-----+------+----+----+------+----+-------+----------+----+------+----+-----+------+----+-------+---------------+-------+-------+------+----+----+-----+-----+-----+----------+-------+----+------+-----+-----+-----+----+----+----+--------+--------+----+------+----+------+------+-----+--------+----+-------+----+------+----+----+----+------+----+----+------+------+------+----+------+----+-------+-------+------------+----+----+-----+------+----+--------+----+-----+------+-------+----+------+-----+-------+---------+-------+-------+----+------+----+-----+--------+------+----+-------+-------+----+-----+----+------+-----+-------+----+-----+----+---------+-----+-----+----------+----+----+-----+----+-------+-------+----+--------+------+----+---------+-----+-------+---------+-----+-------+----+---------+-----+-------+----+----+----+----+-----+----+----+----+--------+------+---+----+-----+------+----+-----+-------+----+------+-------------+----+----+----+----+-----+---

In [10]:
df.na.fill(0).show()

+-----+------+---+----+-----+-----+------+---+---+------+----+-------+----------+----+------+---+-----+------+----+-------+---------------+-------+-------+------+---+---+-----+-----+-----+----------+-------+----+------+-----+-----+-----+---+---+----+--------+--------+---+------+----+------+------+-----+--------+----+-------+---+------+----+---+---+------+---+---+------+------+------+----+------+----+-------+-------+------------+---+----+-----+------+----+--------+---+-----+------+-------+----+------+-----+-------+---------+-------+-------+----+------+----+-----+--------+------+---+-------+-------+----+-----+----+------+-----+-------+----+-----+---+---------+-----+-----+----------+---+---+-----+---+-------+-------+---+--------+------+----+---------+-----+-------+---------+-----+-------+---+---------+-----+-------+----+----+----+---+-----+----+----+----+--------+------+---+----+-----+------+----+-----+-------+----+------+-------------+----+----+---+---+-----+---------+-----+-----+-------