# Проект исследования поездок такси

## 1. Бизнес-анализ

### Цель бизнеса

Компания *** собирает данные обо всех поездках желтого такси.

Попробуем воспользоваться этими данными, чтобы получить выгоду для бизнеса; для примера пусть это будет маленький парк такси или один таксист.

Прибыль за смену

$$ Опер.прибыль = Выручка - Опер. расходы $$

$$ Выручка = \sum_{i=1}^n [F_{Счетчик}(Расстояние, Время, Скорость) + F_{чаевые}(F_{Счетчик})] $$

$$ Опер. расходы = F_{Бензин}(Расход, Расстояние) $$

$$ Опер.прибыль = \sum_{i=1}^n [F_{Счетчик}(Расстояние, Время, Скорость) + F_{чаевые}(F_{Счетчик})] - F_{Бензин}(Расход, Расстояние) $$

Тариф - 2.5 доллара стоит остановить машину; если она едет от десяти километров в час, то каждые 320 метров (одна пятая мили) обойдутся пассажиру в 0.5 доллара. Если скорость меньше или машина стоит в пробке, то каждая минута также будет стоить 0.5 доллара.

Бензин

250 центов за галлон (1 американский галлон = 3,78541 л)

2,5 $ за 3,78541 л

≈ 0,66 $ за л

Расход 10 л/100 км

### Цель анализа данных

В работе мы можем влиять только на выбор водителя - в какое время суток выходить на работу, в какой день недели и остаться в районе или ехать в другой за заказом.

Помощь в принятии решений

In [None]:
!pip install -q plotly geopandas

## 2. Анализ данных

### Исходные данные

https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

In [None]:
import bz2
import glob
import os
import urllib
import urllib.request
import zipfile

from tqdm.notebook import tqdm

In [None]:
path_to_rawdata = 'data/green_taxi/'
path_to_geoinfo = 'data/shape/'

In [None]:
# Create the output directories. makedirs() also creates a missing parent
# directory (both paths live under 'data/') and, with exist_ok=True, is a
# no-op when the directory is already there.
for directory in (path_to_rawdata, path_to_geoinfo):
    os.makedirs(directory, exist_ok=True)

In [None]:
# Months to download for each year. Month numbers are 1-based: range(13)
# would start at 0 and request a non-existent "...-00.csv" file.
download_range = {
    '2017': range(1, 13),
    '2018': range(1, 13),
    '2019': range(1, 13),
}

In [None]:
# Collect the URLs of the monthly files that are not available locally yet
# (neither as plain CSV nor as a .bz2 archive).
# Nothing is downloaded here: the files are fetched and compressed by
# load_compress(). Downloading the plain CSV as well would fetch every file
# twice and leave both copies in the raw-data directory, so a directory-wide
# read would see each month two times.
url_list = []
data_url = "https://s3.amazonaws.com/nyc-tlc/trip+data/"
for year, months in download_range.items():
    for month in months:
        filename = f'green_tripdata_{year}-{month:02d}.csv'
        path_to_file = os.path.join(path_to_rawdata, filename)
        if not os.path.exists(path_to_file) and not os.path.exists(path_to_file + '.bz2'):
            url_list.append(data_url + filename)
url_list

In [None]:
from multiprocessing import Pool

In [None]:
def chunk_load(url, chunksize=100000):
    """Stream the resource at `url` as a sequence of byte blocks.

    Each block is made of whole lines; `chunksize` is passed to
    ``readlines()`` as its size hint, so it is an approximate block size in
    bytes, not an exact one.

    Yields:
        bytes: consecutive pieces of the response body, in order.
    """
    with urllib.request.urlopen(url) as response:
        # iter(callable, sentinel) stops as soon as readlines() returns an
        # empty list, i.e. when the response is exhausted.
        for lines in iter(lambda: response.readlines(chunksize), []):
            yield b''.join(lines)

In [None]:
def load_compress(csv_url):
    """Download `csv_url` and store it bz2-compressed in the raw-data directory.

    The target name is the last path component of the URL plus ``.bz2``.
    Data is first written to a ``.part`` file and renamed only after the
    whole stream has been compressed, so an interrupted download never
    leaves a truncated archive under the final name (where it would be
    mistaken for a finished download by an existence check).

    Args:
        csv_url: URL of the CSV file to fetch.

    Raises:
        Whatever the download or the file write raises; the partial file is
        removed before the exception propagates.
    """
    filename = csv_url.split('/')[-1] + '.bz2'
    path_to_file = os.path.join(path_to_rawdata, filename)
    partial_path = path_to_file + '.part'
    comp = bz2.BZ2Compressor()
    try:
        with open(partial_path, 'wb') as f:
            for chunk in chunk_load(csv_url):
                f.write(comp.compress(chunk))
            # flush() returns the data still buffered inside the compressor
            f.write(comp.flush())
    except BaseException:
        if os.path.exists(partial_path):
            os.remove(partial_path)
        raise
    os.replace(partial_path, path_to_file)

In [None]:
# Download and compress the missing files with 4 worker processes.
# map() blocks until every URL has been processed; the context manager
# releases the workers even when one of the downloads raises.
with Pool(4) as pool:
    pool.map(load_compress, url_list)

In [None]:
# Fetch the taxi-zone shapefile archive once and unpack it into the geo
# directory. Download and extraction are checked separately, so an archive
# left over from an earlier (possibly interrupted) run is still extracted.
zones_archive = "taxi_zones.zip"
if not os.path.exists(zones_archive):
    urllib.request.urlretrieve("https://s3.amazonaws.com/nyc-tlc/misc/taxi_zones.zip", zones_archive)
if not os.path.exists(os.path.join(path_to_geoinfo, 'taxi_zones.shp')):
    with zipfile.ZipFile(zones_archive, "r") as zip_ref:
        zip_ref.extractall(path_to_geoinfo)

### Исследование

In [None]:
import pandas as pd
import numpy as np

Проверка полей в файлах:

In [None]:
filelist = glob.glob(os.path.join(path_to_rawdata, '*'))
filelist.sort()

In [None]:
# Read only the header row (nrows=0) of every raw file and tabulate the
# column names, one table row per file, to compare them across files.
cols = [list(pd.read_csv(path, nrows=0).columns) for path in filelist]
pd.DataFrame(cols, index=[path.split('/')[-1] for path in filelist])

In [None]:
df = pd.read_csv(filelist[-1], nrows=5)
df.head()

In [None]:
drop_cols = ['VendorID', 'store_and_fwd_flag', 'extra', 'mta_tax', 'ehail_fee', 'improvement_surcharge','congestion_surcharge']

In [None]:
categorical_cols = ['RatecodeID', 'PULocationID', 'DOLocationID', 'payment_type', 'trip_type']
numerical_cols = ['passenger_count', 'trip_distance', 'fare_amount', 'tip_amount', 
                  'tolls_amount', 'total_amount']
datetime_cols = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']

#### Pyspark

In [None]:
import seaborn as sns
sns.set()
sns.set(rc={'figure.figsize':(15,8)})

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [None]:
MEMORY_SIZE = '5g'
spark = SparkSession \
    .builder \
    .config('spark.driver.memory', MEMORY_SIZE) \
    .appName('spark-taxi') \
    .getOrCreate()

In [None]:
# Explicit schema for the trip CSV files: (column name, Spark type) pairs
# in column order. Every field is declared nullable.
trip_columns = [
    ('VendorID', ByteType()),
    ('lpep_pickup_datetime', TimestampType()),
    ('lpep_dropoff_datetime', TimestampType()),
    ('store_and_fwd_flag', StringType()),
    ('RatecodeID', ByteType()),
    ('PULocationID', IntegerType()),
    ('DOLocationID', IntegerType()),
    ('passenger_count', ByteType()),
    ('trip_distance', FloatType()),
    ('fare_amount', FloatType()),
    ('extra', FloatType()),
    ('mta_tax', FloatType()),
    ('tip_amount', FloatType()),
    ('tolls_amount', FloatType()),
    ('ehail_fee', FloatType()),
    ('improvement_surcharge', FloatType()),
    ('total_amount', FloatType()),
    ('payment_type', ByteType()),
    ('trip_type', ByteType()),
    ('congestion_surcharge', FloatType()),
]
schema_trips = StructType([StructField(name, dtype, True) for name, dtype in trip_columns])

In [None]:
df = spark.read \
    .format('csv') \
    .schema(schema_trips) \
    .option('header', 'true') \
    .option('delimiter', ',') \
    .load(path_to_rawdata)

In [None]:
drop_cols = ['VendorID', 'store_and_fwd_flag', 'extra', 'mta_tax', 'ehail_fee', 'improvement_surcharge','congestion_surcharge']

In [None]:
df = df.drop(*drop_cols)

Сразу добавим дополнительные признаки для анализа.

In [None]:
# Derived columns added to every trip:
#   duration       - drop-off minus pick-up time, in minutes (seconds / 60)
#   speed          - trip_distance divided by the duration expressed in hours
#   hour_pickup    - hour of the pick-up timestamp
#   weekday_pickup - day of week of the pick-up timestamp (Spark dayofweek)
#   date_file      - month parsed from the "<digits>-<digits>" part of the
#                    input file name, truncated to the start of the month
# The result is cached because it is reused by many of the cells below.
diff_secs_col = col("lpep_dropoff_datetime").cast("long") - col("lpep_pickup_datetime").cast("long")
df = df.withColumn("duration", diff_secs_col / 60) \
        .withColumn("speed", col('trip_distance') / (col('duration') / 60)) \
        .withColumn("hour_pickup", hour(col('lpep_pickup_datetime'))) \
        .withColumn("weekday_pickup", dayofweek(col('lpep_pickup_datetime'))) \
        .withColumn('date_file', date_trunc('month', to_timestamp(regexp_extract(input_file_name(), r'(\d+\-\d+)', 1), 'yyyy-MM'))) \
        .cache()
#df.limit(5).toPandas()

In [None]:
init_len_df = df.count()
print(f'Количество строк - {init_len_df}')

In [None]:
categorical_cols = ['RatecodeID', 'PULocationID', 'DOLocationID', 'payment_type', 'trip_type']
numerical_cols = ['passenger_count', 'trip_distance', 'fare_amount', 'tip_amount', 
                  'tolls_amount', 'total_amount']
datetime_cols = ['lpep_pickup_datetime', 'lpep_dropoff_datetime']

#### Анализ дат

In [None]:
# Earliest and latest pick-up timestamps in the data set.
# min/max here are the Spark aggregate functions brought in by
# `from pyspark.sql.functions import *`, not the Python builtins.
min_date, max_date = df.select(min("lpep_pickup_datetime"), max("lpep_pickup_datetime")).first()
print(min_date, max_date)
# total_seconds() keeps the partial day that `.days * 24` silently dropped.
total_hours = (max_date - min_date).total_seconds() / 3600
print(total_hours)

In [None]:
df = df.filter(date_trunc('month', col('lpep_pickup_datetime')) == df['date_file']).drop('date_file')
df = df.filter('lpep_pickup_datetime < lpep_dropoff_datetime')

####  Анализ категориальных признаков

##### RatecodeID

In [None]:
RatecodeID_hist = df.groupBy('RatecodeID').count().toPandas()
RatecodeID_hist

In [None]:
sns.barplot(x='RatecodeID', y='count', data=RatecodeID_hist);

In [None]:
df = df.filter('RatecodeID < 5').dropna(subset=['RatecodeID'])

##### payment_type

In [None]:
payment_type_hist = df.groupBy('payment_type').count().toPandas()
payment_type_hist

In [None]:
sns.barplot(x='payment_type', y='count', data=payment_type_hist);

In [None]:
df = df.filter('payment_type < 3')

#### Анализ количественных признаков

In [None]:
print(*numerical_cols)

In [None]:
df.select(numerical_cols).describe().toPandas()

**passenger_count** - поездки без пассажиров?

**trip_distance** - нулевая дистанция и огромная дистанция (73 тыс. км)

**стоимость поездки** - отрицательные значения и большие максимальные

Требуется очистка данных от некорректных значений и выбросов

##### passenger_count

In [None]:
passenger_count_hist = df.groupBy('passenger_count').count().toPandas()

In [None]:
sns.barplot(x='passenger_count', y='count', data=passenger_count_hist);

In [None]:
df = df.filter('passenger_count > 0')

Показать распределение по районам

##### trip_distance

In [None]:
df.select('trip_distance').describe().show()

In [None]:
df.approxQuantile('trip_distance', [0.05, 0.95], 0.0001)

In [None]:
x = list(np.linspace(0, 10, 21)) + [10000]

In [None]:
trip_distance_hist = df.select('trip_distance').rdd.flatMap(lambda x: x).histogram(x)

In [None]:
trip_distance_hist_pd = pd.DataFrame(zip(*trip_distance_hist), columns=['bins', 'count'])
#trip_distance_hist_pd['count'] = trip_distance_hist_pd['count'] / total_hours
trip_distance_hist_pd['count_log'] = trip_distance_hist_pd['count'].apply(np.log)

In [None]:
sns.barplot(data=trip_distance_hist_pd, x='bins', y='count');

In [None]:
x = list(np.arange(0, 30, 1))
trip_distance_hist = df.select('trip_distance').rdd.flatMap(lambda x: x).histogram(x)
trip_distance_hist_pd = pd.DataFrame(zip(*trip_distance_hist), columns=['bins', 'count'])

In [None]:
sns.barplot(data=trip_distance_hist_pd, x='bins', y='count');

In [None]:
df = df.filter('trip_distance >= 0.5').filter('trip_distance < 10')

In [None]:
df.select('trip_distance').describe().show()

#### duration

In [None]:
df.select('duration').describe().show()

In [None]:
df.approxQuantile('duration', [0.05, 0.95], 0.001)

In [None]:
x = list(np.arange(0, 40, 1)) + [10000]
duration_hist = df.select(log('duration')).rdd.flatMap(lambda x: x).histogram(20)
duration_hist_pd = pd.DataFrame(zip(*duration_hist), columns=['bins', 'count'])

In [None]:
sns.barplot(data=duration_hist_pd, x='bins', y='count');

In [None]:
x = list(np.arange(0, 30, 0.5))
duration_hist = df.select('duration').rdd.flatMap(lambda x: x).histogram(x)
duration_hist_pd = pd.DataFrame(zip(*duration_hist), columns=['bins', 'count'])
duration_hist_pd['count_log'] = duration_hist_pd['count'].apply(np.log)

In [None]:
sns.barplot(data=duration_hist_pd, x='bins', y='count');

In [None]:
df = df.filter('duration > 1').filter('duration < 60')

In [None]:
df.select('duration').describe().show()

#### speed

In [None]:
df.select('speed').describe().show()

In [None]:
df.approxQuantile('speed', [0.05, 0.95], 0.001)

In [None]:
x = list(np.arange(0, 120, 10)) + [5000]
speed_hist = df.select('speed').rdd.flatMap(lambda x: x).histogram(x)
speed_hist_pd = pd.DataFrame(zip(*speed_hist), columns=['bins', 'count'])
speed_hist_pd['count_log'] = speed_hist_pd['count'].apply(np.log)

In [None]:
g = sns.barplot(data=speed_hist_pd, x='bins', y='count')
#g.set_xticklabels([f'{x:.2}' for x in speed_hist_pd['bins']], rotation=45);

In [None]:
x = list(np.arange(0, 20, 0.5))
speed_hist = df.select('speed').rdd.flatMap(lambda x: x).histogram(x)
speed_hist_pd = pd.DataFrame(zip(*speed_hist), columns=['bins', 'count'])

In [None]:
sns.barplot(data=speed_hist_pd, x='bins', y='count');

In [None]:
df = df.filter('speed > 0.5').filter('speed < 100')

#### fare_amount

In [None]:
df.select('fare_amount').describe().show()

In [None]:
df.approxQuantile('fare_amount', [0.05, 0.95], 0.0001)

In [None]:
x = [-100] + list(np.arange(0, 300, 10)) + [10000]
fare_amount_hist = df.select('fare_amount').rdd.flatMap(lambda x: x).histogram(x)
fare_amount_hist_pd = pd.DataFrame(zip(*fare_amount_hist), columns=['bins', 'count'])
fare_amount_hist_pd['count_log'] = fare_amount_hist_pd['count'].apply(np.log)

In [None]:
sns.barplot(data=fare_amount_hist_pd, x='bins', y='count');

In [None]:
x = list(np.arange(0, 20, 0.5))
fare_amount_hist = df.select('fare_amount').rdd.flatMap(lambda x: x).histogram(x)
fare_amount_hist_pd = pd.DataFrame(zip(*fare_amount_hist), columns=['bins', 'count'])
fare_amount_hist_pd['count_log'] = fare_amount_hist_pd['count'].apply(np.log)

In [None]:
sns.barplot(data=fare_amount_hist_pd, x='bins', y='count');

In [None]:
df = df.filter('fare_amount >= 3').filter('fare_amount < 150')

In [None]:
print(f'Количество строк - {df.count()}')

In [None]:
# Share of the initial rows removed by the cleaning steps, in percent.
# The denominator is the initial row count (dividing by the remaining count
# would overstate the share); the remaining rows are counted only once.
remaining_rows = df.count()
(init_len_df - remaining_rows) / init_len_df * 100

In [None]:
%%writefile load.py

from pyspark.sql.types import *
from pyspark.sql.functions import *

def load_data(spark, path):
    """Read trip CSV data from `path` into a Spark DataFrame.

    The files are read with a header row, a comma delimiter and an explicit
    schema in which every column is nullable.

    Args:
        spark: SparkSession used for reading.
        path: location passed to the CSV reader's ``load()``.

    Returns:
        The loaded DataFrame.
    """
    # (column name, Spark type) pairs in column order
    column_types = [
        ('VendorID', ByteType()),
        ('lpep_pickup_datetime', TimestampType()),
        ('lpep_dropoff_datetime', TimestampType()),
        ('store_and_fwd_flag', StringType()),
        ('RatecodeID', ByteType()),
        ('PULocationID', IntegerType()),
        ('DOLocationID', IntegerType()),
        ('passenger_count', ByteType()),
        ('trip_distance', FloatType()),
        ('fare_amount', FloatType()),
        ('extra', FloatType()),
        ('mta_tax', FloatType()),
        ('tip_amount', FloatType()),
        ('tolls_amount', FloatType()),
        ('ehail_fee', FloatType()),
        ('improvement_surcharge', FloatType()),
        ('total_amount', FloatType()),
        ('payment_type', ByteType()),
        ('trip_type', ByteType()),
        ('congestion_surcharge', FloatType()),
    ]
    schema_trips = StructType([StructField(name, dtype, True) for name, dtype in column_types])

    reader = spark.read.format('csv').schema(schema_trips)
    reader = reader.option('header', 'true').option('delimiter', ',')
    return reader.load(path)

In [None]:
%%writefile filter.py

from pyspark.sql.functions import *

def filter_data(df):
    """Drop unused columns, add derived trip features and remove bad rows.

    Added columns:
        duration       - trip length in whole minutes (truncated to int)
        speed          - trip_distance divided by the duration in hours
        hour_pickup    - hour of the pick-up timestamp
        weekday_pickup - day of week of the pick-up timestamp

    Rows are kept only when fare, speed, duration, distance, passenger
    count, payment type and rate code are inside the ranges below, the
    pick-up month matches the month in the source file name, the pick-up
    precedes the drop-off, and no remaining column is null.

    Args:
        df: DataFrame as returned by the CSV loader (raw trip columns).

    Returns:
        The filtered DataFrame with the derived columns.
    """
    drop_cols = ['VendorID', 'store_and_fwd_flag', 'extra', 'mta_tax', 'ehail_fee', 'improvement_surcharge','congestion_surcharge']
    df = df.drop(*drop_cols)

    diff_secs_col = col("lpep_dropoff_datetime").cast("long") - col("lpep_pickup_datetime").cast("long")
    # hour_pickup / weekday_pickup are needed by the per-hour and per-weekday
    # aggregations that run on the output of this function.
    # date_file is a helper column: the month taken from the input file name.
    df = df.withColumn("duration", (diff_secs_col / 60).cast('int')) \
        .withColumn("speed", col('trip_distance') / (col('duration') / 60)) \
        .withColumn("hour_pickup", hour(col('lpep_pickup_datetime'))) \
        .withColumn("weekday_pickup", dayofweek(col('lpep_pickup_datetime'))) \
        .withColumn('date_file', date_trunc('month', to_timestamp(regexp_extract(input_file_name(), r'(\d+\-\d+)', 1), 'yyyy-MM')))

    df = df.filter('fare_amount >= 3').filter('fare_amount < 150')
    df = df.filter('speed > 0.5').filter('speed < 100')
    df = df.filter('duration > 1').filter('duration < 150')
    df = df.filter('trip_distance > 0.1').filter('trip_distance < 150')
    df = df.filter('passenger_count > 0')
    df = df.filter('payment_type < 3')
    df = df.filter('RatecodeID < 5')
    # keep only trips whose pick-up month equals the month of their source file
    df = df.filter(date_trunc('month', col('lpep_pickup_datetime')) == df['date_file']).drop('date_file')
    df = df.filter('lpep_pickup_datetime < lpep_dropoff_datetime')
    df = df.dropna()

    return df

In [None]:
from pyspark.sql.window import Window

# Seconds between a pick-up and the previous pick-up in the same zone.
# The trip schema names the timestamp column lpep_pickup_datetime; there is
# no tpep_pickup_datetime column in this data.
w = Window().partitionBy("PULocationID").orderBy("lpep_pickup_datetime")
pause_secs = col("lpep_pickup_datetime").cast("long") - lag(col("lpep_pickup_datetime"), 1).over(w).cast("long")

In [None]:
data = df.withColumn("pause", pause_secs).dropna()

### Анализ признаков

In [None]:
# Single month used for the feature analysis.
path_to_rawdata = 'data/green_taxi/green_tripdata_2017-06.csv.bz2'
# Must be the directory the taxi-zone archive was extracted into
# ('data/shape/'), otherwise the shapefile is not found.
path_to_geoinfo = 'data/shape/'

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
from filter import filter_data
from load import load_data

In [None]:
import pandas as pd
import numpy as np

In [None]:
MEMORY_SIZE = '5g'
spark = SparkSession \
    .builder \
    .config('spark.driver.memory', MEMORY_SIZE) \
    .appName('spark-taxi') \
    .getOrCreate()

In [None]:
df = load_data(spark, path_to_rawdata)
df = filter_data(df)

In [None]:
%%time
df = df.cache()
df.count()

## Карта Нью-Йорка

In [None]:
import os
import plotly.express as px
import geopandas as gpd
import json

In [None]:
taxi_zone = gpd.read_file(os.path.join(path_to_geoinfo, 'taxi_zones.shp')).to_crs("EPSG:4326")
taxi_zone = taxi_zone.set_index('LocationID')
taxi_zone_json = json.loads(taxi_zone.to_json())
taxi_zone.head()

In [None]:
def plot_maps_region(df, locations_col, color_col):
    """Show a choropleth map of the taxi zones coloured by one column.

    Args:
        df: pandas DataFrame with one row per zone.
        locations_col: name of the column holding the zone identifiers that
            are matched against the taxi-zone GeoJSON.
        color_col: name of the column that drives the colour scale.

    The figure is displayed with ``fig.show()``; nothing is returned.
    """
    map_center = {"lat": 40.7142700, "lon": -74.0059700}
    no_margin = {"r": 0, "t": 0, "l": 0, "b": 0}
    fig = px.choropleth_mapbox(
        df,
        geojson=taxi_zone_json,
        locations=locations_col,
        color=color_col,
        color_continuous_scale="Viridis",
        mapbox_style="carto-positron",
        zoom=9.5,
        center=map_center,
        opacity=0.5,
    )
    fig.update_layout(margin=no_margin)
    fig.show()

In [None]:
count_by_zones = df.groupBy('PULocationID').count().toPandas()
count_by_zones.head()

In [None]:
# NOTE(review): this assignment is a no-op (the column is assigned to
# itself); presumably a scaling/normalisation step was removed here - confirm.
count_by_zones['count'] = count_by_zones['count']

In [None]:
count_by_zones.sort_values('count',ascending=False).head(20)['PULocationID'].values

In [None]:
plot_maps_region(count_by_zones, 'PULocationID', 'count')

## Доля кредитных карт

In [None]:
# Trips per pick-up zone, split by payment type (1 and 2).
credit_by_zones = df.filter('payment_type == 1').groupBy('PULocationID').count().toPandas()
cash_by_zones = df.filter('payment_type == 2').groupBy('PULocationID').count().toPandas()
# Outer merge keeps zones that have only one of the two payment types; an
# inner merge would silently drop them. The missing count is set to 0.
paynement_by_zones = cash_by_zones.merge(credit_by_zones, on='PULocationID', how='outer',
                                         suffixes=('_cash', '_credit'))
count_columns = ['count_cash', 'count_credit']
paynement_by_zones[count_columns] = paynement_by_zones[count_columns].fillna(0)
# Despite its name, the column holds the share of payment_type == 1 trips
# among all trips of the zone, in percent.
paynement_by_zones['credit/cash'] = paynement_by_zones['count_credit'] / (paynement_by_zones['count_cash'] + paynement_by_zones['count_credit']) * 100
paynement_by_zones.head()

In [None]:
plot_maps_region(paynement_by_zones, 'PULocationID', 'credit/cash')

## Процент чаевых

In [None]:
tips_by_zones = df.filter('payment_type == 1') \
    .withColumn('tip_prop', col('tip_amount') / col('total_amount') * 100) \
    .groupBy('PULocationID') \
    .agg({'tip_prop':'avg'}) \
    .toPandas()
tips_by_zones.head()

In [None]:
plot_maps_region(tips_by_zones.merge(count_by_zones).query('count > 10'), 'PULocationID', 'avg(tip_prop)')

В среднем чаевые составляют 15%, но есть районы, где они выше.

## Средний заработок в час

In [None]:
from pyspark.sql.window import Window

In [None]:
w = Window().partitionBy("PULocationID").orderBy("lpep_pickup_datetime")
pause_secs = col("lpep_pickup_datetime").cast("long") - lag(col("lpep_pickup_datetime"), 1).over(w).cast("long")

In [None]:
df = df.withColumn("duration_wait", pause_secs).dropna()
df.limit(5).toPandas()

In [None]:
fare_hour_by_reg = df \
        .withColumn("fare_hour", (col('fare_amount') / ((col('duration') + col('duration_wait') / 60) / 60))) \
        .groupBy('PULocationID') \
        .agg(mean('fare_hour').alias('mean_fare_hour')) \
        .toPandas()
fare_hour_by_reg.head()

In [None]:
fare_hour_by_reg.sort_values('mean_fare_hour', ascending=False).head(10)['PULocationID'].values

In [None]:
plot_maps_region(fare_hour_by_reg.merge(count_by_zones).query('count > 10'), 'PULocationID', 'mean_fare_hour')

## Чаевые по времени суток

In [None]:
# Average tip share (tip_amount / total_amount, in percent) per pick-up hour.
# hour_pickup is derived here so the cell does not depend on that column
# already being present in df. Trips are restricted to payment_type == 1,
# like the other tip aggregations in this notebook.
tips_by_hour = df.filter('payment_type == 1') \
    .withColumn('hour_pickup', hour(col('lpep_pickup_datetime'))) \
    .withColumn('tip_prop', col('tip_amount') / col('total_amount') * 100) \
    .groupby('hour_pickup') \
    .agg({'tip_prop':'avg'}) \
    .toPandas()

In [None]:
sns.barplot(data=tips_by_hour, x='hour_pickup', y='avg(tip_prop)')

In [None]:
# Average tip share (in percent) per day of week for payment_type == 1 trips
# picked up in zone 132. weekday_pickup is derived here so the cell does not
# depend on that column already being present in df.
tips_by_weekday = df.filter('payment_type == 1 AND PULocationID == 132') \
    .withColumn('weekday_pickup', dayofweek(col('lpep_pickup_datetime'))) \
    .withColumn('tip_prop', col('tip_amount') / col('total_amount') * 100) \
    .groupby('weekday_pickup') \
    .agg({'tip_prop':'avg'}) \
    .toPandas()

In [None]:
sns.barplot(data=tips_by_weekday, x='weekday_pickup', y='avg(tip_prop)')

Чаевые не зависят от времени суток и дней недели

## Время ожидания заказа

Пропорционально количеству поездок, но благодаря абсолютной величине можно оценить, стоит ли ехать в другой район или остаться ждать заказ.

In [None]:
minute_by_zones = df.groupBy('PULocationID').agg(mean('duration_wait').alias('mean_wait')).toPandas()
minute_by_zones.head()

In [None]:
plot_maps_region(minute_by_zones.query('mean_wait < 6000'), 'PULocationID', 'mean_wait')

Время ожидания заказа менее 30 минут только в центральных районах и аэропорте.

### Временные ряды

## Продолжительность поездок

In [None]:
timeseries = df \
        .withColumn("datetime_day", date_trunc('day', col('lpep_pickup_datetime'))) \
        .groupBy('datetime_day') \
        .agg(mean('duration').alias('duration')) \
        .orderBy('datetime_day') \
        .toPandas()
timeseries.head()

In [None]:
px.line(data_frame=timeseries,x='datetime_day', y='duration')

In [None]:
df.groupby(['PULocationID', 'DOLocationID']).count().filter(col('PULocationID') != col('DOLocationID')).orderBy('count', ascending=False).limit(10).toPandas()

In [None]:
timeseries = df.filter('PULocationID == 7 AND DOLocationID == 7') \
        .withColumn("datetime_day", date_trunc('day', col('lpep_pickup_datetime'))) \
        .groupBy('datetime_day') \
        .agg(mean('duration').alias('duration')) \
        .orderBy('datetime_day') \
        .toPandas()

In [None]:
px.line(data_frame=timeseries,x='datetime_day', y='duration')

### Чаевые

In [None]:
timeseries = df.filter('payment_type == 1') \
        .withColumn('tip_prop', df['tip_amount'] / df['fare_amount'] * 100) \
        .withColumn("datetime_day", date_trunc('day', col('lpep_pickup_datetime'))) \
        .groupBy('datetime_day') \
        .agg(mean('tip_prop').alias('tip_prop')) \
        .orderBy('datetime_day') \
        .toPandas()
px.line(data_frame=timeseries,x='datetime_day', y='tip_prop')

### Ожидание

In [None]:
timeseries = df \
        .withColumn("datetime_day", date_trunc('day', col('lpep_pickup_datetime'))) \
        .groupBy('datetime_day') \
        .count() \
        .orderBy('datetime_day') \
        .toPandas()
px.line(data_frame=timeseries,x='datetime_day', y='count')

### fare

In [None]:
timeseries = df \
        .withColumn("datetime_day", date_trunc('day', col('lpep_pickup_datetime'))) \
        .groupBy('datetime_day') \
        .agg(mean('fare_amount').alias('fare')) \
        .orderBy('datetime_day') \
        .toPandas()
px.line(data_frame=timeseries,x='datetime_day', y='fare')

### trip_distance

In [None]:
timeseries = df \
        .withColumn("datetime_day", date_trunc('day', col('lpep_pickup_datetime'))) \
        .groupBy('datetime_day') \
        .agg(mean('trip_distance').alias('trip_distance')) \
        .orderBy('datetime_day') \
        .toPandas()
px.line(data_frame=timeseries,x='datetime_day', y='trip_distance')

In [None]:
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, OneHotEncoderEstimator
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
df.printSchema()

In [None]:
data = df.select(date_trunc('hour', col('lpep_pickup_datetime')).alias('lpep_pickup_datetime'),
                'PULocationID', 'DOLocationID', log('duration').alias('duration'))

In [None]:
# Timestamp of the same hour one month earlier, used as the join key for
# the lagged feature. add_months() returns a date and therefore drops the
# hour, so it would only ever match the 00:00 rows of the hourly
# timestamps; subtracting an interval keeps the timestamp type.
data = data.withColumn('month_1', col('lpep_pickup_datetime') - expr('INTERVAL 1 MONTH'))
data.limit(5).toPandas()

In [None]:
temp = data.select(col('lpep_pickup_datetime').alias('lpep_pickup_datetime_'), 
                   col('PULocationID').alias('PULocationID_'),
                   col('DOLocationID').alias('DOLocationID_'), 
                   col('duration').alias('duration_')) \
            .groupBy(['lpep_pickup_datetime_', 'PULocationID_', 'DOLocationID_']).agg(mean('duration_'))
temp.limit(5).toPandas()

In [None]:
cond = [data.month_1 == temp.lpep_pickup_datetime_, 
        data.PULocationID == temp.PULocationID_,
        data.DOLocationID == temp.DOLocationID_]

data = data.join(temp, on=cond) \
            .select([data.lpep_pickup_datetime, data.PULocationID, 
                     data.DOLocationID, data.duration, temp['avg(duration_)']]).dropna()

In [None]:
data = data.withColumn("hour", hour('lpep_pickup_datetime')) \
            .withColumn("weekday", dayofweek('lpep_pickup_datetime')) \
            .withColumn("month", month('lpep_pickup_datetime'))

In [None]:
data.limit(5).toPandas()

In [None]:
encoder = OneHotEncoderEstimator(inputCols=['PULocationID', 'DOLocationID', 'hour', 'weekday', 'month'],
                                outputCols=['PULocationIDVec', 'DOLocationIDVec', 'hourVec', 'weekdayVec', 'monthVec'])
vectorAssembler = VectorAssembler(inputCols = ['PULocationIDVec', 'DOLocationIDVec', 'hourVec', 'weekdayVec', 'monthVec', 'avg(duration_)'], outputCol = 'features')
lr = LinearRegression(featuresCol='features',labelCol='duration')
pipeline = Pipeline(stages=[encoder, vectorAssembler, lr])

In [None]:
# 70/30 train/test split. The fixed seed makes the split, and therefore the
# reported metrics, reproducible between runs.
splits = data.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits

In [None]:
model = pipeline.fit(train_df)

In [None]:
predict = model.transform(test_df)

In [None]:
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='duration')

In [None]:
evaluator.evaluate(predict)

In [None]:
evaluator.evaluate(predict, {evaluator.metricName: "r2"})

In [None]:
duration_hist = predict.select(col('prediction') - col('duration')).rdd.flatMap(lambda x: x).histogram(20)
duration_hist_pd = pd.DataFrame(zip(*duration_hist), columns=['bins', 'count'])

In [None]:
sns.barplot(data=duration_hist_pd, x='bins', y='count');

In [None]:
data = df.select(date_trunc('hour', col('lpep_pickup_datetime')).alias('lpep_pickup_datetime'),
                'PULocationID') \
        .groupBy(['lpep_pickup_datetime', 'PULocationID']).count()

In [None]:
# Timestamp of the same hour one month earlier, used as the join key for
# the lagged trip count. add_months() returns a date and therefore drops
# the hour, so it would only ever match the 00:00 rows of the hourly
# timestamps; subtracting an interval keeps the timestamp type.
data = data.withColumn('month_1', col('lpep_pickup_datetime') - expr('INTERVAL 1 MONTH'))
data.limit(5).toPandas()

In [None]:
temp = data.select(col('lpep_pickup_datetime').alias('lpep_pickup_datetime_'), 
                   col('PULocationID').alias('PULocationID_'),
                   col('count').alias('count_'))
temp.limit(5).toPandas()

In [None]:
cond = [data.month_1 == temp.lpep_pickup_datetime_, 
        data.PULocationID == temp.PULocationID_]

data = data.join(temp, on=cond) \
            .select([data.lpep_pickup_datetime, data.PULocationID, 
                     data['count'], temp['count_']]).dropna()

In [None]:
data = data.withColumn("hour", hour('lpep_pickup_datetime')) \
            .withColumn("weekday", dayofweek('lpep_pickup_datetime')) \
            .withColumn("month", month('lpep_pickup_datetime'))

In [None]:
data.limit(5).toPandas()

In [None]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [None]:
encoder = OneHotEncoderEstimator(inputCols=['PULocationID', 'hour', 'weekday', 'month'],
                                outputCols=['PULocationIDVec', 'hourVec', 'weekdayVec', 'monthVec'])
vectorAssembler = VectorAssembler(inputCols = ['PULocationIDVec', 'hourVec', 'weekdayVec', 'monthVec', 'count_'], outputCol = 'features')
lr = LinearRegression(featuresCol='features',labelCol='count')
pipeline = Pipeline(stages=[encoder, vectorAssembler, lr])

In [None]:
# 70/30 train/test split. The fixed seed makes the split, and therefore the
# reported metrics, reproducible between runs.
splits = data.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits

In [None]:
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

In [None]:
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='count')

In [None]:
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=evaluator,
                           trainRatio=0.8)

In [None]:
model = tvs.fit(train_df)

In [None]:
summary = model.bestModel.stages[2].summary

In [None]:
summary.r2

In [None]:
predict = model.transform(test_df)

In [None]:
evaluator.evaluate(predict)

In [None]:
# R^2 on the held-out set. RegressionEvaluator supports rmse, mse, r2, mae
# and var; "r2adj" is not one of its metrics and is rejected.
evaluator.evaluate(predict, {evaluator.metricName: "r2"})

## 3. Предобработка данных

## 4. Моделирование

In [None]:
# Scratch array holding the integers 1..4.
a = np.arange(1, 5)
a

In [None]:
a.reshape((2,2))

In [None]:
# 5x5 float matrix: the last column is 1..5, and the first four columns of
# every row are filled with `a` (broadcast across the rows).
x = np.zeros((5, 5))
x[:, -1] = np.arange(1, 6)
x[:, :-1] = a
x