# Project 3: Airbnb
**This is the third of three mandatory projects to be handed in as part of the assessment for the course 02807 Computational Tools for Data Science at Technical University of Denmark, autumn 2019.**

#### Practical info
- **The project is to be done in groups of at most 3 students**
- **Each group has to hand in _one_ Jupyter notebook (this notebook) with their solution**
- **The hand-in of the notebook is due 2019-12-05, 23:59 on DTU Inside**

#### Your solution
- **Your solution should be in Python/PySpark**
- **For each question you may use as many cells for your solution as you like**
- **You should not remove the problem statements**
- **Your notebook should be runnable, i.e., clicking [>>] in Jupyter should generate the result that you want to be assessed**
- **You are not expected to use machine learning to solve any of the exercises**

# Introduction
[Airbnb](http://airbnb.com) is an online marketplace for arranging or offering lodgings. In this project you will use Spark to analyze data obtained from the Airbnb website. The purpose of the analysis is to extract information about trends and patterns from the data.

The project has two parts.

### Part 1: Loading, describing and preparing the data
There's quite a lot of data. Make sure that you can load and correctly parse the data, and that you understand what the dataset contains. You should also prepare the data for the analysis in part two. This means cleaning it and staging it so that subsequent queries are fast.

### Part 2: Analysis
In this part your goal is to learn about trends and usage patterns from the data. You should give solutions to the tasks defined in this notebook, and you should use Spark to do the data processing. You may use other libraries, such as Pandas and matplotlib, for visualisation.

## Guidelines
- Processing data should be done using Spark. Once data has been reduced to aggregate form, you may use collect to extract it into Python for visualisation.
- Your solutions will be evaluated by correctness, code quality and interpretability of the output. This means that you have to write clean and efficient Spark code that will generate sensible execution plans, and that the tables and visualisations that you produce are meaningful and easy to read.
- You may add more cells for your solutions, but you should not modify the notebook otherwise.

### Create Spark session and define imports

In [1]:
from pyspark.sql import *
from pyspark.sql import functions as f
from pyspark.sql.types import *

spark = SparkSession.builder.appName("SparkIntro").getOrCreate()

# Part 1: Loading, describing and preparing the data
The data comes in two files. Start by downloading the files and putting them in your `data/` folder.

- [Listings](https://files.dtu.dk/u/siPzAasj8w2gI_ME/listings.csv?l) (5 GB)
- [Reviews](https://files.dtu.dk/u/k3oaPYp6GjKBeho4/reviews.csv?l) (9.5 GB)

### Load the data
The data has multiline rows (rows that span multiple lines in the file). To correctly parse these you should use the `multiline` option and set the `escape` character to be `"`.
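
As a plain-Python illustration of what these two options deal with, here is a small sketch on a hypothetical two-record sample (not the course data). The `csv` module treats a doubled quote inside a quoted field as one literal quote and keeps embedded line breaks inside the field, which is the behaviour the Spark reader is asked for with `multiline` and `escape`.

```python
import csv
import io

# Hypothetical sample: the first comment spans two physical lines and
# contains a doubled quote ("") that stands for one literal quote.
raw = (
    'id,comments\n'
    '1,"Great place.\nThe host said ""welcome"" on arrival."\n'
    '2,"Clean and quiet"\n'
)

physical_lines = raw.splitlines()
records = list(csv.reader(io.StringIO(raw)))

print(len(physical_lines))  # number of physical lines in the text
print(len(records))         # number of logical rows (header included)
print(records[1][1])        # the multi-line comment, quotes unescaped
```

Counting physical lines therefore overestimates the number of rows, which is why a line-based reader mis-parses this kind of file.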

In [3]:
import itertools
import csv
import pandas as pd
import pyspark.sql.functions as f
from functools import reduce
from pyspark.sql.types import *
from pyspark.sql.functions import substring, length, col, expr

##### Slicing the data

In [None]:


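entries = []
# Use a distinct handle name: `f` is the alias for pyspark.sql.functions
# newline='' lets the csv module handle line breaks inside quoted fields
with open('/home/jovyan/work/Project3/listings.csv', 'r', newline='') as csv_file:
    mycsv = csv.reader(csv_file)
    # Header row plus the first 9,999 records
    for row in itertools.islice(mycsv, 10000):
        entries.append(row)

columns_ = entries[0]
df = pd.DataFrame(entries, columns=columns_)
df = df.iloc[1:-1, :]  # drop the header row and the final row
df.head()
#df.to_csv("listings_sub_10000.csv")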

##### Reading into spark

In [None]:
df = spark.read.option("delimiter",",").option("multiline",True).option("header", True).option("escape",'"').option("inferSchema",True).csv('listings_sub_10000.csv')


In [None]:
df.show()

In [None]:
df.select("listing_url").show()

### Describe the data
List the features (schema) and sizes of the datasets.

In [None]:
df.printSchema()

In [None]:
print("rows: ", df.count(), "columns: ", len(df.columns))

In [None]:
# Count the rows that have no null value in any column
df.where(reduce(lambda x, y: x & y,  (f.col(x).isNotNull() for x in df.columns))).count()

In [None]:
# Same count using dropna: rows with at least one null are dropped
df.dropna(how='any').count()

### Prepare the data for analysis
You should prepare two dataframes to be used in the analysis part of the project. You should not be concerned with cleaning the data. There's a lot of it, so it will be sufficient to drop rows that have bad values. You may want to go back and refine this step at a later point when doing the analysis.

You may also want to consider if you can stage your data so that subsequent processing is more efficient (this is not strictly necessary for Spark to run, but you may be able to decrease the time you sit around waiting for Spark to finish things).

# Part 2: Analysis
Use Spark and your favorite tool for data visualization to solve the following tasks.

## The basics
Compute and show a dataframe with the number of listings and neighbourhoods per city.

Based on the table above, you should choose a city that you want to continue your analysis for. The city should have multiple neighbourhoods with listings in them.

Compute and visualize the number of listings of different property types per neighbourhood in your city.
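
To make the expected shape of the first table concrete, here is a plain-Python sketch of the aggregation on hypothetical `(city, neighbourhood)` pairs. It is only an illustration of the logic; in the notebook the same result would come from a Spark `groupBy` on the city with a count and a distinct count. The function name and the sample rows are assumptions made for this example.

```python
from collections import defaultdict

def summarise_cities(listings):
    """listings: iterable of (city, neighbourhood) pairs, one per listing.

    Returns {city: (number_of_listings, number_of_distinct_neighbourhoods)}.
    """
    n_listings = defaultdict(int)
    neighbourhoods = defaultdict(set)
    for city, neighbourhood in listings:
        n_listings[city] += 1
        # A missing neighbourhood counts as a listing but not as a neighbourhood
        if neighbourhood is not None:
            neighbourhoods[city].add(neighbourhood)
    return {c: (n_listings[c], len(neighbourhoods[c])) for c in n_listings}

# Hypothetical sample rows
sample = [
    ("Stockholm", "Södermalm"),
    ("Stockholm", "Södermalm"),
    ("Stockholm", "Norrmalm"),
    ("Stockholm", None),
    ("Uppsala", "Centrum"),
]
summary = summarise_cities(sample)
print(summary)
```

A city with a single neighbourhood, such as the second one in the sample, would not be a good choice for the per-neighbourhood tasks that follow.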

## Prices
Compute the minimum, maximum and average listing price in your city. 

In [None]:
city = "Stockholm"

In [None]:
df_s = df.filter(f.col("city")==city)

In [None]:
df_s.count()

In [None]:
#df_s.price = df_s.price.cast('float')

In [None]:
from pyspark.sql.types import *

In [None]:
from pyspark.sql.functions import substring, length, col, expr
df_s = df_s.withColumn("price",expr("substring(price, 2, length(price))"))

In [None]:
from  pyspark.sql.functions import regexp_replace,col

In [None]:
df_s = df_s.withColumn("price", regexp_replace(f.col("price"), ",", ""))

In [None]:
#df_s.show()

In [None]:
#df_s.select(df_s.price.cast("float").alias('price_new')).collect()

In [None]:
from pyspark.sql.types import *
df_d = df_s.withColumn("price",df_s["price"].cast(DoubleType()))
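
The cells above turn the price text into a number in three steps: drop the leading currency symbol, remove thousands separators, cast to double. The following plain-Python sketch does the same on hypothetical price strings, which is handy for checking what values to expect. `parse_price` is a name made up for this example, and unlike the `substring` call it removes `$` wherever it appears rather than always dropping the first character.

```python
import re

def parse_price(text):
    """Convert a price string such as '$1,250.00' to a float.

    Returns None for missing or unparseable input, mirroring how a Spark
    cast to double yields null for strings that are not numbers.
    """
    if text is None:
        return None
    cleaned = re.sub(r"[$,]", "", text.strip())
    try:
        return float(cleaned)
    except ValueError:
        return None

# Hypothetical inputs
print(parse_price("$1,250.00"))
print(parse_price("$85.00"))
print(parse_price("n/a"))
```

Rows where the result is missing are worth counting before computing the minimum, maximum and average, since they silently drop out of those aggregates.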

In [None]:
#df_d.show()

In [None]:
print("Basic statistics for", city)
df_d.select(f.avg('price').alias("Average price"), f.max('price').alias("Max price"), f.min('price').alias("Min price")).show()

Compute and visualize the distribution of listing prices in your city.

In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy import stats
sns.set(rc={'figure.figsize':(16,6)})

In [None]:
# Only the price column is plotted, so collect just that column to the driver
df_city_p = df_d.select("price").toPandas()

In [None]:
sns.distplot(df_city_p.price, kde=False, rug=False, bins=150);

The value of a listing is its rating divided by its price.

Compute and show a dataframe with the 3 highest valued listings in each neighbourhood.

In [None]:
from pyspark.sql.functions import lit

In [None]:
df_d = df_d.withColumn("review_scores_rating", f.col("review_scores_rating").cast(DoubleType()))

In [None]:
# The division already yields a Column, so wrapping it in lit() is unnecessary
df_d = df_d.withColumn("value_listing", f.col("review_scores_rating") / f.col("price"))

In [None]:
df_value = df_d.select(f.col("city"),f.col("neighbourhood"),(f.col("value_listing")))

In [None]:
#df_value.orderBy(df_value.value_listing.desc()).show()

In [None]:
sorted_by_value = Window.partitionBy('neighbourhood').orderBy(f.desc('value_listing'))

In [None]:
ranked_by_value = df_d.withColumn('value_listing_rank', f.rank().over(sorted_by_value))

In [None]:
ranked_df = ranked_by_value.filter(f.col('value_listing_rank') <= 3).drop('value_listing_rank').orderBy('neighbourhood', f.desc('value_listing'))

In [None]:
ranked_df.select(f.col("neighbourhood"),f.col("value_listing"),f.col("city"),f.col("name")).show(50)
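
One detail of the ranking above: `rank()` gives tied values the same rank, so a neighbourhood can show more than three rows when listings share a value (`row_number()` would return exactly three). The plain-Python sketch below reproduces that ranking rule on hypothetical `(neighbourhood, name, value)` rows. The function name and data are assumptions for illustration, and rows with a missing value are simply skipped here.

```python
from collections import defaultdict

def top_by_rank(rows, k=3):
    """Keep, per neighbourhood, the rows whose competition rank is <= k.

    Competition rank = 1 + number of strictly larger values in the group,
    so tied values share a rank.
    """
    groups = defaultdict(list)
    for neighbourhood, name, value in rows:
        if value is not None:  # assumption: ignore listings without a value
            groups[neighbourhood].append((name, value))

    result = {}
    for neighbourhood, items in groups.items():
        items.sort(key=lambda item: item[1], reverse=True)
        kept = []
        for name, value in items:
            rank = 1 + sum(1 for _, other in items if other > value)
            if rank <= k:
                kept.append((name, value))
        result[neighbourhood] = kept
    return result

# Hypothetical rows: a3 and a4 tie for third place in neighbourhood "A"
rows = [
    ("A", "a1", 2.0), ("A", "a2", 1.5), ("A", "a3", 1.0),
    ("A", "a4", 1.0), ("A", "a5", 0.5), ("B", "b1", 0.9),
]
top = top_by_rank(rows)
print(top)
```

Whether ties should be kept or broken arbitrarily is a presentation choice; either is defensible as long as the table says which rule was used.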

## Trends
Now we want to analyze the "popularity" of your city. The data does not contain the number of bookings per listing, but we have a large number of reviews, and we will assume that this is a good indicator of activity on listings.

Compute and visualize the popularity (i.e., number of reviews) of your city over time.

Compute and visualize the popularity of neighbourhoods over time. If there are many neighbourhoods in your city, you should select a few interesting ones for comparison.

Compute and visualize the popularity of your city by season. For example, visualize the popularity of your city per month.
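
The two time-based views asked for here differ only in the grouping key: year and month together for popularity over time, the calendar month alone for seasonality. A plain-Python sketch of both aggregations follows. It illustrates the logic only; in the notebook the grouping would be done in Spark on the review `date` column before collecting. The function names are made up for this example, and the first four dates are taken from the review sample shown further down.

```python
from collections import Counter
from datetime import date

def reviews_per_month(dates):
    """Number of reviews per (year, month), in chronological order."""
    counts = Counter((d.year, d.month) for d in dates)
    return sorted(counts.items())

def reviews_per_calendar_month(dates):
    """Number of reviews per calendar month, January first, across all years."""
    counts = Counter(d.month for d in dates)
    return [counts.get(month, 0) for month in range(1, 13)]

review_dates = [
    date(2013, 11, 3), date(2013, 12, 2),
    date(2014, 1, 1), date(2014, 1, 27),
]
over_time = reviews_per_month(review_dates)
by_season = reviews_per_calendar_month(review_dates)
print(over_time)
print(by_season)
```

Both results are small (one row per month), so they are safe to collect and plot with matplotlib.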

## Reviews
In this part you should determine which words used in reviews are the most positive.

The individual reviews do not have a rating of the listing, so we will assume that each review gave the average rating to the listing, i.e., the one on the listing.

You should assign a positivity weight to each word seen in reviews and list the words with the highest weight. It is up to you to decide what the weight should be. For example, it can be a function of the rating of the listings on which the word occurs, the number of reviews it occurs in, and the number of unique listings whose reviews use it.

Depending on your choice of weight function, you may also want to do some filtering of words. For example, remove words that only occur in a few reviews.

In [None]:
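import itertools
import csv
import pandas as pd

entries = []
# Use a distinct handle name: `f` is the alias for pyspark.sql.functions
# newline='' lets the csv module handle line breaks inside quoted fields
with open('../Project3/reviews.csv', 'r', newline='') as csv_file:
    mycsv = csv.reader(csv_file)
    # Header row plus the first 9,999 records
    for row in itertools.islice(mycsv, 10000):
        entries.append(row)

columns_ = entries[0]
df = pd.DataFrame(entries, columns=columns_)
df = df.iloc[1:-1, :]  # drop the header row and the final row
#df.head()
#df.to_csv("reviews_sub_10000.csv")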

In [3]:
import pandas as pd

r = pd.read_csv("reviews_sub_10000.csv")
l = pd.read_csv("listings_sub_10000.csv")

In [None]:
r.head()

In [None]:
l.head()

From the listings we need to:
- get the average rating for each listing ID
- join that average rating onto the reviews

In [89]:
dfl = spark.read.option("delimiter",",").option("multiline",True).option("header", True).option("escape",'"').option("inferSchema",True).csv('listings_sub_10000.csv')

In [90]:
dfr = spark.read.option("delimiter",",").option("multiline",True).option("header", True).option("escape",'"').option("inferSchema",True).csv('reviews_sub_10000.csv')

In [91]:
dfr.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- listing_id: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)



In [92]:
from pyspark.sql.functions import col, lower, regexp_replace, split

def clean_text(c):
    c = lower(c)
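    # Raw strings keep the regex backslashes intact.
    # Drop URLs, then every character that is not an ASCII letter, digit or whitespace
    # (this also removes accented letters).
    c = regexp_replace(c, r"https?://\S+", "")
    c = regexp_replace(c, r"[^a-zA-Z0-9\s]", "")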
    return c

In [93]:
dfr = dfr.withColumn("text_clean", clean_text(col("comments")))

In [94]:
dfr.select(f.col("text_clean")).show()

+--------------------+
|          text_clean|
+--------------------+
|prima plek om sto...|
|cosy and clean fl...|
|the host canceled...|
|kims place was ou...|
|great spacious ap...|
|kim is a very fri...|
|the apartment is ...|
|nicely appointed ...|
|it was a pleasure...|
|kims place is sim...|
|we absolutely lov...|
|amazing place to ...|
|very pleasant sta...|
|we had a wonderfu...|
|great location co...|
|lovely flat in a ...|
|kims flat is in a...|
|i would definitel...|
|kims place is per...|
|the flat is so co...|
+--------------------+
only showing top 20 rows
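
The cleaning step lowercases the text, strips URLs, and then drops every character outside `a-z`, `0-9` and whitespace. That last rule also removes accented letters, which is why `André` shows up as `andr` in the joined output further down. A plain-Python sketch of the same three steps, for trying out inputs quickly (`clean_text_py` is a hypothetical name, and Python's `re` is assumed to behave like Spark's regex engine for these simple patterns):

```python
import re

def clean_text_py(text):
    """Plain-Python counterpart of the clean_text column expression."""
    text = text.lower()
    text = re.sub(r"https?://\S+", "", text)       # remove URLs
    text = re.sub(r"[^a-zA-Z0-9\s]", "", text)     # keep ASCII letters, digits, whitespace
    return text

# Hypothetical review text
cleaned = clean_text_py("André was GREAT! See https://example.com/x now")
print(repr(cleaned))
```

If non-English reviews matter for the analysis, the character class is the place to relax; as written, words with accents are shortened rather than removed.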



In [95]:
#mean_scores = dfl.groupBy("id").agg(f.mean("review_scores_rating").alias("average_rating"))
#mean_scores = mean_scores.select(f.col("id").alias("id_"), f.col("average_rating"))

In [96]:
dfl = dfl.select(f.col("id").alias("id_"), f.col("review_scores_rating").alias("average_rating"))

In [97]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

In [98]:
dfl.show()

+------+--------------+
|   id_|average_rating|
+------+--------------+
|145320|            97|
|155220|          null|
|155685|            94|
|164448|            97|
|170651|            93|
|206221|            98|
|220233|          null|
|220851|            93|
|242188|            97|
|259025|            89|
|259722|            92|
|273906|           100|
|274876|            99|
|278311|            92|
|283446|            97|
|299154|            92|
|301680|            92|
|302496|            99|
|313195|            96|
|313763|            98|
+------+--------------+
only showing top 20 rows



In [99]:
dfr = dfr.join(dfl, (f.col('listing_id') == f.col('id_')), 'inner').drop('id_')

In [100]:
dfr.show()

+----+----------+--------+-------------------+-----------+------------------+--------------------+--------------------+--------------+
| _c0|listing_id|      id|               date|reviewer_id|     reviewer_name|            comments|          text_clean|average_rating|
+----+----------+--------+-------------------+-----------+------------------+--------------------+--------------------+--------------+
|9766|   1271843| 8513212|2013-11-03 00:00:00|    4641577|            Pamela|André was my firs...|andr was my first...|            99|
|9767|   1271843| 9043505|2013-12-02 00:00:00|    6890450|              Emma|Andre was the bes...|andre was the bes...|            99|
|9768|   1271843| 9533868|2014-01-01 00:00:00|    9496791|   Julie And Chris|André was a fanta...|andr was a fantas...|            99|
|9769|   1271843|10049736|2014-01-27 00:00:00|     940929|             Denis|It was a right ch...|it was a right ch...|            99|
|9770|   1271843|10324840|2014-02-13 00:00:00|   114540

In [101]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer

tokenizer = Tokenizer(inputCol="text_clean", outputCol="words")

In [102]:
dfr = tokenizer.transform(dfr)
#tokenized.select("comments", "words")\
    #.withColumn("tokens", countTokens(col("words"))).show(truncate=False)

In [103]:
dfr.show()

+----+----------+--------+-------------------+-----------+------------------+--------------------+--------------------+--------------+--------------------+
| _c0|listing_id|      id|               date|reviewer_id|     reviewer_name|            comments|          text_clean|average_rating|               words|
+----+----------+--------+-------------------+-----------+------------------+--------------------+--------------------+--------------+--------------------+
|9766|   1271843| 8513212|2013-11-03 00:00:00|    4641577|            Pamela|André was my firs...|andr was my first...|            99|[andr, was, my, f...|
|9767|   1271843| 9043505|2013-12-02 00:00:00|    6890450|              Emma|Andre was the bes...|andre was the bes...|            99|[andre, was, the,...|
|9768|   1271843| 9533868|2014-01-01 00:00:00|    9496791|   Julie And Chris|André was a fanta...|andr was a fantas...|            99|[andr, was, a, fa...|
|9769|   1271843|10049736|2014-01-27 00:00:00|     940929|      

In [104]:
from pyspark.ml.feature import StopWordsRemover

# Define a list of stop words or use default list
remover = StopWordsRemover()
stopwords = remover.getStopWords() 

In [109]:
# Specify input/output columns
remover.setInputCol("words")
remover.setOutputCol("comments_wo_stopwords")

# Transform existing dataframe with the StopWordsRemover
dfr = remover.transform(dfr)

In [110]:
dfr.show()

+----+----------+--------+-------------------+-----------+------------------+--------------------+--------------------+--------------+--------------------+---------------------+
| _c0|listing_id|      id|               date|reviewer_id|     reviewer_name|            comments|          text_clean|average_rating|               words|comments_wo_stopwords|
+----+----------+--------+-------------------+-----------+------------------+--------------------+--------------------+--------------+--------------------+---------------------+
|9766|   1271843| 8513212|2013-11-03 00:00:00|    4641577|            Pamela|André was my firs...|andr was my first...|            99|[andr, was, my, f...| [andr, first, hos...|
|9767|   1271843| 9043505|2013-12-02 00:00:00|    6890450|              Emma|Andre was the bes...|andre was the bes...|            99|[andre, was, the,...| [andre, best, hos...|
|9768|   1271843| 9533868|2014-01-01 00:00:00|    9496791|   Julie And Chris|André was a fanta...|andr was a f

In [16]:
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="words", outputCol="features")

model = cv.fit(tokenized)

result = model.transform(tokenized)
result.show(truncate=False)


In [17]:
result.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- listing_id: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- reviewer_id: integer (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- average_rating: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)



#### -----

In [22]:
daa = spark.createDataFrame([([1, 2], {"key": "value"})], ["l", "d"])

In [23]:
daa.show()

+------+--------------+
|     l|             d|
+------+--------------+
|[1, 2]|[key -> value]|
+------+--------------+



In [24]:
daa.printSchema()

root
 |-- l: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- d: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [28]:
daa.select(daa.l.getItem(0)).show()

+----+
|l[0]|
+----+
|   1|
+----+



#### -----

In [None]:
from pyspark.sql.functions import col,when
orderitems.withColumn("listing_id",
          when(col("order_item_subtotal") != (col("order_item_product_price") * col("order_item_quantity")),"N")
          .otherwise("Y")).show()

In [None]:
orderitems.withColumn("valid",
          when(col("order_item_subtotal") != (col("order_item_product_price") * col("order_item_quantity")),"N")
          .otherwise("Y")).show()

In [42]:
from pyspark.sql.functions import col,when

In [45]:
result.withColumn("listing_id",when(col("reviewer_id") != (col("features") * col("average_rating")),"N")\
                  .otherwise("Y")).show()

AnalysisException: "cannot resolve '(`features` * CAST(`average_rating` AS DOUBLE))' due to data type mismatch: differing types in '(`features` * CAST(`average_rating` AS DOUBLE))' (struct<type:tinyint,size:int,indices:array<int>,values:array<double>> and double).;;\n'Project [_c0#214, CASE WHEN NOT (reviewer_id#218 = (features#329 * cast(average_rating#232 as double))) THEN N ELSE Y END AS listing_id#444, id#216, date#217, reviewer_id#218, reviewer_name#219, comments#220, average_rating#232, words#313, features#329]\n+- Project [_c0#214, listing_id#215, id#216, date#217, reviewer_id#218, reviewer_name#219, comments#220, average_rating#232, words#313, UDF(words#313) AS features#329]\n   +- Project [_c0#214, listing_id#215, id#216, date#217, reviewer_id#218, reviewer_name#219, comments#220, average_rating#232, UDF(comments#220) AS words#313]\n      +- Project [_c0#214, listing_id#215, id#216, date#217, reviewer_id#218, reviewer_name#219, comments#220, average_rating#232]\n         +- Join Inner, (listing_id#215 = cast(id_#231 as int))\n            :- Relation[_c0#214,listing_id#215,id#216,date#217,reviewer_id#218,reviewer_name#219,comments#220] csv\n            +- Project [id#1 AS id_#231, review_scores_rating#87 AS average_rating#232]\n               +- Relation[_c0#0,id#1,listing_url#2,scrape_id#3,last_scraped#4,name#5,summary#6,space#7,description#8,experiences_offered#9,neighborhood_overview#10,notes#11,transit#12,access#13,interaction#14,house_rules#15,thumbnail_url#16,medium_url#17,picture_url#18,xl_picture_url#19,host_id#20,host_url#21,host_name#22,host_since#23,... 83 more fields] csv\n"

In [29]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

In [32]:
firstelement=udf(lambda x, y: x*y,FloatType())

In [34]:
def col_mul(col1,col2):
    return col1*col2

In [35]:
mul = udf(col_mul)

In [39]:
res = result.withColumn(mul(f.col("features"),f.col('average_rating')))

TypeError: withColumn() missing 1 required positional argument: 'col'

In [33]:
result.select(firstelement('features','average_rating')).show()

Py4JJavaError: An error occurred while calling o541.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 33.0 failed 1 times, most recent failure: Lost task 0.0 in stage 33.0 (TID 627, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-32-280f59bb0cab>", line 1, in <lambda>
TypeError: can't multiply sequence by non-int of type 'SparseVector'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3383)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3364)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3363)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2544)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2758)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 345, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 334, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in <lambda>
    return lambda *a: f(*a)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-32-280f59bb0cab>", line 1, in <lambda>
TypeError: can't multiply sequence by non-int of type 'SparseVector'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [27]:
result.select(result.features[0]).show()

AnalysisException: "Can't extract value from features#329: need struct type but got struct<type:tinyint,size:int,indices:array<int>,values:array<double>>;"
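
The failures above share a cause. The `features` column is an ML vector, which Spark SQL exposes as a struct with the fields `type`, `size`, `indices` and `values` (as the error messages show), so the SQL operators `*` and `[0]` do not apply to it. In the UDF attempt, `average_rating` additionally arrives as a string (see the schema printed earlier), which most likely explains the "can't multiply sequence" message. The sketch below shows, in plain Python, what weighting a sparse count vector by a rating amounts to. `SparseCounts` is a stand-in type made up for this example, not the Spark class.

```python
from typing import NamedTuple, Tuple

class SparseCounts(NamedTuple):
    """Stand-in for a sparse count vector: vocabulary size plus non-zero entries."""
    size: int
    indices: Tuple[int, ...]
    values: Tuple[float, ...]

def scale(vec, rating):
    """Multiply every stored count by the rating.

    The rating is converted with float() first, because it may arrive as text.
    """
    factor = float(rating)
    return SparseCounts(vec.size, vec.indices, tuple(v * factor for v in vec.values))

# Hypothetical review: word 0 occurs twice, word 3 once; listing rating "97"
weighted = scale(SparseCounts(5, (0, 3), (2.0, 1.0)), "97")
print(weighted)
```

Only the stored values change; the indices and the vector size stay the same, so the result is still aligned with the vocabulary.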

In [18]:
w = result.select(f.col("features"))

In [19]:
w.show()

+--------------------+
|            features|
+--------------------+
|(36166,[0,1,2,3,4...|
|(36166,[0,1,2,3,4...|
|(36166,[0,1,2,3,5...|
|(36166,[0,1,2,3,4...|
|(36166,[0,1,2,3,5...|
|(36166,[0,1,2,6,7...|
|(36166,[0,1,2,3,4...|
|(36166,[0,1,3,5,6...|
|(36166,[0,2,3,5,7...|
|(36166,[4,6,9,31,...|
|(36166,[0,1,3,5,6...|
|(36166,[0,1,2,3,4...|
|(36166,[0,1,2,3,4...|
|(36166,[0,1,3,4,5...|
|(36166,[0,1,3,5,9...|
|(36166,[0,1,2,3,5...|
|(36166,[0,1,2,3,4...|
|(36166,[0,1,2,3,4...|
|(36166,[0,1,2,3,6...|
|(36166,[1,2,3,5,6...|
+--------------------+
only showing top 20 rows



In [49]:
#w = tokenized.select(f.col("words"))

In [57]:
from pyspark.mllib.linalg.distributed import *

In [67]:
from pyspark.mllib.linalg.distributed import *

def as_block_matrix(rdd, rowsPerBlock=1024, colsPerBlock=1024):
    return IndexedRowMatrix(
        rdd.zipWithIndex().map(lambda xi: IndexedRow(xi[1], xi[0]))
    ).toBlockMatrix(rowsPerBlock, colsPerBlock)

#as_block_matrix(rows_1).multiply(as_block_matrix(rows_2))

In [84]:
[w*result.select(f.col("")) list(w.collect()[0][0])

SyntaxError: invalid syntax (<ipython-input-84-c6b4ffdb9a0d>, line 1)

In [73]:
result.withColumn("words",f.col("average_rating")*f.col("features")[0]).show()

AnalysisException: "Can't extract value from features#1542: need struct type but got struct<type:tinyint,size:int,indices:array<int>,values:array<double>>;"

In [None]:
def extract_title(name):
    name_parts = name.split(',')
    title_and_first = name_parts[1].split('.')
    return title_and_first[0]

extract_title_udf = f.udf(extract_title)

df.withColumn('Title', extract_title_udf(f.col('Name'))).select('Name', 'Title').show()

A simple way to assess how positive each word is:
- for every review: tokenize the text
- create a bag of words, i.e. a document-term matrix
- create a dictionary containing every word from the reviews and compute an aggregated score for each word. The aggregated score could be as simple as:
    - summing the listing ratings of the reviews the word appears in and dividing by the number of those reviews (to account for how frequently the word occurs)
    - a coarser score, where a bad review counts as -1, a middling one as 0 and a good one as 1. These scores can then be summed per word and normalised in the same way.
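
The two scoring ideas in the list can be sketched in plain Python as follows. This is an illustration on hypothetical data, not the notebook's Spark implementation: the function names, the `min_reviews` filter and the thresholds 80 and 95 are all assumptions chosen for the example. Each word is counted once per review, and words seen in fewer than `min_reviews` reviews are dropped.

```python
from collections import defaultdict

def three_level(rating, low=80, high=95):
    """Coarse score: -1 for a bad rating, 0 for a middling one, 1 for a good one."""
    if rating >= high:
        return 1
    if rating < low:
        return -1
    return 0

def word_weights(reviews, score=lambda rating: rating, min_reviews=2):
    """reviews: iterable of (listing_rating, list_of_words).

    Returns (word, mean score) pairs, highest first, for words that occur
    in at least min_reviews reviews.
    """
    total = defaultdict(float)
    count = defaultdict(int)
    for rating, words in reviews:
        if rating is None:
            continue
        for word in set(words):  # count a word once per review
            total[word] += score(rating)
            count[word] += 1
    weights = {w: total[w] / count[w] for w in count if count[w] >= min_reviews}
    return sorted(weights.items(), key=lambda kv: (-kv[1], kv[0]))

# Hypothetical tokenized reviews with the rating of their listing
reviews = [
    (99, ["great", "host"]),
    (97, ["great", "clean"]),
    (60, ["dirty", "host"]),
    (62, ["dirty", "noisy"]),
]
by_mean_rating = word_weights(reviews)
by_three_level = word_weights(reviews, score=three_level)
print(by_mean_rating)
print(by_three_level)
```

Passing the scoring rule as a parameter keeps the aggregation identical for both variants, so their rankings can be compared directly.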

In [None]:
dfl.groupBy('country').agg(f.sum('review_scores_rating')).filter(f.col('country') == 'Sweden').explain()

In [None]:
dfl.filter(f.col('country') == 'Sweden').groupBy('country').agg(f.sum('review_scores_rating')).explain()