In [15]:
import findspark
findspark.init()

In [16]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import  types
from pyspark.sql import functions as F
import pandas as pd

In [17]:
credentials_location = '/home/Дмитрий/.google/credentials/datacamp-378414-e59ad2993706.json'

# config for google cloud storage
conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set("spark.jars", "/home/Дмитрий/datacamp/dataeng-zoomcamp/week_5_batch_processing/data/lib/gcs-connector-hadoop3-2.2.5.jar") \
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [None]:
sc = SparkContext(conf=conf)
sc._jsc.hadoopConfiguration().set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
sc._jsc.hadoopConfiguration().set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.json.keyfile", credentials_location)
sc._jsc.hadoopConfiguration().set("fs.gs.auth.service.account.enable", "true")

In [18]:
spark = SparkSession.builder \
    .config(conf=sc.getConf()) \
    .getOrCreate()


In [19]:
df_green = spark.read.parquet('gs://dtc_data_lake_datacamp-378414/data/green/*/*')
df_yellow = spark.read.parquet('gs://dtc_data_lake_datacamp-378414/data/yellow/*/*')

In [20]:
df_yellow.dtypes

[('VendorID', 'int'),
 ('lpep_pickup_datetime', 'timestamp'),
 ('lpep_dropoff_datetime', 'timestamp'),
 ('store_and_fwd_flag', 'string'),
 ('RatecodeID', 'int'),
 ('PULocationID', 'int'),
 ('DOLocationID', 'int'),
 ('passenger_count', 'int'),
 ('trip_distance', 'double'),
 ('fare_amount', 'double'),
 ('extra', 'double'),
 ('mta_tax', 'double'),
 ('tip_amount', 'double'),
 ('tolls_amount', 'double'),
 ('ehail_fee', 'double'),
 ('improvement_surcharge', 'double'),
 ('total_amount', 'double'),
 ('payment_type', 'int'),
 ('trip_type', 'int'),
 ('congestion_surcharge', 'double')]

In [26]:
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime') 


df_yellow = df_yellow \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime') 

In [28]:
common_columns = [
    'VendorID',
    'pickup_datetime',
    'dropoff_datetime',
    'store_and_fwd_flag',
    'RatecodeID',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'fare_amount',
    'extra',
    'mta_tax',
    'tip_amount',
    'tolls_amount',
    'improvement_surcharge',
    'total_amount',
    'payment_type',
    'congestion_surcharge'
]

In [29]:
df_green_sel = df_green \
    .select(common_columns) \
    .withColumn('service_type', F.lit('green'))


df_yellow_sel = df_yellow \
    .select(common_columns) \
    .withColumn('service_type', F.lit('yellow')) 

In [30]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [31]:
df_trips_data.registerTempTable('trips_data')

df_result = spark.sql("""
      select 
    -- Reveneue grouping 
    PULocationID as revenue_zone,
    date_trunc('month', pickup_datetime) as revenue_month, 
    service_type, 

    -- Revenue calculation 
    sum(fare_amount) as revenue_monthly_fare,
    sum(extra) as revenue_monthly_extra,
    sum(mta_tax) as revenue_monthly_mta_tax,
    sum(tip_amount) as revenue_monthly_tip_amount,
    sum(tolls_amount) as revenue_monthly_tolls_amount,
    sum(improvement_surcharge) as revenue_monthly_improvement_surcharge,
    sum(total_amount) as revenue_monthly_total_amount,
    sum(congestion_surcharge) as revenue_monthly_congestion_surcharge,

    -- Additional calculations
    avg(passenger_count) as avg_montly_passenger_count,
    avg(trip_distance) as avg_montly_trip_distance

    from trips_data
    group by 1,2,3
""")

