<h1><center>Big Data Algorithms Techniques & Platforms</center></h1>
<h2>
<hr style=" border:none; height:3px;">
<center>Spark and DataFrames</center>
<hr style=" border:none; height:3px;">
</h2>

#A. Analysis of the ``Great Expectations`` - Charles Dickens

Suppose you have a file containing the text of the ``Great Expectation``, a novel written in English by Charles Dickens. You can see an excerpt below :

"I pointed to where our village lay, on the flat in-shore among the
alder-trees and pollards, a mile or more from the church.

The man, after looking at me for a moment, turned me upside down, and
emptied my pockets. There was nothing in them but a piece of bread.
When the church came to itself,—for he was so sudden and strong that he
made it go head over heels before me, and I saw the steeple under my
feet,—when the church came to itself, I say, I was seated on a high
tombstone, trembling while he ate the bread ravenously."



## <strong>Exercise 0.</strong> Support functions

In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar zxvf spark-3.5.0-bin-hadoop3.tgz
!pip install -q findspark



In [107]:
# Basic Python libraries
import os
import re
import nltk
import numpy as np
import random as rn
from collections import Counter

# Initializing Spark
import findspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Window

# PySpark SQL functions and types
from pyspark.sql.functions import (
    mean, col, min, avg, count, coalesce, lit, explode, length, split, regexp_replace,
    trim, udf, size, round, rand
)
from pyspark.sql.types import ArrayType, StringType

# PySpark ML features
from pyspark.ml.feature import StopWordsRemover

# NLTK functionalities
from nltk.tokenize import sent_tokenize


os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

findspark.init()

conf = SparkConf().setMaster("local")
sc = SparkContext(conf = conf)
print("initialization successful!")

spark = SparkSession.builder.appName("AverageSentenceLength").getOrCreate()

seed_value=0
os.environ['PYTHONHASHSEED']=str(seed_value)

initialization successful!


In [120]:
### Write here all the import function and the support function you need for processing the text

filepath = '/content/great_expectations.txt'

# Use the StopWordsRemover tool from PySpark ML to get a list of common stopwords
remover = StopWordsRemover()
stopwords = set(remover.getStopWords())

chapters = [
    "Chapter I.", "Chapter II.", "Chapter III.", "Chapter IV.", "Chapter V.",
    "Chapter VI.", "Chapter VII.", "Chapter VIII.", "Chapter IX.", "Chapter X.",
    "Chapter XI.", "Chapter XII.", "Chapter XIII.", "Chapter XIV.", "Chapter XV.",
    "Chapter XVI.", "Chapter XVII.", "Chapter XVIII.", "Chapter XIX.", "Chapter XX.",
    "Chapter XXI.", "Chapter XXII.", "Chapter XXIII.", "Chapter XXIV.", "Chapter XXV.",
    "Chapter XXVI.", "Chapter XXVII.", "Chapter XXVIII.", "Chapter XXIX.", "Chapter XXX.",
    "Chapter XXXI.", "Chapter XXXII.", "Chapter XXXIII.", "Chapter XXXIV.", "Chapter XXXV.",
    "Chapter XXXVI.", "Chapter XXXVII.", "Chapter XXXVIII.", "Chapter XXXIX.", "Chapter XL.",
    "Chapter XLI.", "Chapter XLII.", "Chapter XLIII.", "Chapter XLIV.", "Chapter XLV.",
    "Chapter XLVI.", "Chapter XLVII.", "Chapter XLVIII.", "Chapter XLIX.", "Chapter L.",
    "Chapter LI.", "Chapter LII.", "Chapter LIII.", "Chapter LIV.", "Chapter LV.",
    "Chapter LVI.", "Chapter LVII.", "Chapter LVIII.", "Chapter LIX."           ]

# This function retrieves the index positions of each chapter in the RDD.
def get_chapter_indices(rdd, chapters):
    # Dictionary to hold the chapter names and their corresponding index positions.
    indices = {}

    # Loop through each chapter.
    for chapter in chapters:
        # Get the index of the chapter in the RDD.
        index = rdd.zipWithIndex().filter(lambda x: x[0] == chapter).map(lambda x: x[1]).collect()

        # If the index exists, add it to the indices dictionary with the chapter name as the key.
        if index:
            indices[chapter] = index[0]

    # Return the indices dictionary.
    return indices


# This function retrieves the content of each chapter using the indices found earlier.
def get_chapter_content(rdd, chapter_indices):
    # Dictionary to hold the chapter names and their corresponding content.
    chapters_content = {}

    # Convert the chapter names into a list.
    chapter_names = list(chapter_indices.keys())

    # Loop through each chapter name.
    for i in range(len(chapter_names)):
        start_index = chapter_indices[chapter_names[i]]

        # If there's a next chapter, get its index. Otherwise, set it to None.
        end_index = chapter_indices[chapter_names[i+1]] if i+1 < len(chapter_names) else None

        # Extract content between start and end indices.
        content = rdd.zipWithIndex().filter(lambda x: start_index < x[1] and (end_index is None or x[1] < end_index)).map(lambda x: x[0]).collect()

        # Combine the content into a single string.
        chapters_content[chapter_names[i]] = ' '.join(content)

    # Return the dictionary with chapter names and their content.
    return chapters_content

# This function creates an RDD with each record as a tuple containing chapter name and a sentence from that chapter.
def get_sentences_rdd(chapters_content, sc):
    # List to hold the chapter-sentence pairs.
    data = []

    # Loop through each chapter's content.
    for chapter, content in chapters_content.items():

        # Split the content into sentences (assuming the function split_sentences() exists).
        sentences = split_sentences(content)

        # Append each sentence with its chapter name to the data list.
        for sentence in sentences:
            data.append((chapter, sentence))

    # Convert the data list into an RDD and return.
    return sc.parallelize(data)



## <strong>Exercise 1</strong> Word number in sentences

The length of a sentence is the number of words that compose the sentence.

Write and comment on the set of Spark operations that return how many sentences we have for each available length in the text. You must also show the five most common lengths.

Notice that in the available text, a sentence is a set of lines that ends with a strong punctuation mark (i.e., ".", "!", "?", etc.).

Notice that:

* The novel starts after the <code> *** START OF THE PROJECT GUTENBERG EBOOK GREAT EXPECTATIONS ***</code>

* <code> [Illustration] </code> is not a sentence

* <code>Chapter VIII.</code> is not a sentence

You can introduce your constraints for the parsing. Multiple solutions and points of view are correct. You must comment on your point of view (i.e., the definition of ``sentence'' in your analysis).

You can choose if you are considering the stop-words in the count: add your point of view in the comment.

In [110]:
#read File and conver it into RDD
rdd = sc.textFile(filepath)

# Find the index of the start marker
start_idx = rdd.zipWithIndex().filter(lambda x: x[0] == '*** START OF THE PROJECT GUTENBERG EBOOK GREAT EXPECTATIONS ***').first()[1]

# Filter out all content before the start marker
filtered_rdd = rdd.zipWithIndex().filter(lambda x: x[1] > start_idx).map(lambda x: x[0])

# Remove any illustration from the RDD
filtered_rdd = filtered_rdd.filter(lambda line: line != '[Illustration]')

# Filter out lines that start with 'Chapter' to remove chapter headings
filtered_rdd = filtered_rdd.filter(lambda line: not line.startswith(' Chapter'))

# Remove any empty lines or lines with only white spaces
filtered_rdd = filtered_rdd.filter(lambda x: x.strip() != '')

# Combine all the lines into a single string to form the complete text of the book
entire_text = filtered_rdd.reduce(lambda a, b: a + " " + b)

# Split the entire text into individual sentences using regex
sentences = re.split('(?<=[.!?])\s+', entire_text)

# Convert the list of sentences back into an RDD
sentences_rdd = sc.parallelize(sentences)

# Using map, split each sentence into words
words_rdd = sentences_rdd.map(lambda sentence: sentence.split())

# Filter out any stopwords from the words of each sentence and convert words to lowercase
cleaned_rdd = words_rdd.map(lambda words: [word for word in words if word.lower() not in stopwords])

# Using map, return the length of each sentence
sentence_lengths = cleaned_rdd.map(lambda sentence: (len(sentence), 1))

# Reduce by key, counting how many sentences of each length
sentence_counts = sentence_lengths.reduceByKey(lambda x, y: x + y)

# Sort the result by ascending orders
sorted_sentence_counts = sentence_counts.sortBy(lambda x: x[1], ascending=False)

#show result:Display the 10 most common sentence lengths and their counts
sorted_sentence_counts.take(10)

[(5, 493),
 (6, 474),
 (9, 473),
 (4, 468),
 (3, 455),
 (8, 439),
 (7, 423),
 (2, 393),
 (10, 392),
 (11, 392)]

## <strong>Exercise 2.</strong> The average length of sentences in the entire novel
Write and comment on the set of Spark operations that returns and shows the average length of the sentences in the novel.

In [121]:
############## WRITE YOUR CODE HERE ##############

# Compute the total length and total count of sentences
total_length, total_count = sentence_lengths.reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]))

# Calculate average
average_length = total_length / total_count
average_length

11.58799504950495

## <strong>Exercise 3.</strong> Average length in chapters

Write and comment on the set of Spark operations that returns and shows the average length of sentences in each chapter.

In [124]:
# Get the indices of each chapter in the filtered RDD using the previously defined function.
chapter_indices = get_chapter_indices(filtered_rdd, chapters)

# Using the previously defined get_chapter_content function to obtain the content of each chapter.
chapters_content = get_chapter_content(filtered_rdd, chapter_indices)

# Get an RDD of chapter-sentence pairs using the content retrieved above.
sentences_rdd = get_sentences_rdd(chapters_content, spark.sparkContext)

# Convert the RDD to a DataFrame with columns "Chapter" and "Sentence".
df = spark.createDataFrame(sentences_rdd, ["Chapter", "Sentence"])

# Use the split function to break the sentences into words.
# The result is stored in a new column "Words".
df = df.withColumn("Words", split(col("Sentence"), " "))

# Use the StopWordsRemover to remove stopwords
# The cleaned words are stored in a new column "CleanedWords".
remover = StopWordsRemover(inputCol="Words", outputCol="CleanedWords")
df = remover.transform(df)

# Calculate the length of the cleaned sentences (i.e., the number of words)
# and store the result in a new column "Length".
df = df.withColumn("Length", size(col("CleanedWords")))

# Group the DataFrame by the "Chapter" column and compute the average length of cleaned sentences for each chapter.
# The result is rounded to 2 decimal places and stored in a new column "AvgLength".
result = df.groupBy("Chapter").agg(round(avg("Length"), 2).alias("AvgLength"))

# Display the results.
result.show()


+----------------+---------+
|         Chapter|AvgLength|
+----------------+---------+
|  Chapter XXXIX.|    10.39|
|    Chapter XIX.|    11.21|
|     Chapter IV.|     9.59|
| Chapter XXXIII.|    12.43|
| Chapter XXVIII.|    12.35|
|    Chapter LIX.|    14.17|
|   Chapter LIII.|    11.59|
|   Chapter XLVI.|    12.61|
|    Chapter XIV.|    13.22|
|   Chapter XXXI.|    13.16|
|     Chapter IX.|     9.75|
|   Chapter XIII.|    13.37|
|    Chapter VII.|    13.48|
|    Chapter XII.|    13.77|
|      Chapter L.|    12.75|
|    Chapter LVI.|    12.43|
|  Chapter XLIII.|    10.18|
|     Chapter LV.|     11.1|
| Chapter XLVIII.|     9.85|
|Chapter XXXVIII.|    12.98|
+----------------+---------+
only showing top 20 rows



## <strong>Exercise 4.</strong> The most repeated words in the sentences

Write and comment on the set of Spark operations that returns the ten most repeated words in a sentence and their repetition rate.

For example, the most repeated word in this set of sentences is <code> cat </code>, with an average repetition rate of 2. The word <code> table </code> is not considered as repeated.

* The cat is on the cat table.
* The table is red.
* The wooden table is broken.



In [125]:
#  Counting words in each sentence
sentences_rdd_reuse = cleaned_rdd.map(lambda words: Counter(words))

# Filtering out repetitive words from each sentence
repeated_words_rdd = sentences_rdd_reuse.flatMap(lambda counts: [(word, count) for word, count in counts.items() if count > 1])

# 3. Select the 10 words with the highest repetition rate from all sentences
top_10_repeated_words = repeated_words_rdd.takeOrdered(10, key=lambda x: -x[1])

print(top_10_repeated_words)


[('bit', 6), ('may', 5), ('“And', 5), ('rul', 5), ('looked', 5), ('go', 5), ('going', 5), ('time', 4), ('now,', 4), ('him,', 4)]


# B. Coffee Dataset

<p align="justify">
<font size="3">
For running this series of exercises, we are going to use a dataset coming from <a href="https://www.kaggle.com/datasets/schmoyote/coffee-reviews-dataset">Kaggle</a>.

As stated in the description of the dataset:
"Dataset contains information about coffee production and consumption.

All data are available from the official ICO website:
https://www.ico.org/new_historical.asp".
</font>
</p>


### The dataset

<p align="justify">
<font size="3">
The dataset is in a .csv file, and among the columns, you can find:

<ul>
<li> <code>name</code> The name of the blend </li>
<li><code>roaster</code> The name of the roaster</li>
<li><code>roast</code> The type of roast</li>
<li><code>loc_country</code> The country of the roaster</li>
<li><code>...</code> ...</li>
</ul>
</p>
</font>



In [69]:
!ls

great_expectations.txt	sample_data  spark-3.5.0-bin-hadoop3  spark-3.5.0-bin-hadoop3.tgz


In [70]:
! pip install kaggle

! mkdir ~/.kaggle

! cp kaggle.json ~/.kaggle/

! chmod 600 ~/.kaggle/kaggle.json



In [71]:
! kaggle datasets download schmoyote/coffee-reviews-dataset

Downloading coffee-reviews-dataset.zip to /content
  0% 0.00/569k [00:00<?, ?B/s]
100% 569k/569k [00:00<00:00, 45.6MB/s]


In [72]:
!unzip coffee-reviews-dataset.zip

Archive:  coffee-reviews-dataset.zip
  inflating: coffee_analysis.csv     
  inflating: simplified_coffee.csv   


## <strong> Spark and Pandas.</strong>

For this set of exercises, you must import data in Spark. After this first import, you can pass any dataset to Pandas for data analysis. At the end of each exercise (when the question is pertinent), you must return (reconvert) the dataframe in Spark.

Each time you do this conversion you must comment about this. Example:

<code> # creating a Spark dataframe </code>

<code> df = ... </code>

<code> # using Pandas and creating a Pandas dataframe </code>

<code> dfp = df. ... </code>

<code> # back to Spark </code>

<code> dfs = ... </code>

In [126]:
# Add your imports

# Create a Spark session
coffee_analysis = '/content/coffee_analysis.csv'
simplified_coffee = '/content/simplified_coffee.csv'

## <strong> Exercise 5.</strong> First import and data type
Import the .csv file in Spark DataFrame and show the structure of the dataframe.

Check and comment about the data type of each column. As you know, a good data analysis always starts from understanding your dataset.

In [146]:
# Write the command that creates (reads) a Spark DataFrame and stores the reference in the dfs variable

# Create a Spark session
spark = SparkSession.builder.appName("CoffeeAnalysis").getOrCreate()

#'''############## WRITE HERE YOUR CODE ##############'''
dfs = spark.read.csv(coffee_analysis,header=True,inferSchema=True)

# show the DataFrame schema
dfs.printSchema()

###### Write here your comment ##############
'''
name: string
this column will typically contain textual descriptions.

roaster: string
Specifies the roaster that roasted the coffee beans.

roast: string
Describes the level of roast for the coffee, e.g., light, medium, dark.
This is categorical data, and being stored as a string means it can accept various descriptors.

loc_country: string
The country where the coffee beans were sourced or where they originated.
Stored as a string, this column can take on the names of countries or regions.

origin_1 and origin_2: string
These columns seem to provide more specific details about the coffee's origin, possibly regions, estates, or farms within the broader loc_country.

100g_USD: double
Represents the price per 100 grams of the coffee in USD.
The double datatype means it's a floating-point number, allowing it to capture prices with decimal precision.

rating: integer
Provides a numerical rating for the coffee, presumably on a fixed scale (e.g., 1-10).
review_date: string
Indicates when the coffee was reviewed.

desc_1, desc_2, and desc_3: string
These columns provide tasting notes or descriptions of the coffee's flavor profile.
Storing these as strings allows for a wide range of textual descriptors.

In summary, this dataset seems to provide a comprehensive view of different coffee types,
their origins, cost, ratings, and taste profiles. To get the most out of this data,
ensure data quality (e.g., handle missing values, check for inconsistencies) and
consider converting the review_date to a date format if you plan on performing temporal analyses.
'''

root
 |-- name: string (nullable = true)
 |-- roaster: string (nullable = true)
 |-- roast: string (nullable = true)
 |-- loc_country: string (nullable = true)
 |-- origin_1: string (nullable = true)
 |-- origin_2: string (nullable = true)
 |-- 100g_USD: double (nullable = true)
 |-- rating: integer (nullable = true)
 |-- review_date: string (nullable = true)
 |-- desc_1: string (nullable = true)
 |-- desc_2: string (nullable = true)
 |-- desc_3: string (nullable = true)



"\nname: string\nthis column will typically contain textual descriptions.\n\nroaster: string\nSpecifies the roaster that roasted the coffee beans.\n\nroast: string\nDescribes the level of roast for the coffee, e.g., light, medium, dark.\nThis is categorical data, and being stored as a string means it can accept various descriptors.\n\nloc_country: string\nThe country where the coffee beans were sourced or where they originated.\nStored as a string, this column can take on the names of countries or regions.\n\norigin_1 and origin_2: string\nThese columns seem to provide more specific details about the coffee's origin, possibly regions, estates, or farms within the broader loc_country.\n\n100g_USD: double\nRepresents the price per 100 grams of the coffee in USD.\nThe double datatype means it's a floating-point number, allowing it to capture prices with decimal precision.\n\nrating: integer\nProvides a numerical rating for the coffee, presumably on a fixed scale (e.g., 1-10).\nreview_date

## <strong> Exercise 6.</strong> Data modeling choices
    
<p align="justify">
<font size="3">
Comment on the data-modeling choices and if you consider them correct from a general data-modeling point of view.
</font>
</p>


+--------------------+--------------------+------------+-------------+--------------------+--------------------+--------+------+-------------+--------------------+--------------------+--------------------+
|                name|             roaster|       roast|  loc_country|            origin_1|            origin_2|100g_USD|rating|  review_date|              desc_1|              desc_2|              desc_3|
+--------------------+--------------------+------------+-------------+--------------------+--------------------+--------+------+-------------+--------------------+--------------------+--------------------+
|“Sweety” Espresso...|              A.R.C.|Medium-Light|    Hong Kong|              Panama|            Ethiopia|   14.32|    95|November 2017|Evaluated as espr...|An espresso blend...|A radiant espress...|
|Flora Blend Espresso|              A.R.C.|Medium-Light|    Hong Kong|              Africa|        Asia Pacific|    9.05|    94|November 2017|Evaluated as espr...|An espresso b

In [None]:
###### Write here your comment
'''
From a general data-modeling perspective, the choices made for this dataset are largely appropriate, with a few considerations:

1. **Textual Data (string)**:
   - Columns like `name`, `roaster`, `roast`, `loc_country`, `origin_1`, `origin_2`, `desc_1`, `desc_2`, and `desc_3` are rightly modeled as strings. They represent names, descriptions, and categorical data that don't require numerical operations.

2. **Numeric Data**:
   - The choice of `double` for `100g_USD` is appropriate since prices can often have decimal values. This allows capturing precise pricing information.
   - Using an `integer` for `rating` suggests that the rating system is based on whole numbers, which can simplify the rating process and make it more interpretable for users. However, the range and scale of this rating system should be clearly defined somewhere to provide context.

3. **Date Data**:
   - The `review_date` column is modeled as a string. While this is not inherently wrong, if the dataset will be used for time series analysis, date comparisons, or if there's a need to extract month, year, or day from the dates, it would be more beneficial to store this column as a date or timestamp data type.

4. **Redundancy**:
   - The presence of both `origin_1` and `origin_2` might imply a hierarchical relationship (e.g., country and region). If that's the case, their naming could be more descriptive to make this relationship clear. If not, the potential for redundancy should be examined.

5. **Data Normalization**:
   - If the dataset is large and contains many repeating values (like roasters or countries), considering a normalized form might be beneficial. This involves creating separate tables for entities like roasters or countries and linking them using foreign keys. This reduces redundancy and can make updates more efficient.

6. **Descriptive Columns**:
   - Having three description columns (`desc_1`, `desc_2`, and `desc_3`) might indicate multiple tasting notes for each coffee. While this structure can accommodate varied descriptors, it may limit flexibility if more tasting notes are needed in the future. An alternative approach could involve a separate table for tasting notes, linked by a coffee ID.

In conclusion, the current data modeling choices are suitable for a straightforward analysis and representation of coffee data. However, as with any dataset, the modeling decisions should be aligned with the specific analytical needs and the nature of the operations intended to be performed on the data.
'''

## <strong> Exercise 7.</strong>  Best rated coffees
    
<p align="justify">
<font size="3">
We want to find the 5 best rated coffees.
</font>
</p>

In [128]:
############## WRITE YOUR CODE HERE ##############
# Sort by rating in descending order and take the first 5 rows
fifth_rating = dfs.orderBy(F.desc('rating')).limit(5).select('rating').collect()[-1][0]

# All coffees with ratings equal to or higher than the fifth place were then filtered out
result = dfs.filter(dfs.rating >= fifth_rating).select("name", "rating").orderBy(F.desc('rating')).show()



+--------------------+------+
|                name|rating|
+--------------------+------+
|GW01 Finca Sophia...|    98|
|Finca Sophia Gesh...|    98|
|     100% Kona SL-28|    97|
|Panama Ninety Plu...|    97|
|Kenya Karindundu ...|    97|
|     Rukera Espresso|    97|
|        Rukera Kenya|    97|
|Ethiopia Natural ...|    97|
|Ardent Ethiopia N...|    97|
|Testi Ayla Double...|    97|
|   Mama Cata Mokkita|    97|
|Colombia Finca El...|    97|
|Yemen Haraaz Red ...|    97|
+--------------------+------+



## <strong> Exercise 8.</strong> The best 10 roasters
<p align="justify">
<font size="3">
We want to find the ten best roasters. It is up to you to define and refine the ``best'' metric (best in the platform from the beginning of data collection, best in the last three years, best and with a minimum number of ratings, etc.).

Add the definition of your best metric in the comments.
</font>
</p>

In [76]:
dfs.show()
dfs.describe().show()


+--------------------+--------------------+------------+-------------+--------------------+--------------------+--------+------+-------------+--------------------+--------------------+--------------------+
|                name|             roaster|       roast|  loc_country|            origin_1|            origin_2|100g_USD|rating|  review_date|              desc_1|              desc_2|              desc_3|
+--------------------+--------------------+------------+-------------+--------------------+--------------------+--------+------+-------------+--------------------+--------------------+--------------------+
|“Sweety” Espresso...|              A.R.C.|Medium-Light|    Hong Kong|              Panama|            Ethiopia|   14.32|    95|November 2017|Evaluated as espr...|An espresso blend...|A radiant espress...|
|Flora Blend Espresso|              A.R.C.|Medium-Light|    Hong Kong|              Africa|        Asia Pacific|    9.05|    94|November 2017|Evaluated as espr...|An espresso b

In [131]:

MIN_NUM_RATINGS = 20

# 1. Filter out rows where roast rating is under 88
df_filtered = dfs.filter(col('rating') >= 88)

# 2. Filter out roasters with fewer than MIN_RATINGS
windowSpec = Window.partitionBy("roaster")

# Calculate the number of ratings for each roaster
df_with_num_ratings = df_filtered.withColumn("num_ratings", count("rating").over(windowSpec))

# Filter roasters with minimum number of ratings
df_filtered = df_with_num_ratings.filter(col("num_ratings") >= MIN_NUM_RATINGS)

# Use the previously defined windowSpec to calculate the average rating for each roaster
df_with_avg_score = df_filtered.withColumn("Average_Rating", avg("rating").over(windowSpec))

# Group by each roaster and calculate the average rating
grouped_by_roaster = df_with_avg_score.groupBy("roaster").agg(avg("Average_Rating").alias("Final_Average_Rating"))

# Sort the roasters by the average rating and select the top 10
top_10_roasters = grouped_by_roaster.sort(col("Final_Average_Rating").desc()).limit(10)

# Display the top 10 roasters and their average ratings, rounded to 2 decimal places
top_10_roasters.select("roaster", round(col("Final_Average_Rating"), 2).alias("Final_Average_Rating")).show()




+--------------------+--------------------+
|             roaster|Final_Average_Rating|
+--------------------+--------------------+
|Hula Daddy Kona C...|               95.13|
|           GK Coffee|               94.25|
|Dragonfly Coffee ...|               94.19|
|       Kakalove Cafe|               94.16|
|Bird Rock Coffee ...|               94.15|
|Red Rooster Coffe...|               93.95|
| JBC Coffee Roasters|                93.6|
|   Paradise Roasters|               93.54|
|Barrington Coffee...|               93.45|
|Big Shoulders Coffee|                93.2|
+--------------------+--------------------+



## <strong> Exercise 9.</strong> Best country
<p align="justify">
<font size="3">
If you were a roaster, in which three countries would you try to set your business?

Show them and refine your metric definition if necessary.
</font>
</p>


In [134]:
############## WRITE YOUR CODE HERE AND THE DESCRIPTION OF YOUR "SOMEHOW" ##############

# Country with High Rated Coffee Beans
avg_rating_by_origin1 = dfs.groupBy("origin_1").agg(mean("rating").alias("avg_rating_origin1"))
avg_rating_by_origin2 = dfs.groupBy("origin_2").agg(mean("rating").alias("avg_rating_origin2"))

# Price of Coffee Beans
avg_price_by_origin1 = dfs.groupBy("origin_1").agg(mean("100g_USD").alias("avg_price_origin1"))
avg_price_by_origin2 = dfs.groupBy("origin_2").agg(mean("100g_USD").alias("avg_price_origin2"))

# Join the two dataframes on their origin columns
joined_df = avg_rating_by_origin1.alias('a') \
    .join(avg_rating_by_origin2.alias('b'), col('a.origin_1') == col('b.origin_2'), "outer") \
    .join(avg_price_by_origin1.alias('c'), col('a.origin_1') == col('c.origin_1'), "outer") \
    .join(avg_price_by_origin2.alias('d'), col('a.origin_1') == col('d.origin_2'), "outer") \
    .select(
        coalesce(col("a.origin_1"), col("b.origin_2")).alias("country"),
        coalesce(col("avg_rating_origin1"), lit(0)).alias("avg_rating_origin1"),
        coalesce(col("avg_rating_origin2"), lit(0)).alias("avg_rating_origin2"),
        coalesce(col("avg_price_origin1"), lit(0)).alias("avg_price_origin1"),
        coalesce(col("avg_price_origin2"), lit(0)).alias("avg_price_origin2")
    )

# Compute a weighted score based on average ratings and prices
joined_df = joined_df.withColumn(
    "weighted_score",
    (col("avg_rating_origin1") + col("avg_rating_origin2")) / 2 * 0.7 - (col("avg_price_origin1") + col("avg_price_origin2")) / 2 * 0.3
)

# Sort by weighted_score in descending order and show the top countries
top_countries = joined_df.sort(desc("weighted_score")).select("country", "weighted_score")

top_countries.show(3)

+----------------+-----------------+
|         country|   weighted_score|
+----------------+-----------------+
|Northern Sumatra|65.28774999999999|
|        Saraguro|          64.7695|
|         Kochere|       64.6776875|
+----------------+-----------------+
only showing top 3 rows



## <strong> Exercise 10.</strong> Less common origin
<p align="justify">
<font size="3">
Show the ten less common origins of the coffees.

</font>
</p>

In [138]:
############## WRITE YOUR CODE HERE AND THE DESCRIPTION OF YOUR "SOMEHOW" ##############

# Count occurrences for origin_1
origin1_counts = dfs.groupBy("origin_1").count().withColumnRenamed("origin_1", "origin").withColumnRenamed("count", "count_1")

# Count occurrences for origin_2
origin2_counts = dfs.groupBy("origin_2").count().withColumnRenamed("origin_2", "origin").withColumnRenamed("count", "count_2")

# Full outer join on the origin column
all_origins = origin1_counts.join(origin2_counts, on="origin", how="outer")

# Replace null values with 0
all_origins = all_origins.withColumn("count_1", coalesce(col("count_1"), lit(0)))
all_origins = all_origins.withColumn("count_2", coalesce(col("count_2"), lit(0)))

# Compute the total count
combined_counts = all_origins.withColumn("total_count", col("count_1") + col("count_2"))

# Sort by total_count in ascending order and show the ten least common origins
least_common_origins = combined_counts.sort(col("total_count").asc()).limit(50)

least_common_origins.show(10)



+--------------------+-------+-------+-----------+
|              origin|count_1|count_2|total_count|
+--------------------+-------+-------+-----------+
|"""Big Island"" O...|      0|      1|          1|
|   Acatenango Valley|      1|      0|          1|
| Aceh Growing Region|      1|      0|          1|
|         Aceh Tengah|      0|      1|          1|
|      Agaro District|      1|      0|          1|
|           Al-Haimah|      1|      0|          1|
|           Al-Durrar|      1|      0|          1|
|Ahuachapán Depart...|      0|      1|          1|
|     Al Hayma Region|      1|      0|          1|
|            Al Mahjr|      1|      0|          1|
+--------------------+-------+-------+-----------+
only showing top 10 rows



## <strong> Exercise 11.</strong>  Propose your own analysis



    
<p align="justify">
<font size="3">
Propose here an analysis on the dataframe.
</font>
</p>

In [148]:
#Find 10 coffee with highest quality-price ratio
#which means higher rating per unit price

# Use the `withColumn` method to calculate the "value-for-money" index and add it as a new column to the DataFrame
dfs = dfs.withColumn("value_for_money", dfs.rating / dfs["100g_USD"])

# Sort the DataFrame based on the "value_for_money" index in descending order
sorted_df = dfs.sort(dfs.value_for_money.desc())

# Display the "name" and "value_for_money" columns from the sorted DataFrame
sorted_df.select("name", "value_for_money").show(10)


+--------------------+------------------+
|                name|   value_for_money|
+--------------------+------------------+
|           Cold Brew|             750.0|
|Karen J Kona Red ...| 552.9411764705882|
|     100% Guatemalan| 87.87878787878788|
|      Espresso Blend| 81.41592920353983|
|Asfaw Maru Ethiop...| 70.45454545454545|
|Ethiopia Nano Cha...| 70.14925373134328|
|Taiwan Natural Al...| 69.85294117647058|
|      414 Kenya SL34| 65.24822695035462|
|              5a Sur|61.904761904761905|
|         5a Poniente|61.904761904761905|
+--------------------+------------------+
only showing top 10 rows

