## Recommendation using Sentiment Analysis


In [1]:
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from functools import reduce
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import matplotlib.pyplot as plt
from wordcloud import WordCloud 
import pandas as pd
import re
import string

In [2]:
spark = SparkSession.builder.master('local[*]').config('spark.dirver.maxResultSize', '8g') .config("spark.driver.memory", "20g").appName('AirBnB').getOrCreate()

In [3]:
df = spark.read.format('csv').options(header='true', inferschema='true').load('New York City/reviews.csv')


In [6]:
df.count()
df.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|          listing_id|                  id|                date|         reviewer_id|       reviewer_name|            comments|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                2060|                 158|          2008-09-22|                2865|                Thom|"very nice neighb...|
|comfortable bed a...|                null|                null|                null|                null|                null|
|Jennys cat is ver...| but wants to sle...|                null|                null|                null|                null|
| Meyow meyow meyow."|                null|                null|                null|                null|                null|
|                2595|               17857|          2009-11-21|               50679|                Jea

In [8]:
panda_df = pd.read_csv('New York City/reviews.csv')
panda_df.head(10)


Unnamed: 0,listing_id,id,date,reviewer_id,reviewer_name,comments
0,2060,158,2008-09-22,2865,Thom,"very nice neighborhood,close enough to ""A"" tra..."
1,2595,17857,2009-11-21,50679,Jean,Notre séjour de trois nuits.\r\nNous avons app...
2,2595,19176,2009-12-05,53267,Cate,Great experience.
3,2595,19760,2009-12-10,38960,Anita,I've stayed with my friend at the Midtown Cast...
4,2595,34320,2010-04-09,71130,Kai-Uwe,"We've been staying here for about 9 nights, en..."
5,2595,46312,2010-05-25,117113,Alicia,We had a wonderful stay at Jennifer's charming...
6,2595,1238204,2012-05-07,1783688,Sergey,Hi to everyone!\r\nWould say our greatest comp...
7,2595,1293632,2012-05-17,1870771,Loïc,"Jennifer was very friendly and helpful, and he..."
8,2595,2022498,2012-08-18,2124102,Melanie,This apartment is like a real castle old and u...
9,2595,4682989,2013-05-20,496053,Eric,Jennifer's place was in a great midtown locati...


In [11]:
len(panda_df)


1273976

In [12]:
panda_df.isna().sum()

listing_id         0
id                 0
date               0
reviewer_id        0
reviewer_name     10
comments         749
dtype: int64

In [9]:
type(df)
df.printSchema()
df = df.na.drop()
df.show(10)


root
 |-- listing_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- reviewer_id: string (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)

+----------+-------+----------+-----------+-------------+--------------------+
|listing_id|     id|      date|reviewer_id|reviewer_name|            comments|
+----------+-------+----------+-----------+-------------+--------------------+
|      2060|    158|2008-09-22|       2865|         Thom|"very nice neighb...|
|      2595|  17857|2009-11-21|      50679|         Jean|Notre séjour de t...|
|      2595|  19176|2009-12-05|      53267|         Cate|   Great experience.|
|      2595|  19760|2009-12-10|      38960|        Anita|I've stayed with ...|
|      2595|  34320|2010-04-09|      71130|      Kai-Uwe|We've been stayin...|
|      2595|  46312|2010-05-25|     117113|       Alicia|We had a wonderfu...|
|      2595|1238204|2012-05-07|    1783688|

In [10]:
df.count()

1290788

### NLTK Pyspark for reviews classification

In [5]:
reviews_rdd = df.select("comments").rdd.flatMap(lambda x: x)
reviews_rdd.collect()

['"very nice neighborhood,close enough to ""A"" train',
 'Notre séjour de trois nuits.',
 'Great experience.',
 "I've stayed with my friend at the Midtown Castle for six days and it was a lovely place to be. A big spacious room with a pointy roof, which really makes you feel like staying in a castle. The location is perfect. It is just a few steps from Macy's Time Square and Theatre District. Everything worked just perfect with the keys etc. Thank you so much Jennifer, we had a great time in New York.",
 "We've been staying here for about 9 nights, enjoying to be in the center of the city, that never sleeps...short ways to everywhere in Manhattan, by subway or by walk. Midtown castle is a beauftiful and tastful place, Jennifer and Tori relaxed and friendly hosts - thats why we - the three Berliners - recommand that place! Good to have WiFi and a little kitchen too!",
 "We had a wonderful stay at Jennifer's charming apartment! They were very organized and helpful; I would definitely rec

## Convert text into lower case

In [6]:
reviews_rdd_lower = reviews_rdd.map(lambda x: x.lower())
reviews_rdd_lower.collect()

['"very nice neighborhood,close enough to ""a"" train',
 'notre séjour de trois nuits.',
 'great experience.',
 "i've stayed with my friend at the midtown castle for six days and it was a lovely place to be. a big spacious room with a pointy roof, which really makes you feel like staying in a castle. the location is perfect. it is just a few steps from macy's time square and theatre district. everything worked just perfect with the keys etc. thank you so much jennifer, we had a great time in new york.",
 "we've been staying here for about 9 nights, enjoying to be in the center of the city, that never sleeps...short ways to everywhere in manhattan, by subway or by walk. midtown castle is a beauftiful and tastful place, jennifer and tori relaxed and friendly hosts - thats why we - the three berliners - recommand that place! good to have wifi and a little kitchen too!",
 "we had a wonderful stay at jennifer's charming apartment! they were very organized and helpful; i would definitely rec

## Sentence Tokenization

In [7]:
reviews_tokenization_rdd = reviews_rdd_lower.map(lambda x: nltk.sent_tokenize(x))
reviews_tokenization_rdd.collect()

[['"very nice neighborhood,close enough to ""a"" train'],
 ['notre séjour de trois nuits.'],
 ['great experience.'],
 ["i've stayed with my friend at the midtown castle for six days and it was a lovely place to be.",
  'a big spacious room with a pointy roof, which really makes you feel like staying in a castle.',
  'the location is perfect.',
  "it is just a few steps from macy's time square and theatre district.",
  'everything worked just perfect with the keys etc.',
  'thank you so much jennifer, we had a great time in new york.'],
 ["we've been staying here for about 9 nights, enjoying to be in the center of the city, that never sleeps...short ways to everywhere in manhattan, by subway or by walk.",
  'midtown castle is a beauftiful and tastful place, jennifer and tori relaxed and friendly hosts - thats why we - the three berliners - recommand that place!',
  'good to have wifi and a little kitchen too!'],
 ["we had a wonderful stay at jennifer's charming apartment!",
  'they were

## Word Tokenization

In [8]:
def sentTokenizeWord(x):
    splitted = [word for line in x for word in line.split()]
    return splitted

word_tokenize_rdd = reviews_tokenization_rdd.map(sentTokenizeWord)
word_tokenize_rdd.collect()

[['"very', 'nice', 'neighborhood,close', 'enough', 'to', '""a""', 'train'],
 ['notre', 'séjour', 'de', 'trois', 'nuits.'],
 ['great', 'experience.'],
 ["i've",
  'stayed',
  'with',
  'my',
  'friend',
  'at',
  'the',
  'midtown',
  'castle',
  'for',
  'six',
  'days',
  'and',
  'it',
  'was',
  'a',
  'lovely',
  'place',
  'to',
  'be.',
  'a',
  'big',
  'spacious',
  'room',
  'with',
  'a',
  'pointy',
  'roof,',
  'which',
  'really',
  'makes',
  'you',
  'feel',
  'like',
  'staying',
  'in',
  'a',
  'castle.',
  'the',
  'location',
  'is',
  'perfect.',
  'it',
  'is',
  'just',
  'a',
  'few',
  'steps',
  'from',
  "macy's",
  'time',
  'square',
  'and',
  'theatre',
  'district.',
  'everything',
  'worked',
  'just',
  'perfect',
  'with',
  'the',
  'keys',
  'etc.',
  'thank',
  'you',
  'so',
  'much',
  'jennifer,',
  'we',
  'had',
  'a',
  'great',
  'time',
  'in',
  'new',
  'york.'],
 ["we've",
  'been',
  'staying',
  'here',
  'for',
  'about',
  '9',
  'n

## Removing stop words, punctuation and spaces

In [10]:
from nltk.corpus import stopwords

def remove_stop(x):
    stop_words = set(stopwords.words('english'))
    filtered_rdd_stopwords = [word for word in x if not word in stop_words]
    return filtered_rdd_stopwords
def remove_punc(x):
    list_punct=list(string.punctuation)
    filtered = [''.join(c for c in s if c not in list_punct) for s in x] 
    filtered_space = [s for s in filtered if s] #remove empty space 
    return filtered
stp_rmv_rdd = word_tokenize_rdd.map(remove_stop)
rmv_punc_rdd = stp_rmv_rdd.map(remove_punc)
rmv_punc_rdd.collect()

[['very', 'nice', 'neighborhoodclose', 'enough', 'a', 'train'],
 ['notre', 'séjour', 'de', 'trois', 'nuits'],
 ['great', 'experience'],
 ['ive',
  'stayed',
  'friend',
  'midtown',
  'castle',
  'six',
  'days',
  'lovely',
  'place',
  'be',
  'big',
  'spacious',
  'room',
  'pointy',
  'roof',
  'really',
  'makes',
  'feel',
  'like',
  'staying',
  'castle',
  'location',
  'perfect',
  'steps',
  'macys',
  'time',
  'square',
  'theatre',
  'district',
  'everything',
  'worked',
  'perfect',
  'keys',
  'etc',
  'thank',
  'much',
  'jennifer',
  'great',
  'time',
  'new',
  'york'],
 ['weve',
  'staying',
  '9',
  'nights',
  'enjoying',
  'center',
  'city',
  'never',
  'sleepsshort',
  'ways',
  'everywhere',
  'manhattan',
  'subway',
  'walk',
  'midtown',
  'castle',
  'beauftiful',
  'tastful',
  'place',
  'jennifer',
  'tori',
  'relaxed',
  'friendly',
  'hosts',
  '',
  'thats',
  '',
  'three',
  'berliners',
  '',
  'recommand',
  'place',
  'good',
  'wifi',
  

## Lemmatization
Doing things properly with the use of a vocabulary and morphological analysis of words, normally aiming to remove inflectional endings only and to return the base or dictionary form of a word
https://nlp.stanford.edu/IR-book/html/htmledition/stemming-and-lemmatization-1.html

In [11]:
nltk.download('wordnet')
lemmatizer = WordNetLemmatizer()
lem_words_rdd = rmv_punc_rdd.map(lambda x: [lemmatizer.lemmatize(s) for s in x])
joined_tokens_rdd = lem_words_rdd.map(lambda x: ' '.join(x))
joined_tokens_rdd.collect()

[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/salihamehboob/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


['very nice neighborhoodclose enough a train',
 'notre séjour de trois nuits',
 'great experience',
 'ive stayed friend midtown castle six day lovely place be big spacious room pointy roof really make feel like staying castle location perfect step macys time square theatre district everything worked perfect key etc thank much jennifer great time new york',
 'weve staying 9 night enjoying center city never sleepsshort way everywhere manhattan subway walk midtown castle beauftiful tastful place jennifer torus relaxed friendly host  thats  three berliner  recommand place good wifi little kitchen too',
 'wonderful stay jennifers charming apartment organized helpful would definitely recommend staying midtown castle',
 'hi everyone',
 'jennifer friendly helpful place exactly advertised location convenient pleasure stay midtown castle definitely recommend ',
 'apartment like real castle old unique age related stain bathroom floor dark discoloration carpet indeed indicate building built long l

### Creating chunks of words
Chunking: shallow parsing, goruping words into chunks
Chinking: remove chunks from words
POS: a tagger assings part of speech to each word.
https://www.nltk.org/book/ch07.html

In [13]:
nltk.download('averaged_perceptron_tagger')
def extractPhraseFunct(x):
    stop_words=set(stopwords.words('english'))
    def leaves(tree):
        """Finds NP (nounphrase) leaf nodes of a chunk tree."""
        for subtree in tree.subtrees(filter = lambda t: t.label()=='NP'):
            yield subtree.leaves()
    
    def get_terms(tree):
        for leaf in leaves(tree):
            term = [w for w,t in leaf if not w in stop_words]
            yield term
    sentence_re = r'(?:(?:[A-Z])(?:.[A-Z])+.?)|(?:\w+(?:-\w+)*)|(?:\$?\d+(?:.\d+)?%?)|(?:...|)(?:[][.,;"\'?():-_`])'
    grammar = r"""
    NBAR:
    {<NN.*|JJ>*<NN.*>}  # Nouns and Adjectives, terminated with Nouns
        
    NP:
    {<NBAR>}
    {<NBAR><IN><NBAR>}  # Above, connected with in/of/etc...
    """
    chunker = nltk.RegexpParser(grammar)
    tokens = nltk.regexp_tokenize(x,sentence_re)
    postoks = nltk.tag.pos_tag(tokens) #Part of speech tagging 
    tree = chunker.parse(postoks) #chunking
    terms = get_terms(tree)
    temp_phrases = []
    for term in terms:
        if len(term):
            temp_phrases.append(' '.join(term))
    
    finalPhrase = [w for w in temp_phrases if w] #remove empty lists
    return finalPhrase
    

[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /Users/salihamehboob/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


In [None]:
extract_phrase_rdd = joined_tokens_rdd.map(extractPhraseFunct)
extract_phrase_rdd.collect()

## Using nltk Vader for sentiments

In [12]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer
nltk.download('vader_lexicon')
def sentimentWordsFunct(x):
    analyzer = SentimentIntensityAnalyzer() 
    senti_list_temp = []
    for i in x:
        y = ''.join(i) 
        vs = analyzer.polarity_scores(y)
        senti_list_temp.append((y, vs))
        senti_list_temp = [w for w in senti_list_temp if w]
    sentiment_list  = []
    for j in senti_list_temp:
        first = j[0]
        second = j[1]
    
        for (k,v) in second.items():
        
        
            if k == 'compound':
                if v < 0.0:
                    sentiment_list.append((first, "Negative"))
                elif v == 0.0:
                    sentiment_list.append((first, "Neutral"))
                else:
                    sentiment_list.append((first, "Positive"))
    return sentiment_list
snt_rdd = joined_tokens_rdd.map(sentimentWordsFunct)
snt_rdd.collect()
#sentiment_rdd = extract_phrase_rdd.map(sentimentWordsFunct)
#sentiment_rdd.collect()

[nltk_data] Downloading package vader_lexicon to
[nltk_data]     /Users/salihamehboob/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 3.0 failed 1 times, most recent failure: Lost task 2.0 in stage 3.0 (TID 11, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 871049 ms
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [14]:
snt_rdd = joined_tokens_rdd.map(sentimentWordsFunct)
snt_rdd.collect()

NameError: name 'sentimentWordsFunct' is not defined

In [63]:
import pyspark.sql.functions as F
sentiments = sentiment_rdd.toDF()
sentiments = sentiments.select(F.col("_1._1").alias("comments"), F.col("_1._2").alias("sentiment"))
sentiments.printSchema()
sentiments.count()

root
 |-- comments: string (nullable = true)
 |-- sentiment: string (nullable = true)



Py4JJavaError: An error occurred while calling o1071.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 41.0 failed 1 times, most recent failure: Lost task 2.0 in stage 41.0 (TID 108, localhost, executor driver): java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 1 fields are required while 0 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$makeFromJava$15$$anonfun$apply$15.applyOrElse(EvaluatePython.scala:184)
	at org.apache.spark.sql.execution.python.EvaluatePython$.org$apache$spark$sql$execution$python$EvaluatePython$$nullSafeConvert(EvaluatePython.scala:208)
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$makeFromJava$15.apply(EvaluatePython.scala:180)
	at org.apache.spark.sql.SparkSession$$anonfun$6$$anonfun$apply$5.apply(SparkSession.scala:751)
	at org.apache.spark.sql.SparkSession$$anonfun$6$$anonfun$apply$5.apply(SparkSession.scala:751)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2836)
	at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2835)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.count(Dataset.scala:2835)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 1 fields are required while 0 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$makeFromJava$15$$anonfun$apply$15.applyOrElse(EvaluatePython.scala:184)
	at org.apache.spark.sql.execution.python.EvaluatePython$.org$apache$spark$sql$execution$python$EvaluatePython$$nullSafeConvert(EvaluatePython.scala:208)
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$makeFromJava$15.apply(EvaluatePython.scala:180)
	at org.apache.spark.sql.SparkSession$$anonfun$6$$anonfun$apply$5.apply(SparkSession.scala:751)
	at org.apache.spark.sql.SparkSession$$anonfun$6$$anonfun$apply$5.apply(SparkSession.scala:751)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [59]:
df1 = df.withColumn("id", F.monotonically_increasing_id())

df2 = sentiments.withColumn("id", F.monotonically_increasing_id())


df3 = df2.join(df1, "id", "outer").drop("id")

df3.show()

Py4JJavaError: An error occurred while calling o826.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 34.0 failed 1 times, most recent failure: Lost task 2.0 in stage 34.0 (TID 97, localhost, executor driver): java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 1 fields are required while 0 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$makeFromJava$15$$anonfun$apply$15.applyOrElse(EvaluatePython.scala:184)
	at org.apache.spark.sql.execution.python.EvaluatePython$.org$apache$spark$sql$execution$python$EvaluatePython$$nullSafeConvert(EvaluatePython.scala:208)
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$makeFromJava$15.apply(EvaluatePython.scala:180)
	at org.apache.spark.sql.SparkSession$$anonfun$6$$anonfun$apply$5.apply(SparkSession.scala:751)
	at org.apache.spark.sql.SparkSession$$anonfun$6$$anonfun$apply$5.apply(SparkSession.scala:751)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 1 fields are required while 0 values are provided.
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$makeFromJava$15$$anonfun$apply$15.applyOrElse(EvaluatePython.scala:184)
	at org.apache.spark.sql.execution.python.EvaluatePython$.org$apache$spark$sql$execution$python$EvaluatePython$$nullSafeConvert(EvaluatePython.scala:208)
	at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$makeFromJava$15.apply(EvaluatePython.scala:180)
	at org.apache.spark.sql.SparkSession$$anonfun$6$$anonfun$apply$5.apply(SparkSession.scala:751)
	at org.apache.spark.sql.SparkSession$$anonfun$6$$anonfun$apply$5.apply(SparkSession.scala:751)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
import spark.implicits._

In [None]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .master('yarn') # depends on the cluster manager of your choice
    .appName('StackOverflow')
    .config('spark.driver.extraClassPath', '/usr/local/bin/postgresql-42.2.5.jar')
    .config('spark.executor.instances', 4)
    .config('spark.executor.cores', 4)
    .config('spark.executor.memory', '10g')
    .config('spark.driver.memory', '15g')
    .config('spark.memory.offHeap.enabled', True)
    .config('spark.memory.offHeap.size', '20g')
    .config('spark.dirver.maxResultSize', '4096') 
)

In [None]:
df1 = df1.withColumn("id", monotonically_increasing_id())

df2 = df2.withColumn("id", monotonically_increasing_id())

df1.show()

df2.show()

df3 = df2.join(df1, "id", "outer").drop("id")

df3.show()