## Importing the necessary libraries

In [4]:
import os
from datetime import datetime

import pyarrow.dataset as ds

import pyarrow.compute as pc
import pyarrow.parquet as pq

## Configuration of AWS to get access to the dataset

In [None]:
### Installing the ASW command line interface
#!pip install awscli

In [2]:
### Connecting to google drive
# from google.colab import drive
# drive.mount('/content/drive')

### indicating the ini file fot AWS configuration (The ini file has already been uploaded manually.)
# !export AWS_SHARED_CREDENTIALS_FILE=/content/drive/MyDrive/config/awscli.ini
# path = "/content/drive/My Drive/config/awscli.ini"
# os.environ['AWS_SHARED_CREDENTIALS_FILE'] = path

### Checking the configuration to make sure all good!
# !aws configure list

Mounted at /content/drive
      Name                    Value             Type    Location
      ----                    -----             ----    --------
   profile                <not set>             None    None
access_key     ****************ATZI shared-credentials-file    
secret_key     ****************e3kB shared-credentials-file    
    region                us-west-1      config-file    ~/.aws/config


## Loading the dataset

In [5]:
%%time

dataset = ds.dataset("s3://ursa-labs-taxi-data/", partitioning=["year", "month"])

CPU times: user 165 ms, sys: 35.3 ms, total: 200 ms
Wall time: 1.37 s


## Questions

### 1. can you get the average transaction between 2:00-2:59 PM?:

In [6]:
%%time
total_amount = 0
total_count = 0
for fragment in dataset.get_fragments():
  table = fragment.to_table(columns=['dropoff_at', 'total_amount'])

  table_row_size = table.to_batches()[0].num_rows
  sorted_indices = pc.sort_indices(table, sort_keys=[("dropoff_at", "ascending")])
  sorted_table = table.take(sorted_indices)

  pq.write_table(sorted_table, 'optimized_parquet_file.parquet', row_group_size=table_row_size)

  optimized_parquet_file = pq.ParquetFile('optimized_parquet_file.parquet')
  dropoff_at_col_idx = 0
  for i in range(optimized_parquet_file.num_row_groups):
    min_time = optimized_parquet_file.metadata.row_group(i).column(dropoff_at_col_idx).statistics.min
    max_time = optimized_parquet_file.metadata.row_group(i).column(dropoff_at_col_idx).statistics.max
    diff = (max_time - min_time).total_seconds()

    min_hour = min_time.hour
    max_hour = max_time.hour

    if (min_hour <= 14 and max_hour >= 14) or (diff >= 86_400):
      all_transactions = optimized_parquet_file.read_row_group(i)['total_amount']
      dropoff_at = optimized_parquet_file.read_row_group(i)['dropoff_at']

      time_checker_arr = [record.as_py().hour for record in dropoff_at]
      desired_time = [hour==14 for hour in time_checker_arr]

      desired_transactions = pc.filter(all_transactions, desired_time)

      if len(desired_transactions) > 0:
        total_amount += pc.sum(desired_transactions).as_py()
        total_count += len(desired_transactions)

print(f'The average is: {total_amount/total_count}')

The average is: 14.090892071363218
CPU times: user 31min 56s, sys: 1min 53s, total: 33min 50s
Wall time: 1h 27min 58s


### 2. Which day (of week), on average has the highest tip?

In [7]:
%%time
tips_count = [0 for _ in range(7)]
tips_amount = [0 for _ in range(7)]

for fragment in dataset.get_fragments():
  table = fragment.to_table(columns=['dropoff_at', 'tip_amount'])

  table_row_size = table.to_batches()[0].num_rows
  sorted_indices = pc.sort_indices(table, sort_keys=[("dropoff_at", "ascending")])
  sorted_table = table.take(sorted_indices)

  pq.write_table(sorted_table, 'optimized_parquet_file.parquet', row_group_size=table_row_size)

  optimized_parquet_file = pq.ParquetFile('optimized_parquet_file.parquet')
  dropoff_at_col_idx = 0
  for i in range(optimized_parquet_file.num_row_groups):
    min_date = optimized_parquet_file.metadata.row_group(i).column(0).statistics.min.date()
    max_date = optimized_parquet_file.metadata.row_group(i).column(0).statistics.max.date()

    if max_date == min_date:
      min_time = optimized_parquet_file.metadata.row_group(i).column(0).statistics.min
      day_of_week = min_time.weekday()
      tips = optimized_parquet_file.read_row_group(i)['tip_amount']

      tips_amount[day_of_week] += pc.sum(tips).as_py()
      tips_count[day_of_week] += len(tips)
    else:
      dropoff_at = optimized_parquet_file.read_row_group(i)['dropoff_at']
      tip_amount = optimized_parquet_file.read_row_group(i)['tip_amount']

      for j in range(len(dropoff_at)):

        day_of_week = dropoff_at[j].as_py().weekday()
        tips_amount[day_of_week] += tip_amount[j].as_py()
        tips_count[day_of_week] += 1

average_tips = []
actual_days_of_week = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
for i in range(7):
  average_tip = tips_amount[i] / tips_count[i]
  print(f'Average Tip in {actual_days_of_week[i]}s: {average_tip}')
  average_tips.append(average_tip)


print(f'The Day of the Week with the highest tip is {actual_days_of_week[average_tips.index(max(average_tips))]}')

Average Tip in Mondays: 1.3469167844979941
Average Tip in Tuesdays: 1.3594625526716777
Average Tip in Wednesdays: 1.3903943953032298
Average Tip in Thursdays: 1.4014479354310914
Average Tip in Fridays: 1.3373793122010493
Average Tip in Saturdays: 1.1671709626702114
Average Tip in Sundays: 1.2886291171967024
The Day of the Week with the highest tip is Thursday
CPU times: user 32min 37s, sys: 1min 44s, total: 34min 22s
Wall time: 1h 26min 10s
