In [1]:
%pip install -e ..

Obtaining file:///home/sagemaker-user/athena_bridge
  Installing build dependencies ... [?25ldone
[?25h  Checking if build backend supports build_editable ... [?25ldone
[?25h  Getting requirements to build editable ... [?25ldone
[?25h  Preparing editable metadata (pyproject.toml) ... [?25ldone
Building wheels for collected packages: athena_bridge
  Building editable for athena_bridge (pyproject.toml) ... [?25ldone
[?25h  Created wheel for athena_bridge: filename=athena_bridge-0.0.2-0.editable-py3-none-any.whl size=6498 sha256=cb6951e8af1a940d0f9cdb1e005004553a2242c6eb913cc63cc38847376bf6e5
  Stored in directory: /tmp/pip-ephem-wheel-cache-8p8vet5o/wheels/32/99/bc/4c7ada3e84e2673f4d4776e044d89dca78028a63859a1ae19e
Successfully built athena_bridge
Installing collected packages: athena_bridge
  Attempting uninstall: athena_bridge
    Found existing installation: athena_bridge 0.0.2
    Uninstalling athena_bridge-0.0.2:
      Successfully uninstalled athena_bridge-0.0.2
Successful

# 0. Execution Mode

We define the execution mode, which allows this notebook to run either with **PySpark** (on an EMR cluster or Glue Interactive Session — note that in GIS you must add and run the session configuration cells at the beginning of the notebook) or with **Python**.  
Set the variable to **True** to run with Python + Athena, or leave it as **False** to run with PySpark.

In [2]:
python = True  # True to run with athena_bridge (Python), False to run with PySpark (EMR cluster or Glue Interactive Session)


# 1. Library Installation

In [3]:
if python:
  !pip athena_bridge

ERROR: unknown command "athena_bridge"


# 2. Library Import

In [4]:
if python:
  import athena_bridge.functions as F
  import athena_bridge.data_types as T
  from athena_bridge.window import Window as W
  Window = W()
  from athena_bridge.spark_athena_bridge import get_spark
else:
  import pyspark.sql.functions as F
  import pyspark.sql.types as T
  from pyspark.sql.window import Window
  from pyspark.sql import SparkSession


from datetime import *
import pandas as pd
import time
import functools

# 3. Dataproc Athena Bridge Creation for Table and File Reading

In [6]:
if python:
  # The database **datatemp** does not exist by default. As a temporary database, we can use any existing
  # database in Athena or create a new one for this purpose. If you want to use **datatemp**, 
  # you can create it in Athena by running:
  # CREATE DATABASE datatemp

  base_datos_temporal = "__YOUR_ATHENA_DATABASE_TEMP_FOR_ATHENA_BRIDGE__"   # Temporary database in Glue/Athena
  directorio_temporal = "s3://__YOUR_S3_PATH_TEMP_FOR_ATHENA_BRIDGE__"  # Staging path
  workgroup = '__YOUR_ATHENA_WORKGROUP__'

  # When creating the reader, you must specify a database that is available within your Athena
  # sandbox workgroup. Temporary tables associated with the sandbox files that are read directly
  # as files, as well as those created when using the `df.cache()` method, will be created under this database.
  # These temporary tables and temporary files will be deleted at the end when calling the reader’s `exit()` method.

  spark = get_spark(
        database_tmp=base_datos_temporal,
        path_tmp=directorio_temporal,
        workgroup=workgroup
    )
else:
  spark = SparkSession.builder.getOrCreate()

# 4. Data Reading

## 4.1 Table Reading

In [None]:
base_datos = "__YOUR_DATABASE__"
table_1 =  "__YOUR_TABLE_1__"
df = dataproc.read().table(base_datos + "." + table_1)

## 4.2 Reading Parquet File

In [None]:
df_parquet = dataproc.read().parquet('s3://__YOUR_S3_PATH_PARQUET__')

## 4.3 Reading CSV File

In [None]:
# In Athena, CSV files are read through the directory that contains them — 
# you cannot specify the full path including the CSV file name.
# The directory must contain only CSV files with the same structure.
# If you want to read a single CSV file, its parent directory should contain only that file.

ruta_lectura = 's3://__YOUR_S3_PATH_CSV__'
df_csv = (
    dataproc.read()
    .option('inferSchema', 'false')
    .option('header', 'true')
    .option('sep', ';')
    .option('encoding', 'latin1')  # ✅ forzar encoding
    .csv(ruta_lectura)
)

# 5.Example of Transformations Using PySpark-like Code Executed by the Library Under Python + Athena

In [None]:
base_datos = "__YOUR_DATABASE__"
table_1 =  "__YOUR_TABLE_1__"
table_2 = "__YOUR_TABLE_2__"
table_3 = "__YOUR_TABLE_3__"

In [None]:
def max_fecha(df):
    fechas = df.select(F.max(F.col("closing_date")).alias("closing_date"))
    fechas = fechas.withColumn("closing_date",F.col("closing_date").cast(T.StringType()))
    maxFechaTabla = fechas.head().closing_date[0]
    return maxFechaTabla

In [None]:
df = dataproc.read().table(base_datos + "." + table_1)
print("Number of rows df:" + str(df.count()))

FECHA_PROCESO = max_fecha(df)

print(FECHA_PROCESO)

empleados = dataproc.read().table(base_datos + "." + table_2)
# Filtro no necesario por ser tabla diaria
# empleados = empleados.where(col("closing_date")==f.to_date(FECHA_PROCESO))

print("Number of rows in table_2:" + str(empleados.count()))

empleados=empleados.withColumn("employee_id",F.trim(F.col("employee_id")))
empleados= empleados.select("employee_id", "user_id", "entity_id", "branch_id").distinct()

ofis = dataproc.read().table(base_datos + "." + table_3)
ofis = ofis.where((F.col("entity_id")=="0182") & (F.col("closing_date") == max_fecha(ofis)))
ofis=ofis.where( (F.col("level55_gen_manag_id")=="6055") )

print("Number of Rows in table_3:" + str(ofis.count()))

In [None]:
empleados_join = ofis.join(empleados, on = ['entity_id', 'branch_id'] , how='inner')

print("Number of Rows after Join:" + str(empleados.count()))

In [None]:
df = df.where(F.col("closing_date")==F.to_date(F.lit(FECHA_PROCESO)))
df=df.withColumn("matricula",F.substring(F.col("g_worker_id"),5,7) )
df=df.withColumn("Estructurales",F.substring(F.col("g_worker_id"),4,1) )

#Loes estrcuturales (ES y BB) tienen en la 4º posición el campo g_worker_id una "S"
df=df.where(F.col("Estructurales")=="S")

empleados = empleados.withColumnRenamed('user_id', 'user_id_empleado')

In [None]:
# Creamos esta columna solo para ejemplificar uso de Window
empleados_ejemplo_window = empleados.withColumn('saldo', F.lit(1000))
window = Window.partitionBy("employee_id").orderBy("user_id_empleado")   #

empleados_ejemplo_window = empleados_ejemplo_window.withColumn("saldo_anterior", F.lag(F.col("saldo")).over(window))

In [None]:
df_join = df.join(empleados, (df.matricula==empleados.employee_id), how = 'inner')

print("Number of rows df:" + str(df_join.count()))

df_join = df_join.select("g_worker_id","g_job_profile_id","gf_wrk_posn_ocpn_start_date","audit_date","closing_date","matricula","Estructurales","user_id")

print("Number of rows df:" + str(df_join.count()))

# 6. Writing

## 6.1 Writing in Parquet Format

In [None]:
dataproc.write().partitionBy(['closing_date']).option('partitionOverwriteMode', 'dynamic').mode(
        'overwrite').parquet(ofis, 's3://__YOUR_S3_PATH_TO_SAVE_PARQUET__')

## 6.2 Writing in CSV Format

In [None]:
 #.option("header", True) \  No permitido guardar con cabeceras, unload guarda sin cabeceras
dataproc.write().option("sep", ";").option('header', 'true').mode("overwrite").partitionBy(["closing_date"]).csv(
    ofis, "s3://__YOUR_S3_PATH_TO_SAVE_CSV__")

# 7. Notebook Cost

In [None]:
if python:
  datos_escaneados = spark._reader.data_scanned

  print(f"Coste: {(datos_escaneados/(1024*1024*1024*1024)) * 5.65} usd")

# 8. Cleanup of Tables and Temporary Files

In [None]:
if python:
  # It is important to call the `stop()` method to remove temporary tables and files.
  spark.stop()