In [1]:
import os

# Fallback locations that look like Homebrew install paths on the author's Mac.
# setdefault keeps any JAVA_HOME / SPARK_HOME already exported in the environment
# instead of overwriting it with a path that may not exist on this machine.
os.environ.setdefault("JAVA_HOME", "/usr/local/opt/openjdk")
os.environ.setdefault("SPARK_HOME", "/usr/local/Cellar/apache-spark/3.3.0/libexec/")

In [2]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-active session when one exists,
# otherwise it builds a new one with default settings.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
spark  # last expression: rendered as the cell output

22/08/08 16:31:31 WARN Utils: Your hostname, Mykhailos-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.31.225 instead (on interface en0)
22/08/08 16:31:31 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/08 16:31:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
import os
import json

# Later cells read config['data']['source'], config['data']['destination'] and
# the config['mail'] section. JSON is UTF-8 by specification, so decode it
# explicitly rather than with the platform's locale default encoding.
with open('./config.json', 'r', encoding='utf-8') as f:
    config = json.load(f)

### Extract data from CSV file

In [4]:
# Read the raw trip CSV. The first line holds the column names (header), and
# Spark infers column types from the data (inferSchema). Each option is set
# exactly once; previously the header flag was passed twice.
taxi_tripdata = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv(config['data']['source'])
)

taxi_tripdata.show(5, truncate=False, vertical=True)

-RECORD 0------------------------------------
 VendorID              | 1                   
 lpep_pickup_datetime  | 2021-07-01 00:30:52 
 lpep_dropoff_datetime | 2021-07-01 00:35:36 
 store_and_fwd_flag    | N                   
 RatecodeID            | 1                   
 PULocationID          | 74                  
 DOLocationID          | 168                 
 passenger_count       | 1                   
 trip_distance         | 1.2                 
 fare_amount           | 6.0                 
 extra                 | 0.5                 
 mta_tax               | 0.5                 
 tip_amount            | 0.0                 
 tolls_amount          | 0.0                 
 ehail_fee             | null                
 improvement_surcharge | 0.3                 
 total_amount          | 7.3                 
 payment_type          | 2                   
 trip_type             | 1                   
 congestion_surcharge  | 0.0                 
-RECORD 1-------------------------

### Map the numeric VendorID and payment_type codes to readable Vendor and Payment Type names

In [5]:
from itertools import chain
from pyspark.sql.functions import col, create_map, lit

# Lookup tables from the numeric codes in the source data to readable labels.
vendors_dict = { 1: 'Creative Mobile Technologies', 2: 'VeriFone Inc.' }
payment_types_dict = { 1: 'Credit card', 2: 'Cash', 3: 'No charge', 4: 'Dispute', 5: 'Unknown', 6: 'Voided trip' }

# create_map expects alternating key/value literals (k1, v1, k2, v2, ...),
# which chain(*dict.items()) produces from each dict.
vendors_mapping_expr = create_map([lit(x) for x in chain(*vendors_dict.items())])
payment_types_mapping_expr = create_map([lit(x) for x in chain(*payment_types_dict.items())])

# Indexing a map column by another column looks up each row's code.
# NOTE(review): codes missing from the dicts presumably map to null; confirm
# under your spark.sql.ansi settings.
report = taxi_tripdata \
    .withColumn('Vendor', vendors_mapping_expr[col('VendorID')]) \
    .withColumn('Payment Type', payment_types_mapping_expr[col('payment_type')])

report.show(5, truncate=False, vertical=True)

-RECORD 0---------------------------------------------
 VendorID              | 1                            
 lpep_pickup_datetime  | 2021-07-01 00:30:52          
 lpep_dropoff_datetime | 2021-07-01 00:35:36          
 store_and_fwd_flag    | N                            
 RatecodeID            | 1                            
 PULocationID          | 74                           
 DOLocationID          | 168                          
 passenger_count       | 1                            
 trip_distance         | 1.2                          
 fare_amount           | 6.0                          
 extra                 | 0.5                          
 mta_tax               | 0.5                          
 tip_amount            | 0.0                          
 tolls_amount          | 0.0                          
 ehail_fee             | null                         
 improvement_surcharge | 0.3                          
 total_amount          | 7.3                          
 payment_t

### Calculate Passenger count per Vendor and Payment Type

In [6]:
from pyspark.sql import functions as F

# Total passengers per (VendorID, payment_type) group, exposed as 'PCPVP'
# (Passenger Count Per Vendor and Payment type). Naming the column with alias()
# avoids depending on Spark's auto-generated "sum(passenger_count)" name.
# Rows with null VendorID/payment_type form their own null group.
pass_count_per_vendor_payment = report \
    .groupBy('VendorID', 'payment_type') \
    .agg(F.sum('passenger_count').alias('PCPVP'))
pass_count_per_vendor_payment.show()

+--------+------------+-----+
|VendorID|payment_type|PCPVP|
+--------+------------+-----+
|       2|           2|24091|
|       2|           3|  154|
|       1|           2| 3546|
|       1|           1| 5115|
|       1|           3|  199|
|       1|           4|   25|
|       2|           1|33770|
|       2|           4|   26|
|       1|           5|    1|
|    null|        null| null|
+--------+------------+-----+



### Join the calculated Passenger count to the report

In [7]:
# Attach each trip's group passenger total (PCPVP). This is an inner equi-join,
# so trips whose VendorID or payment_type is null find no match and are dropped.
# The cell rebinds `report`; running it again on its own adds a second PCPVP column.
report = report.join(
    pass_count_per_vendor_payment,
    on=['VendorID', 'payment_type'],
    how='inner',
)
report.show(5, truncate=False, vertical=True)

-RECORD 0---------------------------------------------
 VendorID              | 1                            
 payment_type          | 2                            
 lpep_pickup_datetime  | 2021-07-01 00:30:52          
 lpep_dropoff_datetime | 2021-07-01 00:35:36          
 store_and_fwd_flag    | N                            
 RatecodeID            | 1                            
 PULocationID          | 74                           
 DOLocationID          | 168                          
 passenger_count       | 1                            
 trip_distance         | 1.2                          
 fare_amount           | 6.0                          
 extra                 | 0.5                          
 mta_tax               | 0.5                          
 tip_amount            | 0.0                          
 tolls_amount          | 0.0                          
 ehail_fee             | null                         
 improvement_surcharge | 0.3                          
 total_amo

### Calculate Payment Rate for report based on Passenger count and Total amount

In [8]:
# Payment Rate = the trip's total_amount divided by the passenger total (PCPVP)
# of its (VendorID, payment_type) group; a null operand gives a null rate.
payment_rate = col('total_amount') / col('PCPVP')
report = report.withColumn('Payment Rate', payment_rate)
report.show(5, truncate=False, vertical=True)

-RECORD 0---------------------------------------------
 VendorID              | 1                            
 payment_type          | 2                            
 lpep_pickup_datetime  | 2021-07-01 00:30:52          
 lpep_dropoff_datetime | 2021-07-01 00:35:36          
 store_and_fwd_flag    | N                            
 RatecodeID            | 1                            
 PULocationID          | 74                           
 DOLocationID          | 168                          
 passenger_count       | 1                            
 trip_distance         | 1.2                          
 fare_amount           | 6.0                          
 extra                 | 0.5                          
 mta_tax               | 0.5                          
 tip_amount            | 0.0                          
 tolls_amount          | 0.0                          
 ehail_fee             | null                         
 improvement_surcharge | 0.3                          
 total_amo

### Add Next Payment Rate
1. Rank rows by Payment Rate within each Vendor / Payment Type group
2. Take the lowest Payment Rate among the higher-ranked rows, i.e. the next higher rate

In [9]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Use the F namespace: `from pyspark.sql.functions import min` would shadow
# Python's builtin min() in every later cell of this notebook.

# 1. Rank rows by Payment Rate inside each (VendorID, payment_type) group;
#    rows with equal rates share a rank.
windowSpecRank = Window.partitionBy('VendorID', 'payment_type').orderBy('Payment Rate')

# 2. Frame = rows whose rank value is at least current rank + 1 (range frame on
#    the rank), i.e. rows with a strictly higher rate. Their minimum Payment Rate
#    is the next distinct higher rate; the top-ranked rows have an empty frame
#    and get null.
windowSpecNextRank = Window.partitionBy('VendorID', 'payment_type') \
    .orderBy('payment_rate_rank') \
    .rangeBetween(1, Window.unboundedFollowing)

report = report \
    .withColumn('payment_rate_rank', F.rank().over(windowSpecRank)) \
    .withColumn('Next Payment Rate', F.min('Payment Rate').over(windowSpecNextRank))

report.show(10, truncate=False, vertical=True)

22/08/08 16:31:38 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
-RECORD 0---------------------------------------------
 VendorID              | 1                            
 payment_type          | 1                            
 lpep_pickup_datetime  | 2021-07-01 18:33:14          
 lpep_dropoff_datetime | 2021-07-01 18:44:52          
 store_and_fwd_flag    | N                            
 RatecodeID            | 1                            
 PULocationID          | 35                           
 DOLocationID          | 76                           
 passenger_count       | 1                            
 trip_distance         | 0.0                          
 fare_amount           | 0.0                          
 extra                 | 0.0                          
 mta_tax               | 0.0                          
 tip_amount            | 0.0                      

### Calculate Max Payment Rate by Vendor

In [10]:
from pyspark.sql import functions as F

# Highest Payment Rate per vendor across all of its payment types. alias() names
# the result directly instead of renaming Spark's generated "max(Payment Rate)"
# column, and F.max avoids touching Python's builtin max() used later.
max_payment_reate_per_vendor = report \
    .groupBy('VendorID') \
    .agg(F.max('Payment Rate').alias('Max Payment Rate'))

max_payment_reate_per_vendor.show()

+--------+--------------------+
|VendorID|    Max Payment Rate|
+--------+--------------------+
|       1|               39.55|
|       2|0.014222978975421973|
+--------+--------------------+



### Join Max Payment Rate to report

In [11]:
# Attach each vendor's maximum Payment Rate to every one of its rows
# (inner equi-join on VendorID).
report = report.join(max_payment_reate_per_vendor, on='VendorID', how='inner')

report.show(10, truncate=False, vertical=True)

-RECORD 0---------------------------------------------
 VendorID              | 1                            
 payment_type          | 1                            
 lpep_pickup_datetime  | 2021-07-01 18:33:14          
 lpep_dropoff_datetime | 2021-07-01 18:44:52          
 store_and_fwd_flag    | N                            
 RatecodeID            | 1                            
 PULocationID          | 35                           
 DOLocationID          | 76                           
 passenger_count       | 1                            
 trip_distance         | 0.0                          
 fare_amount           | 0.0                          
 extra                 | 0.0                          
 mta_tax               | 0.0                          
 tip_amount            | 0.0                          
 tolls_amount          | 0.0                          
 ehail_fee             | null                         
 improvement_surcharge | 0.0                          
 total_amo

### Calculate Percents to Next Payment Rate

In [12]:
from pyspark.sql import functions as F

# Use the F namespace: `from pyspark.sql.functions import round` would shadow
# Python's builtin round() in every later cell of this notebook.

# How far this row's rate is below the next higher rate, as a percentage of the
# next rate: 100 - rate * 100 / next_rate == (next_rate - rate) / next_rate * 100,
# rounded to 2 decimals. Rows without a Next Payment Rate get null.
percent_below_next = F.round(
    100 - F.col('Payment Rate') * 100 / F.col('Next Payment Rate'),
    2,
)

# Stored as text with a '%' suffix; concat() is null when its numeric input is null.
report = report.withColumn(
    'Percents to next rate',
    F.concat(percent_below_next, F.lit('%'))
)
report.show(15, truncate=False, vertical=True)

-RECORD 0---------------------------------------------
 VendorID              | 1                            
 payment_type          | 1                            
 lpep_pickup_datetime  | 2021-07-01 18:33:14          
 lpep_dropoff_datetime | 2021-07-01 18:44:52          
 store_and_fwd_flag    | N                            
 RatecodeID            | 1                            
 PULocationID          | 35                           
 DOLocationID          | 76                           
 passenger_count       | 1                            
 trip_distance         | 0.0                          
 fare_amount           | 0.0                          
 extra                 | 0.0                          
 mta_tax               | 0.0                          
 tip_amount            | 0.0                          
 tolls_amount          | 0.0                          
 ehail_fee             | null                         
 improvement_surcharge | 0.0                          
 total_amo

### Select only required fields

In [13]:
# Keep only the columns that go into the emailed report, in output order.
report_columns = [
    'Vendor',
    'Payment Type',
    'Payment Rate',
    'Next Payment Rate',
    'Max Payment Rate',
    'Percents to next rate',
]
report = report.select(*report_columns)

### Save report to the CSV file

In [14]:
import glob
import os

destination = config['data']['destination']

# coalesce(1) collapses the report into a single partition, so Spark writes one
# CSV part file into the `destination` directory; 'overwrite' replaces old output.
report.coalesce(1).write.mode('overwrite').option('header', 'true').csv(destination)

# Spark picks the part-file name, so locate it by pattern. glob only sees the
# local filesystem. Fail with a clear message rather than max()'s opaque
# "empty sequence" error when nothing matched (e.g. a non-local destination).
list_of_files = glob.glob(f'{destination}/*.csv')
if not list_of_files:
    raise FileNotFoundError(f'No CSV file found in {destination!r} after writing the report')
latest_csv_file = max(list_of_files, key=os.path.getctime)

                                                                                

### Send the report by email

In [15]:
import smtplib, ssl
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email import encoders

def send_report(config, file):
    """Email `file` as a CSV attachment named "report.csv" over SMTP with SSL.

    Args:
        config: mail settings with keys 'subject', 'user', 'password',
            'receiver', 'server' and 'port'. 'receiver' may be a single address
            string or a list of address strings.
        file: path of the file to attach.

    Returns:
        The dict returned by ``SMTP.sendmail``: one entry per refused recipient
        (empty when every recipient was accepted).

    Raises:
        OSError if the file cannot be read or the connection fails, including
        smtplib.SMTPException subclasses for SMTP-level errors.
    """
    receivers = config['receiver']

    msg = MIMEMultipart()
    msg['Subject'] = config['subject']
    msg['From'] = config['user']
    # Header values must be strings; several recipients share one comma-separated header.
    msg['To'] = receivers if isinstance(receivers, str) else ', '.join(receivers)

    part = MIMEBase('application', "octet-stream")
    # Close the attachment file promptly instead of leaking the handle.
    with open(file, "rb") as attachment:
        part.set_payload(attachment.read())
    encoders.encode_base64(part)
    part.add_header('Content-Disposition', 'attachment; filename="report.csv"')
    msg.attach(part)

    ssl_context = ssl.create_default_context()
    # The context manager sends QUIT and closes the socket even if login or sendmail raises.
    with smtplib.SMTP_SSL(config['server'], config['port'], context=ssl_context) as service:
        service.login(config['user'], config['password'])
        return service.sendmail(config['user'], receivers, msg.as_string())
    

# Mail the CSV located in the previous cell, using the 'mail' section of config.json.
send_report(config['mail'], latest_csv_file)