This notebook example uses a managed version of Trino (Starburst Galaxy). It also works without Starburst, provided you can import data into a Trino cluster connected to a data lake. We use one month of Yellow Taxi trip data from https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page and the zone lookup file provided on the same page. Download both files and register them as tables in your Starburst or Trino cluster before proceeding.

In [6]:
import ibis
ibis.options.interactive = True

#from trino.auth import OAuth2Authentication

Important: change user, host, port, database, schema, and roles to match your Starburst Galaxy setup. If you are using OAuth2, uncomment the OAuth2Authentication import above and the roles and auth keyword lines, then comment out the password line. See https://ibis-project.org/backends/trino#connecting-to-starburst-managed-trino-instances for more information.

In [7]:
import os

In [8]:
con = ibis.trino.connect(
  user=os.environ['user'],
  host=os.environ['host'],
  password=os.environ['password'],
  port=443,
  database=os.environ['database'],
  schema=os.environ['schema'],
  #roles="accountadmin",
  #auth=OAuth2Authentication(),
  http_scheme="https"
)
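
Reading each setting with os.environ[...] raises a bare KeyError on the first missing variable. A minimal sketch, assuming the same environment variable names as the cell above, that collects the settings and reports every missing name at once. The helper name load_trino_settings and the example values are hypothetical and are not part of Ibis or Trino:

```python
import os

# Environment variable names used by the connect cell above.
REQUIRED_VARS = ("user", "host", "password", "database", "schema")


def load_trino_settings(env=None):
    """Return connection keyword arguments read from environment variables.

    Hypothetical helper: raises one error that lists every missing variable.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    settings = {name: env[name] for name in REQUIRED_VARS}
    # Port and scheme are hard-coded in the cell above, so they are here too.
    settings.update(port=443, http_scheme="https")
    return settings


# A plain dict stands in for os.environ; all values are placeholders.
example_env = {
    "user": "me@example.com",
    "host": "trino.example.com",
    "password": "not-a-real-password",
    "database": "my_catalog",
    "schema": "my_schema",
}
settings = load_trino_settings(example_env)
print(sorted(settings))
```

The resulting dictionary could then be unpacked into the same call, for example ibis.trino.connect(**settings).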

Within Ibis, con.list_tables() lists all the tables.


In [9]:
con.list_tables()

['groupbyboroughtrips', 'taxizonenyc', 'zonelookup']

Tables in Trino can be referenced in Ibis with con.table. We're going to create two Ibis table references to the Trino tables listed above:

In [10]:
nycjantrips = con.table("taxizonenyc")
zonelookup = con.table("zonelookup")

In Ibis, we can check the schema of the tables we just referenced with .schema().

In [11]:
nycjantrips.schema()

ibis.Schema {
  vendorid               int64
  tpep_pickup_datetime   timestamp(3)
  tpep_dropoff_datetime  timestamp(3)
  passenger_count        float64
  trip_distance          float64
  ratecodeid             float64
  store_and_fwd_flag     string
  pulocationid           int64
  dolocationid           int64
  payment_type           int64
  fare_amount            float64
  extra                  float64
  mta_tax                float64
  tip_amount             float64
  tolls_amount           float64
  improvement_surcharge  float64
  total_amount           float64
  congestion_surcharge   float64
  airport_fee            float64
}

In [12]:
zonelookup.schema()

ibis.Schema {
  locationid    string
  borough       string
  zone          string
  service_zone  string
}

We're going to preview the dataset with the Ibis slice syntax, which shows the first 10 rows here. We also set ibis.options.interactive = True at the start of our notebook, which displays Ibis tables in a prettified way.

In [13]:
nycjantrips[0:10]

To understand the dataset a little more, we can try an order by. It looks like there are some rows with an undefined passenger count (nan). In this case we're going to want to curate the dataset and clean it up a bit to ensure more accurate data.

In [14]:
nycjantrips.order_by(nycjantrips.trip_distance.desc())

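We can chain expressions together with filter, which works like a WHERE clause in SQL. The data contains nan (not a number) values, and Ibis has built-in support for detecting them with isnan(). Note that the "is False" comparison in the cell below is evaluated by Python itself and yields a plain False, so only the passenger_count != 0 condition reaches Trino. It shows up as OR FALSE in the generated SQL later in this notebook.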

In [15]:
nyc_filtered = nycjantrips.filter((nycjantrips.passenger_count != 0) | (nycjantrips.passenger_count.isnan() is False))
nyc_filtered
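
The intent of this filter is to keep only trips whose passenger count is neither zero nor nan. A minimal plain-Python sketch of that row-level rule, using made-up values, shows why the two conditions have to be joined with "and" rather than "or". In Ibis the analogous expression would be built with & and ~, along the lines of (t.passenger_count != 0) & ~t.passenger_count.isnan(); that rewrite is a suggestion and is not what the cell above runs.

```python
import math

# Made-up passenger counts: a normal trip, a zero, a nan, another normal trip.
passenger_counts = [1.0, 0.0, float("nan"), 2.0]


def keep_trip(count):
    """Keep a trip only if its passenger count is neither zero nor nan."""
    return count != 0 and not math.isnan(count)


def keep_trip_with_or(count):
    """Same two conditions joined with 'or': every value passes one of them."""
    return count != 0 or not math.isnan(count)


cleaned = [c for c in passenger_counts if keep_trip(c)]
too_permissive = [c for c in passenger_counts if keep_trip_with_or(c)]

print(cleaned)              # [1.0, 2.0]
print(len(too_permissive))  # 4
```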

You can see with the command below that nan has been filtered out! 

In [16]:
nyc_filtered.order_by(nyc_filtered.trip_distance.desc())

Let's add a column to our dataset to help calculate the average ride duration. We are going to use the Ibis delta method for this.
Ibis can also display a single column expression on its own:

In [17]:
ride_duration = nyc_filtered.tpep_dropoff_datetime.delta(nyc_filtered.tpep_pickup_datetime, "minute").name("rideminutes")
ride_duration
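
The SQL that Ibis generates for this delta call (shown near the end of this notebook) truncates both timestamps to the minute and then takes the difference in minutes. A plain-Python sketch of that arithmetic, with made-up timestamps and a hypothetical helper name:

```python
from datetime import datetime


def ride_minutes(pickup, dropoff):
    """Whole minutes between two timestamps after truncating each to the minute."""
    pickup_trunc = pickup.replace(second=0, microsecond=0)
    dropoff_trunc = dropoff.replace(second=0, microsecond=0)
    return int((dropoff_trunc - pickup_trunc).total_seconds() // 60)


pickup = datetime(2023, 1, 1, 10, 0, 59)
dropoff = datetime(2023, 1, 1, 10, 1, 0)

# Only one second elapsed, but the truncated timestamps are one minute apart.
print(ride_minutes(pickup, dropoff))  # 1
```

Because of the truncation, very short rides can be reported as one minute longer or shorter than their exact elapsed time.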

We can also combine the column with our original table using the 'mutate' method shown here. 

In [18]:
nycjanduration = nyc_filtered.mutate(rideminutes=nyc_filtered.tpep_dropoff_datetime.delta(nyc_filtered.tpep_pickup_datetime, "minute"))
nycjanduration["vendorid","rideminutes","trip_distance"]

In [19]:
nycjanduration["vendorid","rideminutes","trip_distance"].head(3)

Next up are some basic analytics and aggregations in Ibis: let's get total revenue with sum(), longest trip with max(), and average trip duration with mean().
Ibis can chain expressions in a way similar to pandas.

In [20]:
#some basic analytics - let's get total revenue, longest trip. 
insights = nycjanduration.agg(
    [
        ibis._.count().name("total_trips"),
        ibis._["total_amount"].sum().name("total_revenue"),
        ibis._["trip_distance"].sum().name("total_distance_all"),
        ibis._["rideminutes"].max().name("longest"),    
        ibis._["rideminutes"].mean().round(2).name("average_ride")
    ]
)
insights

Wait, the longest trip seems a bit... lengthy. Let's check out the ride itself. (Note: we added .round(2) so that the average ride displays more nicely.)

In [21]:
nycjanduration.filter(nycjanduration["rideminutes"] == 10030)
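
As a quick sanity check on that value, 10030 minutes can be converted into days and hours with the standard library:

```python
from datetime import timedelta

# Longest ride found above, in minutes.
longest_ride = timedelta(minutes=10030)

print(longest_ride)  # 6 days, 23:10:00
```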

A trip of almost 7 days? The trip distance is zero, so we can remove rows like this from future calculations of the average.
Let's remove the outliers and then join with a lookup table to get more information about the "where" of our dataset: zones.

In [22]:
nycjanduration_new = (
    nycjanduration.filter(nycjanduration.trip_distance != 0.0)
)
nycjanduration_new
    

Let's compute the same insights as before on the cleaned table.

In [23]:
insights_new = nycjanduration_new.agg(
    [
        ibis._.count().name("total_trips"),
        ibis._["total_amount"].sum().name("total_revenue"),
        ibis._["trip_distance"].sum().name("total_distance_all"),
        ibis._["rideminutes"].max().name("longest"),    
        ibis._["rideminutes"].mean().round(2).name("average_ride")
    ]
)
insights_new

You can already see a slightly more massaged dataset: the longest trip is shorter, average_ride has changed, and the total number of trips has gone down by almost 40k.

Next up we want to do something more powerful - join with related datasets to get more insights on geographical behaviour of taxi trips around NYC. Let's look over the zonelookup table again.

In [24]:
zonelookup

The zonelookup table stores locationid as a string, while pulocationid in the trips table is int64, so we must cast one side before the tables can be joined. Ibis supports casting data types as well. In the line below, we use .cast("str") so that both join keys have the same type. You can try it without the cast and see what happens :).


In [25]:
joineddata = nycjanduration_new.inner_join(zonelookup, nycjanduration_new.pulocationid.cast("str") == zonelookup.locationid)

In [26]:
joineddata
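
To see why the key types matter, here is a plain-Python sketch with made-up rows and made-up zone names. In Python a comparison between mismatched types simply finds no matches, whereas a SQL engine may reject the comparison outright; either way, the keys need a common type.

```python
# Made-up rows: trips carry integer location ids, the lookup uses string ids.
trips = [
    {"pulocationid": 1, "fare_amount": 52.0},
    {"pulocationid": 2, "fare_amount": 9.5},
]
zone_by_id = {"1": "Zone A", "2": "Zone B"}

# Without a cast, an integer key never matches a string key.
unmatched = [t for t in trips if t["pulocationid"] in zone_by_id]

# Casting the integer id to a string first makes the keys comparable.
joined = [
    {**t, "zone": zone_by_id[str(t["pulocationid"])]}
    for t in trips
    if str(t["pulocationid"]) in zone_by_id
]

print(len(unmatched), len(joined))  # 0 2
```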

Now we can do more cool things in Ibis: group by zone and aggregate trips, revenue, passengers, and average ride duration!


In [27]:
groupbyboroughtrips = (
    joineddata
    .group_by("zone")
    .aggregate(
        trips=joineddata.vendorid.count(),
        totalrev=joineddata.fare_amount.sum(),
        totalpassengers=joineddata.passenger_count.sum(),
        averageride=joineddata.rideminutes.mean().round(2),
    )
    .order_by(ibis.desc("totalrev"))
    .limit(10)
)
groupbyboroughtrips
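
For readers who think in plain Python, the group by, aggregate, order by, and limit steps above correspond roughly to the following sketch. The rows and zone names are made up, and the limit is 2 instead of 10 to fit the tiny sample.

```python
from collections import defaultdict

# Made-up joined rows: (zone, fare_amount, passenger_count, rideminutes).
rows = [
    ("Zone A", 50.0, 1.0, 40),
    ("Zone B", 10.0, 2.0, 8),
    ("Zone A", 60.0, 3.0, 50),
    ("Zone C", 15.0, 1.0, 12),
]

# group_by("zone"): collect the rows that belong to each zone.
groups = defaultdict(list)
for zone, fare, passengers, minutes in rows:
    groups[zone].append((fare, passengers, minutes))

# aggregate(...): one summary record per zone.
summary = []
for zone, members in groups.items():
    summary.append(
        {
            "zone": zone,
            "trips": len(members),
            "totalrev": sum(m[0] for m in members),
            "totalpassengers": sum(m[1] for m in members),
            "averageride": round(sum(m[2] for m in members) / len(members), 2),
        }
    )

# order_by(desc("totalrev")).limit(2) on this small sample.
top = sorted(summary, key=lambda r: r["totalrev"], reverse=True)[:2]
print([r["zone"] for r in top])  # ['Zone A', 'Zone C']
```

The difference in practice is that Ibis compiles these steps to SQL and Trino does the work, so no rows are pulled into Python until the result is displayed.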
    

If you want to see the SQL that Ibis generates, you can use the ibis.to_sql() function.

In [28]:
ibis.to_sql(groupbyboroughtrips)

```sql
WITH t0 AS (
  SELECT
    t3.vendorid AS vendorid,
    t3.tpep_pickup_datetime AS tpep_pickup_datetime,
    t3.tpep_dropoff_datetime AS tpep_dropoff_datetime,
    t3.passenger_count AS passenger_count,
    t3.trip_distance AS trip_distance,
    t3.ratecodeid AS ratecodeid,
    t3.store_and_fwd_flag AS store_and_fwd_flag,
    t3.pulocationid AS pulocationid,
    t3.dolocationid AS dolocationid,
    t3.payment_type AS payment_type,
    t3.fare_amount AS fare_amount,
    t3.extra AS extra,
    t3.mta_tax AS mta_tax,
    t3.tip_amount AS tip_amount,
    t3.tolls_amount AS tolls_amount,
    t3.improvement_surcharge AS improvement_surcharge,
    t3.total_amount AS total_amount,
    t3.congestion_surcharge AS congestion_surcharge,
    t3.airport_fee AS airport_fee
  FROM dh_nyctaxi_video.taxizonenyc AS t3
  WHERE
    t3.passenger_count <> 0 OR FALSE
), t1 AS (
  SELECT
    t0.vendorid AS vendorid,
    t0.tpep_pickup_datetime AS tpep_pickup_datetime,
    t0.tpep_dropoff_datetime AS tpep_dropoff_datetime,
    t0.passenger_count AS passenger_count,
    t0.trip_distance AS trip_distance,
    t0.ratecodeid AS ratecodeid,
    t0.store_and_fwd_flag AS store_and_fwd_flag,
    t0.pulocationid AS pulocationid,
    t0.dolocationid AS dolocationid,
    t0.payment_type AS payment_type,
    t0.fare_amount AS fare_amount,
    t0.extra AS extra,
    t0.mta_tax AS mta_tax,
    t0.tip_amount AS tip_amount,
    t0.tolls_amount AS tolls_amount,
    t0.improvement_surcharge AS improvement_surcharge,
    t0.total_amount AS total_amount,
    t0.congestion_surcharge AS congestion_surcharge,
    t0.airport_fee AS airport_fee,
    DATE_DIFF(
      'minute',
      DATE_TRUNC('minute', t0.tpep_pickup_datetime),
      DATE_TRUNC('minute', t0.tpep_dropoff_datetime)
    ) AS rideminutes
  FROM t0
  WHERE
    t0.trip_distance <> 0.0
)
SELECT
  t2.zone,
  t2.trips,
  t2.totalrev,
  t2.totalpassengers,
  t2.averageride
FROM (
  SELECT
    t3.zone AS zone,
    COUNT(t1.vendorid) AS trips,
    SUM(t1.fare_amount) AS totalrev,
    SUM(t1.passenger_count) AS totalpassengers,
    ROUND(AVG(t1.rideminutes), 2) AS averageride
  FROM t1
  JOIN dh_nyctaxi_video.zonelookup AS t3
    ON CAST(t1.pulocationid AS VARCHAR) = t3.locationid
  GROUP BY
    1
) AS t2
ORDER BY
  t2.totalrev DESC
LIMIT 10
```

Airport rides bring in the most revenue for taxi companies, which makes a lot of sense.

Let's write our result table back to Trino (to show some write functionality, of course).


In [30]:
con.create_table("groupbyboroughtrips_new", groupbyboroughtrips)

There you have it: a quick tutorial with Ibis and Starburst Galaxy!