In [1]:
import os
# Ask spark-submit to pull in the Kafka source for Structured Streaming
# (artifact spark-sql-kafka-0-10, Scala 2.11 build, version 2.4.5) when the
# PySpark shell is launched.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5 '
    'pyspark-shell'
)

In [2]:
import pyspark
import os
import re

In [3]:
DATA_PATH = '../../data/madrid_sim'

In [4]:
# Regular files in DATA_PATH whose name ends with "<digit>_MNO_MNO1.csv".
# The pattern is a raw string with the dot escaped, so "\d" is the regex digit
# class (not an invalid string escape) and "." only matches a literal dot.
# The list is sorted because os.listdir returns names in arbitrary order and
# later cells rely on files[0].
files = sorted(
    name for name in os.listdir(DATA_PATH)
    if os.path.isfile(os.path.join(DATA_PATH, name))
    and re.search(r'\d_MNO_MNO1\.csv\Z', name)
)

In [5]:
# getOrCreate() returns the already-running SparkContext when there is one,
# so re-running this cell does not fail with "cannot run multiple
# SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate()

In [6]:
# Echo the SparkContext so the notebook renders it as the cell output.
sc

In [7]:
# Register the first matching CSV with the SparkContext; it is resolved again
# by file name through SparkFiles when the DataFrame is read.
sc.addFile(os.path.join(DATA_PATH, files[0]))

# SQL entry point bound to the SparkContext created above.
sqlContext = pyspark.SQLContext(sparkContext=sc)

In [8]:
from pyspark.sql.types import *

# Explicit CSV layout: (column name, Spark type) pairs in column order.
# Every column is declared nullable.
schema = StructType([
    StructField(column_name, spark_type(), True)
    for column_name, spark_type in [
        ("t", IntegerType),
        ("AntennaId", IntegerType),
        ("EventCode", IntegerType),
        ("PhoneId", IntegerType),
        ("x", FloatType),
        ("y", FloatType),
        ("TileId", IntegerType),
    ]
])

In [9]:
# Read the CSV registered with sc.addFile, treating its first line as a
# header and applying the explicit schema.
main_df = (
    sqlContext.read
    .schema(schema)
    .option("header", True)
    .csv(pyspark.SparkFiles.get(files[0]))
)

In [10]:
# Print a preview of the loaded rows as a text table.
main_df.show()

+---+---------+---------+-------+---+---+------+
|  t|AntennaId|EventCode|PhoneId|  x|  y|TileId|
+---+---------+---------+-------+---+---+------+
+---+---------+---------+-------+---+---+------+



# **Partie Streaming de données**

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Get the active SparkSession, or create one named for this streaming job.
spark = (
    SparkSession.builder
    .appName("Spark Structured Streaming from Kafka")
    .getOrCreate()
)

In [4]:
# Streaming DataFrame over the "antennes" Kafka topic (broker on
# localhost:9092), starting from the latest offsets. Only the message payload
# is kept, cast to a string column named "value".
sdfAntennes = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "antennes")
    .option("startingOffsets", "latest")
    .load()
    .selectExpr("CAST(value AS STRING)")
)

In [5]:
from pyspark.sql.types import *

# Layout of one comma-separated Kafka record: (field name, Spark type) pairs
# in the order the values appear in the message.
schema_ant = StructType([
    StructField(field_name, spark_type())
    for field_name, spark_type in [
        ("t", IntegerType),
        ("AntennaId", IntegerType),
        ("EventCode", IntegerType),
        ("PhoneId", IntegerType),
        ("x", FloatType),
        ("y", FloatType),
        ("TileId", IntegerType),
    ]
])

In [6]:
def parse_data_from_kafka_message(sdf, schema):
    """Split the comma-separated Kafka payload into typed columns.

    Parameters
    ----------
    sdf : pyspark.sql.DataFrame
        Streaming DataFrame with a ``value`` column holding one
        comma-separated record per row.
    schema : pyspark.sql.types.StructType
        Target layout. The i-th field gives the name and the type of the
        i-th comma-separated token.

    Returns
    -------
    pyspark.sql.DataFrame
        DataFrame containing exactly the schema's columns, in schema order.

    Raises
    ------
    AssertionError
        If ``sdf`` is not a streaming DataFrame.
    """
    from pyspark.sql.functions import split

    assert sdf.isStreaming, "DataFrame doesn't receive streaming data"
    tokens = split(sdf['value'], ',')

    # Build all output columns in a single projection. Adding them one by one
    # with withColumn in a loop stacks one projection per field in the plan.
    return sdf.select([
        tokens.getItem(idx).cast(field.dataType).alias(field.name)
        for idx, field in enumerate(schema)
    ])

In [7]:
# Parse the raw Kafka payload into typed columns. The parsed frame no longer
# has a 'value' column, so calling the parser on it again would fail; the
# guard keeps this cell safe to re-run.
if 'value' in sdfAntennes.columns:
    sdfAntennes = parse_data_from_kafka_message(sdfAntennes, schema_ant)

In [8]:
# Start a streaming query that writes the AntennaId column to the console sink.
# Call query.stop() to stop the stream.
query = (
    sdfAntennes
    .select("AntennaId")
    .writeStream
    .format("console")
    .start()
)

In [26]:
# Echo the StreamingQuery handle returned by start().
query

<pyspark.sql.streaming.StreamingQuery at 0x7fae646f0cd0>

In [None]:
# Replay the sample CSV into the Kafka topic "antennes": `split` cuts the
# stream into 10-line chunks and pipes each chunk to a console producer
# (broker localhost:9092), then sleeps 0.5 s. The command's stdout is discarded.
# $KAFKA is expected to point at the Kafka installation directory.
# NOTE(review): the input path is absolute and machine-specific.
!cat /home/cesar/cours/ensae/donnees_distrib/projet/mobile_data/kafka_minimal_df.csv \
| split -l 10 --filter="$KAFKA/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic antennes; sleep 0.5" \
> /dev/null

# **Partie Retraitement**

In [None]:
def actualisation(base, batch):
    """Merge a new batch into the base table, keeping the latest row per phone.

    Parameters
    ----------
    base : pyspark.sql.DataFrame
        Current state. Its rows are kept as they are (nulls included).
    batch : pyspark.sql.DataFrame
        New events. Rows containing any null value are dropped before merging.
        Columns are matched with ``base`` by position (``union``), so both
        frames must share the same column order.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per ``PhoneId``: the row with the greatest ``t`` among the
        base rows and the cleaned batch rows. If several rows of a phone share
        that greatest ``t``, which one is kept is unspecified.
    """
    from pyspark.sql import Window
    from pyspark.sql.functions import col, row_number

    merged = base.union(batch.na.drop())

    # Rank the rows of each phone from most recent to oldest and keep rank 1.
    # Sorting and then calling dropDuplicates does not guarantee which
    # duplicate survives; the window makes the choice explicit and avoids
    # funnelling all the data through a single partition.
    latest_first = Window.partitionBy('PhoneId').orderBy(col('t').desc())
    rank_col = '__actualisation_rank'

    return (
        merged
        .withColumn(rank_col, row_number().over(latest_first))
        .filter(col(rank_col) == 1)
        .drop(rank_col)
    )