##  Проект № 5
### Служба такси.
___Полное техзадание проекта прилагается в doc/ТЗ.doc___

#### Предварительный этап. 
__Подготовка и настройка среды__

In [1]:
import os

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import lit, col, when, to_date, round, sum, max, min, count
from pyspark.sql.functions import lit, row_number, monotonically_increasing_id
from pyspark.sql.functions import coalesce

In [2]:
! pip install pyarrow



In [3]:
# Create the Spark session (or reuse one that already exists).
spark = SparkSession.builder.getOrCreate()

# Turn on the Arrow setting for Spark <-> pandas conversions.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Last expression of the cell: display the session summary.
spark

In [6]:
# Environment flag read by pyarrow / pandas-on-Spark.
# NOTE(review): this cell runs after the Spark session is created; confirm the
# flag is picked up in time, or move it above the session cell.
os.environ.update({"PYARROW_IGNORE_TIMEZONE": "1"})

# Project root; relative, so it is resolved against the notebook's working
# directory.
PATH_PREFIX = 'Desktop/DE_project/'

#### 1. Загрузка сырых данных

In [7]:
# Load the raw trip records. The first CSV row supplies the column names and
# the column types are inferred from the data.
source_csv = PATH_PREFIX + 'src/yellow_tripdata_2020-01.csv'
df = spark.read.csv(source_csv, header=True, inferSchema=True)

#### 2. EDA: предварительный анализ данных

In [8]:
# Schema of the source data
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [9]:
# First rows, one field per line (vertical layout, values not truncated)
df.show(n=5, truncate=False, vertical=True)

-RECORD 0------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2020-01-01 00:28:15 
 tpep_dropoff_datetime | 2020-01-01 00:33:03 
 passenger_count       | 1                   
 trip_distance         | 1.2                 
 RatecodeID            | 1                   
 store_and_fwd_flag    | N                   
 PULocationID          | 238                 
 DOLocationID          | 239                 
 payment_type          | 1                   
 fare_amount           | 6.0                 
 extra                 | 3.0                 
 mta_tax               | 0.5                 
 tip_amount            | 1.47                
 tolls_amount          | 0.0                 
 improvement_surcharge | 0.3                 
 total_amount          | 11.27               
 congestion_surcharge  | 2.5                 
-RECORD 1------------------------------------
 VendorID              | 1                   
 tpep_pickup_datetime  | 2020-01-0

In [10]:
# Number of missing values per column (via the pandas-on-Spark API)
df.pandas_api().isna().sum()

VendorID                 65441
tpep_pickup_datetime         0
tpep_dropoff_datetime        0
passenger_count          65441
trip_distance                0
RatecodeID               65441
store_and_fwd_flag       65441
PULocationID                 0
DOLocationID                 0
payment_type             65441
fare_amount                  0
extra                        0
mta_tax                      0
tip_amount                   0
tolls_amount                 0
improvement_surcharge        0
total_amount                 0
congestion_surcharge         0
dtype: int64

In [11]:
# Discard every row that has a missing value in any column.
df = df.na.drop()

# Sanity check: the per-column missing-value counts should now all be zero.
df.pandas_api().isna().sum()

VendorID                 0
tpep_pickup_datetime     0
tpep_dropoff_datetime    0
passenger_count          0
trip_distance            0
RatecodeID               0
store_and_fwd_flag       0
PULocationID             0
DOLocationID             0
payment_type             0
fare_amount              0
extra                    0
mta_tax                  0
tip_amount               0
tolls_amount             0
improvement_surcharge    0
total_amount             0
congestion_surcharge     0
dtype: int64

In [12]:
# Look for anomalous amounts: sample trips with a negative total.
negative_total = col('total_amount') < 0
df.where(negative_total).show(n=2, truncate=False, vertical=True)

-RECORD 0------------------------------------
 VendorID              | 2                   
 tpep_pickup_datetime  | 2020-01-01 00:54:28 
 tpep_dropoff_datetime | 2020-01-01 00:58:41 
 passenger_count       | 1                   
 trip_distance         | 0.6                 
 RatecodeID            | 1                   
 store_and_fwd_flag    | N                   
 PULocationID          | 170                 
 DOLocationID          | 137                 
 payment_type          | 3                   
 fare_amount           | -4.5                
 extra                 | -0.5                
 mta_tax               | -0.5                
 tip_amount            | 0.0                 
 tolls_amount          | 0.0                 
 improvement_surcharge | -0.3                
 total_amount          | -8.3                
 congestion_surcharge  | -2.5                
-RECORD 1------------------------------------
 VendorID              | 2                   
 tpep_pickup_datetime  | 2020-01-0

In [13]:
# Negative totals are clearly anomalous, or are the result of reversing
# (storno) erroneous charges. Such records should be removed or corrected.
df.where(col('total_amount') < 0).count()

19441

In [14]:
# Those rows make up about 0.3% of the total row count (6405008), so they
# are simply dropped.
# NOTE(review): the sample output of the final mart shows trip dates outside
# January 2020 (2003-01-01, 2008-12-31); this filter does not remove them.
# Confirm against the spec whether they should be excluded too.
df = df.where(col('total_amount') >= 0)

#### 3. Трансформируем данные

In [15]:
# Copy only the columns the data mart needs into a working DataFrame.
temp_df = df.select('tpep_pickup_datetime', 'passenger_count', 'total_amount')

# Trip day. The pickup column is already a timestamp, so it is converted to a
# date directly. The former 'yyyyMMdd' argument is a pattern for parsing
# strings; it did not apply to a timestamp column and did not match the data
# layout either.
temp_df = temp_df.withColumn('trip_date', to_date(col('tpep_pickup_datetime')))

# Passenger-count groups: (column suffix, membership condition).
passenger_groups = [
    ('0p', col('passenger_count') == 0),
    ('1p', col('passenger_count') == 1),
    ('2p', col('passenger_count') == 2),
    ('3p', col('passenger_count') == 3),
    ('4p_plus', col('passenger_count') > 3),
]

# For every group add two helper columns:
#   haul_<group>          1 if the trip belongs to the group, else 0
#   total_amount_<group>  the trip's total if it belongs to the group, else 0
for suffix, in_group in passenger_groups:
    temp_df = (
        temp_df
        .withColumn(f'haul_{suffix}', when(in_group, 1).otherwise(0))
        .withColumn(f'total_amount_{suffix}',
                    when(in_group, col('total_amount')).otherwise(0))
    )

# The pickup timestamp is no longer needed, and the total amount has been
# spread across the per-group columns.
temp_df = temp_df.drop('tpep_pickup_datetime', 'total_amount')

In [16]:
# Aggregate per trip day: total number of trips, number of trips per
# passenger-count group, and the highest / lowest trip total per group.
#
# The total_amount_<group> columns hold 0 for trips outside the group, so a
# plain min() over them returns 0 whenever the day has at least one trip from
# another group. The minimum is therefore taken only over rows that belong to
# the group (haul_<group> == 1). Days with no trips in a group keep the
# previous value of 0 for both max and min.
amount_groups = [
    # (input column suffix, trip-count alias, output amount suffix)
    ('0p', 'haul_0p_sum', 'zero'),
    ('1p', 'haul_1p_sum', '1p'),
    ('2p', 'haul_2p_sum', '2p'),
    ('3p', 'haul_3p_sum', '3p'),
    # The 4+ counter keeps its historical alias (no "_sum" suffix).
    ('4p_plus', 'haul_4p_plus', '4p_plus'),
]

agg_exprs = [count('*').alias('trips_count')]
for src, haul_alias, out in amount_groups:
    in_group = col(f'haul_{src}') == 1
    amount = col(f'total_amount_{src}')
    agg_exprs += [
        sum(f'haul_{src}').alias(haul_alias),
        max(amount).alias(f'max_amount_{out}'),
        # min() skips the nulls produced for rows outside the group;
        # coalesce() restores 0 when the group has no trips that day.
        coalesce(min(when(in_group, amount)), lit(0.0)).alias(f'min_amount_{out}'),
    ]

temp_df = temp_df.groupBy('trip_date').agg(*agg_exprs)

In [17]:
# Share of each passenger-count group in the day's trips, in percent.
share_sources = {
    'percentage_zero': 'haul_0p_sum',
    'percentage_1p': 'haul_1p_sum',
    'percentage_2p': 'haul_2p_sum',
    'percentage_3p': 'haul_3p_sum',
    'percentage_4p_plus': 'haul_4p_plus',
}
for share_name, haul_name in share_sources.items():
    temp_df = temp_df.withColumn(share_name, col(haul_name) / col('trips_count') * 100)

# Keep only the columns required by the spec.
datamart_columns = [
    'trip_date',
    'percentage_zero', 'max_amount_zero', 'min_amount_zero',
    'percentage_1p', 'max_amount_1p', 'min_amount_1p',
    'percentage_2p', 'max_amount_2p', 'min_amount_2p',
    'percentage_3p', 'max_amount_3p', 'min_amount_3p',
    'percentage_4p_plus', 'max_amount_4p_plus', 'min_amount_4p_plus',
]
temp_df = temp_df.select(*datamart_columns)

# Round every floating-point column to 2 decimal places; other columns pass
# through unchanged and the column order is preserved.
float_columns = {name for name, dtype in temp_df.dtypes if dtype in ('double', 'float')}
temp_df = temp_df.select([
    round(name, 2).alias(name) if name in float_columns else col(name)
    for name in temp_df.columns
])

In [18]:
# groupBy does not return rows in sorted order, so order the mart by trip day.
temp_df = temp_df.orderBy('trip_date')

In [19]:
# Check the final data mart
temp_df.show(n=5,truncate=False, vertical=True)

-RECORD 0------------------------
 trip_date          | 2003-01-01 
 percentage_zero    | 0.0        
 max_amount_zero    | 0.0        
 min_amount_zero    | 0.0        
 percentage_1p      | 100.0      
 max_amount_1p      | 0.0        
 min_amount_1p      | 0.0        
 percentage_2p      | 0.0        
 max_amount_2p      | 0.0        
 min_amount_2p      | 0.0        
 percentage_3p      | 0.0        
 max_amount_3p      | 0.0        
 min_amount_3p      | 0.0        
 percentage_4p_plus | 0.0        
 max_amount_4p_plus | 0.0        
 min_amount_4p_plus | 0.0        
-RECORD 1------------------------
 trip_date          | 2008-12-31 
 percentage_zero    | 0.0        
 max_amount_zero    | 0.0        
 min_amount_zero    | 0.0        
 percentage_1p      | 80.0       
 max_amount_1p      | 73.7       
 min_amount_1p      | 0.0        
 percentage_2p      | 0.0        
 max_amount_2p      | 0.0        
 min_amount_2p      | 0.0        
 percentage_3p      | 10.0       
 max_amount_3p

In [20]:
# Final data mart: schema
temp_df.printSchema()

root
 |-- trip_date: date (nullable = true)
 |-- percentage_zero: double (nullable = true)
 |-- max_amount_zero: double (nullable = true)
 |-- min_amount_zero: double (nullable = true)
 |-- percentage_1p: double (nullable = true)
 |-- max_amount_1p: double (nullable = true)
 |-- min_amount_1p: double (nullable = true)
 |-- percentage_2p: double (nullable = true)
 |-- max_amount_2p: double (nullable = true)
 |-- min_amount_2p: double (nullable = true)
 |-- percentage_3p: double (nullable = true)
 |-- max_amount_3p: double (nullable = true)
 |-- min_amount_3p: double (nullable = true)
 |-- percentage_4p_plus: double (nullable = true)
 |-- max_amount_4p_plus: double (nullable = true)
 |-- min_amount_4p_plus: double (nullable = true)



#### 4. Сохраняем итоговую витрину

In [22]:
# Persist the final data mart as one Parquet file. The mart has one row per
# trip date, so it is collected to the driver as a pandas DataFrame and
# written with pandas.
output_path = PATH_PREFIX+'output/yellow_trips_datamart.parquet'

# Create the output directory if it is missing, so the write does not fail at
# the very last step of the pipeline.
os.makedirs(os.path.dirname(output_path), exist_ok=True)

df_datamart = temp_df.toPandas()
df_datamart.to_parquet(output_path)