# Проект 5. Служба такси.

### Импорт библиотек

In [3]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import count
from pyspark.sql.functions import col, when, count

### Создание спарк-сессии

In [4]:
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("SparkFirst")\
    .config("spark.executor.memory", "10g")\
    .config("spark.executor.cores", 5)\
    .config("spark.dynamicAllocation.enabled", "true")\
    .config("spark.dynamicAllocation.maxExecutors", 5)\
    .config("spark.shuffle.service.enabled", "true")\
.getOrCreate()

### Создание dataframe из скачанного файла

In [5]:
schema = StructType([
    StructField('VendorId', IntegerType(), True),
    StructField('tpep_pickup_datetime', TimestampType(), True),
    StructField('tpep_dropoff_datetime', TimestampType(), True),
    StructField('passenger_count', IntegerType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', IntegerType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', IntegerType(), True),
    StructField('DOLocationID', IntegerType(), True),
    StructField('payment_type', IntegerType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True),
    StructField('congestion_surcharge', DoubleType(), True),
])

df = spark.read.option('header', 'true').csv('yellow_tripdata_2020-01.csv', schema=schema)

### Проверка датафрейма

In [6]:
df.printSchema()

root
 |-- VendorId: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [7]:
# проверяем датафрейм на наличие нулевых значений

df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorId|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|   65441|                   0|                    0|          65441|            0|     65441|             65441|           0|           0|       65441|          0|    0|      0|         0|           0|                    0

In [8]:
#чистим df от нулевых значений

df_clean = df.dropna(how='any')

In [9]:
# снова проверяем датафрейм на наличие нулевых значений, убеждаемся в их отсутствии

df_clean.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorId|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|       0|                   0|                    0|              0|            0|         0|                 0|           0|           0|           0|          0|    0|      0|         0|           0|                    0

In [10]:
df_clean.createOrReplaceTempView('taxi_service_in_process')

In [11]:
# запрос для очистки от отрицательных значений (для корректности входных данных)

sql = """
SELECT *
FROM taxi_service_in_process
WHERE 
total_amount > 0 AND
trip_distance > 0
"""

In [12]:
# Создание копии df с очищенными данными

df_done = spark.sql(sql)

df_done.createOrReplaceTempView('taxi_service')

# df_done.show(100)

In [13]:
# Запрос для создания витрины данных 

sql = """
WITH
clear_date_from_table
AS
(
	SELECT date(tpep_pickup_datetime) AS date
	FROM taxi_service ts 
),
date_count
AS
(
	SELECT count(date) AS count_date,
	date AS date
	FROM clear_date_from_table
	GROUP BY date
),
info_zero
AS
(
	SELECT max(total_amount) AS zero_max_amm,
	min(total_amount) AS zero_min_amm,
	date(tpep_pickup_datetime) AS date,
	count(passenger_count) AS percentage_zero
	FROM taxi_service ts 
	WHERE passenger_count = 0
	GROUP BY date
),
info_1p
AS
(
	SELECT max(total_amount) AS max_amm_1p,
	min(total_amount) AS min_amm_1p,
	date(tpep_pickup_datetime) AS date,
	count(passenger_count) AS percentage_1p
	FROM taxi_service ts 
	WHERE passenger_count = 1
	GROUP BY date
),
info_2p
AS
(
	SELECT max(total_amount) AS max_amm_2p,
	min(total_amount) AS min_amm_2p,
	date(tpep_pickup_datetime) AS date,
	count(passenger_count) AS percentage_2p
	FROM taxi_service ts 
	WHERE passenger_count = 2
	GROUP BY date
),
info_3p
AS
(
	SELECT max(total_amount) AS max_amm_3p,
	min(total_amount) AS min_amm_3p,
	date(tpep_pickup_datetime) AS date,
	count(passenger_count) AS percentage_3p
	FROM taxi_service ts 
	WHERE passenger_count = 3
	GROUP BY date
),
info_4p_plus
AS
(
	SELECT max(total_amount) AS max_amm_4p_plus,
	min(total_amount) AS min_amm_4p_plus,
	date(tpep_pickup_datetime) AS date,
	count(passenger_count) AS percentage_4p_plus
	FROM taxi_service ts 
	WHERE passenger_count > 3
	GROUP BY date
)
SELECT date,
COALESCE(round((CAST(percentage_zero AS decimal) / count_date) * 100), 0) AS percentage_zero,
COALESCE(zero_max_amm, 0) AS zero_max_amm,
COALESCE(zero_min_amm, 0) AS zero_min_amm,
COALESCE(round((CAST(percentage_1p AS decimal) / count_date) * 100), 0) AS percentage_1p,
COALESCE(max_amm_1p, 0) AS max_amm_1p,
COALESCE(min_amm_1p, 0) AS min_amm_1p,
COALESCE(round((CAST(percentage_2p AS decimal) / count_date) * 100), 0) AS percentage_2p,
COALESCE(max_amm_2p, 0) AS max_amm_2p,
COALESCE(min_amm_2p, 0) AS min_amm_2p,
COALESCE(round((CAST(percentage_3p AS decimal) / count_date) * 100), 0) AS percentage_3p,
COALESCE(max_amm_3p, 0) AS max_amm_3p,
COALESCE(min_amm_3p, 0) AS min_amm_3p,
COALESCE(round((CAST(percentage_4p_plus AS decimal) / count_date) * 100), 0) AS percentage_4p_plus,
COALESCE(max_amm_4p_plus, 0) AS max_amm_4p_plus,
COALESCE(min_amm_4p_plus, 0) AS min_amm_4p_plus
FROM date_count 
LEFT JOIN info_1p USING(date)
LEFT JOIN info_2p USING(date)
LEFT JOIN info_3p USING(date)
LEFT JOIN info_4p_plus USING(date)
LEFT JOIN info_zero USING(date)
"""

# Создание итоговой витрины

df2 = spark.sql(sql)

df2.createOrReplaceTempView('taxi_service_done')

df2.show(100)

+----------+---------------+------------+------------+-------------+----------+----------+-------------+----------+----------+-------------+----------+----------+------------------+---------------+---------------+
|      date|percentage_zero|zero_max_amm|zero_min_amm|percentage_1p|max_amm_1p|min_amm_1p|percentage_2p|max_amm_2p|min_amm_2p|percentage_3p|max_amm_3p|min_amm_3p|percentage_4p_plus|max_amm_4p_plus|min_amm_4p_plus|
+----------+---------------+------------+------------+-------------+----------+----------+-------------+----------+----------+-------------+----------+----------+------------------+---------------+---------------+
|2009-01-01|              0|         0.0|         0.0|           79|      74.8|       3.3|            5|      31.3|      31.3|            5|      13.8|      13.8|                11|          61.42|            9.3|
|2020-01-04|              2|      152.54|         0.3|           66|     965.8|       0.3|           18|     481.3|       0.3|            5|    