In [1]:
# Do all imports and installs here
import pandas as pd
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StringType

In [2]:
spark = SparkSession.builder\
                     .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0")\
                     .getOrCreate()

In [3]:
# Read in the data here
df_hotel_reviews = spark.read.csv('../Data/Original/Hotel_Reviews.csv', header=True, inferSchema=True)
df_hotel_reviews.printSchema()


root
 |-- Hotel_Address: string (nullable = true)
 |-- Additional_Number_of_Scoring: integer (nullable = true)
 |-- Review_Date: string (nullable = true)
 |-- Average_Score: double (nullable = true)
 |-- Hotel_Name: string (nullable = true)
 |-- Reviewer_Nationality: string (nullable = true)
 |-- Negative_Review: string (nullable = true)
 |-- Review_Total_Negative_Word_Counts: integer (nullable = true)
 |-- Total_Number_of_Reviews: integer (nullable = true)
 |-- Positive_Review: string (nullable = true)
 |-- Review_Total_Positive_Word_Counts: integer (nullable = true)
 |-- Total_Number_of_Reviews_Reviewer_Has_Given: integer (nullable = true)
 |-- Reviewer_Score: double (nullable = true)
 |-- Tags: string (nullable = true)
 |-- days_since_review: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lng: string (nullable = true)



In [4]:
print((df_hotel_reviews.count(), len(df_hotel_reviews.columns)))

(515738, 17)


In [12]:
from gensim.parsing import preprocessing
text = preprocessing.strip_multiple_whitespaces(preprocessing.strip_punctuation('Hotel-nam$e  '))
text.strip()

'Hotel nam e'

In [48]:
df_hotel_reviews.limit(5).toPandas()

Unnamed: 0,Hotel_Address,Additional_Number_of_Scoring,Review_Date,Average_Score,Hotel_Name,Reviewer_Nationality,Negative_Review,Review_Total_Negative_Word_Counts,Total_Number_of_Reviews,Positive_Review,Review_Total_Positive_Word_Counts,Total_Number_of_Reviews_Reviewer_Has_Given,Reviewer_Score,Tags,days_since_review,lat,lng
0,s Gravesandestraat 55 Oost 1092 AA Amsterdam ...,194,8/3/2017,7.7,Hotel Arena,Russia,I am so angry that i made this post available...,397,1403,Only the park outside of the hotel was beauti...,11,7,2.9,"[' Leisure trip ', ' Couple ', ' Duplex Double...",0 days,52.3605759,4.9159683
1,s Gravesandestraat 55 Oost 1092 AA Amsterdam ...,194,8/3/2017,7.7,Hotel Arena,Ireland,No Negative,0,1403,No real complaints the hotel was great great ...,105,7,7.5,"[' Leisure trip ', ' Couple ', ' Duplex Double...",0 days,52.3605759,4.9159683
2,s Gravesandestraat 55 Oost 1092 AA Amsterdam ...,194,7/31/2017,7.7,Hotel Arena,Australia,Rooms are nice but for elderly a bit difficul...,42,1403,Location was good and staff were ok It is cut...,21,9,7.1,"[' Leisure trip ', ' Family with young childre...",3 days,52.3605759,4.9159683
3,s Gravesandestraat 55 Oost 1092 AA Amsterdam ...,194,7/31/2017,7.7,Hotel Arena,United Kingdom,My room was dirty and I was afraid to walk ba...,210,1403,Great location in nice surroundings the bar a...,26,1,3.8,"[' Leisure trip ', ' Solo traveler ', ' Duplex...",3 days,52.3605759,4.9159683
4,s Gravesandestraat 55 Oost 1092 AA Amsterdam ...,194,7/24/2017,7.7,Hotel Arena,New Zealand,You When I booked with your company on line y...,140,1403,Amazing location and building Romantic setting,8,3,6.7,"[' Leisure trip ', ' Couple ', ' Suite ', ' St...",10 days,52.3605759,4.9159683


In [6]:
dist_hotel_columns = ['Hotel_Name', 'Hotel_Address', 'lat', 'lng']
df_distinct_hotels = df_hotel_reviews.dropDuplicates(subset=dist_hotel_columns)

In [7]:
print((df_distinct_hotels.count(), len(df_distinct_hotels.columns)))

(1494, 17)


There are therefore only 1494 unique hotels, even though there are more than half a million reviews.

In [47]:
df_distinct_hotels.limit(3).toPandas()

Unnamed: 0,Hotel_Address,Additional_Number_of_Scoring,Review_Date,Average_Score,Hotel_Name,Reviewer_Nationality,Negative_Review,Review_Total_Negative_Word_Counts,Total_Number_of_Reviews,Positive_Review,Review_Total_Positive_Word_Counts,Total_Number_of_Reviews_Reviewer_Has_Given,Reviewer_Score,Tags,days_since_review,lat,lng
0,Savoyenstra e 2 16 Ottakring 1160 Vienna Austria,86,7/31/2017,8.3,Austria Trend Hotel Schloss Wilhelminenberg Wien,Netherlands,Although the building looks majestic on the o...,79,1558,Excellent location with spectacular view over...,55,3,8.3,"[' Leisure trip ', ' Group ', ' Classic Room '...",3 days,,
1,Epping Forest 30 Oak Hill London IG8 9NY Unite...,193,8/2/2017,7.5,Best Western PLUS Epping Forest,United Kingdom,Bed seemed small for a double Room a bit gloo...,24,587,Easy check in Friendly staff Reasonably price...,24,3,7.9,"[' Leisure trip ', ' Couple ', ' Standard Doub...",1 days,51.603207,0.010607
2,Passeig de Gr cia 29 Eixample 08007 Barcelona ...,172,7/30/2017,8.3,Eurostars Bcn Design,Australia,The hallways feel like your in a hostel and t...,69,1601,Location and great black out curtains which r...,17,26,7.1,"[' Leisure trip ', ' Couple ', ' Basic Double ...",4 days,41.3907208,2.1660732


Get the (distinct) countries which the hotels are located in.

In [8]:
def get_last_word(address):
    """ Gets the last word of the string 'address'"""
    return address.split()[-1]

udf_get_last_word = F.udf(get_last_word, StringType())

In [9]:
df_distinct_countries = df_distinct_hotels.withColumn('country', udf_get_last_word("Hotel_Address"))

In [10]:
df_distinct_countries.select('country').distinct().show()

+-----------+
|    country|
+-----------+
|     France|
|      Italy|
|      Spain|
|    Kingdom|
|    Austria|
|Netherlands|
+-----------+



Get the number of nationalities.

In [11]:
df_hotel_reviews.select('Reviewer_Nationality').distinct().count()

227

So, we see that there are hotels from 6 countries, but reviewers from 227 countries.

In [12]:
list_of_reviewer_nationalities = df_hotel_reviews.select('Reviewer_Nationality').distinct().collect()

In [22]:
reviewer_nationalities = [row.__getitem__('Reviewer_Nationality') for row in list_of_reviewer_nationalities]

In [23]:
reviewer_nationalities

[' Montenegro ',
 ' Saint Vincent Grenadines ',
 ' Northern Mariana Islands ',
 ' Gibraltar ',
 ' Syria ',
 ' Czech Republic ',
 ' Guyana ',
 ' New Zealand ',
 ' Austria ',
 ' Bulgaria ',
 ' Saint Martin ',
 ' Argentina ',
 ' Eritrea ',
 ' United States of America ',
 ' Azerbaijan ',
 ' Fiji ',
 ' Spain ',
 ' Zambia ',
 ' Laos ',
 ' Australia ',
 ' South Sudan ',
 ' Ecuador ',
 ' Yemen ',
 ' Suriname ',
 ' Svalbard Jan Mayen ',
 ' Turkey ',
 ' American Samoa ',
 ' Guernsey ',
 ' Bosnia and Herzegovina ',
 ' Ghana ',
 ' Vanuatu ',
 ' U S Virgin Islands ',
 ' United Kingdom ',
 ' Rwanda ',
 ' Gambia ',
 ' Liechtenstein ',
 ' Kuwait ',
 ' Comoros ',
 ' Mauritius ',
 ' Kosovo ',
 ' Senegal ',
 ' St Pierre and Miquelon ',
 ' Tajikistan ',
 ' Luxembourg ',
 ' Bolivia ',
 ' Pakistan ',
 ' Sudan ',
 ' Madagascar ',
 ' Latvia ',
 ' Brunei ',
 ' Libya ',
 ' Malawi ',
 ' Somalia ',
 ' Palau ',
 ' Canada ',
 ' Tunisia ',
 ' Djibouti ',
 ' Bermuda ',
 ' Central Africa Republic ',
 ' Bahamas ',
 ' A

There are weird trailing and leading spaces here, these need to be removed remove them and also remove ' '

Count NANs, Nulls, empty strings and zeros in the following 4 cells (in that order)

In [27]:
from pyspark.sql.functions import isnan, when, count, col
#https://stackoverflow.com/questions/44627386/how-to-find-count-of-null-and-nan-values-for-each-column-in-a-pyspark-dataframe
df_hotel_reviews.select([count(when(isnan(c), c)).alias(c) for c in df_hotel_reviews.columns]).toPandas()

Unnamed: 0,Hotel_Address,Additional_Number_of_Scoring,Review_Date,Average_Score,Hotel_Name,Reviewer_Nationality,Negative_Review,Review_Total_Negative_Word_Counts,Total_Number_of_Reviews,Positive_Review,Review_Total_Positive_Word_Counts,Total_Number_of_Reviews_Reviewer_Has_Given,Reviewer_Score,Tags,days_since_review,lat,lng
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [31]:
# Note: this needs col(c)
df_hotel_reviews.select([count(when(col(c).isNull(), c)).alias(c) for c in df_hotel_reviews.columns]).toPandas()


Unnamed: 0,Hotel_Address,Additional_Number_of_Scoring,Review_Date,Average_Score,Hotel_Name,Reviewer_Nationality,Negative_Review,Review_Total_Negative_Word_Counts,Total_Number_of_Reviews,Positive_Review,Review_Total_Positive_Word_Counts,Total_Number_of_Reviews_Reviewer_Has_Given,Reviewer_Score,Tags,days_since_review,lat,lng
0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0


In [41]:
from pyspark.sql.functions import udf
@udf(returnType=StringType())
def stripv(val):
    return val.strip()

string_columns = ['Hotel_Address', 'Review_Date', 'Hotel_Name', 'Reviewer_Nationality',
                  'Negative_Review', 'Positive_Review', 'Tags', 'days_since_review', 'lat', 'lng']
df_hotel_reviews.select([count(when(stripv(c)=='', c)).alias(c) for c in string_columns]).toPandas()

Unnamed: 0,Hotel_Address,Review_Date,Hotel_Name,Reviewer_Nationality,Negative_Review,Positive_Review,Tags,days_since_review,lat,lng
0,0,0,0,523,849,183,0,0,0,0


In [51]:
numeric_columns = ['Additional_Number_of_Scoring', 'Average_Score', 'Review_Total_Negative_Word_Counts', 'Total_Number_of_Reviews',
                   'Review_Total_Positive_Word_Counts', 'Total_Number_of_Reviews_Reviewer_Has_Given', 'Reviewer_Score'  ]

df_hotel_reviews.select([count(when(col(c)==0, c)).alias(c) for c in numeric_columns]).toPandas()

Unnamed: 0,Additional_Number_of_Scoring,Average_Score,Review_Total_Negative_Word_Counts,Total_Number_of_Reviews,Review_Total_Positive_Word_Counts,Total_Number_of_Reviews_Reviewer_Has_Given,Reviewer_Score
0,0,0,127890,0,35946,0,0


ACTION TO BE TAKEN:
1. The issue with the trailing and leading spaces needs to be fixed. A strip udf will do the trick
2. There are no nulls or nans. But there are empty strings -- these need to be replaced with nulls
Note: Zeroes in numeric columns can be left alone

In [61]:

# Replace the existing columns
for c in string_columns:
    df_hotel_reviews= df_hotel_reviews.withColumn(c, stripv(c))

In [66]:
df_hotel_reviews.select('Reviewer_Nationality').distinct().show()

+--------------------+
|Reviewer_Nationality|
+--------------------+
|Turks Caicos Islands|
|              Russia|
|            Paraguay|
|            Anguilla|
|               Yemen|
|          St Maarten|
|             Senegal|
|              Sweden|
|            Kiribati|
|              Guyana|
|              Jersey|
|         Philippines|
|             Eritrea|
|            Djibouti|
|            Malaysia|
|           Singapore|
|                Fiji|
|              Turkey|
|              Malawi|
|                Iraq|
+--------------------+
only showing top 20 rows



Let's try to perform the same process as earlier -- let's get the list of distinct reviewer nationalities. We see that there are no trailing and leading spaces in this column.

In [67]:
list_of_reviewer_nationalities = df_hotel_reviews.select('Reviewer_Nationality').distinct().collect()

In [68]:
reviewer_nationalities = [row.__getitem__('Reviewer_Nationality') for row in list_of_reviewer_nationalities]

In [69]:
reviewer_nationalities

['Turks Caicos Islands',
 'Russia',
 'Paraguay',
 'Anguilla',
 'Yemen',
 'St Maarten',
 'Senegal',
 'Sweden',
 'Kiribati',
 'Guyana',
 'Jersey',
 'Philippines',
 'Eritrea',
 'Djibouti',
 'Malaysia',
 'Singapore',
 'Fiji',
 'Turkey',
 'Malawi',
 'Iraq',
 'Germany',
 'Northern Mariana Islands',
 'Comoros',
 'Crimea',
 'Cambodia',
 'Afghanistan',
 'Rwanda',
 'Jordan',
 'Maldives',
 'Ivory Coast',
 'Sudan',
 'Palau',
 'France',
 'Greece',
 'Kosovo',
 'Svalbard Jan Mayen',
 'Sri Lanka',
 'Montserrat',
 'Taiwan',
 'British Virgin Islands',
 'Dominica',
 'Togo',
 'Algeria',
 'Equatorial Guinea',
 'Slovakia',
 'Macau',
 'Reunion',
 'Argentina',
 'Belgium',
 'Angola',
 'San Marino',
 'East Timor',
 'Qatar',
 'Ecuador',
 'Lesotho',
 'Albania',
 'Madagascar',
 'Finland',
 'New Caledonia',
 'Ghana',
 'Nicaragua',
 'Myanmar',
 'Guernsey',
 'Peru',
 'Sierra Leone',
 'Benin',
 'India',
 'China',
 'Bahamas',
 'Belarus',
 'Malta',
 'Kuwait',
 'American Samoa',
 'Central Africa Republic',
 'Somalia',
 '

In [70]:
def blank_as_null(val):
    return when(col(val) != "", col(val)).otherwise(None)

for c in string_columns:
    df_hotel_reviews= df_hotel_reviews.withColumn(c, blank_as_null(c))

Count empty strings and nulls again to see if this operation worked.

In [71]:
string_columns = ['Hotel_Address', 'Review_Date', 'Hotel_Name', 'Reviewer_Nationality',
                  'Negative_Review', 'Positive_Review', 'Tags', 'days_since_review', 'lat', 'lng']
df_hotel_reviews.select([count(when(stripv(c)=='', c)).alias(c) for c in string_columns]).toPandas()

Py4JJavaError: An error occurred while calling o1520.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 66.0 failed 1 times, most recent failure: Lost task 0.0 in stage 66.0 (TID 1291, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-41-2468b537e4c9>", line 4, in stripv
AttributeError: 'NoneType' object has no attribute 'strip'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3257)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3254)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3254)
	at sun.reflect.GeneratedMethodAccessor72.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/opt/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-41-2468b537e4c9>", line 4, in stripv
AttributeError: 'NoneType' object has no attribute 'strip'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
There is a None, so the operation was successfully completed.