In [1]:
import time
import re, ast
import numpy as np
import pandas as pd
import os
from datetime import datetime, timedelta
import datetime as dt

import geopandas as gpd
from geopandas.tools import sjoin
from shapely.geometry import shape
from shapely.geometry import Point, Polygon
from fiona.crs import from_epsg
import pickle

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
#        '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 '+\


from kafka import KafkaProducer

from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_squared_error, mean_absolute_error
        
# spark-submit options (UI port, Kafka 0.8 streaming connector, driver memory)
# must be in the environment before the first SparkSession launches the JVM.
# getOrCreate() returns an already-running session if there is one, in which
# case these options are not re-applied.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--conf spark.ui.port=4050 '
    '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 '
    '--conf spark.driver.memory=2g  pyspark-shell'
)

# Local session with 4 threads, named after this notebook's purpose.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("KafkaReceive")
    .getOrCreate()
)


#This function creates a connection to a Kafka stream
#You may change the topic, or batch interval
#The Zookeeper server is assumed to be running at 127.0.0.1:2181
#The function returns the Spark context, Spark streaming context, and DStream object
def getKafkaDStream(spark,topic='dataViapass',batch_interval=10):

    #Get Spark context
    sc=spark.sparkContext

    #Create streaming context, with required batch interval
    ssc = StreamingContext(sc, batch_interval)

    #Checkpointing needed for stateful transforms
    ssc.checkpoint("checkpoint")
    
    #Create a DStream that represents streaming data from Kafka, for the required topic 
    dstream = KafkaUtils.createStream(ssc, "127.0.0.1:2181", "spark-streaming-consumer", {topic: 1})
    
    return [sc,ssc,dstream]

def updateFunction(new_values, state):
    """State-update callback for ``DStream.updateStateByKey``.

    Parameters
    ----------
    new_values : sequence
        Values that arrived for this key in the current batch.
    state : object
        The key's previous state (None when the key has no prior state).

    Returns
    -------
    pandas.DataFrame or object
        A DataFrame built from this batch's values when there are any,
        otherwise ``state`` unchanged. Earlier state is not merged: every
        non-empty batch replaces it.
    """
    if len(new_values) == 0:
        # Nothing new for this key: carry the previous state forward.
        return state
    return pd.DataFrame(new_values)


def printInfoRDD(rdd):
    """Print the partition count and the element count of an RDD.

    Used here as a ``DStream.foreachRDD`` callback. ``rdd.count()`` runs a
    Spark job. When ``rdd`` is None a placeholder message is printed instead.
    """
    if rdd is None:
        print("No info to provide")
        return
    n_partitions = rdd.getNumPartitions()
    print("The RDD has " + str(n_partitions) + " partitions")
    n_elements = rdd.count()
    print("The RDD has " + str(n_elements) + " elements")

# Driver-side snapshot of the most recently collected RDD; None until saveState
# first receives an RDD.
state_global = None


def saveState(rdd):
    """Collect an RDD to the driver and store it in module-level ``state_global``.

    Used as a ``DStream.foreachRDD`` callback so the latest per-key state can
    be inspected from other notebook cells. ``None`` is ignored and leaves
    the previous snapshot untouched.

    Parameters
    ----------
    rdd : pyspark.RDD or None
        RDD to snapshot. ``collect()`` pulls its entire contents to the driver.
    """
    global state_global
    if rdd is None:
        return
    state_global = rdd.collect()
                
[sc, ssc, dstream] = getKafkaDStream(spark=spark, topic='dataViapass', batch_interval=5)

# Each Kafka record is a (key, value) pair; the value is the string form of a
# Python literal (a list of fields), parsed safely with ast.literal_eval.
# NOTE(review): np.array over a list mixing strings and numbers coerces every
# element to one common dtype (typically a unicode string), so numeric fields
# reach the later stages as strings. Confirm this is intended.
records = dstream.map(lambda msg: np.array(ast.literal_eval(msg[1])))

# Per-batch record count, printed on the driver.
records.count().map(lambda x: 'OBU data in this batch: %s' % x).pprint()

# Key each record by field 17 and keep fields 4, 12, 1, 5, 18 as the value.
# One output per input, so map() rather than flatMap() over a one-item list.
keyed = records.map(lambda x: (x[17], (x[4], x[12], x[1], x[5], x[18])))

# Per-key state: updateFunction replaces a key's state with a DataFrame of the
# latest batch's values whenever new values arrive for that key.
initialStateRDD = None  # no seed state
dstream = keyed.updateStateByKey(updateFunction, initialRDD=initialStateRDD)
dstream.foreachRDD(printInfoRDD)
dstream.pprint()
dstream.foreachRDD(saveState)

# Start receiving; the stream is stopped explicitly from a later cell.
ssc.start()

-------------------------------------------
Time: 2020-01-13 11:16:25
-------------------------------------------

The RDD has 4 partitions
The RDD has 0 elements
-------------------------------------------
Time: 2020-01-13 11:16:25
-------------------------------------------

inizio
quasi
-------------------------------------------
Time: 2020-01-13 11:16:30
-------------------------------------------

The RDD has 4 partitions
The RDD has 0 elements
-------------------------------------------
Time: 2020-01-13 11:16:30
-------------------------------------------

inizio
quasi
-------------------------------------------
Time: 2020-01-13 11:16:35
-------------------------------------------

The RDD has 4 partitions
The RDD has 0 elements
-------------------------------------------
Time: 2020-01-13 11:16:35
-------------------------------------------

inizio
quasi
-------------------------------------------
Time: 2020-01-13 11:16:40
-------------------------------------------

The RDD has 

-------------------------------------------
Time: 2020-01-13 11:17:50
-------------------------------------------
OBU data in this batch: 24

The RDD has 4 partitions
The RDD has 2 elements
-------------------------------------------
Time: 2020-01-13 11:17:50
-------------------------------------------
('0',                             0                           1          2   3  \
0  2018-09-23 04:04:30.000000  2018-09-23 04:10:14.058511  167048733  90   
1  2018-09-23 04:04:30.000000  2018-09-23 04:09:55.632588  167048733  90   
2  2018-09-23 04:04:30.000000  2018-09-23 04:09:38.248407  167048733  90   
3  2018-09-23 04:04:30.000000  2018-09-23 04:09:20.845778  167048733  90   
4  2018-09-23 04:04:30.000000  2018-09-23 04:09:03.513452  167048733  90   
5  2018-09-23 04:04:30.000000  2018-09-23 04:08:46.358717  167048733  90   
6  2018-09-23 04:04:30.000000  2018-09-23 04:08:28.930367  167048733  90   
7  2018-09-23 04:04:30.000000  2018-09-23 04:08:03.140902  167048733  90   

     

In [2]:
# Stop only the streaming context; keep the SparkContext/SparkSession alive so
# a new StreamingContext can be created in this kernel.
ssc.stop(stopSparkContext=False, stopGraceFully=False)