In [201]:
import os
import pandas as pd
import sqlalchemy
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

In [329]:
# make sure that the database and the schema with the different tables have been created.
# Update the information from the Snowflake account
engine = create_engine(URL(
        #The account name is in the url to access the Snowflake UI, example: something.snowflakecomputing.com, where "something" is the account name.
        account="sn34773.europe-west2.gcp",
        user="AALFEREA91",
        password="",#deleted
        role="ACCOUNTADMIN",
        warehouse="COMPUTE_WH",
        database="DW_AERO",
        schema="PUBLIC"
    ))

In [330]:
conn = engine.connect()
conn

<sqlalchemy.engine.base.Connection at 0x186315e1060>

# 1. Load the table with countries.

### First import both files in the same dataframe

In [204]:
urlpaises='input/paises.xls'

In [205]:
urlcountries='input/countries.csv'

In [206]:
df_paises=pd.read_excel(urlpaises).merge(pd.read_csv(urlcountries),how='outer',left_on='cod_pais2',right_on='country')

In [207]:
df_paises.head(3)

Unnamed: 0,cod_pais,cod_pais2,desc_pais,cod_continente,desc_continente,country,latitude,longitude,name
0,AFG,AF,Afghanistan,AS,Asia,AF,33.93911,67.709953,Afghanistan
1,ALB,AL,Albania,EU,Europe,AL,41.153332,20.168331,Albania
2,DZA,DZ,Algeria,AF,Africa,DZ,28.033886,1.659626,Algeria


### Now let's check the Null values

In [208]:
df_paises.isnull().sum()

cod_pais            2
cod_pais2           2
desc_pais           2
cod_continente     38
desc_continente     2
country             1
latitude            2
longitude           2
name                1
dtype: int64

In [209]:
df_paises[df_paises['cod_pais'].isnull()]

Unnamed: 0,cod_pais,cod_pais2,desc_pais,cod_continente,desc_continente,country,latitude,longitude,name
245,,,,,,GZ,31.354676,34.308825,Gaza Strip
246,,,,,,XK,42.602636,20.902977,Kosovo


In [210]:
df_paises.iloc[245,[0,1,2,3,4]]=['GAZ','GZ','Gaza','AS','Asia']

In [211]:
df_paises.iloc[246,[0,1,2,3,4]]=['XKX','XK','Kosovo','EU','Europe']

In [212]:
indexes_wrong_continent=df_paises[df_paises['cod_continente'].isnull()].index

In [213]:
df_paises.iloc[indexes_wrong_continent,3]='NA'

In [214]:
df_paises.isnull().sum()

cod_pais           0
cod_pais2          0
desc_pais          0
cod_continente     0
desc_continente    0
country            1
latitude           2
longitude          2
name               1
dtype: int64

In [215]:
df_paises[df_paises['country'].isnull()]

Unnamed: 0,cod_pais,cod_pais2,desc_pais,cod_continente,desc_continente,country,latitude,longitude,name
244,Z99,Z9,desconocido,Z9,desconocido,,,,


In [216]:
df_paises.iloc[244,[5,8]]=['Z9','desconocido']

In [217]:
df_paises[df_paises['latitude'].isnull()]

Unnamed: 0,cod_pais,cod_pais2,desc_pais,cod_continente,desc_continente,country,latitude,longitude,name
231,UMI,UM,United States Minor Outlying Islands,OC,Oceania,UM,,,U.S. Minor Outlying Islands
244,Z99,Z9,desconocido,Z9,desconocido,Z9,,,desconocido


In [218]:
df_paises.iloc[231,[6,7]]=[19.2823,166.6470]

### Now let's check the duplicates

In [219]:
df_paises[df_paises.duplicated(subset='cod_pais',keep=False)]

Unnamed: 0,cod_pais,cod_pais2,desc_pais,cod_continente,desc_continente,country,latitude,longitude,name
135,MLT,MT,Malta,EU,Europe,MT,35.937496,14.375416,Malta
136,MLT,MT,Malta,EU,Europe,MT,35.937496,14.375416,Malta


In [220]:
df_paises.drop_duplicates(inplace=True)

In [221]:
df_paises[df_paises.duplicated(subset='cod_pais',keep=False)]

Unnamed: 0,cod_pais,cod_pais2,desc_pais,cod_continente,desc_continente,country,latitude,longitude,name


### Finally we have to format the table with the same format as the destination table in SnowFlake

In [222]:
df_paises_final=(
    df_paises
    .drop(['country','name'],axis=1)
    .reindex(columns=['cod_pais','desc_pais','cod_pais2','cod_continente','continente','longitude','latitude'])
    .rename(columns={'desc_pais':'pais','longitude':'longitud','latitude':'latitud'})
)

In [223]:
df_paises_final.dtypes

cod_pais           object
pais               object
cod_pais2          object
cod_continente     object
continente        float64
longitud          float64
latitud           float64
dtype: object

### At last we load the data into Snowflake

In [224]:
try:
    print('Insertando datos')
    df_paises_final.to_sql('dm_pais', con = conn, if_exists = 'replace', index=False)
    rows = len(df_paises_final)
    print("Volcado correcto. Filas cargadas: " + str(rows) )
except Exception as e:
    print("Error en el volcado: " + str(e))

Insertando datos
Volcado correcto. Filas cargadas: 246


# 2. Load the airports table

In [225]:
urlaeropuertos='input/aeropuertos.csv'

In [226]:
df_aeropuertos=pd.read_csv(urlaeropuertos)

In [227]:
df_aeropuertos.head(3)

Unnamed: 0,cod_aeropuerto,cod_pais,nombre_aeropuerto,ciudad_aeropuerto,pais_aeropuerto,cod_iata_faa,cod_icao,latitud,longitud,altitud,zona_horaria,dst,zona_horaria_tz
0,1,PNG,Goroka,Goroka,Papua New Guinea,GKA,AYGA,-6.081689,145.391881,5282.0,10.0,U,Pacific/Port_Moresby
1,2,PNG,Madang,Madang,Papua New Guinea,MAG,AYMD,-5.207083,145.7887,20.0,10.0,U,Pacific/Port_Moresby
2,3,PNG,Mount Hagen,Mount Hagen,Papua New Guinea,HGU,AYMH,-5.826789,144.295861,5388.0,10.0,U,Pacific/Port_Moresby


### Check Null Values

In [228]:
df_aeropuertos.isnull().sum()

cod_aeropuerto       0
cod_pais             0
nombre_aeropuerto    0
ciudad_aeropuerto    0
pais_aeropuerto      0
cod_iata_faa         2
cod_icao             1
latitud              1
longitud             1
altitud              1
zona_horaria         1
dst                  1
zona_horaria_tz      1
dtype: int64

In [229]:
df_aeropuertos[df_aeropuertos['cod_iata_faa'].isnull()]

Unnamed: 0,cod_aeropuerto,cod_pais,nombre_aeropuerto,ciudad_aeropuerto,pais_aeropuerto,cod_iata_faa,cod_icao,latitud,longitud,altitud,zona_horaria,dst,zona_horaria_tz
22,23,CAN,Shearwater,Halifax,Canada,,CYAW,44.639721,-63.499444,167.0,-4.0,A,America/Halifax
297,300,BEL,Beauvechain,Beauvechain,Belgium,,EBBE,50.75861,4.768333,370.0,1.0,E,Europe/Brussels


In [230]:
df_aeropuertos.iloc[[22,297],5]='Z99'

In [231]:
df_aeropuertos[df_aeropuertos['cod_icao'].isnull()]

Unnamed: 0,cod_aeropuerto,cod_pais,nombre_aeropuerto,ciudad_aeropuerto,pais_aeropuerto,cod_iata_faa,cod_icao,latitud,longitud,altitud,zona_horaria,dst,zona_horaria_tz
6916,8345,USA,Montgomery Field,San Diego,United States,MYF,,32.4759,117.759,17.0,8.0,A,Asia/Chongqing


In [232]:
df_aeropuertos.iloc[6916,6]='Z999'

In [233]:
df_aeropuertos[df_aeropuertos['latitud'].isnull()]

Unnamed: 0,cod_aeropuerto,cod_pais,nombre_aeropuerto,ciudad_aeropuerto,pais_aeropuerto,cod_iata_faa,cod_icao,latitud,longitud,altitud,zona_horaria,dst,zona_horaria_tz
8107,Z999,Z99,Desconocido,Desconocido,Desconocido,Z99,Z999,,,,,,


### Check duplicates

In [234]:
df_aeropuertos.duplicated(subset='cod_aeropuerto').sum()

0

### Integrity validation with the countries table

In [235]:
df_paises_final.head(3)

Unnamed: 0,cod_pais,pais,cod_pais2,cod_continente,continente,longitud,latitud
0,AFG,Afghanistan,AF,AS,,67.709953,33.93911
1,ALB,Albania,AL,EU,,20.168331,41.153332
2,DZA,Algeria,DZ,AF,,1.659626,28.033886


In [171]:
df_aeropuertos.head(3)

Unnamed: 0,cod_aeropuerto,cod_pais,nombre_aeropuerto,ciudad_aeropuerto,pais_aeropuerto,cod_iata_faa,cod_icao,latitud,longitud,altitud,zona_horaria,dst,zona_horaria_tz
0,1,PNG,Goroka,Goroka,Papua New Guinea,GKA,AYGA,-6.081689,145.391881,5282.0,10.0,U,Pacific/Port_Moresby
1,2,PNG,Madang,Madang,Papua New Guinea,MAG,AYMD,-5.207083,145.7887,20.0,10.0,U,Pacific/Port_Moresby
2,3,PNG,Mount Hagen,Mount Hagen,Papua New Guinea,HGU,AYMH,-5.826789,144.295861,5388.0,10.0,U,Pacific/Port_Moresby


In [236]:
missing_countries=[element for element in df_aeropuertos['cod_pais'] if element not in df_paises_final['cod_pais'].values]

In [237]:
missing_countries

['SSD', 'WAK', 'SSD']

In [238]:
indexes=df_aeropuertos[df_aeropuertos['cod_pais'].isin(missing_countries)].index

In [239]:
df_aeropuertos.iloc[indexes,1]='Z99'

### Format the table as the destination table in Snwflake

In [241]:
df_aeropuertos.head(3)

Unnamed: 0,cod_aeropuerto,cod_pais,nombre_aeropuerto,ciudad_aeropuerto,pais_aeropuerto,cod_iata_faa,cod_icao,latitud,longitud,altitud,zona_horaria,dst,zona_horaria_tz
0,1,PNG,Goroka,Goroka,Papua New Guinea,GKA,AYGA,-6.081689,145.391881,5282.0,10.0,U,Pacific/Port_Moresby
1,2,PNG,Madang,Madang,Papua New Guinea,MAG,AYMD,-5.207083,145.7887,20.0,10.0,U,Pacific/Port_Moresby
2,3,PNG,Mount Hagen,Mount Hagen,Papua New Guinea,HGU,AYMH,-5.826789,144.295861,5388.0,10.0,U,Pacific/Port_Moresby


In [242]:
df_aeropuertos_final=(
    df_aeropuertos
    .iloc[:,[0,2,3,4,1,5,6,7,8,9,10,11,12]]
    #.rename(columns={'desc_pais':'pais','longitude':'longitud','latitude':'latitud'})
)

In [243]:
df_aeropuertos_final.dtypes

cod_aeropuerto        object
nombre_aeropuerto     object
ciudad_aeropuerto     object
pais_aeropuerto       object
cod_pais              object
cod_iata_faa          object
cod_icao              object
latitud              float64
longitud             float64
altitud              float64
zona_horaria         float64
dst                   object
zona_horaria_tz       object
dtype: object

### Load the data into SnowFlake

In [244]:
try:
    print('Insertando datos')
    df_aeropuertos_final.to_sql('dm_aeropuertos', con = conn, if_exists = 'replace', index=False)
    rows = len(df_aeropuertos_final)
    print("Volcado correcto. Filas cargadas: " + str(rows) )
except Exception as e:
    print("Error en el volcado: " + str(e))

Insertando datos
Volcado correcto. Filas cargadas: 8108


# 3. Load the airlines table

In [245]:
urlaerolineas='input/aerolineas.csv'

In [246]:
df_aerolineas=pd.read_csv(urlaerolineas)

In [247]:
df_aerolineas.head(3)

Unnamed: 0,cod_aerolinea,nombre_aerolinea,cod_pais,cod_iata,cod_icao,identificacion,pais_aerolinea,sw_activa
0,1,Private flight,Z99,Z9,Z99,I999,,Y
1,2,135 Airways,USA,Z9,GNL,GENERAL,United States,N
2,3,1Time Airline,ZAF,1T,RNX,NEXTIME,South Africa,Y


### Check Null values

In [248]:
df_aerolineas.isnull().sum()

cod_aerolinea        0
nombre_aerolinea     0
cod_pais             0
cod_iata             2
cod_icao             0
identificacion      11
pais_aerolinea      16
sw_activa            1
dtype: int64

In [249]:
df_aerolineas[df_aerolineas['cod_iata'].isnull()]

Unnamed: 0,cod_aerolinea,nombre_aerolinea,cod_pais,cod_iata,cod_icao,identificacion,pais_aerolinea,sw_activa
3592,3598,National Airlines,USA,,NAL,NATIONAL,United States,N
5690,13190,Al-Naser Airlines,IRQ,,Z99,I999,Iraq,Y


In [250]:
df_aerolineas.iloc[[3592,5690],3]='Z9'

### Check duplicate values

In [251]:
df_aerolineas[df_aerolineas.duplicated(subset='cod_aerolinea')]

Unnamed: 0,cod_aerolinea,nombre_aerolinea,cod_pais,cod_iata,cod_icao,identificacion,pais_aerolinea,sw_activa


### Integrity validation with the countries table

In [252]:
df_aerolineas.head(3)

Unnamed: 0,cod_aerolinea,nombre_aerolinea,cod_pais,cod_iata,cod_icao,identificacion,pais_aerolinea,sw_activa
0,1,Private flight,Z99,Z9,Z99,I999,,Y
1,2,135 Airways,USA,Z9,GNL,GENERAL,United States,N
2,3,1Time Airline,ZAF,1T,RNX,NEXTIME,South Africa,Y


In [253]:
df_paises_final.head(3)

Unnamed: 0,cod_pais,pais,cod_pais2,cod_continente,continente,longitud,latitud
0,AFG,Afghanistan,AF,AS,,67.709953,33.93911
1,ALB,Albania,AL,EU,,20.168331,41.153332
2,DZA,Algeria,DZ,AF,,1.659626,28.033886


In [254]:
missing_airlines=[element for element in df_aerolineas['cod_pais'] if element not in df_paises_final['cod_pais'].values]

In [256]:
missing_airlines

[]

### Convert the field sw_activa into a 1/0 field

In [260]:
indexes_Y=df_aerolineas[df_aerolineas['sw_activa']=='Y'].index

In [261]:
indexes_N=df_aerolineas[df_aerolineas['sw_activa']=='N'].index

In [263]:
df_aerolineas.iloc[indexes_Y,7]=1

In [264]:
df_aerolineas.iloc[indexes_N,7]=0

In [271]:
#el formato de la tabla ya esta como el formato destino, solo falta comprobar los tipos de los campos

In [274]:
df_aerolineas.dtypes

cod_aerolinea       object
nombre_aerolinea    object
cod_pais            object
cod_iata            object
cod_icao            object
identificacion      object
pais_aerolinea      object
sw_activa           object
dtype: object

### Load the data into SnowFlake

In [275]:
try:
    print('Insertando datos')
    df_aerolineas.to_sql('dm_aerolineas', con = conn, if_exists = 'replace', index=False)
    rows = len(df_aerolineas)
    print("Volcado correcto. Filas cargadas: " + str(rows) )
except Exception as e:
    print("Error en el volcado: " + str(e))

Insertando datos
Volcado correcto. Filas cargadas: 6048


# 4. Load the routes table

In [293]:
urlrutas='input/rutas.json'

In [294]:
df_rutas=pd.read_json(urlrutas)

In [295]:
df_rutas.head(3)

Unnamed: 0,cod_ruta,cod_aeropuerto_ori,cod_aeropuerto_des,cod_aerolinea,equipamiento
0,1,2965,2990,410,CR2
1,2,2966,2990,410,CR2
2,3,2966,2962,410,CR2


### Check for Null Values

In [296]:
df_rutas.isnull().sum()

cod_ruta               0
cod_aeropuerto_ori     0
cod_aeropuerto_des     0
cod_aerolinea          0
equipamiento          18
dtype: int64

In [297]:
indexes=df_rutas[df_rutas['equipamiento'].isnull()].index

In [298]:
df_rutas.iloc[indexes,4]='Z99'

### Check for duplicate values

In [299]:
df_rutas[df_rutas.duplicated(subset='cod_ruta')]

Unnamed: 0,cod_ruta,cod_aeropuerto_ori,cod_aeropuerto_des,cod_aerolinea,equipamiento


### Integrity validation with airports table

In [301]:
df_rutas.head(3)

Unnamed: 0,cod_ruta,cod_aeropuerto_ori,cod_aeropuerto_des,cod_aerolinea,equipamiento
0,1,2965,2990,410,CR2
1,2,2966,2990,410,CR2
2,3,2966,2962,410,CR2


In [300]:
df_aeropuertos_final.head(3)

Unnamed: 0,cod_aeropuerto,nombre_aeropuerto,ciudad_aeropuerto,pais_aeropuerto,cod_pais,cod_iata_faa,cod_icao,latitud,longitud,altitud,zona_horaria,dst,zona_horaria_tz
0,1,Goroka,Goroka,Papua New Guinea,PNG,GKA,AYGA,-6.081689,145.391881,5282.0,10.0,U,Pacific/Port_Moresby
1,2,Madang,Madang,Papua New Guinea,PNG,MAG,AYMD,-5.207083,145.7887,20.0,10.0,U,Pacific/Port_Moresby
2,3,Mount Hagen,Mount Hagen,Papua New Guinea,PNG,HGU,AYMH,-5.826789,144.295861,5388.0,10.0,U,Pacific/Port_Moresby


In [302]:
[element for element in df_rutas['cod_aeropuerto_ori'] if element not in df_aeropuertos_final['cod_aeropuerto'].values]

[]

In [303]:
[element for element in df_rutas['cod_aeropuerto_des'] if element not in df_aeropuertos_final['cod_aeropuerto'].values]

[]

### Integrity validation with airlines table

In [304]:
df_rutas.head(3)

Unnamed: 0,cod_ruta,cod_aeropuerto_ori,cod_aeropuerto_des,cod_aerolinea,equipamiento
0,1,2965,2990,410,CR2
1,2,2966,2990,410,CR2
2,3,2966,2962,410,CR2


In [305]:
df_aerolineas.head(3)

Unnamed: 0,cod_aerolinea,nombre_aerolinea,cod_pais,cod_iata,cod_icao,identificacion,pais_aerolinea,sw_activa
0,1,Private flight,Z99,Z9,Z99,I999,,1
1,2,135 Airways,USA,Z9,GNL,GENERAL,United States,0
2,3,1Time Airline,ZAF,1T,RNX,NEXTIME,South Africa,1


In [307]:
missing_airlines=[element for element in df_rutas['cod_aerolinea'] if element not in df_aerolineas['cod_aerolinea'].values]

In [308]:
missing_airlines

['20410']

In [311]:
index=df_rutas[df_rutas['cod_aerolinea'].isin(missing_airlines)].index

In [316]:
df_rutas.iloc[index,3]='Z9999'

In [326]:
df_rutas.dtypes

cod_ruta               int64
cod_aeropuerto_ori    object
cod_aeropuerto_des    object
cod_aerolinea         object
equipamiento          object
dtype: object

### Format the table as the destination table in Snowflake

In [323]:
df_rutas_final=df_rutas.iloc[:,[0,3,1,2]]

In [324]:
df_rutas_final

Unnamed: 0,cod_ruta,cod_aerolinea,cod_aeropuerto_ori,cod_aeropuerto_des
0,1,00410,2965,2990
1,2,00410,2966,2990
2,3,00410,2966,2962
3,4,00410,2968,2990
4,5,00410,2968,4078
...,...,...,...,...
67658,67659,04178,6334,3341
67659,67660,19016,4029,2912
67660,67661,19016,2912,4029
67661,67662,19016,2912,2913


In [328]:
df_rutas_final.dtypes

cod_ruta               int64
cod_aerolinea         object
cod_aeropuerto_ori    object
cod_aeropuerto_des    object
dtype: object

### Load the data in Snowflake

In [331]:
try:
    print('Insertando datos')
    df_rutas_final.to_sql('hc_rutas', con = conn, if_exists = 'replace', index=False)
    rows = len(df_rutas_final)
    print("Volcado correcto. Filas cargadas: " + str(rows) )
except Exception as e:
    print("Error en el volcado: " + str(e))

Insertando datos
Error en el volcado: (snowflake.connector.errors.ProgrammingError) 001795 (42601): SQL compilation error: error line 1 at position 87
maximum number of expressions in a list exceeded, expected at most 16,384, got 67,663
[SQL: INSERT INTO hc_rutas (cod_ruta, cod_aerolinea, cod_aeropuerto_ori, cod_aeropuerto_des) VALUES (%(cod_ruta)s, %(cod_aerolinea)s, %(cod_aeropuerto_ori)s, %(cod_aeropuerto_des)s)]
[parameters: ({'cod_ruta': 1, 'cod_aerolinea': '00410', 'cod_aeropuerto_ori': '2965', 'cod_aeropuerto_des': '2990'}, {'cod_ruta': 2, 'cod_aerolinea': '00410', 'cod_aeropuerto_ori': '2966', 'cod_aeropuerto_des': '2990'}, {'cod_ruta': 3, 'cod_aerolinea': '00410', 'cod_aeropuerto_ori': '2966', 'cod_aeropuerto_des': '2962'}, {'cod_ruta': 4, 'cod_aerolinea': '00410', 'cod_aeropuerto_ori': '2968', 'cod_aeropuerto_des': '2990'}, {'cod_ruta': 5, 'cod_aerolinea': '00410', 'cod_aeropuerto_ori': '2968', 'cod_aeropuerto_des': '4078'}, {'cod_ruta': 6, 'cod_aerolinea': '00410', 'cod_aero