In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, udf, when
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()
spark

### Connexion à la base de données avec MongoDB Spark Connector (sur Databricks)

Il est possible de se connecter à une base de données MongoDB avec Spark. Pour cela, il faut utiliser le connecteur MongoDB Spark Connector. Il est possible de l'installer sur Databricks en créant un cluster, et en ajoutant la librairie `org.mongodb.spark:mongo-spark-connector_2.12:3.0.1`.

On peut ensuite créer la session Spark avec la configuration suivante :
```python
spark = SparkSession.builder \
    .appName("diamonds") \
    .config("spark.mongodb.input.uri", "mongodb+srv://<username>:<password>@<hostname>") \
    .config("spark.mongodb.output.uri", "mongodb+srv://<username>:<password>@<hostname>") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.13:3.0.1") \
    .getOrCreate()
spark
```

On peut ensuite lire les données de la base de données avec la commande suivante :
```python
df = spark.read \
    .format("com.mongodb.spark.sql.DefaultSource") \
    .option("spark.mongodb.input.uri", "mongodb+srv://<username>:<password>@<hostname>") \
    .option("spark.mongodb.output.uri", "mongodb+srv://<username>:<password>@<hostname>") \
    .option("database", "diamonds") \
    .option("collection", "diamonds") \
    .load()
```

In [2]:
path = "./diamonds.csv"
df = spark.read.load(path, format="csv", header=True,inferSchema=True)
df.take(5)

[Row(_c0=1, carat=0.23, cut='Ideal', color='E', clarity='SI2', depth=61.5, table=55.0, price=326, x=3.95, y=3.98, z=2.43),
 Row(_c0=2, carat=0.21, cut='Premium', color='E', clarity='SI1', depth=59.8, table=61.0, price=326, x=3.89, y=3.84, z=2.31),
 Row(_c0=3, carat=0.23, cut='Good', color='E', clarity='VS1', depth=56.9, table=65.0, price=327, x=4.05, y=4.07, z=2.31),
 Row(_c0=4, carat=0.29, cut='Premium', color='I', clarity='VS2', depth=62.4, table=58.0, price=334, x=4.2, y=4.23, z=2.63),
 Row(_c0=5, carat=0.31, cut='Good', color='J', clarity='SI2', depth=63.3, table=58.0, price=335, x=4.34, y=4.35, z=2.75)]

In [3]:

# Vérification des valeurs manquantes dans chaque colonne
missing_values_counts = df.select([col(column).isNull().cast("int").alias(column) for column in df.columns])\
                            .agg(*[count(col(column)).alias(column) for column in df.columns])

# Affichez le nombre de valeurs manquantes dans chaque colonne
missing_values_counts.show()

+-----+-----+-----+-----+-------+-----+-----+-----+-----+-----+-----+
|  _c0|carat|  cut|color|clarity|depth|table|price|    x|    y|    z|
+-----+-----+-----+-----+-------+-----+-----+-----+-----+-----+-----+
|53940|53940|53940|53940|  53940|53940|53940|53940|53940|53940|53940|
+-----+-----+-----+-----+-------+-----+-----+-----+-----+-----+-----+



In [4]:
data_with_volume = df.withColumn("volume", col("x") * col("y") * col("z"))

columns_with_missing_values = ["x", "y", "z"]
for column in columns_with_missing_values:
    mean_value = data_with_volume.agg({column: "mean"}).collect()[0][0]
    data_with_volume = data_with_volume.withColumn(column, 
                                                   when(col(column).isNull(), mean_value).otherwise(col(column)))

data_with_volume.show(5)

+---+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+
|_c0|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|            volume|
+---+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+
|  1| 0.23|  Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|          38.20203|
|  2| 0.21|Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|         34.505856|
|  3| 0.23|   Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|         38.076885|
|  4| 0.29|Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|          46.72458|
|  5| 0.31|   Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|51.917249999999996|
+---+-----+-------+-----+-------+-----+-----+-----+----+----+----+------------------+
only showing top 5 rows



In [5]:
data_with_volume.agg({"volume": "max"}).show()

+------------------+
|       max(volume)|
+------------------+
|3840.5980600000003|
+------------------+



In [6]:
data_with_volume.filter(col("volume") == 0).show()

+-----+-----+---------+-----+-------+-----+-----+-----+----+----+---+------+
|  _c0|carat|      cut|color|clarity|depth|table|price|   x|   y|  z|volume|
+-----+-----+---------+-----+-------+-----+-----+-----+----+----+---+------+
| 2208|  1.0|  Premium|    G|    SI2| 59.1| 59.0| 3142|6.55|6.48|0.0|   0.0|
| 2315| 1.01|  Premium|    H|     I1| 58.1| 59.0| 3167|6.66| 6.6|0.0|   0.0|
| 4792|  1.1|  Premium|    G|    SI2| 63.0| 59.0| 3696| 6.5|6.47|0.0|   0.0|
| 5472| 1.01|  Premium|    F|    SI2| 59.2| 58.0| 3837| 6.5|6.47|0.0|   0.0|
|10168|  1.5|     Good|    G|     I1| 64.0| 61.0| 4731|7.15|7.04|0.0|   0.0|
|11183| 1.07|    Ideal|    F|    SI2| 61.6| 56.0| 4954| 0.0|6.62|0.0|   0.0|
|11964|  1.0|Very Good|    H|    VS2| 63.3| 53.0| 5139| 0.0| 0.0|0.0|   0.0|
|13602| 1.15|    Ideal|    G|    VS2| 59.2| 56.0| 5564|6.88|6.83|0.0|   0.0|
|15952| 1.14|     Fair|    G|    VS1| 57.5| 67.0| 6381| 0.0| 0.0|0.0|   0.0|
|24395| 2.18|  Premium|    H|    SI2| 59.4| 61.0|12631|8.49|8.45|0.0|   0.0|

In [7]:
@udf(returnType = FloatType())
def convert_to_grams(weight_in_carat):
    return weight_in_carat * 0.2

data_with_weight = data_with_volume.withColumn("weight", convert_to_grams("carat"))

data_with_weight.orderBy("weight", ascending=False).show()

+-----+-----+---------+-----+-------+-----+-----+-----+-----+-----+----+------------------+------+
|  _c0|carat|      cut|color|clarity|depth|table|price|    x|    y|   z|            volume|weight|
+-----+-----+---------+-----+-------+-----+-----+-----+-----+-----+----+------------------+------+
|27416| 5.01|     Fair|    J|     I1| 65.5| 59.0|18018|10.74|10.54|6.98|        790.133208| 1.002|
|27631|  4.5|     Fair|    J|     I1| 65.8| 58.0|18531|10.23|10.16|6.72|        698.455296|   0.9|
|27131| 4.13|     Fair|    H|     I1| 64.8| 61.0|17329| 10.0| 9.85|6.43|           633.355| 0.826|
|26000| 4.01|  Premium|    J|     I1| 62.5| 62.0|15223|10.02| 9.94|6.24|        621.496512| 0.802|
|25999| 4.01|  Premium|    I|     I1| 61.0| 61.0|15223|10.14| 10.1|6.17|         631.89438| 0.802|
|26445|  4.0|Very Good|    I|     I1| 63.3| 58.0|15984|10.01| 9.94|6.31| 627.8412139999999|   0.8|
|26535| 3.67|  Premium|    I|     I1| 62.4| 56.0|16193| 9.86| 9.81|6.13|        592.934058| 0.734|
|23645| 3.