## 1. Librerias

In [1]:
import pandas as pd
import geopandas as gpd
from google.cloud import bigquery
import matplotlib as mpl
import pygeos
import multiprocessing as mp
import numpy as np
import pyproj

In [2]:
# !pip install geopandas

## 2. Autenticacion

In [3]:
# BigQuery client pinned to the US location; the project is the environment's
# default project (reported by the print below).
client = bigquery.Client(location="US")
print(f"Client creating using default project: {client.project}")

Client creating using default project: insights-295219


## 3. Funciones

In [4]:
def sjoin_propio(data_1, data_2):
    """Left nearest-neighbour spatial join of ``data_1`` onto ``data_2``.

    Thin wrapper around ``geopandas.sjoin_nearest`` defined at module level
    so it can be submitted to worker processes by ``parallelize``.

    Args:
        data_1: left GeoDataFrame; every one of its rows is kept.
        data_2: right GeoDataFrame whose nearest geometry is attached.

    Returns:
        The joined GeoDataFrame.
    """
    joined = gpd.sjoin_nearest(left_df=data_1, right_df=data_2, how='left')
    return joined

In [5]:
def parallelize(data=None, polygons=None, processes=None):
    """Run ``sjoin_propio`` over row chunks of ``data`` in a process pool.

    Each chunk is nearest-joined (left join) against ``polygons`` in a worker
    process. The partial results are concatenated, in original row order,
    into one GeoDataFrame carrying the CRS of ``data``.

    Args:
        data: GeoDataFrame to split and join. Defaults to the notebook-level
            ``final`` frame (the original behaviour).
        polygons: GeoDataFrame to join against. Defaults to the
            notebook-level ``localidades`` frame.
        processes: maximum number of worker processes; defaults to
            ``mp.cpu_count()``.

    Returns:
        GeoDataFrame with all joined chunks concatenated.

    NOTE(review): with the "spawn" start method (default on Windows/macOS),
    worker processes must be able to import ``sjoin_propio`` from
    ``__main__``; functions defined interactively in a notebook may not be
    importable there. Confirm the target environment uses "fork".
    """
    if data is None:
        data = final
    if polygons is None:
        polygons = localidades
    n_workers = processes or mp.cpu_count()

    # Positional row ranges split as evenly as possible. iloc slicing is used
    # instead of np.array_split(data, ...), which on DataFrames goes through
    # the deprecated DataFrame.swapaxes in recent pandas. Empty ranges are
    # skipped so no worker is spent on a no-op join.
    bounds = np.linspace(0, len(data), n_workers + 1).astype(int)
    chunks = [
        data.iloc[start:stop]
        for start, stop in zip(bounds[:-1], bounds[1:])
        if stop > start
    ]
    if not chunks:
        # Empty input: keep a single (empty) chunk so pd.concat has input.
        chunks = [data]

    # The with-block shuts the pool down once every result has been
    # collected; the previous version never closed the pool, leaving worker
    # processes alive after each call.
    with mp.Pool(processes=min(n_workers, len(chunks))) as pool:
        pending = [
            pool.apply_async(sjoin_propio, args=(chunk, polygons))
            for chunk in chunks
        ]
        results = [job.get() for job in pending]

    return gpd.GeoDataFrame(pd.concat(results), crs=data.crs)

## 3. Cargar Bases

In [None]:
## Credibanco transactions: every needed column in a single query.
# The column groups used to be fetched by two separate queries and later
# glued together with pd.concat(axis=1). BigQuery does not guarantee row
# order without ORDER BY, so the two results could be misaligned row by row
# (and the table was scanned twice). Fetching all columns at once keeps each
# transaction intact. credibanco_1 / credibanco_2 below are column subsets
# of that single result, so they share the same index and the downstream
# concat lines them up correctly.
query = """
    SELECT fecha_autorizacion, id_mcc, id_lugar, ventas, coordinates
    FROM `landing_prod.credibanco_transactions`
    WHERE fecha_autorizacion >= '2021-05-01'
"""
query_job = client.query(
    query,
    # Location must match that of the dataset(s) referenced in the query.
    location="US",
)  # API request - starts the query

credibanco = query_job.to_dataframe()

# The two frames the rest of the notebook expects (same rows, same index).
credibanco_1 = credibanco[['fecha_autorizacion', 'id_mcc', 'id_lugar']]
credibanco_2 = credibanco[['ventas', 'coordinates']]
del credibanco  # the subsets above are copies; free the full frame

## Administrative boundaries layer
localidades = gpd.read_file('GE00_BOG_administrativo.geojson', encoding='utf-8')

## 4. Proceso de datos

### 4.1 Union bases credibanco

In [None]:
# Place the two column groups side by side (rows matched on index) and cast
# the sales amount to float in the same expression.
final = pd.concat([credibanco_1, credibanco_2], axis=1).astype({'ventas': float})

In [None]:
# Inspect column dtypes and non-null counts of the merged transactions.
final.info()

### 4.2 Bases de credibanco en formato geografico

In [None]:
# Parse the WKT strings into geometries, then promote the frame to a
# GeoDataFrame in EPSG:4326 with that column as the active geometry.
wkt_coords = final['coordinates']
final['coordinates'] = gpd.GeoSeries.from_wkt(wkt_coords)
final = gpd.GeoDataFrame(data=final, geometry='coordinates', crs='epsg:4326')

In [None]:
# final.info()

In [None]:
# localidades.info()

### 4.3 Base credibanco con mismo crs que localidades

In [None]:
# Reproject the transactions into the CRS of the administrative polygons so
# both layers share one coordinate system before the spatial join.
final = final.to_crs(crs=localidades.crs)

In [None]:
# final.crs

In [None]:
# localidades.crs

### 4.4 Variables de año y mes

In [None]:
# Parse the authorization date once and derive the month and year columns.
fechas_autorizacion = pd.DatetimeIndex(final['fecha_autorizacion'])
final['mes'] = fechas_autorizacion.month
final['year'] = fechas_autorizacion.year

## 5. Unión espacial de credibanco con sus respectivas variables geográficas

In [None]:
if __name__ == '__main__':

    final = parallelize()
    # sjoin_nearest returns every equidistant match. A transaction at the
    # same distance from several polygons (e.g. on a shared boundary) therefore
    # comes back once per polygon and would be double-counted in the sales
    # aggregates below. Keep one match per original row.
    # NOTE(review): assumes the index of `final` is unique before the join.
    n_antes = len(final)
    final = final[~final.index.duplicated(keep='first')]
    if len(final) != n_antes:
        print("Dropped {} duplicate nearest-join matches".format(n_antes - len(final)))
    final.info()

In [None]:
# final = gpd.sjoin_nearest(final,localidades, how='left')
# final.info() 

## 6. Creación de variables colapsadas

In [None]:
# Sales table: one row per (year, month, localidad, estrato, MCC, barrio).
# dropna=False keeps groups whose key values are missing.
group_keys = ['year', 'mes', 'Nombre_Loc', 'ID_estrato', 'id_mcc', 'ID_Barrio']
# NOTE(review): 'comercios' counts non-null id_lugar rows (transactions), not
# distinct places; use 'nunique' if distinct merchants were intended.
ventas = (
    final.groupby(group_keys, dropna=False)
    .agg(
        ventas_sum=('ventas', 'sum'),
        ventas_count=('ventas', 'count'),
        comercios=('id_lugar', 'count'),
    )
    .reset_index()
)
# numero = final.groupby(['mes','Nombre_Loc','Nombre_UPZ','ID_estrato'], dropna=False).count().reset_index()[['ventas']]
# promedio = final.groupby(['mes','Nombre_Loc','Nombre_UPZ','ID_estrato'], dropna=False).mean().reset_index()[['ventas']]

In [None]:
# ventas['day']=1
# ventas['month']=ventas['mes']
# pd.to_datetime(ventas[['year','month','day']])

## 7. Bases de mapas

In [None]:
# Map table: sales per (year, month, localidad, barrio, UPZ, estrato), joined
# back to the administrative polygons to recover each group's geometry.
map_keys = ['Nombre_Loc', 'ID_Barrio', 'Nombre_UPZ', 'ID_estrato']
mapa = final.groupby(['year', 'mes', *map_keys], dropna=False).agg(
    ventas_sum=('ventas', 'sum'),
    # Was ('ventas', 'sum'), which silently duplicated ventas_sum; 'count'
    # matches the ventas_count definition used in the sales table.
    ventas_count=('ventas', 'count'),
).reset_index()
# validate='many_to_one' raises if localidades is not unique on map_keys.
mapa = pd.merge(mapa, localidades, on=map_keys, how='left', validate='many_to_one')
mapa = gpd.GeoDataFrame(mapa, geometry='geometry', crs=localidades.crs)
mapa.info()

In [None]:
# Display the merged map table.
mapa

In [None]:
# Aggregate the map to one geometry per (Nombre_Loc, year, mes).
# Only the sales aggregates are summed. Dissolving the full frame with
# aggfunc='sum' also "summed" every identifier/label column merged in from
# localidades (e.g. ID_Barrio, Nombre_UPZ, ID_estrato), yielding meaningless
# values such as added estrato IDs or concatenated names.
columnas_agr = ['Nombre_Loc', 'year', 'mes', 'ventas_sum', 'ventas_count', 'geometry']
mapa_agr = mapa[columnas_agr].dissolve(by=['Nombre_Loc', 'year', 'mes'], aggfunc='sum')
mapa_agr = mapa_agr.reset_index()
# One row per part of each multi-part geometry.
mapa_agr = mapa_agr.explode(index_parts=False)
# from shapely.ops import orient # version >=1.7a2
# mapa_agr['geometry'] = mapa_agr['geometry'].apply(orient, args=(-1,))
# Replace each polygon with its boundary (outline).
mapa_agr['geometry'] = mapa_agr['geometry'].boundary
# mapa_agr = pd.DataFrame(mapa_agr)

# mapa_agr.to_file("mapa_agr.geojson", driver='GeoJSON')
# mapa_agr =gpd.read_file('mapa_agr.geojson', encoding='utf-8')
# mapa_agr= mapa_agr.explode()
# mapa_agr=mapa_agr.reset_index(drop=True)
# # mapa_agr['geometry']=mapa_agr['geometry'].to_wkt()
# mapa_agr = gpd.GeoDataFrame(mapa_agr , crs = mapa.crs, geometry='geometry')


In [None]:
# Quick visual check of the aggregated localidad boundaries.
mapa_agr.plot()

In [None]:
# Inspect columns and dtypes of the aggregated map table.
mapa_agr.info()

In [None]:
# mapa_agr=pd.DataFrame(mapa_agr)

In [None]:
# mapa_agr.to_file("mapa_agr.geojson", driver='GeoJSON')
# mapa_agr =gpd.read_file('mapa_agr.geojson', encoding='utf-8')

In [None]:
# Replace every missing value in the sales table with the literal string 'NaN'.
# NOTE(review): in numeric key columns this produces mixed float/str object
# columns; confirm the BigQuery load accepts them.
ventas = ventas.fillna(value='NaN')
# mapa['geometry'] = mapa['geometry'].fillna()
# mapa = mapa.fillna('NaN')

# mapa_agr['geometry'] = mapa_agr['geometry'].fillna()
# mapa_agr = mapa_agr.fillna('NaN')

In [None]:
# Inspect the map table before export.
mapa.info()

In [None]:
# Inspect the sales table before export.
ventas.info()

## Exportar Base

In [None]:
# Fresh BigQuery client (US location, default project) for the export step.
client = bigquery.Client(location="US")
print(f"Client creating using default project: {client.project}")

In [None]:
# Define a name for the new dataset.
dataset_id = 'workzone'

# The project defaults to the Client's project if not specified.
dataset = client.get_dataset(dataset_id)  # API request

## Exportar mapa con geometrías de estrato, localidades, etc. (tabla SE_02_map_seguimiento)

In [None]:
table_ref = dataset.table("SE_02_map_seguimiento")

# Overwrite the table contents on every run instead of appending.
job_config = bigquery.job.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

job = client.load_table_from_dataframe(
    mapa,
    table_ref,
    location="US",
    job_config=job_config,
)
job.result()  # block until the load job finishes
print(f"Loaded dataframe to {table_ref.path}")

In [None]:
## Exportar tabla con información de ventas (SE_01_base_seguimiento)

In [None]:
table_ref = dataset.table("SE_01_base_seguimiento")

job_config = bigquery.job.LoadJobConfig()
# Overwrite the table on every run.
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

# job_config is now actually passed. It used to be built and then ignored,
# so the load fell back to BigQuery's default write disposition (append) and
# duplicated the rows on every rerun of the notebook.
job = client.load_table_from_dataframe(ventas, table_ref, location="US", job_config=job_config)

job.result()  # Waits for table load to complete.
print("Loaded dataframe to {}".format(table_ref.path))

In [None]:
table_ref = dataset.table("SE_03_maplocalidades_seguimiento")

job_config = bigquery.job.LoadJobConfig()
# Overwrite the table on every run.
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

# job_config is now actually passed. It used to be built and then ignored,
# so the load fell back to BigQuery's default write disposition (append) and
# duplicated the rows on every rerun of the notebook.
job = client.load_table_from_dataframe(mapa_agr, table_ref, location="US", job_config=job_config)

job.result()  # Waits for table load to complete.
print("Loaded dataframe to {}".format(table_ref.path))

In [None]:
# client = bigquery.Client(location="us-east4",project='atlas-323415')
# print("Client creating using default project: {}".format(client.project))
# # Define a name for the new dataset.
# dataset_id = 'creditos'

# # The project defaults to the Client's project if not specified.
# dataset = client.get_dataset(dataset_id)  # API request
# table_ref = dataset.table("SE_01_base_seguimiento")
# job_config = bigquery.job.LoadJobConfig()
# job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

# job = client.load_table_from_dataframe(ventas , table_ref, location="US")

# job.result()  # Waits for table load to complete.
# print("Loaded dataframe to {}".format(table_ref.path))