In [None]:
import os, sys
!{sys.executable} -m pip install azure-storage-blob
!{sys.executable} -m pip install pyarrow
!{sys.executable} -m pip install pandas
!{sys.executable} -m pip install pyspark

Collecting azure-storage-blob
  Downloading azure_storage_blob-12.19.0-py3-none-any.whl (394 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m394.2/394.2 kB[0m [31m6.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting azure-core<2.0.0,>=1.28.0 (from azure-storage-blob)
  Downloading azure_core-1.29.5-py3-none-any.whl (192 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m192.4/192.4 kB[0m [31m20.2 MB/s[0m eta [36m0:00:00[0m
Collecting isodate>=0.6.1 (from azure-storage-blob)
  Downloading isodate-0.6.1-py2.py3-none-any.whl (41 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.7/41.7 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
Collecting typing-extensions>=4.3.0 (from azure-storage-blob)
  Downloading typing_extensions-4.8.0-py3-none-any.whl (31 kB)
Installing collected packages: typing-extensions, isodate, azure-core, azure-storage-blob
  Attempting uninstall: typing-extensions
    Found existing installation: typing_exten

In [None]:
import os, sys
import pandas as pd
from datetime import datetime, timedelta
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient

class YellowTaxi():
    def __init__(self, start_date, end_date):
        self.azure_storage_account_name = "azureopendatastorage"
        self.azure_storage_sas_token = r""
        self.container_name = "nyctlc"
        self.folder_name = "yellow"
        self.start_date=start_date
        self.end_date=end_date

    def months_and_dates_between(self,start_date, end_date):
        start = datetime.strptime(start_date, '%Y-%m-%d')
        end = datetime.strptime(end_date, '%Y-%m-%d')
        current_date = start
        all_dates = []
        while current_date <= end:
            all_dates.append(current_date.strftime('%Y-%-m'))  # Formatting for YYYY-M
            current_date += timedelta(days=1)
        unique_months = list(set(all_dates))
        return unique_months

    def access_data(self):
        print('Looking for the first parquet under the folder ' + self.folder_name + ' in container "' + self.container_name + '"...')
        container_url = f"https://{self.azure_storage_account_name}.blob.core.windows.net/"
        blob_service_client = BlobServiceClient(
                                    container_url,
                                    self.azure_storage_sas_token if self.azure_storage_sas_token else None
                                )
        container_client = blob_service_client.get_container_client(self.container_name)
        blobs = container_client.list_blobs(self.folder_name)
        return blobs, container_client

    def get_data(self):
        blobs, container_client=self.access_data()
        dates_to_check=self.months_and_dates_between(self.start_date,self.end_date)
        targetBlobName = []
        for blob in blobs:
            if blob.name.startswith(self.folder_name) and blob.name.endswith('.parquet'):
                partition_date=str(blob).split('/')[1].replace('puYear=','')+'-'+str(blob).split('/')[2].replace('puMonth=','')
                if partition_date in dates_to_check:
                    targetBlobName.append(blob.name)

        for file_name in targetBlobName:
            _, filename = os.path.split(str(file_name))
            blob_client = container_client.get_blob_client(file_name)
            with open(filename, 'wb') as local_file:
                try:
                    blob_client.download_blob().download_to_stream(local_file)
                except Exception as e:
                    print(e)



In [34]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from itertools import chain
import logging
import os


class Summarize():
    def __init__(self,

                start_date=None,
                end_date=None):
        self.data=None
        self.master_list=[]
        self.start_date=start_date
        self.end_date=end_date
        self.pwd=os.getcwd()
        self.spark = SparkSession \
                    .builder \
                    .appName("Gray Matter Analytics") \
                    .getOrCreate()


    def create_dataset(self):
        self.master_list = []
        for filename in os.listdir(self.pwd):
            _,extension=os.path.splitext(filename)
            if extension=='.parquet':
               self.master_list.append(filename)
        print('here')
        self.data=self.spark.read.parquet(*self.master_list)

    def cleanUp(self):
        for filename in self.master_list:
            os.remove(f"{self.pwd}/{filename}")

    def transformation(self):
        columns=['paymentType',
                 'totalAmount',
                 'tipAmount',
                 'tollsAmount',
                 'fareAmount',
                 'extra',
                 'mtaTax',
                 'improvementSurcharge',
                 'tpepPickupDateTime',
                 'tpepDropoffDateTime',
                 'passengerCount',
                 'tripDistance',
                 'rateCodeId',
                 'vendorID'
                 ]
        subset=self.data.select(*columns)
        subset=subset.filter(
            (to_date(col('tpepPickupDateTime'))>=self.start_date) & (to_date(col('tpepPickupDateTime'))<=self.end_date)
        )
        conditions = {
            "paymentType" : {
                "1":"Credit card",
                "2": "Cash",
                "3": "No charge",
                "4": "Dispute" ,
                "5": "Unknown",
                "6": "Voided trip"
            },
            "rateCodeId" : {
                "1": "Standard rate",
                "2": "JFK",
                "3": "Newark",
                "4": "Nassau or Westchester",
                "5": "Negotiated fare",
                "6": "Group ride"
            }
        }

        for col_name in conditions.keys():
            mapping_expr = create_map([lit(x) for x in chain(*conditions[col_name].items())])
            subset=subset.withColumn(f"{col_name}Code", mapping_expr[col(col_name)])
        subset=subset.withColumn ("TripDurationMinutes",round((unix_timestamp(col("tpepDropoffDateTime")) - unix_timestamp(col("tpepPickupDateTime"))) / 60))
        subset=subset.withColumn('year',year(col('tpepPickupDateTime'))).withColumn('month',month(col('tpepPickupDateTime')))
        subset=subset.drop('paymentType','rateCodeId')
        return subset

    def primary_analysis(self,df):
        print("Generating primary analysis...")
        df=df.groupBy(
            'paymentTypeCode',
            'year',
            'month'
        ).agg(
            round(mean('fareAmount'),2).alias('meanFareAmount'),
            round(mean('totalAmount'),2).alias('meanTotalAmount'),
            round(mean('passengerCount')).alias('meanPassengerCount'),
            round(median('fareAmount'),2).alias('medianFareAmount'),
            round(median('totalAmount'),2).alias('medianTotalAmount'),
            round(median('passengerCount')).alias('medianPassengerCount'),
            sum('passengerCount').alias('TotalPassengers')
        ).orderBy(
            'month',
            'year'
        )
        return df

    def secondary_analysis(self, df):
        print("Generating secondary analysis....")
        df=df.groupBy(
            'paymentTypeCode',
            'rateCodeIdCode',
            'year',
            'month'
        ).agg(
            count('paymentTypeCode').alias('TotalTrips'),
            round(sum('totalAmount'),2).alias('totalRevenueByPayment'),
            round(sum('tipAmount'),2).alias('totalTipsCollected'),
            round(sum('tollsAmount'),2).alias('totalTollsPaid')
        ).orderBy(
            'month',
            'year'
        )
        return df

    def generate_report(self, tabs=[]):
        for sheet in tabs:
            sheet[0].write.mode('overwrite').csv(f"{self.pwd}/{sheet[1]}",header=True)

    def analysis(self):
        data=self.transformation()
        distinct_payments=data.select('paymentTypeCode').distinct()
        print(f"The number of distinct payment types are : {distinct_payments.count()} \n")

        average_trip_duration=data.select(avg('TripDurationMinutes')).collect()[0][0]
        print(f"Average Trip Duration in Minutes: {average_trip_duration} \n")

        print(f"Total trips including voided and non-chargeable transactions: {data.count()} \n")

        invalid_trips=data.filter(col('paymentTypeCode').isin('Dispute','Voided trip'))
        print(f"Total disputed or invalid-trips: {invalid_trips.count()} \n")

        valid_trips=data.filter(col('paymentTypeCode').isin('Cash','Credit card'))
        print(f"Total trips paid by Cash/Credit {valid_trips.count()} \n")

        total_analyzed=self.primary_analysis(data)
        valid_analyzed=self.primary_analysis(valid_trips)
        invalid_analyzed=self.primary_analysis(invalid_trips)

        self.generate_report([
            [total_analyzed,'total_analyzed/total_primary.csv'],
            [valid_analyzed,'valid_analyzed/valid_primary.csv'],
            [invalid_analyzed,'invalid_analyzed/invalid_primary.csv']
        ])

        total_analyzed=self.secondary_analysis(data)
        valid_analyzed=self.secondary_analysis(valid_trips)
        invalid_analyzed=self.secondary_analysis(invalid_trips)

        self.generate_report([
            [total_analyzed,'total_analyzed/total_secondary.csv'],
            [valid_analyzed,'valid_analyzed/valid_secondary.csv'],
            [invalid_analyzed,'invalid_analyzed/invalid_secondary.csv']
        ])



In [35]:
data_object=YellowTaxi(start_date='2018-05-1',end_date='2018-07-1')
data_object.get_data()
summary=Summarize(start_date='2018-05-1',end_date='2018-07-1')
summary.create_dataset()
summary.analysis()
summary.cleanUp()

Looking for the first parquet under the folder yellow in container "nyctlc"...
here
The number of distinct payment types are : 4 

Average Trip Duration in Minutes: 17.641518487182626 

Total trips including voided and non-chargeable transactions: 18173456 

Total disputed or invalid-trips: 27615 

Total trips paid by Cash/Credit 18046919 

Generating primary analysis...
Generating primary analysis...
Generating primary analysis...
Generating secondary analysis....
Generating secondary analysis....
Generating secondary analysis....
/content/part-00012-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426337-118.c000.snappy.parquet
/content/part-00019-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426333-118.c000.snappy.parquet
/content/part-00002-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426334-121.c000.snappy.parquet
/content/part-00004-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426331-117.c000.snappy.parquet
/content/part-

FileNotFoundError: ignored