In [59]:
#Importing Dependencies

import pandas as pd
import requests
import json
import time
from datetime import datetime
import pickle
import psycopg2
from psycopg2 import sql
from dotenv import load_dotenv 
import os
import great_expectations as ge
from great_expectations.validator.validator import Validator
from great_expectations.execution_engine.pandas_execution_engine import PandasExecutionEngine
from great_expectations.core.batch import Batch
from great_expectations.core import ExpectationSuite
import tempfile


In [18]:

from great_expectations.core.batch import Batch
from great_expectations.execution_engine import PandasExecutionEngine
from great_expectations.core.expectation_suite import ExpectationSuite

In [58]:
#Extracting the data from the api

app_id = os.getenv('APP_ID'),
APi_key = os.getenv('API_KEY'),
POLL_INTERVAL = 60  # Time in seconds between requests

# Define the endpoint and parameters for scheduled data
station_code = 'WAT'  # Example station code (London Waterloo)
url = f'https://transportapi.com/v3/uk/train/station/{station_code}/live.json'

params = {
    'app_id': app_id,
    'api_key': APi_key,
    #'time_of_day': '19:00',
    #'request_time': '2024-10-30T18:50:00+00:00',
    'darwin': 'false',  
    'train_status': 'passenger',  # Status filter, e.g., passenger trains only
    'live' :'True',
    #'station_detail': 'destination'
}
response = requests.get(url, params=params)

if response.status_code == 200:
    response= response.json()
else:
    print(f"Failed to fetch data: {response.status_code}")

In [5]:
response

{'date': '2024-11-05',
 'time_of_day': '22:03',
 'request_time': '2024-11-05T22:03:27+00:00',
 'station_name': 'London Waterloo',
 'station_code': 'WAT',
 'departures': {'all': [{'mode': 'train',
    'service': '24620204',
    'train_uid': 'L56714',
    'platform': '8',
    'operator': 'SW',
    'operator_name': 'South Western Railway',
    'aimed_departure_time': '22:05',
    'aimed_arrival_time': None,
    'aimed_pass_time': None,
    'origin_name': 'London Waterloo',
    'destination_name': 'Bournemouth',
    'source': 'Network Rail',
    'category': 'XX',
    'service_timetable': {'id': 'https://transportapi.com/v3/uk/train/service_timetables/L56714:2024-11-05.json?app_id=e2e4d023&app_key=bf7baff0c295d4fc1ab3e121837d3d68&live=true'},
    'status': 'STARTS HERE',
    'expected_arrival_time': None,
    'expected_departure_time': '22:05',
    'best_arrival_estimate_mins': None,
    'best_departure_estimate_mins': 1},
   {'mode': 'train',
    'service': '24673205',
    'train_uid': 'L6

In [6]:
with open("response.pkl", "wb") as f:
    pickle.dump(response, f)

In [7]:
train_columns = []

#date = response['date']
request_time = response['request_time']
station_name = response['station_name']

train_info = {
    #'date': date,
    'request_time': request_time,
    'station_name': station_name
}
train_columns.append(train_info)

train_columns

[{'request_time': '2024-11-05T22:03:27+00:00',
  'station_name': 'London Waterloo'}]

In [8]:
train_columns_df = pd.DataFrame(train_columns)
train_columns_df.reset_index(inplace=True)
train_columns_df

Unnamed: 0,index,request_time,station_name
0,0,2024-11-05T22:03:27+00:00,London Waterloo


In [9]:
train__departure_columns = []

for columns in response['departures']['all']:
    try:
        row ={
            'mode':columns['mode'],
            'train_uid': columns['train_uid'],
            'origin_name': columns['origin_name'],
            'operator_name':columns['operator_name'],
            'platform': columns['platform'],
            'destination_name': columns['destination_name'],
            'aimed_departure_time': columns['aimed_departure_time'],
            'expected_departure_time': columns['expected_departure_time'],
            'best_departure_estimate_mins': columns['best_departure_estimate_mins'],
            'aimed_arrival_time': columns['aimed_arrival_time']
        }
        train__departure_columns.append( row)
    except (KeyError, TypeError) as e:
        print(f"Error processing columns: {e}")     

print(train__departure_columns)

[{'mode': 'train', 'train_uid': 'L56714', 'origin_name': 'London Waterloo', 'operator_name': 'South Western Railway', 'platform': '8', 'destination_name': 'Bournemouth', 'aimed_departure_time': '22:05', 'expected_departure_time': '22:05', 'best_departure_estimate_mins': 1, 'aimed_arrival_time': None}, {'mode': 'train', 'train_uid': 'L60419', 'origin_name': 'London Waterloo', 'operator_name': 'South Western Railway', 'platform': '3', 'destination_name': 'Hampton Court', 'aimed_departure_time': '22:06', 'expected_departure_time': '22:06', 'best_departure_estimate_mins': 2, 'aimed_arrival_time': None}, {'mode': 'train', 'train_uid': 'L57830', 'origin_name': 'London Waterloo', 'operator_name': 'South Western Railway', 'platform': '15', 'destination_name': 'Fratton', 'aimed_departure_time': '22:08', 'expected_departure_time': '22:08', 'best_departure_estimate_mins': 4, 'aimed_arrival_time': None}, {'mode': 'train', 'train_uid': 'L60042', 'origin_name': 'London Waterloo', 'operator_name': 'S

In [10]:
train__departure_columns_df = pd.DataFrame(train__departure_columns)
train__departure_columns_df.reset_index(inplace=True)
train__departure_columns_df

Unnamed: 0,index,mode,train_uid,origin_name,operator_name,platform,destination_name,aimed_departure_time,expected_departure_time,best_departure_estimate_mins,aimed_arrival_time
0,0,train,L56714,London Waterloo,South Western Railway,8,Bournemouth,22:05,22:05,1,
1,1,train,L60419,London Waterloo,South Western Railway,3,Hampton Court,22:06,22:06,2,
2,2,train,L57830,London Waterloo,South Western Railway,15,Fratton,22:08,22:08,4,
3,3,train,L60042,London Waterloo,South Western Railway,6,Shepperton,22:12,22:12,8,
4,4,train,L61064,London Waterloo,South Western Railway,4,Chessington South,22:17,22:17,13,
5,5,train,L58735,London Waterloo,South Western Railway,23,Reading,22:20,22:20,16,
6,6,train,L59433,London Waterloo,South Western Railway,3,Woking,22:20,22:20,16,
7,7,train,L57340,London Waterloo,South Western Railway,10,Yeovil Junction,22:20,22:20,16,
8,8,train,L61888,London Waterloo,South Western Railway,21,Weybridge,22:22,22:22,18,
9,9,train,Y00132,London Waterloo,South Western Railway,11,Alton,22:23,22:23,19,


In [11]:
#Merging both dataframes

raw_train_schedule_df = pd.merge(train_columns_df, train__departure_columns_df, on='index', how='outer' )
raw_train_schedule_df

Unnamed: 0,index,request_time,station_name,mode,train_uid,origin_name,operator_name,platform,destination_name,aimed_departure_time,expected_departure_time,best_departure_estimate_mins,aimed_arrival_time
0,0,2024-11-05T22:03:27+00:00,London Waterloo,train,L56714,London Waterloo,South Western Railway,8,Bournemouth,22:05,22:05,1,
1,1,,,train,L60419,London Waterloo,South Western Railway,3,Hampton Court,22:06,22:06,2,
2,2,,,train,L57830,London Waterloo,South Western Railway,15,Fratton,22:08,22:08,4,
3,3,,,train,L60042,London Waterloo,South Western Railway,6,Shepperton,22:12,22:12,8,
4,4,,,train,L61064,London Waterloo,South Western Railway,4,Chessington South,22:17,22:17,13,
5,5,,,train,L58735,London Waterloo,South Western Railway,23,Reading,22:20,22:20,16,
6,6,,,train,L59433,London Waterloo,South Western Railway,3,Woking,22:20,22:20,16,
7,7,,,train,L57340,London Waterloo,South Western Railway,10,Yeovil Junction,22:20,22:20,16,
8,8,,,train,L61888,London Waterloo,South Western Railway,21,Weybridge,22:22,22:22,18,
9,9,,,train,Y00132,London Waterloo,South Western Railway,11,Alton,22:23,22:23,19,


In [17]:
from great_expectations.data_context.data_context.ephemeral_data_context import EphemeralDataContext
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.execution_engine.pandas_execution_engine import PandasExecutionEngine
from great_expectations.validator.validator import Validator
from great_expectations.core import ExpectationSuite
from great_expectations.core.data_context.config import DataContextConfig



ModuleNotFoundError: No module named 'great_expectations.core.data_context'

In [20]:
from great_expectations.data_context import EphemeralDataContext
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.execution_engine.pandas_execution_engine import PandasExecutionEngine
from great_expectations.validator.validator import Validator
from great_expectations.core import ExpectationSuite


In [23]:
from great_expectations.data_context.types.base import DataContextConfig

In [29]:
#validating the ingested data

try:
    

    

    #raw_execution_engine = PandasExecutionEngine()


    # Set up an execution engine
   

    # Define your DataFrame (assuming train_schedule_df is your DataFrame)
    raw_schedule_df = raw_train_schedule_df

    raw_execution_engine = PandasExecutionEngine()

    context_config = DataContextConfig(
    config_version=1,
    stores={
        "expectations_store": {
            "class_name": "ExpectationsStore",
        },
        "validations_store": {
            "class_name": "ValidationsStore",
        },
        "evaluation_parameter_store": {
            "class_name": "EvaluationParameterStore",
        },
    },
    expectations_store_name="expectations_store",
    #evaluation_parameter_store_name="evaluation_parameter_store",
    data_docs_sites={},
    #anonymous_usage_statistics={"enabled": False}
    )

    context = EphemeralDataContext(project_config=context_config)



    


    #context = EphemeralDataContext( project_config=None, runtime_environment=None )

    # Add or update Expectation Suite 
    suite = context.add_or_update_expectation_suite("raw_train_schedule_suite", overwrite_existing=True)

    batch_request = RuntimeBatchRequest(
    datasource_name="my_datasource",
    data_connector_name="my_data_connector",
    data_asset_name="my_data_asset",
    runtime_parameters={"batch_data": raw_train_schedule_df},
    batch_identifiers={"default_identifier_name": "default_identifier"}
    )

    raw_validator = Validator(
    execution_engine=raw_execution_engine,
    batches=[batch_request],
    expectation_suite=suite
    )


    # Add expectations directly to the Validator
    raw_validator.expect_column_values_to_not_be_null(column="index")
    raw_validator.expect_column_values_to_not_be_null(column="request_time")
    raw_validator.expect_column_values_to_not_be_null(column="station_name")
    raw_validator.expect_column_values_to_not_be_null(column="mode")
    raw_validator.expect_column_values_to_not_be_null(column="train_uid")
    raw_validator.expect_column_values_to_not_be_null(column="origin_name")
    raw_validator.expect_column_values_to_not_be_null(column="operator_name")
    raw_validator.expect_column_values_to_not_be_null(column="platform")
    raw_validator.expect_column_values_to_not_be_null(column="destination_name")
    raw_validator.expect_column_values_to_not_be_null(column="aimed_departure_time")
    raw_validator.expect_column_values_to_not_be_null(column="expected_departure_time")
    raw_validator.expect_column_values_to_not_be_null(column="best_departure_estimate_mins")
    raw_validator.expect_column_values_to_not_be_null(column="aimed_arrival_time") 

    # Step 6: Validate the DataFrame and print the results
    raw_results = raw_validator.validate()
    print("Validation results:", raw_results)

except Exception as e:
    print(f"Validation failed: {e}")

    

Error The module: `great_expectations.data_context.store` does not contain the class: `ValidationsStore`.
        - Please verify that the class named `ValidationsStore` exists. occurred while attempting to instantiate a store.


Validation failed: The module "great_expectations.data_context.store" exists; however, the system is unable to create an instance of the class "ValidationsStore", searched for inside this module.  Please make sure that the class named "ValidationsStore" is properly defined inside its intended module and declared correctly by the calling entity.  This error is unrecoverable.
            


In [43]:
print([f"'{col}'" for col in raw_train_schedule_df.columns])


["'id'", "'request_date_time'", "'station_name'", "'mode'", "'train_uid'", "'origin_name'", "'operator_name'", "'platform'", "'destination_name'", "'aimed_departure_time'", "'expected_departure_time'", "'best_departure_estimate_mins'", "'aimed_arrival_time'"]


In [31]:
train_schedule_df = raw_train_schedule_df

In [32]:
train_schedule_df.rename(columns= {'index':'id'}, inplace = True)

In [33]:
#Renaming request time column

train_schedule_df.rename(columns= {'request_time':'request_date_time'}, inplace = True)

In [34]:
train_schedule_df['request_date_time'] = train_schedule_df['request_date_time'].ffill()


In [35]:
train_schedule_df['station_name'] = train_schedule_df['station_name'].ffill()

In [36]:
#Filling up missing values

train_schedule_df['aimed_arrival_time'].fillna('Unknonwn', inplace=True)
train_schedule_df.reset_index(drop=True, inplace=True)
#train_schedule_df = train_schedule_df.loc[:, ~train_schedule_df.columns.str.contains('^level_')]


In [37]:
train_schedule_df

Unnamed: 0,id,request_date_time,station_name,mode,train_uid,origin_name,operator_name,platform,destination_name,aimed_departure_time,expected_departure_time,best_departure_estimate_mins,aimed_arrival_time
0,0,2024-11-05T22:03:27+00:00,London Waterloo,train,L56714,London Waterloo,South Western Railway,8,Bournemouth,22:05,22:05,1,Unknonwn
1,1,2024-11-05T22:03:27+00:00,London Waterloo,train,L60419,London Waterloo,South Western Railway,3,Hampton Court,22:06,22:06,2,Unknonwn
2,2,2024-11-05T22:03:27+00:00,London Waterloo,train,L57830,London Waterloo,South Western Railway,15,Fratton,22:08,22:08,4,Unknonwn
3,3,2024-11-05T22:03:27+00:00,London Waterloo,train,L60042,London Waterloo,South Western Railway,6,Shepperton,22:12,22:12,8,Unknonwn
4,4,2024-11-05T22:03:27+00:00,London Waterloo,train,L61064,London Waterloo,South Western Railway,4,Chessington South,22:17,22:17,13,Unknonwn
5,5,2024-11-05T22:03:27+00:00,London Waterloo,train,L58735,London Waterloo,South Western Railway,23,Reading,22:20,22:20,16,Unknonwn
6,6,2024-11-05T22:03:27+00:00,London Waterloo,train,L59433,London Waterloo,South Western Railway,3,Woking,22:20,22:20,16,Unknonwn
7,7,2024-11-05T22:03:27+00:00,London Waterloo,train,L57340,London Waterloo,South Western Railway,10,Yeovil Junction,22:20,22:20,16,Unknonwn
8,8,2024-11-05T22:03:27+00:00,London Waterloo,train,L61888,London Waterloo,South Western Railway,21,Weybridge,22:22,22:22,18,Unknonwn
9,9,2024-11-05T22:03:27+00:00,London Waterloo,train,Y00132,London Waterloo,South Western Railway,11,Alton,22:23,22:23,19,Unknonwn


In [38]:
#Saving file to CSV
train_schedule_df.to_csv('train_schedule.csv')

In [39]:
#validating transformed data

try:

    suite = ExpectationSuite("train_schedule_suite")


    # Set up an execution engine
    execution_engine = PandasExecutionEngine()

    # Use a Batch to wrap the DataFrame
    batch = Batch(data=train_schedule_df)

    # Step 3: Create an Expectation Suite
    #suite_name = "train_schedule_suite"
    #suite = ExpectationSuite(expectation_suite_name=suite_name)

    # Create a Validator with the Batch and ExpectationSuite
    validator = Validator(
        execution_engine=execution_engine,
        batches=[batch],
        expectation_suite=suite
    )

    # Add expectations directly to the Validator
    validator.expect_column_values_to_not_be_null(column="id")
    validator.expect_column_values_to_not_be_null(column="request_date_time")
    validator.expect_column_values_to_not_be_null(column="station_name")
    validator.expect_column_values_to_not_be_null(column="mode")
    validator.expect_column_values_to_not_be_null(column="train_uid")
    validator.expect_column_values_to_not_be_null(column="origin_name")
    validator.expect_column_values_to_not_be_null(column="operator_name")
    validator.expect_column_values_to_not_be_null(column="platform")
    validator.expect_column_values_to_not_be_null(column="destination_name")
    validator.expect_column_values_to_not_be_null(column="aimed_departure_time")
    validator.expect_column_values_to_not_be_null(column="expected_departure_time")
    validator.expect_column_values_to_not_be_null(column="best_departure_estimate_mins")
    validator.expect_column_values_to_not_be_null(column="aimed_arrival_time") 

    # Step 6: Validate the DataFrame and print the results
    transformed_results = validator.validate()
    print("Validation results:", transformed_results)

except Exception as e:
    print(f"Data Validation failed{e}")



# Show all expectations added to the suite
#print("Expectations in suite:", validator.get_expectation_suite().expectations)

Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 360.13it/s]
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 354.12it/s] 
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 459.07it/s] 
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 247.41it/s]
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 404.55it/s]
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 421.71it/s] 
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 438.15it/s] 
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 232.81it/s] 
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 442.54it/s] 
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 486.90it/s] 
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 480.37it/s]
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 385.11it/s]
Calculating Metrics: 100%|██████████| 6/6 [00:00<00:00, 159.10it/s]
Calculating Metrics: 100%|██████████| 42/42 [00:00<00:00, 497.13it/s]

Validation results: {
  "success": true,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_not_be_null",
        "kwargs": {
          "column": "id"
        },
        "meta": {}
      },
      "result": {
        "element_count": 50,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "partial_unexpected_list": []
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_traceback": null,
        "exception_message": null
      }
    },
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_not_be_null",
        "kwargs": {
          "column": "request_date_time"
        },
        "meta": {}
      },
      "result": {
        "element_count": 50,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "partial_unexpected_list": []
      },
      "meta": {},
      "exception_info": {
    




In [55]:
# Database connection details

local_conn = psycopg2.connect(
                        host= 'localhost', 
                        database = 'velocity_railway', 
                        user = 'postgres', 
                        password = 'Password', 
                        port= '5432'
)

# Connect to Azure and Local PostgreSQL
#azure_conn = psycopg2.connect(**azure_conn_details)
#local_conn = psycopg2.connect(local_conn_details)

cursor = local_conn.cursor()

#Create table

#Execute query to create table
cursor.execute("""CREATE TABLE train_schedule(
    id SERIAL PRIMARY KEY,
    request_date_time  VARCHAR (100),
    station_name VARCHAR (100),
    mode VARCHAR (100),
    train_uid VARCHAR (100),
    origin_name VARCHAR (100),
    operator_name VARCHAR (100),
    platform VARCHAR(100),
    destination_name VARCHAR(100),
    aimed_departure_time VARCHAR (100),
    expected_departure_time VARCHAR(100),
    best_departure_estimate_mins INT,
    aimed_arrival_time VARCHAR (100)

);
""")

#committing the query to database
local_conn.commit()

In [57]:
# Data loading function
def load_data_to_db(local_conn, train_schedule_df, train_schedule):
    cursor = local_conn.cursor()
    
    # Define the SQL insert query with correct syntax
    insert_query = f'''
    INSERT INTO {train_schedule} (
        id, request_date_time, station_name, mode, train_uid, origin_name, operator_name, platform, 
        destination_name, aimed_departure_time, expected_departure_time, best_departure_estimate_mins, aimed_arrival_time
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    '''
    
    # Define rows by selecting individual columns in the correct format
    rows = [tuple(x) for x in train_schedule_df[[
        "id", "request_date_time", "station_name", "mode", "train_uid", "origin_name",
        "operator_name", "platform", "destination_name", "aimed_departure_time",
        "expected_departure_time", "best_departure_estimate_mins", "aimed_arrival_time"
    ]].values]
    
    try:
        cursor.executemany(insert_query, rows)
        local_conn.commit()
        print(f"Data loaded successfully into {train_schedule}")
    except Exception as e:
        local_conn.rollback()
        print(f"Error loading data into {train_schedule}: {e}")
    finally:
        cursor.close()

# Define DataFrame and load data
train_schedule = 'train_schedule'
# Ensure 'train_schedule_df' is the DataFrame you want to load
load_data_to_db(local_conn, train_schedule_df, train_schedule)

# Close connections
local_conn.close()



Data loaded successfully into train_schedule


In [82]:
#Loading data into postgresql database

# Defining database connection parameters
conn_params = {
    "host": "postgres-server123.postgres.database.azure.com",
    "database": "train_schedule",
    "user": "adminuser",
    "password": "Password1",
    "port": "5432",
    "sslmode": "require"
}

# Connect to the database
try:
    azure_conn = psycopg2.connect(**conn_params)
    print("Connection successful")
except Exception as e:
    print(f"Error connecting to database: {e}")


Connection successful


In [83]:
azure_cursor = azure_conn.cursor()

#Create table

#Execute query to create table
azure_cursor.execute("""CREATE TABLE train_schedule(
    id SERIAL PRIMARY KEY,
    request_date_time  VARCHAR (100),
    station_name VARCHAR (100),
    mode VARCHAR (100),
    train_uid VARCHAR (100),
    origin_name VARCHAR (100),
    operator_name VARCHAR (100),
    platform VARCHAR(100),
    destination_name VARCHAR(100),
    aimed_departure_time VARCHAR (100),
    expected_departure_time VARCHAR(100),
    best_departure_estimate_mins INT,
    aimed_arrival_time VARCHAR (100)

);
""")

#committing the query to database
azure_conn.commit()

In [84]:
# Data loading function to azure db
def load_data_to_azure(azure_conn, train_schedule_df, train_schedule):
    azure_cursor = azure_conn.cursor()
    
    # Define the SQL insert query with correct syntax
    insert_query = f'''
    INSERT INTO {train_schedule} (
        id, request_date_time, station_name, mode, train_uid, origin_name, operator_name, platform, 
        destination_name, aimed_departure_time, expected_departure_time, best_departure_estimate_mins, aimed_arrival_time
    ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
    '''
    
    # Define rows by selecting individual columns in the correct format
    rows = [tuple(x) for x in train_schedule_df[[
        "id", "request_date_time", "station_name", "mode", "train_uid", "origin_name",
        "operator_name", "platform", "destination_name", "aimed_departure_time",
        "expected_departure_time", "best_departure_estimate_mins", "aimed_arrival_time"
    ]].values]
    
    try:
        azure_cursor.executemany(insert_query, rows)
        azure_conn.commit()
        print(f"Data loaded successfully into {train_schedule}")
    except Exception as e:
        azure_conn.rollback()
        print(f"Error loading data into {train_schedule}: {e}")
    finally:
        azure_cursor.close()

# Define DataFrame and load data
train_schedule = 'train_schedule'
# Ensure 'train_schedule_df' is the DataFrame you want to load
load_data_to_db(azure_conn, train_schedule_df, train_schedule)

# Close connections
azure_conn.close()

Data loaded successfully into train_schedule
