# 02807: Project 2
 
## Practical information
 
* This project must be completed in groups of 3 students.
    * The group must be registered on the course site on DTU Learn: My Course > Groups
    * Groups must be registered anew (even if you already registered for Project 1)
* This project must be handed in as a Jupyter notebook to the course site on DTU Learn.
    * Go to the Course Content > Assignments tab to upload your submission.
* This project is due on Monday, November 29, 20:00.

## Submission rules

* Each group has to hand in *one* notebook (`.ipynb`) with their solutions, including a filled out Contribution table (see below).
* Your solution must be written in Python.
* For each question, use the provided cells ("`# your code goes here`" and "*your explanation here*") for your solution.
    * You may add code cells within a question block, but consider whether it is really necessary.
* You should not remove the problem statements, and you should not modify the structure of the notebook.
* Your notebook should be runnable and readable from top to bottom.
    * That is, your code cells work when run in order (from top to bottom).
    * The output of any cell depends only on that cell and the cells above it.
* Your notebook should be submitted after having been run from top to bottom.
    * This way, the outputs can be interpreted without running your cells.
    * The simplest way to achieve this is to use the Jupyter menu item Kernel > Restart & Run All just before submission. If any cell fails when you do this, your notebook is not ready for submission.
    * Exercise 3 in particular takes time to finish, so plan accordingly: make sure you have time to run your notebook from top to bottom.
* Failure to comply may make it impossible for us to evaluate your submission properly, which will likely negatively impact the points awarded.

## Solution guidelines
* Data processing is done with Spark in the first three exercises and with pandas/SQL in the fourth exercise.
* Where names of dataframes and functions are explicitly stated, these names must be used.
* Your solutions will be evaluated on correctness, code quality and interpretability of the output.
    * You have to write clean, readable and efficient Spark code that will generate sensible execution plans.
    * You have to write clean, readable and efficient SQL queries.
    * Your tables and visualisations should be meaningful and easy to read. This requires, but is not limited to, including headers, legends and well-written (brief) descriptions for graphs/charts. By this point you have found the data processing solution, so also put some effort into its presentation.

## Collaboration policy
 
* It is not allowed to collaborate on the exercises with students outside your group, except for discussing the text of the exercise with teachers and fellow students enrolled in the course in the same semester.
* It is not allowed to exchange, hand-over or in any other way communicate solutions or parts of solutions to the exercises. 
* It is not allowed to use solutions from similar courses, or solutions found elsewhere.

## Contribution table and grading

* The total number of points in the project is 150.
* You have to indicate who has solved each part of each exercise in a **contribution table**. 
* A group member can take credit for solving a part of an exercise only if they have contributed **substantially** to the solution. 
    * Simple contributions, such as correcting a small bug or double-checking the results of functions, are not sufficient for taking credit for a solution.
    * Several group members can take credit for the same solution if they all have contributed substantially to it.
* Each group member must contribute **at least 65 points**. 
    * If no name is provided for an exercise's part, **all group members** are considered contributors to it.
* Group members should decide amongst themselves how to collaborate on the project to meet these constraints.  
* Scores are individual. The score $\text{score}(m)$ for a group member $m$ ranges from 0 to 10 and is calculated as follows: 

  * $\text{individual-score}(m) = \frac{\text{total number of points for the parts correctly solved by }m}{\text{total number of points for the parts contributed by }m}$

  * $\text{group-score} = \frac{\text{total number of points correctly solved by any group member}}{\text{total number of points in the project}}$

  * $\text{score}(m) =  7.5 \cdot \text{individual-score}(m) + 2.5 \cdot \text{group-score}$
  
  
* The following is an example of a contributions table:

|        | Exercise 1 | Exercise 2 | Exercise 3 | Exercise 4 |
|--------|------------|------------|------------|------------|
| **Part 1** | John       |    Mary        |     Ann       |   Mary, Ann         |
| **Part 2** |     Mary       |    Mary        |   Ann         |    John, Ann        |
| **Part 3** |     John, Mary, Ann       |      John, Ann      |   John         | John      |
| **Part 4** | Ann       |  Ann          |     John, Mary       | John       |
| **Part 5** | **n.a.**     | John, Mary, Ann           | **n.a.**       | **n.a.**       |


* **Example**: in the contribution table above, suppose that all parts are solved correctly except those of Exercise 4, which are all wrong. Then Ann's score is calculated as follows:

  * $\text{individual-score}(Ann) = \frac{5+5+10+5+5+15+15}{5+5+10+5+5+15+15+15+5} = \frac{60}{80} = 0.75$

  * $\text{group-score} = \frac{95}{150} = 0.633$

  * $\text{score}(Ann) = 7.5\cdot 0.75 + 2.5 \cdot 0.633 = 7.21$
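
The scoring rule can be sketched in a few lines of Python. `member_score` and its argument layout are our own illustration, not part of the course material; the usage reproduces Ann's example, listing the points of each part she is credited with and whether it was solved correctly.

```python
def member_score(contributed, group_correct_points, total_points=150):
    """Score in [0, 10] for one group member.

    contributed: list of (points, solved_correctly) pairs, one per part the
        member is credited with in the contribution table.
    group_correct_points: points of all parts solved correctly by anyone.
    """
    credited = sum(points for points, _ in contributed)
    correct = sum(points for points, ok in contributed if ok)
    individual_score = correct / credited
    group_score = group_correct_points / total_points
    return 7.5 * individual_score + 2.5 * group_score


# Ann's parts from the example: Exercises 1-3 correct, Exercise 4 parts 1-2 wrong
ann_parts = [(5, True), (5, True), (10, True), (5, True), (5, True),
             (15, True), (15, True), (15, False), (5, False)]
print(round(member_score(ann_parts, group_correct_points=95), 2))  # 7.21
```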


# Group contribution table 

This table must be filled in before submission.



In [None]:
import pandas as pd

d = {'Exercise 1' : ['', '', '', '', 'n.a'], 
     'Exercise 2' : ['', '', '', '', ''],
     'Exercise 3' : ['', '', '', '', 'n.a'],
     'Exercise 4' : ['', '', '', '', 'n.a'],
     } 
  
ct = pd.DataFrame(d, index=['Part 1', 'Part 2', 'Part 3', 'Part 4', 'Part 5']) 

ct

# The Airbnb dataset

<img src="https://www.esquireme.com/public/images/2019/11/03/airbnb-678x381.jpg" alt="airbnb" width="400"/>

[Airbnb](http://airbnb.com) is an online marketplace for arranging or offering lodgings. In the first three exercises you will use Spark to analyze data obtained from the Airbnb website (strictly speaking, data scraped by [insideairbnb](http://insideairbnb.com/get-the-data.html)). The purpose of your analysis is to extract insights about listings as a whole, about London in particular, and about the sentiment of words used in reviews (word positivity).


## Loading data
The dataset consists of listings (offered lodgings) and reviews (submitted by users). The `.csv` files you'll work with vary between the first three exercises, but they are all structured so that the function below loads them into a Spark dataframe.

In [1]:
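def load_csv_as_dataframe(path):
    # Header row, inferred column types, quoted fields that may span several
    # lines, '"' as the escape character, and malformed rows dropped
    return spark.read.option('header', True) \
                .option('inferSchema', True) \
                .option('multiLine', True) \
                .option('escape', '"') \
                .option('mode', 'DROPMALFORMED') \
                .csv(path)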

## Imports and Spark session

* You'll need to adapt the `JAVA_HOME` environment variable to your setup. 
* You should set the `spark.driver.memory` value to the amount of memory on your machine. 
* You may need to install some of the packages imported below (e.g. pandasql).

In [2]:
# Instructions on p. 20 of Learning Spark, 2nd ed.
# Here's a quick guide; googling may also be required
# 1) Install pyspark via conda/pip
#          pyspark requires the JAVA_HOME environment variable to be set.
# 2) Install JDK 8 or 11 and find its install location
#          We suggest https://adoptopenjdk.net/
# 3) Point the JAVA_HOME environment variable, which is set
#    programmatically below, to your local install location

import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

# If you get "Job aborted due to stage failure" and 
# "Python worker failed to connect back." exceptions, 
# this should be solved by additionally setting these 
# environment variables

# os.environ['PYSPARK_PYTHON'] = 'python'
# os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
# os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'notebook'

In [3]:
import numpy as np
import pandas as pd
import seaborn as sns
import pyspark
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark import SparkContext, SparkConf
from pandas_profiling import ProfileReport
# Explicit imports rather than `from pyspark.sql.functions import *`, which
# would shadow Python built-ins such as sum, min, max and round
from pyspark.sql.functions import col, countDistinct, length, rank, regexp_replace
%matplotlib inline
sns.set()

import pandasql as psql

In [4]:
# Set the driver memory limit and use all CPU cores (local[*]).
# spark.driver.memory is read when the driver JVM starts, so changing it
# requires a kernel restart.
conf = SparkConf().set('spark.ui.port', '4050') \
        .set('spark.driver.memory', '4g') \
        .setMaster('local[*]')

spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

21/11/29 18:08:09 WARN Utils: Your hostname, T-XPS-15-9500 resolves to a loopback address: 127.0.1.1; using 10.209.225.210 instead (on interface wlp0s20f3)
21/11/29 18:08:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/11/29 18:08:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
spark

In [6]:
sc.getConf().getAll()

[('spark.app.startTime', '1638205690399'),
 ('spark.driver.host', 't-xps-15-9500.students.clients.local'),
 ('spark.driver.memory', '4g'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.ui.port', '4050'),
 ('spark.sql.warehouse.dir',
  'file:/home/thomas/jupyter/ComputationalToolsAss2/spark-warehouse'),
 ('spark.rdd.compress', 'True'),
 ('spark.app.id', 'local-1638205690955'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '38121')]

# Exercise 1: Listings and cities (20 pts)

In this exercise you must use Spark to do the data processing. 
* For parts where you present tabular data, this entails calling `toPandas` as the final step of your query. 
* For parts requiring visualisation, the `toPandas` call should be followed only by functions necessary to customize the plotting/layout steps (i.e. no data processing takes place after your Spark dataframe is materialized).


## Part 1: Preparing the dataframe (5 pts)

Your data source is [this zip archive](https://data-download.compute.dtu.dk/c02807/listings.csv.zip) which you must uncompress and place in the same folder as this notebook. It is loaded in the next cell and named `df_listings`.

After the data is read, select the columns needed for Exercises 1, 2 and 3 (by reading ahead or by iteratively extending this loading code). Name this dataframe `df_listings_analysis` and make use of caching.

Prices are in local currency but are nonetheless prefixed with `$` and contain thousands-separator commas. You will need to remove these characters and cast the price column to `pyspark.sql.types.DoubleType`. Note that if the cast is not possible, its result is `null`.
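
For intuition only, the same strip-then-cast logic can be sketched in plain Python. `parse_price` is a hypothetical helper, not part of the exercise, and Python's `float` accepts a slightly different set of strings than Spark's cast to double, so treat it as an approximation of the behaviour rather than a replica.

```python
import re


def parse_price(raw):
    """Strip '$' and thousands-separator commas, then convert to float.

    Returns None when the remaining text is not a number, roughly mirroring
    how a failed Spark cast yields null.
    """
    if raw is None:
        return None
    cleaned = re.sub(r'[$,]', '', raw)
    try:
        return float(cleaned)
    except ValueError:
        return None


print(parse_price('$1,234.00'))  # 1234.0
print(parse_price('n/a'))        # None
```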

In [7]:
df_listings = load_csv_as_dataframe('listings.csv')


AnalysisException: Path does not exist: file:/home/thomas/jupyter/ComputationalToolsAss2/listings.csv

In [None]:
cols = ['id', 'city', 'neighbourhood_cleansed', 'price', 'review_scores_rating', 'property_type']
df_listings_analysis = df_listings.select(*cols)

In [None]:
df_listings.columns

In [None]:
# Strip the '$' prefix and the thousands-separator commas, then cast to double
# (values that cannot be cast become null). The cleaned dataframe is cached
# because Exercises 1-3 all reuse it.
df_listings_analysis = df_listings_analysis \
    .withColumn('price', regexp_replace(col('price'), r'[$,]', '').cast('double')) \
    .cache()

In [None]:
df_listings_analysis.limit(10).toPandas()

## Part 2: Listing and neighbourhood counts (5 pts)

Compute and visualise the number of listings and the number of different neighbourhoods per city, restricted to the 15 cities having the most listings. The x-axis should be ordered by number of listings (high to low).

Make sure to use the `neighbourhood_cleansed` column in your computations.


In [None]:
# your code goes here
# Listings and distinct neighbourhoods per city, for the 15 cities with the most listings
df_pd = df_listings_analysis.groupBy('city') \
    .agg(F.count('city').alias('listings'),
         F.countDistinct('neighbourhood_cleansed').alias('distinct_neighbourhood')) \
    .orderBy('listings', ascending=False) \
    .limit(15) \
    .toPandas()

In [None]:
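fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(13, 5))
_ = df_pd.plot(ax=axes[0], kind='bar', x='city', y='listings', legend=False)
_ = df_pd.plot(ax=axes[1], kind='bar', x='city', y='distinct_neighbourhood', legend=False)
axes[0].set(title='Listings per city (top 15 cities)', ylabel='Number of listings')
axes[1].set(title='Distinct neighbourhoods per city', ylabel='Number of neighbourhoods')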

## Part 3: Price averages (5 pts)

Compute and visualise the average price of listings per city, restricted to the 15 cities having the most listings.

In [None]:
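# your code goes here
# Average price per city (local currency), for the 15 cities with the most listings
ax = df_listings_analysis.groupBy('city') \
    .agg(F.avg('price').alias('avgPrice'), F.count('city').alias('listings')) \
    .orderBy('listings', ascending=False).limit(15) \
    .toPandas().plot(kind='bar', x='city', y='avgPrice', legend=False)
ax.set(title='Average listing price per city (local currency)', ylabel='Average price')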

## Part 4: Value for money (5 pts)

The value of a listing is its rating divided by its price. The value of a city is the average value of its listings. 

Prices are only comparable when the local currency is the same. We'll therefore consider a subset of Euro-zone cities as defined in `eurozone_cities`.

Compute and visualise the value per city, restricted to the Euro-zone cities.
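
Note that the city value is an average of per-listing ratios, which generally differs from the ratio of average rating to average price. A small plain-Python sketch (hypothetical helper `city_value`, made-up example listings) shows the difference. Skipping zero or missing prices is an assumption meant to resemble Spark, where division by zero gives null in non-ANSI mode and `avg` ignores nulls.

```python
from statistics import mean


def city_value(listings):
    """Average of per-listing values, where value = rating / price.

    listings: iterable of (rating, price) pairs for one city. Pairs with a
    missing rating or a missing/zero price are skipped.
    """
    values = [rating / price for rating, price in listings
              if rating is not None and price]
    return mean(values) if values else None


# Two hypothetical listings: (rating, price)
example = [(100, 50), (80, 200)]
print(city_value(example))  # mean of 2.0 and 0.4
print(sum(r for r, _ in example) / sum(p for _, p in example))  # 0.72, ratio of means (not what is asked)
```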

In [None]:
eurozone_cities = [
    'Paris', 'Roma', 'Berlin', 'Madrid', 'Amsterdam', 'Barcelona', 'Milano', 'Lisboa',
    'München', 'Wien', 'Lyon', 'Firenze', 'Porto', 'Napoli', 'Bordeaux', 'Venezia',
    'Málaga', 'Sevilla', 'València'
]

In [None]:
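# your code goes here
# value = rating / price per listing; city value = average listing value
ax = df_listings_analysis.filter(F.col('city').isin(eurozone_cities)) \
            .withColumn('value_listing', F.col('review_scores_rating') / F.col('price')) \
            .groupBy('city').agg(F.avg('value_listing').alias('avgValue')) \
            .orderBy('avgValue', ascending=False) \
            .toPandas() \
            .plot(kind='bar', x='city', y='avgValue', legend=False)
ax.set(title='Average value (rating / price) per Euro-zone city', ylabel='Average value')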

# Exercise 2: The case of London (30 pts)

In this exercise you must use Spark to do the data processing. 
* For parts where you present tabular data, this entails calling `toPandas` as the final step of your query. 
* For parts requiring visualisation, the `toPandas` call should be followed only by functions necessary to customize the plotting/layout steps (i.e. no data processing takes place after your Spark dataframe is materialized).
* You may need multiple queries to solve the individual parts.

Your dataframe is a subset of `df_listings_analysis` and should be named `df_listings_london`.

In [None]:
# your code goes here
df_listings_london = df_listings_analysis.filter(df_listings_analysis.city == 'London').cache()

## Part 1: Price distribution (5 pts)

Compute and visualise the distribution of prices for all prices up to and including the 95th percentile. Additionally, compute and visualise the distribution of prices for all prices above the 95th percentile.
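
A plain-Python sketch of the split, assuming the nearest-rank definition of a percentile. Spark's `approxQuantile` with `relativeError=0` is exact, but its rank convention may differ slightly from this one. `split_at_percentile` is a hypothetical helper; the two parts together cover every non-null price exactly once.

```python
import math


def split_at_percentile(prices, p=0.95):
    """Split prices into (threshold, <= threshold, > threshold).

    The threshold is the value at 1-based position ceil(p * n) of the sorted
    non-null prices (nearest-rank percentile). Assumes at least one price.
    """
    ordered = sorted(x for x in prices if x is not None)
    rank = max(1, math.ceil(p * len(ordered)))
    threshold = ordered[rank - 1]
    low = [x for x in ordered if x <= threshold]
    high = [x for x in ordered if x > threshold]
    return threshold, low, high


threshold, low, high = split_at_percentile(range(1, 101))
print(threshold, len(low), len(high))  # 95 95 5
```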

In [None]:
# your code goes here
# Exact 95th percentile (relativeError = 0), computed once and reused
p95 = df_listings_london.approxQuantile('price', [0.95], 0)[0]

fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(13, 5))
# Select only the price column; otherwise the histogram includes every numeric column
_ = df_listings_london.filter(F.col('price') <= p95).select('price').toPandas() \
        .plot.hist(ax=axes[0], bins=50, legend=False)
axes[0].set(title='London prices <= 95th percentile', xlabel='Price', ylabel='Listings')
_ = df_listings_london.filter(F.col('price') > p95).select('price').toPandas() \
        .plot.hist(ax=axes[1], bins=50, legend=False)
axes[1].set(title='London prices > 95th percentile', xlabel='Price', ylabel='Listings')

## Part 2: Prices by type of property (5 pts)

Compute and visualise the average price and average rating per type of property, for property types with 75 or more listings. 

Your visualisation should be a single bar chart with two y-axes and two bars per property type. The x-axis should be ordered by average rating.

In [None]:
# your code goes here
pandas_temp = df_listings_london.groupBy('property_type') \
    .agg(F.count('property_type').alias('listings'),
         F.avg('review_scores_rating').alias('avgRating'),
         F.avg('price').alias('avgPrice')) \
    .filter(F.col('listings') >= 75) \
    .orderBy('avgRating', ascending=False) \
    .toPandas()

In [None]:
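fig, ax = plt.subplots(figsize=(13, 5))
ax2 = ax.twinx()
# position=1 / position=0 place the two bars side by side for each property type
_ = pandas_temp.plot(ax=ax, kind='bar', x='property_type', y='avgRating', width=0.4, position=1, color='tab:blue', legend=False)
_ = pandas_temp.plot(ax=ax2, kind='bar', x='property_type', y='avgPrice', width=0.4, position=0, color='tab:red', legend=False)
ax.set_xlim(-0.5, len(pandas_temp) - 0.5)
ax.set(ylabel='Average rating (blue)', title='Average rating and average price per property type (London)')
ax2.set_ylabel('Average price (red)')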

## Part 3: Best offering in the neighbourhood (10 pts)

The value of a listing is its rating divided by its price. Compute and display a dataframe (with the columns you selected in Exercise 1 and those computed in this part) with the 3 highest valued listings in each neighbourhood, and having a value above 5. Make sure to use the `neighbourhood_cleansed` column in your computations.

Computing ranks based on value can be achieved using `pyspark.sql.window.Window`. This may produce equal ranks (i.e. when the value of two listings are the same).

Remember to use `pd.set_option('display.max_rows', <n>)` with appropriate `<n>` so all rows are displayed.
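
To make the tie behaviour concrete, here is a plain-Python sketch of what `rank()` over a window partitioned by neighbourhood and ordered by descending value does (hypothetical helper `top_k_with_rank` and made-up data). Tied listings share a rank and the following rank is skipped (1, 2, 3, 3, 5), so a neighbourhood may contribute more than three rows. Filtering on value before ranking gives the same rows as filtering afterwards, because every listing above the threshold outranks every listing below it.

```python
from collections import defaultdict


def top_k_with_rank(rows, k=3, min_value=5):
    """Mimic rank() over (partition by neighbourhood, order by value desc).

    rows: iterable of (neighbourhood, listing_id, value).
    Returns (neighbourhood, listing_id, value, rank) for rank <= k and
    value > min_value.
    """
    by_hood = defaultdict(list)
    for hood, listing_id, value in rows:
        if value is not None and value > min_value:
            by_hood[hood].append((listing_id, value))

    result = []
    for hood, items in by_hood.items():
        items.sort(key=lambda item: item[1], reverse=True)
        rank = 0
        for position, (listing_id, value) in enumerate(items, start=1):
            # A new (lower) value jumps to the current position; ties keep the rank
            if position == 1 or value != items[position - 2][1]:
                rank = position
            if rank > k:
                break
            result.append((hood, listing_id, value, rank))
    return result


rows = [('A', 1, 9.0), ('A', 2, 8.0), ('A', 3, 7.0), ('A', 4, 7.0),
        ('A', 5, 6.0), ('B', 6, 4.0), ('B', 7, 6.5)]
for row in top_k_with_rank(rows):
    print(row)
```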

In [None]:
# your code goes here

In [None]:
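from pyspark.sql.window import Window
pd.set_option('display.max_rows', None)  # display every row
# value = rating / price; rank() gives listings with equal value the same rank
df_listings_london = df_listings_london.withColumn('value', F.col('review_scores_rating') / F.col('price'))
window = Window.partitionBy('neighbourhood_cleansed').orderBy(F.col('value').desc())
# Filtering value > 5 before ranking yields the same rows, since all such listings outrank the rest
df_listings_london.filter(F.col('value') > 5) \
    .withColumn('rank', F.rank().over(window)) \
    .filter(F.col('rank') <= 3) \
    .orderBy('neighbourhood_cleansed', 'rank').toPandas()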

## Part 4: Activity by month (5 pts)

Activity is given by the number of reviews received in a given time period. Compute and visualise the activity by month, that is, the total number of reviews given in January, February, etc.

Your additional data source is [this zip archive](https://data-download.compute.dtu.dk/c02807/reviews_london.csv.zip) which you must uncompress and place in the same folder as this notebook. It is loaded in the next cell and named `df_reviews_london`.
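
The aggregation pools all years together, so January 2019 and January 2020 land in the same bucket. A plain-Python sketch (hypothetical helper `activity_by_month`, assuming ISO `YYYY-MM-DD` date strings) also reports months with no reviews as 0, whereas a Spark `groupBy` would simply omit such months.

```python
from collections import Counter
from datetime import date


def activity_by_month(review_dates):
    """Count reviews per calendar month (1-12), pooling all years.

    review_dates: iterable of ISO 'YYYY-MM-DD' strings (assumed format).
    """
    counts = Counter(date.fromisoformat(d).month for d in review_dates)
    return [(month, counts.get(month, 0)) for month in range(1, 13)]


print(activity_by_month(['2019-01-05', '2020-01-20', '2019-03-02'])[:3])
```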

In [None]:
df_reviews_london = load_csv_as_dataframe('reviews_london.csv')

In [None]:
from pyspark.sql.functions import month
# your code goes here
ax = df_reviews_london.withColumn('month', month(F.col('date'))) \
        .groupBy('month').agg(F.count('month').alias('activity')).orderBy('month') \
        .toPandas().plot(kind='bar', x='month', y='activity', legend=False)
ax.set(title='London reviews per calendar month (all years combined)', xlabel='Month', ylabel='Number of reviews')


## Part 5: Reviews per listing (5 pts)

Each London listing has received 0 or more reviews. 

Display a dataframe showing 1) The number of listings, 2) The average number of reviews a listing receives, 3) The standard deviation of the reviews per listing distribution, 4) The minimum number of reviews any listing has received, and 5) The maximum number of reviews any listing has received.
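
Two details are easy to miss here: listings with zero reviews exist only in the listings data, and averaging over review rows weights each listing by its number of reviews (in the example below that would give (3·3 + 1)/4 = 2.5 instead of 4/3). A plain-Python sketch with a hypothetical helper `review_stats`; `statistics.stdev` is the sample standard deviation, like Spark's `F.stddev`.

```python
from statistics import mean, stdev


def review_stats(listing_ids, review_listing_ids):
    """Summary of reviews per listing, counting listings with zero reviews.

    listing_ids: ids of all listings.
    review_listing_ids: the listing_id of every review (one entry per review).
    """
    per_listing = {listing_id: 0 for listing_id in listing_ids}
    for listing_id in review_listing_ids:
        if listing_id in per_listing:
            per_listing[listing_id] += 1
    counts = list(per_listing.values())
    return {
        'listings': len(counts),
        'mean': mean(counts),
        'std': stdev(counts),  # needs at least two listings
        'min': min(counts),
        'max': max(counts),
    }


stats = review_stats([1, 2, 3], [1, 1, 1, 2])  # reviews per listing: 3, 1, 0
print(stats)
```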

In [None]:
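# your code goes here
# Reviews per listing; the left join keeps London listings without reviews
# (counting the null listing_id of an unmatched row gives 0)
reviews_per_listing = df_listings_london.select('id') \
    .join(df_reviews_london.select('listing_id'), F.col('id') == F.col('listing_id'), 'left') \
    .groupBy('id').agg(F.count('listing_id').alias('numRevPrList'))
reviews_per_listing.agg(F.count('id').alias('NumberOfListings'),
                        F.avg('numRevPrList').alias('avgNumRevPrListing'),
                        F.stddev('numRevPrList').alias('std'),
                        F.min('numRevPrList').alias('minReviews'),
                        F.max('numRevPrList').alias('maxReviews')) \
    .toPandas().T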


# Exercise 3: Word sentiment (45 pts)

In this exercise you must use Spark to do the data processing. For parts where you present tabular data, this entails calling `toPandas` as the final step of your query. You may need multiple queries to solve the individual parts.

The goal here is to determine the sentiment (positive or negative) of words in reviews. Roughly speaking, each word is assigned a score based on the ratings of the reviews whose comments contain it. We'd expect words such as "clean", "comfortable", "superhost" to receive high scores, while words such as "unpleasant", "dirty", "disgusting" would receive low scores.

As individual reviews do not have a rating, we take the rating of a review to be the rating of its listing (i.e. we assume each review gave the listing's average rating, `review_scores_rating`).

The score of a word is the mean review rating over the reviews whose comments contain the word. A word has a defined score only if it appears in the comments of at least 0.5% (1 in 200) of the listings and is at least 4 characters long.

Formally, when a word $w$ occurs in at least $0.5\%$ of listings and $|w| > 3$, its score is

$$
\text{score}(w) = \frac{1}{|C_w|}\sum_{comment \in C_w} \text{review\_rating}(comment),
$$

where
* $C_w = \{comment \mid w \text{ occurs in } \text{clean\_text}(comment)\}$ is the set (so no duplicates) of comments in which $w$ occurs,
* $\text{clean\_text}(comment)$ is the result of your `clean_text` function defined below, and
* $\text{review\_rating}(comment)$ is the `review_scores_rating` of the listing to which $comment$ is related.


## Part 1: Toy data (15 pts)

To get started, we'll consider a toy example where the input is `df_sentiment_listings_toy` and `df_sentiment_reviews_toy`, defined in the next code cell. You should provide an implementation of `calculate_word_scores_toy` in the subsequent code cell. When given the toy dataframes as input and materialized with `toPandas()`, the query produced by your implementation should yield this table:

|    | word   |   word_score |   listing_occurences |   word_occurences |   comment_occurences |
|---:|:-------|-------------:|---------------------:|------------------:|---------------------:|
|  0 | aaaa   |      7       |                    3 |                 5 |                    5 |
|  1 | bbbb   |      6.66667 |                    2 |                 3 |                    3 |
|  2 | eeee   |      0       |                    1 |                 1 |                    1 |
|  3 | dddd   |      5       |                    1 |                 1 |                    1 |
|  4 | cccc   |      5       |                    2 |                 2 |                    2 |

Observe that `word_occurences` and `comment_occurences` are the same, because a word occurring multiple times in a comment is counted once, and that `clean_text` is used to ignore casing and discard non-words. Additionally, on this toy data any word that occurs at least once occurs in more than 1 out of 200 listings.
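
As a cross-check of the expected table, here is a plain-Python reference of the scoring definition on the same toy data (no Spark; `clean_text_py` and `word_scores` are hypothetical names, and ratings are given as numbers rather than the strings used in the toy dataframes). By hand, for example, score(aaaa) = (10 + 10 + 10 + 5 + 0) / 5 = 7.

```python
import re
from collections import defaultdict


def clean_text_py(text):
    """Lower-case, drop everything except ASCII letters, digits and
    whitespace, then collapse whitespace runs and trim."""
    text = re.sub(r'[^a-z0-9\s]', '', text.lower())
    return re.sub(r'\s+', ' ', text).strip()


def word_scores(ratings, reviews, min_share=0.005):
    """ratings: {listing_id: rating}; reviews: [(listing_id, comment_id, comment)].

    Returns {word: (word_score, listing_occurences, comment_occurences)}.
    """
    comments_per_word = defaultdict(set)  # (comment_id, rating) pairs
    listings_per_word = defaultdict(set)
    for listing_id, comment_id, comment in reviews:
        if listing_id not in ratings or comment is None:
            continue
        for word in set(clean_text_py(comment).split(' ')):
            if len(word) > 3:
                comments_per_word[word].add((comment_id, ratings[listing_id]))
                listings_per_word[word].add(listing_id)

    # Listings with at least one review define the 0.5% threshold
    relevant = {listing_id for listing_id, _, _ in reviews if listing_id in ratings}
    scores = {}
    for word, comments in comments_per_word.items():
        if len(listings_per_word[word]) >= min_share * len(relevant):
            score = sum(rating for _, rating in comments) / len(comments)
            scores[word] = (score, len(listings_per_word[word]), len(comments))
    return scores


ratings = {'0': 10, '1': 5, '2': 0}
reviews = [('0', '100', 'aaaa bbbb          cccc'), ('0', '101', 'aaaa bbbb '),
           ('0', '102', 'aaaa aAAa          aaaa'), ('1', '103', 'Aaaa bbb ccc'),
           ('1', '104', 'dddd %ˆ&*'), ('2', '105', 'AaaA'),
           ('2', '106', 'bbbb ccc e&eˆˆee'), ('2', '107', 'cccc cccc')]
for word, values in sorted(word_scores(ratings, reviews).items()):
    print(word, values)
```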

In [None]:
from pyspark.sql.types import StructType, StructField, StringType

schema_listings = StructType([
    StructField('id', StringType(), True),
    StructField('review_scores_rating', StringType(), True),
])
data_listings = [
    {'id': '0', 'review_scores_rating': '10'},
    {'id': '1', 'review_scores_rating': '5'},
    {'id': '2', 'review_scores_rating': '0'},
]
df_sentiment_listings_toy = spark.createDataFrame(data_listings, schema_listings)

schema_reviews = StructType([
    StructField('listing_id', StringType(), True),
    StructField('id', StringType(), True),
    StructField('comments', StringType(), True),
])
data_reviews = [
    {'listing_id': '0', 'id': '100', 'comments': 'aaaa bbbb          cccc'},
    {'listing_id': '0', 'id': '101', 'comments': 'aaaa bbbb '},
    {'listing_id': '0', 'id': '102', 'comments': 'aaaa aAAa          aaaa'},
    {'listing_id': '1', 'id': '103', 'comments': 'Aaaa bbb ccc'},
    {'listing_id': '1', 'id': '104', 'comments': 'dddd %ˆ&*'},
    {'listing_id': '2', 'id': '105', 'comments': 'AaaA'},
    {'listing_id': '2', 'id': '106', 'comments': 'bbbb ccc e&eˆˆee'},
    {'listing_id': '2', 'id': '107', 'comments': 'cccc cccc'},
]

df_sentiment_reviews_toy = \
    spark.createDataFrame(data_reviews, schema_reviews) \
        .select(F.col('listing_id'), F.col('id').alias('comment_id'), F.col('comments'))


In [None]:
df_sentiment_listings_toy.toPandas()

In [None]:
df_sentiment_reviews_toy.toPandas()

In [None]:
from pyspark.sql import functions as F


def clean_text(col):
    """
        Cleans the text (comment) associated with col. The
        cleaning should:
            1) Lower case the text
            2) Turn multiple whitespaces into single whitespaces
            3) Remove anything but letters, digits and whitespaces
        
        :col: A Spark Column object containing text data
        :returns: A Spark Column object.
    """
    # Built-in column functions keep the work inside the JVM (no Python UDF).
    # Characters are removed before whitespace is collapsed, so that removing
    # e.g. '&' from 'a & b' cannot leave a double space behind.
    col = F.lower(col)                                # 1) lower case
    col = F.regexp_replace(col, r'[^a-z0-9\s]', '')   # 3) keep ASCII letters, digits, whitespace
    col = F.regexp_replace(col, r'\s+', ' ')          # 2) collapse whitespace runs
    return F.trim(col)


def calculate_word_scores_toy(df_list, df_rev):
    """
        Calculates the word score over listings in df_list and
        reviews in df_rev. The table produced should have the 
        same columns as specified in part 1.
        
        :returns: A Spark DataFrame (materialize it with toPandas())
    """
    # Only the rating is needed from the listings
    ratings = df_list.select(F.col('id').alias('listing_id'), 'review_scores_rating')

    # One row per (comment, distinct word of 4+ characters in that comment)
    words = df_rev.join(ratings, on='listing_id') \
        .select('listing_id', 'comment_id', 'review_scores_rating',
                F.explode(F.array_distinct(F.split(clean_text(F.col('comments')), ' '))).alias('word')) \
        .filter(F.length('word') > 3)

    return words.groupBy('word') \
        .agg(F.mean('review_scores_rating').alias('word_score'),
             F.countDistinct('listing_id').alias('listing_occurences'),
             F.countDistinct('comment_id').alias('word_occurences'),
             F.countDistinct('comment_id').alias('comment_occurences'))


df = calculate_word_scores_toy(df_sentiment_listings_toy, df_sentiment_reviews_toy)
df.toPandas()

## Part 2: London comments (15 pts)

In this part we'll calculate word scores for the comments related to London listings only. You should implement `count_relevant_listings` and `calculate_word_scores` (it will be an extension of your function from part 1) below. See the mathematical definition and docstrings for intended behaviour.

The function `calculate_word_scores` should return the top 10 and bottom 10 words by score. You should **not** use caching in your function.

Make sure you satisfy all conditions for a word to be scored (e.g. correctly calculating the total number of listings that the scores are computed over). You should also consider whether your query is optimally structured in terms of computation time. Moreover, `pd.set_option('display.max_rows', <n>)` should be set with a sufficiently high `<n>` to show all words.

In [None]:
# your code goes here   
from pyspark.sql.window import Window


def count_relevant_listings(df_list, df_rev):
    """
        Calculates the number of listings in df_list that have a 
        review in df_rev. A listing that is reviewed more than once
        should only count as one.
        
        :returns: An integer 
    """
    # Distinct reviewed listing ids, kept only if the listing is in df_list
    return df_rev.select('listing_id').distinct() \
        .join(df_list.select(F.col('id').alias('listing_id')), on='listing_id', how='leftsemi') \
        .count()


def calculate_word_scores(df_list, df_rev, listings_count):
    """
        Calculates the word score over listings in df_list and
        reviews in df_rev. The value of listings_count should 
        be used to filter out words not occurring frequently enough
        in comments. The table produced should have the same columns
        as in part 1 of this exercise.
        
        :returns: A pandas DataFrame containing the top 10 and 
        bottom 10 words based on their word score, sorted by word_score.
    """
    # Reuse the Part 1 query; the full reviews data names the review id `id`
    # (the rename is a no-op if the column is already called comment_id)
    df_words = calculate_word_scores_toy(df_list, df_rev.withColumnRenamed('id', 'comment_id')) \
        .filter(F.col('listing_occurences') >= 0.005 * listings_count)

    # Rank from both ends in one query instead of collecting the full result twice.
    # The windows are unpartitioned, which should be acceptable here because
    # relatively few words pass the frequency filter.
    top = Window.orderBy(F.col('word_score').desc())
    bottom = Window.orderBy(F.col('word_score').asc())

    return df_words \
        .withColumn('rank_top', F.row_number().over(top)) \
        .withColumn('rank_bottom', F.row_number().over(bottom)) \
        .filter((F.col('rank_top') <= 10) | (F.col('rank_bottom') <= 10)) \
        .orderBy(F.col('word_score').desc()) \
        .drop('rank_top', 'rank_bottom') \
        .toPandas()


pd.set_option('display.max_rows', None)  # show all 20 words

relevant_listings_count_london = count_relevant_listings(df_listings_london, df_reviews_london)

calculate_word_scores(df_listings_london, df_reviews_london, relevant_listings_count_london)

In [None]:
# should not be modified
from IPython.display import display

relevant_listings_count_london = count_relevant_listings(df_listings_london, df_reviews_london)
word_scores_london_timing = %timeit -o -n1 -r1 display( \
    calculate_word_scores(df_listings_london, \
                          df_reviews_london, \
                          relevant_listings_count_london) \
)

word_scores_london_timing.best

## Part 3: Scalability (10 pts)

The listings from London make up a little less than 2% of the entire set of listings. In this part we're interested in how the amount of input data impacts computation time, that is, how `calculate_word_scores` scales as data increases. To this end, we've made multiple samples of the dataset of varying sizes.

The experiment reuses `count_relevant_listings` and `calculate_word_scores` that you implemented in part 2. Code needed for this part is provided to you. 

Your task is to obtain the data sources, run the code cells below, and explain the results you get. Specifically, you must explain any non-linear relationship between data size and computation time, using the markdown cell at the end of this part. In finding explanations, using the Spark UI to investigate the anatomy of your queries may prove valuable. Once you've found an explanation, state a potential solution to remedy the issue. Lastly, include a paragraph stating the specifications of your computer hardware (memory, CPU cores and clock speed, solid state disk or not) on which the experiment has been run.

*Implementation note* Make sure you've properly configured `spark.driver.memory` (updating the value requires a kernel restart). Your query may fail on the larger samples by running out of compute resources. This is most likely caused by a suboptimal `calculate_word_scores`, but it can also result from reaching the limits of your hardware. If you think the latter is the case, argue for this perspective in the markdown cell.
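
Once the timings have been collected, one way to make "non-linear" precise is to fit the slope of log(time) against log(data size). A slope near 1 suggests linear scaling, a slope clearly above 1 suggests super-linear growth, and fixed per-job overhead may make the smallest samples look sub-linear. The helper below (`scaling_exponent`, our own illustration) fits that slope by least squares; the usage lines use synthetic numbers, not measurements.

```python
import math


def scaling_exponent(sizes, seconds):
    """Least-squares slope of log(seconds) against log(sizes).

    sizes: data sizes (e.g. sample percentages); seconds: matching run times.
    """
    xs = [math.log(s) for s in sizes]
    ys = [math.log(t) for t in seconds]
    x_mean = sum(xs) / len(xs)
    y_mean = sum(ys) / len(ys)
    cov = sum((x - x_mean) * (y - y_mean) for x, y in zip(xs, ys))
    var = sum((x - x_mean) ** 2 for x in xs)
    return cov / var


sizes = [1, 2, 4, 8]
print(round(scaling_exponent(sizes, [3 * s for s in sizes]), 6))         # linear
print(round(scaling_exponent(sizes, [0.5 * s ** 2 for s in sizes]), 6))  # quadratic
```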

Your data sources are (uncompress and place in the same directory as this notebook):
* 0.25%: [listings](https://data-download.compute.dtu.dk/c02807/listings_0-dot-25percent.csv.zip), [reviews](https://data-download.compute.dtu.dk/c02807/reviews_0-dot-25percent.csv.zip)
* 0.5%: [listings](https://data-download.compute.dtu.dk/c02807/listings_0-dot-5percent.csv.zip), [reviews](https://data-download.compute.dtu.dk/c02807/reviews_0-dot-5percent.csv.zip)
* 1%: [listings](https://data-download.compute.dtu.dk/c02807/listings_1-dot-0percent.csv.zip), [reviews](https://data-download.compute.dtu.dk/c02807/reviews_1-dot-0percent.csv.zip)
* 2%: [listings](https://data-download.compute.dtu.dk/c02807/listings_2-dot-0percent.csv.zip), [reviews](https://data-download.compute.dtu.dk/c02807/reviews_2-dot-0percent.csv.zip)
* 4%: [listings](https://data-download.compute.dtu.dk/c02807/listings_4-dot-0percent.csv.zip), [reviews](https://data-download.compute.dtu.dk/c02807/reviews_4-dot-0percent.csv.zip)
* 8%: [listings](https://data-download.compute.dtu.dk/c02807/listings_8-dot-0percent.csv.zip), [reviews](https://data-download.compute.dtu.dk/c02807/reviews_8-dot-0percent.csv.zip)
* 12.5%: [listings](https://data-download.compute.dtu.dk/c02807/listings_12-dot-5percent.csv.zip), [reviews](https://data-download.compute.dtu.dk/c02807/reviews_12-dot-5percent.csv.zip)
* 16%: [listings](https://data-download.compute.dtu.dk/c02807/listings_16-dot-0percent.csv.zip), [reviews](https://data-download.compute.dtu.dk/c02807/reviews_16-dot-0percent.csv.zip)
* 25%: [listings](https://data-download.compute.dtu.dk/c02807/listings_25-dot-0percent.csv.zip), [reviews](https://data-download.compute.dtu.dk/c02807/reviews_25-dot-0percent.csv.zip)
* 50%: [listings](https://data-download.compute.dtu.dk/c02807/listings_50-dot-0percent.csv.zip), [reviews](https://data-download.compute.dtu.dk/c02807/reviews_50-dot-0percent.csv.zip)
* 75%: [listings](https://data-download.compute.dtu.dk/c02807/listings_75-dot-0percent.csv.zip), [reviews](https://data-download.compute.dtu.dk/c02807/reviews_75-dot-0percent.csv.zip)
* 100%: [listings](https://data-download.compute.dtu.dk/c02807/listings_100-dot-0percent.csv.zip), [reviews](https://data-download.compute.dtu.dk/c02807/reviews_100-dot-0percent.csv.zip)
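Fetching twelve samples by hand is tedious. The following sketch downloads and unzips one sample using only the Python standard library. It assumes that each archive holds a single CSV at its root, named like the file `load_csv_as_dataframe` expects (e.g. `listings_0-dot-25percent.csv`). `sample_urls` and `download_and_extract` are hypothetical helper names.

```python
import urllib.request
import zipfile
from pathlib import Path

BASE_URL = "https://data-download.compute.dtu.dk/c02807"

def sample_urls(percent_str):
    # e.g. '0-dot-25' -> URLs of the zipped listings and reviews CSVs
    return [f"{BASE_URL}/{kind}_{percent_str}percent.csv.zip"
            for kind in ("listings", "reviews")]

def download_and_extract(percent_str, target_dir="."):
    target = Path(target_dir)
    for url in sample_urls(percent_str):
        archive = target / url.rsplit("/", 1)[-1]
        # Skip the download if the archive is already present
        if not archive.exists():
            urllib.request.urlretrieve(url, archive)
        # Assumption: the CSV sits at the root of the archive
        with zipfile.ZipFile(archive) as zf:
            zf.extractall(target)
```

For example, `for p in ['0-dot-25', '0-dot-5']: download_and_extract(p)` would fetch the two smallest samples into the notebook's directory.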

In [None]:
import time
def calculate_word_scores_timed(percent_str):
    """
        Calculates word scores over a sampled dataset indicated
        by percent_str.
        
        :returns: A dictionary with benchmarking information and
        the calculated values.
    """
    df_listings = load_csv_as_dataframe(f'listings_{percent_str}percent.csv')
    df_reviews = load_csv_as_dataframe(f'reviews_{percent_str}percent.csv')
    
    listings_count = count_relevant_listings(df_listings, df_reviews)

    start = time.time()
    df_word_scores = calculate_word_scores(df_listings, df_reviews, listings_count)
    end = time.time()
    return {
        'percentage': float(percent_str.replace('-dot-', '.')), 
        'time_spent': f"{end - start:.2f}", 
        'relevant_listings': listings_count, 
        'df': df_word_scores
    }

In [None]:
data_percentages = [
    '0-dot-25', '0-dot-5', '1-dot-0', '2-dot-0', '4-dot-0', '8-dot-0',
    '12-dot-5', '16-dot-0', '25-dot-0'
]
score_data = {
    percentage_str: calculate_word_scores_timed(percentage_str) for percentage_str in data_percentages
}

In [None]:
score_data['50-dot-0'] = calculate_word_scores_timed('50-dot-0')

In [None]:
score_data['75-dot-0'] = calculate_word_scores_timed('75-dot-0')

In [None]:
score_data['100-dot-0'] = calculate_word_scores_timed('100-dot-0')

In [None]:
df_scores_scaling = pd.DataFrame(score_data).T.convert_dtypes()
df_scores_scaling.time_spent = df_scores_scaling.time_spent.astype(float)

# Access to word scores of 2 percent data: df_scores_scaling.loc['2-dot-0'].df
df_scores_scaling

In [None]:
fig, axes = plt.subplots(ncols=3, figsize=(15, 5))

lower_range = ['0-dot-25', '0-dot-5', '1-dot-0', '2-dot-0', '4-dot-0', '8-dot-0', '12-dot-5', '16-dot-0']
df_scores_scaling[df_scores_scaling.index.isin(lower_range)] \
    .plot.line(x='percentage', y='time_spent', ax=axes[0], style='-o', title='Lower range')
df_scores_scaling[~df_scores_scaling.index.isin(lower_range)] \
    .plot.line(x='percentage', y='time_spent', ax=axes[1], style='-o', title='Upper range')
_ = df_scores_scaling \
    .plot.line(x='percentage', y='time_spent', ax=axes[2], style='-o', title='All')

*Your explanation to the questions outlined at the start of this part goes here. Make sure you've addressed all questions asked.*

## Part 4: Robustness (5 pts)

In this part we'll explore the robustness of our word scores, using the values we computed in part 3. We'll do so by comparing the top/bottom words for three different samples of the dataset. Specifically, the scores from the largest sample you computed (e.g. 100%) are to be compared with the 12.5% and 2.0% scores.

Compute and display a dataframe that accounts for every word found among the top/bottom words of any of the three samples, and additionally shows the related `word_score` and `word_occurences` values.

Note that `df_scores_scaling.loc['100-dot-0'].df` provides the word scores dataframe of the 100% sample (similarly for the other two). For this part you should rely on pandas functionality only.  Moreover, `pd.set_option('display.max_rows', <n>)` should be set with sufficiently high `n` to show all rows.
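Here is a minimal pandas sketch of one possible approach. It assumes that each word-score dataframe has a `word` column alongside `word_score` and `word_occurences`. It also assumes that "top/bottom words" means the `n` highest- and lowest-scoring words; `n=10` is an arbitrary placeholder. The helpers are hypothetical, and the test frames are made up.

```python
import pandas as pd

def top_bottom_words(df, n=10):
    # Hypothetical helper: words with the n highest and the n lowest word_score
    top = df.nlargest(n, "word_score")
    bottom = df.nsmallest(n, "word_score")
    return set(top["word"]) | set(bottom["word"])

def compare_samples(samples, n=10):
    # samples: dict mapping a sample label (e.g. "100%") to its word-score dataframe
    words = set()
    for df in samples.values():
        words |= top_bottom_words(df, n)
    # One column block per sample; a word missing from a sample gets NaN there
    combined = pd.concat(
        {label: df.set_index("word")[["word_score", "word_occurences"]]
         for label, df in samples.items()},
        axis=1,
    )
    return combined[combined.index.isin(words)].sort_index()
```

The result has a two-level column index (sample label, metric), so, for example, `result["100%"]` selects the 100% sample's scores and occurrences.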

In [None]:
# your code goes here

# Exercise 4: Transactions analysis (55 pts)

In this exercise the goal is to analyse historical business transactions (sales of parts to other companies), and derive insights about both products and customers.

The company X produces gadget parts and sells them globally to a number of other companies. You asked X's sales department for access to its customer and sales transactions database. To your horror, you found that no such database exists; instead, the data is maintained manually in a spreadsheet (an error-prone solution). Intrepid as you are, you agreed to receive the spreadsheet data as a `.csv`, already realizing that data cleaning will be necessary.

Your first step (parts 1 and 2) is to clean the data after which you will derive insights about X's business operations (parts 3 and 4).

The input data is available here: [transactions.csv](http://courses.compute.dtu.dk/02807/2021/projects/project2/transactions.csv)

**Using SQL**

In this final exercise you must write SQL to do the data processing in parts 3 and 4. This entails using `psql.sqldf` to execute your queries (against `df_transactions_cleaned`), which returns a pandas dataframe. Each question should be answered with a *single* query. For visualisation, the `psql.sqldf` call should be followed only by functions needed to customize the plotting/layout or reshape the dataframe (i.e. no data processing should take place after your SQL statement is materialized as a pandas dataframe).

In parts 1 and 2 of this exercise, you should use pandas functionality.
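`pandasql` runs queries against an in-memory SQLite database, which is why SQLite functions such as `STRFTIME` are used in parts 3 and 4. The sketch below imitates that single-query workflow with the standard `sqlite3` module and pandas' `to_sql`/`read_sql_query`, which is handy for checking a query on a small toy dataframe. `run_query` is a hypothetical stand-in, not the `pandasql` API.

```python
import sqlite3
from contextlib import closing

import pandas as pd

def run_query(sql, **tables):
    # Copy each DataFrame into an in-memory SQLite database under the given
    # table name, run one query and return its result as a DataFrame
    with closing(sqlite3.connect(":memory:")) as con:
        for name, frame in tables.items():
            frame.to_sql(name, con, index=False)
        return pd.read_sql_query(sql, con)
```

For example, `run_query("select company, sum(prices_euro) as revenue from df group by company", df=some_frame)` returns one row per company.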



## Part 1: Data cleaning (15 pts)

For each column in the dataframe, investigate and **correct** problematic aspects such as,
* Missing values: Insert meaningful values (data imputation). Detectable as `np.nan`'s. A typical value for imputation is the *mode* (most frequent value) of the distribution. If no proper data imputation is possible, you may resort to dropping rows.
* Incorrect values: Typos and other data mishaps are present because values are entered manually. Detectable as low-prevalence categorical values, or ambiguous data links (e.g. a company listed in multiple countries). If no proper value correction is possible, you may resort to dropping rows.

In both cases, your strategy for replacing values should be data-driven, that is, shaped by the patterns you observe in the data. It is allowed to skip correcting the data (and instead drop the rows) if few rows are improved by your corrections. If in doubt, do the correction.
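A data-driven starting point for both cases can be sketched in pandas. One step imputes a column with its most frequent value within a related group, such as the country most often recorded for the same company. The other lists low-prevalence categorical values as typo candidates. `impute_by_group_mode` and `rare_values` are hypothetical helpers, and the `max_share` threshold is an assumption to tune against the data.

```python
import numpy as np
import pandas as pd

def impute_by_group_mode(df, target, by):
    # Fill NaNs in `target` with the most frequent value within each `by` group.
    # Groups whose target is entirely missing stay NaN, and rows whose `by`
    # value is NaN are excluded from grouping, so they also stay NaN.
    def fill(group):
        modes = group.mode()
        return group.fillna(modes.iloc[0]) if not modes.empty else group
    return df.groupby(by)[target].transform(fill)

def rare_values(series, max_share=0.01):
    # Candidate typos: values whose share of all non-missing rows is at most max_share
    shares = series.value_counts(normalize=True)
    return shares[shares <= max_share].index.tolist()
```

For example, `impute_by_group_mode(df, "country", "company")` returns a Series that can be assigned back to the `country` column.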

After all your cleaning steps are completed, you should run the `PandasProfiler` on your cleaned dataset, which should now contain 0% missing cells. Lastly, summarize the issues you identified and how you addressed them.

### Read, profile and explain

As the first step, load the data naming the dataframe `df_transactions`, and make a copy named `df_transactions_cleaned` on which your data cleaning steps will be done. Establish an overview using `PandasProfiler` (but realize there's more to cleaning than what this tool will let you know). Write a paragraph on what the data is about (e.g. what does a row constitute), and a paragraph on what the profile report tells you.

In [None]:
# your code goes here
#!pip install pandasql==0.7.3
import pandas as pd
import numpy as np
import re
import seaborn as sns
import matplotlib.pyplot as plt
import pandasql as psql
import math
from pandas_profiling import ProfileReport 
%matplotlib inline
sns.set()


df_transations = pd.read_csv("transactions.csv")
df_transations.head()

In [None]:
profile = ProfileReport(df_transations, title="Pandas Profiling Report")
profile.to_notebook_iframe()

In [None]:
df_transations.isna().any()

*The output above shows which columns contain NaN values that need to be replaced or removed. Some columns, for instance `city`, also contain special characters; we remove these and keep only the name of the city, country or company. In almost all columns containing NaN values, there were also values like "-", "void" or spelling mistakes; we identified these manually and replaced them, since there were not many. We used the columns without NaN values to group by and populate the columns with NaN values, such as country, city and part. The profiler above shows 1.8% missing cells, and some of the columns are highly correlated. Finally, we convert the `date` column to a datetime type so that it can be used easily in the SQL queries below.*

*For the price, we split the amount from the currency sign into two separate columns. If the price was NaN, we set the currency to USD. We then create and populate the `prices_euro` column and fill its NaN values with the median, which is less sensitive to outliers than the mean; once that column has no NaN values, we use it to fill the NaN values in the `price` column. We follow this approach because the exchange rates between the currencies differ a lot. The prices also contain negative values, which we keep, since a transaction can be positive or negative, for example a sale or a return order respectively.*

In [None]:
# typos-regex-city-country-company
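def keep_only_name(c_name):
    # NaN never compares equal to itself, so `c_name == np.nan` is always False;
    # use pd.isna to keep missing values missing instead of turning them into "nan"
    if pd.isna(c_name):
        return np.nan
    # Remove special characters (including spaces) from city/country/company names
    return re.sub(r"[^a-zA-Z0-9]+", '', str(c_name))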

### Country column

In [None]:
# your code goes here
# Impute missing countries with the most frequent country of the same company
df_transations['country'] = df_transations.groupby("company")['country'].transform(
    lambda x: x.fillna(x.mode().iloc[0]) if not x.mode().empty else x)
df_transations["country"] = df_transations["country"].apply(keep_only_name)

# Drop rows whose country is recorded as "Portuga" or "US"
df_transations = df_transations[~df_transations["country"].isin(["Portuga", "US"])]
print("Unique countries: ", df_transations["country"].unique())

In [None]:
df_transations[df_transations['company'] == 'Flipstorm'].country.unique()

*The company above is listed in two countries: Greece (861 rows) and France (332 rows). We therefore assign Greece, the country of most of its rows, to all of its rows, and set its city to Athens.*

In [None]:
df_transations.loc[df_transations['company'] == 'Flipstorm', 'country']= 'Greece'
df_transations.loc[df_transations['company'] == 'Flipstorm', 'city']= 'Athens'

### Company column

In [None]:
# your code goes here
# Strip special characters from company names
df_transations["company"] = df_transations["company"].apply(keep_only_name)

# Drop rows whose company name is a typo or placeholder
invalid_companies = ["Laj0", "aa", "a", "", "Thoughtmixz", "Ntagz"]
df_transations = df_transations[~df_transations["company"].isin(invalid_companies)]

print("Unique companies: ", df_transations["company"].unique())

### City column

In [None]:
# your code goes here
df_transations['city'] = df_transations['city'].apply(keep_only_name)
# keep_only_name removes non-ASCII letters, which presumably turned "Düsseldorf" into "Dsseldorf"
df_transations.loc[df_transations["city"] == "Dsseldorf", "city"] = "Dusseldorf"
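Instead of patching individual names after the fact, accented letters could be transliterated to ASCII before special characters are stripped. The sketch below uses the standard `unicodedata` module. `ascii_name` is a hypothetical helper that expects a string, so apply it to non-missing values only. It also assumes that plain ASCII spellings such as "Dusseldorf" are acceptable.

```python
import re
import unicodedata

def ascii_name(name):
    # Decompose accented letters (e.g. "ü" -> "u" + combining diaeresis),
    # drop the non-ASCII combining marks, then strip remaining special characters
    decomposed = unicodedata.normalize("NFKD", name)
    ascii_only = decomposed.encode("ascii", "ignore").decode("ascii")
    return re.sub(r"[^a-zA-Z0-9]+", "", ascii_only)
```

Unlike `keep_only_name`, this keeps the base letter of an accented character rather than deleting it, so no manual correction list is needed for such cases.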

### Parts column

In [None]:
# your code goes here
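# Impute missing parts with the most frequently ordered part of the same company
df_transations['part'] = df_transations.groupby("company")['part'].transform(
    lambda x: x.fillna(x.mode().iloc[0]) if not x.mode().empty else x)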

### Price column

In [None]:
# price-currency
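# Map currency symbols to ISO codes; the amount may appear before or after the symbol
CURRENCY_CODES = {'€': 'EUR', '£': 'GBP', '$': 'USD', '¥': 'JPY'}
NUMBER_PATTERN = r"-?[0-9]+\.?[0-9]*"

def split_prices(price):
    # Missing prices keep a NaN amount and default to USD
    if pd.isna(price):
        return np.nan, 'USD'
    price = str(price)
    numbers = re.findall(NUMBER_PATTERN, price)
    num = float(numbers[0]) if numbers else np.nan
    # Whatever remains after removing the number is treated as the currency symbol
    symbols = [s.strip() for s in re.split(NUMBER_PATTERN, price) if s.strip()]
    curr = symbols[-1] if symbols else '$'
    # Unknown symbols fall back to USD
    return num, CURRENCY_CODES.get(curr, 'USD')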

In [None]:
# your code goes here
# Treat the placeholder values "-" and "void" as missing prices.
# Use .loc with a boolean mask: after the earlier row drops, index labels no
# longer match row positions, so .iloc with label lists would hit the wrong rows.
df_transations.loc[df_transations["price"].isin(["-", "void"]), "price"] = np.nan

In [None]:
df_temp = df_transations['price'].apply(lambda p: split_prices(p))
df_temp = pd.DataFrame(df_temp.tolist(), index=df_temp.index)
df_temp.columns = ["price", "curr"]
# df_temp.price = df_temp.groupby(['curr'], sort=False)['price'].fillna(df_temp['price'].median())


### Date column

In [None]:
#your code goes here
# errors='coerce' turns unparsable or out-of-range dates into NaT; there are only two such values
df_transations['date'] = pd.to_datetime(df_transations['date'], errors='coerce')
# Fill the missing dates with the most frequent date
df_transations['date'] = df_transations['date'].fillna(df_transations['date'].mode()[0])

## Part 2: Standardise prices (5 pts)

Transaction prices are recorded in the local currency of the client (EUR, GBP, USD or JPY). For comparability, you will need to convert these prices from the local currency into a common currency, here EUR. These standardised prices should be added to the dataframe as a column called `prices_euro`.

Consider a two-step process where you 1) identify which currency has been used, and 2) calculate the price conversion. Step 1 may reveal that the data is still not completely clean (so either correct it by imputation or drop rows). For step 2, look up exchange rates on the Internet.
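Both steps can also be expressed with vectorised pandas string methods instead of a row-wise function. The sketch below shows one possible version. It assumes the amount and a single currency symbol appear in the same string, in either order. The exchange rates are placeholder values (EUR per unit of currency) that should be replaced by rates you look up. Unknown or missing symbols are left as NaN, so they can be imputed or dropped explicitly.

```python
import pandas as pd

# Placeholder rates: EUR per one unit of each currency
EUR_PER_UNIT = {"EUR": 1.0, "GBP": 1.19, "USD": 0.89, "JPY": 0.007}
SYMBOL_TO_CODE = {"€": "EUR", "£": "GBP", "$": "USD", "¥": "JPY"}

def to_euro(raw_prices: pd.Series) -> pd.DataFrame:
    # Step 1: separate the numeric amount from the currency symbol
    amount = raw_prices.str.extract(r"(?P<amount>-?\d+(?:\.\d+)?)")["amount"]
    symbol = raw_prices.str.replace(r"[-\d.\s]", "", regex=True)
    out = pd.DataFrame({
        "price": pd.to_numeric(amount, errors="coerce"),
        "currency_code": symbol.map(SYMBOL_TO_CODE),  # unknown symbols -> NaN
    })
    # Step 2: convert using the rate of each row's currency
    out["prices_euro"] = out["price"] * out["currency_code"].map(EUR_PER_UNIT)
    return out
```

Keeping unknown currencies as NaN, rather than defaulting them to USD, makes the rows that still need cleaning easy to count with `out["currency_code"].isna().sum()`.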

In [None]:
# your code goes here
df_transations["price"], df_transations["currency_code"] = df_temp["price"], df_temp["curr"]
df_transations.currency_code.unique()

In [None]:
# Approximate exchange rates: EUR per one unit of each currency
EXCHANGE_RATES = {"EUR": 1, "GBP": 1.19, "USD": 0.89, "JPY": 0.007}

df_transations['ex_ratio'] = df_transations["currency_code"].map(EXCHANGE_RATES)

In [None]:
df_transations["prices_euro"] = df_transations["price"] * df_transations["ex_ratio"]

In [None]:
# Fill missing EUR prices with the overall median EUR price
df_transations["prices_euro"] = df_transations["prices_euro"].fillna(df_transations["prices_euro"].median())

In [None]:
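# your code goes here
# Convert the median-imputed EUR prices back to the original currency to fill the missing prices
missing_price = df_transations["price"].isna()
df_transations.loc[missing_price, "price"] = (
    df_transations.loc[missing_price, "prices_euro"] / df_transations.loc[missing_price, "ex_ratio"])
df_transations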

*Since we have separated the currency from the amount, we can now convert all prices to EUR and then fill the missing values in the new column. Finally, we derive the missing values in the `price` column from the imputed `prices_euro` column.*

### Profile `df_transactions_cleaned` and summarize corrections made

In [None]:
profile = ProfileReport(df_transations, title="Pandas Profiling Report")
profile.to_notebook_iframe()

*The profile report above shows that no missing values remain, although some columns are still highly correlated.*

## Part 3: Business insights (15 pts)

### Company by revenue

The revenue of a company is its total value of orders, all time. Compute and visualise all companies by revenue in descending order.

In [None]:
# your code goes here
total_orders = psql.sqldf("""
    select company
           ,sum(prices_euro) as revenue
    from df_transations
    group by company
    order by revenue desc    
""")

fig = plt.figure()
ax = plt.axes()

ax.bar(total_orders.company, 
       total_orders.revenue,
       color=sns.color_palette(),
      log=True)
ax.tick_params(axis='x', rotation=75)
fig.set_size_inches(20, 5)
      
plt.ylabel("Revenue in EUR", size=12)
plt.xlabel("Company", size=12)
plt.title("Company by revenue", size=18)

### Country by revenue, per year

The revenue of a country in a time period, is its total value of orders in that time period. Compute and visualise all countries by revenue, for years 2016, 2017 and 2018. Your visualisation should have countries on the x-axis and multiple bars (one for each year).

In [None]:
# your code goes here
revenue_year_country = psql.sqldf("""
    select strftime('%Y', date) as year
        ,country
        ,sum(prices_euro) as revenue
    from df_transations  
    where 1=1
        and strftime('%Y', date) in ('2016', '2017', '2018')
    group by country, year 
""")

plt.figure(figsize=(20, 6))
splot = sns.barplot(x="country", y="revenue", hue="year",
                    data=revenue_year_country, palette=sns.color_palette(), log=True)

  
for p in splot.patches:
    # Label each bar with its revenue, rounded to whole euros, 10 points above the bar
    splot.annotate(f"{p.get_height():,.0f}",
                   (p.get_x() + p.get_width() / 2., p.get_height()),
                   ha='center', va='center',
                   xytext=(0, 10), textcoords='offset points')
      
plt.ylabel("Revenue in EUR", size=12)
plt.xlabel("Country", size=12)
plt.title("Revenue per country, 2016 to 2018", size=18)

### Orders per quarter, all companies

Compute and visualise the number of orders each company has placed in each quarter. Exclude quarters where the order count is less than 3. As always, be mindful to not produce a cluttered visualisation.

Part of your query should form a variable that converts `date` into `YEAR_QUARTER` format. Dates are handled via `STRFTIME` ([docs](https://www.sqlite.org/lang_datefunc.html)), which doesn't support extracting the quarter. It does support extracting the month, which you can map to the quarter (Q1, Q2, Q3, Q4) with a `CASE` expression.
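The month-to-quarter mapping can be tried out in isolation with Python's built-in `sqlite3` module, which provides the same SQLite `STRFTIME` function. In this sketch, the table name `orders`, the helper `year_quarters` and the sample dates are made up. Each quarter covers exactly three months, so October falls in Q4.

```python
import sqlite3
from contextlib import closing

QUARTER_SQL = """
SELECT date,
       STRFTIME('%Y', date) || '_' ||
       CASE
           WHEN STRFTIME('%m', date) IN ('01', '02', '03') THEN 'Q1'
           WHEN STRFTIME('%m', date) IN ('04', '05', '06') THEN 'Q2'
           WHEN STRFTIME('%m', date) IN ('07', '08', '09') THEN 'Q3'
           ELSE 'Q4'
       END AS year_quarter
FROM orders
ORDER BY date
"""

def year_quarters(dates):
    # Load ISO-formatted date strings into a throwaway table and label each one
    with closing(sqlite3.connect(":memory:")) as con:
        con.execute("CREATE TABLE orders (date TEXT)")
        con.executemany("INSERT INTO orders VALUES (?)", [(d,) for d in dates])
        return [yq for _, yq in con.execute(QUARTER_SQL).fetchall()]
```

Single quotes are used for string literals. SQLite reads double-quoted text as an identifier first and only falls back to a string if no such column exists.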

In [None]:
# your code goes here
quarter_orders = psql.sqldf("""
    select company
           ,strftime('%Y', date) || '_' || case
               when strftime('%m', date) in ('01', '02', '03') then 'Q1'
               when strftime('%m', date) in ('04', '05', '06') then 'Q2'
               when strftime('%m', date) in ('07', '08', '09') then 'Q3'
               else 'Q4'
           end as year_quarter
           ,count(*) as number_of_orders
    from df_transations
    group by company, year_quarter
    having count(*) >= 3
    order by year_quarter, company
""")

plt.figure(figsize=(20, 6))
# One line per company; ordering by year_quarter keeps the x-axis chronological
sns.lineplot(x="year_quarter", y="number_of_orders", hue="company",
             data=quarter_orders)

plt.ylabel("Number of orders", size=12)
plt.xlabel("Year and quarter", size=12)
plt.title("Orders per quarter, all companies", size=18)

## Part 4: Parts and prices (20 pts)

### Parts demand changes

A different number of orders is placed on parts each year. The demand of a part is the number of orders placed on it. The demand change of a part is the absolute difference between its average demand in 2016/2017 and its demand in 2018.

Compute and visualise the 15 parts whose demand change has been the largest.
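One way to compute the demand change in a single pass is conditional aggregation. The sketch below runs it with Python's `sqlite3` on a made-up `orders` table; the table and function names are hypothetical. Dividing by `2.0` rather than `2` matters, because SQLite divides two integers with integer division, which would truncate an average of 1.5 to 1. Parts ordered in only one of the two periods are kept as well.

```python
import sqlite3
from contextlib import closing

DEMAND_CHANGE_SQL = """
SELECT part,
       ABS(SUM(CASE WHEN STRFTIME('%Y', date) = '2018' THEN 1 ELSE 0 END)
           - SUM(CASE WHEN STRFTIME('%Y', date) IN ('2016', '2017') THEN 1 ELSE 0 END) / 2.0
       ) AS demand_change
FROM orders
GROUP BY part
ORDER BY demand_change DESC, part
LIMIT 15
"""

def demand_changes(rows):
    # rows: iterable of (part, 'YYYY-MM-DD') order records
    with closing(sqlite3.connect(":memory:")) as con:
        con.execute("CREATE TABLE orders (part TEXT, date TEXT)")
        con.executemany("INSERT INTO orders VALUES (?, ?)", rows)
        return con.execute(DEMAND_CHANGE_SQL).fetchall()
```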

In [None]:
# your code goes here
parts_demands = psql.sqldf("""
    -- Demand change = |2018 order count - average yearly order count in 2016/2017|.
    -- Dividing by 2.0 avoids SQLite integer division, and conditional aggregation
    -- keeps parts that were only ordered in one of the two periods.
    select part
           ,abs(sum(case when strftime('%Y', date) = '2018' then 1 else 0 end)
                - sum(case when strftime('%Y', date) in ('2016', '2017') then 1 else 0 end) / 2.0) as demand
    from df_transations
    group by part
    order by demand desc
    limit 15
""")

fig = plt.figure()
ax = plt.axes()

ax.bar(parts_demands.part, 
       parts_demands.demand,
       color=sns.color_palette(),
      log=True)
ax.tick_params(axis='x', rotation=75)
fig.set_size_inches(20, 5)
      
plt.ylabel("Demand change (number of orders)", size=12)
plt.xlabel("Part", size=12)
plt.title("Top 15 parts by demand change, 2016/2017 average vs. 2018", size=18)

### Popular parts pricing

The most popular parts are those whose demand has increased the most from its 2016/2017 average to 2018. We want to find out whether popularity is due to a price drop, and whether the prices of these parts have been adjusted appropriately.

The demand increase of a part is its 2018 demand minus its 2016/2017 average demand. The price change of a part is its average 2018 price minus its average 2016/2017 price.

Compute the parts whose demand has increased (has positive demand increase) and the change in price for each of these parts. Then visualise this relationship and include in the figure title the correlation (compute via pandas) between these two variables. Conclude which is most likely 1) Parts became more popular from a drop in prices, or 2) The sales department deserved its bonuses.
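The exercise itself requires SQL, but as a cross-check, the demand increase and price change can be recomputed in pandas on a small sample. This sketch assumes columns `part`, a datetime `date` and `prices_euro`. Like an inner join, it keeps only parts ordered both in 2016/2017 and in 2018, since the price change is undefined otherwise. `demand_and_price_change` is a hypothetical helper.

```python
import pandas as pd

def demand_and_price_change(df):
    year = df["date"].dt.year
    base = df[year.isin([2016, 2017])].groupby("part")
    recent = df[year == 2018].groupby("part")
    out = pd.DataFrame({
        # 2018 order count minus the average yearly count over 2016/2017
        "demand_increase": recent.size() - base.size() / 2,
        # Average 2018 price minus average 2016/2017 price
        "price_change": recent["prices_euro"].mean() - base["prices_euro"].mean(),
    }).dropna()  # drop parts missing from either period
    return out[out["demand_increase"] > 0]
```

The correlation asked for in the figure title then follows from `out["demand_increase"].corr(out["price_change"])`.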

In [None]:
# your code goes here
popular_parts = psql.sqldf("""
    WITH trans67 as (
        select count(part) / 2.0 as demand  -- average yearly demand over 2016 and 2017 (2.0 avoids integer division)
               ,part
               ,avg(prices_euro) as avg_price
        from df_transations
        where 1=1
            and strftime('%Y', date) in ('2017', '2016')
        group by part
    )
    select (count(tran18.part) - trans67.demand) as demand -- demand increase; the HAVING clause keeps only positive values
           ,tran18.part
           ,(avg(tran18.prices_euro) - trans67.avg_price) as avg_price
    from df_transations tran18
        join trans67 on trans67.part = tran18.part
    where 1=1
        and strftime('%Y', tran18.date) in ('2018')
    group by tran18.part
    having (count(tran18.part) - trans67.demand) > 0
""")

corr = popular_parts.demand.corr(popular_parts.avg_price)

txt_title = ('Demand increase (left y-axis) and price change in EUR (right y-axis) between '
             f'the 2016/2017 average and 2018; correlation: {corr:.2f}')

fig, ax = plt.subplots(figsize=(25, 10))
_ = popular_parts.plot(ax=ax, kind="bar", x='part', y='demand', legend=False)
ax2 = ax.twinx()
_ = popular_parts.plot(ax=ax2, kind="line", x='part', color="red", alpha=.6, y='avg_price', legend=False)
ax2.title.set_text(txt_title)
legend = ax2.legend(['Price change (EUR)'], loc='upper right', shadow=True, fontsize='x-large')
legend = ax.legend(["Demand increase"], loc='upper left', shadow=True, fontsize='x-large')

# Put a nicer background color on the legend.
legend.get_frame().set_facecolor('C0')

*The plot shows that for some parts, for instance 49520-501, demand increased even though the price also increased; in fact, this part has the largest price difference between 2016/2017 and 2018. So, good job, sales department! In contrast, for the second part in the plot, 0699-7041, the demand increase seems to be due to a price drop.*