# Configurando GlueContext

In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Wrap the active SparkContext (created on demand if none exists) in a GlueContext
# so AWS Glue APIs can be used alongside plain Spark.
glueContext = GlueContext(
    SparkContext.getOrCreate()
)

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
35,application_1620749420494_0036,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Obtener la distancia recorrida por usuarios de Sudamérica

Revisando la sesión de Spark

In [2]:
# NOTE(review): findspark is not used anywhere in this notebook; the kernel already
# provides a Spark session ("SparkSession available as 'spark'" above). These imports
# are not referenced by the visible cells.
import pyspark
from pyspark.sql import SparkSession

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
# Smoke test: a trivial SQL query confirms the Spark session responds.
df = spark.sql(
    '''select 'spark' as hello '''
)
df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+
|hello|
+-----+
|spark|
+-----+

In [4]:
# Memory configured for the driver in this session.
spark.sparkContext.getConf().get(key='spark.driver.memory')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'10g'

In [5]:
# Memory configured per executor in this session.
spark.sparkContext.getConf().get(key='spark.executor.memory')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'10g'

In [6]:
# Cores configured per executor in this session.
spark.sparkContext.getConf().get(key='spark.executor.cores')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'8'

### Preparar ambiente

Definir el bucket de S3 del que se leen las tablas de pings y en el que se guardan los resultados

In [7]:
import os

bucket = 'iadbprod-csd-hub-analyticaldata' # already created on S3

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Calcular la distancia de los usuarios

Importar librerías y definir las funciones auxiliares: distancia entre dos puntos (`dist`) y registro de tiempos (`print_log`)

In [8]:
import time
from pyspark.sql.functions import acos, cos, sin, lit, toRadians
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

def dist(long_x, lat_x, long_y, lat_y, radius_km=6371.0):
    """Great-circle distance between two points, as a Spark Column expression.

    Uses the haversine formula. The previous spherical-law-of-cosines form fed
    ``acos`` a value that floating-point rounding could push slightly above 1.0
    (yielding NaN for identical points) and lost precision for the very short
    hops typical of consecutive pings. Haversine computes the same central angle
    but stays well-conditioned for small separations.

    Parameters
    ----------
    long_x, lat_x : str or Column
        Longitude / latitude of the first point, in degrees.
    long_y, lat_y : str or Column
        Longitude / latitude of the second point, in degrees.
    radius_km : float, default 6371.0
        Sphere radius; the result is expressed in the same unit.

    Returns
    -------
    Column
        Distance between the two points; null if any coordinate is null.
    """
    from pyspark.sql.functions import asin, cos, lit, sin, sqrt, toRadians, when

    phi_x = toRadians(lat_x)
    phi_y = toRadians(lat_y)
    half_dphi = sin((phi_y - phi_x) / 2)
    half_dlambda = sin((toRadians(long_y) - toRadians(long_x)) / 2)
    a = half_dphi * half_dphi + cos(phi_x) * cos(phi_y) * half_dlambda * half_dlambda
    # Clamp to [0, 1] so rounding can never push sqrt/asin out of their domain.
    # `when` is used (not least/greatest) because those skip nulls and would turn
    # a missing predecessor (null lag) into a spurious distance.
    a = when(a > 1.0, lit(1.0)).when(a < 0.0, lit(0.0)).otherwise(a)
    return asin(sqrt(a)) * lit(2.0 * radius_km)

def print_log(partition_2, start_time, end_time, message):
    """Print the elapsed minutes between ``start_time`` and ``end_time`` with a label.

    Parameters
    ----------
    partition_2 : object
        Label for the unit of work being logged (printed verbatim).
    start_time : float
        Start timestamp in seconds (e.g. from ``time.time()``).
    end_time : float or None
        End timestamp in seconds. The original implementation ignored this
        argument and always measured against the current time; it is now
        honoured, and ``None`` falls back to ``time.time()``.
    message : str
        Free-text suffix appended to the log line.
    """
    finish = time.time() if end_time is None else end_time
    minutes = (finish - start_time) / 60
    print(f"--- {minutes} {partition_2} {message}---")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
from pyspark.sql.functions import col

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Función que calcula la distancia recorrida por cada usuario

In [10]:
from pyspark.sql.functions import col

def indicadores(df):
    """Aggregate per-user travelled distance and ping counts from raw pings.

    The function orders each ``caid``'s pings by ``utc_timestamp`` and pairs
    each ping with the previous one via ``lag``. It sums their distances as
    computed by ``dist``. The first ping of each user has no predecessor, so
    its null distance is filled with 0.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Pings with at least ``caid``, ``iso_country_code``, ``latitude``,
        ``longitude``, ``utc_timestamp``, ``year``, ``month`` and ``day``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per (``iso_country_code``, ``caid``) with columns, in order:
        ``caid, iso_country_code, dist, year, month, day, n_obs_dist``.
        ``n_obs_dist`` is the number of non-null ``utc_timestamp`` values.

    Notes
    -----
    The window is partitioned by ``caid`` only, while the aggregation groups by
    (``iso_country_code``, ``caid``). A hop between pings recorded in two
    different countries is therefore credited to the country of the later ping.
    NOTE(review): confirm that this is intended.
    """
    from pyspark.sql.functions import count, lag, max as max_, sum as sum_
    from pyspark.sql.window import Window

    inicio = time.time()
    w = Window.partitionBy("caid").orderBy("utc_timestamp")

    distance = df.withColumn(
        "dist",
        dist("longitude", "latitude",
             lag("longitude", 1).over(w),
             lag("latitude", 1).over(w)),
    ).fillna(0, subset=["dist"])  # first ping of each user (null lag) -> 0

    # Explicit aggregates + aliases instead of a dict-based agg, so the output
    # does not depend on Spark's auto-generated names such as "sum(dist)".
    user_indicators = (
        distance
        .groupBy("iso_country_code", "caid")
        .agg(
            sum_("dist").alias("dist"),
            max_("year").alias("year"),
            max_("month").alias("month"),
            max_("day").alias("day"),
            count("utc_timestamp").alias("n_obs_dist"),
        )
        .select("caid", "iso_country_code", "dist", "year", "month", "day", "n_obs_dist")
    )
    # Spark is lazy: this only times building the query plan, not computing it.
    print(time.time() - inicio)
    return user_indicators

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Revisando la base de datos

#### Calcular distancias para las tablas diarias con todos los países

In [11]:
import pandas as pd
# Inclusive daily range of dates to process; the strings are month/day/year.
fecha_inicio, fecha_final = '09/21/2020', '09/30/2020'
fechas = pd.date_range(start=fecha_inicio, end=fecha_final)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Show the dates that the per-day processing loop iterates over.
fechas

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DatetimeIndex(['2020-09-21', '2020-09-22', '2020-09-23', '2020-09-24',
               '2020-09-25', '2020-09-26', '2020-09-27', '2020-09-28',
               '2020-09-29', '2020-09-30'],
              dtype='datetime64[ns]', freq='D')

#### Cargar la tabla de cada día, calcular distancias y guardar los resultados en S3

In [13]:
# For each day: read that day's ping table, aggregate per-user distance with
# `indicadores`, and write the result as a single CSV under the `bucket` prefix.
for fecha in fechas:
    start = time.time()
    # Table suffix = month (not zero-padded) + two-digit day, e.g. 2020-09-21 -> '921',
    # matching the existing table names on S3.
    mes = str(fecha.month)
    dia = f'{fecha.day:02d}'
    codigo_tabla = f'{mes}{dia}'

    print(f'Calculando distancia tabla {codigo_tabla}')

    path_tabla = f"s3://{bucket}/graphdata-mobility-temporal/Tablas_pings/Todos_pings_delta_{codigo_tabla}"
    df = spark.read.parquet(path_tabla).select('caid',
                                               'iso_country_code',
                                               'latitude',
                                               'longitude',
                                               'utc_timestamp',
                                               'year',
                                               'month',
                                               'day')

    # Diagnostics. printSchema() and show() print their own output and return
    # None, so they are called directly; wrapping them in print() emitted "None".
    # Note: count() and distinct().count() each trigger a full scan of the table.
    print(df.count())
    df.printSchema()
    df.show()
    print(df.select('caid').distinct().count())

    df_dia = indicadores(df)
    file_path = f's3://{bucket}/graphdata-mobility-temporal/mobility/distancia_recorrida_usuarios_{codigo_tabla}'
    # repartition(1) makes the write produce a single CSV part file for the day.
    df_dia.repartition(1).write.save(file_path, format='csv', header=True, mode='overwrite')
    print(time.time() - start)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Calculando distancia tabla 921
236540054
root
 |-- caid: string (nullable = true)
 |-- iso_country_code: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- utc_timestamp: long (nullable = true)
 |-- year: long (nullable = true)
 |-- month: long (nullable = true)
 |-- day: long (nullable = true)

None
+--------------------+----------------+---------+----------+-------------+----+-----+---+
|                caid|iso_country_code| latitude| longitude|utc_timestamp|year|month|day|
+--------------------+----------------+---------+----------+-------------+----+-----+---+
|31e443161f850f2d6...|              MX| 25.52869|-103.42437|   1600730634|2020|    9| 21|
|2b9d29312f9567315...|              MX|19.368687|-99.072021|   1600721302|2020|    9| 21|
|b9f48b369218e2022...|              MX| 21.16934|-101.75607|   1600671197|2020|    9| 21|
|a14fdcd1387aeb655...|              MX| 19.77241| -99.02102|   1600705166|2020|    9| 21|
|802fd82