# Unequal schema problems

There are two ways in which parquet files written with slightly different schemas can cause issues in the import pipeline. They have a similar correction mechanism, so we discuss both here.

### Approaching "Unsupported cast" errors

Occasionally, an import will throw errors like the following:

```
Key:       4_2666
Function:  reduce_pixel_shards
args:      ()
kwargs:    {...}
Exception: "ArrowNotImplementedError('Unsupported cast from string to null using function cast_null')"
```

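We've observed that this is due to the way PyArrow infers column types when writing parquet files: a column that contains only nulls is written with the `null` type instead of the type we expect.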

At the reduce stage, we're combining several intermediate parquet files for a single spatial tile into the final parquet file. It's possible at this stage that some files will contain only empty (null) values in a column that we expect to be a string field.

For example:

#### File1

| int_field | string_field | float_field |
| --------- | ------------ | ----------  |
|         5 |      <empty> |         3.4 |
|         8 |      <empty> |         3.8 |

which will have a schema like:
       
    optional int64 field_id=-1 int_field;
    optional int32 field_id=-1 string_field (Null);
    optional double field_id=-1 float_field;
    
#### File2
    
| int_field | string_field | float_field |
| --------- |------------- | ----------- |
|         6 |      hello   |         4.1 |
|         7 |      <empty> |         3.9 |

will have a schema like:

    optional int64 field_id=-1 int_field;
    optional binary field_id=-1 string_field (String);
    optional double field_id=-1 float_field;

When we try to merge these files, the parquet engine will not cast between the `null` and `string` types, and throws the error shown above.

### Approaching unequal schema

In the final stages of the import pipeline, we create a top-level parquet metadata file that includes the full schema for the dataset (`_common_metadata`). This expects that **all** parquet files have the same column-level schema.

If even one file has a different schema, the whole pipeline can fail at this step. To recover, the mismatched files need to be rewritten using a single standard parquet schema. We have found that this is caused by data discrepancies similar to the case above, though the issue can surface in different ways depending on the shape of the dataset.

# Objective

In this notebook, we will look at finding those misbehaving parquet files and generating a single standard parquet schema file to use when writing the dataset.

The first stage shows how to explore the datasets and see what fields are potentially causing issues.

In [30]:
import glob
import os

import pyarrow.parquet as pq


## list all of our parquet data files

catalog_dir = "/data3/epyc/data3/hipscat/catalogs/tic_1/"
all_files = glob.glob(os.path.join(catalog_dir, "**/**/**.parquet"))
# catalog_dir = "/epyc/data/ztf_matchfiles/zubercal_dr16/atua.caltech.edu/F3215/"
# all_files = glob.glob(os.path.join(catalog_dir, "**.parquet"))
print(f'found {len(all_files)} parquet files')
all_files.sort()

## Compare every file's schema against the first file's schema
file_name = all_files[0]
md1 = pq.read_metadata(file_name)
print("first file")
print(file_name)

num_equal = 1
for file_name in all_files[1:]:
    md2 = pq.read_metadata(file_name)

    if not md1.schema.equals(md2.schema):
        print("mismatch file")
        print(file_name)
        
#         print(md1.schema)
#         print(md2.schema)
#         break
    else:
        num_equal += 1

print(f"{num_equal} of {len(all_files)} files match the first file's schema")

found 3768 parquet files
first file
/data3/epyc/data3/hipscat/catalogs/tic_1/Norder=2/Dir=0/Npix=0.parquet
mismatch file
/data3/epyc/data3/hipscat/catalogs/tic_1/Norder=5/Dir=0/Npix=3589.parquet
mismatch file
/data3/epyc/data3/hipscat/catalogs/tic_1/Norder=5/Dir=0/Npix=3633.parquet
mismatch file
/data3/epyc/data3/hipscat/catalogs/tic_1/Norder=5/Dir=0/Npix=8030.parquet
mismatch file
/data3/epyc/data3/hipscat/catalogs/tic_1/Norder=5/Dir=0/Npix=8031.parquet
mismatch file
/data3/epyc/data3/hipscat/catalogs/tic_1/Norder=5/Dir=0/Npix=8049.parquet
mismatch file
/data3/epyc/data3/hipscat/catalogs/tic_1/Norder=5/Dir=0/Npix=8052.parquet
mismatch file
/data3/epyc/data3/hipscat/catalogs/tic_1/Norder=5/Dir=0/Npix=8053.parquet
mismatch file
/data3/epyc/data3/hipscat/catalogs/tic_1/Norder=6/Dir=10000/Npix=14344.parquet
mismatch file
/data3/epyc/data3/hipscat/catalogs/tic_1/Norder=6/Dir=10000/Npix=14345.parquet
mismatch file
/data3/epyc/data3/hipscat/catalogs/tic_1/Norder=6/Dir=10000/Npix=14346.parque

**Read a single input file**

Start by reading a single input file as a pandas dataframe. You can reuse the InputReader class that you've instantiated for the import pipeline.

In [26]:
import pandas as pd
from hipscat_import.catalog.file_readers import CsvReader

input_file="/data3/epyc/data3/hipscat/raw/tic_csv/tic_dec56_00S__54_00S.csv.gz"

## This input CSV file requires header and type data from another source.
type_frame = pd.read_csv("/astro/users/mmd11/git/hipscripts/epyc/allwise/tic_types.csv")
type_names = type_frame["name"].values.tolist()
type_map = dict(zip(type_frame["name"], type_frame["type"]))

file_reader = CsvReader(
    header=None,
    column_names=type_names,
    type_map=type_map,
    chunksize=50_000,
)

data_frame = next(file_reader.read(input_file))
data_frame

Unnamed: 0,ID,version,HIP,TYC,UCAC,TWOMASS,SDSS,ALLWISE,GAIA,APASS,...,splists,e_RA,e_Dec,RA_orig,Dec_orig,e_RA_orig,e_Dec_orig,raddflag,wdflag,objID
0,421102183,20190415,,,,17261819-5512067,,,5922830166853154944,29366166,...,,6.040034,5.723953,261.575979,-55.201992,0.189400,0.213219,1,0,1244322854
1,421229958,20190415,,,,17270117-5439406,,J172701.26-543942.1,5922959054539196800,25407987,...,,2.501635,2.143100,261.754774,-54.661488,0.086798,0.082517,1,0,1247347547
2,421230090,20190415,,,,17271673-5437554,,J172716.75-543755.7,5922964895681108992,25408261,...,,2.738124,2.371065,261.819850,-54.631986,0.130497,0.091899,1,0,1248828237
3,421228714,20190415,,,176-197391,17270502-5457554,,J172705.23-545754.1,5922859857961061760,,...,,4.549063,3.928844,261.770905,-54.965539,0.146773,0.146296,1,0,1247346996
4,421098618,20190415,,,,17260598-5418155,,J172605.95-541815.9,5923042857934705280,,...,,7.995923,6.302023,261.525699,-54.304340,0.307969,0.232219,1,0,1244321199
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
49995,1044757491,20190415,,,,,,,6063861126032459136,,...,,8.085994,13.512841,201.222372,-55.945179,0.125915,0.326936,1,0,848094308
49996,1046515676,20190415,,,,,,,6067114336119961216,,...,,11.718694,7.443775,202.040730,-54.277964,0.256131,0.286796,1,0,853812771
49997,1046750717,20190415,,,,,,,6068601910926032512,,...,,25.734570,15.305327,202.492182,-54.246797,0.577675,0.490129,1,0,853813897
49998,1044730684,20190415,,,,,,,6063812953698211456,,...,,1.195783,1.148171,202.738866,-55.257337,0.023952,0.030967,1,0,855564392


Now that we have the typed data from the input file, write out only the column-level schema to a new file:

In [27]:
import pyarrow.parquet as pq
import pyarrow as pa

schema_only_file = "/data3/epyc/data3/hipscat/tmp/tic_schema.parquet"
pq.write_table(pa.Table.from_pandas(data_frame).schema.empty_table(), where=schema_only_file)

## What next?

Add a reference to this new schema-only file with the `use_schema_file` argument.

### Unsupported cast

Set `resume=True`, then restart your pipeline, and any reduce stages that previously failed will re-run, using this new schema file as column-level metadata.

### Unequal schema

In the case of the unequal schema in the final stages when writing the `_common_metadata` file, you'll need all your partitioned parquet files to have the same metadata before you try to resume your pipeline.

You can:

- re-generate the full output by running the pipeline from scratch
- re-write the mismatching files with the appropriate schema

The below code snippet will look for mismatched files, apply the schema from `schema_only_file`, and overwrite the original file. 

**Use with caution** - The line to overwrite the files has been commented out to keep folks from blindly running every cell in this notebook and overwriting data.

In [None]:
TODO