# Unequal schema problems

There are a few ways in which parquet files written with slightly different schema can cause issues in the import pipeline. This issue most commonly arises when some portions of the data contain only empty (null) values in a column, but other portions have non-null values and so are interpreted as integer or string values. When we try to merge these partial files together later, the parquet engine does not want to perform a cast between these types and throws an error.

For example, at the reduce stage, we're combining several intermediate parquet files for a single spatial tile into the final parquet file. It's possible at this stage that some files will contain only empty (null) values in a column that we expect to be a string field.

e.g. 

#### File1

| int_field | string_field | float_field |
| --------- | ------------ | ----------  |
|         5 |      <empty> |         3.4 |
|         8 |      <empty> |         3.8 |

which will have a schema like:
       
    optional int64 field_id=-1 int_field;
    optional int32 field_id=-1 string_field **(Null)**;
    optional double field_id=-1 float_field;
    
#### File2
    
| int_field | string_field | float_field |
| --------- |------------- | ----------- |
|         6 |      hello   |         4.1 |
|         7 |      <empty> |         3.9 |

will have a schema like:

    optional int64 field_id=-1 int_field;
    optional binary field_id=-1 string_field (String);
    optional double field_id=-1 float_field;

When we try to merge these files together, we see an error like the following:
```
Key:       4_2666
Function:  reduce_pixel_shards
args:      ()
kwargs:    {...}
Exception: "ArrowNotImplementedError('Unsupported cast from string to null using function cast_null')"
```


## Error Demonstration

Here, we attempt an import with some unequal schema, and see that the attempt fails in the reducing stage, when we're trying to combine partial parquet files into a single file with common metadata.

In [None]:
import tempfile
import os
from dask.distributed import Client

from hats_import.pipeline import pipeline_with_client
from hats_import.catalog.arguments import ImportArguments
from hats_import.catalog.file_readers import get_file_reader

mixed_schema_csv_dir = "../../tests/data/mixed_schema"
tmp_path = tempfile.TemporaryDirectory()

args = ImportArguments(
    output_artifact_name="mixed_csv_bad",
    input_file_list=[
        os.path.join(mixed_schema_csv_dir, "input_01.csv"),
        os.path.join(mixed_schema_csv_dir, "input_02.csv"),
    ],
    file_reader="csv",
    output_path=tmp_path.name,
    highest_healpix_order=1,
)

with Client(n_workers=1, threads_per_worker=1) as client:
    try:
        pipeline_with_client(args, client)
    except:
        pass  # we know it's going to fail!!
tmp_path.cleanup()

We can overcome may of these issues by using a *parquet schema* file. This is a special kind of parquet file that only contains information on the columns (their names, data types, and additional key-value metadata).

Let's take a look inside the schema structure and see the field types it expects to see:

In [None]:
import pyarrow.parquet as pq

mixed_schema_csv_parquet = "../../tests/data/mixed_schema/schema.parquet"

parquet_file = pq.ParquetFile(mixed_schema_csv_parquet)
print(parquet_file.schema)

We already have a parquet metadata file for this data set, but we'll show you how to create one of your own later in this notebook.

In [None]:
tmp_path = tempfile.TemporaryDirectory()
args = ImportArguments(
    output_artifact_name="mixed_csv_good",
    input_file_list=[
        os.path.join(mixed_schema_csv_dir, "input_01.csv"),
        os.path.join(mixed_schema_csv_dir, "input_02.csv"),
    ],
    output_path=tmp_path.name,
    highest_healpix_order=1,
    file_reader=get_file_reader("csv", schema_file=mixed_schema_csv_parquet),
    use_schema_file=mixed_schema_csv_parquet,
)
with Client(n_workers=1, threads_per_worker=1) as client:
    pipeline_with_client(args, client)

## Making a new parquet schema file

There are a few different strategies we can use to create a schema file:

* using some string representations of pandas datatypes
* using an explicit list of pyarrow data types
* and many more!

We'll stick to these two, since they exercise the most common code paths through schema generation.

### Using pandas type strings

Something like the `tic_types.csv` file contains a list of the columns that the TIC data will contain, in a table like:

```
name,type
ID,Int64
version,str
HIP,Int32
TYC,str
etc...
```

Such files are a common way to send type information when the data files have no header.

In this method, we will use pandas' type parsing to convert these strings into understood data types, and create the relevant parquet metadata.

In [None]:
import pandas as pd

## Fetch the name/type information from a file.
type_list_frame = pd.read_csv("../static/tic_types.csv")

## For each row, add to a dictionary with name and a pandas series with the parsed data type.
## "str" is not understood as "string", so add a special case.
type_map = {
    row["name"]: pd.Series(dtype=("string" if row["type"] == "str" else row["type"]))
    for _, row in type_list_frame.iterrows()
}
dtype_frame = pd.DataFrame(type_map)

## Now write our empty data frame to a parquet file.
schema_file = os.path.join(tmp_path.name, "schema_from_csv_list.parquet")
dtype_frame.to_parquet(schema_file)

Let's look at the parquet file's metadata and see if it matches what we'd expect.

You'll notice that that there are A LOT of fields, and this is why you might not want to deal with column-by-column type discrepancies.

In [None]:
parquet_file = pq.ParquetFile(schema_file)
print(parquet_file.schema)

### Explict list of pyarrow data types

Here, we know what pyarrow types we want to use for each column. This is helpful if you know you want nullable, or you know you DON'T want to use nullable types, but it requires some deeper knowledge of pyarrow data types.

In [None]:
import pyarrow as pa

## List all of our columns as pyarrow fields.
schema_from_pyarrow = pa.schema(
    [
        pa.field("id", pa.int64()),
        pa.field("ra", pa.float64()),
        pa.field("dec", pa.float64()),
        pa.field("ra_error", pa.float64()),
        pa.field("dec_error", pa.float64()),
        pa.field("comment", pa.string()),
        pa.field("code", pa.string()),
    ]
)
schema_file = os.path.join(tmp_path.name, "schema_from_pyarrow.parquet")
pq.write_metadata(schema_from_pyarrow, schema_file)

Again, we'll check that the generated parquet metadata is what we expect:

In [None]:
parquet_file = pq.ParquetFile(schema_file)
print(parquet_file.schema)

Finally, let's clean up.

In [None]:
tmp_path.cleanup()