# Apache Spark (Pyspark) Text Prep

## Notebook Setup

The following will load the required libraries and setup the environment for use.

In [1]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF

In [2]:
spark = SparkSession.builder.master('local[*]').config("spark.driver.memory", "24g").appName('spark').getOrCreate()
sc = spark.sparkContext
print(f'pyspark version: {sc.version}')

22/11/13 21:09:27 WARN Utils: Your hostname, cheetah resolves to a loopback address: 127.0.1.1; using 192.168.1.66 instead (on interface eno1)
22/11/13 21:09:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/13 21:09:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
pyspark version: 3.3.0


## Load the App Reviews Dataset

Here we load the mobile application reviews data set

In [3]:
mobile_app_reviews_path = '../../data/raw/appsearch_reviews/appsearch_reviews.txt'
mobile_app_reviews_df = spark.read.json(mobile_app_reviews_path)

[Stage 0:>                                                          (0 + 8) / 8]                                                                                

In [4]:
mobile_app_reviews_df.printSchema()

root
 |-- app_id: string (nullable = true)
 |-- num_reviews: long (nullable = true)
 |-- reviews: array (nullable = true)
 |    |-- element: string (containsNull = true)



## Prepare the App Reviews for Explorations

In [5]:
mobile_app_reviews_df.show()
print(f'There are {mobile_app_reviews_df.count()} applications in the dataset')

+--------------------+-----------+--------------------+
|              app_id|num_reviews|             reviews|
+--------------------+-----------+--------------------+
|         com.nut.man|         50|[This game is ver...|
|com.gamevil.darkn...|         50|[Life Changer.  I...|
|      jamgame.absorb|         50|[Frantic and Unpr...|
|com.imangi.temple...|         50|[.  Well to be ho...|
|com.glu.contractk...|         50|[.  Inspite of ha...|
|     com.iugome.dawn|         50|[Great game.  Pla...|
|com.fungames.snip...|         50|[Super!.  Would b...|
|   com.glu.deerhunt2|         50|[I only play regi...|
|com.activision.wi...|         50|[Really.  The con...|
|com.gameloft.andr...|         50|[Why i cant login...|
|com.topfreegames....|         50|[Never Playing Ev...|
|com.libra.chickchick|         50|[Luv it.  Its ver...|
|com.rockstargames...|         50|[Gta v ifruit.  W...|
|com.nekki.shadowf...|         50|[Ai counter timin...|
|com.kabam.creatur...|         50|[Security Dete

The app reviews are stored as lists in each record. We need to create new rows for each one.

In [6]:
mobile_app_reviews_df = mobile_app_reviews_df.select(F.col('app_id'), F.explode(F.col('reviews')).alias('text'))
mobile_app_reviews_df.show()
print(f'There are {mobile_app_reviews_df.count()} individual reviews in the app reviews dataset')

+-----------+--------------------+
|     app_id|                text|
+-----------+--------------------+
|com.nut.man|This game is very...|
|com.nut.man|Terrific just get...|
|com.nut.man|I CAN'T STOP TAPP...|
|com.nut.man|.  The game itsel...|
|com.nut.man|ADS GALORE!.  I w...|
|com.nut.man|Really fun... But...|
|com.nut.man|Literally a copy ...|
|com.nut.man|Challenging Game!...|
|com.nut.man|Addicted!!.  I am...|
|com.nut.man|Nut-man? HA!.  I ...|
|com.nut.man|.  Needs less ads...|
|com.nut.man|SHAME ON YOU.  Sh...|
|com.nut.man|What a ripoff.  E...|
|com.nut.man|Fun but annoyingl...|
|com.nut.man|This game is cool...|
|com.nut.man|.  Fun game for a...|
|com.nut.man|It's alright.  Ki...|
|com.nut.man|.  This game is r...|
|com.nut.man|Wtf.  The gears t...|
|com.nut.man|Great game.  I wo...|
+-----------+--------------------+
only showing top 20 rows

There are 1385607 individual reviews in the app reviews dataset


The mobile apps reviews data set has ~138k individual reviews, so it should be large enough to show a performance difference between methods.

## Prep the App Data for LDA

To prep the data, we will:

1. Remove leading and trailing whitespaces
1. Remove non-alpha numeric characters and multiple spaces
1. Split reviews into their individual words (tokenize)
1. Create Term count and Inverse document frequency columns
1. Collect the results and compute the time to prep text

In [7]:
%%time
mobile_app_reviews_df = mobile_app_reviews_df.withColumn('text', F.trim(F.col('text')))

mobile_app_reviews_df = mobile_app_reviews_df.withColumn('text', F.regexp_replace(F.col('text'), r'[^a-zA-Z ]', ''))
mobile_app_reviews_df = mobile_app_reviews_df.withColumn('text', F.regexp_replace(F.col('text'), r'\s{2,}', ' '))

tokenizer = Tokenizer(inputCol='text', outputCol="tokens")
mobile_app_reviews_df = tokenizer.transform(mobile_app_reviews_df)

cv = CountVectorizer(inputCol='tokens', outputCol='vectors')
cv_model = cv.fit(mobile_app_reviews_df)
mobile_app_reviews_df = cv_model.transform(mobile_app_reviews_df)

idf = IDF(minDocFreq=3, inputCol='vectors', outputCol='idf')
idf_model = idf.fit(mobile_app_reviews_df)
mobile_app_reviews_df = idf_model.transform(mobile_app_reviews_df)

# Compute the entire dataframe, but only print the first row
mobile_app_reviews_df.collect()[0]

                                                                                

22/11/13 21:09:40 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB




22/11/13 21:09:46 WARN DAGScheduler: Broadcasting large task binary with size 2.9 MiB


                                                                                

22/11/13 21:09:46 WARN DAGScheduler: Broadcasting large task binary with size 6.9 MiB


                                                                                

CPU times: user 1min 53s, sys: 2.56 s, total: 1min 56s
Wall time: 2min 20s


Row(app_id='com.nut.man', text='This game is very good Kudos to the developer Wow absolutely stunning simple addictive and a distinctive flair Can you imagine this game with upgrades The ads are fine just control yourself and keep calm', tokens=['this', 'game', 'is', 'very', 'good', 'kudos', 'to', 'the', 'developer', 'wow', 'absolutely', 'stunning', 'simple', 'addictive', 'and', 'a', 'distinctive', 'flair', 'can', 'you', 'imagine', 'this', 'game', 'with', 'upgrades', 'the', 'ads', 'are', 'fine', 'just', 'control', 'yourself', 'and', 'keep', 'calm'], vectors=SparseVector(262144, {0: 2.0, 2: 1.0, 4: 2.0, 5: 1.0, 6: 2.0, 8: 1.0, 9: 2.0, 15: 1.0, 19: 1.0, 26: 1.0, 29: 1.0, 33: 1.0, 35: 1.0, 36: 1.0, 119: 1.0, 148: 1.0, 164: 1.0, 246: 1.0, 287: 1.0, 330: 1.0, 346: 1.0, 422: 1.0, 433: 1.0, 1107: 1.0, 1198: 1.0, 2197: 1.0, 2225: 1.0, 2258: 1.0, 3214: 1.0, 13128: 1.0, 13448: 1.0}), idf=SparseVector(262144, {0: 1.8558, 2: 1.0315, 4: 2.2732, 5: 1.2821, 6: 2.5463, 8: 1.4355, 9: 3.4094, 15: 1.9219

## Save the prepped data to file

In [8]:
%%time
mobile_app_reviews_df.write.mode("overwrite").parquet("../../data/cleaned/appsearch_reviews_clean.txt")

22/11/13 21:11:54 WARN DAGScheduler: Broadcasting large task binary with size 7.1 MiB




CPU times: user 7.22 ms, sys: 0 ns, total: 7.22 ms
Wall time: 10.2 s


                                                                                