In [6]:
import trino.dbapi as trino

# Open a DB-API connection to the coordinator at presto:8080 as user "demo",
# with "hive" as the default catalog for unqualified names.
conn = trino.Connection(
    host="presto",
    port=8080,
    user="demo",
    catalog="hive",
)

# Single module-level cursor; execute_query() reads it as a global.
cur = conn.cursor()


def execute_query(query, howmany=2, msg="Result: "):
    """Run ``query`` on the module-level cursor ``cur`` and print the result.

    Args:
        query: SQL text passed unchanged to ``cur.execute``.
        howmany: ``1`` prints only ``cur.fetchone()``; any other value
            prints everything returned by ``cur.fetchall()``.
        msg: Label printed in front of the fetched data.

    Returns:
        None. The fetched data is only printed, never returned.
    """
    # ``cur`` is only read here, never rebound, so no ``global`` statement
    # is needed.
    cur.execute(query)
    rows = cur.fetchone() if howmany == 1 else cur.fetchall()
    print(msg, rows)
        
        
print("------------ US ------------")

# Look at what the cluster exposes, then make sure the `covid` schema exists.
execute_query("show catalogs")

execute_query("show schemas")

execute_query("create schema if not exists covid")

execute_query("show schemas")

# External table over the raw CSV objects in s3a://covid-us/. Every column is
# declared varchar here; typing is done by the Parquet CTAS further down.
us_create_csv = "create table if not exists covid.covid_us_csv \
                    (state varchar, region varchar, last_update varchar, lat varchar, long_ varchar, \
                    confirmed varchar, deaths varchar, recovered varchar, active varchar, fips varchar, \
                    incident_rate varchar, total_test_results varchar, people_hospitalized varchar, \
                    case_fatality varchar, uid varchar, iso3 varchar, testing_rate varchar, hospitalization_rate varchar) \
                        with (format='CSV', skip_header_line_count = 1, external_location = 's3a://covid-us/')"

execute_query(us_create_csv)

execute_query("select column_name from information_schema.columns where table_name = 'covid_us_csv'")

execute_query("select * from covid.covid_us_csv limit 5")

execute_query("show tables from covid")

# Typed Parquet copy of the CSV table. Each conversion is wrapped in try() so
# a value that cannot be converted becomes NULL instead of failing the query.
#
# The CSV stores fips, total_test_results and uid as decimal-formatted text
# (e.g. '1.0', '2406977.0', '84000001.0'). Casting such text straight to
# integer fails, which try() silently turned into NULL for every row, so those
# columns go through double first. people_hospitalized is empty in the sampled
# rows; it gets the same treatment because the double route also accepts
# plain integer text.
# NOTE: because of "if not exists", an already created covid_us_parq has to be
# dropped before this corrected definition takes effect.
us_create_parq = "create table if not exists covid.covid_us_parq with (format = 'PARQUET') as \
                    select \
                        try(cast(trim(state) as varchar(30))) as state, \
                        try(cast(trim(region) as varchar(30))) as region, \
                        try(date_parse(last_update, '%Y-%m-%d %k:%i:%s')) as last_update, \
                        try(cast(lat as decimal(8, 2))) as lat, \
                        try(cast(long_ as decimal(8, 2))) as long_, \
                        try(cast(confirmed as integer)) as confirmed, \
                        try(cast(deaths as integer)) as deaths, \
                        try(cast(recovered as decimal(8, 2))) as recovered, \
                        try(cast(active as decimal(8, 2))) as active, \
                        try(cast(cast(fips as double) as integer)) as fips, \
                        try(cast(incident_rate as decimal(8, 2))) as incident_rate, \
                        try(cast(cast(total_test_results as double) as integer)) as total_test_results, \
                        try(cast(cast(people_hospitalized as double) as integer)) as people_hospitalized, \
                        try(cast(case_fatality as decimal(8, 2))) as case_fatality, \
                        try(cast(cast(uid as double) as integer)) as uid, \
                        try(cast(trim(iso3) as varchar(30))) as iso3, \
                        try(cast(testing_rate as decimal(8, 2))) as testing_rate, \
                        try(cast(hospitalization_rate as decimal(8, 2))) as hospitalization_rate \
                    from covid.covid_us_csv"

execute_query(us_create_parq)

execute_query("show tables from covid")

execute_query("select * from covid.covid_us_parq limit 5")

# Row counts of both tables, printed for comparison.
execute_query("select count(*) from covid.covid_us_csv", msg="Rows in CSV:")

execute_query("select count(*) from covid.covid_us_parq", msg="Rows in Parquet:")

execute_query("select distinct(state) from covid.covid_us_parq")

print("------------ Worldwide ------------")

# External table over the raw worldwide CSV objects in s3a://covid-ww/. Every
# column is declared varchar; typing is done by the Parquet CTAS below.
ww_create_csv = "create table if not exists covid.covid_ww_csv \
                    (fips varchar, admin varchar, province_state varchar, country_region varchar, last_update varchar, \
                    lat varchar, long_ varchar, confirmed varchar, deaths varchar, recovered varchar, active varchar, \
                    combined_key varchar, incident_rate varchar, case_fatality_ratio varchar) \
                        with (format='CSV', skip_header_line_count = 1, external_location = 's3a://covid-ww/')"

execute_query(ww_create_csv)

execute_query("select column_name from information_schema.columns where table_name = 'covid_ww_csv'")

execute_query("select * from covid.covid_ww_csv limit 5")

execute_query("show tables from covid")

# Typed Parquet copy of the worldwide CSV table. Each conversion is wrapped in
# try() so a value that cannot be converted becomes NULL instead of failing
# the whole statement.
ww_create_parq = "create table if not exists covid.covid_ww_parq with (format = 'PARQUET') as \
                            select \
                                try(cast(trim(fips) as varchar(30))) as fips, \
                                try(cast(trim(admin) as varchar(30))) as admin, \
                                try(cast(trim(province_state) as varchar(40))) as province_state, \
                                try(cast(trim(country_region) as varchar(40))) as country_region, \
                                try(date_parse(last_update, '%Y-%m-%d %k:%i:%s')) as last_update, \
                                try(cast(lat as decimal(8, 2))) as lat, \
                                try(cast(long_ as decimal(8, 2))) as long_, \
                                try(cast(confirmed as integer)) as confirmed, \
                                try(cast(deaths as integer)) as deaths, \
                                try(cast(recovered as decimal(8, 2))) as recovered, \
                                try(cast(active as decimal(8, 2))) as active, \
                                try(cast(combined_key as varchar(50))) as combined_key, \
                                try(cast(incident_rate as decimal(8, 2))) as incident_rate, \
                                try(cast(case_fatality_ratio as decimal(8, 2))) as case_fatality_ratio \
                            from covid.covid_ww_csv"

execute_query(ww_create_parq)

# Preview the worldwide Parquet table that was just built (this previously
# queried covid.covid_us_parq, a copy-paste leftover from the US section).
execute_query("select * from covid.covid_ww_parq limit 5")

# Row counts of both tables, printed for comparison.
execute_query("select count(*) from covid.covid_ww_csv", msg="Rows in CSV:")

execute_query("select count(*) from covid.covid_ww_parq", msg="Rows in Parquet:")

------------ US ------------
Result:  [['hive'], ['minio'], ['mysql'], ['postgres'], ['system'], ['tcph']]
Result:  [['covid'], ['information_schema']]
Result:  [[True]]
Result:  [['covid'], ['information_schema']]
Result:  [[True]]
Result:  [['state'], ['region'], ['last_update'], ['lat'], ['long_'], ['confirmed'], ['deaths'], ['recovered'], ['active'], ['fips'], ['incident_rate'], ['total_test_results'], ['people_hospitalized'], ['case_fatality'], ['uid'], ['iso3'], ['testing_rate'], ['hospitalization_rate']]
Result:  [['Alabama', 'US', '2021-04-02 04:30:39', '32.3182', '-86.9023', '515866', '10553', '', '', '1.0', '10521.03887575117', '2406977.0', '', '2.0456862828719085', '84000001.0', 'USA', '49090.07104565706', ''], ['Alaska', 'US', '2021-04-02 04:30:39', '61.3707', '-152.4044', '63298', '313', '', '', '2.0', '8637.746140018728', '1904230.0', '', '0.4953393786893288', '84000002.0', 'USA', '260302.5104402327', ''], ['American Samoa', 'US', '2021-04-02 04:30:39', '-14.271', '-170.1