# core data utilities

> Core utilities for data processing:
> datetime parsing and validation, time-zone normalization, and filtering.

In [None]:
#| default_exp data.utils

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| hide
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every bare expression in a cell, not only the last one.
# Several cells below rely on this to show multiple intermediate results.
InteractiveShell.ast_node_interactivity = "all"

In [None]:
#| export
from datetime import datetime
import pandas as pd
import pytz

In [None]:
#| export
# Silence pandas' SettingWithCopyWarning (chained-assignment warning).
# NOTE(review): this line is exported, so importing the module changes the
# option process-wide for every caller; consider scoping it with
# pd.option_context(...) where it is actually needed.
pd.options.mode.chained_assignment = None

In [None]:
#| export
def validate_datetime(date_string, format_string):
    """
    Check whether `date_string` parses as a datetime using `format_string`.

    Uses `datetime.strptime`, so the whole string must match the format
    (trailing unparsed characters make it invalid).

    Returns True if it parses, False otherwise. Non-string input (e.g. None
    or a NaN float coming from a DataFrame column) is reported as invalid
    instead of raising TypeError.
    """
    if not isinstance(date_string, str):
        return False
    try:
        datetime.strptime(date_string, format_string)
    except ValueError:
        return False
    return True
    
def validate_datetime_in_iso_format(date_text):
    """
    Check whether `date_text` is a datetime string accepted by
    `datetime.fromisoformat`.

    The accepted syntax depends on the Python version. Before 3.11, only the
    formats produced by `datetime.isoformat()` parse. From 3.11, most ISO 8601
    forms parse, e.g. basic format '20111104' and a trailing 'Z'.

    Returns True if it parses, False otherwise. Non-string input (None, NaN)
    is also reported as invalid rather than raising TypeError.
    """
    if not isinstance(date_text, str):
        return False
    try:
        datetime.fromisoformat(date_text)
    except ValueError:
        return False
    return True


In [None]:
# Example usage: run validate_datetime_in_iso_format over sample strings.
# The samples include out-of-range times (hour 24, minute 60), date-only
# values, ISO 8601 basic format (no separators), fractional seconds, a 'Z'
# suffix and numeric UTC offsets.
date_strings = [
    "2023-12-25 24:00:00",
    "2023-12-25 12:60:00.12",
    "2023-12-25 12:10:00",
    "2023-12-25 23:00:00",
    "2011-11-04",
    "20111104",
    "2011-11-04T00:05:23",
    "2011-11-04T00:05:23.283185",
    "20111104T000523",
    "20111104T000523.283185",
    "2011-11-04T00:05:23Z",
    "2011-11-04T00:05:23.283185+08:00",
    "2011-11-04T00:05:23+08:00",
    ]
# NOTE(review): format_string is not used in this cell; it fits the
# validate_datetime(date_string, format_string) signature.
format_string = "%Y-%m-%d"

for s in date_strings:
    if validate_datetime_in_iso_format(s):
        print(f"{s} is Valid datetime string")
    else:
        print(f"{s} is Invalid datetime string")

In [None]:
# Tabulate the sample strings and flag each one as a valid ISO datetime or not.
df_datetime_str = pd.DataFrame(date_strings)
df_datetime_str.columns = ['datetime']
# df_datetime_str
# x.iloc[0] is the row's first (at this point only) column, 'datetime'.
df_datetime_str['validity'] = df_datetime_str.apply(lambda x: validate_datetime_in_iso_format(x.iloc[0]), axis=1)
df_datetime_str


In [None]:
# Rows whose string parsed as an ISO datetime. This reuses the 'validity'
# flags computed above instead of re-parsing every string row by row.
df_datetime = df_datetime_str[df_datetime_str['validity']]
df_datetime


In [None]:
# Complement of df_datetime: rows whose string failed ISO parsing.
df_datetime_invalid = df_datetime_str[~df_datetime_str['validity']]
df_datetime_invalid


In [None]:
df_datetime['datetime']

In [None]:
# df_datetime_str.apply(lambda x: validate_datetime_in_iso_format(x.iloc[0]), axis=1)
df_datetime1 = df_datetime.apply(lambda x: datetime.fromisoformat(x['datetime']), axis=1)
df_datetime1.name = 'datetime'
df_datetime1


In [None]:
df_datetime1.loc[0:8]

In [None]:
# Convert the rows labelled 0..8 (label-based .loc slice, inclusive of 8)
# to pandas Timestamps.
# NOTE(review): presumably meant to select the strings without a UTC offset
# (the 'Z'/'+08:00' samples come last in date_strings); this depends on which
# labels survived the validity filter — confirm.
df_no_tz = pd.to_datetime(df_datetime1.loc[0:8])
df_no_tz

In [None]:
#| export
def get_timezone_abbreviation(timezone_name, at=None):
    """
    Return the abbreviation (as formatted by `%Z`) of a pytz time zone.

    The abbreviation depends on whether DST is in effect, so it is evaluated
    at a specific instant: `at` if given, otherwise the current time.

    timezone_name: name accepted by `pytz.timezone`; unknown names raise
        `pytz.UnknownTimeZoneError`.
    at: optional `datetime`. A naive value is taken as wall-clock time in
        `timezone_name`; an aware value is converted into that zone.
    """
    timezone = pytz.timezone(timezone_name)
    if at is None:
        moment = datetime.now(timezone)
    elif at.tzinfo is None:
        # pytz zones must be attached with localize(), not
        # replace(tzinfo=...), or the zone's first (LMT) offset is used.
        moment = timezone.localize(at)
    else:
        moment = at.astimezone(timezone)
    return moment.strftime("%Z")

def validate_timezone_in_iana(timezone_name):
    """
    Return True if `timezone_name` is a zone name known to pytz (its copy of
    the IANA tz database), else False.

    Membership is checked against `pytz.all_timezones_set` rather than the
    `pytz.all_timezones` list, so each call is a set lookup instead of a
    linear scan. Non-string input (None, NaN) is reported as invalid.
    """
    return isinstance(timezone_name, str) and timezone_name in pytz.all_timezones_set


In [None]:

# Sample zone strings: valid names plus strings pytz does not know
# ('Eastern Standard Time', 'Eastern Daylight Time', 'US/Daylight',
# 'Asia/Mumbai'); those are the keys of timezone_fixing_map.
# NOTE: 'Europe/London' appears twice.
time_zone_strings = [
    'Eastern Standard Time',
    'Eastern Daylight Time',
    'US/Eastern',
    'US/Daylight',
    'Asia/Shanghai',
    'Asia/Mumbai',
    'America/New_York',
    'Europe/London',
    'America/Los_Angeles',
    'Asia/Kolkata',
    'Europe/London',
    'Asia/Hong_Kong',
    'Asia/Tokyo',
]
df_timezone_str = pd.DataFrame(time_zone_strings)
df_timezone_str.columns = ['timezone']

# Flag each raw name as a valid zone or not.
df_timezone_str['validity'] = df_timezone_str.apply(lambda x: validate_timezone_in_iana(x['timezone']), axis=1)
df_timezone_str


In [None]:
#| export
# Map zone strings that pytz does not recognise to a valid IANA zone name.
# Names missing from this map are passed through unchanged by fix_timezone().
timezone_fixing_map = {'Eastern Standard Time': 'US/Eastern',
                       'Eastern Daylight Time': 'US/Eastern',
                       'US/Daylight': 'US/Eastern',
                       # 'Asia/Calcutta' is only a backward-compatibility alias;
                       # 'Asia/Kolkata' is the canonical IANA name.
                       'Asia/Mumbai': 'Asia/Kolkata',}

In [None]:
#| export
def fix_timezone(timezone_name):
    """
    Translate a known-bad zone name via `timezone_fixing_map`.

    Returns the mapped name when `timezone_name` is a key of the map;
    otherwise returns `timezone_name` unchanged.
    """
    if timezone_name in timezone_fixing_map:
        return timezone_fixing_map[timezone_name]
    return timezone_name

In [None]:
# For each sample: raw name -> corrected name -> current abbreviation.
# get_timezone_abbreviation raises (via pytz.timezone) for any name that is
# still unknown after fixing.
for tz in time_zone_strings:
    print(f"{tz} -> {fix_timezone(tz)} -> {get_timezone_abbreviation(fix_timezone(tz))}")


In [None]:

# Rows whose raw name is not a valid zone (checked before fix_timezone).
df_timezone_unidentified = df_timezone_str[df_timezone_str.apply(lambda x: not validate_timezone_in_iana(x['timezone']), axis=1)]
df_timezone_unidentified

In [None]:

# Preview the corrected names for the unidentified rows.
df_timezone_unidentified.apply(lambda x: fix_timezone(x['timezone']), axis=1)


In [None]:
# Add the corrected zone name for every row (unchanged if not in the map).
df_timezone_str['timezone_fixed'] = df_timezone_str.apply(lambda x: fix_timezone(x['timezone']), axis=1)
df_timezone_str


In [None]:

# Keep only rows whose (possibly corrected) name is a valid zone.
# .copy() makes df_timezone an explicitly independent frame. The later
# column assignments ('abbr', 'utc_offset') are then unambiguous and don't
# rely on the chained-assignment warning being silenced. Series.map also
# handles an empty frame, where apply(axis=1) would return a DataFrame.
df_timezone = df_timezone_str[df_timezone_str['timezone_fixed'].map(validate_timezone_in_iana)].copy()
df_timezone

In [None]:
# extract invalid data: rows whose name is still not a valid zone after
# fix_timezone(), i.e. the complement of df_timezone. Checking the raw
# 'timezone' column here would also list rows that were successfully fixed.
df_timezone_invalid = df_timezone_str[~df_timezone_str['timezone_fixed'].map(validate_timezone_in_iana)]
df_timezone_invalid

In [None]:
# Abbreviation of each valid zone, evaluated at the current time (so it
# reflects today's DST state).
df_abbr = df_timezone.apply(lambda x: get_timezone_abbreviation(x.loc['timezone_fixed']), axis=1)
df_abbr.name = 'abbr'
df_abbr

In [None]:
df_timezone

In [None]:
# df_timezone_str['validity'] = df_timezone_str.apply(lambda x: validate_timezone_in_iana(x.iloc[0]), axis=1)
# df_timezone['abbr'] = df_timezone.apply(lambda x: get_timezone_abbreviation(x.iloc[0]), axis=1)
# df_timezone.apply(lambda x: get_timezone_abbreviation(x.iloc[0]), axis=1)
# Attach the abbreviations as a column; the assignment aligns on row labels.
df_timezone.loc[:,'abbr'] = df_abbr
df_timezone
# df_timezone


In [None]:

# Current UTC offset ('%z', e.g. '+0800') of each valid zone. It is computed
# from datetime.now(), so it changes with DST.
df_utc_offset = (df_timezone.apply(lambda x: datetime.now(pytz.timezone(x.loc['timezone_fixed'])), axis=1)
                            .apply(lambda x: x.strftime('%z')))
df_utc_offset.name = 'utc_offset'
df_utc_offset


In [None]:

# Attach the UTC offsets as a column (aligned on row labels).
df_timezone.loc[:,'utc_offset'] = df_utc_offset
df_timezone


In [None]:
for tz in df_timezone.loc[:,'timezone_fixed']:
    # Take one timestamp per zone so the printed time, abbreviation and
    # offset all describe the same instant. Three now() calls could straddle
    # a DST transition.
    local_now = datetime.now(pytz.timezone(tz))
    print(f"{tz}: {local_now} - {local_now.strftime('%Z')}, {local_now.strftime('%z')}")

In [None]:
# Demo: read each naive timestamp as wall time in a zone, then convert to UTC.
# zip() pairs timestamps and zones by position (arbitrary demo pairing) and
# stops at the shorter sequence.
for dt,tz in zip(df_no_tz, df_timezone.loc[:,'timezone_fixed']):
    print(f"{dt}: {dt.tz_localize(tz)}, as {dt.tz_localize(tz).astimezone('utc')}")

In [None]:
from zoneinfo import ZoneInfo

In [None]:
# Compare ways of attaching a pytz zone to a naive timestamp:
#   tz_zone.localize(dt)        - pytz-recommended; offset in effect at dt
#   dt.replace(tzinfo=tz_zone)  - pitfall per the pytz docs: the zone's first
#                                 (LMT) offset is used, not the real one
#   dt.tz_localize(tz)          - pandas equivalent of localize()
for dt,tz in zip(df_no_tz, df_timezone.loc[:,'timezone_fixed']):
    tz_zone = pytz.timezone(tz)
    print(f"{dt}: {tz_zone.localize(dt)}, or {dt.replace(tzinfo=tz_zone)}, or {dt.tz_localize(tz)}, as {dt.tz_localize(tz).astimezone('utc')}")

# Convert CSV to SQLite DB

In [None]:
from sqlalchemy import create_engine
import pandas as pd
import numpy as np
from tqdm import tqdm

In [None]:
# SQLite database receiving the raw CSV (relative path, resolved against the
# notebook's working directory).
sqlite_eng = create_engine('sqlite:///../data/price_training_raw.db', echo=False)


In [None]:
csv_file_path = '../data/bq-results-20240920-103832-1726828742594.csv'
chunk_size = 10000
# Count lines (header included) for the tqdm progress total. The context
# manager closes the file handle, which the bare open() left open.
with open(csv_file_path) as csv_file:
    total_lines = sum(1 for _ in csv_file)
print(total_lines)

In [None]:
# Peek at the first 5 rows to check the columns and the dispatch_id index.
sample = pd.read_csv(csv_file_path, index_col='dispatch_id', nrows=5)
sample

In [None]:
# data = np.zeros((2,), dtype=[("A", "i4"), ("B", "f4"), ("C", "U")])
# data

# data = np.empty((2,), dtype=[('start','U'), ('end','U')])
# data

# df = pd.DataFrame(data)
# df

In [None]:
# it = pd.read_csv(csv_file_path, index_col='dispatch_id', chunksize=chunk_size)
# d = it.get_chunk()
# d
# df = pd.DataFrame(it.get_chunk())
# df

In [None]:
# Stream the raw CSV into SQLite chunk by chunk, adding empty
# route_start/route_end columns that are filled in later.
for chunk in tqdm(pd.read_csv(csv_file_path, index_col='dispatch_id', chunksize=chunk_size), total=total_lines//chunk_size +1):
    # Placeholder route labels: empty strings, aligned with the chunk's
    # dispatch_id index. These are the same values the structured
    # np.empty(..., 'U') array produced.
    chunk = chunk.assign(route_start='', route_end='')
    # convert_dtypes() returns a new DataFrame. The previous bare call
    # discarded it, so no conversion happened. Inference runs per chunk,
    # so a column's nullable dtype may differ between chunks.
    chunk = chunk.convert_dtypes()
    # Intended column dtypes, kept for reference:
    # chunk.astype({ 
    #     'ride_id': 'int64',
    #     'trip_count': 'int8',
    #     'from_utc': 'float64',
    #     'from_time_str': 'string',
    #     'from_timezone_str': 'string',
    #     'to_time_str': 'string',
    #     'to_timezone_str': 'string',
    #     'passenger_count': 'int8',
    #     'luggage_count': 'int8',
    #     'children_count': 'int8',
    #     'infant_count': 'int8',
    #     'distance': 'float64',
    #     'duration': 'float64',
    #     'trip_no': 'int8',
    #     'dispatch_amount': 'float64',
    #     'dispatch_currency': 'string',
    #     'from_date_str': 'string',
    #     'from_time_fix_str': 'string',
    #     'from_datetime_fix_str': 'string',
    #     'trip_type_id': 'int64',
    #     'trip_type': 'string',
    #     'ride_status_id': 'int64',
    #     'ride_status': 'string',
    #     'dispatch_status_id': 'int64',
    #     'distpatch_status': 'string',
    #     'dispatch_type': 'string',
    #     'fleet': 'string',
    #     'partner_id': 'int64',
    #     'start_place_id': 'int64',
    #     'start_place': 'string',
    #     'start_lng': 'float64',
    #     'start_ltt': 'float64',
    #     'end_place_id': 'int64',
    #     'end_place': 'string',
    #     'end_lng': 'float64',
    #     'end_ltt': 'float64',
    #     'vehicle_class_id': 'int64',
    #     'vehicle_class': 'string',
    #     'row_num': 'int64',
    #     'route_start': 'string', 
    #     'route_end': 'string',
    #     })
    # if_exists='append': re-running this cell appends duplicate rows.
    chunk.to_sql('price_training_raw_2024_usd', sqlite_eng, if_exists= 'append',index=True)

## Convert route-deduplication results to the SQLite DB

In [None]:
# Deduplicated route-label CSVs and the SQLite tables they are loaded into
# (matched by position).
csv_result_file_list = ['../data/price_training_raw_2024_usd_start_result_deduplicated.csv',
                        '../data/price_training_raw_2024_usd_end_result_deduplicated.csv']
csv_result_table_list = ['price_training_raw_2024_usd_start_result_deduplicated',
                        'price_training_raw_2024_usd_end_result_deduplicated']

chunk_size = 10000
# Line count (header included) per file, for the progress-bar totals. Each
# file is closed after counting; works for any number of files.
total_lines = []
for result_csv in csv_result_file_list:
    with open(result_csv) as result_file:
        total_lines.append(sum(1 for _ in result_file))
print(total_lines)


In [None]:

from sqlalchemy import create_engine
import pandas as pd
import numpy as np
from tqdm import tqdm

# Re-create the engine (same DB file) and preview the end-label CSV.
sqlite_eng = create_engine('sqlite:///../data/price_training_raw.db', echo=False)

sample = pd.read_csv(csv_result_file_list[1], index_col='dispatch_id', nrows=5)
sample

In [None]:
# Load each deduplicated CSV into its own table, chunk by chunk. Files,
# table names and line counts are paired by position.
# if_exists='append': re-running this cell appends duplicate rows.
for csv_path, table_name, n_lines in zip(csv_result_file_list, csv_result_table_list, total_lines):
    for chunk in tqdm(pd.read_csv(csv_path, index_col='dispatch_id', chunksize=chunk_size),
                      total=n_lines // chunk_size + 1):
        chunk.to_sql(table_name, sqlite_eng, if_exists='append', index=True)


## Get the union of the label tables

In [None]:
#| export
from sqlalchemy import MetaData, create_engine, asc, desc, and_, or_, not_, case, extract, cast, Numeric, text, distinct, Column, update, bindparam
from sqlalchemy.types import DateTime, Date, Time, String
from sqlalchemy.schema import *
from sqlalchemy.sql import func as F, Selectable, select, union
from sqlalchemy.dialects import registry
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import Session
from sqlalchemy.orm import sessionmaker


In [None]:
# Engine, connection and ORM session for reading the label tables.
# NOTE(review): `conn` is not closed in the visible code, and `Session` here
# rebinds the name imported from sqlalchemy.orm in the export cell above.
sql_eng = create_engine('sqlite:///../data/price_training_raw.db', echo=False)
conn = sql_eng.connect()
metadata = MetaData()
Session = sessionmaker(bind=sql_eng)
session = Session()
chunk_size = 100

In [None]:
# Start and end label tables, followed by the table that receives their union.
label_list = ['price_training_raw_2024_usd_start_result_deduplicated',
              'price_training_raw_2024_usd_end_result_deduplicated',
              'price_training_raw_2024_usd_route_label_deduplicated',]

In [None]:
# Reflect both label tables from the database and print their row counts.
label_start_t = Table(label_list[0], metadata, autoload_with=sql_eng)
print(session.query(label_start_t).count())
label_end_t = Table(label_list[1], metadata, autoload_with=sql_eng)
print(session.query(label_end_t).count())

In [None]:
# ORM queries over each label table. Both counts are displayed because every
# bare expression is shown (ast_node_interactivity = "all").
label_start_q = session.query(label_start_t)
label_start_q.count()
label_end_q = session.query(label_end_t)
label_end_q.count()


In [None]:
label_start_t.columns.keys()

In [None]:
# UNION (not UNION ALL): rows present in both label tables appear once.
# NOTE(review): presumes both tables have the same columns in the same order
# (the next cell renames columns positionally) — confirm.
label_routes_q = union(label_start_q, label_end_q)

In [None]:
# Read the union into pandas and index it by the four reduced-precision
# coordinates.
df = pd.read_sql(label_routes_q, sql_eng)
# {l:r for l,r in zip(df.columns,label_start_t.columns.keys())}
# Positional rename: relies on the union's column order matching label_start_t.
df.columns = label_start_t.columns.keys()
# df.index = df['dispatch_id']
df.set_index(['start_ltt_lp', 'start_lng_lp', 'end_ltt_lp', 'end_lng_lp'], inplace=True)
# df.set_index('dispatch_id', inplace=True)
df


In [None]:
# df.sort_values(['start_ltt_lp', 'start_lng_lp', 'end_ltt_lp', 'end_lng_lp'], inplace=True)

In [None]:
# Store the union, with the coordinate MultiIndex written as columns.
# NOTE: if_exists='append' — re-running this cell duplicates rows.
df.to_sql(label_list[2], sqlite_eng, if_exists='append', index=True)


# Reduce the precision of start_ltt, start_lng, end_ltt, end_lng and pick distinct values

In [None]:
from tqdm.notebook import tqdm
from sqlalchemy import create_engine, select, func, distinct, MetaData, Table, update, bindparam, Column, insert, desc, asc, and_, or_, not_, Numeric, cast, func
from sqlalchemy.orm import sessionmaker, aliased
import pandas as pd
import numpy as np

In [None]:
# Engine, connection and reflected raw table for the precision-reduction step.
# NOTE(review): `conn` is not closed in the visible code.
sql_eng = create_engine('sqlite:///../data/price_training_raw.db', echo=False)
conn = sql_eng.connect()
my_table = Table('price_training_raw_2024_usd', MetaData(), autoload_with=sql_eng)

In [None]:
# SELECT COUNT(DISTINCT dispatch_id) over the raw table.
query = select(func.count(distinct(my_table.c.dispatch_id)))

In [None]:
# Number of distinct dispatch_ids; later cells use it as a tqdm total.
with sql_eng.connect() as connection:
    result = connection.execute(query)
    unique_count = result.scalar()
    
print(unique_count)

In [None]:
# Preview the first 10 raw rows, indexed by dispatch_id.
query = select(my_table).limit(10)
df = pd.read_sql(query, conn, index_col='dispatch_id')
# df[['route_start','route_end']]
df

# Order by start_ltt, start_lng, end_ltt, end_lng

In [None]:
# Re-create the engine/connection and a shared MetaData; reflect the raw table
# into it so later cells can reflect further tables into the same MetaData.
sql_eng = create_engine('sqlite:///../data/price_training_raw.db', echo=False)
conn = sql_eng.connect()
metadata = MetaData()
my_table = Table('price_training_raw_2024_usd', metadata, autoload_with=sql_eng)

In [None]:
# Reflect every table in the database and preview 10 raw rows.
metadata.reflect(bind=sql_eng)
raw_t = metadata.tables['price_training_raw_2024_usd']
s = select(raw_t).limit(10)
# Fetch inside a context manager so the connection is released; the bare
# sql_eng.connect().execute(...) left it open. Column names come from the
# result keys, so an empty result still gets the right columns.
with sql_eng.connect() as connection:
    cursor = connection.execute(s)
    df = pd.DataFrame(cursor.fetchall(), columns=list(cursor.keys()))
df

In [None]:
# One-off creation of the reordered table (kept for reference):
# new_columns = [Column(col.name, col.type, primary_key=col.primary_key) for col in my_table.columns]
# new_table = Table('price_training_raw_2024_usd_reordered', metadata, *new_columns)
# new_table.create(sql_eng)
# The table now exists in the database; reflect it instead.
new_table = Table('price_training_raw_2024_usd_reordered', metadata, autoload_with=sql_eng)


In [None]:
# ORM session used by the insert cells below.
Session = sessionmaker(bind=sql_eng)
session = Session()

## Create a new table with latitude and longitude reduced to 5 decimal places

In [None]:
# Raw rows with known start coordinates, ordered by
# (start_ltt, start_lng, end_ltt, end_lng). The previous ORDER BY listed
# start_ltt twice and never sorted by longitude.
# NOTE: an ORDER BY inside a subquery is not guaranteed to survive into
# the outer query's row order.
ordered_t = (
    select(my_table)
    .where(
        and_(
            my_table.c.start_ltt.isnot(None),
            my_table.c.start_lng.isnot(None),
        )
    )
    .order_by(
        asc(my_table.c.start_ltt),
        asc(my_table.c.start_lng),
        asc(my_table.c.end_ltt),
        asc(my_table.c.end_lng),
    )
    .alias()
)

In [None]:
# ordered_t = ordered_t.limit(10)
# df = pd.read_sql(ordered_t,sql_eng)
# df

In [None]:
# Reduce each coordinate to 5 decimal places. SQLite ignores the numeric
# arguments in a type name, so CAST(x AS NUMERIC(9,5)) alone leaves full
# precision inside SQL, and GROUP BY on these columns would not merge nearby
# points. ROUND(x, 5) does the reduction in SQL; the outer CAST keeps the
# original Numeric(9,5) column type on the Python side.
lp_ltt_lng_t = select(ordered_t.c.dispatch_id,
                      cast(func.round(ordered_t.c.start_ltt, 5), Numeric(9,5)).label('start_ltt_lp'),
                      cast(func.round(ordered_t.c.start_lng, 5), Numeric(9,5)).label('start_lng_lp'),
                      cast(func.round(ordered_t.c.end_ltt, 5), Numeric(9,5)).label('end_ltt_lp'),
                      cast(func.round(ordered_t.c.end_lng, 5), Numeric(9,5)).label('end_lng_lp'))
lp_ltt_lng_t = lp_ltt_lng_t.select_from(ordered_t)
lp_ltt_lng_t = lp_ltt_lng_t.alias()

In [None]:
# df = pd.read_sql(lp_ltt_lng_t, sql_eng)
# df



In [None]:

# One row per distinct (start_ltt_lp, start_lng_lp) pair.
# Select.distinct(<column>) means DISTINCT ON, which only the PostgreSQL
# dialect renders. Elsewhere the column argument is ignored (with a
# deprecation warning), so the query grouped by latitude alone and merged
# points with different longitudes. Grouping by both columns fixes this.
# SQLite fills the non-grouped columns (dispatch_id, end_*) from an
# arbitrary row of each group.
distinct_start_lp_t = select(lp_ltt_lng_t.c.dispatch_id,
                             lp_ltt_lng_t.c.start_ltt_lp,
                             lp_ltt_lng_t.c.start_lng_lp,
                             lp_ltt_lng_t.c.end_ltt_lp,
                             lp_ltt_lng_t.c.end_lng_lp,
                             ).group_by(lp_ltt_lng_t.c.start_ltt_lp,
                                        lp_ltt_lng_t.c.start_lng_lp)

In [None]:

# df = pd.read_sql(distinct_start_lp_t, sql_eng)
# df

In [None]:

# One row per distinct (end_ltt_lp, end_lng_lp) pair.
# Select.distinct(<column>) is PostgreSQL-only DISTINCT ON; on other
# dialects the column argument is ignored, which left a GROUP BY on
# latitude alone. Group by both end coordinates instead. SQLite takes the
# non-grouped columns from an arbitrary row of each group.
# NOTE(review): this selects start_ltt_lp but not start_lng_lp — confirm
# whether that is intended.
distinct_end_lp_t = select(lp_ltt_lng_t.c.dispatch_id,
                           lp_ltt_lng_t.c.start_ltt_lp,
                           lp_ltt_lng_t.c.end_ltt_lp,
                           lp_ltt_lng_t.c.end_lng_lp,
                           ).group_by(lp_ltt_lng_t.c.end_ltt_lp,
                                      lp_ltt_lng_t.c.end_lng_lp)
distinct_end_lp_t = distinct_end_lp_t.alias()

In [None]:

# df = pd.read_sql(distinct_end_lp_t, sql_eng)
# df

In [None]:

# distinct_t = select(distinct(lp_ltt_lng_t.c.start_lng_lp))
# distinct_t = distinct_t.limit(1000)#.distinct(lp_ltt_lng_t.c.start_ltt)
# df = pd.read_sql(distinct_t, sql_eng)
# df

## Create new table with ordered data

In [None]:
# Raw rows with known start coordinates, ordered by
# (start_ltt, start_lng, end_ltt, end_lng). The previous ORDER BY repeated
# start_ltt and never sorted by longitude. select(my_table) already selects
# FROM my_table, so no select_from() is needed.
ordered_t = (
    select(my_table)
    .where(
        and_(
            my_table.c.start_ltt.isnot(None),
            my_table.c.start_lng.isnot(None),
        )
    )
    .order_by(
        asc(my_table.c.start_ltt),
        asc(my_table.c.start_lng),
        asc(my_table.c.end_ltt),
        asc(my_table.c.end_lng),
    )
)


In [None]:
# Load all ordered rows (those with non-null start coordinates) into memory.
with sql_eng.connect() as conn:
    result = conn.execute(ordered_t)
    ordered_rows = result.fetchall()
# ordered_rows[:3]

In [None]:
# Preview the first 10 ordered rows. `cols` (the select's column names) is
# reused by the insert cells below.
data = ordered_rows[:10]
cols = [c.name for c in ordered_t.subquery().columns]
df = pd.DataFrame(data=data, columns=cols)
df

In [None]:
# The same 10 rows as dicts keyed by column name, the form passed to
# insert().values() below for a multi-row INSERT.
batch = [dict(zip(cols, row)) for row in data]
batch[:2]

# Query FP-Server and add label to database

In [None]:
# Trial insert of the 10-row preview batch into new_table.
# NOTE(review): the chunked loop below inserts all ordered_rows, including
# these 10, so running both duplicates them.
result = session.execute(
    insert(new_table).values(batch)
)
session.commit()

In [None]:
chunk_size = 1000
total_rows = len(ordered_rows)
total_rows
# Ceiling division: number of chunks covering every row. `//` + 1 added an
# extra, empty chunk whenever total_rows is a multiple of chunk_size.
total = -(-total_rows // chunk_size)
total

In [None]:

# Copy ordered_rows into new_table in chunks of chunk_size, committing once per
# chunk. Stepping by chunk_size never produces an empty slice. An empty batch
# would make insert().values([]) an INSERT with no values.
for start in tqdm(range(0, total_rows, chunk_size), total=total, desc='Overall Processing'):
    chunk_data = ordered_rows[start:start + chunk_size]
    batch = [dict(zip(cols, row)) for row in chunk_data]
    result = session.execute(insert(new_table).values(batch))
    session.commit()
session.close()

In [None]:
# All ordered rows, indexed by dispatch_id (as with index_col='dispatch_id'
# earlier). Later cells pass the index value as the dispatch_id in
# UPDATE ... WHERE dispatch_id = ...; with a default RangeIndex they would
# target rows 0..N-1 instead.
df = pd.DataFrame(data=ordered_rows, columns=cols).set_index('dispatch_id')
   

In [None]:
# NOTE(review): work-in-progress stub; the per-chunk route logic was never
# written. Changes made so the cell at least runs:
#   * the inner loop had no indented body (IndentationError);
#   * the outer `for row in result` iterated the result of the last INSERT,
#     which returns no rows, so it was dropped;
#   * `query` is a Core select(), passed to pandas directly (`.statement` is
#     an ORM Query attribute);
#   * `conn` was closed by an earlier `with` block, so read via the engine.
for chunk in tqdm(pd.read_sql(query, sql_eng, index_col='dispatch_id', chunksize=chunk_size),
                  total=total_rows//chunk_size+1, desc='Overall Processing'):
    route_list = []


In [None]:
# Alternative row-by-row copy of ordered_t into new_table through the ORM
# session, with a single commit at the end.
# NOTE(review): this re-inserts rows already copied by the chunked loop (duplicates
# if both run), and the tqdm total is the distinct dispatch_id count, which
# may differ from the number of ordered rows.
with sql_eng.connect() as conn:
    result = conn.execute(ordered_t)
    for row in tqdm(result, total=unique_count):
        session.execute(new_table.insert().values(row))
    session.commit()

In [None]:
# list(my_table.columns)

In [None]:
# metadata_new = MetaData()
# my_table.to_metadata(metadata_new)
# metadata_new.tables

In [None]:
# Declare a table with my_table's columns under a new name. A Column object can
# belong to only one Table, so passing *my_table.columns raised ArgumentError.
# Table.to_metadata() copies the columns and constraints instead.
new_table = my_table.to_metadata(metadata, name='price_training_raw_2024_usd_geo_ordered')
# metadata.create_all(sql_eng)  # run to create the table in the database

# reduce precision of start_ltt, start_lng, end_ltt, end_lng by 3 digits

In [None]:
# Overwrite route_start in the local DataFrame only; the database is untouched.
df.loc[:,'route_start'] = 'start'
df

In [None]:

# Build a single-row UPDATE of the route labels for dispatch_id 822019 and
# print its SQL.
stmt = (
    update(my_table)  # my_table: the reflected raw table
    .where(my_table.c.dispatch_id == 822019)
    .values(
        route_start='New Zone C',
        route_end='New Zone D'
    )
)
print(stmt)

In [None]:
# Inspect the literal values bound into the compiled UPDATE.
compiled = stmt.compile()
compiled.params

In [None]:
# Execute the UPDATE and commit explicitly (SQLAlchemy 2.0-style connection).
with sql_eng.connect() as conn:
    result = conn.execute(stmt)
    conn.commit()

In [None]:
# Read the updated row back to verify. The context manager closes the
# connection, which the bare sql_eng.connect().execute(...) left open.
s = select(my_table).where(my_table.c.dispatch_id == 822019)
with sql_eng.connect() as connection:
    results = connection.execute(s).fetchall()
results

In [None]:
df

In [None]:
# Set the route labels row by row, in both the DataFrame and the database.
# All UPDATEs run on one connection in a single transaction: engine.begin()
# commits on success and rolls back on error, instead of opening and
# committing a new connection for every row.
# NOTE(review): the index value is used as dispatch_id, so df must be
# indexed by dispatch_id.
with sql_eng.begin() as connection:
    for dispatch_id, row in df.iterrows():
        summary = [dispatch_id, row['ride_id'], row['dispatch_amount'], row['fleet']]
        df.at[dispatch_id, 'route_end'] = 'Shanghai'
        connection.execute(
            update(my_table)
            .where(my_table.c.dispatch_id == int(dispatch_id))
            .values(
                route_start='Beijing',
                route_end='Shanghai',
            )
        )
        print(summary)
df

# Update data in batches in the SQLite DB

In [None]:
# Set placeholder route labels in the local DataFrame before the batch update.
df.loc[:,'route_start'] = 'Zone 0'
df.loc[:,'route_end'] = 'Zone 1'
df

In [None]:
# Parameterised UPDATE for executemany. The WHERE parameter is named
# 'b_dispatch_id' rather than 'dispatch_id'; SQLAlchemy reserves
# column-named binds for the SET clause of insert()/update().
stmt = (
    update(my_table)  # my_table: the reflected raw table
    .where(my_table.c.dispatch_id == bindparam('b_dispatch_id'))
    .values(
        route_start=bindparam('route_start'),
        route_end=bindparam('route_end')
    )
)
print(stmt)

In [None]:
# Parameter sets for `stmt`. The keys must match its bindparam names
# ('b_dispatch_id', 'route_start', 'route_end'); the previous 'dispatch_id'
# key could not be executed against stmt.
l = [
    {'b_dispatch_id': int(i),
     'route_start': r['route_start'],
     'route_end': r['route_end']}
    for i, r in df.iterrows()
]
l

In [None]:

# Run `stmt` once per DataFrame row (executemany) in a single transaction.
# engine.begin() commits when the block exits cleanly and rolls back
# otherwise, so no explicit conn.commit() is needed.
params = [
    {'b_dispatch_id': int(i),
     'route_start': r['route_start'],
     'route_end': r['route_end']}
    for i, r in df.iterrows()
]
if params:  # an empty list would run stmt once without any bound values
    with sql_eng.begin() as conn:
        conn.execute(stmt, params)
    

In [None]:
# Read 20 rows back to check the batch update. The context manager releases
# the connection; columns come from the result keys, so an empty result
# still gets named columns.
s = select(my_table).limit(20)
with sql_eng.connect() as connection:
    cursor = connection.execute(s)
    results = cursor.fetchall()
    result_columns = list(cursor.keys())
df = pd.DataFrame(results, columns=result_columns)
df


In [None]:
# Print index, ride_id, dispatch_amount and fleet for each row.
for i,r in df.iterrows():
    print(i, r['ride_id'], r['dispatch_amount'],r['fleet'])

In [None]:

# df.to_sql('price_training_raw_2024_usd', sqlite_eng, if_exists= 'replace',index=True, index_label='dispatch_id')

In [None]:

# itertuples() yields read-only namedtuples, so changes go through df.at
# using r.Index; assigning to r's fields (commented out) would fail.
for r in df.itertuples():
    # print(r)
    print(r.ride_id, r.fleet)
    df.at[r.Index, 'route_start'] = 'PVG'
    # r.route_start = 'start'
    # r.route_end = 'end'
df


In [None]:

# df.to_sql('price_training_raw_2024_usd', sqlite_eng, if_exists= 'replace',index=True, index_label='dispatch_id')





In [None]:
#| hide
import nbdev; nbdev.nbdev_export()