# Welcome to the Practical Exam :)
### <span style="color:purple"> **Please read the instructions, metadata and tips carefully in the practical exam PDF available on moodle!** </span>

**Also:**

- You can add as many cells as you like to answer the questions.

- You can cache or persist your RDDs or DataFrames; this may speed up performance.

**VERY IMPORTANT:** 

**- Don't forget to comment your code. Try to explain your reasoning to solve the exercises.**

**- Add your name to the .zip file submitted.**

**- Submit a .zip file. Files with other extensions will be penalized.**

# PART A:

##  <span style="color:blue"> **0) Warmup (1 point):** </span>

#### a) Load the pages text file (pages.txt) into your DBFS:

In [0]:
%fs ls /FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/PORDATA_Balance_of_trade___as_a_percentage_of_GDP.csv,PORDATA_Balance_of_trade___as_a_percentage_of_GDP.csv,19199,1718054974000
dbfs:/FileStore/tables/PORDATA_Compensation_of_employees__Euro__xlsx.csv,PORDATA_Compensation_of_employees__Euro__xlsx.csv,19589,1718054437000
dbfs:/FileStore/tables/PORDATA_Compensation_of_employees_per_capita__PPS_.csv,PORDATA_Compensation_of_employees_per_capita__PPS_.csv,19981,1718054046000
dbfs:/FileStore/tables/PORDATA_GDP__Euro_.csv,PORDATA_GDP__Euro_.csv,19989,1718055115000
dbfs:/FileStore/tables/PORDATA_Savings_rate_of_households.csv,PORDATA_Savings_rate_of_households.csv,18483,1718051156000
dbfs:/FileStore/tables/csv_countrydata.xlsx,csv_countrydata.xlsx,6207,1717975519000
dbfs:/FileStore/tables/csv_pordata.xlsx,csv_pordata.xlsx,6207,1717975470000
dbfs:/FileStore/tables/insurance.csv,insurance.csv,55628,1717703140000
dbfs:/FileStore/tables/movielens.txt,movielens.txt,616155,1716487292000
dbfs:/FileStore/tables/onlineretail.csv,onlineretail.csv,45580638,1717698189000


The %fs ls command lists the contents of /FileStore/tables in DBFS, which gives us the file path of the pages dataset:
 dbfs:/FileStore/tables/pages.txt

#### b) Using the Page Rank algorithm, which is the most popular page? (0.5):
##### Perform all the necessary transformations to the text file and implement the Page Rank algorithm in order to answer this question

In [0]:
# sc.textFile reads pages.txt (previously uploaded to DBFS) and returns an RDD of strings, one element per line of the file, which we store in the variable 'pages'.

pages=sc.textFile("dbfs:/FileStore/tables/pages.txt")
pages.collect()

Out[243]: ['A -> B',
 'A -> C',
 'A -> D',
 'B -> D',
 'B -> A',
 'C -> D',
 'D -> E',
 'E -> D']

1 - Extracting key-value pairs from the RDD of strings, thereby transforming it into an RDD of tuples.

In [0]:
# Convert the RDD of strings into an RDD of tuples: the first element is the text before ' -> ' (the referencing page) and the second is the text after it (the referenced page). Each line is split only once with str.split(' -> ').

pages = pages.map(lambda line: tuple(line.split(' -> ')))
pages.collect()

Out[244]: [('A', 'B'),
 ('A', 'C'),
 ('A', 'D'),
 ('B', 'D'),
 ('B', 'A'),
 ('C', 'D'),
 ('D', 'E'),
 ('E', 'D')]

2 - Each page's new rank is computed from the ranks of the pages that link to it, so every page needs quick access to the list of pages it links to (its neighbours). We therefore build key-value pairs in which the key is the page and the value is the list of all pages it references.

In [0]:
# Group each page with the pages it links to: distinct() removes duplicated links, and groupByKey() collects the neighbours of each page.

pages = pages.distinct().groupByKey()

pages.map(lambda tup : (tup[0], list(tup[1]))).collect()

Out[245]: [('C', ['D']),
 ('A', ['B', 'D', 'C']),
 ('B', ['D', 'A']),
 ('D', ['E']),
 ('E', ['D'])]
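For readers without a Spark cluster, the parse, distinct() and groupByKey() steps can be mirrored in plain Python. This is only a sketch: it uses a hard-coded copy of the eight lines shown in Out[243] instead of reading pages.txt, and the helper name build_adjacency is hypothetical.

```python
from collections import defaultdict

# The eight lines printed in Out[243], hard-coded instead of read from DBFS.
LINES = ["A -> B", "A -> C", "A -> D", "B -> D",
         "B -> A", "C -> D", "D -> E", "E -> D"]

def build_adjacency(lines):
    """Hypothetical helper: map each page to the sorted list of pages it links to."""
    # Split each line once into (source, destination); the set plays the role of distinct().
    edges = {tuple(line.split(" -> ")) for line in lines}
    adjacency = defaultdict(list)
    for src, dst in edges:
        adjacency[src].append(dst)  # equivalent of groupByKey()
    # Sort the neighbours so the result does not depend on set iteration order.
    return {src: sorted(dsts) for src, dsts in adjacency.items()}

links = build_adjacency(LINES)
print(links)
```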

In [0]:
# The link RDD is reused (joined with the ranks) in every PageRank iteration, so we persist it to avoid recomputing it from the text file each time.

pages = pages.persist()

3 - The algorithm starts by assigning a default rank of 1 to every page, so we create an RDD holding the initial ranks.

In [0]:
# The page is the key and the default rank of 1.0 is the value in the tuples we are generating.

ranks = pages.map(lambda tup : (tup[0], 1.0))

ranks.collect()

Out[247]: [('C', 1.0), ('A', 1.0), ('B', 1.0), ('D', 1.0), ('E', 1.0)]

In [0]:
# getContribs takes the list of neighbours of a page and the page's current rank. It yields one (neighbour, contribution) pair per neighbour, where the contribution is the page's rank divided by its number of neighbours.


def getContribs(neighbours, rank):
  for neighbour in neighbours:
    
    yield (neighbour, rank / len(neighbours))
    
# We use n = 20 iterations: each iteration moves the ranks closer to their stable values, so more iterations give a better approximation, and 20 are enough to separate the pages clearly in this small graph.

n = 20
for i in range(n):
  
  data = pages.join(ranks)
  
  
  contribs = data.flatMap(lambda pageInfo : getContribs(pageInfo[1][0], pageInfo[1][1]))
  
  contribs = contribs.reduceByKey(lambda a, b : a+b)
  
  ranks = contribs.mapValues(lambda num : num * 0.85 + 0.15)
  
ranks.sortByKey().collect()

Out[248]: [('A', 0.24301279063208753),
 ('B', 0.2188536243756648),
 ('C', 0.2188536243756648),
 ('D', 2.2287330375074665),
 ('E', 2.090546923109115)]

The PageRank algorithm gives each page a score based on the links pointing to it:
- pages receive higher scores when they have more incoming links and when those links come from other high-ranking pages.

Given this, D has the highest rank (about 2.23), so D is the most popular page; B and C share the lowest rank (about 0.22).
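As a cross-check that needs no Spark cluster, the same update rule can be run in plain Python. This is a sketch, not the notebook's code: the adjacency lists are hard-coded from Out[245], and the function name page_rank and its damping parameter are our own. Each iteration spreads every page's rank evenly over its neighbours, sums the shares per page and applies rank = 0.85 * sum + 0.15, as in the Spark loop.

```python
from collections import defaultdict

# Adjacency lists equivalent to the grouped RDD shown in Out[245].
LINKS = {"A": ["B", "C", "D"], "B": ["D", "A"], "C": ["D"], "D": ["E"], "E": ["D"]}

def page_rank(links, iterations=20, damping=0.85):
    """Plain-Python version of the notebook loop (same update rule, no Spark)."""
    ranks = {page: 1.0 for page in links}  # every page starts with rank 1.0
    for _ in range(iterations):
        contribs = defaultdict(float)
        # Like pages.join(ranks): only pages that have both links and a rank contribute.
        for page, neighbours in links.items():
            if page not in ranks:
                continue
            share = ranks[page] / len(neighbours)  # getContribs
            for neighbour in neighbours:
                contribs[neighbour] += share  # reduceByKey(lambda a, b: a + b)
        # mapValues(lambda num: num * 0.85 + 0.15)
        ranks = {page: total * damping + (1 - damping) for page, total in contribs.items()}
    return ranks

ranks = page_rank(LINKS)
for page in sorted(ranks):
    print(page, round(ranks[page], 4))
print("most popular:", max(ranks, key=ranks.get))
```

The `if page not in ranks` check stands in for the inner join, so a page that receives no links would drop out of the ranks just as it does in the RDD version.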

#### c) Will ignoring C's contributions change these results? (0.5):
#### In other words, exclude C as a referencing page and repeat the algorithm. 

***Note that we do not intend to discard links that reference C, only the links where C is referencing other pages.***

We run the PageRank algorithm once more, but this time page C no longer contributes to the other pages' ranks. We remove the entries whose key is C (C as a referencing page) and then proceed exactly as in the previous exercise.

In [0]:
# For this exercise we create a new RDD, pages1, of (page, list of referenced pages) tuples.

pages1 = pages.map(lambda tup : (tup[0], list(tup[1])))

pages1.collect()

Out[249]: [('C', ['D']),
 ('A', ['B', 'D', 'C']),
 ('B', ['D', 'A']),
 ('D', ['E']),
 ('E', ['D'])]

To remove C as a referencing page, we filter out every tuple whose key is "C":

In [0]:
# We are filtering all the tuples with the key C using the filter() function.

pages1 = pages1.filter(lambda lt : lt[0] != 'C')

pages1.collect()

Out[250]: [('A', ['B', 'D', 'C']), ('B', ['D', 'A']), ('D', ['E']), ('E', ['D'])]

In [0]:
# pages1 is reused in every iteration of the loop below, so we persist it to avoid recomputing it each time.

pages1 = pages1.persist()

In [0]:
# The algorithm needs an RDD of (page, 1.0) tuples holding the default rank of every page.

ranks1 = pages1.map(lambda tup : (tup[0], 1.0))

ranks1.collect()

Out[252]: [('A', 1.0), ('B', 1.0), ('D', 1.0), ('E', 1.0)]

In [0]:
# getContribs takes the list of neighbours of a page and the page's current rank. It yields one (neighbour, contribution) pair per neighbour, where the contribution is the page's rank divided by its number of neighbours.

def getContribs(neighbours, rank):
  for neighbour in neighbours:
    
    yield (neighbour, rank / len(neighbours))

    
# As before, we use n = 20 iterations: more iterations bring the ranks closer to their stable values.
    
n = 20
for i in range(n):
  
  data = pages1.join(ranks1)
  
  
  contribs = data.flatMap(lambda pageInfo : getContribs(pageInfo[1][0], pageInfo[1][1]))
  
  contribs = contribs.reduceByKey(lambda a, b : a+b)
  
  ranks1 = contribs.mapValues(lambda num : num * 0.85 + 0.15)
  
ranks1.sortByKey().collect()

Out[253]: [('A', 0.24301279063208753),
 ('B', 0.2188536243756648),
 ('C', 0.2188536243756648),
 ('D', 1.5726173327535289),
 ('E', 1.5064920933666854)]

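Comparing the two runs, excluding C as a referencing page lowers the ranks of D (from about 2.23 to 1.57) and E (from about 2.09 to 1.51), while A, B and C keep exactly the same values, because their incoming links (B -> A, A -> B, A -> C) do not depend on C. D is still the most popular page, so ignoring C's contributions changes the rank values but not the order of the pages.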
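The effect of removing C as a referencing page can be reproduced without Spark. This self-contained sketch (plain Python, hypothetical function page_rank, links hard-coded from Out[245]) runs the notebook's update rule on the full link set and on the set without C's outgoing link, then prints both ranks side by side.

```python
from collections import defaultdict

def page_rank(links, iterations=20, damping=0.85):
    """Plain-Python PageRank with the notebook's update rule: rank = 0.85 * sum + 0.15."""
    ranks = {page: 1.0 for page in links}
    for _ in range(iterations):
        contribs = defaultdict(float)
        for page, neighbours in links.items():
            if page not in ranks:  # mirrors the inner join with the ranks RDD
                continue
            share = ranks[page] / len(neighbours)
            for neighbour in neighbours:
                contribs[neighbour] += share
        ranks = {page: total * damping + (1 - damping) for page, total in contribs.items()}
    return ranks

ALL_LINKS = {"A": ["B", "C", "D"], "B": ["D", "A"], "C": ["D"], "D": ["E"], "E": ["D"]}
# Remove C only as a source page; the link A -> C is kept, so C still receives a rank.
WITHOUT_C = {page: nbrs for page, nbrs in ALL_LINKS.items() if page != "C"}

with_c = page_rank(ALL_LINKS)
without_c = page_rank(WITHOUT_C)
for page in sorted(with_c):
    print(page, round(with_c[page], 4), round(without_c[page], 4))
```

C's rank still appears in the second run because A keeps linking to it; C simply never passes that rank on, so part of the total rank leaks out of the graph and D and E end up lower.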

##  <span style="color:blue"> **1) Data Load, Cleaning, and Feature Engineering (4 points):** </span>

#### a) Load the Online Retail file (onlineretail.csv) data into your DBFS:

In [0]:
%fs ls /FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/PORDATA_Balance_of_trade___as_a_percentage_of_GDP.csv,PORDATA_Balance_of_trade___as_a_percentage_of_GDP.csv,19199,1718054974000
dbfs:/FileStore/tables/PORDATA_Compensation_of_employees__Euro__xlsx.csv,PORDATA_Compensation_of_employees__Euro__xlsx.csv,19589,1718054437000
dbfs:/FileStore/tables/PORDATA_Compensation_of_employees_per_capita__PPS_.csv,PORDATA_Compensation_of_employees_per_capita__PPS_.csv,19981,1718054046000
dbfs:/FileStore/tables/PORDATA_GDP__Euro_.csv,PORDATA_GDP__Euro_.csv,19989,1718055115000
dbfs:/FileStore/tables/PORDATA_Savings_rate_of_households.csv,PORDATA_Savings_rate_of_households.csv,18483,1718051156000
dbfs:/FileStore/tables/csv_countrydata.xlsx,csv_countrydata.xlsx,6207,1717975519000
dbfs:/FileStore/tables/csv_pordata.xlsx,csv_pordata.xlsx,6207,1717975470000
dbfs:/FileStore/tables/insurance.csv,insurance.csv,55628,1717703140000
dbfs:/FileStore/tables/movielens.txt,movielens.txt,616155,1716487292000
dbfs:/FileStore/tables/onlineretail.csv,onlineretail.csv,45580638,1717698189000


The %fs ls command lists the contents of /FileStore/tables in DBFS and gives us the path of the Online Retail file: dbfs:/FileStore/tables/onlineretail.csv

#### b) Prepare the data for analysis (0.5):

##### For instance, load the data to a spark DataFrame, or create the necessary table in SparkSQL.

In [0]:
# The CSV file was first uploaded to DBFS and its path copied. We then load it with spark.read.csv.
# inferSchema=true makes Spark infer the data type of each column instead of reading every column as a string.
# header=true because the first row of the CSV contains the column names.

data = spark.read.csv("dbfs:/FileStore/tables/onlineretail.csv", header="true", sep=",", inferSchema="true")

In [0]:
# Use head(10) to inspect the first 10 rows of the loaded data.

data.head(10)

Out[255]: [Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate='12/1/2010 8:26', UnitPrice=2.55, CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity=6, InvoiceDate='12/1/2010 8:26', UnitPrice=3.39, CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84406B', Description='CREAM CUPID HEARTS COAT HANGER', Quantity=8, InvoiceDate='12/1/2010 8:26', UnitPrice=2.75, CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84029G', Description='KNITTED UNION FLAG HOT WATER BOTTLE', Quantity=6, InvoiceDate='12/1/2010 8:26', UnitPrice=3.39, CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365', StockCode='84029E', Description='RED WOOLLY HOTTIE WHITE HEART.', Quantity=6, InvoiceDate='12/1/2010 8:26', UnitPrice=3.39, CustomerID=17850, Country='United Kingdom'),
 Row(InvoiceNo='536365'

In [0]:
# cache() marks the DataFrame for caching (for DataFrames it is shorthand for persist() with the default storage level), so later actions can reuse it. Its return value, shown below, also lists the columns and their data types.
data.cache()

Out[256]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]

In [0]:
#1. To convert InvoiceDate to a timestamp we import the to_timestamp and col functions.
#2. We parse the InvoiceDate strings with the 'MM/dd/yyyy HH:mm' pattern. The legacy time parser policy is set so that values with single-digit days and hours (e.g. '12/1/2010 8:26') are parsed as in Spark 2.x.
#3. Finally, we check the schema to confirm that the type was changed.

from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import col

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

#data = data.withColumn('InvoiceNo', col('InvoiceNo').cast('int'))

data = data.withColumn('InvoiceDate',to_timestamp(data.InvoiceDate,'MM/dd/yyyy HH:mm'))

data.cache()

# The InvoiceNo cast to int is commented out because some invoice numbers contain letters, which the cast would turn into null values.

Out[257]: DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: int, Country: string]


After checking the schema, we noticed that both InvoiceNo and InvoiceDate were read as strings. We converted InvoiceDate to a timestamp; InvoiceNo stays a string because some invoice numbers contain letters.
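To see what the conversion does to a single value, Python's datetime.strptime can parse the same string. This is only an analogy for Spark's to_timestamp: the %m/%d/%Y %H:%M directives are Python's, not Spark's pattern letters, and they accept the single-digit day and hour of values such as '12/1/2010 8:26'.

```python
from datetime import datetime

raw = "12/1/2010 8:26"  # first InvoiceDate value in the data (see Out[255])
# Month/day/year hour:minute, the same layout as the Spark pattern 'MM/dd/yyyy HH:mm'.
parsed = datetime.strptime(raw, "%m/%d/%Y %H:%M")
print(parsed)  # 2010-12-01 08:26:00
```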

In [0]:
# Country is the only column we expect to contain inconsistent values, so we print the total number of rows in the dataset and then the number of rows per country.
from pyspark.sql import functions as F

# Total number of rows in the dataset
total_rows = data.count()
print("Total rows in the dataset:", total_rows)

# Count of rows for each country
country_counts = data.groupBy('Country').count().orderBy('Country')
country_counts.show(country_counts.count(), truncate=False)

Total rows in the dataset: 541909
+--------------------+------+
|Country             |count |
+--------------------+------+
|Australia           |1259  |
|Austria             |401   |
|Bahrain             |19    |
|Belgium             |2069  |
|Brazil              |32    |
|Canada              |151   |
|Channel Islands     |758   |
|Cyprus              |622   |
|Czech Republic      |30    |
|Denmark             |389   |
|EIRE                |8196  |
|European Community  |61    |
|Finland             |695   |
|France              |8557  |
|Germany             |9495  |
|Greece              |146   |
|Hong Kong           |288   |
|Iceland             |182   |
|Israel              |297   |
|Italy               |803   |
|Japan               |358   |
|Lebanon             |45    |
|Lithuania           |35    |
|Malta               |127   |
|Netherlands         |2371  |
|Norway              |1086  |
|Poland              |341   |
|Portugal            |1519  |
|RSA                 |58    |
|Saudi

In [0]:
# After inspecting the list, we replace "European Community" with "Unspecified", because the European Community is not a country.
from pyspark.sql.functions import regexp_replace
data = data.withColumn('Country', regexp_replace('Country', 'European Community', 'Unspecified'))

country_counts = data.groupBy('Country').count().orderBy('Country')
country_counts.show(country_counts.count(), truncate=False)


+--------------------+------+
|Country             |count |
+--------------------+------+
|Australia           |1259  |
|Austria             |401   |
|Bahrain             |19    |
|Belgium             |2069  |
|Brazil              |32    |
|Canada              |151   |
|Channel Islands     |758   |
|Cyprus              |622   |
|Czech Republic      |30    |
|Denmark             |389   |
|EIRE                |8196  |
|Finland             |695   |
|France              |8557  |
|Germany             |9495  |
|Greece              |146   |
|Hong Kong           |288   |
|Iceland             |182   |
|Israel              |297   |
|Italy               |803   |
|Japan               |358   |
|Lebanon             |45    |
|Lithuania           |35    |
|Malta               |127   |
|Netherlands         |2371  |
|Norway              |1086  |
|Poland              |341   |
|Portugal            |1519  |
|RSA                 |58    |
|Saudi Arabia        |10    |
|Singapore           |229   |
|Spain    

#### c) There are missing values in the columns CustomerID and Description. Discard all records that have null values in such columns (0.5):


We now check for missing data. For that we need the SQL functions isnull, when, count and col.

In [0]:
# We import the functions needed to count the missing (null) values in every column.

from pyspark.sql.functions import isnull, when, count, col
data.select([count(when(isnull(c), c)).alias(c) for c in data.columns]).show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|       1454|       0|          0|        0|    135080|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



There are 1454 missing values in Description and 135080 in CustomerID.

In [0]:
# Drop every row with a null in CustomerID or Description, then recount the nulls to confirm.
data1 = data.dropna(how="any", subset=["CustomerID", "Description"])
data1.select([count(when(isnull(c), c)).alias(c) for c in data1.columns]).show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
|        0|        0|          0|       0|          0|        0|         0|      0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+



We removed the Null Values from the CustomerID and Description columns using dropna.

In [0]:
# Missing_Count is the number of rows dropped: the row count of the original dataset minus the row count after dropping the missing values.

Missing_Count = data.count() - data1.count()
print("Number of rows in the uncleaned data set: ", data.count())
print("Total of rows discarded:", Missing_Count)

Number of rows in the uncleaned data set:  541909
Total of rows discarded: 135080


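The uncleaned dataset has 541909 rows, and 135080 rows were discarded in total.

This is exactly the number of missing CustomerID values, which means every row with a missing Description was also missing its CustomerID.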
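The logic of the cells above (count the nulls per column, then drop rows where CustomerID or Description is null) can be sketched on a few toy records. The records are invented for illustration, None stands in for a Spark null, and the helper names null_counts and drop_nulls are hypothetical.

```python
# Toy records (invented for illustration); None stands in for a Spark null.
records = [
    {"InvoiceNo": "1", "Description": "ITEM ONE", "CustomerID": 101},
    {"InvoiceNo": "2", "Description": None, "CustomerID": None},
    {"InvoiceNo": "3", "Description": "ITEM TWO", "CustomerID": None},
]
columns = ["InvoiceNo", "Description", "CustomerID"]

def null_counts(rows, cols):
    """Number of null values per column, like count(when(isnull(c), c))."""
    return {c: sum(1 for r in rows if r[c] is None) for c in cols}

def drop_nulls(rows, subset):
    """Like dropna(how="any", subset=subset): drop a row if any subset column is null."""
    return [r for r in rows if all(r[c] is not None for c in subset)]

print(null_counts(records, columns))  # {'InvoiceNo': 0, 'Description': 1, 'CustomerID': 2}
clean = drop_nulls(records, ["CustomerID", "Description"])
print(len(records) - len(clean))  # 2, the same as the CustomerID null count
```

As in the real data, the row with a missing Description also lacks a CustomerID, so the number of dropped rows equals the CustomerID null count.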

#### d) Discard all the records in which Quantity has a negative number (1):

In [0]:
# where() is an alias of filter(); we use it to keep only the rows whose Quantity is greater than or equal to zero. To find out how many rows are removed, we also count the rows with a negative Quantity.

data_positive = data1.where("Quantity >= 0")
data_negative = data1.where("Quantity < 0").count()
print("There were found and discarded:", data_negative , "rows!")

There were found and discarded: 8905 rows!


#### e) Create a new feature/column that corresponds to the amount spent (amount_spent) per record (1):
##### This can be done by computing amount_spent = quantity*unit_price.

We chose to utilize the withColumn function because we needed to add a new column to our Dataframe, as demonstrated below:

In [0]:
# amount_spent is a monetary value, so we round it to two decimal places. The functions module is imported as F so that Spark's round does not shadow Python's built-in round.

from pyspark.sql import functions as F
data_amount = data_positive.withColumn("amount_spent", F.round(data_positive["Quantity"] * data_positive["UnitPrice"], 2))
data_amount.head(10)

Out[264]: [Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=2.55, CustomerID=17850, Country='United Kingdom', amount_spent=15.3),
 Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity=6, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=3.39, CustomerID=17850, Country='United Kingdom', amount_spent=20.34),
 Row(InvoiceNo='536365', StockCode='84406B', Description='CREAM CUPID HEARTS COAT HANGER', Quantity=8, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=2.75, CustomerID=17850, Country='United Kingdom', amount_spent=22.0),
 Row(InvoiceNo='536365', StockCode='84029G', Description='KNITTED UNION FLAG HOT WATER BOTTLE', Quantity=6, InvoiceDate=datetime.datetime(2010, 12, 1, 8, 26), UnitPrice=3.39, CustomerID=17850, Country='United Kingdom', amount_spent=20.34),
 Row(InvoiceNo='536365', StockCode='84029E', Descriptio
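The rounding is needed because multiplying two binary floating-point numbers rarely gives the exact decimal amount. A quick plain-Python check on the first record (Quantity 6, UnitPrice 2.55) shows this. Python floats and Spark doubles are both IEEE 754 doubles, although Python's round() rounds ties to even while Spark's round() rounds half up, which does not matter for this value.

```python
quantity, unit_price = 6, 2.55  # first record in Out[255]
raw = quantity * unit_price
print(raw)            # 15.299999999999999, not exactly 15.3
print(round(raw, 2))  # 15.3, the amount_spent shown in Out[264]
```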

#### f) Parse the invoice_date column and create one column for each of the following bits of information: year; month; day; hour. Then filter out all the invoices from 2011 (1):

First, we create the requested Year, Month, Day and Hour columns, giving each a clear name with alias. Then we remove the invoices from 2011 by filtering on Year, which leaves only the 2010 invoices. Since the new columns hold the date information we need, we drop the InvoiceDate column. Finally, we display the first rows to show the result of these changes.

In [0]:
# To parse InvoiceDate, we import the year, month, dayofmonth and hour functions.

from pyspark.sql.functions import year, month, dayofmonth, hour

data_date = data_amount.select('*', year(data_amount["InvoiceDate"]).alias('Year'), month(data_amount["InvoiceDate"]).alias('Month'), dayofmonth(data_amount["InvoiceDate"]).alias('Day'), hour(data_amount["InvoiceDate"]).alias('Hour'))

data_parse= data_date.filter("Year != 2011")
data_parse = data_parse.drop("InvoiceDate")

data_parse.head(10)

Out[265]: [Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, UnitPrice=2.55, CustomerID=17850, Country='United Kingdom', amount_spent=15.3, Year=2010, Month=12, Day=1, Hour=8),
 Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity=6, UnitPrice=3.39, CustomerID=17850, Country='United Kingdom', amount_spent=20.34, Year=2010, Month=12, Day=1, Hour=8),
 Row(InvoiceNo='536365', StockCode='84406B', Description='CREAM CUPID HEARTS COAT HANGER', Quantity=8, UnitPrice=2.75, CustomerID=17850, Country='United Kingdom', amount_spent=22.0, Year=2010, Month=12, Day=1, Hour=8),
 Row(InvoiceNo='536365', StockCode='84029G', Description='KNITTED UNION FLAG HOT WATER BOTTLE', Quantity=6, UnitPrice=3.39, CustomerID=17850, Country='United Kingdom', amount_spent=20.34, Year=2010, Month=12, Day=1, Hour=8),
 Row(InvoiceNo='536365', StockCode='84029E', Description='RED WOOLLY HOTTIE WHITE HEART.', Quantity=6, UnitPrice=3.

In [0]:
# Since we dropped some rows, we print how many were removed and how many remain.

print("Removed", data_date.count() - data_parse.count(), "rows!")
print("We were left with", data_parse.count(),"remaining rows!")

Removed 371764 rows!
We were left with 26160 remaining rows!
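The column extraction and the 2011 filter can be illustrated on a few hypothetical timestamps, with Python's datetime attributes playing the role of Spark's year(), month(), dayofmonth() and hour(). The helper split_date and the sample dates are invented for illustration.

```python
from datetime import datetime

# Hypothetical invoice timestamps; the real column holds the parsed InvoiceDate values.
invoice_dates = [
    datetime(2010, 12, 1, 8, 26),
    datetime(2011, 1, 4, 10, 0),
    datetime(2010, 12, 23, 16, 6),
]

def split_date(ts):
    """Return the same four parts as year(), month(), dayofmonth() and hour()."""
    return {"Year": ts.year, "Month": ts.month, "Day": ts.day, "Hour": ts.hour}

parts = [split_date(ts) for ts in invoice_dates]
# Keep only the rows whose Year is not 2011, as in filter("Year != 2011").
kept = [p for p in parts if p["Year"] != 2011]
print(kept)
```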


## <span style="color:blue"> **2) Data Exploration (10 points):** </span>
### Using the ***data that you cleaned*** in the previous exercises, answer the following questions:

#### a) What are the top 5 customers with the highest number of orders? (1)
##### Remember that the same invoice number means one order even if it appears in many rows!

*Tip: if you opt for a query you might need to include the count of the number of distinct InvoiceNo!*

We import the PySpark SQL functions module as fct.

In [0]:
import pyspark.sql.functions as fct

In [0]:
data_parse.groupBy('CustomerID').agg(fct.countDistinct('InvoiceNo').alias('Amount_of_Orders')).sort('Amount_of_Orders', ascending = False).limit(5).toPandas()

| | CustomerID | Amount_of_Orders |
|---|---|---|
| 0 | 12748 | 35 |
| 1 | 17850 | 34 |
| 2 | 14911 | 13 |
| 3 | 15061 | 12 |
| 4 | 13777 | 10 |


We first group the rows by CustomerID and then count the distinct invoice numbers of each customer, since one invoice number corresponds to one order. We name this column Amount_of_Orders, sort by it in descending order and limit the result to 5 rows.

The top 5 customers by number of orders are those with IDs 12748, 17850, 14911, 15061 and 13777.
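Counting distinct invoices rather than rows is the key point of this question. A plain-Python sketch with made-up customer and invoice identifiers shows the difference: customer C1 has three rows but only two distinct invoices, so it counts as two orders.

```python
from collections import defaultdict

# Toy (CustomerID, InvoiceNo) rows, invented for illustration: one invoice can span several rows.
rows = [("C1", "I1"), ("C1", "I1"), ("C1", "I2"),
        ("C2", "I3"), ("C2", "I4"), ("C2", "I5"),
        ("C3", "I6")]

invoices = defaultdict(set)
for customer, invoice in rows:
    invoices[customer].add(invoice)  # a set keeps each invoice once, like countDistinct

orders = {customer: len(nums) for customer, nums in invoices.items()}
# Sort by number of orders in descending order and keep at most 5, like sort(...).limit(5).
top = sorted(orders.items(), key=lambda kv: kv[1], reverse=True)[:5]
print(top)  # [('C2', 3), ('C1', 2), ('C3', 1)]
```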

#### b) What are the top 5 customers with the highest money spent? (1)
##### Consider using the amount_spent column you created since it is an auxiliary column that multiplies the UnitPrice by the Quantity!

Here we group the customers by CustomerID, add up their amount_spent, sort by the total in descending order and keep the first 5 rows to find the top 5 customers.

In [0]:
data_top_5 = data_parse.groupBy("CustomerID").sum("amount_spent").sort("sum(amount_spent)", ascending = False).limit(5)

# We choose to rename the column to Total_Amount_Spent in order to facilitate interpretation.
data_top_5.withColumnRenamed("sum(amount_spent)","Total_Amount_Spent").toPandas()

| | CustomerID | Total_Amount_Spent |
|---|---|---|
| 0 | 18102 | 27834.61 |
| 1 | 15061 | 19950.66 |
| 2 | 16029 | 13112.52 |
| 3 | 14646 | 8591.88 |
| 4 | 14911 | 7737.94 |


The five customers who spent the most, in order of total expenditure, are: 18102, 15061, 16029, 14646 and 14911.
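The same group, sum and sort can be sketched in plain Python on invented rows, using heapq.nlargest in place of sort(..., ascending=False).limit(5). The customer IDs and amounts below are hypothetical.

```python
import heapq
from collections import defaultdict

# Toy (CustomerID, amount_spent) rows, invented for illustration.
rows = [("C1", 15.3), ("C1", 20.34), ("C2", 100.0), ("C3", 5.0), ("C2", 0.5)]

totals = defaultdict(float)
for customer, amount in rows:
    totals[customer] += amount  # groupBy("CustomerID").sum("amount_spent")

# Five largest totals in descending order, rounded to cents for display.
top5 = [(c, round(t, 2)) for c, t in heapq.nlargest(5, totals.items(), key=lambda kv: kv[1])]
print(top5)  # [('C2', 100.5), ('C1', 35.64), ('C3', 5.0)]
```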

#### c) What are the top 20 countries with the most orders? (1)
##### Same logic as in exercise 1, but instead of considering the customers we are considering the country.

Using the same method as in exercise a), we rank the countries by their number of orders (distinct invoices).
We did not limit the result to 20 rows because, after a few attempts, we kept getting countries with Amount_of_Orders = 1 at the cut-off. Since 6 countries are tied with 1 order, we show the top 21 instead of choosing arbitrarily among the tied countries.

In [0]:
data_parse.groupBy('Country').agg(fct.countDistinct('InvoiceNo').alias('Amount_of_Orders')).sort('Amount_of_Orders', ascending = False).toPandas()

| | Country | Amount_of_Orders |
|---|---|---|
| 0 | United Kingdom | 1291 |
| 1 | Germany | 30 |
| 2 | France | 21 |
| 3 | EIRE | 15 |
| 4 | Portugal | 6 |
| 5 | Belgium | 5 |
| 6 | Lithuania | 4 |
| 7 | Spain | 4 |
| 8 | Japan | 3 |
| 9 | Australia | 3 |
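Keeping every country tied at the cut-off is a "top-n with ties" selection. A plain-Python sketch, with a hypothetical helper top_n_with_ties and invented counts, could look like this; in Spark the same idea could be expressed with a window function such as rank(), but that is not what the notebook does.

```python
def top_n_with_ties(counts, n):
    """Return the n largest items plus any further items tied with the n-th one."""
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    if len(ranked) <= n:
        return ranked
    cutoff = ranked[n - 1][1]  # value of the n-th item
    return [kv for kv in ranked if kv[1] >= cutoff]

# Invented order counts for five hypothetical countries.
orders = {"X": 10, "Y": 7, "Z": 3, "W": 3, "V": 1}
print(top_n_with_ties(orders, 3))  # [('X', 10), ('Y', 7), ('Z', 3), ('W', 3)]
```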


#### d) What are the top 5 products that are purchased the most amount of times? (1)
##### Note that by most amount of times, we do NOT consider the quantity of the order, only the fact that that item was purchased.

In [0]:
# We group the products by Description and count the distinct InvoiceNo values of each, so an item counts once per order regardless of the quantity. We name the count Total_Purchases with alias, sort it in descending order and keep the top 5.

data_parse.groupBy("Description").agg(fct.countDistinct("InvoiceNo").alias("Total_Purchases")).sort("Total_Purchases", ascending = False).limit(5).toPandas()

| | Description | Total_Purchases |
|---|---|---|
| 0 | WHITE HANGING HEART T-LIGHT HOLDER | 201 |
| 1 | REGENCY CAKESTAND 3 TIER | 138 |
| 2 | HAND WARMER BABUSHKA DESIGN | 137 |
| 3 | PAPER CHAIN KIT 50'S CHRISTMAS | 132 |
| 4 | SCOTTIE DOG HOT WATER BOTTLE | 130 |


The table above shows the top 5 products that were purchased the most amount of times.

#### e)  What is the name (Description) of the most expensive item? (3)

In [0]:
# We import the max and avg functions from pyspark.sql.functions (note that this max shadows Python's built-in max in the notebook).
from pyspark.sql.functions import max, avg

In [0]:
# We group the products by Description and count the distinct UnitPrice values of each, sorted in descending order and limited to 10 products. This checks whether the same product was sold at more than one price.

data_parse.groupBy("Description").agg(fct.countDistinct("UnitPrice")).sort("count(UnitPrice)", ascending = False).limit(10).toPandas()

| | Description | count(UnitPrice) |
|---|---|---|
| 0 | Manual | 18 |
| 1 | POSTAGE | 5 |
| 2 | PLEASE ONE PERSON METAL SIGN | 4 |
| 3 | PACK OF 72 SKULL CAKE CASES | 3 |
| 4 | HEART OF WICKER LARGE | 3 |
| 5 | WOODEN OWLS LIGHT GARLAND | 3 |
| 6 | VANILLA INCENSE IN TIN | 3 |
| 7 | RED TOADSTOOL LED NIGHT LIGHT | 3 |
| 8 | REGENCY CAKESTAND 3 TIER | 3 |
| 9 | PACK OF 60 PINK PAISLEY CAKE CASES | 3 |


This confirms that several products were sold at more than one unit price, so "most expensive" can be measured in two ways. We answer the question in both:

1 - By the average UnitPrice of each product

2 - By the maximum UnitPrice of each product

In [0]:
# In this first method, we group the products by Description, compute the average UnitPrice of each (rounded to two decimal places) and name the column with alias. We sort it in descending order and keep only the top item.

data_parse.groupBy("Description").agg(fct.round(avg("UnitPrice"),2).alias("AVG_Unit_Price")).sort("AVG_Unit_Price", ascending = False).limit(1).toPandas()

| | Description | AVG_Unit_Price |
|---|---|---|
| 0 | LOVE SEAT ANTIQUE WHITE METAL | 175.0 |



Based on the average unit price, LOVE SEAT ANTIQUE WHITE METAL is the most expensive product, at 175.00 per unit.

In [0]:
# The second strategy uses the maximum product price together with the same logic as the previous one.

data_parse.groupBy("Description").agg(fct.round(max("UnitPrice"),2).alias("MAX_Unit_Price")).sort("MAX_Unit_Price", ascending = False).limit(1).toPandas()

| | Description | MAX_Unit_Price |
|---|---|---|
| 0 | VINTAGE RED KITCHEN CABINET | 295.0 |


Based on the maximum unit price, VINTAGE RED KITCHEN CABINET is the most expensive product, at 295.00 per unit.
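Why the two methods can disagree is easiest to see on a small invented example. In this plain-Python sketch (hypothetical products ITEM_A and ITEM_B), one product has a single very high price among cheaper sales, so it wins on the maximum but not on the average.

```python
from collections import defaultdict

# Toy (Description, UnitPrice) rows, invented: the same product can appear with several prices.
rows = [("ITEM_A", 295.0), ("ITEM_A", 50.0), ("ITEM_A", 55.0),
        ("ITEM_B", 175.0), ("ITEM_B", 175.0)]

prices = defaultdict(list)
for description, price in rows:
    prices[description].append(price)

avg_price = {d: round(sum(p) / len(p), 2) for d, p in prices.items()}  # like avg("UnitPrice")
max_price = {d: max(p) for d, p in prices.items()}                     # like max("UnitPrice")

print(max(avg_price, key=avg_price.get))  # ITEM_B has the highest average price
print(max(max_price, key=max_price.get))  # ITEM_A has the highest single price
```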

#### f) What are the top ten most popular words in the Description column? (3)

##### This is a word count exercise.
*Tip: you can use `.rdd` to convert a dataframe to an RDD. This will be an RDD of row objects. Make sure your top 5 words are actually words, you may have to remove empty strings!*

In [0]:
# For this exercise we select only the Description column of the cleaned dataset (data_parse).
# We then convert it to an RDD; flatMap(list) turns each Row into its single value, giving an RDD of strings in which each string is one product description.
# We show the first 10 elements.

data_description=data_parse.select("Description")

dataRDD=data_description.rdd.flatMap(list)
dataRDD.take(10)

Out[276]: ['WHITE HANGING HEART T-LIGHT HOLDER',
 'WHITE METAL LANTERN',
 'CREAM CUPID HEARTS COAT HANGER',
 'KNITTED UNION FLAG HOT WATER BOTTLE',
 'RED WOOLLY HOTTIE WHITE HEART.',
 'SET 7 BABUSHKA NESTING BOXES',
 'GLASS STAR FROSTED T-LIGHT HOLDER',
 'HAND WARMER UNION JACK',
 'HAND WARMER RED POLKA DOT',
 'ASSORTED COLOUR BIRD ORNAMENT']

First, we divided each line into its component words so that each word could be added to a list of strings.

In [0]:
data_split = dataRDD.flatMap(lambda line: line.split(" "))

# collect without parentheses only returns the method object; take(10) shows the first words.
data_split.take(10)

Since the same word can be written in different cases (White, WHITE), we convert every word to lowercase.

In [0]:
data_lowered = data_split.map(lambda word: word.lower())

data_lowered.take(10)

Reading through the descriptions, we noticed that some words carry punctuation (for example "HEART." in "RED WOOLLY HOTTIE WHITE HEART."), so the same word could be counted under two different keys. We therefore remove all punctuation characters from every word.

In [0]:
import string

# Delete every punctuation character (string.punctuation) from each word.
# Note that this also turns hyphenated words such as "t-light" into "tlight".
data_punctuation = data_lowered.map(lambda word: word.translate(str.maketrans('', '', string.punctuation)))

data_punctuation.take(10)

Splitting on single spaces and removing punctuation leaves empty strings ('') in the list. This happens, for example, where two spaces are adjacent or where a token consists only of punctuation. We therefore keep only the strings with len(string) > 0.
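The cleaning steps described so far (split on spaces, lower-case, strip punctuation, drop empty strings) can be checked locally. Below is a plain-Python sketch, applied to two descriptions from the output above. `clean_words` is a hypothetical helper name and is not part of the notebook. It only mirrors the RDD transformations one by one.

```python
import string

# Translation table that deletes every ASCII punctuation character (as in the notebook).
PUNCT_TABLE = str.maketrans("", "", string.punctuation)

def clean_words(descriptions):
    """Split on spaces, lower-case, strip punctuation and drop empty strings."""
    words = []
    for line in descriptions:
        for token in line.split(" "):             # flatMap(lambda line: line.split(" "))
            token = token.lower()                 # map(lambda x: str(x).lower())
            token = token.translate(PUNCT_TABLE)  # map(lambda word: word.translate(...))
            if len(token) > 0:                    # filter(lambda x: len(x) > 0)
                words.append(token)
    return words

sample = ["RED WOOLLY HOTTIE WHITE HEART.", "GLASS STAR FROSTED T-LIGHT HOLDER"]
print(clean_words(sample))
# ['red', 'woolly', 'hottie', 'white', 'heart', 'glass', 'star', 'frosted', 'tlight', 'holder']
```

The output shows both effects of removing punctuation. "heart." becomes "heart", and "t-light" becomes "tlight".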

In [0]:
data_clean = data_punctuation.filter(lambda x: len(x) > 0)

To count the words, we map each word to a (word, 1) tuple, with the word as the key and 1 as the value. We then use reduceByKey to sum the 1s for each word, which gives the total number of occurrences of that word.

In [0]:
word_count = data_clean.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)

word_count.collect()

Out[281]: [('heart', 2067),
 ('metal', 983),
 ('water', 1216),
 ('set', 1996),
 ('nesting', 64),
 ('warmer', 651),
 ('polka', 26),
 ('bird', 363),
 ('poppys', 179),
 ('charlotte', 197),
 ('mug', 614),
 ('of', 1924),
 ('blocks', 51),
 ('alphabet', 50),
 ('paris', 44),
 ('panda', 7),
 ('gift', 210),
 ('inflatable', 16),
 ('card', 470),
 ('of4', 38),
 ('i', 277),
 ('toadstool', 95),
 ('led', 56),
 ('christmas', 1600),
 ('finish', 249),
 ('frame', 359),
 ('plates', 24),
 ('cases', 485),
 ('charlielola', 36),
 ('gin', 73),
 ('cup', 236),
 ('flowers', 67),
 ('money', 135),
 ('cloche', 16),
 ('tidy', 54),
 ('baskets', 23),
 ('board', 89),
 ('chalkboard', 89),
 ('flower', 182),
 ('bread', 64),
 ('ribbons', 244),
 ('decoration', 491),
 ('10', 179),
 ('flying', 31),
 ('rain', 38),
 ('hat', 14),
 ('deluxe', 39),
 ('soldier', 27),
 ('24', 93),
 ('orange', 73),
 ('12', 492),
 ('purse', 138),
 ('poorly', 76),
 ('4', 298),
 ('family', 36),
 ('silver', 380),
 ('mirror', 70),
 ('served', 11),
 ('im', 1

In [0]:
# Sort the (word, count) pairs by count in descending order and keep the top 10.
word_count.sortBy(lambda pair: pair[1], ascending=False).take(10)

Out[282]: [('red', 2318),
 ('heart', 2067),
 ('set', 1996),
 ('of', 1924),
 ('retrospot', 1909),
 ('christmas', 1600),
 ('bag', 1533),
 ('box', 1432),
 ('white', 1343),
 ('design', 1337)]

These are the ten most frequent words in the Description column, sorted in descending order of frequency.
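The counting and ranking logic can be sketched in plain Python on a small hypothetical list of cleaned words. `top_n_words` is an illustrative helper, not part of the notebook. On the RDD itself, `word_count.takeOrdered(10, key=lambda pair: -pair[1])` is an alternative that returns the same top 10 without an explicit `sortBy`.

```python
from collections import defaultdict

def top_n_words(words, n=10):
    """Count each word, then return the n (word, count) pairs with the highest counts."""
    counts = defaultdict(int)
    for word in words:
        counts[word] += 1  # same result as map(lambda w: (w, 1)).reduceByKey(lambda x, y: x + y)
    # Same as sortBy(lambda pair: pair[1], ascending=False).take(n)
    return sorted(counts.items(), key=lambda pair: pair[1], reverse=True)[:n]

words = ["red", "heart", "red", "set", "red", "heart"]  # hypothetical cleaned words
print(top_n_words(words, n=2))  # [('red', 3), ('heart', 2)]
```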

# PART B:

## <span style="color:blue"> **Machine Learning (3 points):** </span>

#### a) Load the insurance dataset and perform the following operations: (0.5)
##### - Filter out (remove from the dataset) the region column.
##### - Encode the values in the smoker column in the following manner:
1. If the person is a smoker his/her value in this column should be 1
2. If the person is not a smoker his/her value in this column should be 0

In [0]:
%fs ls /FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/PORDATA_Balance_of_trade___as_a_percentage_of_GDP.csv,PORDATA_Balance_of_trade___as_a_percentage_of_GDP.csv,19199,1718054974000
dbfs:/FileStore/tables/PORDATA_Compensation_of_employees__Euro__xlsx.csv,PORDATA_Compensation_of_employees__Euro__xlsx.csv,19589,1718054437000
dbfs:/FileStore/tables/PORDATA_Compensation_of_employees_per_capita__PPS_.csv,PORDATA_Compensation_of_employees_per_capita__PPS_.csv,19981,1718054046000
dbfs:/FileStore/tables/PORDATA_GDP__Euro_.csv,PORDATA_GDP__Euro_.csv,19989,1718055115000
dbfs:/FileStore/tables/PORDATA_Savings_rate_of_households.csv,PORDATA_Savings_rate_of_households.csv,18483,1718051156000
dbfs:/FileStore/tables/csv_countrydata.xlsx,csv_countrydata.xlsx,6207,1717975519000
dbfs:/FileStore/tables/csv_pordata.xlsx,csv_pordata.xlsx,6207,1717975470000
dbfs:/FileStore/tables/insurance.csv,insurance.csv,55628,1717703140000
dbfs:/FileStore/tables/movielens.txt,movielens.txt,616155,1716487292000
dbfs:/FileStore/tables/onlineretail.csv,onlineretail.csv,45580638,1717698189000


Using the %fs command, we listed the DBFS root and obtained the file path of the insurance dataset: /FileStore/tables/insurance.csv.

In [0]:
# We copied the path shown above and read the CSV file with spark.read.csv.
# header="true": the first row of the file contains the column names.
# inferSchema="true": Spark infers each column's data type instead of reading every column as a string.

insurance = spark.read.csv("/FileStore/tables/insurance.csv", header="true", sep=",", inferSchema="true")

In [0]:
# Cache the DataFrame because it is reused several times below; cache() is shorthand for persist() with the default storage level.
# The returned DataFrame representation also lists each column's data type.
insurance.cache()

Out[284]: DataFrame[age: int, sex: string, bmi: double, children: int, smoker: string, region: string, charges: double]

In [0]:
# Show the first 10 rows to get an overview of the data.

insurance.head(10)

Out[285]: [Row(age=19, sex='female', bmi=27.9, children=0, smoker='yes', region='southwest', charges=16884.924),
 Row(age=18, sex='male', bmi=33.77, children=1, smoker='no', region='southeast', charges=1725.5523),
 Row(age=28, sex='male', bmi=33.0, children=3, smoker='no', region='southeast', charges=4449.462),
 Row(age=33, sex='male', bmi=22.705, children=0, smoker='no', region='northwest', charges=21984.47061),
 Row(age=32, sex='male', bmi=28.88, children=0, smoker='no', region='northwest', charges=3866.8552),
 Row(age=31, sex='female', bmi=25.74, children=0, smoker='no', region='southeast', charges=3756.6216),
 Row(age=46, sex='female', bmi=33.44, children=1, smoker='no', region='southeast', charges=8240.5896),
 Row(age=37, sex='female', bmi=27.74, children=3, smoker='no', region='northwest', charges=7281.5056),
 Row(age=37, sex='male', bmi=29.83, children=2, smoker='no', region='northeast', charges=6406.4107),
 Row(age=60, sex='female', bmi=25.84, children=0, smoker='no', region='n

In [0]:
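# Import StringIndexer to encode the smoker column as a numerical column (Smoker_Index).
# By default, StringIndexer gives index 0.0 to the most frequent label ("no") and 1.0 to "yes".
# Note: the region column is removed later, when we select the numerical columns.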
from pyspark.ml.feature import StringIndexer

SmokerIndexer = StringIndexer(inputCol ="smoker", outputCol ="Smoker_Index")

insurance_1 = SmokerIndexer.fit(insurance).transform(insurance)

insurance_1.head(10)

Out[286]: [Row(age=19, sex='female', bmi=27.9, children=0, smoker='yes', region='southwest', charges=16884.924, Smoker_Index=1.0),
 Row(age=18, sex='male', bmi=33.77, children=1, smoker='no', region='southeast', charges=1725.5523, Smoker_Index=0.0),
 Row(age=28, sex='male', bmi=33.0, children=3, smoker='no', region='southeast', charges=4449.462, Smoker_Index=0.0),
 Row(age=33, sex='male', bmi=22.705, children=0, smoker='no', region='northwest', charges=21984.47061, Smoker_Index=0.0),
 Row(age=32, sex='male', bmi=28.88, children=0, smoker='no', region='northwest', charges=3866.8552, Smoker_Index=0.0),
 Row(age=31, sex='female', bmi=25.74, children=0, smoker='no', region='southeast', charges=3756.6216, Smoker_Index=0.0),
 Row(age=46, sex='female', bmi=33.44, children=1, smoker='no', region='southeast', charges=8240.5896, Smoker_Index=0.0),
 Row(age=37, sex='female', bmi=27.74, children=3, smoker='no', region='northwest', charges=7281.5056, Smoker_Index=0.0),
 Row(age=37, sex='male', bmi=

#### b) Perform a multiple linear regression in an attempt to predict the health insurance charges based on the clients' information. At the end, report the coefficients, the RMSE and the r2 of the model: (2.5)

##### Make sure to consider any transformation necessary to the dataframe (e.g., deriving new variables, dropping existing ones, scale transformations , etc...) Justify all your decisions.

In [0]:
# Import the necessary libraries
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

Before building the model, we carried out some exploratory analysis to better understand the data. We continued with the dataset from the previous exercise.

In [0]:
# Re-examine the data types of the columns on which we will perform the regression.
insurance_1.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: integer (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)
 |-- Smoker_Index: double (nullable = false)



In [0]:
display(insurance_1.describe())

summary,age,sex,bmi,children,smoker,region,charges,Smoker_Index
count,1338.0,1338,1338.0,1338.0,1338,1338,1338.0,1338.0
mean,39.20702541106129,,30.663396860986538,1.0949177877429,,,13270.422265141257,0.2047832585949177
stddev,14.049960379216149,,6.098186911679012,1.205492739781914,,,12110.011236693992,0.4036940375456171
min,18.0,female,15.96,0.0,no,northeast,1121.8739,0.0
max,64.0,male,53.13,5.0,yes,southwest,63770.42801,1.0


Every column has a count of 1338, equal to the number of rows, so there are no missing values.

- **charges (target):** ranges from about 1,122 to 63,770. Its mean (about 13,270) lies much closer to the minimum than to the maximum, which suggests that the column is strongly right-skewed.
- **age:** the sample is not particularly old. The mean age is about 39, with a standard deviation of 14 (minimum 18, maximum 64).
- **bmi:** the mean is about 30.7, above the overweight threshold of 25, so the average client is overweight or obese.
- **children:** ranges from 0 to 5, with a mean of about 1.1.
- **Smoker_Index:** the mean of about 0.20 shows that roughly 20% of the clients are smokers.

As with smoker, the sex column is a string. We must encode it before it can be included in the correlation matrix alongside the other variables.
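StringIndexer's default `stringOrderType` is `frequencyDesc`. The most frequent label gets index 0.0, and labels with equal frequency are ordered alphabetically. This is why "no" (the majority) became 0.0 and "yes" became 1.0. That matches the encoding the exercise requires, but only because non-smokers are the majority. The same rule determines Sex_Index in the next cell.

The plain-Python sketch below mimics that ordering. `frequency_desc_index` is a hypothetical helper, not Spark code. In Spark, an explicit mapping such as `when(col("smoker") == "yes", 1).otherwise(0)` would make the encoding independent of label frequencies.

```python
from collections import Counter

def frequency_desc_index(labels):
    """Assign indices like StringIndexer's default 'frequencyDesc' ordering."""
    counts = Counter(labels)
    # Most frequent label first; labels with equal counts are ordered alphabetically.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}

smoker = ["yes", "no", "no", "no", "no"]  # hypothetical sample where non-smokers are the majority
print(frequency_desc_index(smoker))       # {'no': 0.0, 'yes': 1.0}

# An explicit mapping fixes the required encoding regardless of label frequencies.
explicit = {"yes": 1, "no": 0}
print([explicit[s] for s in smoker])      # [1, 0, 0, 0, 0]
```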

In [0]:
# Using the StringIndexer to encode the sex variable like we did before.
SexIndexer = StringIndexer(inputCol ="sex", outputCol ="Sex_Index")

insurance_1 = SexIndexer.fit(insurance_1).transform(insurance_1)

insurance_1.head(10)

Out[290]: [Row(age=19, sex='female', bmi=27.9, children=0, smoker='yes', region='southwest', charges=16884.924, Smoker_Index=1.0, Sex_Index=1.0),
 Row(age=18, sex='male', bmi=33.77, children=1, smoker='no', region='southeast', charges=1725.5523, Smoker_Index=0.0, Sex_Index=0.0),
 Row(age=28, sex='male', bmi=33.0, children=3, smoker='no', region='southeast', charges=4449.462, Smoker_Index=0.0, Sex_Index=0.0),
 Row(age=33, sex='male', bmi=22.705, children=0, smoker='no', region='northwest', charges=21984.47061, Smoker_Index=0.0, Sex_Index=0.0),
 Row(age=32, sex='male', bmi=28.88, children=0, smoker='no', region='northwest', charges=3866.8552, Smoker_Index=0.0, Sex_Index=0.0),
 Row(age=31, sex='female', bmi=25.74, children=0, smoker='no', region='southeast', charges=3756.6216, Smoker_Index=0.0, Sex_Index=1.0),
 Row(age=46, sex='female', bmi=33.44, children=1, smoker='no', region='southeast', charges=8240.5896, Smoker_Index=0.0, Sex_Index=1.0),
 Row(age=37, sex='female', bmi=27.74, childre

In [0]:
# Split the columns into two groups:
# - the categorical region column, which cannot enter the correlation matrix;
# - the numerical (or already encoded) columns.
# In Databricks, the SparkSession is already available as `spark`.

# Categorical DataFrame with the 'region' column
insurance_categorical = insurance_1.select('region')

# Numerical DataFrame with the columns used in the correlation matrix
insurance_numerical = insurance_1.select('age', 'bmi', 'children', 'charges', 'Smoker_Index', 'Sex_Index')

# Preview the numerical DataFrame (only the last expression of a cell is displayed)
insurance_numerical.head(10)


Out[291]: [Row(age=19, bmi=27.9, children=0, charges=16884.924, Smoker_Index=1.0, Sex_Index=1.0),
 Row(age=18, bmi=33.77, children=1, charges=1725.5523, Smoker_Index=0.0, Sex_Index=0.0),
 Row(age=28, bmi=33.0, children=3, charges=4449.462, Smoker_Index=0.0, Sex_Index=0.0),
 Row(age=33, bmi=22.705, children=0, charges=21984.47061, Smoker_Index=0.0, Sex_Index=0.0),
 Row(age=32, bmi=28.88, children=0, charges=3866.8552, Smoker_Index=0.0, Sex_Index=0.0),
 Row(age=31, bmi=25.74, children=0, charges=3756.6216, Smoker_Index=0.0, Sex_Index=1.0),
 Row(age=46, bmi=33.44, children=1, charges=8240.5896, Smoker_Index=0.0, Sex_Index=1.0),
 Row(age=37, bmi=27.74, children=3, charges=7281.5056, Smoker_Index=0.0, Sex_Index=1.0),
 Row(age=37, bmi=29.83, children=2, charges=6406.4107, Smoker_Index=0.0, Sex_Index=0.0),
 Row(age=60, bmi=25.84, children=0, charges=28923.13692, Smoker_Index=0.0, Sex_Index=1.0)]

## CORRELATION MATRIX

In [0]:
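# We compute the Pearson correlation matrix with Spark's Statistics.corr and wrap it in a pandas DataFrame.
from pyspark.mllib.stat import Statistics
import pandas as pd

# Returns a pandas DataFrame labelled with the column names (e.g., usable with a seaborn heatmap).
def compute_correlation_matrix(df, method='pearson'):
    # Convert each Row into a tuple of numbers, since Statistics.corr expects an RDD of vectors.
    df_rdd = df.rdd.map(lambda row: row[0:])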
    corr_mat = Statistics.corr(df_rdd, method=method)
    corr_mat_df = pd.DataFrame(corr_mat,
                    columns=df.columns, 
                    index=df.columns)
    return corr_mat_df

In [0]:
compute_correlation_matrix(insurance_numerical, method='pearson')

Unnamed: 0,age,bmi,children,charges,Smoker_Index,Sex_Index
age,1.0,0.109272,0.042469,0.299008,-0.025019,0.020856
bmi,0.109272,1.0,0.012759,0.198341,0.00375,-0.046371
children,0.042469,0.012759,1.0,0.067998,0.007673,-0.017163
charges,0.299008,0.198341,0.067998,1.0,0.787251,-0.057292
Smoker_Index,-0.025019,0.00375,0.007673,0.787251,1.0,-0.076185
Sex_Index,0.020856,-0.046371,-0.017163,-0.057292,-0.076185,1.0


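The correlation matrix shows weak linear relationships among most variables, including among the features themselves, so multicollinearity does not seem to be a concern. The exception is Smoker_Index, which has a correlation of 0.787 with our target variable, charges. Age (0.299) and bmi (0.198) are only moderately and weakly correlated with charges, respectively.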
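With `method='pearson'`, `Statistics.corr` computes for each pair of columns the covariance divided by the product of the standard deviations. For a 0/1 column such as Smoker_Index, this Pearson coefficient is the point-biserial correlation. Below is a minimal plain-Python sketch of the formula; `pearson` is an illustrative helper and the sample values are hypothetical.

```python
import math

def pearson(x, y):
    """Pearson correlation: covariance over the product of standard deviations."""
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    var_x = sum((a - mean_x) ** 2 for a in x)
    var_y = sum((b - mean_y) ** 2 for b in y)
    # The 1/(n-1) factors cancel, so they are omitted.
    return cov / math.sqrt(var_x * var_y)

smoker = [0, 0, 1, 1]     # hypothetical binary feature
charges = [1, 2, 9, 10]   # hypothetical target values
print(round(pearson(smoker, charges), 4))  # 0.9923
```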

## Train/Test Split

In [0]:
# We split the dataset 70/30 into training and test sets.
# Without a seed, randomSplit produces a different split on each run.

(Insurance_Train,Insurance_Test) = insurance_numerical.randomSplit([0.7,0.3])

print("We have %d training examples and %d test examples." % (Insurance_Train.count(), Insurance_Test.count()))

We have 914 training examples and 424 test examples.
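`randomSplit` normalises the weights and assigns each row to a split at random. The 70/30 split is therefore only approximate: 914/424 here, versus an exact 936.6/401.4 for 1338 rows. It also changes between runs unless a seed is passed, e.g. `randomSplit([0.7, 0.3], seed=42)`.

The plain-Python sketch below imitates per-row random assignment. `random_split` is hypothetical and does not reproduce Spark's actual sampler; it only illustrates why the sizes vary.

```python
import random

def random_split(rows, weights, seed=None):
    """Assign each row independently to a split, with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    bounds, acc = [], 0.0
    for w in weights:  # cumulative, normalised upper bounds, e.g. [0.7, 1.0]
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()  # one uniform draw per row
        for i, upper in enumerate(bounds):
            if u < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(1338), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to, but generally not exactly, 937 and 401
```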


## Outliers

To look for potential outliers, we displayed each numerical variable of the training set as a boxplot.

In [0]:
display(Insurance_Train.select('age'))

age
18
18
18
18
18
18
18
18
18
18


Databricks visualization. Run in Databricks to view.

In [0]:
display(Insurance_Train.select('bmi'))

bmi
15.96
20.79
21.47
21.565
21.66
21.78
22.99
23.085
23.21
23.32


Databricks visualization. Run in Databricks to view.

In [0]:
display(Insurance_Train.select('children'))

children
0
0
0
0
0
2
0
0
0
1


Databricks visualization. Run in Databricks to view.

In [0]:
display(Insurance_Train.select('charges'))

charges
1694.7964
1607.5101
1702.4553
13747.87235
14283.4594
11884.04858
1704.5681
1704.70015
1121.8739
1711.0268


Databricks visualization. Run in Databricks to view.

Looking at the boxplots of our numerical variables, only bmi appears to have clear outliers among the features. Because BMI values of 50 or more are extreme, we decided to remove those rows from the training set.
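The BMI threshold of 50 was chosen by eye from the boxplot. A common, more systematic rule, also used for boxplot whiskers by many plotting tools, flags points more than 1.5 × IQR beyond the quartiles. The sketch below computes those fences. `iqr_outliers` and the sample values are hypothetical. Quartile definitions differ slightly between tools, so the fences may not match the Databricks plot exactly.

```python
import statistics

def iqr_outliers(values, k=1.5):
    """Return the values outside [Q1 - k*IQR, Q3 + k*IQR] and the two fences."""
    q1, _, q3 = statistics.quantiles(values, n=4)  # default 'exclusive' method
    iqr = q3 - q1
    low, high = q1 - k * iqr, q3 + k * iqr
    return [v for v in values if v < low or v > high], (low, high)

values = [1, 2, 3, 4, 5, 6, 7, 8, 100]  # hypothetical sample with one extreme value
outliers, fences = iqr_outliers(values)
print(outliers, fences)  # [100] (-5.0, 15.0)
```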

In [0]:
# After examining the boxplots above, keep only the training rows with a bmi below 50.
Insurance_Train_drop = Insurance_Train.filter("bmi < 50")

In [0]:
# Report how many rows the bmi filter removed and how many remain.

print("There were removed", Insurance_Train.count() - Insurance_Train_drop.count(), "rows!")
print("We were left with", Insurance_Train_drop.count(),"remaining rows!")

There were removed 2 rows!
We were left with 912 remaining rows!


## Feature Engineering

In [0]:
# Map each BMI value to a category; the categories are then turned into dummy variables.
# Half-open intervals ensure that values such as 24.95 or 29.95 are not misclassified as "Obese".

def bmicat(bmi):
  
  if bmi < 18.5:
    return "Underweight"
  
  elif bmi < 25:
    return "Normal"
  
  elif bmi < 30:
    return "Overweight"
  
  else: 
    return "Obese"

We then apply bmicat to every row through a UDF, binning the bmi feature into these categories in both the training and test sets.

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap bmicat in a UDF that returns the category as a string.
bin_udf = udf(bmicat, StringType())

Insurance_Train = Insurance_Train_drop.withColumn("bmi_group", bin_udf("bmi"))
Insurance_Test = Insurance_Test.withColumn("bmi_group", bin_udf("bmi"))

Our linear regression model cannot use categorical variables directly, so we must encode this new variable. We first considered StringIndexer, but the indices it assigns follow label frequency rather than the natural order of the BMI categories. We therefore created dummy (0/1) variables instead, leaving Underweight out as the reference category.
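Leaving one category out avoids the "dummy variable trap". If all four dummies were included, they would always sum to 1 and be perfectly collinear with the intercept. The plain-Python sketch below mirrors the three `when(...).otherwise(0)` columns; `bmi_dummies` is a hypothetical helper. In Spark, `pyspark.ml.feature.OneHotEncoder` (applied after a StringIndexer) drops one category by default through `dropLast=True`, although the dropped category is then the last index rather than necessarily Underweight.

```python
CATEGORIES = ["Underweight", "Normal", "Overweight", "Obese"]
REFERENCE = "Underweight"  # left out: all three dummies are 0 for this category

def bmi_dummies(group):
    """One 0/1 column per non-reference category, like the when(...).otherwise(0) columns."""
    return {c: int(group == c) for c in CATEGORIES if c != REFERENCE}

print(bmi_dummies("Obese"))        # {'Normal': 0, 'Overweight': 0, 'Obese': 1}
print(bmi_dummies("Underweight"))  # {'Normal': 0, 'Overweight': 0, 'Obese': 0}
```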

In [0]:
#Building dummy variables using the train dataset's bmi_group.

from pyspark.sql.functions import when
Insurance_Train = Insurance_Train.withColumn("Overweight", when(Insurance_Train.bmi_group == "Overweight", 1).otherwise(0))
Insurance_Train = Insurance_Train.withColumn("Obese", when(Insurance_Train.bmi_group == "Obese", 1).otherwise(0))
Insurance_Train = Insurance_Train.withColumn("Normal", when(Insurance_Train.bmi_group == "Normal", 1).otherwise(0))

In [0]:
# Creating those dummy variables from the test dataset's bmi_group.

Insurance_Test = Insurance_Test.withColumn("Overweight", when(Insurance_Test.bmi_group == "Overweight", 1).otherwise(0))
Insurance_Test = Insurance_Test.withColumn("Obese", when(Insurance_Test.bmi_group == "Obese", 1).otherwise(0))
Insurance_Test = Insurance_Test.withColumn("Normal", when(Insurance_Test.bmi_group == "Normal", 1).otherwise(0))

In [0]:
#Showing the test dataset to verify that the newly produced dummy variables were created correctly.
display(Insurance_Test)

age,bmi,children,charges,Smoker_Index,Sex_Index,bmi_group,Overweight,Obese,Normal
18,17.29,2,12829.4551,1.0,0.0,Underweight,0,0,0
18,25.46,0,1708.0014,0.0,0.0,Overweight,1,0,0
18,27.36,1,17178.6824,1.0,0.0,Overweight,1,0,0
18,28.215,0,2200.83085,0.0,1.0,Overweight,1,0,0
18,28.5,0,1712.227,0.0,0.0,Overweight,1,0,0
18,30.03,1,1720.3537,0.0,0.0,Obese,0,1,0
18,31.35,4,4561.1885,0.0,1.0,Obese,0,1,0
18,31.92,0,2205.9808,0.0,1.0,Obese,0,1,0
18,33.33,0,1135.9407,0.0,0.0,Obese,0,1,0
18,33.88,0,11482.63485,0.0,1.0,Obese,0,1,0



We have already removed several columns that the model does not need, such as the original string columns sex and smoker, which were replaced by Sex_Index and Smoker_Index. Now that the dummy variables are built, bmi and bmi_group are no longer needed either, so we drop them.

In [0]:
# The dummy variables now carry the BMI information, so we drop the bmi and bmi_group columns from the training dataset.

Insurance_Train = Insurance_Train.drop('bmi','bmi_group')

The test dataset, displayed below, still contains bmi and bmi_group. This does not affect the model, because the features are later assembled only from the columns of the training dataset.

In [0]:
display(Insurance_Test)

age,bmi,children,charges,Smoker_Index,Sex_Index,bmi_group,Overweight,Obese,Normal
18,17.29,2,12829.4551,1.0,0.0,Underweight,0,0,0
18,25.46,0,1708.0014,0.0,0.0,Overweight,1,0,0
18,27.36,1,17178.6824,1.0,0.0,Overweight,1,0,0
18,28.215,0,2200.83085,0.0,1.0,Overweight,1,0,0
18,28.5,0,1712.227,0.0,0.0,Overweight,1,0,0
18,30.03,1,1720.3537,0.0,0.0,Obese,0,1,0
18,31.35,4,4561.1885,0.0,1.0,Obese,0,1,0
18,31.92,0,2205.9808,0.0,1.0,Obese,0,1,0
18,33.33,0,1135.9407,0.0,0.0,Obese,0,1,0
18,33.88,0,11482.63485,0.0,1.0,Obese,0,1,0


In [0]:
display(Insurance_Train)

age,children,charges,Smoker_Index,Sex_Index,Overweight,Obese,Normal
18,0,1694.7964,0.0,0.0,0,0,0
18,0,1607.5101,0.0,1.0,0,0,1
18,0,1702.4553,0.0,0.0,0,0,1
18,0,13747.87235,1.0,0.0,0,0,1
18,0,14283.4594,1.0,1.0,0,0,1
18,2,11884.04858,0.0,0.0,0,0,1
18,0,1704.5681,0.0,0.0,0,0,1
18,0,1704.70015,0.0,0.0,0,0,1
18,0,1121.8739,0.0,0.0,0,0,1
18,1,1711.0268,0.0,0.0,0,0,1


Now that we are almost ready to run the regression, we scale the features in both the training and test datasets. This puts the coefficients on comparable scales. We chose not to scale the target variable, although doing so would not have been incorrect.
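StandardScaler's defaults are `withStd=True` and `withMean=False`. Each value is divided by the column's corrected sample standard deviation and is not centred. This is consistent with the outputs below:

- **Training set:** age 18 becomes about 1.284 (a standard deviation of about 14.02).
- **Test set:** age 18 becomes about 1.276 (about 14.11).

The different results show that the scaler is re-fitted on each set. Ideally the standard deviation is learned on the training data only and reused for the test data.

A minimal plain-Python sketch of that idea follows. `fit_std_scaler` and the sample columns are hypothetical. In Spark, the equivalent would be keeping the fitted PipelineModel from the training data and calling its `transform` on the test set.

```python
import statistics

def fit_std_scaler(train_values):
    """Learn the scale on the training data only (sample standard deviation, no centring)."""
    s = statistics.stdev(train_values)
    # Like StandardScaler, map everything to 0.0 when the standard deviation is zero.
    return lambda x: x / s if s != 0 else 0.0

train_age = [2, 4, 6]  # hypothetical training column (sample std = 2.0)
test_age = [6, 8]      # hypothetical test column
scale = fit_std_scaler(train_age)
print([scale(x) for x in test_age])  # [3.0, 4.0]
```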

### Training Dataset

In [0]:
# Scale each feature (except the target) with a small Pipeline:
# 1. VectorAssembler turns the column into a one-element vector column (StandardScaler works on vectors);
# 2. StandardScaler divides it by its standard deviation (defaults: withStd=True, withMean=False, so no centring);
# 3. the `unlist` UDF converts the scaled one-element vector back to a double, and the temporary vector column is dropped.
# StandardScaler is one choice; other scalers (e.g., MinMaxScaler) could also have been used.

from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
 

unlist = udf(lambda x: float(list(x)[0]), DoubleType())
 

for i in ["age","children","Smoker_Index","Sex_Index","Overweight","Obese","Normal"]:

    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
 

    scaler = StandardScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
 

    pipeline = Pipeline(stages=[assembler, scaler])
 

    Insurance_Train = pipeline.fit(Insurance_Train).transform(Insurance_Train).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")
 
print("After Scaling :")
Insurance_Train.head(10)

After Scaling :
Out[309]: [Row(age=18, children=0, charges=1694.7964, Smoker_Index=0.0, Sex_Index=0.0, Overweight=0, Obese=0, Normal=0, age_Scaled=1.2843017578171065, children_Scaled=0.0, Smoker_Index_Scaled=0.0, Sex_Index_Scaled=0.0, Overweight_Scaled=0.0, Obese_Scaled=0.0, Normal_Scaled=0.0),
 Row(age=18, children=0, charges=1607.5101, Smoker_Index=0.0, Sex_Index=1.0, Overweight=0, Obese=0, Normal=1, age_Scaled=1.2843017578171065, children_Scaled=0.0, Smoker_Index_Scaled=0.0, Sex_Index_Scaled=1.9994850526833277, Overweight_Scaled=0.0, Obese_Scaled=0.0, Normal_Scaled=2.7332221057975796),
 Row(age=18, children=0, charges=1702.4553, Smoker_Index=0.0, Sex_Index=0.0, Overweight=0, Obese=0, Normal=1, age_Scaled=1.2843017578171065, children_Scaled=0.0, Smoker_Index_Scaled=0.0, Sex_Index_Scaled=0.0, Overweight_Scaled=0.0, Obese_Scaled=0.0, Normal_Scaled=2.7332221057975796),
 Row(age=18, children=0, charges=13747.87235, Smoker_Index=1.0, Sex_Index=0.0, Overweight=0, Obese=0, Normal=1, age_Sca

### Test Dataset

In [0]:
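# Same procedure as above, applied to the test dataset.
# Note: this re-fits the scaler on the test data, so the test features are divided by the test-set standard deviations;
# reusing the scalers fitted on the training data would keep both datasets on the same scale.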

unlist = udf(lambda x: float(list(x)[0]), DoubleType())
 

for i in ["age","children","Smoker_Index","Sex_Index","Overweight","Obese","Normal"]:

    assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
 

    scaler = StandardScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
 

    pipeline = Pipeline(stages=[assembler, scaler])
 

    Insurance_Test = pipeline.fit(Insurance_Test).transform(Insurance_Test).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect")
 
print("After Scaling :")
Insurance_Test.head(10)

After Scaling :
Out[310]: [Row(age=18, bmi=17.29, children=2, charges=12829.4551, Smoker_Index=1.0, Sex_Index=0.0, bmi_group='Underweight', Overweight=0, Obese=0, Normal=0, age_Scaled=1.2755886799782168, children_Scaled=1.5943392959925058, Smoker_Index_Scaled=2.5652519050316624, Sex_Index_Scaled=0.0, Overweight_Scaled=0.0, Obese_Scaled=0.0, Normal_Scaled=0.0),
 Row(age=18, bmi=25.46, children=0, charges=1708.0014, Smoker_Index=0.0, Sex_Index=0.0, bmi_group='Overweight', Overweight=1, Obese=0, Normal=0, age_Scaled=1.2755886799782168, children_Scaled=0.0, Smoker_Index_Scaled=0.0, Sex_Index_Scaled=0.0, Overweight_Scaled=2.234554715877654, Obese_Scaled=0.0, Normal_Scaled=0.0),
 Row(age=18, bmi=27.36, children=1, charges=17178.6824, Smoker_Index=1.0, Sex_Index=0.0, bmi_group='Overweight', Overweight=1, Obese=0, Normal=0, age_Scaled=1.2755886799782168, children_Scaled=0.7971696479962529, Smoker_Index_Scaled=2.5652519050316624, Sex_Index_Scaled=0.0, Overweight_Scaled=2.234554715877654, Obese_

In [0]:
# After scaling every variable, we remove the original (unscaled) columns from the train and test datasets.

Insurance_Train = Insurance_Train.drop('age','children','Smoker_Index',
                                       'Sex_Index', 'Overweight', 'Obese',
                                       'Normal')
Insurance_Test = Insurance_Test.drop('age','children','Smoker_Index',
                                      'Sex_Index', 'Overweight', 'Obese',
                                      'Normal')

Since the model only needs the scaled variables and the target, we removed the unscaled variables from both datasets.

### Applying the Regression

In [0]:
# Assemble all columns except the target (charges) into a single "features" vector.

lrAssembler = VectorAssembler(inputCols=Insurance_Train.drop("charges").columns, outputCol="features")

In [0]:
# Configure a linear regression with charges as the label (target) column.

lr = LinearRegression(labelCol="charges")

In [0]:
# Chain Vectorized features and Linear Regression into a Pipeline
pipeline = Pipeline(stages=[lrAssembler,lr])

# Train Model
lrModel = pipeline.fit(Insurance_Train)

# Predictions
lrPredictions = lrModel.transform(Insurance_Test)

# Display the Predictions
display(lrPredictions.select("charges","prediction","features"))

charges,prediction,features
12829.4551,25425.150720477686,"Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 2), values -> List(1.2755886799782168, 1.5943392959925058, 2.5652519050316624))"
1708.0014,67.68845309631433,"Map(vectorType -> sparse, length -> 7, indices -> List(0, 4), values -> List(1.2755886799782168, 2.234554715877654))"
17178.6824,25975.505393494943,"Map(vectorType -> dense, length -> 7, values -> List(1.2755886799782168, 0.7971696479962529, 2.5652519050316624, 0.0, 2.234554715877654, 0.0, 0.0))"
2200.83085,86.9201489845318,"Map(vectorType -> sparse, length -> 7, indices -> List(0, 3, 4), values -> List(1.2755886799782168, 2.0040938953260574, 2.234554715877654))"
1712.227,67.68845309631433,"Map(vectorType -> sparse, length -> 7, indices -> List(0, 4), values -> List(1.2755886799782168, 2.234554715877654))"
1720.3537,4640.892410092696,"Map(vectorType -> sparse, length -> 7, indices -> List(0, 1, 5), values -> List(1.2755886799782168, 0.7971696479962529, 1.9998661983607802))"
4561.1885,6151.400583525196,"Map(vectorType -> dense, length -> 7, values -> List(1.2755886799782168, 3.1886785919850116, 0.0, 2.0040938953260574, 0.0, 1.9998661983607802, 0.0))"
2205.9808,4163.031946799485,"Map(vectorType -> sparse, length -> 7, indices -> List(0, 3, 5), values -> List(1.2755886799782168, 2.0040938953260574, 1.9998661983607802))"
1135.9407,4143.800250911268,"Map(vectorType -> sparse, length -> 7, indices -> List(0, 5), values -> List(1.2755886799782168, 1.9998661983607802))"
11482.63485,4163.031946799485,"Map(vectorType -> sparse, length -> 7, indices -> List(0, 3, 5), values -> List(1.2755886799782168, 2.0040938953260574, 1.9998661983607802))"


In [0]:
# Print each feature's coefficient (in the order given to the assembler), then the full list and the intercept.
print("Coefficients:")
coefficients = lrModel.stages[1].coefficients
for feature, coef in zip(Insurance_Train.drop("charges").columns, coefficients):
  print(feature, ":", coef)
print()
print("Coefficients List: %s" % str(lrModel.stages[1].coefficients))
print("Intercept: %s" % str(lrModel.stages[1].intercept))

Coefficients:
age_Scaled : 3720.737715901902
children_Scaled : 623.5713570265837
Smoker_Index_Scaled : 9905.74248531883
Sex_Index_Scaled : 9.596205014679807
Overweight_Scaled : 468.7496908247716
Obese_Scaled : 2561.950711609227
Normal_Scaled : 6.043772571234061

Coefficients List: [3720.737715901902,623.5713570265837,9905.74248531883,9.596205014679807,468.7496908247716,2561.950711609227,6.043772571234061]
Intercept: -5725.889290674844
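Because the features were divided by their standard deviation without centring, a coefficient β on a scaled feature corresponds to β / s per original unit. Here s is the training-set standard deviation, which can be recovered from any row as raw value / scaled value. The sketch below converts the age coefficient to an effect per year of age, using only numbers printed in the outputs above. It is a worked illustration, not part of the original analysis.

```python
# Values copied from the outputs above.
age_raw, age_scaled = 18, 1.2843017578171065  # first training row
coef_age_scaled = 3720.737715901902           # coefficient of age_Scaled

age_std = age_raw / age_scaled                        # implied training-set std of age (years)
coef_per_year = coef_age_scaled * age_scaled / age_raw  # effect of one extra year of age on charges
print(round(age_std, 2), round(coef_per_year, 1))  # 14.02 265.5
```

Under this reading, each additional year of age is associated with roughly 265 more in charges, holding the other features fixed.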


In [0]:
# The model summary gives the RMSE, the R² and the residuals on the training data.
trainingSummary = lrModel.stages[1].summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("R2  : %f" % trainingSummary.r2)
trainingSummary.residuals.show()

RMSE: 5923.360942
R2  : 0.771130
+------------------+
|         residuals|
+------------------+
|2642.1357017656255|
|2519.1429604821783|
|2633.2756289715157|
|-9513.796430478234|
|-8997.396848967572|
|11760.413961993645|
|2635.3884289715156|
|2635.5204789715153|
|2052.6942289715157|
|2114.6196554825806|
|2636.4448289715156|
| 2585.502486993243|
|2086.8884430226585|
|-8764.706147937753|
| 1618.528461511996|
|1159.1499645341255|
| 2088.605093022659|
|1506.1819430226585|
|-7660.305086893899|
|10654.706628023061|
+------------------+
only showing top 20 rows



Our R² of about 0.77 means the model explains roughly 77% of the variance in charges. This is relatively high, but it is computed on the training data, so it may overestimate performance on unseen data. The largest coefficient, Smoker_Index_Scaled, is consistent with the strong correlation between smoking and charges found earlier. Nonetheless, we believe the necessary dataset adjustments were carried out appropriately. We also tried running the regression with a scaled target variable, but the results were so poor that we abandoned the idea of scaling charges.
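The reported RMSE and R² follow the standard definitions below. `trainingSummary` evaluates them on the training data. Applying the same metrics to `lrPredictions` would measure held-out performance, for example with `pyspark.ml.evaluation.RegressionEvaluator(labelCol="charges", predictionCol="prediction", metricName="rmse")` or `metricName="r2"`. A plain-Python sketch with hypothetical values:

```python
import math

def rmse(y_true, y_pred):
    """Root mean squared error."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))

def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot

y_true = [1, 2, 3, 4]  # hypothetical observed charges
y_pred = [1, 2, 3, 5]  # hypothetical predictions
print(rmse(y_true, y_pred), round(r2(y_true, y_pred), 3))  # 0.5 0.8
```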

# PART C:

## <span style="color:blue"> **Show us what you got! (2 points):** </span>

<img src ='https://media.tenor.com/images/d8f45316e96078d93b36374474b67bf6/tenor.png'>

##### a) Using the cleaned e-commerce dataset from Part A, search for interesting variables that could be related to the countries' purchases. You should obtain the data regarding the countries' characteristics from the Macroeconomics sector of Pordata https://www.pordata.pt/en/Theme/Europe/Macroeconomics-32

###### You should limit your analysis to no more than six countries of your choice from the e-commerce dataset.

- The idea is that you find a way to associate some variables from Pordata and the purchases dataset. Ask yourselves the following questions: **What can explain/contribute to the fact that some countries buy more? or less? What can be related to the number of sales? Or the total value of sales?**... The questions are endless! Try to be creative and look for some associations!

###### You will be evaluated based on the logic used to solve this problem, the "investigation questions" you raise, the adequacy of your methodologies and your creativity.

##### MAKE SURE YOU COMMENT YOUR LOGIC AND "TELL YOUR STORY". WHAT QUESTIONS DID YOU RAISE? HOW DID YOU SEEK TO ANSWER THEM?

### Introduction

For this part of the analysis, we used the data_parse dataset, which had been carefully cleaned and put through a number of transformations in Part A. One of these transformations removed all the 2011 data, so this analysis uses only data from 2010. However, we have only a small amount of 2010 data (the sample rows below are from December 2010), which we consider a limitation. The Pordata indicators describe an entire year, so comparing them with our partial-year sales figures may bias the analysis.

In [0]:
# For reference, show the first rows of data_parse, the cleaned dataset from Part A that we use here.
data_parse.head(10)

Out[317]: [Row(InvoiceNo='536365', StockCode='85123A', Description='WHITE HANGING HEART T-LIGHT HOLDER', Quantity=6, UnitPrice=2.55, CustomerID=17850, Country='United Kingdom', amount_spent=15.3, Year=2010, Month=12, Day=1, Hour=8),
 Row(InvoiceNo='536365', StockCode='71053', Description='WHITE METAL LANTERN', Quantity=6, UnitPrice=3.39, CustomerID=17850, Country='United Kingdom', amount_spent=20.34, Year=2010, Month=12, Day=1, Hour=8),
 Row(InvoiceNo='536365', StockCode='84406B', Description='CREAM CUPID HEARTS COAT HANGER', Quantity=8, UnitPrice=2.75, CustomerID=17850, Country='United Kingdom', amount_spent=22.0, Year=2010, Month=12, Day=1, Hour=8),
 Row(InvoiceNo='536365', StockCode='84029G', Description='KNITTED UNION FLAG HOT WATER BOTTLE', Quantity=6, UnitPrice=3.39, CustomerID=17850, Country='United Kingdom', amount_spent=20.34, Year=2010, Month=12, Day=1, Hour=8),
 Row(InvoiceNo='536365', StockCode='84029E', Description='RED WOOLLY HOTTIE WHITE HEART.', Quantity=6, UnitPrice=3.

In [0]:
# To facilitate the execution of SQL queries that will enable us to more easily arrange and present the data, we are generating a temporary view of our dataset.

data_parse.createOrReplaceTempView("Country_data")

In [0]:
# As mentioned above, to make the analysis easier we build a new dataframe that aggregates the original dataset per country. It contains:
# - Country;
# - the sum of amount_spent, as Total_Amount;
# - the number of distinct InvoiceNo values, as Total_Orders;
# - the sum of Quantity, as Amount_of_Products_Sold;
# - the number of distinct customers, as Total_Customers;
# - Year.
# After noticing that some rows had "Unspecified" as the country, we excluded them in the WHERE clause. The results are grouped by Country and Year.

Countries_data = spark.sql("SELECT Country, \
ROUND(SUM(amount_spent),2) AS Total_Amount, \
COUNT(DISTINCT InvoiceNo) AS Total_Orders, \
SUM(Quantity) AS Amount_of_Products_Sold, \
COUNT(DISTINCT CustomerID) AS Total_Customers, \
Year \
FROM Country_data \
WHERE  Country != 'Unspecified' \
GROUP BY Country, Year")

display(Countries_data)

Country,Total_Amount,Total_Orders,Amount_of_Products_Sold,Total_Customers,Year
Channel Islands,363.53,1,80,1,2010
Norway,3787.12,2,3582,1,2010
Switzerland,1304.92,2,714,2,2010
Lithuania,1661.06,4,652,1,2010
Belgium,1809.91,5,1755,4,2010
EIRE,8813.88,15,5253,3,2010
Austria,277.2,1,51,1,2010
Australia,1032.85,3,467,2,2010
Japan,7705.07,3,4093,3,2010
Poland,248.16,1,140,1,2010


While writing our code we noticed rows whose country was "Unspecified", so we excluded them from our study.
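To make the logic of the Spark SQL aggregation above explicit, here is a minimal plain-Python sketch of the same per-country summary. It assumes each row is a dict using the data_parse column names; the function name `aggregate_by_country` is hypothetical. The sample rows are the first four rows of the data_parse output shown earlier, plus one hypothetical "Unspecified" row to show the exclusion.

```python
def aggregate_by_country(rows):
    """Group rows by (Country, Year), mirroring the Spark SQL query above."""
    groups = {}
    for r in rows:
        if r["Country"] == "Unspecified":
            continue  # mirrors WHERE Country != 'Unspecified'
        key = (r["Country"], r["Year"])
        g = groups.setdefault(
            key, {"amount": 0.0, "invoices": set(), "quantity": 0, "customers": set()}
        )
        g["amount"] += r["amount_spent"]
        g["invoices"].add(r["InvoiceNo"])  # COUNT(DISTINCT InvoiceNo)
        g["quantity"] += r["Quantity"]
        if r["CustomerID"] is not None:  # COUNT(DISTINCT ...) ignores NULLs
            g["customers"].add(r["CustomerID"])
    result = []
    for (country, year), g in groups.items():
        result.append({
            "Country": country,
            # Note: Python's round() rounds half to even, Spark's ROUND rounds half up.
            "Total_Amount": round(g["amount"], 2),
            "Total_Orders": len(g["invoices"]),
            "Amount_of_Products_Sold": g["quantity"],
            "Total_Customers": len(g["customers"]),
            "Year": year,
        })
    return result


rows = [
    {"InvoiceNo": "536365", "Quantity": 6, "CustomerID": 17850, "Country": "United Kingdom", "amount_spent": 15.3, "Year": 2010},
    {"InvoiceNo": "536365", "Quantity": 6, "CustomerID": 17850, "Country": "United Kingdom", "amount_spent": 20.34, "Year": 2010},
    {"InvoiceNo": "536365", "Quantity": 8, "CustomerID": 17850, "Country": "United Kingdom", "amount_spent": 22.0, "Year": 2010},
    {"InvoiceNo": "536365", "Quantity": 6, "CustomerID": 17850, "Country": "United Kingdom", "amount_spent": 20.34, "Year": 2010},
    # Hypothetical row, only to show that "Unspecified" is excluded.
    {"InvoiceNo": "999999", "Quantity": 1, "CustomerID": None, "Country": "Unspecified", "amount_spent": 5.0, "Year": 2010},
]
summary = aggregate_by_country(rows)
print(summary)
```

The four sample rows share one invoice and one customer, so the United Kingdom ends up with one order and one customer, while the "Unspecified" row disappears from the result.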

In [0]:
# As previously said, a temporary view is being created.
Countries_data.createOrReplaceTempView("Country_data1")

In [0]:
# We are building a new dataframe with a few new columns that we believe are worth examining. These columns are:
# - Total_Amount/Total_Orders as AVG_Amount_Order;
# - Total_Amount/Total_Customers as AVG_Amount_Customer.

Countries_detail = spark.sql(" SELECT * ,\
ROUND(Total_Amount/Total_Orders,2) AS AVG_Amount_Order, \
ROUND(Total_Amount/Total_Customers,2) AS AVG_Amount_Customer \
FROM Country_data1 ")

display(Countries_detail)

Country,Total_Amount,Total_Orders,Amount_of_Products_Sold,Total_Customers,Year,AVG_Amount_Order,AVG_Amount_Customer
Channel Islands,363.53,1,80,1,2010,363.53,363.53
Norway,3787.12,2,3582,1,2010,1893.56,3787.12
Switzerland,1304.92,2,714,2,2010,652.46,652.46
Lithuania,1661.06,4,652,1,2010,415.27,1661.06
Belgium,1809.91,5,1755,4,2010,361.98,452.48
EIRE,8813.88,15,5253,3,2010,587.59,2937.96
Austria,277.2,1,51,1,2010,277.2,277.2
Australia,1032.85,3,467,2,2010,344.28,516.43
Japan,7705.07,3,4093,3,2010,2568.36,2568.36
Poland,248.16,1,140,1,2010,248.16,248.16
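The two averages above are simple ratios rounded to two decimals. The plain-Python sketch below reproduces them for one row and adds an explicit guard for a zero denominator (for example, a country whose CustomerID values are all missing). Depending on Spark's ANSI mode setting, a zero denominator in Spark SQL yields NULL or raises an error; the sketch returns `None`. The helper name `add_averages` is hypothetical; the Norway figures are taken from the table above.

```python
def add_averages(row):
    """Return a copy of row with AVG_Amount_Order and AVG_Amount_Customer added."""
    out = dict(row)
    orders = row["Total_Orders"]
    customers = row["Total_Customers"]
    # Guard against division by zero instead of relying on engine behaviour.
    out["AVG_Amount_Order"] = round(row["Total_Amount"] / orders, 2) if orders else None
    out["AVG_Amount_Customer"] = round(row["Total_Amount"] / customers, 2) if customers else None
    return out


norway = {"Country": "Norway", "Total_Amount": 3787.12, "Total_Orders": 2, "Total_Customers": 1}
result = add_averages(norway)
print(result)

# Hypothetical row with no identified customers.
no_customers = add_averages({"Country": "Example", "Total_Amount": 10.0, "Total_Orders": 1, "Total_Customers": 0})
print(no_customers)
```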


### Countries Data Analysis

Here we analyze each country's performance on the variables we built above. For each variable we identified the best- and worst-performing countries, which helped us decide which countries to analyze with the PorData indicators.

##### Total Amount

In [0]:
# To clearly see which country spent the most and the least, we create visuals of Total_Amount per Country. We sort on two graphs: one in descending order and the other in ascending order. Since we are only examining a maximum of six countries, we restricted each view to six rows.

display(Countries_detail.select("Country", "Total_Amount").sort(Countries_detail.Total_Amount.desc()).limit(6))
display(Countries_detail.select("Country", "Total_Amount").sort(Countries_detail.Total_Amount.asc()).limit(6))

Country,Total_Amount
United Kingdom,498661.85
Germany,15241.14
France,9616.31
EIRE,8813.88
Netherlands,8784.48
Japan,7705.07



Country,Total_Amount
Poland,248.16
Austria,277.2
Channel Islands,363.53
Iceland,711.79
Italy,811.5
Finland,892.8



**The bar charts show that the countries with the highest total sales were:**

United Kingdom, Germany, France, EIRE, Netherlands and Japan

**On the other hand, the countries with the lowest total sales were:**

Poland, Austria, Channel Islands, Iceland, Italy and Finland

##### Total Orders

In [0]:
# Here, we are visualizing the Total Orders, just as we did in the previous cell. Since we are only examining a maximum of six countries, we restricted the view to only display six.

display(Countries_detail.select("Country", "Total_Orders").sort(Countries_detail.Total_Orders.desc()).limit(6))

display(Countries_detail.select("Country", "Total_Orders").sort(Countries_detail.Total_Orders.asc()).limit(6))

Country,Total_Orders
United Kingdom,1291
Germany,30
France,21
EIRE,15
Portugal,6
Belgium,5



Country,Total_Orders
Austria,1
Cyprus,1
Channel Islands,1
Denmark,1
Poland,1
Iceland,1


**The table shows that the countries with the highest total orders were:**

United Kingdom, Germany, France, EIRE, Portugal and Belgium

**On the other hand, the countries with the lowest total orders were:**

Austria, Cyprus, Channel Islands, Denmark, Poland and Iceland (all six have a single order; since the sort does not break ties, other countries with one order could appear in their place)
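Because several countries share the same value, a "bottom six" list depends on how ties happen to be ordered. A simple way to make the ranking reproducible is to add a secondary sort key, such as the country name. The plain-Python sketch below illustrates this with the countries listed in the two Total_Orders tables above (a subset of all countries); `top_n` is a hypothetical helper. In PySpark, the same idea would presumably be written by passing a second column to `sort`, e.g. `sort(Countries_detail.Total_Orders.asc(), "Country")`.

```python
# (Country, Total_Orders) pairs copied from the two tables above (subset of all countries).
orders = [
    ("United Kingdom", 1291), ("Germany", 30), ("France", 21), ("EIRE", 15),
    ("Portugal", 6), ("Belgium", 5), ("Austria", 1), ("Cyprus", 1),
    ("Channel Islands", 1), ("Denmark", 1), ("Poland", 1), ("Iceland", 1),
]


def top_n(rows, n=6, descending=True):
    """Rank by value, breaking ties alphabetically by country name."""
    sign = -1 if descending else 1
    ranked = sorted(rows, key=lambda r: (sign * r[1], r[0]))
    return [country for country, _ in ranked[:n]]


highest = top_n(orders)
lowest = top_n(orders, descending=False)
print(highest)
print(lowest)
```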

##### Amount of Products Sold

In [0]:
# Similar to the previous cell, we are visualizing Amount_of_Products_Sold in this one. Since we are only examining a maximum of six countries, we restricted the view to only display six.

display(Countries_detail.select("Country", "Amount_of_Products_Sold").sort(Countries_detail.Amount_of_Products_Sold.desc()).limit(6))

display(Countries_detail.select("Country", "Amount_of_Products_Sold").sort(Countries_detail.Amount_of_Products_Sold.asc()).limit(6))

Country,Amount_of_Products_Sold
United Kingdom,267771
Germany,6878
Netherlands,6811
EIRE,5253
France,4989
Japan,4093



Country,Amount_of_Products_Sold
Austria,51
Channel Islands,80
Poland,140
Italy,295
Iceland,319
Denmark,454



**The table shows that the countries with the highest amount of products sold were:**

United Kingdom, Germany, Netherlands, EIRE, France and Japan

**On the other hand, the countries with the lowest amount of products sold were:**

Austria, Channel Islands, Poland, Italy, Iceland and Denmark



##### Total Customers

In [0]:
# Here, we visualize Total_Customers, following the same approach as in the previous cells. Since we are only examining a maximum of six countries, we restricted the view to only display six.

display(Countries_detail.select("Country", "Total_Customers").sort(Countries_detail.Total_Customers.desc()).limit(6))

display(Countries_detail.select("Country", "Total_Customers").sort(Countries_detail.Total_Customers.asc()).limit(6))

Country,Total_Customers
United Kingdom,815
Germany,18
France,15
Portugal,6
Belgium,4
Spain,4



Country,Total_Customers
Austria,1
Cyprus,1
Norway,1
Poland,1
Lithuania,1
Denmark,1


**The bar chart shows that the countries with the highest number of customers were:**

United Kingdom, Germany, France, Portugal, Belgium and Spain

**On the other hand, the countries with the lowest number of customers were:**

Austria, Cyprus, Norway, Poland, Lithuania and Denmark (each with a single customer)

##### Average Amount Order

In [0]:
# Here we analyze AVG_Amount_Order, again restricting the view to six countries, and display the results as a doughnut (pie) chart.

display(Countries_detail.select("Country", "AVG_Amount_Order").sort(Countries_detail.AVG_Amount_Order.desc()).limit(6))

display(Countries_detail.select("Country", "AVG_Amount_Order").sort(Countries_detail.AVG_Amount_Order.asc()).limit(6))

Country,AVG_Amount_Order
Netherlands,2928.16
Japan,2568.36
Sweden,1917.15
Norway,1893.56
Cyprus,1590.82
Denmark,1281.5



Country,AVG_Amount_Order
Poland,248.16
Austria,277.2
Australia,344.28
Belgium,361.98
Channel Islands,363.53
United Kingdom,386.26



**The pie chart shows that the countries with the highest AVG_Amount_Order were:**

Netherlands, Japan, Sweden, Norway, Cyprus and Denmark

**On the other hand, the countries with the lowest AVG_Amount_Order were:**

Poland, Austria, Australia, Belgium, Channel Islands and United Kingdom

##### Average Amount Customer

In [0]:
# In each of the tables, we are analyzing the top six and bottom six nations.

display(Countries_detail.select("Country", "AVG_Amount_Customer").sort(Countries_detail.AVG_Amount_Customer.desc()).limit(6))

display(Countries_detail.select("Country", "AVG_Amount_Customer").sort(Countries_detail.AVG_Amount_Customer.asc()).limit(6))

Country,AVG_Amount_Customer
Netherlands,4392.24
Sweden,3834.3
Norway,3787.12
EIRE,2937.96
Japan,2568.36
Lithuania,1661.06



Country,AVG_Amount_Customer
Poland,248.16
Austria,277.2
Channel Islands,363.53
Italy,405.75
Portugal,406.66
Belgium,452.48



**The table shows that the countries with the highest AVG_Amount_Customer were:**

Netherlands, Sweden, Norway, EIRE, Japan and Lithuania

**On the other hand, the countries with the lowest AVG_Amount_Customer were:**

Poland, Austria, Channel Islands, Italy, Portugal and Belgium

We could only choose among European Union countries, since PorData did not provide data for the other countries.
We therefore based our analysis on the following countries:

**Germany**
Germany ranks second, after the United Kingdom, in total sales, orders, customers and products sold, so it was an obvious choice for our analysis.

**Netherlands**
The Netherlands is the top country for both the average amount per order (AVG_Amount_Order) and the average amount per customer (AVG_Amount_Customer), so we decided to include it.

**Austria**
We chose Austria because it is one of the worst-performing countries on almost every indicator (for example, the fewest products sold and the second-lowest total amount).

**Portugal**
Since we are from Portugal, we thought it made sense to include it in our analysis.

In [0]:
# After running several visualizations to assess the top- and bottom-performing countries, we selected the following ones. We use a where clause with the chosen countries to create a new dataframe.

Select_Countries = Countries_detail.select("*").where("Country IN ('Germany','Netherlands','Austria','Portugal')")

display(Select_Countries)

Country,Total_Amount,Total_Orders,Amount_of_Products_Sold,Total_Customers,Year,AVG_Amount_Order,AVG_Amount_Customer
Austria,277.2,1,51,1,2010,277.2,277.2
Germany,15241.14,30,6878,18,2010,508.04,846.73
Portugal,2439.97,6,984,6,2010,406.66,406.66
Netherlands,8784.48,3,6811,2,2010,2928.16,4392.24


## Importing the data from PorData

For our analysis, we are beginning to import the indicators that we think may be useful. After importing and individually transforming each CSV file for each indicator, we will merge them together based on the country name.

In [0]:
%fs ls /FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/PORDATA_Balance_of_trade___as_a_percentage_of_GDP.csv,PORDATA_Balance_of_trade___as_a_percentage_of_GDP.csv,19199,1718054974000
dbfs:/FileStore/tables/PORDATA_Compensation_of_employees__Euro__xlsx.csv,PORDATA_Compensation_of_employees__Euro__xlsx.csv,19589,1718054437000
dbfs:/FileStore/tables/PORDATA_Compensation_of_employees_per_capita__PPS_.csv,PORDATA_Compensation_of_employees_per_capita__PPS_.csv,19981,1718054046000
dbfs:/FileStore/tables/PORDATA_GDP__Euro_.csv,PORDATA_GDP__Euro_.csv,19989,1718055115000
dbfs:/FileStore/tables/PORDATA_Savings_rate_of_households.csv,PORDATA_Savings_rate_of_households.csv,18483,1718051156000
dbfs:/FileStore/tables/csv_countrydata.xlsx,csv_countrydata.xlsx,6207,1717975519000
dbfs:/FileStore/tables/csv_pordata.xlsx,csv_pordata.xlsx,6207,1717975470000
dbfs:/FileStore/tables/insurance.csv,insurance.csv,55628,1717703140000
dbfs:/FileStore/tables/movielens.txt,movielens.txt,616155,1716487292000
dbfs:/FileStore/tables/onlineretail.csv,onlineretail.csv,45580638,1717698189000


In [0]:
# As mentioned before, we import the CSV file from PorData (the file uses ';' as the separator).

indicator_1 = spark.read.csv("dbfs:/FileStore/tables/PORDATA_Savings_rate_of_households.csv", header="true", sep=";", inferSchema="true")

In [0]:
# Overview of the dataset, so we have an idea of what needs to be done to get the columns the way we want
display(indicator_1)

Output (condensed): a table with 256 columns (`_c0` to `_c255`). Its first rows are PorData metadata rather than data: five empty rows, a title row ("Savings rate of households" in `_c1`), an empty row, a unit row ("Proportion - %" in `_c1`), a header row ("Groups/Countries" in `_c0`, "Savings rate of households" in `_c1`) and another empty row. All other cells in these rows are empty.


In [0]:
# We keep the first and third columns (_c0 and _c2), since we only need the country column and the column containing the 2022 data.
indicator_1 = indicator_1.select('_c0','_c2')
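The raw PorData file starts with metadata rows before the real header, which is presumably why the columns received generic names such as `_c0` and `_c2`. As an alternative to selecting columns by position and filtering afterwards, the sketch below locates the header row by its first cell ("Groups/Countries") and keeps only the non-empty rows after it. It uses Python's standard `csv` module; the function name `read_pordata_table` is hypothetical, and the sample text is a hypothetical excerpt whose layout only imitates the output above (the two values are copied from the filtered indicator shown later).

```python
import csv
import io


def read_pordata_table(text, header_label="Groups/Countries", sep=";"):
    """Return (header, data_rows) from a PorData-style CSV export.

    Assumption: the real header is the first row whose first cell equals
    header_label; every row before it is treated as metadata.
    """
    rows = list(csv.reader(io.StringIO(text), delimiter=sep))
    for i, row in enumerate(rows):
        if row and row[0].strip() == header_label:
            # Keep only rows after the header that have a non-empty first cell.
            data = [r for r in rows[i + 1:] if r and r[0].strip()]
            return row, data
    raise ValueError(f"header row starting with {header_label!r} not found")


# Hypothetical excerpt: the layout is illustrative only.
sample = (
    ";;\n"
    ";Savings rate of households;\n"
    ";Proportion - %;\n"
    "Groups/Countries;Savings rate of households;\n"
    ";;\n"
    "DE - Germany;;Pro 20,4\n"
    "PT - Portugal;;Pro 6,2\n"
)
header, data = read_pordata_table(sample)
print(header)
print(data)
```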

In [0]:
# We rename the columns with meaningful names to make the data easier to understand.
indicator_1 = indicator_1.toDF("Country","savings_rate_households")

display(indicator_1)

Country,savings_rate_households
,
,
,
,
,
,
,
,
Groups/Countries,
,


In [0]:
# There are clearly many empty rows. We are only interested in the rows for the selected countries, so we keep them with a where clause. In this file the country names carry an ISO code prefix (e.g. 'DE - Germany').

indicator_1= indicator_1.select('*').where("Country IN ('DE - Germany','NL - Netherlands','AT - Austria','PT - Portugal')")

In [0]:
display(indicator_1)

Country,savings_rate_households
DE - Germany,"Pro 20,4"
AT - Austria,155
NL - Netherlands,"Pro 20,6"
PT - Portugal,"Pro 6,2"


After importing the first indicator, we do the same for the other indicators.

Next, we import the compensation of employees (in euros).

In [0]:
# We uploaded the CSV file and copied its path: /FileStore/tables/PORDATA_Compensation_of_employees__Euro__xlsx.csv
# We follow the same logic as before:

indicator_2 = spark.read.csv("dbfs:/FileStore/tables/PORDATA_Compensation_of_employees__Euro__xlsx.csv", header="true", sep=";", inferSchema="true")
indicator_2 = indicator_2.select('_c0','_c2')
indicator_2 = indicator_2.toDF("Country","compensation_of_employees")
indicator_2= indicator_2.select('*').where("Country IN ('DE - Germany','NL - Netherlands','AT - Austria','PT - Portugal')")

In [0]:
display(indicator_2)

Country,compensation_of_employees
DE - Germany,"Pro 2 154 927,0"
AT - Austria,"236 939,7"
NL - Netherlands,"s 483 301,0"
PT - Portugal,"Pro 124 527,7"


After importing the compensation of employees, we import the balance of trade as a percentage of GDP (Trade_Balance_GDP).

In [0]:
indicator_3 = spark.read.csv("dbfs:/FileStore/tables/PORDATA_Balance_of_trade___as_a_percentage_of_GDP.csv", header="true", sep=";", inferSchema="true")
indicator_3 = indicator_3.select('_c0','_c2')
indicator_3 = indicator_3.toDF("Country","Trade_Balance_GDP")
indicator_3 = indicator_3.select('*').where("Country IN ('DE - Germany','NL - Netherlands','AT - Austria','PT - Portugal')")

In [0]:
display(indicator_3)

Country,Trade_Balance_GDP
DE - Germany,"Pro 5,6"
AT - Austria,33
NL - Netherlands,101
PT - Portugal,-19


Finally, we import GDP in euros (PORDATA_GDP__Euro_.csv).

In [0]:
indicator_4 = spark.read.csv("dbfs:/FileStore/tables/PORDATA_GDP__Euro_.csv", header="true", sep=";", inferSchema="true")
indicator_4 = indicator_4.select('_c0','_c2')
indicator_4 = indicator_4.toDF("Country","GDP")
indicator_4 = indicator_4.select('*').where("Country IN ('DE - Germany','NL - Netherlands','AT - Austria','PT - Portugal')")

In [0]:
display(indicator_4)

Country,GDP
DE - Germany,"Pro 4 121 160,0"
AT - Austria,"477 248,8"
NL - Netherlands,"Pro 1 032 841,0"
PT - Portugal,"Pro 265 741,9"


Joining the indicators

In [0]:
# After importing and cleaning each indicator, we join them by Country so that we can later join the result with our sales dataset.
# We perform one join per step, adding one indicator column at a time.

first_join = indicator_1.join(indicator_2, on ='Country')
second_join = first_join.join(indicator_3, on='Country')
joined_indicators = second_join.join(indicator_4, on='Country')
display(joined_indicators)

Country,savings_rate_households,compensation_of_employees,Trade_Balance_GDP,GDP
DE - Germany,"Pro 20,4","Pro 2 154 927,0","Pro 5,6","Pro 4 121 160,0"
AT - Austria,155,"236 939,7",33,"477 248,8"
NL - Netherlands,"Pro 20,6","s 483 301,0",101,"Pro 1 032 841,0"
PT - Portugal,"Pro 6,2","Pro 124 527,7",-19,"Pro 265 741,9"
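The chained joins above can also be written as a fold over a list of indicator tables, which scales more easily if more indicators are added. In PySpark this would presumably read `reduce(lambda a, b: a.join(b, on="Country"), [indicator_1, indicator_2, indicator_3, indicator_4])`, since `DataFrame.join` performs an inner join by default. The plain-Python sketch below shows the same idea with dicts keyed by country label; `inner_join` is a hypothetical helper, the values are copied from the outputs above, and "XX - Example" is a hypothetical row included only to show that a country missing from one table is dropped.

```python
from functools import reduce


def inner_join(left, right):
    """Merge two {country: {column: value}} tables, keeping only shared countries."""
    return {c: {**left[c], **right[c]} for c in left if c in right}


savings = {
    "DE - Germany": {"savings_rate_households": "Pro 20,4"},
    "PT - Portugal": {"savings_rate_households": "Pro 6,2"},
    "XX - Example": {"savings_rate_households": "1,0"},  # hypothetical row
}
compensation = {
    "DE - Germany": {"compensation_of_employees": "Pro 2 154 927,0"},
    "PT - Portugal": {"compensation_of_employees": "Pro 124 527,7"},
}
gdp = {
    "DE - Germany": {"GDP": "Pro 4 121 160,0"},
    "PT - Portugal": {"GDP": "Pro 265 741,9"},
}

joined = reduce(inner_join, [savings, compensation, gdp])
print(joined)
```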


To join the indicators dataframe with the countries dataset we need to perform one last transformation on the indicators. In the Country Column we will need to keep only the country name.

In [0]:
# For that we will perform a split by the ' - ' to filter out the ISO codes and just keep the country name.
from pyspark.sql.functions import split

indicators = joined_indicators.withColumn('Country', split(joined_indicators['Country'], ' - ').getItem(1)) 
     
display(indicators)

Country,savings_rate_households,compensation_of_employees,Trade_Balance_GDP,GDP
Germany,"Pro 20,4","Pro 2 154 927,0","Pro 5,6","Pro 4 121 160,0"
Austria,155,"236 939,7",33,"477 248,8"
Netherlands,"Pro 20,6","s 483 301,0",101,"Pro 1 032 841,0"
Portugal,"Pro 6,2","Pro 124 527,7",-19,"Pro 265 741,9"
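Because the final step is an inner join on the country name, any selected country whose PorData label does not reduce to exactly the same name would silently disappear. The sketch below strips the ISO prefix in plain Python and checks which selected countries would be lost. It splits only on the first " - " and, as a design choice made here, returns labels without the separator unchanged; `strip_iso_prefix` is a hypothetical helper.

```python
def strip_iso_prefix(label):
    """'DE - Germany' -> 'Germany'; labels without ' - ' are returned unchanged."""
    parts = label.split(" - ", 1)  # split only on the first separator
    return parts[1] if len(parts) == 2 else label


pordata_labels = ["DE - Germany", "AT - Austria", "NL - Netherlands", "PT - Portugal"]
selected = ["Germany", "Netherlands", "Austria", "Portugal"]

names = [strip_iso_prefix(label) for label in pordata_labels]
# Countries that the inner join would drop because no indicator row matches them.
missing = sorted(set(selected) - set(names))
print(names, missing)
```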


Now that the indicators dataframe is in the required format, we merge it with the country data built in the previous steps so that the analysis can be carried out.

In [0]:
Data_for_Analysis = Select_Countries.join(indicators, on ='Country')
display(Data_for_Analysis)

Country,Total_Amount,Total_Orders,Amount_of_Products_Sold,Total_Customers,Year,AVG_Amount_Order,AVG_Amount_Customer,savings_rate_households,compensation_of_employees,Trade_Balance_GDP,GDP
Austria,277.2,1,51,1,2010,277.2,277.2,155,"236 939,7",33,"477 248,8"
Germany,15241.14,30,6878,18,2010,508.04,846.73,"Pro 20,4","Pro 2 154 927,0","Pro 5,6","Pro 4 121 160,0"
Portugal,2439.97,6,984,6,2010,406.66,406.66,"Pro 6,2","Pro 124 527,7",-19,"Pro 265 741,9"
Netherlands,8784.48,3,6811,2,2010,2928.16,4392.24,"Pro 20,6","s 483 301,0",101,"Pro 1 032 841,0"


### Final Analysis

After researching the PorData portal, we tried to identify indicators relevant to our project. Since the dataset contains customer data from specific countries, we chose the following indicators: savings rate of households, balance of trade as a percentage of GDP, GDP (in euros) and compensation of employees (in euros).

The analysis of the dataset reveals diverse economic behaviors and market characteristics across Austria, Germany, Portugal, and the Netherlands. In Austria, the data shows very low total amounts and orders, with only one customer. The savings rate of households is moderate at 15.5%, and the compensation of employees and GDP are relatively low. This suggests limited economic activity from the sampled data in Austria.

Germany, on the other hand, exhibits the highest total amount and number of orders among the countries listed. With 30 total orders and 18 customers, it indicates a larger and more active market. The savings rate of households is high at 20.4%, and the compensation of employees and GDP are the highest among the analyzed countries, contributing to a robust economic profile. The average amount per customer and order is moderate, indicating balanced spending behavior among German customers.

In Portugal, the total amount and number of orders are moderate, with six active customers. However, Portugal's savings rate is the lowest of the four countries at 6.2%, which may reflect lower disposable income. The compensation of employees and GDP are also the lowest of the four, pointing to economic constraints within the country.

The Netherlands shows a high total amount despite a low number of orders and customers, indicating high-value purchases. The savings rate of households is high at 20.6%, and both the compensation of employees and GDP are significant, suggesting a strong and affluent economy. The average amount per order and customer is notably high, underscoring the substantial purchasing power of Dutch customers.

In conclusion, Germany stands out with the highest economic activity and market size, while the Netherlands demonstrates high-value transactions despite having a smaller customer base. Portugal and Austria, however, reflect lower economic activity and spending. These insights can guide strategic decisions, such as market focus and resource allocation, for better targeting and engagement with customers in different regions.

In [0]:
display(Data_for_Analysis.select("Country","savings_rate_households"))

Country,savings_rate_households
Austria,155
Germany,"Pro 20,4"
Portugal,"Pro 6,2"
Netherlands,"Pro 20,6"


The household savings rates per country broadly align with our expectations. As expected, Portugal has the lowest savings rate, given that it has the lowest salaries of these countries. Germany and the Netherlands already had the highest average amount spent per customer among the selected countries, so we expected their savings rates to be high compared with the others, and they are (20.4% and 20.6%). Austria falls in between, with a moderate savings rate of 15.5% (shown as 155 in the table above; we read it as 15,5 with the decimal comma missing), reflecting its balanced economic position relative to the other countries.
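To put a number on the associations discussed above, a Pearson correlation coefficient can be computed between an indicator and a sales metric. With only four countries this is a rough descriptive check, not statistical evidence: it cannot establish causation and is very sensitive to a single country (here, the Netherlands). The sketch below assumes Austria's savings rate is 15.5, as interpreted in the text, and copies all other values from the Data_for_Analysis table; `pearson` is a hypothetical helper implementing the standard formula.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sqrt(sum((x - mx) ** 2 for x in xs))
    sy = sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)


# Order: Austria, Germany, Portugal, Netherlands
savings_rate = [15.5, 20.4, 6.2, 20.6]  # Austria taken as 15.5 (assumption, see text)
avg_amount_customer = [277.2, 846.73, 406.66, 4392.24]
gdp = [477248.8, 4121160.0, 265741.9, 1032841.0]
total_amount = [277.2, 15241.14, 2439.97, 8784.48]

r_savings = pearson(savings_rate, avg_amount_customer)
r_gdp = pearson(gdp, total_amount)
print(round(r_savings, 2), round(r_gdp, 2))
```

Both coefficients come out positive, which is consistent with the story above, but with four data points they should be read as a hint for further investigation rather than as a conclusion.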

#### <span style="color:purple"> **Good luck and I hope you enjoyed doing this assignment :) !!!** </span>

<span style="color:red"> **Don't forget to submit your exam in a folder containing both your outputs PDF and your notebook in the format of HTML. Once again, comment your code and don't forget to name the .zip file and notebook with your student numbers :)** </span>