# Loading the libraries

In [18]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-class-path /usr/share/java/postgresql-42.2.23.jar --jars /usr/share/java/postgresql-42.2.23.jar pyspark-shell'
import databricks.koalas as ks
import pandas as pd
import networkx as nx
import psycopg2 as pg
import psycopg2.extras as pgExtras

# Loading the data

In [19]:
params = {'user': 'cristiano', 'password': 'cristiano'}

def loadGraph():
	kdf = ks.read_sql_query('''	select	EDGE.IDVERTEXORIG_FK,
										EDGE.IDVERTEXDEST_FK,
										EDGE.LENGTH,
										EDGE.IDEDGE,
										EDGE.UTILITYVALUE
								from	STREETSEGMENT as EDGE ''',
					'jdbc:postgresql:afterqualifying', options=params)

	#return kdf
	return nx.from_pandas_edgelist(kdf.to_pandas(), 'idvertexorig_fk', 'idvertexdest_fk', ['length', 'idedge', 'utilityvalue']).to_undirected()

#kdfGraph = loadGraph()
#G = loadGraph()

# Deleting the smaller components

In [20]:
def deleteSmallerComponents(G):
	components = sorted(nx.connected_components(G), key=len, reverse=True)
	compToDelete = []
	for comp in components[1:]:
		compToDelete.extend(list(comp))
	compToDelete = [(x,) for x in compToDelete]
	compToDeleteDouble = [(x, y) for x, y in zip(compToDelete, compToDelete)]

	params.update({'host':'localhost', 'port':'5432', 'database':'afterqualifying'})
	conn = None
	try:
		conn = pg.connect(**params)
		cur = conn.cursor()
		cur.executemany('delete from STREETSEGMENT where IDVERTEXORIG_FK = %s or IDVERTEXDEST_FK = %s', compToDeleteDouble)
		cur.executemany('delete from STREETINTERSECTION where IDVERTEX = %s', compToDelete)
		conn.commit()
		cur.close()
	except(Exception, pg.DatabaseError) as error:
		print(error)
	finally:
		if conn is not None:
			conn.close()

#deleteSmallerComponents(G)

# Calculating and storing the shortest distances

In [21]:
def storeShortestDistances(G):
	cutOffThreshold = 1000.000001
	distances = nx.all_pairs_dijkstra_path_length(G, cutoff=cutOffThreshold, weight='length')
	G = None
	distances = dict(distances)
	
	params.update({'host':'localhost', 'port':'5432', 'database':'afterqualifying'})
	conn = None
	try:
		conn = pg.connect(**params)
		cur = conn.cursor()

		lengthToInsert = 10000
		distancesToInsert = []
		for source in distances:
			for destination in distances[source]:
				distancesToInsert.append((source, destination, distances[source][destination]))

			if len(distancesToInsert) > lengthToInsert:
				pgExtras.execute_values(cur, 'insert into VERTICESPAIRSNEARBY (IDVERTEX1_FK, IDVERTEX2_FK, WALKINGDISTANCE) values %s', distancesToInsert)
				distancesToInsert = []

		conn.commit()
		cur.close()
	except(Exception, pg.DatabaseError) as error:
		print(error)
	finally:
		if conn is not None:
			conn.close()

	distances = None

#storeShortestDistances(G)

In [22]:
def distancesFromCenter(G):
	centerGraph = nx.center(G, usebounds=True)[0]
	dictDistancesCenter = nx.single_source_dijkstra_path_length(G, source=centerGraph, weight='length')
	distancesCenter = sorted(dictDistancesCenter.items(), key=lambda item: item[1])
	
	with open('centerDistances.txt', 'w') as f:
		f.write('%s' % centerGraph)
		for item in distancesCenter:
			f.write('\n%s %s' % (item[0], item[1]))

	return distancesCenter, centerGraph

#distancesCenter, centerGraph = distancesFromCenter(G)

In [23]:
def storePlacesOfTrip():
    kdf = pd.read_excel('/home/cristiano/Dropbox/UFMG/Cristiano/Doutorado/Segunda Etapa da Qualificação/Pesquisas OD/OD 2017/Banco de dados/OD_2017.xlsx',
                        usecols=[   'ID_PESS', 'DATA', 'FE_VIA', 'TOT_VIAG',
                                    'CO_O_X', 'CO_O_Y', 'CO_D_X', 'CO_D_Y',
                                    'MODO1', 'MODO2', 'MODO3', 'MODO4',
                                    'H_SAIDA', 'MIN_SAIDA', 'ANDA_O', 'H_CHEG', 'MIN_CHEG', 'ANDA_D',
                                    'MODOPRIN', 'VL_EST'])
    kdf = ks.from_pandas(kdf)

    resultSQL = ks.sql('''  select  *
                            from    {kdf} OD
                            where   exists (    select  1
                                                from    {kdf} OD_SUBQUERY
                                                where   (OD_SUBQUERY.MODO1 = 9 or OD_SUBQUERY.MODO2 = 9 or OD_SUBQUERY.MODO3 = 9 or OD_SUBQUERY.MODO4 = 9) and
                                                        OD_SUBQUERY.ID_PESS = OD.ID_PESS
                                    )
    ''')
    params.update({'host':'localhost', 'port':'5432', 'database':'afterqualifying'})
    conn = None
    try:
        conn = pg.connect(**params)
        cur = conn.cursor()

        lengthToInsert = 10000
        placesToInsert = []
        countRows = 0
        for row in resultSQL.itertuples():
            dictRow = row._asdict()
            countRows += 1
            placesToInsert.append((countRows, dictRow['ID_PESS'], dictRow['FE_VIA'],
                    dictRow['CO_O_X'], dictRow['CO_O_Y'], dictRow['CO_D_X'], dictRow['CO_D_Y'],
                    str(int(dictRow['H_SAIDA'])), str(int(dictRow['MIN_SAIDA'])), str(int(dictRow['H_CHEG'])), str(int(dictRow['MIN_CHEG'])), dictRow['VL_EST']))

            if len(placesToInsert) > lengthToInsert:
                    pgExtras.execute_values(cur, '''insert into PLACEOFTRIP (IDTRIP, IDDRIVER, DRIVEREXPANSIONFACTOR, GEOMDEPARTURE, GEOMDESTINATION,
                                                    TIMEDEPARTURE, TIMEARRIVAL, PARKINGEXPENSES) values %s''', placesToInsert,
                                                    template='''(%s, %s, %s, ST_Transform(ST_SetSRID(ST_MakePoint(%s, %s), 22523), 4326),
                                                    ST_Transform(ST_SetSRID(ST_MakePoint(%s, %s), 22523), 4326), (%s||':'||%s), (%s||':'||%s), %s)''')
                    placesToInsert = []
                
        conn.commit()
        cur.close()
    except(Exception, pg.DatabaseError) as error:
        print(error)
    finally:
        if conn is not None:
            conn.close()

    return kdf

kdf = storePlacesOfTrip()