In [None]:
# Crear entorno virtual
python -m venv env

# Activar entorno virtual
source env/bin/activate

# instalar paquetes
pip install duckdb pyodbc pandas polars pyarrow pyspark arrow python-dotenv faker dotenv

# instalar el cliente de DuckDB
winget install DuckDB.cli

# Guardar dependencias
pip freeze > requirements.txt

In [4]:
# crear BBDD de poblaciones

import time 
import duckdb
import pandas as pd
import numpy as np
from faker import Faker

# Inicializar Faker para generar nombres aleatorios
faker = Faker()
Faker.seed(0)  # Fijar la semilla para reproducibilidad

# Conectar a la base de datos DuckDB
con = duckdb.connect('./env/data/bbdd_duckdb2.db', read_only=False)

# Borrar la tabla si existe
con.execute("DROP TABLE IF EXISTS poblaciones")

# Crear la tabla si no existe
con.execute("""
    CREATE TABLE IF NOT EXISTS poblaciones (
        id BIGINT PRIMARY KEY, 
        poblacion VARCHAR, 
        x INTEGER,
        y INTEGER
    )
""")

# Generar datos aleatorios

np.random.seed(0)
n = 500_000
t = time.time()
df = pd.DataFrame({
    'id': np.arange(n),
    'poblacion': [faker.city() for _ in range(n)],  # Generar nombres aleatorios
    'x': np.random.randint(0, 1000000, n),  # Números aleatorios para simular otro dato
    'y': np.random.randint(0, 1000000, n)  # Números aleatorios para simular otro dato
})

print(f"Datos generados en {time.time() - t:.4f} segundos")

# Registrar el DataFrame en DuckDB como una tabla temporal
con.register('tmp_poblaciones', df)

t = time.time()

# Insertar los datos de manera eficiente
con.execute("INSERT INTO poblaciones SELECT * FROM tmp_poblaciones")
print (f"Datos insertados en {time.time() - t:.4f} segundos")

t = time.time()
# Mostrar un resumen de la tabla
print(con.execute("SELECT COUNT(*) AS registros FROM poblaciones").fetchdf())
print (f"Datos count(*) en {time.time() - t:.4f} segundos")

t = time.time()

# Mostrar las 10 posiciones con la población más alta
print(con.execute("SELECT * FROM poblaciones ORDER BY id asc LIMIT 10").fetchdf())

# Cerrar la conexión
con.close()

Datos generados en 28.4734 segundos
Datos insertados en 0.3889 segundos
   registros
0     500000
Datos count(*) en 0.0263 segundos
   id           poblacion       x       y
0   0        Changchester  985772  476857
1   1      West Tammyfort  305711  517204
2   2            Hullport  435829  780857
3   3       Howardborough  117952  223093
4   4         West Donald  963395  812427
5   5      New Laurenside  152315  198129
6   6          West Corey  882371  836174
7   7  Port Gabriellafort  359783  962460
8   8    West Ryanborough  304137  106646
9   9          Ramoshaven  122579  790135


In [None]:
# formas mas elegantes de insertar datos: con panda

import time
import duckdb
import pandas as pd
import numpy as np
from faker import Faker

# Inicializar Faker para generar nombres aleatorios
faker = Faker()
Faker.seed(0)  # Fijar la semilla para reproducibilidad

# Conectar a la base de datos DuckDB
con = duckdb.connect('./env/data/bbdd_duckdb.db', read_only=False)

np.random.seed(0)
n = 500_000
t = time.time()

df = pd.DataFrame({
    'id': np.arange(n),
    'poblacion': [faker.city() for _ in range(n)],  # Generar nombres de ciudades
    'x': np.random.randint(0, 1000000, n),
    'y': np.random.randint(0, 1000000, n)
})

print(f"Datos generados en {time.time() - t:.4f} segundos")

t = time.time()
# 🔹 Crear una tabla temporal en DuckDB desde el DataFrame
duckdb_df = con.from_df(df)
print (f"Datos insertados en duckdb_df: {time.time() - t:.4f} segundos")

t = time.time()
# 🔹 Crear la tabla real desde la tabla temporal
con.execute("DROP TABLE IF EXISTS poblaciones")
con.execute("CREATE TABLE IF NOT EXISTS poblaciones AS SELECT * FROM duckdb_df")
print(f"Datos insertados en duck_db: {time.time() - t:.4f} segundos")

### si la tabla ya existe
## con.execute("INSERT INTO poblaciones SELECT * FROM duckdb_df")

# Verificar que los datos están en la tabla
print(con.execute("SELECT COUNT(*) registros FROM poblaciones").fetchdf())

# Cerrar conexión
con.close()

Datos generados en 34.0981 segundos
Datos insertados en duckdb_df: 0.1020 segundos
Datos insertados en duck_db: 0.2411 segundos
   registros
0     500000


In [None]:
# formas mas elegantes de insertar datos: con arrow

import duckdb
import pyarrow as pa
import numpy as np
import time
from faker import Faker

faker = Faker()
Faker.seed(0)  # Fijar la semilla para reproducibilidad

# Conectar a DuckDB
con = duckdb.connect('./env/data/bbdd_duckdb.db')

# Generar datos aleatorios en Apache Arrow
np.random.seed(0)
n = 500_000
t = time.time()

arrow_df = pa.table({
    'id': pa.array(np.arange(n), type=pa.int64()),  # ID como BigInt
    'poblacion': pa.array([faker.city() for _ in range(n)], type=pa.string()),  # Nombres aleatorios
    'x': pa.array(np.random.randint(0, 1000000, n), type=pa.int32()),  # Coordenada X
    'y': pa.array(np.random.randint(0, 1000000, n), type=pa.int32())   # Coordenada Y
})

print(f"Datos generados en {time.time() - t:.4f} segundos")

t = time.time()
# 🔹 Insertar en DuckDB directamente desde Apache Arrow
con.register("arrow_df", arrow_df)
con.execute("DROP TABLE IF EXISTS poblaciones")
con.execute("CREATE TABLE IF NOT EXISTS poblaciones AS SELECT * FROM arrow_df")
print(f"Datos insertados en {time.time() - t:.4f} segundos")

# Verificar que los datos están en la tabla
print(con.execute("SELECT COUNT(*) registros FROM poblaciones").fetchdf())

# Cerrar conexión
con.close()

Datos generados en 33.9158 segundos
Datos insertados en 0.1594 segundos
   registros
0     500000


In [7]:
# queries varias tipo sql

import time 
import duckdb

# Conectar a la base de datos DuckDB
con = duckdb.connect('./env/data/bbdd_duckdb.db', read_only=False)

t = time.time()
# Mostrar un resumen de la tabla
print(con.execute("SELECT COUNT(*) AS registros FROM poblaciones").fetchdf())
print (f"Datos count* en {time.time() - t:.4f} segundos")

t = time.time()
# Mostrar un resumen de la tabla
print(con.execute("SELECT COUNT(*) as registros FROM (SELECT poblacion, count(*) FROM poblaciones group by ALL) AS V").fetchdf())
print (f"Datos GROUP BY en {time.time() - t:.4f} segundos")

t = time.time()
# Mostrar un resumen de la tabla
print(con.execute("SELECT COUNT(*) as registros FROM (SELECT poblacion, count(*) FROM poblaciones group by ALL HAVING count(*) > 250) AS V").fetchdf())
print (f"Datos GROUP BY HAVING 250 en {time.time() - t:.4f} segundos")

t = time.time()
# Mostrar un resumen de la tabla
print(con.execute("SELECT poblacion, count(*) registros FROM poblaciones group by ALL HAVING count(*) > 250").fetchdf())
print (f"Datos SELECT GROUP BY HAVING 250 en {time.time() - t:.4f} segundos")

poblacion = faker.city()
t = time.time()
# Mostrar un resumen de la tabla
print(con.execute(f"SELECT COUNT(*) registros FROM poblaciones WHERE poblacion = '{poblacion}'").fetchdf())
print (f"Datos SEEK en {time.time() - t:.4f} segundos")

# Cerrar la conexión
con.close()

   registros
0     500000
Datos count* en 0.0040 segundos
   registros
0      76180
Datos GROUP BY en 0.0390 segundos
   registros
0         36
Datos GROUP BY HAVING 250 en 0.0292 segundos
            poblacion  registros
0          Port James        299
1       North Michael        404
2          Smithmouth        258
3         New Michael        404
4        Lake Michael        403
5       South Michael        395
6        South Robert        257
7        Michaelmouth        314
8           New David        275
9         South James        259
10          New James        283
11          West John        257
12         Lake David        281
13         East James        276
14         West David        273
15         East David        289
16        North James        260
17       West Michael        419
18        West Robert        260
19         West James        283
20      Lake Jennifer        264
21     South Jennifer        264
22      East Jennifer        256
23        South Dav

In [None]:
# exportar resultado a csv, y parquet con pandas y polars

import duckdb
import time
import pandas as pd
import polars as pl

# Conectar a la base de datos DuckDB
con = duckdb.connect('./env/data/bbdd_duckdb.db', read_only=True)

print ("PANDAS")
print ("------")

t = time.time()
# dataframe
df = con.execute("SELECT * FROM poblaciones").fetchdf()
print (f"Datos SELECT en {time.time() - t:.4f} segundos")

t = time.time()
df.to_csv('./env/export/poblaciones.csv', index=False)
print (f"Datos exportados a csv en {time.time() - t:.4f} segundos")

t = time.time()
df.to_parquet('./env/export/poblaciones.parquet', index=False)
print (f"Datos exportados a parquet en {time.time() - t:.4f} segundos")

print (f"\nPOLARS")
print ("------")

t = time.time()
df = pl.from_arrow(con.execute("SELECT * FROM poblaciones").fetch_arrow_table())
print (f"Datos SELECT en {time.time() - t:.4f} segundos")

t = time.time()
df.write_csv('poblaciones.csv')
print (f"Datos exportados a csv en {time.time() - t:.4f} segundos")

t = time.time()
df.write_parquet('poblaciones.parquet')
print (f"Datos exportados a parquet en {time.time() - t:.4f} segundos")

# Cerrar la conexión
con.close()

PANDAS
------
Datos SELECT en 0.0800 segundos
Datos exportados a csv en 0.6770 segundos
Datos exportados a parquet en 0.1655 segundos

POLARS
------
Datos SELECT en 0.0450 segundos
Datos exportados a csv en 0.0495 segundos
Datos exportados a parquet en 0.0740 segundos


In [None]:
# uso del cli de duckDB

duckdb "./env/data/bbdd_duckdb.db"

-- metadata
show tables;
describe poblaciones;
PRAGMA table_info('poblaciones');
SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'poblaciones';

-- exportar la tabla poblaciones a un archivo csv
COPY poblaciones TO './env/export/poblaciones_cli.csv' (HEADER TRUE, DELIMITER ',');

-- importar y exportar datos csv
DROP TABLE IF EXISTS poblaciones2;
CREATE TABLE poblaciones2 AS SELECT * FROM poblaciones where 1 = 0;
COPY poblaciones2 FROM './env/export/poblaciones_cli.csv' (HEADER TRUE, DELIMITER ',');
SELECT COUNT(*) registros from poblaciones2;

-- Exportar la tabla poblaciones a un archivo Parquet
COPY poblaciones TO './env/export/poblaciones_cli.parquet' (FORMAT 'parquet');

-- importar y exportar parquet
DROP TABLE IF EXISTS poblaciones3;
CREATE TABLE poblaciones3 AS SELECT * FROM poblaciones WHERE 1 = 0;
COPY poblaciones3 FROM './env/export/poblaciones_cli.parquet' (FORMAT 'parquet');
SELECT COUNT(*) AS registros FROM poblaciones3;

.exit


In [None]:
# exportar a iceberg, y leer iceberg (parquet)

import duckdb
import time
import os

# Conectar a DuckDB
con = duckdb.connect('./env/bbdd_duckdb.db')

# Habilitar Iceberg
con.execute("INSTALL iceberg;")
con.execute("LOAD iceberg;")

# 🔹 Crear directorio de salida (si no existe)
iceberg_path = "./env/export/iceberg_data/"

# Asegurar que el directorio existe
os.makedirs(iceberg_path, exist_ok=True)

total_rows = con.execute("SELECT COUNT(*) FROM poblaciones").fetchone()[0]
batch_size = 100_000  # Número de registros por fragmento


t1 = time.time()
output_file = f"{iceberg_path}/poblaciones.parquet"
query = f"""
    COPY (SELECT * FROM poblaciones) 
    TO '{output_file}' (FORMAT 'parquet', OVERWRITE);
"""
con.execute(query)
print(f"Exportado {output_file} en {time.time() - t1:.4f} segundos")

time.sleep(1)

# batches

# t1 = time.time()
# for offset in range(0, total_rows, batch_size):
#     t2 = time.time()
#     output_file = f"{iceberg_path}/poblaciones_part_{offset}.parquet"
#     query = f"""
#         COPY (SELECT * FROM poblaciones LIMIT {batch_size} OFFSET {offset}) 
#         TO '{output_file}' (FORMAT 'parquet', PARTITION_BY (id), OVERWRITE);
#     """
#     con.execute(query)
#     print(f"Exportado {output_file} en {time.time() - t2:.4f} segundos")

# print(f"Datos exportados a Iceberg en {iceberg_path}, {time.time() - t1:.4f} segundos")

# 🔹 Crear un catálogo Iceberg con ubicación en disco

# drop table if exists
con.execute("DROP TABLE IF EXISTS poblaciones_ice")

t = time.time()
con.execute(f"""
CREATE TABLE poblaciones_ice AS 
SELECT * FROM READ_PARQUET('{iceberg_path}poblaciones.parquet');
""")
print(f"Datos insertados en Iceberg en {time.time() - t:.4f} segundos")

print(con.execute("SELECT COUNT(*) registros FROM poblaciones_ice").fetchdf())

# Cerrar conexión
con.close()

Exportado ./env/export/iceberg_data//poblaciones.parquet en 0.0508 segundos
Datos insertados en Iceberg en 0.1397 segundos
   registros
0     500000


In [None]:
-- exportar a iceberg, y leer iceberg (parquet) tablas más grandes

-- LENTO !!!!

duckdb "./env/data/tpch-sf30.db"

-- metadata
show tables;
describe orders;

SELECT o_orderdate, count(*) registros FROM orders group by all;

COPY (SELECT * FROM orders) 
TO './env/export/iceberg_big_data/' (FORMAT 'parquet', PARTITION_BY (o_orderdate), OVERWRITE);

-- Exportar la tabla poblaciones a un archivo Parquet
COPY (SELECT * FROM orders limit 500_000) 
TO './env/export/orders.parquet' (FORMAT 'parquet', OVERWRITE);

COPY (SELECT * FROM lineitem) 
TO './env/export/lineitem.parquet' (FORMAT 'parquet', OVERWRITE);



In [None]:
import duckdb
import time
import os

# Conectar a DuckDB
con = duckdb.connect('./env/data/tpch-sf30.db')

# Habilitar Iceberg
con.execute("INSTALL iceberg;")
con.execute("LOAD iceberg;")

# 🔹 Crear directorio de salida (si no existe)
iceberg_path = "./env/export/iceberg_big_data/"

# Asegurar que el directorio existe
os.makedirs(iceberg_path, exist_ok=True)

# batches
# recuperar el array las fechas distintas de orders
fechas = con.execute("SELECT distinct(o_orderdate) fecha FROM orders order by fecha").fetchdf()

# filtrar para las 30 primeras fechas
fechas = fechas.head(30)

t = time.time()
# para cada fecha, exportar a iceberg
for fecha in fechas['fecha']:

    t1 = time.time()
    output_file = f"{iceberg_path}"
    query = f"""
        COPY (SELECT * FROM orders WHERE o_orderdate = '{fecha}') 
        TO '{output_file}' (FORMAT 'parquet', PARTITION_BY(o_orderdate), OVERWRITE);
    """
    con.execute(query)
    print(f"Exportado {output_file} [{fecha}] en {time.time() - t1:.4f} segundos")

print(f"Exportado TODO a {output_file} [{fecha}] en {time.time() - t:.4f} segundos")

# drop table if exists
con.execute("DROP TABLE IF EXISTS poblaciones_big_table_ice")

t = time.time()
con.execute(f"""
    CREATE TABLE poblaciones_big_table_ice AS 
    SELECT * FROM READ_PARQUET('{iceberg_path}/*/*.parquet');
""")

print(f"Datos insertados en Iceberg en {time.time() - t:.4f} segundos")

print(con.execute("SELECT COUNT(*) registros FROM poblaciones_big_table_ice").fetchdf())

# Cerrar conexión
con.close()

Exportado ./env/export/iceberg_big_data/ [1992-01-01 00:00:00] en 1.0945 segundos
Exportado ./env/export/iceberg_big_data/ [1992-01-02 00:00:00] en 1.0427 segundos
Exportado ./env/export/iceberg_big_data/ [1992-01-03 00:00:00] en 1.0710 segundos
Exportado ./env/export/iceberg_big_data/ [1992-01-04 00:00:00] en 1.0895 segundos
Exportado ./env/export/iceberg_big_data/ [1992-01-05 00:00:00] en 1.0610 segundos
Exportado ./env/export/iceberg_big_data/ [1992-01-06 00:00:00] en 1.2108 segundos
Exportado ./env/export/iceberg_big_data/ [1992-01-07 00:00:00] en 1.1505 segundos
Exportado ./env/export/iceberg_big_data/ [1992-01-08 00:00:00] en 1.1429 segundos
Exportado ./env/export/iceberg_big_data/ [1992-01-09 00:00:00] en 1.0945 segundos
Exportado ./env/export/iceberg_big_data/ [1992-01-10 00:00:00] en 1.0009 segundos
Exportado TODO a ./env/export/iceberg_big_data/ [1992-01-10 00:00:00] en 10.9629 segundos
Datos insertados en Iceberg en 0.1454 segundos
   registros
0      18924


In [None]:
# consultas tipo tpch

import time
import duckdb

con = duckdb.connect('./env/data/tpch-sf10.db')

# 🔹 Group By: Ventas por cliente y año
query = """
SELECT 
    c.c_name AS cliente,
    EXTRACT(YEAR FROM o.o_orderdate) AS anio,
    SUM(l.l_extendedprice * (1 - l.l_discount)) AS total_ventas
FROM customer c
JOIN orders o ON c.c_custkey = o.o_custkey
JOIN lineitem l ON o.o_orderkey = l.l_orderkey
GROUP BY cliente, anio
ORDER BY total_ventas DESC
LIMIT 10;
"""
t = time.time()

# Mostrar resultados
print(con.execute(query).fetchdf())
print (f"\nConsulta 1: Ventas Totales por Cliente y Año")
print (f"Datos SELECT en {time.time() - t:.4f} segundos")

# 🔹 Group By: Ventas por cliente y año
query = """
SELECT 
    p.p_name AS producto,
    SUM(l.l_quantity) AS cantidad_total
FROM part p
JOIN lineitem l ON p.p_partkey = l.l_partkey
GROUP BY producto
ORDER BY cantidad_total DESC
LIMIT 10;
"""

t = time.time()
# Mostrar resultados
print(con.execute(query).fetchdf())
print (f"\nConsulta 2: Ranking de Productos más Vendidos")
print (f"Datos SELECT en {time.time() - t:.4f} segundos")

con.close()

              cliente  anio  total_ventas
0  Customer#000414892  1994  2.606656e+06
1  Customer#001439371  1997  2.592001e+06
2  Customer#000650719  1995  2.558529e+06
3  Customer#001390222  1994  2.528584e+06
4  Customer#000845770  1992  2.466209e+06
5  Customer#000643666  1994  2.429590e+06
6  Customer#000130426  1995  2.414659e+06
7  Customer#000083635  1996  2.392658e+06
8  Customer#001032646  1995  2.353206e+06
9  Customer#000309214  1994  2.347204e+06
Consulta 1: Ventas Totales por Cliente y Año
Datos SELECT en 16.9094 segundos
                                        producto  cantidad_total
0               khaki plum antique blush frosted          2143.0
1            lavender maroon lime black cornsilk          2083.0
2   goldenrod cornflower chartreuse antique lace          2012.0
3        firebrick cornflower yellow violet lace          1985.0
4                    steel cream pale dark azure          1968.0
5  blanched yellow gainsboro cornflower metallic          1968.0
6    

In [4]:
# azure como fuente de datos

import polars as pl
import duckdb
import time
import os
from dotenv import load_dotenv

load_dotenv()

con = duckdb.connect('./env/data/bbdd_duckdb.db')

azure_storage_connection_string = os.getenv('AZURE_STORAGE_CONNECTION_STRING')

    # DefaultEndpointsProtocol=https;
    # AccountName={accountName};
    # AccountKey={key};
    # EndpointSuffix=core.windows.net

container_name = os.getenv('CONTAINER_NAME')
folder_name = os.getenv('FOLDER_NAME')
blob_prefix = os.getenv('BLOB_PREFIX')
blob_sufix = os.getenv('BLOB_SUFIX')

path = f"az://{container_name}/{folder_name}/{blob_prefix}{blob_sufix}"

# Install and load the Azure connector
con.execute("INSTALL azure;")
con.execute("LOAD azure;")

print(f" Creating secret...")

con.execute(f"""
    CREATE OR REPLACE SECRET secret1 (
    TYPE AZURE,
    CONNECTION_STRING '{azure_storage_connection_string}'
);""")

t = time.time()
con.execute("DROP TABLE IF EXISTS teams_logs;")
con.execute(f"""CREATE TABLE teams_logs AS SELECT * FROM '{path}';""")
print (f"Datos insertados en {time.time() - t:.4f} segundos")

t = time.time()
print(con.execute("SELECT COUNT(*) registros FROM teams_logs").fetchdf())
print (f"Datos count(*) en {time.time() - t:.4f} segundos")

con.close()

 Creating secret...
Datos insertados en 24.6198 segundos
   registros
0    1943833
Datos count(*) en 0.0017 segundos


In [None]:
-- queries actividades teams

duckdb "./env/data/bbdd_duckdb.db"
describe teams_logs;

select min(datetime) min_date, max(datetime) max_date from teams_logs;

SELECT department, count(*) registros FROM teams_logs
WHERE datetime >= '2024-09-01' AND datetime < '2025-01-01'
group by all;


SELECT upper(jobTitle) , strftime(date_trunc('month', datetime), '%Y-%m') as date, count(*) registros FROM teams_logs
WHERE datetime >= '2024-09-01' AND datetime < '2025-01-01'
and department = 'DATA'
group by all;


PIVOT (
    SELECT upper(jobTitle) jobTitle, strftime(datetime, '%Y-%m') as date, COUNT(*) registros FROM teams_logs
    WHERE datetime >= '2024-09-01' AND datetime < '2025-01-01'
    and department = 'DATA'
    group by all
    )
ON date
USING sum(registros);


select * from teams_logs where displayName like '%Eladio%' limit 10;


select activity, strftime(datetime, '%Y-%m') as date, strftime(datetime, '%HH') as hour, count(*) registros 
from teams_logs where displayName like '%Eladio%'
GROUP BY all;

PIVOT (
    select activity, strftime(datetime, '%HH') as hour, count(*) registros 
    from teams_logs where displayName like '%Eladio%'
    GROUP BY all
    )
ON hour
USING sum(registros);

PIVOT (
    select activity, strftime(datetime, '%HH') as hour, count(*) registros 
    from teams_logs where department = 'DATA'
    GROUP BY all
    )
ON hour
USING sum(registros);



WITH base_data AS (
  SELECT 
    activity, 
    strftime(datetime, '%HH') as hour, 
    count(*) as registros 
  FROM teams_logs 
  WHERE department = 'DATA'
  GROUP BY ALL
),
totals AS (
  SELECT 
    activity, 
    SUM(registros) as total_registros
  FROM base_data
  GROUP BY activity
)
PIVOT (
  SELECT 
    b.activity, 
    b.hour, 
    ROUND(b.registros * 100.0 / t.total_registros, 2) as percentage
  FROM base_data b
  JOIN totals t ON b.activity = t.activity
)
ON hour
USING SUM(percentage);


In [None]:

-- queries fabric

duckdb "./env/data/bbdd_duckdb.db"

INSTALL azure;
LOAD azure;

-- obtener token de acceso desde powershell
Connect-AzAccount
$testToken = Get-AzAccessToken -ResourceTypeName Storage
# Retrieved token is of string type which you can validate with the "$testToken.Token.GetTypeCode()" command.
$testToken.Token | Set-Clipboard

-- crear secret
CREATE OR REPLACE SECRET onelake (
      TYPE AZURE,
      PROVIDER ACCESS_TOKEN,
      ACCESS_TOKEN 'secreto'
  );

FROM duckdb_secrets();
show tables;


CREATE VIEW onelake_orders AS SELECT * 
FROM delta_scan('abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<database_id>/Tables/Orders');

select count(*) registros from onelake_orders;


In [6]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.2 -> 25.0.1
[notice] To update, run: C:\Users\erincon\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


Collecting pyspark
  Downloading pyspark-3.5.4.tar.gz (317.3 MB)
     ---------------------------------------- 0.0/317.3 MB ? eta -:--:--
      -------------------------------------- 4.7/317.3 MB 28.4 MB/s eta 0:00:12
     - ------------------------------------ 11.5/317.3 MB 31.3 MB/s eta 0:00:10
     - ------------------------------------ 16.3/317.3 MB 27.6 MB/s eta 0:00:11
     -- ----------------------------------- 23.6/317.3 MB 29.3 MB/s eta 0:00:11
     --- ---------------------------------- 30.9/317.3 MB 30.7 MB/s eta 0:00:10
     ---- --------------------------------- 39.1/317.3 MB 31.8 MB/s eta 0:00:09
     ----- -------------------------------- 46.7/317.3 MB 32.3 MB/s eta 0:00:09
     ------ ------------------------------- 54.5/317.3 MB 32.7 MB/s eta 0:00:09
     ------- ------------------------------ 62.9/317.3 MB 33.4 MB/s eta 0:00:08
     -------- ----------------------------- 69.7/317.3 MB 33.4 MB/s eta 0:00:08
     --------- ---------------------------- 76.8/317.3 MB 33.5

In [None]:
# en desarrollo. copiar 

import duckdb
import pyspark
from pyspark.sql import SparkSession

# Iniciar sesión de Spark
spark = SparkSession.builder.getOrCreate()

# Conectar a DuckDB y leer los datos
con = duckdb.connect('./env/data/bbdd_duckdb.db')

con.execute("DROP TABLE IF EXISTS mi_tabla")
con.execute("CREATE TABLE mi_tabla (id INT, nombre STRING, fecha_creacion TIMESTAMP)")
con.execute("INSERT INTO mi_tabla VALUES (1, 'Ejemplo', now())")

# Obtener el DataFrame de DuckDB como Pandas
df_pandas = con.query("SELECT * FROM mi_tabla").to_df()

# Convertir Pandas DataFrame a Spark DataFrame
df_spark = spark.createDataFrame(df_pandas)

# Guardar en OneLake (en formato Delta)
df_spark.write.format("delta").mode("append").save("abfss://<workspace_id>@onelake.dfs.fabric.microsoft.com/<database_id>/Tables/NewOrders")


# Cerrar la conexión
con.close()
