# Spark Practice

In this exercise we work with a dataset made up of three dataframes: the sales transactions, the products sold, and the clients who took part in those transactions.

## Environment installation

To use PySpark, the environment needs to be set up first.

In [None]:
# Mounting personal Google Drive (where the files are located)
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## JDK Installation


In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

## Spark installation

In [None]:
!pip install findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
# We need to do a bit of web scraping, so we import BeautifulSoup
from bs4 import BeautifulSoup
import requests

In [None]:
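# Fetch the Apache download index over HTTP to see which Spark versions are currently available.
url = 'https://downloads.apache.org/spark/'
r = requests.get(url)
html_doc = r.text
# Name the parser explicitly so the result does not depend on which parsers are installed.
soup = BeautifulSoup(html_doc, "html.parser")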

In [None]:
# Read the website and obtain the available versions.
link_files = []
for link in soup.find_all('a'):
  link_files.append(link.get('href'))
spark_link = [x for x in link_files if 'spark' in x]  
print(spark_link)

['spark-3.1.3/', 'spark-3.2.2/', 'spark-3.3.0/']
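The list above is only printed, and the version installed in the next cell (3.2.2) is typed in by hand. As a minimal sketch, assuming the links keep the `spark-X.Y.Z/` shape shown in the output, the highest version could be picked programmatically. The helper name `latest_spark_version` is hypothetical and not part of the notebook.

```python
import re

def latest_spark_version(links):
    """Return the highest 'X.Y.Z' version found among links shaped like 'spark-3.3.0/'."""
    pattern = re.compile(r"^spark-(\d+)\.(\d+)\.(\d+)/?$")
    versions = []
    for link in links:
        match = pattern.match(link)
        if match:
            # Compare versions numerically, so that 3.10.0 ranks above 3.9.1.
            versions.append(tuple(int(part) for part in match.groups()))
    if not versions:
        raise ValueError("no Spark version links found")
    return ".".join(str(part) for part in max(versions))

# The list printed by the previous cell.
spark_link = ['spark-3.1.3/', 'spark-3.2.2/', 'spark-3.3.0/']
print(latest_spark_version(spark_link))
```

Comparing integer tuples rather than raw strings avoids ranking `3.9.1` above `3.10.0`. The notebook itself stays on 3.2.2, so this only shows how the scraped list could be used.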


In [None]:
import os  # Operating system library
# Download and unpack the chosen Spark version (3.2.2 here).
!wget -q https://archive.apache.org/dist/spark/spark-3.2.2/spark-3.2.2-bin-hadoop2.7.tgz
!tar xf spark-3.2.2-bin-hadoop2.7.tgz
# Install PySpark
!pip install -q pyspark==3.2.2



In [None]:
# Create the Spark session: set the application name and run locally on all available cores.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F, types as T
spark = SparkSession.builder\
                            .appName("Varios")\
                            .master("local[*]")\
                            .getOrCreate()
spark

## Defining environment variables

In [None]:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.2-bin-hadoop2.7"
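
A wrong path in either variable usually only surfaces later as a confusing startup error. The following is a minimal sketch of a sanity check, assuming both variables are expected to point at existing directories. The helper `missing_env_dirs` is a hypothetical name introduced for illustration.

```python
import os

def missing_env_dirs(names, environ=os.environ):
    """Return the variable names that are unset or do not point to an existing directory."""
    missing = []
    for name in names:
        path = environ.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

# An empty list means both variables point at directories that exist.
print(missing_env_dirs(["JAVA_HOME", "SPARK_HOME"]))
```

Passing the environment as a parameter keeps the check easy to try with a plain dictionary instead of the real `os.environ`.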

# Datasets




In [None]:
# Building the data frames from the CSVs for distributed processing.
# The transactions file is comma-separated; the customers and products files use ";" as the delimiter.
transactions_df = spark.read.option("header", True).csv("/content/drive/MyDrive/PRACTICA_EJERCICIOS/us_superstore_transactions.csv")
transactions_df.show(5)
customers_df = spark.read.options(header=True, delimiter=";").csv("/content/drive/MyDrive/PRACTICA_EJERCICIOS/us_superstores_customers.csv")
customers_df.show(5)
products_df = spark.read.options(header=True, delimiter=";").csv("/content/drive/MyDrive/PRACTICA_EJERCICIOS/us_superstores_products.csv")
products_df.show(5)

+------+--------------+----------+----------+--------------+-----------+---------------+------------------+--------+--------+--------+
|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|Customer ID|     Product ID|             Sales|Quantity|Discount|  Profit|
+------+--------------+----------+----------+--------------+-----------+---------------+------------------+--------+--------+--------+
|     1|CA-2016-152156| 11/8/2016|11/11/2016|  Second Class|   CG-12520|FUR-BO-10001798|            261.96|       2|     0.0| 41.9136|
|     2|CA-2016-152156| 11/8/2016|11/11/2016|  Second Class|   CG-12520|FUR-CH-10000454|            731.94|       3|     0.0| 219.582|
|     3|CA-2016-138688| 6/12/2016| 6/16/2016|  Second Class|   DV-13045|OFF-LA-10000240|             14.62|       2|     0.0|  6.8714|
|     4|US-2015-108966|10/11/2015|10/18/2015|Standard Class|   SO-20335|FUR-TA-10000577|          957.5775|       5|    0.45|-383.031|
|     5|US-2015-108966|10/11/2015|10/18/2015|Standard C

In [None]:
# Joining the data frames on the appropriate keys.
tr_cst_df = transactions_df.join(customers_df, on="Customer ID")
full_df = tr_cst_df.join(products_df, on="Product ID")
full_df.show()

+---------------+-----------+------+--------------+----------+----------+--------------+------------------+--------+--------+--------+---------------+---------+-------------+---------------+------------+-----------+-------+---------------+------------+--------------------+
|     Product ID|Customer ID|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|             Sales|Quantity|Discount|  Profit|  Customer Name|  Segment|      Country|           City|       State|Postal Code| Region|       Category|Sub-Category|        Product Name|
+---------------+-----------+------+--------------+----------+----------+--------------+------------------+--------+--------+--------+---------------+---------+-------------+---------------+------------+-----------+-------+---------------+------------+--------------------+
|FUR-BO-10001798|   CG-12520|     1|CA-2016-152156| 11/8/2016|11/11/2016|  Second Class|            261.96|       2|     0.0| 41.9136|    Claire Gute| Consumer|United States|    

In [None]:
# Calculating the unit sale price (considering possible discounts)
full_df = full_df.withColumn("Precio_venta", (full_df.Sales/full_df.Quantity)*(1 - full_df.Discount))
full_df.show()

+---------------+-----------+------+--------------+----------+----------+--------------+------------------+--------+--------+--------+---------------+---------+-------------+---------------+------------+-----------+-------+---------------+------------+--------------------+------------------+
|     Product ID|Customer ID|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|             Sales|Quantity|Discount|  Profit|  Customer Name|  Segment|      Country|           City|       State|Postal Code| Region|       Category|Sub-Category|        Product Name|      Precio_venta|
+---------------+-----------+------+--------------+----------+----------+--------------+------------------+--------+--------+--------+---------------+---------+-------------+---------------+------------+-----------+-------+---------------+------------+--------------------+------------------+
|FUR-BO-10001798|   CG-12520|     1|CA-2016-152156| 11/8/2016|11/11/2016|  Second Class|            261.96|       2|     

In [None]:
# Unit profit (profit per unit sold)
full_df.createOrReplaceTempView("full_df")
full_df = spark.sql("SELECT *, (Profit/Quantity) AS Beneficio_unitario FROM full_df")
full_df.show()

+---------------+-----------+------+--------------+----------+----------+--------------+------------------+--------+--------+--------+---------------+---------+-------------+---------------+------------+-----------+-------+---------------+------------+--------------------+------------------+------------------+
|     Product ID|Customer ID|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|             Sales|Quantity|Discount|  Profit|  Customer Name|  Segment|      Country|           City|       State|Postal Code| Region|       Category|Sub-Category|        Product Name|      Precio_venta|Beneficio_unitario|
+---------------+-----------+------+--------------+----------+----------+--------------+------------------+--------+--------+--------+---------------+---------+-------------+---------------+------------+-----------+-------+---------------+------------+--------------------+------------------+------------------+
|FUR-BO-10001798|   CG-12520|     1|CA-2016-152156| 11/8/2016|11

In [None]:
# Unit cost (unit sale price minus unit profit)
full_df.createOrReplaceTempView("full_df")
full_df = spark.sql(
    """SELECT *, (Precio_venta - Beneficio_unitario) AS Coste_unitario 
     FROM full_df"""
    )
full_df.show()

+---------------+-----------+------+--------------+----------+----------+--------------+------------------+--------+--------+--------+---------------+---------+-------------+---------------+------------+-----------+-------+---------------+------------+--------------------+------------------+------------------+------------------+
|     Product ID|Customer ID|Row ID|      Order ID|Order Date| Ship Date|     Ship Mode|             Sales|Quantity|Discount|  Profit|  Customer Name|  Segment|      Country|           City|       State|Postal Code| Region|       Category|Sub-Category|        Product Name|      Precio_venta|Beneficio_unitario|    Coste_unitario|
+---------------+-----------+------+--------------+----------+----------+--------------+------------------+--------+--------+--------+---------------+---------+-------------+---------------+------------+-----------+-------+---------------+------------+--------------------+------------------+------------------+------------------+
|FUR-BO
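
The three derived columns (`Precio_venta`, `Beneficio_unitario`, `Coste_unitario`) chain into one another, so a worked example on single rows may help. This is a plain-Python sketch of the same arithmetic, not the Spark code itself. The function name `unit_economics` is hypothetical, and the input values are copied from rows 1 and 4 of the transactions preview.

```python
def unit_economics(sales, quantity, discount, profit):
    """Return (unit price, unit profit, unit cost) using the notebook's three formulas."""
    if quantity <= 0:
        raise ValueError("quantity must be positive")
    precio_venta = (sales / quantity) * (1 - discount)   # Precio_venta
    beneficio_unitario = profit / quantity               # Beneficio_unitario
    coste_unitario = precio_venta - beneficio_unitario   # Coste_unitario
    return precio_venta, beneficio_unitario, coste_unitario

# Row 1: Sales=261.96, Quantity=2, Discount=0.0, Profit=41.9136
print(unit_economics(261.96, 2, 0.0, 41.9136))
# Row 4: Sales=957.5775, Quantity=5, Discount=0.45, Profit=-383.031 (a loss-making sale)
print(unit_economics(957.5775, 5, 0.45, -383.031))
```

For row 4 the unit profit is negative, so subtracting it makes the unit cost larger than the unit sale price.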

In [None]:
# Highest profitability by product category.
full_df.createOrReplaceTempView("full_df")
spark.sql("SELECT Category, SUM(Profit) AS Total_rentabilidad FROM full_df GROUP BY Category ORDER BY Total_rentabilidad DESC").show()

+---------------+------------------+
|       Category|Total_rentabilidad|
+---------------+------------------+
|     Technology|1083783.5844000045|
|Office Supplies| 924020.6516999817|
|      Furniture|161104.80640000163|
+---------------+------------------+



In [None]:
# Products sold with a discount that produced a loss (negative unit profit).

full_df = full_df.withColumnRenamed("Product Name","Product_name")
full_df = full_df.withColumn("Discount",full_df.Discount.cast("float"))
full_df = full_df.withColumn("Profit",full_df.Profit.cast("float"))
full_df.createOrReplaceTempView("full_df")

#full_df.printSchema()
spark.sql("SELECT DISTINCT Product_name, Discount, Beneficio_unitario FROM full_df WHERE Discount > 0 AND Beneficio_unitario < 0 ORDER BY Discount DESC").show()


+--------------------+--------+-------------------+
|        Product_name|Discount| Beneficio_unitario|
+--------------------+--------+-------------------+
|"Acco Recycled 2"...|     0.8|            -4.7685|
|Ibico Plastic Spi...|     0.8|            -10.336|
|Insertable Tab Po...|     0.8|            -2.6466|
|Pressboard Hangin...|     0.8|            -1.5744|
|Acco 6 Outlet Gua...|     0.8|             -6.045|
|Acco D-Ring Binde...|     0.8| -6.627800000000001|
|Avery Non-Stick B...|     0.8|            -1.5715|
|Wilson Jones Inte...|     0.8|-5.5360000000000005|
|Cardinal Hold-It ...|     0.8|            -2.6334|
|Acco Smartsocket ...|     0.8|           -23.7654|
|"Acco Pressboard ...|     0.8|            -1.5221|
|"Wilson Jones Ell...|     0.8|            -13.268|
|Recycled Easel Ri...|     0.8|-5.2219999999999995|
|Avery Poly Binder...|     0.8|            -1.1456|
|Surelock Post Bin...|     0.8| -9.168000000000001|
|Computer Printout...|     0.8|-0.5376000000000001|
|GBC Laser I

In [None]:
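# Profitability per client for the discounted, loss-making transactions from the previous step.
# Note: despite the alias rentabilidad_media, the query sums Profit/Quantity per client; use AVG for a true mean.
full_df = full_df.withColumnRenamed("Customer ID","Customer_ID")

full_df.createOrReplaceTempView("full_df")
# Keep the DataFrame in new_df; chaining .show() onto the assignment would store None instead.
new_df = spark.sql("""
SELECT Customer_ID, SUM(Profit/Quantity) AS rentabilidad_media
FROM full_df
WHERE Discount > 0 AND Profit < 0
GROUP BY Customer_ID
""")
new_df.show()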

+-----------+-------------------+
|Customer_ID| rentabilidad_media|
+-----------+-------------------+
|   VW-21775| -2837.330047395494|
|   RR-19315|-146.99439334869385|
|   MY-17380|-117.23880092302961|
|   MS-17530| -8.484399795532227|
|   EM-13960| -5.609999895095825|
|   AH-10690|-342.73439927782306|
|   BD-11500| -128.1450023651123|
|   JF-15490|-1075.0020337104797|
|   JF-15415| -21.58919906616211|
|   IM-15070|  -643.008109974861|
|   PW-19240|  -509.457913915316|
|   OT-18730|  -1110.36802927653|
|   NW-18400|-1406.7304773012795|
|   JH-15985|  -859.528789813702|
|   MG-18145|  -832.520838646662|
|   SM-20320|-1877.1234741806986|
|   ND-18370| -636.7292854785919|
|   BG-11695| -9.058799743652344|
|   BS-11380| -256.1645098527272|
|   SO-20335|-473.30460691452026|
+-----------+-------------------+
only showing top 20 rows
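
The column above is named `rentabilidad_media` but is computed with `SUM`, so it grows with the number of loss-making rows a client has. The sketch below uses made-up rows (not taken from the dataset) and a hypothetical helper `unit_profit_by_customer` to show how the sum and the mean of `Profit/Quantity` can differ under the same filter.

```python
from collections import defaultdict

def unit_profit_by_customer(rows):
    """Return {customer: (sum, mean)} of Profit/Quantity over discounted, loss-making rows."""
    grouped = defaultdict(list)
    for customer_id, discount, profit, quantity in rows:
        # Same filter as the query: Discount > 0 AND Profit < 0
        if discount > 0 and profit < 0:
            grouped[customer_id].append(profit / quantity)
    return {cid: (sum(vals), sum(vals) / len(vals)) for cid, vals in grouped.items()}

# Made-up rows: (Customer_ID, Discount, Profit, Quantity)
rows = [
    ("C-1", 0.2, -10.0, 2),
    ("C-1", 0.5, -30.0, 3),
    ("C-1", 0.0, 40.0, 4),   # no discount, so the filter excludes it
    ("C-2", 0.8, -6.0, 3),
]
print(unit_profit_by_customer(rows))
```

For a client with a single qualifying row the two figures coincide. With several rows, the sum is the mean multiplied by the row count.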



In [None]:
# New sale price with a 50% markup over the unit cost of each product, without a discount and with a 10% discount.
spark.sql("""
SELECT 
          DISTINCT Product_name,
         (Coste_unitario*1.50) AS Nuevo_precio_sin_descuento,
         (Coste_unitario*1.50*0.90) AS Nuevo_precio_con_descuento
FROM full_df
""").show()

+--------------------+--------------------------+--------------------------+
|        Product_name|Nuevo_precio_sin_descuento|Nuevo_precio_con_descuento|
+--------------------+--------------------------+--------------------------+
|OIC Bulk Pack Met...|                    1.9893|                   1.79037|
|Home/Office Perso...|        30.762600000000006|        27.686340000000005|
|Avery Durable Bin...|                    2.2032|                   1.98288|
|Wilson Jones Easy...|        0.8370000000000001|        0.7533000000000001|
|           Avery 499|        2.8386000000000005|        2.5547400000000007|
|"Bevis Round Bull...|        202.57379999999998|                 182.31642|
|     Staple envelope|                    6.1851|         5.566590000000001|
|Fellowes Bankers ...|        35.497800000000005|        31.948020000000007|
|"Adams Telephone ...|                    4.4982|                   4.04838|
|Polycom SoundPoin...|                   169.395|                  152.4555|
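
The query multiplies `Coste_unitario` by 1.50 for the marked-up price and by a further 0.90 for the discounted one. As a small sketch of that arithmetic, the hypothetical helper `new_prices` below exposes the markup and discount as parameters. The default values mirror the constants in the query, and the unit cost of 100.0 is made up for illustration.

```python
def new_prices(unit_cost, markup=0.50, discount=0.10):
    """Return (price without discount, price with discount) for a given unit cost."""
    price_without_discount = unit_cost * (1 + markup)
    price_with_discount = price_without_discount * (1 - discount)
    return price_without_discount, price_with_discount

# Made-up unit cost of 100.0 with the notebook's 50% markup and 10% discount.
without_discount, with_discount = new_prices(100.0)
print(without_discount, with_discount)
```

With these defaults the discounted price is 1.35 times the unit cost, so it still sits above cost.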

In [None]:
# The most loyal client in Florida (the one with the most transactions):
full_df = full_df.withColumnRenamed("Customer Name","Customer_name")
full_df.createOrReplaceTempView("full_df")
print("WINNER NUMBER 1:")
spark.sql("""
SELECT Customer_ID, COUNT(*) AS Number_of_transactions, State, Customer_name
FROM full_df
WHERE State = "Florida"
GROUP BY Customer_ID, State, Customer_name
ORDER BY Number_of_transactions DESC
LIMIT 1
""").show()

WINNER NUMBER 1:
+-----------+----------------------+-------+-------------+
|Customer_ID|Number_of_transactions|  State|Customer_name|
+-----------+----------------------+-------+-------------+
|   EP-13915|                    96|Florida|   Emily Phan|
+-----------+----------------------+-------+-------------+



In [None]:
# The most profitable client in Florida (the one with the highest total profit):
print("WINNER NUMBER 2:")
spark.sql(
"""SELECT Customer_ID, SUM(Profit) AS profit_total, Customer_name
FROM full_df
WHERE State = "Florida"
GROUP BY Customer_ID, Customer_name
ORDER BY profit_total DESC
LIMIT 1
""").show()

WINNER NUMBER 2:
+-----------+-----------------+-------------+
|Customer_ID|     profit_total|Customer_name|
+-----------+-----------------+-------------+
|   SC-20095|5839.255176946521| Sanjit Chand|
+-----------+-----------------+-------------+
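
The two queries rank Florida clients by different measures, one by `COUNT(*)` and one by `SUM(Profit)`, which is why they return different clients. The plain-Python sketch below reproduces both rankings on made-up rows. The helper `winners` and the sample data are hypothetical and only illustrate the grouping logic.

```python
from collections import Counter, defaultdict

def winners(rows, state):
    """Return (client with most transactions, client with highest total profit) for one state."""
    counts = Counter()
    profits = defaultdict(float)
    for customer_id, row_state, profit in rows:
        if row_state == state:
            counts[customer_id] += 1          # COUNT(*) per client
            profits[customer_id] += profit    # SUM(Profit) per client
    if not counts:
        raise ValueError(f"no transactions found for state {state!r}")
    most_transactions = max(counts, key=counts.get)
    highest_profit = max(profits, key=profits.get)
    return most_transactions, highest_profit

# Made-up rows: (Customer_ID, State, Profit)
rows = [
    ("A", "Florida", 5.0),
    ("A", "Florida", 5.0),
    ("A", "Florida", -2.0),
    ("B", "Florida", 50.0),
    ("C", "Texas", 500.0),   # other state, so it is ignored
]
print(winners(rows, "Florida"))
```

Here client A has the most transactions while client B has the highest total profit, the same kind of split seen in the two results above.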

