In [6]:
!pip install pyspark




In [3]:
from pyspark.sql import SparkSession


In [4]:
spark=SparkSession.builder.appName('Practise').getOrCreate()

In [5]:
spark

In [7]:
dff7=spark.read.csv(
    path="spark.dat",
    sep=";",
    header=True,
    quote='"',
    inferSchema=True,
)


#dff7.show(3,truncate=False)
dff7.columns

#dff7.select(['nomClient','quantiteAchete','prixUnitaire','ChiffreDDD']).show()

from pyspark.sql.window import Window
import pyspark.sql.functions as F
#cette requete permet de savoir les achats les plus anciens en se basant sur la date d'achat et la quantité achetée
dff7=dff7.withColumn("row_number",F.row_number().over(Window.partitionBy(dff7['quantiteAchete']).orderBy(dff7['dateAchat'])))
dff7.select(['idTransaction','nomClient','quantiteAchete','dateAchat','row_number']).show()

+-------------+---------+--------------+-------------------+----------+
|idTransaction|nomClient|quantiteAchete|          dateAchat|row_number|
+-------------+---------+--------------+-------------------+----------+
|      9788460|  Barrett|            12|1970-01-03 00:00:00|         1|
|      2394878|    Grant|            12|1970-01-04 00:00:00|         2|
|      3645136|  Jackson|            12|1970-01-05 00:00:00|         3|
|      5884156|  Frazier|            12|1970-01-07 00:00:00|         4|
|      1560813|     Page|            12|1970-01-09 00:00:00|         5|
|      5833095|  Russell|            12|1970-01-09 00:00:00|         6|
|      8565782|    Booth|            12|1970-01-11 00:00:00|         7|
|      6424499|  Johnson|            12|1970-01-12 00:00:00|         8|
|      2460489|   Garcia|            12|1970-01-13 00:00:00|         9|
|      2985467|   Sutton|            12|1970-01-14 00:00:00|        10|
|      2086909|   Palmer|            12|1970-01-15 00:00:00|    

In [24]:
#le chiffre d'affaires par client
dff7=dff7.withColumn("ChiffreDDD",dff7["quantiteAchete"]*dff7["prixUnitaire"])
dff7.show()
#le chiffre d'affaires total
dff7.agg({'ChiffreDDD':'sum'}).show()

+-------------+------------+---------+--------------------+---------------------------+---------------------+-----------------------------+---------------------------+--------------+--------------------+---------------+--------------+------------+-------------------+------------------------------+----------+----------+
|idTransaction|prenomClient|nomClient|        metierClient|plaqueImmatriculationClient|numeroCarteBleuClient|dateExpirationCarteBleuClient|codeSecuriteCarteBleuClient|codeBarProduit|    motDePasseClient|adresseIPClient|quantiteAchete|prixUnitaire|          dateAchat|nomFilialeAyantVendueLeProduit|row_number|ChiffreDDD|
+-------------+------------+---------+--------------------+---------------------------+---------------------+-----------------------------+---------------------------+--------------+--------------------+---------------+--------------+------------+-------------------+------------------------------+----------+----------+
|      1100765|     Anthony| Gonzales

In [13]:
# Verifier si la colonne du chiffre d'affaires ChiffreDDD a été ajoutée.
dff7.columns


+---------------+
|sum(ChiffreDDD)|
+---------------+
|    63894693298|
+---------------+



['idTransaction',
 'prenomClient',
 'nomClient',
 'metierClient',
 'plaqueImmatriculationClient',
 'numeroCarteBleuClient',
 'dateExpirationCarteBleuClient',
 'codeSecuriteCarteBleuClient',
 'codeBarProduit',
 'motDePasseClient',
 'adresseIPClient',
 'quantiteAchete',
 'prixUnitaire',
 'dateAchat',
 'nomFilialeAyantVendueLeProduit',
 'row_number',
 'ChiffreDDD']

In [14]:
#ça permet de voir le type de données dans chaque colonne
dff7.printSchema() 

root
 |-- idTransaction: integer (nullable = true)
 |-- prenomClient: string (nullable = true)
 |-- nomClient: string (nullable = true)
 |-- metierClient: string (nullable = true)
 |-- plaqueImmatriculationClient: string (nullable = true)
 |-- numeroCarteBleuClient: long (nullable = true)
 |-- dateExpirationCarteBleuClient: string (nullable = true)
 |-- codeSecuriteCarteBleuClient: integer (nullable = true)
 |-- codeBarProduit: long (nullable = true)
 |-- motDePasseClient: string (nullable = true)
 |-- adresseIPClient: string (nullable = true)
 |-- quantiteAchete: integer (nullable = true)
 |-- prixUnitaire: integer (nullable = true)
 |-- dateAchat: timestamp (nullable = true)
 |-- nomFilialeAyantVendueLeProduit: string (nullable = true)
 |-- row_number: integer (nullable = false)
 |-- ChiffreDDD: integer (nullable = true)



In [25]:
#Cette requête permet de voir la date d'achat des produits des 20 premières lignes
dff7.select(['dateAchat']).show()
#permet de voir le type des données de chaque colonne
dff7.printSchema()
#cette requete nous permet de savoir le code barre du produit le plus acheté 
dff7.select(['codeBarProduit']).show()
#dff7.select(['codeBarProduit']).duplicated() # sans doublons mais ça ne marche pas !

+-------------------+
|          dateAchat|
+-------------------+
|1980-12-31 00:00:00|
|2005-02-07 00:00:00|
|1972-05-09 00:00:00|
|1980-04-01 00:00:00|
|1985-12-07 00:00:00|
|2009-06-14 00:00:00|
|2001-11-21 00:00:00|
|2009-04-18 00:00:00|
|2017-08-01 00:00:00|
|1980-06-13 00:00:00|
|2021-06-24 00:00:00|
|2020-10-24 00:00:00|
|2018-12-28 00:00:00|
|2019-02-14 00:00:00|
|2011-11-02 00:00:00|
|1984-03-13 00:00:00|
|1974-11-23 00:00:00|
|2014-02-04 00:00:00|
|2019-08-21 00:00:00|
|2013-04-17 00:00:00|
+-------------------+
only showing top 20 rows

root
 |-- idTransaction: integer (nullable = true)
 |-- prenomClient: string (nullable = true)
 |-- nomClient: string (nullable = true)
 |-- metierClient: string (nullable = true)
 |-- plaqueImmatriculationClient: string (nullable = true)
 |-- numeroCarteBleuClient: long (nullable = true)
 |-- dateExpirationCarteBleuClient: string (nullable = true)
 |-- codeSecuriteCarteBleuClient: integer (nullable = true)
 |-- codeBarProduit: long (nullable

In [18]:
#Supprimer une colonne - On a supprimé la colonne du Chiffre d'affaire
dff7.drop('ChiffreDDD') 

DataFrame[idTransaction: int, prenomClient: string, nomClient: string, metierClient: string, plaqueImmatriculationClient: string, numeroCarteBleuClient: bigint, dateExpirationCarteBleuClient: string, codeSecuriteCarteBleuClient: int, codeBarProduit: bigint, motDePasseClient: string, adresseIPClient: string, quantiteAchete: int, prixUnitaire: int, dateAchat: timestamp, nomFilialeAyantVendueLeProduit: string, row_number: int]

In [21]:
#Renommer une colonne
dff7.withColumnRenamed('idTransaction', 'idTransactionNouveaunom').select(['idTransactionNouveaunom']).show()

+-----------------------+
|idTransactionNouveaunom|
+-----------------------+
|                      1|
|                      2|
|                      3|
|                      4|
|                      5|
|                      6|
|                      7|
|                      8|
|                      9|
|                     10|
|                     11|
|                     12|
|                     13|
|                     14|
|                     15|
|                     16|
|                     17|
|                     18|
|                     19|
|                     20|
+-----------------------+
only showing top 20 rows



In [22]:
#supprimer les lignes avec des valeurs null
dff7.na.drop().show()

+-------------+------------+---------+--------------------+---------------------------+---------------------+-----------------------------+---------------------------+--------------+--------------------+---------------+--------------+------------+-------------------+------------------------------+----------+----------+
|idTransaction|prenomClient|nomClient|        metierClient|plaqueImmatriculationClient|numeroCarteBleuClient|dateExpirationCarteBleuClient|codeSecuriteCarteBleuClient|codeBarProduit|    motDePasseClient|adresseIPClient|quantiteAchete|prixUnitaire|          dateAchat|nomFilialeAyantVendueLeProduit|row_number|ChiffreDDD|
+-------------+------------+---------+--------------------+---------------------------+---------------------+-----------------------------+---------------------------+--------------+--------------------+---------------+--------------+------------+-------------------+------------------------------+----------+----------+
|      1100765|     Anthony| Gonzales

In [None]:
######### MLLIB ########

In [33]:
#à partir d'ici, on va faire des requetes de la librairie spark apache MLlib
from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.linalg import Vectors
from pyspark.ml. feature import VectorAssembler
from pyspark.ml.clustering import KMeans

spark = SparkSession.builder.appName("PySpark_MLlib").getOrCreate()
spark



In [34]:
from pyspark.ml.linalg import Vectors
from pyspark.ml. feature import VectorAssembler



In [40]:
#lire le fichier 
dataset_1=spark.read.csv(
    path="spark.dat",
    sep=";",
    header=True,
    quote='"',
    inferSchema=True,)

dataset_1.columns
#selectionner des colonnes avec des nombres entiers
dataset=dataset_1.select(['quantiteAchete','prixUnitaire'])
feat_cols= ['quantiteAchete','prixUnitaire']
dataset.show()

+--------------+------------+
|quantiteAchete|prixUnitaire|
+--------------+------------+
|           198|           9|
|           224|          16|
|           159|           4|
|           263|           3|
|           184|          45|
|           223|          24|
|           363|          24|
|            91|          38|
|           354|          48|
|           163|          41|
|           269|          44|
|           371|          34|
|           499|          25|
|            24|          13|
|           220|           5|
|           187|          30|
|            25|          13|
|           117|          38|
|           245|          26|
|           275|          50|
+--------------+------------+
only showing top 20 rows



In [42]:
#regroupper plusieurs colonnes d'un dataframe en un seule de caractéristiques
vec_assembler = VectorAssembler(inputCols=feat_cols, outputCol='features')
final_data=vec_assembler.transform(dataset)

In [30]:
#Standard Scaler : permet de normaliser les données
from pyspark.ml.feature import StandardScaler 

In [31]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True,withMean=False)

In [32]:
scalerModel = scaler.fit(final_data)
cluster_final_data = scalerModel.transform(final_data)
cluster_final_data.show()

+--------------+------------+------------+--------------------+
|quantiteAchete|prixUnitaire|    features|      scaledFeatures|
+--------------+------------+------------+--------------------+
|           198|           9| [198.0,9.0]|[1.37170670925851...|
|           224|          16|[224.0,16.0]|[1.55182981249448...|
|           159|           4| [159.0,4.0]|[1.10152205440456...|
|           263|           3| [263.0,3.0]|[1.82201446734843...|
|           184|          45|[184.0,45.0]|[1.27471734597761...|
|           223|          24|[223.0,24.0]|[1.54490200083156...|
|           363|          24|[363.0,24.0]|[2.51479563364061...|
|            91|          38| [91.0,38.0]|[0.63043086132588...|
|           354|          48|[354.0,48.0]|[2.45244532867432...|
|           163|          41|[163.0,41.0]|[1.12923330105625...|
|           269|          44|[269.0,44.0]|[1.86358133732596...|
|           371|          34|[371.0,34.0]|[2.57021812694399...|
|           499|          25|[499.0,25.0

In [34]:
kmeans3 = KMeans(featuresCol='scaledFeatures',k=3)
kmeans2 = KMeans(featuresCol='scaledFeatures',k=2)
model_k3 = kmeans3.fit(cluster_final_data)
model_k2 = kmeans2.fit(cluster_final_data)
wssse_k3 =model_k3.transform(cluster_final_data)
wssse_k2 = model_k2.transform(cluster_final_data)
wssse_k3.show()

+--------------+------------+------------+--------------------+----------+
|quantiteAchete|prixUnitaire|    features|      scaledFeatures|prediction|
+--------------+------------+------------+--------------------+----------+
|           198|           9| [198.0,9.0]|[1.37170670925851...|         1|
|           224|          16|[224.0,16.0]|[1.55182981249448...|         1|
|           159|           4| [159.0,4.0]|[1.10152205440456...|         1|
|           263|           3| [263.0,3.0]|[1.82201446734843...|         0|
|           184|          45|[184.0,45.0]|[1.27471734597761...|         2|
|           223|          24|[223.0,24.0]|[1.54490200083156...|         1|
|           363|          24|[363.0,24.0]|[2.51479563364061...|         0|
|            91|          38| [91.0,38.0]|[0.63043086132588...|         2|
|           354|          48|[354.0,48.0]|[2.45244532867432...|         2|
|           163|          41|[163.0,41.0]|[1.12923330105625...|         2|
|           269|         

In [43]:
print("with K=3")
print("within Set Sun Squared Errore= "+ str(wssse_k3))

print("with K=2")
print("within Set Sum of Squared Errors a "+ str(wssse_k2))

with K=3
within Set Sun Squared Errore= DataFrame[quantiteAchete: int, prixUnitaire: int, features: vector, scaledFeatures: vector, prediction: int]
with K=2
within Set Sum of Squared Errors a DataFrame[quantiteAchete: int, prixUnitaire: int, features: vector, scaledFeatures: vector, prediction: int]


In [40]:
from pyspark.ml.clustering import KMeans

# Charger les données
data = spark.read.format("libsvm").load("spark.dat")

# Initialiser l'algorithme K-Means
kmeans = KMeans(k=3, seed=1)

# Ajuster l'algorithme K-Means sur les données
model = kmeans.fit(data)

# Prédire les clusters pour les données
predictions = model.transform(data)

# Afficher les résultats
predictions.show()

Py4JJavaError: An error occurred while calling o523.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 249.0 failed 1 times, most recent failure: Lost task 4.0 in stage 249.0 (TID 3208) (LAPTOP-RAGD177R executor driver): java.lang.NumberFormatException: For input string: "2928346;Christopher;Prince;Magazine"
	at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2054)
	at java.base/jdk.internal.math.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
	at java.base/java.lang.Double.parseDouble(Double.java:543)
	at scala.collection.immutable.StringLike.toDouble(StringLike.scala:321)
	at scala.collection.immutable.StringLike.toDouble$(StringLike.scala:321)
	at scala.collection.immutable.StringOps.toDouble(StringOps.scala:33)
	at org.apache.spark.mllib.util.MLUtils$.parseLibSVMRecord(MLUtils.scala:131)
	at org.apache.spark.mllib.util.MLUtils$.$anonfun$parseLibSVMFile$4(MLUtils.scala:126)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.reduceLeft(TraversableOnce.scala:237)
	at scala.collection.TraversableOnce.reduceLeft$(TraversableOnce.scala:220)
	at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$reduce$2(RDD.scala:1097)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2322)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2323)
	at org.apache.spark.rdd.RDD.$anonfun$reduce$1(RDD.scala:1111)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1093)
	at org.apache.spark.mllib.util.MLUtils$.computeNumFeatures(MLUtils.scala:94)
	at org.apache.spark.ml.source.libsvm.LibSVMFileFormat.$anonfun$inferSchema$1(LibSVMRelation.scala:105)
	at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.ml.source.libsvm.LibSVMFileFormat.inferSchema(LibSVMRelation.scala:96)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:210)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:207)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:411)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:185)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NumberFormatException: For input string: "2928346;Christopher;Prince;Magazine"
	at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2054)
	at java.base/jdk.internal.math.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
	at java.base/java.lang.Double.parseDouble(Double.java:543)
	at scala.collection.immutable.StringLike.toDouble(StringLike.scala:321)
	at scala.collection.immutable.StringLike.toDouble$(StringLike.scala:321)
	at scala.collection.immutable.StringOps.toDouble(StringOps.scala:33)
	at org.apache.spark.mllib.util.MLUtils$.parseLibSVMRecord(MLUtils.scala:131)
	at org.apache.spark.mllib.util.MLUtils$.$anonfun$parseLibSVMFile$4(MLUtils.scala:126)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.reduceLeft(TraversableOnce.scala:237)
	at scala.collection.TraversableOnce.reduceLeft$(TraversableOnce.scala:220)
	at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$reduce$2(RDD.scala:1097)
	at org.apache.spark.SparkContext.$anonfun$runJob$6(SparkContext.scala:2322)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [49]:
from pyspark.ml.classification import LogisticRegression
dataset_1=spark.read.csv(
    path="spark.dat",
    sep=";",
    header=True,
    quote='"',
    inferSchema=True,
)
dataset=dataset_1.select(['quantiteAchete','prixUnitaire'])

trainingData,testData = dataset.randomSplit([0.7, 0.3], seed=1)

# Initialiser l'algorithme de régression logistique
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Ajuster l'algorithme de régression logistique sur les données d'entraînement
modele = lr.fit(dataset)

# Prédire les résultats sur les données de test
predictions_1 = modele.transform(testData)

# Afficher les résultats
predictions_1.show()

IllegalArgumentException: features does not exist. Available: quantiteAchete, prixUnitaire

In [50]:
trainingData,testData = dataset.randomSplit([0.7, 0.3], seed=1)

In [51]:
# Initialiser l'algorithme de régression logistique
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)


In [54]:
# Ajuster l'algorithme de régression logistique sur les données d'entraînement
modele = lr.fit(dataset)

IllegalArgumentException: features does not exist. Available: quantiteAchete, prixUnitaire

In [56]:
###### GRAPH X ######

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from graphframes import GraphFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *

NameError: name 'GraphLoader' is not defined

In [44]:
#Charger un graphe depuis un fichier
val_graph = GraphLoader.edgeListFile(spark, "spark.dat")

NameError: name 'SparkConf' is not defined

In [None]:
#Opérations de filtrage sur les sommets
val_graph = GraphLoader.edgeListFile(spark, "path/to/edge_list_file")

In [None]:
#Opétions de filtrage sur les arrêtes
val filteredAretes = graph.edges.filter(e => e._attr >= 5)

In [None]:
#Compter le nombre de sommets et d'arrêtes 
val vertexCount = graph.vertices.count # Sommet
val edgeCount = graph.edges.count #arrêtes

In [None]:
#Trouver le degré moyen de sommets
val avgDegree = graph.degrees.map(_._2).reduce(_ + _) / vertexCount

In [None]:
#Trouver le nombre de composantes connexes
val connectedComponents = graph.connectedComponents.vertices.map(_._2).countByValue

In [None]:
#Trouver le plus court chemin entre 2 sommets
val sources = Seq(1, 2)
val destinations = Seq(10, 20)
val distances = graph.shortestPaths.sources(sources).destinations(destinations).run

In [None]:
#trouver les noeuds ayant le plus grand nombre de voisins
graph.degrees.sortBy(_._2, ascending=false).take(10)