### **Mise En Garde: Certaines méthodes sont exploitables en fonction de la version de Python que vous ustilisez**

In [None]:
!pip install pyspark

In [2]:
import pyspark
print(f"{pyspark.__version__}")

3.5.0


In [3]:
import os
os.listdir(os.getcwd())

['.config', 'sample_data']

### **Point d'entrée : SparkContext**

In [4]:
sc = pyspark.SparkContext()

# Verifier si SparkContext est valide
print(sc)

<SparkContext master=local[*] appName=pyspark-shell>


In [5]:
# Check the version of SparkContext in PySpark Shell
print("La version de Spark Context dans PySpark Shell est : ", sc.version)

# Display the Python version of SparkContext
print("La version Python de Spark Context dans PySpark Shell est : ", sc.pythonVer)

# Display the master of SparkContext
print("Le maitre de Spark Context dans PySpark Shell est : ", sc.master)

La version de Spark Context dans PySpark Shell est :  3.5.0
La version Python de Spark Context dans PySpark Shell est :  3.10
Le maitre de Spark Context dans PySpark Shell est :  local[*]


### **Some Data**

**SparkSession : Point d'entrée pour les Dataframes**

In [6]:
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)

In [7]:
dirty_data = spark.createDataFrame([
          (1,'Porsche','Boxster S','Turbo',2.5,4,22,None)
        , (2,'Aston Martin','Vanquish','Aspirated',6.0,12,16,None)
        , (3,'Porsche','911 Carrera 4S Cabriolet','Turbo',3.0,6,24,None)
        , (3,'General Motors','SPARK ACTIV','Aspirated',1.4,None,32,None)
        , (5,'BMW','COOPER S HARDTOP 2 DOOR','Turbo',2.0,4,26,None)
        , (6,'BMW','330i','Turbo',2.0,None,27,None)
        , (7,'BMW','440i Coupe','Turbo',3.0,6,23,None)
        , (8,'BMW','440i Coupe','Turbo',3.0,6,23,None)
        , (9,'Mercedes-Benz',None,None,None,None,27,None)
        , (10,'Mercedes-Benz','CLS 550','Turbo',4.7,8,21,79231)
        , (11,'Volkswagen','GTI','Turbo',2.0,4,None,None)
        , (12,'Ford Motor Company','FUSION AWD','Turbo',2.7,6,20,None)
        , (13,'Nissan','Q50 AWD RED SPORT','Turbo',3.0,6,22,None)
        , (14,'Nissan','Q70 AWD','Aspirated',5.6,8,18,None)
        , (15,'Kia','Stinger RWD','Turbo',2.0,4,25,None)
        , (16,'Toyota','CAMRY HYBRID LE','Aspirated',2.5,4,46,None)
        , (16,'Toyota','CAMRY HYBRID LE','Aspirated',2.5,4,46,None)
        , (18,'FCA US LLC','300','Aspirated',3.6,6,23,None)
        , (19,'Hyundai','G80 AWD','Turbo',3.3,6,20,None)
        , (20,'Hyundai','G80 AWD','Turbo',3.3,6,20,None)
        , (21,'BMW','X5 M','Turbo',4.4,8,18,121231)
        , (22,'GE','K1500 SUBURBAN 4WD','Aspirated',5.3,8,18,None)
    ], ['Id','Manufacturer','Model','EngineType','Displacement',
        'Cylinders','FuelEconomy','MSRP'])

### **Handle Duplicates**

+ Exact duplicates

In [8]:
# do we have any rows that are duplicates?
dirty_data.count(), dirty_data.distinct().count()

(22, 21)

In [9]:
# What row is duplicated

(
    dirty_data
    .groupby(dirty_data.columns)
    .count()
    .filter('count > 1')
    .show()
)

+---+------------+---------------+----------+------------+---------+-----------+----+-----+
| Id|Manufacturer|          Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|count|
+---+------------+---------------+----------+------------+---------+-----------+----+-----+
| 16|      Toyota|CAMRY HYBRID LE| Aspirated|         2.5|        4|         46|NULL|    2|
+---+------------+---------------+----------+------------+---------+-----------+----+-----+



In [10]:
# remove the duplicated rows
full_removed = dirty_data.dropDuplicates()
full_removed.count()

21

### **Only ID differs**

In [11]:
# count of rows
no_ids = (
    full_removed
    .select([col for col in full_removed.columns if col != 'Id'])
)

no_ids.count(), no_ids.distinct().count()

(21, 19)

In [12]:
# what rows is duplicated?

{
    full_removed
    .groupBy([col for col in full_removed.columns if col != 'Id'])
    .count()
    .filter('count > 1')
    .show()
}

+------------+----------+----------+------------+---------+-----------+----+-----+
|Manufacturer|     Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|count|
+------------+----------+----------+------------+---------+-----------+----+-----+
|         BMW|440i Coupe|     Turbo|         3.0|        6|         23|NULL|    2|
|     Hyundai|   G80 AWD|     Turbo|         3.3|        6|         20|NULL|    2|
+------------+----------+----------+------------+---------+-----------+----+-----+



{None}

In [13]:
# remove the duplicated record
id_removed = full_removed.dropDuplicates(
    subset = [col for col in  full_removed.columns if col != 'Id']
)

In [14]:
# count
id_removed.count()

19

### **Duplicates IDs**


In [19]:
# are there any duplicated IDs?

import pyspark.sql.functions as fn

id_removed.agg(
    fn.count('Id').alias('CountOfIDs'),
    fn.countDistinct('Id').alias('countOfDistinctsIDs')
).show()

+----------+-------------------+
|CountOfIDs|countOfDistinctsIDs|
+----------+-------------------+
|        19|                 18|
+----------+-------------------+



In [20]:
# what's duplicated?

(
    id_removed
    .groupby('Id')
    .count()
    .filter('count > 1')
    .show()
)

+---+-----+
| Id|count|
+---+-----+
|  3|    2|
+---+-----+



In [21]:
(
    id_removed
    .filter('Id=3')
    .show()
)

+---+--------------+--------------------+----------+------------+---------+-----------+----+
| Id|  Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|
+---+--------------+--------------------+----------+------------+---------+-----------+----+
|  3|General Motors|         SPARK ACTIV| Aspirated|         1.4|     NULL|         32|NULL|
|  3|       Porsche|911 Carrera 4S Ca...|     Turbo|         3.0|        6|         24|NULL|
+---+--------------+--------------------+----------+------------+---------+-----------+----+



In [22]:
new_id = (
    id_removed
    .select(
        [fn.monotonically_increasing_id().alias('Id')] +
        [col for col in id_removed.columns if col != 'Id']
    )
)

new_id.show()

+---+------------------+--------------------+----------+------------+---------+-----------+------+
| Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|  MSRP|
+---+------------------+--------------------+----------+------------+---------+-----------+------+
|  0|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|         21| 79231|
|  1|               BMW|                X5 M|     Turbo|         4.4|        8|         18|121231|
|  2|    General Motors|         SPARK ACTIV| Aspirated|         1.4|     NULL|         32|  NULL|
|  3|     Mercedes-Benz|                NULL|      NULL|        NULL|     NULL|         27|  NULL|
|  4|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|         26|  NULL|
|  5|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|         16|  NULL|
|  6|        Volkswagen|                 GTI|     Turbo|         2.0|        4|       NULL|  NULL|
|  7|     

### **Handling missing observations**

#### **Missing observations per row**

In [23]:
(
    spark.createDataFrame(
        new_id.rdd.map(
            lambda row: (
                row['Id'],
                sum([c == None for c in row])
            )
        )
        .filter(lambda e1: e1[1] > 1)
        .collect(),
        ['Id', 'countMissing']
    )
    .orderBy('CountMissing', ascending=False)
    .show()
)

+---+------------+
| Id|countMissing|
+---+------------+
|  3|           5|
|  6|           2|
|  2|           2|
|  7|           2|
+---+------------+



In [24]:
(
    new_id
    .where('Id == 3')
    .show()
)

+---+-------------+-----+----------+------------+---------+-----------+----+
| Id| Manufacturer|Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|
+---+-------------+-----+----------+------------+---------+-----------+----+
|  3|Mercedes-Benz| NULL|      NULL|        NULL|     NULL|         27|NULL|
+---+-------------+-----+----------+------------+---------+-----------+----+



In [25]:
merc_out = new_id.dropna(thresh=4)
new_id.count(), merc_out.count()

(19, 18)

In [26]:
(
    merc_out
    .where('Id == 3')
    .show()
)

+---+------------+-----+----------+------------+---------+-----------+----+
| Id|Manufacturer|Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|
+---+------------+-----+----------+------------+---------+-----------+----+
+---+------------+-----+----------+------------+---------+-----------+----+



### **Missing observations per column**

In [27]:
for k, v in sorted(
    merc_out.agg(*[
        (1- (fn.count(c) / fn.count('*')))
              .alias(c + 'miss')
        for c in merc_out.columns
    ])
    .collect()[0]
    .asDict()
    .items(),
    key=lambda e1 : e1[1],
    reverse=True
):
  print(k, v)

MSRPmiss 0.8888888888888888
Cylindersmiss 0.11111111111111116
FuelEconomymiss 0.05555555555555558
Idmiss 0.0
Manufacturermiss 0.0
Modelmiss 0.0
EngineTypemiss 0.0
Displacementmiss 0.0


In [29]:
no_MSRP = merc_out.select([col for col in new_id.columns if col != 'MSRP'])
no_MSRP.show()

+---+------------------+--------------------+----------+------------+---------+-----------+
| Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|
+---+------------------+--------------------+----------+------------+---------+-----------+
|  0|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|         21|
|  1|               BMW|                X5 M|     Turbo|         4.4|        8|         18|
|  2|    General Motors|         SPARK ACTIV| Aspirated|         1.4|     NULL|         32|
|  4|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|         26|
|  5|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|         16|
|  6|        Volkswagen|                 GTI|     Turbo|         2.0|        4|       NULL|
|  7|               BMW|                330i|     Turbo|         2.0|     NULL|         27|
|  8|           Porsche|           Boxster S|     Turbo|         2.5|        4| 

In [31]:
multipliers = (
    no_MSRP
    .agg (
        fn.mean(
            fn.col('FuelEconomy') /
            (
                fn.col('Displacement') * fn.col('Cylinders')
            )
        ).alias('FuelEconomy'),
        fn.mean(
            fn.col('Cylinders') /
            fn.col('Displacement')
        ).alias('Cylinders')
    )
).toPandas().to_dict('records')[0]

multipliers

{'FuelEconomy': 1.4957485048359977, 'Cylinders': 1.8353365984789107}

In [32]:
imputed = (
    no_MSRP
    .withColumn('FuelEconomy', fn.col('FuelEconomy') / fn.col('Displacement') / fn.col('Cylinders'))
    .withColumn('Cylinders', fn.col('Cylinders') / fn.col('Displacement'))
    .fillna(multipliers)
    .withColumn('Cylinders', (fn.col('Cylinders') * fn.col('Displacement')).cast('integer'))
    .withColumn('FuelEconomy', fn.col('FuelEconomy') * fn.col('Displacement') * fn.col('Cylinders'))
)

imputed.show()

+---+------------------+--------------------+----------+------------+---------+------------------+
| Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|       FuelEconomy|
+---+------------------+--------------------+----------+------------+---------+------------------+
|  0|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|              21.0|
|  1|               BMW|                X5 M|     Turbo|         4.4|        8|              18.0|
|  2|    General Motors|         SPARK ACTIV| Aspirated|         1.4|        2|4.1880958135407935|
|  4|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|              26.0|
|  5|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|              16.0|
|  6|        Volkswagen|                 GTI|     Turbo|         2.0|        4|11.965988038687982|
|  7|               BMW|                330i|     Turbo|         2.0|        3| 8.974491029015986|
|  8|     

### **Handling Outliers**

In [35]:
features = ['Displacement', 'Cylinders', 'FuelEconomy']
quantiles = [0.25, 0.75]

cut_off_points = []

for feature in features:
  quants = imputed.approxQuantile(feature, quantiles, 0.05)

  IQR = quants[1] - quants[0]
  cut_off_points.append((feature, [
      quants[0] - 1.5 * IQR,
      quants[1] + 1.5 * IQR,
  ]))

cut_off_points = dict(cut_off_points)


outliers = imputed.select(*['id'] + [
      (
          (imputed[f] < cut_off_points[f][0]) |
          (imputed[f] > cut_off_points[f][1])
      ).alias(f + '_o') for f in features
  ])

outliers.show()

+---+--------------+-----------+-------------+
| id|Displacement_o|Cylinders_o|FuelEconomy_o|
+---+--------------+-----------+-------------+
|  0|         false|      false|        false|
|  1|         false|      false|        false|
|  2|         false|      false|         true|
|  4|         false|      false|        false|
|  5|         false|       true|        false|
|  6|         false|      false|        false|
|  7|         false|      false|        false|
|  8|         false|      false|        false|
|  9|         false|      false|        false|
| 10|         false|      false|        false|
| 11|         false|      false|        false|
| 12|         false|      false|        false|
| 13|         false|      false|        false|
| 14|         false|      false|        false|
| 15|         false|      false|        false|
| 16|         false|      false|        false|
| 17|         false|      false|         true|
| 18|         false|      false|        false|
+---+--------

In [36]:
with_outliers_flag = imputed.join(outliers, on='Id')

(
    with_outliers_flag
    .filter('FuelEconomy_o')
    .select('Id', 'Manufacturer', 'Model', 'FuelEconomy')
    .show()
)

+---+--------------+---------------+------------------+
| Id|  Manufacturer|          Model|       FuelEconomy|
+---+--------------+---------------+------------------+
|  2|General Motors|    SPARK ACTIV|4.1880958135407935|
| 17|        Toyota|CAMRY HYBRID LE|              46.0|
+---+--------------+---------------+------------------+



In [37]:
no_outliers = (
    with_outliers_flag
    .filter('!FuelEconomy_o')
    .select(imputed.columns)
)

### **Exploring Descriptive Statistics**

In [38]:
descriptive_stats = no_outliers.describe(features)
descriptive_stats.show()

+-------+------------------+-----------------+------------------+
|summary|      Displacement|        Cylinders|       FuelEconomy|
+-------+------------------+-----------------+------------------+
|  count|                16|               16|                16|
|   mean|           3.44375|            6.125|19.600446608398162|
| stddev|1.3549753995306826|2.276693508870558|4.6666477673737505|
|    min|               2.0|                3| 8.974491029015986|
|    max|               6.0|               12|              26.0|
+-------+------------------+-----------------+------------------+



In [39]:
descriptive_stats_all = no_outliers.describe()
descriptive_stats_all.show()

+-------+-----------------+------------+-----+----------+------------------+-----------------+------------------+
|summary|               Id|Manufacturer|Model|EngineType|      Displacement|        Cylinders|       FuelEconomy|
+-------+-----------------+------------+-----+----------+------------------+-----------------+------------------+
|  count|               16|          16|   16|        16|                16|               16|                16|
|   mean|           9.3125|        NULL|300.0|      NULL|           3.44375|            6.125|19.600446608398162|
| stddev|5.287958017987662|        NULL| NULL|      NULL|1.3549753995306826|2.276693508870558|4.6666477673737505|
|    min|                0|Aston Martin|  300| Aspirated|               2.0|                3| 8.974491029015986|
|    max|               18|  Volkswagen| X5 M|     Turbo|               6.0|               12|              26.0|
+-------+-----------------+------------+-----+----------+------------------+------------

In [40]:
(
    no_outliers
    .select(features)
    .groupBy('Cylinders')
    .agg(*[
        fn.count('*').alias('Count'),
        fn.mean('FuelEconomy').alias('MPG_avg'),
        fn.mean('Displacement').alias('Disp_avg'),
        fn.stddev('FuelEconomy').alias('MPG_stdev'),
        fn.stddev('Displacement').alias('Disp_stdev')
    ])
    .orderBy('Cylinders')
).show()

+---------+-----+------------------+------------------+------------------+-------------------+
|Cylinders|Count|           MPG_avg|          Disp_avg|         MPG_stdev|         Disp_stdev|
+---------+-----+------------------+------------------+------------------+-------------------+
|        3|    1| 8.974491029015986|               2.0|              NULL|               NULL|
|        4|    4|21.241497009671995|             2.125| 6.413009924998987|0.24999999999999997|
|        5|    1|16.666666666666668|               2.7|              NULL|               NULL|
|        6|    5|              22.4|3.1799999999999997|1.5165750888103096| 0.2683281572999748|
|        8|    4|             18.75|               5.0|               1.5| 0.5477225575051655|
|       12|    1|              16.0|               6.0|              NULL|               NULL|
+---------+-----+------------------+------------------+------------------+-------------------+



### **Computing correlations**

In [41]:
(
    no_outliers
    .corr('Cylinders', 'Displacement')
)

0.9381829964408112

In [42]:
n_features = len(features)

corr = []

for i in range(0, n_features):
  temp = [None] * i

  for j in range(i, n_features):
    temp.append(no_outliers.corr(features[i], features[j]))
  corr.append([features[i]] + temp)

correlations = spark.createDataFrame(corr, ['Column'] + features)

correlations.show()

+------------+------------+------------------+--------------------+
|      Column|Displacement|         Cylinders|         FuelEconomy|
+------------+------------+------------------+--------------------+
|Displacement|         1.0|0.9381829964408112|-0.10757908872387667|
|   Cylinders|        NULL|               1.0| -0.0421854654503533|
| FuelEconomy|        NULL|              NULL|                 1.0|
+------------+------------+------------------+--------------------+



### **Drawing histograms**

In [43]:
histogram_MPG = (
    no_outliers
    .select('FuelEconomy')
    .rdd
    .flatMap(lambda record: record)
    .histogram(5)
)

In [44]:
for i in histogram_MPG:
  print(i)

histogram_MPG

[8.974491029015986, 12.37959282321279, 15.784694617409592, 19.189796411606395, 22.594898205803197, 26.0]
[2, 0, 5, 4, 5]


([8.974491029015986,
  12.37959282321279,
  15.784694617409592,
  19.189796411606395,
  22.594898205803197,
  26.0],
 [2, 0, 5, 4, 5])

In [45]:
for i in range(len(histogram_MPG[0])-1):
  print('[' + str(round(histogram_MPG[0][i], 2))
          + ',' + str(round(histogram_MPG[0][i+1], 2))
        + ')'
      )

[8.97,12.38)
[12.38,15.78)
[15.78,19.19)
[19.19,22.59)
[22.59,26.0)


In [46]:
(
    spark
    .createDataFrame(
        [(bins, counts)
          for bins, counts
          in zip(
            histogram_MPG[0],
            histogram_MPG[1]
        )]
        , ['bins', 'counts']
    )
    .registerTempTable('histogram_MPG')
)



In [None]:
!pip install ipython-sql

In [None]:
# Load the SQL extension
%reload_ext sql

# Connect to your database (replace 'your_connection_string' with the actual connection string)
%sql '-o hist_MPG -q'

# Execute the SQL query and fetch the results into a Pandas DataFrame
%sql SELECT * FROM histogram_MPG