In [192]:
import datetime as dt
import logging
import os
from urllib.parse import quote

import pandas as pd
import psycopg2
import requests
from sqlalchemy import create_engine

In [193]:
# Connection settings. Each value is taken from the environment variable of the
# same name when it is set, so credentials do not have to live in the notebook;
# the literals are only local-development fallbacks.
POSTGRES_USER = os.environ.get("POSTGRES_USER", "ariel")
POSTGRES_PASS = os.environ.get("POSTGRES_PASS", "ariel")
POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
POSTGRES_DB = os.environ.get("POSTGRES_DB", "eco_bikes")
POSTGRES_PORT = os.environ.get("POSTGRES_PORT", "5432")
POSTGRES_SCHEMA = os.environ.get("POSTGRES_SCHEMA", "eco_bikes")
# User and password are percent-encoded so characters such as "@", ":" or "/"
# in them cannot break the URL structure.
DB_STR = (
    f"postgresql+psycopg2://{quote(POSTGRES_USER, safe='')}:{quote(POSTGRES_PASS, safe='')}"
    f"@{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"
)

In [194]:
# SQLAlchemy engine whose sessions resolve unqualified table names in POSTGRES_SCHEMA.
engine_connect_args = {"options": f"-csearch_path={POSTGRES_SCHEMA}"}
conn = create_engine(DB_STR, connect_args=engine_connect_args)

In [195]:
# Source snapshot of station info to be merged into the SCD2 table.
data_scr = pd.read_parquet(path="/tmp/station_info_eco_bikes.parquet")

In [196]:
# Drop `reload_id`, which is not a column of the target table. Reassigning
# (instead of inplace=True) with errors="ignore" keeps the cell re-runnable:
# a second run no longer raises KeyError for the already-dropped column.
data_scr = data_scr.drop(columns="reload_id", errors="ignore")

In [197]:
# Column labels of the source frame (DataFrame.keys() is the column Index).
data_scr.keys()

Index(['station_id', 'station_name', 'physical_configuration', 'lat', 'lon',
       'altitude', 'address', 'capacity', 'is_charging_station',
       'nearby_distance', '_ride_code_support'],
      dtype='object')

In [198]:
# Full current contents of the SCD2 target table (all versions, active or not).
station_info_sql = "SELECT * FROM eco_bikes.station_info_eco_bikes"
data_target = pd.read_sql_query(station_info_sql, con=conn)

In [199]:
# Preview only the first rows; rendering the whole table bloats the notebook.
data_target.head()

Unnamed: 0,pk_surrogate_station_info,station_id,station_name,physical_configuration,lat,lon,altitude,address,capacity,is_charging_station,nearby_distance,_ride_code_support,start_date,end_date,is_active


## Build the SCD Type 2 change set (new and changed stations)

In [200]:
# One load timestamp for the whole run: start_date of the versions opened here
# and end_date of the versions they replace. It is truncated to whole seconds.
# `format=` in pd.to_datetime is a string-parsing option and does not apply to a
# datetime object; the old "%Y-%m-%d %HH:%MM:%SS" was not a valid pattern either.
datetime_now = pd.Timestamp(dt.datetime.now()).floor("s")

In [201]:
# Keep only the currently-active version of each station.
data_target_current = data_target.loc[data_target["is_active"].eq(1)]

In [202]:
# Left join: every source station plus its currently-active target version, if
# any. Overlapping column names get `_x` (source) and `_y` (target) suffixes.
# validate="many_to_one" raises MergeError if a station has more than one active
# version in the target, instead of silently duplicating source rows.
df_merge_col = pd.merge(
    data_scr,
    data_target_current,
    on="station_id",
    how="left",
    validate="many_to_one",
)

In [203]:
# A source station is "new" when it has no currently-active version in the
# target. Testing key membership, rather than "any column is null", stops
# stations that merely have a null attribute (e.g. a missing altitude) from
# being classified as new and re-inserted as another active row on every run.
new_records_filter = ~df_merge_col["station_id"].isin(data_target_current["station_id"])


In [204]:
# Merged rows for stations flagged as new by `new_records_filter`.
df_new_records = df_merge_col.loc[new_records_filter]

In [205]:
# Merged rows for stations that already exist in the target: the exact
# complement of the new-record mask. The previous concat + drop_duplicates
# (keep=False) anti-join also discarded any row that happened to be duplicated.
df_excluding_new = df_merge_col.loc[~new_records_filter].copy()

In [206]:
# Source-side (`_x`) attributes of the new stations, with the merge suffix removed.
new_attr_renames = {
    f"{attr}_x": attr
    for attr in (
        "station_name",
        "physical_configuration",
        "lat",
        "lon",
        "altitude",
        "address",
        "capacity",
        "is_charging_station",
        "nearby_distance",
        "_ride_code_support",
    )
}
df_new_records_final = (
    df_new_records[["station_id", *new_attr_renames]]
    .rename(columns=new_attr_renames)
)

In [207]:
# Open a version for each new station: valid from this run, open-ended
# end_date, flagged active.
df_new_records_final = df_new_records_final.assign(
    start_date=datetime_now,
    end_date="9999-12-30 00:00:00",
    is_active=1,
)

In [208]:
# Attributes compared between the source (`_x`) and the active target version (`_y`).
scd2_tracked_columns = [
    "station_name",
    "physical_configuration",
    "lat",
    "lon",
    "altitude",
    "address",
    "capacity",
    "is_charging_station",
    "nearby_distance",
    "_ride_code_support",
]

# A station changed when any tracked attribute differs. Plain `!=` evaluates
# NaN/None vs NaN/None as True, which flagged stations with a value missing on
# both sides as changed on every run; such pairs are treated as equal here.
scd2_changed_mask = pd.Series(False, index=df_excluding_new.index)
for tracked_column in scd2_tracked_columns:
    source_values = df_excluding_new[f"{tracked_column}_x"]
    target_values = df_excluding_new[f"{tracked_column}_y"]
    both_null = source_values.isna() & target_values.isna()
    scd2_changed_mask |= (source_values != target_values) & ~both_null

df_scd2_records = df_excluding_new[scd2_changed_mask]

In [209]:
# Closing rows: the currently-active target version (`_y` side of the merge) of
# each changed station, with start_date rendered as text, end_date set to this
# run and is_active set to 0.
replace_attr_renames = {
    f"{attr}_y": attr
    for attr in (
        "station_name",
        "physical_configuration",
        "lat",
        "lon",
        "altitude",
        "address",
        "capacity",
        "is_charging_station",
        "nearby_distance",
        "_ride_code_support",
    )
}
df_scd2_records_final_replace = (
    df_scd2_records[["station_id", *replace_attr_renames, "start_date"]]
    .rename(columns=replace_attr_renames)
    .assign(
        start_date=lambda frame: frame["start_date"].astype(str),
        end_date=datetime_now,
        is_active=0,
    )
)
df_scd2_records_final_replace

Unnamed: 0,station_id,station_name,physical_configuration,lat,lon,altitude,address,capacity,is_charging_station,nearby_distance,_ride_code_support,start_date,end_date,is_active


In [210]:
# New versions of the changed stations: source-side (`_x`) attributes, valid
# from this run, open-ended end_date, flagged active.
append_attr_renames = {
    f"{attr}_x": attr
    for attr in (
        "station_name",
        "physical_configuration",
        "lat",
        "lon",
        "altitude",
        "address",
        "capacity",
        "is_charging_station",
        "nearby_distance",
        "_ride_code_support",
    )
}
df_scd2_records_final_append = (
    df_scd2_records[["station_id", *append_attr_renames]]
    .rename(columns=append_attr_renames)
    .assign(
        start_date=datetime_now,
        end_date="9999-12-30 00:00:00",
        is_active=1,
    )
)
df_scd2_records_final_append

Unnamed: 0,station_id,station_name,physical_configuration,lat,lon,altitude,address,capacity,is_charging_station,nearby_distance,_ride_code_support,start_date,end_date,is_active


# Load to database

In [211]:
# Close the currently-active version of every changed station by setting
# end_date to this run's timestamp and is_active to 0. Only those two columns
# are written; the rest of the row keeps its stored values.
scd2_close_sql = (
    "UPDATE eco_bikes.station_info_eco_bikes "
    "SET end_date = %s, is_active = %s "
    "WHERE station_id = %s AND is_active = 1"
)
# station_id is a bound parameter rather than text formatted into the SQL, so
# quotes in an id cannot break (or inject into) the statement.
scd2_close_params = [
    (pd.Timestamp(end_date).to_pydatetime(), int(is_active), station_id)
    for station_id, end_date, is_active in zip(
        df_scd2_records_final_replace["station_id"],
        df_scd2_records_final_replace["end_date"],
        df_scd2_records_final_replace["is_active"],
    )
]

connection = None
try:
    if scd2_close_params:
        connection = psycopg2.connect(
            user=POSTGRES_USER,
            password=POSTGRES_PASS,
            host=POSTGRES_HOST,
            port=POSTGRES_PORT,
            database=POSTGRES_DB,
        )
        # `with connection` commits once if every UPDATE succeeds and rolls all
        # of them back otherwise; it does not close the connection (see finally).
        with connection, connection.cursor() as cur:
            cur.executemany(scd2_close_sql, scd2_close_params)
except (Exception, psycopg2.Error) as error:
    print("Error while updating SCD2 records in PostgreSQL", error)
    # Re-raise: appending the new versions after a failed close would leave two
    # active versions of the same station.
    raise
finally:
    # `connection` is only bound once connect() succeeded.
    if connection is not None:
        connection.close()

In [212]:
# Insert the first version of each new station.
df_new_records_final.to_sql(
    name="station_info_eco_bikes",
    con=conn,
    if_exists="append",
    index=False,
)

351

In [213]:
# Insert the new active version of each changed station.
df_scd2_records_final_append.to_sql(
    name="station_info_eco_bikes",
    con=conn,
    if_exists="append",
    index=False,
)

0

In [None]:
def load_station_info_eco_bikes(path_parquet):
    """Run the station-info SCD Type 2 load; alias of ``load_station_info``.

    Args:
        path_parquet: forwarded unchanged to ``load_station_info``.
    """
    # This line previously held an incomplete ``def`` (no parameter list, no
    # body), which is a SyntaxError; it now delegates to the real loader.
    return load_station_info(path_parquet)

In [None]:
# Attribute columns that make up one version of a station. A difference in any
# of them between the source file and the active target row closes that row
# and opens a new version (SCD Type 2).
_STATION_ATTRIBUTE_COLUMNS = [
    "station_name",
    "physical_configuration",
    "lat",
    "lon",
    "altitude",
    "address",
    "capacity",
    "is_charging_station",
    "nearby_distance",
    "_ride_code_support",
]
_STATION_TABLE = "station_info_eco_bikes"
_STATION_SCHEMA = "eco_bikes"
# end_date written on the open (currently active) version of a station.
_OPEN_END_DATE = "9999-12-30 00:00:00"


def _unsuffixed_side(merged, suffix):
    """Select station_id plus one merge side's attribute columns, suffix stripped."""
    renames = {f"{column}{suffix}": column for column in _STATION_ATTRIBUTE_COLUMNS}
    return merged[["station_id", *renames]].rename(columns=renames)


def _attributes_changed(merged):
    """Row mask: True where any source (_x) attribute differs from the target (_y) one.

    A value that is null on both sides counts as unchanged; plain ``!=`` would
    report NaN/None vs NaN/None as a change on every run.
    """
    changed = pd.Series(False, index=merged.index)
    for column in _STATION_ATTRIBUTE_COLUMNS:
        source = merged[f"{column}_x"]
        target = merged[f"{column}_y"]
        changed |= (source != target) & ~(source.isna() & target.isna())
    return changed


def load_station_info(path_parquet, engine=None):
    """Apply an SCD Type 2 load of station info from parquet into PostgreSQL.

    Stations with no active version in the target are inserted as active rows.
    Stations whose attributes changed get their active row closed (end_date set
    to this run, is_active = 0) and a new active row inserted. All writes run in
    a single transaction, so a failure leaves the table unchanged.

    Args:
        path_parquet: mapping whose ``"station_info_eco_bikes"`` entry is the
            path of the source parquet file.
        engine: SQLAlchemy engine for the target database. Defaults to the
            notebook-level ``conn`` engine.

    Raises:
        Exceptions from reading the parquet file or querying the target
        propagate unchanged. An error during the write transaction is logged,
        the transaction is rolled back, and the exception is re-raised.
    """
    if engine is None:
        engine = conn

    source_path = path_parquet["station_info_eco_bikes"]
    logging.info("Reading parquet file: %s", source_path)
    # reload_id is not a column of the target table.
    data_src = pd.read_parquet(source_path).drop(columns="reload_id", errors="ignore")

    # One timestamp for the whole run: start_date of every version opened here
    # and end_date of every version closed here.
    load_ts = pd.Timestamp(dt.datetime.now()).floor("s")

    data_target = pd.read_sql_query(
        f"SELECT * FROM {_STATION_SCHEMA}.{_STATION_TABLE}", con=engine
    )
    data_target_current = data_target.loc[data_target["is_active"].eq(1)]

    # validate= raises if the target holds more than one active row per station.
    merged = pd.merge(
        data_src,
        data_target_current,
        on="station_id",
        how="left",
        validate="many_to_one",
    )
    is_new = ~merged["station_id"].isin(data_target_current["station_id"])
    is_changed = ~is_new & _attributes_changed(merged)

    new_versions = _unsuffixed_side(merged.loc[is_new | is_changed], "_x").assign(
        start_date=load_ts,
        end_date=_OPEN_END_DATE,
        is_active=1,
    )
    closing_params = [
        {"end_date": load_ts.to_pydatetime(), "station_id": station_id}
        for station_id in merged.loc[is_changed, "station_id"].tolist()
    ]
    logging.info(
        "%d new stations, %d changed stations", int(is_new.sum()), len(closing_params)
    )

    try:
        # engine.begin() commits on success and rolls back on any exception, so
        # the closing UPDATEs and the INSERTs succeed or fail together.
        with engine.begin() as connection:
            if closing_params:
                connection.exec_driver_sql(
                    f"UPDATE {_STATION_SCHEMA}.{_STATION_TABLE} "
                    "SET end_date = %(end_date)s, is_active = 0 "
                    "WHERE station_id = %(station_id)s AND is_active = 1",
                    closing_params,
                )
            if not new_versions.empty:
                new_versions.to_sql(
                    _STATION_TABLE,
                    con=connection,
                    schema=_STATION_SCHEMA,
                    if_exists="append",
                    index=False,
                )
    except Exception:
        logging.exception(
            "Error while loading %s into PostgreSQL; transaction rolled back",
            _STATION_TABLE,
        )
        raise
    