In [1]:
# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information

24/10/11 12:56:40 WARN Utils: Your hostname, MacBook-Pro-dAudric.local resolves to a loopback address: 127.0.0.1; using 192.168.1.31 instead (on interface en0)
24/10/11 12:56:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/11 12:56:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Streaming with socket 

In [2]:
from pyspark.sql.functions import explode, split, current_timestamp, window

port = 9999

# df spark représentant le flux d'entrée à partir de la connexion socket
stream = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", port) \
    .load()\
    .withColumn("timestamp", current_timestamp()) \

#Utilisation de fenêtres temporelles
w = stream.withColumn("window", window("timestamp", "1 minute"))

# Définir la fonction qui traite chaque batch de données
def split_words(df, batch_id):
    # Séparer les lignes en mots et sauvegarder le résultat dans une table Spark
    words = df.withColumn("word", explode(split(df["value"], " ")))
    
    # Sauvegarder le DataFrame dans une table Spark
    words.write \
        .mode("append") \
        .saveAsTable("array_words2")

# Afficher les résultats en continu
query = w.writeStream \
            .foreachBatch(split_words) \
            .outputMode("append") \
            .start()

# Attendre que la requête se termine
query.awaitTermination()


24/10/11 12:56:46 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
24/10/11 12:56:47 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/q7/05h3nfts6n3_246h_khmw2z00000gn/T/temporary-8744f160-803b-454d-90d0-6eeaf6e3ff95. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/10/11 12:56:47 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
ERROR:root:KeyboardInterrupt while sending command.                             
Traceback (most recent call last):
  File "/Users/aaudric/miniconda3/envs/pyspark_env/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.se

KeyboardInterrupt: 

## Requêtes sur le flux

words = spark.read.table("array_words2")
words.show()

In [11]:
words.filter(
    (col("window.start") >= F.lit("2024-10-04 10:05:00")) & 
    (col("window.end") <= F.lit("2024-10-04 10:07:00"))
).show()


+--------------------+--------------------+--------------------+----------+
|               value|           timestamp|              window|      word|
+--------------------+--------------------+--------------------+----------+
|L'étudiant constr...|2024-10-04 10:06:...|{2024-10-04 10:06...|L'étudiant|
|L'étudiant constr...|2024-10-04 10:06:...|{2024-10-04 10:06...| construit|
|L'étudiant constr...|2024-10-04 10:06:...|{2024-10-04 10:06...|        la|
|L'étudiant constr...|2024-10-04 10:06:...|{2024-10-04 10:06...|   musique|
|Le professeur con...|2024-10-04 10:06:...|{2024-10-04 10:06...|        Le|
|Le professeur con...|2024-10-04 10:06:...|{2024-10-04 10:06...|professeur|
|Le professeur con...|2024-10-04 10:06:...|{2024-10-04 10:06...| construit|
|Le professeur con...|2024-10-04 10:06:...|{2024-10-04 10:06...|        le|
|Le professeur con...|2024-10-04 10:06:...|{2024-10-04 10:06...|    jardin|
|Le professeur con...|2024-10-04 10:05:...|{2024-10-04 10:05...|        Le|
|Le professe

In [15]:
words.filter(
    (col("window.start") >= F.lit("2024-10-04 10:05:00")) & 
    (col("window.end") <= F.lit("2024-10-04 10:07:00"))).groupBy("word").count().show()

+----------+-----+
|      word|count|
+----------+-----+
|        Le|   21|
|    ballon|    7|
|    jardin|    6|
|   détruit|    4|
|professeur|    7|
|   fromage|    6|
| construit|    8|
|        le|   38|
|L'étudiant|    8|
|   musique|    5|
|        la|   12|
|L'éléphant|    7|
|      film|    8|
|   cherche|    6|
|     livre|    6|
|   déteste|    5|
|  enseigne|    3|
|   apprend|    6|
|     mange|    6|
|     pomme|    4|
+----------+-----+
only showing top 20 rows



In [4]:
#Mettre le DataFrame en cache
words.cache()

# Exécuter une action pour déclencher la mise en cache
words.count()

# Vérifier le niveau de persistance et d'utilisation de la mémoire
print(words.storageLevel)

Disk Memory Deserialized 1x Replicated


In [5]:
# Vérifier les tables mises en cache
cached_tables = spark.catalog.listTables()
for table in cached_tables:
    print(f"Nom de la table : {table.name}, Est en cache : {table.isTemporary}")


Nom de la table : array_words2, Est en cache : False


In [15]:
# Charger un DataFrame
df = spark.read.csv("New_data.csv", header=True)

# Mettre en cache le DataFrame
df.cache()

# Effectuer une action pour activer le cache
df.count()

# Vérifier le niveau de persistance du cache
print(df.storageLevel)


Disk Memory Deserialized 1x Replicated


In [6]:
# Obtenir des informations de mémoire à partir du SparkContext
memory_info = spark.sparkContext._jsc.sc().getExecutorMemoryStatus()

# Afficher les informations de mémoire
for executor, (total_mem, used_mem) in memory_info.items():
    print(f"Executor: {executor}, Mémoire totale: {total_mem / (1024 * 1024)} Mo, Mémoire utilisée: {used_mem / (1024 * 1024)} Mo")


Py4JError: An error occurred while calling o650.items. Trace:
py4j.Py4JException: Method items([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:329)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)



24/10/11 16:24:19 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 956957 ms exceeds timeout 120000 ms
24/10/11 16:24:19 WARN SparkContext: Killing executors is not supported by current scheduler.
24/10/11 16:24:20 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$