In [24]:
import pandas as pd
import pandera as pa
from pandera.typing import Index, DataFrame, Series
from datetime import datetime
from pandera import dtypes
import logging as log

# NOTE(review): the bare `config.log` expression below only looks the attribute
# up and discards the value. Presumably logging is configured as a side effect
# of importing `config` — confirm, and drop the expression if it is not needed.
import config
config.log

class InputSchema:
    """Namespace grouping the pandera models that describe incoming tables.

    NOTE(review): another cell of this notebook also defines a class named
    ``InputSchema``; when both cells run in the same kernel the later
    definition replaces the earlier one.
    """

    class YellowTripData(pa.SchemaModel):
        """Pandera model for the yellow-taxi trip table.

        Every column is declared with ``coerce=True``. All columns except the
        two ``tpep_*`` datetime columns are additionally ``nullable=True``.

        NOTE(review): ``VendorID``, ``PULocationID``, ``DOLocationID`` and
        ``payment_type`` are plain ``int`` yet nullable. Coercing a column that
        actually contains missing values to ``int`` presumably fails — confirm
        against the data, or consider a pandas nullable integer dtype.
        """

        VendorID: Series[int] = pa.Field(coerce=True, nullable=True)
        # Pick-up / drop-off timestamps: the only non-nullable columns.
        tpep_pickup_datetime: Series[pa.DateTime] = pa.Field(coerce=True)
        tpep_dropoff_datetime: Series[pa.DateTime] = pa.Field(coerce=True)
        passenger_count: Series[float] = pa.Field(coerce=True, nullable=True)
        trip_distance: Series[float] = pa.Field(coerce=True, nullable=True)
        RatecodeID: Series[float] = pa.Field(coerce=True, nullable=True)
        store_and_fwd_flag: Series[str] = pa.Field(coerce=True, nullable=True)
        PULocationID: Series[int] = pa.Field(coerce=True, nullable=True)
        DOLocationID: Series[int] = pa.Field(coerce=True, nullable=True)
        payment_type: Series[int] = pa.Field(coerce=True, nullable=True)
        # Monetary components of the fare, all floats.
        fare_amount: Series[float] = pa.Field(coerce=True, nullable=True)
        extra: Series[float] = pa.Field(coerce=True, nullable=True)
        mta_tax: Series[float] = pa.Field(coerce=True, nullable=True)
        tip_amount: Series[float] = pa.Field(coerce=True, nullable=True)
        tolls_amount: Series[float] = pa.Field(coerce=True, nullable=True)
        improvement_surcharge: Series[float] = pa.Field(coerce=True, nullable=True)
        total_amount: Series[float] = pa.Field(coerce=True, nullable=True)
        congestion_surcharge: Series[float] = pa.Field(coerce=True, nullable=True)
        airport_fee: Series[float] = pa.Field(coerce=True, nullable=True)

class OutputSchema:
    """Namespace grouping the pandera models for tables returned by the transforms."""

    class YellowTripData(InputSchema.YellowTripData):
        """Output model; inherits every field of ``InputSchema.YellowTripData`` unchanged."""

        pass
    
       
@pa.check_types
def transform_YellowTripData(
    df: DataFrame[InputSchema.YellowTripData],
) -> DataFrame[OutputSchema.YellowTripData]:
    """Pass a yellow-taxi frame through schema validation and log its shape.

    The function body does not modify ``df``; the ``pa.check_types`` decorator
    is what checks the argument and the return value against the annotated
    pandera models.

    Args:
        df: Frame expected to satisfy ``InputSchema.YellowTripData``.

    Returns:
        The same frame object that was passed in.
    """
    shape = df.shape
    log.info(f"Table YellowTripData validation complete. DataFrame shape: {shape}")
    return df
        
        
        
        

In [None]:
import pandas as pd
import pandera as pa
from pandera.typing import Index, DataFrame, Series
from datetime import datetime
from pandera import dtypes


class InputSchema:
    """Namespace grouping the pandera models for the temperature tables.

    NOTE(review): another cell of this notebook also defines a class named
    ``InputSchema``; when both cells run in the same kernel the later
    definition replaces the earlier one.
    """

    class GlobalTempSchema(pa.SchemaModel):
        """Pandera model for the global temperature table.

        ``dt`` is a required datetime; every measurement column is a nullable
        float. All columns are declared with ``coerce=True``.
        """

        dt: Series[pa.DateTime] = pa.Field(coerce=True)
        LandAverageTemperature: Series[float] = pa.Field(coerce=True, nullable=True)
        LandAverageTemperatureUncertainty: Series[float] = pa.Field(
            coerce=True, nullable=True
        )
        LandMaxTemperature: Series[float] = pa.Field(coerce=True, nullable=True)
        LandMaxTemperatureUncertainty: Series[float] = pa.Field(
            coerce=True, nullable=True
        )
        # Previously missing: every other measurement is declared together with
        # its *Uncertainty companion, but the minimum temperature itself was
        # never declared, so it was neither validated nor coerced.
        # NOTE(review): assumes the source table carries this column — confirm.
        LandMinTemperature: Series[float] = pa.Field(coerce=True, nullable=True)
        LandMinTemperatureUncertainty: Series[float] = pa.Field(
            coerce=True, nullable=True
        )
        LandAndOceanAverageTemperature: Series[float] = pa.Field(
            coerce=True, nullable=True
        )
        LandAndOceanAverageTemperatureUncertainty: Series[float] = pa.Field(
            coerce=True, nullable=True
        )

    class GlobalLandTemperature(pa.SchemaModel):
        """Pandera model for the per-city land temperature table.

        ``dt`` is a required datetime; the two temperature columns are nullable
        floats; ``City``, ``Country``, ``Latitude`` and ``Longitude`` are
        nullable strings. All columns are declared with ``coerce=True``.
        """

        dt: Series[pa.DateTime] = pa.Field(coerce=True)
        AverageTemperature: Series[float] = pa.Field(coerce=True, nullable=True)
        AverageTemperatureUncertainty: Series[float] = pa.Field(
            coerce=True, nullable=True
        )
        City: Series[str] = pa.Field(coerce=True, nullable=True)
        Country: Series[str] = pa.Field(coerce=True, nullable=True)
        # Coordinates are kept as text, not numbers.
        Latitude: Series[str] = pa.Field(coerce=True, nullable=True)
        Longitude: Series[str] = pa.Field(coerce=True, nullable=True)


class OutputSchema:
    """Namespace grouping the pandera models for tables returned by the transforms."""

    class GlobalTempSchema(InputSchema.GlobalTempSchema):
        """Output model; inherits every field of ``InputSchema.GlobalTempSchema`` unchanged."""

        pass

    class GlobalLandTemperature(InputSchema.GlobalLandTemperature):
        """Output model; inherits every field of ``InputSchema.GlobalLandTemperature`` unchanged."""

        pass


@pa.check_types
def transform_globaltempschema(
    df: DataFrame[InputSchema.GlobalTempSchema],
) -> DataFrame[OutputSchema.GlobalTempSchema]:
    """Return ``df`` unchanged.

    The body performs no transformation; the ``pa.check_types`` decorator is
    what checks the argument and the return value against the annotated
    pandera models.
    """
    return df


@pa.check_types
def transform_globallandtemp(
    df: DataFrame[InputSchema.GlobalLandTemperature],
) -> DataFrame[OutputSchema.GlobalLandTemperature]:
    """Return ``df`` unchanged.

    The body performs no transformation; the ``pa.check_types`` decorator is
    what checks the argument and the return value against the annotated
    pandera models.
    """
    return df


In [54]:
# Display the frame currently bound to `df`.
# NOTE(review): no cell shown above this one assigns `df`, so this cell relies
# on state left in the kernel by a cell executed earlier — it will not survive
# Restart & Run All in this position.
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2022-01-01 00:35:40,2022-01-01 00:53:29,2.0,3.8,1.0,N,142,236,1,14.5,3.0,0.5,3.650391,0.0,0.300049,21.953125,2.5,0.0
1,1,2022-01-01 00:33:43,2022-01-01 00:42:07,1.0,2.1,1.0,N,236,42,1,8.0,0.5,0.5,4.0,0.0,0.300049,13.296875,0.0,0.0
2,2,2022-01-01 00:53:21,2022-01-01 01:02:19,1.0,0.97,1.0,N,166,166,1,7.5,0.5,0.5,1.759766,0.0,0.300049,10.5625,0.0,0.0
3,2,2022-01-01 00:25:21,2022-01-01 00:35:23,1.0,1.09,1.0,N,114,68,2,8.0,0.5,0.5,0.0,0.0,0.300049,11.796875,2.5,0.0
4,2,2022-01-01 00:36:48,2022-01-01 01:14:20,1.0,4.3,1.0,N,68,163,1,23.5,0.5,0.5,3.0,0.0,0.300049,30.296875,2.5,0.0


In [35]:
import pandas as pd

df = pd.DataFrame({
    "name": ["dar", "var", "jar", "mar", "kar"],
    "tel": [1, 2, 3, 4, 5],
    "level": ['yellow', 'red', 'yellow', 'red', 'yellow']
})

# Conversion to categorical data.
# The frame built above has no 'VendorID' column, so the former
# df['VendorID'] lookup raised KeyError. The text columns are the ones that
# are meant to become categoricals; 'tel' stays numeric.
columns_to_category = ["name", "level"]

# astype with a mapping returns a new frame, so re-running the cell is safe.
df = df.astype({column: "category" for column in columns_to_category})


# Check the results
print(df.dtypes)


name     category
tel         int64
level    category
dtype: object


In [1]:

import sys
from pathlib import Path

# Make the helper modules under ./scripts importable. A pathlib path replaces
# the former ".\\scripts\\" literal, which is only a directory path on
# Windows. The membership check stops the entry from being appended again
# every time the cell is re-run.
scripts_dir = str(Path("scripts"))
if scripts_dir not in sys.path:
    sys.path.append(scripts_dir)
import load_and_save_data as data
import category_and_dtypes as cat


# Load the month "05" file, then run the dtype conversion helper on it.
# The name `df` is kept because later cells read it.
df = data.load_data_frame(month='05')
df = cat.change_dtypes_YellowData(df)



INFO:root:Table YellowTripData validation complete. DataFrame shape: (3588295, 19)
INFO:root:
Transform dtype: Complete
 Validate: Complete

INFO:root:DataFrame size: 687.06 MB


In [2]:
# Preview the first five rows of `df`.
df.head(5)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2022-05-01 00:00:36,2022-05-01 00:19:18,1.0,4.1,1.0,N,246,151,2,17.0,3.0,0.5,0.0,0.0,0.300049,20.796875,2.5,0.0
1,1,2022-05-01 00:27:44,2022-05-01 00:41:33,1.0,2.3,1.0,N,238,74,2,11.0,3.0,0.5,0.0,0.0,0.300049,14.796875,2.5,0.0
2,1,2022-05-01 00:59:00,2022-05-01 01:14:22,1.0,4.2,1.0,N,163,260,2,15.5,3.0,0.5,0.0,0.0,0.300049,19.296875,2.5,0.0
3,1,2022-05-01 00:48:18,2022-05-01 01:28:02,1.0,0.0,1.0,N,79,182,1,41.1875,0.0,0.5,0.0,0.0,0.300049,42.0,0.0,0.0
4,1,2022-05-01 00:28:26,2022-05-01 00:37:49,1.0,1.6,1.0,N,238,75,1,7.5,3.0,0.5,2.25,0.0,0.300049,13.546875,2.5,0.0


In [30]:
import pandas as pd
import logging
import psutil

import load_and_save_data as data
import category_and_dtypes as cat
import data_procedures as DPro
import calculate as calc

# NOTE(review): the bare `config.log` expression below only looks the attribute
# up and discards the value. Presumably logging is configured as a side effect
# of importing `config` — confirm, and drop the expression if it is not needed.
import config

config.log


def load_data_frame(
    month: str = "01",
    year: str = "2022",
    data_dir: str = "data/yellow_trip_data_2022",
) -> pd.DataFrame:
    """Read one monthly yellow-taxi parquet file into a DataFrame.

    The file name is built as ``yellow_tripdata_{year}-{month}.parquet`` and
    looked up inside ``data_dir`` (relative to the current working directory
    unless an absolute path is given).

    Args:
        month: Two-character month used in the file name, e.g. ``"05"``.
        year: Four-character year used in the file name.
        data_dir: Directory holding the parquet files. The default is the
            directory the function always used; note that its name contains
            "2022" regardless of ``year``, so pass another directory when
            loading a different year's files.

    Returns:
        The frame produced by ``pd.read_parquet`` with the pyarrow engine.
    """
    # Local import keeps this function self-contained within the cell.
    from pathlib import Path

    file_name = f"yellow_tripdata_{year}-{month}.parquet"
    # Joining with pathlib instead of concatenating a ".\\...\\" literal makes
    # the path valid on every operating system, not only on Windows.
    return pd.read_parquet(Path(data_dir) / file_name, engine="pyarrow")


def data_after_dtypes_changes(df):
    """Return the result of ``cat.change_dtypes_YellowData`` applied to ``df``.

    NOTE(review): whether the helper modifies ``df`` in place or returns a new
    frame is not visible from here — confirm before relying on the input
    staying untouched.
    """
    return cat.change_dtypes_YellowData(df)


def test_calculations(df: pd.DataFrame):
    df = df[
        [
            "trip_distance",
            "fare_amount",
            "extra",
            "mta_tax",
            "tip_amount",
            "tolls_amount",
        ]
    ]
    df = df.apply(
        lambda x: x[["extra", "mta_tax", "tip_amount", "tolls_amount", "fare_amount"]]
        / x["trip_distance"],
        axis=1,
    )
    return df


def get_dataframes():
    """Load the default file and return it with its dtype-converted version.

    Returns:
        A ``(clear_data, new_data)`` tuple: the frame from
        ``load_data_frame()`` called with its defaults, and the result of
        passing that frame to ``data_after_dtypes_changes``.
    """
    loaded = load_data_frame()
    return loaded, data_after_dtypes_changes(loaded)


def memory_usage_test(df: pd.DataFrame, sample_rows: int = 10000) -> float:
    """Measure how much the process RSS grows while running ``test_calculations``.

    Args:
        df: Frame to take the sample from.
        sample_rows: Number of leading rows passed to ``test_calculations``.
            Defaults to 10000, the size that used to be hard-coded.

    Returns:
        Difference between the resident set size after and before the
        calculation, in MB (1 MB = 1024 * 1024 bytes). The value can be
        negative if the process RSS shrank in between.
    """
    bytes_per_mb = 1024 * 1024
    process = psutil.Process()

    rss_before = process.memory_info().rss
    # The result must stay referenced until the second reading is taken,
    # otherwise it could be freed before its memory is measured.
    result = test_calculations(df.head(sample_rows))
    rss_after = process.memory_info().rss
    del result

    return (rss_after - rss_before) / bytes_per_mb


def compramison_memory_usage(memory1, memory2):
    """Log both memory figures, formatted to two decimals, at INFO level.

    Args:
        memory1: Memory usage of the first table, in MB.
        memory2: Memory usage of the second table, in MB.
    """
    # The layout (leading newline, a blank line after each entry, trailing
    # four-space line) is part of the emitted message and is kept as is.
    message_lines = [
        "",
        f"    1 table memory usage: {memory1:.2f} MB",
        "",
        f"    2 table memory usage: {memory2:.2f} MB",
        "",
        "    ",
    ]
    logging.info("\n".join(message_lines))



# Load the default month once and derive the dtype-converted frame from it,
# using the helper defined above instead of repeating its two steps.
clear_data, new_data = get_dataframes()

# Measure the original frame first, then the converted one, and log both.
memory1, memory2 = (memory_usage_test(frame) for frame in (clear_data, new_data))
compramison_memory_usage(memory1=memory1, memory2=memory2)


INFO:root:
    1 table memory usage: 1.86 MB

    2 table memory usage: 0.14 MB

    
