# ETL - Stage to Warehouse

Das Skript bringt die Umfragedaten der kuzu Zug in das gewünschte Zielformat und speichert die Tabellen in der lokalen Datenbank.

In [1]:
import pandas as pd
from sqlalchemy import create_engine, text, inspect, types
from sqlalchemy_utils import database_exists, create_database
import sqlalchemy
import os
from dotenv import load_dotenv
import numpy as np
import datetime 
import decimal

## Configuration

In [2]:
## Read the environment variables defined in the project's .env file
load_dotenv(dotenv_path='../config/.env')

True

### Connect to DB

In [3]:
## Credentials for the local DB, taken from the environment (.env file);
## every value falls back to the string "default" when the variable is unset
USERNAME_LOCAL, PASSWORD_LOCAL, ENDPOINT_LOCAL, DATABASE_LOCAL = (
    os.getenv(env_key, "default")
    for env_key in ('USERNAME_LOCAL', 'PASSWORD_LOCAL', 'ENDPOINT_LOCAL', 'DATABASE_LOCAL')
)

In [4]:
# Build the PostgreSQL connection URL from the *_LOCAL credentials
conn_string = f'postgresql://{USERNAME_LOCAL}:{PASSWORD_LOCAL}@{ENDPOINT_LOCAL}/{DATABASE_LOCAL}'
# Engine for the local database
engine = create_engine(conn_string)
# Connection used by the extract step below.
# NOTE(review): no matching close() is visible in this notebook - confirm the connection is released.
dbconnect = engine.connect()

## Extract Data (local DB)

In [5]:
# Load the complete 'kuzu_zug' table through the open DB connection
df_kuzu_zug_raw = pd.read_sql_table(table_name='kuzu_zug', con=dbconnect)

In [6]:
# Show all column names of the raw table
df_kuzu_zug_raw.columns.tolist()

['time',
 'participant',
 'UmfrageName',
 'file_name',
 'wime_puenktlich',
 'wime_komfort',
 'wime_fahrplan',
 'SF_kanal_zuf',
 'wime_infokanal',
 'wime_personal',
 'wime_kundenorientierung',
 'wime_mobile',
 'wime_platzangebot',
 'wime_preis_leistung',
 'Wime_Sauberkeit_BhfZiel',
 'Wime_Sauberkeit_BhfStart',
 'wime_sauberkeit',
 'wime_wc',
 'wime_oes_ziel',
 'wime_oes_start',
 'wime_oes_fahrt',
 'wime_sf_behebung',
 'wime_wc_verfuegb',
 'wime_gastro_ambiente',
 'wime_gastro_auswahl',
 'wime_gastro_freundlich',
 'wime_gastro_kompetenz',
 'wime_gastro_preis',
 'wime_gastro_qualitaet',
 'Wime_Wegweisung_BhfZiel',
 'Wime_Wegweisung_BhfStart',
 'BFH_sahre_non',
 'BFH_sahre_start',
 'BFH_sahre_ziel',
 'BHF_park',
 'BHF_park_non',
 'BHF_park_start',
 'BHF_park_ziel',
 'BHF_platz',
 'BHF_sauberkeit',
 'BHF_share',
 'BHF_umsteig',
 'BHF_velo',
 'BHF_velo_non',
 'BHF_velo_start',
 'BHF_velo_ziel',
 'BHF_wc',
 'BHF_wegweisung',
 'createdAt',
 'date_of_first_mail',
 'date_of_last_access',
 'devic

## Transform Data

#### Kommentar without Label

In [7]:
# Columns needed for the unlabeled free-text comments
comment_cols = ["participant", "u_date", "Kommentar", "S_sprache"]
df_text = df_kuzu_zug_raw[comment_cols]

# One boolean mask for all row filters (the conditions are independent per row)
is_valid_comment = (
    (df_text["Kommentar"] != "-66")        # "-66" is excluded - presumably a 'no answer' code, TODO confirm
    & (df_text["S_sprache"] != "0")        # language code "0" is excluded
    & df_text["Kommentar"].notnull()
    & df_text["u_date"].notnull()
    & (df_text["Kommentar"].astype(str).str.len() >= 4)  # min 4 characters for valid comment
)

# Build the result in one chain: no writes into a filtered slice
# (avoids SettingWithCopyWarning) and no in-place mutation.
df_text = (
    df_text.loc[is_valid_comment]
    .assign(u_date=lambda frame: pd.to_datetime(frame["u_date"]))
    .sort_values(by='u_date')
    # col naming
    .rename(columns={'participant': 'ParticipantId',
                     'Kommentar': 'Value',
                     'S_sprache': 'Language',
                     'u_date': 'Date'})
)


#### Kommentare with Label

In [8]:
# Load the question-code config and select the codes of the labeled free-text questions.
# The path is passed directly so pandas opens AND closes the workbook itself
# (the former open(...) handle was never closed).
fragecodes_config = pd.read_excel('../config/config.xlsx', sheet_name='Fragecodes')
is_labeled_text = (
    (fragecodes_config['UmfrageName'] == 'kuzu_zug')
    & (fragecodes_config['FrageType'] == 'text_labeled')
)
labeled_codes = fragecodes_config.loc[is_labeled_text, 'FrageCode'].tolist()
labeled_kuzu_zug = ["participant"] + labeled_codes  # participant first, needed for mapping

# filter df
df_text_labeled = df_kuzu_zug_raw[labeled_kuzu_zug]

# transform to long: one row per (participant, FrageCode); only the question
# codes are value columns, participant is the id column
df_text_labeled = pd.melt(df_text_labeled,
                          id_vars='participant',
                          value_vars=labeled_codes,
                          var_name='FrageCode',
                          value_name='Value')

# join Date and Language
df_text_labeled = df_text_labeled.merge(
    df_kuzu_zug_raw[['u_date', 'participant', 'S_sprache']],
    on='participant',
    how='left',
)

# cleaning: all row filters combined in a single mask
is_valid_answer = (
    (df_text_labeled["Value"] != "-66")    # "-66" is excluded - presumably a 'no answer' code, TODO confirm
    & df_text_labeled["Value"].notnull()
    & (df_text_labeled["Value"].astype(str).str.len() >= 4)  # min 4 characters for valid comment
    & (df_text_labeled["S_sprache"] != "0")
    & df_text_labeled["u_date"].notnull()
)

# convert datatype, order and rename the columns in one chain
# (no assignment into a filtered slice, no in-place mutation)
df_text_labeled = (
    df_text_labeled.loc[is_valid_answer]
    .assign(u_date=lambda frame: pd.to_datetime(frame["u_date"]))
    [['participant', 'u_date', 'FrageCode', 'Value', 'S_sprache']]
    .rename(columns={'participant': 'ParticipantId',
                     'u_date': 'Date',
                     'S_sprache': 'Language'})
)



#### Satisfaction

In [9]:
# Load the question-code config and select the codes of the satisfaction questions.
# The path is passed directly so pandas opens AND closes the workbook itself
# (the former open(...) handle was never closed).
fragecodes_config = pd.read_excel('../config/config.xlsx', sheet_name='Fragecodes')
is_satisfaction = (
    (fragecodes_config['UmfrageName'] == 'kuzu_zug')
    & (fragecodes_config['FrageType'] == 'satisfaction')
)
satisfaction_codes = fragecodes_config.loc[is_satisfaction, 'FrageCode'].tolist()
satisfaction_kuzu_zug = ["participant"] + satisfaction_codes  # participant first, needed for mapping

# Filter df
df_satisfaction = df_kuzu_zug_raw[satisfaction_kuzu_zug]

# transform to long: one row per (participant, FrageCode)
df_satisfaction = pd.melt(df_satisfaction,
                          id_vars='participant',
                          value_vars=satisfaction_codes,
                          var_name='FrageCode',
                          value_name='Value')

# cleaning: keep only the answers "1" .. "10"
valid_answers = [str(i) for i in range(1, 11)]
df_satisfaction = df_satisfaction[df_satisfaction["Value"].isin(valid_answers)]

# join Date
df_satisfaction = df_satisfaction.merge(
    df_kuzu_zug_raw[['u_date', 'participant']],
    on='participant',
    how='left',
)

# convert correct datatype (chained: no assignment into a filtered slice)
df_satisfaction = (
    df_satisfaction[df_satisfaction["u_date"].notnull()]
    .assign(u_date=lambda frame: pd.to_datetime(frame["u_date"]),
            Value=lambda frame: frame["Value"].astype(str).astype(int))
    .sort_values(by='u_date')
)

# Transform from 5 and 10 scaling to one harmonized 100 scale
# (10 until 03.2020 and 5 since 5.2020 - april 2020 no data)
# NOTE(review): rows dated on/before 2000-01-01, or later than 2020-04-30 00:00
# but before 2020-05-01, match neither mask and are dropped by the concat below.
# NOTE(review): answers 6..10 are not filtered out for the 5-point period and
# would map to values above 100 - confirm such answers cannot occur.
mask_10 = (df_satisfaction['u_date'] > '2000-1-1') & (df_satisfaction['u_date'] <= '2020-4-30')
mask_5 = (df_satisfaction['u_date'] >= '2020-5-1')
df_satisfaction_10 = df_satisfaction.loc[mask_10]
df_satisfaction_10 = df_satisfaction_10.assign(Value=(df_satisfaction_10['Value'] - 1) / 9 * 100)
df_satisfaction_5 = df_satisfaction.loc[mask_5]
df_satisfaction_5 = df_satisfaction_5.assign(Value=(df_satisfaction_5['Value'] - 1) / 4 * 100)
df_satisfaction = pd.concat([df_satisfaction_5, df_satisfaction_10])

# Order cols, rename, change dtypes and round float
df_satisfaction = (
    df_satisfaction[['participant', 'u_date', 'FrageCode', 'Value']]
    .rename(columns={'participant': 'ParticipantId', 'u_date': 'Date'})
    .astype({'FrageCode': 'category'})
    .assign(Value=lambda frame: frame['Value'].round(3))
)


In [10]:
# Display the satisfaction rows, newest survey date first
df_satisfaction.sort_values(by="Date", ascending=False)

Unnamed: 0,ParticipantId,Date,FrageCode,Value
5002583,534535,2022-07-06,Wime_Wegweisung_BhfZiel,100.000
3975633,534419,2022-07-06,wime_oes_start,100.000
3975776,534617,2022-07-06,wime_oes_start,100.000
3975775,534616,2022-07-06,wime_oes_start,100.000
3975772,534612,2022-07-06,wime_oes_start,100.000
...,...,...,...,...
4860938,168435,2017-01-01,wime_gastro_preis,55.556
4860937,168427,2017-01-01,wime_gastro_preis,88.889
4860936,168424,2017-01-01,wime_gastro_preis,88.889
4860935,168361,2017-01-01,wime_gastro_preis,44.444


#### Additional Meta data

In [11]:
## Filter df: metadata columns kept per participant
meta_columns = ["participant",
                "u_date",
                "S_sex",
                "S_alter",
                "u_bezugsart",
                "u_fahrausweis",
                "u_ga",
                "u_klassencode",
                "u_preis",
                "u_ticket",
                "device_type",
                "dispcode",
                "fg_startort",
                "fg_abfahrt",
                "ft_startort",
                "ft_abfahrt",
                "ft_zielort",
                "ft_ankunft",
                "fg_zielort",
                "fg_ankunft",
                "ft_haltestellen",
                "ft_vm_kurz",
                "R_anschluss",
                "R_stoerung",
                "R_zweck",
                "S_berufstaetigkeit",
                "S_wohnsitz",
                "S_Usertyp1",
                "S_Usertyp2",
                "S_Usertyp3"
               ]
df_meta = df_kuzu_zug_raw[meta_columns]

## convert empty / invalid answers to NaN
## NOTE(review): '0' is replaced in every column, so e.g. a u_preis of '0' also becomes NaN - confirm intended
invalid_answers = ['-66', '-99', '-77', '0', 'Weiss nicht']
for invalid_answer in invalid_answers:
    df_meta = df_meta.replace(invalid_answer, np.nan)

## explicit copy: the column assignments below must not write into a filtered slice
df_meta = df_meta[df_meta["u_date"].notnull()].copy()


## convert correct datatype
datetime_cols = ["u_date", "fg_abfahrt", "ft_abfahrt", "ft_ankunft", "fg_ankunft"]
for datetime_col in datetime_cols:
    df_meta[datetime_col] = pd.to_datetime(df_meta[datetime_col])

for numeric_col in ["u_preis", "S_alter"]:
    df_meta[numeric_col] = pd.to_numeric(df_meta[numeric_col], downcast='float')


## Col specific cleaning
## Done on the plain values BEFORE the category cast: replace() on a categorical
## column is deprecated in recent pandas, and one mapping replaces ten single calls.
  ## Fahrzweck (trip purpose): harmonize the answer variants to two labels
leisure_label = 'Freizeitfahrt / Ferienreise oder alltägliche Erledigungen'
commute_label = 'Pendeln'
zweck_mapping = {
    'Freizeitfahrt  ohne Übernachtung (Ausflug, Kino, Sport, Besuch, usw.)': leisure_label,
    'Freizeitfahrt/ private Ferienreise/ alltägliche Erledigungen (z.B. Arztbesuch, Einkaufen, jmd. Abhol': leisure_label,
    'Freizeitfahrt/ private Ferienreise\r\n': leisure_label,
    'Alltägliche Erledigungen (z.B. Arztbesuch, Einkaufen, jmd. abholen)': leisure_label,
    'Private Ferienreise (Reise mit mind. 1 Übernachtung)': leisure_label,
    'alltägliche Erledigungen (z.B. Arztbesuch, Einkaufen, jmd. Abholen)\r\n': leisure_label,
    'Fahrt zum Arbeitsort / Ausbildungsort': commute_label,
    'Fahrt zum Arbeitsort': commute_label,
    'Fahrt zum Ausbildungsort': commute_label,
    'Fahrt vom oder zum Arbeits-/ Ausbildungsort': commute_label,
}
df_meta["R_zweck"] = df_meta["R_zweck"].replace(zweck_mapping)

  ## U_Bezugsart
df_meta["u_bezugsart"] = df_meta["u_bezugsart"].replace('MobileTicket', 'Mobile Ticket')

  ## U_Ticket: the value '3' is treated as missing
df_meta["u_ticket"] = df_meta["u_ticket"].replace('3', np.nan)


## Cast the low-cardinality text columns to category
tocategory = ['S_sex',
              'u_bezugsart',
              'u_fahrausweis',
              'u_ga',
              'u_klassencode',
              'u_ticket',
              "device_type",
              "dispcode",
              "fg_startort",
              "ft_startort",
              "ft_zielort",
              "fg_zielort",
              "ft_haltestellen",
              "ft_vm_kurz",
              "R_anschluss",
              "R_stoerung",
              "R_zweck",
              "S_berufstaetigkeit",
              "S_wohnsitz",
              "S_Usertyp1",
              "S_Usertyp2",
              "S_Usertyp3"
             ]
df_meta = df_meta.astype({category_col: 'category' for category_col in tocategory})


## Col renaming
df_meta = df_meta.rename(columns={'participant': 'ParticipantId', 'u_date': 'Date'})

## Load Data

### Option 1: Load data to local DB

In [19]:
## Import credentials for local DB from .env file
USERNAME_LOCAL = os.environ.get('USERNAME_LOCAL', "default")  
PASSWORD_LOCAL = os.environ.get('PASSWORD_LOCAL', "default")
ENDPOINT_LOCAL = os.environ.get('ENDPOINT_LOCAL', "default")
DATABASE_LOCAL = os.environ.get('DATABASE_LOCAL', "default")

In [20]:
# Create Database on local machine
conn_string = f'postgresql://{USERNAME_LOCAL}:{PASSWORD_LOCAL}@{ENDPOINT_LOCAL}/{DATABASE_LOCAL}'
dbEngine = sqlalchemy.create_engine(conn_string, connect_args={'connect_timeout': 10}, echo=False)

In [21]:
# Smoke test: run a trivial query to verify that the engine can connect
try:
    with dbEngine.connect() as con:
        # Textual SQL must be wrapped in text(): SQLAlchemy 2.x rejects raw
        # strings in Connection.execute(), and 1.x accepts text() as well.
        con.execute(text("SELECT 1"))
    print('engine is valid')
except Exception as e:
    # Report the failure instead of raising so the notebook can continue
    print(f'Engine invalid: {str(e)}')

engine is valid


In [22]:
def sqlinsert(df, tablename, db_engine, chunksize):
    """Write a DataFrame to a SQL table, replacing the table if it already exists.

    Args:
        df: DataFrame to write (its ``to_sql`` method is called).
        tablename: Name of the target table.
        db_engine: SQLAlchemy engine/connection the rows are written through.
        chunksize: Number of rows written per batch.

    Returns:
        None. The DataFrame index is not written.
    """
    # Use the engine passed by the caller; the previous version ignored the
    # `db_engine` argument and silently wrote through the global `dbEngine`.
    df.to_sql(tablename,
              db_engine,
              if_exists='replace',
              index=False,
              method='multi',  # multi-row INSERT statements
              chunksize=chunksize)

In [53]:
#sqlinsert(df_satisfaction,'Satisfaction',dbEngine,10000) #write into local db !!!Long runtime!!!

In [59]:
#sqlinsert(df_meta,'Metadata',dbEngine,10000) #write into local db !!!Long runtime!!!

In [28]:
#sqlinsert(df_text,'Unlabeled',dbEngine,1000) #write into local db

In [26]:
#sqlinsert(df_text_labeled,'Labeled',dbEngine,1000) #write into local db

### Option 2: Load data to .csv files

In [12]:
# Export the satisfaction table as UTF-8 CSV without the index
df_satisfaction.to_csv(path_or_buf="../data/Data_Satisfaction.csv", index=False, encoding='utf-8')

In [13]:
# Export the metadata table as UTF-8 CSV without the index
df_meta.to_csv(path_or_buf="../data/Data_Metadata.csv", index=False, encoding='utf-8')

In [14]:
# Export the unlabeled comments as UTF-8 CSV without the index
df_text.to_csv(path_or_buf="../data/Data_Unlabeled.csv", index=False, encoding='utf-8')

In [15]:
# Export the labeled comments as UTF-8 CSV without the index
df_text_labeled.to_csv(path_or_buf="../data/Data_Labeled.csv", index=False, encoding='utf-8')

### Option 3: Load to Atlas MongoDB

In [21]:
import pymongo

In [25]:
## Database details
## Credentials for MongoDB are read from the environment (.env file)
USERNAME_MONGO = os.getenv('USERNAME_MONGO', "xxxxxx")
PASSWORD_MONGO = os.getenv('PASSWORD_MONGO', "xxxxxx")
ENDPOINT_MONGO = os.getenv('ENDPOINT_MONGO', "xxxxx")

connection_string = f'mongodb+srv://{USERNAME_MONGO}:{PASSWORD_MONGO}@{ENDPOINT_MONGO}'
database = "kuzu"

## Connection to MongoDB: client -> database "kuzu" -> collection "warehouse"
client = pymongo.MongoClient(connection_string)
db = client[database]
collection = db["warehouse"]

Merge the dataframes into one collection. The base dataframe is df_text.

In [26]:
# Reduce the satisfaction table to the overall-satisfaction question and
# expose its value under the column name OverallSatisfaction
is_overall = df_satisfaction["FrageCode"] == 'wime_gesamtzuf'
df_satisfaction_min = (
    df_satisfaction.loc[is_overall]
    .drop(columns=['FrageCode'])
    .rename(columns={"Value": "OverallSatisfaction"})
)

# Left-join satisfaction and metadata onto the comments (df_text is the base)
df_temp = df_text.merge(df_satisfaction_min, on='ParticipantId', how='left')
df_full = df_temp.merge(df_meta, on='ParticipantId', how='left')

# The first merge suffixes the two clashing Date columns; only the Date
# column coming from df_meta is kept
df_full = df_full.drop(columns=['Date_x', 'Date_y'])

# reorder colnames for export
new_cols = [
    'ParticipantId', 'Date', 'Value', 'Language', 'OverallSatisfaction',
    'S_sex', 'S_alter',
    'u_bezugsart', 'u_fahrausweis', 'u_ga', 'u_klassencode', 'u_preis', 'u_ticket',
    'device_type', 'dispcode',
    'fg_startort', 'fg_abfahrt',
    'ft_startort', 'ft_abfahrt', 'ft_zielort', 'ft_ankunft',
    'fg_zielort', 'fg_ankunft',
    'ft_haltestellen', 'ft_vm_kurz',
    'R_anschluss', 'R_stoerung', 'R_zweck',
    'S_berufstaetigkeit', 'S_wohnsitz',
    'S_Usertyp1', 'S_Usertyp2', 'S_Usertyp3',
]

df_full = df_full.loc[:, new_cols]

# Replace missing timestamps with None in the date columns (stored as object dtype)
dates = ['Date', 'fg_abfahrt', 'ft_abfahrt', 'ft_ankunft', 'fg_ankunft']

for date_col in dates:
    date_frame = df_full[[date_col]]
    df_full[[date_col]] = date_frame.astype(object).where(date_frame.notnull(), None)

In [27]:
## Remove all existing documents in collection
# Drop the target collection so the insert below starts from an empty collection
collection.drop()

In [28]:
## write to mongodb
df_full.reset_index(inplace=True)
df_full_dict = df_full.to_dict("records")
# Insert collection
collection.insert_many(df_full_dict)

<pymongo.results.InsertManyResult at 0x254230cc0>

In [29]:
## Count number of documents inserted
# An empty filter matches every document in the collection
collection.count_documents(filter={})

131825

Also write the filtered and merged data to CSV.

In [30]:
# Export the merged table as UTF-8 CSV without the index
df_full.to_csv(path_or_buf="../data/Data.csv", index=False, encoding='utf-8')