
* Formation Continue EMIASD, Univ. Paris Dauphine, Promo 5
* Author: Mohamed-Amine Baazizi
* Affiliation: LIP6 - Faculté des Sciences - Sorbonne Université
* Email: mohamed-amine.baazizi@lip6.fr
* Reusing without consent of the author is strictly forbidden
* October 2024



# Homework


## Outline

This homework is about building an effective data preparation pipeline.
It covers the following aspects seen throughout the session:

* ingest raw data, curate it, and transform it
* load the data into Delta tables to enforce constraints and allow updates
* choose an optimal data layout to speed up query evaluation

It is based on raw data about car prices crawled from a public source.

You are kindly asked to understand the data and decide on a relevant analysis (2 or 3 analytical queries) that can be performed on it.
For example, you could derive insights (min, max, avg) about the price per year of registration.
You can use any other descriptive column that you find useful.
You are also kindly invited to briefly comment on the choices you made at each phase.
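To make the suggested example concrete, the sketch below shows in plain Python what a `GROUP BY yearOfRegistration` with `min`, `max` and `avg` on `price` computes. It is only an illustration under assumptions: the rows are made up (not read from `autos.csv`) and `price_stats_per_year` is a hypothetical helper; in the notebook itself the aggregation would be written in Spark SQL.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical sample rows: (yearOfRegistration, price), not taken from autos.csv
rows = [(2004, 3000), (2004, 5000), (2011, 18300), (2011, 12000), (1993, 480)]

def price_stats_per_year(rows):
    """Mimic: SELECT yearOfRegistration, min(price), max(price), avg(price)
              FROM ... GROUP BY yearOfRegistration ORDER BY yearOfRegistration"""
    groups = defaultdict(list)
    for year, price in rows:
        groups[year].append(price)
    # One (min, max, avg) tuple per year, years in ascending order
    return {
        year: (min(prices), max(prices), mean(prices))
        for year, prices in sorted(groups.items())
    }

for year, (low, high, avg) in price_stats_per_year(rows).items():
    print(year, low, high, avg)
```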








## Prerequisite

### System setup

In [4]:
%%capture
!pip install -q pyspark
!pip install -q delta-spark
!pip install pyngrok

In [5]:
!pip list|grep spark

delta-spark                        3.2.1
pyspark                            3.5.3


In [6]:
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

local = "local[*]"
appName = "Formation Continue EMIASD - Delta Lake "
localConfig = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "8G").\
  set("spark.driver.memory","8G").\
  set("spark.sql.catalogImplementation","in-memory").\
  set("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension").\
  set("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog").\
  set("spark.jars.packages","io.delta:delta-spark_2.12:3.2.1").\
  set("spark.databricks.delta.schema.autoMerge.enabled","true")


spark = SparkSession.builder.config(conf = localConfig).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [7]:
spark

### Data import

In [8]:
%%capture
!wget --no-verbose https://nuage.lip6.fr/s/89BG8HD9r3iE693/download/MLData.tgz -O /tmp/MLData.tgz
!tar -xzvf /tmp/MLData.tgz  --directory /tmp/

In [9]:
!ls -hal /tmp/MLData

total 73M
drwxr-xr-x 2  501 staff 4.0K Jan  6  2022 .
drwxrwxrwt 1 root root  4.0K Oct 25 13:14 ..
-rw-r--r-- 1  501 staff  66M Jan  6  2022 autos.csv
-rw-r--r-- 1  501 staff  176 Jan  6  2022 ._loan.csv
-rw-r--r-- 1  501 staff 6.8M Jan  6  2022 loan.csv


In [10]:
query = """
CREATE TABLE IF NOT EXISTS raw_vehiculePrices
USING csv
OPTIONS (
  header "true",
  path "/tmp/MLData/autos.csv",
  inferSchema "true"
)
"""
spark.sql(query)

DataFrame[]

## Phase 0: Understanding the data

In this part, you are invited to gain some knowledge about the data by reading its schema and extracting some basic statistics about the values of the columns you find interesting.

In [11]:
query = """
DESCRIBE raw_vehiculePrices
"""
spark.sql(query).show()

+-------------------+---------+-------+
|           col_name|data_type|comment|
+-------------------+---------+-------+
|        dateCrawled|timestamp|   NULL|
|               name|   string|   NULL|
|             seller|   string|   NULL|
|          offerType|   string|   NULL|
|              price|      int|   NULL|
|             abtest|   string|   NULL|
|        vehicleType|   string|   NULL|
| yearOfRegistration|      int|   NULL|
|            gearbox|   string|   NULL|
|            powerPS|      int|   NULL|
|              model|   string|   NULL|
|          kilometer|      int|   NULL|
|monthOfRegistration|      int|   NULL|
|           fuelType|   string|   NULL|
|              brand|   string|   NULL|
|  notRepairedDamage|   string|   NULL|
|        dateCreated|timestamp|   NULL|
|       nrOfPictures|      int|   NULL|
|         postalCode|      int|   NULL|
|           lastSeen|timestamp|   NULL|
+-------------------+---------+-------+



In [12]:
query = """
SELECT * FROM raw_vehiculePrices TABLESAMPLE (5 ROWS);
"""
spark.sql(query).show()


+-------------------+--------------------+------+---------+-----+------+-----------+------------------+---------+-------+-----+---------+-------------------+--------+----------+-----------------+-------------------+------------+----------+-------------------+
|        dateCrawled|                name|seller|offerType|price|abtest|vehicleType|yearOfRegistration|  gearbox|powerPS|model|kilometer|monthOfRegistration|fuelType|     brand|notRepairedDamage|        dateCreated|nrOfPictures|postalCode|           lastSeen|
+-------------------+--------------------+------+---------+-----+------+-----------+------------------+---------+-------+-----+---------+-------------------+--------+----------+-----------------+-------------------+------------+----------+-------------------+
|2016-03-24 11:52:17|          Golf_3_1.6|privat|  Angebot|  480|  test|       NULL|              1993|  manuell|      0| golf|   150000|                  0|  benzin|volkswagen|             NULL|2016-03-24 00:00:00|     

In [13]:
query = """
SELECT  min(yearOfRegistration), max(yearOfRegistration),
          avg(yearOfRegistration), median(yearOfRegistration)
FROM raw_vehiculePrices
"""
spark.sql(query).show()

+-----------------------+-----------------------+-----------------------+--------------------------+
|min(yearOfRegistration)|max(yearOfRegistration)|avg(yearOfRegistration)|median(yearOfRegistration)|
+-----------------------+-----------------------+-----------------------+--------------------------+
|                   1000|                   9999|     2004.5767206439623|                    2003.0|
+-----------------------+-----------------------+-----------------------+--------------------------+



In [None]:
# query = """
# SELECT  yearOfRegistration, count(*)
# FROM raw_vehiculePrices
# GROUP BY yearOfRegistration
# order by 1 desc,2 desc
# """
# spark.sql(query).show(150)

In [14]:
query = """
SELECT  min(price), max(price),
          avg(price), median(price)
FROM raw_vehiculePrices
"""
spark.sql(query).show()

+----------+----------+------------------+-------------+
|min(price)|max(price)|        avg(price)|median(price)|
+----------+----------+------------------+-------------+
|         0|2147483647|17286.338865535483|       2950.0|
+----------+----------+------------------+-------------+
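The price summary hints at outliers: the maximum, 2147483647, equals 2^31 − 1, the largest value a 32-bit signed int can hold, and it pulls the mean (about 17,286) far above the median (2,950). One common way to pick cut-offs, not the one used later in this notebook, is Tukey's fences (Q1 − 1.5·IQR and Q3 + 1.5·IQR). A minimal sketch on hypothetical prices, using Python's `statistics.quantiles`; `tukey_fences` is a hypothetical helper name.

```python
from statistics import quantiles

INT32_MAX = 2**31 - 1  # 2147483647, the max(price) reported above

# Hypothetical prices; only the last value mirrors the suspicious maximum
prices = [0, 480, 650, 1500, 2200, 3600, 9800, 14500, 18300, INT32_MAX]

def tukey_fences(values, k=1.5):
    """Return (lower, upper) = (Q1 - k*IQR, Q3 + k*IQR)."""
    q1, _, q3 = quantiles(values, n=4)  # quartiles, default 'exclusive' method
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr

lower, upper = tukey_fences(prices)
outliers = [p for p in prices if p < lower or p > upper]
print(lower, upper, outliers)
```

Here the sentinel value is flagged, but a price of 0 falls inside the fences, which is why an explicit domain rule such as `price > 0` is still useful.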



In [15]:
query = """
SELECT  min(kilometer), max(kilometer),
          avg(kilometer), median(kilometer)
FROM raw_vehiculePrices
"""
spark.sql(query).show()

+--------------+--------------+------------------+-----------------+
|min(kilometer)|max(kilometer)|    avg(kilometer)|median(kilometer)|
+--------------+--------------+------------------+-----------------+
|          5000|        150000|125618.56044408226|         150000.0|
+--------------+--------------+------------------+-----------------+



## Phase 1: Cleaning the data and selecting relevant columns

In this part you are invited to decide which columns are useful for your analysis and to clean the data by removing outliers (e.g. records with strange values for a specific column).
The result of your cleaning and selection should be stored in a table called `phase1`.

Let's focus on 3 metrics:
1. AVG price per seller and vehicle type
2. Median kilometers per fuel type
3. Count of listings per gearbox type and year of registration

In [16]:
#sanity check
query = """
SELECT * FROM raw_vehiculePrices TABLESAMPLE (5 ROWS);
"""
spark.sql(query).show()

+-------------------+--------------------+------+---------+-----+------+-----------+------------------+---------+-------+-----+---------+-------------------+--------+----------+-----------------+-------------------+------------+----------+-------------------+
|        dateCrawled|                name|seller|offerType|price|abtest|vehicleType|yearOfRegistration|  gearbox|powerPS|model|kilometer|monthOfRegistration|fuelType|     brand|notRepairedDamage|        dateCreated|nrOfPictures|postalCode|           lastSeen|
+-------------------+--------------------+------+---------+-----+------+-----------+------------------+---------+-------+-----+---------+-------------------+--------+----------+-----------------+-------------------+------------+----------+-------------------+
|2016-03-24 11:52:17|          Golf_3_1.6|privat|  Angebot|  480|  test|       NULL|              1993|  manuell|      0| golf|   150000|                  0|  benzin|volkswagen|             NULL|2016-03-24 00:00:00|     

In [17]:
query = """
SELECT DISTINCT vehicletype FROM raw_vehiculePrices
"""
spark.sql(query).show()

# kleinwagen means small car (compact)
# andere means other
# replace NULL with andere
# kombi means station wagon (estate)


+-----------+
|vehicletype|
+-----------+
|      coupe|
| kleinwagen|
|        bus|
|     andere|
|  limousine|
|     cabrio|
|        suv|
|      kombi|
|       NULL|
+-----------+
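The comments above translate some of the German category values. If English labels are wanted in the results, one option is a small lookup table applied after replacing NULL with `andere`. A minimal plain-Python sketch, assuming the translations below (`VEHICLE_TYPE_EN` and `normalize_vehicle_type` are hypothetical names); in Spark the same mapping could be expressed with a `CASE` expression or a join against a small lookup table.

```python
# Hypothetical English labels for the German vehicleType values shown above
VEHICLE_TYPE_EN = {
    "kleinwagen": "small car",
    "limousine": "sedan",
    "kombi": "station wagon",
    "cabrio": "convertible",
    "bus": "van",
    "coupe": "coupe",
    "suv": "suv",
    "andere": "other",
}

def normalize_vehicle_type(value):
    """Replace NULL (None) with 'andere', then translate.

    Values missing from the lookup table are also mapped to 'other'.
    """
    raw = "andere" if value is None else value.strip().lower()
    return VEHICLE_TYPE_EN.get(raw, "other")

print([normalize_vehicle_type(v) for v in ["kombi", None, "Cabrio", "pickup"]])
```

Mapping unknown values to "other" keeps future categories from producing NULL labels; raising an error instead would be the stricter choice.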



In [22]:
query = """
SELECT DISTINCT seller FROM raw_vehiculePrices
"""
spark.sql(query).show()

# gewerblich means commercial (professional seller), privat means private

+----------+
|    seller|
+----------+
|gewerblich|
|    privat|
|      NULL|
+----------+



In [18]:
query = """
SELECT DISTINCT gearbox FROM raw_vehiculePrices
"""
spark.sql(query).show()


+---------+
|  gearbox|
+---------+
|automatik|
|  manuell|
|     NULL|
+---------+



In [19]:
query = """
SELECT DISTINCT Yearofregistration FROM raw_vehiculePrices
where Yearofregistration > 2024
Order by Yearofregistration asc
Limit 10
"""
spark.sql(query).show()

# keep years between 1950 and the current year

+------------------+
|Yearofregistration|
+------------------+
|              2066|
|              2200|
|              2222|
|              2290|
|              2500|
|              2800|
|              2900|
|              3000|
|              3200|
|              3500|
+------------------+



In [31]:
# Keep the relevant columns, replace NULL vehicleType with 'andere', and drop rows
# whose price, yearOfRegistration or kilometer is NULL or out of range
query = """
CREATE OR REPLACE TEMPORARY VIEW phase1 AS
SELECT
  seller,
  price,
  CASE WHEN vehicleType is null then 'andere' else vehicleType end as vehicleType,
  yearOfRegistration,
  gearbox,
  kilometer,
  fuelType,
  brand
FROM
    raw_vehiculePrices
WHERE

    price IS NOT NULL
    AND yearOfRegistration between 1950 and year(CURRENT_DATE)
    AND price > 0
    AND kilometer > 0
"""
spark.sql(query)

# Show the first 20 rows of the 'phase1' view
spark.sql("SELECT * FROM phase1 LIMIT 20").show()

+------+-----+-----------+------------------+---------+---------+--------+-------------+
|seller|price|vehicleType|yearOfRegistration|  gearbox|kilometer|fuelType|        brand|
+------+-----+-----------+------------------+---------+---------+--------+-------------+
|privat|  480|     andere|              1993|  manuell|   150000|  benzin|   volkswagen|
|privat|18300|      coupe|              2011|  manuell|   125000|  diesel|         audi|
|privat| 9800|        suv|              2004|automatik|   125000|  diesel|         jeep|
|privat| 1500| kleinwagen|              2001|  manuell|   150000|  benzin|   volkswagen|
|privat| 3600| kleinwagen|              2008|  manuell|    90000|  diesel|        skoda|
|privat|  650|  limousine|              1995|  manuell|   150000|  benzin|          bmw|
|privat| 2200|     cabrio|              2004|  manuell|   150000|  benzin|      peugeot|
|privat|14500|        bus|              2014|  manuell|    30000|  benzin|         ford|
|privat|  999| kleinw

Give a brief summary of your choices
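The filtering rules of `phase1` can be mirrored in plain Python, which is handy for unit-testing them outside Spark. A minimal sketch; `keep_row` is a hypothetical name, and `None` plays the role of SQL NULL: in SQL a comparison with NULL is not TRUE, so `BETWEEN` and `>` silently drop rows with NULL `yearOfRegistration` or `kilometer` even though only `price IS NOT NULL` is written explicitly.

```python
from datetime import date

def keep_row(price, year_of_registration, kilometer, current_year=None):
    """Return True when a row passes the WHERE clause used to build phase1."""
    if current_year is None:
        current_year = date.today().year  # mirrors year(CURRENT_DATE)
    # Any NULL makes the SQL predicate not TRUE, so the row is dropped
    if price is None or year_of_registration is None or kilometer is None:
        return False
    return (
        price > 0
        and 1950 <= year_of_registration <= current_year  # BETWEEN is inclusive
        and kilometer > 0
    )

print(keep_row(480, 1993, 150000, current_year=2024))   # valid row
print(keep_row(480, 9999, 150000, current_year=2024))   # implausible year
print(keep_row(480, None, 150000, current_year=2024))   # NULL year
```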

## Phase 2: Organizing the data

In this part you are invited to load the data into Delta tables where you will define meaningful constraints and conditions to be fulfilled by any future incoming data.
The result of this phase should be a Delta table called `deltaPrices`.

In [100]:
# Create a Delta table partitioned by vehicle type and seller (constraints are added in the next cell)
query = """
CREATE OR REPLACE TABLE deltaPrices
USING DELTA
PARTITIONED BY (VehicleType, Seller)
AS
SELECT * FROM phase1
"""
spark.sql(query)

DataFrame[]

In [93]:
# Add constraints (you can add more as needed)
spark.sql("""
ALTER TABLE deltaPrices ADD CONSTRAINT price_positive CHECK (price > 0)
""")
spark.sql("""
ALTER TABLE deltaPrices ADD CONSTRAINT year_valid CHECK (yearOfRegistration BETWEEN 1950 AND 2024)
""")
spark.sql("""
ALTER TABLE deltaPrices ADD CONSTRAINT kilometer_positive CHECK (kilometer > 0)
""")
spark.sql("""
ALTER TABLE deltaPrices ADD CONSTRAINT vehicleType_notnull CHECK (vehicleType is not null)
""")

DataFrame[]

Comment on the constraints you added

1. price must be positive
2. yearOfRegistration must be between 1950 and 2024 (a fixed bound, whereas phase1 filters on the current year)
3. kilometer must be positive
4. vehicleType cannot be null

Depending on the use case, we could also constrain the type and nullability of every column.
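Delta Lake checks CHECK constraints on every write, and a write containing a violating row fails as a whole, so no partial batch is committed. The toy sketch below imitates that all-or-nothing behaviour in plain Python; it is not Delta's implementation, and `CONSTRAINTS`, `ConstraintViolation` and `append_batch` are hypothetical names. The predicates mirror the four constraints added above.

```python
class ConstraintViolation(Exception):
    """Raised when a row of an incoming batch breaks a constraint."""

# Hypothetical predicates mirroring the ALTER TABLE ... ADD CONSTRAINT statements
CONSTRAINTS = {
    "price_positive": lambda r: r["price"] > 0,
    "year_valid": lambda r: 1950 <= r["yearOfRegistration"] <= 2024,
    "kilometer_positive": lambda r: r["kilometer"] > 0,
    "vehicleType_notnull": lambda r: r["vehicleType"] is not None,
}

def append_batch(table, batch):
    """Validate the whole batch first, then append it: all rows or none."""
    for row in batch:
        for name, check in CONSTRAINTS.items():
            if not check(row):
                raise ConstraintViolation(f"CHECK constraint {name} violated by {row}")
    table.extend(batch)

table = []
good = {"price": 15000, "yearOfRegistration": 2023, "kilometer": 50000, "vehicleType": "andere"}
bad = dict(good, price=0)

append_batch(table, [good])           # accepted
try:
    append_batch(table, [good, bad])  # rejected as a whole
except ConstraintViolation as err:
    print(err)
print(len(table))  # the valid row of the rejected batch was not written either
```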

## Phase 3: Analysing the data and ensuring query evaluation efficiency

Suggest 2 or 3 meaningful queries as described above, and suggest a data organization scheme that optimizes one of them.


In [35]:
from pyngrok import ngrok, conf
import getpass

print("Enter your authtoken, which can be copied "
"from https://dashboard.ngrok.com/get-started/your-authtoken")
conf.get_default().auth_token = getpass.getpass()

ui_port = 4040
public_url = ngrok.connect(ui_port).public_url
print(f" * ngrok tunnel \"{public_url}\" -> \"http://127.0.0.1:{ui_port}\"")

Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken
··········




 * ngrok tunnel "https://4bc3-35-237-9-200.ngrok-free.app" -> "http://127.0.0.1:4040"


We partitioned deltaPrices by VehicleType and Seller, the two grouping columns of the following query, to optimize it.
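A partitioned Delta table stores its data files in one directory per distinct partition value pair, named Hive-style (roughly `vehicleType=kombi/seller=privat/`). As far as we can tell, the aggregation below has no filter and therefore still reads every partition; the layout pays off most when a query filters on a partition column (e.g. `WHERE VehicleType = 'kombi'`), because whole directories can then be skipped (partition pruning). A toy plain-Python sketch of that idea, with hypothetical rows and helper names:

```python
from collections import defaultdict

PARTITION_COLUMNS = ("vehicleType", "seller")

def partition_path(row):
    """Hive-style directory of a row, e.g. 'vehicleType=kombi/seller=privat'."""
    return "/".join(f"{col}={row[col]}" for col in PARTITION_COLUMNS)

def write_partitioned(rows):
    """Group rows by partition directory, like a partitioned write."""
    layout = defaultdict(list)
    for row in rows:
        layout[partition_path(row)].append(row)
    return dict(layout)

def scan_vehicle_type(layout, vehicle_type):
    """Open only the directories that match the filter (partition pruning)."""
    prefix = f"vehicleType={vehicle_type}/"
    touched = [path for path in layout if path.startswith(prefix)]
    matched = [row for path in touched for row in layout[path]]
    return touched, matched

# Hypothetical rows, not taken from the dataset
rows = [
    {"vehicleType": "kombi", "seller": "privat", "price": 7900},
    {"vehicleType": "kombi", "seller": "gewerblich", "price": 100},
    {"vehicleType": "coupe", "seller": "privat", "price": 26000},
]
layout = write_partitioned(rows)
touched, kombi_rows = scan_vehicle_type(layout, "kombi")
print(touched)  # the coupe directory is never opened
```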

In [102]:
# Read the partitioned Delta table (not the phase1 view) so that its layout is actually used
spark.sparkContext.setJobDescription('AVG PRICE PER SELLER AND VEHICLETYPE')
query = """
select VehicleType, Seller, AVG(price)
from deltaPrices
Group By VehicleType, Seller
Order By VehicleType, Seller
"""
result_df  = spark.sql(query).toPandas()
result_df

|   | VehicleType | Seller     | avg(price)   |
|---|-------------|------------|--------------|
| 0 | andere      | privat     | 81476.583425 |
| 1 | bus         | privat     | 10450.72242  |
| 2 | cabrio      | privat     | 15267.785851 |
| 3 | coupe       | privat     | 26691.669263 |
| 4 | kleinwagen  | gewerblich | 1100.0       |
| 5 | kleinwagen  | privat     | 5823.27963   |
| 6 | kombi       | gewerblich | 100.0        |
| 7 | kombi       | privat     | 7910.699805  |
| 8 | limousine   | gewerblich | 6900.0       |
| 9 | limousine   | privat     | 11352.065446 |


In [73]:
spark.sparkContext.setJobDescription('MED Kilometers per fueltype')
query = """
select FuelType, MEDIAN(kilometer)
from phase1
Group By FuelType
Order by 2 desc
"""
result_df  = spark.sql(query).toPandas()
result_df

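|   | FuelType | median(kilometer) |
|---|----------|-------------------|
| 0 | benzin   | 150000.0          |
| 1 | None     | 150000.0          |
| 2 | diesel   | 150000.0          |
| 3 | cng      | 150000.0          |
| 4 | lpg      | 150000.0          |
| 5 | andere   | 95000.0           |
| 6 | hybrid   | 80000.0           |
| 7 | elektro  | 30000.0           |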


In [75]:
spark.sparkContext.setJobDescription('GEARBOX per year')
query = """
select  YearOfRegistration, GearBox, count(*)
from phase1
Group By  YearOfRegistration, GearBox
Order by YearOfRegistration, GearBox
"""
result_df  = spark.sql(query).toPandas()
result_df

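|     | YearOfRegistration | GearBox   | count(1) |
|-----|--------------------|-----------|----------|
| 0   | 1950               | None      | 1        |
| 1   | 1950               | automatik | 2        |
| 2   | 1950               | manuell   | 18       |
| 3   | 1951               | None      | 3        |
| 4   | 1951               | automatik | 2        |
| ... | ...                | ...       | ...      |
| 204 | 2018               | automatik | 484      |
| 205 | 2018               | manuell   | 2883     |
| 206 | 2019               | None      | 2        |
| 207 | 2019               | automatik | 4        |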


## Ingesting new data and rerunning analytics

In this part you are invited to insert fictitious new data that conforms to the schema established in phase 2 and to rerun some queries of phase 3 to see how the results evolve. Ideally, write a query that compares an aggregation value in two different versions of the data by exploiting the Delta history feature.
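Delta keeps every committed version of a table in its transaction log, and `VERSION AS OF n` reads an older snapshot. The toy sketch below only imitates that idea in plain Python (it says nothing about how Delta stores log entries and Parquet files): each append produces a new immutable version, and the comparison is a left join of per-group averages, where `None` marks a group that did not exist before. `VersionedTable`, `avg_price_by_group` and `compare_versions` are hypothetical names, and the rows are made up.

```python
from collections import defaultdict

class VersionedTable:
    """Toy append-only table: each append commits a new, immutable version."""

    def __init__(self, rows):
        self.versions = [list(rows)]  # version 0

    def append(self, rows):
        self.versions.append(self.versions[-1] + list(rows))  # old versions untouched
        return len(self.versions) - 1  # number of the new version

    def as_of(self, version):
        """Rows as they were at a given version (cf. VERSION AS OF)."""
        return self.versions[version]

def avg_price_by_group(rows):
    totals = defaultdict(lambda: [0, 0])  # (vehicleType, seller) -> [sum, count]
    for r in rows:
        acc = totals[(r["vehicleType"], r["seller"])]
        acc[0] += r["price"]
        acc[1] += 1
    return {key: s / n for key, (s, n) in totals.items()}

def compare_versions(table, old_version, new_version):
    """(new_avg, old_avg) per group, like a LEFT JOIN of new onto old."""
    old = avg_price_by_group(table.as_of(old_version))
    new = avg_price_by_group(table.as_of(new_version))
    return {key: (avg, old.get(key)) for key, avg in sorted(new.items())}

table = VersionedTable([
    {"vehicleType": "coupe", "seller": "privat", "price": 30000},
    {"vehicleType": "coupe", "seller": "privat", "price": 20000},
])
v1 = table.append([
    {"vehicleType": "coupe", "seller": "privat", "price": 10000},
    {"vehicleType": "andere", "seller": "gewerblich", "price": 12000},
])
print(compare_versions(table, 0, v1))
```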

In [99]:
from delta.tables import DeltaTable

# Record the current version before appending, instead of hard-coding it
version_before = DeltaTable.forName(spark, "deltaPrices").history(1).select("version").first()[0]

# Insert new fictitious data into the Delta table
# (brand names in lower case, like the values already in the table)
new_data = [
    ("privat", 15000, "andere", 2023, "manuell", 50000, "benzin", "bmw"),
    ("privat", 20000, "coupe", 2022, "automatik", 30000, "diesel", "mercedes_benz"),
    ("privat", 18000, "kombi", 2023, "manuell", 40000, "benzin", "audi"),
    ("gewerblich", 12000, "andere", 2021, "manuell", 60000, "diesel", "volkswagen")
]

new_data_df = spark.createDataFrame(new_data, schema=spark.table("deltaPrices").schema)
new_data_df.write.format("delta").mode("append").saveAsTable("deltaPrices")


# Rerun the average price per seller and vehicle type and compare it with the
# version read before the append (Delta time travel); <=> is a NULL-safe equality
query_compare_avg_price = f"""
WITH current_avg AS (
  SELECT VehicleType, Seller, AVG(price) AS avg_price
  FROM deltaPrices
  GROUP BY VehicleType, Seller
),
previous_avg AS (
  SELECT VehicleType, Seller, AVG(price) AS avg_price
  FROM deltaPrices VERSION AS OF {version_before}
  GROUP BY VehicleType, Seller
)
SELECT c.VehicleType, c.Seller, c.avg_price, p.avg_price AS avg_price_previous
FROM current_avg c
  LEFT JOIN previous_avg p
    ON c.VehicleType <=> p.VehicleType AND c.Seller <=> p.Seller
ORDER BY c.VehicleType, c.Seller
"""

result_df = spark.sql(query_compare_avg_price).toPandas()
print("Comparison of average price per seller and vehicle type between current version and previous version:")
result_df



Comparison of average price per seller and vehicle type between current version and previous version:


|   | VehicleType | Seller     | avg_price    | avg_price_previous |
|---|-------------|------------|--------------|--------------------|
| 0 | andere      | gewerblich | 12000.0      | NaN                |
| 1 | andere      | privat     | 81473.000458 | 81476.583425       |
| 2 | bus         | privat     | 10450.72242  | 10450.72242        |
| 3 | cabrio      | privat     | 15267.785851 | 15267.785851       |
| 4 | coupe       | privat     | 26690.94537  | 26691.669263       |
| 5 | kleinwagen  | gewerblich | 1100.0       | 1100.0             |
| 6 | kleinwagen  | privat     | 5823.27963   | 5823.27963         |
| 7 | kombi       | gewerblich | 100.0        | 100.0              |
| 8 | kombi       | privat     | 7911.004807  | 7910.699805        |
| 9 | limousine   | gewerblich | 6900.0       | 6900.0             |


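We see that the average prices changed between the current and the previous version of our Delta table, but only for the groups that received new rows: andere/privat, coupe/privat and kombi/privat shifted slightly, and andere/gewerblich exists only in the current version.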
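The size of these shifts follows from how a mean reacts to one extra value. For a group of $n$ rows with mean $\bar{x}_{\text{old}}$, appending one row with price $x$ gives

$$
\bar{x}_{\text{new}} = \frac{n\,\bar{x}_{\text{old}} + x}{n+1} = \bar{x}_{\text{old}} + \frac{x - \bar{x}_{\text{old}}}{n+1}
$$

So the mean moves toward $x$ by a fraction $1/(n+1)$ of the gap. For coupe/privat the inserted price 20,000 is below the previous mean of 26,691.67, and the mean drops to 26,690.95; for kombi/privat the inserted 18,000 is above 7,910.70, and the mean rises to 7,911.00. The changes are small, which suggests these groups contain many rows.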