In [None]:
# import findspark
# findspark.init()
import pixiedust
import pyspark
import random
from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import Row
from pyspark.sql.types import DateType
from sklearn.cluster import DBSCAN
import numpy as np 
from datetime import datetime
import pandas as pd

# Import `pyplot` 
import matplotlib.pyplot as plt

# Set the style to `ggplot`
plt.style.use("ggplot")
pixiedust.enableJobMonitor()

# Only `import pyspark` is present in the import cell above, so the Spark
# classes are referenced through the package; the bare names SparkConf /
# SparkContext / SQLContext would raise NameError on a fresh kernel unless
# the kernel pre-defines them.
conf = (
    pyspark.SparkConf()
    .setAppName("App")
    .setMaster('local[*]')
    .set('spark.executor.memory', '4G')
    .set('spark.driver.memory', '30G')
    .set('spark.driver.maxResultSize', '10G')
)

# Reuse an already-running context if there is one, otherwise start one
# with the configuration above.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
sqlContext = pyspark.sql.SQLContext(sc)

In [None]:
def create_key(row):
    """Build the grouping key for a single position event.

    The key has the form ``<COD_LINHA>-<DATA>-<hour>-<minute>-<bucket>``.
    ``bucket`` is ``020`` for seconds 0..20, ``040`` for seconds 21..40 and
    ``060`` for anything later. Hour and minute are written without
    zero-padding.

    Args:
        row: record indexable by column name, providing 'DTHR' (text in
            '%d/%m/%Y %H:%M:%S' format), 'COD_LINHA' (string) and 'DATA'.

    Returns:
        The key as a string.

    Raises:
        ValueError: if 'DTHR' does not match the expected format.
    """
    stamp = datetime.strptime(row['DTHR'], '%d/%m/%Y %H:%M:%S')

    # Pick the 20-second bucket the event falls into.
    if stamp.second <= 20:
        bucket = '-020'
    elif stamp.second <= 40:
        bucket = '-040'
    else:
        bucket = '-060'

    slot = str(stamp.hour) + '-' + str(stamp.minute) + bucket
    return row['COD_LINHA'] + '-' + str(row['DATA']) + '-' + slot


def run_dbscan(df):
    """Cluster the positions of one key group with DBSCAN.

    Args:
        df: a ``(key, events)`` pair. Every event must be indexable by
            'COD_LINHA', 'VEIC', 'DTHR', 'LAT' and 'LON'; 'LAT' and 'LON'
            must be convertible with ``float``.

    Returns:
        A list of dicts, one per event, with the fields ``cod_linha``,
        ``veic``, ``dthr``, ``lat``, ``lon``, ``cluster_id`` (the label
        assigned by DBSCAN) and ``key`` (the group key).
    """
    group_key = df[0]
    events = df[1]

    records = []
    for event in events:
        records.append({
            'cod_linha': event['COD_LINHA'],
            'veic': event['VEIC'],
            'dthr': event['DTHR'],
            'lat': float(event['LAT']),
            'lon': float(event['LON']),
        })
    coordinates = pd.DataFrame(records)

    # The haversine metric works on radians, so the coordinates are converted
    # first. NOTE(review): eps = 1/6371 presumably means ~1 km expressed in
    # radians (6371 km Earth radius) - confirm.
    points_rad = np.radians(coordinates[['lat', 'lon']])
    clusterer = DBSCAN(eps=1/6371., min_samples=3, algorithm='ball_tree', metric='haversine')
    labels = clusterer.fit_predict(points_rad)

    coordinates['cluster_id'] = labels
    coordinates['key'] = group_key
    return coordinates.to_dict("records")


# Spark UDF that parses a 'dd/mm/YYYY HH:MM:SS' string with strptime; the
# result column is declared as DateType.
toDateTime = udf(
    f=lambda dthr_text: datetime.strptime(dthr_text, '%d/%m/%Y %H:%M:%S'),
    returnType=DateType(),
)

In [None]:
path='../../../datascience/data/urbs/2018-11/19-23/'

# Read every '*_veiculos.json' file under `path` and add a DATA column
# derived from the DTHR text column.
position_events = (
    sqlContext.read
    .json(path + '*_veiculos.json')
    .withColumn("DATA", toDateTime(col('DTHR')))
)

In [None]:
# Visual inspection of the loaded events (presumably pixiedust's display(),
# since pixiedust is imported at the top - confirm).
display(position_events)

In [None]:
# Pair every event row with its grouping key: (create_key(row), row).
# NOTE(review): linhas_kv is not referenced again in the visible cells.
linhas_kv = position_events.rdd.keyBy(create_key)

In [None]:
# Cluster the events of each (line, date, time bucket) group:
#   1. key every row with create_key,
#   2. gather all rows of a key into one list (groupByKey avoids re-copying a
#      growing list on every merge, which reduceByKey with `a + b` did),
#   3. run DBSCAN per group and flatten the per-event records,
#   4. turn each record dict into a Row and build a DataFrame.
clusters_df = (
    position_events.rdd
    .keyBy(create_key)
    .groupByKey()
    .mapValues(list)  # run_dbscan indexes and len()s the values, so materialise them
    .flatMap(run_dbscan)
    .map(lambda record: Row(**record))
    .toDF()
)

In [None]:
# Visual inspection of the clustered events, including cluster_id and key.
display(clusters_df)

In [None]:
# Keep only rows whose cluster_id is not -1, gather them into a single
# partition and overwrite the output directory with a ';'-separated CSV that
# includes a header row.
(
    clusters_df
    .filter("cluster_id != -1")
    .repartition(1)
    .write
    .mode('overwrite')
    .csv("/work/datascience/data/urbs-dbscan/", sep=';', header=True)
)

In [None]:
# Shut down the SparkContext created in the setup cell; Spark cells need a
# new context before they can be run again.
sc.stop()