In [19]:
import pandas as pd
import pandera as pa
import io
from minio import Minio
import os

# Connection settings are read from the environment so that no credentials
# are stored in the notebook itself.
ACCESS_ROOT = os.environ.get("PREFECT_MINIO_ACCESS_ROOT")
SECRET_ROOT = os.environ.get("PREFECT_MINIO_SECRET_ROOT")
MINIO_HOST = os.environ.get("PREFECT_MINIO_HOST")
# NOTE(review): secure=False means plain HTTP to MinIO — confirm this is intended.
minio_client = Minio(MINIO_HOST, access_key=ACCESS_ROOT,
                     secret_key=SECRET_ROOT, secure=False)
# Download the Excel workbook from the "landing-zone" bucket in MinIO
response = minio_client.get_object(
    "landing-zone", "Recintos_Almendros_Cercanos_y_Otros_Cultivos.xlsx")
try:
    data = response.read()
finally:
    # get_object hands back an open HTTP response: close it and return the
    # connection to the pool even when the read fails.
    response.close()
    response.release_conn()
# Parse the "Tratamientos" sheet; empty strings are treated as missing values
df = pd.read_excel(io.BytesIO(data), engine="openpyxl",
                   sheet_name="Tratamientos", na_values=[''])




In [20]:
# Inspect the dtypes pandas inferred for the freshly loaded sheet
df.dtypes

MovimientoCosecha                       int64
MovimientoFechaDeInicio                object
Producto                               object
ProductoNombre                         object
Formulado                              object
TratamientosPlagaEfectosEnPlagasId      int64
EfectosEnPlagas                        object
TratamientosPlagaMalasHierbasId       float64
SecUserNombre                          object
SecUserNIF                             object
SecUserId                               int64
ParcelaProvinciaId                      int64
ParcelaMunicipioId                      int64
ParcelaPoligono                         int64
Parcela                                 int64
ParcelaRecinto                         object
ParcelaParaje                          object
ParcelaAgregado                        object
ParcelaZona                             int64
ParcelaCosechaCodigoPAC                 int64
ParcelaCosechaCultivoPAC               object
Caldo                             

In [21]:
# Contract for the raw "Tratamientos" sheet, one entry per source column:
# (column name, target dtype, whether missing values are allowed).
_COLUMN_SPECS = [
    ("MovimientoCosecha", pa.Int, False),
    ("MovimientoFechaDeInicio", pa.DateTime, False),
    ("Producto", pa.String, True),
    ("ProductoNombre", pa.String, False),
    ("Formulado", pa.String, True),
    ("TratamientosPlagaEfectosEnPlagasId", pa.String, True),
    ("EfectosEnPlagas", pa.String, False),
    ("TratamientosPlagaMalasHierbasId", pa.String, True),
    ("SecUserNombre", pa.String, False),
    ("SecUserNIF", pa.String, True),
    ("SecUserId", pa.String, False),
    ("ParcelaProvinciaId", pa.String, False),
    ("ParcelaMunicipioId", pa.String, False),
    ("ParcelaPoligono", pa.String, False),
    ("Parcela", pa.String, False),
    ("ParcelaRecinto", pa.String, False),
    ("ParcelaParaje", pa.String, True),
    ("ParcelaAgregado", pa.String, False),
    ("ParcelaZona", pa.String, False),
    ("ParcelaCosechaCodigoPAC", pa.String, False),
    ("ParcelaCosechaCultivoPAC", pa.String, False),
    ("Caldo", pa.String, True),
    ("TipoDeDosisId", pa.String, True),
    ("TipoDeDosisDetalle", pa.String, False),
    ("MovimientoParcelaSuperficieTratada", pa.Float, False),
    ("Cantidad", pa.Float, False),
    ("MovimientoPlazoDeSeguridad", pa.String, True),
    ("MovimientoDosis", pa.Float, False),
    ("ParcelaSuperficieCultivo", pa.Float, False),
    ("ParcelaSuperficieSIGPAC", pa.Float, True),
    ("ParcelaZonaVulnerable", pa.String, True),
    ("UsoDeParcelasId", pa.String, True),
]

# Build the schema from the table above, keeping the declared column order.
schema = pa.DataFrameSchema(
    {
        column_name: pa.Column(column_dtype, nullable=allows_null)
        for column_name, column_dtype, allows_null in _COLUMN_SPECS
    },
    coerce=True,
    unique_column_names=True,
)

# Validate the raw frame against the schema, then show the resulting dtypes.
validated_df = schema.validate(df)
validated_df.dtypes

MovimientoCosecha                              int64
MovimientoFechaDeInicio               datetime64[ns]
Producto                                      object
ProductoNombre                                object
Formulado                                     object
TratamientosPlagaEfectosEnPlagasId            object
EfectosEnPlagas                               object
TratamientosPlagaMalasHierbasId               object
SecUserNombre                                 object
SecUserNIF                                    object
SecUserId                                     object
ParcelaProvinciaId                            object
ParcelaMunicipioId                            object
ParcelaPoligono                               object
Parcela                                       object
ParcelaRecinto                                object
ParcelaParaje                                 object
ParcelaAgregado                               object
ParcelaZona                                   

In [22]:

# Source (Spanish) column names -> output (English) column names.
# NOTE(review): "parcelHavestPACCropTree" looks like a typo for "Harvest"; it is
# kept as-is because the name is part of the emitted data.
COLUMN_RENAMES = {
    "MovimientoCosecha": "harvestYear",
    "MovimientoFechaDeInicio": "harvestInitDate",
    "Producto": "phytosanitaryId",
    "ProductoNombre": "phytosanitaryName",
    "Formulado": "phytosanitaryFormula",
    "TratamientosPlagaEfectosEnPlagasId": "plagueTreatmentEffectsId",
    "EfectosEnPlagas": "plagueEffects",
    "TratamientosPlagaMalasHierbasId": "plagueTreatmentWeedsId",
    "SecUserNombre": "secUserName",
    "SecUserNIF": "secUserNIF",
    "SecUserId": "secUserId",
    "ParcelaProvinciaId": "parcelProvinceId",
    "ParcelaMunicipioId": "parcelMunicipalityId",
    "ParcelaPoligono": "parcelPolygonId",
    "Parcela": "parcelId",
    "ParcelaRecinto": "parcelEnclosureId",
    "ParcelaParaje": "parcelGeographicSpot",
    "ParcelaAgregado": "parcelAggregatedId",
    "ParcelaZona": "parcelZoneId",
    "ParcelaCosechaCodigoPAC": "parcelHarvestPACCode",
    "ParcelaCosechaCultivoPAC": "parcelHavestPACCropTree",
    "Caldo": "broth",
    "TipoDeDosisId": "doseKind",
    "TipoDeDosisDetalle": "doseUnit",
    "MovimientoParcelaSuperficieTratada": "treatedArea",
    "Cantidad": "phytosanitaryQuantityMovement",
    "MovimientoPlazoDeSeguridad": "safePeriodMovement",
    "MovimientoDosis": "doseMovement",
    "ParcelaSuperficieCultivo": "parcelArea",
    "ParcelaSuperficieSIGPAC": "parcelAreaSIGPAC",
    "ParcelaZonaVulnerable": "parcelVulnerableArea",
    "UsoDeParcelasId": "parcelSIGPACCode",
}


def _strip_if_str(value):
    """Return `value` without surrounding whitespace if it is a string, else unchanged."""
    return value.strip() if isinstance(value, str) else value


# Rename without inplace=True so the cell yields a new frame and can be re-run.
validated_df = validated_df.rename(columns=COLUMN_RENAMES)
# Hide sensitive data: drop the NIF column under either its source or renamed
# label; errors="ignore" keeps the cell re-runnable once the column is gone.
validated_df = validated_df.drop(
    columns=["SecUserNIF", "secUserNIF"], errors="ignore")
# NOTE: Thanks to Jupyter Notebook, I found out that some number columns are being read as objects
# Trim spaces and tabs in all object columns. Only object columns can hold str
# values, and Series.map avoids the deprecated DataFrame.applymap.
for column in validated_df.select_dtypes(include="object").columns:
    validated_df[column] = validated_df[column].map(_strip_if_str)
# Convert NULL, NP, NaN, etc. to None
validated_df = validated_df.replace(
    {pd.NA: None, "NP": None, "NaN": None, "": None, "NULL": None})
# Remove rows whose parcelAggregatedId is missing. The explicit copy() makes the
# result an independent frame, so the assignment below does not raise
# SettingWithCopyWarning.
validated_df = validated_df[validated_df["parcelAggregatedId"].notna()].copy()
# Convert user names to uppercase
validated_df["secUserName"] = validated_df["secUserName"].str.upper()
# Get the data year from the first remaining row (IndexError if no rows are left)
data_year = validated_df["harvestYear"].iloc[0]

In [23]:
# Re-check the dtypes after the rename / clean-up step.
# NOTE(review): the captured output reports plagueTreatmentWeedsId as float64
# although the schema declares that column as a string — presumably the
# replace() step re-inferred the dtype; confirm before relying on it.
validated_df.dtypes

harvestYear                               int64
harvestInitDate                  datetime64[ns]
phytosanitaryId                          object
phytosanitaryName                        object
phytosanitaryFormula                     object
plagueTreatmentEffectsId                 object
plagueEffects                            object
plagueTreatmentWeedsId                  float64
secUserName                              object
secUserId                                object
parcelProvinceId                         object
parcelMunicipalityId                     object
parcelPolygonId                          object
parcelId                                 object
parcelEnclosureId                        object
parcelGeographicSpot                     object
parcelAggregatedId                       object
parcelZoneId                             object
parcelHarvestPACCode                     object
parcelHavestPACCropTree                  object
broth                                   

In [24]:
# Do not truncate columns when the frame is displayed
pd.set_option("display.max_columns", None)
# Serialise the cleaned frame to Parquet bytes in memory ...
# NOTE(review): this rebinds `data` and `df`, which the load cell also assigns;
# re-run the load cell before re-running the validation cell.
data = validated_df.to_parquet()
# ... and read the bytes straight back into a dataframe
df = pd.read_parquet(io.BytesIO(data))
# Show the rows whose "parcelAggregatedId" is not '0'
df.loc[df["parcelAggregatedId"].ne('0')]

Unnamed: 0,harvestYear,harvestInitDate,phytosanitaryId,phytosanitaryName,phytosanitaryFormula,plagueTreatmentEffectsId,plagueEffects,plagueTreatmentWeedsId,secUserName,secUserId,parcelProvinceId,parcelMunicipalityId,parcelPolygonId,parcelId,parcelEnclosureId,parcelGeographicSpot,parcelAggregatedId,parcelZoneId,parcelHarvestPACCode,parcelHavestPACCropTree,broth,doseKind,doseUnit,treatedArea,phytosanitaryQuantityMovement,safePeriodMovement,doseMovement,parcelArea,parcelAreaSIGPAC,parcelVulnerableArea,parcelSIGPACCode
