In [1]:
import logging
import os
import subprocess
import pandas as pd
import datetime 
import gc

In [2]:
%%writefile testutility.py
def read_config_file(filepath):
    """Load a YAML configuration file and return the parsed object.

    Parameters
    ----------
    filepath : str or path-like
        Path to the YAML file.

    Returns
    -------
    object or None
        Whatever ``yaml.safe_load`` builds from the document (a dict when the
        top level is a mapping), ``None`` for an empty document, or ``None``
        when the file is not valid YAML -- the parse error is logged, not raised.
    """
    # Imported locally: this module is generated by %%writefile and does not
    # inherit the notebook's imports, so `logging` must be imported here.
    import logging
    import yaml

    with open(filepath, 'r', encoding='utf-8') as stream:
        try:
            # safe_load only constructs plain Python types (no arbitrary objects).
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            return None

def convert_to_datetime(df):
    """Parse date-like text columns of ``df`` into datetimes, in place.

    A column is converted when its name ends with ``Date`` or ``TimeRaw``
    (compared case-insensitively, so headers already lower-cased by
    ``col_header_val`` -- e.g. ``searchdate`` -- still match) and it holds
    text (object dtype or a pandas string dtype). Values that cannot be parsed
    become ``NaT`` because ``errors='coerce'`` is used.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to modify; matching columns are replaced in place.

    Returns
    -------
    None
    """
    # Imported locally: this module is written with %%writefile and does not
    # see the notebook's `import pandas as pd`.
    import pandas as pd

    # (lower-cased name suffix, noun used in the error message)
    suffix_labels = (("date", "datetime"), ("timeraw", "timestamp"))

    for column in df.columns:
        name = str(column).lower()
        # NOTE(review): case-insensitive matching also catches names such as
        # "candidate"; such a text column would be coerced to NaT -- confirm.
        label = next((lbl for sfx, lbl in suffix_labels if name.endswith(sfx)), None)
        if label is None:
            continue

        dtype = df[column].dtype
        if not (dtype == 'object' or isinstance(dtype, pd.StringDtype)):
            continue

        try:
            df[column] = pd.to_datetime(df[column], errors='coerce')
        except ValueError as e:
            # errors='coerce' turns unparseable values into NaT, but pandas may
            # still raise for some inputs (e.g. mixed timezone offsets).
            print(f"Error converting column {column} to {label}: {e}")
    print("Success convert to datetime")

def replacer(string, char):
    """Collapse every run of two or more consecutive ``char`` into one.

    ``char`` is matched literally (regex-escaped), so metacharacters such as
    ``.`` or ``|`` work, and a multi-character ``char`` is treated as a unit:
    ``replacer('abababc', 'ab') == 'abc'``. An empty ``char`` leaves
    ``string`` unchanged.

    Parameters
    ----------
    string : str
        Text to clean.
    char : str
        The character (or substring) whose repeated runs are collapsed.

    Returns
    -------
    str
    """
    import re

    if not char:
        return string
    pattern = '(?:' + re.escape(char) + '){2,}'
    # A callable replacement is inserted verbatim; a plain string would have
    # its backslashes interpreted as escape sequences by re.sub.
    return re.sub(pattern, lambda _match: char, string)

def col_header_val(df, table_config):
    """Normalise the column headers of ``df`` and compare them with the config.

    Normalisation: lower-case, replace every non-word character with ``_``,
    strip leading/trailing ``_`` and collapse runs of ``_``. The expected names
    (``table_config['columns']``) are only lower-cased. The comparison ignores
    column order but not duplicates.

    NOTE: as a side effect the normalised names are assigned to ``df.columns``,
    i.e. the caller's frame is renamed in place (column order unchanged).

    Parameters
    ----------
    df : pandas.DataFrame
        Frame whose headers are validated (and renamed in place).
    table_config : dict
        Parsed YAML config; only its ``'columns'`` list is read.

    Returns
    -------
    int
        1 if the normalised headers match the expected columns, else 0.
    """
    # Imported locally: this module is written with %%writefile and does not
    # inherit the notebook's `import logging`.
    import logging

    normalised = (
        df.columns.str.lower()
        .str.replace(r'\W', '_', regex=True)  # raw string avoids invalid-escape warnings
        .str.strip('_')
        .str.replace(r'_{2,}', '_', regex=True)
    )
    df.columns = normalised

    # Sorted lists (not reindex) so headers that collide after normalisation
    # are reported as a mismatch instead of raising.
    actual = sorted(normalised)
    expected_col = sorted(col.lower() for col in table_config['columns'])

    if actual == expected_col:
        print("column name and column length validation passed")
        return 1

    print("column name and column length validation failed")
    mismatched_columns_file = sorted(set(actual).difference(expected_col))
    print("Following File columns are not in the YAML file", mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual))
    print("Following YAML columns are not in the file uploaded", missing_YAML_file)
    logging.info(f'df columns: {df.columns.sort_values()}')
    logging.info(f'expected columns: {expected_col}')
    return 0


Overwriting testutility.py


# Write the YAML config file (file.yaml)

In [3]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: df_test
table_name: sample
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - legId
    - searchDate
    - flightDate
    - startingAirport
    - destinationAirport
    - fareBasisCode
    - travelDuration
    - elapsedDays
    - isBasicEconomy
    - isRefundable
    - isNonStop
    - baseFare
    - totalFare
    - seatsRemaining
    - totalTravelDistance
    - segmentsDepartureTimeEpochSeconds
    - segmentsDepartureTimeRaw
    - segmentsArrivalTimeEpochSeconds
    - segmentsArrivalTimeRaw
    - segmentsArrivalAirportCode
    - segmentsDepartureAirportCode
    - segmentsAirlineName
    - segmentsAirlineCode
    - segmentsEquipmentDescription
    - segmentsDurationInSeconds
    - segmentsDistance
    - segmentsCabinCode

Overwriting file.yaml


In [4]:
import importlib

import testutility as util

# The %%writefile cell may have rewritten testutility.py after it was first
# imported in this kernel; reload so the current file contents are used.
util = importlib.reload(util)

config_data = util.read_config_file("file.yaml")
# read_config_file yields None for an empty or unparseable file; stop here
# rather than failing later with a confusing TypeError.
assert config_data is not None, "file.yaml is empty or could not be parsed"

In [5]:
# Display the parsed configuration so the expected column list can be checked by eye.
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'df_test',
 'table_name': 'sample',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['legId',
  'searchDate',
  'flightDate',
  'startingAirport',
  'destinationAirport',
  'fareBasisCode',
  'travelDuration',
  'elapsedDays',
  'isBasicEconomy',
  'isRefundable',
  'isNonStop',
  'baseFare',
  'totalFare',
  'seatsRemaining',
  'totalTravelDistance',
  'segmentsDepartureTimeEpochSeconds',
  'segmentsDepartureTimeRaw',
  'segmentsArrivalTimeEpochSeconds',
  'segmentsArrivalTimeRaw',
  'segmentsArrivalAirportCode',
  'segmentsDepartureAirportCode',
  'segmentsAirlineName',
  'segmentsAirlineCode',
  'segmentsEquipmentDescription',
  'segmentsDurationInSeconds',
  'segmentsDistance',
  'segmentsCabinCode']}

In [6]:
# Build the input path from the config and read it with the configured
# inbound delimiter instead of relying on read_csv's default ",".
# NOTE(review): the config's skip_leading_rows is not applied; read_csv treats
# the first line as the header -- confirm that is what skip_leading_rows=1 means.
file_type = config_data['file_type']
source_file = os.path.join(".", config_data['file_name'] + f".{file_type}")

df_sample = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df_sample.head()

Unnamed: 0,legId,searchDate,flightDate,startingAirport,destinationAirport,fareBasisCode,travelDuration,elapsedDays,isBasicEconomy,isRefundable,...,segmentsArrivalTimeEpochSeconds,segmentsArrivalTimeRaw,segmentsArrivalAirportCode,segmentsDepartureAirportCode,segmentsAirlineName,segmentsAirlineCode,segmentsEquipmentDescription,segmentsDurationInSeconds,segmentsDistance,segmentsCabinCode
0,9ca0e81111c683bec1012473feefd28f,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,PT2H29M,0,False,False,...,1650223560,2022-04-17 15:26:00-04:00,BOS,ATL,Delta,DL,Airbus A321,8940,947,coach
1,98685953630e772a098941b71906592b,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,PT2H30M,0,False,False,...,1650200400,2022-04-17 09:00:00-04:00,BOS,ATL,Delta,DL,Airbus A321,9000,947,coach
2,98d90cbc32bfbb05c2fc32897c7c1087,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,PT2H30M,0,False,False,...,1650218700,2022-04-17 14:05:00-04:00,BOS,ATL,Delta,DL,Boeing 757-200,9000,947,coach
3,969a269d38eae583f455486fa90877b4,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,PT2H32M,0,False,False,...,1650227460,2022-04-17 16:31:00-04:00,BOS,ATL,Delta,DL,Airbus A321,9120,947,coach
4,980370cf27c89b40d2833a1d5afc9751,2022-04-16,2022-04-17,ATL,BOS,LA0NX0MC,PT2H34M,0,False,False,...,1650213180,2022-04-17 12:33:00-04:00,BOS,ATL,Delta,DL,Airbus A321,9240,947,coach


In [7]:
# Gate the rest of the pipeline on header validation so a mismatched file is
# never written out. col_header_val also renames df_sample's columns in place
# (normalised, lower-cased headers), so later cells see the renamed columns.
header_check = util.col_header_val(df_sample, config_data)
assert header_check == 1, "column header validation failed; see messages above"
header_check

column name and column length validation passed


1

In [8]:
# Convert df_sample's date-like text columns (names ending in Date / TimeRaw)
# to datetimes in place; unparseable values become NaT. Returns None.
# NOTE(review): the previous cell lower-cased df_sample's headers; confirm the
# suffix match still recognises them (inspect df_sample.dtypes afterwards).
util.convert_to_datetime(df_sample)

Success convert  to datetime


In [10]:
# Save the processed frame as a gzip-compressed, delimiter-separated file.
# The file name and delimiter come from the YAML config rather than being
# hard-coded (for file.yaml this is df_test.csv.gz with "|").
output_file_path = f"{config_data['file_name']}.{config_data['file_type']}.gz"
df_sample.to_csv(
    output_file_path,
    sep=config_data['outbound_delimiter'],
    index=False,
    compression='gzip',
)

print(f"Data has been successfully saved to {output_file_path}")

Data has been successfully saved to df_test.csv.gz
