In [1]:
from pyspark.sql import SparkSession 
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DateType, LongType, BooleanType, FloatType, TimestampType
from pyspark.sql import Row 

In [2]:
# Settings that register an Iceberg catalog named "mycatalog" backed by a Hive
# metastore reached over thrift.
# NOTE(review): every SQL statement and write in this notebook addresses the
# "demo" catalog, not "mycatalog" - confirm which catalog is intended.
# Alternative REST-catalog settings, kept for reference:
#   "spark.sql.catalog.mycatalog.type": "rest"
#   "spark.sql.catalog.mycatalog.uri": "http://localhost"
mycatalog_conf = {
    "spark.sql.catalog.mycatalog": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.mycatalog.type": "hive",
    "spark.sql.catalog.mycatalog.uri": "thrift://hive-metastore",
}

session_builder = SparkSession.builder.appName("example")
for conf_key, conf_value in mycatalog_conf.items():
    session_builder = session_builder.config(conf_key, conf_value)

# getOrCreate() returns the already-running session when there is one.
spark = session_builder.getOrCreate()

25/12/07 02:55:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
# Namespace that holds every table created in this notebook; the statement is
# a no-op when the namespace already exists.
sql_namespace = "create namespace if not exists demo.nyc"
spark.sql(sql_namespace)

DataFrame[]

In [4]:
# Innermost level: one contract of an employee. Only "until" and "position"
# may be null.
contract_struct = StructType([
    StructField("id", LongType(), False),
    StructField("contract_id", StringType(), False),
    StructField("since", DateType(), False),
    StructField("until", DateType(), True),
    StructField("position", StringType(), True),
])

# Middle level: one employee together with the (nullable) list of contracts.
employee_struct = StructType([
    StructField("id", LongType(), False),
    StructField("name", StringType(), False),
    StructField("contract", ArrayType(contract_struct), True),
])

# Top level: one row per company, with the (nullable) list of employees nested
# inside it. Mirrors the column layout of the demo.nyc.company table.
company_schema = StructType([
    StructField("id", LongType(), False),
    StructField("name", StringType(), False),
    StructField("test", BooleanType(), False),
    StructField("employee", ArrayType(employee_struct), True),
])

In [5]:
# DDL for the company table: an Iceberg table partitioned by the company id,
# with employees and their contracts stored as nested arrays of structs.
# "if not exists" makes the statement safe to re-run.
sql_company = """
create table if not exists demo.nyc.company (
    id bigint not null,
    name string not null,
    test boolean not null,
    employee array<
                struct<
                    id bigint not null,
                    name string not null,
                    contract array<
                        struct<
                            id bigint not null,
                            contract_id string not null,
                            since date not null,
                            until date,
                            position string
                        >
                    >
                >
            >
)
using iceberg
partitioned by (
    id
)
"""
spark.sql(sql_company)

DataFrame[]

In [6]:
# One line item of a payslip: an id, a label and an amount.
payslip_item_struct = StructType([
    StructField("id", LongType(), False),
    StructField("name", StringType(), False),
    StructField("value", FloatType(), False),
])

# One row per payslip; every column, including the nested item list, is
# declared non-nullable. Mirrors the column layout of demo.nyc.payslip.
payslip_schema = StructType([
    StructField("id", LongType(), False),
    StructField("type", StringType(), False),
    StructField("contract_id", LongType(), False),
    StructField("company_id", LongType(), False),
    StructField("period", DateType(), False),
    StructField("created_at", DateType(), False),
    StructField("employee_id", LongType(), False),
    StructField("item", ArrayType(payslip_item_struct), False),
])

In [7]:
# DDL for the payslip table: an Iceberg table partitioned by company_id and by
# the year of the "period" date. Line items are stored as a nested array of
# structs. "if not exists" makes the statement safe to re-run.
sql_payslip = """
create table if not exists demo.nyc.payslip (
    id bigint not null,
    type string not null,
    contract_id bigint not null,
    company_id bigint not null,
    period date not null,
    created_at date not null,
    employee_id bigint not null,
    item array<
            struct<
                id bigint not null,
                name string not null,
                value float not null
            >
        >
)
using iceberg
partitioned by (
    company_id,
    year(period)
)
"""
# Run the DDL; the returned (empty) DataFrame becomes the cell output.
spark.sql(sql_payslip)

DataFrame[]

In [8]:
# One clock-in/clock-out mark recorded for a work shift.
workshift_mark_struct = StructType([
    StructField("id", LongType(), False),
    StructField("type", StringType(), False),
    StructField("ts", TimestampType(), False),
    StructField("status", StringType(), False),
])

# One row per work shift; every column, including the nested mark list, is
# declared non-nullable. Mirrors the column layout of demo.nyc.workshift.
workshift_schema = StructType([
    StructField("id", LongType(), False),
    StructField("type", StringType(), False),
    StructField("company_id", LongType(), False),
    StructField("period", DateType(), False),
    StructField("since", DateType(), False),
    StructField("until", DateType(), False),
    StructField("employee_id", LongType(), False),
    StructField("mark", ArrayType(workshift_mark_struct), False),
])

In [9]:
# DDL for the work-shift table. Marks are stored as a nested array of structs.
# The table format is stated explicitly ("using iceberg") so this table is
# created the same way as demo.nyc.company and demo.nyc.payslip instead of
# depending on the catalog's default provider.
# Because of "if not exists", a table that was already created by an earlier
# run is left untouched by this statement.
sql_workshift = """
create table if not exists demo.nyc.workshift (
    id bigint not null,
    type string not null,
    company_id bigint not null,
    period date not null,
    since date not null,
    until date not null,
    employee_id bigint not null,
    mark array<
            struct<
                id bigint not null,
                type string not null,
                ts timestamp not null,
                status string not null
            >
        >
)
using iceberg
"""
# Run the DDL; the returned (empty) DataFrame becomes the cell output.
spark.sql(sql_workshift)

DataFrame[]

In [10]:
pip install faker

Collecting faker
  Downloading faker-38.2.0-py3-none-any.whl (2.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m17.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
Installing collected packages: faker
Successfully installed faker-38.2.0
[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.0.1[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [11]:
from faker import Faker
import random
import uuid
from datetime import date
from dateutil.relativedelta import relativedelta

# Seed both random sources so that a fresh "Restart & Run All" regenerates the
# same synthetic data set.
SEED = 42
random.seed(SEED)
Faker.seed(SEED)

fake = Faker(['es_CL'])

# Rows for demo.nyc.company, shaped like company_schema.
company_data = []
# One (company_id, employee_id, contract_id) tuple per generated contract.
# The payslip and work-shift generators use these tuples as foreign keys.
employee_ids = []
# Running counters: employee and contract ids are unique across all companies,
# so the ids stored inside each company match the ids recorded in employee_ids.
employee_id, contract_id = 0, 0
for i in range(100):
    employees = []
    company = {
        "id": i,
        "name": fake.company(),
        # The first 95 companies are flagged as test companies.
        "test": i < 95,
        "employee": employees,
    }
    for _ in range(random.randint(0, 500)):
        contracts = []
        until = None
        for _ in range(random.randint(1, 10)):
            if until is None:
                # First contract of this employee: random start in the last 10 years.
                since = fake.date_between(start_date='-10y', end_date='-1w')
            else:
                # Follow-up contracts start the day after the previous one ends,
                # so the contracts of one employee never overlap.
                since = until + relativedelta(days=1)
            until = since + relativedelta(days=random.randint(30, 500))
            contracts.append({
                "id": contract_id,
                # The schema stores contract_id as a string, so the UUID is
                # converted explicitly; it is built from the seeded generator
                # to stay reproducible.
                "contract_id": str(uuid.UUID(int=random.getrandbits(128), version=4)),
                "since": since,
                "until": until,
                "position": fake.job()
            })
            employee_ids.append((i, employee_id, contract_id))
            contract_id += 1
        employees.append({
            "id": employee_id,
            "name": fake.name(),
            "contract": contracts
        })
        employee_id += 1
    company_data.append(company)

In [12]:
# To reload from scratch, empty the table first with:
#   spark.sql("truncate table demo.nyc.company")
# The write below appends, so it adds rows to whatever the table already holds.
company_df = spark.createDataFrame(data=company_data, schema=company_schema)
company_writer = company_df.writeTo("demo.nyc.company")
company_writer.append()

                                                                                

In [13]:
# Value pools the generator draws from.
payslip_item_names = ['sueldo líquido', 'sueldo bruto', 'bono marzo', 'bono navidad', 'bono', 'salud', 'afp', 'seguro cesantía', 'adelanto', 'vacaciones']
payslip_types = ['sueldo', 'reproceso', 'finiquito']

# Rows for demo.nyc.payslip: one payslip per (company, employee, contract)
# tuple in employee_ids, each carrying between 1 and 10 line items.
payslip_data = []
item = 0      # running id shared by all line items
payslip = 0   # running payslip id
for company, employee, contract in employee_ids:
    item_count = random.randint(1,10)
    items = [
        {
            "id": item_id,
            "name": random.choice(payslip_item_names),
            "value": float(random.randint(50000,1500000))
        }
        for item_id in range(item, item + item_count)
    ]
    item += item_count
    payslip_row = {
        "id": payslip,
        "type": random.choice(payslip_types),
        "contract_id": contract,
        "company_id": company,
        "period": fake.date_between(start_date='-1y', end_date='-1d'),
        "created_at": fake.date_between(start_date='-1y', end_date='-1d'),
        "employee_id": employee,
        "item": items
    }
    payslip_data.append(payslip_row)
    payslip += 1

In [14]:
from datetime import datetime

# Rows for demo.nyc.workshift: one work shift per distinct (company, employee)
# pair, each carrying between 2 and 60 marks.
workshift_data = []
workshift = 0   # running work-shift id
mark = 0        # running id shared by all marks
# employee_ids holds one (company_id, employee_id, contract_id) tuple per
# contract; drop the contract and de-duplicate so each employee appears once.
# Sorting gives the rows a stable order.
employee_ids_wt = sorted({(company, employee) for company, employee, _ in employee_ids})
# Unpack in the same (company, employee) order the pairs were built in, so that
# company_id and employee_id are not swapped in the generated rows.
for company, employee in employee_ids_wt:
    marks = []
    for _ in range(random.randint(2,60)):
        marks.append({
            "id": mark,
            "type": random.choice(['zkteco', 'mobile', 'totem']),
            "ts": fake.date_time_between(start_date="-1y", end_date="-1d"),
            "status": random.choice(['ok', 'not ok'])
        })
        mark += 1
    period = fake.date_between(start_date="-1y", end_date="-1d")
    # Draw two dates and order them so a shift never ends before it starts.
    since, until = sorted(
        fake.date_between(start_date="-1y", end_date="-1d") for _ in range(2)
    )
    workshift_data.append({
        "id": workshift,
        "type": random.choice(['lunes a viernes', 'nocturno', 'rotativo']),
        "company_id": company,
        "period": period,
        "since": since,
        "until": until,
        "employee_id": employee,
        "mark": marks
    })
    workshift += 1

In [15]:
# Build the payslip DataFrame with the explicit schema, then append its rows
# to the payslip table.
payslip_df = spark.createDataFrame(data=payslip_data, schema=payslip_schema)
payslip_writer = payslip_df.writeTo("demo.nyc.payslip")
payslip_writer.append()

25/12/07 02:57:14 WARN TaskSetManager: Stage 3 contains a task of very large size (1015 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [None]:
# Build the work-shift DataFrame with the explicit schema, then append its rows
# to the work-shift table.
workshift_df = spark.createDataFrame(data=workshift_data, schema=workshift_schema)
workshift_writer = workshift_df.writeTo("demo.nyc.workshift")
workshift_writer.append()

In [20]:
# Show the DataFrame's repr (column names and types) as the cell output.
workshift_df

DataFrame[id: bigint, type: string, company_id: bigint, period: date, since: date, until: date, employee_id: bigint, mark: array<struct<id:bigint,type:string,ts:timestamp,status:string>>]