## Structured types

In IoT, complex machinery means complex data. How are complex structures handled by Parquet? Let's read an inventory of devices with more interesting structure.

In [None]:
import json
import dateutil.parser
from time import time
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import duckdb
from pathlib import Path
from IPython.display import display, HTML
import matplotlib.pyplot as plt

pd.set_option('display.max_columns', None)
pd.set_option('display.width', 160)

%reload_ext autoreload
%autoreload 2
from helpers import read_jsonl, inspect, find_type_conflicts

### Using automated schema discovery

In [None]:
cmdata_file = Path('../data/input/cmdata.jsonl')
cmdata = read_jsonl(cmdata_file)

print("Displaying first rows of the original data...")
df = pd.DataFrame(cmdata)
display(df)

print("Creating Parquet file with pyarrow schema discovery...")
cmdata_table = pa.Table.from_pylist(cmdata)
display(cmdata_table.to_pandas().head())

cmdata_parquet_file = Path('../data/output/cmdata.parquet')
pq.write_table(cmdata_table, cmdata_parquet_file)

inspect(cmdata_parquet_file)

What do you notice?

* How are the arrays for `childDevices` and so on shown in the table and in the actual file? Check also the path and the definition and repetition levels.
* What happened to the `c8y_ActiveAlarmsStatus`?

### Encoding structured types

Arbitrarily deeply nested structures are still represented as a flat list of columns in Parquet. The algorithm for this originates from [Google's Dremel](https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/36632.pdf).

* **Repetition level** tells us at which level of the path a value is repeated. If it is 0, a new top-level record starts. If it is 1, the value repeats within a top-level list of the record, and so on.
* **Definition level** tells us how much of the path to a value is actually defined, that is, where along the path a null appeared. If it is 0, the top-level property was not present. If it is 1, the top-level property was present but the next part of the path was null, and so on.

Here is the example from the original paper from Google:

![Shredding](../images/dremel-shredding.png)
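To make the two levels concrete, they can be computed by hand for a single list column such as `childDevices`. The following is a minimal sketch in plain Python, not the Parquet implementation. It assumes the standard three-level Parquet list layout (an optional list containing optional string elements), which gives a maximum repetition level of 1 and a maximum definition level of 3. The function name `shred_list_column` and the sample records are made up for illustration.

```python
def shred_list_column(records, key):
    """Shred an optional list of optional strings into (value, rep, def) triples.

    Assumed definition levels for this layout:
      0 = the list itself is missing or null
      1 = the list is present but empty
      2 = the list has an entry, but the entry is null
      3 = the entry is present
    """
    triples = []
    for record in records:
        values = record.get(key)
        if values is None:
            triples.append((None, 0, 0))
        elif len(values) == 0:
            triples.append((None, 0, 1))
        else:
            for index, value in enumerate(values):
                # The first entry starts a new record (0); later entries repeat in the list (1).
                repetition = 0 if index == 0 else 1
                definition = 2 if value is None else 3
                triples.append((value, repetition, definition))
    return triples


# Illustrative records, not taken from cmdata.jsonl
devices = [
    {"childDevices": ["a", "b"]},
    {"childDevices": []},
    {},
    {"childDevices": [None, "c"]},
]

for triple in shred_list_column(devices, "childDevices"):
    print(triple)
```

Each record contributes at least one triple, even when it has no value, which is how the nested structure can be rebuilt from a flat column.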

### Defining the schema

PyArrow takes the set of top-level columns [from the first row only](https://github.com/apache/arrow/blob/534ef71eca582006668f6f4ac83b47dd695d2020/python/pyarrow/table.pxi#L6450). That means any property that is missing from the first row is not transferred. Also, if the rows contain conflicting types for the same property, it stops with an error.
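Before relying on schema discovery, it can help to check which properties would be lost. The following is a minimal sketch in plain Python that compares the top-level keys of the first record with the keys of all other records. The helper `keys_missing_from_first_row` is hypothetical (it is not part of PyArrow or of `helpers`), and the sample records are made up.

```python
def keys_missing_from_first_row(records):
    """Return the top-level keys that occur in later records but not in the first one."""
    if not records:
        return []
    first_keys = set(records[0])
    missing = set()
    for record in records[1:]:
        missing.update(set(record) - first_keys)
    return sorted(missing)


# Illustrative records: the second one carries a property the first one lacks.
sample = [
    {"id": "1", "name": "pump"},
    {"id": "2", "name": "valve", "c8y_ActiveAlarmsStatus": {"major": 1}},
]

print(keys_missing_from_first_row(sample))
```

Running the same check on `cmdata` would list the properties that need an explicit schema to survive the conversion.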

Let's define a schema. We will use dictionary encoding for `type` and `owner` and convert timestamps from strings to real timestamps. Also, we explicitly define `c8y_ActiveAlarmsStatus`. We ignore `childAdditions`, `fragments` and `supportedMeasurements` for this example.


In [None]:
schema = pa.schema([
    pa.field('id', pa.int64()),
    pa.field('name', pa.string()),
    pa.field('type', pa.dictionary(pa.int8(), pa.string())),
    pa.field('owner', pa.dictionary(pa.int8(), pa.string())),
    pa.field('creationTime', pa.timestamp('ms')),
    pa.field('lastUpdated', pa.timestamp('ms')),
    pa.field('childAssets', pa.list_(pa.string())),
    pa.field('childDevices', pa.list_(pa.string())),
    pa.field('c8y_ActiveAlarmsStatus', pa.struct([
        pa.field('critical', pa.int32()),
        pa.field('major', pa.int32()),
        pa.field('minor', pa.int32()),
        pa.field('warning', pa.int32())
    ])),
])

cmdata_typed = []
for record in cmdata:
    cmdata_typed.append({
        'id': int(record.get('id')),
        'name': record.get('name'),
        'type': record.get('type'),
        'owner': record.get('owner'),
        'creationTime': dateutil.parser.isoparse(record['creationTime']) if 'creationTime' in record else None,
        'lastUpdated': dateutil.parser.isoparse(record['lastUpdated']) if 'lastUpdated' in record else None,
        'childAssets': record.get('childAssets'),
        'childDevices': record.get('childDevices'),
        'c8y_ActiveAlarmsStatus': record.get('c8y_ActiveAlarmsStatus')
    })

table_typed = pa.Table.from_pylist(cmdata_typed, schema=schema)
parquet_file_typed = Path('../data/output/cmdata_typed.parquet')
pq.write_table(table_typed, parquet_file_typed, use_dictionary=['type', 'owner'])

print("Typed Parquet file created successfully!")
inspect(parquet_file_typed)

display(table_typed.to_pandas().head())

### Processing complex structured data

Let's try a more complex, more deeply nested dataset from `radiator.jsonl`. This file contains detailed process data from a machine, with many nested measurements. Let's first try PyArrow's schema discovery again.

In [None]:
radiator_file = Path('../data/input/radiator.jsonl')
radiator_data = read_jsonl(radiator_file)

In [None]:
try:
    radiator_table = pa.Table.from_pylist(radiator_data)
    radiator_parquet_path = Path('../data/output/radiator_typed.parquet')
    pq.write_table(radiator_table, radiator_parquet_path)
    print("Parquet file created successfully with pyarrow schema discovery!")
except Exception as e:
    print("Error creating Parquet file using pyarrow schema discovery:", e)

There seems to be an inconsistency in the types of some fields in the JSONL data. PyArrow schema discovery, or more correctly, the schema discovery of the C++ implementation that PyArrow wraps, cannot handle mixtures of lists or structs with atomic types (among [other things](https://github.com/apache/arrow/blob/5eaf553bfc7aa639fd67bd622b6b808e71fbba39/python/pyarrow/src/arrow/python/inference.cc#L566)).

This is daily business in industrial IoT. Any real IoT deployment sees different versions of devices online at the same time, sending different, potentially conflicting data. These devices may come from various suppliers or organisations in your company. 

In practice, it is rarely possible to centrally control and harmonize all arriving data, so you have to deal with some level of inconsistency. However, most data lake technologies are not very forgiving with respect to schema inconsistencies. What to do?

The first challenge is to actually find the conflict in the data. Unfortunately, there is no further log or debug information available. Let's analyze the data to find these conflicts.

In [None]:
find_type_conflicts(radiator_file)
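The implementation of `find_type_conflicts` lives in `helpers` and is not shown here. As a rough idea of how such a check can work, the following sketch walks every JSON object, records the Python type seen at each dotted path, and reports the paths with more than one non-null type. It is an assumption-laden illustration, not the actual helper: the name `sketch_type_conflicts` is made up, it does not descend into lists, and it is stricter than PyArrow because it also flags mixtures of `int` and `float`. The sample lines reuse field names from the radiator data with invented values.

```python
import json
from collections import defaultdict


def collect_types(value, path, seen):
    """Record the type name of `value` under `path`, descending into nested objects."""
    seen[path].add(type(value).__name__)
    if isinstance(value, dict):
        for key, child in value.items():
            collect_types(child, f"{path}.{key}", seen)


def sketch_type_conflicts(lines):
    """Return {path: sorted type names} for paths with more than one non-null type."""
    seen = defaultdict(set)
    for line in lines:
        if not line.strip():
            continue
        record = json.loads(line)  # assumes each line holds one JSON object
        for key, child in record.items():
            collect_types(child, key, seen)
    return {
        path: sorted(types)
        for path, types in seen.items()
        if len(types - {"NoneType"}) > 1
    }


# Invented sample lines, modeled on the field names in radiator.jsonl
sample_lines = [
    '{"meas_Outcome": {"exceptions": []}, "meas_ModelNumber": {"value": 42}}',
    '{"meas_Outcome": {"exceptions": "none"}, "meas_ModelNumber": {"value": " 42 "}}',
]

for path, types in sketch_type_conflicts(sample_lines).items():
    print(path, types)
```

To apply the sketch to a file, pass an open file handle, for example `sketch_type_conflicts(open(radiator_file))`.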

In [None]:
radiator_fixed = []
for record in radiator_data:
    # 'exceptions' must always be a list; replace any other value with an empty list.
    if 'meas_Outcome' in record and 'exceptions' in record['meas_Outcome']:
        if not isinstance(record['meas_Outcome']['exceptions'], list):
            record['meas_Outcome']['exceptions'] = []
    # Model numbers that arrive as strings are converted to int, or to None if that fails.
    if 'meas_ModelNumber' in record and 'value' in record['meas_ModelNumber'] and isinstance(record['meas_ModelNumber']['value'], str):
        try:
            record['meas_ModelNumber']['value'] = int(record['meas_ModelNumber']['value'].strip())
        except (ValueError, TypeError):
            record['meas_ModelNumber']['value'] = None
    # Note: records are modified in place, so radiator_data changes as well.
    radiator_fixed.append(record)

radiator_table = pa.Table.from_pylist(radiator_fixed)
radiator_parquet_path = Path('../data/output/radiator_fixed.parquet')
pq.write_table(radiator_table, radiator_parquet_path)
print("Parquet file created successfully with pyarrow schema discovery!")

inspect(radiator_parquet_path)


How can this Parquet file be improved? Try applying the techniques already learned to make this file more efficient.
 

## Parquet predicate pushdown

The previous section said that query engines can use Parquet metadata to skip reading unnecessary parts of the file. Let's investigate this with a first test.

In the queries in the previous section, we saw that the devices produced roughly between 100,000 and 200,000 events each. So let's reduce the size of the row groups from the default of more than a million rows to just 100,000 and check the resulting file.

If you open the various row groups and check the statistics for `source` and `time` inside them, what can you see?

In [None]:
radiator_sorted = sorted(radiator_fixed, key=lambda x: (x['source']['value'], x['time']))
radiator_sorted_table = pa.Table.from_pylist(radiator_sorted)
radiator_sorted_parquet_path = Path('../data/output/radiator_sorted.parquet')
pq.write_table(radiator_sorted_table, radiator_sorted_parquet_path, row_group_size=10000)
inspect(radiator_sorted_parquet_path)

con = duckdb.connect()
con.execute("PRAGMA enable_profiling='json'")
con.execute("PRAGMA profiling_mode='detailed'")

query_template = '''
SELECT time, meas_Load_1.current_ForceValue.value as force
FROM '{}'
WHERE source.value = '1822301'
'''

start = time()
result = con.execute(query_template.format(radiator_parquet_path)).fetchdf()
full_time = time() - start
print(f"Query time on unoptimized file: {full_time:.2f} seconds")

start = time()
result = con.execute(query_template.format(radiator_sorted_parquet_path)).fetchdf()
full_time = time() - start
print(f"Query time on optimized file: {full_time:.2f} seconds")

con.close()


Note the total bytes here: just 60k.

Move the sorting and pruning groups part to the second section and add it to the summary there.


In [None]:
events_parquet_path_split = Path('../data/output/events_split.parquet')
table_split = pq.read_table(events_parquet_path_split)

events_parquet_path_rowgroups = Path('../data/output/events_rowgroups.parquet')

pq.write_table(
    table_split,
    events_parquet_path_rowgroups,
    column_encoding={'time': 'DELTA_BINARY_PACKED'},
    use_dictionary=['source', 'type', 'text'],
    row_group_size=100000
)

inspect(events_parquet_path_rowgroups)

From the statistics, you can see that the device `140672` occupies row group #0 and part of row group #1. The device `140673` occupies a part of row group #1, row group #2 and a part of row group #3 and so on.
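The pruning decision that a query engine derives from these statistics can be sketched in a few lines of plain Python. This is a simplified illustration, not how DuckDB or PyArrow implement it: each row group is reduced to a `(min, max)` pair for `source`, and a row group is read only if the requested value can fall inside that range. The statistics below are illustrative values modeled on the description above; the device `140674` and the unsorted layout are assumptions.

```python
def row_groups_to_read(stats, value):
    """Return the indexes of row groups whose [min, max] range may contain `value`."""
    return [
        index
        for index, (minimum, maximum) in enumerate(stats)
        if minimum <= value <= maximum
    ]


# Illustrative (min, max) statistics for `source` in a file sorted by source
sorted_stats = [
    (140672, 140672),  # row group 0
    (140672, 140673),  # row group 1
    (140673, 140673),  # row group 2
    (140673, 140674),  # row group 3
    (140674, 140674),  # row group 4
]

# The same data without sorting: every row group may contain every device
unsorted_stats = [(140672, 140674)] * 5

print(row_groups_to_read(sorted_stats, 140673))
print(row_groups_to_read(unsorted_stats, 140673))
```

With the sorted layout only row groups 1 to 3 have to be read; without sorting, the statistics rule out nothing and all five row groups are scanned.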

Let's query the device `140673` while DuckDB profiling is enabled.

In [None]:
import pyarrow.dataset as ds
import time

dataset = ds.dataset(events_parquet_path_rowgroups, format="parquet")
filter_expr = ds.field("source") == 140673

start = time.time()
table_full = dataset.scanner().to_table()
full_time = time.time() - start

start = time.time()
table_filtered = dataset.scanner(filter=filter_expr).to_table()
filtered_time = time.time() - start

print(f"Rows read full scan: {table_full.num_rows}, time: {full_time}")
print(f"Rows read filtered scan: {table_filtered.num_rows}, time: {filtered_time}")


In [None]:
con = duckdb.connect()
#con.execute("PRAGMA enable_profiling='json'")
#con.execute("PRAGMA profiling_mode='detailed'")

query = f"""
SELECT
    DATE_TRUNC('week', time) as week,
    COUNT(distinct workpiece_id) as pieces_per_week
FROM '{events_parquet_path_rowgroups}'
WHERE source = 140673
GROUP BY week
ORDER BY week
"""

start = time.time()
result = con.execute(query).fetchdf()
full_time = time.time() - start
print(f"Query time on row-grouped file: {full_time:.2f} seconds")

plt.figure(figsize=(12, 6))
plt.plot(result['week'], result['pieces_per_week'], marker='o', linewidth=2, markersize=6)
plt.xlabel('Week', fontsize=12)
plt.ylabel('Workpieces per Week', fontsize=12)
plt.title('Weekly Workpiece Production for Source 140673', fontsize=14, fontweight='bold')
plt.grid(True, alpha=0.3)
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
plt.show()

con.close()




TBD: Add the stats part and write with smaller data pages to demonstrate pruning. See https://stackoverflow.com/questions/76696239/predicate-pushdown-in-duckdb-for-a-parquet-file-in-s3

Exercises:

* Manipulate the `data_page_size` parameter (https://arrow.apache.org/docs/python/generated/pyarrow.parquet.ParquetWriter.html) and check the DuckDB stats.
* Do we need the ID of events? We can't edit in Parquet anyway.