In [1]:
%load_ext sparksql_magic

In [8]:
import os
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
from utils import read_query

In [3]:
import os
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
spark = (
    SparkSession.builder
    .appName("TestDataPlatform")
    .master("local[*]")
    .config('spark.jars', ','.join(os.path.join(dp, f) for dp, _, fs in os.walk('/home/jovyan/jars') for f in fs))
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", os.getenv("SPARK_S3_PATH_STYLE_ACCESS"))
    .config("spark.hadoop.fs.s3a.endpoint", os.getenv("SPARK_S3_ENDPOINT"))
    .config("spark.hadoop.fs.s3a.access.key", os.getenv("SPARK_S3_ACCESS_KEY"))
    .config("spark.hadoop.fs.s3a.secret.key", os.getenv("SPARK_S3_SECRET_KEY"))
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.catalogImplementation", "hive")
    .config("spark.sql.warehouse.dir", os.getenv("SPARK_HIVE_WAREHOUSE_DIR"))
    .config("hive.metastore.uris", os.getenv("SPARK_HIVE_METASTORE_URIS"))
    .enableHiveSupport()
)
spark = configure_spark_with_delta_pip(spark).getOrCreate()

In [4]:
spark

In [14]:
USER = "sa"
PASSWORD = "YourStrong@Passw0rd"
URL = "jdbc:sqlserver://sqlserver:1433;databaseName=c;encrypt=false;trustServerCertificate=true"

In [20]:
BUCKET = os.getenv("BUCKET", default="lhchdev")
CAPA = "brz"
FUENTE = "SOFMAC"
TABLA = "KPYMADEUDADOS"

In [21]:
spark.sql("create database if not exists brz;")

DataFrame[]

In [22]:
spark.sql(f"""
CREATE EXTERNAL TABLE IF NOT EXISTS {CAPA}.{FUENTE}_{TABLA} (
    cCodCorAde STRING COMMENT 'Código correlativo de adeudado',
    cDesAdeuda STRING COMMENT 'Descripción del adeudado'
)
USING DELTA
COMMENT 'Tabla de adeudados'
LOCATION 's3a://{BUCKET}/{CAPA}/{FUENTE}/{TABLA}'
;""")


DataFrame[]

In [16]:
df = read_query(query="""(
    SELECT * FROM SOFCMACHYO_BI_SNAP.DBO.KPYMADEUDADOS
)dym""",spark=spark,url=URL,user=USER,password=PASSWORD)

In [23]:
df.write.format("delta").insertInto(f"{CAPA}.{FUENTE}_{TABLA}", overwrite=True)

In [19]:
df.show(10,False)

+--------------------+---------------------------------+
|cCodCorAde          |cDesAdeuda                       |
+--------------------+---------------------------------+
|00100000000000472   |AUQUI AUCCAPIÑA, ROBERT          |
|00100000000000473   |MENDOZA MATA, NANCY ELIZABETH    |
|00100000000000474   |VILCAÑAUAPA HUAROC, NANCY        |
|00100000000000475   |ROJAS FABIAN, CLEDY FLORENTINA   |
|00100000000000476   |ORDOÑEZ VALLADOLID, RAUL ERASMO  |
|00100000000000477   |HUANCHAJURI CARRION, YOLANDA J...|
|00100000000000478   |VALLEJO MERLO, GUISSELLA NANCY   |
|00100000000000479   |CAMPOS PACHECO, DELIA            |
|00100000000000480   |PEREZ HUANUCO, MICHEL ELI        |
|00100000000000481   |FLORES SAENZ, JULIANA PAOLA      |
+--------------------+---------------------------------+
only showing top 10 rows



In [5]:
%%sparksql
SHOW DATABASES;

0
namespace
bronze
default
gold
silver


In [6]:
%%sparksql
DROP DATABASE IF EXISTS landing;

In [8]:
%%sparksql
CREATE DATABASE IF NOT EXISTS bronze
LOCATION 's3a://warehouse/bronze/';

In [9]:
%%sparksql
CREATE DATABASE IF NOT EXISTS silver
LOCATION 's3a://warehouse/silver/';

In [10]:
%%sparksql
CREATE DATABASE IF NOT EXISTS gold
LOCATION 's3a://warehouse/gold/';

In [28]:
df = spark.read.format("parquet").load("./data/STA_TIPO_CAMBIO/")

In [None]:
df.show(10,False)

In [None]:
df.write.format("parquet").saveAsTable("bronze.STA_TIPO_CAMBIO")

In [None]:
df.write.format("delta").saveAsTable("silver.STA_TIPO_CAMBIO")

In [13]:
%%sparksql
DROP TABLE IF EXISTS bronze.datos_aeropuerto

In [14]:
%%sparksql
SHOW TABLES IN bronze;

0,1,2
namespace,tableName,isTemporary
bronze,sta_tipo_cambio,False
,aeropuerto,False


In [29]:
schema = StructType([
    StructField("fecha", StringType(), True),
    StructField("tipocambio", StringType(), True)
])
df = spark.read.format("parquet").load("./data/STA_TIPO_CAMBIO/")

In [27]:
%%sparksql
CREATE OR REPLACE TABLE silver.sta_tipo_cambio_2(
    fecha STRING,
    tipocambio STRING
)
USING DELTA
LOCATION 's3a://warehouse/silver/sta_tipo_cambio_2';

In [30]:
%%sparksql
SELECT * FROM silver.sta_tipo_cambio_2

0,1
fecha,tipocambio


In [34]:
df = df.withColumnRenamed("Fecha", "fecha") \
       .withColumnRenamed("Tipocambio", "tipocambio")
df.write.format("delta").mode("overwrite").saveAsTable("silver.STA_TIPO_CAMBIO_2")

AnalysisException: [DELTA_FAILED_TO_MERGE_FIELDS] Failed to merge fields 'fecha' and 'fecha'

Si previmante creaste un DDL y no es compatible en tipo de datos con la información que vas a ingresar producira errores.

In [None]:
%%sparksql
SELECT * FROM silver.sta_tipo_cambio_2

In [25]:
%%sparksql
DESCRIBE EXTENDED silver.sta_tipo_cambio

0,1,2
col_name,data_type,comment
Fecha,date,
TipoCambio,"decimal(10,2)",
,,
# Detailed Table Information,,
Name,spark_catalog.silver.sta_tipo_cambio,
Type,MANAGED,
Location,s3a://warehouse/silver/sta_tipo_cambio,
Provider,delta,
Owner,jovyan,


In [16]:
%%sparksql
CREATE EXTERNAL TABLE IF NOT EXISTS bronze.datos_aeropuerto (
    anio INT,
    mes INT,
    entidad_prestadora STRING,
    concesion STRING,
    aeropuerto STRING,
    tipo_vuelo STRING,
    total_pasajeros INT
)
USING DELTA
LOCATION 's3a://warehouse/bronze/datos_aeropuerto_raw';

In [8]:
%%sparksql
CREATE OR REPLACE TEMP VIEW aeropuerto
USING csv
OPTIONS (
    path "s3a://warehouse/landing/data/datos_aeropuerto.csv",
    header "true",
    delimiter "," ,
    encoding "UTF-8"
);


In [21]:
%%sparksql
DELETE FROM bronze.datos_aeropuerto

0
num_affected_rows
7


In [9]:
%%sparksql
SELECT * FROM aeropuerto

0,1,2,3,4,5,6
anio,mes,entidad_prestadora,concesion,aeropuerto,tipo_vuelo,total_pasajeros
2023,1,LATAM Airlines,Lima - Perú,Jorge Chávez,Nacional,150000
2023,1,Avianca,Bogotá - Colombia,El Dorado,Internacional,200000
2023,2,LATAM Airlines,Lima - Perú,Jorge Chávez,Internacional,120000
2023,2,Copa Airlines,Panamá - Panamá,Tocumen,Internacional,90000
2023,3,Viva Air,Medellín - Colombia,José María Córdova,Nacional,80000
2024,1,LATAM Airlines,Santiago - Chile,Arturo Merino Benítez,Nacional,160000
2024,1,American Airlines,Miami - USA,Miami International,Internacional,250000


In [22]:
%%sparksql
INSERT INTO bronze.datos_aeropuerto
SELECT * FROM aeropuerto

In [23]:
%%sparksql
DESCRIBE EXTENDED aeropuerto

0,1,2
col_name,data_type,comment
anio,string,
mes,string,
entidad_prestadora,string,
concesion,string,
aeropuerto,string,
tipo_vuelo,string,
total_pasajeros,string,


In [19]:
%%sparksql
DESCRIBE EXTENDED bronze.datos_aeropuerto

0,1,2
col_name,data_type,comment
anio,int,
mes,int,
entidad_prestadora,string,
concesion,string,
aeropuerto,string,
tipo_vuelo,string,
total_pasajeros,int,
,,
# Detailed Table Information,,


In [24]:
%%sparksql
SHOW TABLES IN bronze;

0,1,2
namespace,tableName,isTemporary
bronze,datos_aeropuerto,False
bronze,sta_tipo_cambio,False
,aeropuerto,False
