In [2]:
import os

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, mean,sum
from pyspark.sql.types import DecimalType



In [3]:
# Initialize Spark with the Cassandra connector.
# The Cassandra host, port and credentials are read from environment variables
# (CASSANDRA_HOST, CASSANDRA_PORT, CASSANDRA_USERNAME, CASSANDRA_PASSWORD) so
# that they do not have to be edited in the notebook; when a variable is not
# set, the value that used to be hard-coded here is used as the default.
spark = (
    SparkSession.builder
    .appName("Save Data to Cassandra")
    .config("spark.cassandra.connection.host", os.environ.get("CASSANDRA_HOST", "172.18.0.2"))
    .config("spark.cassandra.connection.port", os.environ.get("CASSANDRA_PORT", "9042"))
    .config("spark.cassandra.auth.username", os.environ.get("CASSANDRA_USERNAME", "cassandra"))
    .config("spark.cassandra.auth.password", os.environ.get("CASSANDRA_PASSWORD", "cassandra"))
    # The connector jar is resolved at session start-up from this Maven coordinate.
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.2.0")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0a54823d-c3c5-4f5d-9a62-b38d90818668;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.2.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.2.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	found com.gith

In [4]:
# Connect to Cassandra with the Python driver in order to check the tables.
# Host, port and credentials are read from environment variables
# (CASSANDRA_HOST, CASSANDRA_PORT, CASSANDRA_USERNAME, CASSANDRA_PASSWORD);
# when a variable is not set, the previously hard-coded value is the default.
# NOTE(review): no table check is actually performed in the visible cells —
# `session` is opened on the 'concessionnaire' keyspace and not used afterwards.
auth_provider = PlainTextAuthProvider(
    username=os.environ.get("CASSANDRA_USERNAME", "cassandra"),
    password=os.environ.get("CASSANDRA_PASSWORD", "cassandra"),
)
cluster = Cluster(
    [os.environ.get("CASSANDRA_HOST", "172.18.0.2")],
    # The driver expects an integer port, while environment values are strings.
    port=int(os.environ.get("CASSANDRA_PORT", "9042")),
    auth_provider=auth_provider,
)
session = cluster.connect('concessionnaire')

In [5]:
# Load the processed datasets from their CSV files.
# Every file is read with its first row as the header and with the column
# types inferred from the data.
clients_processed = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("data/processed/clients_processed.csv")
)
catalogue_processed = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("data/processed/catalogue_processed.csv")
)
immatriculations_processed = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("data/processed/immatriculations_processed.csv")
)

                                                                                

### Sauvegarder les données dans Cassandra

In [6]:
# Names of the original columns to keep for each dataset (index columns are left out).
catalogue_original_columns = [
    "marque",
    "nom",
    "puissance",
    "longueur",
    "nbPlaces",
    "nbPortes",
    "couleur",
    "occasion",
    "prix",
]
clients_original_columns = [
    "immatriculation",
    "age",
    "sexe",
    "taux",
    "situationFamiliale",
    "nbEnfantsAcharge",
    "2eme voiture",
]
# A registration row carries the catalogue columns preceded by the plate number.
immatriculation_original_columns = ["immatriculation", *catalogue_original_columns]

In [7]:
# Restrict each DataFrame to the original columns listed for it.
clients_processed = clients_processed.select(*clients_original_columns)
catalogue_processed = catalogue_processed.select(*catalogue_original_columns)
immatriculations_processed = immatriculations_processed.select(
    *immatriculation_original_columns
)

In [8]:
# Preview the first three client rows to check the selected columns.
clients_processed.show(n=3)

+---------------+----+----+------+------------------+----------------+------------+
|immatriculation| age|sexe|  taux|situationFamiliale|nbEnfantsAcharge|2eme voiture|
+---------------+----+----+------+------------------+----------------+------------+
|     1435 TU 40|27.0|   M| 990.0|         En Couple|             3.0|       false|
|     2458 CW 38|33.0|   M|1288.0|         En Couple|             0.0|       false|
|     9316 TP 82|23.0|   M|1342.0|       Célibataire|             0.0|       false|
+---------------+----+----+------+------------------+----------------+------------+
only showing top 3 rows



In [9]:
# Print the column names and inferred types of the clients DataFrame
# before any cast is applied.
clients_processed.printSchema()

root
 |-- immatriculation: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sexe: string (nullable = true)
 |-- taux: double (nullable = true)
 |-- situationFamiliale: string (nullable = true)
 |-- nbEnfantsAcharge: double (nullable = true)
 |-- 2eme voiture: boolean (nullable = true)



In [10]:
# Convert the columns to types compatible with Cassandra: the three numeric
# columns, read from the CSV as double, are cast to int.
clients_processed = (
    clients_processed
    .withColumn("age", col("age").cast("int"))
    .withColumn("taux", col("taux").cast("int"))
    .withColumn("nbEnfantsAcharge", col("nbEnfantsAcharge").cast("int"))
)

In [11]:
# Lower-case the mixed-case column names so they match the Cassandra table.
clients_processed = (
    clients_processed
    .withColumnRenamed("nbEnfantsAcharge", "nbenfantsacharge")
    .withColumnRenamed("situationFamiliale", "situationfamiliale")
)

In [12]:
# Display the DataFrame schema after the conversion, to confirm the new
# column names and types before writing.
print("Schéma du DataFrame après conversion :")
clients_processed.printSchema()

Schéma du DataFrame après conversion :
root
 |-- immatriculation: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- sexe: string (nullable = true)
 |-- taux: integer (nullable = true)
 |-- situationfamiliale: string (nullable = true)
 |-- nbenfantsacharge: integer (nullable = true)
 |-- 2eme voiture: boolean (nullable = true)



In [13]:
# 1. Save the clients: write the DataFrame through the Spark Cassandra
#    connector into table "clients" of keyspace "concessionnaire", in append mode.
(
    clients_processed.write
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", "concessionnaire")
    .option("table", "clients")
    .mode("append")
    .save()
)

# Only reached when save() returned without raising.
print("Données sauvegardées avec succès dans Cassandra.")

25/01/19 00:12:27 WARN PlainTextAuthProviderBase: [] /172.18.0.2:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
25/01/19 00:12:28 WARN PlainTextAuthProviderBase: [] /172.18.0.2:9042 did not send an authentication challenge; This is suspicious because the driver expects authentication
[Stage 7:>                                                          (0 + 1) / 1]

Données sauvegardées avec succès dans Cassandra.


                                                                                

In [14]:
# Print the clients schema once more after the write.
# NOTE(review): this repeats the schema check done just before the save.
clients_processed.printSchema()

root
 |-- immatriculation: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- sexe: string (nullable = true)
 |-- taux: integer (nullable = true)
 |-- situationfamiliale: string (nullable = true)
 |-- nbenfantsacharge: integer (nullable = true)
 |-- 2eme voiture: boolean (nullable = true)



In [15]:
# Lower-case the mixed-case column names so they match the Cassandra table.
catalogue_processed = (
    catalogue_processed
    .withColumnRenamed("nbPlaces", "nbplaces")
    .withColumnRenamed("nbPortes", "nbportes")
)

In [16]:
# Cast the catalogue columns to the data types used by the Cassandra table:
# the price becomes a decimal(10, 2), the counts and power become int, and
# the second-hand flag becomes boolean.
catalogue_processed = (
    catalogue_processed
    .withColumn("prix", col("prix").cast(DecimalType(10, 2)))
    .withColumn("puissance", col("puissance").cast("int"))
    .withColumn("nbplaces", col("nbplaces").cast("int"))
    .withColumn("nbportes", col("nbportes").cast("int"))
    .withColumn("occasion", col("occasion").cast("boolean"))
)

In [17]:
# 2. Save the catalogue: write the DataFrame through the Spark Cassandra
#    connector into table "catalogue" of keyspace "concessionnaire", in append mode.
(
    catalogue_processed.write
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", "concessionnaire")
    .option("table", "catalogue")
    .mode("append")
    .save()
)

# Only reached when save() returned without raising.
print("Données sauvegardées avec succès dans Cassandra.")

Données sauvegardées avec succès dans Cassandra.


In [18]:
# Lower-case the mixed-case column names so they match the Cassandra table.
immatriculations_processed = (
    immatriculations_processed
    .withColumnRenamed("nbPlaces", "nbplaces")
    .withColumnRenamed("nbPortes", "nbportes")
)

In [19]:
# Cast the registration columns to the data types used by the Cassandra table:
# the price becomes a decimal(10, 2), the counts and power become int, and
# the second-hand flag becomes boolean.
immatriculations_processed = (
    immatriculations_processed
    .withColumn("prix", col("prix").cast(DecimalType(10, 2)))
    .withColumn("puissance", col("puissance").cast("int"))
    .withColumn("nbplaces", col("nbplaces").cast("int"))
    .withColumn("nbportes", col("nbportes").cast("int"))
    .withColumn("occasion", col("occasion").cast("boolean"))
)

In [20]:
# 3. Save the registrations: write the DataFrame through the Spark Cassandra
#    connector into table "immatriculation" of keyspace "concessionnaire",
#    in append mode.
(
    immatriculations_processed.write
    .format("org.apache.spark.sql.cassandra")
    .option("keyspace", "concessionnaire")
    .option("table", "immatriculation")
    .mode("append")
    .save()
)

# Only reached when save() returned without raising.
print("Données sauvegardées avec succès dans Cassandra (sans les colonnes index).")



Données sauvegardées avec succès dans Cassandra (sans les colonnes index).


                                                                                

In [38]:

# Release the resources opened by this notebook.
# The Cassandra Python-driver `cluster` created earlier is independent of the
# Spark session, so spark.stop() does not close it; it is shut down explicitly
# here. The guard makes the cell safe to re-run after the cluster has already
# been shut down.
try:
    if not cluster.is_shutdown:
        cluster.shutdown()
finally:
    # Stop Spark session, even if closing the driver connection failed.
    spark.stop()


25/01/18 23:34:54 WARN ChannelPool: [s0|/172.18.0.2:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=fb4089bb-958e-4221-9c31-3c9f50d3d83f, APPLICATION_NAME=Spark-Cassandra-Connector-local-1737240500963}): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException))
25/01/18 23:34:54 WARN ControlConnection: [s0] Error connecting to Node(endPoint=/172.18.0.2:9042, hostId=d5adf778-4e79-4aec-aaea-dc8e8b1ecf98, hashCode=13662114), trying next node (ConnectionInitException: [s0|control|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=fb4089bb-958e-4221-9c31-3c9f50d3d83f, APPLICATION_NAME=Spark-Cassandra-Connector-local-1737240500963}):