## Swiss Trains Data to Elasticsearch
This notebook demonstrates how to load a CSV file containing Swiss train data into a pandas DataFrame, rename its columns, and insert it into an Elasticsearch index using the `eland` library.
Before running it, set the `cloud_id` and `api_key` variables to your Elasticsearch Cloud credentials.

In [1]:
# Import necessary libraries
import eland as ed
import pandas as pd
from elasticsearch import Elasticsearch

### Step 1: Define your variables
Specify the `filename` of the CSV file, and provide the `cloud_id` and `api_key` for your Elasticsearch cluster. You must replace `'CHANGEME'` with your actual credentials.

In [2]:
# Define variables for the filename, cloud ID, and API key
filename = 'swiss_trains_20240913.csv'
cloud_id = 'CHANGEME'  # Replace with your actual Elasticsearch Cloud ID
api_key = 'CHANGEME'  # Replace with your actual Elasticsearch API key
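
As an alternative to editing the placeholders in the notebook, the credentials can be read from environment variables so they are not saved with the notebook. This is a minimal sketch; the variable names `ES_CLOUD_ID` and `ES_API_KEY` and the helper `load_credentials` are assumptions chosen for illustration, not something the notebook or Elasticsearch defines.

```python
import os


def load_credentials(env=os.environ):
    """Return (cloud_id, api_key) from the environment, or raise if either is unset.

    ES_CLOUD_ID and ES_API_KEY are hypothetical variable names.
    """
    cloud_id = env.get('ES_CLOUD_ID', 'CHANGEME')
    api_key = env.get('ES_API_KEY', 'CHANGEME')

    # Treat an empty value or the untouched placeholder as "not configured"
    missing = [
        name
        for name, value in (('ES_CLOUD_ID', cloud_id), ('ES_API_KEY', api_key))
        if value in ('', 'CHANGEME')
    ]
    if missing:
        raise ValueError('Missing credentials: ' + ', '.join(missing))
    return cloud_id, api_key
```

Calling `cloud_id, api_key = load_credentials()` then fails early with a clear message instead of sending the placeholder value to the cluster.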

### Step 2: Read the CSV file
The file is read into a pandas DataFrame with `pd.read_csv`. The file is semicolon-delimited, so `sep=';'` is passed explicitly; `df.head()` then displays the first five rows for inspection.

In [3]:
# Read the CSV file into a pandas DataFrame
df = pd.read_csv(filename, sep=';')
df.head()  # Display the first few rows of the DataFrame for inspection

Unnamed: 0,Betriebstag,Fahrt Bezeichner,Betreiber ID,Betreiber Abkürzung,Betreiber Name,Produkt ID,Linie,Linien Text,circulation id,Verkehrsmittel Text,...,is passing,Ankunftsverspätung,departure delay,Didok-Nummer,Name Haltestelle,Abkuerzung Bahnhof,lod,Geoposition,sloid,Transportunternehmung (Nummer)
0,2024-09-12,ch:1:sjyid:100001:102-001,85:11,SBB,Schweizerische Bundesbahnen SBB,Zug,102,ICE,,ICE,...,False,False,True,90.0,Basel Bad Bf,BAD,http://lod.opentransportdata.swiss/didok/didok85,"47.567307905698264, 7.6069204184730825",ch:1:sloid:90,612.0
1,2024-09-12,ch:1:sjyid:100001:103-001,85:11,SBB,Schweizerische Bundesbahnen SBB,Zug,103,ICE,,ICE,...,False,False,False,90.0,Basel Bad Bf,BAD,http://lod.opentransportdata.swiss/didok/didok85,"47.567307905698264, 7.6069204184730825",ch:1:sloid:90,612.0
2,2024-09-12,ch:1:sjyid:100001:1058-011,85:11,SBB,Schweizerische Bundesbahnen SBB,Zug,1058,IC61,,IC,...,False,False,False,7492.0,Interlaken Ost,IO,http://lod.opentransportdata.swiss/didok/didok85,"46.69049999618799, 7.869000004346448",ch:1:sloid:7492,35.0
3,2024-09-12,ch:1:sjyid:100001:1058-011,85:11,SBB,Schweizerische Bundesbahnen SBB,Zug,1058,IC61,,IC,...,False,False,False,7493.0,Interlaken West,IW,http://lod.opentransportdata.swiss/didok/didok85,"46.68262798035661, 7.851453137595281",ch:1:sloid:7493,33.0
4,2024-09-12,ch:1:sjyid:100001:1058-011,85:11,SBB,Schweizerische Bundesbahnen SBB,Zug,1058,IC61,,IC,...,False,False,False,7483.0,Spiez,SP,http://lod.opentransportdata.swiss/didok/didok85,"46.68639566834843, 7.680103057796672",ch:1:sloid:7483,33.0
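
The effect of the `sep=';'` argument can be seen on a small in-memory sample. The three-column sample below is a made-up excerpt in the same layout, not the real file; with the default comma separator, each line ends up in a single column.

```python
import io

import pandas as pd

# Hypothetical three-column sample in the same semicolon-delimited layout
sample = (
    "Betriebstag;Linie;Linien Text\n"
    "2024-09-12;102;ICE\n"
    "2024-09-12;1058;IC61\n"
)

# With the correct separator, the header splits into three columns
df_ok = pd.read_csv(io.StringIO(sample), sep=';')

# With the default separator (a comma), each line is read as one field
df_wrong = pd.read_csv(io.StringIO(sample))

print(df_ok.shape)     # (2, 3)
print(df_wrong.shape)  # (2, 1)
```

A DataFrame with a single, very long column name after loading is therefore a quick hint that the separator is wrong.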


### Step 3: Rename columns
The source columns mix German and English names, and several contain spaces or umlauts. Renaming them to consistent English identifiers makes them easier to reference in code and to use as Elasticsearch field names.

In [4]:
# Rename columns to make them more readable and user-friendly
df.rename(
    columns={
        'Betriebstag': 'OperatingDay',
        'Fahrt Bezeichner': 'TripIdentifier',
        'Betreiber ID': 'OperatorID',
        'Betreiber Abkürzung': 'OperatorAbbreviation',
        'Betreiber Name': 'OperatorName',
        'Produkt ID': 'ProductID',
        'Linie': 'Line',
        'Linien Text': 'LineDescription',
        'circulation id': 'CirculationID',
        'Verkehrsmittel Text': 'VehicleTypeDescription',
        'Zusatzfahrt TF': 'AdditionalTrip',
        'Fällt aus': 'Canceled',
        'BPUIC': 'RailwayInfrastructureCode',
        'Haltestellen Name': 'StopName',
        'Ankunftszeit': 'ArrivalTime',
        'An Prognose': 'ArrivalForecast',
        'An Prognose Status': 'ArrivalForecastStatus',
        'Abfahrtszeit': 'DepartureTime',
        'departure forecast': 'DepartureForecast',
        'Ab Prognose Status': 'DepartureForecastStatus',
        'is passing': 'IsPassing',
        'Ankunftsverspätung': 'ArrivalDelay',
        'departure delay': 'DepartureDelay',
        'Didok-Nummer': 'SwissStopIdentifier',
        'Name Haltestelle': 'StopName',
        'Abkuerzung Bahnhof': 'StationAbbreviation',
        'lod': 'LevelOfDetail',
        'Geoposition': 'Geoposition',
        'sloid': 'StopLocationIdentifier',
        'Transportunternehmung (Nummer)': 'TransportationCompanyNumber',
    },
    inplace=True
)
df.head()  # Check the first few rows after renaming columns

Unnamed: 0,OperatingDay,TripIdentifier,OperatorID,OperatorAbbreviation,OperatorName,ProductID,Line,LineDescription,CirculationID,VehicleTypeDescription,...,IsPassing,ArrivalDelay,DepartureDelay,SwissStopIdentifier,StopName,StationAbbreviation,LevelOfDetail,Geoposition,StopLocationIdentifier,TransportationCompanyNumber
0,2024-09-12,ch:1:sjyid:100001:102-001,85:11,SBB,Schweizerische Bundesbahnen SBB,Zug,102,ICE,,ICE,...,False,False,True,90.0,Basel Bad Bf,BAD,http://lod.opentransportdata.swiss/didok/didok85,"47.567307905698264, 7.6069204184730825",ch:1:sloid:90,612.0
1,2024-09-12,ch:1:sjyid:100001:103-001,85:11,SBB,Schweizerische Bundesbahnen SBB,Zug,103,ICE,,ICE,...,False,False,False,90.0,Basel Bad Bf,BAD,http://lod.opentransportdata.swiss/didok/didok85,"47.567307905698264, 7.6069204184730825",ch:1:sloid:90,612.0
2,2024-09-12,ch:1:sjyid:100001:1058-011,85:11,SBB,Schweizerische Bundesbahnen SBB,Zug,1058,IC61,,IC,...,False,False,False,7492.0,Interlaken Ost,IO,http://lod.opentransportdata.swiss/didok/didok85,"46.69049999618799, 7.869000004346448",ch:1:sloid:7492,35.0
3,2024-09-12,ch:1:sjyid:100001:1058-011,85:11,SBB,Schweizerische Bundesbahnen SBB,Zug,1058,IC61,,IC,...,False,False,False,7493.0,Interlaken West,IW,http://lod.opentransportdata.swiss/didok/didok85,"46.68262798035661, 7.851453137595281",ch:1:sloid:7493,33.0
4,2024-09-12,ch:1:sjyid:100001:1058-011,85:11,SBB,Schweizerische Bundesbahnen SBB,Zug,1058,IC61,,IC,...,False,False,False,7483.0,Spiez,SP,http://lod.opentransportdata.swiss/didok/didok85,"46.68639566834843, 7.680103057796672",ch:1:sloid:7483,33.0
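
By default, `DataFrame.rename` ignores mapping keys that match no column, so a misspelled key passes silently. Two keys that share a target name also produce duplicate column labels if both source columns exist; the mapping above assigns `StopName` to both `Haltestellen Name` and `Name Haltestelle`, and the truncated output does not show whether both are present in the file. The sketch below checks a mapping for both problems before it is applied. `check_rename_map` is a hypothetical helper, and the column list is a made-up excerpt.

```python
from collections import Counter


def check_rename_map(columns, mapping):
    """Return (missing_keys, duplicate_targets) for a rename mapping.

    missing_keys: mapping keys that match no existing column.
    duplicate_targets: column names that would occur more than once
    after the mapping is applied.
    """
    missing = [old for old in mapping if old not in columns]
    renamed = [mapping.get(col, col) for col in columns]
    duplicates = sorted(name for name, n in Counter(renamed).items() if n > 1)
    return missing, duplicates


# Hypothetical excerpt: one misspelled key and two keys sharing a target
columns = ['Betriebstag', 'Linie', 'Haltestellen Name', 'Name Haltestelle']
mapping = {
    'Betriebstag': 'OperatingDay',
    'Linei': 'Line',  # misspelled on purpose
    'Haltestellen Name': 'StopName',
    'Name Haltestelle': 'StopName',
}

missing, duplicates = check_rename_map(columns, mapping)
print(missing)     # ['Linei']
print(duplicates)  # ['StopName']
```

With the real data, `check_rename_map(list(df.columns), mapping)` would be called before `df.rename`, with `mapping` holding the dictionary passed as `columns=`.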


### Step 4: Connect to Elasticsearch
Create an `Elasticsearch` client from the `cloud_id` and `api_key` defined in Step 1. `es.ping()` returns `True` when the cluster can be reached with these credentials.

In [5]:
# Establish a connection to Elasticsearch
es = Elasticsearch(cloud_id=cloud_id, api_key=api_key)
es.ping()

True
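
In recent versions of the `elasticsearch` Python client, `ping()` returns `False` on failure rather than raising, so a failed connection is easy to miss when the cell output is not checked. Treat that behaviour as an assumption for other client versions. A small guard can turn it into an explicit error; `require_connection` is a hypothetical helper that works with any object exposing a `ping()` method.

```python
def require_connection(client):
    """Return the client if it answers ping(), otherwise raise ConnectionError."""
    if not client.ping():
        raise ConnectionError(
            'Elasticsearch did not respond to ping; check cloud_id and api_key'
        )
    return client


# Usage with the client from the cell above:
# es = require_connection(es)
```

Running this right after the client is created stops the notebook before the insert step if the credentials are wrong.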

### Step 5: Insert the data into Elasticsearch
Use the `eland.pandas_to_eland` function to insert the DataFrame into Elasticsearch. The data is written to the `swiss_trains` index. The `es_type_overrides` argument maps `Geoposition` to `geo_point` and the day and time columns to `date`, instead of leaving the field types to be inferred from the pandas dtypes.

In [6]:
# Insert the data into Elasticsearch using eland's `pandas_to_eland` function
# The data will be inserted into the 'swiss_trains' index with data type overrides
ed_df = ed.pandas_to_eland(
    df,
    es_client=es,
    es_dest_index='swiss_trains',
    es_if_exists='replace',  # Replace the index if it already exists
    es_dropna=True,  # Omit NaN fields from each document (rows are kept)
    es_refresh=True,  # Refresh the index after inserting the documents
    es_type_overrides={  # Define type overrides for specific fields
        'Geoposition': 'geo_point',
        'OperatingDay': 'date',
        'DepartureTime': 'date',
        'ArrivalTime': 'date',
        'DepartureForecast': 'date',
        'ArrivalForecast': 'date',
    }
)
print('Data successfully inserted into Elasticsearch index: swiss_trains')

Data successfully inserted into Elasticsearch index: swiss_trains
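
The `geo_point` override relies on the `Geoposition` strings being in latitude-first `"lat, lon"` order, which is the order Elasticsearch expects for the string form of a `geo_point` (array and GeoJSON forms use longitude first). A minimal sketch for validating such a string before indexing follows; `parse_geoposition` is a hypothetical helper, and the sample value is taken from the first row shown above.

```python
def parse_geoposition(value):
    """Parse a 'lat, lon' string into a {'lat': ..., 'lon': ...} dict.

    Raises ValueError if the string is malformed or out of range.
    """
    lat_text, lon_text = value.split(',')  # ValueError unless exactly two parts
    lat, lon = float(lat_text), float(lon_text)  # float() tolerates spaces
    if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
        raise ValueError(f'Coordinates out of range: {value!r}')
    return {'lat': lat, 'lon': lon}


point = parse_geoposition('47.567307905698264, 7.6069204184730825')
print(point)
```

Applied to the non-null values of the column, a check like this would surface swapped or malformed coordinates before they reach the index.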
