In [0]:
#!pip install findspark

In [0]:
#!pip install pyspark

In [0]:
#!pip install ipython-sql

In [0]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import from_json
from pyspark.sql.functions import schema_of_json
from time import sleep
import json
import time
import findspark

In [0]:
# Initialise findspark.
# NOTE(review): findspark.init() is normally called *before* importing pyspark so
# that a SPARK_HOME installation becomes importable; here the import cell above has
# already imported pyspark, so this call presumably has no effect on those imports.
# Confirm whether it is needed at all (the dbfs:/ path used below suggests a
# Databricks environment, where findspark is typically unnecessary).
findspark.init()

In [0]:
# Single-machine Spark session for this notebook. The Kafka source used below is
# provided by the spark-sql-kafka connector, fetched via `spark.jars.packages`.
# NOTE(review): the connector coordinates (Scala 2.12 / Spark 3.3.0) presumably
# need to match the installed PySpark build — confirm before upgrading either.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Kafka Pyspark Streaming Learning")
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
    .getOrCreate()
)

# Keep cell output readable: only ERROR-level Spark logs are printed.
spark.sparkContext.setLogLevel("ERROR")

In [0]:
# Batch-read a sample message so its schema can be inferred; the streaming query
# below reuses `raw_json.schema` to parse the Kafka payloads.
# NOTE(review): presumably test.json holds one pretty-printed record of the
# vlib_status_ville topic (hence multiLine) — confirm it matches the producer.
raw_json = (
    spark.read
    .option("multiLine", True)
    .json("dbfs:/FileStore/tables/test.json")
)

In [0]:
# Kafka connection settings for the station-status feed.
KAFKA_TOPIC_NAME = "vlib_status_ville"
KAFKA_BOOTSTRAP_SERVER = "51.38.185.58:9092"

# Streaming source: each Kafka record's `value` is cast to a string and parsed as
# JSON using the schema inferred from the sample file (`raw_json`). The result
# exposes the station name plus every top-level field of the parsed message.
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
    .option("subscribe", KAFKA_TOPIC_NAME)
    # Only messages that arrive after the query starts are processed.
    .option("startingOffsets", "latest")
    .load()
    .withColumn("value", from_json(col("value").cast("string"), raw_json.schema))
    # `value.*` already includes `datasetid` (and `fields`); selecting
    # `value.datasetid` separately created a second, ambiguous `datasetid` column.
    .select(col("value.fields.name"), col("value.*"))
)

df.printSchema()
print(df.isStreaming)

# Flatten the nested station record into the few columns the aggregations use:
# each pair is (column path in `df`, output column name).
rep = df.select(*[
    col(source).alias(target)
    for source, target in (
        ("name", "name"),
        ("fields.mechanical", "mechanical"),
        ("fields.is_renting", "isrenting"),
        ("fields.ebike", "ebike"),
        ("fields.numdocksavailable", "numdocksavailable"),
        ("fields.nom_arrondissement_communes", "commune"),
    )
])

# Average number of available mechanical bikes per station (renting stations
# only), kept as a complete aggregate in the in-memory table `memory_spark`.
query = (
    rep.filter(rep.isrenting == 'OUI')
    .groupBy('name')
    .avg('mechanical')
    .writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("memory_spark")
    .start()
)

# Print the current aggregate every 60 seconds while the query runs. This loop is
# normally left by interrupting the cell; without the `finally`, the query would
# keep running in the background while later cells start further queries.
try:
    while query.isActive:
        time.sleep(60)
        spark.sql("select * from memory_spark").show()
finally:
    if query.isActive:
        query.stop()

# Raises the query's exception if it terminated with one.
query.awaitTermination()

In [0]:
# Per-station averages (renting stations only) of available e-bikes, mechanical
# bikes and free docks.
mean = (
    rep
    .filter(rep.isrenting == "OUI")
    .groupBy("name")
    .agg(
        avg("ebike").alias("MoyVeloElectrique"),
        avg("mechanical").alias("MoyVeloClassique"),
        avg("numdocksavailable").alias("MoyPlacesDispo"),
    )
)

In [0]:
# Stream the per-station averages into the in-memory table `mean`.
query = mean.writeStream.outputMode("complete").format("memory").queryName("mean").start()

# Print the table every 60 seconds while the query runs. The loop is normally
# left by interrupting the cell, so stop the query then rather than leak it.
try:
    while query.isActive:
        time.sleep(60)
        spark.sql("select * from mean").show()
finally:
    if query.isActive:
        query.stop()

# Raises the query's exception if it terminated with one.
query.awaitTermination()

In [0]:
# Average number of free docks per commune, over renting stations only.
# NOTE(review): despite the name, this averages *available* docks, not occupancy.
mean_occupation = (
    rep.filter(rep.isrenting == "OUI")
    .groupBy("commune")
    .avg("numdocksavailable")
)

# Stream the per-commune free-dock averages into the in-memory table `mean_occ`.
query = mean_occupation.writeStream.outputMode("complete").format("memory").queryName("mean_occ").start()

# Print the table every 60 seconds while the query runs. The loop is normally
# left by interrupting the cell, so stop the query then rather than leak it.
try:
    while query.isActive:
        time.sleep(60)
        spark.sql("select * from mean_occ").show()
finally:
    if query.isActive:
        query.stop()

# Raises the query's exception if it terminated with one.
query.awaitTermination()

In [0]:
# Per-commune averages (renting stations only) of available e-bikes, mechanical
# bikes and free docks.
mean_area = (
    rep
    .filter(rep.isrenting == "OUI")
    .groupBy("commune")
    .agg(
        avg("ebike").alias("MoyVeloElectrique"),
        avg("mechanical").alias("MoyVeloClassique"),
        avg("numdocksavailable").alias("MoyPlacesDispo"),
    )
)

# Stream the per-commune averages into the in-memory table `mean_area`.
query = mean_area.writeStream.outputMode("complete").format("memory").queryName("mean_area").start()

# Print the table every 60 seconds while the query runs. The loop is normally
# left by interrupting the cell, so stop the query then rather than leak it.
try:
    while query.isActive:
        time.sleep(60)
        spark.sql("select * from mean_area").show()
finally:
    if query.isActive:
        query.stop()

# Raises the query's exception if it terminated with one.
query.awaitTermination()